diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java index 1551836858ea..741c65e130a7 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java @@ -143,13 +143,15 @@ public ReferenceCountedDB getDB(long containerID, String containerDBType, lock.lock(); try { db = (ReferenceCountedDB) this.get(containerDBPath); - if (db != null) { + if (db != null && !db.isClosed()) { metrics.incNumCacheHits(); db.incrementReference(); return db; - } else { - metrics.incNumCacheMisses(); } + if (db != null && db.isClosed()) { + removeDB(containerDBPath); + } + metrics.incNumCacheMisses(); } finally { lock.unlock(); } @@ -170,18 +172,19 @@ public ReferenceCountedDB getDB(long containerID, String containerDBType, try { ReferenceCountedDB currentDB = (ReferenceCountedDB) this.get(containerDBPath); - if (currentDB != null) { + if (currentDB != null && !currentDB.isClosed()) { // increment the reference before returning the object currentDB.incrementReference(); // clean the db created in previous step cleanupDb(db); return currentDB; - } else { - this.put(containerDBPath, db); - // increment the reference before returning the object - db.incrementReference(); - return db; + } else if (currentDB != null && currentDB.isClosed()) { + removeDB(containerDBPath); } + this.put(containerDBPath, db); + // increment the reference before returning the object + db.incrementReference(); + return db; } finally { lock.unlock(); } @@ -200,8 +203,11 @@ public void removeDB(String containerDBPath) { try { ReferenceCountedDB db = (ReferenceCountedDB)this.get(containerDBPath); if (db != null) { - Preconditions.checkArgument(cleanupDb(db), 
"refCount:", - db.getReferenceCount()); + boolean cleaned = cleanupDb(db); + if (!db.isClosed()) { + Preconditions.checkArgument(cleaned, "refCount:", + db.getReferenceCount()); + } } this.remove(containerDBPath); } finally { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ReferenceCountedDB.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ReferenceCountedDB.java index 3f858009c047..a992b2dea5ae 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ReferenceCountedDB.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ReferenceCountedDB.java @@ -26,6 +26,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; /** @@ -67,7 +68,8 @@ public void decrementReference() { } public boolean cleanup() { - if (referenceCount.get() == 0 && getStore() != null) { + if (getStore() != null && getStore().isClosed() + || referenceCount.get() == 0) { if (LOG.isDebugEnabled()) { LOG.debug("Close {} refCnt {}", getContainerDBPath(), referenceCount.get()); @@ -85,7 +87,15 @@ public boolean cleanup() { } @Override - public void close() { + public void close() throws IOException { decrementReference(); } + + /** + * Returns if the underlying DB is closed. This call is threadsafe. + * @return true if the DB is closed. 
+ */ + public boolean isClosed() { + return getStore().isClosed(); + } } \ No newline at end of file diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java index cab08dd5cd0a..c43b37cd89c4 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java @@ -65,7 +65,7 @@ public abstract class AbstractDatanodeStore implements DatanodeStore { private static final Logger LOG = LoggerFactory.getLogger(AbstractDatanodeStore.class); - private DBStore store; + private volatile DBStore store; private final AbstractDatanodeDBDefinition dbDef; private final ColumnFamilyOptions cfOptions; @@ -143,7 +143,7 @@ public void start(ConfigurationSource config) } @Override - public void stop() throws Exception { + public synchronized void stop() throws Exception { if (store != null) { store.close(); store = null; @@ -189,6 +189,19 @@ public BlockIterator getBlockIterator(long containerID, blockDataTableWithIterator.iterator(), filter); } + @Override + public synchronized boolean isClosed() { + if (this.store == null) { + return true; + } + return this.store.isClosed(); + } + + @Override + public void close() throws IOException { + this.store.close(); + } + @Override public void flushDB() throws IOException { store.flushDB(); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java index d48a93232bfb..c175e8e2625a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java +++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java @@ -27,12 +27,13 @@ import org.apache.hadoop.ozone.container.common.helpers.ChunkInfoList; import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator; +import java.io.Closeable; import java.io.IOException; /** * Interface for interacting with datanode databases. */ -public interface DatanodeStore { +public interface DatanodeStore extends Closeable { /** * Start datanode manager. @@ -92,4 +93,10 @@ BlockIterator getBlockIterator(long containerID) BlockIterator getBlockIterator(long containerID, KeyPrefixFilter filter) throws IOException; + + /** + * Returns if the underlying DB is closed. This call is thread safe. + * @return true if the DB is closed. + */ + boolean isClosed(); } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestContainerCache.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestContainerCache.java index a6ef4617c9e1..6188d0277c0e 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestContainerCache.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestContainerCache.java @@ -41,6 +41,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; +import static org.junit.Assert.assertEquals; + /** * Test ContainerCache with evictions. 
@@ -73,7 +75,7 @@ public void testContainerCacheEviction() throws Exception { ContainerCache cache = ContainerCache.getInstance(conf); cache.clear(); - Assert.assertEquals(0, cache.size()); + assertEquals(0, cache.size()); File containerDir1 = new File(root, "cont1"); File containerDir2 = new File(root, "cont2"); File containerDir3 = new File(root, "cont3"); @@ -92,52 +94,52 @@ public void testContainerCacheEviction() throws Exception { ReferenceCountedDB db1 = cache.getDB(1, "RocksDB", containerDir1.getPath(), VersionedDatanodeFeatures.SchemaV2.chooseSchemaVersion(), conf); - Assert.assertEquals(1, db1.getReferenceCount()); - Assert.assertEquals(numDbGetCount + 1, metrics.getNumDbGetOps()); + assertEquals(1, db1.getReferenceCount()); + assertEquals(numDbGetCount + 1, metrics.getNumDbGetOps()); ReferenceCountedDB db2 = cache.getDB(1, "RocksDB", containerDir1.getPath(), VersionedDatanodeFeatures.SchemaV2.chooseSchemaVersion(), conf); - Assert.assertEquals(2, db2.getReferenceCount()); - Assert.assertEquals(numCacheMisses + 1, metrics.getNumCacheMisses()); - Assert.assertEquals(2, db1.getReferenceCount()); - Assert.assertEquals(db1, db2); - Assert.assertEquals(numDbGetCount + 2, metrics.getNumDbGetOps()); - Assert.assertEquals(numCacheMisses + 1, metrics.getNumCacheMisses()); + assertEquals(2, db2.getReferenceCount()); + assertEquals(numCacheMisses + 1, metrics.getNumCacheMisses()); + assertEquals(2, db1.getReferenceCount()); + assertEquals(db1, db2); + assertEquals(numDbGetCount + 2, metrics.getNumDbGetOps()); + assertEquals(numCacheMisses + 1, metrics.getNumCacheMisses()); // add one more references to ContainerCache. 
ReferenceCountedDB db3 = cache.getDB(2, "RocksDB", containerDir2.getPath(), VersionedDatanodeFeatures.SchemaV2.chooseSchemaVersion(), conf); - Assert.assertEquals(1, db3.getReferenceCount()); + assertEquals(1, db3.getReferenceCount()); // and close the reference db3.close(); - Assert.assertEquals(0, db3.getReferenceCount()); + assertEquals(0, db3.getReferenceCount()); // add one more reference to ContainerCache and verify that it will not // evict the least recent entry as it has reference. ReferenceCountedDB db4 = cache.getDB(3, "RocksDB", containerDir3.getPath(), VersionedDatanodeFeatures.SchemaV2.chooseSchemaVersion(), conf); - Assert.assertEquals(1, db4.getReferenceCount()); + assertEquals(1, db4.getReferenceCount()); - Assert.assertEquals(2, cache.size()); + assertEquals(2, cache.size()); Assert.assertNotNull(cache.get(containerDir1.getPath())); Assert.assertNull(cache.get(containerDir2.getPath())); // Now close both the references for container1 db1.close(); db2.close(); - Assert.assertEquals(0, db1.getReferenceCount()); - Assert.assertEquals(0, db2.getReferenceCount()); + assertEquals(0, db1.getReferenceCount()); + assertEquals(0, db2.getReferenceCount()); // The reference count for container1 is 0 but it is not evicted. 
ReferenceCountedDB db5 = cache.getDB(1, "RocksDB", containerDir1.getPath(), VersionedDatanodeFeatures.SchemaV2.chooseSchemaVersion(), conf); - Assert.assertEquals(1, db5.getReferenceCount()); - Assert.assertEquals(db1, db5); + assertEquals(1, db5.getReferenceCount()); + assertEquals(db1, db5); db5.close(); db4.close(); @@ -157,7 +159,7 @@ public void testConcurrentDBGet() throws Exception { conf.setInt(OzoneConfigKeys.OZONE_CONTAINER_CACHE_SIZE, 2); ContainerCache cache = ContainerCache.getInstance(conf); cache.clear(); - Assert.assertEquals(0, cache.size()); + assertEquals(0, cache.size()); File containerDir = new File(root, "cont1"); createContainerDB(conf, containerDir); ExecutorService executorService = Executors.newFixedThreadPool(2); @@ -188,7 +190,44 @@ public void testConcurrentDBGet() throws Exception { db.close(); db.close(); db.close(); - Assert.assertEquals(1, cache.size()); + assertEquals(1, cache.size()); db.cleanup(); } + + @Test + public void testUnderlyingDBIsClosed() throws Exception { + File root = new File(testRoot); + root.mkdirs(); + + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setInt(OzoneConfigKeys.OZONE_CONTAINER_CACHE_SIZE, 2); + + ContainerCache cache = ContainerCache.getInstance(conf); + cache.clear(); + assertEquals(0, cache.size()); + File containerDir1 = new File(root, "cont100"); + + createContainerDB(conf, containerDir1); + ReferenceCountedDB db1 = cache.getDB(100, "RocksDB", + containerDir1.getPath(), + VersionedDatanodeFeatures.SchemaV2.chooseSchemaVersion(), conf); + ReferenceCountedDB db2 = cache.getDB(100, "RocksDB", + containerDir1.getPath(), + VersionedDatanodeFeatures.SchemaV2.chooseSchemaVersion(), conf); + assertEquals(db1, db2); + db1.getStore().getStore().close(); + ReferenceCountedDB db3 = cache.getDB(100, "RocksDB", + containerDir1.getPath(), + VersionedDatanodeFeatures.SchemaV2.chooseSchemaVersion(), conf); + ReferenceCountedDB db4 = cache.getDB(100, "RocksDB", + containerDir1.getPath(), + 
VersionedDatanodeFeatures.SchemaV2.chooseSchemaVersion(), conf); + Assert.assertNotEquals(db3, db2); + assertEquals(db4, db3); + db1.close(); + db2.close(); + db3.close(); + db4.close(); + cache.clear(); + } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java index 2ac2bdc73062..f385da9b89bd 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.utils.db; +import java.io.Closeable; import java.io.File; import java.io.IOException; import java.util.ArrayList; @@ -35,7 +36,7 @@ * */ @InterfaceStability.Evolving -public interface DBStore extends AutoCloseable, BatchOperationHandler { +public interface DBStore extends Closeable, BatchOperationHandler { /** * Gets an existing TableStore. @@ -197,4 +198,10 @@ DBUpdatesWrapper getUpdatesSince(long sequenceNumber) */ DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long limitCount) throws SequenceNumberNotFoundException; + + /** + * Return if the underlying DB is closed. This call is thread safe. + * @return true if the DB is closed. 
+ */ + boolean isClosed(); } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java index 467cf4462db7..baaa17e49ac7 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java @@ -327,6 +327,11 @@ public DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long limitCount) return dbUpdatesWrapper; } + @Override + public boolean isClosed() { + return db.isClosed(); + } + @VisibleForTesting public RocksDatabase getDb() { return db; diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java index 50b372396448..4ac83c912234 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java @@ -64,6 +64,7 @@ public final class RocksDatabase { static final String ESTIMATE_NUM_KEYS = "rocksdb.estimate-num-keys"; + static IOException toIOException(Object name, String op, RocksDBException e) { return HddsServerUtil.toIOException(name + ": Failed to " + op, e); } @@ -177,6 +178,10 @@ static Predicate extraColumnFamily(Set families) { }; } + public boolean isClosed() { + return isClosed.get(); + } + /** * Represents a checkpoint of the db. 
* @@ -281,7 +286,7 @@ private RocksDatabase(File dbFile, RocksDB db, DBOptions dbOptions, this.columnFamilies = columnFamilies; } - void close() { + public void close() { if (isClosed.compareAndSet(false, true)) { close(columnFamilies, db, descriptors, writeOptions, dbOptions); } diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java index b40fe61c1dc9..3b7a3fc80d72 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java @@ -111,6 +111,14 @@ public void close() throws Exception { // This test does not assert anything if there is any error this test // will throw and fail. rdbStore.close(); + Assertions.assertTrue(rdbStore.isClosed()); + } + + @Test + public void closeUnderlyingDB() throws Exception { + Assertions.assertNotNull(rdbStore, "DBStore cannot be null"); + rdbStore.getDb().close(); + Assertions.assertTrue(rdbStore.isClosed()); } @Test