Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -137,6 +137,29 @@
public class ContainerStateMachine extends BaseStateMachine {
static final Logger LOG =
LoggerFactory.getLogger(ContainerStateMachine.class);

static class TaskQueueMap {
private final Map<Long, TaskQueue> map = new HashMap<>();

synchronized CompletableFuture<ContainerCommandResponseProto> submit(
long containerId,
CheckedSupplier<ContainerCommandResponseProto, Exception> task,
ExecutorService executor) {
final TaskQueue queue = map.computeIfAbsent(
containerId, id -> new TaskQueue("container" + id));
final CompletableFuture<ContainerCommandResponseProto> f
= queue.submit(task, executor);
// after the task is completed, remove the queue if the queue is empty.
f.thenAccept(dummy -> removeIfEmpty(containerId));
return f;
}

synchronized void removeIfEmpty(long containerId) {
map.computeIfPresent(containerId,
(id, q) -> q.isEmpty() ? null : q);
}
}

private final SimpleStateMachineStorage storage =
new SimpleStateMachineStorage();
private final RaftGroupId gid;
Expand All @@ -148,7 +171,7 @@ public class ContainerStateMachine extends BaseStateMachine {

// keeps track of the containers created per pipeline
private final Map<Long, Long> container2BCSIDMap;
private final ConcurrentMap<Long, TaskQueue> containerTaskQueues;
private final TaskQueueMap containerTaskQueues = new TaskQueueMap();
private final ExecutorService executor;
private final List<ThreadPoolExecutor> chunkExecutors;
private final Map<Long, Long> applyTransactionCompletionMap;
Expand Down Expand Up @@ -207,7 +230,6 @@ public ContainerStateMachine(RaftGroupId gid, ContainerDispatcher dispatcher,
.setNameFormat("ContainerOp-" + gid.getUuid() + "-%d")
.build());

this.containerTaskQueues = new ConcurrentHashMap<>();
this.waitOnBothFollowers = conf.getObject(
DatanodeConfiguration.class).waitOnAllFollowers();

Expand Down Expand Up @@ -806,8 +828,6 @@ private CompletableFuture<ContainerCommandResponseProto> submitTask(
ContainerCommandRequestProto request, DispatcherContext.Builder context,
Consumer<Exception> exceptionHandler) {
final long containerId = request.getContainerID();
final TaskQueue queue = containerTaskQueues.computeIfAbsent(
containerId, id -> new TaskQueue("container" + id));
final CheckedSupplier<ContainerCommandResponseProto, Exception> task
= () -> {
try {
Expand All @@ -817,12 +837,7 @@ private CompletableFuture<ContainerCommandResponseProto> submitTask(
throw e;
}
};
final CompletableFuture<ContainerCommandResponseProto> f
= queue.submit(task, executor);
// after the task is completed, remove the queue if the queue is empty.
f.thenAccept(dummy -> containerTaskQueues.computeIfPresent(containerId,
(id, q) -> q.isEmpty() ? null : q));
return f;
return containerTaskQueues.submit(containerId, task, executor);
}

// Removes the stateMachine data from cache once both followers catch up
Expand Down