@@ -19,6 +19,7 @@
package org.apache.hadoop.hdds.protocol;

import java.util.ArrayList;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
@@ -487,11 +488,24 @@ public HddsProtos.DatanodeDetailsProto getProtoBufMessage() {
}

public HddsProtos.DatanodeDetailsProto toProto(int clientVersion) {
-    return toProtoBuilder(clientVersion).build();
+    return toProtoBuilder(clientVersion, Collections.emptySet()).build();
}

+  public HddsProtos.DatanodeDetailsProto toProto(int clientVersion, Set<Port.Name> filterPorts) {
+    return toProtoBuilder(clientVersion, filterPorts).build();
+  }
+
+  /**
+   * Converts the current DatanodeDetails instance into a proto {@link HddsProtos.DatanodeDetailsProto.Builder} object.
+   *
+   * @param clientVersion - The client version.
+   * @param filterPorts - A set of {@link Port.Name} specifying ports to include.
+   *                      If empty, all available ports will be included.
+   * @return A {@link HddsProtos.DatanodeDetailsProto.Builder} Object.
+   */
+
public HddsProtos.DatanodeDetailsProto.Builder toProtoBuilder(
-      int clientVersion) {
+      int clientVersion, Set<Port.Name> filterPorts) {

HddsProtos.UUID uuid128 = HddsProtos.UUID.newBuilder()
.setMostSigBits(uuid.getMostSignificantBits())
@@ -530,15 +544,25 @@ public HddsProtos.DatanodeDetailsProto.Builder toProtoBuilder(
final boolean handlesUnknownPorts =
ClientVersion.fromProtoValue(clientVersion)
.compareTo(VERSION_HANDLES_UNKNOWN_DN_PORTS) >= 0;
+    final int requestedPortCount = filterPorts.size();
+    final boolean maySkip = requestedPortCount > 0;
for (Port port : ports) {
-      if (handlesUnknownPorts || Name.V0_PORTS.contains(port.getName())) {
+      if (maySkip && !filterPorts.contains(port.getName())) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skip adding {} port {} to proto message",
+              port.getName(), port.getValue());
+        }
+      } else if (handlesUnknownPorts || Name.V0_PORTS.contains(port.getName())) {
builder.addPorts(port.toProto());
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Skip adding {} port {} to proto message for client v{}",
-            port.getName(), port.getValue(), clientVersion);
+              port.getName(), port.getValue(), clientVersion);
}
}
+      if (maySkip && builder.getPortsCount() == requestedPortCount) {
+        break;
+      }
}

builder.setCurrentVersion(currentVersion);
@@ -960,6 +984,9 @@ public enum Name {
Name.values());
public static final Set<Name> V0_PORTS = ImmutableSet.copyOf(
EnumSet.of(STANDALONE, RATIS, REST));

+    public static final Set<Name> IO_PORTS = ImmutableSet.copyOf(
+        EnumSet.of(STANDALONE, RATIS, RATIS_DATASTREAM));
}

private final Name name;
@@ -1109,7 +1136,7 @@ public void setRevision(String rev) {
public HddsProtos.NetworkNode toProtobuf(
int clientVersion) {
return HddsProtos.NetworkNode.newBuilder()
-        .setDatanodeDetails(toProtoBuilder(clientVersion).build())
+        .setDatanodeDetails(toProtoBuilder(clientVersion, Collections.emptySet()).build())
.build();
}
}
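The new overload makes the filtering opt-in: an empty set preserves the old serialize-everything behavior, and the loop above stops early once every requested port has been added. A minimal caller-side sketch (illustrative only, not part of this patch; assumes a populated DatanodeDetails dn and a clientVersion in scope):

    // Request only the data-path ports; an empty set keeps every port.
    Set<Port.Name> filter = EnumSet.of(Port.Name.STANDALONE, Port.Name.RATIS);
    HddsProtos.DatanodeDetailsProto slim = dn.toProto(clientVersion, filter);
    // "slim" now carries at most the two requested ports, while
    // dn.toProto(clientVersion) still serializes every port the client version understands.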
@@ -21,6 +21,7 @@
import java.util.Comparator;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -61,7 +62,7 @@ public HddsProtos.ContainerWithPipeline getProtobuf(int clientVersion)
HddsProtos.ContainerWithPipeline.Builder builder =
HddsProtos.ContainerWithPipeline.newBuilder();
builder.setContainerInfo(getContainerInfo().getProtobuf())
-        .setPipeline(getPipeline().getProtobufMessage(clientVersion));
+        .setPipeline(getPipeline().getProtobufMessage(clientVersion, Name.IO_PORTS));

return builder.build();
}
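Passing Name.IO_PORTS here is the point of the change: a client resolving a container's pipeline only dials the read/write ports (STANDALONE, RATIS, RATIS_DATASTREAM), so every other advertised port is dropped from the response to keep the message small.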
@@ -362,12 +362,17 @@ public ReplicationConfig getReplicationConfig() {

public HddsProtos.Pipeline getProtobufMessage(int clientVersion)
throws UnknownPipelineStateException {
+    return getProtobufMessage(clientVersion, Collections.emptySet());
+  }
+
+  public HddsProtos.Pipeline getProtobufMessage(int clientVersion, Set<DatanodeDetails.Port.Name> filterPorts)
+      throws UnknownPipelineStateException {

List<HddsProtos.DatanodeDetailsProto> members = new ArrayList<>();
List<Integer> memberReplicaIndexes = new ArrayList<>();

for (DatanodeDetails dn : nodeStatus.keySet()) {
-      members.add(dn.toProto(clientVersion));
+      members.add(dn.toProto(clientVersion, filterPorts));
memberReplicaIndexes.add(replicaIndexes.getOrDefault(dn, 0));
}
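With the delegating overload in place, callers choose the level of port detail per message. A hedged sketch (not part of this patch; pipeline and clientVersion are assumed to be in scope):

    // Full port list, e.g. for admin tooling:
    HddsProtos.Pipeline full = pipeline.getProtobufMessage(clientVersion);
    // Data-path ports only, e.g. for read/write clients:
    HddsProtos.Pipeline lean =
        pipeline.getProtobufMessage(clientVersion, DatanodeDetails.Port.Name.IO_PORTS);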

@@ -17,12 +17,16 @@
*/
package org.apache.hadoop.hdds.protocol;

+import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.hdds.DatanodeVersion;
import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.junit.jupiter.api.Test;

import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;

import static org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name.ALL_PORTS;
import static org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name.V0_PORTS;
@@ -48,21 +52,36 @@ void protoIncludesNewPortsOnlyForV1() {
subject.toProto(VERSION_HANDLES_UNKNOWN_DN_PORTS.toProtoValue());
assertPorts(protoV1, ALL_PORTS);
}
+  @Test
+  void testRequiredPortsProto() {
+    DatanodeDetails subject = MockDatanodeDetails.randomDatanodeDetails();
+    Set<Port.Name> requiredPorts = Stream.of(Port.Name.STANDALONE, Port.Name.RATIS)
+        .collect(Collectors.toSet());
+    HddsProtos.DatanodeDetailsProto proto =
+        subject.toProto(subject.getCurrentVersion(), requiredPorts);
+    assertPorts(proto, ImmutableSet.copyOf(requiredPorts));
+
+    HddsProtos.DatanodeDetailsProto ioPortProto =
+        subject.toProto(subject.getCurrentVersion(), Name.IO_PORTS);
+    assertPorts(ioPortProto, ImmutableSet.copyOf(Name.IO_PORTS));
+  }

@Test
public void testNewBuilderCurrentVersion() {
// test that if the current version is not set (Ozone 1.4.0 and earlier),
// it falls back to SEPARATE_RATIS_PORTS_AVAILABLE
DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails();
+    Set<Port.Name> requiredPorts = Stream.of(Port.Name.STANDALONE, Port.Name.RATIS)
+        .collect(Collectors.toSet());
HddsProtos.DatanodeDetailsProto.Builder protoBuilder =
-        dn.toProtoBuilder(DEFAULT_VERSION.toProtoValue());
+        dn.toProtoBuilder(DEFAULT_VERSION.toProtoValue(), requiredPorts);
protoBuilder.clearCurrentVersion();
DatanodeDetails dn2 = DatanodeDetails.newBuilder(protoBuilder.build()).build();
assertEquals(DatanodeVersion.SEPARATE_RATIS_PORTS_AVAILABLE.toProtoValue(), dn2.getCurrentVersion());

// test that if the current version is set, it is used
protoBuilder =
-        dn.toProtoBuilder(DEFAULT_VERSION.toProtoValue());
+        dn.toProtoBuilder(DEFAULT_VERSION.toProtoValue(), requiredPorts);
DatanodeDetails dn3 = DatanodeDetails.newBuilder(protoBuilder.build()).build();
assertEquals(DatanodeVersion.CURRENT.toProtoValue(), dn3.getCurrentVersion());
}
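The new test pins the contract down from both directions: an explicit two-port filter yields exactly those ports, and the predefined Name.IO_PORTS set round-trips unchanged. One edge case worth keeping in mind (a hypothetical extra assertion, not part of this patch) is the empty filter, which must fall back to unfiltered serialization:

    // Hypothetical follow-on check; assumes java.util.Collections is imported.
    HddsProtos.DatanodeDetailsProto all =
        subject.toProto(subject.getCurrentVersion(), Collections.emptySet());
    assertPorts(all, ALL_PORTS);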
@@ -24,6 +24,7 @@
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateBlockResponse;
@@ -216,7 +217,7 @@ public AllocateScmBlockResponseProto allocateScmBlock(
for (AllocatedBlock block : allocatedBlocks) {
builder.addBlocks(AllocateBlockResponse.newBuilder()
.setContainerBlockID(block.getBlockID().getProtobuf())
-          .setPipeline(block.getPipeline().getProtobufMessage(clientVersion)));
+          .setPipeline(block.getPipeline().getProtobufMessage(clientVersion, Name.IO_PORTS)));
}

return builder.build();
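The same rationale as on the container path applies: a client that just allocated a block immediately writes through the pipeline's I/O ports, so trimming each AllocateBlockResponse pipeline to Name.IO_PORTS cuts per-block payload without removing anything the writer needs.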
@@ -409,9 +409,9 @@ public void testInstallIncrementalSnapshot(@TempDir Path tempDir)

// Do some transactions so that the log index increases
List<String> firstKeys = writeKeysToIncreaseLogIndex(leaderRatisServer,
-        80);
+        100);

-    SnapshotInfo snapshotInfo2 = createOzoneSnapshot(leaderOM, "snap80");
+    SnapshotInfo snapshotInfo2 = createOzoneSnapshot(leaderOM, "snap100");
followerOM.getConfiguration().setInt(
OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL,
KeyManagerImpl.DISABLE_VALUE);
@@ -424,9 +424,9 @@
}, 1000, 30_000);

// Get two incremental tarballs, adding new keys/snapshot for each.
-    IncrementData firstIncrement = getNextIncrementalTarball(160, 2, leaderOM,
+    IncrementData firstIncrement = getNextIncrementalTarball(200, 2, leaderOM,
leaderRatisServer, faultInjector, followerOM, tempDir);
-    IncrementData secondIncrement = getNextIncrementalTarball(240, 3, leaderOM,
+    IncrementData secondIncrement = getNextIncrementalTarball(300, 3, leaderOM,
leaderRatisServer, faultInjector, followerOM, tempDir);

// Resume the follower thread, it would download the incremental snapshot.
@@ -501,10 +501,10 @@ public void testInstallIncrementalSnapshot(@TempDir Path tempDir)
assertNotNull(filesInCandidate);
assertEquals(0, filesInCandidate.length);

checkSnapshot(leaderOM, followerOM, "snap80", firstKeys, snapshotInfo2);
checkSnapshot(leaderOM, followerOM, "snap160", firstIncrement.getKeys(),
checkSnapshot(leaderOM, followerOM, "snap100", firstKeys, snapshotInfo2);
checkSnapshot(leaderOM, followerOM, "snap200", firstIncrement.getKeys(),
firstIncrement.getSnapshotInfo());
checkSnapshot(leaderOM, followerOM, "snap240", secondIncrement.getKeys(),
checkSnapshot(leaderOM, followerOM, "snap300", secondIncrement.getKeys(),
secondIncrement.getSnapshotInfo());
assertEquals(
followerOM.getOmSnapshotProvider().getInitCount(), 2,
@@ -618,7 +618,7 @@ public void testInstallIncrementalSnapshotWithFailure() throws Exception {

// Do some transactions so that the log index increases
List<String> firstKeys = writeKeysToIncreaseLogIndex(leaderRatisServer,
-        80);
+        100);

// Start the inactive OM. Checkpoint installation will happen spontaneously.
cluster.startInactiveOM(followerNodeId);