-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix #2014 Cancel discards concatMapIterable/fromIterable's remainder … #2021
Conversation
Codecov Report
@@ Coverage Diff @@
## 3.2.x #2021 +/- ##
============================================
+ Coverage 84.61% 84.73% +0.12%
- Complexity 3967 3981 +14
============================================
Files 364 364
Lines 30178 30260 +82
Branches 5612 5614 +2
============================================
+ Hits 25536 25642 +106
+ Misses 3032 3010 -22
+ Partials 1610 1608 -2
Continue to review full report at Codecov.
|
reactor-core/src/main/java/reactor/core/publisher/FluxFlattenIterable.java
Outdated
Show resolved
Hide resolved
reactor-core/src/main/java/reactor/core/publisher/FluxFlattenIterable.java
Outdated
Show resolved
Hide resolved
reactor-core/src/main/java/reactor/core/publisher/FluxFlattenIterable.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The following block seems to be duplicated many times:
Context context = actual.currentContext();
Operators.onDiscardQueueWithClear(queue, context, null);
Operators.onDiscardMultiple(it, itFinite, context);
WDYT about extracting it as well?
Not as obvious an improvement as the methods can make sense separately. |
@simonbasle we don't need to remove them, but "group". It feels weird to always call two "onDiscard" methods, feels like it should be a single method |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One thing I noticed while reviewing is that Operators#onDiscard*
methods put try-catch
around the entire loop. So if the discard action throws an error for one item, the rest will not be discarded. Should those keep going to try to discard as many as possible?
reactor-core/src/main/java/reactor/core/publisher/FluxFlattenIterable.java
Outdated
Show resolved
Hide resolved
reactor-core/src/main/java/reactor/core/publisher/FluxFlattenIterable.java
Show resolved
Hide resolved
reactor-core/src/main/java/reactor/core/publisher/FluxFlattenIterable.java
Outdated
Show resolved
Hide resolved
reactor-core/src/main/java/reactor/core/publisher/FluxFlattenIterable.java
Outdated
Show resolved
Hide resolved
Indeed, that is another avenue of improvement. I'll introduce a separate commit, so I'll clean up the history of this PR to allow for a rebase and merge. |
@bsideup @rstoyanchev can you review this? should be the final state in terms of code, and once approved I'll locally squash the commits to get 2 separates commits (one that addresses the cancel, the other that makes doOnDiscard more resilient). |
In this change, the goal is to discard elements of the Iterable that haven't been processed yet. The challenge is to avoid attempting doing so for _infinite_ Iterables (which would lead to infinite discarding loops). If the Iterable is a Collection, it should be finite. If both an Iterator and a Spliterator can be generated for each of the processed Iterables, then the Spliterator is used to ensure the Iterable is SIZED. This allows us to safely assume we can iterate over the remainder of the iterator when cancelling, in order to discard its elements that weren't emitted. For Streams, since both the iterator() and spliterator() methods are terminating the Stream we only generate the Spliterator. We use it to check SIZED and then wrap it in an Iterator adapter for iteration (which is what BaseStream does by default). Implementation Notes ---- We didn't fully switch to using a Spliterator to drive the internal iteration. It doesn't work that well, since the Iterable#spliterator default implementation isn't SIZED and its estimatedSize() method does not behave like hasNext(). Iterator#hasNext is far better suited for looking ahead of the emitted element to trigger onComplete immediately after the last onNext.
This commit improves discard resiliency when dealing with queues, streams, collections and iterators. By introducing finer grained try/catch blocks, we ensure that failures around a single discarded element doesn't prevent discarding of further elements of the container.
812d265
to
f0dfc64
Compare
Rebased on top of 3.2.x and cleaned up the commits, will merge as soon as tests pass. |
@simonbasle this PR seems to have been merged on a maintenance branch, please ensure the change is merge-forwarded to intermediate maintenance branches and up to |
…from iterator
If both an Iterator and a Spliterator can be generated for each of the
processed Iterables, then the Spliterator is used to ensure the Iterable
is SIZED. This allows us to safely assume we can iterate over the
remainder of the iterator when cancelling, in order to discard its
elements that weren't emitted.
Not doing this check would likely cause trouble with infinite discarding
loops in the case of infinite Iterables (which is technically possible).
For Streams, since both the iterator() and spliterator() methods are
terminating the Stream we only generate the Spliterator. We use it to
check SIZED and then wrap it in an Iterator adapter for iteration (which
is what BaseStream does by default).
Note that using a Spliterator to drive the internal iteration doesn't
work that well, since the default Iterable#spliterator isn't SIZED and
its estimatedSize() method doesn't behave like hasNext().
Iterator#hasNext is far better suited for looking ahead of the emitted
element to trigger onComplete immediately after the last onNext.