diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/CommandQueueReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/CommandQueueReportHandler.java
index 4a857d22028c..a6d9ea56f29e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/CommandQueueReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/CommandQueueReportHandler.java
@@ -44,6 +44,7 @@ public void onMessage(CommandQueueReportFromDatanode queueReportFromDatanode,
     Preconditions.checkNotNull(dn, "QueueReport is "
         + "missing DatanodeDetails.");
     nodeManager.processNodeCommandQueueReport(dn,
-        queueReportFromDatanode.getReport());
+        queueReportFromDatanode.getReport(),
+        queueReportFromDatanode.getCommandsToBeSent());
   }
 }
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
index 2d2415c8586c..a2d8bb2dffe8 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
@@ -286,11 +286,20 @@ public void setNodeStatus(NodeStatus newNodeStatus) {
    * Set the current command counts for this datanode, as reported in the last
    * heartbeat.
    * @param cmds Proto message containing a list of command count pairs.
+   * @param commandsToBeSent Summary of commands which will be sent to the DN
+   *                         as the heartbeat is processed and should be added
+   *                         to the command count.
    */
-  public void setCommandCounts(CommandQueueReportProto cmds) {
+  public void setCommandCounts(CommandQueueReportProto cmds,
+      Map<SCMCommandProto.Type, Integer> commandsToBeSent) {
     try {
       int count = cmds.getCommandCount();
+      Map<SCMCommandProto.Type, Integer> mutableCmds
+          = new HashMap<>(commandsToBeSent);
       lock.writeLock().lock();
+      // Purge the existing counts, as each report should completely replace
+      // the existing counts.
+      commandCounts.clear();
       for (int i = 0; i < count; i++) {
         SCMCommandProto.Type command = cmds.getCommand(i);
         if (command == SCMCommandProto.Type.unknownScmCommand) {
@@ -305,8 +314,19 @@ public void setCommandCounts(CommandQueueReportProto cmds) {
               "Setting it to zero", cmdCount, this);
           cmdCount = 0;
         }
+        cmdCount += mutableCmds.getOrDefault(command, 0);
+        // Each CommandType appears in the report at most once, so remove each
+        // type once it has been seen. Anything left over afterwards is a
+        // command type the DN did not report but for which commands are queued
+        // to be sent. The DN should return a count for every command type,
+        // even when the count is zero, so this only guards against a DN
+        // sending a sparse report, which should never happen.
+        mutableCmds.remove(command);
         commandCounts.put(command, cmdCount);
       }
+      // Add any counts which the DN did not report. See the comment above;
+      // this should not happen.
+      commandCounts.putAll(mutableCmds);
     } finally {
       lock.writeLock().unlock();
     }
@@ -322,12 +342,7 @@ public void setCommandCounts(CommandQueueReportProto cmds) {
   public int getCommandCount(SCMCommandProto.Type cmd) {
     try {
       lock.readLock().lock();
-      Integer count = commandCounts.get(cmd);
-      if (count == null) {
-        return -1;
-      } else {
-        return count.intValue();
-      }
+      return commandCounts.getOrDefault(cmd, -1);
     } finally {
       lock.readLock().unlock();
     }
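
Aside (not part of the patch): the merge performed by DatanodeInfo.setCommandCounts above amounts to "reported count plus commands about to be sent, with any leftover to-be-sent types added afterwards". The sketch below restates that logic with plain String keys and hypothetical names so it is self-contained and runnable; the real method additionally holds the write lock, clamps negative counts, and keys on SCMCommandProto.Type.

import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the count merge in setCommandCounts().
public final class CommandCountMergeSketch {

  static Map<String, Integer> merge(Map<String, Integer> reported,
      Map<String, Integer> toBeSent) {
    Map<String, Integer> pending = new HashMap<>(toBeSent);
    Map<String, Integer> result = new HashMap<>();
    for (Map.Entry<String, Integer> e : reported.entrySet()) {
      int count = Math.max(e.getValue(), 0);
      // Stored count = DN-reported queue length + commands the SCM is about
      // to send on this heartbeat.
      result.put(e.getKey(), count + pending.getOrDefault(e.getKey(), 0));
      pending.remove(e.getKey());
    }
    // Types the DN did not report but the SCM is sending still get a count.
    result.putAll(pending);
    return result;
  }

  public static void main(String[] args) {
    // Prints replicateContainer=126 and deleteBlocks=5 (map order not
    // guaranteed).
    System.out.println(merge(
        Map.of("replicateContainer", 123),
        Map.of("replicateContainer", 3, "deleteBlocks", 5)));
  }
}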
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
index e2f69f77db79..b63bfd33425a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -301,11 +301,15 @@ void processLayoutVersionReport(DatanodeDetails datanodeDetails,
   /**
    * Process the Command Queue Report sent from datanodes as part of the
    * heartbeat message.
-   * @param datanodeDetails
-   * @param commandReport
+   * @param datanodeDetails DatanodeDetails the report is from
+   * @param commandReport Command summary report from the DN when the heartbeat
+   *                      was created.
+   * @param commandsToBeSent Summary of command counts that will be sent to
+   *                         the Datanode as part of the current heartbeat
    */
   void processNodeCommandQueueReport(DatanodeDetails datanodeDetails,
-      CommandQueueReportProto commandReport);
+      CommandQueueReportProto commandReport,
+      Map<SCMCommandProto.Type, Integer> commandsToBeSent);
 
   /**
    * Get the number of commands of the given type queued on the datanode at the
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 62ef30d27c77..46379b739175 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -698,10 +698,12 @@ public void processLayoutVersionReport(DatanodeDetails datanodeDetails,
    *
    * @param datanodeDetails
    * @param commandQueueReportProto
+   * @param commandsToBeSent
    */
   @Override
   public void processNodeCommandQueueReport(DatanodeDetails datanodeDetails,
-      CommandQueueReportProto commandQueueReportProto) {
+      CommandQueueReportProto commandQueueReportProto,
+      Map<SCMCommandProto.Type, Integer> commandsToBeSent) {
     LOG.debug("Processing Command Queue Report from [datanode={}]",
         datanodeDetails.getHostName());
     if (LOG.isTraceEnabled()) {
@@ -712,7 +714,8 @@ public void processNodeCommandQueueReport(DatanodeDetails datanodeDetails,
     try {
       DatanodeInfo datanodeInfo = nodeStateManager.getNode(datanodeDetails);
       if (commandQueueReportProto != null) {
-        datanodeInfo.setCommandCounts(commandQueueReportProto);
+        datanodeInfo.setCommandCounts(commandQueueReportProto,
+            commandsToBeSent);
         metrics.incNumNodeCommandQueueReportProcessed();
       }
     } catch (NodeNotFoundException e) {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
index 16b5ef8fba29..1e68b92e76b9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
@@ -25,6 +25,7 @@
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.LayoutVersionProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -48,7 +49,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_ACTIONS;
@@ -136,9 +139,14 @@ public List<SCMCommand> dispatch(SCMHeartbeatRequestProto heartbeat) {
 
     if (heartbeat.hasCommandQueueReport()) {
       LOG.debug("Dispatching Queued Command Report");
+      Map<SCMCommandProto.Type, Integer> cmdSummary = new HashMap<>();
+      for (SCMCommand c : commands) {
+        cmdSummary.put(c.getType(),
+            cmdSummary.getOrDefault(c.getType(), 0) + 1);
+      }
       eventPublisher.fireEvent(COMMAND_QUEUE_REPORT,
           new CommandQueueReportFromDatanode(datanodeDetails,
-              heartbeat.getCommandQueueReport()));
+              heartbeat.getCommandQueueReport(), cmdSummary));
     }
 
     if (heartbeat.hasContainerReport()) {
@@ -247,9 +255,17 @@ public NodeReportFromDatanode(DatanodeDetails datanodeDetails,
    */
   public static class CommandQueueReportFromDatanode
       extends ReportFromDatanode<CommandQueueReportProto> {
+
+    private final Map<SCMCommandProto.Type, Integer> commandsToBeSent;
 
     public CommandQueueReportFromDatanode(DatanodeDetails datanodeDetails,
-        CommandQueueReportProto report) {
+        CommandQueueReportProto report,
+        Map<SCMCommandProto.Type, Integer> commandsToBeSent) {
       super(datanodeDetails, report);
+      this.commandsToBeSent = commandsToBeSent;
+    }
+
+    public Map<SCMCommandProto.Type, Integer> getCommandsToBeSent() {
+      return commandsToBeSent;
     }
   }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index 9083125f47cc..06c020b4dd66 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -595,10 +595,12 @@ public void processLayoutVersionReport(DatanodeDetails dnUuid,
    * heartbeat message.
    * @param datanodeDetails
    * @param commandReport
+   * @param commandsToBeSent
    */
   @Override
   public void processNodeCommandQueueReport(DatanodeDetails datanodeDetails,
-      CommandQueueReportProto commandReport) {
+      CommandQueueReportProto commandReport,
+      Map<SCMCommandProto.Type, Integer> commandsToBeSent) {
     // do nothing.
   }
 
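Aside (not part of the patch): the cmdSummary map built in dispatch() above is simply a per-type tally of the commands the SCM is about to return in the heartbeat response. A minimal, self-contained sketch of that tallying pattern, with String stand-ins for SCMCommandProto.Type:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the tally built in dispatch().
public final class CommandSummarySketch {

  static Map<String, Integer> summarise(List<String> commandTypes) {
    Map<String, Integer> summary = new HashMap<>();
    for (String type : commandTypes) {
      // Same effect as the getOrDefault(type, 0) + 1 pattern in the patch.
      summary.merge(type, 1, Integer::sum);
    }
    return summary;
  }

  public static void main(String[] args) {
    // Prints replicateContainer=3 and closeContainer=1 (map order not
    // guaranteed).
    System.out.println(summarise(List.of("replicateContainer",
        "replicateContainer", "replicateContainer", "closeContainer")));
  }
}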
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
index 35ce3cb8a9d5..bbd40d46c0d6 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
@@ -293,7 +293,8 @@ public void processLayoutVersionReport(DatanodeDetails datanodeDetails,
 
   @Override
   public void processNodeCommandQueueReport(DatanodeDetails datanodeDetails,
-      CommandQueueReportProto commandReport) {
+      CommandQueueReportProto commandReport,
+      Map<SCMCommandProto.Type, Integer> commandsToBeSent) {
   }
 
   /**
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestCommandQueueReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestCommandQueueReportHandler.java
index 63f91ed7d6a2..4c5f1cf89fbb 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestCommandQueueReportHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestCommandQueueReportHandler.java
@@ -39,6 +39,8 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager.maxLayoutVersion;
 
@@ -77,35 +79,51 @@ public void resetEventCollector() throws IOException {
 
   @Test
   public void testQueueReportProcessed() throws NodeNotFoundException {
     DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails();
     nodeManager.register(dn, null, null);
+
+    // Add some queued commands to be sent to the DN on this heartbeat. This
+    // means the real queued count will be the value reported plus the commands
+    // sent to the DN.
+    Map<SCMCommandProto.Type, Integer> commandsToBeSent = new HashMap<>();
+    commandsToBeSent.put(SCMCommandProto.Type.valueOf(1), 2);
+    commandsToBeSent.put(SCMCommandProto.Type.valueOf(2), 1);
+
     SCMDatanodeHeartbeatDispatcher.CommandQueueReportFromDatanode
-        commandReport = getQueueReport(dn);
+        commandReport = getQueueReport(dn, commandsToBeSent);
     commandQueueReportHandler.onMessage(commandReport, this);
 
     int commandCount = commandReport.getReport().getCommandCount();
     for (int i = 0; i < commandCount; i++) {
       int storedCount = nodeManager.getNodeQueuedCommandCount(dn,
           commandReport.getReport().getCommand(i));
-      Assertions.assertEquals(commandReport.getReport().getCount(i),
-          storedCount);
+      int expectedCount = commandReport.getReport().getCount(i);
+      // For the first two command types, extra commands were queued above.
+      if (i == 0) {
+        expectedCount += 2;
+      }
+      if (i == 1) {
+        expectedCount += 1;
+      }
+      Assertions.assertEquals(expectedCount, storedCount);
       Assertions.assertTrue(storedCount > 0);
     }
   }
 
   private SCMDatanodeHeartbeatDispatcher.CommandQueueReportFromDatanode
-      getQueueReport(DatanodeDetails dn) {
+      getQueueReport(DatanodeDetails dn,
+      Map<SCMCommandProto.Type, Integer> commandsToBeSent) {
     CommandQueueReportProto.Builder builder
         = CommandQueueReportProto.newBuilder();
     int i = 10;
     for (SCMCommandProto.Type cmd : SCMCommandProto.Type.values()) {
       if (cmd == SCMCommandProto.Type.unknownScmCommand) {
-        // Do not include the unknow command type in the message.
+        // Do not include the unknown command type in the message.
         continue;
       }
       builder.addCommand(cmd);
       builder.addCount(i++);
     }
     return new SCMDatanodeHeartbeatDispatcher.CommandQueueReportFromDatanode(
-        dn, builder.build());
+        dn, builder.build(), commandsToBeSent);
   }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index a936e5652f76..87f87158e205 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -23,6 +23,7 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
@@ -962,16 +963,36 @@ scmStorageConfig, eventPublisher, new NetworkTopologyImpl(conf),
         HddsTestUtils.createRandomDatanodeAndRegister(nodeManager);
     verify(eventPublisher, times(1)).fireEvent(NEW_NODE, node1);
 
+    Map<SCMCommandProto.Type, Integer> commandsToBeSent = new HashMap<>();
+    commandsToBeSent.put(SCMCommandProto.Type.replicateContainerCommand, 3);
+    commandsToBeSent.put(SCMCommandProto.Type.deleteBlocksCommand, 5);
+
     nodeManager.processNodeCommandQueueReport(node1,
         CommandQueueReportProto.newBuilder()
             .addCommand(SCMCommandProto.Type.replicateContainerCommand)
             .addCount(123)
             .addCommand(SCMCommandProto.Type.closeContainerCommand)
             .addCount(11)
-            .build());
+            .build(),
+        commandsToBeSent);
     assertEquals(-1, nodeManager.getNodeQueuedCommandCount(
         node1, SCMCommandProto.Type.closePipelineCommand));
-    assertEquals(123, nodeManager.getNodeQueuedCommandCount(
+    assertEquals(126, nodeManager.getNodeQueuedCommandCount(
+        node1, SCMCommandProto.Type.replicateContainerCommand));
+    assertEquals(11, nodeManager.getNodeQueuedCommandCount(
+        node1, SCMCommandProto.Type.closeContainerCommand));
+    assertEquals(5, nodeManager.getNodeQueuedCommandCount(
+        node1, SCMCommandProto.Type.deleteBlocksCommand));
+
+    // Send another report missing an earlier entry, and ensure it is not
+    // still reported as a stale value.
+    nodeManager.processNodeCommandQueueReport(node1,
+        CommandQueueReportProto.newBuilder()
+            .addCommand(SCMCommandProto.Type.closeContainerCommand)
+            .addCount(11)
+            .build(),
+        Collections.emptyMap());
+    assertEquals(-1, nodeManager.getNodeQueuedCommandCount(
         node1, SCMCommandProto.Type.replicateContainerCommand));
     assertEquals(11, nodeManager.getNodeQueuedCommandCount(
         node1, SCMCommandProto.Type.closeContainerCommand));
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
index 19b5d1a4b9a0..1a375e78338e 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
@@ -434,7 +434,8 @@ public void processLayoutVersionReport(DatanodeDetails dnUuid,
 
   @Override
   public void processNodeCommandQueueReport(DatanodeDetails datanodeDetails,
-      CommandQueueReportProto commandReport) {
+      CommandQueueReportProto commandReport,
+      Map<SCMCommandProto.Type, Integer> commandsToBeSent) {
     // Do nothing.
   }
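
Aside (not part of the patch): the new TestSCMNodeManager assertions above rely on each report replacing, rather than merging with, the previously stored counts, and on getCommandCount() returning -1 for types with no stored value. A hypothetical, stand-alone sketch of that replace-then-query behaviour, with String stand-ins for SCMCommandProto.Type:

import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the command-count map kept inside DatanodeInfo.
public final class ReplaceCountsSketch {
  private final Map<String, Integer> counts = new HashMap<>();

  // Each report wipes the previous counts, so a type missing from the latest
  // report no longer shows a stale value.
  void apply(Map<String, Integer> report) {
    counts.clear();
    counts.putAll(report);
  }

  // Unreported types yield -1, mirroring DatanodeInfo.getCommandCount().
  int get(String type) {
    return counts.getOrDefault(type, -1);
  }

  public static void main(String[] args) {
    ReplaceCountsSketch store = new ReplaceCountsSketch();
    store.apply(Map.of("replicateContainer", 126, "closeContainer", 11));
    store.apply(Map.of("closeContainer", 11));
    System.out.println(store.get("replicateContainer")); // -1, not a stale 126
    System.out.println(store.get("closeContainer"));     // 11
  }
}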