@@ -674,30 +674,54 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
if (cmdType == Type.WriteChunk || cmdType == Type.PutSmallFile) {
builder.setCreateContainerSet(createContainerSet);
}
CompletableFuture<Message> applyTransactionFuture =
new CompletableFuture<>();
// Ensure the command gets executed in a separate thread than
// stateMachineUpdater thread which is calling applyTransaction here.
CompletableFuture<Message> future = CompletableFuture
.supplyAsync(() -> runCommand(requestProto, builder.build()),
CompletableFuture<ContainerCommandResponseProto> future =
CompletableFuture.supplyAsync(
() -> runCommandGetResponse(requestProto, builder.build()),
Contributor:
Let's rename runCommandGetResponse and remove runCommand, since all the existing callers of runCommand can be removed.
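Roughly what the consolidation could look like (the dispatchCommand helper and its signature are assumptions based on the surrounding diff, not the exact code):

    // Hypothetical single helper: dispatch the request and hand back the full
    // response proto so applyTransaction can inspect the result code itself.
    private ContainerCommandResponseProto runCommand(
        ContainerCommandRequestProto requestProto,
        DispatcherContext context) {
      return dispatchCommand(requestProto, context);
    }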

getCommandExecutor(requestProto));

future.thenAccept(m -> {
future.thenApply(r -> {
if (trx.getServerRole() == RaftPeerRole.LEADER) {
long startTime = (long) trx.getStateMachineContext();
metrics.incPipelineLatency(cmdType,
Time.monotonicNowNanos() - startTime);
}

final Long previous =
applyTransactionCompletionMap
.put(index, trx.getLogEntry().getTerm());
Preconditions.checkState(previous == null);
if (cmdType == Type.WriteChunk || cmdType == Type.PutSmallFile) {
metrics.incNumBytesCommittedCount(
if (r.getResult() != ContainerProtos.Result.SUCCESS) {
StorageContainerException sce =
new StorageContainerException(r.getMessage(), r.getResult());
LOG.error(gid + ": ApplyTransaction failed: cmd " + r.getCmdType()
Contributor:
Should we add containerID in this error log?

Contributor (author):
The container ID will be present in the response message. Will add that to the logger output.

Contributor:
Let's convert this to parameterized logging.
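A minimal sketch of the parameterized form (the fields read off the response are taken from this diff; anything else is an assumption):

    // SLF4J-style parameterized logging: placeholders instead of string
    // concatenation, so the arguments are only rendered when ERROR is enabled.
    LOG.error("{}: ApplyTransaction failed: cmd {} logIndex {} msg: {} "
            + "container result: {}",
        gid, r.getCmdType(), index, r.getMessage(), r.getResult());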

+ " logIndex " + index + " Error message: " + r.getMessage()
+ " Container Result: " + r.getResult());
metrics.incNumApplyTransactionsFails();
ratisServer.handleApplyTransactionFailure(gid, trx.getServerRole());
// Since the applyTransaction now is completed exceptionally,
// before any further snapshot is taken , the exception will be
// caught in stateMachineUpdater in Ratis and ratis server will
// shutdown.
applyTransactionFuture.completeExceptionally(sce);
Contributor:
Let's move ratisServer.handleApplyTransactionFailure(gid, trx.getServerRole()) so that it is the last line in the if block.
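Roughly the ordering being asked for (statements copied from this diff; nothing new introduced):

    // Complete the future exceptionally first; notifying the Ratis server
    // about the failure becomes the last action taken in this branch.
    metrics.incNumApplyTransactionsFails();
    applyTransactionFuture.completeExceptionally(sce);
    ratisServer.handleApplyTransactionFailure(gid, trx.getServerRole());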

Contributor (author):
Addressed in the latest patch.

} else {
metrics.incNumBytesWrittenCount(
requestProto.getWriteChunk().getChunkData().getLen());
LOG.debug(gid + ": ApplyTransaction completed: cmd " + r.getCmdType()
Contributor:
same here

Contributor:
If this is a success, then printing " Error message: " + r.getMessage() will not be the right thing here.
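A small sketch of a success-path debug message without the error text (parameterized, per the comment above; field names come from this diff):

    // On success there is no error message to report; log only the command
    // type, log index and container result.
    LOG.debug("{}: ApplyTransaction completed: cmd {} logIndex {} "
            + "container result: {}",
        gid, r.getCmdType(), index, r.getResult());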

+ " logIndex " + index + " Error message: " + r.getMessage()
+ " Container Result: " + r.getResult());
applyTransactionFuture.complete(r::toByteString);
if (cmdType == Type.WriteChunk || cmdType == Type.PutSmallFile) {
metrics.incNumBytesCommittedCount(
requestProto.getWriteChunk().getChunkData().getLen());
}
}

final Long previous = applyTransactionCompletionMap
.put(index, trx.getLogEntry().getTerm());
Preconditions.checkState(previous == null);
updateLastApplied();
}).whenComplete((r, t) -> applyTransactionSemaphore.release());
return future;
applyTransactionSemaphore.release();
Contributor:
This should go inside a finally, to make sure the semaphore is released always.

Contributor:
I think we should keep the whenComplete() stage at the end. Releasing the semaphore from a whenComplete() stage guarantees that the semaphore will be released even if the processing inside the thenApply() stage hits an exception. This seems to me to be good practice.
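A minimal sketch of that shape (the stage bodies are elided; only names already used in this diff appear):

    // whenComplete runs whether the previous stage completed normally or
    // exceptionally, so the semaphore is always released.
    future.thenApply(r -> {
      // ... post-apply processing from the diff above ...
      return applyTransactionFuture;
    }).whenComplete((r, t) -> applyTransactionSemaphore.release());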

return applyTransactionFuture;
});
return applyTransactionFuture;
} catch (IOException | InterruptedException e) {
metrics.incNumApplyTransactionsFails();
return completeExceptionally(e);
@@ -609,6 +609,16 @@ void handleNoLeader(RaftGroupId groupId, RoleInfoProto roleInfoProto) {
handlePipelineFailure(groupId, roleInfoProto);
}

void handleApplyTransactionFailure(RaftGroupId groupId,
RaftProtos.RaftPeerRole role) {
UUID dnId = RatisHelper.toDatanodeId(getServer().getId());
String msg =
"Ratis Transaction failure in datanode" + dnId + " with role " + role
+ " Triggering pipeline close action.";
triggerPipelineClose(groupId, msg, ClosePipelineInfo.Reason.PIPELINE_FAILED,
Contributor:
Do we need a new reason code here? Pipeline Failed is getting overloaded.

Contributor (author):
I think the msg will differentiate what the cause of the error was. The reason code is just for SCM to take the action of closing the pipeline; I don't think SCM needs to differentiate its behaviour depending on why the pipeline failed.

If required, we can add it in a separate JIRA, as it would need to change for the other pipeline failure reasons as well.

Contributor:
Let's add a new failure reason here, as @supratimdeka is suggesting. This will help in identifying different failures via metrics in SCM.

false);
stop();
Contributor:
We do not necessarily need to stop the raftServer here; for the other containers we can still keep applying transactions.

Contributor (author):
As far as I know from previous discussions, the decision was to not take any other transactions on this pipeline at all and kill the RaftServerImpl instance. Any deviation from that conclusion?

}
/**
* The fact that the snapshot contents cannot be used to actually catch up
* the follower, it is the reason to initiate close pipeline and
@@ -630,6 +640,10 @@ void handleInstallSnapshotFromLeader(RaftGroupId groupId,
handlePipelineFailure(groupId, roleInfoProto);
}

@VisibleForTesting
public boolean isClosed() {
Contributor:
Let's remove this; it can be replaced with:

raftServer.getServer().getLifeCycleState().isClosingOrClosed()
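On the test side, the check could then look roughly like this (assuming LifeCycle.State exposes isClosingOrClosed(), as quoted above):

    // Assert on the Ratis server's lifecycle state directly instead of a
    // test-only isClosed() accessor.
    Assert.assertTrue(raftServer.getServer().getLifeCycleState()
        .isClosingOrClosed());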

return !isStarted;
}
/**
* Notify the Datanode Ratis endpoint of Ratis log failure.
* Expected to be invoked from the Container StateMachine
@@ -23,6 +23,11 @@
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
Contributor:
unused.

import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.ObjectStore;
@@ -34,11 +39,14 @@
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.ratis.protocol.StateMachineException;
import org.apache.ratis.util.LifeCycle;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -77,7 +85,7 @@ public class TestContainerStateMachineFailures {
private static String volumeName;
private static String bucketName;
private static String path;
private static int chunkSize;
private static XceiverClientManager xceiverClientManager;

/**
* Create a MiniDFSCluster for testing.
@@ -109,6 +117,7 @@ public static void init() throws Exception {
//the easiest way to create an open container is creating a key
client = OzoneClientFactory.getClient(conf);
objectStore = client.getObjectStore();
xceiverClientManager = new XceiverClientManager(conf);
volumeName = "testcontainerstatemachinefailures";
bucketName = volumeName;
objectStore.createVolume(volumeName);
@@ -270,4 +279,73 @@ public void testUnhealthyContainer() throws Exception {
Assert.assertEquals(ContainerProtos.Result.CONTAINER_UNHEALTHY,
dispatcher.dispatch(request.build(), null).getResult());
}


Could you speak to the validity of these generated assert statements for the above test method?

org.junit.Assert.assertTrue( ONE.readContainerFile( node ) );
org.junit.Assert.assertTrue( ONE.readContainerFile( node.toString() ));
org.junit.Assert.assertTrue( ONE.readContainerFile( node.toString(), KeyOutputStream ))

Thank you.


@Test
public void testAppyTransactionFailure() throws Exception {
OzoneOutputStream key =
objectStore.getVolume(volumeName).getBucket(bucketName)
.createKey("ratis", 1024, ReplicationType.RATIS,
ReplicationFactor.ONE, new HashMap<>());
// First write and flush creates a container in the datanode
key.write("ratis".getBytes());
key.flush();
key.write("ratis".getBytes());

//get the name of a valid container
OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName).
Contributor:
This is unused.

setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
.setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName("ratis")
.build();
KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream();
List<OmKeyLocationInfo> locationInfoList =
groupOutputStream.getLocationInfoList();
Assert.assertEquals(1, locationInfoList.size());
OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
ContainerData containerData =
cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
.getContainer().getContainerSet()
.getContainer(omKeyLocationInfo.getContainerID())
.getContainerData();
Assert.assertTrue(containerData instanceof KeyValueContainerData);
KeyValueContainerData keyValueContainerData =
(KeyValueContainerData) containerData;
key.close();

long containerID = omKeyLocationInfo.getContainerID();
// delete the container db file
FileUtil.fullyDelete(new File(keyValueContainerData.getContainerPath()));
Pipeline pipeline = cluster.getStorageContainerLocationClient()
.getContainerWithPipeline(containerID).getPipeline();
XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
ContainerProtos.ContainerCommandRequestProto.Builder request =
Contributor:
How about we append some more data to the key and flush again, in place of a close?

Contributor (author):
The idea is to execute a transaction on the same container. If we write more data, it can potentially go to a new container altogether.

ContainerProtos.ContainerCommandRequestProto.newBuilder();
request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
request.setCmdType(ContainerProtos.Type.CloseContainer);
request.setContainerID(containerID);
request.setCloseContainer(
ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
// close container transaction will fail over Ratis and will cause the raft
Contributor:
incomplete comment here.

try {
client.sendCommand(request.build());
Assert.fail("Expected exception not thrown");
} catch (IOException e) {
}
Contributor:
Perhaps a log message here to say that the test caught an IOException as expected by the test case.
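A small sketch of what that could look like (the LOG field is an assumption; the test class may not have a logger yet):

    try {
      client.sendCommand(request.build());
      Assert.fail("Expected exception not thrown");
    } catch (IOException e) {
      // Expected: the close-container transaction fails over Ratis.
      LOG.info("Caught expected IOException while closing the container", e);
    }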

Contributor (author):
Will address in the next patch.

Contributor:
What exception are we expecting here?


// Make sure the container is marked unhealthy
Assert.assertTrue(
cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
.getContainer().getContainerSet().getContainer(containerID)
.getContainerState()
== ContainerProtos.ContainerDataProto.State.UNHEALTHY);
XceiverServerRatis raftServer = (XceiverServerRatis)
cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
.getContainer().getWriteChannel();
Assert.assertTrue(raftServer.isClosed());
Contributor:
One key goal of HDDS-1610 is to ensure that no snapshot can be taken after a log apply failure.

Should the unit test include this assertion? Perhaps by setting the autoTriggerThreshold in Ratis to take a snapshot after every applyLog.
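A hedged sketch of forcing frequent snapshots via Ratis server properties (the setters are from org.apache.ratis.server.RaftServerConfigKeys as I recall them; wiring this into the MiniOzoneCluster datanode configuration is not shown and would need the matching Ozone config keys):

    // Ask Ratis to auto-trigger a snapshot after (almost) every applied entry,
    // so the test can then assert that the snapshot index never moves past
    // the failed applyTransaction.
    RaftProperties properties = new RaftProperties();
    RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(properties, true);
    RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold(properties, 1L);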

Contributor (author):
Will address in the next patch.

try {
cluster.getStorageContainerManager().getPipelineManager()
.getPipeline(pipeline.getId());
Assert.fail("Expected exception not thrown");
} catch(PipelineNotFoundException e) {
}
}
}


Could you speak to the validity of this generated assert statement for the above test method?

org.junit.Assert.assertTrue( name, volumeName.isValid() )

Thank you.