Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publisher#flatMapMerge allow terminal propagation after invalid demand #2348

Merged
merged 4 commits into from
Sep 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.internal.ArrayUtils;
import io.servicetalk.concurrent.internal.DelayedCancellable;
import io.servicetalk.concurrent.internal.SubscriberUtils;
import io.servicetalk.concurrent.internal.TerminalNotification;
import io.servicetalk.context.api.ContextMap;

Expand Down Expand Up @@ -293,7 +294,7 @@ public void onSuccess(@Nullable final T result) {
break;
}
} else {
logDuplicateTerminal(this);
SubscriberUtils.logDuplicateTerminalOnSuccess(this, result);
break;
}
}
Expand All @@ -318,7 +319,7 @@ public void onError(final Throwable t) {
break;
}
} else {
logDuplicateTerminal(this);
logDuplicateTerminal(this, t);
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import static io.servicetalk.utils.internal.ThrowableUtils.addSuppressed;

final class CompositeExceptionUtils {
/**
* Default to {@code 1} so {@link Throwable#addSuppressed(Throwable)} will not be used by default.
Expand All @@ -35,7 +33,8 @@ static <T> void addPendingError(AtomicIntegerFieldUpdater<T> updater, T owner, i
if (newSize < 0) {
updater.set(owner, Integer.MAX_VALUE);
} else if (newSize < maxDelayedErrors && original != causeToAdd) {
addSuppressed(original, causeToAdd);
// We ensure original is not equal to causeToAdd, safe to add suppressed.
original.addSuppressed(causeToAdd);
} else {
updater.decrementAndGet(owner);
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,13 @@ private void onError0(Throwable throwable, boolean cancelUpstream) {
}
}

/**
* Process an error while holding the {@link #emitting} lock. We cannot re-throw here because sources have
* already terminated. This means the source cannot deliver another terminal or else it will violate the
* reactive stream spec. The best we can do is cancel upstream and mapped subscribers, and propagate the error
* downstream.
* @param cause The cause that occurred while delivering signals down stream.
*/
private void onErrorHoldingLock(Throwable cause) {
try {
doCancel(true);
Expand Down Expand Up @@ -372,17 +379,31 @@ public void onSubscribe(Cancellable singleCancellable) {

@Override
public void onSuccess(@Nullable R result) {
if (singleCancellable == null) {
SubscriberUtils.logDuplicateTerminalOnSuccess(this, result);
return;
}
cancellableSet.remove(singleCancellable);
singleCancellable = null;

// First enqueue the result and then decrement active count. Since onComplete() checks for active count,
// if we decrement count before enqueuing, onComplete() may emit the terminal event without emitting
// the result.
tryEmitItem(wrapNull(result));
if (onSingleTerminated()) {
if (decrementActiveMappedSources()) {
enqueueAndDrain(complete());
}
}

@Override
public void onError(Throwable t) {
if (singleCancellable == null) {
SubscriberUtils.logDuplicateTerminal(this, t);
return;
}
cancellableSet.remove(singleCancellable);
singleCancellable = null;

Throwable currPendingError = pendingError;
if (source.maxDelayedErrors == 0) {
if (currPendingError == null &&
Expand All @@ -403,24 +424,14 @@ public void onError(Throwable t) {
addPendingError(pendingErrorCountUpdater, FlatMapSubscriber.this, source.maxDelayedErrors,
currPendingError, t);
}
if (onSingleTerminated()) {
if (decrementActiveMappedSources()) {
enqueueAndDrain(error(currPendingError));
} else {
// Queueing/draining may result in requestN more data.
tryEmitItem(SINGLE_ERROR);
}
}
}

private boolean onSingleTerminated() {
if (singleCancellable == null) {
SubscriberUtils.logDuplicateTerminal(this);
return false;
}
cancellableSet.remove(singleCancellable);
singleCancellable = null;
return decrementActiveMappedSources();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.ArrayDeque;
Expand All @@ -40,8 +41,10 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.stream.IntStream;
import javax.annotation.Nullable;

Expand All @@ -56,7 +59,7 @@
import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION;
import static io.servicetalk.concurrent.internal.EmptySubscriptions.EMPTY_SUBSCRIPTION;
import static io.servicetalk.utils.internal.PlatformDependent.throwException;
import static io.servicetalk.utils.internal.ThrowableUtils.throwException;
import static java.lang.Math.min;
import static java.util.Arrays.asList;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
Expand Down Expand Up @@ -120,6 +123,61 @@ void singleToPublisherOnNextErrorPropagated(boolean delayError) {
assertThat(subscriber.awaitOnError(), is(DELIBERATE_EXCEPTION));
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void subscriptionThrowsFromTerminalHandled(boolean delayError) {
AtomicLong requestCount = new AtomicLong();
LongConsumer requestThrower = n -> {
if (requestCount.accumulateAndGet(n, Long::sum) > 1) {
throw DELIBERATE_EXCEPTION;
}
};
Function<? super Integer, ? extends Publisher<? extends Integer>> func = Publisher::from;
toSource(delayError ?
publisher.whenRequest(requestThrower).flatMapMergeDelayError(func, 1) :
publisher.whenRequest(requestThrower).flatMapMerge(func, 1))
.subscribe(subscriber);
subscriber.awaitSubscription().request(1);
publisher.onNext(1);
publisher.onComplete();
assertThat(subscriber.takeOnNext(), is(1));
assertThat(subscriber.awaitOnError(), is(DELIBERATE_EXCEPTION));
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void exceptionHandledWhileConcurrentProcessing(boolean delayError) {
assert executor != null;
TestPublisher<Integer> publisher2 = new TestPublisher<>();
Function<? super Integer, ? extends Publisher<? extends Integer>> func = i -> {
if (i == 1) {
return publisher2;
} else if (i == 2) {
return executor.submit(() -> i).toPublisher();
} else {
return never();
}
};
toSource((delayError ?
publisher.flatMapMergeDelayError(func, 2) :
publisher.flatMapMerge(func, 2))
.map(i -> {
if (i == 2) {
publisher2.onNext(12);
return i;
} else if (i == 12) {
throw DELIBERATE_EXCEPTION;
}
throw new IllegalStateException("unexpected i: " + i);
}))
.subscribe(subscriber);
subscriber.awaitSubscription().request(2);
publisher.onNext(1, 2);
publisher.onComplete();
assertThat(subscriber.takeOnNext(), is(2));
assertThat(subscriber.awaitOnError(), is(DELIBERATE_EXCEPTION));
}

@Test
void mappedRecoverMakesProgress() throws Exception {
@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -265,21 +323,121 @@ void singleItemMappedErrorPostSourceComplete() {
assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION));
}

@Test
void testDuplicateTerminal() {
@ParameterizedTest(name = "{displayName} [{index}] errorFirst={0} errorSecond={1}")
@CsvSource(value = {"true,true", "true,false", "false,true", "false,false"})
void testDuplicateTerminal(boolean errorFirst, boolean errorSecond) {
PublisherSource<Integer> mappedPublisher = subscriber -> {
subscriber.onSubscribe(EMPTY_SUBSCRIPTION);
subscriber.onComplete();
if (errorFirst) {
subscriber.onError(DELIBERATE_EXCEPTION);
} else {
subscriber.onComplete();
}

// intentionally violate the RS spec to verify the operator's behavior.
// [1] https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#1.7
subscriber.onComplete();
if (errorSecond) {
subscriber.onError(new IllegalStateException("duplicate terminal should be discarded!"));
} else {
subscriber.onComplete();
}
};
@SuppressWarnings("unchecked")
Subscriber<Integer> mockSubscriber = mock(Subscriber.class);
toSource(publisher.flatMapMerge(i -> fromSource(mappedPublisher), 1)).subscribe(mockSubscriber);
publisher.onNext(1);

if (errorFirst) {
verify(mockSubscriber).onError(DELIBERATE_EXCEPTION);
} else {
publisher.onComplete();
verify(mockSubscriber).onComplete();
}
}

@ParameterizedTest(name = "{displayName} [{index}] delayError={0} queuedSignals={1}")
@CsvSource(value = {"true,true", "true,false", "false,true", "false,false"})
void testInvalidDemand(boolean delayError, boolean queuedSignals) throws InterruptedException {
final int firstItem = 1;
Publisher<Integer> publisher = Publisher.range(firstItem, firstItem + 10);
TestSubscription mappedSubscription = new TestSubscription();
TestPublisher<Integer> mappedPublisher = new TestPublisher.Builder<Integer>()
.disableAutoOnSubscribe().build(subscriber1 -> {
subscriber1.onSubscribe(mappedSubscription);
return subscriber1;
});
Function<Integer, Publisher<Integer>> mapper = i -> i == firstItem ? mappedPublisher : never();
toSource(delayError ? publisher.flatMapMergeDelayError(mapper, 1) : publisher.flatMapMerge(mapper, 1))
.subscribe(subscriber);
Subscription subscription = subscriber.awaitSubscription();
subscription.request(1);

mappedSubscription.awaitRequestN(1);
mappedPublisher.onNext(2);
assertEquals(2, subscriber.takeOnNext());

if (queuedSignals) {
// We issued request(1) on the outer publisher and so the inner publisher is allowed to request more to
// avoid potential deadlocks.
mappedSubscription.awaitRequestN(2);
mappedPublisher.onNext(3);
}

subscription.request(-1);

assertThat(subscriber.awaitOnError(), instanceOf(IllegalArgumentException.class));
}

@ParameterizedTest(name = "{displayName} [{index}] delayError={0} mapErrorToComplete={1}")
@CsvSource(value = {"true,true", "true,false", "false,true", "false,false"})
void testDemandNotRespectedPropagatesTerminal(boolean delayError, boolean mapErrorToComplete)
throws InterruptedException {
final int firstItem = 1;
TestSubscription upstreamSubscription = new TestSubscription();
publisher = new TestPublisher.Builder<Integer>()
.disableAutoOnSubscribe().build(subscriber1 -> {
subscriber1.onSubscribe(upstreamSubscription);
return subscriber1;
});
TestSubscription mappedSubscription = new TestSubscription();
TestPublisher<Integer> mappedPublisher = new TestPublisher.Builder<Integer>()
.disableAutoOnSubscribe().build(subscriber1 -> {
subscriber1.onSubscribe(mappedSubscription);
return subscriber1;
});
Function<Integer, Publisher<Integer>> mapper = i -> i == firstItem ?
(mapErrorToComplete ? mappedPublisher.onErrorComplete() : mappedPublisher) :
never();
toSource(delayError ? publisher.flatMapMergeDelayError(mapper, 1) : publisher.flatMapMerge(mapper, 1))
.subscribe(subscriber);
Subscription subscription = subscriber.awaitSubscription();
subscription.request(1);
upstreamSubscription.awaitRequestN(1);
publisher.onNext(firstItem);

mappedSubscription.awaitRequestN(1);
mappedPublisher.onNext(2);
assertEquals(2, subscriber.takeOnNext());

// We issued request(1) on the outer publisher and so the inner publisher is allowed to request more to avoid
// potential deadlocks.
mappedSubscription.awaitRequestN(2);
mappedPublisher.onNext(3);
assertThat(subscriber.pollOnNext(10, MILLISECONDS), is(nullValue())); // no demand means no delivery.
assertThat(mappedSubscription.requestedEquals(2), is(true));
mappedPublisher.onNext(4); // intentionally deliver an item without demand!
publisher.onComplete();
verify(mockSubscriber).onComplete();

// Drain the item in the queue in order to have the terminal event delivered.
subscription.request(1);
assertEquals(3, subscriber.takeOnNext());

if (mapErrorToComplete) {
subscriber.awaitOnComplete();
} else {
assertThat(subscriber.awaitOnError(), instanceOf(IllegalStateException.class));
}
assertThat(upstreamSubscription.isCancelled(), is(!mapErrorToComplete && !delayError));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.ArrayDeque;
Expand Down Expand Up @@ -220,21 +221,36 @@ void testSingleErrorPostSourceComplete() {
assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION));
}

@Test
void testDuplicateTerminal() {
@ParameterizedTest(name = "{displayName} [{index}] errorFirst={0} errorSecond={1}")
@CsvSource(value = {"true,true", "true,false", "false,true", "false,false"})
void testDuplicateTerminal(boolean errorFirst, boolean errorSecond) {
SingleSource<Integer> single = subscriber -> {
subscriber.onSubscribe(IGNORE_CANCEL);
subscriber.onSuccess(2);
if (errorFirst) {
subscriber.onError(DELIBERATE_EXCEPTION);
} else {
subscriber.onSuccess(2);
}

// intentionally violate the RS spec to verify the operator's behavior.
// [1] https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#1.7
subscriber.onSuccess(3);
if (errorSecond) {
subscriber.onError(new IllegalStateException("duplicate terminal should be discarded!"));
} else {
subscriber.onSuccess(3);
}
};
@SuppressWarnings("unchecked")
Subscriber<Integer> mockSubscriber = mock(Subscriber.class);
toSource(source.flatMapMergeSingle(integer1 -> fromSource(single), 2)).subscribe(mockSubscriber);
source.onNext(1);
source.onComplete();
verify(mockSubscriber).onComplete();

if (errorFirst) {
verify(mockSubscriber).onError(DELIBERATE_EXCEPTION);
} else {
source.onComplete();
verify(mockSubscriber).onComplete();
}
}

@Test
Expand Down
Loading