
Commit d30ff3e

HDDS-3430. Enable TestWatchForCommit test cases. (#1114)
Co-authored-by: Doroszlai, Attila <[email protected]>
1 parent 07f7131 commit d30ff3e

2 files changed: +83 −111 lines

hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java

Lines changed: 61 additions & 102 deletions
@@ -19,15 +19,17 @@
 
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.ratis.RatisHelper;
 import org.apache.hadoop.hdds.scm.*;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
+import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
@@ -40,9 +42,9 @@
 import org.apache.hadoop.ozone.container.TestHelper;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.ratis.protocol.GroupMismatchException;
-import org.apache.ratis.protocol.RaftRetryFailureException;
 import org.junit.Assert;
-import org.junit.Ignore;
+import org.junit.Before;
+import org.junit.After;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -56,27 +58,26 @@
 import java.util.concurrent.TimeoutException;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT;
 
 /**
  * This class verifies the watchForCommit Handling by xceiverClient.
  */
-@Ignore
 public class TestWatchForCommit {
 
-  private MiniOzoneCluster cluster;
-  private OzoneClient client;
-  private ObjectStore objectStore;
-  private String volumeName;
-  private String bucketName;
-  private String keyString;
-  private int chunkSize;
-  private int flushSize;
-  private int maxFlushSize;
-  private int blockSize;
-  private StorageContainerLocationProtocolClientSideTranslatorPB
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf;
+  private static OzoneClient client;
+  private static ObjectStore objectStore;
+  private static String volumeName;
+  private static String bucketName;
+  private static String keyString;
+  private static int chunkSize;
+  private static int flushSize;
+  private static int maxFlushSize;
+  private static int blockSize;
+  private static StorageContainerLocationProtocolClientSideTranslatorPB
       storageContainerLocationClient;
 
   /**
@@ -86,19 +87,42 @@ public class TestWatchForCommit {
    *
    * @throws IOException
    */
-  private void startCluster(OzoneConfiguration conf) throws Exception {
+  @Before
+  public void init() throws Exception {
+    conf = new OzoneConfiguration();
     chunkSize = 100;
     flushSize = 2 * chunkSize;
     maxFlushSize = 2 * flushSize;
     blockSize = 2 * maxFlushSize;
-
-    conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
-    conf.setInt(OZONE_DATANODE_PIPELINE_LIMIT, 5);
-
+    conf.setBoolean(
+        OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, false);
+    conf.setTimeDuration(OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, 10,
+        TimeUnit.SECONDS);
     conf.setQuietMode(false);
+    conf.setTimeDuration(RatisHelper.HDDS_DATANODE_RATIS_PREFIX_KEY
+        + ".client.request.write.timeout", 10, TimeUnit.SECONDS);
+    conf.setTimeDuration(RatisHelper.HDDS_DATANODE_RATIS_PREFIX_KEY
+        + ".client.request.watch.timeout", 10, TimeUnit.SECONDS);
+    conf.setTimeDuration(
+        RatisHelper.HDDS_DATANODE_RATIS_SERVER_PREFIX_KEY + "." +
+            DatanodeRatisServerConfig.RATIS_SERVER_REQUEST_TIMEOUT_KEY,
+        3, TimeUnit.SECONDS);
+    conf.setTimeDuration(
+        RatisHelper.HDDS_DATANODE_RATIS_SERVER_PREFIX_KEY + "." +
+            DatanodeRatisServerConfig.
+                RATIS_SERVER_WATCH_REQUEST_TIMEOUT_KEY,
+        3, TimeUnit.SECONDS);
+    conf.setTimeDuration(
+        RatisHelper.HDDS_DATANODE_RATIS_CLIENT_PREFIX_KEY + "." +
+            "rpc.request.timeout",
+        3, TimeUnit.SECONDS);
+    conf.setTimeDuration(
+        RatisHelper.HDDS_DATANODE_RATIS_CLIENT_PREFIX_KEY + "." +
+            "watch.request.timeout",
+        10, TimeUnit.SECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS);
     cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(7)
-        .setTotalPipelineNumLimit(10)
+        .setNumDatanodes(9)
         .setBlockSize(blockSize)
         .setChunkSize(chunkSize)
         .setStreamBufferFlushSize(flushSize)
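
The init() method above builds the datanode Ratis keys by concatenating a prefix constant from RatisHelper with a dotted suffix before handing the value to conf.setTimeDuration. A minimal sketch of that composition pattern follows; the prefix string used here is a made-up placeholder, and only the setTimeDuration/getTimeDuration calls mirror the patch.

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;

// Illustrative sketch of the dotted-key composition used in init() above.
// "hypothetical.ratis.prefix" is a placeholder, not the real RatisHelper constant.
public class RatisKeyCompositionSketch {
  public static void main(String[] args) {
    OzoneConfiguration conf = new OzoneConfiguration();
    String prefix = "hypothetical.ratis.prefix";

    // Same shape as the patch: prefix + dotted suffix, value plus unit.
    conf.setTimeDuration(prefix + ".client.request.watch.timeout",
        10, TimeUnit.SECONDS);

    // Reading the value back converts it to the requested unit.
    long millis = conf.getTimeDuration(
        prefix + ".client.request.watch.timeout", 0, TimeUnit.MILLISECONDS);
    System.out.println(millis); // prints 10000
  }
}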
@@ -122,7 +146,8 @@ private void startCluster(OzoneConfiguration conf) throws Exception {
   /**
    * Shutdown MiniDFSCluster.
    */
-  private void shutdown() {
+  @After
+  public void shutdown() {
     if (cluster != null) {
       cluster.shutdown();
     }
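
With this hunk, cluster setup and teardown move from explicit startCluster(conf)/shutdown() calls inside each test to JUnit 4 @Before/@After methods, so every test gets a freshly started cluster and teardown runs even when a test throws. A minimal sketch of that lifecycle, using a stand-in cluster type rather than MiniOzoneCluster:

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

// Illustrative only: shows the @Before/@After lifecycle the patch moves to.
public class LifecycleSketch {

  private FakeCluster cluster;   // stand-in for MiniOzoneCluster

  @Before
  public void init() throws Exception {
    cluster = new FakeCluster(); // built and started once per test method
  }

  @After
  public void shutdown() {
    if (cluster != null) {       // runs even if the test body threw
      cluster.stop();
    }
  }

  @Test
  public void someScenario() {
    // test body uses this.cluster; no per-test startCluster()/shutdown() calls
  }

  /** Minimal stand-in so the sketch compiles on its own. */
  private static class FakeCluster {
    void stop() { }
  }
}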
@@ -134,25 +159,6 @@ private String getKeyName() {
 
   @Test
   public void testWatchForCommitWithKeyWrite() throws Exception {
-    // in this case, watch request should fail with RaftRetryFailureException
-    // and will be captured in keyOutputStream and the failover will happen
-    // to a different block
-    OzoneConfiguration conf = new OzoneConfiguration();
-    conf.setTimeDuration(
-        OzoneConfigKeys.DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
-        1, TimeUnit.SECONDS);
-    startCluster(conf);
-    XceiverClientMetrics metrics =
-        XceiverClientManager.getXceiverClientMetrics();
-    long writeChunkCount = metrics.getContainerOpCountMetrics(
-        ContainerProtos.Type.WriteChunk);
-    long putBlockCount = metrics.getContainerOpCountMetrics(
-        ContainerProtos.Type.PutBlock);
-    long pendingWriteChunkCount = metrics.getPendingContainerOpCountMetrics(
-        ContainerProtos.Type.WriteChunk);
-    long pendingPutBlockCount = metrics.getPendingContainerOpCountMetrics(
-        ContainerProtos.Type.PutBlock);
-    long totalOpCount = metrics.getTotalOpCount();
     String keyName = getKeyName();
     OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
     int dataLength = maxFlushSize + 50;
@@ -161,24 +167,9 @@ public void testWatchForCommitWithKeyWrite() throws Exception {
         ContainerTestHelper.getFixedLengthString(keyString, dataLength)
             .getBytes(UTF_8);
     key.write(data1);
-    // since its hitting the full bufferCondition, it will call watchForCommit
-    // and completes atleast putBlock for first flushSize worth of data
-    Assert.assertTrue(metrics
-        .getPendingContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)
-        <= pendingWriteChunkCount + 2);
-    Assert.assertTrue(
-        metrics.getPendingContainerOpCountMetrics(ContainerProtos.Type.PutBlock)
-        <= pendingPutBlockCount + 1);
-    Assert.assertEquals(writeChunkCount + 4,
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
-    Assert.assertEquals(putBlockCount + 2,
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 6,
-        metrics.getTotalOpCount());
     Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
     KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
 
-    Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
     OutputStream stream = keyOutputStream.getStreamEntries().get(0)
         .getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
@@ -201,16 +192,6 @@ public void testWatchForCommitWithKeyWrite() throws Exception {
     // Now do a flush. This will flush the data and update the flush length and
     // the map.
     key.flush();
-    Assert.assertEquals(pendingWriteChunkCount, metrics
-        .getPendingContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
-    Assert.assertEquals(pendingPutBlockCount, metrics
-        .getPendingContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(writeChunkCount + 5,
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
-    Assert.assertEquals(putBlockCount + 3,
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 8,
-        metrics.getTotalOpCount());
     // Since the data in the buffer is already flushed, flush here will have
     // no impact on the counters and data structures
     Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
@@ -236,43 +217,26 @@ public void testWatchForCommitWithKeyWrite() throws Exception {
     // rewritten plus one partial chunk plus two putBlocks for flushSize
     // and one flush for partial chunk
     key.flush();
-    Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream
-        .getIoException()) instanceof RaftRetryFailureException);
     // Make sure the retryCount is reset after the exception is handled
     Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
     // now close the stream, It will update the ack length after watchForCommit
-    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
     key.close();
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     Assert
         .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
-    Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
-    Assert.assertEquals(pendingWriteChunkCount, metrics
-        .getPendingContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
-    Assert.assertEquals(pendingPutBlockCount, metrics
-        .getPendingContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(writeChunkCount + 14,
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
-    Assert.assertEquals(putBlockCount + 8,
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 22,
-        metrics.getTotalOpCount());
     Assert
         .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     // make sure the bufferPool is empty
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
     validateData(keyName, data1);
-    shutdown();
   }
 
   @Test
   public void testWatchForCommitForRetryfailure() throws Exception {
-    OzoneConfiguration conf = new OzoneConfiguration();
-    startCluster(conf);
     XceiverClientManager clientManager = new XceiverClientManager(conf);
     ContainerWithPipeline container1 = storageContainerLocationClient
         .allocateContainer(HddsProtos.ReplicationType.RATIS,
@@ -283,6 +247,7 @@ public void testWatchForCommitForRetryfailure() throws Exception {
     Assert.assertEquals(container1.getPipeline(),
         xceiverClient.getPipeline());
     Pipeline pipeline = xceiverClient.getPipeline();
+    TestHelper.createPipelineOnDatanode(pipeline, cluster);
     XceiverClientReply reply = xceiverClient.sendCommandAsync(
         ContainerTestHelper.getCreateContainerRequest(
             container1.getContainerInfo().getContainerID(),
@@ -308,14 +273,10 @@ public void testWatchForCommitForRetryfailure() throws Exception {
           .checkForException(e) instanceof TimeoutException);
     }
     clientManager.releaseClient(xceiverClient, false);
-    shutdown();
   }
 
   @Test
   public void test2WayCommitForTimeoutException() throws Exception {
-    OzoneConfiguration conf = new OzoneConfiguration();
-    conf.set("raft.client.watch.request.timeout", "3s");
-    startCluster(conf);
     GenericTestUtils.LogCapturer logCapturer =
         GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
     XceiverClientManager clientManager = new XceiverClientManager(conf);
@@ -329,14 +290,21 @@ public void test2WayCommitForTimeoutException() throws Exception {
     Assert.assertEquals(container1.getPipeline(),
         xceiverClient.getPipeline());
     Pipeline pipeline = xceiverClient.getPipeline();
+    TestHelper.createPipelineOnDatanode(pipeline, cluster);
     XceiverClientRatis ratisClient = (XceiverClientRatis) xceiverClient;
     XceiverClientReply reply = xceiverClient.sendCommandAsync(
         ContainerTestHelper.getCreateContainerRequest(
             container1.getContainerInfo().getContainerID(),
             xceiverClient.getPipeline()));
     reply.getResponse().get();
     Assert.assertEquals(3, ratisClient.getCommitInfoMap().size());
-    cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
+    for (HddsDatanodeService dn : cluster.getHddsDatanodes()) {
+      // shutdown the ratis follower
+      if (ContainerTestHelper.isRatisFollower(dn, pipeline)) {
+        cluster.shutdownHddsDatanode(dn.getDatanodeDetails());
+        break;
+      }
+    }
     reply = xceiverClient.sendCommandAsync(ContainerTestHelper
         .getCloseContainer(pipeline,
             container1.getContainerInfo().getContainerID()));
@@ -351,20 +319,11 @@ public void test2WayCommitForTimeoutException() throws Exception {
     Assert
         .assertTrue(logCapturer.getOutput().contains("Committed by majority"));
     logCapturer.stopCapturing();
-    shutdown();
   }
 
   @Test
   public void testWatchForCommitForGroupMismatchException() throws Exception {
-    OzoneConfiguration conf = new OzoneConfiguration();
-
-    // mark the node stale early so that pipleline gets destroyed quickly
-    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
-    startCluster(conf);
-    GenericTestUtils.LogCapturer logCapturer =
-        GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
     XceiverClientManager clientManager = new XceiverClientManager(conf);
-
     ContainerWithPipeline container1 = storageContainerLocationClient
         .allocateContainer(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.THREE, OzoneConsts.OZONE);
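
The "Committed by majority" assertion above relies on GenericTestUtils.LogCapturer: capture the XceiverClientRatis log before the operation, then search the captured output. A self-contained sketch of that pattern, using an arbitrary local logger instead of XceiverClientRatis.LOG and assuming a log4j backend on the test classpath, as these integration tests have:

import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative sketch: capture a logger, run the code under test, then assert
// on the captured output, as the 2-way-commit test does above.
public class LogCapturerSketch {

  private static final Logger LOG =
      LoggerFactory.getLogger(LogCapturerSketch.class);

  @Test
  public void logsExpectedMessage() {
    GenericTestUtils.LogCapturer logCapturer =
        GenericTestUtils.LogCapturer.captureLogs(LOG);
    LOG.info("Committed by majority");   // stand-in for the real workload
    Assert.assertTrue(
        logCapturer.getOutput().contains("Committed by majority"));
    logCapturer.stopCapturing();
  }
}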
@@ -389,14 +348,14 @@ public void testWatchForCommitForGroupMismatchException() throws Exception {
       // as well as there is no logIndex generate in Ratis.
       // The basic idea here is just to test if its throws an exception.
       xceiverClient
-          .watchForCommit(reply.getLogIndex() + new Random().nextInt(100) + 10);
+          .watchForCommit(reply.getLogIndex() +
+              new Random().nextInt(100) + 10);
       Assert.fail("Expected exception not thrown");
     } catch(Exception e) {
       Assert.assertTrue(HddsClientUtils
           .checkForException(e) instanceof GroupMismatchException);
     }
     clientManager.releaseClient(xceiverClient, false);
-    shutdown();
   }
 
   private OzoneOutputStream createKey(String keyName, ReplicationType type,
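
testWatchForCommitForGroupMismatchException uses the try / Assert.fail / catch-and-inspect idiom rather than @Test(expected = ...), because the thrown exception has to be unwrapped with HddsClientUtils.checkForException before its type can be checked. A generic sketch of the idiom, with a placeholder exception type standing in for the unwrapped GroupMismatchException:

import org.junit.Assert;
import org.junit.Test;

// Illustrative sketch of the try / Assert.fail / catch-and-inspect idiom;
// IllegalStateException is a placeholder for the real unwrapped exception type.
public class ExpectedExceptionSketch {

  private void operationThatShouldFail() {
    throw new IllegalStateException("simulated failure");
  }

  @Test
  public void failureTypeIsChecked() {
    try {
      operationThatShouldFail();
      Assert.fail("Expected exception not thrown");
    } catch (Exception e) {
      // The real test first unwraps the cause via HddsClientUtils.checkForException(e).
      Assert.assertTrue(e instanceof IllegalStateException);
    }
  }
}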

hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java

Lines changed: 22 additions & 9 deletions
@@ -20,12 +20,8 @@
 
 import java.io.IOException;
 import java.security.MessageDigest;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
+import java.util.*;
 import java.util.concurrent.TimeoutException;
-
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.ratis.RatisHelper;
@@ -220,10 +216,27 @@ public static void waitForPipelineClose(List<Pipeline> pipelineList,
             cluster.getHddsDatanodes().get(cluster.getHddsDatanodeIndex(dn))
                 .getDatanodeStateMachine().getContainer().getWriteChannel();
         Assert.assertTrue(server instanceof XceiverServerRatis);
-        XceiverServerRatis raftServer = (XceiverServerRatis) server;
-        GenericTestUtils.waitFor(
-            () -> (!raftServer.getPipelineIds().contains(pipeline.getId())),
-            500, 100 * 1000);
+        server.removeGroup(pipeline.getId().getProtobuf());
+      }
+    }
+  }
+
+  public static void createPipelineOnDatanode(Pipeline pipeline,
+      MiniOzoneCluster cluster)
+      throws IOException {
+
+    // wait for the pipeline to get destroyed in the datanodes
+    for (DatanodeDetails dn : pipeline.getNodes()) {
+      XceiverServerSpi server =
+          cluster.getHddsDatanodes().get(cluster.getHddsDatanodeIndex(dn))
+              .getDatanodeStateMachine().getContainer()
+              .getWriteChannel();
+      Assert.assertTrue(server instanceof XceiverServerRatis);
+      try {
+        server.addGroup(pipeline.getId().getProtobuf(), Collections.
+            unmodifiableList(pipeline.getNodes()));
+      } catch (Exception e) {
+        //ignore exception
       }
     }
   }
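
The new createPipelineOnDatanode mirrors the updated waitForPipelineClose: walk the pipeline's datanodes, fetch each node's write channel, and register (or remove) the pipeline's Raft group, swallowing failures such as the group already existing. A minimal stand-alone sketch of that shape with stand-in types; the real helper uses Pipeline, DatanodeDetails and XceiverServerSpi.addGroup exactly as shown in the hunk above.

import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Illustrative stand-ins only; not the Ozone types used by TestHelper.
public class PipelineGroupSketch {

  interface WriteChannel {
    void addGroup(String groupId, List<String> members) throws Exception;
  }

  // For every member of the pipeline, look up that node's write channel and
  // (re)register the group, ignoring failures such as "group already exists".
  static void createGroupOnEachMember(String groupId, List<String> members,
      Function<String, WriteChannel> channelForNode) {
    for (String member : members) {
      try {
        channelForNode.apply(member).addGroup(groupId, members);
      } catch (Exception e) {
        // ignore: mirrors the try/catch in TestHelper.createPipelineOnDatanode
      }
    }
  }

  public static void main(String[] args) {
    List<String> members = Arrays.asList("dn1", "dn2", "dn3");
    createGroupOnEachMember("pipeline-1", members,
        node -> (groupId, all) -> System.out.println("addGroup on " + node));
  }
}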
