diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java index e340b3231491..4b380948abd9 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java @@ -366,4 +366,7 @@ private OzoneConsts() { public static final String CONTAINER_DB_TYPE_ROCKSDB = "RocksDB"; public static final String CONTAINER_DB_TYPE_LEVELDB = "LevelDB"; + + // An on-disk transient marker file used when replacing DB with checkpoint + public static final String DB_TRANSIENT_MARKER = "dbInconsistentMarker"; } diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/util/ExitManager.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/util/ExitManager.java new file mode 100644 index 000000000000..4a83c1d8c239 --- /dev/null +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/util/ExitManager.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.util; + +import org.apache.ratis.util.ExitUtils; +import org.slf4j.Logger; + +/** + * An Exit Manager used to shutdown service in case of unrecoverable error. + * This class will be helpful to test exit functionality. + */ +public class ExitManager { + + public void exitSystem(int status, String message, Throwable throwable, + Logger log) { + ExitUtils.terminate(1, message, throwable, log); + } +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java index 0cfbea4ef9c2..ef08abd89096 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java @@ -18,10 +18,12 @@ import java.io.File; import java.io.IOException; +import java.nio.file.Path; import java.util.ArrayList; import java.util.List; import java.util.UUID; +import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.utils.db.DBCheckpoint; import org.apache.hadoop.ozone.MiniOzoneCluster; @@ -33,10 +35,14 @@ import org.apache.hadoop.ozone.client.VolumeArgs; import org.apache.hadoop.ozone.om.ratis.OMTransactionInfo; import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer; +import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; +import org.apache.hadoop.ozone.util.ExitManager; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.ratis.server.protocol.TermIndex; -import org.apache.commons.lang3.RandomStringUtils; import static org.apache.hadoop.ozone.om.TestOzoneManagerHAWithData.createKey; -import org.apache.ratis.server.protocol.TermIndex; +import static org.junit.Assert.assertTrue; + import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -45,6 +51,8 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.event.Level; /** * Tests the Ratis snaphsots feature in OM. @@ -59,6 +67,10 @@ public class TestOMRatisSnapshots { private String scmId; private String omServiceId; private int numOfOMs = 3; + private OzoneBucket ozoneBucket; + private String volumeName; + private String bucketName; + private static final long SNAPSHOT_THRESHOLD = 50; private static final int LOG_PURGE_GAP = 50; @@ -95,6 +107,20 @@ public void init() throws Exception { cluster.waitForClusterToBeReady(); objectStore = OzoneClientFactory.getRpcClient(omServiceId, conf) .getObjectStore(); + + volumeName = "volume" + RandomStringUtils.randomNumeric(5); + bucketName = "bucket" + RandomStringUtils.randomNumeric(5); + + VolumeArgs createVolumeArgs = VolumeArgs.newBuilder() + .setOwner("user" + RandomStringUtils.randomNumeric(5)) + .setAdmin("admin" + RandomStringUtils.randomNumeric(5)) + .build(); + + objectStore.createVolume(volumeName, createVolumeArgs); + OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName); + + retVolumeinfo.createBucket(bucketName); + ozoneBucket = retVolumeinfo.getBucket(bucketName); } /** @@ -125,37 +151,13 @@ public void testInstallSnapshot() throws Exception { OzoneManager followerOM = cluster.getOzoneManager(followerNodeId); // Do some transactions so that the log index increases - String userName = "user" + RandomStringUtils.randomNumeric(5); - String adminName = "admin" + RandomStringUtils.randomNumeric(5); - String volumeName = "volume" + RandomStringUtils.randomNumeric(5); - String bucketName = "bucket" + RandomStringUtils.randomNumeric(5); - - VolumeArgs createVolumeArgs = VolumeArgs.newBuilder() - .setOwner(userName) - .setAdmin(adminName) - .build(); - - objectStore.createVolume(volumeName, createVolumeArgs); - OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName); - - retVolumeinfo.createBucket(bucketName); - OzoneBucket ozoneBucket = retVolumeinfo.getBucket(bucketName); - - long leaderOMappliedLogIndex = - leaderRatisServer.getLastAppliedTermIndex().getIndex(); - - List keys = new ArrayList<>(); - while (leaderOMappliedLogIndex < 2000) { - keys.add(createKey(ozoneBucket)); - leaderOMappliedLogIndex = - leaderRatisServer.getLastAppliedTermIndex().getIndex(); - } + List keys = writeKeysToIncreaseLogIndex(leaderRatisServer, 200); // Get the latest db checkpoint from the leader OM. OMTransactionInfo omTransactionInfo = OMTransactionInfo.readTransactionInfo(leaderOM.getMetadataManager()); TermIndex leaderOMTermIndex = - TermIndex.newTermIndex(omTransactionInfo.getCurrentTerm(), + TermIndex.newTermIndex(omTransactionInfo.getTerm(), omTransactionInfo.getTransactionIndex()); long leaderOMSnaphsotIndex = leaderOMTermIndex.getIndex(); long leaderOMSnapshotTermIndex = leaderOMTermIndex.getTerm(); @@ -169,30 +171,20 @@ public void testInstallSnapshot() throws Exception { // The recently started OM should be lagging behind the leader OM. long followerOMLastAppliedIndex = followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex(); - Assert.assertTrue( + assertTrue( followerOMLastAppliedIndex < leaderOMSnaphsotIndex); // Install leader OM's db checkpoint on the lagging OM. - File oldDbLocation = followerOM.getMetadataManager().getStore() - .getDbLocation(); - followerOM.getOmRatisServer().getOmStateMachine().pause(); - followerOM.getMetadataManager().getStore().close(); - followerOM.replaceOMDBWithCheckpoint(leaderOMSnaphsotIndex, oldDbLocation, - leaderDbCheckpoint.getCheckpointLocation()); - - // Reload the follower OM with new DB checkpoint from the leader OM. - followerOM.reloadOMState(leaderOMSnaphsotIndex, leaderOMSnapshotTermIndex); - followerOM.getOmRatisServer().getOmStateMachine().unpause( - leaderOMSnaphsotIndex, leaderOMSnapshotTermIndex); - - // After the new checkpoint is loaded and state machine is unpaused, the - // follower OM lastAppliedIndex must match the snapshot index of the - // checkpoint. + followerOM.installCheckpoint(leaderOMNodeId, leaderDbCheckpoint); + + // After the new checkpoint is installed, the follower OM + // lastAppliedIndex must >= the snapshot index of the checkpoint. It + // could be great than snapshot index if there is any conf entry from ratis. followerOMLastAppliedIndex = followerOM.getOmRatisServer() .getLastAppliedTermIndex().getIndex(); - Assert.assertEquals(leaderOMSnaphsotIndex, followerOMLastAppliedIndex); - Assert.assertEquals(leaderOMSnapshotTermIndex, - followerOM.getOmRatisServer().getLastAppliedTermIndex().getTerm()); + assertTrue(followerOMLastAppliedIndex >= leaderOMSnaphsotIndex); + assertTrue(followerOM.getOmRatisServer().getLastAppliedTermIndex() + .getTerm() >= leaderOMSnapshotTermIndex); // Verify that the follower OM's DB contains the transactions which were // made while it was inactive. @@ -206,4 +198,133 @@ public void testInstallSnapshot() throws Exception { followerOMMetaMngr.getOzoneKey(volumeName, bucketName, key))); } } + + @Test + public void testInstallOldCheckpointFailure() throws Exception { + // Get the leader OM + String leaderOMNodeId = OmFailoverProxyUtil + .getFailoverProxyProvider(objectStore.getClientProxy()) + .getCurrentProxyOMNodeId(); + + OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId); + + // Find the inactive OM and start it + String followerNodeId = leaderOM.getPeerNodes().get(0).getOMNodeId(); + if (cluster.isOMActive(followerNodeId)) { + followerNodeId = leaderOM.getPeerNodes().get(1).getOMNodeId(); + } + cluster.startInactiveOM(followerNodeId); + + OzoneManager followerOM = cluster.getOzoneManager(followerNodeId); + OzoneManagerRatisServer followerRatisServer = followerOM.getOmRatisServer(); + + // Do some transactions so that the log index increases on follower OM + writeKeysToIncreaseLogIndex(followerRatisServer, 100); + + TermIndex leaderCheckpointTermIndex = leaderOM.getOmRatisServer() + .getLastAppliedTermIndex(); + DBCheckpoint leaderDbCheckpoint = leaderOM.getMetadataManager().getStore() + .getCheckpoint(false); + + // Do some more transactions to increase the log index further on + // follower OM such that it is more than the checkpoint index taken on + // leader OM. + writeKeysToIncreaseLogIndex(followerOM.getOmRatisServer(), + leaderCheckpointTermIndex.getIndex() + 100); + + GenericTestUtils.setLogLevel(OzoneManager.LOG, Level.INFO); + GenericTestUtils.LogCapturer logCapture = + GenericTestUtils.LogCapturer.captureLogs(OzoneManager.LOG); + + // Install the old checkpoint on the follower OM. This should fail as the + // followerOM is already ahead of that transactionLogIndex and the OM + // state should be reloaded. + TermIndex followerTermIndex = followerRatisServer.getLastAppliedTermIndex(); + TermIndex newTermIndex = followerOM.installCheckpoint( + leaderOMNodeId, leaderDbCheckpoint); + + String errorMsg = "Cannot proceed with InstallSnapshot as OM is at " + + "TermIndex " + followerTermIndex + " and checkpoint has lower " + + "TermIndex"; + Assert.assertTrue(logCapture.getOutput().contains(errorMsg)); + Assert.assertNull("OM installed checkpoint even though checkpoint " + + "logIndex is less than it's lastAppliedIndex", newTermIndex); + Assert.assertEquals(followerTermIndex, + followerRatisServer.getLastAppliedTermIndex()); + } + + @Test + public void testInstallCorruptedCheckpointFailure() throws Exception { + // Get the leader OM + String leaderOMNodeId = OmFailoverProxyUtil + .getFailoverProxyProvider(objectStore.getClientProxy()) + .getCurrentProxyOMNodeId(); + + OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId); + OzoneManagerRatisServer leaderRatisServer = leaderOM.getOmRatisServer(); + + // Find the inactive OM + String followerNodeId = leaderOM.getPeerNodes().get(0).getOMNodeId(); + if (cluster.isOMActive(followerNodeId)) { + followerNodeId = leaderOM.getPeerNodes().get(1).getOMNodeId(); + } + OzoneManager followerOM = cluster.getOzoneManager(followerNodeId); + OzoneManagerRatisServer followerRatisServer = followerOM.getOmRatisServer(); + + // Do some transactions so that the log index increases + writeKeysToIncreaseLogIndex(leaderRatisServer, 100); + + DBCheckpoint leaderDbCheckpoint = leaderOM.getMetadataManager().getStore() + .getCheckpoint(false); + Path leaderCheckpointLocation = leaderDbCheckpoint.getCheckpointLocation(); + OMTransactionInfo leaderCheckpointTrxnInfo = OzoneManagerRatisUtils + .getTrxnInfoFromCheckpoint(conf, leaderCheckpointLocation); + + // Corrupt the leader checkpoint and install that on the OM. The + // operation should fail and OM should shutdown. + boolean delete = true; + for (File file : leaderCheckpointLocation.toFile() + .listFiles()) { + if (file.getName().contains(".sst")) { + if (delete) { + file.delete(); + delete = false; + } else { + delete = true; + } + } + } + + GenericTestUtils.setLogLevel(OzoneManager.LOG, Level.ERROR); + GenericTestUtils.LogCapturer logCapture = + GenericTestUtils.LogCapturer.captureLogs(OzoneManager.LOG); + followerOM.setExitManagerForTesting(new DummyExitManager()); + + followerOM.installCheckpoint(leaderOMNodeId, leaderCheckpointLocation, + leaderCheckpointTrxnInfo); + + Assert.assertTrue(logCapture.getOutput().contains("System Exit: " + + "Failed to reload OM state and instantiate services.")); + } + + private List writeKeysToIncreaseLogIndex( + OzoneManagerRatisServer omRatisServer, long targetLogIndex) + throws IOException, InterruptedException { + List keys = new ArrayList<>(); + long logIndex = omRatisServer.getLastAppliedTermIndex().getIndex(); + while (logIndex < targetLogIndex) { + keys.add(createKey(ozoneBucket)); + Thread.sleep(100); + logIndex = omRatisServer.getLastAppliedTermIndex().getIndex(); + } + return keys; + } + + private class DummyExitManager extends ExitManager { + @Override + public void exitSystem(int status, String message, Throwable throwable, + Logger log) { + log.error("System Exit: " + message, throwable); + } + } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerSnapshotProvider.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerSnapshotProvider.java index d77f4d9d1341..844c859ac028 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerSnapshotProvider.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerSnapshotProvider.java @@ -20,6 +20,7 @@ import java.util.UUID; +import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.utils.db.DBCheckpoint; import org.apache.hadoop.ozone.MiniOzoneCluster; @@ -31,11 +32,10 @@ import org.apache.hadoop.ozone.client.VolumeArgs; import org.apache.hadoop.ozone.om.OMConfigKeys; import org.apache.hadoop.ozone.om.OmFailoverProxyUtil; -import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; import org.apache.hadoop.ozone.om.OzoneManager; - -import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.ozone.om.ratis.OMTransactionInfo; +import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; + import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -124,7 +124,7 @@ public void testDownloadCheckpoint() throws Exception { .getOzoneManagerDBSnapshot(leaderOMNodeId); long leaderSnapshotIndex = leaderOM.getRatisSnapshotIndex(); - long downloadedSnapshotIndex = getDownloadSnapshotIndex(omSnapshot); + long downloadedSnapshotIndex = getDownloadedSnapshotIndex(omSnapshot); // The snapshot index downloaded from leader OM should match the ratis // snapshot index on the leader OM @@ -133,21 +133,13 @@ public void testDownloadCheckpoint() throws Exception { leaderSnapshotIndex, downloadedSnapshotIndex); } - private long getDownloadSnapshotIndex(DBCheckpoint dbCheckpoint) + private long getDownloadedSnapshotIndex(DBCheckpoint dbCheckpoint) throws Exception { - OzoneConfiguration configuration = new OzoneConfiguration(conf); - configuration.set(OMConfigKeys.OZONE_OM_DB_DIRS, - dbCheckpoint.getCheckpointLocation().getParent().toString()); - - OmMetadataManagerImpl omMetadataManager = - new OmMetadataManagerImpl(configuration); - - long transactionIndex = - OMTransactionInfo.readTransactionInfo(omMetadataManager) - .getTransactionIndex(); - omMetadataManager.stop(); - return transactionIndex; + OMTransactionInfo trxnInfoFromCheckpoint = + OzoneManagerRatisUtils.getTrxnInfoFromCheckpoint(conf, + dbCheckpoint.getCheckpointLocation()); + return trxnInfoFromCheckpoint.getTransactionIndex(); } } \ No newline at end of file diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java index d48c6fa9a36f..6c8b50595ca1 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java @@ -77,9 +77,11 @@ import org.apache.commons.lang3.StringUtils; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConsts.DB_TRANSIENT_MARKER; import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME; import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX; +import org.apache.ratis.util.ExitUtils; import org.eclipse.jetty.util.StringUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -249,6 +251,20 @@ public void start(OzoneConfiguration configuration) throws IOException { if (store == null) { File metaDir = OMStorage.getOmDbDir(configuration); + // Check if there is a DB Inconsistent Marker in the metaDir. This + // marker indicates that the DB is in an inconsistent state and hence + // the OM process should be terminated. + File markerFile = new File(metaDir, DB_TRANSIENT_MARKER); + if (markerFile.exists()) { + LOG.error("File {} marks that OM DB is in an inconsistent state."); + // Note - The marker file should be deleted only after fixing the DB. + // In an HA setup, this can be done by replacing this DB with a + // checkpoint from another OM. + String errorMsg = "Cannot load OM DB as it is in an inconsistent " + + "state."; + ExitUtils.terminate(1, errorMsg, LOG); + } + RocksDBConfiguration rocksDBConfiguration = configuration.getObject(RocksDBConfiguration.class); @@ -273,10 +289,15 @@ public void start(OzoneConfiguration configuration) throws IOException { public static DBStore loadDB(OzoneConfiguration configuration, File metaDir) throws IOException { + return loadDB(configuration, metaDir, OM_DB_NAME); + } + + public static DBStore loadDB(OzoneConfiguration configuration, File metaDir, + String dbName) throws IOException { RocksDBConfiguration rocksDBConfiguration = configuration.getObject(RocksDBConfiguration.class); DBStoreBuilder dbStoreBuilder = DBStoreBuilder.newBuilder(configuration, - rocksDBConfiguration).setName(OM_DB_NAME) + rocksDBConfiguration).setName(dbName) .setPath(Paths.get(metaDir.getPath())); DBStore dbStore = addOMTablesAndCodecs(dbStoreBuilder).build(); return dbStore; diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index 0905d81887b1..8a49fa780b37 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -155,6 +155,7 @@ import org.apache.hadoop.ozone.security.acl.OzoneObj.StoreType; import org.apache.hadoop.ozone.security.acl.OzoneObjInfo; import org.apache.hadoop.ozone.security.acl.RequestContext; +import org.apache.hadoop.ozone.util.ExitManager; import org.apache.hadoop.ozone.util.OzoneVersionInfo; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; @@ -194,6 +195,7 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_BLOCKS_MAX_DEFAULT; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConsts.DB_TRANSIENT_MARKER; import static org.apache.hadoop.ozone.OzoneConsts.OM_METRICS_FILE; import static org.apache.hadoop.ozone.OzoneConsts.OM_METRICS_TEMP_FILE; import static org.apache.hadoop.ozone.OzoneConsts.RPC_PORT; @@ -217,6 +219,7 @@ import org.apache.ratis.proto.RaftProtos.RaftPeerRole; import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.util.ExitUtils; import org.apache.ratis.util.FileUtils; import org.apache.ratis.util.LifeCycle; import org.bouncycastle.pkcs.PKCS10CertificationRequest; @@ -308,6 +311,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl private boolean isNativeAuthorizerEnabled; + private ExitManager exitManager; + private enum State { INITIALIZED, RUNNING, @@ -3066,51 +3071,52 @@ public List getAcl(OzoneObj obj) throws IOException { /** * Download and install latest checkpoint from leader OM. - * If the download checkpoints snapshot index is greater than this OM's - * last applied transaction index, then re-initialize the OM state via this - * checkpoint. Before re-initializing OM state, the OM Ratis server should - * be stopped so that no new transactions can be applied. * * @param leaderId peerNodeID of the leader OM - * @return If checkpoint is installed, return the corresponding termIndex. - * Otherwise, return null. + * @return If checkpoint is installed successfully, return the + * corresponding termIndex. Otherwise, return null. */ - public TermIndex installSnapshot(String leaderId) { + public TermIndex installSnapshotFromLeader(String leaderId) { if (omSnapshotProvider == null) { LOG.error("OM Snapshot Provider is not configured as there are no peer " + "nodes."); return null; } - DBCheckpoint omDBcheckpoint = getDBCheckpointFromLeader(leaderId); - Path newDBlocation = omDBcheckpoint.getCheckpointLocation(); + DBCheckpoint omDBCheckpoint = getDBCheckpointFromLeader(leaderId); + LOG.info("Downloaded checkpoint from Leader {} to the location {}", + leaderId, omDBCheckpoint.getCheckpointLocation()); - LOG.info("Downloaded checkpoint from Leader {}, in to the location {}", - leaderId, newDBlocation); + TermIndex termIndex = null; + try { + termIndex = installCheckpoint(leaderId, omDBCheckpoint); + } catch (Exception ex) { + LOG.error("Failed to install snapshot from Leader OM: {}", ex); + } + return termIndex; + } - // Check if current ratis log index is smaller than the downloaded - // checkpoint transaction index. If yes, proceed by stopping the ratis - // server so that the OM state can be re-initialized. If no, then do not - // proceed with installSnapshot. + /** + * Install checkpoint. If the checkpoints snapshot index is greater than + * OM's last applied transaction index, then re-initialize the OM + * state via this checkpoint. Before re-initializing OM state, the OM Ratis + * server should be stopped so that no new transactions can be applied. + */ + TermIndex installCheckpoint(String leaderId, DBCheckpoint omDBCheckpoint) + throws Exception { - OMTransactionInfo omTransactionInfo = null; + Path checkpointLocation = omDBCheckpoint.getCheckpointLocation(); + OMTransactionInfo checkpointTrxnInfo = OzoneManagerRatisUtils + .getTrxnInfoFromCheckpoint(configuration, checkpointLocation); - Path dbDir = newDBlocation.getParent(); - if (dbDir == null) { - LOG.error("Incorrect DB location path {} received from checkpoint.", - newDBlocation); - return null; - } + LOG.info("Installing checkpoint with OMTransactionInfo {}", + checkpointTrxnInfo); - try { - omTransactionInfo = - OzoneManagerRatisUtils.getTransactionInfoFromDownloadedSnapshot( - configuration, dbDir); - } catch (Exception ex) { - LOG.error("Failed during opening downloaded snapshot from " + - "{} to obtain transaction index", newDBlocation, ex); - return null; - } + return installCheckpoint(leaderId, checkpointLocation, checkpointTrxnInfo); + } + + TermIndex installCheckpoint(String leaderId, Path checkpointLocation, + OMTransactionInfo checkpointTrxnInfo) throws Exception { File oldDBLocation = metadataManager.getStore().getDbLocation(); try { @@ -3123,58 +3129,74 @@ public TermIndex installSnapshot(String leaderId) { omRatisServer.getOmStateMachine().pause(); } catch (Exception e) { LOG.error("Failed to stop/ pause the services. Cannot proceed with " + - "installing the new checkpoint.", e); - return null; - } - - //TODO: un-pause SM if any failures and retry? - - long lastAppliedIndex = omRatisServer.getLastAppliedTermIndex().getIndex(); - - boolean canProceed = - OzoneManagerRatisUtils.verifyTransactionInfo(omTransactionInfo, - lastAppliedIndex, leaderId, newDBlocation); - - // If downloaded DB has transaction info less than current one, return. - if (!canProceed) { - return null; + "installing the new checkpoint."); + // During stopServices, if KeyManager was stopped successfully and + // OMMetadataManager stop failed, we should restart the KeyManager. + keyManager.start(configuration); + throw e; } - long leaderIndex = omTransactionInfo.getTransactionIndex(); - long leaderTerm = omTransactionInfo.getCurrentTerm(); + File dbBackup = null; + TermIndex termIndex = omRatisServer.getLastAppliedTermIndex(); + long term = termIndex.getTerm(); + long lastAppliedIndex = termIndex.getIndex(); + // Check if current applied log index is smaller than the downloaded + // checkpoint transaction index. If yes, proceed by stopping the ratis + // server so that the OM state can be re-initialized. If no then do not + // proceed with installSnapshot. + boolean canProceed = OzoneManagerRatisUtils.verifyTransactionInfo( + checkpointTrxnInfo, lastAppliedIndex, leaderId, checkpointLocation); - File dbBackup; - try { - dbBackup = replaceOMDBWithCheckpoint(lastAppliedIndex, oldDBLocation, - newDBlocation); - } catch (Exception e) { - LOG.error("OM DB checkpoint replacement with new downloaded checkpoint " + - "failed.", e); - return null; + if (canProceed) { + try { + dbBackup = replaceOMDBWithCheckpoint(lastAppliedIndex, oldDBLocation, + checkpointLocation); + term = checkpointTrxnInfo.getTerm(); + lastAppliedIndex = checkpointTrxnInfo.getTransactionIndex(); + LOG.info("Replaced DB with checkpoint from OM: {}, term: {}, index: {}", + leaderId, term, lastAppliedIndex); + } catch (Exception e) { + LOG.error("Failed to install Snapshot from {} as OM failed to replace" + + " DB with downloaded checkpoint. Reloading old OM state.", e); + } + } else { + LOG.warn("Cannot proceed with InstallSnapshot as OM is at TermIndex {} " + + "and checkpoint has lower TermIndex {}. Reloading old state of OM.", + termIndex, checkpointTrxnInfo.getTermIndex()); } // Reload the OM DB store with the new checkpoint. // Restart (unpause) the state machine and update its last applied index // to the installed checkpoint's snapshot index. try { - reloadOMState(leaderIndex, leaderTerm); - omRatisServer.getOmStateMachine().unpause(leaderIndex, leaderTerm); - } catch (IOException e) { - LOG.error("Failed to reload OM state with new DB checkpoint.", e); - return null; + reloadOMState(lastAppliedIndex, term); + omRatisServer.getOmStateMachine().unpause(lastAppliedIndex, term); + LOG.info("Reloaded OM state with Term: {} and Index: {}", term, + lastAppliedIndex); + } catch (IOException ex) { + String errorMsg = "Failed to reload OM state and instantiate services."; + exitManager.exitSystem(1, errorMsg, ex, LOG); } // Delete the backup DB try { - FileUtils.deleteFully(dbBackup); + if (dbBackup != null) { + FileUtils.deleteFully(dbBackup); + } } catch (IOException e) { LOG.error("Failed to delete the backup of the original DB {}", dbBackup); } + if (lastAppliedIndex != checkpointTrxnInfo.getTransactionIndex()) { + // Install Snapshot failed and old state was reloaded. Return null to + // Ratis to indicate that installation failed. + return null; + } + // TODO: We should only return the snpashotIndex to the leader. // Should be fixed after RATIS-586 - TermIndex newTermIndex = TermIndex.newTermIndex(leaderTerm, leaderIndex); + TermIndex newTermIndex = TermIndex.newTermIndex(term, lastAppliedIndex); return newTermIndex; } @@ -3208,16 +3230,17 @@ void stopServices() throws Exception { * * @param lastAppliedIndex the last applied index in the current OM DB. * @param checkpointPath path to the new DB checkpoint - * @return location of the backup of the original DB + * @return location of backup of the original DB * @throws Exception */ File replaceOMDBWithCheckpoint(long lastAppliedIndex, File oldDB, - Path checkpointPath) throws Exception { + Path checkpointPath) throws IOException { // Take a backup of the current DB String dbBackupName = OzoneConsts.OM_DB_BACKUP_PREFIX + lastAppliedIndex + "_" + System.currentTimeMillis(); - File dbBackup = new File(oldDB.getParentFile(), dbBackupName); + File dbDir = oldDB.getParentFile(); + File dbBackup = new File(dbDir, dbBackupName); try { Files.move(oldDB.toPath(), dbBackup.toPath()); @@ -3228,13 +3251,28 @@ File replaceOMDBWithCheckpoint(long lastAppliedIndex, File oldDB, } // Move the new DB checkpoint into the om metadata dir + Path markerFile = new File(dbDir, DB_TRANSIENT_MARKER).toPath(); try { + // Create a Transient Marker file. This file will be deleted if the + // checkpoint DB is successfully moved to the old DB location or if the + // old DB backup is reset to its location. If not, then the OM DB is in + // an inconsistent state and this marker file will fail OM from + // starting up. + Files.createFile(markerFile); Files.move(checkpointPath, oldDB.toPath()); + Files.deleteIfExists(markerFile); } catch (IOException e) { LOG.error("Failed to move downloaded DB checkpoint {} to metadata " + "directory {}. Resetting to original DB.", checkpointPath, oldDB.toPath()); - Files.move(dbBackup.toPath(), oldDB.toPath()); + try { + Files.move(dbBackup.toPath(), oldDB.toPath()); + Files.deleteIfExists(markerFile); + } catch (IOException ex) { + String errorMsg = "Failed to reset to original DB. OM is in an " + + "inconsistent state."; + ExitUtils.terminate(1, errorMsg, ex, LOG); + } throw e; } return dbBackup; @@ -3452,4 +3490,8 @@ private Pair resolveBucketLink( visited); } + @VisibleForTesting + void setExitManagerForTesting(ExitManager exitManagerForTesting) { + this.exitManager = exitManagerForTesting; + } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OMTransactionInfo.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OMTransactionInfo.java index 24417515ef13..28c8c3a91f27 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OMTransactionInfo.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OMTransactionInfo.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.util.Objects; +import org.apache.ratis.server.protocol.TermIndex; import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY; import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_SPLIT_KEY; @@ -33,7 +34,7 @@ */ public final class OMTransactionInfo { - private long currentTerm; // term associated with the ratis log index. + private long term; // term associated with the ratis log index. // Transaction index corresponds to ratis log index private long transactionIndex; @@ -43,12 +44,12 @@ private OMTransactionInfo(String transactionInfo) { Preconditions.checkState(tInfo.length==2, "Incorrect TransactionInfo value"); - currentTerm = Long.parseLong(tInfo[0]); + term = Long.parseLong(tInfo[0]); transactionIndex = Long.parseLong(tInfo[1]); } private OMTransactionInfo(long currentTerm, long transactionIndex) { - this.currentTerm = currentTerm; + this.term = currentTerm; this.transactionIndex = transactionIndex; } @@ -56,8 +57,8 @@ private OMTransactionInfo(long currentTerm, long transactionIndex) { * Get current term. * @return currentTerm */ - public long getCurrentTerm() { - return currentTerm; + public long getTerm() { + return term; } /** @@ -68,6 +69,10 @@ public long getTransactionIndex() { return transactionIndex; } + public TermIndex getTermIndex() { + return TermIndex.newTermIndex(term, transactionIndex); + } + /** * Generate String form of transaction info which need to be persisted in OM * DB finally in byte array. @@ -75,7 +80,7 @@ public long getTransactionIndex() { */ private String generateTransactionInfo() { StringBuilder stringBuilder = new StringBuilder(); - stringBuilder.append(currentTerm); + stringBuilder.append(term); stringBuilder.append(TRANSACTION_INFO_SPLIT_KEY); stringBuilder.append(transactionIndex); @@ -109,13 +114,13 @@ public boolean equals(Object o) { return false; } OMTransactionInfo that = (OMTransactionInfo) o; - return currentTerm == that.currentTerm && + return term == that.term && transactionIndex == that.transactionIndex; } @Override public int hashCode() { - return Objects.hash(currentTerm, transactionIndex); + return Objects.hash(term, transactionIndex); } /** diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java index c042fcb7eedd..3f7429ab7dd0 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java @@ -379,7 +379,7 @@ public CompletableFuture notifyInstallSnapshotFromLeader( } CompletableFuture future = CompletableFuture.supplyAsync( - () -> ozoneManager.installSnapshot(leaderNodeId), + () -> ozoneManager.installSnapshotFromLeader(leaderNodeId), installSnapshotExecutor); return future; } @@ -521,9 +521,9 @@ public void loadSnapshotInfoFromDB() throws IOException { ozoneManager.getMetadataManager()); if (omTransactionInfo != null) { setLastAppliedTermIndex(TermIndex.newTermIndex( - omTransactionInfo.getCurrentTerm(), + omTransactionInfo.getTerm(), omTransactionInfo.getTransactionIndex())); - snapshotInfo.updateTermIndex(omTransactionInfo.getCurrentTerm(), + snapshotInfo.updateTermIndex(omTransactionInfo.getTerm(), omTransactionInfo.getTransactionIndex()); } LOG.info("LastAppliedIndex is set from TransactionInfo from OM DB as {}", diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java index 4aaaf13f6a4d..ddb6841ae31e 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java @@ -228,15 +228,35 @@ public static Status exceptionToResponseStatus(IOException exception) { } /** - * Obtain Transaction info from downloaded snapshot DB. + * Obtain OMTransactionInfo from Checkpoint. + */ + public static OMTransactionInfo getTrxnInfoFromCheckpoint( + OzoneConfiguration conf, Path dbPath) throws Exception { + + if (dbPath != null) { + Path dbDir = dbPath.getParent(); + Path dbFile = dbPath.getFileName(); + if (dbDir != null && dbFile != null) { + return getTransactionInfoFromDB(conf, dbDir, dbFile.toString()); + } + } + + throw new IOException("Checkpoint " + dbPath + " does not have proper " + + "DB location"); + } + + /** + * Obtain Transaction info from DB. * @param tempConfig + * @param dbDir path to DB * @return OMTransactionInfo * @throws Exception */ - public static OMTransactionInfo getTransactionInfoFromDownloadedSnapshot( - OzoneConfiguration tempConfig, Path dbDir) throws Exception { - DBStore dbStore = - OmMetadataManagerImpl.loadDB(tempConfig, dbDir.toFile()); + private static OMTransactionInfo getTransactionInfoFromDB( + OzoneConfiguration tempConfig, Path dbDir, String dbName) + throws Exception { + DBStore dbStore = OmMetadataManagerImpl.loadDB(tempConfig, dbDir.toFile(), + dbName); Table transactionInfoTable = dbStore.getTable(TRANSACTION_INFO_TABLE, @@ -245,8 +265,11 @@ public static OMTransactionInfo getTransactionInfoFromDownloadedSnapshot( OMTransactionInfo omTransactionInfo = transactionInfoTable.get(TRANSACTION_INFO_KEY); dbStore.close(); - OzoneManager.LOG.info("Downloaded checkpoint with OMTransactionInfo {}", - omTransactionInfo); + + if (omTransactionInfo == null) { + throw new IOException("Failed to read OMTransactionInfo from DB " + + dbName + " at " + dbDir); + } return omTransactionInfo; } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OzoneManagerSnapshotProvider.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OzoneManagerSnapshotProvider.java index 1c78251abb92..a11c60b9435d 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OzoneManagerSnapshotProvider.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OzoneManagerSnapshotProvider.java @@ -113,8 +113,10 @@ public OzoneManagerSnapshotProvider(MutableConfigurationSource conf, public DBCheckpoint getOzoneManagerDBSnapshot(String leaderOMNodeID) throws IOException { String snapshotTime = Long.toString(System.currentTimeMillis()); - String snapshotFileName = Paths.get(omSnapshotDir.getAbsolutePath(), - snapshotTime, OM_DB_NAME).toFile().getAbsolutePath(); + String snapshotFileName = OM_DB_NAME + "-" + leaderOMNodeID + + "-" + snapshotTime; + String snapshotFilePath = Paths.get(omSnapshotDir.getAbsolutePath(), + snapshotFileName).toFile().getAbsolutePath(); File targetFile = new File(snapshotFileName + ".tar.gz"); String omCheckpointUrl = peerNodesMap.get(leaderOMNodeID) @@ -141,11 +143,11 @@ public DBCheckpoint getOzoneManagerDBSnapshot(String leaderOMNodeID) }); // Untar the checkpoint file. - Path untarredDbDir = Paths.get(snapshotFileName); + Path untarredDbDir = Paths.get(snapshotFilePath); FileUtil.unTar(targetFile, untarredDbDir.toFile()); FileUtils.deleteQuietly(targetFile); - LOG.info("Sucessfully downloaded latest checkpoint from leader OM: {}", + LOG.info("Successfully downloaded latest checkpoint from leader OM: {}", leaderOMNodeID); RocksDBCheckpoint omCheckpoint = new RocksDBCheckpoint(untarredDbDir); diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java index 054c97f396c7..6226c5bbc9f1 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java @@ -76,7 +76,7 @@ public void testTransactionTable() throws Exception { OMTransactionInfo omTransactionInfo = omMetadataManager.getTransactionInfoTable().get(TRANSACTION_INFO_KEY); - Assert.assertEquals(3, omTransactionInfo.getCurrentTerm()); + Assert.assertEquals(3, omTransactionInfo.getTerm()); Assert.assertEquals(250, omTransactionInfo.getTransactionIndex()); diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java index 7b86006b9379..372679b2b3eb 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java @@ -139,7 +139,7 @@ public void testDoubleBufferWithDummyResponse() throws Exception { Assert.assertEquals(lastAppliedIndex, omTransactionInfo.getTransactionIndex()); - Assert.assertEquals(term, omTransactionInfo.getCurrentTerm()); + Assert.assertEquals(term, omTransactionInfo.getTerm()); } /** diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java index b3693415b183..260e2cd17c10 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java @@ -202,7 +202,7 @@ public void testDoubleBufferWithMixOfTransactions() throws Exception { Assert.assertEquals(lastAppliedIndex, omTransactionInfo.getTransactionIndex()); - Assert.assertEquals(term, omTransactionInfo.getCurrentTerm()); + Assert.assertEquals(term, omTransactionInfo.getTerm()); } /**