Skip to content

Commit

Permalink
1.x: javac 9 compatible fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Apr 3, 2016
1 parent e804172 commit 34110d5
Show file tree
Hide file tree
Showing 14 changed files with 33 additions and 30 deletions.
3 changes: 2 additions & 1 deletion src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1390,7 +1390,8 @@ public static <T> Observable<T> from(Future<? extends T> future, long timeout, T
*/
public static <T> Observable<T> from(Future<? extends T> future, Scheduler scheduler) {
// TODO in a future revision the Scheduler will become important because we'll start polling instead of blocking on the Future
return create(OnSubscribeToObservableFuture.toObservableFuture(future)).subscribeOn(scheduler);
Observable<T> o = create(OnSubscribeToObservableFuture.toObservableFuture(future));
return o.subscribeOn(scheduler);
}

/**
Expand Down
19 changes: 10 additions & 9 deletions src/main/java/rx/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -949,7 +949,7 @@ public static <T> Observable<T> merge(Single<? extends T> t1, Single<? extends T
* @see <a href="http://reactivex.io/documentation/operators/zip.html">ReactiveX operators documentation: Zip</a>
*/
public static <T1, T2, R> Single<R> zip(Single<? extends T1> s1, Single<? extends T2> s2, final Func2<? super T1, ? super T2, ? extends R> zipFunction) {
return SingleOperatorZip.zip(new Single<?>[] {s1, s2}, new FuncN<R>() {
return SingleOperatorZip.zip(new Single[] {s1, s2}, new FuncN<R>() {
@Override
public R call(Object... args) {
return zipFunction.call((T1) args[0], (T2) args[1]);
Expand Down Expand Up @@ -980,7 +980,7 @@ public R call(Object... args) {
* @see <a href="http://reactivex.io/documentation/operators/zip.html">ReactiveX operators documentation: Zip</a>
*/
public static <T1, T2, T3, R> Single<R> zip(Single<? extends T1> s1, Single<? extends T2> s2, Single<? extends T3> s3, final Func3<? super T1, ? super T2, ? super T3, ? extends R> zipFunction) {
return SingleOperatorZip.zip(new Single<?>[] {s1, s2, s3}, new FuncN<R>() {
return SingleOperatorZip.zip(new Single[] {s1, s2, s3}, new FuncN<R>() {
@Override
public R call(Object... args) {
return zipFunction.call((T1) args[0], (T2) args[1], (T3) args[2]);
Expand Down Expand Up @@ -1013,7 +1013,7 @@ public R call(Object... args) {
* @see <a href="http://reactivex.io/documentation/operators/zip.html">ReactiveX operators documentation: Zip</a>
*/
public static <T1, T2, T3, T4, R> Single<R> zip(Single<? extends T1> s1, Single<? extends T2> s2, Single<? extends T3> s3, Single<? extends T4> s4, final Func4<? super T1, ? super T2, ? super T3, ? super T4, ? extends R> zipFunction) {
return SingleOperatorZip.zip(new Single<?>[] {s1, s2, s3, s4}, new FuncN<R>() {
return SingleOperatorZip.zip(new Single[] {s1, s2, s3, s4}, new FuncN<R>() {
@Override
public R call(Object... args) {
return zipFunction.call((T1) args[0], (T2) args[1], (T3) args[2], (T4) args[3]);
Expand Down Expand Up @@ -1048,7 +1048,7 @@ public R call(Object... args) {
* @see <a href="http://reactivex.io/documentation/operators/zip.html">ReactiveX operators documentation: Zip</a>
*/
public static <T1, T2, T3, T4, T5, R> Single<R> zip(Single<? extends T1> s1, Single<? extends T2> s2, Single<? extends T3> s3, Single<? extends T4> s4, Single<? extends T5> s5, final Func5<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? extends R> zipFunction) {
return SingleOperatorZip.zip(new Single<?>[] {s1, s2, s3, s4, s5}, new FuncN<R>() {
return SingleOperatorZip.zip(new Single[] {s1, s2, s3, s4, s5}, new FuncN<R>() {
@Override
public R call(Object... args) {
return zipFunction.call((T1) args[0], (T2) args[1], (T3) args[2], (T4) args[3], (T5) args[4]);
Expand Down Expand Up @@ -1086,7 +1086,7 @@ public R call(Object... args) {
*/
public static <T1, T2, T3, T4, T5, T6, R> Single<R> zip(Single<? extends T1> s1, Single<? extends T2> s2, Single<? extends T3> s3, Single<? extends T4> s4, Single<? extends T5> s5, Single<? extends T6> s6,
final Func6<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? extends R> zipFunction) {
return SingleOperatorZip.zip(new Single<?>[] {s1, s2, s3, s4, s5, s6}, new FuncN<R>() {
return SingleOperatorZip.zip(new Single[] {s1, s2, s3, s4, s5, s6}, new FuncN<R>() {
@Override
public R call(Object... args) {
return zipFunction.call((T1) args[0], (T2) args[1], (T3) args[2], (T4) args[3], (T5) args[4], (T6) args[5]);
Expand Down Expand Up @@ -1126,7 +1126,7 @@ public R call(Object... args) {
*/
public static <T1, T2, T3, T4, T5, T6, T7, R> Single<R> zip(Single<? extends T1> s1, Single<? extends T2> s2, Single<? extends T3> s3, Single<? extends T4> s4, Single<? extends T5> s5, Single<? extends T6> s6, Single<? extends T7> s7,
final Func7<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? extends R> zipFunction) {
return SingleOperatorZip.zip(new Single<?>[] {s1, s2, s3, s4, s5, s6, s7}, new FuncN<R>() {
return SingleOperatorZip.zip(new Single[] {s1, s2, s3, s4, s5, s6, s7}, new FuncN<R>() {
@Override
public R call(Object... args) {
return zipFunction.call((T1) args[0], (T2) args[1], (T3) args[2], (T4) args[3], (T5) args[4], (T6) args[5], (T7) args[6]);
Expand Down Expand Up @@ -1168,7 +1168,7 @@ public R call(Object... args) {
*/
public static <T1, T2, T3, T4, T5, T6, T7, T8, R> Single<R> zip(Single<? extends T1> s1, Single<? extends T2> s2, Single<? extends T3> s3, Single<? extends T4> s4, Single<? extends T5> s5, Single<? extends T6> s6, Single<? extends T7> s7, Single<? extends T8> s8,
final Func8<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? extends R> zipFunction) {
return SingleOperatorZip.zip(new Single<?>[] {s1, s2, s3, s4, s5, s6, s7, s8}, new FuncN<R>() {
return SingleOperatorZip.zip(new Single[] {s1, s2, s3, s4, s5, s6, s7, s8}, new FuncN<R>() {
@Override
public R call(Object... args) {
return zipFunction.call((T1) args[0], (T2) args[1], (T3) args[2], (T4) args[3], (T5) args[4], (T6) args[5], (T7) args[6], (T8) args[7]);
Expand Down Expand Up @@ -1212,7 +1212,7 @@ public R call(Object... args) {
*/
public static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Single<R> zip(Single<? extends T1> s1, Single<? extends T2> s2, Single<? extends T3> s3, Single<? extends T4> s4, Single<? extends T5> s5, Single<? extends T6> s6, Single<? extends T7> s7, Single<? extends T8> s8,
Single<? extends T9> s9, final Func9<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? super T9, ? extends R> zipFunction) {
return SingleOperatorZip.zip(new Single<?>[] {s1, s2, s3, s4, s5, s6, s7, s8, s9}, new FuncN<R>() {
return SingleOperatorZip.zip(new Single[] {s1, s2, s3, s4, s5, s6, s7, s8, s9}, new FuncN<R>() {
@Override
public R call(Object... args) {
return zipFunction.call((T1) args[0], (T2) args[1], (T3) args[2], (T4) args[3], (T5) args[4], (T6) args[5], (T7) args[6], (T8) args[7], (T9) args[8]);
Expand Down Expand Up @@ -1242,7 +1242,8 @@ public R call(Object... args) {
* @see <a href="http://reactivex.io/documentation/operators/zip.html">ReactiveX operators documentation: Zip</a>
*/
public static <R> Single<R> zip(Iterable<? extends Single<?>> singles, FuncN<? extends R> zipFunction) {
return SingleOperatorZip.zip(iterableToArray(singles), zipFunction);
Single[] iterableToArray = iterableToArray(singles);
return SingleOperatorZip.zip(iterableToArray, zipFunction);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public static <T> Iterable<T> latest(final Observable<? extends T> source) {
@Override
public Iterator<T> iterator() {
LatestObserverIterator<T> lio = new LatestObserverIterator<T>();
source.materialize().subscribe(lio);
((Observable<T>)source).materialize().subscribe(lio);
return lio;
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public Iterator<T> iterator() {
* Subscribe instead of unsafeSubscribe since this is the final subscribe in the chain
* since it is for BlockingObservable.
*/
source.subscribe(mostRecentObserver);
((Observable<T>)source).subscribe(mostRecentObserver);

return mostRecentObserver.getIterable();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ private boolean moveToNext() {
started = true;
// if not started, start now
observer.setWaiting(1);
items.materialize().subscribe(observer);
((Observable<T>)items).materialize().subscribe(observer);
}

Notification<? extends T> nextNotification = observer.takeNext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public static <T> Future<T> toFuture(Observable<? extends T> that) {
final AtomicReference<T> value = new AtomicReference<T>();
final AtomicReference<Throwable> error = new AtomicReference<Throwable>();

final Subscription s = that.single().subscribe(new Subscriber<T>() {
final Subscription s = ((Observable<T>)that).single().subscribe(new Subscriber<T>() {

@Override
public void onCompleted() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public static <T> Iterator<T> toIterator(Observable<? extends T> source) {
SubscriberIterator<T> subscriber = new SubscriberIterator<T>();

// using subscribe instead of unsafeSubscribe since this is a BlockingObservable "final subscribe"
source.materialize().subscribe(subscriber);
((Observable<T>)source).materialize().subscribe(subscriber);
return subscriber;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@
import rx.subscriptions.SerialSubscription;

public final class CompletableOnSubscribeConcat implements CompletableOnSubscribe {
final Observable<? extends Completable> sources;
final Observable<Completable> sources;
final int prefetch;

public CompletableOnSubscribeConcat(Observable<? extends Completable> sources, int prefetch) {
this.sources = sources;
this.sources = (Observable<Completable>)sources;
this.prefetch = prefetch;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@
import rx.subscriptions.CompositeSubscription;

public final class CompletableOnSubscribeMerge implements CompletableOnSubscribe {
final Observable<? extends Completable> source;
final Observable<Completable> source;
final int maxConcurrency;
final boolean delayErrors;

public CompletableOnSubscribeMerge(Observable<? extends Completable> source, int maxConcurrency, boolean delayErrors) {
this.source = source;
this.source = (Observable<Completable>)source;
this.maxConcurrency = maxConcurrency;
this.delayErrors = delayErrors;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public void subscribe(Observable<? extends T>[] sources) {
if (cancelled) {
return;
}
sources[i].subscribe(as[i]);
((Observable<T>)sources[i]).subscribe(as[i]);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/rx/internal/operators/OperatorMulticast.java
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,6 @@ public void onCompleted() {
sub = subscription;
}
if (sub != null)
source.subscribe(sub);
((Observable<T>)source).subscribe(sub);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,6 @@ public void onCompleted() {

serial.set(otherSubscriber);

other.subscribe(otherSubscriber);
((Observable<Object>)other).subscribe(otherSubscriber);
}
}
7 changes: 4 additions & 3 deletions src/main/java/rx/observables/AsyncOnSubscribe.java
Original file line number Diff line number Diff line change
Expand Up @@ -608,12 +608,13 @@ public void onCompleted() {
};
subscriptions.add(s);

t.doOnTerminate(new Action0() {
Observable<? extends T> doOnTerminate = t.doOnTerminate(new Action0() {
@Override
public void call() {
subscriptions.remove(s);
}})
.subscribe(s);
}});

((Observable<T>)doOnTerminate).subscribe(s);

merger.onNext(buffer);
}
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/rx/observables/BlockingObservable.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void forEach(final Action1<? super T> onNext) {
* Use 'subscribe' instead of 'unsafeSubscribe' for Rx contract behavior
* (see http://reactivex.io/documentation/contract.html) as this is the final subscribe in the chain.
*/
Subscription subscription = o.subscribe(new Subscriber<T>() {
Subscription subscription = ((Observable<T>)o).subscribe(new Subscriber<T>() {
@Override
public void onCompleted() {
latch.countDown();
Expand Down Expand Up @@ -430,7 +430,7 @@ private T blockForSingle(final Observable<? extends T> observable) {
final AtomicReference<Throwable> returnException = new AtomicReference<Throwable>();
final CountDownLatch latch = new CountDownLatch(1);

Subscription subscription = observable.subscribe(new Subscriber<T>() {
Subscription subscription = ((Observable<T>)observable).subscribe(new Subscriber<T>() {
@Override
public void onCompleted() {
latch.countDown();
Expand Down Expand Up @@ -467,7 +467,7 @@ public void onNext(final T item) {
public void subscribe() {
final CountDownLatch cdl = new CountDownLatch(1);
final Throwable[] error = { null };
Subscription s = o.subscribe(new Subscriber<T>() {
Subscription s = ((Observable<T>)o).subscribe(new Subscriber<T>() {
@Override
public void onNext(T t) {

Expand Down Expand Up @@ -504,7 +504,7 @@ public void subscribe(Observer<? super T> observer) {
final NotificationLite<T> nl = NotificationLite.instance();
final BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();

Subscription s = o.subscribe(new Subscriber<T>() {
Subscription s = ((Observable<T>)o).subscribe(new Subscriber<T>() {
@Override
public void onNext(T t) {
queue.offer(nl.next(t));
Expand Down Expand Up @@ -592,7 +592,7 @@ public void call() {
}
}));

o.subscribe(s);
((Observable<T>)o).subscribe(s);

try {
for (;;) {
Expand Down

0 comments on commit 34110d5

Please sign in to comment.