Subjects & Cache

This page covers LazyFlowSubject, LazyCache, and FlowSubject (the building blocks for on-demand data loading) along with the metadata system that threads cross-cutting information through containers.

Table of Contents

LazyFlowSubject

LazyFlowSubject<T> converts a suspend loader function into a Flow<Container<T>>. The loader runs lazily and its result is automatically cached for subsequent subscribers.

Basic Usage

class ProductRepository(
    private val localDataSource: ProductsLocalDataSource,
    private val remoteDataSource: ProductsRemoteDataSource,
) {

    private val productsSubject = LazyFlowSubject.create {
        // Step 1: emit local (cached) data immediately if available:
        val local = localDataSource.getProducts()
        if (local != null) emit(local)

        // Step 2: fetch from remote and update the local cache:
        val remote = remoteDataSource.getProducts()
        localDataSource.save(remote)
        emit(remote)
    }

    // ListContainerFlow<T> is an alias for Flow<Container<List<T>>>
    fun listenProducts(): ListContainerFlow<Product> = productsSubject.listen()

    fun reload() = productsSubject.reloadAsync()
}

The lambda passed to LazyFlowSubject.create is the loader function. Inside it, you have access to the Emitter<T> receiver, which provides:

  • emit(value): emit a loaded value
  • loadTrigger: LoadTrigger: why the loader was called (see Load Triggers)
  • metadata: ContainerMetadata: metadata from the triggering call
  • dependsOnFlow { } / dependsOnContainerFlow { }: subscribe to external flows (see Flow Dependencies)

Listening and Caching Behaviour

  • The loader starts running the first time listen() is called and a subscriber begins collecting
  • The latest loaded value is cached; new subscribers receive it immediately without re-running the loader
  • When the last subscriber stops collecting, a timer starts (default: 1 second). If no new subscriber appears before the timer expires, the cached value is cleared and the loader will run again on the next subscription
  • You can customise the timeout:
LazyFlowSubject.create(
    cacheTimeoutMillis = 60_000L, // 60 seconds
) {
    emit(loadData())
}

Reloading Data

Re-run the previous loader without changing it:

// fire-and-forget:
subject.reloadAsync()

// or observe the reload result:
subject.reload().collect { value -> ... }

You can reload silently (the existing cached value is kept visible while the new load runs in the background):

subject.reloadAsync(silently = true)

Pushing Values Directly

Place a value into the subject without running the loader:

subject.updateWith(successContainer("immediate value"))

// update the existing container:
subject.updateWith { oldContainer ->
    oldContainer.map { it + " (updated)" }
}

// shorthand that only runs if the current container is Success:
subject.updateIfSuccess { oldValue ->
    oldValue.copy(isFavorite = true)
}

After updateWith, calling reload() re-runs the previous loader function.

Replacing the Loader

You can replace the loader function at any time:

// assign a new multi-value loader (returns a Flow<T> of the new results):
val flow: Flow<T> = subject.newLoad {
    emit(step1())
    emit(step2())
}

// fire-and-forget variant:
subject.newAsyncLoad { emit(loadData()) }

// single-value loader (suspends until the value is loaded):
val result: T = subject.newSimpleLoad { fetchData() }

// single-value loader (fire-and-forget):
subject.newSimpleAsyncLoad { fetchData() }

The newLoad / newSimpleLoad functions also accept a silently flag and an optional metadata argument:

  • silently = true: the existing cached value stays visible while the new load runs in the background. Emitted containers carry isLoadingInBackground = true when emitBackgroundLoads is enabled (e.g. via listenReloadable()).
  • metadata: arbitrary data attached to this load call; accessible inside the loader function via the metadata property on the Emitter receiver. See Metadata for available types and how to define custom ones.
subject.newAsyncLoad(
    silently = true,
    metadata = SourceTypeMetadata(RemoteSourceType),
) {
    emit(fetchRemote())
}

Load Triggers

Inside the loader function, loadTrigger tells you why the loader was called. Use it to skip unnecessary steps, e.g. the local-cache check on explicit reloads:

private val productsSubject = LazyFlowSubject.create {
    if (loadTrigger != LoadTrigger.Reload) {
        val local = localDataSource.getProducts()
        if (local != null) emit(local)
    }
    val remote = remoteDataSource.getProducts()
    localDataSource.save(remote)
    emit(remote)
}
Value Meaning
LoadTrigger.NewLoad Loader was set with newLoad() or create {}
LoadTrigger.Reload Loader was re-triggered by reload() / reloadAsync()
LoadTrigger.CacheExpired Cache timeout elapsed; the next subscriber triggered a fresh load

ContainerConfiguration

Pass a ContainerConfiguration to listen() to control what extra metadata is attached to emitted containers:

val flow: Flow<Container<List<Product>>> = subject.listen(
    configuration = ContainerConfiguration(
        emitReloadFunction    = true,  // attach a reload function to each container
        emitBackgroundLoads   = true,  // set isLoadingInBackground = true while reloading silently
    )
)
  • emitReloadFunction - each emitted Container.Success / Container.Error carries a reloadFunction that, when called, triggers reloadAsync() on the subject. Useful for UI components that need to offer a "retry" button without knowing about the subject directly.
  • emitBackgroundLoads - while a silent reload is in progress, emitted containers have isLoadingInBackground = true. Useful when you want to display the current loaded data along with an additional indication that something is being loaded right now (for example, PullToRefresh behavior)

listenReloadable

listenReloadable() is a convenience shorthand that enables both flags:

// equivalent to listen(ContainerConfiguration(emitReloadFunction = true, emitBackgroundLoads = true))
fun listenProducts(): Flow<Container<List<Product>>> = subject.listenReloadable()

LazyCache

LazyCache<Arg, T> is a map of LazyFlowSubject instances keyed by an argument. Each key has its own independent loading lifecycle.

Basic Usage

private val usersCache = LazyCache.create<Long, User> { id ->
    val local = localDataSource.getUserById(id)
    if (local != null) emit(local)
    val remote = remoteDataSource.getUserById(id)
    localDataSource.save(remote)
    emit(remote)
}

fun getUser(id: Long): Flow<Container<User>> = usersCache.listen(id)

fun reloadUser(id: Long) = usersCache.reloadAsync(id)

For single-value loaders (no need to emit multiple times), use the simpler factory:

private val usersCache = LazyCache.createSimple<Long, User> { id ->
    remoteDataSource.getUserById(id)
}

API Overview

All methods mirror LazyFlowSubject but take an additional arg parameter:

// Listen to the flow for a specific key:
usersCache.listen(id)
usersCache.listen(id, ContainerConfiguration(emitReloadFunction = true))

// Get the current snapshot without subscribing:
val current: Container<User> = usersCache.get(id)

// Reload:
usersCache.reload(id)
usersCache.reloadAsync(id)
usersCache.reloadAsync(id, silently = true)

// Push a value directly:
usersCache.updateWith(id, successContainer(user))
usersCache.updateWith(id) { oldContainer -> oldContainer.map { it.copy(name = newName) } }
usersCache.updateIfSuccess(id) { old -> old.copy(name = newName) }

// Subscriber counts:
val count: Int  = usersCache.getActiveCollectorsCount(id)
val active: Boolean = usersCache.hasActiveCollectors(id)

// Remove all cached entries that have no active subscribers:
usersCache.reset()

FlowSubject

FlowSubject<T> gives you manual control over a finite Flow. It is conceptually similar to PublishSubject from RxJava.

Unlike StateFlow, FlowSubject: - Does not require an initial value - Can be completed (successfully or with an error) - Replays the latest value to new subscribers after completion

val subject = FlowSubject.create<String>()

subject.onNext("first")
subject.onNext("second")
subject.onComplete()

subject.flow().collect { value ->
    println(value)
}

SubjectFactory

SubjectFactory is an interface for creating LazyFlowSubject and LazyCache instances. Use it instead of calling LazyFlowSubject.create {} directly.

Testability

Injecting SubjectFactory via DI (e.g. Hilt) makes it straightforward to replace the real factory with a test double that returns pre-configured or mock subjects:

class ProductRepository(
    private val subjectFactory: SubjectFactory = SubjectFactory,
) {
    private val subject = subjectFactory.createSubject {
        delay(1000)
        emit("my-item")
    }

    fun listen(): ContainerFlow<String> = subject.listenReloadable()
}

In production, bind DefaultSubjectFactory with your chosen cache timeout:

@Provides
@Singleton
fun provideSubjectFactory(): SubjectFactory =
    DefaultSubjectFactory(cacheTimeoutMillis = 60_000L)

In tests, replace it with any fake implementation of SubjectFactory or a mock.

The global default can also be overridden for tests:

SubjectFactory.setFactory(FakeSubjectFactory())
// ... run tests ...
SubjectFactory.resetFactory()

Convenience Factory Functions

SubjectFactory provides several extension functions to reduce boilerplate:

// Create a LazyFlowSubject with a simple (single-value) loader:
val subject: LazyFlowSubject<String> = subjectFactory.createSimpleSubject(
    sourceType  = RemoteSourceType,
) { fetchString() }

// Create a LazyCache with a simple loader:
val cache: LazyCache<Long, User> = subjectFactory.createSimpleCache(
    sourceType = RemoteSourceType,
) { id -> fetchUser(id) }

// Create a StateFlow directly (backed by a LazyFlowSubject internally):
val flow: StateFlow<Container<String>> = subjectFactory.createFlow {
    emit(fetchData())
}

// Create a reloadable StateFlow (emitReloadFunction + emitBackgroundLoads enabled):
val flow: StateFlow<Container<String>> = subjectFactory.createReloadableFlow {
    emit(fetchData())
}

Metadata

ContainerMetadata

ContainerMetadata is an immutable bag attached to Container.Success and Container.Error. Multiple metadata instances can be combined:

val meta = SourceTypeMetadata(RemoteSourceType) + ReloadFunctionMetadata { loadItems() }
val container = successContainer("data", meta)

When two metadata instances of the same type are combined, the second one replaces the first:

val combined = SourceTypeMetadata(LocalSourceType) + SourceTypeMetadata(RemoteSourceType)
// result: only RemoteSourceType is kept

Access a specific metadata type:

val sourceMeta: SourceTypeMetadata? = container.metadata.get<SourceTypeMetadata>()

Or use the shorthand extension properties available on containers:

val source: SourceType  = container.source
val isLoading: Boolean  = container.isLoadingInBackground
val reloadFn            = container.reloadFunction

SourceType

SourceType communicates where the data came from. The built-in values are:

Value Meaning
LocalSourceType Loaded from a local/on-device data source
RemoteSourceType Fetched from a remote/network data source
ImmediateSourceType Set directly, not via a loader
FakeSourceType Provided by a test double or fake implementation
UnknownSourceType Source is not known

Emit with a source type inside the loader:

LazyFlowSubject.create {
    emit(localDataSource.get(), LocalSourceType)
    // isLastValue arg is optional, but it can improve performance a bit:
    emit(remoteDataSource.get(), RemoteSourceType, isLastValue = true)
}

Set a source type on an existing container:

val container = successContainer("data", SourceTypeMetadata(RemoteSourceType))

// or update metadata on an existing container:
val updated = container.update { source = RemoteSourceType }

Read the source type:

val success: Container.Success<String> = ...
println(success.source)  // RemoteSourceType

ReloadFunctionMetadata

Attaching a reload function to a container lets UI components trigger a reload without needing a direct reference to the subject / view-model, or any other components:

// The listenReloadable() shorthand does this automatically:
fun listenProducts() = productsSubject.listenReloadable()

// Or attach manually on an existing container:
val container = successContainer("data") + ReloadFunctionMetadata { loadMyData() }

// Or override the reload function in a flow:
val flow = source.containerUpdate {
    val originalReload = reloadFunction
    reloadFunction = {
        println("Reloading...")
        originalReload(it) // call the original reload function if needed
    }
}

Calling the reload function from UI code:

val container: Container<String> = ...
container.fold(
    onError   = { ex -> Button(onClick = { container.reload(silently = false) }) { Text("Retry") } },
    onSuccess = { value -> /* ... */ },
)

IsLoadingInBackgroundMetadata

When a silent reload is in progress and emitBackgroundLoads = true is set, emitted containers carry isLoadingInBackground = true. UI can use this to show an indicator (e.g. pull-to-refresh) while still displaying the stale data:

container.fold(
    onSuccess = { value ->
        if (isLoadingInBackground) ShowRefreshIndicator()
        ShowContent(value)
    },
)

LoadTrigger

Available inside the loader function via loadTrigger: LoadTrigger:

LazyFlowSubject.create {
    when (loadTrigger) {
        LoadTrigger.NewLoad      -> { /* first-ever load or newLoad() called */ }
        LoadTrigger.Reload       -> { /* explicit reload() call */ }
        LoadTrigger.CacheExpired -> { /* fresh load after cache timed out */ }
    }
}

Custom Metadata

You can define your own metadata types by implementing ContainerMetadata:

data class TimestampMetadata(val timestamp: Long) : ContainerMetadata

// attach:
val container = successContainer("data", TimestampMetadata(System.currentTimeMillis()))

// read:
val ts: Long? = container.metadata.get<TimestampMetadata>()?.timestamp

Implement ContainerMetadata.Hidden to prevent the metadata from being seen by downstream collectors (it is still passed through internally and visible from the loader function):

data class TimestampMetadata(val timestamp: Long) : ContainerMetadata, ContainerMetadata.Hidden

Flow Dependencies in Loader Functions

Starting from v2.0.0-beta13, loader functions can subscribe to external Kotlin flows. When the subscribed flow emits a new value, the loader function is automatically re-executed.

dependsOnContainerFlow

Use dependsOnContainerFlow to depend on a Flow<Container<T>>. If the dependent flow emits Container.Error, the current load is failed with the same exception. If it emits Container.Pending, the load waits:

interface SessionProvider {
    fun getCurrentUserFlow(): Flow<Container<User>>
}

private val itemsSubject = LazyFlowSubject.create {
    // The loader re-runs whenever the current user changes:
    val currentUser: User = dependsOnContainerFlow("getCurrentUser") {
        sessionProvider.getCurrentUserFlow()
    }
    val items = remoteDataSource.getItems(currentUser)
    emit(items, RemoteSourceType)
}

dependsOnFlow

For plain Flow<T> dependencies (not wrapped in Container):

val currentUser: User = dependsOnFlow("getUser") {
    sessionProvider.getUserFlow()
}

Key Stability

Every dependsOnFlow / dependsOnContainerFlow call must be given a stable key (or key + arguments) that uniquely identifies the flow instance. The keys are used to cache the subscribed flows across re-executions of the loader:

// simple key:
val user: User = dependsOnContainerFlow("getCurrentUser") {
    sessionProvider.getCurrentUserFlow()
}

// key with arguments (important if the argument changes the flow):
val userId: String = sessionProvider.getCurrentUserId()
val user: User = dependsOnContainerFlow("getUserById", userId) {
    userRepository.getUserById(userId)
}

If you call dependsOnFlow or dependsOnContainerFlow with the same key twice within one loader execution, the second call is ignored and returns the cached result from the first call:

val a: String = dependsOnContainerFlow("key") { getFlow1() }
val b: String = dependsOnContainerFlow("key") { getFlow2() } // getFlow2 is ignored
// a == b, both refer to the results from the first call