Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.hadoop.ozone.protocol.commands;

import com.google.protobuf.Message;
import java.util.UUID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload;
Expand All @@ -40,9 +39,8 @@ public CommandForDatanode(DatanodeID datanodeId, SCMCommand<T> command) {
this.command = command;
}

@Deprecated
public UUID getDatanodeId() {
return datanodeId.getUuid();
public DatanodeID getDatanodeId() {
return datanodeId;
}

public SCMCommand<T> getCommand() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ public void sendDatanodeCommand(SCMCommand<?> command,
scmDeadlineEpochMs);
command.setTerm(getScmTerm());
command.setDeadline(datanodeDeadline);
nodeManager.addDatanodeCommand(target.getUuid(), command);
nodeManager.addDatanodeCommand(target.getID(), command);
adjustPendingOpsAndMetrics(containerInfo, command, target,
scmDeadlineEpochMs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,13 +270,10 @@ Set<ContainerID> getContainers(DatanodeDetails datanodeDetails)
throws NodeNotFoundException;

/**
* Add a {@link SCMCommand} to the command queue, which are
* handled by HB thread asynchronously.
* @param dnId datanode uuid
* @param command
* Add a {@link SCMCommand} to the command queue of the given datanode.
* The command will be handled by the HB thread asynchronously.
*/
void addDatanodeCommand(UUID dnId, SCMCommand<?> command);

void addDatanodeCommand(DatanodeID datanodeID, SCMCommand<?> command);

/**
* send refresh command to all the healthy datanodes to refresh
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ protected void updateDatanodeOpState(DatanodeDetails reportedDn)
scmStatus.getOperationalState(),
scmStatus.getOpStateExpiryEpochSeconds());
command.setTerm(scmContext.getTermOfLeader());
addDatanodeCommand(reportedDn.getUuid(), command);
addDatanodeCommand(reportedDn.getID(), command);
} catch (NotLeaderException nle) {
LOG.warn("Skip sending SetNodeOperationalStateCommand,"
+ " since current SCM is not leader.", nle);
Expand Down Expand Up @@ -1646,7 +1646,8 @@ public int getPipeLineCount(DatanodeDetails datanodeDetails)
}

@Override
public void addDatanodeCommand(UUID dnId, SCMCommand<?> command) {
public void addDatanodeCommand(DatanodeID datanodeID, SCMCommand<?> command) {
final UUID dnId = datanodeID.getUuid();
writeLock().lock();
try {
this.commandQueue.addCommand(dnId, command);
Expand All @@ -1671,7 +1672,7 @@ public void refreshAllHealthyDnUsageInfo() {
return;
}
getNodes(IN_SERVICE, HEALTHY).forEach(datanode ->
addDatanodeCommand(datanode.getUuid(), refreshVolumeUsageCommand));
addDatanodeCommand(datanode.getID(), refreshVolumeUsageCommand));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public List<SCMCommand<?>> dispatch(SCMHeartbeatRequestProto heartbeat) {
LOG.info("SCM received heartbeat from an unregistered datanode {}. " +
"Asking datanode to re-register.", datanodeDetails);
UUID dnID = datanodeDetails.getUuid();
nodeManager.addDatanodeCommand(dnID, new ReregisterCommand());
nodeManager.addDatanodeCommand(datanodeDetails.getID(), new ReregisterCommand());

commands = nodeManager.getCommandQueue(dnID);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,13 @@
import java.time.Clock;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.ReconfigurationHandler;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
Expand Down Expand Up @@ -120,13 +119,13 @@ public void testCall() throws Exception {
verify(eventPublisher, times(3)).fireEvent(
eq(SCMEvents.DATANODE_COMMAND), argumentCaptor.capture());
List<CommandForDatanode> actualCommands = argumentCaptor.getAllValues();
List<UUID> actualDnIds = actualCommands.stream()
final Set<DatanodeID> actualDnIds = actualCommands.stream()
.map(CommandForDatanode::getDatanodeId)
.collect(Collectors.toList());
Set<UUID> expectedDnIdsSet = datanodeDetails.stream()
.map(DatanodeDetails::getUuid).collect(Collectors.toSet());
.collect(Collectors.toSet());
final Set<DatanodeID> expectedDnIdsSet = datanodeDetails.stream()
.map(DatanodeDetails::getID).collect(Collectors.toSet());

assertEquals(expectedDnIdsSet, new HashSet<>(actualDnIds));
assertEquals(expectedDnIdsSet, actualDnIds);
assertEquals(datanodeDetails.size(),
metrics.getNumBlockDeletionCommandSent());
// Echo Command has one Transaction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,8 @@ public void removeContainer(DatanodeDetails dd,
}

@Override
public void addDatanodeCommand(UUID dnId, SCMCommand<?> command) {
public void addDatanodeCommand(DatanodeID datanodeID, SCMCommand<?> command) {
final UUID dnId = datanodeID.getUuid();
if (commandMap.containsKey(dnId)) {
List<SCMCommand<?>> commandList = commandMap.get(dnId);
Preconditions.checkNotNull(commandList);
Expand Down Expand Up @@ -819,7 +820,7 @@ public void delContainer(DatanodeDetails datanodeDetails, long size) {
@Override
public void onMessage(CommandForDatanode commandForDatanode,
EventPublisher publisher) {
addDatanodeCommand(commandForDatanode.getDatanodeId(),
this.addDatanodeCommand(commandForDatanode.getDatanodeId(),
commandForDatanode.getCommand());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ public void removeContainer(DatanodeDetails datanodeDetails,
}

@Override
public void addDatanodeCommand(UUID dnId, SCMCommand<?> command) {
public void addDatanodeCommand(DatanodeID datanodeID, SCMCommand<?> command) {
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
Expand Down Expand Up @@ -201,10 +201,10 @@ private void closeContainerForValidContainer(ReplicationConfig repConfig,
.fireEvent(eq(DATANODE_COMMAND), commandCaptor.capture());

List<CommandForDatanode> cmds = commandCaptor.getAllValues();
Set<UUID> pipelineDNs = pipeline
final Set<DatanodeID> pipelineDNs = pipeline
.getNodes()
.stream()
.map(d -> d.getUuid())
.map(DatanodeDetails::getID)
.collect(Collectors.toSet());
for (CommandForDatanode c : cmds) {
assertThat(pipelineDNs).contains(c.getDatanodeId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
Expand Down Expand Up @@ -132,7 +131,7 @@ public class TestReplicationManager {
private ReplicationConfig repConfig;
private ReplicationManagerReport repReport;
private ReplicationQueue repQueue;
private Set<Pair<UUID, SCMCommand<?>>> commandsSent;
private Set<Pair<DatanodeID, SCMCommand<?>>> commandsSent;

@BeforeEach
public void setup() throws IOException {
Expand Down Expand Up @@ -396,7 +395,7 @@ public void testQuasiClosedContainerWithExcessUnhealthyReplica()
handler.processAndSendCommands(replicas, Collections.emptyList(),
repQueue.dequeueOverReplicatedContainer(), 2);
assertTrue(commandsSent.iterator().hasNext());
assertEquals(unhealthy.getDatanodeDetails().getUuid(),
assertEquals(unhealthy.getDatanodeDetails().getID(),
commandsSent.iterator().next().getKey());
assertEquals(SCMCommandProto.Type.deleteContainerCommand,
commandsSent.iterator().next().getValue().getType());
Expand Down Expand Up @@ -874,12 +873,12 @@ public void testUnderReplicationBlockedByUnhealthyReplicas()
// a delete command should also have been sent for UNHEALTHY replica of
// index 1
assertEquals(1, commandsSent.size());
Pair<UUID, SCMCommand<?>> command = commandsSent.iterator().next();
Pair<DatanodeID, SCMCommand<?>> command = commandsSent.iterator().next();
assertEquals(SCMCommandProto.Type.deleteContainerCommand,
command.getValue().getType());
DeleteContainerCommand deleteCommand =
(DeleteContainerCommand) command.getValue();
assertEquals(unhealthyReplica1.getDatanodeDetails().getUuid(),
assertEquals(unhealthyReplica1.getDatanodeDetails().getID(),
command.getKey());
assertEquals(container.containerID(),
ContainerID.valueOf(deleteCommand.getContainerID()));
Expand Down Expand Up @@ -1318,15 +1317,14 @@ public void testSendLowPriorityReplicateContainerCommand()

ArgumentCaptor<SCMCommand<?>> command =
ArgumentCaptor.forClass(SCMCommand.class);
ArgumentCaptor<UUID> targetUUID =
ArgumentCaptor.forClass(UUID.class);
ArgumentCaptor<DatanodeID> targetUUID = ArgumentCaptor.forClass(DatanodeID.class);
verify(nodeManager).addDatanodeCommand(targetUUID.capture(), command.capture());

ReplicateContainerCommand sentCommand =
(ReplicateContainerCommand)command.getValue();
assertEquals(datanodeDeadline, sentCommand.getDeadline());
assertEquals(LOW, sentCommand.getPriority());
assertEquals(src.getUuid(), targetUUID.getValue());
assertEquals(src.getID(), targetUUID.getValue());
assertEquals(target, sentCommand.getTargetDatanode());
}

Expand Down Expand Up @@ -1381,8 +1379,8 @@ private void testReplicationCommand(
container, new ArrayList<>(sourceNodes), destination, replicaIndex);

assertEquals(1, commandsSent.size());
Pair<UUID, SCMCommand<?>> cmdWithTarget = commandsSent.iterator().next();
assertEquals(expectedTarget.getUuid(), cmdWithTarget.getLeft());
Pair<DatanodeID, SCMCommand<?>> cmdWithTarget = commandsSent.iterator().next();
assertEquals(expectedTarget.getID(), cmdWithTarget.getLeft());
assertEquals(ReplicateContainerCommand.class,
cmdWithTarget.getRight().getClass());
ReplicateContainerCommand cmd =
Expand Down Expand Up @@ -1443,8 +1441,8 @@ public void testSendThrottledReconstructionCommand()
replicationManager.sendThrottledReconstructionCommand(container, command);

assertEquals(1, commandsSent.size());
Pair<UUID, SCMCommand<?>> cmd = commandsSent.iterator().next();
assertEquals(cmdTarget.getUuid(), cmd.getLeft());
Pair<DatanodeID, SCMCommand<?>> cmd = commandsSent.iterator().next();
assertEquals(cmdTarget.getID(), cmd.getLeft());
assertEquals(0, replicationManager.getMetrics()
.getEcReconstructionCmdsDeferredTotal());
}
Expand Down Expand Up @@ -1647,9 +1645,9 @@ public void testPendingOpExpiry() throws ContainerNotFoundException {

replicationManager.opCompleted(delOp, ContainerID.valueOf(1L), true);
assertEquals(1, commandsSent.size());
Pair<UUID, SCMCommand<?>> sentCommand = commandsSent.iterator().next();
Pair<DatanodeID, SCMCommand<?>> sentCommand = commandsSent.iterator().next();
// The target should be DN2 and the deadline should have been updated from the value set in commandDeadline above
assertEquals(dn2.getUuid(), sentCommand.getLeft());
assertEquals(dn2.getID(), sentCommand.getLeft());
assertNotEquals(commandDeadline, sentCommand.getRight().getDeadline());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
Expand Down Expand Up @@ -101,7 +100,7 @@ public class TestReplicationManagerScenarios {
private Map<ContainerID, Set<ContainerReplica>> containerReplicaMap;
private Set<ContainerInfo> containerInfoSet;
private ContainerReplicaPendingOps containerReplicaPendingOps;
private Set<Pair<UUID, SCMCommand<?>>> commandsSent;
private Set<Pair<DatanodeID, SCMCommand<?>>> commandsSent;

private OzoneConfiguration configuration;
private ReplicationManager replicationManager;
Expand Down Expand Up @@ -332,11 +331,11 @@ private void assertExpectedCommands(Scenario scenario,
// datanodes.
for (ExpectedCommands expectedCommand : expectedCommands) {
boolean found = false;
for (Pair<UUID, SCMCommand<?>> command : commandsSent) {
for (Pair<DatanodeID, SCMCommand<?>> command : commandsSent) {
if (command.getRight().getType() == expectedCommand.getType()) {
if (expectedCommand.hasExpectedDatanode()) {
// We need to assert against the command the datanode is sent to
DatanodeDetails commandDatanode = findDatanodeFromUUID(command.getKey());
DatanodeDetails commandDatanode = findDatanode(command.getKey());
if (commandDatanode != null && expectedCommand.isTargetExpected(commandDatanode)) {
found = true;
commandsSent.remove(command);
Expand All @@ -354,9 +353,9 @@ private void assertExpectedCommands(Scenario scenario,
}
}

private DatanodeDetails findDatanodeFromUUID(UUID uuid) {
private DatanodeDetails findDatanode(DatanodeID uuid) {
for (DatanodeDetails dn : DATANODE_ALIASES.values()) {
if (dn.getUuid().equals(uuid)) {
if (dn.getID().equals(uuid)) {
return dn;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ public void testOnMessage(@TempDir File tempDir) throws Exception {
// Now set the node to anything other than IN_MAINTENANCE and the relevant
// replicas should be removed
DeleteBlocksCommand cmd = new DeleteBlocksCommand(Collections.emptyList());
nodeManager.addDatanodeCommand(datanode1.getUuid(), cmd);
nodeManager.addDatanodeCommand(datanode1.getID(), cmd);
nodeManager.setNodeOperationalState(datanode1,
HddsProtos.NodeOperationalState.IN_SERVICE);
deadNodeHandler.onMessage(datanode1, publisher);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -946,7 +946,7 @@ scmStorageConfig, eventPublisher, new NetworkTopologyImpl(conf),
// should be instructed to finalize.
verify(eventPublisher, times(1))
.fireEvent(eq(DATANODE_COMMAND), captor.capture());
assertEquals(captor.getValue().getDatanodeId(), node1.getUuid());
assertEquals(captor.getValue().getDatanodeId(), node1.getID());
assertEquals(captor.getValue().getCommand().getType(),
finalizeNewLayoutVersionCommand);
} else {
Expand Down Expand Up @@ -976,11 +976,11 @@ scmStorageConfig, eventPublisher, new NetworkTopologyImpl(conf),
verify(eventPublisher,
times(1)).fireEvent(NEW_NODE, node1);
for (int i = 0; i < 3; i++) {
nodeManager.addDatanodeCommand(node1.getUuid(), ReplicateContainerCommand
nodeManager.addDatanodeCommand(node1.getID(), ReplicateContainerCommand
.toTarget(1, MockDatanodeDetails.randomDatanodeDetails()));
}
for (int i = 0; i < 5; i++) {
nodeManager.addDatanodeCommand(node1.getUuid(),
nodeManager.addDatanodeCommand(node1.getID(),
new DeleteBlocksCommand(emptyList()));
}

Expand Down Expand Up @@ -1036,7 +1036,7 @@ scmStorageConfig, eventPublisher, new NetworkTopologyImpl(conf),

// Add a a few more commands to the queue and check the counts are the sum.
for (int i = 0; i < 5; i++) {
nodeManager.addDatanodeCommand(node1.getUuid(),
nodeManager.addDatanodeCommand(node1.getID(),
new CloseContainerCommand(1, PipelineID.randomId()));
}
assertEquals(0, nodeManager.getTotalDatanodeCommandCount(
Expand Down
Loading