Skip to content

Commit

Permalink
Add take(int, boolean), prep for new default behavior in 3.5 (#2691)
Browse files Browse the repository at this point in the history
This commit introduces the `take(int, boolean)` operator, which can
explicitly behave either like current `take(int)` (with `limitRequest`
set to `false`) or like `limitRequest(int)` (with `limitRequest` set to
`true`).

`take(int)` becomes an alias for it, and `take(n, false)` is the new
canonical way of obtaining the current take behavior (unbounded upstream
request, cancels when enough elements have been emitted).

This is because the behavior of `take(n)` is scheduled to change in 3.5
to still cancel whenever enough elements have been emitted,
but limit the upstream request to what is strictly necessary. This is
the behavior currently obtained through `limitRequest(int)` but that
is confusing to most users, who expect `take` to behave like that
by default.

As a result, `limitRequest` is deprecated in favor of `take(n, true)`
for the remainder of the 3.4.x line, and will be ultimately
replaceable by `take(n)` in 3.5 and above.

It is expected that most usages should see the behavior change in 3.5
as transparent, but `take(int, boolean)` has the benefit of being
totally explicit... See #2690 for 3.5.0 planned changes.

Fixes #2339.
  • Loading branch information
simonbasle authored May 6, 2021
1 parent f383f4d commit dedac83
Show file tree
Hide file tree
Showing 7 changed files with 238 additions and 100 deletions.
3 changes: 2 additions & 1 deletion docs/asciidoc/apdx-operatorChoice.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ I want to deal with:

* I want to keep only a subset of the sequence:
** by taking N elements:
*** at the beginning of the sequence: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#take-long-[Flux#take(long)]
*** at the beginning of the sequence: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#take-long-boolean-[Flux#take(long, true)]
**** ...requesting an unbounded amount from upstream: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#take-long-boolean-[Flux#take(long, false)]
**** ...based on a duration: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#take-java.time.Duration-[Flux#take(Duration)]
**** ...only the first element, as a https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html[Mono]: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#next--[Flux#next()]
**** ...using https://www.reactive-streams.org/reactive-streams-1.0.3-javadoc/org/reactivestreams/Subscription.html#request(long)[request(N)] rather than cancellation: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#limitRequest-long-[Flux#limitRequest(long)]
Expand Down
73 changes: 59 additions & 14 deletions reactor-core/src/main/java/reactor/core/publisher/Flux.java
Original file line number Diff line number Diff line change
Expand Up @@ -5899,15 +5899,16 @@ public final Flux<T> limitRate(int highTide, int lowTide) {
* Furthermore, ensure that the total amount requested upstream is capped at {@code n}.
* If n is zero, the source isn't even subscribed to and the operator completes immediately
* upon subscription.
* <img class="marble" src="doc-files/marbles/limitRequest.svg" alt="">
* <p>
* <img class="marble" src="doc-files/marbles/takeLimitRequestTrue.svg" alt="">
* <p>
* Backpressure signals from downstream subscribers are smaller than the cap are
* propagated as is, but if they would cause the total requested amount to go over the
* cap, they are reduced to the minimum value that doesn't go over.
* <p>
* As a result, this operator never let the upstream produce more elements than the
* cap, and it can be used as a stricter form of {@link #take(long)}. Typically useful
* for cases where a race between request and cancellation can lead the upstream to
* cap.
* Typically useful for cases where a race between request and cancellation can lead the upstream to
* producing a lot of extraneous data, and such a production is undesirable (e.g.
* a source that would send the extraneous data over the network).
*
Expand All @@ -5916,9 +5917,13 @@ public final Flux<T> limitRate(int highTide, int lowTide) {
*
* @return a {@link Flux} of {@code n} elements from the source, that requests AT MOST {@code n} from upstream in total.
* @see #take(long)
* @see #take(long, boolean)
* @deprecated replace with {@link #take(long, boolean) take(n, true)} in 3.4.x, then {@link #take(long)} in 3.5.0.
* To be removed in 3.6.0 at the earliest. See https://github.com/reactor/reactor-core/issues/2339
*/
@Deprecated
public final Flux<T> limitRequest(long n) {
return onAssembly(new FluxLimitRequest<>(this, n));
return take(n, true);
}

/**
Expand Down Expand Up @@ -8584,25 +8589,65 @@ public final Flux<T> tag(String key, String value) {

/**
* Take only the first N values from this {@link Flux}, if available.
* <p>
* If N is zero, the resulting {@link Flux} completes as soon as this {@link Flux}
* signals its first value (which is not not relayed, though).
* If n is zero, the source is subscribed to but immediately cancelled, then the operator completes.
* <p>
* <img class="marble" src="doc-files/marbles/take.svg" alt="">
* <p>
* Note that this operator doesn't manipulate the backpressure requested amount.
* Rather, it merely lets requests from downstream propagate as is and cancels once
* N elements have been emitted. As a result, the source could produce a lot of
* extraneous elements in the meantime. If that behavior is undesirable and you do
* not own the request from downstream (e.g. prefetching operators), consider
* using {@link #limitRequest(long)} instead.
* <b>Warning:</b> The below behavior will change in 3.5.0 from that of
* {@link #take(long, boolean) take(n, false)} to that of {@link #take(long, boolean) take(n, true)}.
* See https://github.com/reactor/reactor-core/issues/2339
* <p>
* Note that this operator doesn't propagate the backpressure requested amount.
* Rather, it makes an unbounded request and cancels once N elements have been emitted.
* As a result, the source could produce a lot of extraneous elements in the meantime.
* If that behavior is undesirable and you do not own the request from downstream
* (e.g. prefetching operators), consider using {@link #limitRequest(long)} instead.
*
* @param n the number of items to emit from this {@link Flux}
*
* @return a {@link Flux} limited to size N
* @see #limitRequest(long)
* @see #take(long, boolean)
*/
public final Flux<T> take(long n) {
return take(n, false);
}

/**
* Take only the first N values from this {@link Flux}, if available.
* <p>
* <img class="marble" src="doc-files/marbles/takeLimitRequestTrue.svg" alt="">
* <p>
* If {@code limitRequest == true}, ensure that the total amount requested upstream is capped
* at {@code n}. In that configuration, this operator never let the upstream produce more elements
* than the cap, and it can be used to more strictly adhere to backpressure.
* If n is zero, the source isn't even subscribed to and the operator completes immediately
* upon subscription.
* <p>
* This mode is typically useful for cases where a race between request and cancellation can lead
* the upstream to producing a lot of extraneous data, and such a production is undesirable (e.g.
* a source that would send the extraneous data over the network).
* <p>
* <img class="marble" src="doc-files/marbles/takeLimitRequestFalse.svg" alt="takeLimitRequestFalse">
* <p>
* If {@code limitRequest == false} this operator doesn't propagate the backpressure requested amount.
* Rather, it makes an unbounded request and cancels once N elements have been emitted.
* If n is zero, the source is subscribed to but immediately cancelled, then the operator completes
* (the behavior inherited from {@link #take(long)}).
* <p>
* In this mode, the source could produce a lot of extraneous elements despite cancellation.
* If that behavior is undesirable and you do not own the request from downstream
* (e.g. prefetching operators), consider using {@code limitRequest = true} instead.
*
* @param n the number of items to emit from this {@link Flux}
* @param limitRequest {@code true} to follow the downstream request more closely and limit the upstream request
* to {@code n}. {@code false} to request an unbounded amount from upstream.
*
* @return a {@link Flux} limited to size N
*/
public final Flux<T> take(long n, boolean limitRequest) {
if (limitRequest) {
return onAssembly(new FluxLimitRequest<>(this, n));
}
if (this instanceof Fuseable) {
return onAssembly(new FluxTakeFuseable<>(this, n));
}
Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading

0 comments on commit dedac83

Please sign in to comment.