TestWatchForCommit.java
@@ -19,15 +19,17 @@

import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.ratis.RatisHelper;
import org.apache.hadoop.hdds.scm.*;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
@@ -40,9 +42,9 @@
import org.apache.hadoop.ozone.container.TestHelper;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.ratis.protocol.GroupMismatchException;
import org.apache.ratis.protocol.RaftRetryFailureException;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Before;
import org.junit.After;
import org.junit.Test;

import java.io.IOException;
@@ -56,27 +58,26 @@
import java.util.concurrent.TimeoutException;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT;

/**
* This class verifies the watchForCommit handling by XceiverClient.
*/
@Ignore
public class TestWatchForCommit {

private MiniOzoneCluster cluster;
private OzoneClient client;
private ObjectStore objectStore;
private String volumeName;
private String bucketName;
private String keyString;
private int chunkSize;
private int flushSize;
private int maxFlushSize;
private int blockSize;
private StorageContainerLocationProtocolClientSideTranslatorPB
private static MiniOzoneCluster cluster;
private static OzoneConfiguration conf;
private static OzoneClient client;
private static ObjectStore objectStore;
private static String volumeName;
private static String bucketName;
private static String keyString;
private static int chunkSize;
private static int flushSize;
private static int maxFlushSize;
private static int blockSize;
private static StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocationClient;

/**
@@ -86,19 +87,42 @@ public class TestWatchForCommit {
*
* @throws IOException
*/
private void startCluster(OzoneConfiguration conf) throws Exception {
@Before
public void init() throws Exception {
conf = new OzoneConfiguration();
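// buffer sizing used throughout these tests: each limit doubles the
// previous one, so a single write of maxFlushSize + 50 bytes (see below)
// crosses the chunk, flush, and max-flush boundaries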
chunkSize = 100;
flushSize = 2 * chunkSize;
maxFlushSize = 2 * flushSize;
blockSize = 2 * maxFlushSize;

conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
conf.setInt(OZONE_DATANODE_PIPELINE_LIMIT, 5);

conf.setBoolean(
OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, false);
conf.setTimeDuration(OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, 10,
TimeUnit.SECONDS);
conf.setQuietMode(false);
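// shorten the Ratis request and watch timeouts on both the client and the
// server side so that the timeout-driven tests below fail fast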
conf.setTimeDuration(RatisHelper.HDDS_DATANODE_RATIS_PREFIX_KEY
+ ".client.request.write.timeout", 10, TimeUnit.SECONDS);
conf.setTimeDuration(RatisHelper.HDDS_DATANODE_RATIS_PREFIX_KEY
+ ".client.request.watch.timeout", 10, TimeUnit.SECONDS);
conf.setTimeDuration(
RatisHelper.HDDS_DATANODE_RATIS_SERVER_PREFIX_KEY + "." +
DatanodeRatisServerConfig.RATIS_SERVER_REQUEST_TIMEOUT_KEY,
3, TimeUnit.SECONDS);
conf.setTimeDuration(
RatisHelper.HDDS_DATANODE_RATIS_SERVER_PREFIX_KEY + "." +
DatanodeRatisServerConfig.
RATIS_SERVER_WATCH_REQUEST_TIMEOUT_KEY,
3, TimeUnit.SECONDS);
conf.setTimeDuration(
RatisHelper.HDDS_DATANODE_RATIS_CLIENT_PREFIX_KEY + "." +
"rpc.request.timeout",
3, TimeUnit.SECONDS);
conf.setTimeDuration(
RatisHelper.HDDS_DATANODE_RATIS_CLIENT_PREFIX_KEY + "." +
"watch.request.timeout",
10, TimeUnit.SECONDS);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS);
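// a generous stale-node interval keeps SCM from reacting while tests
// deliberately shut down datanodes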
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(7)
.setTotalPipelineNumLimit(10)
.setNumDatanodes(9)
.setBlockSize(blockSize)
.setChunkSize(chunkSize)
.setStreamBufferFlushSize(flushSize)
@@ -122,7 +146,8 @@ private void startCluster(OzoneConfiguration conf) throws Exception {
/**
* Shutdown the MiniOzoneCluster.
*/
private void shutdown() {
@After
public void shutdown() {
if (cluster != null) {
cluster.shutdown();
}
@@ -134,25 +159,6 @@ private String getKeyName() {

@Test
public void testWatchForCommitWithKeyWrite() throws Exception {
// in this case, the watch request should fail with RaftRetryFailureException,
// which is captured in keyOutputStream and triggers failover
// to a different block
OzoneConfiguration conf = new OzoneConfiguration();
conf.setTimeDuration(
OzoneConfigKeys.DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
1, TimeUnit.SECONDS);
startCluster(conf);
XceiverClientMetrics metrics =
XceiverClientManager.getXceiverClientMetrics();
long writeChunkCount = metrics.getContainerOpCountMetrics(
ContainerProtos.Type.WriteChunk);
long putBlockCount = metrics.getContainerOpCountMetrics(
ContainerProtos.Type.PutBlock);
long pendingWriteChunkCount = metrics.getPendingContainerOpCountMetrics(
ContainerProtos.Type.WriteChunk);
long pendingPutBlockCount = metrics.getPendingContainerOpCountMetrics(
ContainerProtos.Type.PutBlock);
long totalOpCount = metrics.getTotalOpCount();
String keyName = getKeyName();
OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
int dataLength = maxFlushSize + 50;
@@ -161,24 +167,9 @@ public void testWatchForCommitWithKeyWrite() throws Exception {
ContainerTestHelper.getFixedLengthString(keyString, dataLength)
.getBytes(UTF_8);
key.write(data1);
// since it hits the full-buffer condition, it will call watchForCommit
// and complete at least the putBlock for the first flushSize worth of data
Assert.assertTrue(metrics
.getPendingContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)
<= pendingWriteChunkCount + 2);
Assert.assertTrue(
metrics.getPendingContainerOpCountMetrics(ContainerProtos.Type.PutBlock)
<= pendingPutBlockCount + 1);
Assert.assertEquals(writeChunkCount + 4,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(putBlockCount + 2,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 6,
metrics.getTotalOpCount());
Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();

Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
OutputStream stream = keyOutputStream.getStreamEntries().get(0)
.getOutputStream();
Assert.assertTrue(stream instanceof BlockOutputStream);
@@ -201,16 +192,6 @@ public void testWatchForCommitWithKeyWrite() throws Exception {
// Now do a flush. This will flush the data and update the flush length and
// the map.
key.flush();
Assert.assertEquals(pendingWriteChunkCount, metrics
.getPendingContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount, metrics
.getPendingContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(writeChunkCount + 5,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(putBlockCount + 3,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 8,
metrics.getTotalOpCount());
// Since the data in the buffer is already flushed, flush here will have
// no impact on the counters and data structures
Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
@@ -236,43 +217,26 @@ public void testWatchForCommitWithKeyWrite() throws Exception {
// rewritten plus one partial chunk plus two putBlocks for flushSize
// and one flush for partial chunk
key.flush();
Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream
.getIoException()) instanceof RaftRetryFailureException);
// Make sure the retryCount is reset after the exception is handled
Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
// now close the stream; it will update the ack length after watchForCommit
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
key.close();
Assert
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
Assert
.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
Assert.assertEquals(pendingWriteChunkCount, metrics
.getPendingContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount, metrics
.getPendingContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(writeChunkCount + 14,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(putBlockCount + 8,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 22,
metrics.getTotalOpCount());
Assert
.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
// make sure the bufferPool is empty
Assert
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
validateData(keyName, data1);
shutdown();
}

@Test
public void testWatchForCommitForRetryfailure() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
startCluster(conf);
XceiverClientManager clientManager = new XceiverClientManager(conf);
ContainerWithPipeline container1 = storageContainerLocationClient
.allocateContainer(HddsProtos.ReplicationType.RATIS,
@@ -283,6 +247,7 @@ public void testWatchForCommitForRetryfailure() throws Exception {
Assert.assertEquals(container1.getPipeline(),
xceiverClient.getPipeline());
Pipeline pipeline = xceiverClient.getPipeline();
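// make sure the Ratis group backing this pipeline actually exists on its
// datanodes before sending commands to it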
TestHelper.createPipelineOnDatanode(pipeline, cluster);
XceiverClientReply reply = xceiverClient.sendCommandAsync(
ContainerTestHelper.getCreateContainerRequest(
container1.getContainerInfo().getContainerID(),
@@ -308,14 +273,10 @@ public void testWatchForCommitForRetryfailure() throws Exception {
.checkForException(e) instanceof TimeoutException);
}
clientManager.releaseClient(xceiverClient, false);
shutdown();
}

@Test
public void test2WayCommitForTimeoutException() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
conf.set("raft.client.watch.request.timeout", "3s");
startCluster(conf);
GenericTestUtils.LogCapturer logCapturer =
GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
XceiverClientManager clientManager = new XceiverClientManager(conf);
@@ -329,14 +290,21 @@ public void test2WayCommitForTimeoutException() throws Exception {
Assert.assertEquals(container1.getPipeline(),
xceiverClient.getPipeline());
Pipeline pipeline = xceiverClient.getPipeline();
TestHelper.createPipelineOnDatanode(pipeline, cluster);
XceiverClientRatis ratisClient = (XceiverClientRatis) xceiverClient;
XceiverClientReply reply = xceiverClient.sendCommandAsync(
ContainerTestHelper.getCreateContainerRequest(
container1.getContainerInfo().getContainerID(),
xceiverClient.getPipeline()));
reply.getResponse().get();
Assert.assertEquals(3, ratisClient.getCommitInfoMap().size());
cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
for (HddsDatanodeService dn : cluster.getHddsDatanodes()) {
// shutdown the ratis follower
if (ContainerTestHelper.isRatisFollower(dn, pipeline)) {
cluster.shutdownHddsDatanode(dn.getDatanodeDetails());
break;
}
}
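// with one follower down, the close-container request can be acked by at
// most two replicas, which is the 2-way commit path this test exercises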
reply = xceiverClient.sendCommandAsync(ContainerTestHelper
.getCloseContainer(pipeline,
container1.getContainerInfo().getContainerID()));
@@ -351,20 +319,11 @@ public void test2WayCommitForTimeoutException() throws Exception {
Assert
.assertTrue(logCapturer.getOutput().contains("Committed by majority"));
logCapturer.stopCapturing();
shutdown();
}

@Test
public void testWatchForCommitForGroupMismatchException() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();

// mark the node stale early so that the pipeline gets destroyed quickly
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
startCluster(conf);
GenericTestUtils.LogCapturer logCapturer =
GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
XceiverClientManager clientManager = new XceiverClientManager(conf);

ContainerWithPipeline container1 = storageContainerLocationClient
.allocateContainer(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE, OzoneConsts.OZONE);
@@ -389,14 +348,14 @@ public void testWatchForCommitForGroupMismatchException() throws Exception {
// as well as no such logIndex having been generated in Ratis.
// The basic idea here is just to test whether it throws an exception.
xceiverClient
.watchForCommit(reply.getLogIndex() + new Random().nextInt(100) + 10);
.watchForCommit(reply.getLogIndex() +
new Random().nextInt(100) + 10);
Assert.fail("Expected exception not thrown");
} catch(Exception e) {
Assert.assertTrue(HddsClientUtils
.checkForException(e) instanceof GroupMismatchException);
}
clientManager.releaseClient(xceiverClient, false);
shutdown();
}

private OzoneOutputStream createKey(String keyName, ReplicationType type,
TestHelper.java
@@ -20,12 +20,8 @@

import java.io.IOException;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.*;
import java.util.concurrent.TimeoutException;

import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.ratis.RatisHelper;
@@ -220,10 +216,27 @@ public static void waitForPipelineClose(List<Pipeline> pipelineList,
cluster.getHddsDatanodes().get(cluster.getHddsDatanodeIndex(dn))
.getDatanodeStateMachine().getContainer().getWriteChannel();
Assert.assertTrue(server instanceof XceiverServerRatis);
XceiverServerRatis raftServer = (XceiverServerRatis) server;
GenericTestUtils.waitFor(
() -> (!raftServer.getPipelineIds().contains(pipeline.getId())),
500, 100 * 1000);
server.removeGroup(pipeline.getId().getProtobuf());
}
}
}

public static void createPipelineOnDatanode(Pipeline pipeline,
MiniOzoneCluster cluster)
throws IOException {

// register the pipeline's Ratis group on each of its datanodes
for (DatanodeDetails dn : pipeline.getNodes()) {
XceiverServerSpi server =
cluster.getHddsDatanodes().get(cluster.getHddsDatanodeIndex(dn))
.getDatanodeStateMachine().getContainer()
.getWriteChannel();
Assert.assertTrue(server instanceof XceiverServerRatis);
try {
server.addGroup(pipeline.getId().getProtobuf(),
Collections.unmodifiableList(pipeline.getNodes()));
} catch (Exception e) {
// best effort; ignore failures such as the group already existing
}
}
}
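
A minimal sketch of the flow the tests above drive through this helper; the
names come from the diff itself, while the acquire/release plumbing and the
assertions around each step are elided:

// sketch only: reshape a pipeline, send a command, then watch its log index
XceiverClientSpi xceiverClient =
clientManager.acquireClient(container1.getPipeline());
Pipeline pipeline = xceiverClient.getPipeline();
// register the pipeline's Ratis group on the datanodes (helper above)
TestHelper.createPipelineOnDatanode(pipeline, cluster);
XceiverClientReply reply = xceiverClient.sendCommandAsync(
ContainerTestHelper.getCreateContainerRequest(
container1.getContainerInfo().getContainerID(), pipeline));
reply.getResponse().get(); // wait for the request itself to complete
long index = reply.getLogIndex(); // Ratis log index assigned to the request
xceiverClient.watchForCommit(index); // block until that index is committed
clientManager.releaseClient(xceiverClient, false);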