diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java index 9411216ae2c0..d02c7636ca01 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java @@ -22,6 +22,7 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Stream; import org.apache.commons.lang3.RandomUtils; import org.apache.hadoop.hdds.client.ReplicationFactor; @@ -52,7 +53,8 @@ import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type.PutBlock; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type.WriteChunk; @@ -147,11 +149,21 @@ void shutdown() { } } + private static Stream clientParameters() { + return Stream.of( + Arguments.of(true, true), + Arguments.of(true, false), + Arguments.of(false, true), + Arguments.of(false, false) + ); + } + static OzoneClientConfig newClientConfig(ConfigurationSource source, - boolean flushDelay) { + boolean flushDelay, boolean enablePiggybacking) { OzoneClientConfig clientConfig = source.getObject(OzoneClientConfig.class); clientConfig.setChecksumType(ChecksumType.NONE); clientConfig.setStreamBufferFlushDelay(flushDelay); + clientConfig.setEnablePutblockPiggybacking(enablePiggybacking); return clientConfig; } @@ -163,9 +175,9 @@ static OzoneClient newClient(OzoneConfiguration conf, } @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testWriteLessThanChunkSize(boolean flushDelay) throws Exception { - OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay); + @MethodSource("clientParameters") + void testWriteLessThanChunkSize(boolean flushDelay, boolean enablePiggybacking) throws Exception { + OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay, enablePiggybacking); try (OzoneClient client = newClient(cluster.getConf(), config)) { XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); @@ -258,9 +270,9 @@ void testWriteLessThanChunkSize(boolean flushDelay) throws Exception { } @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testWriteExactlyFlushSize(boolean flushDelay) throws Exception { - OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay); + @MethodSource("clientParameters") + void testWriteExactlyFlushSize(boolean flushDelay, boolean enablePiggybacking) throws Exception { + OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay, enablePiggybacking); try (OzoneClient client = newClient(cluster.getConf(), config)) { XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); @@ -377,9 +389,9 @@ void testWriteExactlyFlushSize(boolean flushDelay) throws Exception { } @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testWriteMoreThanChunkSize(boolean flushDelay) throws Exception { - OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay); + @MethodSource("clientParameters") + void testWriteMoreThanChunkSize(boolean flushDelay, boolean enablePiggybacking) throws Exception { + OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay, enablePiggybacking); try (OzoneClient client = newClient(cluster.getConf(), config)) { XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); @@ -426,7 +438,7 @@ void testWriteMoreThanChunkSize(boolean flushDelay) throws Exception { key.flush(); assertEquals(writeChunkCount + 2, metrics.getContainerOpCountMetrics(WriteChunk)); - assertEquals(putBlockCount, + assertEquals(putBlockCount + ((enablePiggybacking) ? 0 : 1), metrics.getContainerOpCountMetrics(PutBlock)); assertEquals(pendingWriteChunkCount, metrics.getPendingContainerOpCountMetrics(WriteChunk)); @@ -455,9 +467,9 @@ void testWriteMoreThanChunkSize(boolean flushDelay) throws Exception { metrics.getPendingContainerOpCountMetrics(PutBlock)); assertEquals(writeChunkCount + 2, metrics.getContainerOpCountMetrics(WriteChunk)); - assertEquals(putBlockCount + 1, + assertEquals(putBlockCount + ((enablePiggybacking) ? 1 : 2), metrics.getContainerOpCountMetrics(PutBlock)); - assertEquals(totalOpCount + 3, metrics.getTotalOpCount()); + assertEquals(totalOpCount + ((enablePiggybacking) ? 3 : 4), metrics.getTotalOpCount()); assertEquals(0, keyOutputStream.getStreamEntries().size()); validateData(keyName, data1, client.getObjectStore(), VOLUME, BUCKET); @@ -465,9 +477,9 @@ void testWriteMoreThanChunkSize(boolean flushDelay) throws Exception { } @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testWriteMoreThanFlushSize(boolean flushDelay) throws Exception { - OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay); + @MethodSource("clientParameters") + void testWriteMoreThanFlushSize(boolean flushDelay, boolean enablePiggybacking) throws Exception { + OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay, enablePiggybacking); try (OzoneClient client = newClient(cluster.getConf(), config)) { XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); @@ -539,9 +551,10 @@ void testWriteMoreThanFlushSize(boolean flushDelay) throws Exception { assertEquals(writeChunkCount + 3, metrics.getContainerOpCountMetrics(WriteChunk)); // If the flushDelay was disabled, it sends PutBlock with the data in the buffer. - assertEquals(putBlockCount + (flushDelay ? 1 : 2), + assertEquals(putBlockCount + (flushDelay ? 2 : 3) - (enablePiggybacking ? 1 : 0), metrics.getContainerOpCountMetrics(PutBlock)); - assertEquals(totalOpCount + (flushDelay ? 4 : 5), metrics.getTotalOpCount()); + assertEquals(totalOpCount + (flushDelay ? 5 : 6) - (enablePiggybacking ? 1 : 0), + metrics.getTotalOpCount()); assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); // make sure the bufferPool is empty assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); @@ -554,9 +567,9 @@ void testWriteMoreThanFlushSize(boolean flushDelay) throws Exception { } @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testWriteExactlyMaxFlushSize(boolean flushDelay) throws Exception { - OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay); + @MethodSource("clientParameters") + void testWriteExactlyMaxFlushSize(boolean flushDelay, boolean enablePiggybacking) throws Exception { + OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay, enablePiggybacking); try (OzoneClient client = newClient(cluster.getConf(), config)) { XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); @@ -645,9 +658,9 @@ void testWriteExactlyMaxFlushSize(boolean flushDelay) throws Exception { } @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testWriteMoreThanMaxFlushSize(boolean flushDelay) throws Exception { - OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay); + @MethodSource("clientParameters") + void testWriteMoreThanMaxFlushSize(boolean flushDelay, boolean enablePiggybacking) throws Exception { + OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay, enablePiggybacking); try (OzoneClient client = newClient(cluster.getConf(), config)) { XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); @@ -732,9 +745,9 @@ void testWriteMoreThanMaxFlushSize(boolean flushDelay) throws Exception { assertEquals(writeChunkCount + 5, metrics.getContainerOpCountMetrics(WriteChunk)); // The previous flush did not trigger any action with flushDelay enabled - assertEquals(putBlockCount + (flushDelay ? 2 : 3), + assertEquals(putBlockCount + (flushDelay ? 2 : 3) + (enablePiggybacking ? 0 : 1), metrics.getContainerOpCountMetrics(PutBlock)); - assertEquals(totalOpCount + (flushDelay ? 7 : 8), + assertEquals(totalOpCount + (flushDelay ? 7 : 8) + ((enablePiggybacking ? 0 : 1)), metrics.getTotalOpCount()); assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size()); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java index 708f4cec04b8..8d69da3ef3e1 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java @@ -56,7 +56,10 @@ import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.stream.Stream; /** * Tests failure detection and handling in BlockOutputStream Class. @@ -79,10 +82,19 @@ void shutdown() { } } + private static Stream clientParameters() { + return Stream.of( + Arguments.of(true, true), + Arguments.of(true, false), + Arguments.of(false, true), + Arguments.of(false, false) + ); + } + @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testContainerClose(boolean flushDelay) throws Exception { - OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay); + @MethodSource("clientParameters") + void testContainerClose(boolean flushDelay, boolean enablePiggybacking) throws Exception { + OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay, enablePiggybacking); try (OzoneClient client = newClient(cluster.getConf(), config)) { testWatchForCommitWithCloseContainerException(client); testWatchForCommitWithSingleNodeRatis(client); @@ -174,10 +186,10 @@ private void testWatchForCommitWithCloseContainerException(OzoneClient client) } @ParameterizedTest - @ValueSource(booleans = {true, false}) + @MethodSource("clientParameters") @Flaky("HDDS-6113") - void testWatchForCommitDatanodeFailure(boolean flushDelay) throws Exception { - OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay); + void testWatchForCommitDatanodeFailure(boolean flushDelay, boolean enablePiggybacking) throws Exception { + OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay, enablePiggybacking); try (OzoneClient client = newClient(cluster.getConf(), config)) { String keyName = getKeyName(); OzoneOutputStream key = createKey(client, keyName); @@ -259,9 +271,9 @@ void testWatchForCommitDatanodeFailure(boolean flushDelay) throws Exception { } @ParameterizedTest - @ValueSource(booleans = {true, false}) - void test2DatanodesFailure(boolean flushDelay) throws Exception { - OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay); + @MethodSource("clientParameters") + void test2DatanodesFailure(boolean flushDelay, boolean enablePiggybacking) throws Exception { + OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay, enablePiggybacking); try (OzoneClient client = newClient(cluster.getConf(), config)) { String keyName = getKeyName(); OzoneOutputStream key = createKey(client, keyName); @@ -560,10 +572,10 @@ private void testWatchForCommitWithSingleNodeRatis(OzoneClient client) } @ParameterizedTest - @ValueSource(booleans = {true, false}) + @MethodSource("clientParameters") @Flaky("HDDS-6113") - void testDatanodeFailureWithSingleNode(boolean flushDelay) throws Exception { - OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay); + void testDatanodeFailureWithSingleNode(boolean flushDelay, boolean enablePiggybacking) throws Exception { + OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay, enablePiggybacking); try (OzoneClient client = newClient(cluster.getConf(), config)) { String keyName = getKeyName(); OzoneOutputStream key = @@ -650,10 +662,10 @@ void testDatanodeFailureWithSingleNode(boolean flushDelay) throws Exception { } @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testDatanodeFailureWithPreAllocation(boolean flushDelay) + @MethodSource("clientParameters") + void testDatanodeFailureWithPreAllocation(boolean flushDelay, boolean enablePiggybacking) throws Exception { - OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay); + OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay, enablePiggybacking); try (OzoneClient client = newClient(cluster.getConf(), config)) { String keyName = getKeyName(); OzoneOutputStream key =