File: OzoneClientConfig.java
@@ -201,6 +201,13 @@ public enum ChecksumCombineMode {
// 3 concurrent stripe reads should be enough.
private int ecReconstructStripeReadPoolLimit = 10 * 3;

@Config(key = "ec.reconstruct.stripe.write.pool.limit",
defaultValue = "30",
description = "Thread pool max size for parallelly write" +
" available ec chunks to reconstruct the whole stripe.",
tags = ConfigTag.CLIENT)
private int ecReconstructStripeWritePoolLimit = 10 * 3;

@Config(key = "checksum.combine.mode",
defaultValue = "COMPOSITE_CRC",
description = "The combined checksum type [MD5MD5CRC / COMPOSITE_CRC] "
@@ -387,6 +394,14 @@ public int getEcReconstructStripeReadPoolLimit() {
return ecReconstructStripeReadPoolLimit;
}

public void setEcReconstructStripeWritePoolLimit(int poolLimit) {
this.ecReconstructStripeWritePoolLimit = poolLimit;
}

public int getEcReconstructStripeWritePoolLimit() {
return ecReconstructStripeWritePoolLimit;
}

public void setFsDefaultBucketLayout(String bucketLayout) {
if (!bucketLayout.isEmpty()) {
this.fsDefaultBucketLayout = bucketLayout;
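For context, the new limit is read like any other OzoneClientConfig setting. A minimal sketch of overriding it, assuming the "ozone.client" config-group prefix for the full key (the value 40 is arbitrary):

    OzoneConfiguration conf = new OzoneConfiguration();
    // Assumed full key: config-group prefix + the @Config key above.
    conf.setInt("ozone.client.ec.reconstruct.stripe.write.pool.limit", 40);
    OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
    // Round-trips through the setter/getter pair added above.
    assert clientConfig.getEcReconstructStripeWritePoolLimit() == 40;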
File: AbstractCommitWatcher.java
@@ -73,7 +73,7 @@ SortedMap<Long, List<BUFFER>> getCommitIndexMap() {
return commitIndexMap;
}

void updateCommitInfoMap(long index, List<BUFFER> buffers) {
synchronized void updateCommitInfoMap(long index, List<BUFFER> buffers) {
commitIndexMap.computeIfAbsent(index, k -> new LinkedList<>())
.addAll(buffers);
}
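Making updateCommitInfoMap synchronized matters because the response executor can now be shared across streams (see BlockOutputStream below): two callbacks may update the same commit index concurrently, and computeIfAbsent followed by addAll is not an atomic compound even on a thread-safe map.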
File: BlockOutputStream.java
@@ -25,9 +25,9 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -144,7 +144,8 @@ public BlockOutputStream(
BufferPool bufferPool,
OzoneClientConfig config,
Token<? extends TokenIdentifier> token,
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs,
Supplier<ExecutorService> blockOutputStreamResourceProvider
) throws IOException {
this.xceiverClientFactory = xceiverClientManager;
this.config = config;
@@ -177,8 +178,7 @@
(long) flushPeriod * streamBufferArgs.getStreamBufferSize() == streamBufferArgs
.getStreamBufferFlushSize());

// A single thread executor handle the responses of async requests
responseExecutor = Executors.newSingleThreadExecutor();
this.responseExecutor = blockOutputStreamResourceProvider.get();
bufferList = null;
totalDataFlushedLength = 0;
writtenDataLength = 0;
@@ -660,7 +660,6 @@ public void cleanup(boolean invalidateClient) {
bufferList.clear();
}
bufferList = null;
responseExecutor.shutdown();
}

/**
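BlockOutputStream no longer creates or shuts down its own single-thread response executor: the injected supplier decides the threading model, and the pool's owner is responsible for shutdown (hence the removed responseExecutor.shutdown() in cleanup). A minimal sketch of the two usages this enables, with illustrative names:

    // Old behavior, now expressed by the caller: one executor per stream.
    Supplier<ExecutorService> perStream = Executors::newSingleThreadExecutor;

    // Shared, lazily created pool serving many streams, as the EC
    // reconstruction coordinator does below.
    Supplier<ExecutorService> shared = MemoizedSupplier.valueOf(
        () -> Executors.newFixedThreadPool(10));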
File: CommitWatcher.java
@@ -24,6 +24,7 @@
*/
package org.apache.hadoop.hdds.scm.storage;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.ozone.common.ChunkBuffer;
@@ -32,6 +33,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;

/**
* This class executes watchForCommit on ratis pipeline and releases
@@ -42,8 +44,8 @@ class CommitWatcher extends AbstractCommitWatcher<ChunkBuffer> {
private final BufferPool bufferPool;

// future Map to hold up all putBlock futures
private final ConcurrentMap<Long, CompletableFuture<
ContainerCommandResponseProto>> futureMap = new ConcurrentHashMap<>();
private final ConcurrentMap<Long, CompletableFuture<ContainerCommandResponseProto>>
futureMap = new ConcurrentHashMap<>();

CommitWatcher(BufferPool bufferPool, XceiverClientSpi xceiverClient) {
super(xceiverClient);
@@ -67,11 +69,24 @@ void releaseBuffers(long index) {
+ totalLength + ": existing = " + futureMap.keySet());
}

ConcurrentMap<Long, CompletableFuture<
ContainerCommandResponseProto>> getFutureMap() {
@VisibleForTesting
ConcurrentMap<Long, CompletableFuture<ContainerCommandResponseProto>> getFutureMap() {
return futureMap;
}

public void putFlushFuture(long flushPos, CompletableFuture<ContainerCommandResponseProto> flushFuture) {
futureMap.compute(flushPos,
(key, previous) -> previous == null ? flushFuture :
previous.thenCombine(flushFuture, (prev, curr) -> curr));
}

public void waitOnFlushFutures() throws InterruptedException, ExecutionException {
// wait for all the transactions to complete
CompletableFuture.allOf(futureMap.values().toArray(
new CompletableFuture[0])).get();
}

@Override
public void cleanup() {
super.cleanup();
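putFlushFuture now merges futures that land on the same flush position instead of overwriting the earlier one, so waitOnFlushFutures cannot miss an in-flight putBlock. A sketch of the merge semantics, for two hypothetical responses f1 and f2 registered at one position:

    CompletableFuture<ContainerCommandResponseProto> merged =
        f1.thenCombine(f2, (prev, curr) -> curr);
    // merged completes only after BOTH f1 and f2 complete, yielding the
    // later response; waitOnFlushFutures() then blocks on every map entry.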
File: ECBlockOutputStream.java
@@ -44,6 +44,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync;
@@ -75,10 +77,11 @@ public ECBlockOutputStream(
BufferPool bufferPool,
OzoneClientConfig config,
Token<? extends TokenIdentifier> token,
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs,
Supplier<ExecutorService> executorServiceSupplier
) throws IOException {
super(blockID, xceiverClientManager,
pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs);
pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs, executorServiceSupplier);
// In EC stream, there will be only one node in pipeline.
this.datanodeDetails = pipeline.getClosestNode();
}
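This constructor plumbing is what lets every EC target stream share the coordinator's lazily created write pool (see ECReconstructionCoordinator below) instead of each stream allocating its own single-thread response executor.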
File: RatisBlockOutputStream.java
@@ -39,6 +39,8 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

/**
* An {@link OutputStream} used by the REST service in combination with the
@@ -69,8 +71,8 @@ public class RatisBlockOutputStream extends BlockOutputStream
/**
* Creates a new BlockOutputStream.
*
* @param blockID block ID
* @param bufferPool pool of buffers
* @param blockID block ID
* @param bufferPool pool of buffers
*/
@SuppressWarnings("checkstyle:ParameterNumber")
public RatisBlockOutputStream(
@@ -80,10 +82,11 @@ public RatisBlockOutputStream(
BufferPool bufferPool,
OzoneClientConfig config,
Token<? extends TokenIdentifier> token,
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs,
Supplier<ExecutorService> blockOutputStreamResourceProvider
) throws IOException {
super(blockID, xceiverClientManager, pipeline,
bufferPool, config, token, clientMetrics, streamBufferArgs);
bufferPool, config, token, clientMetrics, streamBufferArgs, blockOutputStreamResourceProvider);
this.commitWatcher = new CommitWatcher(bufferPool, getXceiverClient());
}

@@ -114,16 +117,13 @@ void updateCommitInfo(XceiverClientReply reply, List<ChunkBuffer> buffers) {
}

@Override
void putFlushFuture(long flushPos,
CompletableFuture<ContainerCommandResponseProto> flushFuture) {
commitWatcher.getFutureMap().put(flushPos, flushFuture);
void putFlushFuture(long flushPos, CompletableFuture<ContainerCommandResponseProto> flushFuture) {
commitWatcher.putFlushFuture(flushPos, flushFuture);
}

@Override
void waitOnFlushFutures() throws InterruptedException, ExecutionException {
// wait for all the transactions to complete
CompletableFuture.allOf(commitWatcher.getFutureMap().values().toArray(
new CompletableFuture[0])).get();
commitWatcher.waitOnFlushFutures();
}

@Override
File: BlockOutputStream test
@@ -47,6 +47,7 @@
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.junit.jupiter.api.Assertions.assertEquals;

/**
@@ -114,7 +115,9 @@ private BlockOutputStream createBlockOutputStream(BufferPool bufferPool)
bufferPool,
config,
null,
ContainerClientMetrics.acquire(), streamBufferArgs);
ContainerClientMetrics.acquire(),
streamBufferArgs,
() -> newFixedThreadPool(10));
}

/**
File: DeleteBlocksCommandHandler.java
@@ -135,12 +135,22 @@ public void handle(SCMCommand command, OzoneContainer container,
SCMCommandProto.Type.deleteBlocksCommand, command.getType());
return;
}

DeleteCmdInfo cmd = new DeleteCmdInfo((DeleteBlocksCommand) command,
container, context, connectionManager);
try {
DeleteCmdInfo cmd = new DeleteCmdInfo((DeleteBlocksCommand) command,
container, context, connectionManager);
deleteCommandQueues.add(cmd);
} catch (IllegalStateException e) {
String dnId = context.getParent().getDatanodeDetails().getUuidString();
Consumer<CommandStatus> updateFailure = (cmdStatus) -> {
cmdStatus.markAsFailed();
ContainerBlocksDeletionACKProto emptyACK =
ContainerBlocksDeletionACKProto
.newBuilder()
.setDnId(dnId)
.build();
((DeleteBlockCommandStatus)cmdStatus).setBlocksDeletionAck(emptyACK);
};
updateCommandStatus(cmd.getContext(), cmd.getCmd(), updateFailure, LOG);
LOG.warn("Command is discarded because of the command queue is full");
}
}
@@ -382,9 +392,13 @@ private void processCmd(DeleteCmdInfo cmd) {
} finally {
final ContainerBlocksDeletionACKProto deleteAck =
blockDeletionACK;
final boolean status = cmdExecuted;
final boolean executedStatus = cmdExecuted;
Consumer<CommandStatus> statusUpdater = (cmdStatus) -> {
cmdStatus.setStatus(status);
if (executedStatus) {
cmdStatus.markAsExecuted();
} else {
cmdStatus.markAsFailed();
}
((DeleteBlockCommandStatus)cmdStatus).setBlocksDeletionAck(deleteAck);
};
updateCommandStatus(cmd.getContext(), cmd.getCmd(), statusUpdater, LOG);
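The queue-full path above is the behavioral change in this file: a delete command that cannot be queued is now reported back as failed, with an ACK carrying only the datanode id and no per-block results, instead of being logged and silently dropped.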
File: ECReconstructionCoordinator.java
@@ -35,8 +35,8 @@
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.apache.hadoop.hdds.scm.storage.BufferPool;
import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
import org.apache.hadoop.hdds.security.symmetric.SecretKeySignerClient;
import org.apache.hadoop.hdds.security.SecurityConfig;
import org.apache.hadoop.hdds.security.symmetric.SecretKeySignerClient;
import org.apache.hadoop.hdds.security.token.ContainerTokenIdentifier;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.utils.IOUtils;
@@ -50,6 +50,7 @@
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.security.token.Token;
import org.apache.ratis.util.MemoizedSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -70,7 +71,6 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -101,12 +101,14 @@ public class ECReconstructionCoordinator implements Closeable {

private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3;

private static final int EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE = 5;

private final ECContainerOperationClient containerOperationClient;

private final ByteBufferPool byteBufferPool;

private final ExecutorService ecReconstructExecutor;

private final ExecutorService ecReconstructReadExecutor;
private final MemoizedSupplier<ExecutorService> ecReconstructWriteExecutor;
private final BlockInputStreamFactory blockInputStreamFactory;
private final TokenHelper tokenHelper;
private final ContainerClientMetrics clientMetrics;
@@ -123,20 +125,18 @@ public ECReconstructionCoordinator(
this.containerOperationClient = new ECContainerOperationClient(conf,
certificateClient);
this.byteBufferPool = new ElasticByteBufferPool();
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat(threadNamePrefix + "ec-reconstruct-reader-TID-%d")
.build();
ozoneClientConfig = conf.getObject(OzoneClientConfig.class);
this.ecReconstructExecutor =
new ThreadPoolExecutor(EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE,
ozoneClientConfig.getEcReconstructStripeReadPoolLimit(),
60,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
threadFactory,
new ThreadPoolExecutor.CallerRunsPolicy());
this.ecReconstructReadExecutor = createThreadPoolExecutor(
EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE,
ozoneClientConfig.getEcReconstructStripeReadPoolLimit(),
threadNamePrefix + "ec-reconstruct-reader-TID-%d");
this.ecReconstructWriteExecutor = MemoizedSupplier.valueOf(
() -> createThreadPoolExecutor(
EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE,
ozoneClientConfig.getEcReconstructStripeWritePoolLimit(),
threadNamePrefix + "ec-reconstruct-writer-TID-%d"));
this.blockInputStreamFactory = BlockInputStreamFactoryImpl
.getInstance(byteBufferPool, () -> ecReconstructExecutor);
.getInstance(byteBufferPool, () -> ecReconstructReadExecutor);
tokenHelper = new TokenHelper(new SecurityConfig(conf), secretKeyClient);
this.clientMetrics = ContainerClientMetrics.acquire();
this.metrics = metrics;
@@ -232,7 +232,7 @@ private ECBlockOutputStream getECBlockOutputStream(
containerOperationClient.singleNodePipeline(datanodeDetails,
repConfig, replicaIndex),
BufferPool.empty(), ozoneClientConfig,
blockLocationInfo.getToken(), clientMetrics, streamBufferArgs);
blockLocationInfo.getToken(), clientMetrics, streamBufferArgs, ecReconstructWriteExecutor);
}

@VisibleForTesting
@@ -272,7 +272,7 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo,
repConfig, blockLocationInfo, true,
this.containerOperationClient.getXceiverClientManager(), null,
this.blockInputStreamFactory, byteBufferPool,
this.ecReconstructExecutor)) {
this.ecReconstructReadExecutor)) {

ECBlockOutputStream[] targetBlockStreams =
new ECBlockOutputStream[toReconstructIndexes.size()];
@@ -457,6 +457,9 @@ public void close() throws IOException {
if (containerOperationClient != null) {
containerOperationClient.close();
}
if (ecReconstructWriteExecutor.isInitialized()) {
ecReconstructWriteExecutor.get().shutdownNow();
}
}

private Pipeline rebuildInputPipeline(ECReplicationConfig repConfig,
@@ -590,4 +593,12 @@ OptionalLong getTermOfLeaderSCM() {
.map(StateContext::getTermOfLeaderSCM)
.orElse(OptionalLong.empty());
}

private static ExecutorService createThreadPoolExecutor(
int corePoolSize, int maximumPoolSize, String threadNameFormat) {
return new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
60, TimeUnit.SECONDS, new SynchronousQueue<>(),
new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build(),
new ThreadPoolExecutor.CallerRunsPolicy());
}
}
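The write pool only comes into existence if reconstruction actually opens target streams: MemoizedSupplier builds it on the first get(), and close() shuts down only what was created. A condensed sketch of the pattern, using the helper from this file and an illustrative thread-name format:

    MemoizedSupplier<ExecutorService> lazy = MemoizedSupplier.valueOf(
        () -> createThreadPoolExecutor(
            EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE, // 5
            30, // default ec.reconstruct.stripe.write.pool.limit
            "demo-writer-TID-%d"));
    ExecutorService pool = lazy.get();   // threads are created here
    if (lazy.isInitialized()) {          // true only after a get()
      lazy.get().shutdownNow();
    }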