From cbb1dad01075c3032dff1b99943039c5c88d4cae Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Thu, 9 Mar 2023 19:00:56 -0800 Subject: [PATCH] HDDS-8129. ContainerStateMachine allows two different tasks with the same container id running in parallel. --- .../server/ratis/ContainerStateMachine.java | 37 +++++++++++++------ 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index f6f5a99927ca..4883ab9dd205 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -25,10 +25,10 @@ import java.io.OutputStream; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -137,6 +137,29 @@ public class ContainerStateMachine extends BaseStateMachine { static final Logger LOG = LoggerFactory.getLogger(ContainerStateMachine.class); + + static class TaskQueueMap { + private final Map map = new HashMap<>(); + + synchronized CompletableFuture submit( + long containerId, + CheckedSupplier task, + ExecutorService executor) { + final TaskQueue queue = map.computeIfAbsent( + containerId, id -> new TaskQueue("container" + id)); + final CompletableFuture f + = queue.submit(task, executor); + // after the task is completed, remove the queue if the queue is empty. + f.thenAccept(dummy -> removeIfEmpty(containerId)); + return f; + } + + synchronized void removeIfEmpty(long containerId) { + map.computeIfPresent(containerId, + (id, q) -> q.isEmpty() ? null : q); + } + } + private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage(); private final RaftGroupId gid; @@ -148,7 +171,7 @@ public class ContainerStateMachine extends BaseStateMachine { // keeps track of the containers created per pipeline private final Map container2BCSIDMap; - private final ConcurrentMap containerTaskQueues; + private final TaskQueueMap containerTaskQueues = new TaskQueueMap(); private final ExecutorService executor; private final List chunkExecutors; private final Map applyTransactionCompletionMap; @@ -207,7 +230,6 @@ public ContainerStateMachine(RaftGroupId gid, ContainerDispatcher dispatcher, .setNameFormat("ContainerOp-" + gid.getUuid() + "-%d") .build()); - this.containerTaskQueues = new ConcurrentHashMap<>(); this.waitOnBothFollowers = conf.getObject( DatanodeConfiguration.class).waitOnAllFollowers(); @@ -806,8 +828,6 @@ private CompletableFuture submitTask( ContainerCommandRequestProto request, DispatcherContext.Builder context, Consumer exceptionHandler) { final long containerId = request.getContainerID(); - final TaskQueue queue = containerTaskQueues.computeIfAbsent( - containerId, id -> new TaskQueue("container" + id)); final CheckedSupplier task = () -> { try { @@ -817,12 +837,7 @@ private CompletableFuture submitTask( throw e; } }; - final CompletableFuture f - = queue.submit(task, executor); - // after the task is completed, remove the queue if the queue is empty. - f.thenAccept(dummy -> containerTaskQueues.computeIfPresent(containerId, - (id, q) -> q.isEmpty() ? null : q)); - return f; + return containerTaskQueues.submit(containerId, task, executor); } // Removes the stateMachine data from cache once both followers catch up