@@ -20,32 +20,24 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
@@ -55,56 +47,40 @@
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.ozone.test.NonHATests;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.Timeout;

/**
* Tests Close Container Exception handling by Ozone Client.
*/
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@Timeout(300)
public class TestCloseContainerHandlingByClient {

private static MiniOzoneCluster cluster;
private static OzoneConfiguration conf = new OzoneConfiguration();
private static OzoneClient client;
private static ObjectStore objectStore;
private static int chunkSize;
private static int blockSize;
private static String volumeName;
private static String bucketName;
private static String keyString;

/**
* Create a MiniDFSCluster for testing.
* <p>
* Ozone is made active by setting OZONE_ENABLED = true
*
* @throws IOException
*/
public abstract class TestCloseContainerHandlingByClient implements NonHATests.TestCase {

private MiniOzoneCluster cluster;
private OzoneClient client;
private ObjectStore objectStore;
private int chunkSize;
private int blockSize;
private String volumeName;
private String bucketName;
private String keyString;

@BeforeAll
public static void init() throws Exception {
chunkSize = (int) OzoneConsts.MB;
blockSize = 4 * chunkSize;

OzoneClientConfig config = conf.getObject(OzoneClientConfig.class);
config.setChecksumType(ChecksumType.NONE);
conf.setFromObject(config);

conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
conf.setInt(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT, 1);
conf.setQuietMode(false);
conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
StorageUnit.MB);
cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(5).build();
cluster.waitForClusterToBeReady();
//the easiest way to create an open container is creating a key
client = OzoneClientFactory.getRpcClient(conf);
void init() throws Exception {
chunkSize = (int) cluster().getConf().getStorageSize(OZONE_SCM_CHUNK_SIZE_KEY, 1024 * 1024, StorageUnit.BYTES);
blockSize = (int) cluster().getConf().getStorageSize(OZONE_SCM_BLOCK_SIZE, 4 * chunkSize, StorageUnit.BYTES);

cluster = cluster();
client = cluster().newClient();
objectStore = client.getObjectStore();
keyString = UUID.randomUUID().toString();
volumeName = "closecontainerexceptionhandlingtest";
bucketName = volumeName;
volumeName = "vol-" + UUID.randomUUID();
bucketName = "bucket";
objectStore.createVolume(volumeName);
objectStore.getVolume(volumeName).createBucket(bucketName);
}
@@ -113,15 +89,9 @@ private String getKeyName() {
return UUID.randomUUID().toString();
}

/**
* Shutdown MiniDFSCluster.
*/
@AfterAll
public static void shutdown() {
void cleanup() {
IOUtils.closeQuietly(client);
if (cluster != null) {
cluster.shutdown();
}
}

@Test
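The hunk above rewrites the test to borrow a long-lived cluster from an enclosing suite instead of booting its own MiniOzoneCluster per class. Only two points of the contract are visible in the diff: tests implement NonHATests.TestCase, and they reach everything through cluster() (e.g. cluster().newClient(), cluster().getConf()). A minimal sketch of that contract, assuming only what the diff shows — the real org.apache.ozone.test.NonHATests may differ:

package org.apache.ozone.test;

import org.apache.hadoop.ozone.MiniOzoneCluster;

// Illustrative sketch only; the actual NonHATests may carry more members.
// The diff above guarantees just the cluster() accessor.
public interface NonHATests {

  interface TestCase {
    /** Shared, already-running cluster owned by the enclosing suite. */
    MiniOzoneCluster cluster();
  }
}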
@@ -17,200 +17,92 @@

package org.apache.hadoop.ozone.client.rpc;

import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.io.IOException;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY;
import static org.apache.hadoop.ozone.container.TestHelper.createStreamKey;
import static org.apache.hadoop.ozone.container.TestHelper.getDatanodeService;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.UUID;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.ClientConfigForTesting;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.io.KeyDataStreamOutput;
import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.TestHelper;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.apache.ozone.test.NonHATests;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/**
* Tests the containerStateMachine stream handling.
*/
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@Timeout(300)
public class TestContainerStateMachineStream {
private MiniOzoneCluster cluster;
private OzoneConfiguration conf = new OzoneConfiguration();
public abstract class TestContainerStateMachineStream implements NonHATests.TestCase {
private OzoneClient client;
private ObjectStore objectStore;
private String volumeName;
private String bucketName;
private int chunkSize;

@BeforeAll
void setup() throws Exception {
chunkSize = (int) cluster().getConf().getStorageSize(OZONE_SCM_CHUNK_SIZE_KEY, 1024 * 1024, StorageUnit.BYTES);

private static final int CHUNK_SIZE = 100;
private static final int FLUSH_SIZE = 2 * CHUNK_SIZE;
private static final int MAX_FLUSH_SIZE = 2 * FLUSH_SIZE;
private static final int BLOCK_SIZE = 2 * MAX_FLUSH_SIZE;

/**
* Create a MiniDFSCluster for testing.
*
* @throws IOException
*/
@BeforeEach
public void setup() throws Exception {
conf = new OzoneConfiguration();

OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
clientConfig.setStreamBufferFlushDelay(false);
conf.setFromObject(clientConfig);

conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200,
TimeUnit.MILLISECONDS);
conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 200,
TimeUnit.MILLISECONDS);
conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 200, TimeUnit.MILLISECONDS);
conf.setTimeDuration(HDDS_PIPELINE_REPORT_INTERVAL, 200,
TimeUnit.MILLISECONDS);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS);
conf.setTimeDuration(OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, 1,
TimeUnit.SECONDS);

RatisClientConfig ratisClientConfig =
conf.getObject(RatisClientConfig.class);
ratisClientConfig.setWriteRequestTimeout(Duration.ofSeconds(10));
ratisClientConfig.setWatchRequestTimeout(Duration.ofSeconds(10));
conf.setFromObject(ratisClientConfig);

DatanodeRatisServerConfig ratisServerConfig =
conf.getObject(DatanodeRatisServerConfig.class);
ratisServerConfig.setRequestTimeOut(Duration.ofSeconds(3));
ratisServerConfig.setWatchTimeOut(Duration.ofSeconds(10));
conf.setFromObject(ratisServerConfig);

RatisClientConfig.RaftConfig raftClientConfig =
conf.getObject(RatisClientConfig.RaftConfig.class);
raftClientConfig.setRpcRequestTimeout(Duration.ofSeconds(3));
raftClientConfig.setRpcWatchRequestTimeout(Duration.ofSeconds(10));
conf.setFromObject(raftClientConfig);

ClientConfigForTesting.newBuilder(StorageUnit.BYTES)
.setDataStreamMinPacketSize(1024)
.setBlockSize(BLOCK_SIZE)
.setChunkSize(CHUNK_SIZE)
.setStreamBufferFlushSize(FLUSH_SIZE)
.setStreamBufferMaxSize(MAX_FLUSH_SIZE)
.applyTo(conf);

conf.setLong(OzoneConfigKeys.HDDS_RATIS_SNAPSHOT_THRESHOLD_KEY, 1);
conf.setQuietMode(false);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(3)
.build();
cluster.waitForClusterToBeReady();
cluster.waitForPipelineTobeReady(HddsProtos.ReplicationFactor.ONE, 60000);
//the easiest way to create an open container is creating a key
client = OzoneClientFactory.getRpcClient(conf);
client = cluster().newClient();
objectStore = client.getObjectStore();

volumeName = "testcontainerstatemachinestream";
volumeName = "vol-" + UUID.randomUUID();
bucketName = "teststreambucket";
objectStore.createVolume(volumeName);
objectStore.getVolume(volumeName).createBucket(bucketName);

}

/**
* Shutdown MiniDFSCluster.
*/
@AfterEach
public void shutdown() {
@AfterAll
void shutdown() {
IOUtils.closeQuietly(client);
if (cluster != null) {
cluster.shutdown();
}
}

@Test
public void testContainerStateMachineForStreaming() throws Exception {
long size = CHUNK_SIZE + 1;

OzoneDataStreamOutput key = TestHelper.createStreamKey(
"ozone-stream-test.txt", ReplicationType.RATIS, size, objectStore,
volumeName, bucketName);

byte[] data = ContainerTestHelper.generateData((int) size, true);
key.write(ByteBuffer.wrap(data));
key.flush();

KeyDataStreamOutput streamOutput =
(KeyDataStreamOutput) key.getByteBufStreamOutput();
List<OmKeyLocationInfo> locationInfoList =
streamOutput.getLocationInfoList();

key.close();

OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
HddsDatanodeService dn = TestHelper.getDatanodeService(omKeyLocationInfo,
cluster);

long bytesUsed = dn.getDatanodeStateMachine()
.getContainer().getContainerSet()
.getContainer(omKeyLocationInfo.getContainerID()).
getContainerData().getBytesUsed();

assertEquals(bytesUsed, size);
}

@ParameterizedTest
@ValueSource(ints = {-1, +1})
void testContainerStateMachineForStreaming(int offset) throws Exception {
final int size = chunkSize + offset;

@Test
public void testContainerStateMachineForStreamingSmallFile()
throws Exception {
long size = CHUNK_SIZE - 1;
final List<OmKeyLocationInfo> locationInfoList;
try (OzoneDataStreamOutput key = createStreamKey("key" + offset, ReplicationType.RATIS, size,
objectStore, volumeName, bucketName)) {

OzoneDataStreamOutput key = TestHelper.createStreamKey(
"ozone-stream-test-small-file.txt", ReplicationType.RATIS, size,
objectStore, volumeName, bucketName);
byte[] data = ContainerTestHelper.generateData(size, true);
key.write(ByteBuffer.wrap(data));
key.flush();

byte[] data = ContainerTestHelper.generateData((int) size, true);
key.write(ByteBuffer.wrap(data));
key.flush();
locationInfoList = assertInstanceOf(KeyDataStreamOutput.class, key.getByteBufStreamOutput())
.getLocationInfoList();
}

KeyDataStreamOutput streamOutput =
(KeyDataStreamOutput) key.getByteBufStreamOutput();
List<OmKeyLocationInfo> locationInfoList =
streamOutput.getLocationInfoList();
key.close();
OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
HddsDatanodeService dn = TestHelper.getDatanodeService(omKeyLocationInfo,
cluster);

long bytesUsed = dn.getDatanodeStateMachine()
.getContainer().getContainerSet()
.getContainer(omKeyLocationInfo.getContainerID()).
getContainerData().getBytesUsed();

assertEquals(bytesUsed, size);
long bytesUsed = getDatanodeService(omKeyLocationInfo, cluster())
.getDatanodeStateMachine()
.getContainer()
.getContainerSet()
.getContainer(omKeyLocationInfo.getContainerID())
.getContainerData()
.getBytesUsed();

assertThat(bytesUsed)
// container may have previous data
.isGreaterThanOrEqualTo(size);
}

}
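With both classes now abstract, they execute only when a concrete suite binds them to a running cluster. A hypothetical wiring under JUnit 5, assuming @Nested subclasses and a suite that owns one MiniOzoneCluster — all names below are illustrative, not the actual Ozone suite; the only calls reused are ones visible in the diff (newBuilder(), setNumDatanodes(), waitForClusterToBeReady(), shutdown()):

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.TestInstance;

// Hypothetical example, not the actual Ozone suite class.
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class ExampleNonHASuite {

  private MiniOzoneCluster sharedCluster;

  @BeforeAll
  void startCluster() throws Exception {
    OzoneConfiguration conf = new OzoneConfiguration();
    sharedCluster = MiniOzoneCluster.newBuilder(conf)
        .setNumDatanodes(5)
        .build();
    sharedCluster.waitForClusterToBeReady();
  }

  @AfterAll
  void stopCluster() {
    if (sharedCluster != null) {
      sharedCluster.shutdown();
    }
  }

  @Nested
  class CloseContainerHandling extends TestCloseContainerHandlingByClient {
    @Override
    public MiniOzoneCluster cluster() {
      return sharedCluster; // hand the suite's cluster to the abstract test
    }
  }
}

The @Nested binding lets many abstract test classes share a single cluster boot, which is the apparent point of the refactor: per-class @BeforeAll cluster startup is replaced by one suite-owned cluster plus per-class volumes named with random UUIDs to keep tests isolated.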