From 7a62c2d4e27e398f440910c81eacc384f38ca8be Mon Sep 17 00:00:00 2001 From: Chi Wang Date: Wed, 3 Mar 2021 00:53:21 -0800 Subject: [PATCH] Remote: Add interoperability between Rx and ListenableFuture. PiperOrigin-RevId: 360611295 --- .../devtools/build/lib/remote/util/BUILD | 2 +- .../build/lib/remote/util/RxFutures.java | 177 ++++++++++++++ .../devtools/build/lib/remote/util/BUILD | 4 + .../build/lib/remote/util/RxFuturesTest.java | 230 ++++++++++++++++++ .../lib/remote/util/RxNoGlobalErrorsRule.java | 69 ++++++ 5 files changed, 481 insertions(+), 1 deletion(-) create mode 100644 src/main/java/com/google/devtools/build/lib/remote/util/RxFutures.java create mode 100644 src/test/java/com/google/devtools/build/lib/remote/util/RxFuturesTest.java create mode 100644 src/test/java/com/google/devtools/build/lib/remote/util/RxNoGlobalErrorsRule.java diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/BUILD b/src/main/java/com/google/devtools/build/lib/remote/util/BUILD index 6d91bc9f237439..22134fa44a79eb 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/util/BUILD +++ b/src/main/java/com/google/devtools/build/lib/remote/util/BUILD @@ -19,7 +19,6 @@ java_library( "//src/main/java/com/google/devtools/build/lib/actions:execution_requirements", "//src/main/java/com/google/devtools/build/lib/analysis:blaze_version_info", "//src/main/java/com/google/devtools/build/lib/authandtls", - "//src/main/java/com/google/devtools/build/lib/concurrent", "//src/main/java/com/google/devtools/build/lib/remote:ExecutionStatusException", "//src/main/java/com/google/devtools/build/lib/remote/common", "//src/main/java/com/google/devtools/build/lib/remote/options", @@ -28,6 +27,7 @@ java_library( "//src/main/protobuf:failure_details_java_proto", "//third_party:guava", "//third_party:jsr305", + "//third_party:rxjava3", "//third_party/grpc:grpc-jar", "//third_party/protobuf:protobuf_java", "//third_party/protobuf:protobuf_java_util", diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/RxFutures.java b/src/main/java/com/google/devtools/build/lib/remote/util/RxFutures.java new file mode 100644 index 00000000000000..7eef8807b964d4 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/util/RxFutures.java @@ -0,0 +1,177 @@ +// Copyright 2021 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote.util; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.util.concurrent.AbstractFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import io.reactivex.rxjava3.annotations.NonNull; +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.CompletableEmitter; +import io.reactivex.rxjava3.core.CompletableObserver; +import io.reactivex.rxjava3.core.CompletableOnSubscribe; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.exceptions.Exceptions; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nullable; + +/** Methods for interoperating between Rx and ListenableFuture. */ +public class RxFutures { + + private RxFutures() {} + + /** + * Returns a {@link Completable} that is complete once the supplied {@link ListenableFuture} has + * completed. + * + *

A {@link ListenableFuture>} represents some computation that is already in progress. We use + * {@link Callable} here to defer the execution of the thing that produces ListenableFuture until + * there is subscriber. + * + *

Errors are also propagated except for certain "fatal" exceptions defined by rxjava. Multiple + * subscriptions are not allowed. + * + *

Disposes the Completable to cancel the underlying ListenableFuture. + */ + public static Completable toCompletable( + Callable> callable, Executor executor) { + return Completable.create(new OnceCompletableOnSubscribe(callable, executor)); + } + + private static class OnceCompletableOnSubscribe implements CompletableOnSubscribe { + private final AtomicBoolean subscribed = new AtomicBoolean(false); + + private final Callable> callable; + private final Executor executor; + + private OnceCompletableOnSubscribe( + Callable> callable, Executor executor) { + this.callable = callable; + this.executor = executor; + } + + @Override + public void subscribe(@NonNull CompletableEmitter emitter) throws Throwable { + try { + checkState(!subscribed.getAndSet(true), "This completable cannot be subscribed to twice"); + ListenableFuture future = callable.call(); + Futures.addCallback( + future, + new FutureCallback() { + @Override + public void onSuccess(@Nullable Void t) { + emitter.onComplete(); + } + + @Override + public void onFailure(Throwable throwable) { + /* + * CancellationException can be thrown in two cases: + * 1. The ListenableFuture itself is cancelled. + * 2. Completable is disposed by downstream. + * + * This check is used to prevent propagating CancellationException to downstream + * when it has already disposed the Completable. + */ + if (throwable instanceof CancellationException && emitter.isDisposed()) { + return; + } + + emitter.onError(throwable); + } + }, + executor); + emitter.setCancellable(() -> future.cancel(true)); + } catch (Throwable t) { + // We failed to construct and listen to the LF. Following RxJava's own behaviour, prefer + // to pass RuntimeExceptions and Errors down to the subscriber except for certain + // "fatal" exceptions. + Exceptions.throwIfFatal(t); + executor.execute(() -> emitter.onError(t)); + } + } + } + + /** + * Returns a {@link ListenableFuture} that is complete once the {@link Completable} has completed. + * + *

Errors are also propagated. If the {@link ListenableFuture} is canceled, the subscription to + * the {@link Completable} will automatically be cancelled. + */ + public static ListenableFuture toListenableFuture(Completable completable) { + CompletableFuture future = new CompletableFuture(); + completable.subscribe( + new CompletableObserver() { + @Override + public void onSubscribe(Disposable d) { + future.setCancelCallback(d); + } + + @Override + public void onComplete() { + // Making the Completable as complete. + future.set(null); + } + + @Override + public void onError(Throwable e) { + future.setException(e); + } + }); + return future; + } + + private static final class CompletableFuture extends AbstractFuture { + private final AtomicReference cancelCallback = new AtomicReference<>(); + + private void setCancelCallback(Disposable cancelCallback) { + this.cancelCallback.set(cancelCallback); + // Just in case it was already canceled before we set the callback. + doCancelIfCancelled(); + } + + private void doCancelIfCancelled() { + if (isCancelled()) { + Disposable callback = cancelCallback.getAndSet(null); + if (callback != null) { + callback.dispose(); + } + } + } + + @Override + protected void afterDone() { + doCancelIfCancelled(); + } + + // Allow set to be called by other members. + @Override + protected boolean set(@Nullable Void t) { + return super.set(t); + } + + // Allow setException to be called by other members. + @Override + protected boolean setException(Throwable throwable) { + return super.setException(throwable); + } + } +} diff --git a/src/test/java/com/google/devtools/build/lib/remote/util/BUILD b/src/test/java/com/google/devtools/build/lib/remote/util/BUILD index f31e6381026b9d..8871a241a0b01c 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/util/BUILD +++ b/src/test/java/com/google/devtools/build/lib/remote/util/BUILD @@ -23,11 +23,15 @@ java_library( "//src/main/java/com/google/devtools/build/lib/exec:spawn_runner", "//src/main/java/com/google/devtools/build/lib/remote", "//src/main/java/com/google/devtools/build/lib/remote/common", + "//src/main/java/com/google/devtools/build/lib/remote/util", "//src/main/java/com/google/devtools/build/lib/util/io", "//src/main/java/com/google/devtools/build/lib/vfs", "//src/main/java/com/google/devtools/build/lib/vfs:pathfragment", "//src/test/java/com/google/devtools/build/lib/actions/util", "//third_party:guava", + "//third_party:junit4", + "//third_party:rxjava3", + "//third_party:truth", "//third_party/protobuf:protobuf_java", "@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto", ], diff --git a/src/test/java/com/google/devtools/build/lib/remote/util/RxFuturesTest.java b/src/test/java/com/google/devtools/build/lib/remote/util/RxFuturesTest.java new file mode 100644 index 00000000000000..e38755aeb11a18 --- /dev/null +++ b/src/test/java/com/google/devtools/build/lib/remote/util/RxFuturesTest.java @@ -0,0 +1,230 @@ +// Copyright 2021 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote.util; + +import static com.google.common.truth.Truth.assertThat; +import static com.google.common.util.concurrent.Futures.immediateVoidFuture; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.CompletableEmitter; +import io.reactivex.rxjava3.observers.TestObserver; +import java.util.concurrent.CancellationException; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.Nullable; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link RxFutures}. */ +@RunWith(JUnit4.class) +public class RxFuturesTest { + @Rule public final RxNoGlobalErrorsRule rxNoGlobalErrorsRule = new RxNoGlobalErrorsRule(); + + @Test + public void toCompletable_noSubscription_noExecution() { + SettableFuture future = SettableFuture.create(); + AtomicBoolean executed = new AtomicBoolean(false); + + RxFutures.toCompletable( + () -> { + executed.set(true); + return future; + }, + MoreExecutors.directExecutor()); + + assertThat(executed.get()).isFalse(); + } + + @Test + public void toCompletable_futureOnSuccess_completableOnComplete() { + SettableFuture future = SettableFuture.create(); + Completable completable = RxFutures.toCompletable(() -> future, MoreExecutors.directExecutor()); + + TestObserver observer = completable.test(); + observer.assertEmpty(); + future.set(null); + + observer.assertComplete(); + } + + @Test + public void toCompletable_futureOnError_completableOnError() { + SettableFuture future = SettableFuture.create(); + Completable completable = RxFutures.toCompletable(() -> future, MoreExecutors.directExecutor()); + + TestObserver observer = completable.test(); + observer.assertEmpty(); + Throwable error = new IllegalStateException("error"); + future.setException(error); + + observer.assertError(error); + } + + @Test + public void toCompletable_futureOnSuccessBeforeSubscription_completableOnComplete() { + SettableFuture future = SettableFuture.create(); + Completable completable = RxFutures.toCompletable(() -> future, MoreExecutors.directExecutor()); + + future.set(null); + TestObserver observer = completable.test(); + + observer.assertComplete(); + } + + @Test + public void toCompletable_futureOnErrorBeforeSubscription_completableOnError() { + SettableFuture future = SettableFuture.create(); + Completable completable = RxFutures.toCompletable(() -> future, MoreExecutors.directExecutor()); + + Throwable error = new IllegalStateException("error"); + future.setException(error); + TestObserver observer = completable.test(); + + observer.assertError(error); + } + + @Test + public void toCompletable_futureCancelledBeforeSubscription_completableOnError() { + SettableFuture future = SettableFuture.create(); + Completable completable = RxFutures.toCompletable(() -> future, MoreExecutors.directExecutor()); + + future.cancel(true); + TestObserver observer = completable.test(); + + observer.assertError(CancellationException.class); + } + + @Test + public void toCompletable_disposeCompletable_cancelFuture() { + SettableFuture future = SettableFuture.create(); + Completable completable = RxFutures.toCompletable(() -> future, MoreExecutors.directExecutor()); + + TestObserver observer = completable.test(); + observer.assertEmpty(); + observer.dispose(); + + assertThat(future.isCancelled()).isTrue(); + } + + @Test + public void toCompletable_multipleSubscriptions_error() { + ListenableFuture future = immediateVoidFuture(); + Completable completable = RxFutures.toCompletable(() -> future, MoreExecutors.directExecutor()); + completable.test().assertComplete(); + + TestObserver observer = completable.test(); + + observer.assertError(IllegalStateException.class); + } + + @Test + public void toListenableFutureFromCompletable_noEvents_waiting() { + CompletableToListenableFutureSetup setup = CompletableToListenableFutureSetup.create(); + + assertThat(setup.getEmitter()).isNotNull(); + assertThat(setup.getFuture().isDone()).isFalse(); + assertThat(setup.getFuture().isCancelled()).isFalse(); + } + + @Test + public void toListenableFutureFromCompletable_completableOnComplete_futureOnSuccess() { + CompletableToListenableFutureSetup setup = CompletableToListenableFutureSetup.create(); + + setup.getEmitter().onComplete(); + + assertThat(setup.isSuccess()).isTrue(); + assertThat(setup.getFailure()).isNull(); + } + + @Test + public void toListenableFutureFromCompletable_completableOnError_futureOnFailure() { + CompletableToListenableFutureSetup setup = CompletableToListenableFutureSetup.create(); + + Throwable error = new IllegalStateException("error"); + setup.getEmitter().onError(error); + + assertThat(setup.isSuccess()).isFalse(); + assertThat(setup.getFailure()).isEqualTo(error); + } + + @Test + public void toListenableFutureFromCompletable_cancelled() { + CompletableToListenableFutureSetup setup = CompletableToListenableFutureSetup.create(); + + setup.getFuture().cancel(true); + + assertThat(setup.isSuccess()).isFalse(); + assertThat(setup.getFailure()).isInstanceOf(CancellationException.class); + assertThat(setup.isDisposed()).isTrue(); + } + + private static class CompletableToListenableFutureSetup { + public static CompletableToListenableFutureSetup create() { + return new CompletableToListenableFutureSetup(); + } + + private final ListenableFuture future; + + private CompletableEmitter emitter; + private boolean disposed; + private boolean success; + private Throwable failure; + + CompletableToListenableFutureSetup() { + Completable completable = + Completable.create(emitter -> this.emitter = emitter).doOnDispose(() -> disposed = true); + future = RxFutures.toListenableFuture(completable); + Futures.addCallback( + future, + new FutureCallback() { + @Override + public void onSuccess(@Nullable Void result) { + success = true; + } + + @Override + public void onFailure(Throwable t) { + failure = t; + } + }, + MoreExecutors.directExecutor()); + } + + public CompletableEmitter getEmitter() { + return emitter; + } + + public ListenableFuture getFuture() { + return future; + } + + public boolean isDisposed() { + return disposed; + } + + public boolean isSuccess() { + return success; + } + + public Throwable getFailure() { + return failure; + } + } +} diff --git a/src/test/java/com/google/devtools/build/lib/remote/util/RxNoGlobalErrorsRule.java b/src/test/java/com/google/devtools/build/lib/remote/util/RxNoGlobalErrorsRule.java new file mode 100644 index 00000000000000..ad3d4c3d45e335 --- /dev/null +++ b/src/test/java/com/google/devtools/build/lib/remote/util/RxNoGlobalErrorsRule.java @@ -0,0 +1,69 @@ +// Copyright 2021 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote.util; + +import io.reactivex.rxjava3.exceptions.CompositeException; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import org.junit.rules.ExternalResource; + +/** + * A JUnit {@link org.junit.Rule} that captures uncaught errors from RxJava streams and rethrows + * them post-test if left unaddressed. + * + *

This is to prevent false-positives caused by RxJava's default uncaught error handler, which + * manually forwards the event to the current Thread's exception handler and bypasses JUnit's + * failure reporting. + * + *

Can also be used to assert that no uncaught errors have yet been thrown mid-test. This is + * useful to ensure tests are in a consistent state before continuing. + */ +public class RxNoGlobalErrorsRule extends ExternalResource { + private final List errors = new CopyOnWriteArrayList<>(); + + @Override + protected void before() { + RxJavaPlugins.setErrorHandler(errors::add); + } + + @Override + protected void after() { + assertNoErrors(); + } + + private static final class UncaughtRxErrors extends RuntimeException { + private UncaughtRxErrors(Throwable cause) { + super("There were uncaught Rx errors during test execution", cause); + } + } + + /** + * Asserts that no uncaught errors have yet occurred. + * + *

If an Rx stream has thrown an uncaught error any time before this method is called, an + * {@link UncaughtRxErrors} is thrown. This is useful for ensuring that tests are in a consistent + * state before continuing. + * + *

You may need to advance any test schedulers so that any pending events are flushed. + */ + private void assertNoErrors() { + if (errors.size() > 1) { + Throwable[] errorsArray = errors.toArray(new Throwable[0]); + throw new UncaughtRxErrors(new CompositeException(errorsArray)); + } else if (errors.size() == 1) { + throw new UncaughtRxErrors(errors.get(0)); + } + } +}