Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewrite concat operation to not block on subscribe #271

Merged
merged 1 commit into from
May 16, 2013

Conversation

Treora
Copy link
Contributor

@Treora Treora commented May 14, 2013

Hi,
While trying to stop the subscribe function from blocking, I ended up rewriting the most of the concat operation. From related issue discussions it appears the desired behaviour is still under debate, so I hope you agree on the chosen approach. See the explanation below.
Because practically it implements two asynchronous observers that interact with eachother, there are quite a few possible race conditions and other possibilities to break the Rx grammar, but I hope I fixed all of them.
Regards,

Gerben

The concat operator previously blocked on calling subscribe until all the
sequences had finished. In quite some cases this results in unwanted (and
unexpected) behaviour, such as when prefixing an infinite Observable
with a fixed one, for example when using startWith (which calls concat):
someInputStream.startWith(123).subscribe(x -> print(x));
This statement will block indefinitely if the input stream is infinite. Also
on finite sequences it seems silly to have to wait for them to finish.

In this new approach the incoming observables are put into a queue, instead
of waiting for the whole sequence to finish. When the first observable
completes, the next one is taken from the queue and subscribed to, and so
on. The queue can be extended while processing the observables, and
onCompleted is only called when both the source of observables has completed
and all observables in the queue have been read.

The concat operator previously blocked on calling subscribe until all the
sequences had finished. In quite some cases this results in unwanted (and
unexpected) behaviour, such as when prefixing an infinite Observable
with a fixed one, for example when using startWith (which calls concat):
someInputStream.startWith(123).subscribe(x -> print(x));
This statement will block indefinitely if the input stream is infinite. Also
on finite sequences it seems silly to have to wait for them to finish.

In this new approach the incoming observables are put into a queue, instead
of waiting for the whole sequence to finish. When the first observable
completes, the next one is taken from the queue and subscribed to, and so
on. The queue can be extended while processing the observables, and
onCompleted is only called when both the source of observables has completed
and all observables in the queue have been read.
@cloudbees-pull-request-builder

RxJava-pull-requests #143 SUCCESS
This pull request looks good

@benjchristensen
Copy link
Member

That sounds like exactly what we want and discussed this morning at #270.

I don't have time to review right this minute but will do so this week as I'd like concat to be non-blocking for the 0.9 release.

Thank you!

@benjchristensen benjchristensen merged commit cfa7155 into ReactiveX:master May 16, 2013
@billyy
Copy link
Contributor

billyy commented May 16, 2013

Thanks for the fix. The code looks good and I have added new unit test to check for subscribe() returning before the Observable of Observable onComplete() is called.

@Treora Treora deleted the concat branch May 29, 2013 19:38
jihoonson pushed a commit to jihoonson/RxJava that referenced this pull request Mar 6, 2020
org.mockito.Matchers is deprecated, ArgumentMatchers should be used instead.
jihoonson pushed a commit to jihoonson/RxJava that referenced this pull request Mar 6, 2020
…matchers

fix (ReactiveX#271): remove deprecated usage of Mockito Matchers
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants