Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;

import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.hdds.client.ReplicationFactor;
Expand Down Expand Up @@ -52,7 +53,8 @@
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type.PutBlock;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type.WriteChunk;
Expand Down Expand Up @@ -147,11 +149,21 @@ void shutdown() {
}
}

private static Stream<Arguments> clientParameters() {
return Stream.of(
Arguments.of(true, true),
Arguments.of(true, false),
Arguments.of(false, true),
Arguments.of(false, false)
);
}

static OzoneClientConfig newClientConfig(ConfigurationSource source,
boolean flushDelay) {
boolean flushDelay, boolean enablePiggybacking) {
OzoneClientConfig clientConfig = source.getObject(OzoneClientConfig.class);
clientConfig.setChecksumType(ChecksumType.NONE);
clientConfig.setStreamBufferFlushDelay(flushDelay);
clientConfig.setEnablePutblockPiggybacking(enablePiggybacking);
return clientConfig;
}

Expand All @@ -163,9 +175,9 @@ static OzoneClient newClient(OzoneConfiguration conf,
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void testWriteLessThanChunkSize(boolean flushDelay) throws Exception {
OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay);
@MethodSource("clientParameters")
void testWriteLessThanChunkSize(boolean flushDelay, boolean enablePiggybacking) throws Exception {
OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay, enablePiggybacking);
try (OzoneClient client = newClient(cluster.getConf(), config)) {
XceiverClientMetrics metrics =
XceiverClientManager.getXceiverClientMetrics();
Expand Down Expand Up @@ -258,9 +270,9 @@ void testWriteLessThanChunkSize(boolean flushDelay) throws Exception {
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void testWriteExactlyFlushSize(boolean flushDelay) throws Exception {
OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay);
@MethodSource("clientParameters")
void testWriteExactlyFlushSize(boolean flushDelay, boolean enablePiggybacking) throws Exception {
OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay, enablePiggybacking);
try (OzoneClient client = newClient(cluster.getConf(), config)) {
XceiverClientMetrics metrics =
XceiverClientManager.getXceiverClientMetrics();
Expand Down Expand Up @@ -377,9 +389,9 @@ void testWriteExactlyFlushSize(boolean flushDelay) throws Exception {
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void testWriteMoreThanChunkSize(boolean flushDelay) throws Exception {
OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay);
@MethodSource("clientParameters")
void testWriteMoreThanChunkSize(boolean flushDelay, boolean enablePiggybacking) throws Exception {
OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay, enablePiggybacking);
try (OzoneClient client = newClient(cluster.getConf(), config)) {
XceiverClientMetrics metrics =
XceiverClientManager.getXceiverClientMetrics();
Expand Down Expand Up @@ -426,7 +438,7 @@ void testWriteMoreThanChunkSize(boolean flushDelay) throws Exception {
key.flush();
assertEquals(writeChunkCount + 2,
metrics.getContainerOpCountMetrics(WriteChunk));
assertEquals(putBlockCount,
assertEquals(putBlockCount + ((enablePiggybacking) ? 0 : 1),
metrics.getContainerOpCountMetrics(PutBlock));
assertEquals(pendingWriteChunkCount,
metrics.getPendingContainerOpCountMetrics(WriteChunk));
Expand Down Expand Up @@ -455,19 +467,19 @@ void testWriteMoreThanChunkSize(boolean flushDelay) throws Exception {
metrics.getPendingContainerOpCountMetrics(PutBlock));
assertEquals(writeChunkCount + 2,
metrics.getContainerOpCountMetrics(WriteChunk));
assertEquals(putBlockCount + 1,
assertEquals(putBlockCount + ((enablePiggybacking) ? 1 : 2),
metrics.getContainerOpCountMetrics(PutBlock));
assertEquals(totalOpCount + 3, metrics.getTotalOpCount());
assertEquals(totalOpCount + ((enablePiggybacking) ? 3 : 4), metrics.getTotalOpCount());
assertEquals(0, keyOutputStream.getStreamEntries().size());

validateData(keyName, data1, client.getObjectStore(), VOLUME, BUCKET);
}
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void testWriteMoreThanFlushSize(boolean flushDelay) throws Exception {
OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay);
@MethodSource("clientParameters")
void testWriteMoreThanFlushSize(boolean flushDelay, boolean enablePiggybacking) throws Exception {
OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay, enablePiggybacking);
try (OzoneClient client = newClient(cluster.getConf(), config)) {
XceiverClientMetrics metrics =
XceiverClientManager.getXceiverClientMetrics();
Expand Down Expand Up @@ -539,9 +551,10 @@ void testWriteMoreThanFlushSize(boolean flushDelay) throws Exception {
assertEquals(writeChunkCount + 3,
metrics.getContainerOpCountMetrics(WriteChunk));
// If the flushDelay was disabled, it sends PutBlock with the data in the buffer.
assertEquals(putBlockCount + (flushDelay ? 1 : 2),
assertEquals(putBlockCount + (flushDelay ? 2 : 3) - (enablePiggybacking ? 1 : 0),
metrics.getContainerOpCountMetrics(PutBlock));
assertEquals(totalOpCount + (flushDelay ? 4 : 5), metrics.getTotalOpCount());
assertEquals(totalOpCount + (flushDelay ? 5 : 6) - (enablePiggybacking ? 1 : 0),
metrics.getTotalOpCount());
assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
// make sure the bufferPool is empty
assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
Expand All @@ -554,9 +567,9 @@ void testWriteMoreThanFlushSize(boolean flushDelay) throws Exception {
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void testWriteExactlyMaxFlushSize(boolean flushDelay) throws Exception {
OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay);
@MethodSource("clientParameters")
void testWriteExactlyMaxFlushSize(boolean flushDelay, boolean enablePiggybacking) throws Exception {
OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay, enablePiggybacking);
try (OzoneClient client = newClient(cluster.getConf(), config)) {
XceiverClientMetrics metrics =
XceiverClientManager.getXceiverClientMetrics();
Expand Down Expand Up @@ -645,9 +658,9 @@ void testWriteExactlyMaxFlushSize(boolean flushDelay) throws Exception {
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void testWriteMoreThanMaxFlushSize(boolean flushDelay) throws Exception {
OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay);
@MethodSource("clientParameters")
void testWriteMoreThanMaxFlushSize(boolean flushDelay, boolean enablePiggybacking) throws Exception {
OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay, enablePiggybacking);
try (OzoneClient client = newClient(cluster.getConf(), config)) {
XceiverClientMetrics metrics =
XceiverClientManager.getXceiverClientMetrics();
Expand Down Expand Up @@ -732,9 +745,9 @@ void testWriteMoreThanMaxFlushSize(boolean flushDelay) throws Exception {
assertEquals(writeChunkCount + 5,
metrics.getContainerOpCountMetrics(WriteChunk));
// The previous flush did not trigger any action with flushDelay enabled
assertEquals(putBlockCount + (flushDelay ? 2 : 3),
assertEquals(putBlockCount + (flushDelay ? 2 : 3) + (enablePiggybacking ? 0 : 1),
metrics.getContainerOpCountMetrics(PutBlock));
assertEquals(totalOpCount + (flushDelay ? 7 : 8),
assertEquals(totalOpCount + (flushDelay ? 7 : 8) + ((enablePiggybacking ? 0 : 1)),
metrics.getTotalOpCount());
assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.stream.Stream;

/**
* Tests failure detection and handling in BlockOutputStream Class.
Expand All @@ -79,10 +82,19 @@ void shutdown() {
}
}

private static Stream<Arguments> clientParameters() {
return Stream.of(
Arguments.of(true, true),
Arguments.of(true, false),
Arguments.of(false, true),
Arguments.of(false, false)
);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void testContainerClose(boolean flushDelay) throws Exception {
OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay);
@MethodSource("clientParameters")
void testContainerClose(boolean flushDelay, boolean enablePiggybacking) throws Exception {
OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay, enablePiggybacking);
try (OzoneClient client = newClient(cluster.getConf(), config)) {
testWatchForCommitWithCloseContainerException(client);
testWatchForCommitWithSingleNodeRatis(client);
Expand Down Expand Up @@ -174,10 +186,10 @@ private void testWatchForCommitWithCloseContainerException(OzoneClient client)
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
@MethodSource("clientParameters")
@Flaky("HDDS-6113")
void testWatchForCommitDatanodeFailure(boolean flushDelay) throws Exception {
OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay);
void testWatchForCommitDatanodeFailure(boolean flushDelay, boolean enablePiggybacking) throws Exception {
OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay, enablePiggybacking);
try (OzoneClient client = newClient(cluster.getConf(), config)) {
String keyName = getKeyName();
OzoneOutputStream key = createKey(client, keyName);
Expand Down Expand Up @@ -259,9 +271,9 @@ void testWatchForCommitDatanodeFailure(boolean flushDelay) throws Exception {
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void test2DatanodesFailure(boolean flushDelay) throws Exception {
OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay);
@MethodSource("clientParameters")
void test2DatanodesFailure(boolean flushDelay, boolean enablePiggybacking) throws Exception {
OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay, enablePiggybacking);
try (OzoneClient client = newClient(cluster.getConf(), config)) {
String keyName = getKeyName();
OzoneOutputStream key = createKey(client, keyName);
Expand Down Expand Up @@ -560,10 +572,10 @@ private void testWatchForCommitWithSingleNodeRatis(OzoneClient client)
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
@MethodSource("clientParameters")
@Flaky("HDDS-6113")
void testDatanodeFailureWithSingleNode(boolean flushDelay) throws Exception {
OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay);
void testDatanodeFailureWithSingleNode(boolean flushDelay, boolean enablePiggybacking) throws Exception {
OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay, enablePiggybacking);
try (OzoneClient client = newClient(cluster.getConf(), config)) {
String keyName = getKeyName();
OzoneOutputStream key =
Expand Down Expand Up @@ -650,10 +662,10 @@ void testDatanodeFailureWithSingleNode(boolean flushDelay) throws Exception {
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void testDatanodeFailureWithPreAllocation(boolean flushDelay)
@MethodSource("clientParameters")
void testDatanodeFailureWithPreAllocation(boolean flushDelay, boolean enablePiggybacking)
throws Exception {
OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay);
OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay, enablePiggybacking);
try (OzoneClient client = newClient(cluster.getConf(), config)) {
String keyName = getKeyName();
OzoneOutputStream key =
Expand Down