diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java index de11ec19d718..dd124be7236c 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java @@ -43,7 +43,6 @@ import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils; import org.apache.ozone.test.GenericTestUtils; -import org.apache.ozone.test.tag.Flaky; import org.apache.ratis.server.protocol.TermIndex; import org.assertj.core.api.Fail; import org.junit.Assert; @@ -67,6 +66,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -160,7 +160,6 @@ public void shutdown() { } @Test - @Flaky("HDDS-8355") public void testInstallSnapshot() throws Exception { // Get the leader OM String leaderOMNodeId = OmFailoverProxyUtil @@ -220,7 +219,7 @@ public void testInstallSnapshot() throws Exception { // Verify checkpoint installation was happened. String msg = "Reloaded OM state"; - Assert.assertTrue(logCapture.getOutput().contains(msg)); + assertLogCapture(logCapture, msg); // Verify that the follower OM's DB contains the transactions which were // made while it was inactive. @@ -240,8 +239,8 @@ public void testInstallSnapshot() throws Exception { return followerOM.isOmRpcServerRunning(); }, 100, 5000); - Assert.assertTrue(logCapture.getOutput().contains( - "Install Checkpoint is finished")); + assertLogCapture(logCapture, + "Install Checkpoint is finished"); // Read & Write after snapshot installed. List newKeys = writeKeys(1); @@ -360,9 +359,8 @@ public void testInstallSnapshotWithClientWrite() throws Exception { // Verify checkpoint installation was happened. String msg = "Reloaded OM state"; - Assert.assertTrue(logCapture.getOutput().contains(msg)); - Assert.assertTrue(logCapture.getOutput().contains( - "Install Checkpoint is finished")); + assertLogCapture(logCapture, msg); + assertLogCapture(logCapture, "Install Checkpoint is finished"); long followerOMLastAppliedIndex = followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex(); @@ -493,9 +491,8 @@ public void testInstallSnapshotWithClientRead() throws Exception { // Wait installation finish Thread.sleep(5000); // Verify checkpoint installation was happened. - Assert.assertTrue(logCapture.getOutput().contains("Reloaded OM state")); - Assert.assertTrue(logCapture.getOutput().contains( - "Install Checkpoint is finished")); + assertLogCapture(logCapture, "Reloaded OM state"); + assertLogCapture(logCapture, "Install Checkpoint is finished"); } @Test @@ -544,7 +541,7 @@ public void testInstallOldCheckpointFailure() throws Exception { String errorMsg = "Cannot proceed with InstallSnapshot as OM is at " + "TermIndex " + followerTermIndex + " and checkpoint has lower " + "TermIndex"; - Assert.assertTrue(logCapture.getOutput().contains(errorMsg)); + assertLogCapture(logCapture, errorMsg); Assert.assertNull("OM installed checkpoint even though checkpoint " + "logIndex is less than it's lastAppliedIndex", newTermIndex); Assert.assertEquals(followerTermIndex, @@ -552,7 +549,7 @@ public void testInstallOldCheckpointFailure() throws Exception { String msg = "OM DB is not stopped. Started services with Term: " + followerTermIndex.getTerm() + " and Index: " + followerTermIndex.getIndex(); - Assert.assertTrue(logCapture.getOutput().contains(msg)); + assertLogCapture(logCapture, msg); } @Test @@ -605,13 +602,10 @@ public void testInstallCorruptedCheckpointFailure() throws Exception { leaderCheckpointTrxnInfo); // Wait checkpoint installation to be finished. - GenericTestUtils.waitFor(() -> { - Assert.assertTrue(logCapture.getOutput().contains("System Exit: " + - "Failed to reload OM state and instantiate services.")); - return true; - }, 100, 3000); + assertLogCapture(logCapture, "System Exit: " + + "Failed to reload OM state and instantiate services."); String msg = "RPC server is stopped"; - Assert.assertTrue(logCapture.getOutput().contains(msg)); + assertLogCapture(logCapture, msg); } private SnapshotInfo createOzoneSnapshot(OzoneManager leaderOM) @@ -678,6 +672,14 @@ private void readKeys(List keys) throws IOException { } } + private void assertLogCapture(GenericTestUtils.LogCapturer logCapture, + String msg) + throws InterruptedException, TimeoutException { + GenericTestUtils.waitFor(() -> { + return logCapture.getOutput().contains(msg); + }, 100, 5000); + } + private static class DummyExitManager extends ExitManager { @Override public void exitSystem(int status, String message, Throwable throwable,