Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions store/api/store.api
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,21 @@ public final class com/dropbox/android/external/store4/SourceOfTruth$Companion {
public static synthetic fun ofFlow$default (Lcom/dropbox/android/external/store4/SourceOfTruth$Companion;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lcom/dropbox/android/external/store4/SourceOfTruth;
}

public final class com/dropbox/android/external/store4/SourceOfTruth$ReadException : java/lang/RuntimeException {
public fun <init> (Ljava/lang/Object;Ljava/lang/Throwable;)V
public fun equals (Ljava/lang/Object;)Z
public final fun getKey ()Ljava/lang/Object;
public fun hashCode ()I
}

public final class com/dropbox/android/external/store4/SourceOfTruth$WriteException : java/lang/RuntimeException {
public fun <init> (Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Throwable;)V
public fun equals (Ljava/lang/Object;)Z
public final fun getKey ()Ljava/lang/Object;
public final fun getValue ()Ljava/lang/Object;
public fun hashCode ()I
}

public abstract interface class com/dropbox/android/external/store4/Store {
public abstract fun clear (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun clearAll (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,76 @@ interface SourceOfTruth<Key, Input, Output> {
realDeleteAll = deleteAll
)
}

/**
* The exception provided when a write operation fails in SourceOfTruth.
*
* see [StoreResponse.Error.Exception]
*/
class WriteException(
/**
* The key for the failed write attempt
*/
val key: Any?, // TODO why are we not marking keys non-null ?
/**
* The value for the failed write attempt
*/
val value: Any?,
/**
* The exception thrown from the [SourceOfTruth]'s [write] method.
*/
cause: Throwable
) : RuntimeException(
"Failed to write value to Source of Truth. key: $key",
cause
) {
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false

other as WriteException

if (key != other.key) return false
if (value != other.value) return false
if (cause != other.cause) return false
return true
}

override fun hashCode(): Int {
var result = key.hashCode()
result = 31 * result + value.hashCode()
return result
}
}

/**
* Exception created when a [reader] throws an exception.
*
* see [StoreResponse.Error.Exception]
*/
class ReadException(
/**
* The key for the failed write attempt
*/
val key: Any?, // TODO shouldn't key be non-null?
cause: Throwable
) : RuntimeException(
"Failed to read from Source of Truth. key: $key",
cause
) {
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false

other as ReadException

if (key != other.key) return false
if (cause != other.cause) return false
return true
}

override fun hashCode(): Int {
return key.hashCode()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,20 +188,35 @@ internal class RealStore<Key : Any, Input : Any, Output : Any>(
}
} else if (it is Either.Right) {
// right, that is data from disk
val (index, diskData) = it.value
if (diskData.value != null) {
val (_, diskData) = it.value
val diskValue = diskData.dataOrNull()
if (diskValue != null) {
emit(
StoreResponse.Data(
value = diskData.value,
value = diskValue,
origin = diskData.origin
)
)
} else if (diskData is StoreResponse.Error) {
// disk sent an error, send it down as well
emit(diskData.swapType())
}

// if this is the first disk value and it is null, we should enable fetcher
// TODO should we ignore the index and always enable?
if (index == 0 && (diskData.value == null || request.refresh)) {
networkLock.complete(Unit)
// If this is the first disk value and it is null, we should allow fetcher
// to start emitting values.
// If disk sent a read error, we should allow fetcher to start emitting values
// since there is nothing to read from disk.
// If disk sent a write error, we should NOT allow fetcher to start emitting
// values as we should always wait for the read attempt.
if (diskData is StoreResponse.Error.Exception) {
if (diskData.error is SourceOfTruth.ReadException) {
networkLock.complete(Unit)
}
// for other errors, don't do anything, wait for the read attempt
} else if (diskData is StoreResponse.Data) {
if (request.refresh || diskValue == null) {
networkLock.complete(Unit)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,21 @@ package com.dropbox.android.external.store4.impl

import com.dropbox.android.external.store4.ResponseOrigin
import com.dropbox.android.external.store4.SourceOfTruth
import com.dropbox.android.external.store4.StoreResponse
import com.dropbox.android.external.store4.impl.operators.mapIndexed
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onStart
import java.util.concurrent.atomic.AtomicLong

/**
Expand Down Expand Up @@ -56,7 +60,7 @@ internal class SourceOfTruthWithBarrier<Key, Input, Output>(
*/
private val versionCounter = AtomicLong(0)

fun reader(key: Key, lock: CompletableDeferred<Unit>): Flow<DataWithOrigin<Output>> {
fun reader(key: Key, lock: CompletableDeferred<Unit>): Flow<StoreResponse<Output?>> {
return flow {
val barrier = barriers.acquire(key)
val readerVersion: Long = versionCounter.incrementAndGet()
Expand All @@ -65,24 +69,58 @@ internal class SourceOfTruthWithBarrier<Key, Input, Output>(
emitAll(barrier.asFlow()
.flatMapLatest {
val messageArrivedAfterMe = readerVersion < it.version
when (it) {
val writeError = if (messageArrivedAfterMe && it is BarrierMsg.Open) {
it.writeError
} else {
null
}
val readFlow = when (it) {
is BarrierMsg.Open -> delegate.reader(key).mapIndexed { index, output ->
if (index == 0 && messageArrivedAfterMe) {
DataWithOrigin(
origin = ResponseOrigin.Fetcher,
val firstMsgOrigin = if (writeError == null) {
// restarted barrier without an error means write succeeded
ResponseOrigin.Fetcher
} else {
// when a write fails, we still get a new reader because
// we've disabled the previous reader before starting the
// write operation. But since write has failed, we should
// use the SourceOfTruth as the origin
ResponseOrigin.SourceOfTruth
}
StoreResponse.Data(
origin = firstMsgOrigin,
value = output
)
} else {
DataWithOrigin(
StoreResponse.Data(
origin = ResponseOrigin.SourceOfTruth,
value = output
)
}
} as StoreResponse<Output?> // necessary cast for catch block
}.catch {
this.emit(StoreResponse.Error.Exception<Output>(
error = SourceOfTruth.ReadException(
key = key,
cause = it
),
origin = ResponseOrigin.SourceOfTruth))
}
is BarrierMsg.Blocked -> {
flowOf()
}
}
readFlow
.onStart {
// if we have a pending error, make sure to dispatch it first.
if (writeError != null) {
emit(
StoreResponse.Error.Exception(
origin = ResponseOrigin.SourceOfTruth,
error = writeError
)
)
}
}
})
} finally {
// we are using a finally here instead of onCompletion as there might be a
Expand All @@ -96,8 +134,33 @@ internal class SourceOfTruthWithBarrier<Key, Input, Output>(
val barrier = barriers.acquire(key)
try {
barrier.send(BarrierMsg.Blocked(versionCounter.incrementAndGet()))
delegate.write(key, value)
barrier.send(BarrierMsg.Open(versionCounter.incrementAndGet()))
val writeError = try {
delegate.write(key, value)
null
} catch (throwable: Throwable) {
if (throwable !is CancellationException) {
throwable
} else {
null
}
}

barrier.send(
BarrierMsg.Open(
version = versionCounter.incrementAndGet(),
writeError = writeError?.let {
SourceOfTruth.WriteException(
key = key,
value = value,
cause = writeError
)
})
)
if (writeError is CancellationException) {
// only throw if it failed because of cancelation.
// otherwise, we take care of letting downstream know that there was a write error
throw writeError
}
} finally {
barriers.release(key, barrier)
}
Expand All @@ -115,7 +178,7 @@ internal class SourceOfTruthWithBarrier<Key, Input, Output>(
val version: Long
) {
class Blocked(version: Long) : BarrierMsg(version)
class Open(version: Long) : BarrierMsg(version) {
class Open(version: Long, val writeError: Throwable? = null) : BarrierMsg(version) {
companion object {
val INITIAL = Open(INITIAL_VERSION)
}
Expand All @@ -129,8 +192,3 @@ internal class SourceOfTruthWithBarrier<Key, Input, Output>(
private const val INITIAL_VERSION = -1L
}
}

internal data class DataWithOrigin<T>(
val origin: ResponseOrigin,
val value: T?
)
Loading