@@ -23,7 +23,6 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedList;
@@ -174,8 +173,8 @@ protected BlockExtendedInputStream getOrOpenStream(int locationIndex) throws IOE
Pipeline pipeline = Pipeline.newBuilder()
.setReplicationConfig(StandaloneReplicationConfig.getInstance(
HddsProtos.ReplicationFactor.ONE))
-.setNodes(Arrays.asList(dataLocation))
-.setId(PipelineID.valueOf(dataLocation.getUuid()))
+.setNodes(Collections.singletonList(dataLocation))
+.setId(dataLocation.getID())
.setReplicaIndexes(ImmutableMap.of(dataLocation, locationIndex + 1))
.setState(Pipeline.PipelineState.CLOSED)
.build();
@@ -50,7 +50,6 @@
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.ozone.ClientVersion;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
@@ -106,7 +105,7 @@ public void testMissingStripeChecksumDoesNotMakeExecutePutBlockFailDuringECRecon
BlockID blockID = new BlockID(1, 1);
DatanodeDetails datanodeDetails = MockDatanodeDetails.randomDatanodeDetails();
Pipeline pipeline = Pipeline.newBuilder()
-.setId(PipelineID.valueOf(datanodeDetails.getUuid()))
+.setId(datanodeDetails.getID())
.setReplicationConfig(replicationConfig)
.setNodes(ImmutableList.of(datanodeDetails))
.setState(Pipeline.PipelineState.CLOSED)
@@ -23,6 +23,7 @@
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeIDProto;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.ozone.util.StringWithByteString;

/**
@@ -78,6 +79,10 @@ public ByteString getByteString() {
return uuidByteString.getBytes();
}

+public PipelineID toPipelineID() {
+return PipelineID.valueOf(uuid);
+}
+
public DatanodeIDProto toProto() {
return DatanodeIDProto.newBuilder().setUuid(toProto(uuid)).build();
}
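Note on the new DatanodeID.toPipelineID(): it derives a single-node PipelineID from the datanode's own UUID, so every call site that previously spelled PipelineID.valueOf(dn.getUuid()) gets the same ID and keeps hitting the same cached client. A minimal sketch of that equivalence, assuming PipelineID equality compares the wrapped UUID (the helper name below is illustrative, not part of this PR):

    // Hypothetical check; dn is any DatanodeDetails instance in scope.
    static void sameIdEitherWay(DatanodeDetails dn) {
      PipelineID viaDatanodeId = dn.getID().toPipelineID();
      PipelineID viaUuid = PipelineID.valueOf(dn.getUuid()); // the old spelling
      if (!viaDatanodeId.equals(viaUuid)) {
        throw new AssertionError("refactor changed the derived pipeline ID");
      }
    }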
@@ -573,6 +573,11 @@ public Builder(Pipeline pipeline) {
}
}

+public Builder setId(DatanodeID datanodeID) {
+this.id = datanodeID.toPipelineID();
+return this;
+}
+
public Builder setId(PipelineID id1) {
this.id = id1;
return this;
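The new overload lets call sites hand the builder a DatanodeID directly instead of unwrapping the UUID first. A sketch of the single-node pipeline pattern the call sites in this PR converge on (dn is a DatanodeDetails; the replication config mirrors the first hunk above):

    Pipeline pipeline = Pipeline.newBuilder()
        .setId(dn.getID()) // was: setId(PipelineID.valueOf(dn.getUuid()))
        .setReplicationConfig(StandaloneReplicationConfig.getInstance(
            HddsProtos.ReplicationFactor.ONE))
        .setNodes(Collections.singletonList(dn)) // was: Arrays.asList(dn)
        .setState(Pipeline.PipelineState.CLOSED)
        .build();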
@@ -19,10 +19,12 @@

import com.fasterxml.jackson.annotation.JsonIgnore;
import java.util.UUID;
+import java.util.function.Supplier;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.utils.db.Codec;
import org.apache.hadoop.hdds.utils.db.DelegatedCodec;
import org.apache.hadoop.hdds.utils.db.UuidCodec;
+import org.apache.ratis.util.MemoizedSupplier;

/**
* ID for the pipeline, the ID is based on UUID.
@@ -35,13 +37,15 @@ public final class PipelineID {
PipelineID.class, DelegatedCodec.CopyType.SHALLOW);

private final UUID id;
+private final Supplier<HddsProtos.PipelineID> protoSupplier;

public static Codec<PipelineID> getCodec() {
return CODEC;
}

private PipelineID(UUID id) {
this.id = id;
+this.protoSupplier = MemoizedSupplier.valueOf(() -> buildProtobuf(id));
}

public static PipelineID randomId() {
@@ -62,6 +66,10 @@ public UUID getId() {

@JsonIgnore
public HddsProtos.PipelineID getProtobuf() {
+return protoSupplier.get();
+}
+
+static HddsProtos.PipelineID buildProtobuf(UUID id) {
HddsProtos.UUID uuid128 = HddsProtos.UUID.newBuilder()
.setMostSigBits(id.getMostSignificantBits())
.setLeastSigBits(id.getLeastSignificantBits())
@@ -86,7 +94,7 @@ public static PipelineID getFromProtobuf(HddsProtos.PipelineID protos) {

@Override
public String toString() {
return "PipelineID=" + id.toString();
return "Pipeline-" + id;
}

@Override
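getProtobuf() is now backed by a memoized supplier: the protobuf message is built at most once per PipelineID instance and reused on every later call, which is safe because PipelineID is immutable. A self-contained sketch of the pattern, assuming Ratis's MemoizedSupplier.valueOf as used in the hunk above (lazy, computed on first get()):

    import java.util.function.Supplier;
    import org.apache.ratis.util.MemoizedSupplier;

    final class CachedValue {
      private final Supplier<String> cached =
          MemoizedSupplier.valueOf(CachedValue::expensiveBuild); // runs at most once

      String get() {
        return cached.get(); // first call builds, later calls return the cached result
      }

      private static String expensiveBuild() {
        return "built"; // stand-in for the protobuf construction in buildProtobuf
      }
    }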
@@ -22,7 +22,6 @@
import java.time.Duration;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@@ -210,15 +209,14 @@ private boolean isDeletionAllowed(ContainerData containerData,
// TODO: currently EC container goes through this path.
return true;
}
-UUID pipelineUUID;
+final PipelineID pipelineID;
try {
-pipelineUUID = UUID.fromString(originPipelineId);
+pipelineID = PipelineID.valueOf(originPipelineId);
} catch (IllegalArgumentException e) {
LOG.warn("Invalid pipelineID {} for container {}",
originPipelineId, containerData.getContainerID());
return false;
}
-PipelineID pipelineID = PipelineID.valueOf(pipelineUUID);
// in case the ratis group does not exist, just mark it for deletion.
if (!ratisServer.isExist(pipelineID.getProtobuf())) {
return true;
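The guard now stays in PipelineID terms end to end instead of round-tripping through java.util.UUID. A sketch of the parse-and-validate shape, assuming PipelineID.valueOf(String), as used above, rejects malformed input with IllegalArgumentException the way UUID.fromString does (the method name is illustrative):

    static boolean hasParsableOriginPipeline(String originPipelineId) {
      try {
        PipelineID.valueOf(originPipelineId);
        return true; // well-formed; caller can go on to ratisServer.isExist(...)
      } catch (IllegalArgumentException e) {
        return false; // malformed ID: log and refuse rather than crash the service
      }
    }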
@@ -112,8 +112,7 @@ public void handle(SCMCommand<?> command, OzoneContainer ozoneContainer,
final RaftGroup group =
RatisHelper.newRaftGroup(groupId, peers, priorityList);
server.addGroup(pipelineIdProto, peers, priorityList);
-peers.stream().filter(
-d -> !d.getUuid().equals(dn.getUuid()))
+peers.stream().filter(d -> !d.getID().equals(dn.getID()))
.forEach(d -> {
final RaftPeer peer = RatisHelper.toRaftPeer(d);
try (RaftClient client = newRaftClient.apply(peer,
@@ -375,8 +375,7 @@ private void processCmd(DeleteCmdInfo cmd) {

ContainerBlocksDeletionACKProto.Builder resultBuilder =
ContainerBlocksDeletionACKProto.newBuilder().addAllResults(results);
-resultBuilder.setDnId(cmd.getContext().getParent().getDatanodeDetails()
-.getUuid().toString());
+resultBuilder.setDnId(cmd.getContext().getParent().getDatanodeDetails().getUuidString());
blockDeletionACK = resultBuilder.build();

// Send ACK back to SCM as long as meta updated
@@ -20,9 +20,7 @@
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto.ErrorCode.success;

import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import java.io.IOException;
-import java.util.UUID;
import java.util.concurrent.Callable;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -36,6 +34,7 @@
import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.ratis.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -129,15 +128,10 @@ public EndpointStateMachine.EndPointStates call() throws Exception {
SCMRegisteredResponseProto response = rpcEndPoint.getEndPoint()
.register(datanodeDetails.getExtendedProtoBufMessage(),
nodeReport, containerReport, pipelineReportsProto, layoutInfo);
-Preconditions.checkState(UUID.fromString(response.getDatanodeUUID())
-.equals(datanodeDetails.getUuid()),
-"Unexpected datanode ID in the response.");
-Preconditions.checkState(!StringUtils.isBlank(response.getClusterID()),
+Preconditions.assertEquals(datanodeDetails.getUuidString(), response.getDatanodeUUID(), "datanodeID");
+Preconditions.assertTrue(!StringUtils.isBlank(response.getClusterID()),
"Invalid cluster ID in the response.");
-Preconditions.checkState(response.getErrorCode() == success,
-"DataNode has different Software Layout Version" +
-" than SCM or RECON. EndPoint address is: " +
-rpcEndPoint.getAddressString());
+Preconditions.assertSame(success, response.getErrorCode(), "ErrorCode");
if (response.hasHostname() && response.hasIpAddress()) {
datanodeDetails.setHostName(response.getHostname());
datanodeDetails.setIpAddress(response.getIpAddress());
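The Guava checkState calls become Ratis assertion helpers, which take the expected and actual values as separate arguments, so the failure can report both sides without a hand-built message string. A hedged before/after sketch using only the calls visible in this hunk (fully qualified, since both classes are named Preconditions):

    // Before: boolean condition plus a hand-written message.
    com.google.common.base.Preconditions.checkState(
        UUID.fromString(response.getDatanodeUUID()).equals(datanodeDetails.getUuid()),
        "Unexpected datanode ID in the response.");

    // After: expected and actual are explicit arguments, and the helper
    // composes the failure message from them.
    org.apache.ratis.util.Preconditions.assertEquals(
        datanodeDetails.getUuidString(), response.getDatanodeUUID(), "datanodeID");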
@@ -26,19 +26,18 @@
import java.net.BindException;
import java.util.Collections;
import java.util.List;
-import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.security.SecurityConfig;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.tracing.GrpcServerInterceptor;
@@ -71,7 +70,7 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
private static final Logger
LOG = LoggerFactory.getLogger(XceiverServerGrpc.class);
private int port;
-private UUID id;
+private DatanodeID id;
private Server server;
private final ContainerDispatcher storageContainer;
private boolean isStarted;
@@ -90,7 +89,7 @@ public XceiverServerGrpc(DatanodeDetails datanodeDetails,
ContainerDispatcher dispatcher, CertificateClient caClient) {
Preconditions.checkNotNull(conf);

-this.id = datanodeDetails.getUuid();
+this.id = datanodeDetails.getID();
this.datanodeDetails = datanodeDetails;
this.port = conf.getInt(OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT,
OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT_DEFAULT);
@@ -237,14 +236,13 @@ public void submitRequest(ContainerCommandRequestProto request,

@Override
public boolean isExist(HddsProtos.PipelineID pipelineId) {
-return PipelineID.valueOf(id).getProtobuf().equals(pipelineId);
+return id.toPipelineID().getProtobuf().equals(pipelineId);
}

@Override
public List<PipelineReport> getPipelineReport() {
-return Collections.singletonList(
-PipelineReport.newBuilder()
-.setPipelineID(PipelineID.valueOf(id).getProtobuf())
-.build());
+return Collections.singletonList(PipelineReport.newBuilder()
+.setPipelineID(id.toPipelineID().getProtobuf())
+.build());
}
}
@@ -21,10 +21,10 @@
import jakarta.annotation.Nonnull;
import java.io.Closeable;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
-import org.apache.commons.collections.map.SingletonMap;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -34,7 +34,6 @@
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.ozone.OzoneSecurityUtil;
@@ -181,10 +180,10 @@ Pipeline singleNodePipeline(DatanodeDetails dn,
// To get the same client from cache, we try to use the DN UUID as
// pipelineID for uniqueness. Please note, pipeline does not have any
// significance after it's close. So, we are ok to use any ID.
-return Pipeline.newBuilder().setId(PipelineID.valueOf(dn.getUuid()))
+return Pipeline.newBuilder().setId(dn.getID())
.setReplicationConfig(repConfig).setNodes(ImmutableList.of(dn))
.setState(Pipeline.PipelineState.CLOSED)
-.setReplicaIndexes(new SingletonMap(dn, replicaIndex)).build();
+.setReplicaIndexes(Collections.singletonMap(dn, replicaIndex)).build();
}

public XceiverClientManager getXceiverClientManager() {
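Replacing commons-collections' SingletonMap with Collections.singletonMap also restores type safety: the commons 3.x class predates generics, so the old call compiled as a raw type. A small fragment (the Integer value type matches the int replicaIndex above; imports elided):

    // Old: raw type from commons-collections 3.x, unchecked at the call site.
    //   new SingletonMap(dn, replicaIndex)
    // New: typed, immutable, and no third-party dependency.
    Map<DatanodeDetails, Integer> replicaIndexes =
        Collections.singletonMap(dn, replicaIndex);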
@@ -79,7 +79,6 @@ void testWriteReadBeforeRatisDatastreamPortLayoutVersion(@TempDir File dir)
void testWriteReadAfterRatisDatastreamPortLayoutVersion(@TempDir File dir)
throws IOException {
DatanodeDetails original = MockDatanodeDetails.randomDatanodeDetails();
-assertEquals(original.getUuid().toString(), original.getUuidString());

File file = new File(dir, "datanode.yaml");
OzoneConfiguration conf = new OzoneConfiguration();
@@ -25,8 +25,8 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
-import java.util.UUID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.util.Time;
import org.junit.jupiter.api.Test;
@@ -37,7 +37,7 @@
* This unit test is not enabled (doesn't start with Test) but can be used
* to validate changes manually.
*/
-public class ReplicationSupervisorScheduling {
+public class ReplicationSupervisorSchedulingBenchmark {

private final Random random = new Random();

@@ -50,16 +50,16 @@ public void test() throws InterruptedException {
//locks representing the limited resource of remote and local disks

//datanode -> disk -> lock object (remote resources)
-Map<UUID, Map<Integer, Object>> volumeLocks = new HashMap<>();
+final Map<DatanodeID, Map<Integer, Object>> volumeLocks = new HashMap<>();

//disk -> lock (local resources)
Map<Integer, Object> destinationLocks = new HashMap<>();

//init the locks
for (DatanodeDetails datanode : datanodes) {
-volumeLocks.put(datanode.getUuid(), new HashMap<>());
+volumeLocks.put(datanode.getID(), new HashMap<>());
for (int i = 0; i < 10; i++) {
-volumeLocks.get(datanode.getUuid()).put(i, new Object());
+volumeLocks.get(datanode.getID()).put(i, new Object());
}
}

@@ -75,15 +75,14 @@ public void test() throws InterruptedException {
task.getSources().get(random.nextInt(task.getSources().size()));

final Map<Integer, Object> volumes =
-volumeLocks.get(sourceDatanode.getUuid());
+volumeLocks.get(sourceDatanode.getID());
Object volumeLock = volumes.get(random.nextInt(volumes.size()));
synchronized (volumeLock) {
System.out.println("Downloading " + task.getContainerId() + " from "
+ sourceDatanode.getUuid());
System.out.println("Downloading " + task.getContainerId() + " from " + sourceDatanode);
try {
volumeLock.wait(1000);
} catch (InterruptedException ex) {
-ex.printStackTrace();
+throw new IllegalStateException(ex);
}
}

@@ -98,7 +97,7 @@ public void test() throws InterruptedException {
try {
destinationLock.wait(1000);
} catch (InterruptedException ex) {
-ex.printStackTrace();
+throw new IllegalStateException(ex);
}
}
};
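Rethrowing InterruptedException as IllegalStateException makes the benchmark fail loudly instead of printing a stack trace and carrying on with skewed timings. One common variant of the pattern also restores the interrupt flag before rethrowing; that extra step is not part of this diff, just a conventional addition shown for context:

    synchronized (volumeLock) {
      try {
        volumeLock.wait(1000);
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt(); // conventional: preserve interrupt status
        throw new IllegalStateException(ex);
      }
    }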