From 73c293e42aa99d5bbaed4c8aab9e9be7c87539d0 Mon Sep 17 00:00:00 2001
From: xichen01
Date: Thu, 9 May 2024 18:37:09 +0800
Subject: [PATCH 1/6] Make SCM only send IO ports in response to AllocateBlock
 and getContainerWithPipeline requests

---
 .../hadoop/hdds/protocol/DatanodeDetails.java | 31 +++++++++++++------
 .../common/helpers/ContainerWithPipeline.java |  3 +-
 .../hadoop/hdds/scm/pipeline/Pipeline.java    |  7 ++++-
 .../hdds/protocol/TestDatanodeDetails.java    | 17 ++++++++++
 ...ocationProtocolServerSideTranslatorPB.java |  3 +-
 5 files changed, 49 insertions(+), 12 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
index b455daba529f..4a0c36996f3d 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdds.protocol;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Set;
@@ -411,11 +412,15 @@ public HddsProtos.DatanodeDetailsProto getProtoBufMessage() {
   }
 
   public HddsProtos.DatanodeDetailsProto toProto(int clientVersion) {
-    return toProtoBuilder(clientVersion).build();
+    return toProtoBuilder(clientVersion, Collections.emptySet()).build();
+  }
+
+  public HddsProtos.DatanodeDetailsProto toProto(int clientVersion, Set<Port.Name> requiredPorts) {
+    return toProtoBuilder(clientVersion, requiredPorts).build();
   }
 
   public HddsProtos.DatanodeDetailsProto.Builder toProtoBuilder(
-      int clientVersion) {
+      int clientVersion, Set<Port.Name> requiredPorts) {
 
     HddsProtos.UUID uuid128 = HddsProtos.UUID.newBuilder()
         .setMostSigBits(uuid.getMostSignificantBits())
@@ -455,14 +460,19 @@ public HddsProtos.DatanodeDetailsProto.Builder toProtoBuilder(
         ClientVersion.fromProtoValue(clientVersion)
             .compareTo(VERSION_HANDLES_UNKNOWN_DN_PORTS) >= 0;
     for (Port port : ports) {
-      if (handlesUnknownPorts || Name.V0_PORTS.contains(port.getName())) {
-        builder.addPorts(port.toProto());
-      } else {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Skip adding {} port {} to proto message for client v{}",
-              port.getName(), port.getValue(), clientVersion);
+      if (requiredPorts.isEmpty() || requiredPorts.contains(port.name)) {
+        if (handlesUnknownPorts || Name.V0_PORTS.contains(port.getName())) {
+          builder.addPorts(port.toProto());
+        } else {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Skip adding {} port {} to proto message for client v{}",
+                port.getName(), port.getValue(), clientVersion);
+          }
         }
       }
+      if (!requiredPorts.isEmpty() && builder.getPortsCount() == requiredPorts.size()) {
+        break;
+      }
     }
 
     return builder;
@@ -836,6 +846,9 @@ public enum Name {
         Name.values());
     public static final Set<Name> V0_PORTS = ImmutableSet.copyOf(
         EnumSet.of(STANDALONE, RATIS, REST));
+
+    public static final Set<Name> IO_PORTS = ImmutableSet.copyOf(
+        EnumSet.of(STANDALONE, RATIS, REST));
   }
 
   private final Name name;
@@ -1003,7 +1016,7 @@ public void setBuildDate(String date) {
   public HddsProtos.NetworkNode toProtobuf(
       int clientVersion) {
     return HddsProtos.NetworkNode.newBuilder()
-        .setDatanodeDetails(toProtoBuilder(clientVersion).build())
+        .setDatanodeDetails(toProtoBuilder(clientVersion, Collections.emptySet()).build())
         .build();
   }
 }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java
index ac72dc942245..f4c9a5dbda9b 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java
@@ -21,6 +21,7 @@
 import java.util.Comparator;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -61,7 +62,7 @@ public HddsProtos.ContainerWithPipeline getProtobuf(int clientVersion)
     HddsProtos.ContainerWithPipeline.Builder builder =
         HddsProtos.ContainerWithPipeline.newBuilder();
     builder.setContainerInfo(getContainerInfo().getProtobuf())
-        .setPipeline(getPipeline().getProtobufMessage(clientVersion));
+        .setPipeline(getPipeline().getProtobufMessage(clientVersion, Name.IO_PORTS));
     return builder.build();
   }
 
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
index 05d83a8b8b56..e04ad80c7602 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
@@ -353,12 +353,17 @@ public ReplicationConfig getReplicationConfig() {
 
   public HddsProtos.Pipeline getProtobufMessage(int clientVersion)
       throws UnknownPipelineStateException {
+    return getProtobufMessage(clientVersion, Collections.emptySet());
+  }
+
+  public HddsProtos.Pipeline getProtobufMessage(int clientVersion, Set<Port.Name> requiredPorts)
+      throws UnknownPipelineStateException {
     List<HddsProtos.DatanodeDetailsProto> members = new ArrayList<>();
     List<Integer> memberReplicaIndexes = new ArrayList<>();
 
     for (DatanodeDetails dn : nodeStatus.keySet()) {
-      members.add(dn.toProto(clientVersion));
+      members.add(dn.toProto(clientVersion, requiredPorts));
       memberReplicaIndexes.add(replicaIndexes.getOrDefault(dn, 0));
     }
 
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/protocol/TestDatanodeDetails.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/protocol/TestDatanodeDetails.java
index b05deaa0d668..b28000aea31d 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/protocol/TestDatanodeDetails.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/protocol/TestDatanodeDetails.java
@@ -17,11 +17,15 @@
  */
 package org.apache.hadoop.hdds.protocol;
 
+import com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.junit.jupiter.api.Test;
 
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name.ALL_PORTS;
 import static org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name.V0_PORTS;
@@ -47,6 +51,19 @@ void protoIncludesNewPortsOnlyForV1() {
         subject.toProto(VERSION_HANDLES_UNKNOWN_DN_PORTS.toProtoValue());
     assertPorts(protoV1, ALL_PORTS);
   }
+  @Test
+  void testRequiredPortsProto() {
+    DatanodeDetails subject = MockDatanodeDetails.randomDatanodeDetails();
+    Set<Port.Name> requiredPorts = Stream.of(Port.Name.STANDALONE, Port.Name.RATIS)
+        .collect(Collectors.toSet());
+    HddsProtos.DatanodeDetailsProto proto =
+        subject.toProto(subject.getCurrentVersion(), requiredPorts);
+    assertPorts(proto, ImmutableSet.copyOf(requiredPorts));
+
+    HddsProtos.DatanodeDetailsProto ioPortProto =
+        subject.toProto(subject.getCurrentVersion(), Name.IO_PORTS);
+    assertPorts(ioPortProto, ImmutableSet.copyOf(Name.IO_PORTS));
+  }
 
   public static void assertPorts(HddsProtos.DatanodeDetailsProto dn,
       Set<Port.Name> expectedPorts) throws IllegalArgumentException {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
index e77e2aebb31f..c1431845ce14 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
 import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateBlockResponse;
@@ -216,7 +217,7 @@ public AllocateScmBlockResponseProto allocateScmBlock(
     for (AllocatedBlock block : allocatedBlocks) {
       builder.addBlocks(AllocateBlockResponse.newBuilder()
           .setContainerBlockID(block.getBlockID().getProtobuf())
-          .setPipeline(block.getPipeline().getProtobufMessage(clientVersion)));
+          .setPipeline(block.getPipeline().getProtobufMessage(clientVersion, Name.IO_PORTS)));
     }
 
     return builder.build();
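Note on the API introduced by patch 1: an empty port set preserves the old behavior (every port the client version understands is serialized), while a non-empty set restricts the proto to the requested ports. A minimal sketch of the call pattern, reusing only names that appear in the patch; the MockDatanodeDetails setup mirrors the new test and is illustrative, not part of the diff:

    // Build a datanode with all of its ports registered.
    DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails();

    // Unfiltered, as before this series: all ports visible to the client.
    HddsProtos.DatanodeDetailsProto full = dn.toProto(dn.getCurrentVersion());

    // Filtered, as SCM now answers AllocateBlock and
    // getContainerWithPipeline: only the IO ports are serialized.
    HddsProtos.DatanodeDetailsProto ioOnly =
        dn.toProto(dn.getCurrentVersion(), Name.IO_PORTS);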
From 2e46c352ad379f06e976f932ef1cc187c4d8c852 Mon Sep 17 00:00:00 2001
From: xichen01
Date: Fri, 10 May 2024 00:07:17 +0800
Subject: [PATCH 2/6] Fix test

---
 .../java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
index 4a0c36996f3d..5213dac44de7 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
@@ -848,7 +848,7 @@ public enum Name {
         EnumSet.of(STANDALONE, RATIS, REST));
 
     public static final Set<Name> IO_PORTS = ImmutableSet.copyOf(
-        EnumSet.of(STANDALONE, RATIS, REST));
+        EnumSet.of(STANDALONE, RATIS, RATIS_DATASTREAM));
   }
 
   private final Name name;

From f25b8ae539f7c5f2b7805eb7c7cdf693ef4e2beb Mon Sep 17 00:00:00 2001
From: xichen01
Date: Fri, 10 May 2024 20:09:43 +0800
Subject: [PATCH 3/6] Fix test

---
 .../java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
index 42708b0b1607..70ec23153aae 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
@@ -409,7 +409,7 @@ public void testInstallIncrementalSnapshot(@TempDir Path tempDir)
     // Do some transactions so that the log index increases
     List<String> firstKeys = writeKeysToIncreaseLogIndex(leaderRatisServer,
-        80);
+        100);
 
     SnapshotInfo snapshotInfo2 = createOzoneSnapshot(leaderOM, "snap80");
 
     followerOM.getConfiguration().setInt(
@@ -618,7 +618,7 @@ public void testInstallIncrementalSnapshotWithFailure() throws Exception {
     // Do some transactions so that the log index increases
     List<String> firstKeys = writeKeysToIncreaseLogIndex(leaderRatisServer,
-        80);
+        100);
 
     // Start the inactive OM. Checkpoint installation will happen spontaneously.
     cluster.startInactiveOM(followerNodeId);
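Context for the test fixes in patches 3 and 4: TestOMRatisSnapshots writes keys until the leader's Ratis log reaches a target index, snapshots at that point, and later verifies the snapshot by name, so the names must track the new targets (100/200/300 instead of 80/160/240). A rough sketch of the invariant the hunks rely on; the helpers are the test's own, and the cluster setup is assumed:

    // Drive the log index to ~100, snapshot there, and verify under the
    // matching name. Patch 3 raises the index; patch 4 below renames
    // "snap80" to "snap100" (and "snap160"/"snap240" accordingly).
    List<String> firstKeys =
        writeKeysToIncreaseLogIndex(leaderRatisServer, 100);
    SnapshotInfo snapshotInfo2 = createOzoneSnapshot(leaderOM, "snap100");
    checkSnapshot(leaderOM, followerOM, "snap100", firstKeys, snapshotInfo2);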
From 6d12c1cb46ed650631529a053824be3a7d07d41a Mon Sep 17 00:00:00 2001
From: xichen01
Date: Fri, 10 May 2024 22:25:11 +0800
Subject: [PATCH 4/6] Fix test

---
 .../apache/hadoop/ozone/om/TestOMRatisSnapshots.java | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
index 70ec23153aae..df000a2c6a26 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
@@ -411,7 +411,7 @@ public void testInstallIncrementalSnapshot(@TempDir Path tempDir)
     List<String> firstKeys = writeKeysToIncreaseLogIndex(leaderRatisServer,
         100);
 
-    SnapshotInfo snapshotInfo2 = createOzoneSnapshot(leaderOM, "snap80");
+    SnapshotInfo snapshotInfo2 = createOzoneSnapshot(leaderOM, "snap100");
 
     followerOM.getConfiguration().setInt(
         OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL,
         KeyManagerImpl.DISABLE_VALUE);
@@ -424,9 +424,9 @@ public void testInstallIncrementalSnapshot(@TempDir Path tempDir)
     }, 1000, 30_000);
 
     // Get two incremental tarballs, adding new keys/snapshot for each.
-    IncrementData firstIncrement = getNextIncrementalTarball(160, 2, leaderOM,
+    IncrementData firstIncrement = getNextIncrementalTarball(200, 2, leaderOM,
         leaderRatisServer, faultInjector, followerOM, tempDir);
-    IncrementData secondIncrement = getNextIncrementalTarball(240, 3, leaderOM,
+    IncrementData secondIncrement = getNextIncrementalTarball(300, 3, leaderOM,
         leaderRatisServer, faultInjector, followerOM, tempDir);
 
     // Resume the follower thread, it would download the incremental snapshot.
@@ -501,10 +501,10 @@ public void testInstallIncrementalSnapshot(@TempDir Path tempDir)
     assertNotNull(filesInCandidate);
     assertEquals(0, filesInCandidate.length);
 
-    checkSnapshot(leaderOM, followerOM, "snap80", firstKeys, snapshotInfo2);
-    checkSnapshot(leaderOM, followerOM, "snap160", firstIncrement.getKeys(),
+    checkSnapshot(leaderOM, followerOM, "snap100", firstKeys, snapshotInfo2);
+    checkSnapshot(leaderOM, followerOM, "snap200", firstIncrement.getKeys(),
         firstIncrement.getSnapshotInfo());
-    checkSnapshot(leaderOM, followerOM, "snap240", secondIncrement.getKeys(),
+    checkSnapshot(leaderOM, followerOM, "snap300", secondIncrement.getKeys(),
         secondIncrement.getSnapshotInfo());
     assertEquals(
         followerOM.getOmSnapshotProvider().getInitCount(), 2,

From 307f772db46b6c7a9c25888266477b33db8a3c10 Mon Sep 17 00:00:00 2001
From: "pony.chen"
Date: Thu, 3 Oct 2024 02:32:08 +0800
Subject: [PATCH 5/6] Fix Conflicts

---
 .../apache/hadoop/hdds/protocol/TestDatanodeDetails.java | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/protocol/TestDatanodeDetails.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/protocol/TestDatanodeDetails.java
index 726407d6836a..78465fd2816c 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/protocol/TestDatanodeDetails.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/protocol/TestDatanodeDetails.java
@@ -71,15 +71,17 @@ public void testNewBuilderCurrentVersion() {
     // test that if the current version is not set (Ozone 1.4.0 and earlier),
     // it falls back to SEPARATE_RATIS_PORTS_AVAILABLE
    DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails();
+    Set<Port.Name> requiredPorts = Stream.of(Port.Name.STANDALONE, Port.Name.RATIS)
+        .collect(Collectors.toSet());
     HddsProtos.DatanodeDetailsProto.Builder protoBuilder =
-        dn.toProtoBuilder(DEFAULT_VERSION.toProtoValue());
+        dn.toProtoBuilder(DEFAULT_VERSION.toProtoValue(), requiredPorts);
     protoBuilder.clearCurrentVersion();
     DatanodeDetails dn2 = DatanodeDetails.newBuilder(protoBuilder.build()).build();
     assertEquals(DatanodeVersion.SEPARATE_RATIS_PORTS_AVAILABLE.toProtoValue(),
         dn2.getCurrentVersion());
 
     // test that if the current version is set, it is used
     protoBuilder =
-        dn.toProtoBuilder(DEFAULT_VERSION.toProtoValue());
+        dn.toProtoBuilder(DEFAULT_VERSION.toProtoValue(), requiredPorts);
     DatanodeDetails dn3 = DatanodeDetails.newBuilder(protoBuilder.build()).build();
     assertEquals(DatanodeVersion.CURRENT.toProtoValue(), dn3.getCurrentVersion());
   }

From f2c1a77896f044240ea45b5845e693cc403e5204 Mon Sep 17 00:00:00 2001
From: xichen01
Date: Sat, 16 Nov 2024 20:04:27 +0800
Subject: [PATCH 6/6] Rename requiredPorts to filterPorts; Refactor ports loop

---
 .../hadoop/hdds/protocol/DatanodeDetails.java | 38 +++++++++++++------
 .../hadoop/hdds/scm/pipeline/Pipeline.java    |  4 +-
 2 files changed, 28 insertions(+), 14 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
index ec6f3e4115b3..f3137749b9fe 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
@@ -491,12 +491,21 @@ public HddsProtos.DatanodeDetailsProto toProto(int clientVersion) {
     return toProtoBuilder(clientVersion, Collections.emptySet()).build();
   }
 
-  public HddsProtos.DatanodeDetailsProto toProto(int clientVersion, Set<Port.Name> requiredPorts) {
-    return toProtoBuilder(clientVersion, requiredPorts).build();
+  public HddsProtos.DatanodeDetailsProto toProto(int clientVersion, Set<Port.Name> filterPorts) {
+    return toProtoBuilder(clientVersion, filterPorts).build();
   }
 
+  /**
+   * Converts the current DatanodeDetails instance into a proto {@link HddsProtos.DatanodeDetailsProto.Builder} object.
+   *
+   * @param clientVersion - The client version.
+   * @param filterPorts - A set of {@link Port.Name} specifying ports to include.
+   *                      If empty, all available ports will be included.
+   * @return A {@link HddsProtos.DatanodeDetailsProto.Builder} Object.
+   */
+
   public HddsProtos.DatanodeDetailsProto.Builder toProtoBuilder(
-      int clientVersion, Set<Port.Name> requiredPorts) {
+      int clientVersion, Set<Port.Name> filterPorts) {
 
     HddsProtos.UUID uuid128 = HddsProtos.UUID.newBuilder()
         .setMostSigBits(uuid.getMostSignificantBits())
@@ -535,18 +544,23 @@ public HddsProtos.DatanodeDetailsProto.Builder toProtoBuilder(
     final boolean handlesUnknownPorts =
         ClientVersion.fromProtoValue(clientVersion)
             .compareTo(VERSION_HANDLES_UNKNOWN_DN_PORTS) >= 0;
+    final int requestedPortCount = filterPorts.size();
+    final boolean maySkip = requestedPortCount > 0;
     for (Port port : ports) {
-      if (requiredPorts.isEmpty() || requiredPorts.contains(port.name)) {
-        if (handlesUnknownPorts || Name.V0_PORTS.contains(port.getName())) {
-          builder.addPorts(port.toProto());
-        } else {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Skip adding {} port {} to proto message for client v{}",
-                port.getName(), port.getValue(), clientVersion);
-          }
+      if (maySkip && !filterPorts.contains(port.getName())) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skip adding {} port {} to proto message",
+              port.getName(), port.getValue());
+        }
+      } else if (handlesUnknownPorts || Name.V0_PORTS.contains(port.getName())) {
+        builder.addPorts(port.toProto());
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skip adding {} port {} to proto message for client v{}",
+              port.getName(), port.getValue(), clientVersion);
         }
       }
-      if (!requiredPorts.isEmpty() && builder.getPortsCount() == requiredPorts.size()) {
+      if (maySkip && builder.getPortsCount() == requestedPortCount) {
         break;
       }
     }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
index 08a4abe043ea..6e72537367c1 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
@@ -365,14 +365,14 @@ public HddsProtos.Pipeline getProtobufMessage(int clientVersion)
     return getProtobufMessage(clientVersion, Collections.emptySet());
   }
 
-  public HddsProtos.Pipeline getProtobufMessage(int clientVersion, Set<Port.Name> requiredPorts)
+  public HddsProtos.Pipeline getProtobufMessage(int clientVersion, Set<Port.Name> filterPorts)
       throws UnknownPipelineStateException {
     List<HddsProtos.DatanodeDetailsProto> members = new ArrayList<>();
     List<Integer> memberReplicaIndexes = new ArrayList<>();
 
     for (DatanodeDetails dn : nodeStatus.keySet()) {
-      members.add(dn.toProto(clientVersion, requiredPorts));
+      members.add(dn.toProto(clientVersion, filterPorts));
       memberReplicaIndexes.add(replicaIndexes.getOrDefault(dn, 0));
     }
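
Taken together, the series leaves exactly two SCM response paths building trimmed datanode protos, while every other caller goes through the no-filter overloads and keeps the full port list. A condensed sketch of the server-side effect; pipeline and clientVersion stand for the values already in scope at the two call sites:

    // AllocateBlock and getContainerWithPipeline responses: only the
    // STANDALONE, RATIS and RATIS_DATASTREAM ports cross the wire.
    HddsProtos.Pipeline trimmed =
        pipeline.getProtobufMessage(clientVersion, Name.IO_PORTS);

    // Every other serialization path is unchanged:
    HddsProtos.Pipeline full = pipeline.getProtobufMessage(clientVersion);

Defaulting to Collections.emptySet() keeps the wire format backward compatible: filtering only happens where a call site opts in, and the early break stops the loop once all requested ports have been added.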