Original file line number Diff line number Diff line change
@@ -44,6 +44,7 @@ public void onMessage(CommandQueueReportFromDatanode queueReportFromDatanode,
Preconditions.checkNotNull(dn, "QueueReport is "
+ "missing DatanodeDetails.");
nodeManager.processNodeCommandQueueReport(dn,
- queueReportFromDatanode.getReport());
+ queueReportFromDatanode.getReport(),
+ queueReportFromDatanode.getCommandsToBeSent());
}
}
@@ -286,11 +286,20 @@ public void setNodeStatus(NodeStatus newNodeStatus) {
* Set the current command counts for this datanode, as reported in the last
* heartbeat.
* @param cmds Proto message containing a list of command count pairs.
+ * @param commandsToBeSent Summary of commands which will be sent to the DN
+ * as the heartbeat is processed and should be added
+ * to the command count.
*/
- public void setCommandCounts(CommandQueueReportProto cmds) {
+ public void setCommandCounts(CommandQueueReportProto cmds,
+ Map<SCMCommandProto.Type, Integer> commandsToBeSent) {
try {
int count = cmds.getCommandCount();
+ Map<SCMCommandProto.Type, Integer> mutableCmds
+ = new HashMap<>(commandsToBeSent);
lock.writeLock().lock();
+ // Purge the existing counts, as each report should completely replace
+ // the existing counts.
+ commandCounts.clear();
Member

Just a quick question: why is this done now when it was not done before? I understand we want to keep the current counts and replace the stale ones, but I thought that was already handled by `commandCounts.put(command, cmdCount)`, and now also by `commandCounts.putAll(mutableCmds)`.

Contributor Author

@sodonnel sodonnel Oct 26, 2022

I'm not sure I follow your question.

There are two counts. The first is reported by the DN in its heartbeat: these are the commands queued on the DN.

Then there are commands queued in SCM ready to be sent to the DN as part of processing the heartbeat.

After the heartbeat, the real queue on the DN is the sum of the two (queued on DN + queued on SCM).

Before this change, we only stored the "queued on the DN" count. This count is not yet used anywhere, but I missed adding the commands the heartbeat will add to the queue, so the old count is not correct.

The line `commandCounts.put(command, cmdCount)` should do all the updates, as per the comment in the code. The line `commandCounts.putAll(mutableCmds)` is a catch-all in case the DN does not report a count for a command type that is queued on SCM; it shouldn't happen in practice. I also remove each entry from `mutableCmds` after adding it to `cmdCount`, so in practice `mutableCmds` should be empty by the time it gets to the `putAll()` call.
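
As a minimal sketch of the merge described above (not the patch itself): the class `CommandCountSketch`, its `Type` enum and the `merge` helper are hypothetical stand-ins for `DatanodeInfo` and `SCMCommandProto.Type`, and locking and validation of reported values are left out.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the count-merging logic; not the real DatanodeInfo. */
final class CommandCountSketch {

  /** Illustrative command types, not the real SCMCommandProto.Type enum. */
  enum Type { REPLICATE, CLOSE, DELETE_BLOCKS }

  /**
   * Combine the counts reported by the DN with the counts of commands that
   * SCM is about to send on this heartbeat.
   */
  static Map<Type, Integer> merge(Map<Type, Integer> reportedByDn,
      Map<Type, Integer> toBeSent) {
    // Work on a copy so the caller's map is left untouched.
    Map<Type, Integer> pending = new HashMap<>(toBeSent);
    Map<Type, Integer> result = new HashMap<>();
    for (Map.Entry<Type, Integer> entry : reportedByDn.entrySet()) {
      // Queued on DN + queued on SCM for the same type.
      int count = entry.getValue() + pending.getOrDefault(entry.getKey(), 0);
      // Each type appears once in a report, so drop it from the pending set.
      pending.remove(entry.getKey());
      result.put(entry.getKey(), count);
    }
    // Catch-all: types queued on SCM that the DN did not report at all.
    result.putAll(pending);
    return result;
  }
}
```

With reported counts of `REPLICATE=123, CLOSE=11` and to-be-sent counts of `REPLICATE=3, DELETE_BLOCKS=5`, this yields `REPLICATE=126, CLOSE=11, DELETE_BLOCKS=5`, which mirrors the values asserted in the node manager test in this change.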

Member

Thanks for the detailed explanation. My initial doubt was about `commandCounts.clear()`, which is added in this patch; I was just wondering why it wasn't there before. I guess the initial thinking was that a command not reported by the DN should be 0, so it would be overwritten anyway (with 0) by `commandCounts.put(command, cmdCount)`.

The changes look good to me.

Contributor Author

Yeah, not sure why I didn't have that before. I guess we could omit it, but it should not do any harm either.
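
To illustrate when the purge makes a difference, here is a small sketch under the assumption that a DN sends a report that omits a type it reported earlier. `StaleCountSketch`, `applyReport` and the string keys are hypothetical and only model the stored map; they are not part of the patch.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of the stored per-type counts; not the real DatanodeInfo. */
final class StaleCountSketch {
  private final Map<String, Integer> counts = new HashMap<>();

  /** Store a new report, optionally purging the previous counts first. */
  void applyReport(Map<String, Integer> report, boolean clearFirst) {
    if (clearFirst) {
      // Each report fully replaces the previous one.
      counts.clear();
    }
    counts.putAll(report);
  }

  /** Returns -1 when no count is stored for the type. */
  int get(String type) {
    return counts.getOrDefault(type, -1);
  }
}
```

If a first report contains `replicate=123, close=11` and a second one contains only `close=11`, then without the purge `get("replicate")` still returns the stale 123, while with the purge it returns -1. When the DN always reports every type, both variants behave the same, which is consistent with the remark that the call could be omitted.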

for (int i = 0; i < count; i++) {
SCMCommandProto.Type command = cmds.getCommand(i);
if (command == SCMCommandProto.Type.unknownScmCommand) {
@@ -305,8 +314,19 @@ public void setCommandCounts(CommandQueueReportProto cmds) {
"Setting it to zero", cmdCount, this);
cmdCount = 0;
}
+ cmdCount += mutableCmds.getOrDefault(command, 0);
+ // Each CommandType will be in the report once only. So we remove any
+ // we have seen, so we can add anything the DN has not reported but
+ // there is a command queued for. The DNs should return a count for all
+ // command types even if they have a zero count, so this is really to
+ // handle something being wrong on the DN where it sends a spare report.
+ // It really should never happen.
+ mutableCmds.remove(command);
commandCounts.put(command, cmdCount);
}
+ // Add any counts which the DN did not report. See comment above. This
+ // should not happen.
+ commandCounts.putAll(mutableCmds);
} finally {
lock.writeLock().unlock();
}
@@ -322,12 +342,7 @@ public void setCommandCounts(CommandQueueReportProto cmds) {
public int getCommandCount(SCMCommandProto.Type cmd) {
try {
lock.readLock().lock();
- Integer count = commandCounts.get(cmd);
- if (count == null) {
- return -1;
- } else {
- return count.intValue();
- }
+ return commandCounts.getOrDefault(cmd, -1);
} finally {
lock.readLock().unlock();
}
@@ -301,11 +301,15 @@ void processLayoutVersionReport(DatanodeDetails datanodeDetails,
/**
* Process the Command Queue Report sent from datanodes as part of the
* heartbeat message.
- * @param datanodeDetails
- * @param commandReport
+ * @param datanodeDetails DatanodeDetails the report is from
+ * @param commandReport Command summary report from the DN when the heartbeat
+ * was created.
+ * @param commandsToBeSent Summary of command counts that will be sent to
+ * the Datanode as part of the current heartbeat
*/
void processNodeCommandQueueReport(DatanodeDetails datanodeDetails,
- CommandQueueReportProto commandReport);
+ CommandQueueReportProto commandReport,
+ Map<SCMCommandProto.Type, Integer> commandsToBeSent);

/**
* Get the number of commands of the given type queued on the datanode at the
@@ -698,10 +698,12 @@ public void processLayoutVersionReport(DatanodeDetails datanodeDetails,
*
* @param datanodeDetails
* @param commandQueueReportProto
+ * @param commandsToBeSent
*/
@Override
public void processNodeCommandQueueReport(DatanodeDetails datanodeDetails,
- CommandQueueReportProto commandQueueReportProto) {
+ CommandQueueReportProto commandQueueReportProto,
+ Map<SCMCommandProto.Type, Integer> commandsToBeSent) {
LOG.debug("Processing Command Queue Report from [datanode={}]",
datanodeDetails.getHostName());
if (LOG.isTraceEnabled()) {
@@ -712,7 +714,8 @@ public void processNodeCommandQueueReport(DatanodeDetails datanodeDetails,
try {
DatanodeInfo datanodeInfo = nodeStateManager.getNode(datanodeDetails);
if (commandQueueReportProto != null) {
- datanodeInfo.setCommandCounts(commandQueueReportProto);
+ datanodeInfo.setCommandCounts(commandQueueReportProto,
+ commandsToBeSent);
metrics.incNumNodeCommandQueueReportProcessed();
}
} catch (NodeNotFoundException e) {
@@ -25,6 +25,7 @@
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.LayoutVersionProto;
+ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.protocol.proto
@@ -48,7 +49,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

+ import java.util.HashMap;
import java.util.List;
+ import java.util.Map;
import java.util.UUID;

import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_ACTIONS;
@@ -136,9 +139,14 @@ public List<SCMCommand> dispatch(SCMHeartbeatRequestProto heartbeat) {

if (heartbeat.hasCommandQueueReport()) {
LOG.debug("Dispatching Queued Command Report");
+ Map<SCMCommandProto.Type, Integer> cmdSummary = new HashMap<>();
+ for (SCMCommand<?> c : commands) {
+ cmdSummary.put(c.getType(),
+ cmdSummary.getOrDefault(c.getType(), 0) + 1);
+ }
eventPublisher.fireEvent(COMMAND_QUEUE_REPORT,
new CommandQueueReportFromDatanode(datanodeDetails,
- heartbeat.getCommandQueueReport()));
+ heartbeat.getCommandQueueReport(), cmdSummary));
}

if (heartbeat.hasContainerReport()) {
@@ -247,9 +255,17 @@ public NodeReportFromDatanode(DatanodeDetails datanodeDetails,
*/
public static class CommandQueueReportFromDatanode
extends ReportFromDatanode<CommandQueueReportProto> {

+ private final Map<SCMCommandProto.Type, Integer> commandsToBeSent;
public CommandQueueReportFromDatanode(DatanodeDetails datanodeDetails,
- CommandQueueReportProto report) {
+ CommandQueueReportProto report,
+ Map<SCMCommandProto.Type, Integer> commandsToBeSent) {
super(datanodeDetails, report);
+ this.commandsToBeSent = commandsToBeSent;
}
+
+ public Map<SCMCommandProto.Type, Integer> getCommandsToBeSent() {
+ return commandsToBeSent;
+ }
}

@@ -595,10 +595,12 @@ public void processLayoutVersionReport(DatanodeDetails dnUuid,
* heartbeat message.
* @param datanodeDetails
* @param commandReport
+ * @param commandsToBeSent
*/
@Override
public void processNodeCommandQueueReport(DatanodeDetails datanodeDetails,
- CommandQueueReportProto commandReport) {
+ CommandQueueReportProto commandReport,
+ Map<SCMCommandProto.Type, Integer> commandsToBeSent) {
// do nothing.
}

@@ -293,7 +293,8 @@ public void processLayoutVersionReport(DatanodeDetails datanodeDetails,

@Override
public void processNodeCommandQueueReport(DatanodeDetails datanodeDetails,
- CommandQueueReportProto commandReport) {
+ CommandQueueReportProto commandReport,
+ Map<SCMCommandProto.Type, Integer> commandsToBeSent) {
}

/**
@@ -39,6 +39,8 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
+ import java.util.HashMap;
+ import java.util.Map;

import static org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager.maxLayoutVersion;

@@ -77,35 +79,51 @@ public void resetEventCollector() throws IOException {
public void testQueueReportProcessed() throws NodeNotFoundException {
DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails();
nodeManager.register(dn, null, null);

+ // Add some queued commands to be sent to the DN on this heartbeat. This
+ // means the real queued count will be the value reported plus the commands
+ // sent to the DN.
+ Map<SCMCommandProto.Type, Integer> commandsToBeSent = new HashMap<>();
+ commandsToBeSent.put(SCMCommandProto.Type.valueOf(1), 2);
+ commandsToBeSent.put(SCMCommandProto.Type.valueOf(2), 1);
+
SCMDatanodeHeartbeatDispatcher.CommandQueueReportFromDatanode
- commandReport = getQueueReport(dn);
+ commandReport = getQueueReport(dn, commandsToBeSent);
commandQueueReportHandler.onMessage(commandReport, this);

int commandCount = commandReport.getReport().getCommandCount();
for (int i = 0; i < commandCount; i++) {
int storedCount = nodeManager.getNodeQueuedCommandCount(dn,
commandReport.getReport().getCommand(i));
- Assertions.assertEquals(commandReport.getReport().getCount(i),
- storedCount);
+ int expectedCount = commandReport.getReport().getCount(i);
+ // For the first two commands, we added extra commands
Contributor

Hello Stephen! Just for my own learning: why do we add extra commands for the first two command types? Thanks! 🙏

Contributor Author

This is just to test the new part of the code: that the queued commands in the list are added to the totals in the report count. I did not add extra commands to all of them, so I could check that the others work OK when there aren't any extra commands.

Contributor

got it! thanks Stephen!
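
The expectation discussed in this thread can be worked through with a small sketch. It assumes, as in the test helper, that reported counts start at 10 and increase by one per command type; the class `QueueReportExpectationSketch`, its methods and the string type names are hypothetical and only illustrate the arithmetic.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper that mirrors the arithmetic checked by the test. */
final class QueueReportExpectationSketch {

  /** Count queued commands per type, similar in spirit to the dispatcher's summary loop. */
  static Map<String, Integer> tally(List<String> queuedTypes) {
    Map<String, Integer> summary = new HashMap<>();
    for (String type : queuedTypes) {
      // Add 1 for this type, starting from 1 if it has not been seen yet.
      summary.merge(type, 1, Integer::sum);
    }
    return summary;
  }

  /** Expected stored count: reported by the DN plus whatever SCM is about to send. */
  static int expected(int reported, String type, Map<String, Integer> summary) {
    return reported + summary.getOrDefault(type, 0);
  }
}
```

With two queued commands of a first type and one of a second type, reported counts of 10, 11 and 12 give expected stored counts of 12, 12 and 12: the first two types pick up their extras and the third is unchanged, which is the case the untouched types are meant to cover.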

+ if (i == 0) {
+ expectedCount += 2;
+ }
+ if (i == 1) {
+ expectedCount += 1;
+ }
+ Assertions.assertEquals(expectedCount, storedCount);
Assertions.assertTrue(storedCount > 0);
}
}

private SCMDatanodeHeartbeatDispatcher.CommandQueueReportFromDatanode
- getQueueReport(DatanodeDetails dn) {
+ getQueueReport(DatanodeDetails dn,
+ Map<SCMCommandProto.Type, Integer> commandsToBeSent) {
CommandQueueReportProto.Builder builder =
CommandQueueReportProto.newBuilder();
int i = 10;
for (SCMCommandProto.Type cmd : SCMCommandProto.Type.values()) {
if (cmd == SCMCommandProto.Type.unknownScmCommand) {
- // Do not include the unknow command type in the message.
+ // Do not include the unknown command type in the message.
continue;
}
builder.addCommand(cmd);
builder.addCount(i++);
}
return new SCMDatanodeHeartbeatDispatcher.CommandQueueReportFromDatanode(
- dn, builder.build());
+ dn, builder.build(), commandsToBeSent);
}


@@ -23,6 +23,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+ import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.UUID;
@@ -962,16 +963,36 @@ scmStorageConfig, eventPublisher, new NetworkTopologyImpl(conf),
HddsTestUtils.createRandomDatanodeAndRegister(nodeManager);
verify(eventPublisher,
times(1)).fireEvent(NEW_NODE, node1);
+ Map<SCMCommandProto.Type, Integer> commandsToBeSent = new HashMap<>();
+ commandsToBeSent.put(SCMCommandProto.Type.replicateContainerCommand, 3);
+ commandsToBeSent.put(SCMCommandProto.Type.deleteBlocksCommand, 5);

nodeManager.processNodeCommandQueueReport(node1,
CommandQueueReportProto.newBuilder()
.addCommand(SCMCommandProto.Type.replicateContainerCommand)
.addCount(123)
.addCommand(SCMCommandProto.Type.closeContainerCommand)
.addCount(11)
- .build());
+ .build(),
+ commandsToBeSent);
assertEquals(-1, nodeManager.getNodeQueuedCommandCount(
node1, SCMCommandProto.Type.closePipelineCommand));
- assertEquals(123, nodeManager.getNodeQueuedCommandCount(
+ assertEquals(126, nodeManager.getNodeQueuedCommandCount(
node1, SCMCommandProto.Type.replicateContainerCommand));
assertEquals(11, nodeManager.getNodeQueuedCommandCount(
node1, SCMCommandProto.Type.closeContainerCommand));
+ assertEquals(5, nodeManager.getNodeQueuedCommandCount(
+ node1, SCMCommandProto.Type.deleteBlocksCommand));
+
+ // Send another report missing an earlier entry, and ensure it is not
+ // still reported as a stale value.
+ nodeManager.processNodeCommandQueueReport(node1,
+ CommandQueueReportProto.newBuilder()
+ .addCommand(SCMCommandProto.Type.closeContainerCommand)
+ .addCount(11)
+ .build(),
+ Collections.emptyMap());
+ assertEquals(-1, nodeManager.getNodeQueuedCommandCount(
+ node1, SCMCommandProto.Type.replicateContainerCommand));
+ assertEquals(11, nodeManager.getNodeQueuedCommandCount(
+ node1, SCMCommandProto.Type.closeContainerCommand));
@@ -434,7 +434,8 @@ public void processLayoutVersionReport(DatanodeDetails dnUuid,

@Override
public void processNodeCommandQueueReport(DatanodeDetails datanodeDetails,
- CommandQueueReportProto commandReport) {
+ CommandQueueReportProto commandReport,
+ Map<SCMCommandProto.Type, Integer> commandsToBeSent) {
// Do nothing.
}
