Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;

Expand Down Expand Up @@ -579,6 +580,15 @@ public boolean equals(Object obj) {
uuid.equals(((DatanodeDetails) obj).uuid);
}


/**
 * Checks whether the given datanode matches this one by value.
 *
 * <p>Returns {@code true} when both references are identical, when the superclass
 * {@code equals} considers them equal, or when ip address, host name and ports are all equal.
 * This method does not itself compare UUIDs.
 *
 * @param datanodeDetails the datanode to compare against; may be {@code null}
 * @return {@code true} if the nodes match by the rules above; {@code false} otherwise,
 *     including when {@code datanodeDetails} is {@code null}
 */
public boolean validateNodeValue(DatanodeDetails datanodeDetails) {
  if (datanodeDetails == null) {
    // Nothing to compare against; returning here avoids an NPE on the field reads below.
    return false;
  }
  if (this == datanodeDetails || super.equals(datanodeDetails)) {
    return true;
  }
  return Objects.equals(ipAddress, datanodeDetails.ipAddress)
      && Objects.equals(hostName, datanodeDetails.hostName)
      && Objects.equals(ports, datanodeDetails.ports);
}

@Override
public int hashCode() {
return uuid.hashCode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,8 @@ public List<DatanodeDetails> getNodesInOrder() {
}

void reportDatanode(DatanodeDetails dn) throws IOException {
if (nodeStatus.get(dn) == null) {
if (dn == null || (nodeStatus.get(dn) == null
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which case is this validation expected to catch?

Copy link
Contributor Author

@swamirishi swamirishi Nov 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the case where a DN has gone down and has come back up with a different dnId. In such a case the DN could still be a part of an existing pipeline. The registration of this DN on SCM should not fail, since it is the same node. This is a very dumb way to work around https://issues.apache.org/jira/browse/HDDS-11670. Eventually these pipelines would get closed: either the datanode proposes to close the existing pipeline on the next in-flight write, or SCM closes it automatically by marking the older datanodeId as dead and closing all the pipelines which contain the node with the old datanode id.

&& nodeStatus.keySet().stream().noneMatch(node -> node.validateNodeValue(dn)))) {
throw new IOException(
String.format("Datanode=%s not part of pipeline=%s", dn, id));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.InputStream;
import java.io.OutputStream;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
Expand Down Expand Up @@ -93,7 +94,8 @@ public abstract StateMachine.DataChannel getStreamDataChannel(
*
* @return datanode Id
*/
protected String getDatanodeId() {
@VisibleForTesting
public String getDatanodeId() {
// Public (and marked @VisibleForTesting) so that tests can read or stub the datanode id.
return datanodeId;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.common.utils.BufferUtils;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel;
Expand All @@ -78,6 +79,7 @@
import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeer;
Expand Down Expand Up @@ -202,6 +204,7 @@ long getStartTime() {
private final boolean waitOnBothFollowers;
private final HddsDatanodeService datanodeService;
private static Semaphore semaphore = new Semaphore(1);
private final AtomicBoolean peersValidated;

/**
* CSM metrics.
Expand Down Expand Up @@ -252,6 +255,7 @@ public ContainerStateMachine(HddsDatanodeService hddsDatanodeService, RaftGroupI
HDDS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TXNS_DEFAULT);
applyTransactionSemaphore = new Semaphore(maxPendingApplyTransactions);
stateMachineHealthy = new AtomicBoolean(true);
this.peersValidated = new AtomicBoolean(false);

ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat(
Expand All @@ -265,6 +269,19 @@ public ContainerStateMachine(HddsDatanodeService hddsDatanodeService, RaftGroupI

}

/**
 * Verifies that the given Raft server is a member of the given Raft group.
 *
 * <p>The first successful check is remembered in {@code peersValidated}, and later calls return
 * immediately. A failed check is not remembered, so it is repeated on the next call.
 *
 * @param server the Raft server whose peer id is looked up in the group
 * @param id the id of the Raft group to inspect on {@code server}
 * @throws StorageContainerException with result {@code INVALID_CONFIG} if the server's peer id
 *     is not part of the group
 * @throws IOException if the group cannot be looked up on the server
 */
private void validatePeers(RaftServer server, RaftGroupId id) throws IOException {
  if (peersValidated.get()) {
    return;
  }
  // Use the arguments supplied by the caller instead of re-deriving the same server and group
  // id from fields; previously both parameters were silently ignored.
  final RaftGroup group = server.getDivision(id).getGroup();
  final RaftPeerId selfId = server.getId();
  if (group.getPeer(selfId) == null) {
    throw new StorageContainerException("Current datanode " + selfId + " is not a member of " + group,
        ContainerProtos.Result.INVALID_CONFIG);
  }
  peersValidated.set(true);
}

@Override
public StateMachineStorage getStateMachineStorage() {
return storage;
Expand Down Expand Up @@ -962,6 +979,11 @@ private CompletableFuture<ContainerCommandResponseProto> applyTransaction(
final CheckedSupplier<ContainerCommandResponseProto, Exception> task
= () -> {
try {
try {
this.validatePeers(this.ratisServer.getServer(), getGroupId());
} catch (StorageContainerException e) {
return ContainerUtils.logAndReturnError(LOG, e, request);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the expectation? If the peer is not in the group, fail it?

If that is the case, we should do it earlier. Why not do it in startTransaction(LogEntryProto, RaftPeerRole)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The datanode proposes to close the pipeline to SCM only on the applyTransaction block. Correct me if I am wrong here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

@szetszwo szetszwo Nov 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just invoke handleApplyTransactionFailure in the validatePeers method then.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would want to close the entire raft group. If we do it in the start transaction, just the add log will fail on one server right? We would be just performing this check on the leader right? Correct me if I am wrong here. We need this block running on followers as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you really want to check leader, check it also in the other startTransaction method

  TransactionContext startTransaction(RaftClientRequest request) throws IOException;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we set the exception in the transactionContext?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this result in the stateMachine getting closed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline:
We came to the conclusion that adding a validation block in applyTransactionSerial & startTransaction would not help. startTransaction on a follower doesn't allow throwing an exception, and doing so would bring down the state machine. If the state machine gets closed, we will never be able to clear up the raft metadata after the state machine goes down. The same is true of applyTransactionSerial: even though it allows throwing an exception, that also brings down the state machine. So the only easy solution (handling things on the Ozone side and not relying on raft) is to add the check to the applyTransaction block and return an error response, which would make the containerStateMachine unhealthy but would still allow a close pipeline on it since the statemachine is still healthy.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eventually we would want some kind of callback on state machine initialization in Ratis, which should clear up the Ratis dirs and remove it from the system. I have created a follow-up task for this: RATIS-2187.

long timeNow = Time.monotonicNowNanos();
long queueingDelay = timeNow - context.getStartTime();
metrics.recordQueueingDelay(request.getCmdType(), queueingDelay);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.DELETE_ON_NON_EMPTY_CONTAINER;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.DELETE_ON_OPEN_CONTAINER;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.GET_SMALL_FILE_ERROR;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.INVALID_ARGUMENT;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.INVALID_CONTAINER_STATE;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.IO_EXCEPTION;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.PUT_SMALL_FILE_ERROR;
Expand Down Expand Up @@ -242,6 +243,15 @@ static ContainerCommandResponseProto dispatchRequest(KeyValueHandler handler,
ContainerCommandRequestProto request, KeyValueContainer kvContainer,
DispatcherContext dispatcherContext) {
Type cmdType = request.getCmdType();
// Validate the request has been made to the correct datanode with the node id matching.
if (kvContainer != null) {
try {
handler.validateRequestDatanodeId(kvContainer.getContainerData().getReplicaIndex(),
request.getDatanodeUuid());
} catch (StorageContainerException e) {
return ContainerUtils.logAndReturnError(LOG, e, request);
}
}

switch (cmdType) {
case CreateContainer:
Expand Down Expand Up @@ -353,6 +363,13 @@ ContainerCommandResponseProto handleCreateContainer(
" already exists", null, CONTAINER_ALREADY_EXISTS), request);
}

try {
this.validateRequestDatanodeId(request.getCreateContainer().hasReplicaIndex() ?
request.getCreateContainer().getReplicaIndex() : null, request.getDatanodeUuid());
} catch (StorageContainerException e) {
return ContainerUtils.logAndReturnError(LOG, e, request);
}

long containerID = request.getContainerID();

ContainerLayoutVersion layoutVersion =
Expand Down Expand Up @@ -1519,4 +1536,22 @@ public static FaultInjector getInjector() {
public static void setInjector(FaultInjector instance) {
// Replaces the class-wide fault injector with the given instance.
// NOTE(review): presumably intended for tests only — confirm against callers.
injector = instance;
}

/**
 * Checks that a request was addressed to this datanode. The comparison is only performed for
 * EC containers, i.e. when {@code containerReplicaIdx} is non-null and greater than 0; any other
 * request is accepted without comparing ids.
 *
 * @param containerReplicaIdx replica index associated with the container command; may be null
 * @param requestDatanodeUUID datanode UUID carried by the request
 * @return {@code true} whenever no exception is thrown
 * @throws StorageContainerException with result INVALID_ARGUMENT if the check applies and the
 *     request's datanode UUID differs from this handler's datanode id
 */
private boolean validateRequestDatanodeId(Integer containerReplicaIdx, String requestDatanodeUUID)
    throws StorageContainerException {
  final boolean checkApplies = containerReplicaIdx != null && containerReplicaIdx > 0;
  if (!checkApplies) {
    return true;
  }
  if (requestDatanodeUUID.equals(this.getDatanodeId())) {
    return true;
  }
  final String message = String.format(
      "Request is trying to write to node with uuid : %s but the current nodeId is: %s .",
      requestDatanodeUUID, this.getDatanodeId());
  throw new StorageContainerException(message, INVALID_ARGUMENT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mockito;

import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -131,7 +132,13 @@ public void testHandlerCommandHandling() throws Exception {
.build();

KeyValueContainer container = mock(KeyValueContainer.class);

KeyValueContainerData containerData = mock(KeyValueContainerData.class);
Mockito.when(container.getContainerData()).thenReturn(containerData);
Mockito.when(containerData.getReplicaIndex()).thenReturn(1);
ContainerProtos.ContainerCommandResponseProto responseProto = KeyValueHandler.dispatchRequest(handler,
createContainerRequest, container, null);
assertEquals(ContainerProtos.Result.INVALID_ARGUMENT, responseProto.getResult());
Mockito.when(handler.getDatanodeId()).thenReturn(DATANODE_UUID);
KeyValueHandler
.dispatchRequest(handler, createContainerRequest, container, null);
verify(handler, times(0)).handleListBlock(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.time.Duration;
Expand All @@ -32,6 +33,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.HddsUtils;
Expand All @@ -40,6 +42,7 @@
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig;
Expand All @@ -50,6 +53,7 @@
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
Expand Down Expand Up @@ -264,6 +268,57 @@ public void testContainerStateMachineCloseOnMissingPipeline()
key.close();
}


/**
 * Writes to a RATIS/THREE key, wipes the data volumes and the datanode id file of one datanode
 * in the key's pipeline, restarts that datanode, and then keeps writing until the key output
 * stream reports more than one location info entry.
 */
@Test
public void testContainerStateMachineRestartWithDNChangePipeline()
    throws Exception {
  try (OzoneOutputStream key = objectStore.getVolume(volumeName).getBucket(bucketName)
      .createKey("testDNRestart", 1024, ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS,
          ReplicationFactor.THREE), new HashMap<>())) {
    key.write("ratis".getBytes(UTF_8));
    key.flush();

    KeyOutputStream groupOutputStream = (KeyOutputStream) key.
        getOutputStream();
    List<OmKeyLocationInfo> locationInfoList =
        groupOutputStream.getLocationInfoList();
    assertEquals(1, locationInfoList.size());

    OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
    Pipeline pipeline = omKeyLocationInfo.getPipeline();
    List<HddsDatanodeService> datanodes =
        new ArrayList<>(TestHelper.getDatanodeServices(cluster,
            pipeline));

    DatanodeDetails dn = datanodes.get(0).getDatanodeDetails();

    // Delete all data volumes.
    cluster.getHddsDatanode(dn).getDatanodeStateMachine().getContainer().getVolumeSet().getVolumesList()
        .forEach(v -> {
          try {
            FileUtils.deleteDirectory(v.getStorageDir());
          } catch (IOException e) {
            // Same wrapping as the polling lambda below, so I/O failures surface uniformly.
            throw new UncheckedIOException(e);
          }
        });

    // Delete the datanode.id file.
    File datanodeIdFile = new File(HddsServerUtil.getDatanodeIdFilePath(cluster.getHddsDatanode(dn).getConf()));
    boolean deleted = datanodeIdFile.delete();
    assertTrue(deleted);
    cluster.restartHddsDatanode(dn, false);
    // Keep writing on every poll; the wait ends once a second location info entry appears.
    GenericTestUtils.waitFor(() -> {
      try {
        key.write("ratis".getBytes(UTF_8));
        key.flush();
        return groupOutputStream.getLocationInfoList().size() > 1;
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }, 1000, 30000);
  }
}

@Test
public void testContainerStateMachineFailures() throws Exception {
OzoneOutputStream key =
Expand Down
Loading