@@ -177,8 +177,7 @@ public CodecBuffer toCodecBuffer(@Nonnull String object,
   }
 
   @Override
-  public String fromCodecBuffer(@Nonnull CodecBuffer buffer)
-      throws IOException {
+  public String fromCodecBuffer(@Nonnull CodecBuffer buffer) {
     return decode(buffer.asReadOnlyByteBuffer());
   }
@@ -26,7 +26,6 @@
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -344,13 +343,7 @@ public File getDbLocation() {
 
   @Override
   public Map<Integer, String> getTableNames() {
-    Map<Integer, String> tableNames = new HashMap<>();
-    StringCodec stringCodec = StringCodec.get();
-
-    for (ColumnFamily columnFamily : getColumnFamilies()) {
-      tableNames.put(columnFamily.getID(), columnFamily.getName(stringCodec));
-    }
-    return tableNames;
+    return db.getColumnFamilyNames();
   }
 
   public Collection<ColumnFamily> getColumnFamilies() {
@@ -36,6 +36,7 @@
 import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteOptions;
 import org.apache.ozone.rocksdiff.RocksDiffUtils;
 import org.apache.ratis.util.UncheckedAutoCloseable;
+import org.apache.ratis.util.MemoizedSupplier;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.Holder;
@@ -51,7 +52,6 @@
 import java.nio.ByteBuffer;
 import java.nio.file.Path;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -66,7 +66,6 @@
 import java.util.stream.Stream;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.hadoop.hdds.StringUtils.bytes2String;
 import static org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions.closeDeeply;
 import static org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator.managed;
 import static org.apache.hadoop.hdds.utils.db.managed.ManagedTransactionLogIterator.managed;
@@ -86,10 +85,14 @@ public final class RocksDatabase implements Closeable {
   }
   private static final ManagedReadOptions DEFAULT_READ_OPTION =
       new ManagedReadOptions();
-  private static Map<String, List<ColumnFamilyHandle>> dbNameToCfHandleMap =
-      new HashMap<>();
-
-  private final StackTraceElement[] stackTrace;
+
+  static String bytes2String(byte[] bytes) {
+    return StringCodec.get().fromPersistedFormat(bytes);
+  }
+
+  static String bytes2String(ByteBuffer bytes) {
+    return StringCodec.get().decode(bytes);
+  }
 
   static IOException toIOException(Object name, String op, RocksDBException e) {
     return HddsServerUtil.toIOException(name + ": Failed to " + op, e);
@@ -158,15 +161,7 @@ static RocksDatabase open(File dbFile, ManagedDBOptions dbOptions,
         db = ManagedRocksDB.open(dbOptions, dbFile.getAbsolutePath(),
             descriptors, handles);
       }
-      dbNameToCfHandleMap.put(db.get().getName(), handles);
-      // init a column family map.
-      AtomicLong counter = new AtomicLong(0);
-      for (ColumnFamilyHandle h : handles) {
-        final ColumnFamily f = new ColumnFamily(h, counter);
-        columnFamilies.put(f.getName(), f);
-      }
-      return new RocksDatabase(dbFile, db, dbOptions, writeOptions,
-          descriptors, Collections.unmodifiableMap(columnFamilies), counter);
+      return new RocksDatabase(dbFile, db, dbOptions, writeOptions, descriptors, handles);
     } catch (RocksDBException e) {
       close(columnFamilies, db, descriptors, writeOptions, dbOptions);
       throw toIOException(RocksDatabase.class, "open " + dbFile, e);
@@ -260,17 +255,13 @@ public void close() throws IOException {
   *
   * @see ColumnFamilyHandle
   */
-  public static final class ColumnFamily {
+  public final class ColumnFamily {
     private final byte[] nameBytes;
-    private AtomicLong counter;
     private final String name;
     private final ColumnFamilyHandle handle;
     private AtomicBoolean isClosed = new AtomicBoolean(false);
-
-    public ColumnFamily(ColumnFamilyHandle handle, AtomicLong counter)
-        throws RocksDBException {
+    private ColumnFamily(ColumnFamilyHandle handle) throws RocksDBException {
       this.nameBytes = handle.getName();
-      this.counter = counter;
       this.name = bytes2String(nameBytes);
       this.handle = handle;
       LOG.debug("new ColumnFamily for {}", name);
@@ -289,10 +280,6 @@ public ColumnFamilyHandle getHandle() {
       return handle;
     }
 
-    public int getID() {
-      return getHandle().getID();
-    }
-
     public void batchDelete(ManagedWriteBatch writeBatch, byte[] key)
        throws IOException {
      try (UncheckedAutoCloseable ignored = acquire()) {

Review comment on getHandle():

Contributor: nit: please remove @VisibleForTesting. it is used in src code now.

Contributor (author): The @VisibleForTesting tag was added by HDDS-8122 since it changed the method from protected to public for a unit test. The getHandle() method is already used many times in RocksDatabase and a few times in RDBStore.
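For background on the annotation under discussion: Guava's @VisibleForTesting carries no runtime or compile-time effect; it only documents that a member's visibility was widened for tests, and the usual convention is to drop it once production code starts calling the member. A minimal sketch of that convention (the class below is illustrative, not from this PR):

```java
import com.google.common.annotations.VisibleForTesting;

public class Counter {
  private long value;

  public void increment() {
    value++;
  }

  // Widened from private to package-private solely so a unit test can read it.
  // If production code ever starts calling this, the annotation should be removed.
  @VisibleForTesting
  long currentValue() {
    return value;
  }
}
```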
@@ -331,10 +318,6 @@ public void batchPut(ManagedWriteBatch writeBatch, ByteBuffer key,
       }
     }
 
-    public void markClosed() {
-      isClosed.set(true);
-    }
-
     private UncheckedAutoCloseable acquire() throws IOException {
       if (isClosed.get()) {
         throw new IOException("Rocks Database is closed");
@@ -353,27 +336,49 @@ public String toString() {
   }
 
   private final String name;
+  private final Throwable creationStackTrace = new Throwable("Object creation stack trace");
+
   private final ManagedRocksDB db;
   private final ManagedDBOptions dbOptions;
   private final ManagedWriteOptions writeOptions;
   private final List<ColumnFamilyDescriptor> descriptors;
+  /** column family names -> {@link ColumnFamily}. */
   private final Map<String, ColumnFamily> columnFamilies;
+  /** {@link ColumnFamilyHandle#getID()} -> column family names. */
+  private final Supplier<Map<Integer, String>> columnFamilyNames;
 
   private final AtomicBoolean isClosed = new AtomicBoolean();
-  private final AtomicLong counter;
+  /** Count the number of operations running concurrently. */
+  private final AtomicLong counter = new AtomicLong();
 
   private RocksDatabase(File dbFile, ManagedRocksDB db,
       ManagedDBOptions dbOptions, ManagedWriteOptions writeOptions,
-      List<ColumnFamilyDescriptor> descriptors,
-      Map<String, ColumnFamily> columnFamilies, AtomicLong counter) {
+      List<ColumnFamilyDescriptor> descriptors, List<ColumnFamilyHandle> handles) throws RocksDBException {
     this.name = getClass().getSimpleName() + "[" + dbFile + "]";
     this.db = db;
     this.dbOptions = dbOptions;
     this.writeOptions = writeOptions;
     this.descriptors = descriptors;
-    this.columnFamilies = columnFamilies;
-    this.counter = counter;
-    this.stackTrace = Thread.currentThread().getStackTrace();
+    this.columnFamilies = toColumnFamilyMap(handles);
+    this.columnFamilyNames = MemoizedSupplier.valueOf(() -> toColumnFamilyNameMap(columnFamilies.values()));
   }
 
+  private Map<String, ColumnFamily> toColumnFamilyMap(List<ColumnFamilyHandle> handles) throws RocksDBException {
+    final Map<String, ColumnFamily> map = new HashMap<>();
+    for (ColumnFamilyHandle h : handles) {
+      final ColumnFamily f = new ColumnFamily(h);
+      map.put(f.getName(), f);
+    }
+    return Collections.unmodifiableMap(map);
+  }
+
+  private static Map<Integer, String> toColumnFamilyNameMap(Collection<ColumnFamily> families) {
+    return Collections.unmodifiableMap(families.stream()
+        .collect(Collectors.toMap(f -> f.getHandle().getID(), ColumnFamily::getName)));
+  }
+
+  Map<Integer, String> getColumnFamilyNames() {
+    return columnFamilyNames.get();
+  }
+
   @Override
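The columnFamilyNames field above is computed lazily and at most once through Ratis's MemoizedSupplier.valueOf. A rough sketch of what such a memoizing supplier does, offered as an illustration of the pattern rather than the actual Ratis implementation:

```java
import java.util.function.Supplier;

// Illustrative memoizing supplier: delegates to `initializer` on the first
// get(), then returns the cached value on every later call. Uses the
// double-checked locking idiom with a volatile field for thread safety.
final class Memoized<T> implements Supplier<T> {
  private final Supplier<T> initializer;
  private volatile T value;

  Memoized(Supplier<T> initializer) {
    this.initializer = initializer;
  }

  @Override
  public T get() {
    T v = value;
    if (v == null) {
      synchronized (this) {
        v = value;
        if (v == null) {
          v = initializer.get();
          value = v;
        }
      }
    }
    return v;
  }
}
```

The effect is that the first getColumnFamilyNames() call pays the cost of building the ID-to-name map, and every later call returns the same instance, which is safe to share because the map is wrapped as unmodifiable.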
@@ -389,10 +394,6 @@ private void close(boolean isSync) {
     // Then close all attached listeners
     dbOptions.listeners().forEach(listener -> listener.close());
 
-    if (columnFamilies != null) {
-      columnFamilies.values().stream().forEach(f -> f.markClosed());
-    }
-
     if (isSync) {
       waitAndClose();
       return;
@@ -579,20 +580,9 @@ public void compactRange(String cfName) throws IOException {
     }
   }
 
-  private ColumnFamilyHandle getColumnFamilyHandle(String cfName)
-      throws IOException {
-    for (ColumnFamilyHandle cf : getCfHandleMap().get(db.get().getName())) {
-      try {
-        String table = new String(cf.getName(), UTF_8);
-        if (cfName.equals(table)) {
-          return cf;
-        }
-      } catch (RocksDBException e) {
-        closeOnError(e);
-        throw toIOException(this, "columnFamilyHandle.getName", e);
-      }
-    }
-    return null;
+  private ColumnFamilyHandle getColumnFamilyHandle(String columnFamilyName) {
+    final ColumnFamily columnFamily = getColumnFamily(columnFamilyName);
+    return columnFamily != null ? columnFamily.getHandle() : null;
   }
 
   public void compactRange(ColumnFamily family, final byte[] begin,
@@ -896,20 +886,10 @@ public void deleteFilesNotMatchingPrefix(Map<String, String> prefixPairs)
     }
   }
 
-  public static Map<String, List<ColumnFamilyHandle>> getCfHandleMap() {
-    return dbNameToCfHandleMap;
-  }
-
   @Override
   protected void finalize() throws Throwable {
     if (!isClosed()) {
-      String warning = "RocksDatabase is not closed properly.";
-      if (LOG.isDebugEnabled()) {
-        String debugMessage = String.format("%n StackTrace for unclosed " +
-            "RocksDatabase instance: %s", Arrays.toString(stackTrace));
-        warning = warning.concat(debugMessage);
-      }
-      LOG.warn(warning);
+      LOG.warn("RocksDatabase {} is not closed properly.", name, creationStackTrace);
     }
     super.finalize();
   }
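The rewritten finalize() relies on the SLF4J convention that a Throwable passed as the last argument of a parameterized log call is printed with its full stack trace, so the Throwable captured at construction time pinpoints where the leaked instance was created. A minimal sketch of the same pattern (class and method names here are illustrative):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class LeakTracer {
  private static final Logger LOG = LoggerFactory.getLogger(LeakTracer.class);

  // Created eagerly so its stack trace points at the constructor call site.
  private final Throwable creationStackTrace =
      new Throwable("Object creation stack trace");

  void reportLeak(String name) {
    // SLF4J fills {} with `name` and, because the final argument is a
    // Throwable, prints creationStackTrace like an exception stack trace.
    LOG.warn("{} is not closed properly.", name, creationStackTrace);
  }
}
```

This is also cheaper than the removed code: the stack trace is formatted only when a leak is actually reported, not stored as a pre-rendered string.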
@@ -54,8 +54,7 @@ public void testDBDefinition() throws Exception {
     ArrayList<String> missingDBDefTables = new ArrayList<>();
 
     // Get list of tables from the RocksDB Store
-    Collection<String> missingOmDBTables =
-        store.getTableNames().values();
+    final Collection<String> missingOmDBTables = new ArrayList<>(store.getTableNames().values());
     missingOmDBTables.remove("default");
     int countOmDBTables = missingOmDBTables.size();
     // Remove the file if it is found in both the datastructures
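The copy into a fresh ArrayList is needed because getTableNames() now returns an unmodifiable map, and the values() view of an unmodifiable map rejects mutation; calling remove() on it directly would throw UnsupportedOperationException. A small self-contained illustration of the failure the copy avoids (the table names below are made up):

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class UnmodifiableValuesDemo {
  public static void main(String[] args) {
    Map<Integer, String> tables = new HashMap<>();
    tables.put(0, "default");
    tables.put(1, "keyTable");
    Map<Integer, String> frozen = Collections.unmodifiableMap(tables);

    // frozen.values().remove("default") would throw
    // UnsupportedOperationException. Copying first gives the caller a
    // collection it is free to mutate:
    Collection<String> names = new ArrayList<>(frozen.values());
    names.remove("default");
    System.out.println(names); // [keyTable]
  }
}
```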