diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
index 0c5501c7922c..957f761ccbc2 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
@@ -73,7 +73,7 @@ SortedMap<Long, List<BUFFER>> getCommitIndexMap() {
     return commitIndexMap;
   }
 
-  void updateCommitInfoMap(long index, List<BUFFER> buffers) {
+  synchronized void updateCommitInfoMap(long index, List<BUFFER> buffers) {
     commitIndexMap.computeIfAbsent(index, k -> new LinkedList<>())
         .addAll(buffers);
   }
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index 5ff5da60989e..5c0516d7bd4f 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -25,7 +25,6 @@
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
@@ -182,8 +181,7 @@ public BlockOutputStream(
         (long) flushPeriod * streamBufferArgs.getStreamBufferSize() == streamBufferArgs
             .getStreamBufferFlushSize());
 
-    // A single thread executor handle the responses of async requests
-    responseExecutor = Executors.newSingleThreadExecutor();
+    this.responseExecutor = blockOutputStreamResourceProvider.get();
     bufferList = null;
     totalDataFlushedLength = 0;
     writtenDataLength = 0;
@@ -657,7 +655,6 @@ public void cleanup(boolean invalidateClient) {
       bufferList.clear();
     }
     bufferList = null;
-    responseExecutor.shutdown();
   }
 
   /**
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
index 3c7f8a2360c8..aa339409eceb 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
@@ -24,6 +24,7 @@
  */
 package org.apache.hadoop.hdds.scm.storage;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
@@ -32,6 +33,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
 
 /**
  * This class executes watchForCommit on ratis pipeline and releases
@@ -42,8 +44,8 @@ class CommitWatcher extends AbstractCommitWatcher<ChunkBuffer> {
   private final BufferPool bufferPool;
 
   // future Map to hold up all putBlock futures
-  private final ConcurrentMap<Long,
-      CompletableFuture<ContainerCommandResponseProto>> futureMap = new ConcurrentHashMap<>();
+  private final ConcurrentMap<Long, CompletableFuture<ContainerCommandResponseProto>>
+      futureMap = new ConcurrentHashMap<>();
 
   CommitWatcher(BufferPool bufferPool, XceiverClientSpi xceiverClient) {
     super(xceiverClient);
@@ -67,11 +69,24 @@ void releaseBuffers(long index) {
             + totalLength + ": existing = " + futureMap.keySet());
   }
 
-  ConcurrentMap<Long, CompletableFuture<ContainerCommandResponseProto>> getFutureMap() {
+  @VisibleForTesting
+  ConcurrentMap<Long, CompletableFuture<ContainerCommandResponseProto>> getFutureMap() {
     return futureMap;
   }
 
+  public void putFlushFuture(long flushPos,
+      CompletableFuture<ContainerCommandResponseProto> flushFuture) {
+    futureMap.compute(flushPos,
+        (key, previous) -> previous == null ? flushFuture :
+            previous.thenCombine(flushFuture, (prev, curr) -> curr));
+  }
+
+  public void waitOnFlushFutures() throws InterruptedException, ExecutionException {
+    // wait for all the transactions to complete
+    CompletableFuture.allOf(futureMap.values().toArray(
+        new CompletableFuture[0])).get();
+  }
+
   @Override
   public void cleanup() {
     super.cleanup();
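Review note: the switch from a plain `put(flushPos, flushFuture)` to `futureMap.compute(...)` in CommitWatcher is the behavioral core of this file. Once response handling runs on a shared executor, two putBlock futures can be registered against the same flush position; a plain `put()` would overwrite the earlier future, and `waitOnFlushFutures()` would then return without waiting for it. The sketch below is a minimal, self-contained illustration of the chaining semantics; `FlushTracker`, `register`, and `awaitAll` are illustrative names, not Ozone API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;

/** Minimal sketch of the compute/thenCombine pattern used above. */
public class FlushTracker<T> {
  private final ConcurrentMap<Long, CompletableFuture<T>> futures =
      new ConcurrentHashMap<>();

  /** Chain a new future onto any future already registered at this position. */
  void register(long position, CompletableFuture<T> future) {
    futures.compute(position, (key, previous) -> previous == null
        ? future
        // The combined future completes only when BOTH complete (and fails
        // if either fails); the latest result wins, as in (prev, curr) -> curr.
        : previous.thenCombine(future, (prev, curr) -> curr));
  }

  /** Block until every registered flush future has completed. */
  void awaitAll() throws InterruptedException, ExecutionException {
    CompletableFuture.allOf(
        futures.values().toArray(new CompletableFuture[0])).get();
  }

  public static void main(String[] args) throws Exception {
    FlushTracker<String> tracker = new FlushTracker<>();
    CompletableFuture<String> first = new CompletableFuture<>();
    CompletableFuture<String> second = new CompletableFuture<>();
    // Two futures for the same flush position: put() would drop `first`,
    // compute() chains it so awaitAll() still waits on it.
    tracker.register(4096L, first);
    tracker.register(4096L, second);
    second.complete("second ack");
    first.complete("first ack");
    tracker.awaitAll();
    System.out.println("all flushes acknowledged");
  }
}
```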
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
index 6a2758d36486..b587b1d13171 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
@@ -113,16 +113,13 @@ void updateCommitInfo(XceiverClientReply reply, List<ChunkBuffer> buffers) {
   }
 
   @Override
-  void putFlushFuture(long flushPos,
-      CompletableFuture<ContainerCommandResponseProto> flushFuture) {
-    commitWatcher.getFutureMap().put(flushPos, flushFuture);
+  void putFlushFuture(long flushPos, CompletableFuture<ContainerCommandResponseProto> flushFuture) {
+    commitWatcher.putFlushFuture(flushPos, flushFuture);
   }
 
   @Override
   void waitOnFlushFutures() throws InterruptedException, ExecutionException {
-    // wait for all the transactions to complete
-    CompletableFuture.allOf(commitWatcher.getFutureMap().values().toArray(
-        new CompletableFuture[0])).get();
+    commitWatcher.waitOnFlushFutures();
   }
 
   @Override
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
index a45c15844847..90756bbc8898 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
@@ -101,8 +101,7 @@ public class ECReconstructionCoordinator implements Closeable {
 
   private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3;
 
-  // TODO: Adjusts to the appropriate value when the ec-reconstruct-writer thread pool is used.
-  private static final int EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE = 0;
+  private static final int EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE = 5;
 
   private final ECContainerOperationClient containerOperationClient;
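These hunks show the other half of the contract: `BlockOutputStream` now borrows its `responseExecutor` from a provider instead of owning a single-thread executor, so `cleanup()` must no longer call `shutdown()` on it, and `AbstractCommitWatcher.updateCommitInfoMap` has to be `synchronized` because pooled threads may now interleave on one watcher. Below is a minimal sketch of what such a provider can look like, assuming a lazily created fixed-size pool whose lifecycle belongs to the client; `SharedExecutorProvider` and its members are illustrative names, not this PR's actual implementation.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

/**
 * Illustrative shared-executor provider: every stream gets the same lazily
 * created pool, and only the owner (e.g. the client) ever shuts it down.
 */
public final class SharedExecutorProvider implements Supplier<ExecutorService> {
  private final int poolSize;
  private volatile ExecutorService executor;

  public SharedExecutorProvider(int poolSize) {
    this.poolSize = poolSize;
  }

  @Override
  public ExecutorService get() {
    ExecutorService e = executor;
    if (e == null) {
      synchronized (this) {
        e = executor;
        if (e == null) {
          // Created on first use, so clients that never write pay no threads.
          executor = e = Executors.newFixedThreadPool(poolSize);
        }
      }
    }
    return e;
  }

  /** Owner-side shutdown; individual streams must NOT call this. */
  public synchronized void shutdown() {
    if (executor != null) {
      executor.shutdownNow();
    }
  }
}
```

This also explains the constant change above: once the ec-reconstruct-writer pool is actually used, a minimum size of 0 is no longer a harmless placeholder, hence the move to 5.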
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index 878558073f75..0cb3973e0411 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -43,8 +43,6 @@
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -66,7 +64,6 @@ public final class ECKeyOutputStream extends KeyOutputStream
   private final int numParityBlks;
   private final ByteBufferPool bufferPool;
   private final RawErasureEncoder encoder;
-  private final ExecutorService flushExecutor;
   private final Future<Boolean> flushFuture;
   private final AtomicLong flushCheckpoint;
 
@@ -119,12 +116,13 @@ private ECKeyOutputStream(Builder builder) {
     this.writeOffset = 0;
     this.encoder = CodecUtil.createRawEncoderWithFallback(
         builder.getReplicationConfig());
-    this.flushExecutor = Executors.newSingleThreadExecutor();
     S3Auth s3Auth = builder.getS3CredentialsProvider().get();
     ThreadLocal<S3Auth> s3CredentialsProvider =
         builder.getS3CredentialsProvider();
-    flushExecutor.submit(() -> s3CredentialsProvider.set(s3Auth));
-    this.flushFuture = this.flushExecutor.submit(this::flushStripeFromQueue);
+    this.flushFuture = builder.getExecutorServiceSupplier().get().submit(() -> {
+      s3CredentialsProvider.set(s3Auth);
+      return flushStripeFromQueue();
+    });
     this.flushCheckpoint = new AtomicLong(0);
     this.atomicKeyCreation = builder.getAtomicKeyCreation();
   }
@@ -495,7 +493,6 @@ public void close() throws IOException {
     } catch (InterruptedException e) {
       throw new IOException("Flushing thread was interrupted", e);
     } finally {
-      flushExecutor.shutdownNow();
       closeCurrentStreamEntry();
       blockOutputStreamEntryPool.cleanup();
    }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 74b22e7ca4c6..a6830ba9f771 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -196,8 +196,7 @@ public class RpcClient implements ClientProtocol {
   // for reconstruction.
   private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3;
 
-  // TODO: Adjusts to the appropriate value when the writeThreadPool is used.
-  private static final int WRITE_POOL_MIN_SIZE = 0;
+  private static final int WRITE_POOL_MIN_SIZE = 1;
 
   private final ConfigurationSource conf;
   private final OzoneManagerClientProtocol ozoneManagerClient;
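One subtlety in the ECKeyOutputStream hunk: the old code issued two submissions to a dedicated single-thread executor, first `s3CredentialsProvider.set(s3Auth)` and then `flushStripeFromQueue`, relying on both tasks landing on the same thread. On a shared pool that guarantee disappears, which is why the `ThreadLocal` is now populated inside the same task that runs the flush. The same reasoning applies to `WRITE_POOL_MIN_SIZE` moving from 0 to 1: the write pool now actually runs this task. A small self-contained demonstration of the pitfall (all names illustrative):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Shows why thread-local state must be set inside the pooled task itself. */
public class ThreadLocalHandoff {
  private static final ThreadLocal<String> AUTH = new ThreadLocal<>();

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    String auth = "s3-session-token"; // captured on the caller thread

    // Fragile on a shared pool: set() and the read may run on different threads.
    pool.submit(() -> AUTH.set(auth));
    Future<String> fragile = pool.submit(() -> AUTH.get()); // may be null

    // Safe: install the thread-local inside the task that needs it,
    // mirroring the single combined submit() in the diff above.
    Future<String> safe = pool.submit(() -> {
      AUTH.set(auth);
      return AUTH.get(); // stand-in for flushStripeFromQueue()
    });

    System.out.println("fragile = " + fragile.get()); // token or null, racy
    System.out.println("safe    = " + safe.get());    // always the token
    pool.shutdown();
  }
}
```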
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestCommitWatcher.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestCommitWatcher.java
index 2b13daaca291..c3ea911f1935 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestCommitWatcher.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestCommitWatcher.java
@@ -209,7 +209,7 @@ public void testReleaseBuffers() throws Exception {
         return v;
       });
       futures.add(future);
-      watcher.getFutureMap().put(length, future);
+      watcher.putFlushFuture(length, future);
       replies.add(reply);
     }
 
@@ -282,7 +282,7 @@ public void testReleaseBuffersOnException() throws Exception {
         return v;
       });
       futures.add(future);
-      watcher.getFutureMap().put(length, future);
+      watcher.putFlushFuture(length, future);
       replies.add(reply);
     }
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
index 29cf1bc5e117..44303ed2ff23 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
@@ -213,6 +213,14 @@ static void shutdown() throws IOException {
     }
   }
 
+  static void reInitClient() throws IOException {
+    ozClient = OzoneClientFactory.getRpcClient(conf);
+    store = ozClient.getObjectStore();
+    TestOzoneRpcClient.setOzClient(ozClient);
+    TestOzoneRpcClient.setStore(store);
+  }
+
+
   @ParameterizedTest
   @EnumSource
   void testPutKeyWithEncryption(BucketLayout bucketLayout) throws Exception {
@@ -770,9 +778,7 @@ void testGetKeyProvider() throws Exception {
     KeyProvider kp3 = ozClient.getObjectStore().getKeyProvider();
     assertNotEquals(kp3, kpSpy);
 
-    // Restore ozClient and store
-    TestOzoneRpcClient.setOzClient(OzoneClientFactory.getRpcClient(conf));
-    TestOzoneRpcClient.setStore(ozClient.getObjectStore());
+    reInitClient();
   }
 
   private static RepeatedOmKeyInfo getMatchedKeyInfo(
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
index da41561f8ce2..cd77706b862c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
@@ -1651,6 +1651,7 @@ public void testPutKeyRatisThreeNodesParallel() throws IOException,
           }
           latch.countDown();
         } catch (IOException ex) {
+          LOG.error("Execution failed: ", ex);
          latch.countDown();
          failCount.incrementAndGet();
        }
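Beyond routing the tests through `putFlushFuture()`, the new `reInitClient()` helper also appears to fix a latent mismatch in the old restore path, which handed `setOzClient` a fresh client but derived the store from the stale `ozClient` field, pairing the new client with the old client's store. A toy reproduction with stand-in types (nothing below is an Ozone class):

```java
/** Toy model of the client/store aliasing that reInitClient() avoids. */
public class ReInitClientSketch {
  static class Client {
    final int id;
    Client(int id) { this.id = id; }
    Store getObjectStore() { return new Store(id); }
  }

  static class Store {
    final int boundClientId;
    Store(int boundClientId) { this.boundClientId = boundClientId; }
  }

  static Client ozClient = new Client(1);
  static Store store = ozClient.getObjectStore();

  public static void main(String[] args) {
    // Old restore path: a fresh client is created, but the store is still
    // derived from the stale ozClient field, i.e. from client 1.
    Client fresh = new Client(2);
    Store staleStore = ozClient.getObjectStore();
    System.out.println("buggy: client=" + fresh.id
        + ", store bound to client=" + staleStore.boundClientId); // 2 vs 1

    // reInitClient() shape: reassign the field first, derive the store from it.
    ozClient = new Client(2);
    store = ozClient.getObjectStore();
    System.out.println("fixed: client=" + ozClient.id
        + ", store bound to client=" + store.boundClientId); // 2 vs 2
  }
}
```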