@@ -28,6 +28,7 @@
     .StorageContainerException;
 import org.apache.hadoop.hdds.utils.db.DBProfile;
 import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
@@ -57,9 +58,7 @@
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.mockito.Mockito;
-import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.RocksDBException;
 
 import java.io.File;
 
@@ -504,8 +503,7 @@ public void testUpdateContainerUnsupportedRequest() throws Exception {
   }
 
   @Test
-  public void testContainerRocksDB()
-      throws StorageContainerException, RocksDBException {
+  public void testContainerRocksDB() throws Exception {
     closeContainer();
     keyValueContainer = new KeyValueContainer(
         keyValueContainerData, CONF);
@@ -518,7 +516,7 @@ public void testContainerRocksDB()
     long cacheSize = Long.parseLong(store
         .getProperty("rocksdb.block-cache-capacity"));
     Assert.assertEquals(defaultCacheSize, cacheSize);
-    for (ColumnFamilyHandle handle : store.getColumnFamilyHandles()) {
+    for (ColumnFamily handle : store.getColumnFamilies()) {
       cacheSize = Long.parseLong(
           store.getProperty(handle, "rocksdb.block-cache-capacity"));
       Assert.assertEquals(defaultCacheSize, cacheSize);
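Note on the test change above: the loop now goes through store.getColumnFamilies() and store.getProperty(handle, ...) instead of raw ColumnFamilyHandle objects, and the test only declares throws Exception because the wrapper surfaces IOException rather than RocksDBException. A minimal sketch of the wrapper pattern this assumes, with the raw org.rocksdb.RocksDB handle kept private (the class name and constructor here are illustrative, not the actual RocksDatabase code):

```java
import java.io.IOException;

import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

// Illustrative sketch only: shows how a per-column-family property lookup can
// hide RocksDBException behind IOException, as the updated test assumes.
final class RocksDatabaseSketch {
  private final RocksDB db;

  RocksDatabaseSketch(RocksDB db) {
    this.db = db;
  }

  // Mirrors store.getProperty(handle, "rocksdb.block-cache-capacity") in the test.
  String getProperty(ColumnFamilyHandle handle, String name) throws IOException {
    try {
      return db.getProperty(handle, name);
    } catch (RocksDBException e) {
      throw new IOException("Failed to read RocksDB property " + name, e);
    }
  }
}
```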
org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java
@@ -18,14 +18,12 @@
  */
 package org.apache.hadoop.hdds.utils.db;
 
-import java.io.IOException;
-
-import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.RocksDB;
-import org.rocksdb.RocksDBException;
+import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily;
 import org.rocksdb.WriteBatch;
 import org.rocksdb.WriteOptions;
 
+import java.io.IOException;
+
 /**
  * Batch operation implementation for rocks db.
  */
@@ -41,33 +39,26 @@ public RDBBatchOperation(WriteBatch writeBatch) {
     this.writeBatch = writeBatch;
   }
 
-  public void commit(RocksDB db, WriteOptions writeOptions) throws IOException {
-    try {
-      db.write(writeOptions, writeBatch);
-    } catch (RocksDBException e) {
-      throw new IOException("Unable to write the batch.", e);
-    }
+  public void commit(RocksDatabase db) throws IOException {
+    db.batchWrite(writeBatch);
+  }
+
+  public void commit(RocksDatabase db, WriteOptions writeOptions)
+      throws IOException {
+    db.batchWrite(writeBatch, writeOptions);
   }
 
   @Override
   public void close() {
     writeBatch.close();
   }
 
-  public void delete(ColumnFamilyHandle handle, byte[] key) throws IOException {
-    try {
-      writeBatch.delete(handle, key);
-    } catch (RocksDBException e) {
-      throw new IOException("Can't record batch delete operation.", e);
-    }
+  public void delete(ColumnFamily family, byte[] key) throws IOException {
+    family.batchDelete(writeBatch, key);
   }
 
-  public void put(ColumnFamilyHandle handle, byte[] key, byte[] value)
+  public void put(ColumnFamily family, byte[] key, byte[] value)
       throws IOException {
-    try {
-      writeBatch.put(handle, key, value);
-    } catch (RocksDBException e) {
-      throw new IOException("Can't record batch put operation.", e);
-    }
+    family.batchPut(writeBatch, key, value);
   }
 }
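With put() and delete() reduced to one-line delegations, the RocksDBException handling that used to live in this class has to sit behind ColumnFamily.batchPut() and ColumnFamily.batchDelete(). A minimal sketch of such a wrapper, assuming it holds the underlying ColumnFamilyHandle (the class name and field are illustrative; the method names and error messages follow the diff):

```java
import java.io.IOException;

import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;

// Illustrative sketch only: the exception translation removed from
// RDBBatchOperation is assumed to move into the ColumnFamily wrapper.
final class ColumnFamilySketch {
  private final ColumnFamilyHandle handle;

  ColumnFamilySketch(ColumnFamilyHandle handle) {
    this.handle = handle;
  }

  void batchPut(WriteBatch writeBatch, byte[] key, byte[] value)
      throws IOException {
    try {
      writeBatch.put(handle, key, value);
    } catch (RocksDBException e) {
      throw new IOException("Can't record batch put operation.", e);
    }
  }

  void batchDelete(WriteBatch writeBatch, byte[] key) throws IOException {
    try {
      writeBatch.delete(handle, key);
    } catch (RocksDBException e) {
      throw new IOException("Can't record batch delete operation.", e);
    }
  }
}
```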
org/apache/hadoop/hdds/utils/db/RDBCheckpointManager.java
@@ -19,15 +19,15 @@
 
 package org.apache.hadoop.hdds.utils.db;
 
+import java.io.IOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.time.Duration;
 import java.time.Instant;
 
 import org.apache.commons.lang3.StringUtils;
-import org.rocksdb.Checkpoint;
-import org.rocksdb.RocksDB;
-import org.rocksdb.RocksDBException;
+import org.apache.hadoop.hdds.utils.db.RocksDatabase.RocksCheckpoint;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,29 +36,21 @@
  */
 public class RDBCheckpointManager {
 
-  private final Checkpoint checkpoint;
-  private final RocksDB db;
+  private final RocksCheckpoint checkpoint;
   public static final String RDB_CHECKPOINT_DIR_PREFIX = "checkpoint_";
   private static final Logger LOG =
       LoggerFactory.getLogger(RDBCheckpointManager.class);
-  private String checkpointNamePrefix = "";
-
-  public RDBCheckpointManager(RocksDB rocksDB) {
-    this.db = rocksDB;
-    this.checkpoint = Checkpoint.create(rocksDB);
-  }
+  private final String checkpointNamePrefix;
 
   /**
    * Create a checkpoint manager with a prefix to be added to the
    * snapshots created.
    *
-   * @param rocksDB DB instance
    * @param checkpointPrefix prefix string.
    */
-  public RDBCheckpointManager(RocksDB rocksDB, String checkpointPrefix) {
-    this.db = rocksDB;
+  public RDBCheckpointManager(RocksDatabase db, String checkpointPrefix) {
     this.checkpointNamePrefix = checkpointPrefix;
-    this.checkpoint = Checkpoint.create(rocksDB);
+    this.checkpoint = db.createCheckpoint();
   }
 
   /**
@@ -79,20 +71,21 @@ public RocksDBCheckpoint createCheckpoint(String parentDir) {
 
       Path checkpointPath = Paths.get(parentDir, checkpointDir);
       Instant start = Instant.now();
-      checkpoint.createCheckpoint(checkpointPath.toString());
+      checkpoint.createCheckpoint(checkpointPath);
+      //Best guesstimate here. Not accurate.
+      final long latest = checkpoint.getLatestSequenceNumber();
       Instant end = Instant.now();
 
       long duration = Duration.between(start, end).toMillis();
       LOG.info("Created checkpoint at {} in {} milliseconds",
-          checkpointPath.toString(), duration);
+          checkpointPath, duration);
 
       return new RocksDBCheckpoint(
           checkpointPath,
           currentTime,
-          db.getLatestSequenceNumber(), //Best guesstimate here. Not accurate.
+          latest,
           duration);
-
-    } catch (RocksDBException e) {
+    } catch (IOException e) {
       LOG.error("Unable to create RocksDB Snapshot.", e);
     }
     return null;
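The manager now works against a RocksCheckpoint obtained from RocksDatabase.createCheckpoint(), which is why the catch clause changes from RocksDBException to IOException and why the latest sequence number is read from the checkpoint wrapper. A minimal sketch of such a wrapper, assuming it simply adapts org.rocksdb.Checkpoint (only the org.rocksdb calls are real API; the wrapper name and methods follow the diff):

```java
import java.io.IOException;
import java.nio.file.Path;

import org.rocksdb.Checkpoint;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

// Illustrative sketch only: adapts org.rocksdb.Checkpoint and translates
// RocksDBException into IOException, matching the updated catch clause above.
final class RocksCheckpointSketch {
  private final RocksDB db;
  private final Checkpoint checkpoint;

  RocksCheckpointSketch(RocksDB db) {
    this.db = db;
    this.checkpoint = Checkpoint.create(db);
  }

  void createCheckpoint(Path path) throws IOException {
    try {
      checkpoint.createCheckpoint(path.toString());
    } catch (RocksDBException e) {
      throw new IOException("Failed to create checkpoint at " + path, e);
    }
  }

  // "Best guesstimate" sequence number, as noted in the diff.
  long getLatestSequenceNumber() {
    return db.getLatestSequenceNumber();
  }
}
```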