Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -366,4 +366,7 @@ private OzoneConsts() {

public static final String CONTAINER_DB_TYPE_ROCKSDB = "RocksDB";
public static final String CONTAINER_DB_TYPE_LEVELDB = "LevelDB";

// An on-disk transient marker file used when replacing DB with checkpoint
public static final String DB_TRANSIENT_MARKER = "dbInconsistentMarker";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.util;

import org.apache.ratis.util.ExitUtils;
import org.slf4j.Logger;

/**
* An Exit Manager used to shutdown service in case of unrecoverable error.
* This class will be helpful to test exit functionality.
*/
public class ExitManager {

public void exitSystem(int status, String message, Throwable throwable,
Logger log) {
ExitUtils.terminate(1, message, throwable, log);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
import org.apache.hadoop.ozone.MiniOzoneCluster;
Expand All @@ -33,10 +35,14 @@
import org.apache.hadoop.ozone.client.VolumeArgs;
import org.apache.hadoop.ozone.om.ratis.OMTransactionInfo;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.util.ExitManager;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.ratis.server.protocol.TermIndex;

import org.apache.commons.lang3.RandomStringUtils;
import static org.apache.hadoop.ozone.om.TestOzoneManagerHAWithData.createKey;
import org.apache.ratis.server.protocol.TermIndex;
import static org.junit.Assert.assertTrue;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand All @@ -45,6 +51,8 @@
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.event.Level;

/**
* Tests the Ratis snaphsots feature in OM.
Expand All @@ -59,6 +67,10 @@ public class TestOMRatisSnapshots {
private String scmId;
private String omServiceId;
private int numOfOMs = 3;
private OzoneBucket ozoneBucket;
private String volumeName;
private String bucketName;

private static final long SNAPSHOT_THRESHOLD = 50;
private static final int LOG_PURGE_GAP = 50;

Expand Down Expand Up @@ -95,6 +107,20 @@ public void init() throws Exception {
cluster.waitForClusterToBeReady();
objectStore = OzoneClientFactory.getRpcClient(omServiceId, conf)
.getObjectStore();

volumeName = "volume" + RandomStringUtils.randomNumeric(5);
bucketName = "bucket" + RandomStringUtils.randomNumeric(5);

VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
.setOwner("user" + RandomStringUtils.randomNumeric(5))
.setAdmin("admin" + RandomStringUtils.randomNumeric(5))
.build();

objectStore.createVolume(volumeName, createVolumeArgs);
OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName);

retVolumeinfo.createBucket(bucketName);
ozoneBucket = retVolumeinfo.getBucket(bucketName);
}

/**
Expand Down Expand Up @@ -125,37 +151,13 @@ public void testInstallSnapshot() throws Exception {
OzoneManager followerOM = cluster.getOzoneManager(followerNodeId);

// Do some transactions so that the log index increases
String userName = "user" + RandomStringUtils.randomNumeric(5);
String adminName = "admin" + RandomStringUtils.randomNumeric(5);
String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);

VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
.setOwner(userName)
.setAdmin(adminName)
.build();

objectStore.createVolume(volumeName, createVolumeArgs);
OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName);

retVolumeinfo.createBucket(bucketName);
OzoneBucket ozoneBucket = retVolumeinfo.getBucket(bucketName);

long leaderOMappliedLogIndex =
leaderRatisServer.getLastAppliedTermIndex().getIndex();

List<String> keys = new ArrayList<>();
while (leaderOMappliedLogIndex < 2000) {
keys.add(createKey(ozoneBucket));
leaderOMappliedLogIndex =
leaderRatisServer.getLastAppliedTermIndex().getIndex();
}
List<String> keys = writeKeysToIncreaseLogIndex(leaderRatisServer, 200);

// Get the latest db checkpoint from the leader OM.
OMTransactionInfo omTransactionInfo =
OMTransactionInfo.readTransactionInfo(leaderOM.getMetadataManager());
TermIndex leaderOMTermIndex =
TermIndex.newTermIndex(omTransactionInfo.getCurrentTerm(),
TermIndex.newTermIndex(omTransactionInfo.getTerm(),
omTransactionInfo.getTransactionIndex());
long leaderOMSnaphsotIndex = leaderOMTermIndex.getIndex();
long leaderOMSnapshotTermIndex = leaderOMTermIndex.getTerm();
Expand All @@ -169,30 +171,20 @@ public void testInstallSnapshot() throws Exception {
// The recently started OM should be lagging behind the leader OM.
long followerOMLastAppliedIndex =
followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex();
Assert.assertTrue(
assertTrue(
followerOMLastAppliedIndex < leaderOMSnaphsotIndex);

// Install leader OM's db checkpoint on the lagging OM.
File oldDbLocation = followerOM.getMetadataManager().getStore()
.getDbLocation();
followerOM.getOmRatisServer().getOmStateMachine().pause();
followerOM.getMetadataManager().getStore().close();
followerOM.replaceOMDBWithCheckpoint(leaderOMSnaphsotIndex, oldDbLocation,
leaderDbCheckpoint.getCheckpointLocation());

// Reload the follower OM with new DB checkpoint from the leader OM.
followerOM.reloadOMState(leaderOMSnaphsotIndex, leaderOMSnapshotTermIndex);
followerOM.getOmRatisServer().getOmStateMachine().unpause(
leaderOMSnaphsotIndex, leaderOMSnapshotTermIndex);

// After the new checkpoint is loaded and state machine is unpaused, the
// follower OM lastAppliedIndex must match the snapshot index of the
// checkpoint.
followerOM.installCheckpoint(leaderOMNodeId, leaderDbCheckpoint);

// After the new checkpoint is installed, the follower OM
// lastAppliedIndex must >= the snapshot index of the checkpoint. It
// could be great than snapshot index if there is any conf entry from ratis.
followerOMLastAppliedIndex = followerOM.getOmRatisServer()
.getLastAppliedTermIndex().getIndex();
Assert.assertEquals(leaderOMSnaphsotIndex, followerOMLastAppliedIndex);
Assert.assertEquals(leaderOMSnapshotTermIndex,
followerOM.getOmRatisServer().getLastAppliedTermIndex().getTerm());
assertTrue(followerOMLastAppliedIndex >= leaderOMSnaphsotIndex);
assertTrue(followerOM.getOmRatisServer().getLastAppliedTermIndex()
.getTerm() >= leaderOMSnapshotTermIndex);

// Verify that the follower OM's DB contains the transactions which were
// made while it was inactive.
Expand All @@ -206,4 +198,133 @@ public void testInstallSnapshot() throws Exception {
followerOMMetaMngr.getOzoneKey(volumeName, bucketName, key)));
}
}

@Test
public void testInstallOldCheckpointFailure() throws Exception {
// Get the leader OM
String leaderOMNodeId = OmFailoverProxyUtil
.getFailoverProxyProvider(objectStore.getClientProxy())
.getCurrentProxyOMNodeId();

OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId);

// Find the inactive OM and start it
String followerNodeId = leaderOM.getPeerNodes().get(0).getOMNodeId();
if (cluster.isOMActive(followerNodeId)) {
followerNodeId = leaderOM.getPeerNodes().get(1).getOMNodeId();
}
cluster.startInactiveOM(followerNodeId);

OzoneManager followerOM = cluster.getOzoneManager(followerNodeId);
OzoneManagerRatisServer followerRatisServer = followerOM.getOmRatisServer();

// Do some transactions so that the log index increases on follower OM
writeKeysToIncreaseLogIndex(followerRatisServer, 100);

TermIndex leaderCheckpointTermIndex = leaderOM.getOmRatisServer()
.getLastAppliedTermIndex();
DBCheckpoint leaderDbCheckpoint = leaderOM.getMetadataManager().getStore()
.getCheckpoint(false);

// Do some more transactions to increase the log index further on
// follower OM such that it is more than the checkpoint index taken on
// leader OM.
writeKeysToIncreaseLogIndex(followerOM.getOmRatisServer(),
leaderCheckpointTermIndex.getIndex() + 100);

GenericTestUtils.setLogLevel(OzoneManager.LOG, Level.INFO);
GenericTestUtils.LogCapturer logCapture =
GenericTestUtils.LogCapturer.captureLogs(OzoneManager.LOG);

// Install the old checkpoint on the follower OM. This should fail as the
// followerOM is already ahead of that transactionLogIndex and the OM
// state should be reloaded.
TermIndex followerTermIndex = followerRatisServer.getLastAppliedTermIndex();
TermIndex newTermIndex = followerOM.installCheckpoint(
leaderOMNodeId, leaderDbCheckpoint);

String errorMsg = "Cannot proceed with InstallSnapshot as OM is at " +
"TermIndex " + followerTermIndex + " and checkpoint has lower " +
"TermIndex";
Assert.assertTrue(logCapture.getOutput().contains(errorMsg));
Assert.assertNull("OM installed checkpoint even though checkpoint " +
"logIndex is less than it's lastAppliedIndex", newTermIndex);
Assert.assertEquals(followerTermIndex,
followerRatisServer.getLastAppliedTermIndex());
}

@Test
public void testInstallCorruptedCheckpointFailure() throws Exception {
// Get the leader OM
String leaderOMNodeId = OmFailoverProxyUtil
.getFailoverProxyProvider(objectStore.getClientProxy())
.getCurrentProxyOMNodeId();

OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId);
OzoneManagerRatisServer leaderRatisServer = leaderOM.getOmRatisServer();

// Find the inactive OM
String followerNodeId = leaderOM.getPeerNodes().get(0).getOMNodeId();
if (cluster.isOMActive(followerNodeId)) {
followerNodeId = leaderOM.getPeerNodes().get(1).getOMNodeId();
}
OzoneManager followerOM = cluster.getOzoneManager(followerNodeId);
OzoneManagerRatisServer followerRatisServer = followerOM.getOmRatisServer();

// Do some transactions so that the log index increases
writeKeysToIncreaseLogIndex(leaderRatisServer, 100);

DBCheckpoint leaderDbCheckpoint = leaderOM.getMetadataManager().getStore()
.getCheckpoint(false);
Path leaderCheckpointLocation = leaderDbCheckpoint.getCheckpointLocation();
OMTransactionInfo leaderCheckpointTrxnInfo = OzoneManagerRatisUtils
.getTrxnInfoFromCheckpoint(conf, leaderCheckpointLocation);

// Corrupt the leader checkpoint and install that on the OM. The
// operation should fail and OM should shutdown.
boolean delete = true;
for (File file : leaderCheckpointLocation.toFile()
.listFiles()) {
if (file.getName().contains(".sst")) {
if (delete) {
file.delete();
delete = false;
} else {
delete = true;
}
}
}

GenericTestUtils.setLogLevel(OzoneManager.LOG, Level.ERROR);
GenericTestUtils.LogCapturer logCapture =
GenericTestUtils.LogCapturer.captureLogs(OzoneManager.LOG);
followerOM.setExitManagerForTesting(new DummyExitManager());

followerOM.installCheckpoint(leaderOMNodeId, leaderCheckpointLocation,
leaderCheckpointTrxnInfo);

Assert.assertTrue(logCapture.getOutput().contains("System Exit: " +
"Failed to reload OM state and instantiate services."));
}

private List<String> writeKeysToIncreaseLogIndex(
OzoneManagerRatisServer omRatisServer, long targetLogIndex)
throws IOException, InterruptedException {
List<String> keys = new ArrayList<>();
long logIndex = omRatisServer.getLastAppliedTermIndex().getIndex();
while (logIndex < targetLogIndex) {
keys.add(createKey(ozoneBucket));
Thread.sleep(100);
logIndex = omRatisServer.getLastAppliedTermIndex().getIndex();
}
return keys;
}

private class DummyExitManager extends ExitManager {
@Override
public void exitSystem(int status, String message, Throwable throwable,
Logger log) {
log.error("System Exit: " + message, throwable);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.UUID;

import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
import org.apache.hadoop.ozone.MiniOzoneCluster;
Expand All @@ -31,11 +32,10 @@
import org.apache.hadoop.ozone.client.VolumeArgs;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OmFailoverProxyUtil;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.OzoneManager;

import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.ozone.om.ratis.OMTransactionInfo;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -124,7 +124,7 @@ public void testDownloadCheckpoint() throws Exception {
.getOzoneManagerDBSnapshot(leaderOMNodeId);

long leaderSnapshotIndex = leaderOM.getRatisSnapshotIndex();
long downloadedSnapshotIndex = getDownloadSnapshotIndex(omSnapshot);
long downloadedSnapshotIndex = getDownloadedSnapshotIndex(omSnapshot);

// The snapshot index downloaded from leader OM should match the ratis
// snapshot index on the leader OM
Expand All @@ -133,21 +133,13 @@ public void testDownloadCheckpoint() throws Exception {
leaderSnapshotIndex, downloadedSnapshotIndex);
}

private long getDownloadSnapshotIndex(DBCheckpoint dbCheckpoint)
private long getDownloadedSnapshotIndex(DBCheckpoint dbCheckpoint)
throws Exception {

OzoneConfiguration configuration = new OzoneConfiguration(conf);
configuration.set(OMConfigKeys.OZONE_OM_DB_DIRS,
dbCheckpoint.getCheckpointLocation().getParent().toString());

OmMetadataManagerImpl omMetadataManager =
new OmMetadataManagerImpl(configuration);

long transactionIndex =
OMTransactionInfo.readTransactionInfo(omMetadataManager)
.getTransactionIndex();
omMetadataManager.stop();
return transactionIndex;
OMTransactionInfo trxnInfoFromCheckpoint =
OzoneManagerRatisUtils.getTrxnInfoFromCheckpoint(conf,
dbCheckpoint.getCheckpointLocation());

return trxnInfoFromCheckpoint.getTransactionIndex();
}
}
Loading