Skip to content

Commit

Permalink
Merge pull request ReactiveX#206 from thegeez/list-subscribe
Browse files Browse the repository at this point in the history
Observable.toList breaks with multiple subscribers
  • Loading branch information
benjchristensen committed Mar 27, 2013
2 parents 1922d7a + 882b228 commit 6a082aa
Showing 1 changed file with 24 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ public static <T> Func1<Observer<List<T>>, Subscription> toObservableList(Observ
private static class ToObservableList<T> implements Func1<Observer<List<T>>, Subscription> {

private final Observable<T> that;
final ConcurrentLinkedQueue<T> list = new ConcurrentLinkedQueue<T>();

public ToObservableList(Observable<T> that) {
this.that = that;
Expand All @@ -49,6 +48,7 @@ public ToObservableList(Observable<T> that) {
public Subscription call(final Observer<List<T>> observer) {

return that.subscribe(new Observer<T>() {
final ConcurrentLinkedQueue<T> list = new ConcurrentLinkedQueue<T>();
public void onNext(T value) {
// onNext can be concurrently executed so list must be thread-safe
list.add(value);
Expand Down Expand Up @@ -94,5 +94,28 @@ public void testList() {
verify(aObserver, Mockito.never()).onError(any(Exception.class));
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testListMultipleObservers() {
Observable<String> w = Observable.toObservable("one", "two", "three");
Observable<List<String>> observable = Observable.create(toObservableList(w));

@SuppressWarnings("unchecked")
Observer<List<String>> o1 = mock(Observer.class);
observable.subscribe(o1);

Observer<List<String>> o2 = mock(Observer.class);
observable.subscribe(o2);

List<String> expected = Arrays.asList("one", "two", "three");

verify(o1, times(1)).onNext(expected);
verify(o1, Mockito.never()).onError(any(Exception.class));
verify(o1, times(1)).onCompleted();

verify(o2, times(1)).onNext(expected);
verify(o2, Mockito.never()).onError(any(Exception.class));
verify(o2, times(1)).onCompleted();
}
}
}

0 comments on commit 6a082aa

Please sign in to comment.