Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SingleConcatWithPublisher: limit recursion depth to 1 #1654

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -206,12 +206,18 @@ private static final class ConcatDeferNextSubscriber<T> extends AbstractConcatSu
*/
private static final Object REQUESTED_ONE = new Object();
/**
* If more than one item was {@link #request(long) requested} before {@link #onSuccess(Object)}.
* If more than one item was {@link #request(long) requested} before {@link #onSuccess(Object)} or while its
* result is delivering to the target.
*/
private static final Object REQUESTED_MORE = new Object();
/**
* If only one item was {@link #request(long) requested} and {@link #onSuccess(Object)} invoked.
*/
private static final Object SINGLE_DELIVERING = new Object();
/**
* If only one item was {@link #request(long) requested}, {@link #onSuccess(Object)} invoked, and its result was
* delivered to the target.
*/
private static final Object SINGLE_DELIVERED = new Object();
/**
* If more than one item was {@link #request(long) requested}, {@link #onSuccess(Object)} invoked, and we
Expand All @@ -227,6 +233,7 @@ private static final class ConcatDeferNextSubscriber<T> extends AbstractConcatSu
public void onSuccess(@Nullable final T result) {
for (;;) {
final Object oldValue = mayBeResult;
assert oldValue != SINGLE_DELIVERING;
assert oldValue != SINGLE_DELIVERED;
assert oldValue != PUBLISHER_SUBSCRIBED;

Expand All @@ -237,8 +244,8 @@ public void onSuccess(@Nullable final T result) {
break;
}
} else if (oldValue == REQUESTED_ONE) {
if (mayBeResultUpdater.compareAndSet(this, oldValue, SINGLE_DELIVERED)) {
tryEmitSingleSuccessToTarget(result);
if (mayBeResultUpdater.compareAndSet(this, oldValue, SINGLE_DELIVERING)) {
emitSingleSuccessToTarget(result);
break;
}
} else if (oldValue == REQUESTED_MORE &&
Expand Down Expand Up @@ -280,7 +287,7 @@ public void request(long n) {
break;
}
}
} else if (oldVal == REQUESTED_ONE) {
} else if (oldVal == REQUESTED_ONE || oldVal == SINGLE_DELIVERING) {
if (mayBeResultUpdater.compareAndSet(this, oldVal, REQUESTED_MORE)) {
super.request(n);
break;
Expand All @@ -301,13 +308,26 @@ public void request(long n) {
}
break;
}
} else if (mayBeResultUpdater.compareAndSet(this, oldVal, SINGLE_DELIVERED)) {
} else if (mayBeResultUpdater.compareAndSet(this, oldVal, SINGLE_DELIVERING)) {
@SuppressWarnings("unchecked")
final T tVal = (T) oldVal;
tryEmitSingleSuccessToTarget(tVal);
emitSingleSuccessToTarget(tVal);
break;
}
}
}

private void emitSingleSuccessToTarget(@Nullable final T result) {
if (tryEmitSingleSuccessToTarget(result)) {
if (mayBeResultUpdater.compareAndSet(this, SINGLE_DELIVERING, SINGLE_DELIVERED)) {
// state didn't change, we are done
} else if (mayBeResultUpdater.compareAndSet(this, REQUESTED_MORE, PUBLISHER_SUBSCRIBED)) {
// more demand appeared while we were delivering the single result
next.subscribeInternal(this);
} else {
assert mayBeResult == CANCELLED;
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,4 @@ public class SingleConcatWithPublisherDeferSubscribeTckTest extends SingleConcat
boolean deferSubscribe() {
return true;
}

@Override
public long boundedDepthOfOnNextAndRequestRecursion() {
return 2;
}
}