diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index 859d8080e6a5..150c418a85c3 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -487,7 +487,7 @@ public void flush() throws IOException { } catch (Throwable e) { String msg = "Failed to flush. error: " + e.getMessage(); LOG.error(msg, e); - throw new RuntimeException(msg, e); + throw e; } } } @@ -553,7 +553,7 @@ public void close() throws IOException { } catch (Throwable e) { String msg = "Failed to flush. error: " + e.getMessage(); LOG.error(msg, e); - throw new RuntimeException(msg, e); + throw e; } finally { cleanup(false); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java index 4f7e183586a7..763ec9fabf79 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java @@ -19,6 +19,7 @@ import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.time.Duration; import java.util.ArrayList; @@ -29,9 +30,9 @@ import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.lang3.ArrayUtils; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.client.ReplicationFactor; @@ -55,20 +56,23 @@ import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.client.OzoneClientFactory; import org.apache.hadoop.ozone.client.io.KeyOutputStream; +import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.container.TestHelper; import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml; import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher; +import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.common.transport.server.ratis.ContainerStateMachine; import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; -import org.apache.hadoop.security.authentication.client.AuthenticationException; import org.apache.ozone.test.LambdaTestUtils; import static java.nio.charset.StandardCharsets.UTF_8; @@ -93,7 +97,6 @@ import static org.junit.Assert.fail; import org.junit.BeforeClass; import org.junit.Test; -import org.junit.jupiter.api.BeforeEach; /** * Tests the containerStateMachine failure handling. @@ -135,8 +138,8 @@ public static void init() throws Exception { RatisClientConfig ratisClientConfig = conf.getObject(RatisClientConfig.class); - ratisClientConfig.setWriteRequestTimeout(Duration.ofSeconds(10)); - ratisClientConfig.setWatchRequestTimeout(Duration.ofSeconds(10)); + ratisClientConfig.setWriteRequestTimeout(Duration.ofSeconds(20)); + ratisClientConfig.setWatchRequestTimeout(Duration.ofSeconds(20)); conf.setFromObject(ratisClientConfig); DatanodeRatisServerConfig ratisServerConfig = @@ -148,7 +151,7 @@ public static void init() throws Exception { RatisClientConfig.RaftConfig raftClientConfig = conf.getObject(RatisClientConfig.RaftConfig.class); raftClientConfig.setRpcRequestTimeout(Duration.ofSeconds(3)); - raftClientConfig.setRpcWatchRequestTimeout(Duration.ofSeconds(10)); + raftClientConfig.setRpcWatchRequestTimeout(Duration.ofSeconds(20)); conf.setFromObject(raftClientConfig); conf.setLong(OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY, 1); @@ -169,16 +172,6 @@ public static void init() throws Exception { random = new Random(); } - @BeforeEach - public void restartDatanode() - throws InterruptedException, TimeoutException, AuthenticationException, - IOException { - for (int i=0; i < cluster.getHddsDatanodes().size(); i++) { - cluster.restartHddsDatanode(i, true); - } - cluster.restartStorageContainerManager(true); - } - /** * Shutdown MiniDFSCluster. */ @@ -677,4 +670,138 @@ public void testWriteStateMachineDataIdempotencyWithClosedContainer() r2.run(); } + + @Test + public void testContainerStateMachineSingleFailureRetry() + throws Exception { + OzoneOutputStream key = + objectStore.getVolume(volumeName).getBucket(bucketName) + .createKey("ratis1", 1024, ReplicationType.RATIS, + ReplicationFactor.THREE, new HashMap<>()); + + key.write("ratis".getBytes(UTF_8)); + key.flush(); + key.write("ratis".getBytes(UTF_8)); + key.write("ratis".getBytes(UTF_8)); + + KeyOutputStream groupOutputStream = (KeyOutputStream) key. + getOutputStream(); + List locationInfoList = + groupOutputStream.getLocationInfoList(); + Assert.assertEquals(1, locationInfoList.size()); + + OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0); + + induceFollowerFailure(omKeyLocationInfo, 2); + + try { + key.flush(); + key.write("ratis".getBytes(UTF_8)); + key.flush(); + key.close(); + } catch (Exception ioe) { + // Should not fail.. + Assert.fail("Exception " + ioe.getMessage()); + } + validateData("ratis1", 2, "ratisratisratisratis"); + } + + @Test + public void testContainerStateMachineDualFailureRetry() + throws Exception { + OzoneOutputStream key = + objectStore.getVolume(volumeName).getBucket(bucketName) + .createKey("ratis2", 1024, ReplicationType.RATIS, + ReplicationFactor.THREE, new HashMap<>()); + + key.write("ratis".getBytes(UTF_8)); + key.flush(); + key.write("ratis".getBytes(UTF_8)); + key.write("ratis".getBytes(UTF_8)); + + KeyOutputStream groupOutputStream = (KeyOutputStream) key. + getOutputStream(); + List locationInfoList = + groupOutputStream.getLocationInfoList(); + Assert.assertEquals(1, locationInfoList.size()); + + OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0); + + induceFollowerFailure(omKeyLocationInfo, 1); + + try { + key.flush(); + key.write("ratis".getBytes(UTF_8)); + key.flush(); + key.close(); + } catch (Exception ioe) { + // Should not fail.. + Assert.fail("Exception " + ioe.getMessage()); + } + validateData("ratis1", 2, "ratisratisratisratis"); + } + + private void induceFollowerFailure(OmKeyLocationInfo omKeyLocationInfo, + int failureCount) { + UUID leader = omKeyLocationInfo.getPipeline().getLeaderId(); + Set datanodeSet = + TestHelper.getDatanodeServices(cluster, + omKeyLocationInfo.getPipeline()); + int count = 0; + for (HddsDatanodeService dn : datanodeSet) { + UUID dnUuid = dn.getDatanodeDetails().getUuid(); + if (!dnUuid.equals(leader)) { + count++; + long containerID = omKeyLocationInfo.getContainerID(); + Container container = dn + .getDatanodeStateMachine() + .getContainer() + .getContainerSet() + .getContainer(containerID); + if (container != null) { + ContainerData containerData = + container + .getContainerData(); + Assert.assertTrue(containerData instanceof KeyValueContainerData); + KeyValueContainerData keyValueContainerData = + (KeyValueContainerData) containerData; + FileUtil.fullyDelete(new File(keyValueContainerData.getChunksPath())); + } + + if (count == failureCount) { + break; + } + } + } + } + + private void validateData(String key, int locationCount, String payload) { + OmKeyArgs omKeyArgs = new OmKeyArgs.Builder() + .setVolumeName(volumeName) + .setBucketName(bucketName) + .setKeyName(key) + .setRefreshPipeline(true) + .build(); + OmKeyInfo keyInfo = null; + try { + keyInfo = cluster.getOzoneManager().lookupKey(omKeyArgs); + + Assert.assertEquals(locationCount, + keyInfo.getLatestVersionLocations().getLocationListCount()); + OzoneInputStream + o = objectStore + .getVolume(volumeName) + .getBucket(bucketName) + .readKey(key); + byte[] buffer = new byte[1024]; + o.read(buffer, 0, 1024); + int end = ArrayUtils.indexOf(buffer, (byte) 0); + String response = new String(buffer, 0, + end, + StandardCharsets.UTF_8); + Assert.assertEquals(payload, response); + } catch (IOException e) { + Assert.fail("Exception not expected " + e.getMessage()); + } + } }