From 3ad2c22af5b61465ed593759dd1fdff7f3b4beb8 Mon Sep 17 00:00:00 2001 From: Junchuan Wang Date: Fri, 29 Jan 2021 11:58:34 -0800 Subject: [PATCH 1/4] Rename Task.blocking to Task.runInExecutor --- ...estTaskSimpleBatchingStrategyBlocking.java | 2 +- .../parseq/zk/client/ZKClientImpl.java | 4 +-- .../linkedin/parseq/AsyncCallableTask.java | 7 +++-- .../main/java/com/linkedin/parseq/Task.java | 27 +++++++++++++------ .../parseq/TestTaskFactoryMethods.java | 2 +- .../com/linkedin/parseq/TestTaskType.java | 2 +- 6 files changed, 26 insertions(+), 18 deletions(-) diff --git a/subprojects/parseq-batching/src/test/java/com/linkedin/parseq/batching/TestTaskSimpleBatchingStrategyBlocking.java b/subprojects/parseq-batching/src/test/java/com/linkedin/parseq/batching/TestTaskSimpleBatchingStrategyBlocking.java index 4dd28e28..b0befcef 100644 --- a/subprojects/parseq-batching/src/test/java/com/linkedin/parseq/batching/TestTaskSimpleBatchingStrategyBlocking.java +++ b/subprojects/parseq-batching/src/test/java/com/linkedin/parseq/batching/TestTaskSimpleBatchingStrategyBlocking.java @@ -36,7 +36,7 @@ private Strategy(long sleepMs) { @Override public Task>> taskForBatch(Set keys) { - return Task.blocking(() -> { + return Task.runInExecutor(() -> { try { // make this batching task long-running Thread.sleep(_sleepMs); diff --git a/subprojects/parseq-zk-client/src/main/java/com/linkedin/parseq/zk/client/ZKClientImpl.java b/subprojects/parseq-zk-client/src/main/java/com/linkedin/parseq/zk/client/ZKClientImpl.java index 9403b0f3..c8fab18e 100644 --- a/subprojects/parseq-zk-client/src/main/java/com/linkedin/parseq/zk/client/ZKClientImpl.java +++ b/subprojects/parseq-zk-client/src/main/java/com/linkedin/parseq/zk/client/ZKClientImpl.java @@ -18,14 +18,12 @@ import com.linkedin.parseq.Context; import com.linkedin.parseq.Engine; -import com.linkedin.parseq.MultiException; import com.linkedin.parseq.Task; import com.linkedin.parseq.promise.Promise; import com.linkedin.parseq.promise.PromiseListener; import com.linkedin.parseq.promise.Promises; import com.linkedin.parseq.promise.SettablePromise; import java.io.IOException; -import java.util.ArrayList; import java.util.EnumMap; import java.util.List; import java.util.Map; @@ -282,7 +280,7 @@ public Task delete(String path, int version) { */ @Override public Task> multi(List ops, Executor executor) { - return Task.blocking(() -> _zkClient.multi(ops), executor); + return Task.runInExecutor(() -> _zkClient.multi(ops), executor); } /** diff --git a/subprojects/parseq/src/main/java/com/linkedin/parseq/AsyncCallableTask.java b/subprojects/parseq/src/main/java/com/linkedin/parseq/AsyncCallableTask.java index 8b466168..e4e45488 100644 --- a/subprojects/parseq/src/main/java/com/linkedin/parseq/AsyncCallableTask.java +++ b/subprojects/parseq/src/main/java/com/linkedin/parseq/AsyncCallableTask.java @@ -15,7 +15,6 @@ */ package com.linkedin.parseq; -import com.linkedin.parseq.EngineBuilder; import com.linkedin.parseq.internal.ArgumentUtil; import com.linkedin.parseq.promise.Promise; import com.linkedin.parseq.promise.Promises; @@ -37,7 +36,7 @@ * To use this class with an engine, register an executor with engine using * {@link #register(EngineBuilder, java.util.concurrent.Executor)} * - * @deprecated As of 2.0.0, replaced by {@link Task#blocking(String, Callable, Executor) Task.blocking}. + * @deprecated As of 2.0.0, replaced by {@link Task#runInExecutor(String, Callable, Executor) Task.blocking}. * @author Walter Fender (wfender@linkedin.com) */ @Deprecated @@ -51,7 +50,7 @@ public static void register(EngineBuilder builder, Executor executor) { } /** - * @deprecated As of 2.0.0, replaced by {@link Task#blocking(String, Callable, Executor) Task.blocking}. + * @deprecated As of 2.0.0, replaced by {@link Task#runInExecutor(String, Callable, Executor) Task.blocking}. */ @Deprecated public AsyncCallableTask(final Callable syncJob) { @@ -59,7 +58,7 @@ public AsyncCallableTask(final Callable syncJob) { } /** - * @deprecated As of 2.0.0, replaced by {@link Task#blocking(String, Callable, Executor) Task.blocking}. + * @deprecated As of 2.0.0, replaced by {@link Task#runInExecutor(String, Callable, Executor) Task.blocking}. */ @Deprecated public AsyncCallableTask(final String name, final Callable syncJob) { diff --git a/subprojects/parseq/src/main/java/com/linkedin/parseq/Task.java b/subprojects/parseq/src/main/java/com/linkedin/parseq/Task.java index 7e275703..8a343200 100644 --- a/subprojects/parseq/src/main/java/com/linkedin/parseq/Task.java +++ b/subprojects/parseq/src/main/java/com/linkedin/parseq/Task.java @@ -20,7 +20,6 @@ import java.util.concurrent.Callable; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -1079,7 +1078,7 @@ public static Task flatten(final Task> task) { * Creates a new task that have a value of type {@code Void}. Because the * returned task returns no value, it is typically used to produce side effects. * It is not appropriate for long running or blocking actions. If action is - * long running or blocking use {@link #blocking(String, Callable, Executor) blocking} method. + * long running or blocking use {@link #runInExecutor(String, Callable, Executor) blocking} method. * *
    * // this task will print "Hello" on standard output
@@ -1167,7 +1166,7 @@ public static  Task failure(final Throwable failure) {
    * from the supplied callable. This task is useful when doing basic
    * computation that does not require asynchrony. It is not appropriate for
    * long running or blocking callables. If callable is long running or blocking
-   * use {@link #blocking(String, Callable, Executor) blocking} method.
+   * use {@link #runInExecutor(String, Callable, Executor) blocking} method.
    *
    * 
    * // this task will complete with {@code String} representing current time
@@ -1313,7 +1312,7 @@ public static  Task fromTry(final Try tried) {
    *
    * This method is not appropriate for long running or blocking callables.
    * If callable is long running or blocking use
-   * {@link #blocking(String, Callable, Executor) blocking} method.
+   * {@link #runInExecutor(String, Callable, Executor) blocking} method.
    * 

* * @param the type of the return value for this task @@ -1403,7 +1402,7 @@ public static Task async(final Function1> f * @return a new task that will submit the callable to given executor and complete * with result returned by that callable */ - public static Task blocking(final String name, final Callable callable, final Executor executor) { + public static Task runInExecutor(final String name, final Callable callable, final Executor executor) { ArgumentUtil.requireNotNull(callable, "callable"); ArgumentUtil.requireNotNull(callable, "executor"); Task blockingTask = async(name, () -> { @@ -1422,13 +1421,25 @@ public static Task blocking(final String name, final Callable Task runInExecutor(final Callable callable, final Executor executor) { + return runInExecutor("runInExecutor: " + _taskDescriptor.getDescription(callable.getClass().getName()), callable, executor); + } + + @Deprecated public static Task blocking(final Callable callable, final Executor executor) { - return blocking("blocking: " + _taskDescriptor.getDescription(callable.getClass().getName()), callable, executor); + return runInExecutor("runInExecutor: " + _taskDescriptor.getDescription(callable.getClass().getName()), callable, executor); + } + + + @Deprecated + public static Task blocking(final String name, final Callable callable, final Executor executor) { + return runInExecutor(name, callable, executor); } + /** * Creates a new task that will run given tasks in parallel. Returned task * will be resolved with results of all tasks as soon as all of them has diff --git a/subprojects/parseq/src/test/java/com/linkedin/parseq/TestTaskFactoryMethods.java b/subprojects/parseq/src/test/java/com/linkedin/parseq/TestTaskFactoryMethods.java index f20f40db..f679f21a 100644 --- a/subprojects/parseq/src/test/java/com/linkedin/parseq/TestTaskFactoryMethods.java +++ b/subprojects/parseq/src/test/java/com/linkedin/parseq/TestTaskFactoryMethods.java @@ -94,7 +94,7 @@ public void testAsyncWithContext() { public void testBlocking() { TestingExecutorService es = new TestingExecutorService(Executors.newSingleThreadExecutor()); try { - Task task = Task.blocking(() -> "from blocking", es); + Task task = Task.runInExecutor(() -> "from blocking", es); runAndWait("TestTaskFactoryMethods.testBlocking", task); assertEquals(task.get(), "from blocking"); assertEquals(es.getCount(), 1); diff --git a/subprojects/parseq/src/test/java/com/linkedin/parseq/TestTaskType.java b/subprojects/parseq/src/test/java/com/linkedin/parseq/TestTaskType.java index fb37458e..8f557f40 100644 --- a/subprojects/parseq/src/test/java/com/linkedin/parseq/TestTaskType.java +++ b/subprojects/parseq/src/test/java/com/linkedin/parseq/TestTaskType.java @@ -28,7 +28,7 @@ public void testFusionTaskType() { public void testBlockingTaskType() { TestingExecutorService es = new TestingExecutorService(Executors.newSingleThreadExecutor()); try { - Task task = Task.blocking(() -> "blocking task", es); + Task task = Task.runInExecutor(() -> "blocking task", es); runAndWait("blockingTaskType", task); assertEquals(task.getShallowTrace().getTaskType(), TaskType.BLOCKING.getName()); } finally { From 4ebe66fb9eebb63843eebfbfab32c25c249bf0f4 Mon Sep 17 00:00:00 2001 From: Junchuan Wang Date: Fri, 29 Jan 2021 12:25:58 -0800 Subject: [PATCH 2/4] address comments --- .../parseq/src/main/java/com/linkedin/parseq/Task.java | 6 ++++++ .../java/com/linkedin/parseq/TestTaskFactoryMethods.java | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/subprojects/parseq/src/main/java/com/linkedin/parseq/Task.java b/subprojects/parseq/src/main/java/com/linkedin/parseq/Task.java index 8a343200..67e87038 100644 --- a/subprojects/parseq/src/main/java/com/linkedin/parseq/Task.java +++ b/subprojects/parseq/src/main/java/com/linkedin/parseq/Task.java @@ -1428,12 +1428,18 @@ public static Task runInExecutor(final Callable callable, fi return runInExecutor("runInExecutor: " + _taskDescriptor.getDescription(callable.getClass().getName()), callable, executor); } + /** + * @deprecated please use {@link Task#runInExecutor(Callable, Executor)} + */ @Deprecated public static Task blocking(final Callable callable, final Executor executor) { return runInExecutor("runInExecutor: " + _taskDescriptor.getDescription(callable.getClass().getName()), callable, executor); } + /** + * @deprecated please use {@link Task#runInExecutor(String, Callable, Executor)} + */ @Deprecated public static Task blocking(final String name, final Callable callable, final Executor executor) { return runInExecutor(name, callable, executor); diff --git a/subprojects/parseq/src/test/java/com/linkedin/parseq/TestTaskFactoryMethods.java b/subprojects/parseq/src/test/java/com/linkedin/parseq/TestTaskFactoryMethods.java index f679f21a..0c20b7d1 100644 --- a/subprojects/parseq/src/test/java/com/linkedin/parseq/TestTaskFactoryMethods.java +++ b/subprojects/parseq/src/test/java/com/linkedin/parseq/TestTaskFactoryMethods.java @@ -91,7 +91,7 @@ public void testAsyncWithContext() { } @Test - public void testBlocking() { + public void testRunInExecutor() { TestingExecutorService es = new TestingExecutorService(Executors.newSingleThreadExecutor()); try { Task task = Task.runInExecutor(() -> "from blocking", es); From 70316d97d362a4469de802e6edf24a6c102bc428 Mon Sep 17 00:00:00 2001 From: Junchuan Wang Date: Fri, 29 Jan 2021 12:44:31 -0800 Subject: [PATCH 3/4] rename to Task.callableInExecutor --- ...estTaskSimpleBatchingStrategyBlocking.java | 2 +- .../parseq/zk/client/ZKClientImpl.java | 2 +- .../linkedin/parseq/AsyncCallableTask.java | 6 ++--- .../main/java/com/linkedin/parseq/Task.java | 24 +++++++++---------- .../parseq/TestTaskFactoryMethods.java | 4 ++-- .../com/linkedin/parseq/TestTaskType.java | 2 +- 6 files changed, 20 insertions(+), 20 deletions(-) diff --git a/subprojects/parseq-batching/src/test/java/com/linkedin/parseq/batching/TestTaskSimpleBatchingStrategyBlocking.java b/subprojects/parseq-batching/src/test/java/com/linkedin/parseq/batching/TestTaskSimpleBatchingStrategyBlocking.java index b0befcef..81a85982 100644 --- a/subprojects/parseq-batching/src/test/java/com/linkedin/parseq/batching/TestTaskSimpleBatchingStrategyBlocking.java +++ b/subprojects/parseq-batching/src/test/java/com/linkedin/parseq/batching/TestTaskSimpleBatchingStrategyBlocking.java @@ -36,7 +36,7 @@ private Strategy(long sleepMs) { @Override public Task>> taskForBatch(Set keys) { - return Task.runInExecutor(() -> { + return Task.callableInExecutor(() -> { try { // make this batching task long-running Thread.sleep(_sleepMs); diff --git a/subprojects/parseq-zk-client/src/main/java/com/linkedin/parseq/zk/client/ZKClientImpl.java b/subprojects/parseq-zk-client/src/main/java/com/linkedin/parseq/zk/client/ZKClientImpl.java index c8fab18e..cfe719da 100644 --- a/subprojects/parseq-zk-client/src/main/java/com/linkedin/parseq/zk/client/ZKClientImpl.java +++ b/subprojects/parseq-zk-client/src/main/java/com/linkedin/parseq/zk/client/ZKClientImpl.java @@ -280,7 +280,7 @@ public Task delete(String path, int version) { */ @Override public Task> multi(List ops, Executor executor) { - return Task.runInExecutor(() -> _zkClient.multi(ops), executor); + return Task.callableInExecutor(() -> _zkClient.multi(ops), executor); } /** diff --git a/subprojects/parseq/src/main/java/com/linkedin/parseq/AsyncCallableTask.java b/subprojects/parseq/src/main/java/com/linkedin/parseq/AsyncCallableTask.java index e4e45488..06553258 100644 --- a/subprojects/parseq/src/main/java/com/linkedin/parseq/AsyncCallableTask.java +++ b/subprojects/parseq/src/main/java/com/linkedin/parseq/AsyncCallableTask.java @@ -36,7 +36,7 @@ * To use this class with an engine, register an executor with engine using * {@link #register(EngineBuilder, java.util.concurrent.Executor)} * - * @deprecated As of 2.0.0, replaced by {@link Task#runInExecutor(String, Callable, Executor) Task.blocking}. + * @deprecated As of 2.0.0, replaced by {@link Task#callableInExecutor(String, Callable, Executor)}. * @author Walter Fender (wfender@linkedin.com) */ @Deprecated @@ -50,7 +50,7 @@ public static void register(EngineBuilder builder, Executor executor) { } /** - * @deprecated As of 2.0.0, replaced by {@link Task#runInExecutor(String, Callable, Executor) Task.blocking}. + * @deprecated As of 2.0.0, replaced by {@link Task#callableInExecutor(String, Callable, Executor)}. */ @Deprecated public AsyncCallableTask(final Callable syncJob) { @@ -58,7 +58,7 @@ public AsyncCallableTask(final Callable syncJob) { } /** - * @deprecated As of 2.0.0, replaced by {@link Task#runInExecutor(String, Callable, Executor) Task.blocking}. + * @deprecated As of 2.0.0, replaced by {@link Task#callableInExecutor(String, Callable, Executor)}. */ @Deprecated public AsyncCallableTask(final String name, final Callable syncJob) { diff --git a/subprojects/parseq/src/main/java/com/linkedin/parseq/Task.java b/subprojects/parseq/src/main/java/com/linkedin/parseq/Task.java index 67e87038..ff9e571e 100644 --- a/subprojects/parseq/src/main/java/com/linkedin/parseq/Task.java +++ b/subprojects/parseq/src/main/java/com/linkedin/parseq/Task.java @@ -1078,7 +1078,7 @@ public static Task flatten(final Task> task) { * Creates a new task that have a value of type {@code Void}. Because the * returned task returns no value, it is typically used to produce side effects. * It is not appropriate for long running or blocking actions. If action is - * long running or blocking use {@link #runInExecutor(String, Callable, Executor) blocking} method. + * long running or blocking use {@link #callableInExecutor(String, Callable, Executor)} method. * *

    * // this task will print "Hello" on standard output
@@ -1166,7 +1166,7 @@ public static  Task failure(final Throwable failure) {
    * from the supplied callable. This task is useful when doing basic
    * computation that does not require asynchrony. It is not appropriate for
    * long running or blocking callables. If callable is long running or blocking
-   * use {@link #runInExecutor(String, Callable, Executor) blocking} method.
+   * use {@link #callableInExecutor(String, Callable, Executor)} method.
    *
    * 
    * // this task will complete with {@code String} representing current time
@@ -1312,7 +1312,7 @@ public static  Task fromTry(final Try tried) {
    *
    * This method is not appropriate for long running or blocking callables.
    * If callable is long running or blocking use
-   * {@link #runInExecutor(String, Callable, Executor) blocking} method.
+   * {@link #callableInExecutor(String, Callable, Executor)} method.
    * 

* * @param the type of the return value for this task @@ -1402,7 +1402,7 @@ public static Task async(final Function1> f * @return a new task that will submit the callable to given executor and complete * with result returned by that callable */ - public static Task runInExecutor(final String name, final Callable callable, final Executor executor) { + public static Task callableInExecutor(final String name, final Callable callable, final Executor executor) { ArgumentUtil.requireNotNull(callable, "callable"); ArgumentUtil.requireNotNull(callable, "executor"); Task blockingTask = async(name, () -> { @@ -1421,28 +1421,28 @@ public static Task runInExecutor(final String name, final Callable Task runInExecutor(final Callable callable, final Executor executor) { - return runInExecutor("runInExecutor: " + _taskDescriptor.getDescription(callable.getClass().getName()), callable, executor); + public static Task callableInExecutor(final Callable callable, final Executor executor) { + return callableInExecutor("callableInExecutor: " + _taskDescriptor.getDescription(callable.getClass().getName()), callable, executor); } /** - * @deprecated please use {@link Task#runInExecutor(Callable, Executor)} + * @deprecated please use {@link Task#callableInExecutor(Callable, Executor)} */ @Deprecated public static Task blocking(final Callable callable, final Executor executor) { - return runInExecutor("runInExecutor: " + _taskDescriptor.getDescription(callable.getClass().getName()), callable, executor); + return callableInExecutor("callableInExecutor: " + _taskDescriptor.getDescription(callable.getClass().getName()), callable, executor); } /** - * @deprecated please use {@link Task#runInExecutor(String, Callable, Executor)} + * @deprecated please use {@link Task#callableInExecutor(String, Callable, Executor)} */ @Deprecated public static Task blocking(final String name, final Callable callable, final Executor executor) { - return runInExecutor(name, callable, executor); + return callableInExecutor(name, callable, executor); } diff --git a/subprojects/parseq/src/test/java/com/linkedin/parseq/TestTaskFactoryMethods.java b/subprojects/parseq/src/test/java/com/linkedin/parseq/TestTaskFactoryMethods.java index 0c20b7d1..ec3dc0eb 100644 --- a/subprojects/parseq/src/test/java/com/linkedin/parseq/TestTaskFactoryMethods.java +++ b/subprojects/parseq/src/test/java/com/linkedin/parseq/TestTaskFactoryMethods.java @@ -91,10 +91,10 @@ public void testAsyncWithContext() { } @Test - public void testRunInExecutor() { + public void testcallableInExecutor() { TestingExecutorService es = new TestingExecutorService(Executors.newSingleThreadExecutor()); try { - Task task = Task.runInExecutor(() -> "from blocking", es); + Task task = Task.callableInExecutor(() -> "from blocking", es); runAndWait("TestTaskFactoryMethods.testBlocking", task); assertEquals(task.get(), "from blocking"); assertEquals(es.getCount(), 1); diff --git a/subprojects/parseq/src/test/java/com/linkedin/parseq/TestTaskType.java b/subprojects/parseq/src/test/java/com/linkedin/parseq/TestTaskType.java index 8f557f40..d5638f72 100644 --- a/subprojects/parseq/src/test/java/com/linkedin/parseq/TestTaskType.java +++ b/subprojects/parseq/src/test/java/com/linkedin/parseq/TestTaskType.java @@ -28,7 +28,7 @@ public void testFusionTaskType() { public void testBlockingTaskType() { TestingExecutorService es = new TestingExecutorService(Executors.newSingleThreadExecutor()); try { - Task task = Task.runInExecutor(() -> "blocking task", es); + Task task = Task.callableInExecutor(() -> "blocking task", es); runAndWait("blockingTaskType", task); assertEquals(task.getShallowTrace().getTaskType(), TaskType.BLOCKING.getName()); } finally { From b537f1c97f31d691ff21fa2bfc1fa81faed1a6c4 Mon Sep 17 00:00:00 2001 From: Junchuan Wang Date: Fri, 29 Jan 2021 16:04:30 -0800 Subject: [PATCH 4/4] address comments --- .../parseq/src/main/java/com/linkedin/parseq/Task.java | 6 +++--- .../parseq/src/main/java/com/linkedin/parseq/TaskType.java | 2 +- .../src/test/java/com/linkedin/parseq/TestTaskType.java | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/subprojects/parseq/src/main/java/com/linkedin/parseq/Task.java b/subprojects/parseq/src/main/java/com/linkedin/parseq/Task.java index ff9e571e..9ae027d5 100644 --- a/subprojects/parseq/src/main/java/com/linkedin/parseq/Task.java +++ b/subprojects/parseq/src/main/java/com/linkedin/parseq/Task.java @@ -1405,7 +1405,7 @@ public static Task async(final Function1> f public static Task callableInExecutor(final String name, final Callable callable, final Executor executor) { ArgumentUtil.requireNotNull(callable, "callable"); ArgumentUtil.requireNotNull(callable, "executor"); - Task blockingTask = async(name, () -> { + Task asyncCallableTask = async(name, () -> { final SettablePromise promise = Promises.settable(); executor.execute(() -> { try { @@ -1416,8 +1416,8 @@ public static Task callableInExecutor(final String name, final Callable task = Task.callableInExecutor(() -> "blocking task", es); runAndWait("blockingTaskType", task); - assertEquals(task.getShallowTrace().getTaskType(), TaskType.BLOCKING.getName()); + assertEquals(task.getShallowTrace().getTaskType(), TaskType.CALLABLE_IN_EXECUTOR.getName()); } finally { es.shutdown(); }