diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java index bc995854a8f1..91cfaa5a21a0 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -102,7 +102,7 @@ public class StateContext { static final Logger LOG = LoggerFactory.getLogger(StateContext.class); - private final Queue commandQueue; + private final Queue> commandQueue; private final Map cmdStatusMap; private final Lock lock; private final DatanodeStateMachine parentDatanodeStateMachine; @@ -738,7 +738,7 @@ public OptionalLong getTermOfLeaderSCM() { * * @return SCMCommand or Null. */ - public SCMCommand getNextCommand() { + public SCMCommand getNextCommand() { lock.lock(); try { initTermOfLeaderSCM(); @@ -772,7 +772,7 @@ public SCMCommand getNextCommand() { * * @param command - SCMCommand. */ - public void addCommand(SCMCommand command) { + public void addCommand(SCMCommand command) { lock.lock(); try { if (commandQueue.size() >= maxCommandQueueLimit) { @@ -792,7 +792,7 @@ public Map getCommandQueueSummary() { Map summary = new HashMap<>(); lock.lock(); try { - for (SCMCommand cmd : commandQueue) { + for (SCMCommand cmd : commandQueue) { summary.put(cmd.getType(), summary.getOrDefault(cmd.getType(), 0) + 1); } } finally { @@ -832,7 +832,7 @@ public void addCmdStatus(Long key, CommandStatus status) { * * @param cmd - {@link SCMCommand}. */ - public void addCmdStatus(SCMCommand cmd) { + public void addCmdStatus(SCMCommand cmd) { if (cmd.getType() == SCMCommandProto.Type.deleteBlocksCommand) { addCmdStatus(cmd.getId(), DeleteBlockCommandStatusBuilder.newBuilder() diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java index 700303ee0c90..1a1594cf8a99 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java @@ -84,7 +84,7 @@ public CloseContainerCommandHandler( * @param connectionManager - The SCMs that we are talking to. */ @Override - public void handle(SCMCommand command, OzoneContainer ozoneContainer, + public void handle(SCMCommand command, OzoneContainer ozoneContainer, StateContext context, SCMConnectionManager connectionManager) { queuedCount.incrementAndGet(); CompletableFuture.runAsync(() -> { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java index 8fcd192fe53e..5cbe47268970 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java @@ -93,7 +93,7 @@ public ClosePipelineCommandHandler( * @param connectionManager - The SCMs that we are talking to. */ @Override - public void handle(SCMCommand command, OzoneContainer ozoneContainer, + public void handle(SCMCommand command, OzoneContainer ozoneContainer, StateContext context, SCMConnectionManager connectionManager) { queuedCount.incrementAndGet(); CompletableFuture.runAsync(() -> { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java index 69a40e1f1ad2..696b04defe36 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java @@ -85,7 +85,7 @@ public CommandHandler getDeleteBlocksCommandHandler() { * * @param command - SCM Command. */ - public void handle(SCMCommand command) { + public void handle(SCMCommand command) { Preconditions.checkNotNull(command); CommandHandler handler = handlerMap.get(command.getType()); if (handler != null) { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java index 68ab8087d6ba..d516977838e5 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java @@ -38,7 +38,7 @@ public interface CommandHandler { * @param context - Current Context. * @param connectionManager - The SCMs that we are talking to. */ - void handle(SCMCommand command, OzoneContainer container, + void handle(SCMCommand command, OzoneContainer container, StateContext context, SCMConnectionManager connectionManager); /** @@ -68,7 +68,7 @@ void handle(SCMCommand command, OzoneContainer container, /** * Default implementation for updating command status. */ - default void updateCommandStatus(StateContext context, SCMCommand command, + default void updateCommandStatus(StateContext context, SCMCommand command, Consumer cmdStatusUpdater, Logger log) { if (!context.updateCommandStatus(command.getId(), cmdStatusUpdater)) { log.warn("{} with Id:{} not found.", command.getType(), diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java index d86c0287516d..30ffe7ed4159 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java @@ -90,7 +90,7 @@ public CreatePipelineCommandHandler(ConfigurationSource conf, * @param connectionManager - The SCMs that we are talking to. */ @Override - public void handle(SCMCommand command, OzoneContainer ozoneContainer, + public void handle(SCMCommand command, OzoneContainer ozoneContainer, StateContext context, SCMConnectionManager connectionManager) { queuedCount.incrementAndGet(); CompletableFuture.runAsync(() -> { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java index 80c078c5087d..71277c063770 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java @@ -123,7 +123,7 @@ public DeleteBlocksCommandHandler(OzoneContainer container, } @Override - public void handle(SCMCommand command, OzoneContainer container, + public void handle(SCMCommand command, OzoneContainer container, StateContext context, SCMConnectionManager connectionManager) { if (command.getType() != SCMCommandProto.Type.deleteBlocksCommand) { LOG.warn("Skipping handling command, expected command " diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java index 1d23da794a16..ae036a1c8f82 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java @@ -76,7 +76,7 @@ protected DeleteContainerCommandHandler(Clock clock, this.opsLatencyMs = registry.newRate(SCMCommandProto.Type.deleteContainerCommand + "Ms"); } @Override - public void handle(final SCMCommand command, + public void handle(final SCMCommand command, final OzoneContainer ozoneContainer, final StateContext context, final SCMConnectionManager connectionManager) { @@ -93,7 +93,7 @@ public void handle(final SCMCommand command, } } - private void handleInternal(SCMCommand command, StateContext context, + private void handleInternal(SCMCommand command, StateContext context, DeleteContainerCommand deleteContainerCommand, ContainerController controller) { final long startTime = Time.monotonicNow(); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/FinalizeNewLayoutVersionCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/FinalizeNewLayoutVersionCommandHandler.java index 6e1c566343da..a27b94b76a3e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/FinalizeNewLayoutVersionCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/FinalizeNewLayoutVersionCommandHandler.java @@ -63,7 +63,7 @@ public FinalizeNewLayoutVersionCommandHandler() { * @param connectionManager - The SCMs that we are talking to. */ @Override - public void handle(SCMCommand command, OzoneContainer ozoneContainer, + public void handle(SCMCommand command, OzoneContainer ozoneContainer, StateContext context, SCMConnectionManager connectionManager) { LOG.info("Processing FinalizeNewLayoutVersionCommandHandler command."); invocationCount.incrementAndGet(); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java index 4366a912188d..b2159aa44f7a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java @@ -47,7 +47,7 @@ public ReconstructECContainersCommandHandler(ConfigurationSource conf, } @Override - public void handle(SCMCommand command, OzoneContainer container, + public void handle(SCMCommand command, OzoneContainer container, StateContext context, SCMConnectionManager connectionManager) { ReconstructECContainersCommand ecContainersCommand = (ReconstructECContainersCommand) command; diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/RefreshVolumeUsageCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/RefreshVolumeUsageCommandHandler.java index bc8b69a50ae9..f26329792b08 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/RefreshVolumeUsageCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/RefreshVolumeUsageCommandHandler.java @@ -48,7 +48,7 @@ public RefreshVolumeUsageCommandHandler() { } @Override - public void handle(SCMCommand command, OzoneContainer container, + public void handle(SCMCommand command, OzoneContainer container, StateContext context, SCMConnectionManager connectionManager) { LOG.info("receive command to refresh usage info of all volumes"); invocationCount.incrementAndGet(); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java index d52c51e298e9..17bb10fc7eaf 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java @@ -65,7 +65,7 @@ public String getMetricsName() { } @Override - public void handle(SCMCommand command, OzoneContainer container, + public void handle(SCMCommand command, OzoneContainer container, StateContext context, SCMConnectionManager connectionManager) { final ReplicateContainerCommand replicateCommand = diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/SetNodeOperationalStateCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/SetNodeOperationalStateCommandHandler.java index 548a54917430..25a158bb45d4 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/SetNodeOperationalStateCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/SetNodeOperationalStateCommandHandler.java @@ -76,7 +76,7 @@ public SetNodeOperationalStateCommandHandler(ConfigurationSource conf, * @param connectionManager - The SCMs that we are talking to. */ @Override - public void handle(SCMCommand command, OzoneContainer container, + public void handle(SCMCommand command, OzoneContainer container, StateContext context, SCMConnectionManager connectionManager) { long startTime = Time.monotonicNow(); invocationCount.incrementAndGet(); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java index 2d0ed82d902a..ffe5a40fb498 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java @@ -69,7 +69,7 @@ RegisteredCommand register(DatanodeDetails datanodeDetails, * @param datanodeDetails - Datanode ID. * @return Commands to be sent to the datanode. */ - default List processHeartbeat(DatanodeDetails datanodeDetails) { + default List> processHeartbeat(DatanodeDetails datanodeDetails) { return processHeartbeat(datanodeDetails, null); }; @@ -80,7 +80,7 @@ default List processHeartbeat(DatanodeDetails datanodeDetails) { * heartbeating datanode. * @return Commands to be sent to the datanode. */ - List processHeartbeat(DatanodeDetails datanodeDetails, + List> processHeartbeat(DatanodeDetails datanodeDetails, CommandQueueReportProto queueReport); /** diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java index bbcf498ec565..bf4f0b92fdd6 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java @@ -40,7 +40,7 @@ public enum HealthState { private final ContainerInfo containerInfo; private final HealthState healthState; - private final List commands = new ArrayList<>(); + private final List> commands = new ArrayList<>(); public ContainerHealthResult(ContainerInfo containerInfo, HealthState healthState) { @@ -52,11 +52,11 @@ public HealthState getHealthState() { return healthState; } - public void addCommand(SCMCommand command) { + public void addCommand(SCMCommand command) { commands.add(command); } - public List getCommands() { + public List> getCommands() { return commands; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/CommandQueue.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/CommandQueue.java index 568328210c7d..f122215105f6 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/CommandQueue.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/CommandQueue.java @@ -27,7 +27,6 @@ import java.util.UUID; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; -import org.apache.hadoop.util.Time; /** * Command Queue is queue of commands for the datanode. @@ -52,8 +51,6 @@ public long getCommandsInQueue() { /** * Constructs a Command Queue. - * TODO : Add a flusher thread that throws away commands older than a certain - * time period. */ public CommandQueue() { commandMap = new HashMap<>(); @@ -78,9 +75,9 @@ public void clear() { * @return List of SCM Commands. */ @SuppressWarnings("unchecked") - List getCommand(final UUID datanodeUuid) { + List> getCommand(final UUID datanodeUuid) { Commands cmds = commandMap.remove(datanodeUuid); - List cmdList = null; + List> cmdList = null; if (cmds != null) { cmdList = cmds.getCommands(); commandsInQueue -= !cmdList.isEmpty() ? cmdList.size() : 0; @@ -134,8 +131,7 @@ public Map getDatanodeCommandSummary( * @param datanodeUuid DatanodeDetails.Uuid * @param command - Command */ - public void addCommand(final UUID datanodeUuid, final SCMCommand - command) { + public void addCommand(final UUID datanodeUuid, final SCMCommand command) { commandMap.computeIfAbsent(datanodeUuid, s -> new Commands()).add(command); commandsInQueue++; } @@ -144,39 +140,20 @@ public void addCommand(final UUID datanodeUuid, final SCMCommand * Class that stores commands for a datanode. */ private static class Commands { - private long updateTime = 0; - private long readTime = 0; - private List commands = new ArrayList<>(); + private List> commands = new ArrayList<>(); private final Map summary = new HashMap<>(); - /** - * Gets the last time the commands for this node was updated. - * @return Time stamp - */ - public long getUpdateTime() { - return updateTime; - } - - /** - * Gets the last read time. - * @return last time when these commands were read from this queue. - */ - public long getReadTime() { - return readTime; - } - /** * Adds a command to the list. * * @param command SCMCommand */ - public void add(SCMCommand command) { + public void add(SCMCommand command) { this.commands.add(command); if (command.contributesToQueueSize()) { summary.put(command.getType(), summary.getOrDefault(command.getType(), 0) + 1); } - updateTime = Time.monotonicNow(); } public int getCommandSummary(SCMCommandProto.Type commandType) { @@ -191,11 +168,10 @@ public Map getAllCommandsSummary() { * Returns the commands for this datanode. * @return command list. */ - public List getCommands() { - List temp = this.commands; + public List> getCommands() { + List> temp = this.commands; this.commands = new ArrayList<>(); summary.clear(); - readTime = Time.monotonicNow(); return temp; } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java index 20dc5aea7861..f582623b8c12 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java @@ -98,7 +98,7 @@ public void onMessage(final DatanodeDetails datanodeDetails, } // remove commands in command queue for the DN - final List cmdList = nodeManager.getCommandQueue( + final List> cmdList = nodeManager.getCommandQueue( datanodeDetails.getUuid()); LOG.info("Clearing command queue of size {} for DN {}", cmdList.size(), datanodeDetails); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java index e6a74b395f77..275665ec38ca 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java @@ -282,7 +282,7 @@ Set getContainers(DatanodeDetails datanodeDetails) * @param dnId datanode uuid * @param command */ - void addDatanodeCommand(UUID dnId, SCMCommand command); + void addDatanodeCommand(UUID dnId, SCMCommand command); /** @@ -368,7 +368,7 @@ Map getTotalDatanodeCommandCounts( * @return list of commands */ // TODO: We can give better name to this method! - List getCommandQueue(UUID dnID); + List> getCommandQueue(UUID dnID); /** * Given datanode uuid, returns the DatanodeDetails for the node. diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index 43d13e4ae6de..ee6ad2b33801 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -533,7 +533,7 @@ private boolean isVersionChange(String oldVersion, String newVersion) { * @return SCMheartbeat response. */ @Override - public List processHeartbeat(DatanodeDetails datanodeDetails, + public List> processHeartbeat(DatanodeDetails datanodeDetails, CommandQueueReportProto queueReport) { Preconditions.checkNotNull(datanodeDetails, "Heartbeat is missing " + "DatanodeDetails."); @@ -550,7 +550,7 @@ public List processHeartbeat(DatanodeDetails datanodeDetails, try { Map summary = commandQueue.getDatanodeCommandSummary(datanodeDetails.getUuid()); - List commands = + List> commands = commandQueue.getCommand(datanodeDetails.getUuid()); // Update the SCMCommand of deleteBlocksCommand Status @@ -1635,7 +1635,7 @@ public int getPipeLineCount(DatanodeDetails datanodeDetails) } @Override - public void addDatanodeCommand(UUID dnId, SCMCommand command) { + public void addDatanodeCommand(UUID dnId, SCMCommand command) { writeLock().lock(); try { this.commandQueue.addCommand(dnId, command); @@ -1678,7 +1678,7 @@ public void onMessage(CommandForDatanode commandForDatanode, } @Override - public List getCommandQueue(UUID dnID) { + public List> getCommandQueue(UUID dnID) { // Getting the queue actually clears it and returns the commands, so this // is a write operation and not a read as the method name suggests. writeLock().lock(); @@ -1846,7 +1846,7 @@ public void removeNode(DatanodeDetails datanodeDetails) throws NodeNotFoundExcep } nodeStateManager.removeNode(datanodeDetails); removeFromDnsToUuidMap(datanodeDetails.getUuid(), datanodeDetails.getIpAddress()); - final List cmdList = getCommandQueue(datanodeDetails.getUuid()); + final List> cmdList = getCommandQueue(datanodeDetails.getUuid()); LOG.info("Clearing command queue of size {} for DN {}", cmdList.size(), datanodeDetails); } else { LOG.warn("Node not decommissioned or dead, cannot remove: {}", datanodeDetails); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java index ddc87da038e4..58d2a8164ea3 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java @@ -82,10 +82,10 @@ public SCMDatanodeHeartbeatDispatcher(NodeManager nodeManager, * * @return list of SCMCommand */ - public List dispatch(SCMHeartbeatRequestProto heartbeat) { + public List> dispatch(SCMHeartbeatRequestProto heartbeat) { DatanodeDetails datanodeDetails = DatanodeDetails.getFromProtoBuf(heartbeat.getDatanodeDetails()); - List commands; + List> commands; // If node is not registered, ask the node to re-register. Do not process // Heartbeat for unregistered nodes. diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java index 4a549afa7730..50b6d25f079b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java @@ -302,7 +302,7 @@ private String constructCommandAuditMap(List cmds) { public SCMHeartbeatResponseProto sendHeartbeat( SCMHeartbeatRequestProto heartbeat) throws IOException, TimeoutException { List cmdResponses = new ArrayList<>(); - for (SCMCommand cmd : heartbeatDispatcher.dispatch(heartbeat)) { + for (SCMCommand cmd : heartbeatDispatcher.dispatch(heartbeat)) { cmdResponses.add(getCommandResponse(cmd, scm)); } final OptionalLong term = getTermIfLeader(); @@ -352,7 +352,7 @@ private OptionalLong getTermIfLeader() { * @throws IOException */ @VisibleForTesting - public static SCMCommandProto getCommandResponse(SCMCommand cmd, + public static SCMCommandProto getCommandResponse(SCMCommand cmd, OzoneStorageContainerManager scm) throws IOException, TimeoutException { SCMCommandProto.Builder builder = SCMCommandProto.newBuilder() .setEncodedToken(cmd.getEncodedToken()); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java index 244f86e79540..a0d39b0f01c3 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java @@ -102,7 +102,7 @@ public class MockNodeManager implements NodeManager { private final List deadNodes; private final Map nodeMetricMap; private final SCMNodeStat aggregateStat; - private final Map> commandMap; + private final Map>> commandMap; private Node2PipelineMap node2PipelineMap; private final Node2ContainerMap node2ContainerMap; private NetworkTopology clusterMap; @@ -533,13 +533,13 @@ public void removeContainer(DatanodeDetails dd, } @Override - public void addDatanodeCommand(UUID dnId, SCMCommand command) { + public void addDatanodeCommand(UUID dnId, SCMCommand command) { if (commandMap.containsKey(dnId)) { - List commandList = commandMap.get(dnId); + List> commandList = commandMap.get(dnId); Preconditions.checkNotNull(commandList); commandList.add(command); } else { - List commandList = new LinkedList<>(); + List> commandList = new LinkedList<>(); commandList.add(command); commandMap.put(dnId, commandList); } @@ -656,7 +656,7 @@ public Set getContainers(DatanodeDetails uuid) { // Returns the number of commands that is queued to this node manager. public int getCommandCount(DatanodeDetails dd) { - List list = commandMap.get(dd.getUuid()); + List> list = commandMap.get(dd.getUuid()); return (list == null) ? 0 : list.size(); } @@ -760,7 +760,7 @@ private synchronized void addEntryTodnsToUuidMap( * @return SCMheartbeat response list */ @Override - public List processHeartbeat(DatanodeDetails datanodeDetails, + public List> processHeartbeat(DatanodeDetails datanodeDetails, CommandQueueReportProto commandQueueReportProto) { return null; } @@ -847,7 +847,7 @@ public void onMessage(CommandForDatanode commandForDatanode, } @Override - public List getCommandQueue(UUID dnID) { + public List> getCommandQueue(UUID dnID) { return null; } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java index ea1054784d09..085a28244404 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java @@ -269,7 +269,7 @@ public void removeContainer(DatanodeDetails datanodeDetails, } @Override - public void addDatanodeCommand(UUID dnId, SCMCommand command) { + public void addDatanodeCommand(UUID dnId, SCMCommand command) { } /** @@ -341,7 +341,7 @@ public Map getTotalDatanodeCommandCounts( } @Override - public List getCommandQueue(UUID dnID) { + public List> getCommandQueue(UUID dnID) { return null; } @@ -426,7 +426,7 @@ public RegisteredCommand register(DatanodeDetails datanodeDetails, } @Override - public List processHeartbeat(DatanodeDetails datanodeDetails, + public List> processHeartbeat(DatanodeDetails datanodeDetails, CommandQueueReportProto commandQueueReportProto) { return null; } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java index 6e37d3de262c..ee7c192d972a 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java @@ -1431,7 +1431,7 @@ public void testSendLowPriorityReplicateContainerCommand() replicationManager.sendLowPriorityReplicateContainerCommand(containerInfo, 0, src, target, scmDeadline); - ArgumentCaptor command = + ArgumentCaptor> command = ArgumentCaptor.forClass(SCMCommand.class); ArgumentCaptor targetUUID = ArgumentCaptor.forClass(UUID.class); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestCommandQueue.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestCommandQueue.java index a917bc022510..d47a37f26317 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestCommandQueue.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestCommandQueue.java @@ -101,7 +101,7 @@ public void testSummaryUpdated() { datanode2UUID, SCMCommandProto.Type.createPipelineCommand)); // Ensure the counts are cleared when the commands are retrieved - List cmds = commandQueue.getCommand(datanode1UUID); + List> cmds = commandQueue.getCommand(datanode1UUID); assertEquals(5, cmds.size()); assertEquals(0, commandQueue.getDatanodeCommandCount( diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java index 25802ddb8131..cb2315f7fd56 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java @@ -644,7 +644,7 @@ public void testSetNodeOpStateAndCommandFired() // If found mismatch, leader SCM fires a SetNodeOperationalStateCommand // to update the opState persisted in Datanode. scm.getScmContext().updateLeaderAndTerm(true, 1); - List commands = nodeManager.processHeartbeat(dn); + List> commands = nodeManager.processHeartbeat(dn); assertEquals(SetNodeOperationalStateCommand.class, commands.get(0).getClass()); @@ -1763,7 +1763,7 @@ public void testHandlingSCMCommandEvent() PipelineID.randomId()))); eq.processAll(1000L); - List command = + List> command = nodemanager.processHeartbeat(datanodeDetails); // With dh registered, SCM will send create pipeline command to dn assertThat(command.size()).isGreaterThanOrEqualTo(1); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java index 6eb7d7c943f5..48508891f6d0 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java @@ -372,7 +372,7 @@ public RegisteredCommand register(DatanodeDetails dd, * @return SCMheartbeat response list */ @Override - public List processHeartbeat(DatanodeDetails dd, + public List> processHeartbeat(DatanodeDetails dd, CommandQueueReportProto commandQueueReportProto) { return null; } @@ -401,7 +401,7 @@ public void addNode(DatanodeDetails id, NodeStatus status) { } @Override - public void addDatanodeCommand(UUID dnId, SCMCommand command) { + public void addDatanodeCommand(UUID dnId, SCMCommand command) { this.commandQueue.addCommand(dnId, command); } @@ -491,7 +491,7 @@ public void onMessage(CommandForDatanode commandForDatanode, } @Override - public List getCommandQueue(UUID dnID) { + public List> getCommandQueue(UUID dnID) { return null; } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManager.java index 70e7fb5e5bda..23c7bf3930e8 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManager.java @@ -454,10 +454,10 @@ public void testBlockDeletingThrottling() throws Exception { GenericTestUtils.waitFor(() -> { NodeManager nodeManager = cluster.getStorageContainerManager() .getScmNodeManager(); - List commands = nodeManager.processHeartbeat( + List> commands = nodeManager.processHeartbeat( nodeManager.getNodes(NodeStatus.inServiceHealthy()).get(0)); if (commands != null) { - for (SCMCommand cmd : commands) { + for (SCMCommand cmd : commands) { if (cmd.getType() == SCMCommandProto.Type.deleteBlocksCommand) { List deletedTXs = ((DeleteBlocksCommand) cmd).blocksTobeDeleted(); diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java index d9d1fd5b4c48..fbbb58a124e3 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java @@ -222,9 +222,9 @@ public void onMessage(CommandForDatanode commandForDatanode, * @return SCMheartbeat response. */ @Override - public List processHeartbeat(DatanodeDetails datanodeDetails, + public List> processHeartbeat(DatanodeDetails datanodeDetails, CommandQueueReportProto queueReport) { - List cmds = new ArrayList<>(); + List> cmds = new ArrayList<>(); long currentTime = Time.now(); if (needUpdate(datanodeDetails, currentTime)) { cmds.add(new ReregisterCommand()); diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconNodeManager.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconNodeManager.java index e484295feb4d..9fa743407681 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconNodeManager.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconNodeManager.java @@ -174,7 +174,7 @@ public void testReconNodeDB() throws IOException, NodeNotFoundException { .getOpStateExpiryEpochSeconds()); // Upon processing the heartbeat, the illegal command should be filtered out - List returnedCmds = + List> returnedCmds = reconNodeManager.processHeartbeat(datanodeDetails); assertEquals(1, returnedCmds.size()); assertEquals(SCMCommandProto.Type.reregisterCommand, @@ -272,7 +272,7 @@ public void testDatanodeUpdate() throws IOException { datanodeDetails.setHostName("hostname2"); // Upon processing the heartbeat, the illegal command should be filtered out - List returnedCmds = + List> returnedCmds = reconNodeManager.processHeartbeat(datanodeDetails); assertEquals(1, returnedCmds.size()); assertEquals(SCMCommandProto.Type.reregisterCommand,