Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public class StateContext {

static final Logger LOG =
LoggerFactory.getLogger(StateContext.class);
private final Queue<SCMCommand> commandQueue;
private final Queue<SCMCommand<?>> commandQueue;
private final Map<Long, CommandStatus> cmdStatusMap;
private final Lock lock;
private final DatanodeStateMachine parentDatanodeStateMachine;
Expand Down Expand Up @@ -738,7 +738,7 @@ public OptionalLong getTermOfLeaderSCM() {
*
* @return SCMCommand or Null.
*/
public SCMCommand getNextCommand() {
public SCMCommand<?> getNextCommand() {
lock.lock();
try {
initTermOfLeaderSCM();
Expand Down Expand Up @@ -772,7 +772,7 @@ public SCMCommand getNextCommand() {
*
* @param command - SCMCommand.
*/
public void addCommand(SCMCommand command) {
public void addCommand(SCMCommand<?> command) {
lock.lock();
try {
if (commandQueue.size() >= maxCommandQueueLimit) {
Expand All @@ -792,7 +792,7 @@ public Map<SCMCommandProto.Type, Integer> getCommandQueueSummary() {
Map<SCMCommandProto.Type, Integer> summary = new HashMap<>();
lock.lock();
try {
for (SCMCommand cmd : commandQueue) {
for (SCMCommand<?> cmd : commandQueue) {
summary.put(cmd.getType(), summary.getOrDefault(cmd.getType(), 0) + 1);
}
} finally {
Expand Down Expand Up @@ -832,7 +832,7 @@ public void addCmdStatus(Long key, CommandStatus status) {
*
* @param cmd - {@link SCMCommand}.
*/
public void addCmdStatus(SCMCommand cmd) {
public void addCmdStatus(SCMCommand<?> cmd) {
if (cmd.getType() == SCMCommandProto.Type.deleteBlocksCommand) {
addCmdStatus(cmd.getId(),
DeleteBlockCommandStatusBuilder.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public CloseContainerCommandHandler(
* @param connectionManager - The SCMs that we are talking to.
*/
@Override
public void handle(SCMCommand command, OzoneContainer ozoneContainer,
public void handle(SCMCommand<?> command, OzoneContainer ozoneContainer,
StateContext context, SCMConnectionManager connectionManager) {
queuedCount.incrementAndGet();
CompletableFuture.runAsync(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public ClosePipelineCommandHandler(
* @param connectionManager - The SCMs that we are talking to.
*/
@Override
public void handle(SCMCommand command, OzoneContainer ozoneContainer,
public void handle(SCMCommand<?> command, OzoneContainer ozoneContainer,
StateContext context, SCMConnectionManager connectionManager) {
queuedCount.incrementAndGet();
CompletableFuture.runAsync(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public CommandHandler getDeleteBlocksCommandHandler() {
*
* @param command - SCM Command.
*/
public void handle(SCMCommand command) {
public void handle(SCMCommand<?> command) {
Preconditions.checkNotNull(command);
CommandHandler handler = handlerMap.get(command.getType());
if (handler != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public interface CommandHandler {
* @param context - Current Context.
* @param connectionManager - The SCMs that we are talking to.
*/
void handle(SCMCommand command, OzoneContainer container,
void handle(SCMCommand<?> command, OzoneContainer container,
StateContext context, SCMConnectionManager connectionManager);

/**
Expand Down Expand Up @@ -68,7 +68,7 @@ void handle(SCMCommand command, OzoneContainer container,
/**
* Default implementation for updating command status.
*/
default void updateCommandStatus(StateContext context, SCMCommand command,
default void updateCommandStatus(StateContext context, SCMCommand<?> command,
Consumer<CommandStatus> cmdStatusUpdater, Logger log) {
if (!context.updateCommandStatus(command.getId(), cmdStatusUpdater)) {
log.warn("{} with Id:{} not found.", command.getType(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public CreatePipelineCommandHandler(ConfigurationSource conf,
* @param connectionManager - The SCMs that we are talking to.
*/
@Override
public void handle(SCMCommand command, OzoneContainer ozoneContainer,
public void handle(SCMCommand<?> command, OzoneContainer ozoneContainer,
StateContext context, SCMConnectionManager connectionManager) {
queuedCount.incrementAndGet();
CompletableFuture.runAsync(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public DeleteBlocksCommandHandler(OzoneContainer container,
}

@Override
public void handle(SCMCommand command, OzoneContainer container,
public void handle(SCMCommand<?> command, OzoneContainer container,
StateContext context, SCMConnectionManager connectionManager) {
if (command.getType() != SCMCommandProto.Type.deleteBlocksCommand) {
LOG.warn("Skipping handling command, expected command "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ protected DeleteContainerCommandHandler(Clock clock,
this.opsLatencyMs = registry.newRate(SCMCommandProto.Type.deleteContainerCommand + "Ms");
}
@Override
public void handle(final SCMCommand command,
public void handle(final SCMCommand<?> command,
final OzoneContainer ozoneContainer,
final StateContext context,
final SCMConnectionManager connectionManager) {
Expand All @@ -93,7 +93,7 @@ public void handle(final SCMCommand command,
}
}

private void handleInternal(SCMCommand command, StateContext context,
private void handleInternal(SCMCommand<?> command, StateContext context,
DeleteContainerCommand deleteContainerCommand,
ContainerController controller) {
final long startTime = Time.monotonicNow();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public FinalizeNewLayoutVersionCommandHandler() {
* @param connectionManager - The SCMs that we are talking to.
*/
@Override
public void handle(SCMCommand command, OzoneContainer ozoneContainer,
public void handle(SCMCommand<?> command, OzoneContainer ozoneContainer,
StateContext context, SCMConnectionManager connectionManager) {
LOG.info("Processing FinalizeNewLayoutVersionCommandHandler command.");
invocationCount.incrementAndGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public ReconstructECContainersCommandHandler(ConfigurationSource conf,
}

@Override
public void handle(SCMCommand command, OzoneContainer container,
public void handle(SCMCommand<?> command, OzoneContainer container,
StateContext context, SCMConnectionManager connectionManager) {
ReconstructECContainersCommand ecContainersCommand =
(ReconstructECContainersCommand) command;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public RefreshVolumeUsageCommandHandler() {
}

@Override
public void handle(SCMCommand command, OzoneContainer container,
public void handle(SCMCommand<?> command, OzoneContainer container,
StateContext context, SCMConnectionManager connectionManager) {
LOG.info("receive command to refresh usage info of all volumes");
invocationCount.incrementAndGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public String getMetricsName() {
}

@Override
public void handle(SCMCommand command, OzoneContainer container,
public void handle(SCMCommand<?> command, OzoneContainer container,
StateContext context, SCMConnectionManager connectionManager) {

final ReplicateContainerCommand replicateCommand =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public SetNodeOperationalStateCommandHandler(ConfigurationSource conf,
* @param connectionManager - The SCMs that we are talking to.
*/
@Override
public void handle(SCMCommand command, OzoneContainer container,
public void handle(SCMCommand<?> command, OzoneContainer container,
StateContext context, SCMConnectionManager connectionManager) {
long startTime = Time.monotonicNow();
invocationCount.incrementAndGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ RegisteredCommand register(DatanodeDetails datanodeDetails,
* @param datanodeDetails - Datanode ID.
* @return Commands to be sent to the datanode.
*/
default List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails) {
default List<SCMCommand<?>> processHeartbeat(DatanodeDetails datanodeDetails) {
return processHeartbeat(datanodeDetails, null);
};

Expand All @@ -80,7 +80,7 @@ default List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails) {
* heartbeating datanode.
* @return Commands to be sent to the datanode.
*/
List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails,
List<SCMCommand<?>> processHeartbeat(DatanodeDetails datanodeDetails,
CommandQueueReportProto queueReport);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public enum HealthState {

private final ContainerInfo containerInfo;
private final HealthState healthState;
private final List<SCMCommand> commands = new ArrayList<>();
private final List<SCMCommand<?>> commands = new ArrayList<>();

public ContainerHealthResult(ContainerInfo containerInfo,
HealthState healthState) {
Expand All @@ -52,11 +52,11 @@ public HealthState getHealthState() {
return healthState;
}

public void addCommand(SCMCommand command) {
public void addCommand(SCMCommand<?> command) {
commands.add(command);
}

public List<SCMCommand> getCommands() {
public List<SCMCommand<?>> getCommands() {
return commands;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.UUID;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.util.Time;

/**
* Command Queue is queue of commands for the datanode.
Expand All @@ -52,8 +51,6 @@ public long getCommandsInQueue() {

/**
* Constructs a Command Queue.
* TODO : Add a flusher thread that throws away commands older than a certain
* time period.
*/
public CommandQueue() {
commandMap = new HashMap<>();
Expand All @@ -78,9 +75,9 @@ public void clear() {
* @return List of SCM Commands.
*/
@SuppressWarnings("unchecked")
List<SCMCommand> getCommand(final UUID datanodeUuid) {
List<SCMCommand<?>> getCommand(final UUID datanodeUuid) {
Commands cmds = commandMap.remove(datanodeUuid);
List<SCMCommand> cmdList = null;
List<SCMCommand<?>> cmdList = null;
if (cmds != null) {
cmdList = cmds.getCommands();
commandsInQueue -= !cmdList.isEmpty() ? cmdList.size() : 0;
Expand Down Expand Up @@ -134,8 +131,7 @@ public Map<SCMCommandProto.Type, Integer> getDatanodeCommandSummary(
* @param datanodeUuid DatanodeDetails.Uuid
* @param command - Command
*/
public void addCommand(final UUID datanodeUuid, final SCMCommand
command) {
public void addCommand(final UUID datanodeUuid, final SCMCommand<?> command) {
commandMap.computeIfAbsent(datanodeUuid, s -> new Commands()).add(command);
commandsInQueue++;
}
Expand All @@ -144,39 +140,20 @@ public void addCommand(final UUID datanodeUuid, final SCMCommand
* Class that stores commands for a datanode.
*/
private static class Commands {
private long updateTime = 0;
private long readTime = 0;
private List<SCMCommand> commands = new ArrayList<>();
private List<SCMCommand<?>> commands = new ArrayList<>();
private final Map<SCMCommandProto.Type, Integer> summary = new HashMap<>();

/**
* Gets the last time the commands for this node was updated.
* @return Time stamp
*/
public long getUpdateTime() {
return updateTime;
}

/**
* Gets the last read time.
* @return last time when these commands were read from this queue.
*/
public long getReadTime() {
return readTime;
}

/**
* Adds a command to the list.
*
* @param command SCMCommand
*/
public void add(SCMCommand command) {
public void add(SCMCommand<?> command) {
this.commands.add(command);
if (command.contributesToQueueSize()) {
summary.put(command.getType(),
summary.getOrDefault(command.getType(), 0) + 1);
}
updateTime = Time.monotonicNow();
}

public int getCommandSummary(SCMCommandProto.Type commandType) {
Expand All @@ -191,11 +168,10 @@ public Map<SCMCommandProto.Type, Integer> getAllCommandsSummary() {
* Returns the commands for this datanode.
* @return command list.
*/
public List<SCMCommand> getCommands() {
List<SCMCommand> temp = this.commands;
public List<SCMCommand<?>> getCommands() {
List<SCMCommand<?>> temp = this.commands;
this.commands = new ArrayList<>();
summary.clear();
readTime = Time.monotonicNow();
return temp;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void onMessage(final DatanodeDetails datanodeDetails,
}

// remove commands in command queue for the DN
final List<SCMCommand> cmdList = nodeManager.getCommandQueue(
final List<SCMCommand<?>> cmdList = nodeManager.getCommandQueue(
datanodeDetails.getUuid());
LOG.info("Clearing command queue of size {} for DN {}",
cmdList.size(), datanodeDetails);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ Set<ContainerID> getContainers(DatanodeDetails datanodeDetails)
* @param dnId datanode uuid
* @param command
*/
void addDatanodeCommand(UUID dnId, SCMCommand command);
void addDatanodeCommand(UUID dnId, SCMCommand<?> command);


/**
Expand Down Expand Up @@ -368,7 +368,7 @@ Map<SCMCommandProto.Type, Integer> getTotalDatanodeCommandCounts(
* @return list of commands
*/
// TODO: We can give better name to this method!
List<SCMCommand> getCommandQueue(UUID dnID);
List<SCMCommand<?>> getCommandQueue(UUID dnID);

/**
* Given datanode uuid, returns the DatanodeDetails for the node.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ private boolean isVersionChange(String oldVersion, String newVersion) {
* @return SCMheartbeat response.
*/
@Override
public List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails,
public List<SCMCommand<?>> processHeartbeat(DatanodeDetails datanodeDetails,
CommandQueueReportProto queueReport) {
Preconditions.checkNotNull(datanodeDetails, "Heartbeat is missing " +
"DatanodeDetails.");
Expand All @@ -550,7 +550,7 @@ public List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails,
try {
Map<SCMCommandProto.Type, Integer> summary =
commandQueue.getDatanodeCommandSummary(datanodeDetails.getUuid());
List<SCMCommand> commands =
List<SCMCommand<?>> commands =
commandQueue.getCommand(datanodeDetails.getUuid());

// Update the SCMCommand of deleteBlocksCommand Status
Expand Down Expand Up @@ -1635,7 +1635,7 @@ public int getPipeLineCount(DatanodeDetails datanodeDetails)
}

@Override
public void addDatanodeCommand(UUID dnId, SCMCommand command) {
public void addDatanodeCommand(UUID dnId, SCMCommand<?> command) {
writeLock().lock();
try {
this.commandQueue.addCommand(dnId, command);
Expand Down Expand Up @@ -1678,7 +1678,7 @@ public void onMessage(CommandForDatanode commandForDatanode,
}

@Override
public List<SCMCommand> getCommandQueue(UUID dnID) {
public List<SCMCommand<?>> getCommandQueue(UUID dnID) {
// Getting the queue actually clears it and returns the commands, so this
// is a write operation and not a read as the method name suggests.
writeLock().lock();
Expand Down Expand Up @@ -1846,7 +1846,7 @@ public void removeNode(DatanodeDetails datanodeDetails) throws NodeNotFoundExcep
}
nodeStateManager.removeNode(datanodeDetails);
removeFromDnsToUuidMap(datanodeDetails.getUuid(), datanodeDetails.getIpAddress());
final List<SCMCommand> cmdList = getCommandQueue(datanodeDetails.getUuid());
final List<SCMCommand<?>> cmdList = getCommandQueue(datanodeDetails.getUuid());
LOG.info("Clearing command queue of size {} for DN {}", cmdList.size(), datanodeDetails);
} else {
LOG.warn("Node not decommissioned or dead, cannot remove: {}", datanodeDetails);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,10 @@ public SCMDatanodeHeartbeatDispatcher(NodeManager nodeManager,
*
* @return list of SCMCommand
*/
public List<SCMCommand> dispatch(SCMHeartbeatRequestProto heartbeat) {
public List<SCMCommand<?>> dispatch(SCMHeartbeatRequestProto heartbeat) {
DatanodeDetails datanodeDetails =
DatanodeDetails.getFromProtoBuf(heartbeat.getDatanodeDetails());
List<SCMCommand> commands;
List<SCMCommand<?>> commands;

// If node is not registered, ask the node to re-register. Do not process
// Heartbeat for unregistered nodes.
Expand Down
Loading