Subjects
This page covers LazyFlowSubject, the building block for on-demand data loading, along
with the metadata system that threads cross-cutting information through containers.
Table of Contents
- LazyFlowSubject
- Basic Usage
- Listening and Caching Behaviour
- Reloading Data
- Pushing Values Directly
- Replacing the Loader
- Load Triggers
- ContainerConfiguration
- listenReloadable
- SubjectFactory
- Testability
- Convenience Factory Functions
- Metadata
- ContainerMetadata
- SourceType
- ReloadFunctionMetadata
- BackgroundLoadStateMetadata
- LoadTrigger
- Custom Metadata
- Flow Dependencies in Loader Functions
- dependsOnContainerFlow
- dependsOnFlow
- Key Stability
LazyFlowSubject
LazyFlowSubject<T> converts a suspend loader function into a
Flow<Container<T>>. The loader runs lazily and its result is automatically
cached for subsequent subscribers.
Basic Usage
class ProductRepository(
    private val localDataSource: ProductsLocalDataSource,
    private val remoteDataSource: ProductsRemoteDataSource,
) {
    private val productsSubject = LazyFlowSubject.create {
        // Step 1: emit local (cached) data immediately if available:
        val local = localDataSource.getProducts()
        if (local != null) emit(local)
        // Step 2: fetch from remote and update the local cache:
        val remote = remoteDataSource.getProducts()
        localDataSource.save(remote)
        emit(remote)
    }

    // ListContainerFlow<T> is an alias for Flow<Container<List<T>>>
    fun listenProducts(): ListContainerFlow<Product> = productsSubject.listen()
    fun reload() = productsSubject.reloadAsync()
}
The lambda passed to LazyFlowSubject.create is the loader function. Inside
it, you have access to the Emitter<T> receiver, which provides:
- emit(value): emit a loaded value
- loadTrigger: LoadTrigger: why the loader was called (see Load Triggers)
- metadata: ContainerMetadata: metadata from the triggering call
- dependsOnFlow { } / dependsOnContainerFlow { }: subscribe to external flows (see Flow Dependencies)
Listening and Caching Behaviour
- The loader starts running the first time listen() is called and a subscriber begins collecting
- The latest loaded value is cached; new subscribers receive it immediately without re-running the loader
- When the last subscriber stops collecting, a timer starts (default: 1 second). If no new subscriber appears before the timer expires, the cached value is cleared and the loader will run again on the next subscription
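- You can customise the timeout, for example by constructing DefaultSubjectFactory with a cacheTimeoutMillis argument (see SubjectFactory)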
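The caching rules listed above can be modelled in a few lines of plain Kotlin. This is only a sketch for reasoning about the behaviour, not the library's implementation: ExpiringCache, subscribe, and unsubscribe are hypothetical names, and time is passed in explicitly instead of using a real timer.

```kotlin
// Hypothetical model of the cache-expiry rule; not part of the library.
class ExpiringCache<T : Any>(private val timeoutMillis: Long = 1_000L) {
    private var cached: T? = null
    private var subscribers = 0
    private var idleSinceMillis: Long? = null

    /** Returns what a new subscriber sees; runs [loader] only when nothing is cached. */
    fun subscribe(nowMillis: Long, loader: () -> T): T {
        val idleSince = idleSinceMillis
        // Drop the cached value if the idle period reached the timeout.
        if (idleSince != null && nowMillis - idleSince >= timeoutMillis) cached = null
        idleSinceMillis = null
        subscribers++
        return cached ?: loader().also { cached = it }
    }

    /** The idle timer starts when the last subscriber leaves. */
    fun unsubscribe(nowMillis: Long) {
        check(subscribers > 0) { "no active subscribers" }
        subscribers--
        if (subscribers == 0) idleSinceMillis = nowMillis
    }
}
```

In this model, a subscriber that returns before the timeout receives the cached value, while one that arrives after it causes the loader to run again.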
Reloading Data
Re-run the previous loader without changing it:
// fire-and-forget:
subject.reloadAsync()
// or observe the reload result:
subject.reload().collect { value -> ... }
You can reload silently (the existing cached value is kept visible while the new load runs in the background):
Pushing Values Directly
Place a value into the subject without running the loader:
subject.updateWith(successContainer("immediate value"))
// update the existing container:
subject.updateWith { oldContainer ->
    oldContainer.map { it + " (updated)" }
}
// shorthand that only runs if the current container is Success:
subject.updateIfSuccess { oldValue ->
    oldValue.copy(isFavorite = true)
}
After updateWith, calling reload() re-runs the previous loader function.
Replacing the Loader
You can replace the loader function at any time:
// assign a new multi-value loader (returns a Flow<T> of the new results):
val flow: Flow<T> = subject.newLoad {
    emit(step1())
    emit(step2())
}
// fire-and-forget variant:
subject.newAsyncLoad { emit(loadData()) }
// single-value loader (suspends until the value is loaded):
val result: T = subject.newSimpleLoad { fetchData() }
// single-value loader (fire-and-forget):
subject.newSimpleAsyncLoad { fetchData() }
The newLoad / newSimpleLoad functions also accept a loadConfig argument and
an optional metadata argument:
- loadConfig = LoadConfig.SilentLoading: the existing cached value stays visible while the new load runs in the background. Emitted containers carry backgroundLoadState = BackgroundLoadState.Loading when emitBackgroundLoads is enabled (e.g. via listenReloadable()).
- metadata: arbitrary data attached to this load call; accessible inside the loader function via the metadata property on the Emitter receiver. See Metadata for available types and how to define custom ones.
subject.newAsyncLoad(
    loadConfig = LoadConfig.SilentLoading,
    metadata = SourceTypeMetadata(RemoteSourceType),
) {
    emit(fetchRemote())
}
Load Triggers
Inside the loader function, loadTrigger tells you why the loader was
called. Use it to skip unnecessary steps, e.g. the local-cache check on
explicit reloads:
private val productsSubject = LazyFlowSubject.create {
    if (loadTrigger != LoadTrigger.Reload) {
        val local = localDataSource.getProducts()
        if (local != null) emit(local)
    }
    val remote = remoteDataSource.getProducts()
    localDataSource.save(remote)
    emit(remote)
}
| Value | Meaning |
|---|---|
| LoadTrigger.NewLoad | Loader was set with newLoad() or create {} |
| LoadTrigger.Reload | Loader was re-triggered by reload() / reloadAsync() |
| LoadTrigger.CacheExpired | Cache timeout elapsed; the next subscriber triggered a fresh load |
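To make the effect of the trigger concrete, the sketch below lists which values a loader like the one above would emit for each trigger. It uses only the Kotlin standard library; Trigger and emissions are hypothetical stand-ins that mirror the table, not the library's own types.

```kotlin
// Hypothetical stand-in for the three trigger values in the table above.
enum class Trigger { NewLoad, Reload, CacheExpired }

/**
 * Returns the values the example loader would emit, in order:
 * the local value (skipped on an explicit reload or when absent),
 * followed by the remote value.
 */
fun <T : Any> emissions(trigger: Trigger, local: T?, remote: T): List<T> = buildList<T> {
    if (trigger != Trigger.Reload && local != null) add(local)
    add(remote)
}
```

Under these assumptions, only an explicit reload skips the local value; a load after cache expiry behaves like a first load.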
ContainerConfiguration
Pass a ContainerConfiguration to listen() to control what extra metadata
is attached to emitted containers:
val flow: Flow<Container<List<Product>>> = subject.listen(
    configuration = ContainerConfiguration(
        emitReloadFunction = true, // attach a reload function to each container
        emitBackgroundLoads = true, // set BackgroundLoadState metadata value to Loading while reloading silently
    )
)
- emitReloadFunction: each emitted Container.Success / Container.Error carries a reloadFunction that, when called, triggers reloadAsync() on the subject. Useful for UI components that need to offer a "retry" button without knowing about the subject directly.
- emitBackgroundLoads: while a silent reload is in progress, emitted containers carry the backgroundLoadState metadata property set to Loading. Useful when you want to display the currently loaded data together with an indication that a load is in progress (for example, pull-to-refresh behaviour).
listenReloadable
listenReloadable() is a convenience shorthand that enables both flags:
// equivalent to listen(ContainerConfiguration(emitReloadFunction = true, emitBackgroundLoads = true))
fun listenProducts(): Flow<Container<List<Product>>> = subject.listenReloadable()
SubjectFactory
SubjectFactory is an interface for creating LazyFlowSubject instances.
Use it instead of calling LazyFlowSubject.create {} directly.
Testability
Injecting SubjectFactory via DI (e.g. Hilt) makes it straightforward to
replace the real factory with a test double that returns pre-configured or mock
subjects:
class ProductRepository(
    private val subjectFactory: SubjectFactory = SubjectFactory,
) {
    private val subject = subjectFactory.createSubject {
        delay(1000)
        emit("my-item")
    }

    fun listen(): ContainerFlow<String> = subject.listenReloadable()
}
In production, bind DefaultSubjectFactory with your chosen cache timeout:
@Provides
@Singleton
fun provideSubjectFactory(): SubjectFactory =
    DefaultSubjectFactory(cacheTimeoutMillis = 60_000L)
In tests, replace it with any fake implementation of SubjectFactory or a mock.
The global default can also be overridden for tests:
Convenience Factory Functions
SubjectFactory provides several extension functions to reduce boilerplate:
// Create a LazyFlowSubject with a simple (single-value) loader:
val subject: LazyFlowSubject<String> = subjectFactory.createSimpleSubject(
    sourceType = RemoteSourceType,
) { fetchString() }
// Create a StateFlow directly (backed by a LazyFlowSubject internally):
val flow: StateFlow<Container<String>> = subjectFactory.createFlow {
    emit(fetchData())
}
// Create a reloadable StateFlow (emitReloadFunction + emitBackgroundLoads enabled):
val reloadableFlow: StateFlow<Container<String>> = subjectFactory.createReloadableFlow {
    emit(fetchData())
}
Metadata
ContainerMetadata
ContainerMetadata is an immutable bag attached to Container.Success and
Container.Error. Multiple metadata instances can be combined:
val meta = SourceTypeMetadata(RemoteSourceType) + ReloadFunctionMetadata { loadItems() }
val container = successContainer("data", meta)
When two metadata instances of the same type are combined, the second one replaces the first:
val combined = SourceTypeMetadata(LocalSourceType) + SourceTypeMetadata(RemoteSourceType)
// result: only RemoteSourceType is kept
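The "second one replaces the first" rule can be pictured as a map keyed by the metadata type. The sketch below is a simplified stand-in written against the Kotlin standard library only; Meta, MetaBag, metaBagOf, SourceMeta, and TimestampMeta are hypothetical names, and the library's real ContainerMetadata may be implemented differently.

```kotlin
import kotlin.reflect.KClass

// Hypothetical marker interface standing in for a metadata entry.
interface Meta

data class SourceMeta(val source: String) : Meta
data class TimestampMeta(val timestamp: Long) : Meta

// Immutable bag holding at most one entry per metadata type.
class MetaBag(private val entries: Map<KClass<out Meta>, Meta>) {
    // On a type clash, the entry from the right-hand bag wins.
    operator fun plus(other: MetaBag): MetaBag = MetaBag(entries + other.entries)

    @Suppress("UNCHECKED_CAST")
    operator fun <M : Meta> get(type: KClass<M>): M? = entries[type] as M?
}

fun metaBagOf(vararg items: Meta): MetaBag = MetaBag(items.associateBy { it::class })
```

With this model, combining a bag that holds SourceMeta("local") with one that holds SourceMeta("remote") keeps only the remote entry, while entries of other types are carried over unchanged.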
Access a specific metadata type:
Or use the shorthand extension properties available on containers:
val source: SourceType = container.sourceType
val bgLoadState: BackgroundLoadState = container.backgroundLoadState
val reloadFn = container.reloadFunction
SourceType
SourceType communicates where the data came from. The built-in values are:
| Value | Meaning |
|---|---|
| LocalSourceType | Loaded from a local/on-device data source |
| RemoteSourceType | Fetched from a remote/network data source |
| ImmediateSourceType | Set directly, not via a loader |
| FakeSourceType | Provided by a test double or fake implementation |
| UnknownSourceType | Source is not known |
Emit with a source type inside the loader:
LazyFlowSubject.create {
    emit(localDataSource.get(), LocalSourceType)
    // isLastValue arg is optional, but it can improve performance a bit:
    emit(remoteDataSource.get(), RemoteSourceType, isLastValue = true)
}
Set a source type on an existing container:
val container = successContainer("data", SourceTypeMetadata(RemoteSourceType))
// or update metadata on an existing container:
val updated = container.update { sourceType = RemoteSourceType }
Read the source type:
ReloadFunctionMetadata
Attaching a reload function to a container lets UI components trigger a reload without holding a direct reference to the subject, the view-model, or any other component:
// The listenReloadable() shorthand does this automatically:
fun listenProducts() = productsSubject.listenReloadable()
// Or attach manually on an existing container:
val container = successContainer("data") + ReloadFunctionMetadata { loadMyData() }
// Or override the reload function in a flow:
val flow = source.containerUpdate {
    val originalReload = reloadFunction
    reloadFunction = {
        println("Reloading...")
        originalReload(it) // call the original reload function if needed
    }
}
Calling the reload function from UI code:
val container: Container<String> = ...
container.fold(
    onError = { ex -> Button(onClick = { container.reload() }) { Text("Retry") } },
    onSuccess = { value -> /* ... */ },
)
BackgroundLoadStateMetadata
When a silent reload is in progress and emitBackgroundLoads = true is set,
emitted containers carry the backgroundLoadState = Loading metadata value. The UI can use
this to show an indicator (e.g. pull-to-refresh) while still displaying the stale data:
container.fold(
    onSuccess = { value ->
        if (backgroundLoadState == BackgroundLoadState.Loading) ShowRefreshIndicator()
        ShowContent(value)
    },
)
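The difference between a regular and a silent reload, as described above, can be sketched as a small state transition. This is an illustrative model in plain Kotlin, not the library's Container type: ViewState, onReloadStarted, and onReloadFinished are hypothetical names.

```kotlin
// Hypothetical, simplified stand-in for what the UI observes.
sealed interface ViewState<out T> {
    object Pending : ViewState<Nothing>
    data class Success<T>(
        val value: T,
        val isBackgroundLoading: Boolean = false,
    ) : ViewState<T>
}

/**
 * A silent reload keeps the current value visible and only raises the
 * background-loading flag; a regular reload falls back to Pending.
 */
fun <T> onReloadStarted(current: ViewState<T>, silent: Boolean): ViewState<T> =
    if (silent && current is ViewState.Success) current.copy(isBackgroundLoading = true)
    else ViewState.Pending

/** A finished load replaces the value and clears the background flag. */
fun <T> onReloadFinished(newValue: T): ViewState<T> = ViewState.Success(newValue)
```

A UI built on this model would show its refresh indicator whenever isBackgroundLoading is true, without ever hiding the stale content.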
LoadTrigger
Available inside the loader function via loadTrigger: LoadTrigger:
LazyFlowSubject.create {
    when (loadTrigger) {
        LoadTrigger.NewLoad -> { /* first-ever load or newLoad() called */ }
        LoadTrigger.Reload -> { /* explicit reload() call */ }
        LoadTrigger.CacheExpired -> { /* fresh load after cache timed out */ }
    }
}
Custom Metadata
You can define your own metadata types by implementing ContainerMetadata:
data class TimestampMetadata(val timestamp: Long) : ContainerMetadata
// attach:
val container = successContainer("data", TimestampMetadata(System.currentTimeMillis()))
// read:
val ts: Long? = container.metadata.get<TimestampMetadata>()?.timestamp
Implement ContainerMetadata.Hidden to prevent the metadata from being seen
by downstream collectors (it is still passed through internally and visible from
the loader function):
Flow Dependencies in Loader Functions
Starting from v2.0.0-beta13, loader functions can subscribe to external Kotlin flows. When the subscribed flow emits a new value, the loader function is automatically re-executed.
dependsOnContainerFlow
Use dependsOnContainerFlow to depend on a Flow<Container<T>>. If the
dependent flow emits Container.Error, the current load is failed with the
same exception. If it emits Container.Pending, the load waits:
interface SessionProvider {
    fun getCurrentUserFlow(): Flow<Container<User>>
}

private val itemsSubject = LazyFlowSubject.create {
    // The loader re-runs whenever the current user changes:
    val currentUser: User = dependsOnContainerFlow("getCurrentUser") {
        sessionProvider.getCurrentUserFlow()
    }
    val items = remoteDataSource.getItems(currentUser)
    emit(items, RemoteSourceType)
}
dependsOnFlow
For plain Flow<T> dependencies (not wrapped in Container):
Key Stability
Every dependsOnFlow / dependsOnContainerFlow call must be given a stable
key (or key + arguments) that uniquely identifies the flow instance. The keys
are used to cache the subscribed flows across re-executions of the loader:
// simple key:
val user: User = dependsOnContainerFlow("getCurrentUser") {
    sessionProvider.getCurrentUserFlow()
}
// key with arguments (important if the argument changes the flow):
val userId: String = sessionProvider.getCurrentUserId()
val user: User = dependsOnContainerFlow("getUserById", userId) {
    userRepository.getUserById(userId)
}
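Why the key (plus arguments) has to be stable is easier to see in a small model of the lookup. The sketch below is an assumption about the general idea only, written in plain Kotlin: DependencyRegistry and dependsOn are hypothetical names, and a string stands in for the subscribed flow.

```kotlin
// Hypothetical model: dependencies are cached under (key name, arguments).
class DependencyRegistry {
    private data class Key(val name: String, val args: List<Any?>)

    private val subscriptions = mutableMapOf<Key, Any?>()

    /** Number of times a dependency was actually created. */
    var createdCount = 0
        private set

    /** Reuses the dependency stored under the same key and arguments, else creates it. */
    @Suppress("UNCHECKED_CAST")
    fun <T> dependsOn(name: String, vararg args: Any?, create: () -> T): T {
        val key = Key(name, args.toList())
        if (key !in subscriptions) {
            subscriptions[key] = create()
            createdCount++
        }
        return subscriptions[key] as T
    }
}
```

In this model, a repeated call with the same key and arguments returns the stored dependency and never invokes its factory, while changing an argument produces a separate entry. An unstable key would therefore create a new dependency on every loader execution.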
If you call dependsOnFlow or dependsOnContainerFlow with the same key
twice within one loader execution, the second call is ignored and returns the
cached result from the first call: