
concatMapIterable could be enhanced to discard iterator remainder #2014

Closed
simonbasle opened this issue Jan 14, 2020 · 6 comments

Comments

@simonbasle
Contributor

From #1925:

I can't verify because the step before bufferUntil is concatMapIterable, which produces a collection of allocated items, and those don't seem to pass through doOnDiscard in case of a downstream error. Could there be an issue with concatMapIterable?

Here is a simplified test:

import java.util.Arrays;

import org.junit.jupiter.api.Test;

import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

import static org.assertj.core.api.Assertions.assertThat;

@Test
void concatMapIterableDoOnDiscardTest() {

	Foo foo1 = new Foo();
	Foo foo2 = new Foo();
	Foo foo3 = new Foo();

	Flux<Foo> source = Flux.just(1)
			.concatMapIterable(i -> Arrays.asList(foo1, foo2, foo3))
			.doOnDiscard(Foo.class, Foo::release);

	// cancel after consuming (and releasing) only the first element
	StepVerifier.create(source)
			.consumeNextWith(foo -> {
				foo.release();
			})
			.thenCancel()
			.verify();

	assertThat(foo1.getRefCount()).isEqualTo(0); // okay
	assertThat(foo2.getRefCount()).isEqualTo(0); // fails
	assertThat(foo3.getRefCount()).isEqualTo(0); // fails
}

static class Foo {

	int refCount = 1;

	public int getRefCount() {
		return this.refCount;
	}

	public void release() {
		this.refCount = 0;
	}
}

Originally posted by @rstoyanchev in #1925 (comment)

@simonbasle
Contributor Author

Currently concatMapIterable doesn't attempt to walk the remainder of the current iterator to discard further elements upon cancellation.

We could attempt to discard the iterator on top of the prefetched source elements.

This is tricky though, since an arbitrary Iterable/Iterator can be lazy and infinite...

In order to implement a best-effort solution, we can retain the Class<Iterable> and inspect it, only attempting to discard the remainder of the Iterator if the original Iterable is either a Collection or a Tuple2.
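
A minimal sketch of that best-effort check, assuming a hypothetical discardRemainder helper and a discard hook standing in for the operator's discard support (not the actual reactor-core internals):

import java.util.Collection;
import java.util.Iterator;
import java.util.function.Consumer;

// Hypothetical helper: only drain the leftover elements when the original
// Iterable is known to be finite (a Collection here; Tuple2, being backed by
// a fixed-size array, could be whitelisted the same way).
static <T> void discardRemainder(Iterable<T> original, Iterator<T> remainder,
		Consumer<? super T> discardHook) {
	if (original instanceof Collection) {
		while (remainder.hasNext()) {
			discardHook.accept(remainder.next());
		}
	}
	// any other Iterable could be lazy or infinite, so we leave it untouched
}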

@bsideup
Contributor

bsideup commented Jan 14, 2020

@simonbasle simonbasle added this to the 3.2.15.RELEASE milestone Jan 14, 2020
@simonbasle
Contributor Author

The estimateSize() from Spliterator might be a perfect way of avoiding the infinite remainder case. We could switch the internal implementation from an Iterator-based one to a Spliterator-based one.
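
For illustration, a sketch of how the SIZED characteristic (which backs a meaningful estimateSize()) could be used to rule out potentially infinite sources before draining; this is an assumption about the approach, not the final implementation:

import java.util.Spliterator;

// A SIZED spliterator reports an exact estimateSize(), so the remainder is
// guaranteed to be finite and safe to iterate for discarding purposes.
static boolean safeToDrain(Iterable<?> iterable) {
	Spliterator<?> sp = iterable.spliterator();
	return sp.hasCharacteristics(Spliterator.SIZED);
}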

@rstoyanchev
Contributor

What would that mean for the API? Currently it takes a mapper that returns an Iterable.

@bsideup
Contributor

bsideup commented Jan 16, 2020

Every Iterable can be converted into a Spliterator:

https://docs.oracle.com/javase/8/docs/api/java/lang/Iterable.html#spliterator--
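
To illustrate with plain JDK behaviour: a Collection's spliterator reports SIZED, whereas the default Iterable#spliterator(), built on top of iterator(), does not, which is what the finiteness check discussed here relies on.

import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

List<Integer> list = Arrays.asList(1, 2, 3);
// Collections report SIZED, so the remainder is known to be finite
boolean sizedFromCollection = list.spliterator().hasCharacteristics(Spliterator.SIZED); // true

// the default Iterable#spliterator() wraps iterator(), is NOT SIZED,
// and its estimateSize() simply returns Long.MAX_VALUE
Iterable<Integer> lazy = list::iterator;
boolean sizedFromDefault = lazy.spliterator().hasCharacteristics(Spliterator.SIZED); // false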

simonbasle added a commit that referenced this issue Jan 23, 2020
…from iterator

If both an Iterator and a Spliterator can be generated for each of the
processed Iterables, then the Spliterator is used to ensure the Iterable
is SIZED. This allows us to safely assume we can iterate over the
remainder of the iterator when cancelling, in order to discard its
elements that weren't emitted.

Not doing this check would likely cause trouble with infinite discarding
loops in the case of infinite Iterables (which is technically possible).

For Streams, since both the iterator() and spliterator() methods terminate the Stream, we only generate the Spliterator. We use it to check SIZED and then wrap it in an Iterator adapter for iteration (which is what BaseStream does by default).

Note that using a Spliterator to drive the internal iteration doesn't work that well, since the default Iterable#spliterator isn't SIZED and its estimateSize() method doesn't behave like hasNext(). Iterator#hasNext is far better suited for looking one element ahead of the emitted element, to trigger onComplete immediately after the last onNext.
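
A rough sketch of the Stream handling described above (illustrative names, not the actual operator code): spliterator() is the single terminal call made on the Stream, then the Spliterator is checked for SIZED and adapted back to an Iterator.

import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;

static <T> Iterator<T> adaptStream(Stream<T> stream) {
	Spliterator<T> sp = stream.spliterator();  // the only terminal call on the Stream
	boolean knownFinite = sp.hasCharacteristics(Spliterator.SIZED);
	// knownFinite would be kept by the operator to decide whether the
	// remainder may be drained and discarded on cancellation
	return Spliterators.iterator(sp);          // the same adapter BaseStream#iterator uses
}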
@simonbasle
Contributor Author

See more details in #2021, but going the Spliterator route the whole way doesn't work very well. For instance, the default Iterable#spliterator() creates a Spliterator over the Iterator, but its estimateSize() doesn't reflect hasNext() == false (it always returns Long.MAX_VALUE), which is a check that we absolutely need for correct termination of the Flux.

An approach mixing Iterator-based consumption of the Iterable with a peek at Spliterator#getExactSizeIfKnown() works better; at least the latter won't give us false positives on which iterators we can "drain" and discard in case of cancellation.
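
A sketch of that mixed approach, assuming a plain Iterable (for a Stream only the Spliterator is generated, as described in the commit above); the method name is illustrative:

// The Iterator still drives emission; the Spliterator is only peeked at once.
// getExactSizeIfKnown() returns -1 unless the spliterator is SIZED, so it
// cannot yield a false positive on which iterators are safe to drain.
static boolean canDrainOnCancel(Iterable<?> iterable) {
	return iterable.spliterator().getExactSizeIfKnown() != -1L;
}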

simonbasle added a commit that referenced this issue Feb 7, 2020
In this change, the goal is to discard elements of the Iterable that haven't been processed yet. The challenge is to avoid attempting to do so for _infinite_ Iterables (which would lead to infinite discarding loops).

If the Iterable is a Collection, it should be finite.

If both an Iterator and a Spliterator can be generated for each of the
processed Iterables, then the Spliterator is used to ensure the Iterable
is SIZED. This allows us to safely assume we can iterate over the
remainder of the iterator when cancelling, in order to discard its
elements that weren't emitted.

For Streams, since both the iterator() and spliterator() methods terminate the Stream, we only generate the Spliterator. We use it to check SIZED and then wrap it in an Iterator adapter for iteration (which is what BaseStream does by default).

Implementation Notes
----
We didn't fully switch to using a Spliterator to drive the internal iteration. It doesn't work that well, since the Iterable#spliterator default implementation isn't SIZED and its estimateSize() method does not behave like hasNext(). Iterator#hasNext is far better suited for looking one element ahead of the emitted element, to trigger onComplete immediately after the last onNext.
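
A minimal sketch of the hasNext() look-ahead mentioned in these notes (request tracking and error handling omitted; Subscriber comes from Reactive Streams, the method name is illustrative):

import java.util.Iterator;

import org.reactivestreams.Subscriber;

static <T> void fastPathDrain(Iterator<T> it, Subscriber<? super T> actual) {
	while (it.hasNext()) {
		T value = it.next();
		boolean last = !it.hasNext(); // look one element ahead before emitting
		actual.onNext(value);
		if (last) {
			actual.onComplete();      // fires immediately after the last onNext
			return;
		}
	}
	actual.onComplete();              // the Iterable was empty to begin with
}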
simonbasle added a commit that referenced this issue Feb 7, 2020
simonbasle added a commit to OlegDokuka/reactor-core that referenced this issue Apr 24, 2020
This commit improves the javadoc from reactor#2014:
 - add the javadoc discard tags that were missed on `fromIterable` and
 `fromStream`
 - align the javadocs of `concatMapIterable` and `flatMapIterable` since
 they are aliases

It also improves the wording and clarifies that `flatMapIterable` and
`fromIterable` discard support can lead to multiple `iterator()` calls.

Fixes reactor#2127
simonbasle added a commit that referenced this issue Apr 27, 2020
This commit improves the javadoc from #2014:
 - add the javadoc discard tags that were missed on `fromIterable` and
 `fromStream`
 - align the javadocs of `concatMapIterable` and `flatMapIterable` since
 they are aliases

It also improves the wording and clarifies that `flatMapIterable` and
`fromIterable` discard support can lead to multiple `iterator()` calls.

Fixes #2127