Skip to content

Commit

Permalink
[doc] Polish doc of Iterable operators, clarify iterator() calls
Browse files Browse the repository at this point in the history
This commit improves the javadoc from reactor#2014:
 - missed adding the javadoc discard tags to `fromIterable` and
 `fromStream`
 - align javadocs of `concatMapIterable` and `flatMapIterable` since
 they are aliases

It also improves the wording and clarifies that `flatMapIterable` and
`fromIterable` discard support can lead to multiple `iterator()` calls.

Fixes reactor#2127
  • Loading branch information
simonbasle committed Apr 24, 2020
1 parent 48ed884 commit c88fe5b
Showing 1 changed file with 45 additions and 12 deletions.
57 changes: 45 additions & 12 deletions reactor-core/src/main/java/reactor/core/publisher/Flux.java
Original file line number Diff line number Diff line change
Expand Up @@ -979,10 +979,16 @@ public static <T> Flux<T> fromArray(T[] array) {

/**
* Create a {@link Flux} that emits the items contained in the provided {@link Iterable}.
* A new iterator will be created for each subscriber.
* The {@link Iterable#iterator()} method will be invoked at least once and at most twice
* for each subscriber (see onDiscard information below).
* <p>
* <img class="marble" src="doc-files/marbles/fromIterable.svg" alt="">
*
* @reactor.discard Upon cancellation, this operator attempts to discard remainder of the
* {@link Iterable} if it can safely ensure the iterator is not infinite (see {@link Spliterator#getExactSizeIfKnown()}).
* Note that this means the {@link Iterable#iterator()} method could be invoked twice.
* This second invocation is skipped on a {@link Collection} however, which is assumed to be finite.
*
* @param it the {@link Iterable} to read data from
* @param <T> The type of values in the source {@link Iterable} and resulting Flux
*
Expand All @@ -1001,6 +1007,10 @@ public static <T> Flux<T> fromIterable(Iterable<? extends T> it) {
* <p>
* <img class="marble" src="doc-files/marbles/fromStream.svg" alt="">
*
* @reactor.discard Upon cancellation, this operator attempts to discard remainder of the
* {@link Stream} through its open {@link Spliterator}, if it can safely ensure it is not infinite
* (see {@link Spliterator#getExactSizeIfKnown()}).
*
* @param s the {@link Stream} to read data from
* @param <T> The type of values in the source {@link Stream} and resulting Flux
*
Expand All @@ -1019,6 +1029,10 @@ public static <T> Flux<T> fromStream(Stream<? extends T> s) {
* <p>
* <img class="marble" src="doc-files/marbles/fromStream.svg" alt="">
*
* @reactor.discard Upon cancellation, this operator attempts to discard remainder of the
* {@link Stream} through its open {@link Spliterator}, if it can safely ensure it is not infinite
* (see {@link Spliterator#getExactSizeIfKnown()}).
*
* @param streamSupplier the {@link Supplier} that generates the {@link Stream} from
* which to read data
* @param <T> The type of values in the source {@link Stream} and resulting Flux
Expand Down Expand Up @@ -3790,7 +3804,15 @@ public final <V> Flux<V> concatMapDelayError(Function<? super T, ? extends Publi
*
* @reactor.discard Upon cancellation, this operator discards {@code T} elements it prefetched and, in
* some cases, attempts to discard remainder of the currently processed {@link Iterable} (if it can
* safely assume the iterator is not infinite, see {@link Spliterator#getExactSizeIfKnown()}).
* safely ensure the iterator is not infinite, see {@link Spliterator#getExactSizeIfKnown()}).
* Note that this means each {@link Iterable}'s {@link Iterable#iterator()} method could be invoked twice.
* This second invocation is skipped on a {@link Collection} however, which are assumed to be finite.
*
* @reactor.errorMode This operator supports {@link #onErrorContinue(BiConsumer) resuming on errors}
* (including when fusion is enabled). Exceptions thrown by the consumer are passed to
* the {@link #onErrorContinue(BiConsumer)} error consumer (the value consumer
* is not invoked, as the source element will be part of the sequence). The onNext
* signal is then propagated as normal.
*
* @param mapper the {@link Function} to transform input sequence into N {@link Iterable}
* @param <R> the merged output sequence type
Expand All @@ -3815,7 +3837,15 @@ public final <R> Flux<R> concatMapIterable(Function<? super T, ? extends Iterabl
*
* @reactor.discard Upon cancellation, this operator discards {@code T} elements it prefetched and, in
* some cases, attempts to discard remainder of the currently processed {@link Iterable} (if it can
* safely assume the iterator is not infinite, see {@link Spliterator#getExactSizeIfKnown()}).
* safely ensure the iterator is not infinite, see {@link Spliterator#getExactSizeIfKnown()}).
* Note that this means each {@link Iterable}'s {@link Iterable#iterator()} method could be invoked twice.
* This second invocation is skipped on a {@link Collection} however, which are assumed to be finite.
*
* @reactor.errorMode This operator supports {@link #onErrorContinue(BiConsumer) resuming on errors}
* (including when fusion is enabled). Exceptions thrown by the consumer are passed to
* the {@link #onErrorContinue(BiConsumer)} error consumer (the value consumer
* is not invoked, as the source element will be part of the sequence). The onNext
* signal is then propagated as normal.
*
* @param mapper the {@link Function} to transform input sequence into N {@link Iterable}
* @param prefetch the maximum in-flight elements from each inner {@link Iterable} sequence
Expand Down Expand Up @@ -5071,17 +5101,19 @@ public final <R> Flux<R> flatMap(
*
* @reactor.discard Upon cancellation, this operator discards {@code T} elements it prefetched and, in
* some cases, attempts to discard remainder of the currently processed {@link Iterable} (if it can
* safely assume iterator is not infinite, see {@link Spliterator#getExactSizeIfKnown()}).
*
* @param mapper the {@link Function} to transform input sequence into N {@link Iterable}
* @param <R> the merged output sequence type
* safely ensure the iterator is not infinite, see {@link Spliterator#getExactSizeIfKnown()}).
* Note that this means each {@link Iterable}'s {@link Iterable#iterator()} method could be invoked twice.
* This second invocation is skipped on a {@link Collection} however, which are assumed to be finite.
*
* @reactor.errorMode This operator supports {@link #onErrorContinue(BiConsumer) resuming on errors}
* (including when fusion is enabled). Exceptions thrown by the consumer are passed to
* the {@link #onErrorContinue(BiConsumer)} error consumer (the value consumer
* is not invoked, as the source element will be part of the sequence). The onNext
* signal is then propagated as normal.
*
*
* @param mapper the {@link Function} to transform input sequence into N {@link Iterable}
* @param <R> the merged output sequence type
* @return a concatenation of the values from the Iterables obtained from each element in this {@link Flux}
*/
public final <R> Flux<R> flatMapIterable(Function<? super T, ? extends Iterable<? extends R>> mapper) {
Expand All @@ -5103,18 +5135,19 @@ public final <R> Flux<R> flatMapIterable(Function<? super T, ? extends Iterable<
*
* @reactor.discard Upon cancellation, this operator discards {@code T} elements it prefetched and, in
* some cases, attempts to discard remainder of the currently processed {@link Iterable} (if it can
* safely assume the iterator is not infinite, see {@link Spliterator#getExactSizeIfKnown()}).
*
* @param mapper the {@link Function} to transform input sequence into N {@link Iterable}
* @param prefetch the maximum in-flight elements from each inner {@link Iterable} sequence
* @param <R> the merged output sequence type
* safely ensure the iterator is not infinite, see {@link Spliterator#getExactSizeIfKnown()}).
* Note that this means each {@link Iterable}'s {@link Iterable#iterator()} method could be invoked twice.
* This second invocation is skipped on a {@link Collection} however, which are assumed to be finite.
*
* @reactor.errorMode This operator supports {@link #onErrorContinue(BiConsumer) resuming on errors}
* (including when fusion is enabled). Exceptions thrown by the consumer are passed to
* the {@link #onErrorContinue(BiConsumer)} error consumer (the value consumer
* is not invoked, as the source element will be part of the sequence). The onNext
* signal is then propagated as normal.
*
* @param mapper the {@link Function} to transform input sequence into N {@link Iterable}
* @param prefetch the maximum in-flight elements from each inner {@link Iterable} sequence
* @param <R> the merged output sequence type
* @return a concatenation of the values from the Iterables obtained from each element in this {@link Flux}
*/
public final <R> Flux<R> flatMapIterable(Function<? super T, ? extends Iterable<? extends R>> mapper, int prefetch) {
Expand Down

0 comments on commit c88fe5b

Please sign in to comment.