Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion buildsystem/dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ ext.versions = [

// Plugins
androidGradlePlugin : '4.0.0',
androidGradlePlugin : '4.0.0-beta05',
dokkaGradlePlugin : '0.10.0',
ktlintGradle : '9.1.1',
spotlessGradlePlugin : '3.26.1',
Expand Down
4 changes: 2 additions & 2 deletions gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#Tue Jan 07 09:58:49 EST 2020
#Thu Jul 23 11:58:27 PDT 2020
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-6.5-milestone-1-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-6.5-milestone-1-all.zip
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.flow.withIndex
import kotlin.time.ExperimentalTime

@ExperimentalTime
Expand Down Expand Up @@ -162,64 +161,70 @@ internal class RealStore<Key : Any, Input : Any, Output : Any>(
val diskLock = CompletableDeferred<Unit>()
val networkLock = CompletableDeferred<Unit>()
val networkFlow = createNetworkFlow(request, networkLock)
if (!request.shouldSkipCache(CacheType.DISK)) {
val skipDiskCache = request.shouldSkipCache(CacheType.DISK)
if (!skipDiskCache) {
diskLock.complete(Unit)
}
val diskFlow = sourceOfTruth.reader(request.key, diskLock).onStart {
// wait for disk to latch first to ensure it happens before network triggers.
// after that, if we'll not read from disk, then allow network to continue
if (request.shouldSkipCache(CacheType.DISK)) {
if (skipDiskCache) {
networkLock.complete(Unit)
}
}
// we use a merge implementation that gives the source of the flow so that we can decide
// based on that.
return networkFlow.merge(diskFlow.withIndex()).transform {
return networkFlow.merge(diskFlow).transform {
// left is Fetcher while right is source of truth
if (it is Either.Left) {
if (it.value !is StoreResponse.Data) {
emit(it.value.swapType())
when (it) {
is Either.Left -> {
// left, that is data from network
when (it.value) {
is StoreResponse.Data ->
// unlocking disk only if network sent data so that fresh data request
// never receives disk data by mistake
diskLock.complete(Unit)
else ->
emit(it.value.swapType())
}
// network sent something
if (it.value is StoreResponse.Data) {
// unlocking disk only if network sent data so that fresh data request never
// receives disk data by mistake
diskLock.complete(Unit)
}
} else if (it is Either.Right) {
}
is Either.Right -> {
// right, that is data from disk
val (_, diskData) = it.value
val diskValue = diskData.dataOrNull()
if (diskValue != null) {
emit(
StoreResponse.Data(
value = diskValue,
origin = diskData.origin
)
)
} else if (diskData is StoreResponse.Error) {
// disk sent an error, send it down as well
emit(diskData.swapType())
}

// If this is the first disk value and it is null, we should allow fetcher
// to start emitting values.
// If disk sent a read error, we should allow fetcher to start emitting values
// since there is nothing to read from disk.
// If disk sent a write error, we should NOT allow fetcher to start emitting
// values as we should always wait for the read attempt.
if (diskData is StoreResponse.Error.Exception) {
if (diskData.error is SourceOfTruth.ReadException) {
networkLock.complete(Unit)
when (val diskData = it.value) {
is StoreResponse.Data -> {
val diskValue = diskData.value
if (diskValue != null) {
emit(
StoreResponse.Data(
value = diskValue,
origin = diskData.origin
)
)
}
// If the disk value is null or refresh was requested then allow fetcher
// to start emitting values.
if (request.refresh || diskData.value == null) {
networkLock.complete(Unit)
}
}
// for other errors, don't do anything, wait for the read attempt
} else if (diskData is StoreResponse.Data) {
if (request.refresh || diskValue == null) {
networkLock.complete(Unit)
is StoreResponse.Error -> {
// disk sent an error, send it down as well
emit(diskData.swapType())

// If disk sent a read error, we should allow fetcher to start emitting
// values since there is nothing to read from disk. If disk sent a write
// error, we should NOT allow fetcher to start emitting values as we
// should always wait for the read attempt.
if (diskData is StoreResponse.Error.Exception &&
diskData.error is SourceOfTruth.ReadException) {
networkLock.complete(Unit)
}
// for other errors, don't do anything, wait for the read attempt
}
}
}
}
}
}

private fun createNetworkFlow(
Expand Down