From b0b89cdd8b26b23d5b48baae82f7f0582e50e36c Mon Sep 17 00:00:00 2001 From: xichen01 Date: Thu, 15 Feb 2024 18:18:20 +0800 Subject: [PATCH 1/6] HDDS-10383. Introduce a Provider for client-side thread resources passing --- .../hadoop/hdds/scm/OzoneClientConfig.java | 15 +++++ .../hdds/scm/storage/BlockOutputStream.java | 6 +- .../hdds/scm/storage/ECBlockOutputStream.java | 9 ++- .../scm/storage/RatisBlockOutputStream.java | 13 ++-- .../io/BlockOutputStreamResourceProvider.java | 60 +++++++++++++++++ .../TestBlockOutputStreamCorrectness.java | 6 +- .../ECReconstructionCoordinator.java | 45 +++++++++---- .../client/io/BlockOutputStreamEntry.java | 28 ++++---- .../client/io/BlockOutputStreamEntryPool.java | 22 ++++--- .../client/io/ECBlockOutputStreamEntry.java | 23 +++---- .../io/ECBlockOutputStreamEntryPool.java | 8 +-- .../ozone/client/io/ECKeyOutputStream.java | 7 +- .../ozone/client/io/KeyOutputStream.java | 35 +++++----- .../hadoop/ozone/client/rpc/RpcClient.java | 66 ++++++++++++++----- .../ozone/client/OzoneOutputStreamStub.java | 7 +- 15 files changed, 254 insertions(+), 96 deletions(-) create mode 100644 hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamResourceProvider.java diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java index 44af34cb919c..65e466529773 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java @@ -201,6 +201,13 @@ public enum ChecksumCombineMode { // 3 concurrent stripe read should be enough. private int ecReconstructStripeReadPoolLimit = 10 * 3; + @Config(key = "ec.reconstruct.stripe.write.pool.limit", + defaultValue = "30", + description = "Thread pool max size for parallelly write" + + " available ec chunks to reconstruct the whole stripe.", + tags = ConfigTag.CLIENT) + private int ecReconstructStripeWritePoolLimit = 10 * 3; + @Config(key = "checksum.combine.mode", defaultValue = "COMPOSITE_CRC", description = "The combined checksum type [MD5MD5CRC / COMPOSITE_CRC] " @@ -387,6 +394,14 @@ public int getEcReconstructStripeReadPoolLimit() { return ecReconstructStripeReadPoolLimit; } + public void setEcReconstructStripeWritePoolLimit(int poolLimit) { + this.ecReconstructStripeWritePoolLimit = poolLimit; + } + + public int getEcReconstructStripeWritePoolLimit() { + return ecReconstructStripeWritePoolLimit; + } + public void setFsDefaultBucketLayout(String bucketLayout) { if (!bucketLayout.isEmpty()) { this.fsDefaultBucketLayout = bucketLayout; diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index bbc461669584..53f7f0f8404b 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.ozone.client.io.BlockOutputStreamResourceProvider; import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.common.ChecksumData; import org.apache.hadoop.ozone.common.ChunkBuffer; @@ -145,7 +146,8 @@ public BlockOutputStream( BufferPool bufferPool, OzoneClientConfig config, Token token, - ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs + StreamBufferArgs streamBufferArgs, + BlockOutputStreamResourceProvider blockOutputStreamResourceProvider ) throws IOException { this.xceiverClientFactory = xceiverClientManager; this.config = config; @@ -189,7 +191,7 @@ public BlockOutputStream( ioException = new AtomicReference<>(null); checksum = new Checksum(config.getChecksumType(), config.getBytesPerChecksum()); - this.clientMetrics = clientMetrics; + this.clientMetrics = blockOutputStreamResourceProvider.getClientMetrics(); this.pipeline = pipeline; this.streamBufferArgs = streamBufferArgs; } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java index 0abc2274bf08..87644338f5f0 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java @@ -23,13 +23,13 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; -import org.apache.hadoop.hdds.scm.ContainerClientMetrics; import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.StreamBufferArgs; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.XceiverClientReply; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.client.io.BlockOutputStreamResourceProvider; import org.apache.hadoop.ozone.common.ChunkBuffer; import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.security.token.Token; @@ -75,10 +75,13 @@ public ECBlockOutputStream( BufferPool bufferPool, OzoneClientConfig config, Token token, - ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs + StreamBufferArgs streamBufferArgs, + BlockOutputStreamResourceProvider blockOutputStreamResourceProvider ) throws IOException { super(blockID, xceiverClientManager, - pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs); + pipeline, bufferPool, config, token, streamBufferArgs, + blockOutputStreamResourceProvider + ); // In EC stream, there will be only one node in pipeline. this.datanodeDetails = pipeline.getClosestNode(); } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java index b52fc2af9178..fe1f095c075e 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java @@ -21,12 +21,12 @@ import org.apache.hadoop.fs.Syncable; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; -import org.apache.hadoop.hdds.scm.ContainerClientMetrics; import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.StreamBufferArgs; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.XceiverClientReply; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.ozone.client.io.BlockOutputStreamResourceProvider; import org.apache.hadoop.ozone.common.ChunkBuffer; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; @@ -65,8 +65,8 @@ public class RatisBlockOutputStream extends BlockOutputStream /** * Creates a new BlockOutputStream. * - * @param blockID block ID - * @param bufferPool pool of buffers + * @param blockID block ID + * @param bufferPool pool of buffers */ @SuppressWarnings("checkstyle:ParameterNumber") public RatisBlockOutputStream( @@ -76,10 +76,13 @@ public RatisBlockOutputStream( BufferPool bufferPool, OzoneClientConfig config, Token token, - ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs + StreamBufferArgs streamBufferArgs, + BlockOutputStreamResourceProvider blockOutputStreamResourceProvider ) throws IOException { super(blockID, xceiverClientManager, pipeline, - bufferPool, config, token, clientMetrics, streamBufferArgs); + bufferPool, config, token, streamBufferArgs, + blockOutputStreamResourceProvider + ); this.commitWatcher = new CommitWatcher(bufferPool, getXceiverClient()); } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamResourceProvider.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamResourceProvider.java new file mode 100644 index 000000000000..fb289fcaa5d1 --- /dev/null +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamResourceProvider.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.client.io; + +import java.util.concurrent.ExecutorService; +import java.util.function.Supplier; +import org.apache.hadoop.hdds.scm.ContainerClientMetrics; + +/** + * Provides resources for BlockOutputStream, including executor service, + * and client metrics. + */ +public final class BlockOutputStreamResourceProvider { + private final Supplier executorServiceSupplier; + private final ContainerClientMetrics clientMetrics; + + /** + * Creates an instance of blockOutputStreamResourceProvider. + */ + public static BlockOutputStreamResourceProvider create( + Supplier executorServiceSupplier, ContainerClientMetrics clientMetrics) { + return new BlockOutputStreamResourceProvider(executorServiceSupplier, clientMetrics); + } + + private BlockOutputStreamResourceProvider(Supplier executorServiceSupplier, + ContainerClientMetrics clientMetrics) { + this.executorServiceSupplier = executorServiceSupplier; + this.clientMetrics = clientMetrics; + } + + /** + * Provides an ExecutorService, lazily initialized upon first request. + */ + public ExecutorService getExecutorService() { + return executorServiceSupplier.get(); + } + + /** + * Returns the ContainerClientMetrics instance. + */ + public ContainerClientMetrics getClientMetrics() { + return clientMetrics; + } +} diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java index 9b061f5392d3..7094c5312fe7 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java @@ -43,10 +43,12 @@ import org.apache.hadoop.hdds.scm.pipeline.MockPipeline; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.client.io.BlockOutputStreamResourceProvider; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import static java.util.concurrent.Executors.newFixedThreadPool; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; @@ -108,7 +110,9 @@ private BlockOutputStream createBlockOutputStream(BufferPool bufferPool) bufferPool, config, null, - ContainerClientMetrics.acquire(), streamBufferArgs); + streamBufferArgs, + BlockOutputStreamResourceProvider.create(() -> newFixedThreadPool(10), + ContainerClientMetrics.acquire())); } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java index 234439a00c24..59dc97652760 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java @@ -45,6 +45,7 @@ import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory; import org.apache.hadoop.ozone.client.io.BlockInputStreamFactoryImpl; +import org.apache.hadoop.ozone.client.io.BlockOutputStreamResourceProvider; import org.apache.hadoop.ozone.client.io.ECBlockInputStreamProxy; import org.apache.hadoop.ozone.client.io.ECBlockReconstructedStripeInputStream; import org.apache.hadoop.ozone.container.common.helpers.BlockData; @@ -73,6 +74,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import java.util.stream.Collectors; import static org.apache.hadoop.ozone.container.ec.reconstruction.TokenHelper.encode; @@ -101,18 +103,22 @@ public class ECReconstructionCoordinator implements Closeable { private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3; + // TODO: Adjusts to the appropriate value when the ec-reconstruct-writer thread pool is used. + private static final int EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE = 0; + private final ECContainerOperationClient containerOperationClient; private final ByteBufferPool byteBufferPool; - private final ExecutorService ecReconstructExecutor; - + private final ExecutorService ecReconstructReadExecutor; + private volatile ExecutorService ecReconstructWriteExecutor; private final BlockInputStreamFactory blockInputStreamFactory; private final TokenHelper tokenHelper; - private final ContainerClientMetrics clientMetrics; private final ECReconstructionMetrics metrics; private final StateContext context; private final OzoneClientConfig ozoneClientConfig; + private final BlockOutputStreamResourceProvider + blockOutputStreamResourceProvider; public ECReconstructionCoordinator( ConfigurationSource conf, CertificateClient certificateClient, @@ -123,22 +129,33 @@ public ECReconstructionCoordinator( this.containerOperationClient = new ECContainerOperationClient(conf, certificateClient); this.byteBufferPool = new ElasticByteBufferPool(); - ThreadFactory threadFactory = new ThreadFactoryBuilder() - .setNameFormat(threadNamePrefix + "ec-reconstruct-reader-TID-%d") + Function threadFactoryCreator = name -> + new ThreadFactoryBuilder() + .setNameFormat(threadNamePrefix + name) .build(); ozoneClientConfig = conf.getObject(OzoneClientConfig.class); - this.ecReconstructExecutor = + this.ecReconstructReadExecutor = new ThreadPoolExecutor(EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE, ozoneClientConfig.getEcReconstructStripeReadPoolLimit(), 60, TimeUnit.SECONDS, new SynchronousQueue<>(), - threadFactory, + threadFactoryCreator.apply("ec-reconstruct-reader-TID-%d"), + new ThreadPoolExecutor.CallerRunsPolicy()); + this.ecReconstructWriteExecutor = + new ThreadPoolExecutor(EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE, + conf.getObject(OzoneClientConfig.class) + .getEcReconstructStripeWritePoolLimit(), + 60, + TimeUnit.SECONDS, + new SynchronousQueue<>(), + threadFactoryCreator.apply("ec-reconstruct-writer-TID-%d"), new ThreadPoolExecutor.CallerRunsPolicy()); this.blockInputStreamFactory = BlockInputStreamFactoryImpl - .getInstance(byteBufferPool, () -> ecReconstructExecutor); + .getInstance(byteBufferPool, () -> ecReconstructReadExecutor); + blockOutputStreamResourceProvider = BlockOutputStreamResourceProvider.create( + () -> ecReconstructWriteExecutor, ContainerClientMetrics.acquire()); tokenHelper = new TokenHelper(new SecurityConfig(conf), secretKeyClient); - this.clientMetrics = ContainerClientMetrics.acquire(); this.metrics = metrics; } @@ -232,7 +249,9 @@ private ECBlockOutputStream getECBlockOutputStream( containerOperationClient.singleNodePipeline(datanodeDetails, repConfig, replicaIndex), BufferPool.empty(), ozoneClientConfig, - blockLocationInfo.getToken(), clientMetrics, streamBufferArgs); + blockLocationInfo.getToken(), streamBufferArgs, + blockOutputStreamResourceProvider + ); } @VisibleForTesting @@ -272,7 +291,7 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo, repConfig, blockLocationInfo, true, this.containerOperationClient.getXceiverClientManager(), null, this.blockInputStreamFactory, byteBufferPool, - this.ecReconstructExecutor)) { + this.ecReconstructReadExecutor)) { ECBlockOutputStream[] targetBlockStreams = new ECBlockOutputStream[toReconstructIndexes.size()]; @@ -457,6 +476,10 @@ public void close() throws IOException { if (containerOperationClient != null) { containerOperationClient.close(); } + if (ecReconstructWriteExecutor != null) { + ecReconstructWriteExecutor.shutdownNow(); + ecReconstructWriteExecutor = null; + } } private Pipeline rebuildInputPipeline(ECReplicationConfig repConfig, diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java index 9bdec27f534f..0e42bc36d30b 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java @@ -25,7 +25,6 @@ import org.apache.hadoop.fs.Syncable; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.scm.ContainerClientMetrics; import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.StreamBufferArgs; import org.apache.hadoop.hdds.scm.XceiverClientFactory; @@ -61,8 +60,8 @@ public class BlockOutputStreamEntry extends OutputStream { private final Token token; private BufferPool bufferPool; - private ContainerClientMetrics clientMetrics; private StreamBufferArgs streamBufferArgs; + private BlockOutputStreamResourceProvider blockOutputStreamResourceProvider; @SuppressWarnings({"parameternumber", "squid:S00107"}) BlockOutputStreamEntry( @@ -73,7 +72,8 @@ public class BlockOutputStreamEntry extends OutputStream { BufferPool bufferPool, Token token, OzoneClientConfig config, - ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs + StreamBufferArgs streamBufferArgs, + BlockOutputStreamResourceProvider blockOutputStreamResourceProvider ) { this.config = config; this.outputStream = null; @@ -85,8 +85,8 @@ public class BlockOutputStreamEntry extends OutputStream { this.length = length; this.currentPosition = 0; this.bufferPool = bufferPool; - this.clientMetrics = clientMetrics; this.streamBufferArgs = streamBufferArgs; + this.blockOutputStreamResourceProvider = blockOutputStreamResourceProvider; } /** @@ -108,11 +108,13 @@ void checkStream() throws IOException { */ void createOutputStream() throws IOException { outputStream = new RatisBlockOutputStream(blockID, xceiverClientManager, - pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs); + pipeline, bufferPool, config, token, streamBufferArgs, + blockOutputStreamResourceProvider + ); } - ContainerClientMetrics getClientMetrics() { - return clientMetrics; + BlockOutputStreamResourceProvider getblockOutputStreamResourceProvider() { + return blockOutputStreamResourceProvider; } StreamBufferArgs getStreamBufferArgs() { @@ -359,8 +361,8 @@ public static class Builder { private BufferPool bufferPool; private Token token; private OzoneClientConfig config; - private ContainerClientMetrics clientMetrics; private StreamBufferArgs streamBufferArgs; + private BlockOutputStreamResourceProvider blockOutputStreamResourceProvider; public Builder setBlockID(BlockID bID) { this.blockID = bID; @@ -402,8 +404,10 @@ public Builder setToken(Token bToken) { this.token = bToken; return this; } - public Builder setClientMetrics(ContainerClientMetrics clientMetrics) { - this.clientMetrics = clientMetrics; + + public Builder setblockOutputStreamResourceProvider( + BlockOutputStreamResourceProvider provider) { + this.blockOutputStreamResourceProvider = provider; return this; } public Builder setStreamBufferArgs(StreamBufferArgs streamBufferArgs) { @@ -418,7 +422,9 @@ public BlockOutputStreamEntry build() { pipeline, length, bufferPool, - token, config, clientMetrics, streamBufferArgs); + token, config, streamBufferArgs, + blockOutputStreamResourceProvider + ); } } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java index d0f3b5728a8b..417795b45268 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java @@ -28,7 +28,6 @@ import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.scm.ByteStringConversion; -import org.apache.hadoop.hdds.scm.ContainerClientMetrics; import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.StreamBufferArgs; import org.apache.hadoop.hdds.scm.XceiverClientFactory; @@ -83,8 +82,9 @@ public class BlockOutputStreamEntryPool implements KeyMetadataAware { private OmMultipartCommitUploadPartInfo commitUploadPartInfo; private final long openID; private final ExcludeList excludeList; - private final ContainerClientMetrics clientMetrics; private final StreamBufferArgs streamBufferArgs; + private final BlockOutputStreamResourceProvider + blockOutputStreamResourceProvider; @SuppressWarnings({"parameternumber", "squid:S00107"}) public BlockOutputStreamEntryPool( @@ -95,7 +95,8 @@ public BlockOutputStreamEntryPool( boolean isMultipart, OmKeyInfo info, boolean unsafeByteBufferConversion, XceiverClientFactory xceiverClientFactory, long openID, - ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs + StreamBufferArgs streamBufferArgs, + BlockOutputStreamResourceProvider blockOutputStreamResourceProvider ) { this.config = config; this.xceiverClientFactory = xceiverClientFactory; @@ -117,8 +118,8 @@ public BlockOutputStreamEntryPool( .getStreamBufferSize()), ByteStringConversion .createByteBufferConversion(unsafeByteBufferConversion)); - this.clientMetrics = clientMetrics; this.streamBufferArgs = streamBufferArgs; + this.blockOutputStreamResourceProvider = blockOutputStreamResourceProvider; } ExcludeList createExcludeList() { @@ -126,8 +127,9 @@ ExcludeList createExcludeList() { Clock.system(ZoneOffset.UTC)); } - BlockOutputStreamEntryPool(ContainerClientMetrics clientMetrics, - OzoneClientConfig clientConfig, StreamBufferArgs streamBufferArgs) { + BlockOutputStreamEntryPool( + StreamBufferArgs streamBufferArgs, OzoneClientConfig clientConfig, + BlockOutputStreamResourceProvider blockOutputStreamResourceProvider) { streamEntries = new ArrayList<>(); omClient = null; keyArgs = null; @@ -141,8 +143,8 @@ ExcludeList createExcludeList() { currentStreamIndex = 0; openID = -1; excludeList = createExcludeList(); - this.clientMetrics = clientMetrics; this.streamBufferArgs = null; + this.blockOutputStreamResourceProvider = blockOutputStreamResourceProvider; } /** @@ -188,8 +190,8 @@ BlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo) { .setLength(subKeyInfo.getLength()) .setBufferPool(bufferPool) .setToken(subKeyInfo.getToken()) - .setClientMetrics(clientMetrics) .setStreamBufferArgs(streamBufferArgs) + .setblockOutputStreamResourceProvider(blockOutputStreamResourceProvider) .build(); } @@ -252,8 +254,8 @@ OzoneClientConfig getConfig() { return config; } - ContainerClientMetrics getClientMetrics() { - return clientMetrics; + BlockOutputStreamResourceProvider getblockOutputStreamResourceProvider() { + return blockOutputStreamResourceProvider; } StreamBufferArgs getStreamBufferArgs() { diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java index 07d0f46069ca..574e7c7d8166 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java @@ -23,7 +23,6 @@ import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.hdds.scm.ContainerClientMetrics; import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.StreamBufferArgs; import org.apache.hadoop.hdds.scm.XceiverClientFactory; @@ -79,10 +78,10 @@ public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry { ECBlockOutputStreamEntry(BlockID blockID, String key, XceiverClientFactory xceiverClientManager, Pipeline pipeline, long length, BufferPool bufferPool, Token token, - OzoneClientConfig config, ContainerClientMetrics clientMetrics, - StreamBufferArgs streamBufferArgs) { + OzoneClientConfig config, StreamBufferArgs streamBufferArgs, + BlockOutputStreamResourceProvider provider) { super(blockID, key, xceiverClientManager, pipeline, length, bufferPool, - token, config, clientMetrics, streamBufferArgs); + token, config, streamBufferArgs, provider); assertInstanceOf( pipeline.getReplicationConfig(), ECReplicationConfig.class); this.replicationConfig = @@ -101,7 +100,8 @@ void checkStream() throws IOException { streams[i] = new ECBlockOutputStream(getBlockID(), getXceiverClientManager(), createSingleECBlockPipeline(getPipeline(), nodes.get(i), i + 1), - getBufferPool(), getConf(), getToken(), getClientMetrics(), getStreamBufferArgs()); + getBufferPool(), getConf(), getToken(), getStreamBufferArgs(), + getblockOutputStreamResourceProvider()); } blockOutputStreams = streams; } @@ -442,8 +442,8 @@ public static class Builder { private BufferPool bufferPool; private Token token; private OzoneClientConfig config; - private ContainerClientMetrics clientMetrics; private StreamBufferArgs streamBufferArgs; + private BlockOutputStreamResourceProvider blockOutputStreamResourceProvider; public ECBlockOutputStreamEntry.Builder setBlockID(BlockID bID) { this.blockID = bID; @@ -488,10 +488,9 @@ public ECBlockOutputStreamEntry.Builder setToken( this.token = bToken; return this; } - - public ECBlockOutputStreamEntry.Builder setClientMetrics( - ContainerClientMetrics containerClientMetrics) { - this.clientMetrics = containerClientMetrics; + public ECBlockOutputStreamEntry.Builder setblockOutputStreamResourceProvider( + BlockOutputStreamResourceProvider provider) { + this.blockOutputStreamResourceProvider = provider; return this; } @@ -508,7 +507,9 @@ public ECBlockOutputStreamEntry build() { pipeline, length, bufferPool, - token, config, clientMetrics, streamBufferArgs); + token, config, streamBufferArgs, + blockOutputStreamResourceProvider + ); } } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java index e551605d842d..f9e9b417ee15 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java @@ -19,7 +19,6 @@ import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.client.ReplicationConfig; -import org.apache.hadoop.hdds.scm.ContainerClientMetrics; import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.StreamBufferArgs; import org.apache.hadoop.hdds.scm.XceiverClientFactory; @@ -57,10 +56,11 @@ public ECBlockOutputStreamEntryPool(OzoneClientConfig config, boolean unsafeByteBufferConversion, XceiverClientFactory xceiverClientFactory, long openID, - ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs) { + StreamBufferArgs streamBufferArgs, + BlockOutputStreamResourceProvider blockOutputStreamResourceProvider) { super(config, omClient, requestId, replicationConfig, uploadID, partNumber, isMultipart, info, unsafeByteBufferConversion, xceiverClientFactory, - openID, clientMetrics, streamBufferArgs); + openID, streamBufferArgs, blockOutputStreamResourceProvider); assert replicationConfig instanceof ECReplicationConfig; } @@ -82,8 +82,8 @@ BlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo) { .setLength(subKeyInfo.getLength()) .setBufferPool(getBufferPool()) .setToken(subKeyInfo.getToken()) - .setClientMetrics(getClientMetrics()) .setStreamBufferArgs(getStreamBufferArgs()) + .setblockOutputStreamResourceProvider(getblockOutputStreamResourceProvider()) .build(); } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java index b5c36474ff9e..6fdcdf125ba6 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java @@ -128,8 +128,8 @@ public long getFlushCheckpoint() { } private ECKeyOutputStream(Builder builder) { - super(builder.getReplicationConfig(), builder.getClientMetrics(), - builder.getClientConfig(), builder.getStreamBufferArgs()); + super(builder.getReplicationConfig(), builder.getStreamBufferArgs(), + builder.getClientConfig(), builder.getblockOutputStreamResourceProvider()); this.config = builder.getClientConfig(); this.bufferPool = builder.getByteBufferPool(); // For EC, cell/chunk size and buffer size can be same for now. @@ -149,7 +149,8 @@ private ECKeyOutputStream(Builder builder) { builder.isMultipartKey(), info, builder.isUnsafeByteBufferConversionEnabled(), builder.getXceiverManager(), builder.getOpenHandler().getId(), - builder.getClientMetrics(), builder.getStreamBufferArgs()); + builder.getStreamBufferArgs(), + builder.getblockOutputStreamResourceProvider()); this.writeOffset = 0; this.encoder = CodecUtil.createRawEncoderWithFallback( diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index 8b128e9cd945..7eb335471f0e 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -32,7 +32,6 @@ import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; -import org.apache.hadoop.hdds.scm.ContainerClientMetrics; import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.StreamBufferArgs; import org.apache.hadoop.hdds.scm.XceiverClientFactory; @@ -106,8 +105,8 @@ enum StreamAction { private boolean atomicKeyCreation; public KeyOutputStream(ReplicationConfig replicationConfig, - ContainerClientMetrics clientMetrics, OzoneClientConfig clientConfig, - StreamBufferArgs streamBufferArgs) { + StreamBufferArgs streamBufferArgs, OzoneClientConfig clientConfig, + BlockOutputStreamResourceProvider blockOutputStreamResourceProvider) { this.replication = replicationConfig; this.config = clientConfig; closed = false; @@ -117,8 +116,8 @@ public KeyOutputStream(ReplicationConfig replicationConfig, e -> RetryPolicies.TRY_ONCE_THEN_FAIL)); retryCount = 0; offset = 0; - blockOutputStreamEntryPool = - new BlockOutputStreamEntryPool(clientMetrics, clientConfig, streamBufferArgs); + blockOutputStreamEntryPool = new BlockOutputStreamEntryPool( + streamBufferArgs, clientConfig, blockOutputStreamResourceProvider); } @VisibleForTesting @@ -155,9 +154,9 @@ public KeyOutputStream( String requestId, ReplicationConfig replicationConfig, String uploadID, int partNumber, boolean isMultipart, boolean unsafeByteBufferConversion, - ContainerClientMetrics clientMetrics, - boolean atomicKeyCreation, StreamBufferArgs streamBufferArgs - ) { + boolean atomicKeyCreation, + StreamBufferArgs streamBufferArgs, + BlockOutputStreamResourceProvider blockOutputStreamResourceProvider) { this.config = config; this.replication = replicationConfig; blockOutputStreamEntryPool = @@ -170,7 +169,9 @@ public KeyOutputStream( unsafeByteBufferConversion, xceiverClientManager, handler.getId(), - clientMetrics, streamBufferArgs); + streamBufferArgs, + blockOutputStreamResourceProvider + ); this.retryPolicyMap = HddsClientUtils.getRetryPolicyByException( config.getMaxRetryCount(), config.getRetryInterval()); this.retryCount = 0; @@ -612,9 +613,9 @@ public static class Builder { private boolean unsafeByteBufferConversion; private OzoneClientConfig clientConfig; private ReplicationConfig replicationConfig; - private ContainerClientMetrics clientMetrics; private boolean atomicKeyCreation = false; private StreamBufferArgs streamBufferArgs; + private BlockOutputStreamResourceProvider blockOutputStreamResourceProvider; public String getMultipartUploadID() { return multipartUploadID; @@ -715,13 +716,14 @@ public Builder setAtomicKeyCreation(boolean atomicKey) { return this; } - public Builder setClientMetrics(ContainerClientMetrics clientMetrics) { - this.clientMetrics = clientMetrics; + public Builder setblockOutputStreamResourceProvider( + BlockOutputStreamResourceProvider provider) { + this.blockOutputStreamResourceProvider = provider; return this; } - public ContainerClientMetrics getClientMetrics() { - return clientMetrics; + public BlockOutputStreamResourceProvider getblockOutputStreamResourceProvider() { + return blockOutputStreamResourceProvider; } public boolean getAtomicKeyCreation() { @@ -740,9 +742,10 @@ public KeyOutputStream build() { multipartNumber, isMultipartKey, unsafeByteBufferConversion, - clientMetrics, atomicKeyCreation, - streamBufferArgs); + streamBufferArgs, + blockOutputStreamResourceProvider + ); } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index 94d6ae9769dc..86cbe8efa6d6 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -80,6 +80,7 @@ import org.apache.hadoop.ozone.client.VolumeArgs; import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory; import org.apache.hadoop.ozone.client.io.BlockInputStreamFactoryImpl; +import org.apache.hadoop.ozone.client.io.BlockOutputStreamResourceProvider; import org.apache.hadoop.ozone.client.io.CipherOutputStreamOzone; import org.apache.hadoop.ozone.client.io.ECKeyOutputStream; import org.apache.hadoop.ozone.client.io.KeyDataStreamOutput; @@ -195,6 +196,9 @@ public class RpcClient implements ClientProtocol { // for reconstruction. private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3; + // TODO: Adjusts to the appropriate value when the writeThreadPool is used. + private static final int WRITE_POOL_MIN_SIZE = 0; + private final ConfigurationSource conf; private final OzoneManagerClientProtocol ozoneManagerClient; private final XceiverClientFactory xceiverClientManager; @@ -214,8 +218,10 @@ public class RpcClient implements ClientProtocol { private final BlockInputStreamFactory blockInputStreamFactory; private final OzoneManagerVersion omVersion; private volatile ExecutorService ecReconstructExecutor; - private final ContainerClientMetrics clientMetrics; + private volatile ExecutorService writeExecutor; private final AtomicBoolean isS3GRequest = new AtomicBoolean(false); + private final BlockOutputStreamResourceProvider + blockOutputStreamResourceProvider; /** * Creates RpcClient instance with the given configuration. @@ -312,7 +318,8 @@ public void onRemoval( this.byteBufferPool = new ElasticByteBufferPool(); this.blockInputStreamFactory = BlockInputStreamFactoryImpl .getInstance(byteBufferPool, this::getECReconstructExecutor); - this.clientMetrics = ContainerClientMetrics.acquire(); + this.blockOutputStreamResourceProvider = BlockOutputStreamResourceProvider + .create(this::getWriteThreadPool, ContainerClientMetrics.acquire()); } public XceiverClientFactory getXceiverClientManager() { @@ -1756,6 +1763,10 @@ public void close() throws IOException { ecReconstructExecutor.shutdownNow(); ecReconstructExecutor = null; } + if (writeExecutor != null) { + writeExecutor.shutdownNow(); + writeExecutor = null; + } IOUtils.cleanupWithLogger(LOG, ozoneManagerClient, xceiverClientManager); keyProviderCache.invalidateAll(); keyProviderCache.cleanUp(); @@ -2374,7 +2385,7 @@ private KeyOutputStream.Builder createKeyOutputStream( .enableUnsafeByteBufferConversion(unsafeByteBufferConversion) .setConfig(clientConfig) .setAtomicKeyCreation(isS3GRequest.get()) - .setClientMetrics(clientMetrics) + .setblockOutputStreamResourceProvider(blockOutputStreamResourceProvider) .setStreamBufferArgs(streamBufferArgs); } @@ -2496,26 +2507,45 @@ public void setTimes(OzoneObj obj, String keyName, long mtime, long atime) ozoneManagerClient.setTimes(builder.build(), mtime, atime); } + private ExecutorService createThreadPoolExecutor( + int corePoolSize, int maximumPoolSize, String threadNameFormat) { + return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, + 60, TimeUnit.SECONDS, new SynchronousQueue<>(), + new ThreadFactoryBuilder() + .setNameFormat(threadNameFormat) + .build(), + new ThreadPoolExecutor.CallerRunsPolicy()); + } + public ExecutorService getECReconstructExecutor() { - // local ref to a volatile to ensure access - // to a completed initialized object - ExecutorService executor = ecReconstructExecutor; - if (executor == null) { + ExecutorService localRef = ecReconstructExecutor; + if (localRef == null) { synchronized (this) { - executor = ecReconstructExecutor; - if (executor == null) { - ecReconstructExecutor = new ThreadPoolExecutor( - EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE, + localRef = ecReconstructExecutor; + if (localRef == null) { + localRef = createThreadPoolExecutor(EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE, clientConfig.getEcReconstructStripeReadPoolLimit(), - 60, TimeUnit.SECONDS, new SynchronousQueue<>(), - new ThreadFactoryBuilder() - .setNameFormat("ec-reconstruct-reader-TID-%d") - .build(), - new ThreadPoolExecutor.CallerRunsPolicy()); - executor = ecReconstructExecutor; + "ec-reconstruct-reader-TID-%d"); + ecReconstructExecutor = localRef; + } + } + } + return localRef; + } + + public ExecutorService getWriteThreadPool() { + ExecutorService localRef = writeExecutor; + if (localRef == null) { + synchronized (this) { + localRef = writeExecutor; + if (localRef == null) { + localRef = createThreadPoolExecutor(WRITE_POOL_MIN_SIZE, + Integer.MAX_VALUE, + "client-write-TID-%d"); + writeExecutor = localRef; } } } - return executor; + return localRef; } } diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java index da2fb26ec8f5..97eb1e6b3604 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.client.io.BlockOutputStreamResourceProvider; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.client.io.KeyMetadataAware; import org.apache.hadoop.hdds.scm.OzoneClientConfig; @@ -32,6 +33,7 @@ import java.io.IOException; import java.io.OutputStream; +import java.util.concurrent.Executors; /** * OzoneOutputStream stub for testing. @@ -84,7 +86,10 @@ public KeyOutputStream getKeyOutputStream() { OzoneClientConfig ozoneClientConfig = conf.getObject(OzoneClientConfig.class); StreamBufferArgs streamBufferArgs = StreamBufferArgs.getDefaultStreamBufferArgs(replicationConfig, ozoneClientConfig); - return new KeyOutputStream(replicationConfig, null, ozoneClientConfig, streamBufferArgs) { + return new KeyOutputStream(replicationConfig, + streamBufferArgs, ozoneClientConfig, + BlockOutputStreamResourceProvider.create(Executors::newSingleThreadExecutor, null) + ) { @Override public synchronized OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() { From 9b460fbaac1048c0d8b8b38d2ce52bb2fc444cb1 Mon Sep 17 00:00:00 2001 From: xichen01 Date: Wed, 21 Feb 2024 23:10:18 +0800 Subject: [PATCH 2/6] Fix conflict --- .../hdds/scm/storage/BlockOutputStream.java | 4 ++-- .../hdds/scm/storage/ECBlockOutputStream.java | 7 +++--- .../scm/storage/RatisBlockOutputStream.java | 7 +++--- .../io/BlockOutputStreamResourceProvider.java | 18 +++------------ .../TestBlockOutputStreamCorrectness.java | 4 ++-- .../ECReconstructionCoordinator.java | 9 ++++---- .../client/io/BlockOutputStreamEntry.java | 23 ++++++++++++++----- .../client/io/BlockOutputStreamEntryPool.java | 13 ++++++++--- .../client/io/ECBlockOutputStreamEntry.java | 5 +--- .../io/ECBlockOutputStreamEntryPool.java | 9 +------- .../ozone/client/io/KeyOutputStream.java | 19 +++++++++++---- .../hadoop/ozone/client/rpc/RpcClient.java | 5 +++- .../ozone/client/OzoneOutputStreamStub.java | 2 -- 13 files changed, 65 insertions(+), 60 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index 53f7f0f8404b..2885f1721b20 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -146,7 +146,7 @@ public BlockOutputStream( BufferPool bufferPool, OzoneClientConfig config, Token token, - StreamBufferArgs streamBufferArgs, + ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs, BlockOutputStreamResourceProvider blockOutputStreamResourceProvider ) throws IOException { this.xceiverClientFactory = xceiverClientManager; @@ -191,7 +191,7 @@ public BlockOutputStream( ioException = new AtomicReference<>(null); checksum = new Checksum(config.getChecksumType(), config.getBytesPerChecksum()); - this.clientMetrics = blockOutputStreamResourceProvider.getClientMetrics(); + this.clientMetrics = clientMetrics; this.pipeline = pipeline; this.streamBufferArgs = streamBufferArgs; } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java index 87644338f5f0..b71846fb03ba 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; +import org.apache.hadoop.hdds.scm.ContainerClientMetrics; import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.StreamBufferArgs; import org.apache.hadoop.hdds.scm.XceiverClientFactory; @@ -75,13 +76,11 @@ public ECBlockOutputStream( BufferPool bufferPool, OzoneClientConfig config, Token token, - StreamBufferArgs streamBufferArgs, + ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs, BlockOutputStreamResourceProvider blockOutputStreamResourceProvider ) throws IOException { super(blockID, xceiverClientManager, - pipeline, bufferPool, config, token, streamBufferArgs, - blockOutputStreamResourceProvider - ); + pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs, blockOutputStreamResourceProvider); // In EC stream, there will be only one node in pipeline. this.datanodeDetails = pipeline.getClosestNode(); } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java index fe1f095c075e..5b4a1156531e 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java @@ -21,6 +21,7 @@ import org.apache.hadoop.fs.Syncable; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; +import org.apache.hadoop.hdds.scm.ContainerClientMetrics; import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.StreamBufferArgs; import org.apache.hadoop.hdds.scm.XceiverClientFactory; @@ -76,13 +77,11 @@ public RatisBlockOutputStream( BufferPool bufferPool, OzoneClientConfig config, Token token, - StreamBufferArgs streamBufferArgs, + ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs, BlockOutputStreamResourceProvider blockOutputStreamResourceProvider ) throws IOException { super(blockID, xceiverClientManager, pipeline, - bufferPool, config, token, streamBufferArgs, - blockOutputStreamResourceProvider - ); + bufferPool, config, token, clientMetrics, streamBufferArgs, blockOutputStreamResourceProvider); this.commitWatcher = new CommitWatcher(bufferPool, getXceiverClient()); } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamResourceProvider.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamResourceProvider.java index fb289fcaa5d1..ab97e3e4340a 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamResourceProvider.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamResourceProvider.java @@ -20,7 +20,6 @@ import java.util.concurrent.ExecutorService; import java.util.function.Supplier; -import org.apache.hadoop.hdds.scm.ContainerClientMetrics; /** * Provides resources for BlockOutputStream, including executor service, @@ -28,20 +27,16 @@ */ public final class BlockOutputStreamResourceProvider { private final Supplier executorServiceSupplier; - private final ContainerClientMetrics clientMetrics; - /** * Creates an instance of blockOutputStreamResourceProvider. */ public static BlockOutputStreamResourceProvider create( - Supplier executorServiceSupplier, ContainerClientMetrics clientMetrics) { - return new BlockOutputStreamResourceProvider(executorServiceSupplier, clientMetrics); + Supplier executorServiceSupplier) { + return new BlockOutputStreamResourceProvider(executorServiceSupplier); } - private BlockOutputStreamResourceProvider(Supplier executorServiceSupplier, - ContainerClientMetrics clientMetrics) { + private BlockOutputStreamResourceProvider(Supplier executorServiceSupplier) { this.executorServiceSupplier = executorServiceSupplier; - this.clientMetrics = clientMetrics; } /** @@ -50,11 +45,4 @@ private BlockOutputStreamResourceProvider(Supplier executorServ public ExecutorService getExecutorService() { return executorServiceSupplier.get(); } - - /** - * Returns the ContainerClientMetrics instance. - */ - public ContainerClientMetrics getClientMetrics() { - return clientMetrics; - } } diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java index 7094c5312fe7..b2fb049073f4 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java @@ -110,9 +110,9 @@ private BlockOutputStream createBlockOutputStream(BufferPool bufferPool) bufferPool, config, null, + ContainerClientMetrics.acquire(), streamBufferArgs, - BlockOutputStreamResourceProvider.create(() -> newFixedThreadPool(10), - ContainerClientMetrics.acquire())); + BlockOutputStreamResourceProvider.create(() -> newFixedThreadPool(10))); } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java index 59dc97652760..9c99e2317119 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java @@ -114,6 +114,7 @@ public class ECReconstructionCoordinator implements Closeable { private volatile ExecutorService ecReconstructWriteExecutor; private final BlockInputStreamFactory blockInputStreamFactory; private final TokenHelper tokenHelper; + private final ContainerClientMetrics clientMetrics; private final ECReconstructionMetrics metrics; private final StateContext context; private final OzoneClientConfig ozoneClientConfig; @@ -153,9 +154,9 @@ public ECReconstructionCoordinator( new ThreadPoolExecutor.CallerRunsPolicy()); this.blockInputStreamFactory = BlockInputStreamFactoryImpl .getInstance(byteBufferPool, () -> ecReconstructReadExecutor); - blockOutputStreamResourceProvider = BlockOutputStreamResourceProvider.create( - () -> ecReconstructWriteExecutor, ContainerClientMetrics.acquire()); + blockOutputStreamResourceProvider = BlockOutputStreamResourceProvider.create(() -> ecReconstructWriteExecutor); tokenHelper = new TokenHelper(new SecurityConfig(conf), secretKeyClient); + this.clientMetrics = ContainerClientMetrics.acquire(); this.metrics = metrics; } @@ -249,9 +250,7 @@ private ECBlockOutputStream getECBlockOutputStream( containerOperationClient.singleNodePipeline(datanodeDetails, repConfig, replicaIndex), BufferPool.empty(), ozoneClientConfig, - blockLocationInfo.getToken(), streamBufferArgs, - blockOutputStreamResourceProvider - ); + blockLocationInfo.getToken(), clientMetrics, streamBufferArgs, blockOutputStreamResourceProvider); } @VisibleForTesting diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java index 400c15df8213..e6accdf1d2e3 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java @@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Syncable; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.ContainerClientMetrics; import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.StreamBufferArgs; import org.apache.hadoop.hdds.scm.XceiverClientFactory; @@ -105,9 +106,12 @@ void checkStream() throws IOException { */ void createOutputStream() throws IOException { outputStream = new RatisBlockOutputStream(blockID, xceiverClientManager, - pipeline, bufferPool, config, token, streamBufferArgs, - blockOutputStreamResourceProvider - ); + pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs, + blockOutputStreamResourceProvider); + } + + ContainerClientMetrics getClientMetrics() { + return clientMetrics; } BlockOutputStreamResourceProvider getblockOutputStreamResourceProvider() { @@ -358,6 +362,7 @@ public static class Builder { private BufferPool bufferPool; private Token token; private OzoneClientConfig config; + private ContainerClientMetrics clientMetrics; private StreamBufferArgs streamBufferArgs; private BlockOutputStreamResourceProvider blockOutputStreamResourceProvider; @@ -410,16 +415,22 @@ public Builder setToken(Token bToken) { return this; } - public Builder setblockOutputStreamResourceProvider( - BlockOutputStreamResourceProvider provider) { - this.blockOutputStreamResourceProvider = provider; + public Builder setClientMetrics(ContainerClientMetrics clientMetrics) { + this.clientMetrics = clientMetrics; return this; } + public Builder setStreamBufferArgs(StreamBufferArgs streamBufferArgs) { this.streamBufferArgs = streamBufferArgs; return this; } + public Builder setblockOutputStreamResourceProvider( + BlockOutputStreamResourceProvider provider) { + this.blockOutputStreamResourceProvider = provider; + return this; + } + public BlockOutputStreamEntry build() { return new BlockOutputStreamEntry(this); } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java index 189efcb98475..910f5559ba48 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java @@ -27,6 +27,7 @@ import java.util.Map; import org.apache.hadoop.hdds.scm.ByteStringConversion; +import org.apache.hadoop.hdds.scm.ContainerClientMetrics; import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.StreamBufferArgs; import org.apache.hadoop.hdds.scm.XceiverClientFactory; @@ -80,6 +81,7 @@ public class BlockOutputStreamEntryPool implements KeyMetadataAware { private OmMultipartCommitUploadPartInfo commitUploadPartInfo; private final long openID; private final ExcludeList excludeList; + private final ContainerClientMetrics clientMetrics; private final StreamBufferArgs streamBufferArgs; private final BlockOutputStreamResourceProvider blockOutputStreamResourceProvider; @@ -108,7 +110,7 @@ public BlockOutputStreamEntryPool(KeyOutputStream.Builder b) { ByteStringConversion .createByteBufferConversion(b.isUnsafeByteBufferConversionEnabled())); this.clientMetrics = b.getClientMetrics(); - this.blockOutputStreamResourceProvider = blockOutputStreamResourceProvider; + this.blockOutputStreamResourceProvider = b.getblockOutputStreamResourceProvider(); } ExcludeList createExcludeList() { @@ -157,6 +159,7 @@ BlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo) { .setLength(subKeyInfo.getLength()) .setBufferPool(bufferPool) .setToken(subKeyInfo.getToken()) + .setClientMetrics(clientMetrics) .setStreamBufferArgs(streamBufferArgs) .setblockOutputStreamResourceProvider(blockOutputStreamResourceProvider) .build(); @@ -221,14 +224,18 @@ OzoneClientConfig getConfig() { return config; } - BlockOutputStreamResourceProvider getblockOutputStreamResourceProvider() { - return blockOutputStreamResourceProvider; + ContainerClientMetrics getClientMetrics() { + return clientMetrics; } StreamBufferArgs getStreamBufferArgs() { return streamBufferArgs; } + BlockOutputStreamResourceProvider getblockOutputStreamResourceProvider() { + return blockOutputStreamResourceProvider; + } + /** * Discards the subsequent pre allocated blocks and removes the streamEntries * from the streamEntries list for the container which is closed. diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java index aeacf9ec6239..0d6982c80ac7 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java @@ -23,9 +23,6 @@ import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.hdds.scm.OzoneClientConfig; -import org.apache.hadoop.hdds.scm.StreamBufferArgs; -import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream; @@ -88,7 +85,7 @@ void checkStream() throws IOException { streams[i] = new ECBlockOutputStream(getBlockID(), getXceiverClientManager(), createSingleECBlockPipeline(getPipeline(), nodes.get(i), i + 1), - getBufferPool(), getConf(), getToken(), getStreamBufferArgs(), + getBufferPool(), getConf(), getToken(), getClientMetrics(), getStreamBufferArgs(), getblockOutputStreamResourceProvider()); } blockOutputStreams = streams; diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java index 6723da1b35be..eb04396015fd 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java @@ -17,13 +17,6 @@ */ package org.apache.hadoop.ozone.client.io; -import org.apache.hadoop.hdds.client.ECReplicationConfig; -import org.apache.hadoop.hdds.client.ReplicationConfig; -import org.apache.hadoop.hdds.scm.OzoneClientConfig; -import org.apache.hadoop.hdds.scm.StreamBufferArgs; -import org.apache.hadoop.hdds.scm.XceiverClientFactory; -import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; -import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; /** @@ -56,7 +49,7 @@ ECBlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo) { .setToken(subKeyInfo.getToken()) .setClientMetrics(getClientMetrics()) .setStreamBufferArgs(getStreamBufferArgs()) - .setblockOutputStreamResourceProvider(getblockOutputStreamResourceProvider()) + .setblockOutputStreamResourceProvider(getblockOutputStreamResourceProvider()); return b.build(); } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index 07ae16bd6d9b..5cf55b62ef23 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.scm.ContainerClientMetrics; import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.StreamBufferArgs; import org.apache.hadoop.hdds.scm.XceiverClientFactory; @@ -582,6 +583,7 @@ public static class Builder { private boolean unsafeByteBufferConversion; private OzoneClientConfig clientConfig; private ReplicationConfig replicationConfig; + private ContainerClientMetrics clientMetrics; private boolean atomicKeyCreation = false; private StreamBufferArgs streamBufferArgs; private BlockOutputStreamResourceProvider blockOutputStreamResourceProvider; @@ -685,6 +687,19 @@ public Builder setAtomicKeyCreation(boolean atomicKey) { return this; } + public Builder setClientMetrics(ContainerClientMetrics clientMetrics) { + this.clientMetrics = clientMetrics; + return this; + } + + public ContainerClientMetrics getClientMetrics() { + return clientMetrics; + } + + public boolean getAtomicKeyCreation() { + return atomicKeyCreation; + } + public Builder setblockOutputStreamResourceProvider( BlockOutputStreamResourceProvider provider) { this.blockOutputStreamResourceProvider = provider; @@ -695,10 +710,6 @@ public BlockOutputStreamResourceProvider getblockOutputStreamResourceProvider() return blockOutputStreamResourceProvider; } - public boolean getAtomicKeyCreation() { - return atomicKeyCreation; - } - public KeyOutputStream build() { return new KeyOutputStream(this); } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index 8947f2ca6de4..54d1fc152250 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -218,6 +218,7 @@ public class RpcClient implements ClientProtocol { private final BlockInputStreamFactory blockInputStreamFactory; private final OzoneManagerVersion omVersion; private volatile ExecutorService ecReconstructExecutor; + private final ContainerClientMetrics clientMetrics; private volatile ExecutorService writeExecutor; private final AtomicBoolean isS3GRequest = new AtomicBoolean(false); private final BlockOutputStreamResourceProvider @@ -318,8 +319,9 @@ public void onRemoval( this.byteBufferPool = new ElasticByteBufferPool(); this.blockInputStreamFactory = BlockInputStreamFactoryImpl .getInstance(byteBufferPool, this::getECReconstructExecutor); + this.clientMetrics = ContainerClientMetrics.acquire(); this.blockOutputStreamResourceProvider = BlockOutputStreamResourceProvider - .create(this::getWriteThreadPool, ContainerClientMetrics.acquire()); + .create(this::getWriteThreadPool); } public XceiverClientFactory getXceiverClientManager() { @@ -2410,6 +2412,7 @@ private KeyOutputStream.Builder createKeyOutputStream( .enableUnsafeByteBufferConversion(unsafeByteBufferConversion) .setConfig(clientConfig) .setAtomicKeyCreation(isS3GRequest.get()) + .setClientMetrics(clientMetrics) .setblockOutputStreamResourceProvider(blockOutputStreamResourceProvider) .setStreamBufferArgs(streamBufferArgs); } diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java index 8357e05aa11c..ca3caa4ee777 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java @@ -22,7 +22,6 @@ import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.ozone.client.io.BlockOutputStreamResourceProvider; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.client.io.KeyMetadataAware; import org.apache.hadoop.ozone.client.io.KeyOutputStream; @@ -31,7 +30,6 @@ import java.io.IOException; import java.io.OutputStream; -import java.util.concurrent.Executors; /** * OzoneOutputStream stub for testing. From 2279ade2e777c6f3be9d3fa6330c06bb02b554de Mon Sep 17 00:00:00 2001 From: xichen01 Date: Wed, 21 Feb 2024 23:27:58 +0800 Subject: [PATCH 3/6] Use MemoizedSupplier Simplified Code --- .../hadoop/ozone/client/rpc/RpcClient.java | 68 ++++++------------- 1 file changed, 19 insertions(+), 49 deletions(-) diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index 54d1fc152250..a55a24553740 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -146,6 +146,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.util.MemoizedSupplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -217,9 +218,9 @@ public class RpcClient implements ClientProtocol { private final ByteBufferPool byteBufferPool; private final BlockInputStreamFactory blockInputStreamFactory; private final OzoneManagerVersion omVersion; - private volatile ExecutorService ecReconstructExecutor; + private final MemoizedSupplier ecReconstructExecutor; private final ContainerClientMetrics clientMetrics; - private volatile ExecutorService writeExecutor; + private final MemoizedSupplier writeExecutor; private final AtomicBoolean isS3GRequest = new AtomicBoolean(false); private final BlockOutputStreamResourceProvider blockOutputStreamResourceProvider; @@ -244,6 +245,11 @@ public RpcClient(ConfigurationSource conf, String omServiceId) this.groupRights = aclConfig.getGroupDefaultRights(); this.clientConfig = conf.getObject(OzoneClientConfig.class); + this.ecReconstructExecutor = MemoizedSupplier.valueOf(() -> createThreadPoolExecutor( + EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE, clientConfig.getEcReconstructStripeReadPoolLimit(), + "ec-reconstruct-reader-TID-%d")); + this.writeExecutor = MemoizedSupplier.valueOf(() -> createThreadPoolExecutor( + WRITE_POOL_MIN_SIZE, Integer.MAX_VALUE, "client-write-TID-%d")); OmTransport omTransport = createOmTransport(omServiceId); OzoneManagerProtocolClientSideTranslatorPB @@ -318,10 +324,10 @@ public void onRemoval( }).build(); this.byteBufferPool = new ElasticByteBufferPool(); this.blockInputStreamFactory = BlockInputStreamFactoryImpl - .getInstance(byteBufferPool, this::getECReconstructExecutor); + .getInstance(byteBufferPool, ecReconstructExecutor); this.clientMetrics = ContainerClientMetrics.acquire(); this.blockOutputStreamResourceProvider = BlockOutputStreamResourceProvider - .create(this::getWriteThreadPool); + .create(writeExecutor); } public XceiverClientFactory getXceiverClientManager() { @@ -1786,13 +1792,11 @@ private OmKeyInfo getKeyInfo(OmKeyArgs keyArgs) throws IOException { @Override public void close() throws IOException { - if (ecReconstructExecutor != null) { - ecReconstructExecutor.shutdownNow(); - ecReconstructExecutor = null; + if (ecReconstructExecutor.isInitialized()) { + ecReconstructExecutor.get().shutdownNow(); } - if (writeExecutor != null) { - writeExecutor.shutdownNow(); - writeExecutor = null; + if (writeExecutor.isInitialized()) { + writeExecutor.get().shutdownNow(); } IOUtils.cleanupWithLogger(LOG, ozoneManagerClient, xceiverClientManager); keyProviderCache.invalidateAll(); @@ -2535,45 +2539,11 @@ public void setTimes(OzoneObj obj, String keyName, long mtime, long atime) ozoneManagerClient.setTimes(builder.build(), mtime, atime); } - private ExecutorService createThreadPoolExecutor( - int corePoolSize, int maximumPoolSize, String threadNameFormat) { + private static ExecutorService createThreadPoolExecutor( + int corePoolSize, int maximumPoolSize, String threadNameFormat) { return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, - 60, TimeUnit.SECONDS, new SynchronousQueue<>(), - new ThreadFactoryBuilder() - .setNameFormat(threadNameFormat) - .build(), - new ThreadPoolExecutor.CallerRunsPolicy()); - } - - public ExecutorService getECReconstructExecutor() { - ExecutorService localRef = ecReconstructExecutor; - if (localRef == null) { - synchronized (this) { - localRef = ecReconstructExecutor; - if (localRef == null) { - localRef = createThreadPoolExecutor(EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE, - clientConfig.getEcReconstructStripeReadPoolLimit(), - "ec-reconstruct-reader-TID-%d"); - ecReconstructExecutor = localRef; - } - } - } - return localRef; - } - - public ExecutorService getWriteThreadPool() { - ExecutorService localRef = writeExecutor; - if (localRef == null) { - synchronized (this) { - localRef = writeExecutor; - if (localRef == null) { - localRef = createThreadPoolExecutor(WRITE_POOL_MIN_SIZE, - Integer.MAX_VALUE, - "client-write-TID-%d"); - writeExecutor = localRef; - } - } - } - return localRef; + 60, TimeUnit.SECONDS, new SynchronousQueue<>(), + new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build(), + new ThreadPoolExecutor.CallerRunsPolicy()); } } From 4e0acfa9c2d33aea3d9e548eb16af3b7816cc492 Mon Sep 17 00:00:00 2001 From: xichen01 Date: Thu, 22 Feb 2024 22:50:04 +0800 Subject: [PATCH 4/6] Pass writeExecutor directly --- .../hdds/scm/storage/BlockOutputStream.java | 4 +- .../hdds/scm/storage/ECBlockOutputStream.java | 7 +-- .../scm/storage/RatisBlockOutputStream.java | 6 +-- .../io/BlockOutputStreamResourceProvider.java | 48 ------------------- .../TestBlockOutputStreamCorrectness.java | 5 +- .../ECReconstructionCoordinator.java | 6 +-- .../client/io/BlockOutputStreamEntry.java | 21 ++++---- .../client/io/BlockOutputStreamEntryPool.java | 12 +++-- .../client/io/ECBlockOutputStreamEntry.java | 2 +- .../io/ECBlockOutputStreamEntryPool.java | 2 +- .../ozone/client/io/KeyOutputStream.java | 13 ++--- .../hadoop/ozone/client/rpc/RpcClient.java | 7 +-- 12 files changed, 38 insertions(+), 95 deletions(-) delete mode 100644 hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamResourceProvider.java diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index 2885f1721b20..bbc461669584 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -44,7 +44,6 @@ import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; -import org.apache.hadoop.ozone.client.io.BlockOutputStreamResourceProvider; import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.common.ChecksumData; import org.apache.hadoop.ozone.common.ChunkBuffer; @@ -146,8 +145,7 @@ public BlockOutputStream( BufferPool bufferPool, OzoneClientConfig config, Token token, - ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs, - BlockOutputStreamResourceProvider blockOutputStreamResourceProvider + ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs ) throws IOException { this.xceiverClientFactory = xceiverClientManager; this.config = config; diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java index b71846fb03ba..78ac506367f6 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java @@ -30,7 +30,6 @@ import org.apache.hadoop.hdds.scm.XceiverClientReply; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.client.io.BlockOutputStreamResourceProvider; import org.apache.hadoop.ozone.common.ChunkBuffer; import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.security.token.Token; @@ -45,6 +44,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.function.Supplier; import java.util.stream.Collectors; import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync; @@ -77,10 +78,10 @@ public ECBlockOutputStream( OzoneClientConfig config, Token token, ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs, - BlockOutputStreamResourceProvider blockOutputStreamResourceProvider + Supplier executorServiceSupplier ) throws IOException { super(blockID, xceiverClientManager, - pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs, blockOutputStreamResourceProvider); + pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs); // In EC stream, there will be only one node in pipeline. this.datanodeDetails = pipeline.getClosestNode(); } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java index 5b4a1156531e..f8bf6d839f54 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java @@ -27,7 +27,6 @@ import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.XceiverClientReply; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; -import org.apache.hadoop.ozone.client.io.BlockOutputStreamResourceProvider; import org.apache.hadoop.ozone.common.ChunkBuffer; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; @@ -77,11 +76,10 @@ public RatisBlockOutputStream( BufferPool bufferPool, OzoneClientConfig config, Token token, - ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs, - BlockOutputStreamResourceProvider blockOutputStreamResourceProvider + ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs ) throws IOException { super(blockID, xceiverClientManager, pipeline, - bufferPool, config, token, clientMetrics, streamBufferArgs, blockOutputStreamResourceProvider); + bufferPool, config, token, clientMetrics, streamBufferArgs); this.commitWatcher = new CommitWatcher(bufferPool, getXceiverClient()); } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamResourceProvider.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamResourceProvider.java deleted file mode 100644 index ab97e3e4340a..000000000000 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamResourceProvider.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.client.io; - -import java.util.concurrent.ExecutorService; -import java.util.function.Supplier; - -/** - * Provides resources for BlockOutputStream, including executor service, - * and client metrics. - */ -public final class BlockOutputStreamResourceProvider { - private final Supplier executorServiceSupplier; - /** - * Creates an instance of blockOutputStreamResourceProvider. - */ - public static BlockOutputStreamResourceProvider create( - Supplier executorServiceSupplier) { - return new BlockOutputStreamResourceProvider(executorServiceSupplier); - } - - private BlockOutputStreamResourceProvider(Supplier executorServiceSupplier) { - this.executorServiceSupplier = executorServiceSupplier; - } - - /** - * Provides an ExecutorService, lazily initialized upon first request. - */ - public ExecutorService getExecutorService() { - return executorServiceSupplier.get(); - } -} diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java index b2fb049073f4..2fb32cd11c12 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java @@ -43,7 +43,6 @@ import org.apache.hadoop.hdds.scm.pipeline.MockPipeline; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.client.io.BlockOutputStreamResourceProvider; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -111,8 +110,8 @@ private BlockOutputStream createBlockOutputStream(BufferPool bufferPool) config, null, ContainerClientMetrics.acquire(), - streamBufferArgs, - BlockOutputStreamResourceProvider.create(() -> newFixedThreadPool(10))); + streamBufferArgs + ); } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java index 9c99e2317119..46b0ddd46566 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java @@ -35,8 +35,8 @@ import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo; import org.apache.hadoop.hdds.scm.storage.BufferPool; import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream; -import org.apache.hadoop.hdds.security.symmetric.SecretKeySignerClient; import org.apache.hadoop.hdds.security.SecurityConfig; +import org.apache.hadoop.hdds.security.symmetric.SecretKeySignerClient; import org.apache.hadoop.hdds.security.token.ContainerTokenIdentifier; import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient; import org.apache.hadoop.hdds.utils.IOUtils; @@ -45,7 +45,6 @@ import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory; import org.apache.hadoop.ozone.client.io.BlockInputStreamFactoryImpl; -import org.apache.hadoop.ozone.client.io.BlockOutputStreamResourceProvider; import org.apache.hadoop.ozone.client.io.ECBlockInputStreamProxy; import org.apache.hadoop.ozone.client.io.ECBlockReconstructedStripeInputStream; import org.apache.hadoop.ozone.container.common.helpers.BlockData; @@ -118,8 +117,6 @@ public class ECReconstructionCoordinator implements Closeable { private final ECReconstructionMetrics metrics; private final StateContext context; private final OzoneClientConfig ozoneClientConfig; - private final BlockOutputStreamResourceProvider - blockOutputStreamResourceProvider; public ECReconstructionCoordinator( ConfigurationSource conf, CertificateClient certificateClient, @@ -154,7 +151,6 @@ public ECReconstructionCoordinator( new ThreadPoolExecutor.CallerRunsPolicy()); this.blockInputStreamFactory = BlockInputStreamFactoryImpl .getInstance(byteBufferPool, () -> ecReconstructReadExecutor); - blockOutputStreamResourceProvider = BlockOutputStreamResourceProvider.create(() -> ecReconstructWriteExecutor); tokenHelper = new TokenHelper(new SecurityConfig(conf), secretKeyClient); this.clientMetrics = ContainerClientMetrics.acquire(); this.metrics = metrics; diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java index e6accdf1d2e3..a94f028197db 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java @@ -21,6 +21,8 @@ import java.io.OutputStream; import java.util.Collection; import java.util.Collections; +import java.util.concurrent.ExecutorService; +import java.util.function.Supplier; import org.apache.hadoop.fs.Syncable; import org.apache.hadoop.hdds.client.BlockID; @@ -64,7 +66,7 @@ public class BlockOutputStreamEntry extends OutputStream { private final BufferPool bufferPool; private final ContainerClientMetrics clientMetrics; private final StreamBufferArgs streamBufferArgs; - private BlockOutputStreamResourceProvider blockOutputStreamResourceProvider; + private final Supplier executorServiceSupplier; BlockOutputStreamEntry(Builder b) { this.config = b.config; @@ -79,7 +81,7 @@ public class BlockOutputStreamEntry extends OutputStream { this.bufferPool = b.bufferPool; this.clientMetrics = b.clientMetrics; this.streamBufferArgs = b.streamBufferArgs; - this.blockOutputStreamResourceProvider = b.blockOutputStreamResourceProvider; + this.executorServiceSupplier = b.executorServiceSupplier; } @Override @@ -106,16 +108,16 @@ void checkStream() throws IOException { */ void createOutputStream() throws IOException { outputStream = new RatisBlockOutputStream(blockID, xceiverClientManager, - pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs, - blockOutputStreamResourceProvider); + pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs + ); } ContainerClientMetrics getClientMetrics() { return clientMetrics; } - BlockOutputStreamResourceProvider getblockOutputStreamResourceProvider() { - return blockOutputStreamResourceProvider; + Supplier getExecutorServiceSupplier() { + return executorServiceSupplier; } StreamBufferArgs getStreamBufferArgs() { @@ -364,7 +366,7 @@ public static class Builder { private OzoneClientConfig config; private ContainerClientMetrics clientMetrics; private StreamBufferArgs streamBufferArgs; - private BlockOutputStreamResourceProvider blockOutputStreamResourceProvider; + private Supplier executorServiceSupplier; public Pipeline getPipeline() { return pipeline; @@ -425,9 +427,8 @@ public Builder setStreamBufferArgs(StreamBufferArgs streamBufferArgs) { return this; } - public Builder setblockOutputStreamResourceProvider( - BlockOutputStreamResourceProvider provider) { - this.blockOutputStreamResourceProvider = provider; + public Builder setExecutorServiceSupplier(Supplier executorServiceSupplier) { + this.executorServiceSupplier = executorServiceSupplier; return this; } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java index 910f5559ba48..51383e8717ab 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java @@ -25,6 +25,8 @@ import java.util.List; import java.util.ListIterator; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.function.Supplier; import org.apache.hadoop.hdds.scm.ByteStringConversion; import org.apache.hadoop.hdds.scm.ContainerClientMetrics; @@ -83,7 +85,7 @@ public class BlockOutputStreamEntryPool implements KeyMetadataAware { private final ExcludeList excludeList; private final ContainerClientMetrics clientMetrics; private final StreamBufferArgs streamBufferArgs; - private final BlockOutputStreamResourceProvider blockOutputStreamResourceProvider; + private final Supplier executorServiceSupplier; public BlockOutputStreamEntryPool(KeyOutputStream.Builder b) { this.config = b.getClientConfig(); @@ -110,7 +112,7 @@ public BlockOutputStreamEntryPool(KeyOutputStream.Builder b) { ByteStringConversion .createByteBufferConversion(b.isUnsafeByteBufferConversionEnabled())); this.clientMetrics = b.getClientMetrics(); - this.blockOutputStreamResourceProvider = b.getblockOutputStreamResourceProvider(); + this.executorServiceSupplier = b.getExecutorServiceSupplier(); } ExcludeList createExcludeList() { @@ -161,7 +163,7 @@ BlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo) { .setToken(subKeyInfo.getToken()) .setClientMetrics(clientMetrics) .setStreamBufferArgs(streamBufferArgs) - .setblockOutputStreamResourceProvider(blockOutputStreamResourceProvider) + .setExecutorServiceSupplier(executorServiceSupplier) .build(); } @@ -232,8 +234,8 @@ StreamBufferArgs getStreamBufferArgs() { return streamBufferArgs; } - BlockOutputStreamResourceProvider getblockOutputStreamResourceProvider() { - return blockOutputStreamResourceProvider; + public Supplier getExecutorServiceSupplier() { + return executorServiceSupplier; } /** diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java index 0d6982c80ac7..241754a57f19 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java @@ -86,7 +86,7 @@ void checkStream() throws IOException { new ECBlockOutputStream(getBlockID(), getXceiverClientManager(), createSingleECBlockPipeline(getPipeline(), nodes.get(i), i + 1), getBufferPool(), getConf(), getToken(), getClientMetrics(), getStreamBufferArgs(), - getblockOutputStreamResourceProvider()); + getExecutorServiceSupplier()); } blockOutputStreams = streams; } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java index eb04396015fd..6eb9aed0d3ad 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java @@ -49,7 +49,7 @@ ECBlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo) { .setToken(subKeyInfo.getToken()) .setClientMetrics(getClientMetrics()) .setStreamBufferArgs(getStreamBufferArgs()) - .setblockOutputStreamResourceProvider(getblockOutputStreamResourceProvider()); + .setExecutorServiceSupplier(getExecutorServiceSupplier()); return b.build(); } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index 5cf55b62ef23..d9e735cd7c8c 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -24,7 +24,9 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ExecutorService; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.hadoop.fs.FSExceptionMessages; @@ -586,7 +588,7 @@ public static class Builder { private ContainerClientMetrics clientMetrics; private boolean atomicKeyCreation = false; private StreamBufferArgs streamBufferArgs; - private BlockOutputStreamResourceProvider blockOutputStreamResourceProvider; + private Supplier executorServiceSupplier; public String getMultipartUploadID() { return multipartUploadID; @@ -700,14 +702,13 @@ public boolean getAtomicKeyCreation() { return atomicKeyCreation; } - public Builder setblockOutputStreamResourceProvider( - BlockOutputStreamResourceProvider provider) { - this.blockOutputStreamResourceProvider = provider; + public Builder setExecutorServiceSupplier(Supplier executorServiceSupplier) { + this.executorServiceSupplier = executorServiceSupplier; return this; } - public BlockOutputStreamResourceProvider getblockOutputStreamResourceProvider() { - return blockOutputStreamResourceProvider; + public Supplier getExecutorServiceSupplier() { + return executorServiceSupplier; } public KeyOutputStream build() { diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index a55a24553740..74b22e7ca4c6 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -80,7 +80,6 @@ import org.apache.hadoop.ozone.client.VolumeArgs; import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory; import org.apache.hadoop.ozone.client.io.BlockInputStreamFactoryImpl; -import org.apache.hadoop.ozone.client.io.BlockOutputStreamResourceProvider; import org.apache.hadoop.ozone.client.io.CipherOutputStreamOzone; import org.apache.hadoop.ozone.client.io.ECKeyOutputStream; import org.apache.hadoop.ozone.client.io.KeyDataStreamOutput; @@ -222,8 +221,6 @@ public class RpcClient implements ClientProtocol { private final ContainerClientMetrics clientMetrics; private final MemoizedSupplier writeExecutor; private final AtomicBoolean isS3GRequest = new AtomicBoolean(false); - private final BlockOutputStreamResourceProvider - blockOutputStreamResourceProvider; /** * Creates RpcClient instance with the given configuration. @@ -326,8 +323,6 @@ public void onRemoval( this.blockInputStreamFactory = BlockInputStreamFactoryImpl .getInstance(byteBufferPool, ecReconstructExecutor); this.clientMetrics = ContainerClientMetrics.acquire(); - this.blockOutputStreamResourceProvider = BlockOutputStreamResourceProvider - .create(writeExecutor); } public XceiverClientFactory getXceiverClientManager() { @@ -2417,7 +2412,7 @@ private KeyOutputStream.Builder createKeyOutputStream( .setConfig(clientConfig) .setAtomicKeyCreation(isS3GRequest.get()) .setClientMetrics(clientMetrics) - .setblockOutputStreamResourceProvider(blockOutputStreamResourceProvider) + .setExecutorServiceSupplier(writeExecutor) .setStreamBufferArgs(streamBufferArgs); } From 204401a293f6aa187c49b885823b1eddd7d7dbb4 Mon Sep 17 00:00:00 2001 From: xichen01 Date: Thu, 22 Feb 2024 22:52:45 +0800 Subject: [PATCH 5/6] Fix checkstyle --- .../hdds/scm/storage/TestBlockOutputStreamCorrectness.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java index 2fb32cd11c12..887a3451c14c 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java @@ -47,7 +47,6 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -import static java.util.concurrent.Executors.newFixedThreadPool; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; From ae840f026aea8b1f25637c1caab6a8d8c2b9d301 Mon Sep 17 00:00:00 2001 From: xichen01 Date: Thu, 22 Feb 2024 23:37:19 +0800 Subject: [PATCH 6/6] Fix building issue --- .../hdds/scm/storage/BlockOutputStream.java | 4 +- .../hdds/scm/storage/ECBlockOutputStream.java | 2 +- .../scm/storage/RatisBlockOutputStream.java | 7 ++- .../TestBlockOutputStreamCorrectness.java | 5 +- .../ECReconstructionCoordinator.java | 50 ++++++++----------- .../client/io/BlockOutputStreamEntry.java | 4 +- 6 files changed, 36 insertions(+), 36 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index bbc461669584..5ff5da60989e 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -28,6 +28,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -145,7 +146,8 @@ public BlockOutputStream( BufferPool bufferPool, OzoneClientConfig config, Token token, - ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs + ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs, + Supplier blockOutputStreamResourceProvider ) throws IOException { this.xceiverClientFactory = xceiverClientManager; this.config = config; diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java index 78ac506367f6..adecc3e4c1e2 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java @@ -81,7 +81,7 @@ public ECBlockOutputStream( Supplier executorServiceSupplier ) throws IOException { super(blockID, xceiverClientManager, - pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs); + pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs, executorServiceSupplier); // In EC stream, there will be only one node in pipeline. this.datanodeDetails = pipeline.getClosestNode(); } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java index f8bf6d839f54..6a2758d36486 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java @@ -37,6 +37,8 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.function.Supplier; /** * An {@link OutputStream} used by the REST service in combination with the @@ -76,10 +78,11 @@ public RatisBlockOutputStream( BufferPool bufferPool, OzoneClientConfig config, Token token, - ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs + ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs, + Supplier blockOutputStreamResourceProvider ) throws IOException { super(blockID, xceiverClientManager, pipeline, - bufferPool, config, token, clientMetrics, streamBufferArgs); + bufferPool, config, token, clientMetrics, streamBufferArgs, blockOutputStreamResourceProvider); this.commitWatcher = new CommitWatcher(bufferPool, getXceiverClient()); } diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java index 887a3451c14c..d06c9cf684f4 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java @@ -47,6 +47,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import static java.util.concurrent.Executors.newFixedThreadPool; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; @@ -109,8 +110,8 @@ private BlockOutputStream createBlockOutputStream(BufferPool bufferPool) config, null, ContainerClientMetrics.acquire(), - streamBufferArgs - ); + streamBufferArgs, + () -> newFixedThreadPool(10)); } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java index 46b0ddd46566..a45c15844847 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java @@ -50,6 +50,7 @@ import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.security.token.Token; +import org.apache.ratis.util.MemoizedSupplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,10 +71,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.function.Function; import java.util.stream.Collectors; import static org.apache.hadoop.ozone.container.ec.reconstruction.TokenHelper.encode; @@ -110,7 +109,7 @@ public class ECReconstructionCoordinator implements Closeable { private final ByteBufferPool byteBufferPool; private final ExecutorService ecReconstructReadExecutor; - private volatile ExecutorService ecReconstructWriteExecutor; + private final MemoizedSupplier ecReconstructWriteExecutor; private final BlockInputStreamFactory blockInputStreamFactory; private final TokenHelper tokenHelper; private final ContainerClientMetrics clientMetrics; @@ -127,28 +126,16 @@ public ECReconstructionCoordinator( this.containerOperationClient = new ECContainerOperationClient(conf, certificateClient); this.byteBufferPool = new ElasticByteBufferPool(); - Function threadFactoryCreator = name -> - new ThreadFactoryBuilder() - .setNameFormat(threadNamePrefix + name) - .build(); ozoneClientConfig = conf.getObject(OzoneClientConfig.class); - this.ecReconstructReadExecutor = - new ThreadPoolExecutor(EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE, - ozoneClientConfig.getEcReconstructStripeReadPoolLimit(), - 60, - TimeUnit.SECONDS, - new SynchronousQueue<>(), - threadFactoryCreator.apply("ec-reconstruct-reader-TID-%d"), - new ThreadPoolExecutor.CallerRunsPolicy()); - this.ecReconstructWriteExecutor = - new ThreadPoolExecutor(EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE, - conf.getObject(OzoneClientConfig.class) - .getEcReconstructStripeWritePoolLimit(), - 60, - TimeUnit.SECONDS, - new SynchronousQueue<>(), - threadFactoryCreator.apply("ec-reconstruct-writer-TID-%d"), - new ThreadPoolExecutor.CallerRunsPolicy()); + this.ecReconstructReadExecutor = createThreadPoolExecutor( + EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE, + ozoneClientConfig.getEcReconstructStripeReadPoolLimit(), + threadNamePrefix + "ec-reconstruct-reader-TID-%d"); + this.ecReconstructWriteExecutor = MemoizedSupplier.valueOf( + () -> createThreadPoolExecutor( + EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE, + ozoneClientConfig.getEcReconstructStripeWritePoolLimit(), + threadNamePrefix + "ec-reconstruct-writer-TID-%d")); this.blockInputStreamFactory = BlockInputStreamFactoryImpl .getInstance(byteBufferPool, () -> ecReconstructReadExecutor); tokenHelper = new TokenHelper(new SecurityConfig(conf), secretKeyClient); @@ -246,7 +233,7 @@ private ECBlockOutputStream getECBlockOutputStream( containerOperationClient.singleNodePipeline(datanodeDetails, repConfig, replicaIndex), BufferPool.empty(), ozoneClientConfig, - blockLocationInfo.getToken(), clientMetrics, streamBufferArgs, blockOutputStreamResourceProvider); + blockLocationInfo.getToken(), clientMetrics, streamBufferArgs, ecReconstructWriteExecutor); } @VisibleForTesting @@ -471,9 +458,8 @@ public void close() throws IOException { if (containerOperationClient != null) { containerOperationClient.close(); } - if (ecReconstructWriteExecutor != null) { - ecReconstructWriteExecutor.shutdownNow(); - ecReconstructWriteExecutor = null; + if (ecReconstructWriteExecutor.isInitialized()) { + ecReconstructWriteExecutor.get().shutdownNow(); } } @@ -608,4 +594,12 @@ OptionalLong getTermOfLeaderSCM() { .map(StateContext::getTermOfLeaderSCM) .orElse(OptionalLong.empty()); } + + private static ExecutorService createThreadPoolExecutor( + int corePoolSize, int maximumPoolSize, String threadNameFormat) { + return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, + 60, TimeUnit.SECONDS, new SynchronousQueue<>(), + new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build(), + new ThreadPoolExecutor.CallerRunsPolicy()); + } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java index a94f028197db..ba3850ff3947 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java @@ -108,8 +108,8 @@ void checkStream() throws IOException { */ void createOutputStream() throws IOException { outputStream = new RatisBlockOutputStream(blockID, xceiverClientManager, - pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs - ); + pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs, + executorServiceSupplier); } ContainerClientMetrics getClientMetrics() {