Skip to content

Commit

Permalink
3.x: Add eager truncation to bounded replay() to avoid item retention (
Browse files Browse the repository at this point in the history
…#6532)

* 3.x: Add eager truncation to bounded replay() to avoid item retention

* Those eager tests are in their separate files already
  • Loading branch information
akarnokd authored Jun 24, 2019
1 parent 46b4ac8 commit b71a1d9
Show file tree
Hide file tree
Showing 11 changed files with 5,083 additions and 102 deletions.
310 changes: 300 additions & 10 deletions src/main/java/io/reactivex/Flowable.java

Large diffs are not rendered by default.

272 changes: 262 additions & 10 deletions src/main/java/io/reactivex/Observable.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -197,16 +197,16 @@ public static <T> Supplier<ConnectableFlowable<T>> replaySupplier(final Flowable
return new ReplaySupplier<T>(parent);
}

public static <T> Supplier<ConnectableFlowable<T>> replaySupplier(final Flowable<T> parent, final int bufferSize) {
return new BufferedReplaySupplier<T>(parent, bufferSize);
public static <T> Supplier<ConnectableFlowable<T>> replaySupplier(final Flowable<T> parent, final int bufferSize, boolean eagerTruncate) {
return new BufferedReplaySupplier<T>(parent, bufferSize, eagerTruncate);
}

public static <T> Supplier<ConnectableFlowable<T>> replaySupplier(final Flowable<T> parent, final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler) {
return new BufferedTimedReplay<T>(parent, bufferSize, time, unit, scheduler);
public static <T> Supplier<ConnectableFlowable<T>> replaySupplier(final Flowable<T> parent, final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler, boolean eagerTruncate) {
return new BufferedTimedReplay<T>(parent, bufferSize, time, unit, scheduler, eagerTruncate);
}

public static <T> Supplier<ConnectableFlowable<T>> replaySupplier(final Flowable<T> parent, final long time, final TimeUnit unit, final Scheduler scheduler) {
return new TimedReplay<T>(parent, time, unit, scheduler);
public static <T> Supplier<ConnectableFlowable<T>> replaySupplier(final Flowable<T> parent, final long time, final TimeUnit unit, final Scheduler scheduler, boolean eagerTruncate) {
return new TimedReplay<T>(parent, time, unit, scheduler, eagerTruncate);
}

public static <T, R> Function<Flowable<T>, Publisher<R>> replayFunction(final Function<? super Flowable<T>, ? extends Publisher<R>> selector, final Scheduler scheduler) {
Expand Down Expand Up @@ -240,7 +240,8 @@ public static <T, R> Function<List<Publisher<? extends T>>, Publisher<? extends
}

static final class ReplaySupplier<T> implements Supplier<ConnectableFlowable<T>> {
private final Flowable<T> parent;

final Flowable<T> parent;

ReplaySupplier(Flowable<T> parent) {
this.parent = parent;
Expand All @@ -253,38 +254,46 @@ public ConnectableFlowable<T> get() {
}

static final class BufferedReplaySupplier<T> implements Supplier<ConnectableFlowable<T>> {
private final Flowable<T> parent;
private final int bufferSize;

BufferedReplaySupplier(Flowable<T> parent, int bufferSize) {
final Flowable<T> parent;

final int bufferSize;

final boolean eagerTruncate;

BufferedReplaySupplier(Flowable<T> parent, int bufferSize, boolean eagerTruncate) {
this.parent = parent;
this.bufferSize = bufferSize;
this.eagerTruncate = eagerTruncate;
}

@Override
public ConnectableFlowable<T> get() {
return parent.replay(bufferSize);
return parent.replay(bufferSize, eagerTruncate);
}
}

static final class BufferedTimedReplay<T> implements Supplier<ConnectableFlowable<T>> {
private final Flowable<T> parent;
private final int bufferSize;
private final long time;
private final TimeUnit unit;
private final Scheduler scheduler;
final Flowable<T> parent;
final int bufferSize;
final long time;
final TimeUnit unit;
final Scheduler scheduler;

final boolean eagerTruncate;

BufferedTimedReplay(Flowable<T> parent, int bufferSize, long time, TimeUnit unit, Scheduler scheduler) {
BufferedTimedReplay(Flowable<T> parent, int bufferSize, long time, TimeUnit unit, Scheduler scheduler, boolean eagerTruncate) {
this.parent = parent;
this.bufferSize = bufferSize;
this.time = time;
this.unit = unit;
this.scheduler = scheduler;
this.eagerTruncate = eagerTruncate;
}

@Override
public ConnectableFlowable<T> get() {
return parent.replay(bufferSize, time, unit, scheduler);
return parent.replay(bufferSize, time, unit, scheduler, eagerTruncate);
}
}

Expand All @@ -294,16 +303,19 @@ static final class TimedReplay<T> implements Supplier<ConnectableFlowable<T>> {
private final TimeUnit unit;
private final Scheduler scheduler;

TimedReplay(Flowable<T> parent, long time, TimeUnit unit, Scheduler scheduler) {
final boolean eagerTruncate;

TimedReplay(Flowable<T> parent, long time, TimeUnit unit, Scheduler scheduler, boolean eagerTruncate) {
this.parent = parent;
this.time = time;
this.unit = unit;
this.scheduler = scheduler;
this.eagerTruncate = eagerTruncate;
}

@Override
public ConnectableFlowable<T> get() {
return parent.replay(time, unit, scheduler);
return parent.replay(time, unit, scheduler, eagerTruncate);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,15 @@ public static <T> ConnectableFlowable<T> createFrom(Flowable<? extends T> source
* @param <T> the value type
* @param source the source Flowable to use
* @param bufferSize the maximum number of elements to hold
* @param eagerTruncate if true, the head reference is refreshed to avoid unwanted item retention
* @return the new ConnectableObservable instance
*/
public static <T> ConnectableFlowable<T> create(Flowable<T> source,
final int bufferSize) {
final int bufferSize, boolean eagerTruncate) {
if (bufferSize == Integer.MAX_VALUE) {
return createFrom(source);
}
return create(source, new ReplayBufferTask<T>(bufferSize));
return create(source, new ReplayBufferSupplier<T>(bufferSize, eagerTruncate));
}

/**
Expand All @@ -106,11 +107,12 @@ public static <T> ConnectableFlowable<T> create(Flowable<T> source,
* @param maxAge the maximum age of entries
* @param unit the unit of measure of the age amount
* @param scheduler the target scheduler providing the current time
* @param eagerTruncate if true, the head reference is refreshed to avoid unwanted item retention
* @return the new ConnectableObservable instance
*/
public static <T> ConnectableFlowable<T> create(Flowable<T> source,
long maxAge, TimeUnit unit, Scheduler scheduler) {
return create(source, maxAge, unit, scheduler, Integer.MAX_VALUE);
long maxAge, TimeUnit unit, Scheduler scheduler, boolean eagerTruncate) {
return create(source, maxAge, unit, scheduler, Integer.MAX_VALUE, eagerTruncate);
}

/**
Expand All @@ -121,11 +123,12 @@ public static <T> ConnectableFlowable<T> create(Flowable<T> source,
* @param unit the unit of measure of the age amount
* @param scheduler the target scheduler providing the current time
* @param bufferSize the maximum number of elements to hold
* @param eagerTruncate if true, the head reference is refreshed to avoid unwanted item retention
* @return the new ConnectableFlowable instance
*/
public static <T> ConnectableFlowable<T> create(Flowable<T> source,
final long maxAge, final TimeUnit unit, final Scheduler scheduler, final int bufferSize) {
return create(source, new ScheduledReplayBufferTask<T>(bufferSize, maxAge, unit, scheduler));
final long maxAge, final TimeUnit unit, final Scheduler scheduler, final int bufferSize, boolean eagerTruncate) {
return create(source, new ScheduledReplayBufferSupplier<T>(bufferSize, maxAge, unit, scheduler, eagerTruncate));
}

/**
Expand Down Expand Up @@ -731,12 +734,15 @@ static class BoundedReplayBuffer<T> extends AtomicReference<Node> implements Rep

private static final long serialVersionUID = 2346567790059478686L;

final boolean eagerTruncate;

Node tail;
int size;

long index;

BoundedReplayBuffer() {
BoundedReplayBuffer(boolean eagerTruncate) {
this.eagerTruncate = eagerTruncate;
Node n = new Node(null, 0);
tail = n;
set(n);
Expand Down Expand Up @@ -780,6 +786,15 @@ final void removeFirst() {
* @param n the Node instance to set as first
*/
final void setFirst(Node n) {
if (eagerTruncate) {
Node m = new Node(null, n.index);
Node nextNode = n.get();
if (nextNode == null) {
tail = m;
}
m.lazySet(nextNode);
n = m;
}
set(n);
}

Expand Down Expand Up @@ -962,7 +977,8 @@ static final class SizeBoundReplayBuffer<T> extends BoundedReplayBuffer<T> {
private static final long serialVersionUID = -5898283885385201806L;

final int limit;
SizeBoundReplayBuffer(int limit) {
SizeBoundReplayBuffer(int limit, boolean eagerTruncate) {
super(eagerTruncate);
this.limit = limit;
}

Expand All @@ -989,7 +1005,8 @@ static final class SizeAndTimeBoundReplayBuffer<T> extends BoundedReplayBuffer<T
final long maxAge;
final TimeUnit unit;
final int limit;
SizeAndTimeBoundReplayBuffer(int limit, long maxAge, TimeUnit unit, Scheduler scheduler) {
SizeAndTimeBoundReplayBuffer(int limit, long maxAge, TimeUnit unit, Scheduler scheduler, boolean eagerTruncate) {
super(eagerTruncate);
this.scheduler = scheduler;
this.limit = limit;
this.maxAge = maxAge;
Expand Down Expand Up @@ -1168,35 +1185,42 @@ protected void subscribeActual(Subscriber<? super T> s) {
}
}

static final class ReplayBufferTask<T> implements Supplier<ReplayBuffer<T>> {
private final int bufferSize;
static final class ReplayBufferSupplier<T> implements Supplier<ReplayBuffer<T>> {

ReplayBufferTask(int bufferSize) {
final int bufferSize;

final boolean eagerTruncate;

ReplayBufferSupplier(int bufferSize, boolean eagerTruncate) {
this.bufferSize = bufferSize;
this.eagerTruncate = eagerTruncate;
}

@Override
public ReplayBuffer<T> get() {
return new SizeBoundReplayBuffer<T>(bufferSize);
return new SizeBoundReplayBuffer<T>(bufferSize, eagerTruncate);
}
}

static final class ScheduledReplayBufferTask<T> implements Supplier<ReplayBuffer<T>> {
static final class ScheduledReplayBufferSupplier<T> implements Supplier<ReplayBuffer<T>> {
private final int bufferSize;
private final long maxAge;
private final TimeUnit unit;
private final Scheduler scheduler;

ScheduledReplayBufferTask(int bufferSize, long maxAge, TimeUnit unit, Scheduler scheduler) {
final boolean eagerTruncate;

ScheduledReplayBufferSupplier(int bufferSize, long maxAge, TimeUnit unit, Scheduler scheduler, boolean eagerTruncate) {
this.bufferSize = bufferSize;
this.maxAge = maxAge;
this.unit = unit;
this.scheduler = scheduler;
this.eagerTruncate = eagerTruncate;
}

@Override
public ReplayBuffer<T> get() {
return new SizeAndTimeBoundReplayBuffer<T>(bufferSize, maxAge, unit, scheduler);
return new SizeAndTimeBoundReplayBuffer<T>(bufferSize, maxAge, unit, scheduler, eagerTruncate);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,16 +202,16 @@ public static <T> Supplier<ConnectableObservable<T>> replaySupplier(final Observ
return new ReplaySupplier<T>(parent);
}

public static <T> Supplier<ConnectableObservable<T>> replaySupplier(final Observable<T> parent, final int bufferSize) {
return new BufferedReplaySupplier<T>(parent, bufferSize);
public static <T> Supplier<ConnectableObservable<T>> replaySupplier(final Observable<T> parent, final int bufferSize, boolean eagerTruncate) {
return new BufferedReplaySupplier<T>(parent, bufferSize, eagerTruncate);
}

public static <T> Supplier<ConnectableObservable<T>> replaySupplier(final Observable<T> parent, final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler) {
return new BufferedTimedReplaySupplier<T>(parent, bufferSize, time, unit, scheduler);
public static <T> Supplier<ConnectableObservable<T>> replaySupplier(final Observable<T> parent, final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler, boolean eagerTruncate) {
return new BufferedTimedReplaySupplier<T>(parent, bufferSize, time, unit, scheduler, eagerTruncate);
}

public static <T> Supplier<ConnectableObservable<T>> replaySupplier(final Observable<T> parent, final long time, final TimeUnit unit, final Scheduler scheduler) {
return new TimedReplayCallable<T>(parent, time, unit, scheduler);
public static <T> Supplier<ConnectableObservable<T>> replaySupplier(final Observable<T> parent, final long time, final TimeUnit unit, final Scheduler scheduler, boolean eagerTruncate) {
return new TimedReplayCallable<T>(parent, time, unit, scheduler, eagerTruncate);
}

public static <T, R> Function<Observable<T>, ObservableSource<R>> replayFunction(final Function<? super Observable<T>, ? extends ObservableSource<R>> selector, final Scheduler scheduler) {
Expand Down Expand Up @@ -250,57 +250,66 @@ public ConnectableObservable<T> get() {
}

static final class BufferedReplaySupplier<T> implements Supplier<ConnectableObservable<T>> {
private final Observable<T> parent;
private final int bufferSize;
final Observable<T> parent;
final int bufferSize;

final boolean eagerTruncate;

BufferedReplaySupplier(Observable<T> parent, int bufferSize) {
BufferedReplaySupplier(Observable<T> parent, int bufferSize, boolean eagerTruncate) {
this.parent = parent;
this.bufferSize = bufferSize;
this.eagerTruncate = eagerTruncate;
}

@Override
public ConnectableObservable<T> get() {
return parent.replay(bufferSize);
return parent.replay(bufferSize, eagerTruncate);
}
}

static final class BufferedTimedReplaySupplier<T> implements Supplier<ConnectableObservable<T>> {
private final Observable<T> parent;
private final int bufferSize;
private final long time;
private final TimeUnit unit;
private final Scheduler scheduler;
final Observable<T> parent;
final int bufferSize;
final long time;
final TimeUnit unit;
final Scheduler scheduler;

BufferedTimedReplaySupplier(Observable<T> parent, int bufferSize, long time, TimeUnit unit, Scheduler scheduler) {
final boolean eagerTruncate;

BufferedTimedReplaySupplier(Observable<T> parent, int bufferSize, long time, TimeUnit unit, Scheduler scheduler, boolean eagerTruncate) {
this.parent = parent;
this.bufferSize = bufferSize;
this.time = time;
this.unit = unit;
this.scheduler = scheduler;
this.eagerTruncate = eagerTruncate;
}

@Override
public ConnectableObservable<T> get() {
return parent.replay(bufferSize, time, unit, scheduler);
return parent.replay(bufferSize, time, unit, scheduler, eagerTruncate);
}
}

static final class TimedReplayCallable<T> implements Supplier<ConnectableObservable<T>> {
private final Observable<T> parent;
private final long time;
private final TimeUnit unit;
private final Scheduler scheduler;
final Observable<T> parent;
final long time;
final TimeUnit unit;
final Scheduler scheduler;

final boolean eagerTruncate;

TimedReplayCallable(Observable<T> parent, long time, TimeUnit unit, Scheduler scheduler) {
TimedReplayCallable(Observable<T> parent, long time, TimeUnit unit, Scheduler scheduler, boolean eagerTruncate) {
this.parent = parent;
this.time = time;
this.unit = unit;
this.scheduler = scheduler;
this.eagerTruncate = eagerTruncate;
}

@Override
public ConnectableObservable<T> get() {
return parent.replay(time, unit, scheduler);
return parent.replay(time, unit, scheduler, eagerTruncate);
}
}

Expand Down
Loading

0 comments on commit b71a1d9

Please sign in to comment.