diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
index 157685f231c8..7e1b4fb82963 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
@@ -19,15 +19,17 @@
 
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.ratis.RatisHelper;
 import org.apache.hadoop.hdds.scm.*;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
+import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
@@ -40,9 +42,9 @@
 import org.apache.hadoop.ozone.container.TestHelper;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.ratis.protocol.GroupMismatchException;
-import org.apache.ratis.protocol.RaftRetryFailureException;
 import org.junit.Assert;
-import org.junit.Ignore;
+import org.junit.Before;
+import org.junit.After;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -56,27 +58,26 @@
 
 import java.util.concurrent.TimeoutException;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT;
 
 /**
  * This class verifies the watchForCommit Handling by xceiverClient.
  */
-@Ignore
 public class TestWatchForCommit {
-  private MiniOzoneCluster cluster;
-  private OzoneClient client;
-  private ObjectStore objectStore;
-  private String volumeName;
-  private String bucketName;
-  private String keyString;
-  private int chunkSize;
-  private int flushSize;
-  private int maxFlushSize;
-  private int blockSize;
-  private StorageContainerLocationProtocolClientSideTranslatorPB
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf;
+  private static OzoneClient client;
+  private static ObjectStore objectStore;
+  private static String volumeName;
+  private static String bucketName;
+  private static String keyString;
+  private static int chunkSize;
+  private static int flushSize;
+  private static int maxFlushSize;
+  private static int blockSize;
+  private static StorageContainerLocationProtocolClientSideTranslatorPB
       storageContainerLocationClient;
 
   /**
@@ -86,19 +87,42 @@ public class TestWatchForCommit {
    *
    * @throws IOException
    */
-  private void startCluster(OzoneConfiguration conf) throws Exception {
+  @Before
+  public void init() throws Exception {
+    conf = new OzoneConfiguration();
     chunkSize = 100;
     flushSize = 2 * chunkSize;
     maxFlushSize = 2 * flushSize;
     blockSize = 2 * maxFlushSize;
-
-    conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
-    conf.setInt(OZONE_DATANODE_PIPELINE_LIMIT, 5);
-
+    conf.setBoolean(
+        OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, false);
+    conf.setTimeDuration(OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, 10,
+        TimeUnit.SECONDS);
     conf.setQuietMode(false);
+    conf.setTimeDuration(RatisHelper.HDDS_DATANODE_RATIS_PREFIX_KEY
+        + ".client.request.write.timeout", 10, TimeUnit.SECONDS);
+    conf.setTimeDuration(RatisHelper.HDDS_DATANODE_RATIS_PREFIX_KEY
+        + ".client.request.watch.timeout", 10, TimeUnit.SECONDS);
+    conf.setTimeDuration(
+        RatisHelper.HDDS_DATANODE_RATIS_SERVER_PREFIX_KEY + "."
+            + DatanodeRatisServerConfig.RATIS_SERVER_REQUEST_TIMEOUT_KEY,
+        3, TimeUnit.SECONDS);
+    conf.setTimeDuration(
+        RatisHelper.HDDS_DATANODE_RATIS_SERVER_PREFIX_KEY + "."
+            + DatanodeRatisServerConfig.
+            RATIS_SERVER_WATCH_REQUEST_TIMEOUT_KEY,
+        3, TimeUnit.SECONDS);
+    conf.setTimeDuration(
+        RatisHelper.HDDS_DATANODE_RATIS_CLIENT_PREFIX_KEY + "."
+            + "rpc.request.timeout",
+        3, TimeUnit.SECONDS);
+    conf.setTimeDuration(
+        RatisHelper.HDDS_DATANODE_RATIS_CLIENT_PREFIX_KEY + "."
+            + "watch.request.timeout",
+        10, TimeUnit.SECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS);
     cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(7)
-        .setTotalPipelineNumLimit(10)
+        .setNumDatanodes(9)
         .setBlockSize(blockSize)
         .setChunkSize(chunkSize)
         .setStreamBufferFlushSize(flushSize)
@@ -122,7 +146,8 @@ private void startCluster(OzoneConfiguration conf) throws Exception {
   /**
    * Shutdown MiniDFSCluster.
    */
-  private void shutdown() {
+  @After
+  public void shutdown() {
     if (cluster != null) {
       cluster.shutdown();
     }
@@ -134,25 +159,6 @@ private String getKeyName() {
 
   @Test
   public void testWatchForCommitWithKeyWrite() throws Exception {
-    // in this case, watch request should fail with RaftRetryFailureException
-    // and will be captured in keyOutputStream and the failover will happen
-    // to a different block
-    OzoneConfiguration conf = new OzoneConfiguration();
-    conf.setTimeDuration(
-        OzoneConfigKeys.DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
-        1, TimeUnit.SECONDS);
-    startCluster(conf);
-    XceiverClientMetrics metrics =
-        XceiverClientManager.getXceiverClientMetrics();
-    long writeChunkCount = metrics.getContainerOpCountMetrics(
-        ContainerProtos.Type.WriteChunk);
-    long putBlockCount = metrics.getContainerOpCountMetrics(
-        ContainerProtos.Type.PutBlock);
-    long pendingWriteChunkCount = metrics.getPendingContainerOpCountMetrics(
-        ContainerProtos.Type.WriteChunk);
-    long pendingPutBlockCount = metrics.getPendingContainerOpCountMetrics(
-        ContainerProtos.Type.PutBlock);
-    long totalOpCount = metrics.getTotalOpCount();
     String keyName = getKeyName();
     OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
     int dataLength = maxFlushSize + 50;
@@ -161,24 +167,9 @@ public void testWatchForCommitWithKeyWrite() throws Exception {
         ContainerTestHelper.getFixedLengthString(keyString, dataLength)
             .getBytes(UTF_8);
     key.write(data1);
-    // since its hitting the full bufferCondition, it will call watchForCommit
-    // and completes atleast putBlock for first flushSize worth of data
-    Assert.assertTrue(metrics
-        .getPendingContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)
-        <= pendingWriteChunkCount + 2);
-    Assert.assertTrue(
-        metrics.getPendingContainerOpCountMetrics(ContainerProtos.Type.PutBlock)
-        <= pendingPutBlockCount + 1);
-    Assert.assertEquals(writeChunkCount + 4,
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
-    Assert.assertEquals(putBlockCount + 2,
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 6,
-        metrics.getTotalOpCount());
     Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
     KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
-    Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
     OutputStream stream = keyOutputStream.getStreamEntries().get(0)
         .getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
@@ -201,16 +192,6 @@ public void testWatchForCommitWithKeyWrite() throws Exception {
     // Now do a flush. This will flush the data and update the flush length and
     // the map.
     key.flush();
-    Assert.assertEquals(pendingWriteChunkCount, metrics
-        .getPendingContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
-    Assert.assertEquals(pendingPutBlockCount, metrics
-        .getPendingContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(writeChunkCount + 5,
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
-    Assert.assertEquals(putBlockCount + 3,
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 8,
-        metrics.getTotalOpCount());
     // Since the data in the buffer is already flushed, flush here will have
     // no impact on the counters and data structures
     Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
@@ -236,29 +217,15 @@ public void testWatchForCommitWithKeyWrite() throws Exception {
     // rewritten plus one partial chunk plus two putBlocks for flushSize
     // and one flush for partial chunk
     key.flush();
-    Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream
-        .getIoException()) instanceof RaftRetryFailureException);
     // Make sure the retryCount is reset after the exception is handled
     Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
     // now close the stream, It will update the ack length after watchForCommit
-    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
     key.close();
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     Assert
         .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
-    Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
-    Assert.assertEquals(pendingWriteChunkCount, metrics
-        .getPendingContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
-    Assert.assertEquals(pendingPutBlockCount, metrics
-        .getPendingContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(writeChunkCount + 14,
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
-    Assert.assertEquals(putBlockCount + 8,
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 22,
-        metrics.getTotalOpCount());
     Assert
         .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     // make sure the bufferPool is empty
@@ -266,13 +233,10 @@ public void testWatchForCommitWithKeyWrite() throws Exception {
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
     validateData(keyName, data1);
-    shutdown();
   }
 
   @Test
   public void testWatchForCommitForRetryfailure() throws Exception {
-    OzoneConfiguration conf = new OzoneConfiguration();
-    startCluster(conf);
     XceiverClientManager clientManager = new XceiverClientManager(conf);
     ContainerWithPipeline container1 = storageContainerLocationClient
         .allocateContainer(HddsProtos.ReplicationType.RATIS,
@@ -283,6 +247,7 @@ public void testWatchForCommitForRetryfailure() throws Exception {
     Assert.assertEquals(container1.getPipeline(), xceiverClient.getPipeline());
 
     Pipeline pipeline = xceiverClient.getPipeline();
+    TestHelper.createPipelineOnDatanode(pipeline, cluster);
     XceiverClientReply reply = xceiverClient.sendCommandAsync(
         ContainerTestHelper.getCreateContainerRequest(
             container1.getContainerInfo().getContainerID(),
@@ -308,14 +273,10 @@ public void testWatchForCommitForRetryfailure() throws Exception {
           .checkForException(e) instanceof TimeoutException);
     }
     clientManager.releaseClient(xceiverClient, false);
-    shutdown();
   }
 
   @Test
   public void test2WayCommitForTimeoutException() throws Exception {
-    OzoneConfiguration conf = new OzoneConfiguration();
-    conf.set("raft.client.watch.request.timeout", "3s");
-    startCluster(conf);
     GenericTestUtils.LogCapturer logCapturer =
         GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
     XceiverClientManager clientManager = new XceiverClientManager(conf);
@@ -329,6 +290,7 @@ public void test2WayCommitForTimeoutException() throws Exception {
     Assert.assertEquals(container1.getPipeline(), xceiverClient.getPipeline());
 
     Pipeline pipeline = xceiverClient.getPipeline();
+    TestHelper.createPipelineOnDatanode(pipeline, cluster);
     XceiverClientRatis ratisClient = (XceiverClientRatis) xceiverClient;
     XceiverClientReply reply = xceiverClient.sendCommandAsync(
         ContainerTestHelper.getCreateContainerRequest(
@@ -336,7 +298,13 @@ public void test2WayCommitForTimeoutException() throws Exception {
             xceiverClient.getPipeline()));
     reply.getResponse().get();
     Assert.assertEquals(3, ratisClient.getCommitInfoMap().size());
-    cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
+    for (HddsDatanodeService dn : cluster.getHddsDatanodes()) {
+      // shutdown the ratis follower
+      if (ContainerTestHelper.isRatisFollower(dn, pipeline)) {
+        cluster.shutdownHddsDatanode(dn.getDatanodeDetails());
+        break;
+      }
+    }
     reply = xceiverClient.sendCommandAsync(ContainerTestHelper
         .getCloseContainer(pipeline,
             container1.getContainerInfo().getContainerID()));
@@ -351,20 +319,11 @@ public void test2WayCommitForTimeoutException() throws Exception {
     Assert
         .assertTrue(logCapturer.getOutput().contains("Committed by majority"));
     logCapturer.stopCapturing();
-    shutdown();
   }
 
   @Test
   public void testWatchForCommitForGroupMismatchException() throws Exception {
-    OzoneConfiguration conf = new OzoneConfiguration();
-
-    // mark the node stale early so that pipleline gets destroyed quickly
-    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
-    startCluster(conf);
-    GenericTestUtils.LogCapturer logCapturer =
-        GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
     XceiverClientManager clientManager = new XceiverClientManager(conf);
-
     ContainerWithPipeline container1 = storageContainerLocationClient
         .allocateContainer(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.THREE, OzoneConsts.OZONE);
@@ -389,14 +348,14 @@ public void testWatchForCommitForGroupMismatchException() throws Exception {
     // as well as there is no logIndex generate in Ratis.
     // The basic idea here is just to test if its throws an exception.
       xceiverClient
-          .watchForCommit(reply.getLogIndex() + new Random().nextInt(100) + 10);
+          .watchForCommit(reply.getLogIndex() +
+              new Random().nextInt(100) + 10);
       Assert.fail("Expected exception not thrown");
     } catch(Exception e) {
       Assert.assertTrue(HddsClientUtils
           .checkForException(e) instanceof GroupMismatchException);
     }
     clientManager.releaseClient(xceiverClient, false);
-    shutdown();
   }
 
   private OzoneOutputStream createKey(String keyName, ReplicationType type,
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
index 672f4b3d8883..f8646b9af5f1 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
@@ -20,12 +20,8 @@
 
 import java.io.IOException;
 import java.security.MessageDigest;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
+import java.util.*;
 import java.util.concurrent.TimeoutException;
-
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.ratis.RatisHelper;
@@ -220,10 +216,27 @@ public static void waitForPipelineClose(List pipelineList,
           cluster.getHddsDatanodes().get(cluster.getHddsDatanodeIndex(dn))
               .getDatanodeStateMachine().getContainer().getWriteChannel();
       Assert.assertTrue(server instanceof XceiverServerRatis);
-      XceiverServerRatis raftServer = (XceiverServerRatis) server;
-      GenericTestUtils.waitFor(
-          () -> (!raftServer.getPipelineIds().contains(pipeline.getId())),
-          500, 100 * 1000);
+      server.removeGroup(pipeline.getId().getProtobuf());
+    }
+  }
+
+  public static void createPipelineOnDatanode(Pipeline pipeline,
+      MiniOzoneCluster cluster)
+      throws IOException {
+
+    // re-create the pipeline's Ratis group on each of its datanodes
+    for (DatanodeDetails dn : pipeline.getNodes()) {
+      XceiverServerSpi server =
+          cluster.getHddsDatanodes().get(cluster.getHddsDatanodeIndex(dn))
+              .getDatanodeStateMachine().getContainer()
+              .getWriteChannel();
+      Assert.assertTrue(server instanceof XceiverServerRatis);
+      try {
+        server.addGroup(pipeline.getId().getProtobuf(),
+            Collections.unmodifiableList(pipeline.getNodes()));
+      } catch (Exception e) {
+        // ignore exception
+      }
     }
   }
 }
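
A minimal usage sketch of the new `TestHelper.createPipelineOnDatanode` helper, mirroring the call sites added in `testWatchForCommitForRetryfailure` and `test2WayCommitForTimeoutException`. `XceiverClientSpi`, `Pipeline`, `MiniOzoneCluster`, and `TestHelper` are the existing Ozone classes touched by this patch; the wrapper class and `prepareAndWatch` method are hypothetical names used only for illustration, not code from the patch.

```java
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.container.TestHelper;

public final class WatchForCommitUsageSketch {

  private WatchForCommitUsageSketch() {
  }

  // Hypothetical helper: re-register a pipeline's Ratis group on its
  // datanodes before exercising watchForCommit, as the updated tests do
  // after allocating a container.
  static void prepareAndWatch(XceiverClientSpi xceiverClient,
      MiniOzoneCluster cluster, long logIndex) throws Exception {
    Pipeline pipeline = xceiverClient.getPipeline();
    // Ensure the Ratis group backing this client-side pipeline exists on
    // every datanode; addGroup failures (e.g. group already present) are
    // swallowed inside the helper.
    TestHelper.createPipelineOnDatanode(pipeline, cluster);
    // The watch call now has a live group to talk to; with the shortened
    // Ratis watch/request timeouts set in init(), a lost majority surfaces
    // as an exception unwrapped by HddsClientUtils.checkForException().
    xceiverClient.watchForCommit(logIndex);
  }
}
```

This is the design choice behind the refactor: since `init()` now builds one shared nine-datanode cluster for all tests, a test can no longer rely on a freshly created pipeline per run, so it force-removes and re-adds Ratis groups directly on the datanodes instead of waiting for SCM to destroy them.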