Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
*/
public class CommandQueue {
private final Map<UUID, Commands> commandMap;
private final Map<UUID, Map<SCMCommandProto.Type, Integer>> summaryMap;
private final Lock lock;
private long commandsInQueue;

Expand All @@ -60,7 +59,6 @@ public long getCommandsInQueue() {
*/
public CommandQueue() {
commandMap = new HashMap<>();
summaryMap = new HashMap<>();
lock = new ReentrantLock();
commandsInQueue = 0;
}
Expand All @@ -73,7 +71,6 @@ public void clear() {
lock.lock();
try {
commandMap.clear();
summaryMap.clear();
commandsInQueue = 0;
} finally {
lock.unlock();
Expand All @@ -93,7 +90,6 @@ List<SCMCommand> getCommand(final UUID datanodeUuid) {
lock.lock();
try {
Commands cmds = commandMap.remove(datanodeUuid);
summaryMap.remove(datanodeUuid);
List<SCMCommand> cmdList = null;
if (cmds != null) {
cmdList = cmds.getCommands();
Expand All @@ -118,11 +114,11 @@ public int getDatanodeCommandCount(
final UUID datanodeUuid, SCMCommandProto.Type commandType) {
lock.lock();
try {
Map<SCMCommandProto.Type, Integer> summary = summaryMap.get(datanodeUuid);
if (summary == null) {
Commands commands = commandMap.get(datanodeUuid);
if (commands == null) {
return 0;
}
return summary.getOrDefault(commandType, 0);
return commands.getCommandSummary(commandType);
} finally {
lock.unlock();
}
Expand All @@ -140,10 +136,6 @@ public void addCommand(final UUID datanodeUuid, final SCMCommand
try {
commandMap.computeIfAbsent(datanodeUuid, s -> new Commands())
.add(command);
Map<SCMCommandProto.Type, Integer> summary =
summaryMap.computeIfAbsent(datanodeUuid, s -> new HashMap<>());
summary.put(command.getType(),
summary.getOrDefault(command.getType(), 0) + 1);
commandsInQueue++;
} finally {
lock.unlock();
Expand All @@ -154,18 +146,10 @@ public void addCommand(final UUID datanodeUuid, final SCMCommand
* Class that stores commands for a datanode.
*/
private static class Commands {
private long updateTime;
private long readTime;
private List<SCMCommand> commands;

/**
* Constructs a Commands class.
*/
Commands() {
commands = new ArrayList<>();
updateTime = 0;
readTime = 0;
}
private long updateTime = 0;
private long readTime = 0;
private List<SCMCommand> commands = new ArrayList<>();
private final Map<SCMCommandProto.Type, Integer> summary = new HashMap<>();

/**
* Gets the last time the commands for this node was updated.
Expand All @@ -190,16 +174,23 @@ public long getReadTime() {
*/
public void add(SCMCommand command) {
this.commands.add(command);
summary.put(command.getType(),
summary.getOrDefault(command.getType(), 0) + 1);
updateTime = Time.monotonicNow();
}

public int getCommandSummary(SCMCommandProto.Type commandType) {
return summary.getOrDefault(commandType, 0);
}

/**
* Returns the commands for this datanode.
* @return command list.
*/
public List<SCMCommand> getCommands() {
List<SCMCommand> temp = this.commands;
this.commands = new ArrayList<>();
summary.clear();
readTime = Time.monotonicNow();
return temp;
}
Expand Down