Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: Add eager truncation to bounded replay() to avoid item retention #6532

Merged
merged 2 commits into from
Jun 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
310 changes: 300 additions & 10 deletions src/main/java/io/reactivex/Flowable.java

Large diffs are not rendered by default.

272 changes: 262 additions & 10 deletions src/main/java/io/reactivex/Observable.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -197,16 +197,16 @@ public static <T> Supplier<ConnectableFlowable<T>> replaySupplier(final Flowable
return new ReplaySupplier<T>(parent);
}

public static <T> Supplier<ConnectableFlowable<T>> replaySupplier(final Flowable<T> parent, final int bufferSize) {
return new BufferedReplaySupplier<T>(parent, bufferSize);
public static <T> Supplier<ConnectableFlowable<T>> replaySupplier(final Flowable<T> parent, final int bufferSize, boolean eagerTruncate) {
return new BufferedReplaySupplier<T>(parent, bufferSize, eagerTruncate);
}

public static <T> Supplier<ConnectableFlowable<T>> replaySupplier(final Flowable<T> parent, final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler) {
return new BufferedTimedReplay<T>(parent, bufferSize, time, unit, scheduler);
public static <T> Supplier<ConnectableFlowable<T>> replaySupplier(final Flowable<T> parent, final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler, boolean eagerTruncate) {
return new BufferedTimedReplay<T>(parent, bufferSize, time, unit, scheduler, eagerTruncate);
}

public static <T> Supplier<ConnectableFlowable<T>> replaySupplier(final Flowable<T> parent, final long time, final TimeUnit unit, final Scheduler scheduler) {
return new TimedReplay<T>(parent, time, unit, scheduler);
public static <T> Supplier<ConnectableFlowable<T>> replaySupplier(final Flowable<T> parent, final long time, final TimeUnit unit, final Scheduler scheduler, boolean eagerTruncate) {
return new TimedReplay<T>(parent, time, unit, scheduler, eagerTruncate);
}

public static <T, R> Function<Flowable<T>, Publisher<R>> replayFunction(final Function<? super Flowable<T>, ? extends Publisher<R>> selector, final Scheduler scheduler) {
Expand Down Expand Up @@ -240,7 +240,8 @@ public static <T, R> Function<List<Publisher<? extends T>>, Publisher<? extends
}

static final class ReplaySupplier<T> implements Supplier<ConnectableFlowable<T>> {
private final Flowable<T> parent;

final Flowable<T> parent;

ReplaySupplier(Flowable<T> parent) {
this.parent = parent;
Expand All @@ -253,38 +254,46 @@ public ConnectableFlowable<T> get() {
}

static final class BufferedReplaySupplier<T> implements Supplier<ConnectableFlowable<T>> {
private final Flowable<T> parent;
private final int bufferSize;

BufferedReplaySupplier(Flowable<T> parent, int bufferSize) {
final Flowable<T> parent;

final int bufferSize;

final boolean eagerTruncate;

BufferedReplaySupplier(Flowable<T> parent, int bufferSize, boolean eagerTruncate) {
this.parent = parent;
this.bufferSize = bufferSize;
this.eagerTruncate = eagerTruncate;
}

@Override
public ConnectableFlowable<T> get() {
return parent.replay(bufferSize);
return parent.replay(bufferSize, eagerTruncate);
}
}

static final class BufferedTimedReplay<T> implements Supplier<ConnectableFlowable<T>> {
private final Flowable<T> parent;
private final int bufferSize;
private final long time;
private final TimeUnit unit;
private final Scheduler scheduler;
final Flowable<T> parent;
final int bufferSize;
final long time;
final TimeUnit unit;
final Scheduler scheduler;

final boolean eagerTruncate;

BufferedTimedReplay(Flowable<T> parent, int bufferSize, long time, TimeUnit unit, Scheduler scheduler) {
BufferedTimedReplay(Flowable<T> parent, int bufferSize, long time, TimeUnit unit, Scheduler scheduler, boolean eagerTruncate) {
this.parent = parent;
this.bufferSize = bufferSize;
this.time = time;
this.unit = unit;
this.scheduler = scheduler;
this.eagerTruncate = eagerTruncate;
}

@Override
public ConnectableFlowable<T> get() {
return parent.replay(bufferSize, time, unit, scheduler);
return parent.replay(bufferSize, time, unit, scheduler, eagerTruncate);
}
}

Expand All @@ -294,16 +303,19 @@ static final class TimedReplay<T> implements Supplier<ConnectableFlowable<T>> {
private final TimeUnit unit;
private final Scheduler scheduler;

TimedReplay(Flowable<T> parent, long time, TimeUnit unit, Scheduler scheduler) {
final boolean eagerTruncate;

TimedReplay(Flowable<T> parent, long time, TimeUnit unit, Scheduler scheduler, boolean eagerTruncate) {
this.parent = parent;
this.time = time;
this.unit = unit;
this.scheduler = scheduler;
this.eagerTruncate = eagerTruncate;
}

@Override
public ConnectableFlowable<T> get() {
return parent.replay(time, unit, scheduler);
return parent.replay(time, unit, scheduler, eagerTruncate);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,15 @@ public static <T> ConnectableFlowable<T> createFrom(Flowable<? extends T> source
* @param <T> the value type
* @param source the source Flowable to use
* @param bufferSize the maximum number of elements to hold
* @param eagerTruncate if true, the head reference is refreshed to avoid unwanted item retention
* @return the new ConnectableObservable instance
*/
public static <T> ConnectableFlowable<T> create(Flowable<T> source,
final int bufferSize) {
final int bufferSize, boolean eagerTruncate) {
if (bufferSize == Integer.MAX_VALUE) {
return createFrom(source);
}
return create(source, new ReplayBufferTask<T>(bufferSize));
return create(source, new ReplayBufferSupplier<T>(bufferSize, eagerTruncate));
}

/**
Expand All @@ -106,11 +107,12 @@ public static <T> ConnectableFlowable<T> create(Flowable<T> source,
* @param maxAge the maximum age of entries
* @param unit the unit of measure of the age amount
* @param scheduler the target scheduler providing the current time
* @param eagerTruncate if true, the head reference is refreshed to avoid unwanted item retention
* @return the new ConnectableObservable instance
*/
public static <T> ConnectableFlowable<T> create(Flowable<T> source,
long maxAge, TimeUnit unit, Scheduler scheduler) {
return create(source, maxAge, unit, scheduler, Integer.MAX_VALUE);
long maxAge, TimeUnit unit, Scheduler scheduler, boolean eagerTruncate) {
return create(source, maxAge, unit, scheduler, Integer.MAX_VALUE, eagerTruncate);
}

/**
Expand All @@ -121,11 +123,12 @@ public static <T> ConnectableFlowable<T> create(Flowable<T> source,
* @param unit the unit of measure of the age amount
* @param scheduler the target scheduler providing the current time
* @param bufferSize the maximum number of elements to hold
* @param eagerTruncate if true, the head reference is refreshed to avoid unwanted item retention
* @return the new ConnectableFlowable instance
*/
public static <T> ConnectableFlowable<T> create(Flowable<T> source,
final long maxAge, final TimeUnit unit, final Scheduler scheduler, final int bufferSize) {
return create(source, new ScheduledReplayBufferTask<T>(bufferSize, maxAge, unit, scheduler));
final long maxAge, final TimeUnit unit, final Scheduler scheduler, final int bufferSize, boolean eagerTruncate) {
return create(source, new ScheduledReplayBufferSupplier<T>(bufferSize, maxAge, unit, scheduler, eagerTruncate));
}

/**
Expand Down Expand Up @@ -731,12 +734,15 @@ static class BoundedReplayBuffer<T> extends AtomicReference<Node> implements Rep

private static final long serialVersionUID = 2346567790059478686L;

final boolean eagerTruncate;

Node tail;
int size;

long index;

BoundedReplayBuffer() {
BoundedReplayBuffer(boolean eagerTruncate) {
this.eagerTruncate = eagerTruncate;
Node n = new Node(null, 0);
tail = n;
set(n);
Expand Down Expand Up @@ -780,6 +786,15 @@ final void removeFirst() {
* @param n the Node instance to set as first
*/
final void setFirst(Node n) {
if (eagerTruncate) {
Node m = new Node(null, n.index);
Node nextNode = n.get();
if (nextNode == null) {
tail = m;
}
m.lazySet(nextNode);
n = m;
}
set(n);
}

Expand Down Expand Up @@ -962,7 +977,8 @@ static final class SizeBoundReplayBuffer<T> extends BoundedReplayBuffer<T> {
private static final long serialVersionUID = -5898283885385201806L;

final int limit;
SizeBoundReplayBuffer(int limit) {
SizeBoundReplayBuffer(int limit, boolean eagerTruncate) {
super(eagerTruncate);
this.limit = limit;
}

Expand All @@ -989,7 +1005,8 @@ static final class SizeAndTimeBoundReplayBuffer<T> extends BoundedReplayBuffer<T
final long maxAge;
final TimeUnit unit;
final int limit;
SizeAndTimeBoundReplayBuffer(int limit, long maxAge, TimeUnit unit, Scheduler scheduler) {
SizeAndTimeBoundReplayBuffer(int limit, long maxAge, TimeUnit unit, Scheduler scheduler, boolean eagerTruncate) {
super(eagerTruncate);
this.scheduler = scheduler;
this.limit = limit;
this.maxAge = maxAge;
Expand Down Expand Up @@ -1168,35 +1185,42 @@ protected void subscribeActual(Subscriber<? super T> s) {
}
}

static final class ReplayBufferTask<T> implements Supplier<ReplayBuffer<T>> {
private final int bufferSize;
static final class ReplayBufferSupplier<T> implements Supplier<ReplayBuffer<T>> {

ReplayBufferTask(int bufferSize) {
final int bufferSize;

final boolean eagerTruncate;

ReplayBufferSupplier(int bufferSize, boolean eagerTruncate) {
this.bufferSize = bufferSize;
this.eagerTruncate = eagerTruncate;
}

@Override
public ReplayBuffer<T> get() {
return new SizeBoundReplayBuffer<T>(bufferSize);
return new SizeBoundReplayBuffer<T>(bufferSize, eagerTruncate);
}
}

static final class ScheduledReplayBufferTask<T> implements Supplier<ReplayBuffer<T>> {
static final class ScheduledReplayBufferSupplier<T> implements Supplier<ReplayBuffer<T>> {
private final int bufferSize;
private final long maxAge;
private final TimeUnit unit;
private final Scheduler scheduler;

ScheduledReplayBufferTask(int bufferSize, long maxAge, TimeUnit unit, Scheduler scheduler) {
final boolean eagerTruncate;

ScheduledReplayBufferSupplier(int bufferSize, long maxAge, TimeUnit unit, Scheduler scheduler, boolean eagerTruncate) {
this.bufferSize = bufferSize;
this.maxAge = maxAge;
this.unit = unit;
this.scheduler = scheduler;
this.eagerTruncate = eagerTruncate;
}

@Override
public ReplayBuffer<T> get() {
return new SizeAndTimeBoundReplayBuffer<T>(bufferSize, maxAge, unit, scheduler);
return new SizeAndTimeBoundReplayBuffer<T>(bufferSize, maxAge, unit, scheduler, eagerTruncate);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,16 +202,16 @@ public static <T> Supplier<ConnectableObservable<T>> replaySupplier(final Observ
return new ReplaySupplier<T>(parent);
}

public static <T> Supplier<ConnectableObservable<T>> replaySupplier(final Observable<T> parent, final int bufferSize) {
return new BufferedReplaySupplier<T>(parent, bufferSize);
public static <T> Supplier<ConnectableObservable<T>> replaySupplier(final Observable<T> parent, final int bufferSize, boolean eagerTruncate) {
return new BufferedReplaySupplier<T>(parent, bufferSize, eagerTruncate);
}

public static <T> Supplier<ConnectableObservable<T>> replaySupplier(final Observable<T> parent, final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler) {
return new BufferedTimedReplaySupplier<T>(parent, bufferSize, time, unit, scheduler);
public static <T> Supplier<ConnectableObservable<T>> replaySupplier(final Observable<T> parent, final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler, boolean eagerTruncate) {
return new BufferedTimedReplaySupplier<T>(parent, bufferSize, time, unit, scheduler, eagerTruncate);
}

public static <T> Supplier<ConnectableObservable<T>> replaySupplier(final Observable<T> parent, final long time, final TimeUnit unit, final Scheduler scheduler) {
return new TimedReplayCallable<T>(parent, time, unit, scheduler);
public static <T> Supplier<ConnectableObservable<T>> replaySupplier(final Observable<T> parent, final long time, final TimeUnit unit, final Scheduler scheduler, boolean eagerTruncate) {
return new TimedReplayCallable<T>(parent, time, unit, scheduler, eagerTruncate);
}

public static <T, R> Function<Observable<T>, ObservableSource<R>> replayFunction(final Function<? super Observable<T>, ? extends ObservableSource<R>> selector, final Scheduler scheduler) {
Expand Down Expand Up @@ -250,57 +250,66 @@ public ConnectableObservable<T> get() {
}

static final class BufferedReplaySupplier<T> implements Supplier<ConnectableObservable<T>> {
private final Observable<T> parent;
private final int bufferSize;
final Observable<T> parent;
final int bufferSize;

final boolean eagerTruncate;

BufferedReplaySupplier(Observable<T> parent, int bufferSize) {
BufferedReplaySupplier(Observable<T> parent, int bufferSize, boolean eagerTruncate) {
this.parent = parent;
this.bufferSize = bufferSize;
this.eagerTruncate = eagerTruncate;
}

@Override
public ConnectableObservable<T> get() {
return parent.replay(bufferSize);
return parent.replay(bufferSize, eagerTruncate);
}
}

static final class BufferedTimedReplaySupplier<T> implements Supplier<ConnectableObservable<T>> {
private final Observable<T> parent;
private final int bufferSize;
private final long time;
private final TimeUnit unit;
private final Scheduler scheduler;
final Observable<T> parent;
final int bufferSize;
final long time;
final TimeUnit unit;
final Scheduler scheduler;

BufferedTimedReplaySupplier(Observable<T> parent, int bufferSize, long time, TimeUnit unit, Scheduler scheduler) {
final boolean eagerTruncate;

BufferedTimedReplaySupplier(Observable<T> parent, int bufferSize, long time, TimeUnit unit, Scheduler scheduler, boolean eagerTruncate) {
this.parent = parent;
this.bufferSize = bufferSize;
this.time = time;
this.unit = unit;
this.scheduler = scheduler;
this.eagerTruncate = eagerTruncate;
}

@Override
public ConnectableObservable<T> get() {
return parent.replay(bufferSize, time, unit, scheduler);
return parent.replay(bufferSize, time, unit, scheduler, eagerTruncate);
}
}

static final class TimedReplayCallable<T> implements Supplier<ConnectableObservable<T>> {
private final Observable<T> parent;
private final long time;
private final TimeUnit unit;
private final Scheduler scheduler;
final Observable<T> parent;
final long time;
final TimeUnit unit;
final Scheduler scheduler;

final boolean eagerTruncate;

TimedReplayCallable(Observable<T> parent, long time, TimeUnit unit, Scheduler scheduler) {
TimedReplayCallable(Observable<T> parent, long time, TimeUnit unit, Scheduler scheduler, boolean eagerTruncate) {
this.parent = parent;
this.time = time;
this.unit = unit;
this.scheduler = scheduler;
this.eagerTruncate = eagerTruncate;
}

@Override
public ConnectableObservable<T> get() {
return parent.replay(time, unit, scheduler);
return parent.replay(time, unit, scheduler, eagerTruncate);
}
}

Expand Down
Loading