@@ -22,6 +22,7 @@
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;

@@ -647,6 +648,20 @@ public boolean equals(Object obj) {
uuid.equals(((DatanodeDetails) obj).uuid);
}


/**
* Checks whether the host name, IP address and ports of the two nodes are the same.
* @param datanodeDetails the DatanodeDetails object to compare with.
* @return true if the values match, otherwise false.
*/
public boolean compareNodeValues(DatanodeDetails datanodeDetails) {
if (this == datanodeDetails || super.equals(datanodeDetails)) {
return true;
}
return Objects.equals(ipAddress, datanodeDetails.ipAddress)
&& Objects.equals(hostName, datanodeDetails.hostName) && Objects.equals(ports, datanodeDetails.ports);
}

@Override
public int hashCode() {
return uuid.hashCode();
@@ -330,7 +330,11 @@ public List<DatanodeDetails> getNodesInOrder() {
}

void reportDatanode(DatanodeDetails dn) throws IOException {
if (nodeStatus.get(dn) == null) {
// This is a workaround for the case where a datanode restarts and reinitializes its dnId but still reports the
// same set of pipelines it was part of. The pipeline report should be accepted in this anomalous condition.
// We rely on the StaleNodeHandler to eventually close this pipeline.
if (dn == null || (nodeStatus.get(dn) == null
Contributor:

Which case is this validation expected to catch?

Contributor Author (@swamirishi, Nov 20, 2024):

This is the case where a DN has gone down and come back up with a different dnId. In such a case the DN could still be a part of an existing pipeline. The registration of this DN on SCM should not fail since it is the same node. This is a very dumb way to work around https://issues.apache.org/jira/browse/HDDS-11670. Eventually these pipelines will get closed, either once the datanode proposes to close the existing pipeline on the next in-flight write, or when SCM marks the node with the older datanodeId as dead and automatically closes all the pipelines containing it.

&& nodeStatus.keySet().stream().noneMatch(node -> node.compareNodeValues(dn)))) {
throw new IOException(
String.format("Datanode=%s not part of pipeline=%s", dn, id));
}
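For context, here is a minimal sketch (not part of this change) of the restart scenario discussed in the review thread above. It assumes the usual DatanodeDetails.Builder accessors (newBuilder, setUuid, setHostName, setIpAddress, build); the host values are hypothetical.

// Sketch only: the same physical node comes back after a restart that wiped
// datanode.id, so its UUID differs while host name, IP address and ports match.
// Assumed imports: java.util.UUID, org.apache.hadoop.hdds.protocol.DatanodeDetails.
DatanodeDetails before = DatanodeDetails.newBuilder()
    .setUuid(UUID.randomUUID())
    .setHostName("dn1.example.com")        // hypothetical values
    .setIpAddress("10.0.0.1")
    .build();
DatanodeDetails restarted = DatanodeDetails.newBuilder()
    .setUuid(UUID.randomUUID())            // a new dnId is generated on restart
    .setHostName("dn1.example.com")
    .setIpAddress("10.0.0.1")
    .build();

assert !before.equals(restarted);           // identity (uuid) no longer matches
assert before.compareNodeValues(restarted); // still recognized as the same node,
                                            // so its pipeline report is accepted

This is why reportDatanode above falls back to compareNodeValues when the exact DatanodeDetails key is not present in nodeStatus.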
@@ -22,6 +22,7 @@
import java.io.InputStream;
import java.io.OutputStream;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
@@ -93,7 +94,8 @@ public abstract StateMachine.DataChannel getStreamDataChannel(
*
* @return datanode Id
*/
protected String getDatanodeId() {
@VisibleForTesting
public String getDatanodeId() {
return datanodeId;
}

@@ -65,6 +65,7 @@
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.common.utils.BufferUtils;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel;
@@ -78,6 +79,7 @@
import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeer;
@@ -202,6 +204,7 @@ long getStartTime() {
private final boolean waitOnBothFollowers;
private final HddsDatanodeService datanodeService;
private static Semaphore semaphore = new Semaphore(1);
private final AtomicBoolean peersValidated;

/**
* CSM metrics.
@@ -252,6 +255,7 @@ public ContainerStateMachine(HddsDatanodeService hddsDatanodeService, RaftGroupI
HDDS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TXNS_DEFAULT);
applyTransactionSemaphore = new Semaphore(maxPendingApplyTransactions);
stateMachineHealthy = new AtomicBoolean(true);
this.peersValidated = new AtomicBoolean(false);

ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat(
@@ -265,6 +269,19 @@ public ContainerStateMachine(HddsDatanodeService hddsDatanodeService, RaftGroupI

}

private void validatePeers() throws IOException {
if (this.peersValidated.get()) {
return;
}
final RaftGroup group = ratisServer.getServerDivision(getGroupId()).getGroup();
final RaftPeerId selfId = ratisServer.getServer().getId();
if (group.getPeer(selfId) == null) {
throw new StorageContainerException("Current datanode " + selfId + " is not a member of " + group,
ContainerProtos.Result.INVALID_CONFIG);
}
peersValidated.set(true);
}

@Override
public StateMachineStorage getStateMachineStorage() {
return storage;
@@ -962,6 +979,11 @@ private CompletableFuture<ContainerCommandResponseProto> applyTransaction(
final CheckedSupplier<ContainerCommandResponseProto, Exception> task
= () -> {
try {
try {
this.validatePeers();
} catch (StorageContainerException e) {
return ContainerUtils.logAndReturnError(LOG, e, request);
}
long timeNow = Time.monotonicNowNanos();
long queueingDelay = timeNow - context.getStartTime();
metrics.recordQueueingDelay(request.getCmdType(), queueingDelay);
@@ -101,6 +101,7 @@
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.DELETE_ON_NON_EMPTY_CONTAINER;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.DELETE_ON_OPEN_CONTAINER;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.GET_SMALL_FILE_ERROR;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.INVALID_ARGUMENT;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.INVALID_CONTAINER_STATE;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.IO_EXCEPTION;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.PUT_SMALL_FILE_ERROR;
@@ -242,6 +243,15 @@ static ContainerCommandResponseProto dispatchRequest(KeyValueHandler handler,
ContainerCommandRequestProto request, KeyValueContainer kvContainer,
DispatcherContext dispatcherContext) {
Type cmdType = request.getCmdType();
// Validate that the request was sent to the correct datanode by checking that the node id matches.
if (kvContainer != null) {
try {
handler.validateRequestDatanodeId(kvContainer.getContainerData().getReplicaIndex(),
request.getDatanodeUuid());
} catch (StorageContainerException e) {
return ContainerUtils.logAndReturnError(LOG, e, request);
}
}

switch (cmdType) {
case CreateContainer:
@@ -353,6 +363,13 @@ ContainerCommandResponseProto handleCreateContainer(
" already exists", null, CONTAINER_ALREADY_EXISTS), request);
}

try {
this.validateRequestDatanodeId(request.getCreateContainer().hasReplicaIndex() ?
request.getCreateContainer().getReplicaIndex() : null, request.getDatanodeUuid());
} catch (StorageContainerException e) {
return ContainerUtils.logAndReturnError(LOG, e, request);
}

long containerID = request.getContainerID();
State containerState = request.getCreateContainer().getState();

@@ -1532,4 +1549,22 @@ public static FaultInjector getInjector() {
public static void setInjector(FaultInjector instance) {
injector = instance;
}

/**
* Verify that the request was sent to the correct datanode by comparing the request's datanode UUID with this
* datanode's id. The check only applies to EC containers, i.e. when containerReplicaIdx is > 0.
*
* @param containerReplicaIdx replica index of the container the command targets, or null if not set.
* @param requestDatanodeUUID datanode UUID carried by the request.
* @return true if the validation passes.
* @throws StorageContainerException if the datanode UUID in the request does not match this datanode.
*/
private boolean validateRequestDatanodeId(Integer containerReplicaIdx, String requestDatanodeUUID)
throws StorageContainerException {
if (containerReplicaIdx != null && containerReplicaIdx > 0 && !requestDatanodeUUID.equals(this.getDatanodeId())) {
throw new StorageContainerException(
String.format("Request is trying to write to node with uuid : %s but the current nodeId is: %s .",
requestDatanodeUUID, this.getDatanodeId()), INVALID_ARGUMENT);
}
return true;
}
}
@@ -68,6 +68,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mockito;

import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.mock;
@@ -131,7 +132,13 @@ public void testHandlerCommandHandling() throws Exception {
.build();

KeyValueContainer container = mock(KeyValueContainer.class);

KeyValueContainerData containerData = mock(KeyValueContainerData.class);
Mockito.when(container.getContainerData()).thenReturn(containerData);
Mockito.when(containerData.getReplicaIndex()).thenReturn(1);
ContainerProtos.ContainerCommandResponseProto responseProto = KeyValueHandler.dispatchRequest(handler,
createContainerRequest, container, null);
assertEquals(ContainerProtos.Result.INVALID_ARGUMENT, responseProto.getResult());
Mockito.when(handler.getDatanodeId()).thenReturn(DATANODE_UUID);
KeyValueHandler
.dispatchRequest(handler, createContainerRequest, container, null);
verify(handler, times(0)).handleListBlock(
@@ -19,6 +19,7 @@

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.time.Duration;
@@ -32,6 +33,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.HddsUtils;
@@ -40,6 +42,7 @@
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig;
@@ -50,6 +53,7 @@
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -264,6 +268,57 @@ public void testContainerStateMachineCloseOnMissingPipeline()
key.close();
}


@Test
public void testContainerStateMachineRestartWithDNChangePipeline()
throws Exception {
try (OzoneOutputStream key = objectStore.getVolume(volumeName).getBucket(bucketName)
.createKey("testDNRestart", 1024, ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS,
ReplicationFactor.THREE), new HashMap<>())) {
key.write("ratis".getBytes(UTF_8));
key.flush();

KeyOutputStream groupOutputStream = (KeyOutputStream) key.
getOutputStream();
List<OmKeyLocationInfo> locationInfoList =
groupOutputStream.getLocationInfoList();
assertEquals(1, locationInfoList.size());

OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
Pipeline pipeline = omKeyLocationInfo.getPipeline();
List<HddsDatanodeService> datanodes =
new ArrayList<>(TestHelper.getDatanodeServices(cluster,
pipeline));

DatanodeDetails dn = datanodes.get(0).getDatanodeDetails();

// Delete all data volumes.
cluster.getHddsDatanode(dn).getDatanodeStateMachine().getContainer().getVolumeSet().getVolumesList()
.stream().forEach(v -> {
try {
FileUtils.deleteDirectory(v.getStorageDir());
} catch (IOException e) {
throw new RuntimeException(e);
}
});

// Delete the datanode.id file.
File datanodeIdFile = new File(HddsServerUtil.getDatanodeIdFilePath(cluster.getHddsDatanode(dn).getConf()));
boolean deleted = datanodeIdFile.delete();
assertTrue(deleted);
cluster.restartHddsDatanode(dn, false);
GenericTestUtils.waitFor(() -> {
try {
key.write("ratis".getBytes(UTF_8));
key.flush();
return groupOutputStream.getLocationInfoList().size() > 1;
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}, 1000, 30000);
}
}

@Test
public void testContainerStateMachineFailures() throws Exception {
OzoneOutputStream key =