From c4debd56704e3e3183a0d50ea67bf9032d0506df Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Sun, 11 Sep 2022 09:59:14 -0700 Subject: [PATCH] more robust error handling --- .../concurrent/api/CacheSingle.java | 5 +- .../concurrent/api/PublisherFlatMapMerge.java | 119 +++++++++++------- .../api/PublisherFlatMapSingle.java | 35 ++++-- .../api/PublisherFlatMapMergeTest.java | 84 ++++++++++++- .../api/PublisherFlatMapSingleTest.java | 28 ++++- .../concurrent/internal/SubscriberUtils.java | 41 +++++- 6 files changed, 239 insertions(+), 73 deletions(-) diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/CacheSingle.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/CacheSingle.java index a626d93e63..e92ecf8642 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/CacheSingle.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/CacheSingle.java @@ -18,6 +18,7 @@ import io.servicetalk.concurrent.Cancellable; import io.servicetalk.concurrent.internal.ArrayUtils; import io.servicetalk.concurrent.internal.DelayedCancellable; +import io.servicetalk.concurrent.internal.SubscriberUtils; import io.servicetalk.concurrent.internal.TerminalNotification; import io.servicetalk.context.api.ContextMap; @@ -293,7 +294,7 @@ public void onSuccess(@Nullable final T result) { break; } } else { - logDuplicateTerminal(this); + SubscriberUtils.logDuplicateTerminalOnSuccess(this, result); break; } } @@ -318,7 +319,7 @@ public void onError(final Throwable t) { break; } } else { - logDuplicateTerminal(this); + logDuplicateTerminal(this, t); break; } } diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapMerge.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapMerge.java index 83a0844dd3..4b6ff9dd91 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapMerge.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapMerge.java @@ -43,9 +43,7 @@ import static io.servicetalk.concurrent.internal.SubscriberUtils.isRequestNValid; import static io.servicetalk.concurrent.internal.TerminalNotification.complete; import static io.servicetalk.concurrent.internal.TerminalNotification.error; -import static io.servicetalk.concurrent.internal.ThrowableUtils.catchUnexpected; import static io.servicetalk.utils.internal.PlatformDependent.newUnboundedMpscQueue; -import static io.servicetalk.utils.internal.ThrowableUtils.throwException; import static java.lang.Math.max; import static java.lang.Math.min; import static java.util.Objects.requireNonNull; @@ -300,12 +298,19 @@ private void tryEmitItem(Object item, final boolean needsDemand, FlatMapPublishe subscriber.markSignalsQueued(); enqueueAndDrain(item); } else if (item == MAPPED_SOURCE_COMPLETE) { - requestMoreFromUpstream(1); + try { + requestMoreFromUpstream(1); + } catch (Throwable cause) { + onErrorNotHoldingLock(cause); + } } else if (tryAcquireLock(emittingLockUpdater, this)) { // fast path. no concurrency, try to skip the queue. try { final boolean demandConsumed = sendToTarget(item); assert demandConsumed == needsDemand || targetTerminated; } finally { + // No need to catch exception and onErrorHoldingLock. If the signal is not a terminal it's safe to + // propagate, if the signal is a terminal then we have already delivered a terminal down stream and + // can't do anything else (just let the upstream handle it). if (!releaseLock(emittingLockUpdater, this)) { drainPending(); } @@ -320,6 +325,34 @@ private void tryEmitItem(Object item, final boolean needsDemand, FlatMapPublishe } } + /** + * Process an error while holding the {@link #emittingLock}. We cannot re-throw here because sources have + * already terminated. This means the source cannot deliver another terminal or else it will violate the + * reactive stream spec. The best we can do is cancel upstream and mapped subscribers, and propagate the error + * downstream. + * @param cause The cause that occurred while delivering signals down stream. + */ + private void onErrorHoldingLock(Throwable cause) { + try { + doCancel(true); + } finally { + sendToTarget(error(cause)); + } + } + + private void onErrorNotHoldingLock(Throwable cause) { + if (tryAcquireLock(emittingLockUpdater, this)) { // fast path. no concurrency, try to skip the queue. + onErrorHoldingLock(cause); + } else { + try { + doCancel(true); + } finally { + // Emit the error to preserve ordering relative to onNext signals for this source. + enqueueAndDrain(error(cause)); + } + } + } + private void enqueueItem(Object item) { if (!signals.offer(item)) { enqueueFailed(item); @@ -342,11 +375,10 @@ private static void enqueueFailed(Object item) { } private void drainPending() { - Throwable delayedCause = null; boolean tryAcquire = true; - int mappedSourcesCompleted = 0; while (tryAcquire && tryAcquireLock(emittingLockUpdater, this)) { try { + int mappedSourcesCompleted = 0; final long prevDemand = pendingDemandUpdater.getAndSet(this, 0); if (prevDemand < 0) { pendingDemand = prevDemand; @@ -354,42 +386,33 @@ private void drainPending() { long emittedCount = 0; Object t; while (emittedCount < prevDemand && (t = signals.poll()) != null) { - try { - if (t == MAPPED_SOURCE_COMPLETE) { - ++mappedSourcesCompleted; - } else if (sendToTarget(t)) { - ++emittedCount; - } - } catch (Throwable cause) { + if (t == MAPPED_SOURCE_COMPLETE) { + ++mappedSourcesCompleted; + } else if (sendToTarget(t)) { ++emittedCount; - delayedCause = catchUnexpected(delayedCause, cause); } } // check if a terminal event is pending, or give back demand. if (emittedCount == prevDemand) { for (;;) { - try { - t = signals.peek(); - if (t == MAPPED_SOURCE_COMPLETE) { - signals.poll(); - ++mappedSourcesCompleted; - } else if (t instanceof FlatMapPublisherSubscriber) { - signals.poll(); - @SuppressWarnings("unchecked") - final FlatMapPublisherSubscriber hungrySubscriber = - (FlatMapPublisherSubscriber) t; - distributeMappedDemand(hungrySubscriber); - } else { - break; - } - } catch (Throwable cause) { - delayedCause = catchUnexpected(delayedCause, cause); + t = signals.peek(); + if (t == MAPPED_SOURCE_COMPLETE) { + signals.poll(); + ++mappedSourcesCompleted; + } else if (t instanceof FlatMapPublisherSubscriber) { + signals.poll(); + @SuppressWarnings("unchecked") + final FlatMapPublisherSubscriber hungrySubscriber = + (FlatMapPublisherSubscriber) t; + distributeMappedDemand(hungrySubscriber); + } else { + break; } } if (t instanceof TerminalNotification) { - sendToTarget(t); // if this throws its OK as we have terminated + sendToTarget(t); } else { sendToTargetIfPrematureError(); } @@ -399,17 +422,21 @@ private void drainPending() { FlowControlUtils::addWithOverflowProtectionIfNotNegative); } } - } finally { - tryAcquire = !releaseLock(emittingLockUpdater, this); - } - } - if (mappedSourcesCompleted != 0) { - requestMoreFromUpstream(mappedSourcesCompleted); - } - - if (delayedCause != null) { - throwException(delayedCause); + if (mappedSourcesCompleted != 0) { + requestMoreFromUpstream(mappedSourcesCompleted); + } + } catch (Throwable cause) { + // We can't propagate an exception if the subscriber triggering this invocation has terminated, but + // we don't know in which context this method is being invoked and if the subscriber invoking this + // method may have a terminal event queued. Also since throwing is in violation of the spec we keep + // the code simple and handle the exception here. This method may also be invoked from a request(..) + // in which we can't let the exception propagate for graceful termination. + onErrorHoldingLock(cause); + return; // Poison emittingUpdater. We prematurely terminated, other signals should be ignored. + } + // Release lock after we handle errors, because error handling needs to poison the lock. + tryAcquire = !releaseLock(emittingLockUpdater, this); } } @@ -547,7 +574,7 @@ public void onSubscribe(final Subscription s) { // To accommodate for the "approximate" mapped demand we maintain a signal queue (bounded by the // concurrency). This presents an opportunity to decouple downstream requestN requests from iterating // all active mapped sources and instead optimistically give out demand here and replenish demand after - // signals are delivered to the downstream subscriber (based upon available demand is available). + // signals are delivered to the downstream subscriber (based upon available demand). parent.distributeMappedDemand(this); } @@ -572,7 +599,8 @@ public void onNext(@Nullable final R r) { private void handleInvalidDemand(int pendingDemand, @Nullable final R r) { // Reset pendingDemand because we want to allow for a terminal event to be propagated. This is safe // because request(..) won't be called until demand is exhausted. If the atomic operation fails - // something is wrong but don't override as it may have been terminated in another thread. + // either there is concurrency on the Subscriber (violating reactive streams spec) or request(..) has + // been called on another thread (demand was still violated, we drain demand to 0 before requesting). pendingDemandUpdater.compareAndSet(this, pendingDemand, (pendingDemand > TERMINATED) ? 0 : TERMINATED); throw new IllegalStateException("Too many onNext signals for Subscriber: " + this + " pendingDemand: " + pendingDemand + " discarding: " + r); @@ -580,6 +608,11 @@ private void handleInvalidDemand(int pendingDemand, @Nullable final R r) { @Override public void onError(final Throwable t) { + final int unusedDemand = pendingDemandUpdater.getAndSet(this, TERMINATED); + if (unusedDemand < 0) { + SubscriberUtils.logDuplicateTerminal(this, t); + return; + } Throwable currPendingError = parent.pendingError; if (parent.source.maxDelayedErrors == 0) { if (currPendingError == null && pendingErrorUpdater.compareAndSet(parent, null, t)) { @@ -604,7 +637,7 @@ public void onError(final Throwable t) { addPendingError(pendingErrorCountUpdater, parent, parent.source.maxDelayedErrors, currPendingError, t); } - if (parent.removeSubscriber(this, pendingDemandUpdater.getAndSet(this, TERMINATED))) { + if (parent.removeSubscriber(this, unusedDemand)) { parent.enqueueAndDrain(error(currPendingError)); } else { parent.tryEmitItem(MAPPED_SOURCE_COMPLETE, false, this); diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapSingle.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapSingle.java index 88d5fcdae9..d23c41f26b 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapSingle.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapSingle.java @@ -237,6 +237,13 @@ private void onError0(Throwable throwable, boolean cancelUpstream) { } } + /** + * Process an error while holding the {@link #emitting} lock. We cannot re-throw here because sources have + * already terminated. This means the source cannot deliver another terminal or else it will violate the + * reactive stream spec. The best we can do is cancel upstream and mapped subscribers, and propagate the error + * downstream. + * @param cause The cause that occurred while delivering signals down stream. + */ private void onErrorHoldingLock(Throwable cause) { try { doCancel(true); @@ -372,17 +379,31 @@ public void onSubscribe(Cancellable singleCancellable) { @Override public void onSuccess(@Nullable R result) { + if (singleCancellable == null) { + SubscriberUtils.logDuplicateTerminalOnSuccess(this, result); + return; + } + cancellableSet.remove(singleCancellable); + singleCancellable = null; + // First enqueue the result and then decrement active count. Since onComplete() checks for active count, // if we decrement count before enqueuing, onComplete() may emit the terminal event without emitting // the result. tryEmitItem(wrapNull(result)); - if (onSingleTerminated()) { + if (decrementActiveMappedSources()) { enqueueAndDrain(complete()); } } @Override public void onError(Throwable t) { + if (singleCancellable == null) { + SubscriberUtils.logDuplicateTerminal(this, t); + return; + } + cancellableSet.remove(singleCancellable); + singleCancellable = null; + Throwable currPendingError = pendingError; if (source.maxDelayedErrors == 0) { if (currPendingError == null && @@ -403,7 +424,7 @@ public void onError(Throwable t) { addPendingError(pendingErrorCountUpdater, FlatMapSubscriber.this, source.maxDelayedErrors, currPendingError, t); } - if (onSingleTerminated()) { + if (decrementActiveMappedSources()) { enqueueAndDrain(error(currPendingError)); } else { // Queueing/draining may result in requestN more data. @@ -411,16 +432,6 @@ public void onError(Throwable t) { } } } - - private boolean onSingleTerminated() { - if (singleCancellable == null) { - SubscriberUtils.logDuplicateTerminal(this); - return false; - } - cancellableSet.remove(singleCancellable); - singleCancellable = null; - return decrementActiveMappedSources(); - } } } } diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherFlatMapMergeTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherFlatMapMergeTest.java index 899f8c1e31..566309ef6c 100644 --- a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherFlatMapMergeTest.java +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherFlatMapMergeTest.java @@ -41,8 +41,10 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import java.util.function.LongConsumer; import java.util.stream.IntStream; import javax.annotation.Nullable; @@ -121,6 +123,61 @@ void singleToPublisherOnNextErrorPropagated(boolean delayError) { assertThat(subscriber.awaitOnError(), is(DELIBERATE_EXCEPTION)); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void subscriptionThrowsFromTerminalHandled(boolean delayError) { + AtomicLong requestCount = new AtomicLong(); + LongConsumer requestThrower = n -> { + if (requestCount.accumulateAndGet(n, Long::sum) > 1) { + throw DELIBERATE_EXCEPTION; + } + }; + Function> func = Publisher::from; + toSource(delayError ? + publisher.whenRequest(requestThrower).flatMapMergeDelayError(func, 1) : + publisher.whenRequest(requestThrower).flatMapMerge(func, 1)) + .subscribe(subscriber); + subscriber.awaitSubscription().request(1); + publisher.onNext(1); + publisher.onComplete(); + assertThat(subscriber.takeOnNext(), is(1)); + assertThat(subscriber.awaitOnError(), is(DELIBERATE_EXCEPTION)); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void exceptionHandledWhileConcurrentProcessing(boolean delayError) { + assert executor != null; + TestPublisher publisher2 = new TestPublisher<>(); + Function> func = i -> { + if (i == 1) { + return publisher2; + } else if (i == 2) { + return executor.submit(() -> i).toPublisher(); + } else { + return never(); + } + }; + toSource((delayError ? + publisher.flatMapMergeDelayError(func, 2) : + publisher.flatMapMerge(func, 2)) + .map(i -> { + if (i == 2) { + publisher2.onNext(12); + return i; + } else if (i == 12) { + throw DELIBERATE_EXCEPTION; + } + throw new IllegalStateException("unexpected i: " + i); + })) + .subscribe(subscriber); + subscriber.awaitSubscription().request(2); + publisher.onNext(1, 2); + publisher.onComplete(); + assertThat(subscriber.takeOnNext(), is(2)); + assertThat(subscriber.awaitOnError(), is(DELIBERATE_EXCEPTION)); + } + @Test void mappedRecoverMakesProgress() throws Exception { @SuppressWarnings("unchecked") @@ -266,21 +323,36 @@ void singleItemMappedErrorPostSourceComplete() { assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); } - @Test - void testDuplicateTerminal() { + @ParameterizedTest(name = "{displayName} [{index}] errorFirst={0} errorSecond={1}") + @CsvSource(value = {"true,true", "true,false", "false,true", "false,false"}) + void testDuplicateTerminal(boolean errorFirst, boolean errorSecond) { PublisherSource mappedPublisher = subscriber -> { subscriber.onSubscribe(EMPTY_SUBSCRIPTION); - subscriber.onComplete(); + if (errorFirst) { + subscriber.onError(DELIBERATE_EXCEPTION); + } else { + subscriber.onComplete(); + } + // intentionally violate the RS spec to verify the operator's behavior. // [1] https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#1.7 - subscriber.onComplete(); + if (errorSecond) { + subscriber.onError(new IllegalStateException("duplicate terminal should be discarded!")); + } else { + subscriber.onComplete(); + } }; @SuppressWarnings("unchecked") Subscriber mockSubscriber = mock(Subscriber.class); toSource(publisher.flatMapMerge(i -> fromSource(mappedPublisher), 1)).subscribe(mockSubscriber); publisher.onNext(1); - publisher.onComplete(); - verify(mockSubscriber).onComplete(); + + if (errorFirst) { + verify(mockSubscriber).onError(DELIBERATE_EXCEPTION); + } else { + publisher.onComplete(); + verify(mockSubscriber).onComplete(); + } } @ParameterizedTest(name = "{displayName} [{index}] delayError={0} queuedSignals={1}") diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherFlatMapSingleTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherFlatMapSingleTest.java index 6015c0294e..2e7a9ae988 100644 --- a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherFlatMapSingleTest.java +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherFlatMapSingleTest.java @@ -25,6 +25,7 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.ValueSource; import java.util.ArrayDeque; @@ -220,21 +221,36 @@ void testSingleErrorPostSourceComplete() { assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); } - @Test - void testDuplicateTerminal() { + @ParameterizedTest(name = "{displayName} [{index}] errorFirst={0} errorSecond={1}") + @CsvSource(value = {"true,true", "true,false", "false,true", "false,false"}) + void testDuplicateTerminal(boolean errorFirst, boolean errorSecond) { SingleSource single = subscriber -> { subscriber.onSubscribe(IGNORE_CANCEL); - subscriber.onSuccess(2); + if (errorFirst) { + subscriber.onError(DELIBERATE_EXCEPTION); + } else { + subscriber.onSuccess(2); + } + // intentionally violate the RS spec to verify the operator's behavior. // [1] https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#1.7 - subscriber.onSuccess(3); + if (errorSecond) { + subscriber.onError(new IllegalStateException("duplicate terminal should be discarded!")); + } else { + subscriber.onSuccess(3); + } }; @SuppressWarnings("unchecked") Subscriber mockSubscriber = mock(Subscriber.class); toSource(source.flatMapMergeSingle(integer1 -> fromSource(single), 2)).subscribe(mockSubscriber); source.onNext(1); - source.onComplete(); - verify(mockSubscriber).onComplete(); + + if (errorFirst) { + verify(mockSubscriber).onError(DELIBERATE_EXCEPTION); + } else { + source.onComplete(); + verify(mockSubscriber).onComplete(); + } } @Test diff --git a/servicetalk-concurrent-internal/src/main/java/io/servicetalk/concurrent/internal/SubscriberUtils.java b/servicetalk-concurrent-internal/src/main/java/io/servicetalk/concurrent/internal/SubscriberUtils.java index a720dcacb3..3546a2d39e 100644 --- a/servicetalk-concurrent-internal/src/main/java/io/servicetalk/concurrent/internal/SubscriberUtils.java +++ b/servicetalk-concurrent-internal/src/main/java/io/servicetalk/concurrent/internal/SubscriberUtils.java @@ -338,7 +338,18 @@ public static void safeCancel(Cancellable cancellable) { * @param The type of {@link PublisherSource.Subscriber}. */ public static void logDuplicateTerminal(PublisherSource.Subscriber subscriber) { - logDuplicateTerminal0(subscriber); + logDuplicateTerminal0(subscriber, null); + } + + /** + * Log if the ReactiveStreams specification has been violated related to out of order + * {@link PublisherSource.Subscriber#onSubscribe(PublisherSource.Subscription)} or duplicate terminal signals. + * @param subscriber The {@link PublisherSource.Subscriber}. + * @param cause The cause from {@link PublisherSource.Subscriber#onError(Throwable)}. + * @param The type of {@link PublisherSource.Subscriber}. + */ + public static void logDuplicateTerminal(PublisherSource.Subscriber subscriber, Throwable cause) { + logDuplicateTerminal0(subscriber, cause); } /** @@ -348,15 +359,37 @@ public static void logDuplicateTerminal(PublisherSource.Subscriber subscr * @param The type of {@link SingleSource.Subscriber}. */ public static void logDuplicateTerminal(SingleSource.Subscriber subscriber) { - logDuplicateTerminal0(subscriber); + logDuplicateTerminal0(subscriber, null); + } + + /** + * Log if the ReactiveStreams specification has been violated related to out of order + * {@link SingleSource.Subscriber#onSubscribe(Cancellable)} or duplicate terminal signals. + * @param subscriber The {@link SingleSource.Subscriber}. + * @param onSuccess The signal delivered to {@link SingleSource.Subscriber#onSuccess(Object)}. + * @param The type of {@link SingleSource.Subscriber}. + */ + public static void logDuplicateTerminalOnSuccess(SingleSource.Subscriber subscriber, @Nullable T onSuccess) { + logDuplicateTerminal0(subscriber, new IllegalStateException("ignoring onSuccess: " + onSuccess)); + } + + /** + * Log if the ReactiveStreams specification has been violated related to out of order + * {@link SingleSource.Subscriber#onSubscribe(Cancellable)} or duplicate terminal signals. + * @param subscriber The {@link SingleSource.Subscriber}. + * @param cause The cause from {@link SingleSource.Subscriber#onError(Throwable)}. + * @param The type of {@link SingleSource.Subscriber}. + */ + public static void logDuplicateTerminal(SingleSource.Subscriber subscriber, Throwable cause) { + logDuplicateTerminal0(subscriber, cause); } - private static void logDuplicateTerminal0(Object subscriber) { + private static void logDuplicateTerminal0(Object subscriber, @Nullable Throwable cause) { LOGGER.warn("onSubscribe not called before terminal or duplicate terminal on Subscriber {}", subscriber, new IllegalStateException( "onSubscribe not called before terminal or duplicate terminal on Subscriber " + subscriber + " forbidden see: " + "https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#1.9" + - "https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#1.7")); + "https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#1.7", cause)); } }