Skip to content

Commit

Permalink
Add submitAsync and scheduleAsync methods, to ease the deprecation of…
Browse files Browse the repository at this point in the history
… Futures.dereference

[]

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=162004818
  • Loading branch information
clm authored and cpovirk committed Jul 14, 2017
1 parent b6c86db commit 71b5b85
Show file tree
Hide file tree
Showing 7 changed files with 694 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import static com.google.common.util.concurrent.Futures.lazyTransform;
import static com.google.common.util.concurrent.Futures.makeChecked;
import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
import static com.google.common.util.concurrent.Futures.scheduleAsync;
import static com.google.common.util.concurrent.Futures.submitAsync;
import static com.google.common.util.concurrent.Futures.successfulAsList;
import static com.google.common.util.concurrent.Futures.transform;
import static com.google.common.util.concurrent.Futures.transformAsync;
Expand All @@ -51,6 +53,7 @@
import static java.lang.Thread.currentThread;
import static java.util.Arrays.asList;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

Expand Down Expand Up @@ -79,6 +82,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -1727,27 +1731,247 @@ public ListenableFuture<Integer> apply(String input) throws Exception {

// Pause the executor.
final CountDownLatch beforeFunction = new CountDownLatch(1);
@SuppressWarnings("unused") // go/futurereturn-lsc
Future<?> possiblyIgnoredError =
executor.submit(
new Runnable() {
@Override
public void run() {
awaitUninterruptibly(beforeFunction);
}
});
executor.execute(
new Runnable() {
@Override
public void run() {
awaitUninterruptibly(beforeFunction);
}
});

// Cancel the future after making input available.
inputFuture.set("value");
future.cancel(false);

// Unpause the executor.
beforeFunction.countDown();
executor.awaitTermination(5, SECONDS);
executor.shutdown();
assertTrue(executor.awaitTermination(5, SECONDS));

assertFalse(functionCalled.get());
}

public void testSubmitAsync_asyncCallable_error() throws InterruptedException {
final Error error = new Error("deliberate");
AsyncCallable<Integer> callable =
new AsyncCallable<Integer>() {
@Override
public ListenableFuture<Integer> call() {
throw error;
}
};
SettableFuture<String> inputFuture = SettableFuture.create();
ListenableFuture<Integer> outputFuture = submitAsync(callable, directExecutor());
inputFuture.set("value");
try {
getDone(outputFuture);
fail();
} catch (ExecutionException expected) {
assertSame(error, expected.getCause());
}
}

public void testSubmitAsync_asyncCallable_nullInsteadOfFuture() throws Exception {
ListenableFuture<?> chainedFuture = submitAsync(constantAsyncCallable(null), directExecutor());
try {
getDone(chainedFuture);
fail();
} catch (ExecutionException expected) {
NullPointerException cause = (NullPointerException) expected.getCause();
assertThat(cause)
.hasMessage(
"AsyncCallable.call returned null instead of a Future. "
+ "Did you mean to return immediateFuture(null)?");
}
}

@GwtIncompatible // threads

public void testSubmitAsync_asyncCallable_cancelledWhileApplyingFunction()
throws InterruptedException, ExecutionException {
final CountDownLatch inFunction = new CountDownLatch(1);
final CountDownLatch callableDone = new CountDownLatch(1);
final SettableFuture<Integer> resultFuture = SettableFuture.create();
AsyncCallable<Integer> callable =
new AsyncCallable<Integer>() {
@Override
public ListenableFuture<Integer> call() throws InterruptedException {
inFunction.countDown();
callableDone.await();
return resultFuture;
}
};
SettableFuture<String> inputFuture = SettableFuture.create();
ListenableFuture<Integer> future = submitAsync(callable, newSingleThreadExecutor());
inputFuture.set("value");
inFunction.await();
future.cancel(false);
callableDone.countDown();
try {
future.get();
fail();
} catch (CancellationException expected) {
}
try {
resultFuture.get();
fail();
} catch (CancellationException expected) {
}
}

@GwtIncompatible // threads

public void testSubmitAsync_asyncCallable_cancelledBeforeApplyingFunction()
throws InterruptedException {
final AtomicBoolean callableCalled = new AtomicBoolean();
AsyncCallable<Integer> callable =
new AsyncCallable<Integer>() {
@Override
public ListenableFuture<Integer> call() {
callableCalled.set(true);
return immediateFuture(1);
}
};
ExecutorService executor = newSingleThreadExecutor();
// Pause the executor.
final CountDownLatch beforeFunction = new CountDownLatch(1);
executor.execute(
new Runnable() {
@Override
public void run() {
awaitUninterruptibly(beforeFunction);
}
});
ListenableFuture<Integer> future = submitAsync(callable, executor);
future.cancel(false);

// Unpause the executor.
beforeFunction.countDown();
executor.shutdown();
assertTrue(executor.awaitTermination(5, SECONDS));

assertFalse(callableCalled.get());
}

@GwtIncompatible // threads

public void testScheduleAsync_asyncCallable_error() throws InterruptedException {
final Error error = new Error("deliberate");
AsyncCallable<Integer> callable =
new AsyncCallable<Integer>() {
@Override
public ListenableFuture<Integer> call() {
throw error;
}
};
SettableFuture<String> inputFuture = SettableFuture.create();
ListenableFuture<Integer> outputFuture = submitAsync(callable, directExecutor());
inputFuture.set("value");
try {
getDone(outputFuture);
fail();
} catch (ExecutionException expected) {
assertSame(error, expected.getCause());
}
}

@GwtIncompatible // threads

public void testScheduleAsync_asyncCallable_nullInsteadOfFuture() throws Exception {
ListenableFuture<?> chainedFuture =
scheduleAsync(
constantAsyncCallable(null),
1,
TimeUnit.NANOSECONDS,
newSingleThreadScheduledExecutor());
try {
chainedFuture.get();
fail();
} catch (ExecutionException expected) {
NullPointerException cause = (NullPointerException) expected.getCause();
assertThat(cause)
.hasMessage(
"AsyncCallable.call returned null instead of a Future. "
+ "Did you mean to return immediateFuture(null)?");
}
}

@GwtIncompatible // threads

public void testScheduleAsync_asyncCallable_cancelledWhileApplyingFunction()
throws InterruptedException, ExecutionException {
final CountDownLatch inFunction = new CountDownLatch(1);
final CountDownLatch callableDone = new CountDownLatch(1);
final SettableFuture<Integer> resultFuture = SettableFuture.create();
AsyncCallable<Integer> callable =
new AsyncCallable<Integer>() {
@Override
public ListenableFuture<Integer> call() throws InterruptedException {
inFunction.countDown();
callableDone.await();
return resultFuture;
}
};
ListenableFuture<Integer> future =
scheduleAsync(callable, 1, TimeUnit.NANOSECONDS, newSingleThreadScheduledExecutor());
inFunction.await();
future.cancel(false);
callableDone.countDown();
try {
future.get();
fail();
} catch (CancellationException expected) {
}
try {
resultFuture.get();
fail();
} catch (CancellationException expected) {
}
}

@GwtIncompatible // threads

public void testScheduleAsync_asyncCallable_cancelledBeforeCallingFunction()
throws InterruptedException {
final AtomicBoolean callableCalled = new AtomicBoolean();
AsyncCallable<Integer> callable =
new AsyncCallable<Integer>() {
@Override
public ListenableFuture<Integer> call() {
callableCalled.set(true);
return immediateFuture(1);
}
};
ScheduledExecutorService executor = newSingleThreadScheduledExecutor();
// Pause the executor.
final CountDownLatch beforeFunction = new CountDownLatch(1);
executor.execute(
new Runnable() {
@Override
public void run() {
awaitUninterruptibly(beforeFunction);
}
});
ListenableFuture<Integer> future = scheduleAsync(callable, 1, TimeUnit.NANOSECONDS, executor);
future.cancel(false);

// Unpause the executor.
beforeFunction.countDown();
executor.shutdown();
assertTrue(executor.awaitTermination(5, SECONDS));

assertFalse(callableCalled.get());
}

private static <T> AsyncCallable<T> constantAsyncCallable(final ListenableFuture<T> returnValue) {
return new AsyncCallable<T>() {
@Override
public ListenableFuture<T> call() {
return returnValue;
}
};
}

public void testDereference_genericsWildcard() throws Exception {
ListenableFuture<?> inner = immediateFuture(null);
ListenableFuture<ListenableFuture<?>> outer =
Expand Down
38 changes: 38 additions & 0 deletions android/guava/src/com/google/common/util/concurrent/Futures.java
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,44 @@ public static <V, X extends Exception> CheckedFuture<V, X> immediateFailedChecke
return new ImmediateFailedCheckedFuture<V, X>(exception);
}

/**
* Executes {@code callable} on the specified {@code executor}, returning a {@code Future}.
*
* @throws RejectedExecutionException if the task cannot be scheduled for execution
* @since 23.0
*/
public static <O> ListenableFuture<O> submitAsync(AsyncCallable<O> callable, Executor executor) {
TrustedListenableFutureTask<O> task = TrustedListenableFutureTask.create(callable);
executor.execute(task);
return task;
}

/**
* Schedules {@code callable} on the specified {@code executor}, returning a {@code Future}.
*
* @throws RejectedExecutionException if the task cannot be scheduled for execution
* @since 23.0
*/
@GwtIncompatible // java.util.concurrent.ScheduledExecutorService
public static <O> ListenableFuture<O> scheduleAsync(
AsyncCallable<O> callable,
long delay,
TimeUnit timeUnit,
ScheduledExecutorService executorService) {
TrustedListenableFutureTask<O> task = TrustedListenableFutureTask.create(callable);
final Future<?> scheduled = executorService.schedule(task, delay, timeUnit);
task.addListener(
new Runnable() {
@Override
public void run() {
// Don't want to interrupt twice
scheduled.cancel(false);
}
},
directExecutor());
return task;
}

/**
* Returns a {@code Future} whose result is taken from the given primary {@code input} or, if the
* primary input fails with the given {@code exceptionType}, from the result provided by the
Expand Down
Loading

0 comments on commit 71b5b85

Please sign in to comment.