Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandQueueReportProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.CRLStatusReport;
Expand All @@ -43,6 +44,7 @@
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.IEventInfo;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.apache.hadoop.ozone.protocol.commands.ReregisterCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;

Expand All @@ -51,6 +53,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
Expand Down Expand Up @@ -200,13 +203,59 @@ public List<SCMCommand> dispatch(SCMHeartbeatRequestProto heartbeat) {
new CommandStatusReportFromDatanode(
datanodeDetails,
commandStatusReport));
// update commands
updateCommands(commands, commandStatusReport);
}
}
}

return commands;
}

private void updateCommands(List<SCMCommand> commands,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

  1. There is too much nesting which can be avoided using early returns.
  2. Use static import.
Suggested change
private void updateCommands(List<SCMCommand> commands,
private void updateCommands(List<SCMCommand> commands,
CommandStatusReportsProto commandStatusReport) {
if (commands == null) {
return;
}
List<StorageContainerDatanodeProtocolProtos.CommandStatus> cmdStatusList
= commandStatusReport.getCmdStatusList();
for (StorageContainerDatanodeProtocolProtos.CommandStatus cmdStatus :
cmdStatusList) {
if (cmdStatus.getType() != deleteBlocksCommand ||
cmdStatus.getStatus() != EXECUTED) {
continue;
}
ContainerBlocksDeletionACKProto ackProto =
cmdStatus.getBlockDeletionAck();
List<DeleteBlockTransactionResult> results = ackProto.getResultsList();
for (SCMCommand command : commands) {
if (command.getType() != deleteBlocksCommand) {
continue;
}
DeleteBlocksCommand deleteBlocksCommand = (DeleteBlocksCommand) command;
List<DeletedBlocksTransaction> deletedBlocks =
deleteBlocksCommand.blocksTobeDeleted();
for (DeleteBlockTransactionResult result : results) {
deletedBlocks.removeIf(delete ->
delete.getTxID() == result.getTxID());
}
}
}
}

CommandStatusReportsProto commandStatusReport) {
if (commands != null) {
List<StorageContainerDatanodeProtocolProtos.CommandStatus> cmdStatusList
= commandStatusReport.getCmdStatusList();
cmdStatusList.forEach(cmdStatus -> {
if (cmdStatus.getType() == SCMCommandProto.Type.deleteBlocksCommand &&
cmdStatus.getStatus() == StorageContainerDatanodeProtocolProtos.
CommandStatus.Status.EXECUTED) {
StorageContainerDatanodeProtocolProtos.
ContainerBlocksDeletionACKProto ackProto =
cmdStatus.getBlockDeletionAck();
List<StorageContainerDatanodeProtocolProtos.
ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult>
results = ackProto.getResultsList();
for (SCMCommand command : commands) {
if (command.getType() == SCMCommandProto.Type.
deleteBlocksCommand) {
DeleteBlocksCommand deleteBlocksCommand =
(DeleteBlocksCommand) command;
List<StorageContainerDatanodeProtocolProtos.
DeletedBlocksTransaction> deleteds =
deleteBlocksCommand.blocksTobeDeleted();
for (StorageContainerDatanodeProtocolProtos.
ContainerBlocksDeletionACKProto.
DeleteBlockTransactionResult result : results) {
Iterator<StorageContainerDatanodeProtocolProtos.
DeletedBlocksTransaction> iterator = deleteds.iterator();
while (iterator.hasNext()) {
StorageContainerDatanodeProtocolProtos.
DeletedBlocksTransaction delete = iterator.next();
if (delete.getTxID() == result.getTxID()) {
iterator.remove();
}
}
}
}
}
}
// Other commands
});
}
}

/**
* Wrapper class for events with the datanode origin.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,11 @@ conf, getDatanodeAddressKey(), datanodeRpcAddr,
HddsServerUtil.addSuppressedLoggingExceptions(datanodeRpcServer);
}

@VisibleForTesting
public SCMDatanodeHeartbeatDispatcher getHeartbeatDispatcher() {
return heartbeatDispatcher;
}

public void start() {
LOG.info(
StorageContainerManager.buildRpcServerStartMessage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandQueueReportProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.protocol.proto
Expand All @@ -49,6 +50,7 @@
import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerImpl;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.NodeReportFromDatanode;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto;
Expand Down Expand Up @@ -239,6 +241,82 @@ public void testScmHeartbeat()
}
}

@Test
public void testCmdStateUpdate() throws Exception {
try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
DatanodeDetails datanodeDetails = registerWithCapacity(nodeManager);
// Generate DeletedBlocksTransaction set
List<StorageContainerDatanodeProtocolProtos.
DeletedBlocksTransaction> dnTXs = new ArrayList<>();
StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction
transaction1 = StorageContainerDatanodeProtocolProtos.
DeletedBlocksTransaction.newBuilder().
setContainerID(100).setCount(10).setTxID(101).build();
StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction
transaction2 = StorageContainerDatanodeProtocolProtos.
DeletedBlocksTransaction.newBuilder().
setContainerID(110).setCount(10).setTxID(102).build();
StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction
transaction3 = StorageContainerDatanodeProtocolProtos.
DeletedBlocksTransaction.newBuilder().
setContainerID(120).setCount(10).setTxID(103).build();
dnTXs.add(transaction1);
dnTXs.add(transaction2);
dnTXs.add(transaction3);
SCMCommand deleteCommand = new DeleteBlocksCommand(dnTXs);
nodeManager.addDatanodeCommand(datanodeDetails.getUuid(), deleteCommand);

// Generate CommandStatusReports
StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.
DeleteBlockTransactionResult txResult =
StorageContainerDatanodeProtocolProtos.
ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult.
newBuilder().setTxID(103).setContainerID(120).
setSuccess(true).build();

List<StorageContainerDatanodeProtocolProtos.
ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult>
results = new ArrayList<>();
results.add(txResult);
StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto
ackProto = StorageContainerDatanodeProtocolProtos.
ContainerBlocksDeletionACKProto.newBuilder().addAllResults(results).
setDnId(datanodeDetails.getUuidString()).build();

StorageContainerDatanodeProtocolProtos.CommandStatus cmdStatus =
StorageContainerDatanodeProtocolProtos.CommandStatus.newBuilder().
setBlockDeletionAck(ackProto).setCmdId(deleteCommand.getId()).
setStatus(StorageContainerDatanodeProtocolProtos.
CommandStatus.Status.EXECUTED).
setType(StorageContainerDatanodeProtocolProtos.SCMCommandProto.
Type.deleteBlocksCommand).build();
StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto
commandStatusReport = StorageContainerDatanodeProtocolProtos.
CommandStatusReportsProto.newBuilder().
addCmdStatus(cmdStatus).build();

// Generate heartbeat
StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto
heartbeat = StorageContainerDatanodeProtocolProtos.
SCMHeartbeatRequestProto.newBuilder()
.setDatanodeDetails(datanodeDetails.getProtoBufMessage())
.addCommandStatusReports(commandStatusReport).build();
SCMDatanodeHeartbeatDispatcher dispatcher = scm.
getDatanodeProtocolServer().getHeartbeatDispatcher();
List<SCMCommand> commands = dispatcher.dispatch(heartbeat);
for (SCMCommand command : commands) {
if (command instanceof DeleteBlocksCommand) {
DeleteBlocksCommand deleteBlocksCommand =
(DeleteBlocksCommand) command;
for (StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction
transaction: deleteBlocksCommand.blocksTobeDeleted()) {
assertTrue(transaction.getTxID() != 103);
}
}
}
}
}

/**
* Tests that node manager handles layout version changes from heartbeats
* correctly.
Expand Down