Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,13 @@ public enum ChecksumCombineMode {
// 3 concurrent stripe read should be enough.
private int ecReconstructStripeReadPoolLimit = 10 * 3;

@Config(key = "ec.reconstruct.stripe.write.pool.limit",
defaultValue = "30",
description = "Thread pool max size for parallelly write" +
" available ec chunks to reconstruct the whole stripe.",
tags = ConfigTag.CLIENT)
private int ecReconstructStripeWritePoolLimit = 10 * 3;

@Config(key = "checksum.combine.mode",
defaultValue = "COMPOSITE_CRC",
description = "The combined checksum type [MD5MD5CRC / COMPOSITE_CRC] "
Expand Down Expand Up @@ -387,6 +394,14 @@ public int getEcReconstructStripeReadPoolLimit() {
return ecReconstructStripeReadPoolLimit;
}

public void setEcReconstructStripeWritePoolLimit(int poolLimit) {
this.ecReconstructStripeWritePoolLimit = poolLimit;
}

public int getEcReconstructStripeWritePoolLimit() {
return ecReconstructStripeWritePoolLimit;
}

public void setFsDefaultBucketLayout(String bucketLayout) {
if (!bucketLayout.isEmpty()) {
this.fsDefaultBucketLayout = bucketLayout;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
Expand Down Expand Up @@ -145,7 +146,8 @@ public BlockOutputStream(
BufferPool bufferPool,
OzoneClientConfig config,
Token<? extends TokenIdentifier> token,
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs,
Supplier<ExecutorService> blockOutputStreamResourceProvider
) throws IOException {
this.xceiverClientFactory = xceiverClientManager;
this.config = config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync;
Expand Down Expand Up @@ -75,10 +77,11 @@ public ECBlockOutputStream(
BufferPool bufferPool,
OzoneClientConfig config,
Token<? extends TokenIdentifier> token,
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs,
Supplier<ExecutorService> executorServiceSupplier
) throws IOException {
super(blockID, xceiverClientManager,
pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs);
pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs, executorServiceSupplier);
// In EC stream, there will be only one node in pipeline.
this.datanodeDetails = pipeline.getClosestNode();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

/**
* An {@link OutputStream} used by the REST service in combination with the
Expand Down Expand Up @@ -65,8 +67,8 @@ public class RatisBlockOutputStream extends BlockOutputStream
/**
* Creates a new BlockOutputStream.
*
* @param blockID block ID
* @param bufferPool pool of buffers
* @param blockID block ID
* @param bufferPool pool of buffers
*/
@SuppressWarnings("checkstyle:ParameterNumber")
public RatisBlockOutputStream(
Expand All @@ -76,10 +78,11 @@ public RatisBlockOutputStream(
BufferPool bufferPool,
OzoneClientConfig config,
Token<? extends TokenIdentifier> token,
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs,
Supplier<ExecutorService> blockOutputStreamResourceProvider
) throws IOException {
super(blockID, xceiverClientManager, pipeline,
bufferPool, config, token, clientMetrics, streamBufferArgs);
bufferPool, config, token, clientMetrics, streamBufferArgs, blockOutputStreamResourceProvider);
this.commitWatcher = new CommitWatcher(bufferPool, getXceiverClient());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -108,7 +109,9 @@ private BlockOutputStream createBlockOutputStream(BufferPool bufferPool)
bufferPool,
config,
null,
ContainerClientMetrics.acquire(), streamBufferArgs);
ContainerClientMetrics.acquire(),
streamBufferArgs,
() -> newFixedThreadPool(10));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.apache.hadoop.hdds.scm.storage.BufferPool;
import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
import org.apache.hadoop.hdds.security.symmetric.SecretKeySignerClient;
import org.apache.hadoop.hdds.security.SecurityConfig;
import org.apache.hadoop.hdds.security.symmetric.SecretKeySignerClient;
import org.apache.hadoop.hdds.security.token.ContainerTokenIdentifier;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.utils.IOUtils;
Expand All @@ -50,6 +50,7 @@
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.security.token.Token;
import org.apache.ratis.util.MemoizedSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -70,7 +71,6 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -101,12 +101,15 @@ public class ECReconstructionCoordinator implements Closeable {

private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3;

// TODO: Adjusts to the appropriate value when the ec-reconstruct-writer thread pool is used.
private static final int EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE = 0;

private final ECContainerOperationClient containerOperationClient;

private final ByteBufferPool byteBufferPool;

private final ExecutorService ecReconstructExecutor;

private final ExecutorService ecReconstructReadExecutor;
private final MemoizedSupplier<ExecutorService> ecReconstructWriteExecutor;
private final BlockInputStreamFactory blockInputStreamFactory;
private final TokenHelper tokenHelper;
private final ContainerClientMetrics clientMetrics;
Expand All @@ -123,20 +126,18 @@ public ECReconstructionCoordinator(
this.containerOperationClient = new ECContainerOperationClient(conf,
certificateClient);
this.byteBufferPool = new ElasticByteBufferPool();
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat(threadNamePrefix + "ec-reconstruct-reader-TID-%d")
.build();
ozoneClientConfig = conf.getObject(OzoneClientConfig.class);
this.ecReconstructExecutor =
new ThreadPoolExecutor(EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE,
ozoneClientConfig.getEcReconstructStripeReadPoolLimit(),
60,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
threadFactory,
new ThreadPoolExecutor.CallerRunsPolicy());
this.ecReconstructReadExecutor = createThreadPoolExecutor(
EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE,
ozoneClientConfig.getEcReconstructStripeReadPoolLimit(),
threadNamePrefix + "ec-reconstruct-reader-TID-%d");
this.ecReconstructWriteExecutor = MemoizedSupplier.valueOf(
() -> createThreadPoolExecutor(
EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE,
ozoneClientConfig.getEcReconstructStripeWritePoolLimit(),
threadNamePrefix + "ec-reconstruct-writer-TID-%d"));
this.blockInputStreamFactory = BlockInputStreamFactoryImpl
.getInstance(byteBufferPool, () -> ecReconstructExecutor);
.getInstance(byteBufferPool, () -> ecReconstructReadExecutor);
tokenHelper = new TokenHelper(new SecurityConfig(conf), secretKeyClient);
this.clientMetrics = ContainerClientMetrics.acquire();
this.metrics = metrics;
Expand Down Expand Up @@ -232,7 +233,7 @@ private ECBlockOutputStream getECBlockOutputStream(
containerOperationClient.singleNodePipeline(datanodeDetails,
repConfig, replicaIndex),
BufferPool.empty(), ozoneClientConfig,
blockLocationInfo.getToken(), clientMetrics, streamBufferArgs);
blockLocationInfo.getToken(), clientMetrics, streamBufferArgs, ecReconstructWriteExecutor);
}

@VisibleForTesting
Expand Down Expand Up @@ -272,7 +273,7 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo,
repConfig, blockLocationInfo, true,
this.containerOperationClient.getXceiverClientManager(), null,
this.blockInputStreamFactory, byteBufferPool,
this.ecReconstructExecutor)) {
this.ecReconstructReadExecutor)) {

ECBlockOutputStream[] targetBlockStreams =
new ECBlockOutputStream[toReconstructIndexes.size()];
Expand Down Expand Up @@ -457,6 +458,9 @@ public void close() throws IOException {
if (containerOperationClient != null) {
containerOperationClient.close();
}
if (ecReconstructWriteExecutor.isInitialized()) {
ecReconstructWriteExecutor.get().shutdownNow();
}
}

private Pipeline rebuildInputPipeline(ECReplicationConfig repConfig,
Expand Down Expand Up @@ -590,4 +594,12 @@ OptionalLong getTermOfLeaderSCM() {
.map(StateContext::getTermOfLeaderSCM)
.orElse(OptionalLong.empty());
}

private static ExecutorService createThreadPoolExecutor(
int corePoolSize, int maximumPoolSize, String threadNameFormat) {
return new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
60, TimeUnit.SECONDS, new SynchronousQueue<>(),
new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build(),
new ThreadPoolExecutor.CallerRunsPolicy());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.io.OutputStream;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.hdds.client.BlockID;
Expand Down Expand Up @@ -64,6 +66,7 @@ public class BlockOutputStreamEntry extends OutputStream {
private final BufferPool bufferPool;
private final ContainerClientMetrics clientMetrics;
private final StreamBufferArgs streamBufferArgs;
private final Supplier<ExecutorService> executorServiceSupplier;

BlockOutputStreamEntry(Builder b) {
this.config = b.config;
Expand All @@ -78,6 +81,7 @@ public class BlockOutputStreamEntry extends OutputStream {
this.bufferPool = b.bufferPool;
this.clientMetrics = b.clientMetrics;
this.streamBufferArgs = b.streamBufferArgs;
this.executorServiceSupplier = b.executorServiceSupplier;
}

@Override
Expand All @@ -104,13 +108,18 @@ void checkStream() throws IOException {
*/
void createOutputStream() throws IOException {
outputStream = new RatisBlockOutputStream(blockID, xceiverClientManager,
pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs);
pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs,
executorServiceSupplier);
}

ContainerClientMetrics getClientMetrics() {
return clientMetrics;
}

Supplier<ExecutorService> getExecutorServiceSupplier() {
return executorServiceSupplier;
}

StreamBufferArgs getStreamBufferArgs() {
return streamBufferArgs;
}
Expand Down Expand Up @@ -357,6 +366,7 @@ public static class Builder {
private OzoneClientConfig config;
private ContainerClientMetrics clientMetrics;
private StreamBufferArgs streamBufferArgs;
private Supplier<ExecutorService> executorServiceSupplier;

public Pipeline getPipeline() {
return pipeline;
Expand Down Expand Up @@ -406,15 +416,22 @@ public Builder setToken(Token<OzoneBlockTokenIdentifier> bToken) {
this.token = bToken;
return this;
}

public Builder setClientMetrics(ContainerClientMetrics clientMetrics) {
this.clientMetrics = clientMetrics;
return this;
}

public Builder setStreamBufferArgs(StreamBufferArgs streamBufferArgs) {
this.streamBufferArgs = streamBufferArgs;
return this;
}

public Builder setExecutorServiceSupplier(Supplier<ExecutorService> executorServiceSupplier) {
this.executorServiceSupplier = executorServiceSupplier;
return this;
}

public BlockOutputStreamEntry build() {
return new BlockOutputStreamEntry(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

import org.apache.hadoop.hdds.scm.ByteStringConversion;
import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
Expand Down Expand Up @@ -83,6 +85,7 @@ public class BlockOutputStreamEntryPool implements KeyMetadataAware {
private final ExcludeList excludeList;
private final ContainerClientMetrics clientMetrics;
private final StreamBufferArgs streamBufferArgs;
private final Supplier<ExecutorService> executorServiceSupplier;

public BlockOutputStreamEntryPool(KeyOutputStream.Builder b) {
this.config = b.getClientConfig();
Expand All @@ -109,6 +112,7 @@ public BlockOutputStreamEntryPool(KeyOutputStream.Builder b) {
ByteStringConversion
.createByteBufferConversion(b.isUnsafeByteBufferConversionEnabled()));
this.clientMetrics = b.getClientMetrics();
this.executorServiceSupplier = b.getExecutorServiceSupplier();
}

ExcludeList createExcludeList() {
Expand Down Expand Up @@ -159,6 +163,7 @@ BlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo) {
.setToken(subKeyInfo.getToken())
.setClientMetrics(clientMetrics)
.setStreamBufferArgs(streamBufferArgs)
.setExecutorServiceSupplier(executorServiceSupplier)
.build();
}

Expand Down Expand Up @@ -229,6 +234,10 @@ StreamBufferArgs getStreamBufferArgs() {
return streamBufferArgs;
}

public Supplier<ExecutorService> getExecutorServiceSupplier() {
return executorServiceSupplier;
}

/**
* Discards the subsequent pre allocated blocks and removes the streamEntries
* from the streamEntries list for the container which is closed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ void checkStream() throws IOException {
streams[i] =
new ECBlockOutputStream(getBlockID(), getXceiverClientManager(),
createSingleECBlockPipeline(getPipeline(), nodes.get(i), i + 1),
getBufferPool(), getConf(), getToken(), getClientMetrics(), getStreamBufferArgs());
getBufferPool(), getConf(), getToken(), getClientMetrics(), getStreamBufferArgs(),
getExecutorServiceSupplier());
}
blockOutputStreams = streams;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ ECBlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo) {
.setBufferPool(getBufferPool())
.setToken(subKeyInfo.getToken())
.setClientMetrics(getClientMetrics())
.setStreamBufferArgs(getStreamBufferArgs());
.setStreamBufferArgs(getStreamBufferArgs())
.setExecutorServiceSupplier(getExecutorServiceSupplier());
return b.build();
}

Expand Down
Loading