Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.tag.Flaky;
import org.apache.ratis.server.protocol.TermIndex;
import org.assertj.core.api.Fail;
import org.junit.Assert;
Expand All @@ -67,6 +66,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -160,7 +160,6 @@ public void shutdown() {
}

@Test
@Flaky("HDDS-8355")
public void testInstallSnapshot() throws Exception {
// Get the leader OM
String leaderOMNodeId = OmFailoverProxyUtil
Expand Down Expand Up @@ -220,7 +219,7 @@ public void testInstallSnapshot() throws Exception {

// Verify checkpoint installation was happened.
String msg = "Reloaded OM state";
Assert.assertTrue(logCapture.getOutput().contains(msg));
assertLogCapture(logCapture, msg);

// Verify that the follower OM's DB contains the transactions which were
// made while it was inactive.
Expand All @@ -240,8 +239,8 @@ public void testInstallSnapshot() throws Exception {
return followerOM.isOmRpcServerRunning();
}, 100, 5000);

Assert.assertTrue(logCapture.getOutput().contains(
"Install Checkpoint is finished"));
assertLogCapture(logCapture,
"Install Checkpoint is finished");

// Read & Write after snapshot installed.
List<String> newKeys = writeKeys(1);
Expand Down Expand Up @@ -360,9 +359,8 @@ public void testInstallSnapshotWithClientWrite() throws Exception {

// Verify checkpoint installation was happened.
String msg = "Reloaded OM state";
Assert.assertTrue(logCapture.getOutput().contains(msg));
Assert.assertTrue(logCapture.getOutput().contains(
"Install Checkpoint is finished"));
assertLogCapture(logCapture, msg);
assertLogCapture(logCapture, "Install Checkpoint is finished");

long followerOMLastAppliedIndex =
followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex();
Expand Down Expand Up @@ -493,9 +491,8 @@ public void testInstallSnapshotWithClientRead() throws Exception {
// Wait installation finish
Thread.sleep(5000);
// Verify checkpoint installation was happened.
Assert.assertTrue(logCapture.getOutput().contains("Reloaded OM state"));
Assert.assertTrue(logCapture.getOutput().contains(
"Install Checkpoint is finished"));
assertLogCapture(logCapture, "Reloaded OM state");
assertLogCapture(logCapture, "Install Checkpoint is finished");
}

@Test
Expand Down Expand Up @@ -544,15 +541,15 @@ public void testInstallOldCheckpointFailure() throws Exception {
String errorMsg = "Cannot proceed with InstallSnapshot as OM is at " +
"TermIndex " + followerTermIndex + " and checkpoint has lower " +
"TermIndex";
Assert.assertTrue(logCapture.getOutput().contains(errorMsg));
assertLogCapture(logCapture, errorMsg);
Assert.assertNull("OM installed checkpoint even though checkpoint " +
"logIndex is less than it's lastAppliedIndex", newTermIndex);
Assert.assertEquals(followerTermIndex,
followerRatisServer.getLastAppliedTermIndex());
String msg = "OM DB is not stopped. Started services with Term: " +
followerTermIndex.getTerm() + " and Index: " +
followerTermIndex.getIndex();
Assert.assertTrue(logCapture.getOutput().contains(msg));
assertLogCapture(logCapture, msg);
}

@Test
Expand Down Expand Up @@ -605,13 +602,10 @@ public void testInstallCorruptedCheckpointFailure() throws Exception {
leaderCheckpointTrxnInfo);

// Wait checkpoint installation to be finished.
GenericTestUtils.waitFor(() -> {
Assert.assertTrue(logCapture.getOutput().contains("System Exit: " +
"Failed to reload OM state and instantiate services."));
return true;
}, 100, 3000);
assertLogCapture(logCapture, "System Exit: " +
"Failed to reload OM state and instantiate services.");
String msg = "RPC server is stopped";
Assert.assertTrue(logCapture.getOutput().contains(msg));
assertLogCapture(logCapture, msg);
}

private SnapshotInfo createOzoneSnapshot(OzoneManager leaderOM)
Expand Down Expand Up @@ -678,6 +672,14 @@ private void readKeys(List<String> keys) throws IOException {
}
}

private void assertLogCapture(GenericTestUtils.LogCapturer logCapture,
String msg)
throws InterruptedException, TimeoutException {
GenericTestUtils.waitFor(() -> {
return logCapture.getOutput().contains(msg);
}, 100, 5000);
}

private static class DummyExitManager extends ExitManager {
@Override
public void exitSystem(int status, String message, Throwable throwable,
Expand Down