@@ -19,8 +19,8 @@
 import com.google.common.annotations.VisibleForTesting;
 import java.util.Map;
-import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.hdds.protocol.DatanodeID;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.metrics2.annotation.Metric;
@@ -76,7 +76,7 @@ public final class ContainerClientMetrics {
   private MutableQuantiles[] datanodeHsyncLatencyNs;
   private final Map<PipelineID, MutableCounterLong> writeChunkCallsByPipeline;
   private final Map<PipelineID, MutableCounterLong> writeChunkBytesByPipeline;
-  private final Map<UUID, MutableCounterLong> writeChunksCallsByLeaders;
+  private final Map<DatanodeID, MutableCounterLong> writeChunksCallsByLeaders;
   private final MetricsRegistry registry;

   public static synchronized ContainerClientMetrics acquire() {
@@ -272,7 +272,7 @@ Map<PipelineID, MutableCounterLong> getWriteChunkCallsByPipeline() {
     return writeChunkCallsByPipeline;
   }

-  Map<UUID, MutableCounterLong> getWriteChunksCallsByLeaders() {
+  Map<DatanodeID, MutableCounterLong> getWriteChunksCallsByLeaders() {
     return writeChunksCallsByLeaders;
   }
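
Worth noting in this first file: the per-leader write-chunk counters are now keyed by `DatanodeID` instead of a raw `UUID`. A minimal sketch of the keying pattern, with `AtomicLong` standing in for Hadoop's `MutableCounterLong` so the example stays self-contained (the real class also registers each counter with its `MetricsRegistry`):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hdds.protocol.DatanodeID;

// Sketch of the new keying scheme: one counter per pipeline leader, keyed
// by the interned DatanodeID rather than a raw java.util.UUID. AtomicLong
// stands in for MutableCounterLong to keep the example small.
final class LeaderWriteChunkCounters {
  private final Map<DatanodeID, AtomicLong> callsByLeader =
      new ConcurrentHashMap<>();

  void recordWriteChunk(DatanodeID leaderId) {
    callsByLeader.computeIfAbsent(leaderId, id -> new AtomicLong())
        .incrementAndGet();
  }
}
```
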

@@ -23,8 +23,8 @@
 import static org.mockito.Mockito.mock;

 import java.util.Collections;
-import java.util.UUID;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeID;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.junit.jupiter.api.BeforeEach;
@@ -45,9 +45,9 @@ public void setup() {
   public void testRecordChunkMetrics() {
     ContainerClientMetrics metrics = ContainerClientMetrics.acquire();
     PipelineID pipelineId1 = PipelineID.randomId();
-    UUID leaderId1 = UUID.randomUUID();
+    DatanodeID leaderId1 = DatanodeID.randomID();
     PipelineID pipelineId2 = PipelineID.randomId();
-    UUID leaderId2 = UUID.randomUUID();
+    DatanodeID leaderId2 = DatanodeID.randomID();
     PipelineID pipelineId3 = PipelineID.randomId();

     metrics.recordWriteChunk(createPipeline(pipelineId1, leaderId1), 10);
@@ -103,7 +103,7 @@ public void testAcquireAndRelease() {
     assertNotNull(ContainerClientMetrics.acquire());
   }

-  private Pipeline createPipeline(PipelineID piplineId, UUID leaderId) {
+  private Pipeline createPipeline(PipelineID piplineId, DatanodeID leaderId) {
     return Pipeline.newBuilder()
         .setId(piplineId)
         .setReplicationConfig(mock(ReplicationConfig.class))

@@ -94,6 +94,10 @@ public static DatanodeID of(final UUID id) {
     return CACHE.computeIfAbsent(id, DatanodeID::new);
   }

+  public static DatanodeID of(final HddsProtos.UUID uuid) {
+    return of(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
+  }
+
   /**
    * Returns a random DatanodeID.
    */
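
This new overload funnels the protobuf wire form through the caching `of(UUID)` factory, so repeated decodes of the same datanode share one instance. A small round-trip sketch, assuming the Ozone common classes are on the classpath (the proto builder calls mirror the encode hunk in `Pipeline` below):

```java
import java.util.UUID;
import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;

// Round-trip sketch: a java.util.UUID packed into the two 64-bit halves of
// HddsProtos.UUID decodes back to the same DatanodeID. Because of(UUID)
// interns through CACHE.computeIfAbsent, both lookups return one instance.
public final class DatanodeIdRoundTrip {
  public static void main(String[] args) {
    UUID raw = UUID.randomUUID();
    HddsProtos.UUID wire = HddsProtos.UUID.newBuilder()
        .setMostSigBits(raw.getMostSignificantBits())
        .setLeastSigBits(raw.getLeastSignificantBits())
        .build();
    System.out.println(DatanodeID.of(wire) == DatanodeID.of(raw)); // true
  }
}
```
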

@@ -33,7 +33,6 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.UUID;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
@@ -42,6 +41,7 @@
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.DatanodeID;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
@@ -78,11 +78,11 @@ public final class Pipeline {
   // nodes with ordered distance to client
   private final ImmutableList<DatanodeDetails> nodesInOrder;
   // Current reported Leader for the pipeline
-  private UUID leaderId;
+  private DatanodeID leaderId;
   // Timestamp for pipeline upon creation
   private Instant creationTimestamp;
   // suggested leader id with high priority
-  private final UUID suggestedLeaderId;
+  private final DatanodeID suggestedLeaderId;

   private final Instant stateEnterTime;

@@ -163,7 +163,7 @@ public Instant getStateEnterTime() {
    *
    * @return Suggested LeaderId
    */
-  public UUID getSuggestedLeaderId() {
+  public DatanodeID getSuggestedLeaderId() {
     return suggestedLeaderId;
   }

@@ -175,18 +175,18 @@ public void setCreationTimestamp(Instant creationTimestamp) {
   }

   /**
-   * Return the pipeline leader's UUID.
+   * Return the pipeline leader's DatanodeID.
    *
-   * @return DatanodeDetails.UUID.
+   * @return DatanodeDetails.DatanodeID.
    */
-  public UUID getLeaderId() {
+  public DatanodeID getLeaderId() {
     return leaderId;
   }

   /**
    * Pipeline object, outside of letting leader id to be set, is immutable.
    */
-  void setLeaderId(UUID leaderId) {
+  void setLeaderId(DatanodeID leaderId) {
     this.leaderId = leaderId;
   }

@@ -256,7 +256,7 @@ public DatanodeDetails getLeaderNode() throws IOException {
     }
     Optional<DatanodeDetails> datanodeDetails =
         nodeStatus.keySet().stream().filter(d ->
-            d.getUuid().equals(leaderId)).findFirst();
+            d.getID().equals(leaderId)).findFirst();
     if (datanodeDetails.isPresent()) {
       return datanodeDetails.get();
     } else {
@@ -389,19 +389,11 @@ public HddsProtos.Pipeline getProtobufMessage(int clientVersion, Set<DatanodeDet
       builder.setFactor(ReplicationConfig.getLegacyFactor(replicationConfig));
     }
     if (leaderId != null) {
-      HddsProtos.UUID uuid128 = HddsProtos.UUID.newBuilder()
-          .setMostSigBits(leaderId.getMostSignificantBits())
-          .setLeastSigBits(leaderId.getLeastSignificantBits())
-          .build();
-      builder.setLeaderID128(uuid128);
+      builder.setLeaderDatanodeID(leaderId.toProto());
     }

     if (suggestedLeaderId != null) {
-      HddsProtos.UUID uuid128 = HddsProtos.UUID.newBuilder()
-          .setMostSigBits(suggestedLeaderId.getMostSignificantBits())
-          .setLeastSigBits(suggestedLeaderId.getLeastSignificantBits())
-          .build();
-      builder.setSuggestedLeaderID(uuid128);
+      builder.setSuggestedLeaderDatanodeID(suggestedLeaderId.toProto());
     }

     // To save the message size on wire, only transfer the node order based on
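
The hunk above collapses ten lines of manual 128-bit packing into a single `toProto()` call per field. `toProto()` itself is not shown in this diff; judging from the decode side below (`getLeaderDatanodeID().getUuid()`), it plausibly amounts to the following sketch, where the `DatanodeIDProto` message name and its `setUuid` builder are assumptions:

```java
// Hypothetical sketch of what DatanodeID.toProto() produces, inferred from
// the decode side (getLeaderDatanodeID().getUuid()). The message name
// DatanodeIDProto and its setUuid(...) builder method are assumptions.
static HddsProtos.DatanodeIDProto toProtoSketch(UUID uuid) {
  return HddsProtos.DatanodeIDProto.newBuilder()
      .setUuid(HddsProtos.UUID.newBuilder()
          .setMostSigBits(uuid.getMostSignificantBits())
          .setLeastSigBits(uuid.getLeastSignificantBits())
          .build())
      .build();
}
```

One compatibility note that follows from the diff itself: new writers no longer set the legacy `leaderID128` field, so this change appears to rely on readers understanding the new field; an old reader that only knows the legacy fields would see no leader.
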
@@ -451,20 +443,23 @@ public static Builder toBuilder(HddsProtos.Pipeline pipeline) {
       nodes.put(DatanodeDetails.getFromProtoBuf(member), repIndex);
       index++;
     }
-    UUID leaderId = null;
-    if (pipeline.hasLeaderID128()) {
+    DatanodeID leaderId = null;
+    if (pipeline.hasLeaderDatanodeID()) {
+      leaderId = DatanodeID.of(pipeline.getLeaderDatanodeID().getUuid());
+    } else if (pipeline.hasLeaderID128()) {
       HddsProtos.UUID uuid = pipeline.getLeaderID128();
-      leaderId = new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits());
+      leaderId = DatanodeID.of(uuid);
     } else if (pipeline.hasLeaderID() &&
         StringUtils.isNotEmpty(pipeline.getLeaderID())) {
-      leaderId = UUID.fromString(pipeline.getLeaderID());
+      leaderId = DatanodeID.fromUuidString(pipeline.getLeaderID());
     }

-    UUID suggestedLeaderId = null;
-    if (pipeline.hasSuggestedLeaderID()) {
+    DatanodeID suggestedLeaderId = null;
+    if (pipeline.hasSuggestedLeaderDatanodeID()) {
+      suggestedLeaderId = DatanodeID.of(pipeline.getSuggestedLeaderDatanodeID().getUuid());
+    } else if (pipeline.hasSuggestedLeaderID()) {
       HddsProtos.UUID uuid = pipeline.getSuggestedLeaderID();
-      suggestedLeaderId =
-          new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits());
+      suggestedLeaderId = DatanodeID.of(uuid);
     }

     final ReplicationConfig config = ReplicationConfig
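
The decoder above keeps all three generations of the leader encoding readable. Pulled out of the hunk, the precedence is easiest to see as one helper — a condensed sketch (the real code uses `StringUtils.isNotEmpty`, and the suggested-leader field follows the identical pattern):

```java
// Fallback order when decoding the pipeline leader, newest encoding first.
static DatanodeID decodeLeader(HddsProtos.Pipeline p) {
  if (p.hasLeaderDatanodeID()) {                        // current writers
    return DatanodeID.of(p.getLeaderDatanodeID().getUuid());
  }
  if (p.hasLeaderID128()) {                             // legacy 128-bit pair
    return DatanodeID.of(p.getLeaderID128());
  }
  if (p.hasLeaderID() && !p.getLeaderID().isEmpty()) {  // oldest, string form
    return DatanodeID.fromUuidString(p.getLeaderID());
  }
  return null;                                          // no leader reported
}
```
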
@@ -550,9 +545,9 @@ public static class Builder {
     private Map<DatanodeDetails, Long> nodeStatus = null;
     private List<Integer> nodeOrder = null;
     private List<DatanodeDetails> nodesInOrder = null;
-    private UUID leaderId = null;
+    private DatanodeID leaderId = null;
     private Instant creationTimestamp = null;
-    private UUID suggestedLeaderId = null;
+    private DatanodeID suggestedLeaderId = null;
     private Map<DatanodeDetails, Integer> replicaIndexes = ImmutableMap.of();

     public Builder() { }
@@ -593,7 +588,7 @@ public Builder setState(PipelineState state1) {
       return this;
     }

-    public Builder setLeaderId(UUID leaderId1) {
+    public Builder setLeaderId(DatanodeID leaderId1) {
       this.leaderId = leaderId1;
       return this;
     }
@@ -631,8 +626,8 @@ public Builder setCreateTimestamp(long createTimestamp) {
       return this;
     }

-    public Builder setSuggestedLeaderId(UUID uuid) {
-      this.suggestedLeaderId = uuid;
+    public Builder setSuggestedLeaderId(DatanodeID dnId) {
+      this.suggestedLeaderId = dnId;
       return this;
     }

@@ -52,6 +52,10 @@ public static PipelineID valueOf(UUID id) {
     return new PipelineID(id);
   }

+  public static PipelineID valueOf(String id) {
+    return valueOf(UUID.fromString(id));
+  }
+
   public UUID getId() {
     return id;
   }
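
The new `valueOf(String)` overload parses the canonical UUID text form, which lets callers — such as the client-builder change near the end of this diff — pass a CLI argument straight through. A usage sketch:

```java
import java.util.UUID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;

// Both forms yield equal PipelineIDs; the String overload simply parses the
// canonical UUID text and throws IllegalArgumentException for anything that
// is not a well-formed UUID.
final class PipelineIdParseExample {
  static void demo(String arg) {            // e.g. a pipeline ID from the CLI
    PipelineID a = PipelineID.valueOf(arg);
    PipelineID b = PipelineID.valueOf(UUID.fromString(arg));
    System.out.println(a.equals(b));        // true
  }
}
```
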

@@ -24,11 +24,11 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.UUID;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.DatanodeID;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;

@@ -86,7 +86,7 @@ public static Pipeline createRatisPipeline() {
         .setReplicationConfig(
             RatisReplicationConfig.getInstance(ReplicationFactor.THREE))
         .setNodes(nodes)
-        .setLeaderId(UUID.randomUUID())
+        .setLeaderId(DatanodeID.randomID())
         .build();
   }

@@ -153,7 +153,7 @@ protected void setPipelineLeaderId(PipelineReport report,
     if (report.getIsLeader() ||
         RatisReplicationConfig.hasFactor(pipeline.getReplicationConfig(),
             ReplicationFactor.ONE)) {
-      pipeline.setLeaderId(dn.getUuid());
+      pipeline.setLeaderId(dn.getID());
       metrics.incNumPipelineBytesWritten(pipeline, report.getBytesWritten());
     }
   }
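
From here on, the remaining hunks are the call-site migration: every `dn.getUuid()` that is compared against a leader switches to `dn.getID()`. The reason is type safety rather than style — `Pipeline.getLeaderId()` now returns a `DatanodeID`, and a `DatanodeID` is never `equals(...)` to a raw `java.util.UUID`, so an unconverted comparison would silently evaluate to false. A sketch of the pattern:

```java
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;

final class LeaderCheck {
  // Leader checks must compare DatanodeID with DatanodeID: getLeaderId() now
  // returns a DatanodeID, which never equals a raw java.util.UUID.
  static boolean isLeader(DatanodeDetails dn, Pipeline pipeline) {
    return dn.getID().equals(pipeline.getLeaderId());        // correct
    // return dn.getUuid().equals(pipeline.getLeaderId());   // always false now
  }
}
```
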

@@ -192,7 +192,7 @@ public synchronized Pipeline create(RatisReplicationConfig replicationConfig,
         .setReplicationConfig(RatisReplicationConfig.getInstance(factor))
         .setNodes(dns)
         .setSuggestedLeaderId(
-            suggestedLeader != null ? suggestedLeader.getUuid() : null)
+            suggestedLeader != null ? suggestedLeader.getID() : null)
         .build();

     // Send command to datanodes to create pipeline

@@ -75,7 +75,7 @@ private Map<DatanodeDetails, Integer> getSuggestedLeaderCount(
       try {
         Pipeline pipeline = pipelineStateManager.getPipeline(pipelineID);
         if (!pipeline.isClosed()
-            && dn.getUuid().equals(pipeline.getSuggestedLeaderId())) {
+            && dn.getID().equals(pipeline.getSuggestedLeaderId())) {
           suggestedLeaderCount.put(dn, suggestedLeaderCount.get(dn) + 1);
         }
       } catch (PipelineNotFoundException e) {

@@ -228,7 +228,7 @@ private Pipeline createPipeline(ReplicationConfig repConfig, int nodes) {
       dns.add(MockDatanodeDetails.randomDatanodeDetails());
     }
     builder.setNodes(dns);
-    builder.setLeaderId(dns.get(0).getUuid());
+    builder.setLeaderId(dns.get(0).getID());
     return builder.build();
   }

@@ -88,7 +88,7 @@ public static void markPipelineHealthy(Pipeline pipeline)
     for (DatanodeDetails datanodeDetails : pipeline.getNodes()) {
       pipeline.reportDatanode(datanodeDetails);
     }
-    pipeline.setLeaderId(pipeline.getFirstNode().getUuid());
+    pipeline.setLeaderId(pipeline.getFirstNode().getID());
   }

   @Override

@@ -148,7 +148,7 @@ private void printDatanodeInfo(DatanodeWithAttributes dna) {
           .append('/').append(p.getReplicationConfig().toString())
           .append('/').append(p.getType().toString())
           .append('/').append(p.getPipelineState().toString()).append('/')
-          .append(datanode.getUuid().equals(p.getLeaderId()) ?
+          .append(datanode.getID().equals(p.getLeaderId()) ?
               "Leader" : "Follower")
           .append(System.getProperty("line.separator")));
     }

@@ -32,6 +32,7 @@
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
+import org.apache.hadoop.hdds.protocol.DatanodeID;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumData;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
@@ -184,18 +185,18 @@ private XceiverClientRatis createXceiverClient(OzoneConfiguration conf) {
     List<DatanodeDetails> datanodes = new ArrayList<>();

     datanodes.add(DatanodeDetails.newBuilder()
-        .setUuid(UUID.fromString(serverId))
+        .setID(DatanodeID.fromUuidString(serverId))
         .setHostName("localhost")
         .setIpAddress("127.0.0.1")
         .addPort(DatanodeDetails.newPort(Name.RATIS, 9858))
         .build());

     Pipeline pipeline = Pipeline.newBuilder()
-        .setId(PipelineID.valueOf(UUID.fromString(pipelineId)))
+        .setId(PipelineID.valueOf(pipelineId))
         .setState(PipelineState.OPEN)
         .setReplicationConfig(
             RatisReplicationConfig.getInstance(ReplicationFactor.THREE))
-        .setLeaderId(UUID.fromString(serverId))
+        .setLeaderId(DatanodeID.fromUuidString(serverId))
         .setNodes(datanodes)
         .build();

@@ -123,7 +123,7 @@ public void testLeaderIdAfterLeaderChange() throws Exception {
     Pipeline ratisPipeline = optional.get();
     Optional<HddsDatanodeService> dnToStop =
         cluster.getHddsDatanodes().stream().filter(s ->
-            !s.getDatanodeStateMachine().getDatanodeDetails().getUuid().equals(
+            !s.getDatanodeStateMachine().getDatanodeDetails().getID().equals(
                 ratisPipeline.getLeaderId())).findAny();
     assertTrue(dnToStop.isPresent());
     dnToStop.get().stop();
@@ -145,7 +145,7 @@ public void testLeaderIdAfterLeaderChange() throws Exception {
   private boolean verifyLeaderInfo(Pipeline ratisPipeline) throws Exception {
     Optional<HddsDatanodeService> hddsDatanodeService =
         cluster.getHddsDatanodes().stream().filter(s ->
-            s.getDatanodeStateMachine().getDatanodeDetails().getUuid()
+            s.getDatanodeStateMachine().getDatanodeDetails().getID()
                 .equals(ratisPipeline.getLeaderId())).findFirst();
     assertTrue(hddsDatanodeService.isPresent());

@@ -29,12 +29,12 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeID;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -85,9 +85,9 @@ private void checkLeaderBalance(int dnNum, int leaderNumOfEachDn)
           pipeline.getLeaderId().equals(pipeline.getSuggestedLeaderId()));
     }

-    Map<UUID, Integer> leaderCount = new HashMap<>();
+    Map<DatanodeID, Integer> leaderCount = new HashMap<>();
     for (Pipeline pipeline : pipelines) {
-      UUID leader = pipeline.getLeaderId();
+      DatanodeID leader = pipeline.getLeaderId();
       if (!leaderCount.containsKey(leader)) {
         leaderCount.put(leader, 0);
       }
@@ -96,7 +96,7 @@ private void checkLeaderBalance(int dnNum, int leaderNumOfEachDn)
     }

     assertEquals(dnNum, leaderCount.size());
-    for (Map.Entry<UUID, Integer> entry: leaderCount.entrySet()) {
+    for (Map.Entry<DatanodeID, Integer> entry: leaderCount.entrySet()) {
       assertEquals(leaderNumOfEachDn, leaderCount.get(entry.getKey()));
     }
   }