Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,9 @@ public static DatanodeDetails.Builder newBuilder(
builder.setPersistedOpStateExpiry(
datanodeDetailsProto.getPersistedOpStateExpiry());
}
if (datanodeDetailsProto.hasCurrentVersion()) {
builder.setCurrentVersion(datanodeDetailsProto.getCurrentVersion());
}
return builder;
}

Expand Down Expand Up @@ -475,6 +478,8 @@ public HddsProtos.DatanodeDetailsProto.Builder toProtoBuilder(
}
}

builder.setCurrentVersion(currentVersion);

return builder;
}

Expand Down Expand Up @@ -505,6 +510,7 @@ public ExtendedDatanodeDetailsProto getExtendedProtoBufMessage() {
}

/**
* Note: The datanode's initial version is not passed to the client, as there is currently no use case for it. See HDDS-9884.
* @return the version this datanode was initially created with
*/
public int getInitialVersion() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,6 @@ public void start() {
datanodeDetails.setRevision(
HddsVersionInfo.HDDS_VERSION_INFO.getRevision());
datanodeDetails.setBuildDate(HddsVersionInfo.HDDS_VERSION_INFO.getDate());
datanodeDetails.setCurrentVersion(DatanodeVersion.CURRENT_VERSION);
TracingUtil.initTracing(
"HddsDatanodeService." + datanodeDetails.getUuidString()
.substring(0, 8), conf);
Expand Down Expand Up @@ -424,17 +423,19 @@ private DatanodeDetails initializeDatanodeDetails()
String idFilePath = HddsServerUtil.getDatanodeIdFilePath(conf);
Preconditions.checkNotNull(idFilePath);
File idFile = new File(idFilePath);
DatanodeDetails details;
if (idFile.exists()) {
return ContainerUtils.readDatanodeDetailsFrom(idFile);
details = ContainerUtils.readDatanodeDetailsFrom(idFile);
// Current version is always overridden to the latest
details.setCurrentVersion(getDefaultCurrentVersion());
} else {
// There is no datanode.id file, this might be the first time datanode
// is started.
DatanodeDetails details = DatanodeDetails.newBuilder()
.setUuid(UUID.randomUUID()).build();
details.setInitialVersion(DatanodeVersion.CURRENT_VERSION);
details.setCurrentVersion(DatanodeVersion.CURRENT_VERSION);
return details;
details = DatanodeDetails.newBuilder().setUuid(UUID.randomUUID()).build();
details.setInitialVersion(getDefaultInitialVersion());
details.setCurrentVersion(getDefaultCurrentVersion());
}
return details;
}

/**
Expand Down Expand Up @@ -678,4 +679,20 @@ private String reconfigReplicationStreamsLimit(String value) {
.setPoolSize(Integer.parseInt(value));
return value;
}

/**
 * Returns the default initial version for a datanode, i.e. the version a
 * newly created datanode is stamped with in its datanode.id file.
 *
 * <p>Exposed as a static method (rather than inlining the constant at the
 * call sites) so tests can redirect the value with a static mock; see
 * {@code MiniOzoneClusterImpl#overrideDatanodeVersions}.
 *
 * @return {@code DatanodeVersion.CURRENT_VERSION} unless mocked in tests
 */
@VisibleForTesting
public static int getDefaultInitialVersion() {
return DatanodeVersion.CURRENT_VERSION;
}

/**
 * Returns the default current (wire) version of the datanode, applied on
 * every startup regardless of what is persisted in the datanode.id file.
 *
 * <p>Exposed as a static method so tests can redirect the value with a
 * static mock; see {@code MiniOzoneClusterImpl#overrideDatanodeVersions}.
 *
 * @return {@code DatanodeVersion.CURRENT_VERSION} unless mocked in tests
 */
@VisibleForTesting
public static int getDefaultCurrentVersion() {
return DatanodeVersion.CURRENT_VERSION;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ private DatanodeIdYaml() {
}

/**
* Creates a yaml file using DatnodeDetails. This method expects the path
* Creates a yaml file using DatanodeDetails. This method expects the path
* validation to be performed by the caller.
*
* @param datanodeDetails {@link DatanodeDetails}
Expand Down
1 change: 1 addition & 0 deletions hadoop-hdds/interface-client/src/main/proto/hdds.proto
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ message DatanodeDetailsProto {
optional string networkLocation = 7; // Network topology location
optional NodeOperationalState persistedOpState = 8; // The Operational state persisted in the datanode.id file
optional int64 persistedOpStateExpiry = 9; // The seconds after the epoch when the OpState should expire
optional int32 currentVersion = 10; // Current datanode wire version
// TODO(runzhiwang): when uuid is gone, specify 1 as the index of uuid128 and mark as required
optional UUID uuid128 = 100; // UUID with 128 bits assigned to the Datanode.
}
Expand Down
8 changes: 8 additions & 0 deletions hadoop-ozone/fault-injection-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,12 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<module>mini-chaos-tests</module>
</modules>

<dependencies>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.UUID;
import java.util.concurrent.TimeoutException;

import org.apache.hadoop.hdds.DatanodeVersion;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
Expand Down Expand Up @@ -321,15 +322,20 @@ abstract class Builder {
protected Optional<Integer> hbProcessorInterval = Optional.empty();
protected String scmId = UUID.randomUUID().toString();
protected String omId = UUID.randomUUID().toString();

protected Optional<String> datanodeReservedSpace = Optional.empty();
protected boolean includeRecon = false;


protected Optional<Integer> omLayoutVersion = Optional.empty();
protected Optional<Integer> scmLayoutVersion = Optional.empty();
protected Optional<Integer> dnLayoutVersion = Optional.empty();

protected int dnInitialVersion = DatanodeVersion.FUTURE_VERSION.toProtoValue();
protected int dnCurrentVersion = DatanodeVersion.FUTURE_VERSION.toProtoValue();

// Use relative smaller number of handlers for testing
protected int numOfOmHandlers = 20;
protected int numOfScmHandlers = 20;
protected int numOfDatanodes = 3;
protected int numDataVolumes = 1;
protected boolean startDataNodes = true;
Expand Down Expand Up @@ -412,6 +418,30 @@ public Builder setNumDatanodes(int val) {
return this;
}

/**
 * Set the initialVersion for all datanodes in the cluster.
 *
 * <p>Defaults to {@code DatanodeVersion.FUTURE_VERSION}, which acts as a
 * "not overridden" marker — see
 * {@code MiniOzoneClusterImpl#overrideDatanodeVersions}.
 *
 * @param val initialVersion value to be set for all datanodes.
 *
 * @return MiniOzoneCluster.Builder
 */
public Builder setDatanodeInitialVersion(int val) {
dnInitialVersion = val;
return this;
}

/**
 * Set the currentVersion for all datanodes in the cluster.
 *
 * <p>Defaults to {@code DatanodeVersion.FUTURE_VERSION}, which acts as a
 * "not overridden" marker — see
 * {@code MiniOzoneClusterImpl#overrideDatanodeVersions}.
 *
 * @param val currentVersion value to be set for all datanodes.
 *
 * @return MiniOzoneCluster.Builder
 */
public Builder setDatanodeCurrentVersion(int val) {
dnCurrentVersion = val;
return this;
}

/**
* Sets the number of data volumes per datanode.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.DFSConfigKeysLegacy;
import org.apache.hadoop.hdds.DatanodeVersion;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
Expand Down Expand Up @@ -102,6 +103,8 @@
import static org.apache.ozone.test.GenericTestUtils.PortAllocator.localhostWithFreePort;

import org.hadoop.ozone.recon.codegen.ReconSqlDbConfig;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -132,12 +135,14 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
private CertificateClient caClient;
private final Set<AutoCloseable> clients = ConcurrentHashMap.newKeySet();
private SecretKeyClient secretKeyClient;
private static MockedStatic mockDNStatic = Mockito.mockStatic(HddsDatanodeService.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this doc https://www.baeldung.com/mockito-mock-static-methods
recommends enclosing MockedStatic in a try-with-resources block to avoid side effects that impact other tests.

I think it's fine for this case though because we usually just have one mini cluster.


/**
* Creates a new MiniOzoneCluster with Recon.
*
* @throws IOException if there is an I/O error
*/
@SuppressWarnings("checkstyle:ParameterNumber")
MiniOzoneClusterImpl(OzoneConfiguration conf,
SCMConfigurator scmConfigurator,
OzoneManager ozoneManager,
Expand Down Expand Up @@ -396,6 +401,16 @@ private void waitForHddsDatanodeToStop(DatanodeDetails dn)
}, 1000, waitForClusterToBeReadyTimeout);
}

/**
 * Redirects the static default-version getters of {@code HddsDatanodeService}
 * so that every datanode subsequently started by this mini cluster reports
 * the requested initial/current version.
 *
 * @param dnInitialVersion initial version to report, or FUTURE_VERSION to keep the default
 * @param dnCurrentVersion current version to report, or FUTURE_VERSION to keep the default
 */
private static void overrideDatanodeVersions(int dnInitialVersion, int dnCurrentVersion) {
// FUTURE_VERSION (-1) is not a valid version for a datanode, using it as a marker when version is not overridden
if (dnInitialVersion != DatanodeVersion.FUTURE_VERSION.toProtoValue()) {
// Only stub the getter when the builder actually requested an override,
// so unmocked calls still return the real CURRENT_VERSION.
mockDNStatic.when(HddsDatanodeService::getDefaultInitialVersion).thenReturn(dnInitialVersion);
}
if (dnCurrentVersion != DatanodeVersion.FUTURE_VERSION.toProtoValue()) {
mockDNStatic.when(HddsDatanodeService::getDefaultCurrentVersion).thenReturn(dnCurrentVersion);
}
}

@Override
public void restartHddsDatanode(int i, boolean waitForDatanode)
throws InterruptedException, TimeoutException {
Expand Down Expand Up @@ -782,10 +797,14 @@ protected List<HddsDatanodeService> createHddsDatanodes(
String[] args = new String[] {};
conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, scmAddress);
List<HddsDatanodeService> hddsDatanodes = new ArrayList<>();

// Override default datanode initial and current version if necessary
overrideDatanodeVersions(dnInitialVersion, dnCurrentVersion);

for (int i = 0; i < numOfDatanodes; i++) {
OzoneConfiguration dnConf = new OzoneConfiguration(conf);
configureDatanodePorts(dnConf);
String datanodeBaseDir = path + "/datanode-" + Integer.toString(i);
String datanodeBaseDir = path + "/datanode-" + i;
Path metaDir = Paths.get(datanodeBaseDir, "meta");
List<String> dataDirs = new ArrayList<>();
List<String> reservedSpaceList = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

package org.apache.hadoop.ozone.client.rpc;

import org.apache.hadoop.hdds.DatanodeVersion;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
Expand All @@ -28,6 +30,7 @@
import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.ClientConfigForTesting;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.ObjectStore;
Expand All @@ -45,6 +48,7 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

Expand All @@ -70,6 +74,7 @@ public class TestBlockDataStreamOutput {
private static String volumeName;
private static String bucketName;
private static String keyString;
private static final int DN_OLD_VERSION = DatanodeVersion.SEPARATE_RATIS_PORTS_AVAILABLE.toProtoValue();

/**
* Create a MiniDFSCluster for testing.
Expand Down Expand Up @@ -105,6 +110,7 @@ public static void init() throws Exception {

cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(5)
.setDatanodeCurrentVersion(DN_OLD_VERSION)
.setTotalPipelineNumLimit(3)
.build();
cluster.waitForClusterToBeReady();
Expand Down Expand Up @@ -270,4 +276,25 @@ public void testTotalAckDataLength() throws Exception {
assertEquals(dataLength, stream.getTotalAckDataLength());
}

/**
 * Verifies that the datanode current version overridden via
 * {@code setDatanodeCurrentVersion(DN_OLD_VERSION)} in the cluster builder
 * is visible both on the datanode services themselves and on the
 * DatanodeDetails returned through a write pipeline.
 */
@Test
public void testDatanodeVersion() throws Exception {
// Verify all DNs internally have versions set correctly
List<HddsDatanodeService> dns = cluster.getHddsDatanodes();
for (HddsDatanodeService dn : dns) {
DatanodeDetails details = dn.getDatanodeDetails();
assertEquals(DN_OLD_VERSION, details.getCurrentVersion());
}

// Create a key so a pipeline is allocated and a stream entry exists.
String keyName = getKeyName();
OzoneDataStreamOutput key = createKey(keyName, ReplicationType.RATIS, 0);
KeyDataStreamOutput keyDataStreamOutput = (KeyDataStreamOutput) key.getByteBufStreamOutput();
BlockDataStreamOutputEntry stream = keyDataStreamOutput.getStreamEntries().get(0);

// Now check 3 DNs in a random pipeline returns the correct DN versions
// (i.e. the version survived the proto round trip to the client side).
List<DatanodeDetails> streamDnDetails = stream.getPipeline().getNodes();
for (DatanodeDetails details : streamDnDetails) {
assertEquals(DN_OLD_VERSION, details.getCurrentVersion());
}
}

}
Loading