From 73440016ed6b8c3a319095ecd5a2081ab7446c3b Mon Sep 17 00:00:00 2001 From: Yigit Boyar Date: Thu, 18 Jun 2020 17:36:39 -0700 Subject: [PATCH 1/6] reproduce #177 and fix reader errors --- .../android/external/store4/impl/RealStore.kt | 7 +- .../store4/impl/SourceOfTruthWithBarrier.kt | 19 +++-- .../store4/SourceOfTruthErrorsTest.kt | 68 +++++++++++++++ .../store4/SourceOfTruthWithBarrierTest.kt | 85 +++++++++++++++++-- .../store4/testutil/InMemoryPersister.kt | 17 +++- 5 files changed, 175 insertions(+), 21 deletions(-) create mode 100644 store/src/test/java/com/dropbox/android/external/store4/SourceOfTruthErrorsTest.kt diff --git a/store/src/main/java/com/dropbox/android/external/store4/impl/RealStore.kt b/store/src/main/java/com/dropbox/android/external/store4/impl/RealStore.kt index 2218f59dd..45a1781c0 100644 --- a/store/src/main/java/com/dropbox/android/external/store4/impl/RealStore.kt +++ b/store/src/main/java/com/dropbox/android/external/store4/impl/RealStore.kt @@ -189,10 +189,11 @@ internal class RealStore( } else if (it is Either.Right) { // right, that is data from disk val (index, diskData) = it.value - if (diskData.value != null) { + val diskValue = diskData.dataOrNull() + if (diskValue != null) { emit( StoreResponse.Data( - value = diskData.value, + value = diskValue, origin = diskData.origin ) ) @@ -200,7 +201,7 @@ internal class RealStore( // if this is the first disk value and it is null, we should enable fetcher // TODO should we ignore the index and always enable? - if (index == 0 && (diskData.value == null || request.refresh)) { + if (index == 0 && (diskValue == null || request.refresh)) { networkLock.complete(Unit) } } diff --git a/store/src/main/java/com/dropbox/android/external/store4/impl/SourceOfTruthWithBarrier.kt b/store/src/main/java/com/dropbox/android/external/store4/impl/SourceOfTruthWithBarrier.kt index 4162b452d..5812650e9 100644 --- a/store/src/main/java/com/dropbox/android/external/store4/impl/SourceOfTruthWithBarrier.kt +++ b/store/src/main/java/com/dropbox/android/external/store4/impl/SourceOfTruthWithBarrier.kt @@ -17,6 +17,7 @@ package com.dropbox.android.external.store4.impl import com.dropbox.android.external.store4.ResponseOrigin import com.dropbox.android.external.store4.SourceOfTruth +import com.dropbox.android.external.store4.StoreResponse import com.dropbox.android.external.store4.impl.operators.mapIndexed import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.ExperimentalCoroutinesApi @@ -24,6 +25,7 @@ import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.channels.ConflatedBroadcastChannel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.asFlow +import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.emitAll import kotlinx.coroutines.flow.flatMapLatest import kotlinx.coroutines.flow.flow @@ -56,7 +58,7 @@ internal class SourceOfTruthWithBarrier( */ private val versionCounter = AtomicLong(0) - fun reader(key: Key, lock: CompletableDeferred): Flow> { + fun reader(key: Key, lock: CompletableDeferred): Flow> { return flow { val barrier = barriers.acquire(key) val readerVersion: Long = versionCounter.incrementAndGet() @@ -68,16 +70,20 @@ internal class SourceOfTruthWithBarrier( when (it) { is BarrierMsg.Open -> delegate.reader(key).mapIndexed { index, output -> if (index == 0 && messageArrivedAfterMe) { - DataWithOrigin( + StoreResponse.Data( origin = ResponseOrigin.Fetcher, value = output ) } else { - DataWithOrigin( + StoreResponse.Data( origin = ResponseOrigin.SourceOfTruth, value = output - ) + ) as StoreResponse } + }.catch { + this.emit(StoreResponse.Error.Exception( + error = it, + origin = ResponseOrigin.SourceOfTruth)) } is BarrierMsg.Blocked -> { flowOf() @@ -129,8 +135,3 @@ internal class SourceOfTruthWithBarrier( private const val INITIAL_VERSION = -1L } } - -internal data class DataWithOrigin( - val origin: ResponseOrigin, - val value: T? -) diff --git a/store/src/test/java/com/dropbox/android/external/store4/SourceOfTruthErrorsTest.kt b/store/src/test/java/com/dropbox/android/external/store4/SourceOfTruthErrorsTest.kt new file mode 100644 index 000000000..c3475e63e --- /dev/null +++ b/store/src/test/java/com/dropbox/android/external/store4/SourceOfTruthErrorsTest.kt @@ -0,0 +1,68 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.dropbox.android.external.store4 + +import com.dropbox.android.external.store4.testutil.FakeFetcher +import com.dropbox.android.external.store4.testutil.InMemoryPersister +import com.dropbox.android.external.store4.testutil.asSourceOfTruth +import com.dropbox.android.external.store4.testutil.assertThat +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.test.TestCoroutineScope +import kotlinx.coroutines.test.runBlockingTest +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.JUnit4 + +@FlowPreview +@RunWith(JUnit4::class) +class SourceOfTruthErrorsTest { + private val testScope = TestCoroutineScope() + + @Test + fun writeErrorCanBeCaught() = testScope.runBlockingTest { + val persister = ThrowingPersister() + val fetcher = FakeFetcher( + 3 to "three-1", + 3 to "three-2" + ) + val pipeline = StoreBuilder + .from( + fetcher = fetcher, + sourceOfTruth = persister.asSourceOfTruth()) + .scope(testScope) + .build() + val writeException = IllegalArgumentException("i fail") + persister.writeHandler = { key, value -> + throw writeException + } + assertThat( + pipeline.stream(StoreRequest.fresh(3)) + ).emitsExactly( + StoreResponse.Loading(ResponseOrigin.Fetcher), + StoreResponse.Error.Exception( + error = writeException, + origin = ResponseOrigin.SourceOfTruth + ) + ) + } + + class ThrowingPersister : InMemoryPersister() { + var writeHandler : ((Key, Output) -> Unit)?=null + override suspend fun write(key: Key, output: Output) { + writeHandler?.invoke(key, output) ?: super.write(key, output) + } + } +} \ No newline at end of file diff --git a/store/src/test/java/com/dropbox/android/external/store4/SourceOfTruthWithBarrierTest.kt b/store/src/test/java/com/dropbox/android/external/store4/SourceOfTruthWithBarrierTest.kt index 1afe19bba..4488e2990 100644 --- a/store/src/test/java/com/dropbox/android/external/store4/SourceOfTruthWithBarrierTest.kt +++ b/store/src/test/java/com/dropbox/android/external/store4/SourceOfTruthWithBarrierTest.kt @@ -15,22 +15,26 @@ */ package com.dropbox.android.external.store4 -import com.dropbox.android.external.store4.impl.DataWithOrigin import com.dropbox.android.external.store4.impl.PersistentSourceOfTruth import com.dropbox.android.external.store4.impl.SourceOfTruthWithBarrier import com.dropbox.android.external.store4.testutil.InMemoryPersister +import com.dropbox.android.external.store4.testutil.assertThat import com.google.common.truth.Truth.assertThat import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.async +import kotlinx.coroutines.cancelAndJoin +import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.flow.take import kotlinx.coroutines.flow.toList import kotlinx.coroutines.test.TestCoroutineScope import kotlinx.coroutines.test.runBlockingTest import org.junit.Test + @FlowPreview @ExperimentalCoroutinesApi class SourceOfTruthWithBarrierTest { @@ -59,8 +63,12 @@ class SourceOfTruthWithBarrierTest { source.write(1, "a") assertThat(collector.await()).isEqualTo( listOf( - DataWithOrigin(ResponseOrigin.SourceOfTruth, null), - DataWithOrigin(ResponseOrigin.Fetcher, "a") + StoreResponse.Data( + origin = ResponseOrigin.SourceOfTruth, + value = null), + StoreResponse.Data( + origin = ResponseOrigin.Fetcher, + value = "a") ) ) assertThat(source.barrierCount()).isEqualTo(0) @@ -93,10 +101,77 @@ class SourceOfTruthWithBarrierTest { source.write(1, "b") assertThat(collector.await()).isEqualTo( listOf( - DataWithOrigin(ResponseOrigin.SourceOfTruth, "a"), - DataWithOrigin(ResponseOrigin.Fetcher, "b") + StoreResponse.Data( + origin = ResponseOrigin.SourceOfTruth, + value = "a"), + StoreResponse.Data( + origin = ResponseOrigin.Fetcher, + value = "b") ) ) assertThat(source.barrierCount()).isEqualTo(0) } + + @Test + fun `Given Source Of Truth WHEN read fails THEN error should propogate`() = + testScope.runBlockingTest { + val exception = RuntimeException("read fails") + persister.postReadCallback = { key, value -> + throw exception + } + assertThat( + source.reader(1, CompletableDeferred(Unit)) + ).emitsExactly( + StoreResponse.Error.Exception( + origin = ResponseOrigin.SourceOfTruth, + error = exception + ) + ) + } + + @Test + fun `Given Source Of Truth WHEN read fails but then succeeds THEN error should propogate but also the value`() = + testScope.runBlockingTest { + var hasThrown = false + val exception = RuntimeException("read fails") + persister.postReadCallback = { _, value -> + if (!hasThrown) { + hasThrown = true + throw exception + } + value + } + val reader = source.reader(1, CompletableDeferred(Unit)) + val collected = mutableSetOf>() + val collection = async { + reader.collect { + collected.add(it) + } + } + advanceUntilIdle() + assertThat(collected).containsExactly( + StoreResponse.Error.Exception( + origin = ResponseOrigin.SourceOfTruth, + error = exception + ) + ) + // make sure it is not cancelled for the read error + assertThat(collection.isActive).isTrue() + // now insert another, it should trigger another read and emitted to the reader + source.write(1, "a") + advanceUntilIdle() + assertThat(collected).containsExactly( + StoreResponse.Error.Exception( + origin = ResponseOrigin.SourceOfTruth, + error = exception + ), + StoreResponse.Data( + // this is fetcher since we are using the write API + origin = ResponseOrigin.Fetcher, + value = "a" + ) + ) + collection.cancelAndJoin() + } + } diff --git a/store/src/test/java/com/dropbox/android/external/store4/testutil/InMemoryPersister.kt b/store/src/test/java/com/dropbox/android/external/store4/testutil/InMemoryPersister.kt index 0c50a9244..79f6a9ded 100644 --- a/store/src/test/java/com/dropbox/android/external/store4/testutil/InMemoryPersister.kt +++ b/store/src/test/java/com/dropbox/android/external/store4/testutil/InMemoryPersister.kt @@ -5,15 +5,24 @@ import com.dropbox.android.external.store4.SourceOfTruth /** * An in-memory non-flowing persister for testing. */ -class InMemoryPersister { +open class InMemoryPersister { private val data = mutableMapOf() + var preWriteCallback : (suspend (key:Key, value : Output) -> Output)? = null + var postReadCallback : (suspend (key : Key, value : Output?) -> Output?)? = null @Suppress("RedundantSuspendModifier") // for function reference - suspend fun read(key: Key) = data[key] + suspend fun read(key: Key) :Output? { + val value = data[key] + postReadCallback?.let { + return it(key, value) + } + return value + } @Suppress("RedundantSuspendModifier") // for function reference - suspend fun write(key: Key, output: Output) { - data[key] = output + open suspend fun write(key: Key, output: Output) { + val value = preWriteCallback?.invoke(key, output) ?: output + data[key] = value } @Suppress("RedundantSuspendModifier") // for function reference From 8f39997d643551ed75e4f2d0f5827a2129f81570 Mon Sep 17 00:00:00 2001 From: Yigit Boyar Date: Thu, 18 Jun 2020 20:34:38 -0700 Subject: [PATCH 2/6] dispatch write errors to receiver Now source of truth can unblock reader while also letting them know that an error happened while writing. This forces readers to first dispatch the error then whatever data they have. I've also added new public WriteException/ReadException classes to SourceOfTruth so that it is easy to diagnose these problems when it hits to the developer's code --- store/api/store.api | 15 + .../android/external/store4/SourceOfTruth.kt | 72 ++++ .../android/external/store4/impl/RealStore.kt | 17 +- .../store4/impl/SourceOfTruthWithBarrier.kt | 71 +++- .../store4/SourceOfTruthErrorsTest.kt | 386 ++++++++++++++++-- .../store4/SourceOfTruthWithBarrierTest.kt | 65 ++- 6 files changed, 583 insertions(+), 43 deletions(-) diff --git a/store/api/store.api b/store/api/store.api index ad27db175..8695fc44f 100644 --- a/store/api/store.api +++ b/store/api/store.api @@ -113,6 +113,21 @@ public final class com/dropbox/android/external/store4/SourceOfTruth$Companion { public static synthetic fun ofFlow$default (Lcom/dropbox/android/external/store4/SourceOfTruth$Companion;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lcom/dropbox/android/external/store4/SourceOfTruth; } +public final class com/dropbox/android/external/store4/SourceOfTruth$ReadException : java/lang/RuntimeException { + public fun (Ljava/lang/Object;Ljava/lang/Throwable;)V + public fun equals (Ljava/lang/Object;)Z + public final fun getKey ()Ljava/lang/Object; + public fun hashCode ()I +} + +public final class com/dropbox/android/external/store4/SourceOfTruth$WriteException : java/lang/RuntimeException { + public fun (Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Throwable;)V + public fun equals (Ljava/lang/Object;)Z + public final fun getKey ()Ljava/lang/Object; + public final fun getValue ()Ljava/lang/Object; + public fun hashCode ()I +} + public abstract interface class com/dropbox/android/external/store4/Store { public abstract fun clear (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public abstract fun clearAll (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; diff --git a/store/src/main/java/com/dropbox/android/external/store4/SourceOfTruth.kt b/store/src/main/java/com/dropbox/android/external/store4/SourceOfTruth.kt index 2c1301ca2..c26ba69b9 100644 --- a/store/src/main/java/com/dropbox/android/external/store4/SourceOfTruth.kt +++ b/store/src/main/java/com/dropbox/android/external/store4/SourceOfTruth.kt @@ -126,4 +126,76 @@ interface SourceOfTruth { realDeleteAll = deleteAll ) } + + /** + * The exception provided when a Write operation fails in SourceOfTruth. + * + * see [StoreResponse.Error.Exception] + */ + class WriteException( + /** + * The key for the failed write attempt + */ + val key: Any?, // TODO why are we not marking keys non-null ? + /** + * The value for the failed write attempt + */ + val value:Any?, + /** + * The exception thrown from the [SourceOfTruth]'s [write] method. + */ + cause : Throwable + ) : RuntimeException( + "Failed to write value to Source of Truth. key: $key", + cause + ) { + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (javaClass != other?.javaClass) return false + + other as WriteException + + if (key != other.key) return false + if (value != other.value) return false + if (cause != other.cause) return false + return true + } + + override fun hashCode(): Int { + var result = key.hashCode() + result = 31 * result + value.hashCode() + return result + } + } + + /** + * Exception created when a [reader] throws an exception. + * + * see [StoreResponse.Error.Exception] + */ + class ReadException( + /** + * The key for the failed write attempt + */ + val key: Any?, // TODO shouldn't key be non-null? + cause: Throwable + ) : RuntimeException( + "Failed to read from Source of Truth. key: $key", + cause + ) { + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (javaClass != other?.javaClass) return false + + other as ReadException + + if (key != other.key) return false + if (cause != other.cause) return false + return true + } + + override fun hashCode(): Int { + return key.hashCode() + } + } } diff --git a/store/src/main/java/com/dropbox/android/external/store4/impl/RealStore.kt b/store/src/main/java/com/dropbox/android/external/store4/impl/RealStore.kt index 45a1781c0..22dff866d 100644 --- a/store/src/main/java/com/dropbox/android/external/store4/impl/RealStore.kt +++ b/store/src/main/java/com/dropbox/android/external/store4/impl/RealStore.kt @@ -197,12 +197,23 @@ internal class RealStore( origin = diskData.origin ) ) + } else if (diskData is StoreResponse.Error) { + // disk sent an error, send it down as well + emit(diskData.swapType()) } // if this is the first disk value and it is null, we should enable fetcher - // TODO should we ignore the index and always enable? - if (index == 0 && (diskValue == null || request.refresh)) { - networkLock.complete(Unit) + // we should also allow fetcher if disk sent a read error but not if it is + // a write error since we should always wait for the read attempt + if (diskData is StoreResponse.Error.Exception) { + if (diskData.error is SourceOfTruth.ReadException) { + networkLock.complete(Unit) + } + // for other errors, don't do anything, wait for the read attempt + } else if (diskData is StoreResponse.Data){ + if (request.refresh || diskValue == null) { + networkLock.complete(Unit) + } } } } diff --git a/store/src/main/java/com/dropbox/android/external/store4/impl/SourceOfTruthWithBarrier.kt b/store/src/main/java/com/dropbox/android/external/store4/impl/SourceOfTruthWithBarrier.kt index 5812650e9..08482758a 100644 --- a/store/src/main/java/com/dropbox/android/external/store4/impl/SourceOfTruthWithBarrier.kt +++ b/store/src/main/java/com/dropbox/android/external/store4/impl/SourceOfTruthWithBarrier.kt @@ -19,6 +19,7 @@ import com.dropbox.android.external.store4.ResponseOrigin import com.dropbox.android.external.store4.SourceOfTruth import com.dropbox.android.external.store4.StoreResponse import com.dropbox.android.external.store4.impl.operators.mapIndexed +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.FlowPreview @@ -30,6 +31,7 @@ import kotlinx.coroutines.flow.emitAll import kotlinx.coroutines.flow.flatMapLatest import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.flow.onStart import java.util.concurrent.atomic.AtomicLong /** @@ -67,28 +69,58 @@ internal class SourceOfTruthWithBarrier( emitAll(barrier.asFlow() .flatMapLatest { val messageArrivedAfterMe = readerVersion < it.version - when (it) { + val writeError = if (messageArrivedAfterMe && it is BarrierMsg.Open) { + it.writeError + } else { + null + } + val readFlow = when (it) { is BarrierMsg.Open -> delegate.reader(key).mapIndexed { index, output -> if (index == 0 && messageArrivedAfterMe) { + val firstMsgOrigin = if (writeError == null) { + // restarted barrier without an error means write succeeded + ResponseOrigin.Fetcher + } else { + // when a write fails, we still get a new reader because + // we've disabled the previous reader before starting the + // write operation. But since write has failed, we should + // use the SourceOfTruth as the origin + ResponseOrigin.SourceOfTruth + } StoreResponse.Data( - origin = ResponseOrigin.Fetcher, + origin = firstMsgOrigin, value = output ) } else { StoreResponse.Data( origin = ResponseOrigin.SourceOfTruth, value = output - ) as StoreResponse - } + ) + } as StoreResponse // necessary cast for catch block }.catch { this.emit(StoreResponse.Error.Exception( - error = it, + error = SourceOfTruth.ReadException( + key = key, + cause = it + ), origin = ResponseOrigin.SourceOfTruth)) } is BarrierMsg.Blocked -> { flowOf() } } + readFlow + .onStart { + // if we have a pending error, make sure to dispatch it first. + if (writeError != null) { + emit( + StoreResponse.Error.Exception( + origin = ResponseOrigin.SourceOfTruth, + error = writeError + ) + ) + } + } }) } finally { // we are using a finally here instead of onCompletion as there might be a @@ -102,8 +134,31 @@ internal class SourceOfTruthWithBarrier( val barrier = barriers.acquire(key) try { barrier.send(BarrierMsg.Blocked(versionCounter.incrementAndGet())) - delegate.write(key, value) - barrier.send(BarrierMsg.Open(versionCounter.incrementAndGet())) + val writeError = try { + delegate.write(key, value) + null + } catch (throwable : Throwable) { + if (throwable !is CancellationException) { + throwable + } else { + null + } + } + + barrier.send(BarrierMsg.Open( + version = versionCounter.incrementAndGet(), + writeError = writeError?.let { + SourceOfTruth.WriteException( + key = key, + value = value, + cause = writeError + ) + })) + if (writeError is CancellationException) { + // only throw if it failed because of cancelation. + // otherwise, we take care of letting downstream know that there was a write error + throw writeError + } } finally { barriers.release(key, barrier) } @@ -121,7 +176,7 @@ internal class SourceOfTruthWithBarrier( val version: Long ) { class Blocked(version: Long) : BarrierMsg(version) - class Open(version: Long) : BarrierMsg(version) { + class Open(version: Long, val writeError: Throwable? = null) : BarrierMsg(version) { companion object { val INITIAL = Open(INITIAL_VERSION) } diff --git a/store/src/test/java/com/dropbox/android/external/store4/SourceOfTruthErrorsTest.kt b/store/src/test/java/com/dropbox/android/external/store4/SourceOfTruthErrorsTest.kt index c3475e63e..b788ad8d5 100644 --- a/store/src/test/java/com/dropbox/android/external/store4/SourceOfTruthErrorsTest.kt +++ b/store/src/test/java/com/dropbox/android/external/store4/SourceOfTruthErrorsTest.kt @@ -15,11 +15,19 @@ */ package com.dropbox.android.external.store4 +import com.dropbox.android.external.store4.SourceOfTruth.ReadException +import com.dropbox.android.external.store4.SourceOfTruth.WriteException import com.dropbox.android.external.store4.testutil.FakeFetcher import com.dropbox.android.external.store4.testutil.InMemoryPersister import com.dropbox.android.external.store4.testutil.asSourceOfTruth import com.dropbox.android.external.store4.testutil.assertThat import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.cancelAndJoin +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.launch import kotlinx.coroutines.test.TestCoroutineScope import kotlinx.coroutines.test.runBlockingTest import org.junit.Test @@ -32,37 +40,359 @@ class SourceOfTruthErrorsTest { private val testScope = TestCoroutineScope() @Test - fun writeErrorCanBeCaught() = testScope.runBlockingTest { - val persister = ThrowingPersister() - val fetcher = FakeFetcher( - 3 to "three-1", - 3 to "three-2" - ) - val pipeline = StoreBuilder - .from( - fetcher = fetcher, - sourceOfTruth = persister.asSourceOfTruth()) - .scope(testScope) - .build() - val writeException = IllegalArgumentException("i fail") - persister.writeHandler = { key, value -> - throw writeException + fun `GIVEN Source of Truth WHEN write fails THEN exception should be send to the collector`() = + testScope.runBlockingTest { + val persister = InMemoryPersister() + val fetcher = FakeFetcher( + 3 to "a", + 3 to "b" + ) + val pipeline = StoreBuilder + .from( + fetcher = fetcher, + sourceOfTruth = persister.asSourceOfTruth() + ) + .scope(testScope) + .build() + persister.preWriteCallback = { _, _ -> + throw TestException("i fail") + } + + assertThat( + pipeline.stream(StoreRequest.fresh(3)) + ).emitsExactly( + StoreResponse.Loading(ResponseOrigin.Fetcher), + StoreResponse.Error.Exception( + error = WriteException( + key = 3, + value = "a", + cause = TestException("i fail") + ), + origin = ResponseOrigin.SourceOfTruth + ) + ) } - assertThat( - pipeline.stream(StoreRequest.fresh(3)) - ).emitsExactly( - StoreResponse.Loading(ResponseOrigin.Fetcher), - StoreResponse.Error.Exception( - error = writeException, - origin = ResponseOrigin.SourceOfTruth + + @Test + fun `GIVEN Source of Truth WHEN read fails THEN exception should be send to the collector`() = + testScope.runBlockingTest { + val persister = InMemoryPersister() + val fetcher = FakeFetcher( + 3 to "a", + 3 to "b" + ) + val pipeline = StoreBuilder + .from( + fetcher = fetcher, + sourceOfTruth = persister.asSourceOfTruth() + ) + .scope(testScope) + .build() + + persister.postReadCallback = { _, value -> + throw TestException(value ?: "null") + } + + assertThat( + pipeline.stream(StoreRequest.cached(3, refresh = false)) + ).emitsExactly( + StoreResponse.Error.Exception( + error = ReadException( + key = 3, + cause = TestException("a") + ), + origin = ResponseOrigin.SourceOfTruth + ), + // after disk fails, we should still invoke fetcher + StoreResponse.Loading( + origin = ResponseOrigin.Fetcher + ), + // and after fetcher writes the value, it will trigger another read which will also + // fail + StoreResponse.Error.Exception( + error = ReadException( + key = 3, + cause = TestException("b") + ), + origin = ResponseOrigin.SourceOfTruth + ) + ) + } + + @Test + fun `GIVEN Source of Truth WHEN first write fails THEN it should keep reading from Fetcher`() = + testScope.runBlockingTest { + val persister = InMemoryPersister() + val fetcher = Fetcher.ofFlow { _: Int -> + flowOf("a", "b", "c", "d") + } + val pipeline = StoreBuilder + .from( + fetcher = fetcher, + sourceOfTruth = persister.asSourceOfTruth() + ) + .disableCache() + .scope(testScope) + .build() + persister.preWriteCallback = { _, value -> + if (value in listOf("a", "c")) { + throw TestException(value) + } + value + } + assertThat( + pipeline.stream(StoreRequest.cached(3, refresh = true)) + ).emitsExactly( + StoreResponse.Loading( + origin = ResponseOrigin.Fetcher + ), + StoreResponse.Error.Exception( + error = WriteException( + key = 3, + value = "a", + cause = TestException("a") + ), + origin = ResponseOrigin.SourceOfTruth + ), + StoreResponse.Data( + value = "b", + origin = ResponseOrigin.Fetcher + ), + StoreResponse.Error.Exception( + error = WriteException( + key = 3, + value = "c", + cause = TestException("c") + ), + origin = ResponseOrigin.SourceOfTruth + ), + // disk flow will restart after a failed write (because we stopped it before the + // write attempt starts, so we will get the disk value again). + StoreResponse.Data( + value = "b", + origin = ResponseOrigin.SourceOfTruth + ), + StoreResponse.Data( + value = "d", + origin = ResponseOrigin.Fetcher + ) + ) + } + + @Test + fun `GIVEN Source of Truth with failing write WHEN a passive reader arrives THEN it should receive the new write error`() = + testScope.runBlockingTest { + val persister = InMemoryPersister() + val fetcher = Fetcher.ofFlow { _: Int-> + flowOf("a", "b", "c", "d") + } + val pipeline = StoreBuilder + .from( + fetcher = fetcher, + sourceOfTruth = persister.asSourceOfTruth() + ) + .disableCache() + .scope(testScope) + .build() + persister.preWriteCallback = { _, value -> + if (value in listOf("a", "c")) { + delay(50) + throw TestException(value) + } else { + delay(10) + } + value + } + // keep collection hot + val collector = launch { + pipeline.stream( + StoreRequest.cached(3, refresh = true) + ).toList() + } + + // miss writes for a and b and let the write operation for c start such that + // we'll catch that write error + delay(70) + assertThat( + pipeline.stream(StoreRequest.cached(3, refresh = true)) + ).emitsExactly( + // we wanted the disk value but write failed so we don't get it + StoreResponse.Error.Exception( + error = WriteException( + key = 3, + value = "c", + cause = TestException("c") + ), + origin = ResponseOrigin.SourceOfTruth + ), + // after the write error, we should get the value on disk + StoreResponse.Data( + value = "b", + origin = ResponseOrigin.SourceOfTruth + ), + // now we'll unlock the fetcher after disk is read + StoreResponse.Loading( + origin = ResponseOrigin.Fetcher + ), + StoreResponse.Data( + value = "d", + origin = ResponseOrigin.Fetcher + ) ) - ) + collector.cancelAndJoin() + } + + @Test + fun `Given Source of Truth with failing write WHEN a passive reader arrives THEN it should not get errors happened before`() = + testScope.runBlockingTest { + val persister = InMemoryPersister() + val fetcher = Fetcher.ofFlow { + flow { + emit("a") + emit("b") + emit("c") + // now delay, wait for the new subscriber + delay(100) + emit("d") + } + } + val pipeline = StoreBuilder + .from( + fetcher = fetcher, + sourceOfTruth = persister.asSourceOfTruth() + ) + .disableCache() + .scope(testScope) + .build() + persister.preWriteCallback = { _, value -> + if (value in listOf("a", "c")) { + throw TestException(value) + } + value + } + val collector = launch { + pipeline.stream( + StoreRequest.cached(3, refresh = true) + ).toList() // keep collection hot + } + + // miss both failures but arrive before d is fetched + delay(70) + assertThat( + pipeline.stream(StoreRequest.skipMemory(3, refresh = true)) + ).emitsExactly( + StoreResponse.Data( + value = "b", + origin = ResponseOrigin.SourceOfTruth + ), + // don't receive the write exception because technically it started before we + // started reading + StoreResponse.Loading( + origin = ResponseOrigin.Fetcher + ), + StoreResponse.Data( + value = "d", + origin = ResponseOrigin.Fetcher + ) + ) + collector.cancelAndJoin() + } + + @Test + fun `Given Source of Truth with failing write WHEN a fresh value reader arrives THEN it should not get disk errors from a pending write`() = + testScope.runBlockingTest { + val persister = InMemoryPersister() + val fetcher = Fetcher.ofFlow { + flowOf("a", "b", "c", "d") + } + val pipeline = StoreBuilder + .from( + fetcher = fetcher, + sourceOfTruth = persister.asSourceOfTruth() + ) + .disableCache() + .scope(testScope) + .build() + persister.preWriteCallback = { _, value -> + if (value == "c") { + // slow down read so that the new reader arrives + delay(50) + } + if (value in listOf("a", "c")) { + throw TestException(value) + } + value + } + val collector = launch { + pipeline.stream( + StoreRequest.cached(3, refresh = true) + ).toList() // keep collection hot + } + // miss both failures but arrive before d is fetched + delay(20) + assertThat( + pipeline.stream(StoreRequest.fresh(3)) + ).emitsExactly( + StoreResponse.Loading( + origin = ResponseOrigin.Fetcher + ), + StoreResponse.Data( + value = "d", + origin = ResponseOrigin.Fetcher + ) + ) + collector.cancelAndJoin() + } + + @Test + fun `Given Source of Truth with read failure WHEN cached value reader arrives THEN fetcher should be called to get a new value`() { + testScope.runBlockingTest { + val persister = InMemoryPersister() + val fetcher = Fetcher.of { _:Int -> "a" } + val pipeline = StoreBuilder + .from( + fetcher = fetcher, + sourceOfTruth = persister.asSourceOfTruth() + ) + .disableCache() + .scope(testScope) + .build() + persister.postReadCallback = { _, value -> + if (value == null) { + throw TestException("first read") + } + value + } + assertThat( + pipeline.stream(StoreRequest.cached(3, refresh = true)) + ).emitsExactly( + StoreResponse.Error.Exception( + origin = ResponseOrigin.SourceOfTruth, + error = ReadException( + key = 3, + cause = TestException("first read") + ) + ), + StoreResponse.Loading( + origin = ResponseOrigin.Fetcher + ), + StoreResponse.Data( + value = "a", + origin = ResponseOrigin.Fetcher + ) + ) + } } - class ThrowingPersister : InMemoryPersister() { - var writeHandler : ((Key, Output) -> Unit)?=null - override suspend fun write(key: Key, output: Output) { - writeHandler?.invoke(key, output) ?: super.write(key, output) + private class TestException(val msg: String) : Exception(msg) { + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (other !is TestException) return false + return msg == other.msg + } + + override fun hashCode(): Int { + return msg.hashCode() } } -} \ No newline at end of file +} diff --git a/store/src/test/java/com/dropbox/android/external/store4/SourceOfTruthWithBarrierTest.kt b/store/src/test/java/com/dropbox/android/external/store4/SourceOfTruthWithBarrierTest.kt index 4488e2990..e316c7a22 100644 --- a/store/src/test/java/com/dropbox/android/external/store4/SourceOfTruthWithBarrierTest.kt +++ b/store/src/test/java/com/dropbox/android/external/store4/SourceOfTruthWithBarrierTest.kt @@ -15,6 +15,8 @@ */ package com.dropbox.android.external.store4 +import com.dropbox.android.external.store4.SourceOfTruth.ReadException +import com.dropbox.android.external.store4.SourceOfTruth.WriteException import com.dropbox.android.external.store4.impl.PersistentSourceOfTruth import com.dropbox.android.external.store4.impl.SourceOfTruthWithBarrier import com.dropbox.android.external.store4.testutil.InMemoryPersister @@ -28,10 +30,12 @@ import kotlinx.coroutines.cancelAndJoin import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.take import kotlinx.coroutines.flow.toList import kotlinx.coroutines.test.TestCoroutineScope import kotlinx.coroutines.test.runBlockingTest +import kotlinx.coroutines.yield import org.junit.Test @@ -124,7 +128,10 @@ class SourceOfTruthWithBarrierTest { ).emitsExactly( StoreResponse.Error.Exception( origin = ResponseOrigin.SourceOfTruth, - error = exception + error = ReadException( + key = 1, + cause = exception + ) ) ) } @@ -142,7 +149,7 @@ class SourceOfTruthWithBarrierTest { value } val reader = source.reader(1, CompletableDeferred(Unit)) - val collected = mutableSetOf>() + val collected = mutableListOf>() val collection = async { reader.collect { collected.add(it) @@ -152,7 +159,10 @@ class SourceOfTruthWithBarrierTest { assertThat(collected).containsExactly( StoreResponse.Error.Exception( origin = ResponseOrigin.SourceOfTruth, - error = exception + error = ReadException( + key = 1, + cause = exception + ) ) ) // make sure it is not cancelled for the read error @@ -163,7 +173,10 @@ class SourceOfTruthWithBarrierTest { assertThat(collected).containsExactly( StoreResponse.Error.Exception( origin = ResponseOrigin.SourceOfTruth, - error = exception + error = ReadException( + key = 1, + cause = exception + ) ), StoreResponse.Data( // this is fetcher since we are using the write API @@ -174,4 +187,48 @@ class SourceOfTruthWithBarrierTest { collection.cancelAndJoin() } + @Test + fun `Given Source Of Truth WHEN write fails THEN error should propogate`() { + testScope.runBlockingTest { + val exception = RuntimeException("write fails") + persister.preWriteCallback = { key, value -> + throw exception + } + val reader = source.reader(1, CompletableDeferred(Unit)) + val collected = mutableListOf>() + val collection = async { + reader.collect { + collected.add(it) + } + } + advanceUntilIdle() + source.write(1, "will fail") + advanceUntilIdle() + // make sure collection does not cancel for a write error + assertThat(collection.isActive).isTrue() + assertThat( + collected + ).containsExactly( + StoreResponse.Data( + origin = ResponseOrigin.SourceOfTruth, + value = null + ), + StoreResponse.Error.Exception( + origin = ResponseOrigin.SourceOfTruth, + error = WriteException( + key = 1, + value = "will fail", + cause = exception + ) + ), + StoreResponse.Data( + origin = ResponseOrigin.SourceOfTruth, + value = null + ) + ) + advanceUntilIdle() + assertThat(collection.isActive).isTrue() + collection.cancelAndJoin() + } + } } From e4c2b6a078db6b2e11405ba811d10a11748a5894 Mon Sep 17 00:00:00 2001 From: Yigit Boyar Date: Thu, 18 Jun 2020 20:35:41 -0700 Subject: [PATCH 3/6] fix styles --- .../java/com/dropbox/android/external/store4/SourceOfTruth.kt | 4 ++-- .../com/dropbox/android/external/store4/impl/RealStore.kt | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/store/src/main/java/com/dropbox/android/external/store4/SourceOfTruth.kt b/store/src/main/java/com/dropbox/android/external/store4/SourceOfTruth.kt index c26ba69b9..99bfd3c45 100644 --- a/store/src/main/java/com/dropbox/android/external/store4/SourceOfTruth.kt +++ b/store/src/main/java/com/dropbox/android/external/store4/SourceOfTruth.kt @@ -140,11 +140,11 @@ interface SourceOfTruth { /** * The value for the failed write attempt */ - val value:Any?, + val value: Any?, /** * The exception thrown from the [SourceOfTruth]'s [write] method. */ - cause : Throwable + cause: Throwable ) : RuntimeException( "Failed to write value to Source of Truth. key: $key", cause diff --git a/store/src/main/java/com/dropbox/android/external/store4/impl/RealStore.kt b/store/src/main/java/com/dropbox/android/external/store4/impl/RealStore.kt index 22dff866d..50e5def05 100644 --- a/store/src/main/java/com/dropbox/android/external/store4/impl/RealStore.kt +++ b/store/src/main/java/com/dropbox/android/external/store4/impl/RealStore.kt @@ -210,7 +210,7 @@ internal class RealStore( networkLock.complete(Unit) } // for other errors, don't do anything, wait for the read attempt - } else if (diskData is StoreResponse.Data){ + } else if (diskData is StoreResponse.Data) { if (request.refresh || diskValue == null) { networkLock.complete(Unit) } From 519c981c2673723a1dc064420df43b210d021bf2 Mon Sep 17 00:00:00 2001 From: Yigit Boyar Date: Thu, 18 Jun 2020 20:38:37 -0700 Subject: [PATCH 4/6] fix more styles :/ --- .../android/external/store4/impl/RealStore.kt | 2 +- .../store4/impl/SourceOfTruthWithBarrier.kt | 22 ++++++++++--------- .../store4/SourceOfTruthErrorsTest.kt | 4 ++-- .../store4/SourceOfTruthWithBarrierTest.kt | 4 ---- .../store4/testutil/InMemoryPersister.kt | 6 ++--- 5 files changed, 18 insertions(+), 20 deletions(-) diff --git a/store/src/main/java/com/dropbox/android/external/store4/impl/RealStore.kt b/store/src/main/java/com/dropbox/android/external/store4/impl/RealStore.kt index 50e5def05..d6978cb83 100644 --- a/store/src/main/java/com/dropbox/android/external/store4/impl/RealStore.kt +++ b/store/src/main/java/com/dropbox/android/external/store4/impl/RealStore.kt @@ -188,7 +188,7 @@ internal class RealStore( } } else if (it is Either.Right) { // right, that is data from disk - val (index, diskData) = it.value + val (_, diskData) = it.value val diskValue = diskData.dataOrNull() if (diskValue != null) { emit( diff --git a/store/src/main/java/com/dropbox/android/external/store4/impl/SourceOfTruthWithBarrier.kt b/store/src/main/java/com/dropbox/android/external/store4/impl/SourceOfTruthWithBarrier.kt index 08482758a..032b157f1 100644 --- a/store/src/main/java/com/dropbox/android/external/store4/impl/SourceOfTruthWithBarrier.kt +++ b/store/src/main/java/com/dropbox/android/external/store4/impl/SourceOfTruthWithBarrier.kt @@ -137,7 +137,7 @@ internal class SourceOfTruthWithBarrier( val writeError = try { delegate.write(key, value) null - } catch (throwable : Throwable) { + } catch (throwable: Throwable) { if (throwable !is CancellationException) { throwable } else { @@ -145,15 +145,17 @@ internal class SourceOfTruthWithBarrier( } } - barrier.send(BarrierMsg.Open( - version = versionCounter.incrementAndGet(), - writeError = writeError?.let { - SourceOfTruth.WriteException( - key = key, - value = value, - cause = writeError - ) - })) + barrier.send( + BarrierMsg.Open( + version = versionCounter.incrementAndGet(), + writeError = writeError?.let { + SourceOfTruth.WriteException( + key = key, + value = value, + cause = writeError + ) + }) + ) if (writeError is CancellationException) { // only throw if it failed because of cancelation. // otherwise, we take care of letting downstream know that there was a write error diff --git a/store/src/test/java/com/dropbox/android/external/store4/SourceOfTruthErrorsTest.kt b/store/src/test/java/com/dropbox/android/external/store4/SourceOfTruthErrorsTest.kt index b788ad8d5..20331e4df 100644 --- a/store/src/test/java/com/dropbox/android/external/store4/SourceOfTruthErrorsTest.kt +++ b/store/src/test/java/com/dropbox/android/external/store4/SourceOfTruthErrorsTest.kt @@ -183,7 +183,7 @@ class SourceOfTruthErrorsTest { fun `GIVEN Source of Truth with failing write WHEN a passive reader arrives THEN it should receive the new write error`() = testScope.runBlockingTest { val persister = InMemoryPersister() - val fetcher = Fetcher.ofFlow { _: Int-> + val fetcher = Fetcher.ofFlow { _: Int -> flowOf("a", "b", "c", "d") } val pipeline = StoreBuilder @@ -348,7 +348,7 @@ class SourceOfTruthErrorsTest { fun `Given Source of Truth with read failure WHEN cached value reader arrives THEN fetcher should be called to get a new value`() { testScope.runBlockingTest { val persister = InMemoryPersister() - val fetcher = Fetcher.of { _:Int -> "a" } + val fetcher = Fetcher.of { _: Int -> "a" } val pipeline = StoreBuilder .from( fetcher = fetcher, diff --git a/store/src/test/java/com/dropbox/android/external/store4/SourceOfTruthWithBarrierTest.kt b/store/src/test/java/com/dropbox/android/external/store4/SourceOfTruthWithBarrierTest.kt index e316c7a22..d9d2e3428 100644 --- a/store/src/test/java/com/dropbox/android/external/store4/SourceOfTruthWithBarrierTest.kt +++ b/store/src/test/java/com/dropbox/android/external/store4/SourceOfTruthWithBarrierTest.kt @@ -29,16 +29,12 @@ import kotlinx.coroutines.async import kotlinx.coroutines.cancelAndJoin import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.flow.flowOf -import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.take import kotlinx.coroutines.flow.toList import kotlinx.coroutines.test.TestCoroutineScope import kotlinx.coroutines.test.runBlockingTest -import kotlinx.coroutines.yield import org.junit.Test - @FlowPreview @ExperimentalCoroutinesApi class SourceOfTruthWithBarrierTest { diff --git a/store/src/test/java/com/dropbox/android/external/store4/testutil/InMemoryPersister.kt b/store/src/test/java/com/dropbox/android/external/store4/testutil/InMemoryPersister.kt index 79f6a9ded..2553624eb 100644 --- a/store/src/test/java/com/dropbox/android/external/store4/testutil/InMemoryPersister.kt +++ b/store/src/test/java/com/dropbox/android/external/store4/testutil/InMemoryPersister.kt @@ -7,11 +7,11 @@ import com.dropbox.android.external.store4.SourceOfTruth */ open class InMemoryPersister { private val data = mutableMapOf() - var preWriteCallback : (suspend (key:Key, value : Output) -> Output)? = null - var postReadCallback : (suspend (key : Key, value : Output?) -> Output?)? = null + var preWriteCallback: (suspend (key: Key, value: Output) -> Output)? = null + var postReadCallback: (suspend (key: Key, value: Output?) -> Output?)? = null @Suppress("RedundantSuspendModifier") // for function reference - suspend fun read(key: Key) :Output? { + suspend fun read(key: Key): Output? { val value = data[key] postReadCallback?.let { return it(key, value) From 08f7768974c3e85e873ccf1a13f86575039ecc39 Mon Sep 17 00:00:00 2001 From: Yigit Boyar Date: Thu, 18 Jun 2020 20:39:59 -0700 Subject: [PATCH 5/6] fix the test assertion --- .../android/external/store4/SourceOfTruthErrorsTest.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/store/src/test/java/com/dropbox/android/external/store4/SourceOfTruthErrorsTest.kt b/store/src/test/java/com/dropbox/android/external/store4/SourceOfTruthErrorsTest.kt index 20331e4df..4bfd43dab 100644 --- a/store/src/test/java/com/dropbox/android/external/store4/SourceOfTruthErrorsTest.kt +++ b/store/src/test/java/com/dropbox/android/external/store4/SourceOfTruthErrorsTest.kt @@ -99,7 +99,7 @@ class SourceOfTruthErrorsTest { StoreResponse.Error.Exception( error = ReadException( key = 3, - cause = TestException("a") + cause = TestException("null") ), origin = ResponseOrigin.SourceOfTruth ), @@ -112,7 +112,7 @@ class SourceOfTruthErrorsTest { StoreResponse.Error.Exception( error = ReadException( key = 3, - cause = TestException("b") + cause = TestException("a") ), origin = ResponseOrigin.SourceOfTruth ) From e6abbfb9e6912600a66335de032156dfc77cfd4d Mon Sep 17 00:00:00 2001 From: Yigit Boyar Date: Fri, 26 Jun 2020 13:02:41 -0700 Subject: [PATCH 6/6] address comments in review --- .../com/dropbox/android/external/store4/SourceOfTruth.kt | 2 +- .../dropbox/android/external/store4/impl/RealStore.kt | 9 ++++++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/store/src/main/java/com/dropbox/android/external/store4/SourceOfTruth.kt b/store/src/main/java/com/dropbox/android/external/store4/SourceOfTruth.kt index 99bfd3c45..4ae89a076 100644 --- a/store/src/main/java/com/dropbox/android/external/store4/SourceOfTruth.kt +++ b/store/src/main/java/com/dropbox/android/external/store4/SourceOfTruth.kt @@ -128,7 +128,7 @@ interface SourceOfTruth { } /** - * The exception provided when a Write operation fails in SourceOfTruth. + * The exception provided when a write operation fails in SourceOfTruth. * * see [StoreResponse.Error.Exception] */ diff --git a/store/src/main/java/com/dropbox/android/external/store4/impl/RealStore.kt b/store/src/main/java/com/dropbox/android/external/store4/impl/RealStore.kt index d6978cb83..d00947763 100644 --- a/store/src/main/java/com/dropbox/android/external/store4/impl/RealStore.kt +++ b/store/src/main/java/com/dropbox/android/external/store4/impl/RealStore.kt @@ -202,9 +202,12 @@ internal class RealStore( emit(diskData.swapType()) } - // if this is the first disk value and it is null, we should enable fetcher - // we should also allow fetcher if disk sent a read error but not if it is - // a write error since we should always wait for the read attempt + // If this is the first disk value and it is null, we should allow fetcher + // to start emitting values. + // If disk sent a read error, we should allow fetcher to start emitting values + // since there is nothing to read from disk. + // If disk sent a write error, we should NOT allow fetcher to start emitting + // values as we should always wait for the read attempt. if (diskData is StoreResponse.Error.Exception) { if (diskData.error is SourceOfTruth.ReadException) { networkLock.complete(Unit)