Skip to content

Commit

Permalink
more robust error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
Scottmitch committed Sep 12, 2022
1 parent 898308c commit c4debd5
Show file tree
Hide file tree
Showing 6 changed files with 239 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.internal.ArrayUtils;
import io.servicetalk.concurrent.internal.DelayedCancellable;
import io.servicetalk.concurrent.internal.SubscriberUtils;
import io.servicetalk.concurrent.internal.TerminalNotification;
import io.servicetalk.context.api.ContextMap;

Expand Down Expand Up @@ -293,7 +294,7 @@ public void onSuccess(@Nullable final T result) {
break;
}
} else {
logDuplicateTerminal(this);
SubscriberUtils.logDuplicateTerminalOnSuccess(this, result);
break;
}
}
Expand All @@ -318,7 +319,7 @@ public void onError(final Throwable t) {
break;
}
} else {
logDuplicateTerminal(this);
logDuplicateTerminal(this, t);
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@
import static io.servicetalk.concurrent.internal.SubscriberUtils.isRequestNValid;
import static io.servicetalk.concurrent.internal.TerminalNotification.complete;
import static io.servicetalk.concurrent.internal.TerminalNotification.error;
import static io.servicetalk.concurrent.internal.ThrowableUtils.catchUnexpected;
import static io.servicetalk.utils.internal.PlatformDependent.newUnboundedMpscQueue;
import static io.servicetalk.utils.internal.ThrowableUtils.throwException;
import static java.lang.Math.max;
import static java.lang.Math.min;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -300,12 +298,19 @@ private void tryEmitItem(Object item, final boolean needsDemand, FlatMapPublishe
subscriber.markSignalsQueued();
enqueueAndDrain(item);
} else if (item == MAPPED_SOURCE_COMPLETE) {
requestMoreFromUpstream(1);
try {
requestMoreFromUpstream(1);
} catch (Throwable cause) {
onErrorNotHoldingLock(cause);
}
} else if (tryAcquireLock(emittingLockUpdater, this)) { // fast path. no concurrency, try to skip the queue.
try {
final boolean demandConsumed = sendToTarget(item);
assert demandConsumed == needsDemand || targetTerminated;
} finally {
// No need to catch exception and onErrorHoldingLock. If the signal is not a terminal it's safe to
// propagate, if the signal is a terminal then we have already delivered a terminal down stream and
// can't do anything else (just let the upstream handle it).
if (!releaseLock(emittingLockUpdater, this)) {
drainPending();
}
Expand All @@ -320,6 +325,34 @@ private void tryEmitItem(Object item, final boolean needsDemand, FlatMapPublishe
}
}

/**
* Process an error while holding the {@link #emittingLock}. We cannot re-throw here because sources have
* already terminated. This means the source cannot deliver another terminal or else it will violate the
* reactive stream spec. The best we can do is cancel upstream and mapped subscribers, and propagate the error
* downstream.
* @param cause The cause that occurred while delivering signals down stream.
*/
private void onErrorHoldingLock(Throwable cause) {
try {
doCancel(true);
} finally {
sendToTarget(error(cause));
}
}

private void onErrorNotHoldingLock(Throwable cause) {
if (tryAcquireLock(emittingLockUpdater, this)) { // fast path. no concurrency, try to skip the queue.
onErrorHoldingLock(cause);
} else {
try {
doCancel(true);
} finally {
// Emit the error to preserve ordering relative to onNext signals for this source.
enqueueAndDrain(error(cause));
}
}
}

private void enqueueItem(Object item) {
if (!signals.offer(item)) {
enqueueFailed(item);
Expand All @@ -342,54 +375,44 @@ private static void enqueueFailed(Object item) {
}

private void drainPending() {
Throwable delayedCause = null;
boolean tryAcquire = true;
int mappedSourcesCompleted = 0;
while (tryAcquire && tryAcquireLock(emittingLockUpdater, this)) {
try {
int mappedSourcesCompleted = 0;
final long prevDemand = pendingDemandUpdater.getAndSet(this, 0);
if (prevDemand < 0) {
pendingDemand = prevDemand;
} else {
long emittedCount = 0;
Object t;
while (emittedCount < prevDemand && (t = signals.poll()) != null) {
try {
if (t == MAPPED_SOURCE_COMPLETE) {
++mappedSourcesCompleted;
} else if (sendToTarget(t)) {
++emittedCount;
}
} catch (Throwable cause) {
if (t == MAPPED_SOURCE_COMPLETE) {
++mappedSourcesCompleted;
} else if (sendToTarget(t)) {
++emittedCount;
delayedCause = catchUnexpected(delayedCause, cause);
}
}

// check if a terminal event is pending, or give back demand.
if (emittedCount == prevDemand) {
for (;;) {
try {
t = signals.peek();
if (t == MAPPED_SOURCE_COMPLETE) {
signals.poll();
++mappedSourcesCompleted;
} else if (t instanceof FlatMapPublisherSubscriber) {
signals.poll();
@SuppressWarnings("unchecked")
final FlatMapPublisherSubscriber<T, R> hungrySubscriber =
(FlatMapPublisherSubscriber<T, R>) t;
distributeMappedDemand(hungrySubscriber);
} else {
break;
}
} catch (Throwable cause) {
delayedCause = catchUnexpected(delayedCause, cause);
t = signals.peek();
if (t == MAPPED_SOURCE_COMPLETE) {
signals.poll();
++mappedSourcesCompleted;
} else if (t instanceof FlatMapPublisherSubscriber) {
signals.poll();
@SuppressWarnings("unchecked")
final FlatMapPublisherSubscriber<T, R> hungrySubscriber =
(FlatMapPublisherSubscriber<T, R>) t;
distributeMappedDemand(hungrySubscriber);
} else {
break;
}
}

if (t instanceof TerminalNotification) {
sendToTarget(t); // if this throws its OK as we have terminated
sendToTarget(t);
} else {
sendToTargetIfPrematureError();
}
Expand All @@ -399,17 +422,21 @@ private void drainPending() {
FlowControlUtils::addWithOverflowProtectionIfNotNegative);
}
}
} finally {
tryAcquire = !releaseLock(emittingLockUpdater, this);
}
}

if (mappedSourcesCompleted != 0) {
requestMoreFromUpstream(mappedSourcesCompleted);
}

if (delayedCause != null) {
throwException(delayedCause);
if (mappedSourcesCompleted != 0) {
requestMoreFromUpstream(mappedSourcesCompleted);
}
} catch (Throwable cause) {
// We can't propagate an exception if the subscriber triggering this invocation has terminated, but
// we don't know in which context this method is being invoked and if the subscriber invoking this
// method may have a terminal event queued. Also since throwing is in violation of the spec we keep
// the code simple and handle the exception here. This method may also be invoked from a request(..)
// in which we can't let the exception propagate for graceful termination.
onErrorHoldingLock(cause);
return; // Poison emittingUpdater. We prematurely terminated, other signals should be ignored.
}
// Release lock after we handle errors, because error handling needs to poison the lock.
tryAcquire = !releaseLock(emittingLockUpdater, this);
}
}

Expand Down Expand Up @@ -547,7 +574,7 @@ public void onSubscribe(final Subscription s) {
// To accommodate for the "approximate" mapped demand we maintain a signal queue (bounded by the
// concurrency). This presents an opportunity to decouple downstream requestN requests from iterating
// all active mapped sources and instead optimistically give out demand here and replenish demand after
// signals are delivered to the downstream subscriber (based upon available demand is available).
// signals are delivered to the downstream subscriber (based upon available demand).
parent.distributeMappedDemand(this);
}

Expand All @@ -572,14 +599,20 @@ public void onNext(@Nullable final R r) {
private void handleInvalidDemand(int pendingDemand, @Nullable final R r) {
// Reset pendingDemand because we want to allow for a terminal event to be propagated. This is safe
// because request(..) won't be called until demand is exhausted. If the atomic operation fails
// something is wrong but don't override as it may have been terminated in another thread.
// either there is concurrency on the Subscriber (violating reactive streams spec) or request(..) has
// been called on another thread (demand was still violated, we drain demand to 0 before requesting).
pendingDemandUpdater.compareAndSet(this, pendingDemand, (pendingDemand > TERMINATED) ? 0 : TERMINATED);
throw new IllegalStateException("Too many onNext signals for Subscriber: " + this +
" pendingDemand: " + pendingDemand + " discarding: " + r);
}

@Override
public void onError(final Throwable t) {
final int unusedDemand = pendingDemandUpdater.getAndSet(this, TERMINATED);
if (unusedDemand < 0) {
SubscriberUtils.logDuplicateTerminal(this, t);
return;
}
Throwable currPendingError = parent.pendingError;
if (parent.source.maxDelayedErrors == 0) {
if (currPendingError == null && pendingErrorUpdater.compareAndSet(parent, null, t)) {
Expand All @@ -604,7 +637,7 @@ public void onError(final Throwable t) {
addPendingError(pendingErrorCountUpdater, parent,
parent.source.maxDelayedErrors, currPendingError, t);
}
if (parent.removeSubscriber(this, pendingDemandUpdater.getAndSet(this, TERMINATED))) {
if (parent.removeSubscriber(this, unusedDemand)) {
parent.enqueueAndDrain(error(currPendingError));
} else {
parent.tryEmitItem(MAPPED_SOURCE_COMPLETE, false, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,13 @@ private void onError0(Throwable throwable, boolean cancelUpstream) {
}
}

/**
* Process an error while holding the {@link #emitting} lock. We cannot re-throw here because sources have
* already terminated. This means the source cannot deliver another terminal or else it will violate the
* reactive stream spec. The best we can do is cancel upstream and mapped subscribers, and propagate the error
* downstream.
* @param cause The cause that occurred while delivering signals down stream.
*/
private void onErrorHoldingLock(Throwable cause) {
try {
doCancel(true);
Expand Down Expand Up @@ -372,17 +379,31 @@ public void onSubscribe(Cancellable singleCancellable) {

@Override
public void onSuccess(@Nullable R result) {
if (singleCancellable == null) {
SubscriberUtils.logDuplicateTerminalOnSuccess(this, result);
return;
}
cancellableSet.remove(singleCancellable);
singleCancellable = null;

// First enqueue the result and then decrement active count. Since onComplete() checks for active count,
// if we decrement count before enqueuing, onComplete() may emit the terminal event without emitting
// the result.
tryEmitItem(wrapNull(result));
if (onSingleTerminated()) {
if (decrementActiveMappedSources()) {
enqueueAndDrain(complete());
}
}

@Override
public void onError(Throwable t) {
if (singleCancellable == null) {
SubscriberUtils.logDuplicateTerminal(this, t);
return;
}
cancellableSet.remove(singleCancellable);
singleCancellable = null;

Throwable currPendingError = pendingError;
if (source.maxDelayedErrors == 0) {
if (currPendingError == null &&
Expand All @@ -403,24 +424,14 @@ public void onError(Throwable t) {
addPendingError(pendingErrorCountUpdater, FlatMapSubscriber.this, source.maxDelayedErrors,
currPendingError, t);
}
if (onSingleTerminated()) {
if (decrementActiveMappedSources()) {
enqueueAndDrain(error(currPendingError));
} else {
// Queueing/draining may result in requestN more data.
tryEmitItem(SINGLE_ERROR);
}
}
}

private boolean onSingleTerminated() {
if (singleCancellable == null) {
SubscriberUtils.logDuplicateTerminal(this);
return false;
}
cancellableSet.remove(singleCancellable);
singleCancellable = null;
return decrementActiveMappedSources();
}
}
}
}
Loading

0 comments on commit c4debd5

Please sign in to comment.