Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ public void flush() throws IOException {
} catch (Throwable e) {
String msg = "Failed to flush. error: " + e.getMessage();
LOG.error(msg, e);
throw new RuntimeException(msg, e);
throw e;
}
}
}
Expand Down Expand Up @@ -553,7 +553,7 @@ public void close() throws IOException {
} catch (Throwable e) {
String msg = "Failed to flush. error: " + e.getMessage();
LOG.error(msg, e);
throw new RuntimeException(msg, e);
throw e;
} finally {
cleanup(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
Expand All @@ -29,9 +30,9 @@
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.client.ReplicationFactor;
Expand All @@ -55,20 +56,23 @@
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.TestHelper;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.ContainerStateMachine;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.ozone.test.LambdaTestUtils;

import static java.nio.charset.StandardCharsets.UTF_8;
Expand All @@ -93,7 +97,6 @@
import static org.junit.Assert.fail;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.jupiter.api.BeforeEach;

/**
* Tests the containerStateMachine failure handling.
Expand Down Expand Up @@ -135,8 +138,8 @@ public static void init() throws Exception {

RatisClientConfig ratisClientConfig =
conf.getObject(RatisClientConfig.class);
ratisClientConfig.setWriteRequestTimeout(Duration.ofSeconds(10));
ratisClientConfig.setWatchRequestTimeout(Duration.ofSeconds(10));
ratisClientConfig.setWriteRequestTimeout(Duration.ofSeconds(20));
ratisClientConfig.setWatchRequestTimeout(Duration.ofSeconds(20));
conf.setFromObject(ratisClientConfig);

DatanodeRatisServerConfig ratisServerConfig =
Expand All @@ -148,7 +151,7 @@ public static void init() throws Exception {
RatisClientConfig.RaftConfig raftClientConfig =
conf.getObject(RatisClientConfig.RaftConfig.class);
raftClientConfig.setRpcRequestTimeout(Duration.ofSeconds(3));
raftClientConfig.setRpcWatchRequestTimeout(Duration.ofSeconds(10));
raftClientConfig.setRpcWatchRequestTimeout(Duration.ofSeconds(20));
conf.setFromObject(raftClientConfig);

conf.setLong(OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY, 1);
Expand All @@ -169,16 +172,6 @@ public static void init() throws Exception {
random = new Random();
}

@BeforeEach
public void restartDatanode()
throws InterruptedException, TimeoutException, AuthenticationException,
IOException {
for (int i=0; i < cluster.getHddsDatanodes().size(); i++) {
cluster.restartHddsDatanode(i, true);
}
cluster.restartStorageContainerManager(true);
}

/**
* Shutdown MiniDFSCluster.
*/
Expand Down Expand Up @@ -677,4 +670,138 @@ public void testWriteStateMachineDataIdempotencyWithClosedContainer()

r2.run();
}

@Test
public void testContainerStateMachineSingleFailureRetry()
throws Exception {
OzoneOutputStream key =
objectStore.getVolume(volumeName).getBucket(bucketName)
.createKey("ratis1", 1024, ReplicationType.RATIS,
ReplicationFactor.THREE, new HashMap<>());

key.write("ratis".getBytes(UTF_8));
key.flush();
key.write("ratis".getBytes(UTF_8));
key.write("ratis".getBytes(UTF_8));

KeyOutputStream groupOutputStream = (KeyOutputStream) key.
getOutputStream();
List<OmKeyLocationInfo> locationInfoList =
groupOutputStream.getLocationInfoList();
Assert.assertEquals(1, locationInfoList.size());

OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);

induceFollowerFailure(omKeyLocationInfo, 2);

try {
key.flush();
key.write("ratis".getBytes(UTF_8));
key.flush();
key.close();
} catch (Exception ioe) {
// Should not fail..
Assert.fail("Exception " + ioe.getMessage());
}
validateData("ratis1", 2, "ratisratisratisratis");
}

@Test
public void testContainerStateMachineDualFailureRetry()
throws Exception {
OzoneOutputStream key =
objectStore.getVolume(volumeName).getBucket(bucketName)
.createKey("ratis2", 1024, ReplicationType.RATIS,
ReplicationFactor.THREE, new HashMap<>());

key.write("ratis".getBytes(UTF_8));
key.flush();
key.write("ratis".getBytes(UTF_8));
key.write("ratis".getBytes(UTF_8));

KeyOutputStream groupOutputStream = (KeyOutputStream) key.
getOutputStream();
List<OmKeyLocationInfo> locationInfoList =
groupOutputStream.getLocationInfoList();
Assert.assertEquals(1, locationInfoList.size());

OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);

induceFollowerFailure(omKeyLocationInfo, 1);

try {
key.flush();
key.write("ratis".getBytes(UTF_8));
key.flush();
key.close();
} catch (Exception ioe) {
// Should not fail..
Assert.fail("Exception " + ioe.getMessage());
}
validateData("ratis1", 2, "ratisratisratisratis");
}

private void induceFollowerFailure(OmKeyLocationInfo omKeyLocationInfo,
int failureCount) {
UUID leader = omKeyLocationInfo.getPipeline().getLeaderId();
Set<HddsDatanodeService> datanodeSet =
TestHelper.getDatanodeServices(cluster,
omKeyLocationInfo.getPipeline());
int count = 0;
for (HddsDatanodeService dn : datanodeSet) {
UUID dnUuid = dn.getDatanodeDetails().getUuid();
if (!dnUuid.equals(leader)) {
count++;
long containerID = omKeyLocationInfo.getContainerID();
Container container = dn
.getDatanodeStateMachine()
.getContainer()
.getContainerSet()
.getContainer(containerID);
if (container != null) {
ContainerData containerData =
container
.getContainerData();
Assert.assertTrue(containerData instanceof KeyValueContainerData);
KeyValueContainerData keyValueContainerData =
(KeyValueContainerData) containerData;
FileUtil.fullyDelete(new File(keyValueContainerData.getChunksPath()));
}

if (count == failureCount) {
break;
}
}
}
}

private void validateData(String key, int locationCount, String payload) {
OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
.setVolumeName(volumeName)
.setBucketName(bucketName)
.setKeyName(key)
.setRefreshPipeline(true)
.build();
OmKeyInfo keyInfo = null;
try {
keyInfo = cluster.getOzoneManager().lookupKey(omKeyArgs);

Assert.assertEquals(locationCount,
keyInfo.getLatestVersionLocations().getLocationListCount());
OzoneInputStream
o = objectStore
.getVolume(volumeName)
.getBucket(bucketName)
.readKey(key);
byte[] buffer = new byte[1024];
o.read(buffer, 0, 1024);
int end = ArrayUtils.indexOf(buffer, (byte) 0);
String response = new String(buffer, 0,
end,
StandardCharsets.UTF_8);
Assert.assertEquals(payload, response);
} catch (IOException e) {
Assert.fail("Exception not expected " + e.getMessage());
}
}
}