diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
index f762b4cc70dd..f3137749b9fe 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.protocol;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Set;
@@ -487,11 +488,24 @@ public HddsProtos.DatanodeDetailsProto getProtoBufMessage() {
   }
 
   public HddsProtos.DatanodeDetailsProto toProto(int clientVersion) {
-    return toProtoBuilder(clientVersion).build();
+    return toProtoBuilder(clientVersion, Collections.emptySet()).build();
   }
 
+  public HddsProtos.DatanodeDetailsProto toProto(int clientVersion, Set<Port.Name> filterPorts) {
+    return toProtoBuilder(clientVersion, filterPorts).build();
+  }
+
+  /**
+   * Converts the current DatanodeDetails instance into a proto
+   * {@link HddsProtos.DatanodeDetailsProto.Builder} object.
+   *
+   * @param clientVersion - The client version.
+   * @param filterPorts - A set of {@link Port.Name} specifying ports to include.
+   *                      If empty, all available ports will be included.
+   * @return A {@link HddsProtos.DatanodeDetailsProto.Builder} object.
+   */
   public HddsProtos.DatanodeDetailsProto.Builder toProtoBuilder(
-      int clientVersion) {
+      int clientVersion, Set<Port.Name> filterPorts) {
 
     HddsProtos.UUID uuid128 = HddsProtos.UUID.newBuilder()
         .setMostSigBits(uuid.getMostSignificantBits())
@@ -530,15 +544,25 @@ public HddsProtos.DatanodeDetailsProto.Builder toProtoBuilder(
     final boolean handlesUnknownPorts =
         ClientVersion.fromProtoValue(clientVersion)
             .compareTo(VERSION_HANDLES_UNKNOWN_DN_PORTS) >= 0;
+    final int requestedPortCount = filterPorts.size();
+    final boolean maySkip = requestedPortCount > 0;
     for (Port port : ports) {
-      if (handlesUnknownPorts || Name.V0_PORTS.contains(port.getName())) {
+      if (maySkip && !filterPorts.contains(port.getName())) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skip adding {} port {} to proto message",
+              port.getName(), port.getValue());
+        }
+      } else if (handlesUnknownPorts || Name.V0_PORTS.contains(port.getName())) {
         builder.addPorts(port.toProto());
       } else {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Skip adding {} port {} to proto message for client v{}",
-             port.getName(), port.getValue(), clientVersion);
+              port.getName(), port.getValue(), clientVersion);
         }
       }
+      if (maySkip && builder.getPortsCount() == requestedPortCount) {
+        break;
+      }
     }
 
     builder.setCurrentVersion(currentVersion);
@@ -960,6 +984,9 @@ public enum Name {
         Name.values());
     public static final Set<Name> V0_PORTS = ImmutableSet.copyOf(
         EnumSet.of(STANDALONE, RATIS, REST));
+
+    public static final Set<Name> IO_PORTS = ImmutableSet.copyOf(
+        EnumSet.of(STANDALONE, RATIS, RATIS_DATASTREAM));
   }
 
   private final Name name;
@@ -1109,7 +1136,7 @@ public void setRevision(String rev) {
   public HddsProtos.NetworkNode toProtobuf(
       int clientVersion) {
     return HddsProtos.NetworkNode.newBuilder()
-        .setDatanodeDetails(toProtoBuilder(clientVersion).build())
+        .setDatanodeDetails(toProtoBuilder(clientVersion, Collections.emptySet()).build())
         .build();
   }
 }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java
index ac72dc942245..f4c9a5dbda9b 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java
@@ -21,6 +21,7 @@
 import java.util.Comparator;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -61,7 +62,7 @@ public HddsProtos.ContainerWithPipeline getProtobuf(int clientVersion)
     HddsProtos.ContainerWithPipeline.Builder builder =
         HddsProtos.ContainerWithPipeline.newBuilder();
     builder.setContainerInfo(getContainerInfo().getProtobuf())
-        .setPipeline(getPipeline().getProtobufMessage(clientVersion));
+        .setPipeline(getPipeline().getProtobufMessage(clientVersion, Name.IO_PORTS));
     return builder.build();
   }
 
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
index 6c5b4aff57f6..6e72537367c1 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
@@ -362,12 +362,17 @@ public ReplicationConfig getReplicationConfig() {
 
   public HddsProtos.Pipeline getProtobufMessage(int clientVersion)
       throws UnknownPipelineStateException {
+    return getProtobufMessage(clientVersion, Collections.emptySet());
+  }
+
+  public HddsProtos.Pipeline getProtobufMessage(int clientVersion, Set<DatanodeDetails.Port.Name> filterPorts)
+      throws UnknownPipelineStateException {
     List<HddsProtos.DatanodeDetailsProto> members = new ArrayList<>();
     List<Integer> memberReplicaIndexes = new ArrayList<>();
 
     for (DatanodeDetails dn : nodeStatus.keySet()) {
-      members.add(dn.toProto(clientVersion));
+      members.add(dn.toProto(clientVersion, filterPorts));
       memberReplicaIndexes.add(replicaIndexes.getOrDefault(dn, 0));
     }
 
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/protocol/TestDatanodeDetails.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/protocol/TestDatanodeDetails.java
index aeb1e207e704..78465fd2816c 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/protocol/TestDatanodeDetails.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/protocol/TestDatanodeDetails.java
@@ -17,12 +17,16 @@
  */
 package org.apache.hadoop.hdds.protocol;
 
+import com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.hdds.DatanodeVersion;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.junit.jupiter.api.Test;
 
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name.ALL_PORTS;
 import static org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name.V0_PORTS;
@@ -48,21 +52,36 @@ void protoIncludesNewPortsOnlyForV1() {
         subject.toProto(VERSION_HANDLES_UNKNOWN_DN_PORTS.toProtoValue());
     assertPorts(protoV1, ALL_PORTS);
   }
 
+  @Test
+  void testRequiredPortsProto() {
+    DatanodeDetails subject = MockDatanodeDetails.randomDatanodeDetails();
+    Set<Port.Name> requiredPorts = Stream.of(Port.Name.STANDALONE, Port.Name.RATIS)
+        .collect(Collectors.toSet());
+    HddsProtos.DatanodeDetailsProto proto =
+        subject.toProto(subject.getCurrentVersion(), requiredPorts);
+    assertPorts(proto, ImmutableSet.copyOf(requiredPorts));
+
+    HddsProtos.DatanodeDetailsProto ioPortProto =
+        subject.toProto(subject.getCurrentVersion(), Name.IO_PORTS);
+    assertPorts(ioPortProto, ImmutableSet.copyOf(Name.IO_PORTS));
+  }
+
   @Test
   public void testNewBuilderCurrentVersion() {
     // test that if the current version is not set (Ozone 1.4.0 and earlier),
     // it falls back to SEPARATE_RATIS_PORTS_AVAILABLE
     DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails();
+    Set<Port.Name> requiredPorts = Stream.of(Port.Name.STANDALONE, Port.Name.RATIS)
+        .collect(Collectors.toSet());
     HddsProtos.DatanodeDetailsProto.Builder protoBuilder =
-        dn.toProtoBuilder(DEFAULT_VERSION.toProtoValue());
+        dn.toProtoBuilder(DEFAULT_VERSION.toProtoValue(), requiredPorts);
     protoBuilder.clearCurrentVersion();
     DatanodeDetails dn2 = DatanodeDetails.newBuilder(protoBuilder.build()).build();
     assertEquals(DatanodeVersion.SEPARATE_RATIS_PORTS_AVAILABLE.toProtoValue(),
         dn2.getCurrentVersion());
 
     // test that if the current version is set, it is used
     protoBuilder =
-        dn.toProtoBuilder(DEFAULT_VERSION.toProtoValue());
+        dn.toProtoBuilder(DEFAULT_VERSION.toProtoValue(), requiredPorts);
     DatanodeDetails dn3 = DatanodeDetails.newBuilder(protoBuilder.build()).build();
     assertEquals(DatanodeVersion.CURRENT.toProtoValue(),
         dn3.getCurrentVersion());
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
index e77e2aebb31f..c1431845ce14 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
 import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateBlockResponse;
@@ -216,7 +217,7 @@ public AllocateScmBlockResponseProto allocateScmBlock(
     for (AllocatedBlock block : allocatedBlocks) {
       builder.addBlocks(AllocateBlockResponse.newBuilder()
           .setContainerBlockID(block.getBlockID().getProtobuf())
-          .setPipeline(block.getPipeline().getProtobufMessage(clientVersion)));
+          .setPipeline(block.getPipeline().getProtobufMessage(clientVersion, Name.IO_PORTS)));
     }
 
     return builder.build();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
index bd5046bfc0bf..f7394729898c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
@@ -409,9 +409,9 @@ public void testInstallIncrementalSnapshot(@TempDir Path tempDir)
 
     // Do some transactions so that the log index increases
     List<String> firstKeys = writeKeysToIncreaseLogIndex(leaderRatisServer,
-        80);
+        100);
 
-    SnapshotInfo snapshotInfo2 = createOzoneSnapshot(leaderOM, "snap80");
+    SnapshotInfo snapshotInfo2 = createOzoneSnapshot(leaderOM, "snap100");
 
     followerOM.getConfiguration().setInt(
         OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL,
         KeyManagerImpl.DISABLE_VALUE);
@@ -424,9 +424,9 @@ public void testInstallIncrementalSnapshot(@TempDir Path tempDir)
     }, 1000, 30_000);
 
     // Get two incremental tarballs, adding new keys/snapshot for each.
-    IncrementData firstIncrement = getNextIncrementalTarball(160, 2, leaderOM,
+    IncrementData firstIncrement = getNextIncrementalTarball(200, 2, leaderOM,
         leaderRatisServer, faultInjector, followerOM, tempDir);
-    IncrementData secondIncrement = getNextIncrementalTarball(240, 3, leaderOM,
+    IncrementData secondIncrement = getNextIncrementalTarball(300, 3, leaderOM,
         leaderRatisServer, faultInjector, followerOM, tempDir);
 
     // Resume the follower thread, it would download the incremental snapshot.
@@ -501,10 +501,10 @@ public void testInstallIncrementalSnapshot(@TempDir Path tempDir)
     assertNotNull(filesInCandidate);
     assertEquals(0, filesInCandidate.length);
 
-    checkSnapshot(leaderOM, followerOM, "snap80", firstKeys, snapshotInfo2);
-    checkSnapshot(leaderOM, followerOM, "snap160", firstIncrement.getKeys(),
+    checkSnapshot(leaderOM, followerOM, "snap100", firstKeys, snapshotInfo2);
+    checkSnapshot(leaderOM, followerOM, "snap200", firstIncrement.getKeys(),
         firstIncrement.getSnapshotInfo());
-    checkSnapshot(leaderOM, followerOM, "snap240", secondIncrement.getKeys(),
+    checkSnapshot(leaderOM, followerOM, "snap300", secondIncrement.getKeys(),
         secondIncrement.getSnapshotInfo());
     assertEquals(
         followerOM.getOmSnapshotProvider().getInitCount(), 2,
@@ -618,7 +618,7 @@ public void testInstallIncrementalSnapshotWithFailure() throws Exception {
 
     // Do some transactions so that the log index increases
     List<String> firstKeys = writeKeysToIncreaseLogIndex(leaderRatisServer,
-        80);
+        100);
 
     // Start the inactive OM. Checkpoint installation will happen spontaneously.
     cluster.startInactiveOM(followerNodeId);
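
For reference, a minimal usage sketch of the new port-filtering overload introduced above. This is not part of the patch; the MockDatanodeDetails helper and the variable names are taken from the test code and used here only for illustration.

// Sketch: serialize a DatanodeDetails proto that carries only the I/O ports
// (STANDALONE, RATIS, RATIS_DATASTREAM), as SCM now does for pipeline and
// block-allocation responses.
DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails();
HddsProtos.DatanodeDetailsProto ioPortsOnly =
    dn.toProto(dn.getCurrentVersion(), DatanodeDetails.Port.Name.IO_PORTS);

// Passing an empty filter set keeps the previous behavior: all ports
// understood by the given client version are included.
HddsProtos.DatanodeDetailsProto allPorts =
    dn.toProto(dn.getCurrentVersion(), java.util.Collections.emptySet());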