Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issues/3340 single buffer #3364

Closed
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package reactor.core.publisher;

import org.openjdk.jmh.annotations.*;

import java.util.concurrent.TimeUnit;

@BenchmarkMode({Mode.AverageTime})
@Warmup(iterations = 2, time = 5, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS)
@Fork(value = 1)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@State(Scope.Benchmark)
public class SinkManyReplayLatestBenchmark {


@Param({"1000", "10000"})
int rangeSize;

public static void main(String[] args) throws Exception {
reactor.core.scrabble.ShakespearePlaysScrabbleParallelOpt
s = new reactor.core.scrabble.ShakespearePlaysScrabbleParallelOpt();
s.init();
System.out.println(s.measureThroughput());
}
MikkelHJuul marked this conversation as resolved.
Show resolved Hide resolved

@Threads(1)
@Benchmark
public void measureLatestSingleton() {
Sinks.Many<Integer> underTest = Sinks.many().replay().latest();
Flux.range(0, rangeSize)
.doOnComplete(underTest::tryEmitComplete)
.subscribe(underTest::tryEmitNext);
underTest.asFlux().blockLast();
}

@Threads(1)
@Benchmark
public void measureSizeBoundLimit1() {
Sinks.Many<Integer> underTest = new SinkManyReplayProcessor<>(new FluxReplay.SizeBoundReplayBuffer<>(1));
Sinks.Many<Integer> wrapper = new SinkManySerialized<>(underTest, (ContextHolder) underTest);
Flux.range(0, rangeSize)
.doOnComplete(wrapper::tryEmitComplete)
.subscribe(wrapper::tryEmitNext);
underTest.asFlux().blockLast();
}
MikkelHJuul marked this conversation as resolved.
Show resolved Hide resolved
}
234 changes: 233 additions & 1 deletion reactor-core/src/main/java/reactor/core/publisher/FluxReplay.java
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,237 @@ public void replay(ReplaySubscription<T> rs) {
}
}

static final class SingletonReplayBuffer<T> implements ReplayBuffer<T> {

/**
* provides uniqueness to the value.
*/
private static final class Wrapper<T> {
final T actual;

private Wrapper(T actual) {
this.actual = actual;
}
}
MikkelHJuul marked this conversation as resolved.
Show resolved Hide resolved
@SuppressWarnings({"unchecked", "rawtypes", "ConstantConditions"})
static final Wrapper<?> EMPTY = new Wrapper(null);
@SuppressWarnings("unchecked")
volatile Wrapper<T> value = (Wrapper<T>) EMPTY;
@SuppressWarnings("rawtypes")
static final AtomicReferenceFieldUpdater<SingletonReplayBuffer, Wrapper> ACTUAL =
AtomicReferenceFieldUpdater.newUpdater(SingletonReplayBuffer.class, Wrapper.class, "value");

private Throwable error;
private volatile boolean done;

@Override
public void add(T value) {
ACTUAL.set(this, new Wrapper<>(value));
}

@Override
public void onError(Throwable ex) {
error = ex;
done = true;
}

@Override
public void onComplete() {
done = true;
}


@Override
public Throwable getError() {
return error;
}

void replayFused(ReplaySubscription<T> rs) {
int missed = 1;

final Subscriber<? super T> a = rs.actual();

for (; ; ) {

if (rs.isCancelled()) {
rs.node(null);
return;
}

boolean d = done;

a.onNext(null);

if (d) {
Throwable ex = error;
if (ex != null) {
a.onError(ex);
}
else {
a.onComplete();
}
return;
}

missed = rs.leave(missed);
if (missed == 0) {
break;
}
}
}

@Override
public void replay(ReplaySubscription<T> rs) {
if (!rs.enter()) {
return;
}

if (rs.fusionMode() == NONE) {
replayNormal(rs);
}
else {
replayFused(rs);
}
}


void replayNormal(ReplaySubscription<T> rs) {
final Subscriber<? super T> a = rs.actual();

int missed = 1;

for (; ; ) {

long r = rs.requested();

@SuppressWarnings("unchecked") Wrapper<T> node = (Wrapper<T>) rs.node();
boolean produced = false;

if (rs.isCancelled()) {
rs.node(null);
return;
}

boolean d = done;
Wrapper<T> next = value;
boolean empty = next == EMPTY;

if (d && empty || d && node == next) {
rs.node(null);
Throwable ex = error;
if (ex != null) {
a.onError(ex);
}
else {
a.onComplete();
}
return;
}

if (!empty && node != next) {
a.onNext(next.actual);
rs.requestMore(rs.index() + 1);
produced = true;
node = next;
}

if (r==1) {
if (rs.isCancelled()) {
rs.node(null);
return;
}

if (done && value == null) {
rs.node(null);
Throwable ex = error;
if (ex != null) {
a.onError(ex);
}
else {
a.onComplete();
}
return;
}
}

if (produced && r != Long.MAX_VALUE) {
rs.produced(1);
}

rs.node(node);

missed = rs.leave(missed);
if (missed == 0) {
break;
}
}
}


@Override
public boolean isDone() {
return done;
}

@Override
public T poll(ReplaySubscription<T> rs) {
@SuppressWarnings("unchecked") Wrapper<T> node = (Wrapper<T>) rs.node();
Wrapper<T> next = value;
if (node == null || node == EMPTY) {
node = next;
rs.node(node);
}

if (next == EMPTY) {
return null;
}
rs.node(next);
rs.requestMore(rs.index() + 1);

return next.actual;
}

@Override
public void clear(ReplaySubscription<T> rs) {
rs.node(null);
}

@Override
public boolean isEmpty(ReplaySubscription<T> rs) {
@SuppressWarnings("unchecked") Wrapper<T> node = (Wrapper<T>) rs.node();
if (node == null) {
node = value;
rs.node(node);
}
return node == EMPTY && value == EMPTY;
}

@Override
public int size(ReplaySubscription<T> rs) {
Wrapper<T> val = value;
return rs.node() == val ? 0 : sizeOf(val);
}

@Override
public int size() {
return sizeOf(value);
}

private int sizeOf(Wrapper<T> value) {
return value == EMPTY ? 0 : 1;
}

@Override
public int capacity() {
return 1;
}

@Override
public boolean isExpired() {
return false;
}
}

static final class UnboundedReplayBuffer<T> implements ReplayBuffer<T> {

final int batchSize;
Expand Down Expand Up @@ -1107,7 +1338,8 @@ ReplaySubscriber<T> newState() {
scheduler), this, history);
}
if (history != Integer.MAX_VALUE) {
return new ReplaySubscriber<>(new SizeBoundReplayBuffer<>(history),
ReplayBuffer<T> buffer = history == 1 ? new SingletonReplayBuffer<>() : new SizeBoundReplayBuffer<>(history);
return new ReplaySubscriber<>(buffer,
this,
history);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ static <E> SinkManyReplayProcessor<E> create(int historySize, boolean unbounded)
if (unbounded) {
buffer = new FluxReplay.UnboundedReplayBuffer<>(historySize);
}
else if (historySize == 1) {
buffer = new FluxReplay.SingletonReplayBuffer<>();
}
else {
buffer = new FluxReplay.SizeBoundReplayBuffer<>(historySize);
}
Expand Down
Loading