From c96aa3c85ff5339dcac69f6ee2162d24432189d1 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Mon, 28 Mar 2022 00:57:34 +0200 Subject: [PATCH 1/9] replace twitter future with java future --- .../org/apache/hadoop/fs/common/BufferData.java | 7 ++----- .../org/apache/hadoop/fs/common/BufferPool.java | 4 ++-- .../hadoop/fs/common/CachingBlockManager.java | 15 +++++++-------- .../apache/hadoop/fs/common/TestBufferData.java | 11 +++++++---- 4 files changed, 18 insertions(+), 19 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferData.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferData.java index 34dd6d7ba3b8d..a855a1c2c390c 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferData.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferData.java @@ -22,10 +22,9 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Future; import java.util.zip.CRC32; -import com.twitter.util.Awaitable.CanAwait; -import com.twitter.util.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -263,8 +262,6 @@ public boolean stateEqualsOneOf(State... states) { return false; } - private static final CanAwait CAN_AWAIT = () -> false; - public String toString() { return String.format( @@ -281,7 +278,7 @@ private String getFutureStr(Future f) { if (f == null) { return "--"; } else { - return this.action.isReady(CAN_AWAIT) ? "done" : "not done"; + return this.action.isDone() ? "done" : "not done"; } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java index 91798e550064a..259f9834cea82 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java @@ -27,8 +27,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CancellationException; +import java.util.concurrent.Future; -import com.twitter.util.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -233,7 +233,7 @@ public synchronized void close() { for (BufferData data : this.getAll()) { Future actionFuture = data.getActionFuture(); if (actionFuture != null) { - actionFuture.raise(new CancellationException("BufferPool is closing.")); + actionFuture.cancel(true); } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java index 93417f3fe61e9..a8125f7bfea34 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java @@ -21,12 +21,11 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import com.twitter.util.Await; import com.twitter.util.ExceptionalFunction0; -import com.twitter.util.Future; import com.twitter.util.FuturePool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -247,8 +246,8 @@ public void requestPrefetch(int blockNumber) { BlockOperations.Operation op = this.ops.requestPrefetch(blockNumber); PrefetchTask prefetchTask = new PrefetchTask(data, this); - Future prefetchFuture = this.futurePool.apply(prefetchTask); - data.setPrefetch(prefetchFuture); + com.twitter.util.Future prefetchFuture = this.futurePool.apply(prefetchTask); + data.setPrefetch(prefetchFuture.toCompletableFuture()); this.ops.end(op); } } @@ -412,12 +411,12 @@ public void requestCaching(BufferData data) { if (state == BufferData.State.PREFETCHING) { blockFuture = data.getActionFuture(); } else { - blockFuture = Future.value(null); + blockFuture = (Future) com.twitter.util.Future.value(null).toJavaFuture(); } CachePutTask task = new CachePutTask(data, blockFuture, this); - Future actionFuture = this.futurePool.apply(task); - data.setCaching(actionFuture); + com.twitter.util.Future actionFuture = this.futurePool.apply(task); + data.setCaching(actionFuture.toCompletableFuture()); this.ops.end(op); } } @@ -433,7 +432,7 @@ private void addToCacheAndRelease(BufferData data, Future blockFuture) { } try { - Await.result(blockFuture); + blockFuture.get(); if (data.stateEqualsOneOf(BufferData.State.DONE)) { // There was an error during prefetch. return; diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBufferData.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBufferData.java index 119e90ffebad5..4855d4c958184 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBufferData.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBufferData.java @@ -24,8 +24,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.CompletableFuture; -import com.twitter.util.Future; import org.junit.Test; import org.apache.hadoop.test.AbstractHadoopTestBase; @@ -83,13 +83,15 @@ public void testValidStateUpdates() { assertEquals(BufferData.State.BLANK, data.getState()); - Future actionFuture = Future.value(null); + CompletableFuture actionFuture = new CompletableFuture<>(); + actionFuture.complete(null); data.setPrefetch(actionFuture); assertEquals(BufferData.State.PREFETCHING, data.getState()); assertNotNull(data.getActionFuture()); assertSame(actionFuture, data.getActionFuture()); - Future actionFuture2 = Future.value(null); + CompletableFuture actionFuture2 = new CompletableFuture<>(); + actionFuture.complete(null); data.setCaching(actionFuture2); assertEquals(BufferData.State.CACHING, data.getState()); assertNotNull(data.getActionFuture()); @@ -117,7 +119,8 @@ public void testValidStateUpdates() { @Test public void testInvalidStateUpdates() throws Exception { - Future actionFuture = Future.value(null); + CompletableFuture actionFuture = new CompletableFuture<>(); + actionFuture.complete(null); testInvalidStateUpdatesHelper( (d) -> d.setPrefetch(actionFuture), BufferData.State.BLANK, From c2d90c7baf436336629ba31940cb1c24e94af113 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Mon, 28 Mar 2022 01:22:57 +0200 Subject: [PATCH 2/9] remove use of some twitter util code --- .../hadoop/fs/common/CachingBlockManager.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java index a8125f7bfea34..bbd7deb7d26c2 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java @@ -24,8 +24,8 @@ import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; -import com.twitter.util.ExceptionalFunction0; import com.twitter.util.FuturePool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -246,7 +246,7 @@ public void requestPrefetch(int blockNumber) { BlockOperations.Operation op = this.ops.requestPrefetch(blockNumber); PrefetchTask prefetchTask = new PrefetchTask(data, this); - com.twitter.util.Future prefetchFuture = this.futurePool.apply(prefetchTask); + com.twitter.util.Future prefetchFuture = this.futurePool.apply(com.twitter.util.Function.func0(prefetchTask)); data.setPrefetch(prefetchFuture.toCompletableFuture()); this.ops.end(op); } @@ -343,7 +343,7 @@ private void readBlock(BufferData data, boolean isPrefetch, BufferData.State... /** * Read task that is submitted to the future pool. */ - private static class PrefetchTask extends ExceptionalFunction0 { + private static class PrefetchTask implements Supplier { private final BufferData data; private final CachingBlockManager blockManager; @@ -353,7 +353,7 @@ private static class PrefetchTask extends ExceptionalFunction0 { } @Override - public Void applyE() { + public Void get() { try { this.blockManager.prefetch(data); } catch (Exception e) { @@ -415,7 +415,7 @@ public void requestCaching(BufferData data) { } CachePutTask task = new CachePutTask(data, blockFuture, this); - com.twitter.util.Future actionFuture = this.futurePool.apply(task); + com.twitter.util.Future actionFuture = this.futurePool.apply(com.twitter.util.Function.func0(task)); data.setCaching(actionFuture.toCompletableFuture()); this.ops.end(op); } @@ -499,7 +499,7 @@ protected void cachePut(int blockNumber, ByteBuffer buffer) throws IOException { this.cache.put(blockNumber, buffer); } - private static class CachePutTask extends ExceptionalFunction0 { + private static class CachePutTask implements Supplier { private final BufferData data; // Block being asynchronously fetched. @@ -518,7 +518,7 @@ private static class CachePutTask extends ExceptionalFunction0 { } @Override - public Void applyE() { + public Void get() { this.blockManager.addToCacheAndRelease(this.data, this.blockFuture); return null; } From 0c48a0d0f85744087f79a22e58b750d519fa0334 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Mon, 28 Mar 2022 02:28:10 +0200 Subject: [PATCH 3/9] port FuturePool to java --- .../hadoop/fs/common/CachingBlockManager.java | 13 ++-- .../fs/common/ExecutorServiceFuturePool.java | 64 +++++++++++++++++++ .../apache/hadoop/fs/s3a/S3AFileSystem.java | 5 +- .../hadoop/fs/s3a/S3AReadOpContext.java | 13 ++-- .../fs/s3a/read/S3CachingBlockManager.java | 4 +- .../fs/s3a/read/S3CachingInputStream.java | 4 +- .../org/apache/hadoop/fs/s3a/read/Fakes.java | 14 ++-- .../s3a/read/TestS3CachingBlockManager.java | 7 +- .../apache/hadoop/fs/s3a/read/TestS3File.java | 5 +- .../hadoop/fs/s3a/read/TestS3InputStream.java | 5 +- 10 files changed, 96 insertions(+), 38 deletions(-) create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java index bbd7deb7d26c2..3920419c5327b 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java @@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; -import com.twitter.util.FuturePool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,7 +37,7 @@ public abstract class CachingBlockManager extends BlockManager { private static final Logger LOG = LoggerFactory.getLogger(CachingBlockManager.class); // Asynchronous tasks are performed in this pool. - private final FuturePool futurePool; + private final ExecutorServiceFuturePool futurePool; // Pool of shared ByteBuffer instances. private BufferPool bufferPool; @@ -77,7 +76,7 @@ public abstract class CachingBlockManager extends BlockManager { * @throws IllegalArgumentException if bufferPoolSize is zero or negative. */ public CachingBlockManager( - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, BlockData blockData, int bufferPoolSize) { super(blockData); @@ -246,8 +245,8 @@ public void requestPrefetch(int blockNumber) { BlockOperations.Operation op = this.ops.requestPrefetch(blockNumber); PrefetchTask prefetchTask = new PrefetchTask(data, this); - com.twitter.util.Future prefetchFuture = this.futurePool.apply(com.twitter.util.Function.func0(prefetchTask)); - data.setPrefetch(prefetchFuture.toCompletableFuture()); + Future prefetchFuture = this.futurePool.apply(prefetchTask); + data.setPrefetch(prefetchFuture); this.ops.end(op); } } @@ -415,8 +414,8 @@ public void requestCaching(BufferData data) { } CachePutTask task = new CachePutTask(data, blockFuture, this); - com.twitter.util.Future actionFuture = this.futurePool.apply(com.twitter.util.Function.func0(task)); - data.setCaching(actionFuture.toCompletableFuture()); + Future actionFuture = this.futurePool.apply(task); + data.setCaching(actionFuture); this.ops.end(op); } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java new file mode 100644 index 0000000000000..9205a21e0880f --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java @@ -0,0 +1,64 @@ +package org.apache.hadoop.fs.common; + +import java.util.Locale; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.function.Supplier; + +public class ExecutorServiceFuturePool { + private ExecutorService executor; + private boolean interruptible = false; + + public ExecutorServiceFuturePool(ExecutorService executor, boolean interruptible) { + this.executor = executor; + } + + public ExecutorServiceFuturePool(ExecutorService executor) { + this(executor, false); + } + + public Future apply(final Supplier f) { + CompletableFuture completableFuture = new CompletableFuture<>(); + executor.submit(() -> completableFuture.complete(f.get())); + return completableFuture; + } + + public String toString() { + return String.format(Locale.ROOT,"ExecutorServiceFuturePool(interruptible=%s, executor=%s)", + interruptible, executor); + } + + public int poolSize() { + if (executor instanceof ThreadPoolExecutor) { + return ((ThreadPoolExecutor)executor).getPoolSize(); + } else { + return -1; + } + } + + public int numActiveTasks() { + if (executor instanceof ThreadPoolExecutor) { + return ((ThreadPoolExecutor)executor).getActiveCount(); + } else { + return -1; + } + } + + public long numCompletedTasks() { + if (executor instanceof ThreadPoolExecutor) { + return ((ThreadPoolExecutor)executor).getCompletedTaskCount(); + } else { + return -1; + } + } + + public long numPendingTasks() { + if (executor instanceof ThreadPoolExecutor) { + return ((ThreadPoolExecutor)executor).getQueue().size(); + } else { + return -1; + } + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 35c26067fcca1..76fe678073df3 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -76,8 +76,7 @@ import com.amazonaws.services.s3.transfer.model.UploadResult; import com.amazonaws.event.ProgressListener; -import com.twitter.util.ExecutorServiceFuturePool; -import com.twitter.util.FuturePool; +import org.apache.hadoop.fs.common.ExecutorServiceFuturePool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -283,7 +282,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, private ThreadPoolExecutor unboundedThreadPool; // S3 reads are prefetched asynchronously using this future pool. - private FuturePool futurePool; + private ExecutorServiceFuturePool futurePool; // If true, the prefetching input stream is used for reads. private boolean prefetchEnabled; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java index acfe6a415f1e3..23eb4691738af 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java @@ -18,11 +18,10 @@ package org.apache.hadoop.fs.s3a; -import com.twitter.util.FuturePool; - import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.common.ExecutorServiceFuturePool; import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy; import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext; import org.apache.hadoop.fs.store.audit.AuditSpan; @@ -61,7 +60,7 @@ public class S3AReadOpContext extends S3AOpContext { private final AuditSpan auditSpan; // S3 reads are prefetched asynchronously using this future pool. - private FuturePool futurePool; + private ExecutorServiceFuturePool futurePool; // Size in bytes of a single prefetch block. private final int prefetchBlockSize; @@ -94,7 +93,7 @@ public S3AReadOpContext( ChangeDetectionPolicy changeDetectionPolicy, final long readahead, final AuditSpan auditSpan, - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, int prefetchBlockSize, int prefetchBlockCount) { @@ -161,11 +160,11 @@ public AuditSpan getAuditSpan() { } /** - * Gets the {@code FuturePool} used for asynchronous prefetches. + * Gets the {@code ExecutorServiceFuturePool2} used for asynchronous prefetches. * - * @return the {@code FuturePool} used for asynchronous prefetches. + * @return the {@code ExecutorServiceFuturePool2} used for asynchronous prefetches. */ - public FuturePool getFuturePool() { + public ExecutorServiceFuturePool getFuturePool() { return this.futurePool; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingBlockManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingBlockManager.java index c4fafd56f1d9b..674a5ccbdd8bb 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingBlockManager.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingBlockManager.java @@ -22,12 +22,12 @@ import java.io.IOException; import java.nio.ByteBuffer; -import com.twitter.util.FuturePool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.common.BlockData; import org.apache.hadoop.fs.common.CachingBlockManager; +import org.apache.hadoop.fs.common.ExecutorServiceFuturePool; import org.apache.hadoop.fs.common.Validate; /** @@ -52,7 +52,7 @@ public class S3CachingBlockManager extends CachingBlockManager { * @throws IllegalArgumentException if reader is null. */ public S3CachingBlockManager( - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, S3Reader reader, BlockData blockData, int bufferPoolSize) { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java index 1117002526838..a1a9a22448ae3 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java @@ -21,13 +21,13 @@ import java.io.IOException; -import com.twitter.util.FuturePool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.common.BlockData; import org.apache.hadoop.fs.common.BlockManager; import org.apache.hadoop.fs.common.BufferData; +import org.apache.hadoop.fs.common.ExecutorServiceFuturePool; import org.apache.hadoop.fs.s3a.S3AInputStream; import org.apache.hadoop.fs.s3a.S3AReadOpContext; import org.apache.hadoop.fs.s3a.S3ObjectAttributes; @@ -186,7 +186,7 @@ public String toString() { } protected BlockManager createBlockManager( - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, S3Reader reader, BlockData blockData, int bufferPoolSize) { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/Fakes.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/Fakes.java index 5c2f7eb224151..1b02c495bc477 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/Fakes.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/Fakes.java @@ -34,11 +34,11 @@ import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.S3ObjectInputStream; -import com.twitter.util.FuturePool; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.common.BlockCache; import org.apache.hadoop.fs.common.BlockData; +import org.apache.hadoop.fs.common.ExecutorServiceFuturePool; import org.apache.hadoop.fs.common.SingleFilePerBlockCache; import org.apache.hadoop.fs.common.Validate; import org.apache.hadoop.fs.s3a.Invoker; @@ -109,7 +109,7 @@ public static S3ObjectAttributes createObjectAttributes( } public static S3AReadOpContext createReadContext( - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, String key, int fileSize, int prefetchBlockSize, @@ -195,7 +195,7 @@ public void close() { public static S3InputStream createInputStream( Class clazz, - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, String bucket, String key, int fileSize, @@ -225,7 +225,7 @@ public static S3InputStream createInputStream( } public static TestS3InMemoryInputStream createS3InMemoryInputStream( - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, String bucket, String key, int fileSize) { @@ -235,7 +235,7 @@ public static TestS3InMemoryInputStream createS3InMemoryInputStream( } public static TestS3CachingInputStream createS3CachingInputStream( - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, String bucket, String key, int fileSize, @@ -322,7 +322,7 @@ private static void randomDelay(int delay) { public static class TestS3CachingBlockManager extends S3CachingBlockManager { public TestS3CachingBlockManager( - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, S3Reader reader, BlockData blockData, int bufferPoolSize) { @@ -359,7 +359,7 @@ protected S3File getS3File() { @Override protected S3CachingBlockManager createBlockManager( - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, S3Reader reader, BlockData blockData, int bufferPoolSize) { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3CachingBlockManager.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3CachingBlockManager.java index 99836793decba..3f84e2e028339 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3CachingBlockManager.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3CachingBlockManager.java @@ -24,13 +24,12 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import com.twitter.util.ExecutorServiceFuturePool; -import com.twitter.util.FuturePool; import org.junit.Test; import org.apache.hadoop.fs.common.BlockData; import org.apache.hadoop.fs.common.BufferData; import org.apache.hadoop.fs.common.ExceptionAsserts; +import org.apache.hadoop.fs.common.ExecutorServiceFuturePool; import org.apache.hadoop.test.AbstractHadoopTestBase; import static org.junit.Assert.assertEquals; @@ -41,7 +40,7 @@ public class TestS3CachingBlockManager extends AbstractHadoopTestBase { static final int POOL_SIZE = 3; private final ExecutorService threadPool = Executors.newFixedThreadPool(4); - private final FuturePool futurePool = new ExecutorServiceFuturePool(threadPool); + private final ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(threadPool); private final BlockData blockData = new BlockData(FILE_SIZE, BLOCK_SIZE); @@ -106,7 +105,7 @@ public void testArgChecks() throws Exception { */ static class TestBlockManager extends S3CachingBlockManager { TestBlockManager( - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, S3Reader reader, BlockData blockData, int bufferPoolSize) { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3File.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3File.java index 2f555d2b62c47..d7754354db825 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3File.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3File.java @@ -22,8 +22,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import com.twitter.util.ExecutorServiceFuturePool; -import com.twitter.util.FuturePool; +import org.apache.hadoop.fs.common.ExecutorServiceFuturePool; import org.junit.Test; import org.apache.hadoop.fs.common.ExceptionAsserts; @@ -36,7 +35,7 @@ public class TestS3File extends AbstractHadoopTestBase { private final ExecutorService threadPool = Executors.newFixedThreadPool(1); - private final FuturePool futurePool = new ExecutorServiceFuturePool(threadPool); + private final ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(threadPool); private final S3AInputStream.InputStreamCallbacks client = MockS3File.createClient("bucket"); @Test diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java index 503cd699002c7..318a789cb6889 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java @@ -24,8 +24,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import com.twitter.util.ExecutorServiceFuturePool; -import com.twitter.util.FuturePool; +import org.apache.hadoop.fs.common.ExecutorServiceFuturePool; import org.junit.Test; import org.apache.hadoop.fs.FSExceptionMessages; @@ -45,7 +44,7 @@ public class TestS3InputStream extends AbstractHadoopTestBase { private static final int FILE_SIZE = 10; private final ExecutorService threadPool = Executors.newFixedThreadPool(4); - private final FuturePool futurePool = new ExecutorServiceFuturePool(threadPool); + private final ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(threadPool); private final S3AInputStream.InputStreamCallbacks client = MockS3File.createClient("bucket"); @Test From 69426c9bf55672c35e62c9f4f7f6d17ab8affb7e Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Mon, 28 Mar 2022 02:29:42 +0200 Subject: [PATCH 4/9] Update ExecutorServiceFuturePool.java --- .../apache/hadoop/fs/common/ExecutorServiceFuturePool.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java index 9205a21e0880f..3418065f2e41c 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java @@ -11,12 +11,8 @@ public class ExecutorServiceFuturePool { private ExecutorService executor; private boolean interruptible = false; - public ExecutorServiceFuturePool(ExecutorService executor, boolean interruptible) { - this.executor = executor; - } - public ExecutorServiceFuturePool(ExecutorService executor) { - this(executor, false); + this.executor = executor; } public Future apply(final Supplier f) { From 8a6c186f4fd2e4f22badec22e9c69fd00d537fd1 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Mon, 28 Mar 2022 02:43:02 +0200 Subject: [PATCH 5/9] remove twitter util dependency --- hadoop-tools/hadoop-aws/pom.xml | 6 ----- .../hadoop/fs/common/CachingBlockManager.java | 5 +++- .../fs/common/ExecutorServiceFuturePool.java | 26 +++++++++++++++---- 3 files changed, 25 insertions(+), 12 deletions(-) diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml index 2e6b8ff359512..5583bb7ad05ec 100644 --- a/hadoop-tools/hadoop-aws/pom.xml +++ b/hadoop-tools/hadoop-aws/pom.xml @@ -435,12 +435,6 @@ aws-java-sdk-bundle compile - - com.twitter - util-core_2.11 - 21.2.0 - compile - org.assertj assertj-core diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java index 3920419c5327b..17a85cb0d3f37 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -410,7 +411,9 @@ public void requestCaching(BufferData data) { if (state == BufferData.State.PREFETCHING) { blockFuture = data.getActionFuture(); } else { - blockFuture = (Future) com.twitter.util.Future.value(null).toJavaFuture(); + CompletableFuture cf = new CompletableFuture<>(); + cf.complete(null); + blockFuture = cf; } CachePutTask task = new CachePutTask(data, blockFuture, this); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java index 3418065f2e41c..524d8ece03850 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java @@ -1,7 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.hadoop.fs.common; import java.util.Locale; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; @@ -9,16 +27,14 @@ public class ExecutorServiceFuturePool { private ExecutorService executor; - private boolean interruptible = false; + private final boolean interruptible = false; public ExecutorServiceFuturePool(ExecutorService executor) { this.executor = executor; } public Future apply(final Supplier f) { - CompletableFuture completableFuture = new CompletableFuture<>(); - executor.submit(() -> completableFuture.complete(f.get())); - return completableFuture; + return executor.submit(f::get); } public String toString() { From 846235e326b9fb7b425611f437c73f4c6c3a5fb0 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Mon, 28 Mar 2022 02:54:08 +0200 Subject: [PATCH 6/9] typo --- .../java/org/apache/hadoop/fs/common/CachingBlockManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java index 17a85cb0d3f37..5b66053a880f8 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java @@ -440,7 +440,7 @@ private void addToCacheAndRelease(BufferData data, Future blockFuture) { return; } } catch (Exception e) { - String message = String.format("error waitng on blockFuture: %s", data); + String message = String.format("error waiting on blockFuture: %s", data); LOG.error(message, e); data.setDone(); return; From e9ec63d4357a7f635dd4e61e1b33c1afac64396c Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Mon, 28 Mar 2022 03:00:02 +0200 Subject: [PATCH 7/9] Update CachingBlockManager.java --- .../java/org/apache/hadoop/fs/common/CachingBlockManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java index 5b66053a880f8..a11d9965c146d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java @@ -434,7 +434,7 @@ private void addToCacheAndRelease(BufferData data, Future blockFuture) { } try { - blockFuture.get(); + blockFuture.get(); //TODO consider calling get(long timeout, TimeUnit unit) instead if (data.stateEqualsOneOf(BufferData.State.DONE)) { // There was an error during prefetch. return; From f58656b83a344bf6349a95ac3302423342064b64 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Mon, 28 Mar 2022 03:02:24 +0200 Subject: [PATCH 8/9] Update CachingBlockManager.java --- .../java/org/apache/hadoop/fs/common/CachingBlockManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java index a11d9965c146d..078b9a894e070 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java @@ -411,7 +411,7 @@ public void requestCaching(BufferData data) { if (state == BufferData.State.PREFETCHING) { blockFuture = data.getActionFuture(); } else { - CompletableFuture cf = new CompletableFuture<>(); + CompletableFuture cf = new CompletableFuture<>(); cf.complete(null); blockFuture = cf; } From 9330eef4f15b0465c40664d4ac8efd1563519b74 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Mon, 28 Mar 2022 03:10:52 +0200 Subject: [PATCH 9/9] Update ExecutorServiceFuturePool.java --- .../hadoop/fs/common/ExecutorServiceFuturePool.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java index 524d8ece03850..bc8142219d5ce 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java @@ -27,19 +27,23 @@ public class ExecutorServiceFuturePool { private ExecutorService executor; - private final boolean interruptible = false; public ExecutorServiceFuturePool(ExecutorService executor) { this.executor = executor; } + /** + * @param f function to run in future on executor pool + * @return future + * @throws java.util.concurrent.RejectedExecutionException can be thrown + * @throws NullPointerException if f param is null + */ public Future apply(final Supplier f) { return executor.submit(f::get); } public String toString() { - return String.format(Locale.ROOT,"ExecutorServiceFuturePool(interruptible=%s, executor=%s)", - interruptible, executor); + return String.format(Locale.ROOT,"ExecutorServiceFuturePool(executor=%s)", executor); } public int poolSize() {