Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,15 @@ public ReferenceCountedDB getDB(long containerID, String containerDBType,
lock.lock();
try {
db = (ReferenceCountedDB) this.get(containerDBPath);
if (db != null) {
if (db != null && !db.isClosed()) {
metrics.incNumCacheHits();
db.incrementReference();
return db;
} else {
metrics.incNumCacheMisses();
}
if (db != null && db.isClosed()) {
removeDB(containerDBPath);
}
metrics.incNumCacheMisses();
} finally {
lock.unlock();
}
Expand All @@ -170,18 +172,19 @@ public ReferenceCountedDB getDB(long containerID, String containerDBType,
try {
ReferenceCountedDB currentDB =
(ReferenceCountedDB) this.get(containerDBPath);
if (currentDB != null) {
if (currentDB != null && !currentDB.isClosed()) {
// increment the reference before returning the object
currentDB.incrementReference();
// clean the db created in previous step
cleanupDb(db);
return currentDB;
} else {
this.put(containerDBPath, db);
// increment the reference before returning the object
db.incrementReference();
return db;
} else if (currentDB != null && currentDB.isClosed()) {
removeDB(containerDBPath);
}
this.put(containerDBPath, db);
// increment the reference before returning the object
db.incrementReference();
return db;
} finally {
lock.unlock();
}
Expand All @@ -200,8 +203,11 @@ public void removeDB(String containerDBPath) {
try {
ReferenceCountedDB db = (ReferenceCountedDB)this.get(containerDBPath);
if (db != null) {
Preconditions.checkArgument(cleanupDb(db), "refCount:",
db.getReferenceCount());
boolean cleaned = cleanupDb(db);
if (!db.isClosed()) {
Preconditions.checkArgument(cleaned, "refCount:",
db.getReferenceCount());
}
}
this.remove(containerDBPath);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

/**
Expand Down Expand Up @@ -70,7 +71,7 @@ public void decrementReference() {
}

public boolean cleanup() {
if (referenceCount.get() == 0 && store != null) {
if (store != null && store.isClosed() || referenceCount.get() == 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("Close {} refCnt {}", containerDBPath,
referenceCount.get());
Expand All @@ -80,7 +81,7 @@ public boolean cleanup() {
return true;
} catch (Exception e) {
LOG.error("Error closing DB. Container: " + containerDBPath, e);
return false;
return true;
}
} else {
return false;
Expand All @@ -92,7 +93,15 @@ public DatanodeStore getStore() {
}

@Override
public void close() {
public void close() throws IOException {
decrementReference();
}

/**
* Returns if the underlying DB is closed. This call is threadsafe.
* @return true if the DB is closed.
*/
public boolean isClosed() {
return store.isClosed();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,19 @@ public BlockIterator<BlockData> getBlockIterator(KeyPrefixFilter filter) {
blockDataTableWithIterator.iterator(), filter);
}

@Override
public boolean isClosed() {
if (this.store == null) {
return true;
}
return this.store.isClosed();
}

@Override
public void close() throws IOException {
this.store.close();
}

@Override
public void flushDB() throws IOException {
store.flushDB();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfoList;
import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator;

import java.io.Closeable;
import java.io.IOException;

/**
* Interface for interacting with datanode databases.
*/
public interface DatanodeStore {
public interface DatanodeStore extends Closeable {

/**
* Start datanode manager.
Expand Down Expand Up @@ -91,4 +92,11 @@ public interface DatanodeStore {

BlockIterator<BlockData>
getBlockIterator(MetadataKeyFilters.KeyPrefixFilter filter);

/**
* Returns if the underlying DB is closed. This call is thread safe.
* @return true if the DB is closed.
*/
boolean isClosed();

}
Original file line number Diff line number Diff line change
Expand Up @@ -191,4 +191,41 @@ public void testConcurrentDBGet() throws Exception {
Assert.assertEquals(1, cache.size());
db.cleanup();
}

@Test
public void testUnderlyingDBzIsClosed() throws Exception {
File root = new File(testRoot);
root.mkdirs();

OzoneConfiguration conf = new OzoneConfiguration();
conf.setInt(OzoneConfigKeys.OZONE_CONTAINER_CACHE_SIZE, 2);

ContainerCache cache = ContainerCache.getInstance(conf);
cache.clear();
Assert.assertEquals(0, cache.size());
File containerDir1 = new File(root, "cont100");

createContainerDB(conf, containerDir1);
ReferenceCountedDB db1 = cache.getDB(100, "RocksDB",
containerDir1.getPath(),
VersionedDatanodeFeatures.SchemaV2.chooseSchemaVersion(), conf);
ReferenceCountedDB db2 = cache.getDB(100, "RocksDB",
containerDir1.getPath(),
VersionedDatanodeFeatures.SchemaV2.chooseSchemaVersion(), conf);
Assert.assertEquals(db1, db2);
db1.getStore().getStore().close();
ReferenceCountedDB db3 = cache.getDB(100, "RocksDB",
containerDir1.getPath(),
VersionedDatanodeFeatures.SchemaV2.chooseSchemaVersion(), conf);
ReferenceCountedDB db4 = cache.getDB(100, "RocksDB",
containerDir1.getPath(),
VersionedDatanodeFeatures.SchemaV2.chooseSchemaVersion(), conf);
Assert.assertNotEquals(db3, db2);
Assert.assertEquals(db4, db3);
db1.close();
db2.close();
db3.close();
db4.close();
cache.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.hadoop.hdds.utils.db;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -35,7 +36,7 @@
*
*/
@InterfaceStability.Evolving
public interface DBStore extends AutoCloseable, BatchOperationHandler {
public interface DBStore extends Closeable, BatchOperationHandler {

/**
* Gets an existing TableStore.
Expand Down Expand Up @@ -197,4 +198,10 @@ DBUpdatesWrapper getUpdatesSince(long sequenceNumber)
*/
DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long limitCount)
throws SequenceNumberNotFoundException;

/**
* Return if the underlying DB is closed. This call is thread safe.
* @return true if the DB is closed.
*/
boolean isClosed();
}
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,11 @@ public DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long limitCount)
return dbUpdatesWrapper;
}

@Override
public boolean isClosed() {
return db.isClosed();
}

@VisibleForTesting
public RocksDatabase getDb() {
return db;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public final class RocksDatabase {

static final String ESTIMATE_NUM_KEYS = "rocksdb.estimate-num-keys";


static IOException toIOException(Object name, String op, RocksDBException e) {
return HddsServerUtil.toIOException(name + ": Failed to " + op, e);
}
Expand Down Expand Up @@ -176,6 +177,10 @@ static Predicate<TableConfig> extraColumnFamily(Set<TableConfig> families) {
};
}

public boolean isClosed() {
return isClosed.get();
}

/**
* Represents a checkpoint of the db.
*
Expand Down Expand Up @@ -280,7 +285,7 @@ private RocksDatabase(File dbFile, RocksDB db, DBOptions dbOptions,
this.columnFamilies = columnFamilies;
}

void close() {
public void close() {
if (isClosed.compareAndSet(false, true)) {
close(columnFamilies, db, descriptors, writeOptions, dbOptions);
}
Expand Down