diff --git a/reactor-core/src/jcstress/java/reactor/core/publisher/SinksManyReplayLatestStressTest.java b/reactor-core/src/jcstress/java/reactor/core/publisher/SinksManyReplayLatestStressTest.java new file mode 100644 index 0000000000..8f759545ba --- /dev/null +++ b/reactor-core/src/jcstress/java/reactor/core/publisher/SinksManyReplayLatestStressTest.java @@ -0,0 +1,136 @@ +package reactor.core.publisher; + +import org.openjdk.jcstress.annotations.Actor; +import org.openjdk.jcstress.annotations.Arbiter; +import org.openjdk.jcstress.annotations.JCStressTest; +import org.openjdk.jcstress.annotations.Outcome; +import org.openjdk.jcstress.annotations.State; +import org.openjdk.jcstress.infra.results.II_Result; +import org.openjdk.jcstress.infra.results.I_Result; + +import java.util.concurrent.atomic.AtomicInteger; + +import static org.openjdk.jcstress.annotations.Expect.ACCEPTABLE; + +public class SinksManyReplayLatestStressTest { + final StressSubscriber target = new StressSubscriber<>(); + + final SinkManyReplayProcessor sink = new SinkManyReplayProcessor<>( + new FluxReplay.SingletonReplayBuffer<>() + ); + + @JCStressTest + @Outcome(id = {"1"}, expect = ACCEPTABLE, desc = "single emission") + @Outcome(id = {"2"}, expect = ACCEPTABLE, desc = "two emissions") + @State + public static class TryEmitNextStressTest extends SinksManyReplayLatestStressTest { + + @Actor + public void first() { + sink.tryEmitNext("Hello"); + } + + @Actor + public void second() { + sink.tryEmitNext("Hello"); + } + + @Actor + public void subscriber() { + sink.subscribe(target); + } + + @Arbiter + public void arbiter(I_Result r) { + r.r1 = target.onNextCalls.get(); + } + } + + @JCStressTest + @Outcome(id = {"6, 63"}, expect = ACCEPTABLE, desc = "six subscribers, 64-1 result") + @State + public static class SubscriberCountStressTest extends SinksManyReplayLatestStressTest { + + AtomicInteger count = new AtomicInteger(); + + @Actor + public void first() { + sink.tryEmitNext("Hello"); + } + + @Actor + public void one() { + sink.subscribe(s -> count.addAndGet(1)); + } + + @Actor + public void two() { + sink.subscribe(s -> count.addAndGet(2)); + } + + @Actor + public void three() { + sink.subscribe(s -> count.addAndGet(4)); + } + + @Actor + public void four() { + sink.subscribe(s -> count.addAndGet(8)); + } + + @Actor + public void five() { + sink.subscribe(s -> count.addAndGet(16)); + } + + @Actor + public void six() { + sink.subscribe(s -> count.addAndGet(32)); + } + + @Arbiter + public void arbiter(II_Result r) { + r.r1 = sink.currentSubscriberCount(); + r.r2 = count.get(); + } + } + + + @JCStressTest + @Outcome(id = {"0, 0"}, expect = ACCEPTABLE, desc = "complete first") + @Outcome(id = {"0, 1"}, expect = ACCEPTABLE, desc = "subscriber 1 before complete") + @Outcome(id = {"0, 2"}, expect = ACCEPTABLE, desc = "subscriber 2 before complete") + @Outcome(id = {"0, 3"}, expect = ACCEPTABLE, desc = "both subscribe before complete") + @State + public static class SubscriberCountCompleteStressTest extends SinksManyReplayLatestStressTest { + + AtomicInteger count = new AtomicInteger(); + + @Actor + public void first() { + sink.tryEmitNext("Hello"); + } + + @Actor + public void completer() { + sink.tryEmitComplete(); + } + + + @Actor + public void one() { + sink.subscribe(s -> count.addAndGet(1)); + } + + @Actor + public void two() { + sink.subscribe(s -> count.addAndGet(2)); + } + + @Arbiter + public void arbiter(II_Result r) { + r.r1 = sink.currentSubscriberCount(); + r.r2 = count.get(); + } + } +} diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxReplay.java b/reactor-core/src/main/java/reactor/core/publisher/FluxReplay.java index e6c1d2bcdc..631bb7e39b 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxReplay.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxReplay.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.concurrent.atomic.AtomicStampedReference; import java.util.function.Consumer; import java.util.stream.Stream; @@ -496,6 +497,206 @@ public void replay(ReplaySubscription rs) { } } + static final class SingletonReplayBuffer implements ReplayBuffer { + final AtomicStampedReference val = new AtomicStampedReference<>(null, 0); + private Throwable error; + private volatile boolean done; + + @Override + public void add(T value) { + int stamp = val.getStamp(); + val.set(value, stamp + 1); + } + + @Override + public void onError(Throwable ex) { + error = ex; + done = true; + } + + @Override + public void onComplete() { + done = true; + } + + + @Override + public Throwable getError() { + return error; + } + + void replayFused(ReplaySubscription rs) { + int missed = 1; + + final Subscriber a = rs.actual(); + + for (; ; ) { + + if (rs.isCancelled()) { + return; + } + + boolean d = done; + + a.onNext(null); + + if (d) { + Throwable ex = error; + if (ex != null) { + a.onError(ex); + } + else { + a.onComplete(); + } + return; + } + + missed = rs.leave(missed); + if (missed == 0) { + break; + } + } + } + + @Override + public void replay(ReplaySubscription rs) { + if (!rs.enter()) { + return; + } + + if (rs.fusionMode() == NONE) { + replayNormal(rs); + } + else { + replayFused(rs); + } + } + + + void replayNormal(ReplaySubscription rs) { + final Subscriber a = rs.actual(); + + int missed = 1; + + for (; ; ) { + + long r = rs.requested(); + + int idx = rs.index(); + boolean produced = false; + + if (rs.isCancelled()) { + return; + } + + boolean d = done; + int[] stamp = new int[1]; + T next = val.get(stamp); + boolean empty = isEmpty(rs); + + if (d && empty) { + Throwable ex = error; + if (ex != null) { + a.onError(ex); + } + else { + a.onComplete(); + } + return; + } + + if (!empty && idx != stamp[0] && r != 0) { + a.onNext(next); + rs.requestMore(stamp[0]); + produced = true; + } + + if (r==1) { + if (rs.isCancelled()) { + return; + } + + if (done && isEmpty(rs)) { + Throwable ex = error; + if (ex != null) { + a.onError(ex); + } + else { + a.onComplete(); + } + return; + } + } + + if (produced && r != Long.MAX_VALUE) { + rs.produced(1); + } + + missed = rs.leave(missed); + if (missed == 0) { + break; + } + } + } + + + @Override + public boolean isDone() { + return done; + } + + @Override + public T poll(ReplaySubscription rs) { + int idx = rs.index(); + int[] stamp = new int[1]; + T stampedVal = val.get(stamp); + + if (idx == stamp[0] || stamp[0] == 0) { + return null; + } + rs.requestMore(stamp[0]); + + return stampedVal; + } + + @Override + public void clear(ReplaySubscription rs) { + rs.node(null); + } + + @Override + public boolean isEmpty(ReplaySubscription rs) { + int idx = rs.index(); + int stamp = val.getStamp(); + return stamp == 0 || idx == stamp; + } + + @Override + public int size(ReplaySubscription rs) { + int valIdx = val.getStamp(); + return rs.index() == valIdx ? 0 : sizeOf(valIdx); + } + + @Override + public int size() { + return sizeOf(val.getStamp()); + } + + private int sizeOf(int value) { + return value == 0 ? 0 : 1; + } + + @Override + public int capacity() { + return 1; + } + + @Override + public boolean isExpired() { + return false; + } + } + static final class UnboundedReplayBuffer implements ReplayBuffer { final int batchSize; @@ -1107,7 +1308,8 @@ ReplaySubscriber newState() { scheduler), this, history); } if (history != Integer.MAX_VALUE) { - return new ReplaySubscriber<>(new SizeBoundReplayBuffer<>(history), + ReplayBuffer buffer = history == 1 ? new SingletonReplayBuffer<>() : new SizeBoundReplayBuffer<>(history); + return new ReplaySubscriber<>(buffer, this, history); } diff --git a/reactor-core/src/main/java/reactor/core/publisher/ReplayProcessor.java b/reactor-core/src/main/java/reactor/core/publisher/ReplayProcessor.java index 39b1daaf5b..58d4a928c7 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/ReplayProcessor.java +++ b/reactor-core/src/main/java/reactor/core/publisher/ReplayProcessor.java @@ -154,6 +154,9 @@ public static ReplayProcessor create(int historySize, boolean unbounded) if (unbounded) { buffer = new FluxReplay.UnboundedReplayBuffer<>(historySize); } + else if(historySize == 1) { + buffer = new FluxReplay.SingletonReplayBuffer<>(); + } else { buffer = new FluxReplay.SizeBoundReplayBuffer<>(historySize); } diff --git a/reactor-core/src/main/java/reactor/core/publisher/SinkManyReplayProcessor.java b/reactor-core/src/main/java/reactor/core/publisher/SinkManyReplayProcessor.java index 9493e79738..a0c50da8fc 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/SinkManyReplayProcessor.java +++ b/reactor-core/src/main/java/reactor/core/publisher/SinkManyReplayProcessor.java @@ -136,6 +136,9 @@ static SinkManyReplayProcessor create(int historySize, boolean unbounded) if (unbounded) { buffer = new FluxReplay.UnboundedReplayBuffer<>(historySize); } + else if (historySize == 1) { + buffer = new FluxReplay.SingletonReplayBuffer<>(); + } else { buffer = new FluxReplay.SizeBoundReplayBuffer<>(historySize); } diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxReplayTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxReplayTest.java index c29c94580f..c58786cb43 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxReplayTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxReplayTest.java @@ -23,8 +23,10 @@ import java.util.Queue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import java.util.stream.Collectors; +import org.assertj.core.api.DoublePredicateAssert; import org.assertj.core.data.Offset; import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; @@ -33,6 +35,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.EnumSource; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; @@ -42,6 +46,7 @@ import reactor.core.Scannable; import reactor.core.scheduler.Schedulers; import reactor.test.MemoryUtils; +import reactor.test.ParameterizedTestWithName; import reactor.test.StepVerifier; import reactor.test.publisher.FluxOperatorTest; import reactor.test.scheduler.VirtualTimeScheduler; @@ -53,6 +58,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.assertj.core.api.Assertions.fail; public class FluxReplayTest extends FluxOperatorTest { @@ -327,13 +333,13 @@ public void cacheFlux() { } - @Test + @ParameterizedTestWithName + @CsvSource(value = {"1", "2", "3", "4", "0x7fffffff"}) @Tag("VirtualTime") - public void cacheFluxFused() { - + public void cacheFluxFused(int history) { Flux> source = Flux.just(1, 2, 3) .delayElements(Duration.ofMillis(1000)) - .replay() + .replay(history) .autoConnect() .elapsed(); @@ -345,14 +351,16 @@ public void cacheFluxFused() { .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3) .verifyComplete(); - StepVerifier.create(source) + StepVerifier.Step> verifier = StepVerifier.create(source) .expectFusion(Fuseable.ANY) - .then(() -> vts.advanceTimeBy(Duration.ofSeconds(3))) - .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 1) - .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2) - .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3) - .verifyComplete(); - + .then(() -> vts.advanceTimeBy(Duration.ofSeconds(3))); + for (int i = Math.min(history, 3); i > 0; i--) { + int iCpy = 4 - i; + verifier.expectNextMatches( + t -> t.getT1() == 0 && t.getT2() == iCpy + ); + } + verifier.verifyComplete(); } @Test @@ -632,9 +640,10 @@ public void onlyInitialRequestWithLateUnboundedSubscriber() { ts.assertValueCount(8); //despite unbounded, as it was late it only sees the replay capacity } - @Test - public void cancel() { - ConnectableFlux replay = Sinks.many().unicast().onBackpressureBuffer().asFlux().replay(2); + @ParameterizedTestWithName + @CsvSource(value = {"1", "2"}) + public void cancel(int num) { + ConnectableFlux replay = Sinks.many().unicast().onBackpressureBuffer().asFlux().replay(num); replay.subscribe(v -> {}, e -> { throw Exceptions.propagate(e); }); @@ -727,10 +736,11 @@ public void cacheSingleSubscriberWithMultipleRequestsDoesntHang() { assertThat(listFromStream).hasSize(1000); } - @Test - public void cacheNotOverrunByMaxPrefetch() { + @ParameterizedTestWithName + @CsvSource(value = {"1", "5"}) + public void cacheNotOverrunByMaxPrefetch(int history) { Flux s = Flux.range(1, 30) - .cache(5); + .cache(history); StepVerifier.create(s, 10) .expectNextCount(10) @@ -739,14 +749,16 @@ public void cacheNotOverrunByMaxPrefetch() { .verifyComplete(); StepVerifier.create(s) - .expectNextCount(5) + .expectNextCount(history) .verifyComplete(); } - @Test - public void ifSubscribeBeforeConnectThenTrackFurtherRequests() { - ConnectableFlux connectableFlux = Flux.just(1L, 2L, 3L, 4L) - .replay(2); + @ParameterizedTestWithName + @CsvSource(value = {"1","2","3","4"}) + public void ifSubscribeBeforeConnectThenTrackFurtherRequests(int num) { + Long[] nums = {1L, 2L, 3L, 4L}; + ConnectableFlux connectableFlux = Flux.fromArray(nums) + .replay(num); StepVerifier.create(connectableFlux, 1) .expectSubscription() @@ -758,29 +770,83 @@ public void ifSubscribeBeforeConnectThenTrackFurtherRequests() { .expectComplete() .verify(Duration.ofSeconds(10)); - StepVerifier.create(connectableFlux, 1) - .expectNext(3L) - .thenRequest(10) - .expectNext(4L) - .expectComplete() + StepVerifier.Step verifier = StepVerifier.create(connectableFlux, 1) + .expectNext(nums[nums.length-num]) + .thenRequest(10); + for (int i=num-1; i > 0; i--) { + verifier = verifier.expectNext(nums[nums.length-i]); + } + verifier.expectComplete() .verify(Duration.ofSeconds(10)); } - @Test - public void ifNoSubscriptionBeforeConnectThenPrefetches() { + enum NoSubscribtionTestEnum { + replay1( + 1, + new Integer[]{24}, + new Long[]{1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L} + ), + replay2( + 2, + new Integer[]{23, 24}, + new Long[]{2L, 2L, 2L, 2L, 2L, 2L, 2L, 2L, 2L, 2L} + ), + replay3( + 3, + new Integer[]{22, 23, 24}, + new Long[]{3L, 3L, 3L, 3L, 3L, 3L, 3L, 3L, 3L} + ), + replay4( + 4, + new Integer[]{21, 22, 23, 24}, + new Long[]{4L, 3L, 3L, 3L, 3L, 3L, 3L, 3L, 3L} + ), + replay5( + 5, + new Integer[]{20, 21, 22, 23, 24}, + new Long[]{5L, 4L, 4L, 4L, 4L, 4L, 4L} + ), + replay6( + 6, + new Integer[]{19, 20, 21, 22, 23, 24}, + new Long[]{6L, 5L, 5L, 5L, 5L} + ), + replay7( + 7, + new Integer[]{18, 19, 20, 21, 22, 23, 24}, + new Long[]{7L, 6L, 6L, 6L, 6L} + ), + replay8( + 8, + new Integer[]{17, 18, 19, 20, 21, 22, 23, 24}, + new Long[]{8L, 6L, 6L, 6L, 6L} + ); + final int history; + final Integer[] expected; + final Long[] requestQueue; + NoSubscribtionTestEnum(int history, Integer[] expected, Long[] requestQueue) { + this.history = history; + this.expected = expected; + this.requestQueue = requestQueue; + } + } + + @ParameterizedTestWithName + @EnumSource(NoSubscribtionTestEnum.class) + public void ifNoSubscriptionBeforeConnectThenPrefetches(NoSubscribtionTestEnum testdata) { Queue totalRequested = new ArrayBlockingQueue<>(10); ConnectableFlux connectable = Flux.range(1, 24) .doOnRequest(totalRequested::offer) - .replay(8); + .replay(testdata.history); connectable.connect(); StepVerifier.create(connectable) - .expectNext(17, 18, 19, 20, 21, 22, 23, 24) + .expectNext(testdata.expected) .expectComplete() .verify(Duration.ofSeconds(10)); - assertThat(totalRequested).containsExactly(8L, 6L, 6L, 6L, 6L); + assertThat(totalRequested).containsExactly(testdata.requestQueue); } private static final class TwoRequestsSubscriber extends BaseSubscriber { @@ -806,4 +872,64 @@ protected void hookOnNext(Integer value) { } } + static final int UNSET = -1; + enum SizeAndCapacityTestData { + singleton( i -> + new FluxReplay.SingletonReplayBuffer<>(), + 1, 1), + sizeandtime( i -> + new FluxReplay.SizeAndTimeBoundReplayBuffer<>(i,Long.MAX_VALUE, Schedulers.single()), + UNSET, UNSET), + size( i -> + new FluxReplay.SizeBoundReplayBuffer<>(i), + UNSET, UNSET), + unbounded( i -> + new FluxReplay.UnboundedReplayBuffer<>(2), + Integer.MAX_VALUE, Integer.MAX_VALUE); + final Function> bufferClass; + final int maxSize; + final int capacity; + SizeAndCapacityTestData(Function> bufferClass, int maxSize, int capacity) { + this.bufferClass = bufferClass; + this.maxSize = maxSize; + this.capacity = capacity; + } + } + + @ParameterizedTestWithName + @EnumSource(SizeAndCapacityTestData.class) + public void sizeAndCapacityTest(SizeAndCapacityTestData testData) { + for (int input : new Integer[]{1, 2, 3, 10}) { + FluxReplay.ReplaySubscription rs = new FluxReplay.ReplayInner<>(null, null); + FluxReplay.ReplayBuffer buffer = testData.bufferClass.apply(input); + assertThat(buffer.capacity()).isEqualTo(testData.capacity == UNSET ? input : testData.capacity); + + assertThat(buffer.size()).isEqualTo(0); // no data added + assertThat(buffer.size(rs)).isEqualTo(0); // no data added + for (int i = 1; i < input; i++){ + buffer.add(0); + int expectedSize = testData.maxSize != UNSET ? testData.maxSize : i; + assertThat(buffer.size()).isEqualTo(Math.min(expectedSize, i)); + assertThat(buffer.size(rs)).isEqualTo(Math.min(expectedSize, i)); + } + } + + } + + @ParameterizedTestWithName + @EnumSource(SizeAndCapacityTestData.class) + void testNoEmitOnRequestNone(SizeAndCapacityTestData cache) { + FluxReplay.ReplaySubscription rs = new FluxReplay.ReplayInner<>(null, null); + assertThat(rs.requested()).isEqualTo(0); + FluxReplay.ReplayBuffer buffer = cache.bufferClass.apply(1); + buffer.add(123); + try { + buffer.replay(rs); + } catch (NullPointerException interaction) { + // this failure may become incorrect, + // as it relies on the ReplaySubscription implementation + fail("we did not expect an interaction with neither " + + "the parent nor the actual subscription"); + } + } }