Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add onDropped callback for throttleWithTimeout - #7458 #7510

Merged
merged 1 commit into from
Jan 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 92 additions & 1 deletion src/main/java/io/reactivex/rxjava3/core/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -8930,7 +8930,57 @@ public final Flowable<T> debounce(long timeout, @NonNull TimeUnit unit) {
public final Flowable<T> debounce(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new FlowableDebounceTimed<>(this, timeout, unit, scheduler));
return RxJavaPlugins.onAssembly(new FlowableDebounceTimed<>(this, timeout, unit, scheduler, null));
}

/**
* Returns a {@code Flowable} that mirrors the current {@code Flowable}, except that it drops items emitted by the
* current {@code Flowable} that are followed by newer items before a timeout value expires on a specified
* {@link Scheduler}. The timer resets on each emission.
* <p>
* <em>Note:</em> If items keep being emitted by the current {@code Flowable} faster than the timeout then no items
* will be emitted by the resulting {@code Flowable}.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/debounce.s.v3.png" alt="">
* <p>
* Delivery of the item after the grace period happens on the given {@code Scheduler}'s
* {@code Worker} which if takes too long, a newer item may arrive from the upstream, causing the
* {@code Worker}'s task to get disposed, which may also interrupt any downstream blocking operation
* (yielding an {@code InterruptedException}). It is recommended processing items
* that may take long time to be moved to another thread via {@link #observeOn} applied after
* {@code debounce} itself.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>This operator does not support backpressure as it uses time to control data flow.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@code Scheduler} this operator will use.</dd>
* </dl>
*
* @param timeout
* the time each item has to be "the most recent" of those emitted by the current {@code Flowable} to
* ensure that it's not dropped
* @param unit
* the unit of time for the specified {@code timeout}
* @param scheduler
* the {@code Scheduler} to use internally to manage the timers that handle the timeout for each
* item
* @param onDropped
* called with the current entry when it has been replaced by a new one
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/debounce.html">ReactiveX operators documentation: Debounce</a>
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
* @see #throttleWithTimeout(long, TimeUnit, Scheduler)
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.ERROR)
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Flowable<T> debounce(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<T> onDropped) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
Objects.requireNonNull(onDropped, "onDropped is null");
return RxJavaPlugins.onAssembly(new FlowableDebounceTimed<>(this, timeout, unit, scheduler, onDropped));
}

/**
Expand Down Expand Up @@ -17587,6 +17637,47 @@ public final Flowable<T> throttleWithTimeout(long timeout, @NonNull TimeUnit uni
return debounce(timeout, unit, scheduler);
}

/**
* Returns a {@code Flowable} that mirrors the current {@code Flowable}, except that it drops items emitted by the
* current {@code Flowable} that are followed by newer items before a timeout value expires on a specified
* {@link Scheduler}. The timer resets on each emission (alias to {@link #debounce(long, TimeUnit, Scheduler)}).
* <p>
* <em>Note:</em> If items keep being emitted by the current {@code Flowable} faster than the timeout then no items
* will be emitted by the resulting {@code Flowable}.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleWithTimeout.s.v3.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>This operator does not support backpressure as it uses time to control data flow.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@code Scheduler} this operator will use.</dd>
* </dl>
*
* @param timeout
* the length of the window of time that must pass after the emission of an item from the current
* {@code Flowable} in which it emits no items in order for the item to be emitted by the
* resulting {@code Flowable}
* @param unit
* the unit of time for the specified {@code timeout}
* @param scheduler
* the {@code Scheduler} to use internally to manage the timers that handle the timeout for each
* item
* @param onDropped
* called with the current entry when it has been replaced by a new one
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/debounce.html">ReactiveX operators documentation: Debounce</a>
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
* @see #debounce(long, TimeUnit, Scheduler)
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.ERROR)
@SchedulerSupport(SchedulerSupport.CUSTOM)
@NonNull
public final Flowable<T> throttleWithTimeout(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<T> onDropped) {
return debounce(timeout, unit, scheduler, onDropped);
}

/**
* Returns a {@code Flowable} that emits records of the time interval between consecutive items emitted by the
* current {@code Flowable}.
Expand Down
85 changes: 84 additions & 1 deletion src/main/java/io/reactivex/rxjava3/core/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -7896,7 +7896,53 @@ public final Observable<T> debounce(long timeout, @NonNull TimeUnit unit) {
public final Observable<T> debounce(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableDebounceTimed<>(this, timeout, unit, scheduler));
return RxJavaPlugins.onAssembly(new ObservableDebounceTimed<>(this, timeout, unit, scheduler, null));
}

/**
* Returns an {@code Observable} that mirrors the current {@code Observable}, except that it drops items emitted by the
* current {@code Observable} that are followed by newer items before a timeout value expires on a specified
* {@link Scheduler}. The timer resets on each emission.
* <p>
* <em>Note:</em> If items keep being emitted by the current {@code Observable} faster than the timeout then no items
* will be emitted by the resulting {@code Observable}.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/debounce.s.v3.png" alt="">
* <p>
* Delivery of the item after the grace period happens on the given {@code Scheduler}'s
* {@code Worker} which if takes too long, a newer item may arrive from the upstream, causing the
* {@code Worker}'s task to get disposed, which may also interrupt any downstream blocking operation
* (yielding an {@code InterruptedException}). It is recommended processing items
* that may take long time to be moved to another thread via {@link #observeOn} applied after
* {@code debounce} itself.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@code Scheduler} this operator will use.</dd>
* </dl>
*
* @param timeout
* the time each item has to be "the most recent" of those emitted by the current {@code Observable} to
* ensure that it's not dropped
* @param unit
* the unit of time for the specified {@code timeout}
* @param scheduler
* the {@code Scheduler} to use internally to manage the timers that handle the timeout for each
* item
* @param onDropped
* called with the current entry when it has been replaced by a new one
* @return the new {@code Observable} instance
* @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} } or {@code onDropped} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/debounce.html">ReactiveX operators documentation: Debounce</a>
* @see #throttleWithTimeout(long, TimeUnit, Scheduler)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
@NonNull
public final Observable<T> debounce(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<T> onDropped) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
Objects.requireNonNull(onDropped, "onDropped is null");
return RxJavaPlugins.onAssembly(new ObservableDebounceTimed<>(this, timeout, unit, scheduler, onDropped));
}

/**
Expand Down Expand Up @@ -14597,6 +14643,43 @@ public final Observable<T> throttleWithTimeout(long timeout, @NonNull TimeUnit u
return debounce(timeout, unit, scheduler);
}

/**
* Returns an {@code Observable} that mirrors the current {@code Observable}, except that it drops items emitted by the
* current {@code Observable} that are followed by newer items before a timeout value expires on a specified
* {@link Scheduler}. The timer resets on each emission (Alias to {@link #debounce(long, TimeUnit, Scheduler)}).
* <p>
* <em>Note:</em> If items keep being emitted by the current {@code Observable} faster than the timeout then no items
* will be emitted by the resulting {@code Observable}.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleWithTimeout.s.v3.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@code Scheduler} this operator will use.</dd>
* </dl>
*
* @param timeout
* the length of the window of time that must pass after the emission of an item from the current
* {@code Observable}, in which the current {@code Observable} emits no items, in order for the item to be emitted by the
* resulting {@code Observable}
* @param unit
* the unit of time for the specified {@code timeout}
* @param scheduler
* the {@code Scheduler} to use internally to manage the timers that handle the timeout for each
* item
* @param onDropped
* called with the current entry when it has been replaced by a new one
* @return the new {@code Observable} instance
* @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/debounce.html">ReactiveX operators documentation: Debounce</a>
* @see #debounce(long, TimeUnit, Scheduler)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
@NonNull
public final Observable<T> throttleWithTimeout(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<T> onDropped) {
return debounce(timeout, unit, scheduler, onDropped);
}

/**
* Returns an {@code Observable} that emits records of the time interval between consecutive items emitted by the
* current {@code Observable}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.*;

import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.Consumer;
import org.reactivestreams.*;

import io.reactivex.rxjava3.core.*;
Expand All @@ -32,19 +34,20 @@ public final class FlowableDebounceTimed<T> extends AbstractFlowableWithUpstream
final long timeout;
final TimeUnit unit;
final Scheduler scheduler;
final Consumer<T> onDropped;

public FlowableDebounceTimed(Flowable<T> source, long timeout, TimeUnit unit, Scheduler scheduler) {
public FlowableDebounceTimed(Flowable<T> source, long timeout, TimeUnit unit, Scheduler scheduler, Consumer<T> onDropped) {
super(source);
this.timeout = timeout;
this.unit = unit;
this.scheduler = scheduler;
this.onDropped = onDropped;
}

@Override
protected void subscribeActual(Subscriber<? super T> s) {
source.subscribe(new DebounceTimedSubscriber<>(
new SerializedSubscriber<>(s),
timeout, unit, scheduler.createWorker()));
new SerializedSubscriber<>(s), timeout, unit, scheduler.createWorker(), onDropped));
}

static final class DebounceTimedSubscriber<T> extends AtomicLong
Expand All @@ -55,20 +58,22 @@ static final class DebounceTimedSubscriber<T> extends AtomicLong
final long timeout;
final TimeUnit unit;
final Scheduler.Worker worker;
final Consumer<T> onDropped;

Subscription upstream;

Disposable timer;
DebounceEmitter<T> timer;

volatile long index;

boolean done;

DebounceTimedSubscriber(Subscriber<? super T> actual, long timeout, TimeUnit unit, Worker worker) {
DebounceTimedSubscriber(Subscriber<? super T> actual, long timeout, TimeUnit unit, Worker worker, Consumer<T> onDropped) {
this.downstream = actual;
this.timeout = timeout;
this.unit = unit;
this.worker = worker;
this.onDropped = onDropped;
}

@Override
Expand All @@ -93,6 +98,18 @@ public void onNext(T t) {
d.dispose();
}

if (onDropped != null && timer != null) {
try {
onDropped.accept(timer.value);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
upstream.cancel();
done = true;
downstream.onError(ex);
worker.dispose();
}
}

DebounceEmitter<T> de = new DebounceEmitter<>(t, idx, this);
timer = de;
d = worker.schedule(de, timeout, unit);
Expand Down Expand Up @@ -121,15 +138,13 @@ public void onComplete() {
}
done = true;

Disposable d = timer;
DebounceEmitter<T> d = timer;
if (d != null) {
d.dispose();
}

@SuppressWarnings("unchecked")
DebounceEmitter<T> de = (DebounceEmitter<T>)d;
if (de != null) {
de.emit();
if (d != null) {
d.emit();
}

downstream.onComplete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.core.Scheduler.Worker;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.observers.SerializedObserver;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
Expand All @@ -27,19 +29,20 @@ public final class ObservableDebounceTimed<T> extends AbstractObservableWithUpst
final long timeout;
final TimeUnit unit;
final Scheduler scheduler;
final Consumer<T> onDropped;

public ObservableDebounceTimed(ObservableSource<T> source, long timeout, TimeUnit unit, Scheduler scheduler) {
public ObservableDebounceTimed(ObservableSource<T> source, long timeout, TimeUnit unit, Scheduler scheduler, Consumer<T> onDropped) {
super(source);
this.timeout = timeout;
this.unit = unit;
this.scheduler = scheduler;
this.onDropped = onDropped;
}

@Override
public void subscribeActual(Observer<? super T> t) {
source.subscribe(new DebounceTimedObserver<>(
new SerializedObserver<>(t),
timeout, unit, scheduler.createWorker()));
new SerializedObserver<>(t), timeout, unit, scheduler.createWorker(), onDropped));
}

static final class DebounceTimedObserver<T>
Expand All @@ -48,20 +51,22 @@ static final class DebounceTimedObserver<T>
final long timeout;
final TimeUnit unit;
final Scheduler.Worker worker;
final Consumer<T> onDropped;

Disposable upstream;

Disposable timer;
DebounceEmitter<T> timer;

volatile long index;

boolean done;

DebounceTimedObserver(Observer<? super T> actual, long timeout, TimeUnit unit, Worker worker) {
DebounceTimedObserver(Observer<? super T> actual, long timeout, TimeUnit unit, Worker worker, Consumer<T> onDropped) {
this.downstream = actual;
this.timeout = timeout;
this.unit = unit;
this.worker = worker;
this.onDropped = onDropped;
}

@Override
Expand All @@ -85,6 +90,17 @@ public void onNext(T t) {
d.dispose();
}

if (onDropped != null && timer != null) {
try {
onDropped.accept(timer.value);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
upstream.dispose();
downstream.onError(ex);
done = true;
}
}

DebounceEmitter<T> de = new DebounceEmitter<>(t, idx, this);
timer = de;
d = worker.schedule(de, timeout, unit);
Expand Down Expand Up @@ -113,15 +129,13 @@ public void onComplete() {
}
done = true;

Disposable d = timer;
DebounceEmitter<T> d = timer;
if (d != null) {
d.dispose();
}

@SuppressWarnings("unchecked")
DebounceEmitter<T> de = (DebounceEmitter<T>)d;
if (de != null) {
de.run();
if (d != null) {
d.run();
}
downstream.onComplete();
worker.dispose();
Expand Down
Loading