Subjects & Cache
This page covers LazyFlowSubject, LazyCache, and FlowSubject (the
building blocks for on-demand data loading) along with the metadata system
that threads cross-cutting information through containers.
Table of Contents
LazyFlowSubject
LazyFlowSubject<T> converts a suspend loader function into a
Flow<Container<T>>. The loader runs lazily and its result is automatically
cached for subsequent subscribers.
Basic Usage
class ProductRepository(
private val localDataSource: ProductsLocalDataSource,
private val remoteDataSource: ProductsRemoteDataSource,
) {
private val productsSubject = LazyFlowSubject.create {
// Step 1: emit local (cached) data immediately if available:
val local = localDataSource.getProducts()
if (local != null) emit(local)
// Step 2: fetch from remote and update the local cache:
val remote = remoteDataSource.getProducts()
localDataSource.save(remote)
emit(remote)
}
// ListContainerFlow<T> is an alias for Flow<Container<List<T>>>
fun listenProducts(): ListContainerFlow<Product> = productsSubject.listen()
fun reload() = productsSubject.reloadAsync()
}
The lambda passed to LazyFlowSubject.create is the loader function. Inside
it, you have access to the Emitter<T> receiver, which provides:
emit(value): emit a loaded valueloadTrigger: LoadTrigger: why the loader was called (see Load Triggers)metadata: ContainerMetadata: metadata from the triggering calldependsOnFlow { }/dependsOnContainerFlow { }: subscribe to external flows (see Flow Dependencies)
Listening and Caching Behaviour
- The loader starts running the first time
listen()is called and a subscriber begins collecting - The latest loaded value is cached; new subscribers receive it immediately without re-running the loader
- When the last subscriber stops collecting, a timer starts (default: 1 second). If no new subscriber appears before the timer expires, the cached value is cleared and the loader will run again on the next subscription
- You can customise the timeout:
LazyFlowSubject.create(
cacheTimeoutMillis = 60_000L, // 60 seconds
) {
emit(loadData())
}
Reloading Data
Re-run the previous loader without changing it:
// fire-and-forget:
subject.reloadAsync()
// or observe the reload result:
subject.reload().collect { value -> ... }
You can reload silently (the existing cached value is kept visible while the new load runs in the background):
subject.reloadAsync(silently = true)
Pushing Values Directly
Place a value into the subject without running the loader:
subject.updateWith(successContainer("immediate value"))
// update the existing container:
subject.updateWith { oldContainer ->
oldContainer.map { it + " (updated)" }
}
// shorthand that only runs if the current container is Success:
subject.updateIfSuccess { oldValue ->
oldValue.copy(isFavorite = true)
}
After updateWith, calling reload() re-runs the previous loader function.
Replacing the Loader
You can replace the loader function at any time:
// assign a new multi-value loader (returns a Flow<T> of the new results):
val flow: Flow<T> = subject.newLoad {
emit(step1())
emit(step2())
}
// fire-and-forget variant:
subject.newAsyncLoad { emit(loadData()) }
// single-value loader (suspends until the value is loaded):
val result: T = subject.newSimpleLoad { fetchData() }
// single-value loader (fire-and-forget):
subject.newSimpleAsyncLoad { fetchData() }
The newLoad / newSimpleLoad functions also accept a silently flag and
an optional metadata argument:
silently = true: the existing cached value stays visible while the new load runs in the background. Emitted containers carryisLoadingInBackground = truewhenemitBackgroundLoadsis enabled (e.g. vialistenReloadable()).metadata: arbitrary data attached to this load call; accessible inside the loader function via themetadataproperty on theEmitterreceiver. See Metadata for available types and how to define custom ones.
subject.newAsyncLoad(
silently = true,
metadata = SourceTypeMetadata(RemoteSourceType),
) {
emit(fetchRemote())
}
Load Triggers
Inside the loader function, loadTrigger tells you why the loader was
called. Use it to skip unnecessary steps, e.g. the local-cache check on
explicit reloads:
private val productsSubject = LazyFlowSubject.create {
if (loadTrigger != LoadTrigger.Reload) {
val local = localDataSource.getProducts()
if (local != null) emit(local)
}
val remote = remoteDataSource.getProducts()
localDataSource.save(remote)
emit(remote)
}
| Value | Meaning |
|---|---|
LoadTrigger.NewLoad |
Loader was set with newLoad() or create {} |
LoadTrigger.Reload |
Loader was re-triggered by reload() / reloadAsync() |
LoadTrigger.CacheExpired |
Cache timeout elapsed; the next subscriber triggered a fresh load |
ContainerConfiguration
Pass a ContainerConfiguration to listen() to control what extra metadata
is attached to emitted containers:
val flow: Flow<Container<List<Product>>> = subject.listen(
configuration = ContainerConfiguration(
emitReloadFunction = true, // attach a reload function to each container
emitBackgroundLoads = true, // set isLoadingInBackground = true while reloading silently
)
)
emitReloadFunction- each emittedContainer.Success/Container.Errorcarries areloadFunctionthat, when called, triggersreloadAsync()on the subject. Useful for UI components that need to offer a "retry" button without knowing about the subject directly.emitBackgroundLoads- while a silent reload is in progress, emitted containers haveisLoadingInBackground = true. Useful when you want to display the current loaded data along with an additional indication that something is being loaded right now (for example, PullToRefresh behavior)
listenReloadable
listenReloadable() is a convenience shorthand that enables both flags:
// equivalent to listen(ContainerConfiguration(emitReloadFunction = true, emitBackgroundLoads = true))
fun listenProducts(): Flow<Container<List<Product>>> = subject.listenReloadable()
LazyCache
LazyCache<Arg, T> is a map of LazyFlowSubject instances keyed by an
argument. Each key has its own independent loading lifecycle.
Basic Usage
private val usersCache = LazyCache.create<Long, User> { id ->
val local = localDataSource.getUserById(id)
if (local != null) emit(local)
val remote = remoteDataSource.getUserById(id)
localDataSource.save(remote)
emit(remote)
}
fun getUser(id: Long): Flow<Container<User>> = usersCache.listen(id)
fun reloadUser(id: Long) = usersCache.reloadAsync(id)
For single-value loaders (no need to emit multiple times), use the simpler factory:
private val usersCache = LazyCache.createSimple<Long, User> { id ->
remoteDataSource.getUserById(id)
}
API Overview
All methods mirror LazyFlowSubject but take an additional arg parameter:
// Listen to the flow for a specific key:
usersCache.listen(id)
usersCache.listen(id, ContainerConfiguration(emitReloadFunction = true))
// Get the current snapshot without subscribing:
val current: Container<User> = usersCache.get(id)
// Reload:
usersCache.reload(id)
usersCache.reloadAsync(id)
usersCache.reloadAsync(id, silently = true)
// Push a value directly:
usersCache.updateWith(id, successContainer(user))
usersCache.updateWith(id) { oldContainer -> oldContainer.map { it.copy(name = newName) } }
usersCache.updateIfSuccess(id) { old -> old.copy(name = newName) }
// Subscriber counts:
val count: Int = usersCache.getActiveCollectorsCount(id)
val active: Boolean = usersCache.hasActiveCollectors(id)
// Remove all cached entries that have no active subscribers:
usersCache.reset()
FlowSubject
FlowSubject<T> gives you manual control over a finite Flow. It is
conceptually similar to PublishSubject from RxJava.
Unlike StateFlow, FlowSubject:
- Does not require an initial value
- Can be completed (successfully or with an error)
- Replays the latest value to new subscribers after completion
val subject = FlowSubject.create<String>()
subject.onNext("first")
subject.onNext("second")
subject.onComplete()
subject.flow().collect { value ->
println(value)
}
SubjectFactory
SubjectFactory is an interface for creating LazyFlowSubject and LazyCache
instances. Use it instead of calling LazyFlowSubject.create {} directly.
Testability
Injecting SubjectFactory via DI (e.g. Hilt) makes it straightforward to
replace the real factory with a test double that returns pre-configured or mock
subjects:
class ProductRepository(
private val subjectFactory: SubjectFactory = SubjectFactory,
) {
private val subject = subjectFactory.createSubject {
delay(1000)
emit("my-item")
}
fun listen(): ContainerFlow<String> = subject.listenReloadable()
}
In production, bind DefaultSubjectFactory with your chosen cache timeout:
@Provides
@Singleton
fun provideSubjectFactory(): SubjectFactory =
DefaultSubjectFactory(cacheTimeoutMillis = 60_000L)
In tests, replace it with any fake implementation of SubjectFactory or a mock.
The global default can also be overridden for tests:
SubjectFactory.setFactory(FakeSubjectFactory())
// ... run tests ...
SubjectFactory.resetFactory()
Convenience Factory Functions
SubjectFactory provides several extension functions to reduce boilerplate:
// Create a LazyFlowSubject with a simple (single-value) loader:
val subject: LazyFlowSubject<String> = subjectFactory.createSimpleSubject(
sourceType = RemoteSourceType,
) { fetchString() }
// Create a LazyCache with a simple loader:
val cache: LazyCache<Long, User> = subjectFactory.createSimpleCache(
sourceType = RemoteSourceType,
) { id -> fetchUser(id) }
// Create a StateFlow directly (backed by a LazyFlowSubject internally):
val flow: StateFlow<Container<String>> = subjectFactory.createFlow {
emit(fetchData())
}
// Create a reloadable StateFlow (emitReloadFunction + emitBackgroundLoads enabled):
val flow: StateFlow<Container<String>> = subjectFactory.createReloadableFlow {
emit(fetchData())
}
Metadata
ContainerMetadata
ContainerMetadata is an immutable bag attached to Container.Success and
Container.Error. Multiple metadata instances can be combined:
val meta = SourceTypeMetadata(RemoteSourceType) + ReloadFunctionMetadata { loadItems() }
val container = successContainer("data", meta)
When two metadata instances of the same type are combined, the second one replaces the first:
val combined = SourceTypeMetadata(LocalSourceType) + SourceTypeMetadata(RemoteSourceType)
// result: only RemoteSourceType is kept
Access a specific metadata type:
val sourceMeta: SourceTypeMetadata? = container.metadata.get<SourceTypeMetadata>()
Or use the shorthand extension properties available on containers:
val source: SourceType = container.source
val isLoading: Boolean = container.isLoadingInBackground
val reloadFn = container.reloadFunction
SourceType
SourceType communicates where the data came from. The built-in values are:
| Value | Meaning |
|---|---|
LocalSourceType |
Loaded from a local/on-device data source |
RemoteSourceType |
Fetched from a remote/network data source |
ImmediateSourceType |
Set directly, not via a loader |
FakeSourceType |
Provided by a test double or fake implementation |
UnknownSourceType |
Source is not known |
Emit with a source type inside the loader:
LazyFlowSubject.create {
emit(localDataSource.get(), LocalSourceType)
// isLastValue arg is optional, but it can improve performance a bit:
emit(remoteDataSource.get(), RemoteSourceType, isLastValue = true)
}
Set a source type on an existing container:
val container = successContainer("data", SourceTypeMetadata(RemoteSourceType))
// or update metadata on an existing container:
val updated = container.update { source = RemoteSourceType }
Read the source type:
val success: Container.Success<String> = ...
println(success.source) // RemoteSourceType
ReloadFunctionMetadata
Attaching a reload function to a container lets UI components trigger a reload without needing a direct reference to the subject / view-model, or any other components:
// The listenReloadable() shorthand does this automatically:
fun listenProducts() = productsSubject.listenReloadable()
// Or attach manually on an existing container:
val container = successContainer("data") + ReloadFunctionMetadata { loadMyData() }
// Or override the reload function in a flow:
val flow = source.containerUpdate {
val originalReload = reloadFunction
reloadFunction = {
println("Reloading...")
originalReload(it) // call the original reload function if needed
}
}
Calling the reload function from UI code:
val container: Container<String> = ...
container.fold(
onError = { ex -> Button(onClick = { container.reload(silently = false) }) { Text("Retry") } },
onSuccess = { value -> /* ... */ },
)
IsLoadingInBackgroundMetadata
When a silent reload is in progress and emitBackgroundLoads = true is set,
emitted containers carry isLoadingInBackground = true. UI can use this to
show an indicator (e.g. pull-to-refresh) while still displaying the stale data:
container.fold(
onSuccess = { value ->
if (isLoadingInBackground) ShowRefreshIndicator()
ShowContent(value)
},
)
LoadTrigger
Available inside the loader function via loadTrigger: LoadTrigger:
LazyFlowSubject.create {
when (loadTrigger) {
LoadTrigger.NewLoad -> { /* first-ever load or newLoad() called */ }
LoadTrigger.Reload -> { /* explicit reload() call */ }
LoadTrigger.CacheExpired -> { /* fresh load after cache timed out */ }
}
}
Custom Metadata
You can define your own metadata types by implementing ContainerMetadata:
data class TimestampMetadata(val timestamp: Long) : ContainerMetadata
// attach:
val container = successContainer("data", TimestampMetadata(System.currentTimeMillis()))
// read:
val ts: Long? = container.metadata.get<TimestampMetadata>()?.timestamp
Implement ContainerMetadata.Hidden to prevent the metadata from being seen
by downstream collectors (it is still passed through internally and visible from
the loader function):
data class TimestampMetadata(val timestamp: Long) : ContainerMetadata, ContainerMetadata.Hidden
Flow Dependencies in Loader Functions
Starting from v2.0.0-beta13, loader functions can subscribe to external Kotlin flows. When the subscribed flow emits a new value, the loader function is automatically re-executed.
dependsOnContainerFlow
Use dependsOnContainerFlow to depend on a Flow<Container<T>>. If the
dependent flow emits Container.Error, the current load is failed with the
same exception. If it emits Container.Pending, the load waits:
interface SessionProvider {
fun getCurrentUserFlow(): Flow<Container<User>>
}
private val itemsSubject = LazyFlowSubject.create {
// The loader re-runs whenever the current user changes:
val currentUser: User = dependsOnContainerFlow("getCurrentUser") {
sessionProvider.getCurrentUserFlow()
}
val items = remoteDataSource.getItems(currentUser)
emit(items, RemoteSourceType)
}
dependsOnFlow
For plain Flow<T> dependencies (not wrapped in Container):
val currentUser: User = dependsOnFlow("getUser") {
sessionProvider.getUserFlow()
}
Key Stability
Every dependsOnFlow / dependsOnContainerFlow call must be given a stable
key (or key + arguments) that uniquely identifies the flow instance. The keys
are used to cache the subscribed flows across re-executions of the loader:
// simple key:
val user: User = dependsOnContainerFlow("getCurrentUser") {
sessionProvider.getCurrentUserFlow()
}
// key with arguments (important if the argument changes the flow):
val userId: String = sessionProvider.getCurrentUserId()
val user: User = dependsOnContainerFlow("getUserById", userId) {
userRepository.getUserById(userId)
}
If you call dependsOnFlow or dependsOnContainerFlow with the same key
twice within one loader execution, the second call is ignored and returns the
cached result from the first call:
val a: String = dependsOnContainerFlow("key") { getFlow1() }
val b: String = dependsOnContainerFlow("key") { getFlow2() } // getFlow2 is ignored
// a == b, both refer to the results from the first call