Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,30 +86,39 @@ internal class RealStore<Key : Any, Input : Any, Output : Any>(

override fun stream(request: StoreRequest<Key>): Flow<StoreResponse<Output>> =
flow<StoreResponse<Output>> {
val cached = if (request.shouldSkipCache(CacheType.MEMORY)) {
val cachedToEmit = if (request.shouldSkipCache(CacheType.MEMORY)) {
null
} else {
memCache?.get(request.key)
}

cached?.let {
cachedToEmit?.let {
// if we read a value from cache, dispatch it first
emit(StoreResponse.Data(value = it, origin = ResponseOrigin.Cache))
}
if (sourceOfTruth == null) {
val stream = if (sourceOfTruth == null) {
// piggypack only if not specified fresh data AND we emitted a value from the cache
val piggybackOnly = !request.refresh && cached != null
val piggybackOnly = !request.refresh && cachedToEmit != null
@Suppress("UNCHECKED_CAST")
emitAll(
createNetworkFlow(
request = request,
networkLock = null,
piggybackOnly = piggybackOnly
) as Flow<StoreResponse<Output>> // when no source of truth Input == Output
)

createNetworkFlow(
request = request,
networkLock = null,
piggybackOnly = piggybackOnly
) as Flow<StoreResponse<Output>> // when no source of truth Input == Output
} else {
emitAll(diskNetworkCombined(request, sourceOfTruth))
diskNetworkCombined(request, sourceOfTruth)
}
emitAll(stream.transform {
emit(it)
if (it is StoreResponse.NoNewData && cachedToEmit == null) {
// In the special case where fetcher returned no new data we actually want to
// serve cache data (even if the request specified skipping cache and/or SoT)
memCache?.get(request.key)?.let {
emit(StoreResponse.Data(value = it, origin = ResponseOrigin.Cache))
}
}
})
}.onEach {
// whenever a value is dispatched, save it to the memory cache
if (it.origin != ResponseOrigin.Cache) {
Expand Down Expand Up @@ -189,18 +198,6 @@ internal class RealStore<Key : Any, Input : Any, Output : Any>(
if (it.value !is StoreResponse.Data) {
emit(it.value.swapType())
}

if (it.value is StoreResponse.NoNewData &&
request.shouldSkipCache(CacheType.MEMORY)) {
// In the special case where the request is skipping memory cache but the
// fetcher returned no new data we actaully want to serve cache and SoT
// data. In this case we do check the cache. If the request did not skip
// memory and no new data was returned from the fetcher we do not emit
// from cache again to avoid a emitting twice from the cache.
memCache?.get(request.key)?.let {
emit(StoreResponse.Data(value = it, origin = ResponseOrigin.Cache))
}
}
}
is Either.Right -> {
// right, that is data from disk
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.TestCoroutineScope
import kotlinx.coroutines.test.runBlockingTest
Expand Down Expand Up @@ -452,7 +453,7 @@ class FlowStoreTest {
}

@Test
fun `GIVEN no data from fetcher WHEN stream fresh data THEN fetch returns no data AND cached values are recevied`() =
fun `GIVEN SoT WHEN stream fresh data returns no data from fetcher THEN fetch returns no data AND cached values are recevied`() =
testScope.runBlockingTest {
val persister = InMemoryPersister<Int, String>().asFlowable()
val pipeline = StoreBuilder.from(
Expand Down Expand Up @@ -481,12 +482,11 @@ class FlowStoreTest {
value = "local-1",
origin = ResponseOrigin.SourceOfTruth
)

)
}

@Test
fun `GIVEN no data from fetcher WHEN stream cached data with refresh THEN cached values are recevied AND fetch returns no data`() =
fun `GIVEN SoT WHEN stream cached data with refresh returns NoNewData THEN cached values are recevied AND fetch returns no data`() =
testScope.runBlockingTest {
val persister = InMemoryPersister<Int, String>().asFlowable()
val pipeline = StoreBuilder.from(
Expand Down Expand Up @@ -515,7 +515,72 @@ class FlowStoreTest {
StoreResponse.NoNewData(
origin = ResponseOrigin.Fetcher
)
)
}

@Test
fun `GIVEN no SoT WHEN stream fresh data returns no data from fetcher THEN fetch returns no data AND cached values are recevied`() =
testScope.runBlockingTest {
var createCount = 0
val pipeline = StoreBuilder.from<Int, String>(
fetcher = Fetcher.ofFlow {
if (createCount++ == 0) {
flowOf("remote-1")
} else {
flowOf()
}
}
)
.buildWithTestScope()

val firstFetch = pipeline.fresh(3) // prime the cache
assertThat(firstFetch).isEqualTo("remote-1")

assertThat(pipeline.stream(StoreRequest.fresh(3)))
.emitsExactly(
Loading(
origin = ResponseOrigin.Fetcher
),
StoreResponse.NoNewData(
origin = ResponseOrigin.Fetcher
),
Data(
value = "remote-1",
origin = ResponseOrigin.Cache
)
)
}

@Test
fun `GIVEN no SoT WHEN stream cached data with refresh returns NoNewData THEN cached values are recevied AND fetch returns no data`() =
testScope.runBlockingTest {
var createCount = 0
val pipeline = StoreBuilder.from<Int, String>(
fetcher = Fetcher.ofFlow {
if (createCount++ == 0) {
flowOf("remote-1")
} else {
flowOf()
}
}
)
.buildWithTestScope()

val firstFetch = pipeline.fresh(3) // prime the cache
assertThat(firstFetch).isEqualTo("remote-1")

assertThat(pipeline.stream(StoreRequest.cached(3, refresh = true)))
.emitsExactly(
Data(
value = "remote-1",
origin = ResponseOrigin.Cache
),
Loading(
origin = ResponseOrigin.Fetcher
),
StoreResponse.NoNewData(
origin = ResponseOrigin.Fetcher
)
)
}

Expand Down