diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java index 93a45905975a..55cd57d9dc78 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -27,6 +27,10 @@ import java.util.OptionalLong; import java.util.Queue; import java.util.Set; +import java.util.Objects; +import java.util.LinkedHashMap; +import java.util.Collections; +import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -45,6 +49,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.Descriptors.Descriptor; import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus.Status; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction; @@ -53,10 +58,12 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.ozone.container.common.states.DatanodeState; import org.apache.hadoop.ozone.container.common.states.datanode.InitDatanodeState; import org.apache.hadoop.ozone.container.common.states.datanode.RunningDatanodeState; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.protocol.commands.CommandStatus; import org.apache.hadoop.ozone.protocol.commands.DeleteBlockCommandStatus.DeleteBlockCommandStatusBuilder; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; @@ -112,7 +119,7 @@ public class StateContext { private final Map> incrementalReportsQueue; private final Map> containerActions; - private final Map> pipelineActions; + private final Map pipelineActions; private DatanodeStateMachine.DatanodeStates state; private boolean shutdownOnError = false; private boolean shutdownGracefully = false; @@ -178,7 +185,7 @@ public StateContext(ConfigurationSource conf, pipelineReports = new AtomicReference<>(); endpoints = new HashSet<>(); containerActions = new HashMap<>(); - pipelineActions = new HashMap<>(); + pipelineActions = new ConcurrentHashMap<>(); lock = new ReentrantLock(); stateExecutionCount = new AtomicLong(0); threadPoolNotAvailableCount = new AtomicLong(0); @@ -518,47 +525,16 @@ public List getPendingContainerAction( } } - /** - * Helper function for addPipelineActionIfAbsent that check if inputs are the - * same close pipeline action. - * - * Important Note: Make sure to double check for correctness before using this - * helper function for other purposes! - * - * @return true if a1 and a2 are the same close pipeline action, - * false otherwise - */ - boolean isSameClosePipelineAction(PipelineAction a1, PipelineAction a2) { - return a1.getAction() == a2.getAction() - && a1.hasClosePipeline() - && a2.hasClosePipeline() - && a1.getClosePipeline().getPipelineID() - .equals(a2.getClosePipeline().getPipelineID()); - } - /** * Add PipelineAction to PipelineAction queue if it's not present. * * @param pipelineAction PipelineAction to be added */ public void addPipelineActionIfAbsent(PipelineAction pipelineAction) { - synchronized (pipelineActions) { - /** - * If pipelineAction queue already contains entry for the pipeline id - * with same action, we should just return. - * Note: We should not use pipelineActions.contains(pipelineAction) here - * as, pipelineAction has a msg string. So even if two msgs differ though - * action remains same on the given pipeline, it will end up adding it - * multiple times here. - */ - for (InetSocketAddress endpoint : endpoints) { - final Queue actionsForEndpoint = - pipelineActions.get(endpoint); - if (actionsForEndpoint.stream().noneMatch( - action -> isSameClosePipelineAction(action, pipelineAction))) { - actionsForEndpoint.add(pipelineAction); - } - } + // Put only if the pipeline id with the same action is absent. + final PipelineKey key = new PipelineKey(pipelineAction); + for (InetSocketAddress endpoint : endpoints) { + pipelineActions.get(endpoint).putIfAbsent(key, pipelineAction); } } @@ -571,34 +547,17 @@ public void addPipelineActionIfAbsent(PipelineAction pipelineAction) { public List getPendingPipelineAction( InetSocketAddress endpoint, int maxLimit) { - List pipelineActionList = new ArrayList<>(); - List persistPipelineAction = new ArrayList<>(); - synchronized (pipelineActions) { - if (!pipelineActions.isEmpty() && - CollectionUtils.isNotEmpty(pipelineActions.get(endpoint))) { - Queue actionsForEndpoint = - this.pipelineActions.get(endpoint); - int size = actionsForEndpoint.size(); - int limit = size > maxLimit ? maxLimit : size; - for (int count = 0; count < limit; count++) { - // Add closePipeline back to the pipelineAction queue until - // pipeline is closed and removed from the DN. - PipelineAction action = actionsForEndpoint.poll(); - if (action.hasClosePipeline()) { - if (parentDatanodeStateMachine.getContainer().getPipelineReport() - .getPipelineReportList().stream().noneMatch( - report -> action.getClosePipeline().getPipelineID() - .equals(report.getPipelineID()))) { - continue; - } - persistPipelineAction.add(action); - } - pipelineActionList.add(action); - } - actionsForEndpoint.addAll(persistPipelineAction); - } - return pipelineActionList; + final PipelineActionMap map = pipelineActions.get(endpoint); + if (map == null) { + return Collections.emptyList(); } + final OzoneContainer ozoneContainer = parentDatanodeStateMachine. + getContainer(); + if (ozoneContainer == null) { + return Collections.emptyList(); + } + final PipelineReportsProto reports = ozoneContainer.getPipelineReport(); + return map.getActions(reports.getPipelineReportList(), maxLimit); } /** @@ -927,7 +886,7 @@ public void addEndpoint(InetSocketAddress endpoint) { if (!endpoints.contains(endpoint)) { this.endpoints.add(endpoint); this.containerActions.put(endpoint, new LinkedList<>()); - this.pipelineActions.put(endpoint, new LinkedList<>()); + this.pipelineActions.put(endpoint, new PipelineActionMap()); this.incrementalReportsQueue.put(endpoint, new LinkedList<>()); Map mp = new HashMap<>(); fullReportTypeList.forEach(e -> { @@ -988,4 +947,79 @@ public DatanodeQueueMetrics getQueueMetrics() { public String getThreadNamePrefix() { return threadNamePrefix; } + + static class PipelineActionMap { + private final LinkedHashMap map = + new LinkedHashMap<>(); + + synchronized int size() { + return map.size(); + } + + synchronized void putIfAbsent(PipelineKey key, + PipelineAction pipelineAction) { + map.putIfAbsent(key, pipelineAction); + } + + synchronized List getActions(List reports, + int max) { + if (map.isEmpty()) { + return Collections.emptyList(); + } + final List pipelineActionList = new ArrayList<>(); + final int limit = Math.min(map.size(), max); + final Iterator> i = + map.entrySet().iterator(); + for (int count = 0; count < limit && i.hasNext(); count++) { + final Map.Entry entry = i.next(); + final PipelineAction action = entry.getValue(); + // Add closePipeline back to the pipelineAction queue until + // pipeline is closed and removed from the DN. + if (action.hasClosePipeline()) { + if (reports.stream().noneMatch(entry.getKey()::equalsId)) { + // pipeline is removed from the DN, this action is no longer needed. + i.remove(); + continue; + } + // pipeline is closed but not yet removed from the DN. + } else { + i.remove(); + } + pipelineActionList.add(action); + } + // add all + return pipelineActionList; + } + } + + static class PipelineKey { + private final HddsProtos.PipelineID pipelineID; + private final PipelineAction.Action action; + + PipelineKey(PipelineAction p) { + this.pipelineID = p.getClosePipeline().getPipelineID(); + this.action = p.getAction(); + } + + boolean equalsId(PipelineReport report) { + return pipelineID.equals(report.getPipelineID()); + } + + @Override + public int hashCode() { + return Objects.hashCode(pipelineID); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } else if (!(obj instanceof PipelineKey)) { + return false; + } + final PipelineKey that = (PipelineKey) obj; + return Objects.equals(this.action, that.action) + && Objects.equals(this.pipelineID, that.pipelineID); + } + } }