Skip to content

Commit

Permalink
Revert "ConcatDeferNextSubscriber: avoid re-entry"
Browse files Browse the repository at this point in the history
This reverts commit 5e75613.
  • Loading branch information
idelpivnitskiy committed Jul 1, 2021
1 parent 786067d commit 6032a41
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -206,18 +206,12 @@ private static final class ConcatDeferNextSubscriber<T> extends AbstractConcatSu
*/
private static final Object REQUESTED_ONE = new Object();
/**
* If more than one item was {@link #request(long) requested} before {@link #onSuccess(Object)} or while its
* result is delivering to the target.
* If more than one item was {@link #request(long) requested} before {@link #onSuccess(Object)}.
*/
private static final Object REQUESTED_MORE = new Object();
/**
* If only one item was {@link #request(long) requested} and {@link #onSuccess(Object)} invoked.
*/
private static final Object SINGLE_DELIVERING = new Object();
/**
* If only one item was {@link #request(long) requested}, {@link #onSuccess(Object)} invoked, and its result was
* delivered to the target.
*/
private static final Object SINGLE_DELIVERED = new Object();
/**
* If more than one item was {@link #request(long) requested}, {@link #onSuccess(Object)} invoked, and we
Expand All @@ -233,7 +227,6 @@ private static final class ConcatDeferNextSubscriber<T> extends AbstractConcatSu
public void onSuccess(@Nullable final T result) {
for (;;) {
final Object oldValue = mayBeResult;
assert oldValue != SINGLE_DELIVERING;
assert oldValue != SINGLE_DELIVERED;
assert oldValue != PUBLISHER_SUBSCRIBED;

Expand All @@ -244,8 +237,8 @@ public void onSuccess(@Nullable final T result) {
break;
}
} else if (oldValue == REQUESTED_ONE) {
if (mayBeResultUpdater.compareAndSet(this, oldValue, SINGLE_DELIVERING)) {
emitSingleSuccessToTarget(result);
if (mayBeResultUpdater.compareAndSet(this, oldValue, SINGLE_DELIVERED)) {
tryEmitSingleSuccessToTarget(result);
break;
}
} else if (oldValue == REQUESTED_MORE &&
Expand Down Expand Up @@ -287,7 +280,7 @@ public void request(long n) {
break;
}
}
} else if (oldVal == REQUESTED_ONE || oldVal == SINGLE_DELIVERING) {
} else if (oldVal == REQUESTED_ONE) {
if (mayBeResultUpdater.compareAndSet(this, oldVal, REQUESTED_MORE)) {
super.request(n);
break;
Expand All @@ -308,26 +301,13 @@ public void request(long n) {
}
break;
}
} else if (mayBeResultUpdater.compareAndSet(this, oldVal, SINGLE_DELIVERING)) {
} else if (mayBeResultUpdater.compareAndSet(this, oldVal, SINGLE_DELIVERED)) {
@SuppressWarnings("unchecked")
final T tVal = (T) oldVal;
emitSingleSuccessToTarget(tVal);
tryEmitSingleSuccessToTarget(tVal);
break;
}
}
}

private void emitSingleSuccessToTarget(@Nullable final T result) {
if (tryEmitSingleSuccessToTarget(result)) {
if (mayBeResultUpdater.compareAndSet(this, SINGLE_DELIVERING, SINGLE_DELIVERED)) {
// state didn't change, we are done
} else if (mayBeResultUpdater.compareAndSet(this, REQUESTED_MORE, PUBLISHER_SUBSCRIBED)) {
// more demand appeared while we were delivering the single result
next.subscribeInternal(this);
} else {
assert mayBeResult == CANCELLED;
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,9 @@ public class SingleConcatWithPublisherDeferSubscribeTckTest extends SingleConcat
boolean deferSubscribe() {
return true;
}

@Override
public long boundedDepthOfOnNextAndRequestRecursion() {
return 2;
}
}

0 comments on commit 6032a41

Please sign in to comment.