diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java index 4f9e45ce6755..1a1945924faf 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java @@ -28,6 +28,7 @@ .StorageContainerException; import org.apache.hadoop.hdds.utils.db.DBProfile; import org.apache.hadoop.hdds.utils.db.RDBStore; +import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.container.common.helpers.BlockData; @@ -57,9 +58,7 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.mockito.Mockito; -import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; -import org.rocksdb.RocksDBException; import java.io.File; @@ -504,8 +503,7 @@ public void testUpdateContainerUnsupportedRequest() throws Exception { } @Test - public void testContainerRocksDB() - throws StorageContainerException, RocksDBException { + public void testContainerRocksDB() throws Exception { closeContainer(); keyValueContainer = new KeyValueContainer( keyValueContainerData, CONF); @@ -518,7 +516,7 @@ public void testContainerRocksDB() long cacheSize = Long.parseLong(store .getProperty("rocksdb.block-cache-capacity")); Assert.assertEquals(defaultCacheSize, cacheSize); - for (ColumnFamilyHandle handle : store.getColumnFamilyHandles()) { + for (ColumnFamily handle : store.getColumnFamilies()) { cacheSize = Long.parseLong( store.getProperty(handle, "rocksdb.block-cache-capacity")); Assert.assertEquals(defaultCacheSize, cacheSize); diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java index 42843b080d73..bed2c8fb680c 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java @@ -18,14 +18,12 @@ */ package org.apache.hadoop.hdds.utils.db; -import java.io.IOException; - -import org.rocksdb.ColumnFamilyHandle; -import org.rocksdb.RocksDB; -import org.rocksdb.RocksDBException; +import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily; import org.rocksdb.WriteBatch; import org.rocksdb.WriteOptions; +import java.io.IOException; + /** * Batch operation implementation for rocks db. 
*/ @@ -41,12 +39,13 @@ public RDBBatchOperation(WriteBatch writeBatch) { this.writeBatch = writeBatch; } - public void commit(RocksDB db, WriteOptions writeOptions) throws IOException { - try { - db.write(writeOptions, writeBatch); - } catch (RocksDBException e) { - throw new IOException("Unable to write the batch.", e); - } + public void commit(RocksDatabase db) throws IOException { + db.batchWrite(writeBatch); + } + + public void commit(RocksDatabase db, WriteOptions writeOptions) + throws IOException { + db.batchWrite(writeBatch, writeOptions); } @Override @@ -54,20 +53,12 @@ public void close() { writeBatch.close(); } - public void delete(ColumnFamilyHandle handle, byte[] key) throws IOException { - try { - writeBatch.delete(handle, key); - } catch (RocksDBException e) { - throw new IOException("Can't record batch delete operation.", e); - } + public void delete(ColumnFamily family, byte[] key) throws IOException { + family.batchDelete(writeBatch, key); } - public void put(ColumnFamilyHandle handle, byte[] key, byte[] value) + public void put(ColumnFamily family, byte[] key, byte[] value) throws IOException { - try { - writeBatch.put(handle, key, value); - } catch (RocksDBException e) { - throw new IOException("Can't record batch put operation.", e); - } + family.batchPut(writeBatch, key, value); } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBCheckpointManager.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBCheckpointManager.java index 18793647a081..9cc23b53ba52 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBCheckpointManager.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBCheckpointManager.java @@ -19,15 +19,15 @@ package org.apache.hadoop.hdds.utils.db; +import java.io.IOException; import java.nio.file.Path; import java.nio.file.Paths; import java.time.Duration; import java.time.Instant; import org.apache.commons.lang3.StringUtils; -import org.rocksdb.Checkpoint; -import org.rocksdb.RocksDB; -import org.rocksdb.RocksDBException; +import org.apache.hadoop.hdds.utils.db.RocksDatabase.RocksCheckpoint; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,29 +36,21 @@ */ public class RDBCheckpointManager { - private final Checkpoint checkpoint; - private final RocksDB db; + private final RocksCheckpoint checkpoint; public static final String RDB_CHECKPOINT_DIR_PREFIX = "checkpoint_"; private static final Logger LOG = LoggerFactory.getLogger(RDBCheckpointManager.class); - private String checkpointNamePrefix = ""; - - public RDBCheckpointManager(RocksDB rocksDB) { - this.db = rocksDB; - this.checkpoint = Checkpoint.create(rocksDB); - } + private final String checkpointNamePrefix; /** * Create a checkpoint manager with a prefix to be added to the * snapshots created. * - * @param rocksDB DB instance * @param checkpointPrefix prefix string. 
*/ - public RDBCheckpointManager(RocksDB rocksDB, String checkpointPrefix) { - this.db = rocksDB; + public RDBCheckpointManager(RocksDatabase db, String checkpointPrefix) { this.checkpointNamePrefix = checkpointPrefix; - this.checkpoint = Checkpoint.create(rocksDB); + this.checkpoint = db.createCheckpoint(); } /** @@ -79,20 +71,21 @@ public RocksDBCheckpoint createCheckpoint(String parentDir) { Path checkpointPath = Paths.get(parentDir, checkpointDir); Instant start = Instant.now(); - checkpoint.createCheckpoint(checkpointPath.toString()); + checkpoint.createCheckpoint(checkpointPath); + //Best guesstimate here. Not accurate. + final long latest = checkpoint.getLatestSequenceNumber(); Instant end = Instant.now(); long duration = Duration.between(start, end).toMillis(); LOG.info("Created checkpoint at {} in {} milliseconds", - checkpointPath.toString(), duration); + checkpointPath, duration); return new RocksDBCheckpoint( checkpointPath, currentTime, - db.getLatestSequenceNumber(), //Best guesstimate here. Not accurate. + latest, duration); - - } catch (RocksDBException e) { + } catch (IOException e) { LOG.error("Unable to create RocksDB Snapshot.", e); } return null; diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java index 7e45994599f0..467cf4462db7 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java @@ -24,52 +24,39 @@ import java.io.IOException; import java.nio.file.Paths; import java.util.ArrayList; -import java.util.Collections; +import java.util.Collection; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; import org.apache.hadoop.hdds.HddsUtils; -import org.apache.hadoop.hdds.StringUtils; import org.apache.hadoop.hdds.utils.RocksDBStoreMBean; import org.apache.hadoop.hdds.utils.db.cache.TableCache; +import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily; import org.apache.hadoop.metrics2.util.MBeans; import com.google.common.base.Preconditions; import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting; -import org.rocksdb.ColumnFamilyDescriptor; -import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.DBOptions; -import org.rocksdb.FlushOptions; -import org.rocksdb.Options; -import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.rocksdb.TransactionLogIterator; import org.rocksdb.WriteOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.hadoop.hdds.utils.HddsServerUtil.toIOException; - /** * RocksDB Store that supports creating Tables in DB. 
*/ public class RDBStore implements DBStore { private static final Logger LOG = LoggerFactory.getLogger(RDBStore.class); - private RocksDB db; - private File dbLocation; - private final WriteOptions writeOptions; - private final DBOptions dbOptions; + private final RocksDatabase db; + private final File dbLocation; private final CodecRegistry codecRegistry; - private final Map handleTable; private ObjectName statMBeanName; - private RDBCheckpointManager checkPointManager; - private String checkpointsParentDir; - private List columnFamilyHandles; - private RDBMetrics rdbMetrics; + private final RDBCheckpointManager checkPointManager; + private final String checkpointsParentDir; + private final RDBMetrics rdbMetrics; @VisibleForTesting public RDBStore(File dbFile, DBOptions options, @@ -78,53 +65,19 @@ public RDBStore(File dbFile, DBOptions options, false); } - public RDBStore(File dbFile, DBOptions options, + public RDBStore(File dbFile, DBOptions dbOptions, WriteOptions writeOptions, Set families, CodecRegistry registry, boolean readOnly) throws IOException { Preconditions.checkNotNull(dbFile, "DB file location cannot be null"); Preconditions.checkNotNull(families); Preconditions.checkArgument(!families.isEmpty()); - handleTable = new HashMap<>(); codecRegistry = registry; - final List columnFamilyDescriptors = - new ArrayList<>(); - columnFamilyHandles = new ArrayList<>(); - - for (TableConfig family : families) { - columnFamilyDescriptors.add(family.getDescriptor()); - } - - dbOptions = options; dbLocation = dbFile; - this.writeOptions = writeOptions; try { - // This logic has been added to support old column families that have - // been removed, or those that may have been created in a future version. - // TODO : Revisit this logic during upgrade implementation. - List columnFamiliesInDb = getColumnFamiliesInExistingDb(); - List extraCf = columnFamiliesInDb.stream().filter( - cf -> !families.contains(cf)).collect(Collectors.toList()); - if (!extraCf.isEmpty()) { - LOG.info("Found the following extra column families in existing DB : " + - "{}", extraCf); - extraCf.forEach(cf -> columnFamilyDescriptors.add(cf.getDescriptor())); - } - - if (readOnly) { - db = RocksDB.openReadOnly(dbOptions, dbLocation.getAbsolutePath(), - columnFamilyDescriptors, columnFamilyHandles); - } else { - db = RocksDB.open(dbOptions, dbLocation.getAbsolutePath(), - columnFamilyDescriptors, columnFamilyHandles); - } - - for (int x = 0; x < columnFamilyHandles.size(); x++) { - handleTable.put( - StringUtils.bytes2String(columnFamilyHandles.get(x).getName()), - columnFamilyHandles.get(x)); - } + db = RocksDatabase.open(dbFile, dbOptions, writeOptions, + families, readOnly); if (dbOptions.statistics() != null) { Map jmxProperties = new HashMap<>(); @@ -154,79 +107,38 @@ public RDBStore(File dbFile, DBOptions options, checkPointManager = new RDBCheckpointManager(db, dbLocation.getName()); rdbMetrics = RDBMetrics.create(); - } catch (RocksDBException e) { + } catch (IOException e) { String msg = "Failed init RocksDB, db path : " + dbFile.getAbsolutePath() + ", " + "exception :" + (e.getCause() == null ? 
e.getClass().getCanonicalName() + " " + e.getMessage() : e.getCause().getClass().getCanonicalName() + " " + e.getCause().getMessage()); - throw toIOException(msg, e); + throw new IOException(msg, e); } if (LOG.isDebugEnabled()) { LOG.debug("RocksDB successfully opened."); LOG.debug("[Option] dbLocation= {}", dbLocation.getAbsolutePath()); - LOG.debug("[Option] createIfMissing = {}", options.createIfMissing()); - LOG.debug("[Option] maxOpenFiles= {}", options.maxOpenFiles()); - } - } - - /** - * Read DB and return existing column families. - * @return List of column families - * @throws RocksDBException on Error. - */ - private List getColumnFamiliesInExistingDb() - throws RocksDBException { - List bytes = RocksDB.listColumnFamilies(new Options(), - dbLocation.getAbsolutePath()); - List columnFamiliesInDb = bytes.stream() - .map(cfbytes -> new TableConfig(StringUtils.bytes2String(cfbytes), - DBStoreBuilder.HDDS_DEFAULT_DB_PROFILE.getColumnFamilyOptions())) - .collect(Collectors.toList()); - if (LOG.isDebugEnabled()) { - LOG.debug("Found column Families in DB : {}", - columnFamiliesInDb); + LOG.debug("[Option] createIfMissing = {}", dbOptions.createIfMissing()); + LOG.debug("[Option] maxOpenFiles= {}", dbOptions.maxOpenFiles()); } - return columnFamiliesInDb; } @Override public void compactDB() throws IOException { - if (db != null) { - try { - db.compactRange(); - } catch (RocksDBException e) { - throw toIOException("Failed to compact db", e); - } - } + db.compactRange(); } @Override - public void close() throws IOException { - - for (final ColumnFamilyHandle handle : handleTable.values()) { - handle.close(); - } - + public void close() { if (statMBeanName != null) { MBeans.unregister(statMBeanName); statMBeanName = null; } RDBMetrics.unRegister(); - if (db != null) { - db.close(); - } - - if (dbOptions != null) { - dbOptions.close(); - } - - if (writeOptions != null) { - writeOptions.close(); - } + db.close(); } @Override @@ -260,11 +172,7 @@ public void move(K sourceKey, K destKey, V value, @Override public long getEstimatedKeyCount() throws IOException { - try { - return db.getLongProperty("rocksdb.estimate-num-keys"); - } catch (RocksDBException e) { - throw toIOException("Unable to get the estimated count.", e); - } + return db.estimateNumKeys(); } @Override @@ -275,7 +183,7 @@ public BatchOperation initBatchOperation() { @Override public void commitBatchOperation(BatchOperation operation) throws IOException { - ((RDBBatchOperation) operation).commit(db, writeOptions); + ((RDBBatchOperation) operation).commit(db); } @@ -286,11 +194,11 @@ protected ObjectName getStatMBeanName() { @Override public Table getTable(String name) throws IOException { - ColumnFamilyHandle handle = handleTable.get(name); + final ColumnFamily handle = db.getColumnFamily(name); if (handle == null) { throw new IOException("No such table in this DB. TableName : " + name); } - return new RDBTable(this.db, handle, this.writeOptions, rdbMetrics); + return new RDBTable(this.db, handle, rdbMetrics); } @Override @@ -311,33 +219,22 @@ public Table getTable(String name, @Override public ArrayList listTables() { ArrayList
returnList = new ArrayList<>(); - for (ColumnFamilyHandle handle : handleTable.values()) { - returnList.add(new RDBTable(db, handle, writeOptions, rdbMetrics)); + for (ColumnFamily family : getColumnFamilies()) { + returnList.add(new RDBTable(db, family, rdbMetrics)); } return returnList; } @Override public void flushDB() throws IOException { - try (FlushOptions flushOptions = new FlushOptions()) { - flushOptions.setWaitForFlush(true); - db.flush(flushOptions); - } catch (RocksDBException e) { - throw toIOException("Unable to Flush RocksDB data", e); - } + db.flush(); } @Override public void flushLog(boolean sync) throws IOException { - if (db != null) { - try { - // for RocksDB it is sufficient to flush the WAL as entire db can - // be reconstructed using it. - db.flushWal(sync); - } catch (RocksDBException e) { - throw toIOException("Failed to flush db", e); - } - } + // for RocksDB it is sufficient to flush the WAL as entire db can + // be reconstructed using it. + db.flushWal(sync); } @Override @@ -358,20 +255,14 @@ public Map getTableNames() { Map tableNames = new HashMap<>(); StringCodec stringCodec = new StringCodec(); - for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) { - try { - tableNames.put(columnFamilyHandle.getID(), stringCodec - .fromPersistedFormat(columnFamilyHandle.getName())); - } catch (RocksDBException | IOException e) { - LOG.error("Unexpected exception while reading column family handle " + - "name", e); - } + for (ColumnFamily columnFamily : getColumnFamilies()) { + tableNames.put(columnFamily.getID(), columnFamily.getName(stringCodec)); } return tableNames; } - public List getColumnFamilyHandles() { - return Collections.unmodifiableList(columnFamilyHandles); + public Collection getColumnFamilies() { + return db.getColumnFamilies(); } @Override @@ -428,7 +319,7 @@ public DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long limitCount) } transactionLogIterator.next(); } - } catch (RocksDBException e) { + } catch (RocksDBException | IOException e) { LOG.error("Unable to get delta updates since sequenceNumber {} ", sequenceNumber, e); } @@ -437,17 +328,17 @@ public DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long limitCount) } @VisibleForTesting - public RocksDB getDb() { + public RocksDatabase getDb() { return db; } - public String getProperty(String property) throws RocksDBException { + public String getProperty(String property) throws IOException { return db.getProperty(property); } - public String getProperty(ColumnFamilyHandle handle, String property) - throws RocksDBException { - return db.getProperty(handle, property); + public String getProperty(ColumnFamily family, String property) + throws IOException { + return db.getProperty(family, property); } public RDBMetrics getMetrics() { diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java index 3e28170d4cbb..f6321fc1fb80 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java @@ -20,27 +20,19 @@ package org.apache.hadoop.hdds.utils.db; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.util.List; import java.util.ArrayList; import java.util.Arrays; +import java.util.function.Supplier; import org.apache.hadoop.hdds.annotation.InterfaceAudience; -import org.apache.hadoop.hdds.StringUtils; +import 
org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily; -import org.rocksdb.ColumnFamilyHandle; -import org.rocksdb.Holder; -import org.rocksdb.ReadOptions; -import org.rocksdb.RocksDB; -import org.rocksdb.RocksDBException; -import org.rocksdb.WriteOptions; import org.rocksdb.RocksIterator; import org.apache.hadoop.hdds.utils.MetadataKeyFilters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.hadoop.hdds.utils.HddsServerUtil.toIOException; - /** * RocksDB implementation of ozone metadata store. This class should be only * used as part of TypedTable as it's underlying implementation to access the @@ -53,52 +45,37 @@ class RDBTable implements Table { private static final Logger LOG = LoggerFactory.getLogger(RDBTable.class); - private final RocksDB db; - private final ColumnFamilyHandle handle; - private final WriteOptions writeOptions; + private final RocksDatabase db; + private final ColumnFamily family; private final RDBMetrics rdbMetrics; /** * Constructs a TableStore. * * @param db - DBstore that we are using. - * @param handle - ColumnFamily Handle. - * @param writeOptions - RocksDB write Options. + * @param family - ColumnFamily Handle. */ - RDBTable(RocksDB db, ColumnFamilyHandle handle, - WriteOptions writeOptions, RDBMetrics rdbMetrics) { + RDBTable(RocksDatabase db, ColumnFamily family, + RDBMetrics rdbMetrics) { this.db = db; - this.handle = handle; - this.writeOptions = writeOptions; + this.family = family; this.rdbMetrics = rdbMetrics; } - /** - * Returns the Column family Handle. - * - * @return ColumnFamilyHandle. - */ - public ColumnFamilyHandle getHandle() { - return handle; + public ColumnFamily getColumnFamily() { + return family; } @Override public void put(byte[] key, byte[] value) throws IOException { - try { - db.put(handle, writeOptions, key, value); - } catch (RocksDBException e) { - LOG.error("Failed to write to DB. Key: {}", new String(key, - StandardCharsets.UTF_8)); - throw toIOException("Failed to put key-value to metadata " - + "store", e); - } + db.put(family, key, value); } @Override public void putWithBatch(BatchOperation batch, byte[] key, byte[] value) throws IOException { if (batch instanceof RDBBatchOperation) { - ((RDBBatchOperation) batch).put(getHandle(), key, value); + ((RDBBatchOperation) batch).put(family, key, value); } else { throw new IllegalArgumentException("batch should be RDBBatchOperation"); } @@ -115,37 +92,23 @@ public boolean isEmpty() throws IOException { @Override public boolean isExist(byte[] key) throws IOException { - try { - // RocksDB#keyMayExist - // If the key definitely does not exist in the database, then this - // method returns false, else true. - rdbMetrics.incNumDBKeyMayExistChecks(); - Holder outValue = new Holder<>(); - boolean keyMayExist = db.keyMayExist(handle, key, outValue); - if (keyMayExist) { - boolean keyExists = - (outValue.getValue() != null && outValue.getValue().length > 0) || - (db.get(handle, key) != null); - if (!keyExists) { - rdbMetrics.incNumDBKeyMayExistMisses(); - } - return keyExists; - } + rdbMetrics.incNumDBKeyMayExistChecks(); + final Supplier holder = db.keyMayExistHolder(family, key); + if (holder == null) { return false; - } catch (RocksDBException e) { - throw toIOException( - "Error in accessing DB. 
", e); } + final byte[] value = holder.get(); + final boolean exists = (value != null && value.length > 0) + || db.get(family, key) != null; + if (!exists) { + rdbMetrics.incNumDBKeyMayExistMisses(); + } + return exists; } @Override public byte[] get(byte[] key) throws IOException { - try { - return db.get(handle, key); - } catch (RocksDBException e) { - throw toIOException( - "Failed to get the value for the given key", e); - } + return db.get(family, key); } /** @@ -163,43 +126,31 @@ public byte[] getSkipCache(byte[] bytes) throws IOException { @Override public byte[] getIfExist(byte[] key) throws IOException { - try { - // RocksDB#keyMayExist - // If the key definitely does not exist in the database, then this - // method returns false, else true. - rdbMetrics.incNumDBKeyGetIfExistChecks(); - boolean keyMayExist = db.keyMayExist(handle, key, null); - if (keyMayExist) { - // Not using out value from string builder, as that is causing - // IllegalArgumentException during protobuf parsing. - rdbMetrics.incNumDBKeyGetIfExistGets(); - byte[] val; - val = db.get(handle, key); - if (val == null) { - rdbMetrics.incNumDBKeyGetIfExistMisses(); - } - return val; + rdbMetrics.incNumDBKeyGetIfExistChecks(); + final boolean keyMayExist = db.keyMayExist(family, key); + if (keyMayExist) { + // Not using out value from string builder, as that is causing + // IllegalArgumentException during protobuf parsing. + rdbMetrics.incNumDBKeyGetIfExistGets(); + final byte[] val = db.get(family, key); + if (val == null) { + rdbMetrics.incNumDBKeyGetIfExistMisses(); } - return null; - } catch (RocksDBException e) { - throw toIOException("Error in accessing DB. ", e); + return val; } + return null; } @Override public void delete(byte[] key) throws IOException { - try { - db.delete(handle, key); - } catch (RocksDBException e) { - throw toIOException("Failed to delete the given key", e); - } + db.delete(family, key); } @Override public void deleteWithBatch(BatchOperation batch, byte[] key) throws IOException { if (batch instanceof RDBBatchOperation) { - ((RDBBatchOperation) batch).delete(getHandle(), key); + ((RDBBatchOperation) batch).delete(family, key); } else { throw new IllegalArgumentException("batch should be RDBBatchOperation"); } @@ -208,18 +159,12 @@ public void deleteWithBatch(BatchOperation batch, byte[] key) @Override public TableIterator iterator() { - ReadOptions readOptions = new ReadOptions(); - readOptions.setFillCache(false); - return new RDBStoreIterator(db.newIterator(handle, readOptions), this); + return new RDBStoreIterator(db.newIterator(family, false), this); } @Override public String getName() throws IOException { - try { - return StringUtils.bytes2String(this.getHandle().getName()); - } catch (RocksDBException rdbEx) { - throw toIOException("Unable to get the table name.", rdbEx); - } + return family.getName(); } @Override @@ -229,12 +174,7 @@ public void close() throws Exception { @Override public long getEstimatedKeyCount() throws IOException { - try { - return db.getLongProperty(handle, "rocksdb.estimate-num-keys"); - } catch (RocksDBException e) { - throw toIOException( - "Failed to get estimated key count of table " + getName(), e); - } + return db.estimateNumKeys(family); } @Override @@ -261,7 +201,7 @@ private List getRangeKVs(byte[] startKey, throw new IllegalArgumentException( "Invalid count given " + count + ", count must be greater than 0"); } - try (RocksIterator it = db.newIterator(handle)) { + try (RocksIterator it = db.newIterator(family)) { if (startKey == null) { 
it.seekToFirst(); } else { diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java new file mode 100644 index 000000000000..2d358e10b25f --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java @@ -0,0 +1,485 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.utils.db; + +import org.apache.hadoop.hdds.utils.HddsServerUtil; +import org.rocksdb.Checkpoint; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.DBOptions; +import org.rocksdb.FlushOptions; +import org.rocksdb.Holder; +import org.rocksdb.Options; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.TransactionLogIterator; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hadoop.hdds.StringUtils.bytes2String; + +/** + * A wrapper class for {@link RocksDB}. + * When there is a {@link RocksDBException} with error, + * this class will close the underlying {@link org.rocksdb.RocksObject}s. + */ +public final class RocksDatabase { + static final Logger LOG = LoggerFactory.getLogger(RocksDatabase.class); + + static final String ESTIMATE_NUM_KEYS = "rocksdb.estimate-num-keys"; + + static IOException toIOException(Object name, String op, RocksDBException e) { + return HddsServerUtil.toIOException(name + ": Failed to " + op, e); + } + + /** + * Read DB and return existing column families. + * + * @return a list of column families. 
+ * @see RocksDB#listColumnFamilies(Options, String) + */ + private static List getColumnFamilies(File file) + throws RocksDBException { + final List columnFamilies = RocksDB.listColumnFamilies( + new Options(), file.getAbsolutePath()) + .stream() + .map(TableConfig::newTableConfig) + .collect(Collectors.toList()); + if (LOG.isDebugEnabled()) { + LOG.debug("Found column families in DB {}: {}", file, columnFamilies); + } + return columnFamilies; + } + + static RocksDatabase open(File dbFile, DBOptions dbOptions, + WriteOptions writeOptions, Set families, + boolean readOnly) throws IOException { + List descriptors = null; + RocksDB db = null; + final Map columnFamilies = new HashMap<>(); + try { + // This logic has been added to support old column families that have + // been removed, or those that may have been created in a future version. + // TODO : Revisit this logic during upgrade implementation. + final Stream extra = getColumnFamilies(dbFile).stream() + .filter(extraColumnFamily(families)); + descriptors = Stream.concat(families.stream(), extra) + .map(TableConfig::getDescriptor) + .collect(Collectors.toList()); + + // open RocksDB + final List handles = new ArrayList<>(); + if (readOnly) { + db = RocksDB.openReadOnly(dbOptions, dbFile.getAbsolutePath(), + descriptors, handles); + } else { + db = RocksDB.open(dbOptions, dbFile.getAbsolutePath(), + descriptors, handles); + } + // init a column family map. + for (ColumnFamilyHandle h : handles) { + final ColumnFamily f = new ColumnFamily(h); + columnFamilies.put(f.getName(), f); + } + return new RocksDatabase(dbFile, db, dbOptions, writeOptions, + descriptors, Collections.unmodifiableMap(columnFamilies)); + } catch (RocksDBException e) { + close(columnFamilies, db, descriptors, writeOptions, dbOptions); + throw toIOException(RocksDatabase.class, "open " + dbFile, e); + } + } + + private static void close(ColumnFamilyDescriptor d) { + runWithTryCatch(() -> d.getOptions().close(), new Object() { + @Override + public String toString() { + return d.getClass() + ":" + bytes2String(d.getName()); + } + }); + } + + private static void close(Map columnFamilies, + RocksDB db, List descriptors, + WriteOptions writeOptions, DBOptions dbOptions) { + if (columnFamilies != null) { + for (ColumnFamily f : columnFamilies.values()) { + runWithTryCatch(() -> f.getHandle().close(), f); + } + } + + if (db != null) { + runWithTryCatch(db::close, "db"); + } + + if (descriptors != null) { + descriptors.forEach(RocksDatabase::close); + } + + if (writeOptions != null) { + runWithTryCatch(writeOptions::close, "writeOptions"); + } + if (dbOptions != null) { + runWithTryCatch(dbOptions::close, "dbOptions"); + } + } + + private static void runWithTryCatch(Runnable runnable, Object name) { + try { + runnable.run(); + } catch (Throwable t) { + LOG.error("Failed to close " + name, t); + } + } + + static Predicate extraColumnFamily(Set families) { + return f -> { + if (families.contains(f)) { + return false; + } + LOG.info("Found an extra column family in existing DB: {}", f); + return true; + }; + } + + /** + * Represents a checkpoint of the db. 
+ * + * @see Checkpoint + */ + final class RocksCheckpoint { + private final Checkpoint checkpoint; + + private RocksCheckpoint() { + this.checkpoint = Checkpoint.create(db); + } + + public void createCheckpoint(Path path) throws IOException { + try { + checkpoint.createCheckpoint(path.toString()); + } catch (RocksDBException e) { + closeOnError(e); + throw toIOException(this, "createCheckpoint " + path, e); + } + } + + public long getLatestSequenceNumber() { + return RocksDatabase.this.getLatestSequenceNumber(); + } + } + + /** + * Represents a column family of the db. + * + * @see ColumnFamilyHandle + */ + public static final class ColumnFamily { + private final byte[] nameBytes; + private final String name; + private final ColumnFamilyHandle handle; + + public ColumnFamily(ColumnFamilyHandle handle) throws RocksDBException { + this.nameBytes = handle.getName(); + this.name = bytes2String(nameBytes); + this.handle = handle; + LOG.debug("new ColumnFamily for {}", name); + } + + public String getName() { + return name; + } + + public String getName(StringCodec codec) { + return codec.fromPersistedFormat(nameBytes); + } + + private ColumnFamilyHandle getHandle() { + return handle; + } + + public int getID() { + return getHandle().getID(); + } + + public void batchDelete(WriteBatch writeBatch, byte[] key) + throws IOException { + try { + writeBatch.delete(getHandle(), key); + } catch (RocksDBException e) { + throw toIOException(this, "batchDelete key " + bytes2String(key), e); + } + } + + public void batchPut(WriteBatch writeBatch, byte[] key, byte[] value) + throws IOException { + try { + writeBatch.put(getHandle(), key, value); + } catch (RocksDBException e) { + throw toIOException(this, "batchPut key " + bytes2String(key), e); + } + } + + @Override + public String toString() { + return "ColumnFamily-" + getName(); + } + } + + private final String name; + private final RocksDB db; + private final DBOptions dbOptions; + private final WriteOptions writeOptions; + private final List descriptors; + private final Map columnFamilies; + + private final AtomicBoolean isClosed = new AtomicBoolean(); + + private RocksDatabase(File dbFile, RocksDB db, DBOptions dbOptions, + WriteOptions writeOptions, + List descriptors, + Map columnFamilies) { + this.name = getClass().getSimpleName() + "[" + dbFile + "]"; + this.db = db; + this.dbOptions = dbOptions; + this.writeOptions = writeOptions; + this.descriptors = descriptors; + this.columnFamilies = columnFamilies; + } + + void close() { + if (isClosed.compareAndSet(false, true)) { + close(columnFamilies, db, descriptors, writeOptions, dbOptions); + } + } + + private void closeOnError(RocksDBException e) { + if (shouldClose(e)) { + close(); + } + } + + private boolean shouldClose(RocksDBException e) { + switch (e.getStatus().getCode()) { + case Corruption: + case IOError: + return true; + default: + return false; + } + } + + public void put(ColumnFamily family, byte[] key, byte[] value) + throws IOException { + try { + db.put(family.getHandle(), writeOptions, key, value); + } catch (RocksDBException e) { + closeOnError(e); + throw toIOException(this, "put " + bytes2String(key), e); + } + } + + public void flush() throws IOException { + try (FlushOptions options = new FlushOptions()) { + options.setWaitForFlush(true); + db.flush(options); + } catch (RocksDBException e) { + closeOnError(e); + throw toIOException(this, "flush", e); + } + } + + public void flushWal(boolean sync) throws IOException { + try { + db.flushWal(sync); + } catch (RocksDBException e) { 
+ closeOnError(e); + throw toIOException(this, "flushWal with sync=" + sync, e); + } + } + + public void compactRange() throws IOException { + try { + db.compactRange(); + } catch (RocksDBException e) { + closeOnError(e); + throw toIOException(this, "compactRange", e); + } + } + + RocksCheckpoint createCheckpoint() { + return new RocksCheckpoint(); + } + + /** + * @return false if the key definitely does not exist in the database; + * otherwise, return true. + * @see RocksDB#keyMayExist(ColumnFamilyHandle, byte[], Holder) + */ + public boolean keyMayExist(ColumnFamily family, byte[] key) { + return db.keyMayExist(family.getHandle(), key, null); + } + + /** + * @return the null if the key definitely does not exist in the database; + * otherwise, return a {@link Supplier}. + * @see RocksDB#keyMayExist(ColumnFamilyHandle, byte[], Holder) + */ + public Supplier keyMayExistHolder(ColumnFamily family, + byte[] key) { + final Holder out = new Holder<>(); + return db.keyMayExist(family.getHandle(), key, out) ? out::getValue : null; + } + + public ColumnFamily getColumnFamily(String key) { + return columnFamilies.get(key); + } + + public Collection getColumnFamilies() { + return Collections.unmodifiableCollection(columnFamilies.values()); + } + + public byte[] get(ColumnFamily family, byte[] key) throws IOException { + try { + return db.get(family.getHandle(), key); + } catch (RocksDBException e) { + closeOnError(e); + final String message = "get " + bytes2String(key) + " from " + family; + throw toIOException(this, message, e); + } + } + + public long estimateNumKeys() throws IOException { + return getLongProperty(ESTIMATE_NUM_KEYS); + } + + public long estimateNumKeys(ColumnFamily family) throws IOException { + return getLongProperty(family, ESTIMATE_NUM_KEYS); + } + + private long getLongProperty(String key) throws IOException { + try { + return db.getLongProperty(key); + } catch (RocksDBException e) { + closeOnError(e); + throw toIOException(this, "getLongProperty " + key, e); + } + } + + private long getLongProperty(ColumnFamily family, String key) + throws IOException { + try { + return db.getLongProperty(family.getHandle(), key); + } catch (RocksDBException e) { + closeOnError(e); + final String message = "getLongProperty " + key + " from " + family; + throw toIOException(this, message, e); + } + } + + public String getProperty(String key) throws IOException { + try { + return db.getProperty(key); + } catch (RocksDBException e) { + closeOnError(e); + throw toIOException(this, "getProperty " + key, e); + } + } + + public String getProperty(ColumnFamily family, String key) + throws IOException { + try { + return db.getProperty(family.getHandle(), key); + } catch (RocksDBException e) { + closeOnError(e); + throw toIOException(this, "getProperty " + key + " from " + family, e); + } + } + + public TransactionLogIterator getUpdatesSince(long sequenceNumber) + throws IOException { + try { + return db.getUpdatesSince(sequenceNumber); + } catch (RocksDBException e) { + closeOnError(e); + throw toIOException(this, "getUpdatesSince " + sequenceNumber, e); + } + } + + public long getLatestSequenceNumber() { + return db.getLatestSequenceNumber(); + } + + public RocksIterator newIterator(ColumnFamily family) { + return db.newIterator(family.getHandle()); + } + + public RocksIterator newIterator(ColumnFamily family, boolean fillCache) { + try (ReadOptions readOptions = new ReadOptions()) { + readOptions.setFillCache(fillCache); + return db.newIterator(family.getHandle(), readOptions); + } + } + + public 
void batchWrite(WriteBatch writeBatch, WriteOptions options) + throws IOException { + try { + db.write(options, writeBatch); + } catch (RocksDBException e) { + closeOnError(e); + throw toIOException(this, "batchWrite", e); + } + } + + public void batchWrite(WriteBatch writeBatch) throws IOException { + batchWrite(writeBatch, writeOptions); + } + + public void delete(ColumnFamily family, byte[] key) throws IOException { + try { + db.delete(family.getHandle(), key); + } catch (RocksDBException e) { + closeOnError(e); + final String message = "delete " + bytes2String(key) + " from " + family; + throw toIOException(this, message, e); + } + } + + @Override + public String toString() { + return name; + } +} diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/StringCodec.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/StringCodec.java index 33ab57b8121d..dc432ab9f1e3 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/StringCodec.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/StringCodec.java @@ -37,7 +37,7 @@ public byte[] toPersistedFormat(String object) throws IOException { } @Override - public String fromPersistedFormat(byte[] rawData) throws IOException { + public String fromPersistedFormat(byte[] rawData) { if (rawData != null) { return StringUtils.bytes2String(rawData); } else { diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TableConfig.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TableConfig.java index 1bad84e50a40..83ea152001e3 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TableConfig.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TableConfig.java @@ -30,6 +30,11 @@ * Class that maintains Table Configuration. 
*/ public class TableConfig { + static TableConfig newTableConfig(byte[] bytes) { + return new TableConfig(StringUtils.bytes2String(bytes), + DBStoreBuilder.HDDS_DEFAULT_DB_PROFILE.getColumnFamilyOptions()); + } + private final String name; private final ColumnFamilyOptions columnFamilyOptions; @@ -58,7 +63,7 @@ public String getName() { */ public ColumnFamilyDescriptor getDescriptor() { return new ColumnFamilyDescriptor(StringUtils.string2Bytes(name), - columnFamilyOptions); + new ColumnFamilyOptions(columnFamilyOptions)); } /** diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java index 00a86d6ac7a9..dfb6e6f54aae 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java @@ -104,7 +104,7 @@ public void tearDown() throws Exception { public void getHandle() throws Exception { try (Table testTable = rdbStore.getTable("First")) { Assertions.assertNotNull(testTable); - Assertions.assertNotNull(((RDBTable) testTable).getHandle()); + Assertions.assertNotNull(((RDBTable) testTable).getColumnFamily()); } } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java index efebbf0b9220..87afa3497e2a 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java @@ -30,6 +30,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hdds.utils.db.RocksDatabase; import org.apache.hadoop.hdfs.web.URLConnectionFactory; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -78,7 +79,6 @@ import static org.apache.ratis.proto.RaftProtos.RaftPeerRole.LEADER; import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao; import org.hadoop.ozone.recon.schema.tables.pojos.ReconTaskStatus; -import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.rocksdb.WriteBatch; import org.rocksdb.WriteOptions; @@ -412,7 +412,7 @@ void innerGetAndApplyDeltaUpdatesFromOM(long fromSequenceNumber, if (null != dbUpdates && dbUpdates.getCurrentSequenceNumber() != -1) { latestSequenceNumberOfOM = dbUpdates.getLatestSequenceNumber(); RDBStore rocksDBStore = (RDBStore) omMetadataManager.getStore(); - RocksDB rocksDB = rocksDBStore.getDb(); + final RocksDatabase rocksDB = rocksDBStore.getDb(); numUpdates = dbUpdates.getData().size(); if (numUpdates > 0) { metrics.incrNumUpdatesInDeltaTotal(numUpdates); diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java index c73990297ab5..15b2ad6310ba 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java @@ -57,6 +57,7 @@ import 
org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.utils.db.DBCheckpoint; import org.apache.hadoop.hdds.utils.db.RDBStore; +import org.apache.hadoop.hdds.utils.db.RocksDatabase; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.apache.hadoop.ozone.om.helpers.DBUpdates; @@ -77,7 +78,6 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.mockito.ArgumentCaptor; -import org.rocksdb.RocksDB; import org.rocksdb.TransactionLogIterator; import org.rocksdb.WriteBatch; @@ -204,6 +204,11 @@ public void testGetOzoneManagerDBSnapshot() throws Exception { .listFiles().length == 2); } + + static RocksDatabase getRocksDatabase(OMMetadataManager om) { + return ((RDBStore)om.getStore()).getDb(); + } + @Test public void testGetAndApplyDeltaUpdatesFromOM() throws Exception { @@ -214,7 +219,7 @@ public void testGetAndApplyDeltaUpdatesFromOM() throws Exception { writeDataToOm(sourceOMMetadataMgr, "key_one"); writeDataToOm(sourceOMMetadataMgr, "key_two"); - RocksDB rocksDB = ((RDBStore)sourceOMMetadataMgr.getStore()).getDb(); + final RocksDatabase rocksDB = getRocksDatabase(sourceOMMetadataMgr); TransactionLogIterator transactionLogIterator = rocksDB.getUpdatesSince(0L); DBUpdates dbUpdatesWrapper = new DBUpdates(); while (transactionLogIterator.isValid()) { @@ -276,7 +281,7 @@ public void testGetAndApplyDeltaUpdatesFromOMWithLimit() throws Exception { writeDataToOm(sourceOMMetadataMgr, "key_one"); writeDataToOm(sourceOMMetadataMgr, "key_two"); - RocksDB rocksDB = ((RDBStore)sourceOMMetadataMgr.getStore()).getDb(); + final RocksDatabase rocksDB = getRocksDatabase(sourceOMMetadataMgr); TransactionLogIterator transactionLogIterator = rocksDB.getUpdatesSince(0L); DBUpdates[] dbUpdatesWrapper = new DBUpdates[4]; int index = 0; diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOMDBUpdatesHandler.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOMDBUpdatesHandler.java index ed03973c9865..38ee89da5d80 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOMDBUpdatesHandler.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOMDBUpdatesHandler.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.server.ServerUtils; +import org.apache.hadoop.hdds.utils.db.RocksDatabase; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; import org.apache.hadoop.ozone.om.codec.OMDBDefinition; @@ -50,7 +51,6 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.rocksdb.TransactionLogIterator; import org.rocksdb.WriteBatch; @@ -309,9 +309,9 @@ public void testGetValueType() throws IOException { @NotNull private List getBytesFromOmMetaManager(int getUpdatesSince) - throws RocksDBException { + throws RocksDBException, IOException { RDBStore rdbStore = (RDBStore) omMetadataManager.getStore(); - RocksDB rocksDB = rdbStore.getDb(); + final RocksDatabase rocksDB = rdbStore.getDb(); // Get all updates from source DB TransactionLogIterator transactionLogIterator = rocksDB.getUpdatesSince(getUpdatesSince);
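
Below is a small usage sketch (not part of the patch) illustrating how callers are expected to use the RocksDatabase wrapper and its ColumnFamily introduced above, mirroring what RDBTable and RDBBatchOperation now do internally. The class name RocksDatabaseUsageSketch, the table name "testTable", and the keys/values are made-up placeholders; the methods shown (getDb, getColumnFamily, put, get, estimateNumKeys, getName, batchPut, batchDelete, batchWrite) are the ones added in this patch, which translate RocksDBException into IOException and close the underlying RocksDB objects on Corruption/IOError statuses.

import java.io.IOException;

import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.utils.db.RDBStore;
import org.apache.hadoop.hdds.utils.db.RocksDatabase;
import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily;
import org.rocksdb.WriteBatch;

/** Illustrative only; not part of the patch. */
public final class RocksDatabaseUsageSketch {
  private RocksDatabaseUsageSketch() {
  }

  static void example(RDBStore rdbStore) throws IOException {
    // getDb() (a @VisibleForTesting accessor) now returns the RocksDatabase
    // wrapper instead of org.rocksdb.RocksDB.
    final RocksDatabase db = rdbStore.getDb();
    // Column families are looked up by name; null is returned if the family
    // does not exist. "testTable" is a hypothetical table name.
    final ColumnFamily family = db.getColumnFamily("testTable");

    final byte[] key = StringUtils.string2Bytes("key1");
    // put/get/estimateNumKeys wrap the RocksDB calls and throw IOException on failure.
    db.put(family, key, StringUtils.string2Bytes("value1"));
    final byte[] value = db.get(family, key);
    final long estimatedKeys = db.estimateNumKeys(family);
    System.out.println("read " + (value == null ? 0 : value.length)
        + " bytes; ~" + estimatedKeys + " keys in " + family.getName());

    // Batched writes go through ColumnFamily and RocksDatabase rather than raw
    // ColumnFamilyHandle calls; batchWrite(WriteBatch) reuses the WriteOptions
    // captured when the database was opened.
    try (WriteBatch batch = new WriteBatch()) {
      family.batchPut(batch, key, StringUtils.string2Bytes("value2"));
      family.batchDelete(batch, StringUtils.string2Bytes("key1"));
      db.batchWrite(batch);
    }
  }
}

A related design note: TableConfig.getDescriptor() now hands each descriptor its own copy of the ColumnFamilyOptions, presumably so that RocksDatabase.close(), which closes each descriptor's options, does not close options still shared with other tables or with the DB profile defaults.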