Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,15 @@ public ReferenceCountedDB getDB(long containerID, String containerDBType,
lock.lock();
try {
db = (ReferenceCountedDB) this.get(containerDBPath);
if (db != null) {
if (db != null && !db.isClosed()) {
metrics.incNumCacheHits();
db.incrementReference();
return db;
} else {
metrics.incNumCacheMisses();
}
if (db != null && db.isClosed()) {
removeDB(containerDBPath);
}
metrics.incNumCacheMisses();
} finally {
lock.unlock();
}
Expand All @@ -170,18 +172,19 @@ public ReferenceCountedDB getDB(long containerID, String containerDBType,
try {
ReferenceCountedDB currentDB =
(ReferenceCountedDB) this.get(containerDBPath);
if (currentDB != null) {
if (currentDB != null && !currentDB.isClosed()) {
// increment the reference before returning the object
currentDB.incrementReference();
// clean the db created in previous step
cleanupDb(db);
return currentDB;
} else {
this.put(containerDBPath, db);
// increment the reference before returning the object
db.incrementReference();
return db;
} else if (currentDB != null && currentDB.isClosed()) {
removeDB(containerDBPath);
}
this.put(containerDBPath, db);
// increment the reference before returning the object
db.incrementReference();
return db;
} finally {
lock.unlock();
}
Expand All @@ -200,8 +203,11 @@ public void removeDB(String containerDBPath) {
try {
ReferenceCountedDB db = (ReferenceCountedDB)this.get(containerDBPath);
if (db != null) {
Preconditions.checkArgument(cleanupDb(db), "refCount:",
db.getReferenceCount());
boolean cleaned = cleanupDb(db);
if (!db.isClosed()) {
Preconditions.checkArgument(cleaned, "refCount:",
db.getReferenceCount());
}
}
this.remove(containerDBPath);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

/**
Expand Down Expand Up @@ -67,7 +68,8 @@ public void decrementReference() {
}

public boolean cleanup() {
if (referenceCount.get() == 0 && getStore() != null) {
if (getStore() != null && getStore().isClosed()
|| referenceCount.get() == 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("Close {} refCnt {}", getContainerDBPath(),
referenceCount.get());
Expand All @@ -85,7 +87,15 @@ public boolean cleanup() {
}

@Override
public void close() {
public void close() throws IOException {
decrementReference();
}

/**
* Returns if the underlying DB is closed. This call is threadsafe.
* @return true if the DB is closed.
*/
public boolean isClosed() {
return getStore().isClosed();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public abstract class AbstractDatanodeStore implements DatanodeStore {

private static final Logger LOG =
LoggerFactory.getLogger(AbstractDatanodeStore.class);
private DBStore store;
private volatile DBStore store;
private final AbstractDatanodeDBDefinition dbDef;
private final ColumnFamilyOptions cfOptions;

Expand Down Expand Up @@ -143,7 +143,7 @@ public void start(ConfigurationSource config)
}

@Override
public void stop() throws Exception {
public synchronized void stop() throws Exception {
if (store != null) {
store.close();
store = null;
Expand Down Expand Up @@ -189,6 +189,19 @@ public BlockIterator<BlockData> getBlockIterator(long containerID,
blockDataTableWithIterator.iterator(), filter);
}

@Override
public synchronized boolean isClosed() {
if (this.store == null) {
return true;
}
return this.store.isClosed();
}

@Override
public void close() throws IOException {
this.store.close();
}

@Override
public void flushDB() throws IOException {
store.flushDB();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfoList;
import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator;

import java.io.Closeable;
import java.io.IOException;

/**
* Interface for interacting with datanode databases.
*/
public interface DatanodeStore {
public interface DatanodeStore extends Closeable {

/**
* Start datanode manager.
Expand Down Expand Up @@ -92,4 +93,10 @@ BlockIterator<BlockData> getBlockIterator(long containerID)

BlockIterator<BlockData> getBlockIterator(long containerID,
KeyPrefixFilter filter) throws IOException;

/**
* Returns if the underlying DB is closed. This call is thread safe.
* @return true if the DB is closed.
*/
boolean isClosed();
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import static org.junit.Assert.assertEquals;


/**
* Test ContainerCache with evictions.
Expand Down Expand Up @@ -73,7 +75,7 @@ public void testContainerCacheEviction() throws Exception {

ContainerCache cache = ContainerCache.getInstance(conf);
cache.clear();
Assert.assertEquals(0, cache.size());
assertEquals(0, cache.size());
File containerDir1 = new File(root, "cont1");
File containerDir2 = new File(root, "cont2");
File containerDir3 = new File(root, "cont3");
Expand All @@ -92,52 +94,52 @@ public void testContainerCacheEviction() throws Exception {
ReferenceCountedDB db1 = cache.getDB(1, "RocksDB",
containerDir1.getPath(),
VersionedDatanodeFeatures.SchemaV2.chooseSchemaVersion(), conf);
Assert.assertEquals(1, db1.getReferenceCount());
Assert.assertEquals(numDbGetCount + 1, metrics.getNumDbGetOps());
assertEquals(1, db1.getReferenceCount());
assertEquals(numDbGetCount + 1, metrics.getNumDbGetOps());
ReferenceCountedDB db2 = cache.getDB(1, "RocksDB",
containerDir1.getPath(),
VersionedDatanodeFeatures.SchemaV2.chooseSchemaVersion(), conf);
Assert.assertEquals(2, db2.getReferenceCount());
Assert.assertEquals(numCacheMisses + 1, metrics.getNumCacheMisses());
Assert.assertEquals(2, db1.getReferenceCount());
Assert.assertEquals(db1, db2);
Assert.assertEquals(numDbGetCount + 2, metrics.getNumDbGetOps());
Assert.assertEquals(numCacheMisses + 1, metrics.getNumCacheMisses());
assertEquals(2, db2.getReferenceCount());
assertEquals(numCacheMisses + 1, metrics.getNumCacheMisses());
assertEquals(2, db1.getReferenceCount());
assertEquals(db1, db2);
assertEquals(numDbGetCount + 2, metrics.getNumDbGetOps());
assertEquals(numCacheMisses + 1, metrics.getNumCacheMisses());

// add one more references to ContainerCache.
ReferenceCountedDB db3 = cache.getDB(2, "RocksDB",
containerDir2.getPath(),
VersionedDatanodeFeatures.SchemaV2.chooseSchemaVersion(), conf);
Assert.assertEquals(1, db3.getReferenceCount());
assertEquals(1, db3.getReferenceCount());

// and close the reference
db3.close();
Assert.assertEquals(0, db3.getReferenceCount());
assertEquals(0, db3.getReferenceCount());

// add one more reference to ContainerCache and verify that it will not
// evict the least recent entry as it has reference.
ReferenceCountedDB db4 = cache.getDB(3, "RocksDB",
containerDir3.getPath(),
VersionedDatanodeFeatures.SchemaV2.chooseSchemaVersion(), conf);
Assert.assertEquals(1, db4.getReferenceCount());
assertEquals(1, db4.getReferenceCount());

Assert.assertEquals(2, cache.size());
assertEquals(2, cache.size());
Assert.assertNotNull(cache.get(containerDir1.getPath()));
Assert.assertNull(cache.get(containerDir2.getPath()));

// Now close both the references for container1
db1.close();
db2.close();
Assert.assertEquals(0, db1.getReferenceCount());
Assert.assertEquals(0, db2.getReferenceCount());
assertEquals(0, db1.getReferenceCount());
assertEquals(0, db2.getReferenceCount());


// The reference count for container1 is 0 but it is not evicted.
ReferenceCountedDB db5 = cache.getDB(1, "RocksDB",
containerDir1.getPath(),
VersionedDatanodeFeatures.SchemaV2.chooseSchemaVersion(), conf);
Assert.assertEquals(1, db5.getReferenceCount());
Assert.assertEquals(db1, db5);
assertEquals(1, db5.getReferenceCount());
assertEquals(db1, db5);
db5.close();
db4.close();

Expand All @@ -157,7 +159,7 @@ public void testConcurrentDBGet() throws Exception {
conf.setInt(OzoneConfigKeys.OZONE_CONTAINER_CACHE_SIZE, 2);
ContainerCache cache = ContainerCache.getInstance(conf);
cache.clear();
Assert.assertEquals(0, cache.size());
assertEquals(0, cache.size());
File containerDir = new File(root, "cont1");
createContainerDB(conf, containerDir);
ExecutorService executorService = Executors.newFixedThreadPool(2);
Expand Down Expand Up @@ -188,7 +190,44 @@ public void testConcurrentDBGet() throws Exception {
db.close();
db.close();
db.close();
Assert.assertEquals(1, cache.size());
assertEquals(1, cache.size());
db.cleanup();
}

@Test
public void testUnderlyingDBzIsClosed() throws Exception {
File root = new File(testRoot);
root.mkdirs();

OzoneConfiguration conf = new OzoneConfiguration();
conf.setInt(OzoneConfigKeys.OZONE_CONTAINER_CACHE_SIZE, 2);

ContainerCache cache = ContainerCache.getInstance(conf);
cache.clear();
assertEquals(0, cache.size());
File containerDir1 = new File(root, "cont100");

createContainerDB(conf, containerDir1);
ReferenceCountedDB db1 = cache.getDB(100, "RocksDB",
containerDir1.getPath(),
VersionedDatanodeFeatures.SchemaV2.chooseSchemaVersion(), conf);
ReferenceCountedDB db2 = cache.getDB(100, "RocksDB",
containerDir1.getPath(),
VersionedDatanodeFeatures.SchemaV2.chooseSchemaVersion(), conf);
assertEquals(db1, db2);
db1.getStore().getStore().close();
ReferenceCountedDB db3 = cache.getDB(100, "RocksDB",
containerDir1.getPath(),
VersionedDatanodeFeatures.SchemaV2.chooseSchemaVersion(), conf);
ReferenceCountedDB db4 = cache.getDB(100, "RocksDB",
containerDir1.getPath(),
VersionedDatanodeFeatures.SchemaV2.chooseSchemaVersion(), conf);
Assert.assertNotEquals(db3, db2);
assertEquals(db4, db3);
db1.close();
db2.close();
db3.close();
db4.close();
cache.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.hadoop.hdds.utils.db;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -35,7 +36,7 @@
*
*/
@InterfaceStability.Evolving
public interface DBStore extends AutoCloseable, BatchOperationHandler {
public interface DBStore extends Closeable, BatchOperationHandler {

/**
* Gets an existing TableStore.
Expand Down Expand Up @@ -197,4 +198,10 @@ DBUpdatesWrapper getUpdatesSince(long sequenceNumber)
*/
DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long limitCount)
throws SequenceNumberNotFoundException;

/**
* Return if the underlying DB is closed. This call is thread safe.
* @return true if the DB is closed.
*/
boolean isClosed();
}
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,11 @@ public DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long limitCount)
return dbUpdatesWrapper;
}

@Override
public boolean isClosed() {
return db.isClosed();
}

@VisibleForTesting
public RocksDatabase getDb() {
return db;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public final class RocksDatabase {

static final String ESTIMATE_NUM_KEYS = "rocksdb.estimate-num-keys";


static IOException toIOException(Object name, String op, RocksDBException e) {
return HddsServerUtil.toIOException(name + ": Failed to " + op, e);
}
Expand Down Expand Up @@ -177,6 +178,10 @@ static Predicate<TableConfig> extraColumnFamily(Set<TableConfig> families) {
};
}

public boolean isClosed() {
return isClosed.get();
}

/**
* Represents a checkpoint of the db.
*
Expand Down Expand Up @@ -281,7 +286,7 @@ private RocksDatabase(File dbFile, RocksDB db, DBOptions dbOptions,
this.columnFamilies = columnFamilies;
}

void close() {
public void close() {
if (isClosed.compareAndSet(false, true)) {
close(columnFamilies, db, descriptors, writeOptions, dbOptions);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,14 @@ public void close() throws Exception {
// This test does not assert anything if there is any error this test
// will throw and fail.
rdbStore.close();
Assertions.assertTrue(rdbStore.isClosed());
}

@Test
public void closeUnderlyingDB() throws Exception {
Assertions.assertNotNull(rdbStore, "DBStore cannot be null");
rdbStore.getDb().close();
Assertions.assertTrue(rdbStore.isClosed());
}

@Test
Expand Down