@@ -114,7 +114,9 @@ private static HddsProtos.UUID toProto(final UUID id) {
   }
 
   // TODO: Remove this in follow-up Jira. (HDDS-12015)
-  UUID getUuid() {
+  // Exposing this temporarily to help with refactoring.
+  @Deprecated
+  public UUID getUuid() {
     return uuid;
   }
 }
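A note on why the typed ID matters for the rest of this diff: the follow-on hunks key `HashMap`s and `ConcurrentHashMap`s by `DatanodeID`, which only works if the wrapper has value-based `equals`/`hashCode`. A minimal self-contained sketch of that wrapper pattern — an illustrative stand-in, not the actual Ozone class:

```java
import java.util.Objects;
import java.util.UUID;

// Illustrative stand-in for a typed datanode ID wrapping a raw UUID.
final class TypedDatanodeId {
  private final UUID uuid;

  TypedDatanodeId(UUID uuid) {
    this.uuid = Objects.requireNonNull(uuid);
  }

  // Temporary escape hatch for unmigrated callers, mirroring the hunk above.
  @Deprecated
  public UUID getUuid() {
    return uuid;
  }

  // Value semantics are what make this type usable as a map key.
  @Override
  public boolean equals(Object o) {
    return o instanceof TypedDatanodeId
        && uuid.equals(((TypedDatanodeId) o).uuid);
  }

  @Override
  public int hashCode() {
    return uuid.hashCode();
  }

  @Override
  public String toString() {
    return uuid.toString();
  }
}
```

Marking `getUuid()` `@Deprecated` rather than deleting it keeps the build green while the follow-up (HDDS-12015, per the TODO above) tracks down the remaining unwrap sites.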
@@ -20,6 +20,7 @@
 import com.google.protobuf.Message;
 import java.util.UUID;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.DatanodeID;
 import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload;
 
 /**
@@ -28,22 +29,21 @@
 public class CommandForDatanode<T extends Message> implements
     IdentifiableEventPayload {
 
-  private final UUID datanodeId;
-
+  private final DatanodeID datanodeId;
   private final SCMCommand<T> command;
 
   public CommandForDatanode(DatanodeDetails datanode, SCMCommand<T> command) {
-    this(datanode.getUuid(), command);
+    this(datanode.getID(), command);
   }
 
-  // TODO: Command for datanode should take DatanodeDetails as parameter.
-  public CommandForDatanode(UUID datanodeId, SCMCommand<T> command) {
+  public CommandForDatanode(DatanodeID datanodeId, SCMCommand<T> command) {
     this.datanodeId = datanodeId;
     this.command = command;
   }
 
+  @Deprecated
   public UUID getDatanodeId() {
-    return datanodeId;
+    return datanodeId.getUuid();
   }
 
   public SCMCommand<T> getCommand() {
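A short usage sketch of the reworked event payload; the wrapper class and `build` helper are mine, and the package paths are assumed from the imports in the surrounding hunks:

```java
import com.google.protobuf.Message;
import java.util.UUID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;

final class CommandForDatanodeUsage {
  // Sketch of the two construction paths after this change.
  static <T extends Message> CommandForDatanode<T> build(
      DatanodeDetails dn, SCMCommand<T> cmd) {
    // Preferred: pass DatanodeDetails; the delegating constructor calls getID().
    CommandForDatanode<T> forDn = new CommandForDatanode<>(dn, cmd);

    // Equivalent: pass the typed DatanodeID directly.
    CommandForDatanode<T> viaId = new CommandForDatanode<>(dn.getID(), cmd);

    // The deprecated accessor still unwraps to a raw UUID for old callers.
    UUID raw = forDn.getDatanodeId();
    assert raw.equals(viaId.getDatanodeId());
    return forDn;
  }
}
```

Because the payload is typed internally while `getDatanodeId()` still returns `UUID`, existing event consumers keep compiling and can migrate one call site at a time.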
@@ -22,8 +22,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.UUID;
 import java.util.stream.Collectors;
+import org.apache.hadoop.hdds.protocol.DatanodeID;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 
 /**
@@ -32,7 +32,7 @@
  */
 class DatanodeDeletedBlockTransactions {
   // A list of TXs mapped to a certain datanode ID.
-  private final Map<UUID, List<DeletedBlocksTransaction>> transactions =
+  private final Map<DatanodeID, List<DeletedBlocksTransaction>> transactions =
       new HashMap<>();
   // counts blocks deleted across datanodes. Blocks deleted will be counted
   // for all the replicas and may not be unique.
@@ -41,7 +41,7 @@ class DatanodeDeletedBlockTransactions {
   DatanodeDeletedBlockTransactions() {
   }
 
-  void addTransactionToDN(UUID dnID, DeletedBlocksTransaction tx) {
+  void addTransactionToDN(DatanodeID dnID, DeletedBlocksTransaction tx) {
     transactions.computeIfAbsent(dnID, k -> new LinkedList<>()).add(tx);
     blocksDeleted += tx.getLocalIDCount();
     if (SCMBlockDeletingService.LOG.isDebugEnabled()) {
@@ -51,15 +51,15 @@ void addTransactionToDN(UUID dnID, DeletedBlocksTransaction tx) {
     }
   }
 
-  Map<UUID, List<DeletedBlocksTransaction>> getDatanodeTransactionMap() {
+  Map<DatanodeID, List<DeletedBlocksTransaction>> getDatanodeTransactionMap() {
     return transactions;
   }
 
   int getBlocksDeleted() {
     return blocksDeleted;
   }
 
-  List<String> getTransactionIDList(UUID dnId) {
+  List<String> getTransactionIDList(DatanodeID dnId) {
     return Optional.ofNullable(transactions.get(dnId))
         .orElse(new LinkedList<>())
         .stream()
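The grouping logic itself is unchanged; only the key type tightened. A self-contained sketch of the `computeIfAbsent` grouping pattern, with `String` standing in for `DatanodeID` and `Long` for a transaction:

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

// Sketch of the per-datanode grouping used above: one lazily created
// list per key, appended to on every new transaction.
final class GroupingSketch {
  private final Map<String, List<Long>> transactions = new HashMap<>();

  void addTransactionToDN(String dnId, long txId) {
    transactions.computeIfAbsent(dnId, k -> new LinkedList<>()).add(txId);
  }

  public static void main(String[] args) {
    GroupingSketch s = new GroupingSketch();
    s.addTransactionToDN("dn-1", 101L);
    s.addTransactionToDN("dn-1", 102L);
    s.addTransactionToDN("dn-2", 103L);
    // e.g. {dn-1=[101, 102], dn-2=[103]} (HashMap order not guaranteed)
    System.out.println(s.transactions);
  }
}
```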
@@ -22,8 +22,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.DatanodeID;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
@@ -91,7 +91,7 @@ void incrementCount(List<Long> txIDs)
    * @param dnTxSet Set of transaction IDs for the DataNode.
    */
   void recordTransactionCreated(
-      UUID dnId, long scmCmdId, Set<Long> dnTxSet);
+      DatanodeID dnId, long scmCmdId, Set<Long> dnTxSet);
 
   /**
    * Handles the cleanup process when a DataNode is reported dead. This method
@@ -100,7 +100,7 @@ void recordTransactionCreated(
    *
    * @param dnId The identifier of the dead DataNode.
    */
-  void onDatanodeDead(UUID dnId);
+  void onDatanodeDead(DatanodeID dnId);
 
   /**
    * Records the event of sending a block deletion command to a DataNode. This
@@ -38,6 +38,7 @@
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.DatanodeID;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
@@ -342,7 +343,7 @@ private void getTransaction(DeletedBlocksTransaction tx,
         DatanodeDetails details = replica.getDatanodeDetails();
         if (!transactionStatusManager.isDuplication(
             details, updatedTxn.getTxID(), commandStatus)) {
-          transactions.addTransactionToDN(details.getUuid(), updatedTxn);
+          transactions.addTransactionToDN(details.getID(), updatedTxn);
           metrics.incrProcessedTransaction();
         }
       }
@@ -506,10 +507,10 @@ public void setScmCommandTimeoutMs(long scmCommandTimeoutMs) {
   }
 
   @Override
-  public void recordTransactionCreated(UUID dnId, long scmCmdId,
+  public void recordTransactionCreated(DatanodeID dnId, long scmCmdId,
       Set<Long> dnTxSet) {
     getSCMDeletedBlockTransactionStatusManager()
-        .recordTransactionCreated(dnId, scmCmdId, dnTxSet);
+        .recordTransactionCreated(dnId.getUuid(), scmCmdId, dnTxSet);
   }
 
   @Override
@@ -518,8 +519,8 @@ public int getTransactionToDNsCommitMapSize() {
   }
 
   @Override
-  public void onDatanodeDead(UUID dnId) {
-    getSCMDeletedBlockTransactionStatusManager().onDatanodeDead(dnId);
+  public void onDatanodeDead(DatanodeID dnId) {
+    getSCMDeletedBlockTransactionStatusManager().onDatanodeDead(dnId.getUuid());
   }
 
   @Override
@@ -536,7 +537,7 @@ public void onMessage(
     }
 
     DatanodeDetails details = deleteBlockStatus.getDatanodeDetails();
-    UUID dnId = details.getUuid();
+    DatanodeID dnId = details.getID();
     for (CommandStatus commandStatus : deleteBlockStatus.getCmdStatus()) {
       CommandStatus.Status status = commandStatus.getStatus();
       lock.lock();
@@ -545,7 +546,7 @@
           ContainerBlocksDeletionACKProto ackProto =
               commandStatus.getBlockDeletionAck();
           getSCMDeletedBlockTransactionStatusManager()
-              .commitTransactions(ackProto.getResultsList(), dnId);
+              .commitTransactions(ackProto.getResultsList(), dnId.getUuid());
           metrics.incrBlockDeletionCommandSuccess();
           metrics.incrDNCommandsSuccess(dnId, 1);
         } else if (status == CommandStatus.Status.FAILED) {
@@ -557,7 +558,7 @@
         }
 
         getSCMDeletedBlockTransactionStatusManager()
-            .commitSCMCommandStatus(deleteBlockStatus.getCmdStatus(), dnId);
+            .commitSCMCommandStatus(deleteBlockStatus.getCmdStatus(), dnId.getUuid());
       } finally {
         lock.unlock();
       }
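This implementation marks the migration boundary: the overridden methods now accept `DatanodeID`, but the not-yet-migrated `SCMDeletedBlockTransactionStatusManager` keeps its `UUID` signatures, so the ID is unwrapped exactly once at the delegation point. A self-contained sketch of that shape, with stand-in types of my own naming:

```java
import java.util.UUID;

// Stand-in types sketching the typed-outside, raw-inside delegation above.
final class BoundarySketch {

  interface LegacyStatusManager {   // stand-in for the UUID-based delegate
    void onDatanodeDead(UUID dnId);
  }

  static final class TypedId {      // stand-in for DatanodeID
    private final UUID uuid;
    TypedId(UUID uuid) { this.uuid = uuid; }
    UUID getUuid() { return uuid; }
  }

  private final LegacyStatusManager delegate;

  BoundarySketch(LegacyStatusManager delegate) {
    this.delegate = delegate;
  }

  // Typed signature outward; the raw UUID escapes only at this one line.
  void onDatanodeDead(TypedId dnId) {
    delegate.onDatanodeDead(dnId.getUuid());
  }

  public static void main(String[] args) {
    BoundarySketch b = new BoundarySketch(
        raw -> System.out.println("dead datanode " + raw));
    b.onDatanodeDead(new TypedId(UUID.randomUUID()));
  }
}
```

When the status manager is migrated later, these `getUuid()` unwraps are the only lines that should need to change.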
@@ -29,7 +29,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -38,6 +37,7 @@
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.ReconfigurationHandler;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.DatanodeID;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
 import org.apache.hadoop.hdds.scm.ScmConfig;
@@ -183,9 +183,9 @@ public EmptyTaskResult call() throws Exception {
         }
 
         Set<Long> processedTxIDs = new HashSet<>();
-        for (Map.Entry<UUID, List<DeletedBlocksTransaction>> entry :
+        for (Map.Entry<DatanodeID, List<DeletedBlocksTransaction>> entry :
             transactions.getDatanodeTransactionMap().entrySet()) {
-          UUID dnId = entry.getKey();
+          DatanodeID dnId = entry.getKey();
           List<DeletedBlocksTransaction> dnTXs = entry.getValue();
           if (!dnTXs.isEmpty()) {
             Set<Long> dnTxSet = dnTXs.stream()
@@ -18,8 +18,8 @@
 package org.apache.hadoop.hdds.scm.block;
 
 import java.util.Map;
-import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.hdds.protocol.DatanodeID;
 import org.apache.hadoop.metrics2.MetricsCollector;
 import org.apache.hadoop.metrics2.MetricsInfo;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
@@ -95,7 +95,7 @@ public final class ScmBlockDeletingServiceMetrics implements MetricsSource {
   @Metric(about = "The number of dataNodes of delete transactions.")
   private MutableGaugeLong numBlockDeletionTransactionDataNodes;
 
-  private final Map<UUID, DatanodeCommandCounts> numCommandsDatanode = new ConcurrentHashMap<>();
+  private final Map<DatanodeID, DatanodeCommandCounts> numCommandsDatanode = new ConcurrentHashMap<>();
 
   private ScmBlockDeletingServiceMetrics() {
     this.registry = new MetricsRegistry(SOURCE_NAME);
@@ -164,17 +164,17 @@ public void setNumBlockDeletionTransactionDataNodes(long dataNodes) {
     this.numBlockDeletionTransactionDataNodes.set(dataNodes);
   }
 
-  public void incrDNCommandsSent(UUID id, long delta) {
+  public void incrDNCommandsSent(DatanodeID id, long delta) {
     numCommandsDatanode.computeIfAbsent(id, k -> new DatanodeCommandCounts())
         .incrCommandsSent(delta);
   }
 
-  public void incrDNCommandsSuccess(UUID id, long delta) {
+  public void incrDNCommandsSuccess(DatanodeID id, long delta) {
     numCommandsDatanode.computeIfAbsent(id, k -> new DatanodeCommandCounts())
        .incrCommandsSuccess(delta);
   }
 
-  public void incrDNCommandsFailure(UUID id, long delta) {
+  public void incrDNCommandsFailure(DatanodeID id, long delta) {
     numCommandsDatanode.computeIfAbsent(id, k -> new DatanodeCommandCounts())
         .incrCommandsFailure(delta);
   }
@@ -239,7 +239,7 @@ public void getMetrics(MetricsCollector metricsCollector, boolean all) {
     numBlockDeletionTransactionDataNodes.snapshot(builder, all);
 
     MetricsRecordBuilder recordBuilder = builder;
-    for (Map.Entry<UUID, DatanodeCommandCounts> e : numCommandsDatanode.entrySet()) {
+    for (Map.Entry<DatanodeID, DatanodeCommandCounts> e : numCommandsDatanode.entrySet()) {
       recordBuilder = recordBuilder.endRecord().addRecord(SOURCE_NAME)
           .add(new MetricsTag(Interns.info("datanode",
               "Datanode host for deletion commands"), e.getKey().toString()))
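The metrics map shows why `DatanodeID` needs sane `equals`/`hashCode`/`toString`: it is both a `ConcurrentHashMap` key and, via `e.getKey().toString()`, a metrics tag value. A self-contained sketch of the per-node counter pattern, with `String` standing in for the ID type and an `AtomicLong` in place of `DatanodeCommandCounts`:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Sketch of the per-datanode counter pattern above. computeIfAbsent makes
// first-touch registration atomic, so concurrent increments for a new
// datanode never race on creating the counter.
final class PerNodeCounterSketch {
  private final Map<String, AtomicLong> commandsSent = new ConcurrentHashMap<>();

  void incrCommandsSent(String id, long delta) {
    commandsSent.computeIfAbsent(id, k -> new AtomicLong()).addAndGet(delta);
  }

  public static void main(String[] args) {
    PerNodeCounterSketch m = new PerNodeCounterSketch();
    m.incrCommandsSent("dn-1", 1);
    m.incrCommandsSent("dn-1", 2);
    System.out.println(m.commandsSent.get("dn-1")); // 3
  }
}
```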
@@ -158,7 +158,7 @@ private Void triggerCloseCallback(
       throws ContainerNotFoundException {
     getNodes(container).forEach(node ->
         publisher.fireEvent(DATANODE_COMMAND,
-            new CommandForDatanode<>(node.getUuid(), command)));
+            new CommandForDatanode<>(node, command)));
     return null;
   }
 
@@ -116,7 +116,7 @@ public void onMessage(final DatanodeDetails datanodeDetails,
     // remove DeleteBlocksCommand associated with the dead node unless it
     // is IN_MAINTENANCE
     if (deletedBlockLog != null && !isNodeInMaintenance) {
-      deletedBlockLog.onDatanodeDead(datanodeDetails.getUuid());
+      deletedBlockLog.onDatanodeDead(datanodeDetails.getID());
     }
 
     //move dead datanode out of ClusterNetworkTopology
@@ -781,7 +781,7 @@ protected void sendFinalizeToDatanodeIfNeeded(DatanodeDetails datanodeDetails,
       // Send Finalize command to the data node. Its OK to
       // send Finalize command multiple times.
       scmNodeEventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
-          new CommandForDatanode<>(datanodeDetails.getUuid(),
+          new CommandForDatanode<>(datanodeDetails,
               finalizeCmd));
     } catch (NotLeaderException ex) {
       LOG.warn("Skip sending finalize upgrade command since current SCM" +
@@ -119,7 +119,7 @@ private void closeUnknownPipeline(final EventPublisher publisher,
       SCMCommand<?> command = new ClosePipelineCommand(pid);
       command.setTerm(scmContext.getTermOfLeader());
       publisher.fireEvent(SCMEvents.DATANODE_COMMAND,
-          new CommandForDatanode<>(datanode.getUuid(), command));
+          new CommandForDatanode<>(datanode, command));
     } catch (NotLeaderException nle) {
       LOG.info("Cannot process Pipeline Action for pipeline {} as " +
           "current SCM is not leader anymore.", pid);
@@ -101,7 +101,7 @@ private void handlePipelineNotFoundException(final PipelineReport report,
       final SCMCommand<?> command = new ClosePipelineCommand(pipelineID);
       command.setTerm(scmContext.getTermOfLeader());
       publisher.fireEvent(SCMEvents.DATANODE_COMMAND,
-          new CommandForDatanode<>(dn.getUuid(), command));
+          new CommandForDatanode<>(dn, command));
     } catch (NotLeaderException ex) {
       // Do nothing if the leader has changed.
     }
@@ -208,7 +208,7 @@ public synchronized Pipeline create(RatisReplicationConfig replicationConfig,
       LOG.info("Sending CreatePipelineCommand for pipeline:{} to datanode:{}",
           pipeline.getId(), node);
       eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
-          new CommandForDatanode<>(node.getUuid(), createCommand));
+          new CommandForDatanode<>(node, createCommand));
     });
 
     return pipeline;
@@ -269,7 +269,7 @@ public void close(Pipeline pipeline) throws NotLeaderException {
     closeCommand.setTerm(scmContext.getTermOfLeader());
     pipeline.getNodes().forEach(node -> {
       final CommandForDatanode<?> datanodeCommand =
-          new CommandForDatanode<>(node.getUuid(), closeCommand);
+          new CommandForDatanode<>(node, closeCommand);
       LOG.info("Send pipeline:{} close command to datanode {}",
           pipeline.getId(), datanodeCommand.getDatanodeId());
       eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);