diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java index dd0fa6722ea6..3d2a834a7083 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java @@ -35,7 +35,14 @@ public class DatanodeConfiguration { * The maximum number of replication commands a single datanode can execute * simultaneously. */ - private int replicationMaxStreams = 10; + private final int replicationMaxStreamsDefault = 10; + private int replicationMaxStreams = replicationMaxStreamsDefault; + /** + * The maximum number of threads used to delete containers on a datanode + * simultaneously. + */ + private final int containerDeleteThreadsDefault = 2; + private int containerDeleteThreads = containerDeleteThreadsDefault; @Config(key = "replication.streams.limit", type = ConfigType.INT, @@ -48,7 +55,8 @@ public void setReplicationMaxStreams(int val) { if (val < 1) { LOG.warn("hdds.datanode.replication.streams.limit must be greater than" + "zero and was set to {}. Defaulting to {}", - val, replicationMaxStreams); + val, replicationMaxStreamsDefault); + replicationMaxStreams = replicationMaxStreamsDefault; } else { this.replicationMaxStreams = val; } @@ -58,4 +66,26 @@ public int getReplicationMaxStreams() { return replicationMaxStreams; } + @Config(key = "container.delete.threads.max", + type = ConfigType.INT, + defaultValue = "2", + tags = {DATANODE}, + description = "The maximum number of threads used to delete containers " + + "on a datanode" + ) + public void setContainerDeleteThreads(int val) { + if (val < 1) { + LOG.warn("hdds.datanode.container.delete.threads.max must be greater " + + "than zero and was set to {}. Defaulting to {}", + val, containerDeleteThreadsDefault); + containerDeleteThreads = containerDeleteThreadsDefault; + } else { + this.containerDeleteThreads = val; + } + } + + public int getContainerDeleteThreads() { + return containerDeleteThreads; + } + } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index 2763278b0d2b..5424b6b1da3e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -130,7 +130,8 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails, .addHandler(new DeleteBlocksCommandHandler(container.getContainerSet(), conf)) .addHandler(new ReplicateContainerCommandHandler(conf, supervisor)) - .addHandler(new DeleteContainerCommandHandler()) + .addHandler(new DeleteContainerCommandHandler( + dnConf.getContainerDeleteThreads())) .setConnectionManager(connectionManager) .setContainer(container) .setContext(context) @@ -278,6 +279,10 @@ public void close() throws IOException { if (jvmPauseMonitor != null) { jvmPauseMonitor.stop(); } + + if (commandDispatcher != null) { + commandDispatcher.stop(); + } } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java index af854ec3d61a..911b1b8053a5 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java @@ -104,6 +104,12 @@ public void handle(SCMCommand command) { } } + public void stop() { + for (CommandHandler c : handlerMap.values()) { + c.stop(); + } + } + public static Builder newBuilder() { return new Builder(); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java index 1ea0ea845150..70ed9cab88c8 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java @@ -72,4 +72,12 @@ default void updateCommandStatus(StateContext context, SCMCommand command, command.getId()); } } + + /** + * Override for any command with an internal threadpool, and stop the + * executor when this method is invoked. + */ + default void stop() { + // Default implementation does nothing + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java index b54fb1a17ac0..9dc6021bfdcf 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java @@ -17,6 +17,7 @@ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.ozone.container.common.statemachine @@ -31,6 +32,12 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; /** * Handler to process the DeleteContainerCommand from SCM. @@ -40,28 +47,39 @@ public class DeleteContainerCommandHandler implements CommandHandler { private static final Logger LOG = LoggerFactory.getLogger(DeleteContainerCommandHandler.class); - private int invocationCount; - private long totalTime; + private final AtomicInteger invocationCount = new AtomicInteger(0); + private final AtomicLong totalTime = new AtomicLong(0); + private final ExecutorService executor; + + public DeleteContainerCommandHandler(int threadPoolSize) { + this.executor = new ThreadPoolExecutor( + 0, threadPoolSize, 60, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("DeleteContainerThread-%d") + .build()); + } @Override public void handle(final SCMCommand command, final OzoneContainer ozoneContainer, final StateContext context, final SCMConnectionManager connectionManager) { - final long startTime = Time.monotonicNow(); - invocationCount++; - try { - final DeleteContainerCommand deleteContainerCommand = - (DeleteContainerCommand) command; - final ContainerController controller = ozoneContainer.getController(); - controller.deleteContainer(deleteContainerCommand.getContainerID(), - deleteContainerCommand.isForce()); - } catch (IOException e) { - LOG.error("Exception occurred while deleting the container.", e); - } finally { - totalTime += Time.monotonicNow() - startTime; - } - + final DeleteContainerCommand deleteContainerCommand = + (DeleteContainerCommand) command; + final ContainerController controller = ozoneContainer.getController(); + executor.execute(() -> { + final long startTime = Time.monotonicNow(); + invocationCount.incrementAndGet(); + try { + controller.deleteContainer(deleteContainerCommand.getContainerID(), + deleteContainerCommand.isForce()); + } catch (IOException e) { + LOG.error("Exception occurred while deleting the container.", e); + } finally { + totalTime.getAndAdd(Time.monotonicNow() - startTime); + } + }); } @Override @@ -71,11 +89,27 @@ public SCMCommandProto.Type getCommandType() { @Override public int getInvocationCount() { - return this.invocationCount; + return this.invocationCount.get(); } @Override public long getAverageRunTime() { - return invocationCount == 0 ? 0 : totalTime / invocationCount; + final int invocations = invocationCount.get(); + return invocations == 0 ? + 0 : totalTime.get() / invocations; } + + @Override + public void stop() { + try { + executor.shutdown(); + if (!executor.awaitTermination(3, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ie) { + // Ignore, we don't really care about the failure. + Thread.currentThread().interrupt(); + } + } + }