Skip to content

Commit

Permalink
FromNPublisher: limit recursion depth to 1 (apple#1653)
Browse files Browse the repository at this point in the history
Motivation:

To mitigate apple#1652 issue, temporary limit the mutual recursion between
`onNext` and `request` to a depth of 1.

Modifications:

- Use the left 4 bits of the `state` to limit recursion depth to 1;
- Remove `boundedDepthOfOnNextAndRequestRecursion()` override from
`PublisherFrom2TckTest` and `PublisherFrom3TckTest`;

Result:

1. `FromNPublisher` has a maximum recursion depth of 1.
2. Less risk of encountering issue apple#1652.
  • Loading branch information
idelpivnitskiy authored and bondolo committed Jul 2, 2021
1 parent ecd9ff3 commit 1c19c0a
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ void doSubscribe(final Subscriber<? super T> subscriber) {
}

private final class NValueSubscription implements Subscription {
private static final byte ZERO = 0;
private static final byte ONE = 1;
private static final byte TWO = 2;
private static final byte TERMINATED = 3;
private byte requested;
private byte state;
Expand All @@ -65,9 +68,9 @@ private final class NValueSubscription implements Subscription {
private NValueSubscription(final Subscriber<? super T> subscriber) {
this.subscriber = subscriber;
if (v1 == UNUSED_REF) {
// 3-value version - simulate 1 emitted item, start counting from 1.
// 2-value version - simulate 1 emitted item, start counting from 1.
requested = 1;
state++;
state = ONE;
}
}

Expand All @@ -78,7 +81,7 @@ public void cancel() {

@Override
public void request(final long n) {
if (state == TERMINATED) {
if (state() == TERMINATED) {
return;
}
if (!isRequestNValid(n)) {
Expand All @@ -90,20 +93,27 @@ public void request(final long n) {
return;
}
requested = (byte) min(3, addWithOverflowProtection(requested, n));
boolean successful = true;
while (successful && state < requested) {
if (state == 0) {
successful = deliver(v1);
} else if (state == 1) {
successful = deliver(v2);
} else if (state == 2 && deliver(v3)) {
subscriber.onComplete();
if (ignoreRequests()) {
return;
}
ignoreRequests(true);
while (state() < requested) {
if (state() == ZERO) {
deliver(v1, ONE);
} else if (state() == ONE) {
deliver(v2, TWO);
} else if (state() == TWO) {
if (deliver(v3, TERMINATED)) {
subscriber.onComplete();
}
return;
}
}
ignoreRequests(false);
}

private boolean deliver(@Nullable T value) {
++state;
private boolean deliver(@Nullable final T value, final byte nextState) {
state = (byte) ((state & 0x10) | nextState);
try {
subscriber.onNext(value);
return true;
Expand All @@ -113,5 +123,21 @@ private boolean deliver(@Nullable T value) {
return false;
}
}

private byte state() {
return (byte) (state & 0x0F);
}

private boolean ignoreRequests() {
return (state & 0x10) > 0;
}

private void ignoreRequests(final boolean ignore) {
if (ignore) {
state |= 0x10;
} else {
state &= 0x0F;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,4 @@ public Publisher<Integer> createServiceTalkPublisher(long elements) {
public long maxElementsFromPublisher() {
return 2;
}

@Override
public long boundedDepthOfOnNextAndRequestRecursion() {
return 2;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,4 @@ public Publisher<Integer> createServiceTalkPublisher(long elements) {
public long maxElementsFromPublisher() {
return 3;
}

@Override
public long boundedDepthOfOnNextAndRequestRecursion() {
return 3;
}
}

0 comments on commit 1c19c0a

Please sign in to comment.