From 494bce153b99f0a562862224a257cd9643c5094c Mon Sep 17 00:00:00 2001 From: Mikkel Juul Date: Fri, 24 Feb 2023 13:16:51 +0100 Subject: [PATCH 01/13] [issue 3340] SingletonReplayBuffer based off of SizeBoundReplayBuffer, but with simplifications in size/cpacity/replay etc. --- .../reactor/core/publisher/FluxReplay.java | 220 +++++++++++++++++- 1 file changed, 219 insertions(+), 1 deletion(-) diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxReplay.java b/reactor-core/src/main/java/reactor/core/publisher/FluxReplay.java index e6c1d2bcdc..9ca99aa4ab 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxReplay.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxReplay.java @@ -496,6 +496,223 @@ public void replay(ReplaySubscription rs) { } } + static final class SingletonReplayBuffer implements ReplayBuffer { + volatile T value = null; + @SuppressWarnings("rawtypes") + static final AtomicReferenceFieldUpdater ACTUAL = + AtomicReferenceFieldUpdater.newUpdater(SingletonReplayBuffer.class, Object.class, "value"); + + private Throwable error; + private volatile boolean done; + + @Override + public void add(T value) { + ACTUAL.set(this, value); + } + + @Override + public void onError(Throwable ex) { + error = ex; + done = true; + } + + @Override + public void onComplete() { + done = true; + } + + + @Override + public Throwable getError() { + return error; + } + + void replayFused(ReplaySubscription rs) { + int missed = 1; + + final Subscriber a = rs.actual(); + + for (; ; ) { + + if (rs.isCancelled()) { + rs.node(null); + return; + } + + boolean d = done; + + a.onNext(null); + + if (d) { + Throwable ex = error; + if (ex != null) { + a.onError(ex); + } + else { + a.onComplete(); + } + return; + } + + missed = rs.leave(missed); + if (missed == 0) { + break; + } + } + } + + @Override + public void replay(ReplaySubscription rs) { + if (!rs.enter()) { + return; + } + + if (rs.fusionMode() == NONE) { + replayNormal(rs); + } + else { + replayFused(rs); + } + } + + + void replayNormal(ReplaySubscription rs) { + final Subscriber a = rs.actual(); + + int missed = 1; + + for (; ; ) { + + long r = rs.requested(); + + @SuppressWarnings("unchecked") T node = (T) rs.node(); + boolean produced = false; + + if (rs.isCancelled()) { + rs.node(null); + return; + } + + boolean d = done; + T next = value; + boolean empty = next == null; + + if (d && empty || d && node == next) { + rs.node(null); + Throwable ex = error; + if (ex != null) { + a.onError(ex); + } + else { + a.onComplete(); + } + return; + } + + if (!empty && node != next) { + a.onNext(next); + rs.requestMore(rs.index() + 1); + produced = true; + node = next; + } + + if (r==1) { + if (rs.isCancelled()) { + rs.node(null); + return; + } + + if (done && value == null) { + rs.node(null); + Throwable ex = error; + if (ex != null) { + a.onError(ex); + } + else { + a.onComplete(); + } + return; + } + } + + if (produced && r != Long.MAX_VALUE) { + rs.produced(1); + } + + rs.node(node); + + missed = rs.leave(missed); + if (missed == 0) { + break; + } + } + } + + + @Override + public boolean isDone() { + return done; + } + + @Override + public T poll(ReplaySubscription rs) { + @SuppressWarnings("unchecked") T node = (T) rs.node(); + T next = value; + if (node == null) { + node = next; + rs.node(node); + } + + if (next == null) { + return null; + } + rs.node(next); + rs.requestMore(rs.index() + 1); + + return next; + } + + @Override + public void clear(ReplaySubscription rs) { + rs.node(null); + } + + @Override + public boolean isEmpty(ReplaySubscription rs) { + @SuppressWarnings("unchecked") T node = (T) rs.node(); + if (node == null) { + node = value; + rs.node(node); + } + return node == null; + } + + @Override + public int size(ReplaySubscription rs) { + T val = value; + return rs.node() == val ? 0 : sizeOf(val); + } + + @Override + public int size() { + return sizeOf(value); + } + + private int sizeOf(T value) { + return value == null ? 0 : 1; + } + + @Override + public int capacity() { + return 1; + } + + @Override + public boolean isExpired() { + return false; + } + } + static final class UnboundedReplayBuffer implements ReplayBuffer { final int batchSize; @@ -1107,7 +1324,8 @@ ReplaySubscriber newState() { scheduler), this, history); } if (history != Integer.MAX_VALUE) { - return new ReplaySubscriber<>(new SizeBoundReplayBuffer<>(history), + ReplayBuffer buffer = history == 1 ? new SingletonReplayBuffer<>() : new SizeBoundReplayBuffer<>(history); + return new ReplaySubscriber<>(buffer, this, history); } From df167cbc587fe51fa4a80f144cbb2599fc35b148 Mon Sep 17 00:00:00 2001 From: Mikkel Juul Date: Fri, 24 Feb 2023 13:18:06 +0100 Subject: [PATCH 02/13] [issue 3340] SingletonReplayBuffer bindings add bindings to places where the SingletonReplayBuffer can be used in stead of the SizeBoundReplayBuffer --- .../java/reactor/core/publisher/SinkManyReplayProcessor.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/reactor-core/src/main/java/reactor/core/publisher/SinkManyReplayProcessor.java b/reactor-core/src/main/java/reactor/core/publisher/SinkManyReplayProcessor.java index 9493e79738..a0c50da8fc 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/SinkManyReplayProcessor.java +++ b/reactor-core/src/main/java/reactor/core/publisher/SinkManyReplayProcessor.java @@ -136,6 +136,9 @@ static SinkManyReplayProcessor create(int historySize, boolean unbounded) if (unbounded) { buffer = new FluxReplay.UnboundedReplayBuffer<>(historySize); } + else if (historySize == 1) { + buffer = new FluxReplay.SingletonReplayBuffer<>(); + } else { buffer = new FluxReplay.SizeBoundReplayBuffer<>(historySize); } From b3e35b656bc416820edd9b4469b0bd75062de4fc Mon Sep 17 00:00:00 2001 From: Mikkel Juul Date: Fri, 24 Feb 2023 13:18:42 +0100 Subject: [PATCH 03/13] [issue 3340] SingletonReplayLatest Benchmark to compare against the replaced implementation --- .../SinkManyReplayLatestBenchmark.java | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) create mode 100644 benchmarks/src/main/java/reactor/core/publisher/SinkManyReplayLatestBenchmark.java diff --git a/benchmarks/src/main/java/reactor/core/publisher/SinkManyReplayLatestBenchmark.java b/benchmarks/src/main/java/reactor/core/publisher/SinkManyReplayLatestBenchmark.java new file mode 100644 index 0000000000..3e36fcb41b --- /dev/null +++ b/benchmarks/src/main/java/reactor/core/publisher/SinkManyReplayLatestBenchmark.java @@ -0,0 +1,46 @@ +package reactor.core.publisher; + +import org.openjdk.jmh.annotations.*; + +import java.util.concurrent.TimeUnit; + +@BenchmarkMode({Mode.AverageTime}) +@Warmup(iterations = 2, time = 5, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS) +@Fork(value = 1) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@State(Scope.Benchmark) +public class SinkManyReplayLatestBenchmark { + + + @Param({"1000", "10000"}) + int rangeSize; + + public static void main(String[] args) throws Exception { + reactor.core.scrabble.ShakespearePlaysScrabbleParallelOpt + s = new reactor.core.scrabble.ShakespearePlaysScrabbleParallelOpt(); + s.init(); + System.out.println(s.measureThroughput()); + } + + @Threads(1) + @Benchmark + public void measureLatestSingleton() { + Sinks.Many underTest = Sinks.many().replay().latest(); + Flux.range(0, rangeSize) + .doOnComplete(underTest::tryEmitComplete) + .subscribe(underTest::tryEmitNext); + underTest.asFlux().blockLast(); + } + + @Threads(1) + @Benchmark + public void measureSizeBoundLimit1() { + Sinks.Many underTest = new SinkManyReplayProcessor<>(new FluxReplay.SizeBoundReplayBuffer<>(1)); + Sinks.Many wrapper = new SinkManySerialized<>(underTest, (ContextHolder) underTest); + Flux.range(0, rangeSize) + .doOnComplete(wrapper::tryEmitComplete) + .subscribe(wrapper::tryEmitNext); + underTest.asFlux().blockLast(); + } +} From 5915981ef8dd4999299569e98c548b5e3cce7e58 Mon Sep 17 00:00:00 2001 From: Mikkel Juul Date: Sun, 26 Feb 2023 22:24:00 +0100 Subject: [PATCH 04/13] [issue 3340] Tests for Singleton 1 test fails --- .../core/publisher/FluxReplayTest.java | 127 +++++++++++++----- 1 file changed, 95 insertions(+), 32 deletions(-) diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxReplayTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxReplayTest.java index c29c94580f..a12969b3d3 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxReplayTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxReplayTest.java @@ -33,6 +33,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.EnumSource; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; @@ -42,6 +44,7 @@ import reactor.core.Scannable; import reactor.core.scheduler.Schedulers; import reactor.test.MemoryUtils; +import reactor.test.ParameterizedTestWithName; import reactor.test.StepVerifier; import reactor.test.publisher.FluxOperatorTest; import reactor.test.scheduler.VirtualTimeScheduler; @@ -327,13 +330,13 @@ public void cacheFlux() { } - @Test + @ParameterizedTestWithName + @CsvSource(value = {"1", "2", "3", "4"}) @Tag("VirtualTime") - public void cacheFluxFused() { - + public void cacheFluxFused(int history) { Flux> source = Flux.just(1, 2, 3) .delayElements(Duration.ofMillis(1000)) - .replay() + .replay(history) .autoConnect() .elapsed(); @@ -345,14 +348,16 @@ public void cacheFluxFused() { .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3) .verifyComplete(); - StepVerifier.create(source) + StepVerifier.Step> verifier = StepVerifier.create(source) .expectFusion(Fuseable.ANY) - .then(() -> vts.advanceTimeBy(Duration.ofSeconds(3))) - .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 1) - .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2) - .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3) - .verifyComplete(); - + .then(() -> vts.advanceTimeBy(Duration.ofSeconds(3))); + for (int i = Math.min(history, 3); i > 0; i--) { + int iCpy = 4 - i; + verifier.expectNextMatches( + t -> t.getT1() == 0 && t.getT2() == iCpy + ); + } + verifier.verifyComplete(); } @Test @@ -632,9 +637,10 @@ public void onlyInitialRequestWithLateUnboundedSubscriber() { ts.assertValueCount(8); //despite unbounded, as it was late it only sees the replay capacity } - @Test - public void cancel() { - ConnectableFlux replay = Sinks.many().unicast().onBackpressureBuffer().asFlux().replay(2); + @ParameterizedTestWithName + @CsvSource(value = {"1", "2"}) + public void cancel(int num) { + ConnectableFlux replay = Sinks.many().unicast().onBackpressureBuffer().asFlux().replay(num); replay.subscribe(v -> {}, e -> { throw Exceptions.propagate(e); }); @@ -727,10 +733,11 @@ public void cacheSingleSubscriberWithMultipleRequestsDoesntHang() { assertThat(listFromStream).hasSize(1000); } - @Test - public void cacheNotOverrunByMaxPrefetch() { + @ParameterizedTestWithName + @CsvSource(value = {"1", "5"}) + public void cacheNotOverrunByMaxPrefetch(int history) { Flux s = Flux.range(1, 30) - .cache(5); + .cache(history); StepVerifier.create(s, 10) .expectNextCount(10) @@ -739,14 +746,16 @@ public void cacheNotOverrunByMaxPrefetch() { .verifyComplete(); StepVerifier.create(s) - .expectNextCount(5) + .expectNextCount(history) .verifyComplete(); } - @Test - public void ifSubscribeBeforeConnectThenTrackFurtherRequests() { - ConnectableFlux connectableFlux = Flux.just(1L, 2L, 3L, 4L) - .replay(2); + @ParameterizedTestWithName + @CsvSource(value = {"1","2","3","4"}) + public void ifSubscribeBeforeConnectThenTrackFurtherRequests(int num) { + Long[] nums = {1L, 2L, 3L, 4L}; + ConnectableFlux connectableFlux = Flux.fromArray(nums) + .replay(num); StepVerifier.create(connectableFlux, 1) .expectSubscription() @@ -758,29 +767,83 @@ public void ifSubscribeBeforeConnectThenTrackFurtherRequests() { .expectComplete() .verify(Duration.ofSeconds(10)); - StepVerifier.create(connectableFlux, 1) - .expectNext(3L) - .thenRequest(10) - .expectNext(4L) - .expectComplete() + StepVerifier.Step verifier = StepVerifier.create(connectableFlux, 1) + .expectNext(nums[nums.length-num]) + .thenRequest(10); + for (int i=num-1; i > 0; i--) { + verifier = verifier.expectNext(nums[nums.length-i]); + } + verifier.expectComplete() .verify(Duration.ofSeconds(10)); } - @Test - public void ifNoSubscriptionBeforeConnectThenPrefetches() { + enum NoSubscribtionTestEnum { + replay1( + 1, + new Integer[]{24}, + new Long[]{1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L} + ), + replay2( + 2, + new Integer[]{23, 24}, + new Long[]{2L, 2L, 2L, 2L, 2L, 2L, 2L, 2L, 2L, 2L} + ), + replay3( + 3, + new Integer[]{22, 23, 24}, + new Long[]{3L, 3L, 3L, 3L, 3L, 3L, 3L, 3L, 3L} + ), + replay4( + 4, + new Integer[]{21, 22, 23, 24}, + new Long[]{4L, 3L, 3L, 3L, 3L, 3L, 3L, 3L, 3L} + ), + replay5( + 5, + new Integer[]{20, 21, 22, 23, 24}, + new Long[]{5L, 4L, 4L, 4L, 4L, 4L, 4L} + ), + replay6( + 6, + new Integer[]{19, 20, 21, 22, 23, 24}, + new Long[]{6L, 5L, 5L, 5L, 5L} + ), + replay7( + 7, + new Integer[]{18, 19, 20, 21, 22, 23, 24}, + new Long[]{7L, 6L, 6L, 6L, 6L} + ), + replay8( + 8, + new Integer[]{17, 18, 19, 20, 21, 22, 23, 24}, + new Long[]{8L, 6L, 6L, 6L, 6L} + ); + final int history; + final Integer[] expected; + final Long[] requestQueue; + NoSubscribtionTestEnum(int history, Integer[] expected, Long[] requestQueue) { + this.history = history; + this.expected = expected; + this.requestQueue = requestQueue; + } + } + + @ParameterizedTestWithName + @EnumSource(NoSubscribtionTestEnum.class) + public void ifNoSubscriptionBeforeConnectThenPrefetches(NoSubscribtionTestEnum testdata) { Queue totalRequested = new ArrayBlockingQueue<>(10); ConnectableFlux connectable = Flux.range(1, 24) .doOnRequest(totalRequested::offer) - .replay(8); + .replay(testdata.history); connectable.connect(); StepVerifier.create(connectable) - .expectNext(17, 18, 19, 20, 21, 22, 23, 24) + .expectNext(testdata.expected) .expectComplete() .verify(Duration.ofSeconds(10)); - assertThat(totalRequested).containsExactly(8L, 6L, 6L, 6L, 6L); + assertThat(totalRequested).containsExactly(testdata.requestQueue); } private static final class TwoRequestsSubscriber extends BaseSubscriber { From c7109b3c44e22e495f284e2a1c0bb2371cf99bef Mon Sep 17 00:00:00 2001 From: Mikkel Juul Date: Sun, 26 Feb 2023 23:14:18 +0100 Subject: [PATCH 05/13] [issue 3340] Tests for Size and capacity --- .../core/publisher/FluxReplayTest.java | 47 ++++++++++++++++++- 1 file changed, 46 insertions(+), 1 deletion(-) diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxReplayTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxReplayTest.java index a12969b3d3..89283c9014 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxReplayTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxReplayTest.java @@ -23,6 +23,8 @@ import java.util.Queue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import org.assertj.core.data.Offset; @@ -331,7 +333,7 @@ public void cacheFlux() { } @ParameterizedTestWithName - @CsvSource(value = {"1", "2", "3", "4"}) + @CsvSource(value = {"1", "2", "3", "4", "0x7fffffff"}) @Tag("VirtualTime") public void cacheFluxFused(int history) { Flux> source = Flux.just(1, 2, 3) @@ -869,4 +871,47 @@ protected void hookOnNext(Integer value) { } } + static final int UNSET = -1; + enum SizeAndCapacityTestData { + singleton( i -> + new FluxReplay.SingletonReplayBuffer<>(), + 1, 1), + sizeandtime( i -> + new FluxReplay.SizeAndTimeBoundReplayBuffer<>(i,Long.MAX_VALUE, Schedulers.single()), + UNSET, UNSET), + size( i -> + new FluxReplay.SizeBoundReplayBuffer<>(i), + UNSET, UNSET), + unbounded( i -> + new FluxReplay.UnboundedReplayBuffer<>(2), + Integer.MAX_VALUE, Integer.MAX_VALUE); + final Function> bufferClass; + final int maxSize; + final int capacity; + SizeAndCapacityTestData(Function> bufferClass, int maxSize, int capacity) { + this.bufferClass = bufferClass; + this.maxSize = maxSize; + this.capacity = capacity; + } + } + + @ParameterizedTestWithName + @EnumSource(SizeAndCapacityTestData.class) + public void sizeAndCapacityTest(SizeAndCapacityTestData testData) { + for (int input : new Integer[]{1, 2, 3, 10}) { + FluxReplay.ReplaySubscription rs = new FluxReplay.ReplayInner<>(null, null); + FluxReplay.ReplayBuffer buffer = testData.bufferClass.apply(input); + assertThat(buffer.capacity()).isEqualTo(testData.capacity == UNSET ? input : testData.capacity); + + assertThat(buffer.size()).isEqualTo(0); // no data added + assertThat(buffer.size(rs)).isEqualTo(0); // no data added + for (int i = 1; i < input; i++){ + buffer.add(0); + int expectedSize = testData.maxSize != UNSET ? testData.maxSize : i; + assertThat(buffer.size()).isEqualTo(Math.min(expectedSize, i)); + assertThat(buffer.size(rs)).isEqualTo(Math.min(expectedSize, i)); + } + } + + } } From 695150c66f99a4ef17731ee965b1ef2a30b40767 Mon Sep 17 00:00:00 2001 From: Mikkel Juul Date: Mon, 27 Feb 2023 21:53:47 +0100 Subject: [PATCH 06/13] [issue 3340] remove unused import --- .../src/test/java/reactor/core/publisher/FluxReplayTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxReplayTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxReplayTest.java index 89283c9014..05b34d7fe9 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxReplayTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxReplayTest.java @@ -24,7 +24,6 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; -import java.util.function.Supplier; import java.util.stream.Collectors; import org.assertj.core.data.Offset; From ae5153e40b96efe4ef953c0a78cbbfa5b73be253 Mon Sep 17 00:00:00 2001 From: Mikkel Juul Date: Mon, 27 Feb 2023 22:30:27 +0100 Subject: [PATCH 07/13] [issue 3340] fix bug when passing same value in a row When the same value is passed two times in a row the prior implementation does not convey the second signal --- .../reactor/core/publisher/FluxReplay.java | 46 ++++++++++++------- 1 file changed, 30 insertions(+), 16 deletions(-) diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxReplay.java b/reactor-core/src/main/java/reactor/core/publisher/FluxReplay.java index 9ca99aa4ab..739bba2f81 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxReplay.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxReplay.java @@ -497,17 +497,31 @@ public void replay(ReplaySubscription rs) { } static final class SingletonReplayBuffer implements ReplayBuffer { - volatile T value = null; + + /** + * provides uniqueness to the value. + */ + private static final class Wrapper { + final T actual; + + private Wrapper(T actual) { + this.actual = actual; + } + } + @SuppressWarnings({"unchecked", "rawtypes", "ConstantConditions"}) + static final Wrapper EMPTY = new Wrapper(null); + @SuppressWarnings("unchecked") + volatile Wrapper value = (Wrapper) EMPTY; @SuppressWarnings("rawtypes") - static final AtomicReferenceFieldUpdater ACTUAL = - AtomicReferenceFieldUpdater.newUpdater(SingletonReplayBuffer.class, Object.class, "value"); + static final AtomicReferenceFieldUpdater ACTUAL = + AtomicReferenceFieldUpdater.newUpdater(SingletonReplayBuffer.class, Wrapper.class, "value"); private Throwable error; private volatile boolean done; @Override public void add(T value) { - ACTUAL.set(this, value); + ACTUAL.set(this, new Wrapper<>(value)); } @Override @@ -585,7 +599,7 @@ void replayNormal(ReplaySubscription rs) { long r = rs.requested(); - @SuppressWarnings("unchecked") T node = (T) rs.node(); + @SuppressWarnings("unchecked") Wrapper node = (Wrapper) rs.node(); boolean produced = false; if (rs.isCancelled()) { @@ -594,8 +608,8 @@ void replayNormal(ReplaySubscription rs) { } boolean d = done; - T next = value; - boolean empty = next == null; + Wrapper next = value; + boolean empty = next == EMPTY; if (d && empty || d && node == next) { rs.node(null); @@ -610,7 +624,7 @@ void replayNormal(ReplaySubscription rs) { } if (!empty && node != next) { - a.onNext(next); + a.onNext(next.actual); rs.requestMore(rs.index() + 1); produced = true; node = next; @@ -656,20 +670,20 @@ public boolean isDone() { @Override public T poll(ReplaySubscription rs) { - @SuppressWarnings("unchecked") T node = (T) rs.node(); - T next = value; + @SuppressWarnings("unchecked") Wrapper node = (Wrapper) rs.node(); + Wrapper next = value; if (node == null) { node = next; rs.node(node); } - if (next == null) { + if (next == EMPTY) { return null; } rs.node(next); rs.requestMore(rs.index() + 1); - return next; + return next.actual; } @Override @@ -679,7 +693,7 @@ public void clear(ReplaySubscription rs) { @Override public boolean isEmpty(ReplaySubscription rs) { - @SuppressWarnings("unchecked") T node = (T) rs.node(); + @SuppressWarnings("unchecked") Wrapper node = (Wrapper) rs.node(); if (node == null) { node = value; rs.node(node); @@ -689,7 +703,7 @@ public boolean isEmpty(ReplaySubscription rs) { @Override public int size(ReplaySubscription rs) { - T val = value; + Wrapper val = value; return rs.node() == val ? 0 : sizeOf(val); } @@ -698,8 +712,8 @@ public int size() { return sizeOf(value); } - private int sizeOf(T value) { - return value == null ? 0 : 1; + private int sizeOf(Wrapper value) { + return value == EMPTY ? 0 : 1; } @Override From 55baba2056067a57f822af94ff54422b333113ef Mon Sep 17 00:00:00 2001 From: Mikkel Juul Date: Wed, 22 Mar 2023 09:18:08 +0100 Subject: [PATCH 08/13] [issue 3340] fix a bug FUSED still not working --- .../src/main/java/reactor/core/publisher/FluxReplay.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxReplay.java b/reactor-core/src/main/java/reactor/core/publisher/FluxReplay.java index 739bba2f81..e096ee1b2d 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxReplay.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxReplay.java @@ -672,7 +672,7 @@ public boolean isDone() { public T poll(ReplaySubscription rs) { @SuppressWarnings("unchecked") Wrapper node = (Wrapper) rs.node(); Wrapper next = value; - if (node == null) { + if (node == null || node == EMPTY) { node = next; rs.node(node); } @@ -698,12 +698,12 @@ public boolean isEmpty(ReplaySubscription rs) { node = value; rs.node(node); } - return node == null; + return node == EMPTY && value == EMPTY; } @Override public int size(ReplaySubscription rs) { - Wrapper val = value; + Wrapper val = value; return rs.node() == val ? 0 : sizeOf(val); } From b8734d43c2ebe40d72aadc2d6a9c95b8a1183b9b Mon Sep 17 00:00:00 2001 From: Mikkel Juul Date: Wed, 22 Mar 2023 14:42:51 +0100 Subject: [PATCH 09/13] [issue 3340] AtomicStampedImpl --- .../reactor/core/publisher/FluxReplay.java | 84 ++++++------------- .../core/publisher/ReplayProcessor.java | 3 + 2 files changed, 30 insertions(+), 57 deletions(-) diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxReplay.java b/reactor-core/src/main/java/reactor/core/publisher/FluxReplay.java index e096ee1b2d..8d88a9c1fd 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxReplay.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxReplay.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.concurrent.atomic.AtomicStampedReference; import java.util.function.Consumer; import java.util.stream.Stream; @@ -497,31 +498,14 @@ public void replay(ReplaySubscription rs) { } static final class SingletonReplayBuffer implements ReplayBuffer { - - /** - * provides uniqueness to the value. - */ - private static final class Wrapper { - final T actual; - - private Wrapper(T actual) { - this.actual = actual; - } - } - @SuppressWarnings({"unchecked", "rawtypes", "ConstantConditions"}) - static final Wrapper EMPTY = new Wrapper(null); - @SuppressWarnings("unchecked") - volatile Wrapper value = (Wrapper) EMPTY; - @SuppressWarnings("rawtypes") - static final AtomicReferenceFieldUpdater ACTUAL = - AtomicReferenceFieldUpdater.newUpdater(SingletonReplayBuffer.class, Wrapper.class, "value"); - + final AtomicStampedReference val = new AtomicStampedReference<>(null, 0); private Throwable error; private volatile boolean done; @Override public void add(T value) { - ACTUAL.set(this, new Wrapper<>(value)); + int stamp = val.getStamp(); + val.set(value, stamp + 1); } @Override @@ -549,7 +533,6 @@ void replayFused(ReplaySubscription rs) { for (; ; ) { if (rs.isCancelled()) { - rs.node(null); return; } @@ -599,20 +582,19 @@ void replayNormal(ReplaySubscription rs) { long r = rs.requested(); - @SuppressWarnings("unchecked") Wrapper node = (Wrapper) rs.node(); + int idx = rs.index(); boolean produced = false; if (rs.isCancelled()) { - rs.node(null); return; } boolean d = done; - Wrapper next = value; - boolean empty = next == EMPTY; + int[] stamp = new int[1]; + T next = val.get(stamp); + boolean empty = isEmpty(rs); - if (d && empty || d && node == next) { - rs.node(null); + if (d && empty) { Throwable ex = error; if (ex != null) { a.onError(ex); @@ -623,21 +605,18 @@ void replayNormal(ReplaySubscription rs) { return; } - if (!empty && node != next) { - a.onNext(next.actual); - rs.requestMore(rs.index() + 1); + if (!empty && idx != stamp[0]) { + a.onNext(next); + rs.requestMore(stamp[0]); produced = true; - node = next; } if (r==1) { if (rs.isCancelled()) { - rs.node(null); return; } - if (done && value == null) { - rs.node(null); + if (done && isEmpty(rs)) { Throwable ex = error; if (ex != null) { a.onError(ex); @@ -653,8 +632,6 @@ void replayNormal(ReplaySubscription rs) { rs.produced(1); } - rs.node(node); - missed = rs.leave(missed); if (missed == 0) { break; @@ -670,20 +647,16 @@ public boolean isDone() { @Override public T poll(ReplaySubscription rs) { - @SuppressWarnings("unchecked") Wrapper node = (Wrapper) rs.node(); - Wrapper next = value; - if (node == null || node == EMPTY) { - node = next; - rs.node(node); - } + int idx = rs.index(); + int[] stamp = new int[1]; + T stampedVal = val.get(stamp); - if (next == EMPTY) { + if (idx == stamp[0] || stamp[0] == 0) { return null; } - rs.node(next); - rs.requestMore(rs.index() + 1); + rs.requestMore(stamp[0]); - return next.actual; + return stampedVal; } @Override @@ -693,27 +666,24 @@ public void clear(ReplaySubscription rs) { @Override public boolean isEmpty(ReplaySubscription rs) { - @SuppressWarnings("unchecked") Wrapper node = (Wrapper) rs.node(); - if (node == null) { - node = value; - rs.node(node); - } - return node == EMPTY && value == EMPTY; + int idx = rs.index(); + int stamp = val.getStamp(); + return stamp == 0 || idx == stamp; } @Override public int size(ReplaySubscription rs) { - Wrapper val = value; - return rs.node() == val ? 0 : sizeOf(val); + int valIdx = val.getStamp(); + return rs.index() == valIdx ? 0 : sizeOf(valIdx); } @Override public int size() { - return sizeOf(value); + return sizeOf(val.getStamp()); } - private int sizeOf(Wrapper value) { - return value == EMPTY ? 0 : 1; + private int sizeOf(int value) { + return value == 0 ? 0 : 1; } @Override diff --git a/reactor-core/src/main/java/reactor/core/publisher/ReplayProcessor.java b/reactor-core/src/main/java/reactor/core/publisher/ReplayProcessor.java index 39b1daaf5b..58d4a928c7 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/ReplayProcessor.java +++ b/reactor-core/src/main/java/reactor/core/publisher/ReplayProcessor.java @@ -154,6 +154,9 @@ public static ReplayProcessor create(int historySize, boolean unbounded) if (unbounded) { buffer = new FluxReplay.UnboundedReplayBuffer<>(historySize); } + else if(historySize == 1) { + buffer = new FluxReplay.SingletonReplayBuffer<>(); + } else { buffer = new FluxReplay.SizeBoundReplayBuffer<>(historySize); } From 3eb08cede0e0865d7de1f01134b3b35a41639261 Mon Sep 17 00:00:00 2001 From: Mikkel Juul Date: Wed, 22 Mar 2023 14:53:45 +0100 Subject: [PATCH 10/13] [issue 3340] remove test --- .../SinkManyReplayLatestBenchmark.java | 46 ------------------- 1 file changed, 46 deletions(-) delete mode 100644 benchmarks/src/main/java/reactor/core/publisher/SinkManyReplayLatestBenchmark.java diff --git a/benchmarks/src/main/java/reactor/core/publisher/SinkManyReplayLatestBenchmark.java b/benchmarks/src/main/java/reactor/core/publisher/SinkManyReplayLatestBenchmark.java deleted file mode 100644 index 3e36fcb41b..0000000000 --- a/benchmarks/src/main/java/reactor/core/publisher/SinkManyReplayLatestBenchmark.java +++ /dev/null @@ -1,46 +0,0 @@ -package reactor.core.publisher; - -import org.openjdk.jmh.annotations.*; - -import java.util.concurrent.TimeUnit; - -@BenchmarkMode({Mode.AverageTime}) -@Warmup(iterations = 2, time = 5, timeUnit = TimeUnit.SECONDS) -@Measurement(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS) -@Fork(value = 1) -@OutputTimeUnit(TimeUnit.NANOSECONDS) -@State(Scope.Benchmark) -public class SinkManyReplayLatestBenchmark { - - - @Param({"1000", "10000"}) - int rangeSize; - - public static void main(String[] args) throws Exception { - reactor.core.scrabble.ShakespearePlaysScrabbleParallelOpt - s = new reactor.core.scrabble.ShakespearePlaysScrabbleParallelOpt(); - s.init(); - System.out.println(s.measureThroughput()); - } - - @Threads(1) - @Benchmark - public void measureLatestSingleton() { - Sinks.Many underTest = Sinks.many().replay().latest(); - Flux.range(0, rangeSize) - .doOnComplete(underTest::tryEmitComplete) - .subscribe(underTest::tryEmitNext); - underTest.asFlux().blockLast(); - } - - @Threads(1) - @Benchmark - public void measureSizeBoundLimit1() { - Sinks.Many underTest = new SinkManyReplayProcessor<>(new FluxReplay.SizeBoundReplayBuffer<>(1)); - Sinks.Many wrapper = new SinkManySerialized<>(underTest, (ContextHolder) underTest); - Flux.range(0, rangeSize) - .doOnComplete(wrapper::tryEmitComplete) - .subscribe(wrapper::tryEmitNext); - underTest.asFlux().blockLast(); - } -} From b9e298f07de93291d9a0f6e5505af6811aa30b18 Mon Sep 17 00:00:00 2001 From: Mikkel Juul Date: Thu, 23 Mar 2023 12:29:46 +0100 Subject: [PATCH 11/13] [issue/3340] add stress test --- .../SinksManyReplayLatestStressTest.java | 136 ++++++++++++++++++ 1 file changed, 136 insertions(+) create mode 100644 reactor-core/src/jcstress/java/reactor/core/publisher/SinksManyReplayLatestStressTest.java diff --git a/reactor-core/src/jcstress/java/reactor/core/publisher/SinksManyReplayLatestStressTest.java b/reactor-core/src/jcstress/java/reactor/core/publisher/SinksManyReplayLatestStressTest.java new file mode 100644 index 0000000000..8f759545ba --- /dev/null +++ b/reactor-core/src/jcstress/java/reactor/core/publisher/SinksManyReplayLatestStressTest.java @@ -0,0 +1,136 @@ +package reactor.core.publisher; + +import org.openjdk.jcstress.annotations.Actor; +import org.openjdk.jcstress.annotations.Arbiter; +import org.openjdk.jcstress.annotations.JCStressTest; +import org.openjdk.jcstress.annotations.Outcome; +import org.openjdk.jcstress.annotations.State; +import org.openjdk.jcstress.infra.results.II_Result; +import org.openjdk.jcstress.infra.results.I_Result; + +import java.util.concurrent.atomic.AtomicInteger; + +import static org.openjdk.jcstress.annotations.Expect.ACCEPTABLE; + +public class SinksManyReplayLatestStressTest { + final StressSubscriber target = new StressSubscriber<>(); + + final SinkManyReplayProcessor sink = new SinkManyReplayProcessor<>( + new FluxReplay.SingletonReplayBuffer<>() + ); + + @JCStressTest + @Outcome(id = {"1"}, expect = ACCEPTABLE, desc = "single emission") + @Outcome(id = {"2"}, expect = ACCEPTABLE, desc = "two emissions") + @State + public static class TryEmitNextStressTest extends SinksManyReplayLatestStressTest { + + @Actor + public void first() { + sink.tryEmitNext("Hello"); + } + + @Actor + public void second() { + sink.tryEmitNext("Hello"); + } + + @Actor + public void subscriber() { + sink.subscribe(target); + } + + @Arbiter + public void arbiter(I_Result r) { + r.r1 = target.onNextCalls.get(); + } + } + + @JCStressTest + @Outcome(id = {"6, 63"}, expect = ACCEPTABLE, desc = "six subscribers, 64-1 result") + @State + public static class SubscriberCountStressTest extends SinksManyReplayLatestStressTest { + + AtomicInteger count = new AtomicInteger(); + + @Actor + public void first() { + sink.tryEmitNext("Hello"); + } + + @Actor + public void one() { + sink.subscribe(s -> count.addAndGet(1)); + } + + @Actor + public void two() { + sink.subscribe(s -> count.addAndGet(2)); + } + + @Actor + public void three() { + sink.subscribe(s -> count.addAndGet(4)); + } + + @Actor + public void four() { + sink.subscribe(s -> count.addAndGet(8)); + } + + @Actor + public void five() { + sink.subscribe(s -> count.addAndGet(16)); + } + + @Actor + public void six() { + sink.subscribe(s -> count.addAndGet(32)); + } + + @Arbiter + public void arbiter(II_Result r) { + r.r1 = sink.currentSubscriberCount(); + r.r2 = count.get(); + } + } + + + @JCStressTest + @Outcome(id = {"0, 0"}, expect = ACCEPTABLE, desc = "complete first") + @Outcome(id = {"0, 1"}, expect = ACCEPTABLE, desc = "subscriber 1 before complete") + @Outcome(id = {"0, 2"}, expect = ACCEPTABLE, desc = "subscriber 2 before complete") + @Outcome(id = {"0, 3"}, expect = ACCEPTABLE, desc = "both subscribe before complete") + @State + public static class SubscriberCountCompleteStressTest extends SinksManyReplayLatestStressTest { + + AtomicInteger count = new AtomicInteger(); + + @Actor + public void first() { + sink.tryEmitNext("Hello"); + } + + @Actor + public void completer() { + sink.tryEmitComplete(); + } + + + @Actor + public void one() { + sink.subscribe(s -> count.addAndGet(1)); + } + + @Actor + public void two() { + sink.subscribe(s -> count.addAndGet(2)); + } + + @Arbiter + public void arbiter(II_Result r) { + r.r1 = sink.currentSubscriberCount(); + r.r2 = count.get(); + } + } +} From f40f8b0d374674c60d82b7d815e70588bb772fe4 Mon Sep 17 00:00:00 2001 From: Mikkel Juul Date: Thu, 23 Mar 2023 20:43:51 +0100 Subject: [PATCH 12/13] [issue 3340] found one more discrete bug --- .../main/java/reactor/core/publisher/FluxReplay.java | 2 +- .../java/reactor/core/publisher/FluxReplayTest.java | 12 ++++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxReplay.java b/reactor-core/src/main/java/reactor/core/publisher/FluxReplay.java index 8d88a9c1fd..631bb7e39b 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxReplay.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxReplay.java @@ -605,7 +605,7 @@ void replayNormal(ReplaySubscription rs) { return; } - if (!empty && idx != stamp[0]) { + if (!empty && idx != stamp[0] && r != 0) { a.onNext(next); rs.requestMore(stamp[0]); produced = true; diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxReplayTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxReplayTest.java index 05b34d7fe9..8ee3d04aa7 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxReplayTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxReplayTest.java @@ -26,6 +26,7 @@ import java.util.function.Function; import java.util.stream.Collectors; +import org.assertj.core.api.DoublePredicateAssert; import org.assertj.core.data.Offset; import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; @@ -913,4 +914,15 @@ public void sizeAndCapacityTest(SizeAndCapacityTestData testData) { } } + + @ParameterizedTestWithName + @EnumSource(SizeAndCapacityTestData.class) + void testNoEmitOnRequestNone(SizeAndCapacityTestData cache) { + AtomicInteger outcome = new AtomicInteger(); + FluxReplay.ReplaySubscription rs = new FluxReplay.ReplayInner<>(null, null); + assertThat(rs.requested()).isEqualTo(0); + FluxReplay.ReplayBuffer buffer = cache.bufferClass.apply(1); + buffer.add(123); + buffer.replay(rs); + } } From b0ff5bcfcf8af32799f4663312c50d9b966942f5 Mon Sep 17 00:00:00 2001 From: Mikkel Juul Date: Sun, 26 Mar 2023 20:27:01 +0200 Subject: [PATCH 13/13] [issue 3340] rewrite test slightly --- .../java/reactor/core/publisher/FluxReplayTest.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxReplayTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxReplayTest.java index 8ee3d04aa7..c58786cb43 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxReplayTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxReplayTest.java @@ -58,6 +58,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.assertj.core.api.Assertions.fail; public class FluxReplayTest extends FluxOperatorTest { @@ -918,11 +919,17 @@ public void sizeAndCapacityTest(SizeAndCapacityTestData testData) { @ParameterizedTestWithName @EnumSource(SizeAndCapacityTestData.class) void testNoEmitOnRequestNone(SizeAndCapacityTestData cache) { - AtomicInteger outcome = new AtomicInteger(); FluxReplay.ReplaySubscription rs = new FluxReplay.ReplayInner<>(null, null); assertThat(rs.requested()).isEqualTo(0); FluxReplay.ReplayBuffer buffer = cache.bufferClass.apply(1); buffer.add(123); - buffer.replay(rs); + try { + buffer.replay(rs); + } catch (NullPointerException interaction) { + // this failure may become incorrect, + // as it relies on the ReplaySubscription implementation + fail("we did not expect an interaction with neither " + + "the parent nor the actual subscription"); + } } }