Skip to content

Commit db7d3cb

Browse files
committed
fix #2014 Discard concatMapIterable/fromIterable's remainder on Cancel
In this change, the goal is to discard elements of the Iterable that haven't been processed yet. The challenge is to avoid attempting doing so for _infinite_ Iterables (which would lead to infinite discarding loops). If the Iterable is a Collection, it should be finite. If both an Iterator and a Spliterator can be generated for each of the processed Iterables, then the Spliterator is used to ensure the Iterable is SIZED. This allows us to safely assume we can iterate over the remainder of the iterator when cancelling, in order to discard its elements that weren't emitted. For Streams, since both the iterator() and spliterator() methods are terminating the Stream we only generate the Spliterator. We use it to check SIZED and then wrap it in an Iterator adapter for iteration (which is what BaseStream does by default). Implementation Notes ---- We didn't fully switch to using a Spliterator to drive the internal iteration. It doesn't work that well, since the Iterable#spliterator default implementation isn't SIZED and its estimatedSize() method does not behave like hasNext(). Iterator#hasNext is far better suited for looking ahead of the emitted element to trigger onComplete immediately after the last onNext.
1 parent c9c0ae0 commit db7d3cb

File tree

9 files changed

+413
-83
lines changed

9 files changed

+413
-83
lines changed

reactor-core/src/main/java/reactor/core/publisher/Flux.java

+13-4
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.Objects;
3131
import java.util.Queue;
3232
import java.util.Set;
33+
import java.util.Spliterator;
3334
import java.util.concurrent.Callable;
3435
import java.util.concurrent.TimeUnit;
3536
import java.util.concurrent.TimeoutException;
@@ -3654,7 +3655,9 @@ public final <V> Flux<V> concatMapDelayError(Function<? super T, ? extends Publi
36543655
* Thus {@code flatMapIterable} and {@code concatMapIterable} are equivalent offered as a discoverability
36553656
* improvement for users that explore the API with the concat vs flatMap expectation.
36563657
*
3657-
* @reactor.discard This operator discards elements it internally queued for backpressure upon cancellation.
3658+
* @reactor.discard Upon cancellation, this operator discards {@code T} elements it prefetched and, in
3659+
* some cases, attempts to discard remainder of the currently processed {@link Iterable} (if it can
3660+
* safely assume the iterator is not infinite, see {@link Spliterator#getExactSizeIfKnown()}).
36583661
*
36593662
* @param mapper the {@link Function} to transform input sequence into N {@link Iterable}
36603663
* @param <R> the merged output sequence type
@@ -3677,7 +3680,9 @@ public final <R> Flux<R> concatMapIterable(Function<? super T, ? extends Iterabl
36773680
* Thus {@code flatMapIterable} and {@code concatMapIterable} are equivalent offered as a discoverability
36783681
* improvement for users that explore the API with the concat vs flatMap expectation.
36793682
*
3680-
* @reactor.discard This operator discards elements it internally queued for backpressure upon cancellation.
3683+
* @reactor.discard Upon cancellation, this operator discards {@code T} elements it prefetched and, in
3684+
* some cases, attempts to discard remainder of the currently processed {@link Iterable} (if it can
3685+
* safely assume the iterator is not infinite, see {@link Spliterator#getExactSizeIfKnown()}).
36813686
*
36823687
* @param mapper the {@link Function} to transform input sequence into N {@link Iterable}
36833688
* @param prefetch the maximum in-flight elements from each inner {@link Iterable} sequence
@@ -4925,7 +4930,9 @@ public final <R> Flux<R> flatMap(
49254930
* Thus {@code flatMapIterable} and {@code concatMapIterable} are equivalent offered as a discoverability
49264931
* improvement for users that explore the API with the concat vs flatMap expectation.
49274932
*
4928-
* @reactor.discard This operator discards elements internally queued for backpressure upon cancellation or error triggered by a data signal.
4933+
* @reactor.discard Upon cancellation, this operator discards {@code T} elements it prefetched and, in
4934+
* some cases, attempts to discard remainder of the currently processed {@link Iterable} (if it can
4935+
* safely assume iterator is not infinite, see {@link Spliterator#getExactSizeIfKnown()}).
49294936
*
49304937
* @param mapper the {@link Function} to transform input sequence into N {@link Iterable}
49314938
* @param <R> the merged output sequence type
@@ -4949,7 +4956,9 @@ public final <R> Flux<R> flatMapIterable(Function<? super T, ? extends Iterable<
49494956
* Thus {@code flatMapIterable} and {@code concatMapIterable} are equivalent offered as a discoverability
49504957
* improvement for users that explore the API with the concat vs flatMap expectation.
49514958
*
4952-
* @reactor.discard This operator discards elements internally queued for backpressure upon cancellation or error triggered by a data signal.
4959+
* @reactor.discard Upon cancellation, this operator discards {@code T} elements it prefetched and, in
4960+
* some cases, attempts to discard remainder of the currently processed {@link Iterable} (if it can
4961+
* safely assume the iterator is not infinite, see {@link Spliterator#getExactSizeIfKnown()}).
49534962
*
49544963
* @param mapper the {@link Function} to transform input sequence into N {@link Iterable}
49554964
* @param prefetch the maximum in-flight elements from each inner {@link Iterable} sequence

0 commit comments

Comments
 (0)