Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import static org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name.ALL_PORTS;

import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.ozone.test.GenericTestUtils;
Expand All @@ -31,16 +30,16 @@
public final class MockDatanodeDetails {

/**
* Creates DatanodeDetails with random UUID and random IP address.
* Creates DatanodeDetails with random ID and random IP address.
*
* @return DatanodeDetails
*/
public static DatanodeDetails randomDatanodeDetails() {
return createDatanodeDetails(UUID.randomUUID());
return createDatanodeDetails(DatanodeID.randomID());
}

/**
* Creates DatanodeDetails with random UUID, specific hostname and network
* Creates DatanodeDetails with random DatanodeID, specific hostname and network
* location.
*
* @return DatanodeDetails
Expand All @@ -52,46 +51,46 @@ public static DatanodeDetails createDatanodeDetails(String hostname,
+ "." + random.nextInt(256)
+ "." + random.nextInt(256)
+ "." + random.nextInt(256);
return createDatanodeDetails(UUID.randomUUID().toString(), hostname,
return createDatanodeDetails(DatanodeID.randomID(), hostname,
ipAddress, loc);
}

/**
* Creates DatanodeDetails using the given UUID.
* Creates DatanodeDetails using the given DatanodeID.
*
* @param uuid Datanode's UUID
* @param id Datanode's ID
*
* @return DatanodeDetails
*/
public static DatanodeDetails createDatanodeDetails(UUID uuid) {
public static DatanodeDetails createDatanodeDetails(DatanodeID id) {
Random random = ThreadLocalRandom.current();
String ipAddress = random.nextInt(256)
+ "." + random.nextInt(256)
+ "." + random.nextInt(256)
+ "." + random.nextInt(256);
return createDatanodeDetails(uuid.toString(), "localhost" + "-" + ipAddress,
return createDatanodeDetails(id, "localhost" + "-" + ipAddress,
ipAddress, null);
}

/**
* Creates DatanodeDetails with the given information.
*
* @param uuid Datanode's UUID
* @param id Datanode's ID
* @param hostname hostname of Datanode
* @param ipAddress ip address of Datanode
*
* @return DatanodeDetails
*/
public static DatanodeDetails createDatanodeDetails(String uuid,
public static DatanodeDetails createDatanodeDetails(DatanodeID id,
String hostname, String ipAddress, String networkLocation) {
return createDatanodeDetails(uuid, hostname, ipAddress, networkLocation, 0);
return createDatanodeDetails(id, hostname, ipAddress, networkLocation, 0);
}

public static DatanodeDetails createDatanodeDetails(String uuid,
public static DatanodeDetails createDatanodeDetails(DatanodeID id,
String hostname, String ipAddress, String networkLocation, int port) {

DatanodeDetails.Builder dn = DatanodeDetails.newBuilder()
.setUuid(UUID.fromString(uuid))
.setID(id)
.setHostName(hostname)
.setIpAddress(ipAddress)
.setNetworkLocation(networkLocation)
Expand All @@ -106,12 +105,12 @@ public static DatanodeDetails createDatanodeDetails(String uuid,
}

/**
* Creates DatanodeDetails with random UUID and valid local address and port.
* Creates DatanodeDetails with random ID and valid local address and port.
*
* @return DatanodeDetails
*/
public static DatanodeDetails randomLocalDatanodeDetails() {
return createDatanodeDetails(UUID.randomUUID().toString(),
return createDatanodeDetails(DatanodeID.randomID(),
GenericTestUtils.PortAllocator.HOSTNAME,
GenericTestUtils.PortAllocator.HOST_ADDRESS, null,
GenericTestUtils.PortAllocator.getFreePort());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ protected Set<DatanodeDetails> getDatanodesWithinCommandLimit(
final Set<DatanodeDetails> included = new HashSet<>();
for (DatanodeDetails dn : datanodes) {
if (nodeManager.getTotalDatanodeCommandCount(dn, Type.deleteBlocksCommand) < deleteBlocksPendingCommandLimit
&& nodeManager.getCommandQueueCount(dn.getUuid(), Type.deleteBlocksCommand) < 2) {
&& nodeManager.getCommandQueueCount(dn.getID(), Type.deleteBlocksCommand) < 2) {
included.add(dn);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;

Expand All @@ -38,7 +38,7 @@
* Note this class is not thread safe, and accesses must be protected by a lock.
*/
public class CommandQueue {
private final Map<UUID, Commands> commandMap;
private final Map<DatanodeID, Commands> commandMap;
private long commandsInQueue;

/**
Expand Down Expand Up @@ -71,12 +71,11 @@ public void clear() {
* commands returns a empty list otherwise the current set of
* commands are returned and command map set to empty list again.
*
* @param datanodeUuid Datanode UUID
* @return List of SCM Commands.
*/
@SuppressWarnings("unchecked")
List<SCMCommand<?>> getCommand(final UUID datanodeUuid) {
Commands cmds = commandMap.remove(datanodeUuid);
List<SCMCommand<?>> getCommand(final DatanodeID datanodeID) {
Commands cmds = commandMap.remove(datanodeID);
List<SCMCommand<?>> cmdList = null;
if (cmds != null) {
cmdList = cmds.getCommands();
Expand All @@ -93,13 +92,13 @@ List<SCMCommand<?>> getCommand(final UUID datanodeUuid) {
* Command.contributesToQueueSize() method will not be included in the count.
* At the current time, only low priority ReplicateContainerCommands meet this
* condition.
* @param datanodeUuid Datanode UUID.
* @param datanodeID Datanode ID.
* @param commandType The type of command for which to get the count.
* @return The currently queued command count, or zero if none are queued.
*/
public int getDatanodeCommandCount(
final UUID datanodeUuid, SCMCommandProto.Type commandType) {
Commands commands = commandMap.get(datanodeUuid);
final DatanodeID datanodeID, SCMCommandProto.Type commandType) {
Commands commands = commandMap.get(datanodeID);
if (commands == null) {
return 0;
}
Expand All @@ -112,27 +111,21 @@ public int getDatanodeCommandCount(
* Command.contributesToQueueSize() method will not be included in the count.
* At the current time, only low priority ReplicateContainerCommands meet this
* condition.
* @param datanodeUuid Datanode UUID
* @return A map containing the command summary. Note the returned map is a
* copy of the internal map and can be modified safely by the caller.
*/
public Map<SCMCommandProto.Type, Integer> getDatanodeCommandSummary(
final UUID datanodeUuid) {
Commands commands = commandMap.get(datanodeUuid);
final DatanodeID datanodeID) {
Commands commands = commandMap.get(datanodeID);
if (commands == null) {
return Collections.emptyMap();
}
return commands.getAllCommandsSummary();
}

/**
* Adds a Command to the SCM Queue to send the command to container.
*
* @param datanodeUuid DatanodeDetails.Uuid
* @param command - Command
*/
public void addCommand(final UUID datanodeUuid, final SCMCommand<?> command) {
commandMap.computeIfAbsent(datanodeUuid, s -> new Commands()).add(command);
/** Adds a Command to the SCM Queue to send the command to container. */
public void addCommand(final DatanodeID datanodeID, final SCMCommand<?> command) {
commandMap.computeIfAbsent(datanodeID, s -> new Commands()).add(command);
commandsInQueue++;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void onMessage(final DatanodeDetails datanodeDetails,

// remove commands in command queue for the DN
final List<SCMCommand<?>> cmdList = nodeManager.getCommandQueue(
datanodeDetails.getUuid());
datanodeDetails.getID());
LOG.info("Clearing command queue of size {} for DN {}",
cmdList.size(), datanodeDetails);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.BiConsumer;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.DatanodeID;
Expand Down Expand Up @@ -312,11 +311,11 @@ int getNodeQueuedCommandCount(DatanodeDetails datanodeDetails,
/**
* Get the number of commands of the given type queued in the SCM CommandQueue
* for the given datanode.
* @param dnID The UUID of the datanode.
* @param dnID The ID of the datanode.
* @param cmdType The Type of command to query the current count for.
* @return The count of commands queued, or zero if none.
*/
int getCommandQueueCount(UUID dnID, SCMCommandProto.Type cmdType);
int getCommandQueueCount(DatanodeID dnID, SCMCommandProto.Type cmdType);

/**
* Get the total number of pending commands of the given type on the given
Expand Down Expand Up @@ -354,11 +353,10 @@ Map<SCMCommandProto.Type, Integer> getTotalDatanodeCommandCounts(

/**
* Get list of SCMCommands in the Command Queue for a particular Datanode.
* @param dnID - Datanode uuid.
* @return list of commands
*/
// TODO: We can give better name to this method!
List<SCMCommand<?>> getCommandQueue(UUID dnID);
List<SCMCommand<?>> getCommandQueue(DatanodeID dnID);

/** @return the datanode of the given id if it exists; otherwise, return null. */
@Nullable DatanodeDetails getNode(@Nullable DatanodeID id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -547,9 +546,9 @@ public List<SCMCommand<?>> processHeartbeat(DatanodeDetails datanodeDetails,
writeLock().lock();
try {
Map<SCMCommandProto.Type, Integer> summary =
commandQueue.getDatanodeCommandSummary(datanodeDetails.getUuid());
commandQueue.getDatanodeCommandSummary(datanodeDetails.getID());
List<SCMCommand<?>> commands =
commandQueue.getCommand(datanodeDetails.getUuid());
commandQueue.getCommand(datanodeDetails.getID());

// Update the SCMCommand of deleteBlocksCommand Status
for (SCMCommand<?> command : commands) {
Expand Down Expand Up @@ -853,7 +852,7 @@ public int getNodeQueuedCommandCount(DatanodeDetails datanodeDetails,
* @return The count of commands queued, or zero if none.
*/
@Override
public int getCommandQueueCount(UUID dnID, SCMCommandProto.Type cmdType) {
public int getCommandQueueCount(DatanodeID dnID, SCMCommandProto.Type cmdType) {
readLock().lock();
try {
return commandQueue.getDatanodeCommandCount(dnID, cmdType);
Expand Down Expand Up @@ -885,7 +884,7 @@ public int getTotalDatanodeCommandCount(DatanodeDetails datanodeDetails,
". Assuming zero", datanodeDetails, cmdType);
dnCount = 0;
}
return getCommandQueueCount(datanodeDetails.getUuid(), cmdType) + dnCount;
return getCommandQueueCount(datanodeDetails.getID(), cmdType) + dnCount;
} finally {
readLock().unlock();
}
Expand Down Expand Up @@ -1490,7 +1489,7 @@ public int minHealthyVolumeNum(List<DatanodeDetails> dnList) {
getHealthyVolumeCount());
} catch (NodeNotFoundException e) {
LOG.warn("Cannot generate NodeStat, datanode {} not found.",
dn.getUuid());
dn.getID());
}
}
Preconditions.checkArgument(!volumeCountList.isEmpty());
Expand Down Expand Up @@ -1525,7 +1524,7 @@ public int pipelineLimit(DatanodeDetails dn) {
}
} catch (NodeNotFoundException e) {
LOG.warn("Cannot generate NodeStat, datanode {} not found.",
dn.getUuid());
dn.getID());
}
return 0;
}
Expand Down Expand Up @@ -1647,10 +1646,9 @@ public int getPipeLineCount(DatanodeDetails datanodeDetails)

@Override
public void addDatanodeCommand(DatanodeID datanodeID, SCMCommand<?> command) {
final UUID dnId = datanodeID.getUuid();
writeLock().lock();
try {
this.commandQueue.addCommand(dnId, command);
this.commandQueue.addCommand(datanodeID, command);
} finally {
writeLock().unlock();
}
Expand Down Expand Up @@ -1690,7 +1688,7 @@ public void onMessage(CommandForDatanode commandForDatanode,
}

@Override
public List<SCMCommand<?>> getCommandQueue(UUID dnID) {
public List<SCMCommand<?>> getCommandQueue(DatanodeID dnID) {
// Getting the queue actually clears it and returns the commands, so this
// is a write operation and not a read as the method name suggests.
writeLock().lock();
Expand Down Expand Up @@ -1836,7 +1834,7 @@ public void removeNode(DatanodeDetails datanodeDetails) throws NodeNotFoundExcep
}
nodeStateManager.removeNode(datanodeDetails.getID());
removeFromDnsToDnIdMap(datanodeDetails.getID(), datanodeDetails.getIpAddress());
final List<SCMCommand<?>> cmdList = getCommandQueue(datanodeDetails.getUuid());
final List<SCMCommand<?>> cmdList = getCommandQueue(datanodeDetails.getID());
LOG.info("Clearing command queue of size {} for DN {}", cmdList.size(), datanodeDetails);
} else {
LOG.warn("Node not decommissioned or dead, cannot remove: {}", datanodeDetails);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
import com.google.protobuf.Message;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandQueueReportProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerActionsProto;
Expand Down Expand Up @@ -90,7 +90,7 @@ public List<SCMCommand<?>> dispatch(SCMHeartbeatRequestProto heartbeat) {
if (!nodeManager.isNodeRegistered(datanodeDetails)) {
LOG.info("SCM received heartbeat from an unregistered datanode {}. " +
"Asking datanode to re-register.", datanodeDetails);
UUID dnID = datanodeDetails.getUuid();
DatanodeID dnID = datanodeDetails.getID();
nodeManager.addDatanodeCommand(datanodeDetails.getID(), new ReregisterCommand());

commands = nodeManager.getCommandQueue(dnID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ private HddsTestUtils() {
public static DatanodeDetails getDatanodeDetails(
RegisteredCommand registeredCommand) {
return MockDatanodeDetails.createDatanodeDetails(
registeredCommand.getDatanode().getUuidString(),
registeredCommand.getDatanode().getID(),
registeredCommand.getDatanode().getHostName(),
registeredCommand.getDatanode().getIpAddress(),
null);
Expand Down
Loading