Skip to content

Commit 7a62c2d

Browse files
coeuvrecopybara-github
authored andcommitted
Remote: Add interoperability between Rx and ListenableFuture.
PiperOrigin-RevId: 360611295
1 parent 2cb919f commit 7a62c2d

File tree

5 files changed

+481
-1
lines changed

5 files changed

+481
-1
lines changed

src/main/java/com/google/devtools/build/lib/remote/util/BUILD

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ java_library(
1919
"//src/main/java/com/google/devtools/build/lib/actions:execution_requirements",
2020
"//src/main/java/com/google/devtools/build/lib/analysis:blaze_version_info",
2121
"//src/main/java/com/google/devtools/build/lib/authandtls",
22-
"//src/main/java/com/google/devtools/build/lib/concurrent",
2322
"//src/main/java/com/google/devtools/build/lib/remote:ExecutionStatusException",
2423
"//src/main/java/com/google/devtools/build/lib/remote/common",
2524
"//src/main/java/com/google/devtools/build/lib/remote/options",
@@ -28,6 +27,7 @@ java_library(
2827
"//src/main/protobuf:failure_details_java_proto",
2928
"//third_party:guava",
3029
"//third_party:jsr305",
30+
"//third_party:rxjava3",
3131
"//third_party/grpc:grpc-jar",
3232
"//third_party/protobuf:protobuf_java",
3333
"//third_party/protobuf:protobuf_java_util",
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
// Copyright 2021 The Bazel Authors. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
package com.google.devtools.build.lib.remote.util;
15+
16+
import static com.google.common.base.Preconditions.checkState;
17+
18+
import com.google.common.util.concurrent.AbstractFuture;
19+
import com.google.common.util.concurrent.FutureCallback;
20+
import com.google.common.util.concurrent.Futures;
21+
import com.google.common.util.concurrent.ListenableFuture;
22+
import io.reactivex.rxjava3.annotations.NonNull;
23+
import io.reactivex.rxjava3.core.Completable;
24+
import io.reactivex.rxjava3.core.CompletableEmitter;
25+
import io.reactivex.rxjava3.core.CompletableObserver;
26+
import io.reactivex.rxjava3.core.CompletableOnSubscribe;
27+
import io.reactivex.rxjava3.disposables.Disposable;
28+
import io.reactivex.rxjava3.exceptions.Exceptions;
29+
import java.util.concurrent.Callable;
30+
import java.util.concurrent.CancellationException;
31+
import java.util.concurrent.Executor;
32+
import java.util.concurrent.atomic.AtomicBoolean;
33+
import java.util.concurrent.atomic.AtomicReference;
34+
import javax.annotation.Nullable;
35+
36+
/** Methods for interoperating between Rx and ListenableFuture. */
37+
public class RxFutures {
38+
39+
private RxFutures() {}
40+
41+
/**
42+
* Returns a {@link Completable} that is complete once the supplied {@link ListenableFuture} has
43+
* completed.
44+
*
45+
* <p>A {@link ListenableFuture>} represents some computation that is already in progress. We use
46+
* {@link Callable} here to defer the execution of the thing that produces ListenableFuture until
47+
* there is subscriber.
48+
*
49+
* <p>Errors are also propagated except for certain "fatal" exceptions defined by rxjava. Multiple
50+
* subscriptions are not allowed.
51+
*
52+
* <p>Disposes the Completable to cancel the underlying ListenableFuture.
53+
*/
54+
public static Completable toCompletable(
55+
Callable<ListenableFuture<Void>> callable, Executor executor) {
56+
return Completable.create(new OnceCompletableOnSubscribe(callable, executor));
57+
}
58+
59+
private static class OnceCompletableOnSubscribe implements CompletableOnSubscribe {
60+
private final AtomicBoolean subscribed = new AtomicBoolean(false);
61+
62+
private final Callable<ListenableFuture<Void>> callable;
63+
private final Executor executor;
64+
65+
private OnceCompletableOnSubscribe(
66+
Callable<ListenableFuture<Void>> callable, Executor executor) {
67+
this.callable = callable;
68+
this.executor = executor;
69+
}
70+
71+
@Override
72+
public void subscribe(@NonNull CompletableEmitter emitter) throws Throwable {
73+
try {
74+
checkState(!subscribed.getAndSet(true), "This completable cannot be subscribed to twice");
75+
ListenableFuture<Void> future = callable.call();
76+
Futures.addCallback(
77+
future,
78+
new FutureCallback<Void>() {
79+
@Override
80+
public void onSuccess(@Nullable Void t) {
81+
emitter.onComplete();
82+
}
83+
84+
@Override
85+
public void onFailure(Throwable throwable) {
86+
/*
87+
* CancellationException can be thrown in two cases:
88+
* 1. The ListenableFuture itself is cancelled.
89+
* 2. Completable is disposed by downstream.
90+
*
91+
* This check is used to prevent propagating CancellationException to downstream
92+
* when it has already disposed the Completable.
93+
*/
94+
if (throwable instanceof CancellationException && emitter.isDisposed()) {
95+
return;
96+
}
97+
98+
emitter.onError(throwable);
99+
}
100+
},
101+
executor);
102+
emitter.setCancellable(() -> future.cancel(true));
103+
} catch (Throwable t) {
104+
// We failed to construct and listen to the LF. Following RxJava's own behaviour, prefer
105+
// to pass RuntimeExceptions and Errors down to the subscriber except for certain
106+
// "fatal" exceptions.
107+
Exceptions.throwIfFatal(t);
108+
executor.execute(() -> emitter.onError(t));
109+
}
110+
}
111+
}
112+
113+
/**
114+
* Returns a {@link ListenableFuture} that is complete once the {@link Completable} has completed.
115+
*
116+
* <p>Errors are also propagated. If the {@link ListenableFuture} is canceled, the subscription to
117+
* the {@link Completable} will automatically be cancelled.
118+
*/
119+
public static ListenableFuture<Void> toListenableFuture(Completable completable) {
120+
CompletableFuture future = new CompletableFuture();
121+
completable.subscribe(
122+
new CompletableObserver() {
123+
@Override
124+
public void onSubscribe(Disposable d) {
125+
future.setCancelCallback(d);
126+
}
127+
128+
@Override
129+
public void onComplete() {
130+
// Making the Completable as complete.
131+
future.set(null);
132+
}
133+
134+
@Override
135+
public void onError(Throwable e) {
136+
future.setException(e);
137+
}
138+
});
139+
return future;
140+
}
141+
142+
private static final class CompletableFuture extends AbstractFuture<Void> {
143+
private final AtomicReference<Disposable> cancelCallback = new AtomicReference<>();
144+
145+
private void setCancelCallback(Disposable cancelCallback) {
146+
this.cancelCallback.set(cancelCallback);
147+
// Just in case it was already canceled before we set the callback.
148+
doCancelIfCancelled();
149+
}
150+
151+
private void doCancelIfCancelled() {
152+
if (isCancelled()) {
153+
Disposable callback = cancelCallback.getAndSet(null);
154+
if (callback != null) {
155+
callback.dispose();
156+
}
157+
}
158+
}
159+
160+
@Override
161+
protected void afterDone() {
162+
doCancelIfCancelled();
163+
}
164+
165+
// Allow set to be called by other members.
166+
@Override
167+
protected boolean set(@Nullable Void t) {
168+
return super.set(t);
169+
}
170+
171+
// Allow setException to be called by other members.
172+
@Override
173+
protected boolean setException(Throwable throwable) {
174+
return super.setException(throwable);
175+
}
176+
}
177+
}

src/test/java/com/google/devtools/build/lib/remote/util/BUILD

+4
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,15 @@ java_library(
2323
"//src/main/java/com/google/devtools/build/lib/exec:spawn_runner",
2424
"//src/main/java/com/google/devtools/build/lib/remote",
2525
"//src/main/java/com/google/devtools/build/lib/remote/common",
26+
"//src/main/java/com/google/devtools/build/lib/remote/util",
2627
"//src/main/java/com/google/devtools/build/lib/util/io",
2728
"//src/main/java/com/google/devtools/build/lib/vfs",
2829
"//src/main/java/com/google/devtools/build/lib/vfs:pathfragment",
2930
"//src/test/java/com/google/devtools/build/lib/actions/util",
3031
"//third_party:guava",
32+
"//third_party:junit4",
33+
"//third_party:rxjava3",
34+
"//third_party:truth",
3135
"//third_party/protobuf:protobuf_java",
3236
"@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto",
3337
],

0 commit comments

Comments
 (0)