diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 018ff8066767..f8f908b90980 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3656,4 +3656,22 @@
without using the optimised DAG based pruning approach
+
+
+ ozone.om.snapshot.sst_dumptool.pool.size
+ 1
+ OZONE, OM
+
+ Thread pool size for the SST dump tool, which is used for computing snapshot diffs when the native library is enabled.
+
+
+
+
+ ozone.om.snapshot.sst_dumptool.buffer.size
+ 8KB
+ OZONE, OM
+
+ Buffer size for the SST dump tool pipe, which is used for computing snapshot diffs when the native library is enabled.
+
+
diff --git a/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpIterator.java b/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpIterator.java
index 35aaeb33b0a8..a083a2b98810 100644
--- a/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpIterator.java
+++ b/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpIterator.java
@@ -17,34 +17,39 @@
package org.apache.hadoop.hdds.utils.db.managed;
+import org.apache.hadoop.util.ClosableIterator;
import org.apache.hadoop.hdds.utils.NativeLibraryNotLoadedException;
import org.eclipse.jetty.io.RuntimeIOException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
-import java.util.Iterator;
+import java.util.Arrays;
+import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
/**
* Iterator to Parse output of RocksDBSSTDumpTool.
*/
-public class ManagedSSTDumpIterator implements
- Iterator, AutoCloseable {
- private static final String SST_DUMP_TOOL_CLASS =
- "org.apache.hadoop.hdds.utils.db.managed.ManagedSSTDumpTool";
+public abstract class ManagedSSTDumpIterator implements ClosableIterator {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ManagedSSTDumpIterator.class);
private static final String PATTERN_REGEX =
- "'([^=>]+)' seq:([0-9]+), type:([0-9]+) => ";
+ "'([^=>]+)' seq:([0-9]+), type:([0-9]+) => ";
public static final int PATTERN_KEY_GROUP_NUMBER = 1;
public static final int PATTERN_SEQ_GROUP_NUMBER = 2;
public static final int PATTERN_TYPE_GROUP_NUMBER = 3;
- private static final Pattern PATTERN_MATCHER =
- Pattern.compile(PATTERN_REGEX);
+ private static final Pattern PATTERN_MATCHER = Pattern.compile(PATTERN_REGEX);
private BufferedReader processOutput;
private StringBuilder stdoutString;
@@ -56,32 +61,33 @@ public class ManagedSSTDumpIterator implements
private ManagedSSTDumpTool.SSTDumpToolTask sstDumpToolTask;
private AtomicBoolean open;
+ private StackTraceElement[] stackTrace;
public ManagedSSTDumpIterator(ManagedSSTDumpTool sstDumpTool,
- String sstFilePath,
- ManagedOptions options) throws IOException,
- NativeLibraryNotLoadedException {
+ String sstFilePath, ManagedOptions options)
+ throws IOException, NativeLibraryNotLoadedException {
File sstFile = new File(sstFilePath);
if (!sstFile.exists()) {
throw new IOException(String.format("File in path : %s doesn't exist",
- sstFile.getAbsolutePath()));
+ sstFile.getAbsolutePath()));
}
if (!sstFile.isFile()) {
throw new IOException(String.format("Path given: %s is not a file",
- sstFile.getAbsolutePath()));
+ sstFile.getAbsolutePath()));
}
init(sstDumpTool, sstFile, options);
+ this.stackTrace = Thread.currentThread().getStackTrace();
}
private void init(ManagedSSTDumpTool sstDumpTool, File sstFile,
ManagedOptions options)
- throws NativeLibraryNotLoadedException {
- String[] args = {"--file=" + sstFile.getAbsolutePath(),
- "--command=scan"};
+ throws NativeLibraryNotLoadedException {
+ String[] args = {"--file=" + sstFile.getAbsolutePath(), "--command=scan"};
this.sstDumpToolTask = sstDumpTool.run(args, options);
- processOutput = new BufferedReader(new InputStreamReader(
- sstDumpToolTask.getPipedOutput(), StandardCharsets.UTF_8));
+ processOutput = new BufferedReader(
+ new InputStreamReader(sstDumpToolTask.getPipedOutput(),
+ StandardCharsets.UTF_8));
stdoutString = new StringBuilder();
currentMatcher = PATTERN_MATCHER.matcher(stdoutString);
charBuffer = new char[8192];
@@ -97,15 +103,16 @@ private void checkSanityOfProcess() {
if (!this.open.get()) {
throw new RuntimeException("Iterator has been closed");
}
- if (sstDumpToolTask.getFuture().isDone()
- && sstDumpToolTask.exitValue() != 0) {
+ if (sstDumpToolTask.getFuture().isDone() &&
+ sstDumpToolTask.exitValue() != 0) {
throw new RuntimeException("Process Terminated with non zero " +
- String.format("exit value %d", sstDumpToolTask.exitValue()));
+ String.format("exit value %d", sstDumpToolTask.exitValue()));
}
}
/**
* Checks the status of the process & sees if there is another record.
+ *
* @return True if next exists & false otherwise
* Throws Runtime Exception in case of SST File read failure
*/
@@ -116,21 +123,30 @@ public boolean hasNext() {
return nextKey != null;
}
+ /**
+ * Transforms the given key-value record into a value of type T.
+ *
+ * @param value the key-value record to be transformed
+ * @return transformed Value
+ */
+ protected abstract T getTransformedValue(KeyValue value);
+
/**
* Returns the next record from SSTDumpTool.
+ *
* @return next Key
* Throws Runtime Exception incase of failure.
*/
@Override
- public KeyValue next() {
+ public T next() {
checkSanityOfProcess();
currentKey = nextKey;
nextKey = null;
while (!currentMatcher.find()) {
try {
if (prevMatchEndIndex != 0) {
- stdoutString = new StringBuilder(stdoutString.substring(
- prevMatchEndIndex, stdoutString.length()));
+ stdoutString = new StringBuilder(
+ stdoutString.substring(prevMatchEndIndex, stdoutString.length()));
prevMatchEndIndex = 0;
currentMatcher = PATTERN_MATCHER.matcher(stdoutString);
}
@@ -138,9 +154,10 @@ public KeyValue next() {
if (numberOfCharsRead < 0) {
if (currentKey != null) {
currentKey.setValue(stdoutString.substring(0,
- Math.max(stdoutString.length() - 1, 0)));
+ Math.max(stdoutString.length() - 1, 0)));
+ return getTransformedValue(currentKey);
}
- return currentKey;
+ throw new NoSuchElementException("No more elements found");
}
stdoutString.append(charBuffer, 0, numberOfCharsRead);
currentMatcher.reset();
@@ -150,30 +167,42 @@ public KeyValue next() {
}
if (currentKey != null) {
currentKey.setValue(stdoutString.substring(prevMatchEndIndex,
- currentMatcher.start() - 1));
+ currentMatcher.start() - 1));
}
prevMatchEndIndex = currentMatcher.end();
- nextKey = new KeyValue(
- currentMatcher.group(PATTERN_KEY_GROUP_NUMBER),
- currentMatcher.group(PATTERN_SEQ_GROUP_NUMBER),
- currentMatcher.group(PATTERN_TYPE_GROUP_NUMBER));
- return currentKey;
+ nextKey = new KeyValue(currentMatcher.group(PATTERN_KEY_GROUP_NUMBER),
+ currentMatcher.group(PATTERN_SEQ_GROUP_NUMBER),
+ currentMatcher.group(PATTERN_TYPE_GROUP_NUMBER));
+ return getTransformedValue(currentKey);
}
@Override
- public synchronized void close() throws Exception {
+ public synchronized void close() throws UncheckedIOException {
if (this.sstDumpToolTask != null) {
if (!this.sstDumpToolTask.getFuture().isDone()) {
this.sstDumpToolTask.getFuture().cancel(true);
}
- this.processOutput.close();
+ try {
+ this.processOutput.close();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
}
open.compareAndSet(true, false);
}
@Override
protected void finalize() throws Throwable {
+ if (open.get()) {
+ LOG.warn("{} is not closed properly." +
+ " StackTrace for unclosed instance: {}",
+ this.getClass().getName(),
+ Arrays.stream(stackTrace)
+ .map(StackTraceElement::toString).collect(
+ Collectors.joining("\n")));
+ }
this.close();
+ super.finalize();
}
/**
@@ -214,12 +243,8 @@ public String getValue() {
@Override
public String toString() {
- return "KeyValue{" +
- "key='" + key + '\'' +
- ", sequence=" + sequence +
- ", type=" + type +
- ", value='" + value + '\'' +
- '}';
+ return "KeyValue{" + "key='" + key + '\'' + ", sequence=" + sequence +
+ ", type=" + type + ", value='" + value + '\'' + '}';
}
}
}
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml b/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
index f2a932b40a14..3c3764d3865e 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
@@ -73,6 +73,10 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
junit-jupiter-params
test
+
+ org.apache.ozone
+ hdds-rocks-native
+
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/ManagedSstFileReader.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/ManagedSstFileReader.java
index cf8e59331dc6..ce05cd9715d9 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/ManagedSstFileReader.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/ManagedSstFileReader.java
@@ -18,13 +18,19 @@
package org.apache.ozone.rocksdb.util;
-import org.rocksdb.Options;
+import org.apache.hadoop.util.ClosableIterator;
+import org.apache.hadoop.hdds.utils.NativeLibraryNotLoadedException;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedOptions;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedReadOptions;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedSSTDumpIterator;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedSSTDumpTool;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDBException;
import org.rocksdb.SstFileReader;
import org.rocksdb.SstFileReaderIterator;
-import java.io.Closeable;
+import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
@@ -46,42 +52,153 @@ public class ManagedSstFileReader {
public ManagedSstFileReader(final Collection sstFiles) {
this.sstFiles = sstFiles;
}
- public Stream getKeyStream() throws RocksDBException {
- final ManagedSstFileIterator itr = new ManagedSstFileIterator(sstFiles);
- final Spliterator spliterator = Spliterators
- .spliteratorUnknownSize(itr, 0);
- return StreamSupport.stream(spliterator, false).onClose(itr::close);
+
+ public static Stream getStreamFromIterator(ClosableIterator itr) {
+ final Spliterator spliterator =
+ Spliterators.spliteratorUnknownSize(itr, 0);
+ return StreamSupport.stream(spliterator, false).onClose(() -> {
+ itr.close();
+ });
+ }
+
+ public Stream getKeyStream() throws RocksDBException,
+ NativeLibraryNotLoadedException, IOException {
+ // TODO: [SNAPSHOT] Check if default Options and ReadOptions is enough.
+ final MultipleSstFileIterator itr =
+ new MultipleSstFileIterator(sstFiles) {
+ private ManagedOptions options;
+ private ReadOptions readOptions;
+
+ @Override
+ protected void init() {
+ this.options = new ManagedOptions();
+ this.readOptions = new ManagedReadOptions();
+ }
+
+ @Override
+ protected ClosableIterator getKeyIteratorForFile(String file)
+ throws RocksDBException {
+ return new ManagedSstFileIterator(file, options, readOptions) {
+ @Override
+ protected String getIteratorValue(
+ SstFileReaderIterator iterator) {
+ return new String(iterator.key(), UTF_8);
+ }
+ };
+ }
+
+ @Override
+ public void close() throws UncheckedIOException {
+ super.close();
+ options.close();
+ readOptions.close();
+ }
+ };
+ return getStreamFromIterator(itr);
+ }
+
+ public Stream getKeyStreamWithTombstone(
+ ManagedSSTDumpTool sstDumpTool) throws IOException, RocksDBException,
+ NativeLibraryNotLoadedException {
+ final MultipleSstFileIterator itr =
+ new MultipleSstFileIterator(sstFiles) {
+ //TODO: [SNAPSHOT] Check if default Options is enough.
+ private ManagedOptions options;
+
+ @Override
+ protected void init() {
+ this.options = new ManagedOptions();
+ }
+
+ @Override
+ protected ClosableIterator getKeyIteratorForFile(String file)
+ throws NativeLibraryNotLoadedException, IOException {
+ return new ManagedSSTDumpIterator(sstDumpTool, file,
+ options) {
+ @Override
+ protected String getTransformedValue(KeyValue value) {
+ return value.getKey();
+ }
+ };
+ }
+
+ @Override
+ public void close() throws UncheckedIOException {
+ super.close();
+ options.close();
+ }
+ };
+ return getStreamFromIterator(itr);
+ }
+
+ private abstract static class ManagedSstFileIterator implements
+ ClosableIterator {
+ private SstFileReader fileReader;
+ private SstFileReaderIterator fileReaderIterator;
+
+ ManagedSstFileIterator(String path, ManagedOptions options,
+ ReadOptions readOptions)
+ throws RocksDBException {
+ this.fileReader = new SstFileReader(options);
+ this.fileReader.open(path);
+ this.fileReaderIterator = fileReader.newIterator(readOptions);
+ fileReaderIterator.seekToFirst();
+ }
+
+ @Override
+ public void close() {
+ this.fileReaderIterator.close();
+ this.fileReader.close();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return fileReaderIterator.isValid();
+ }
+
+ protected abstract String getIteratorValue(SstFileReaderIterator iterator);
+
+ @Override
+ public String next() {
+ String value = getIteratorValue(fileReaderIterator);
+ fileReaderIterator.next();
+ return value;
+ }
}
- private static final class ManagedSstFileIterator implements
- Iterator, Closeable {
+ private abstract static class MultipleSstFileIterator implements
+ ClosableIterator {
private final Iterator fileNameIterator;
- private final Options options;
- private final ReadOptions readOptions;
+
private String currentFile;
- private SstFileReader currentFileReader;
- private SstFileReaderIterator currentFileIterator;
+ private ClosableIterator currentFileIterator;
- private ManagedSstFileIterator(Collection files)
- throws RocksDBException {
- // TODO: Check if default Options and ReadOptions is enough.
- this.options = new Options();
- this.readOptions = new ReadOptions();
+ private MultipleSstFileIterator(Collection files)
+ throws IOException, RocksDBException,
+ NativeLibraryNotLoadedException {
this.fileNameIterator = files.iterator();
+ init();
moveToNextFile();
}
+ protected abstract void init();
+
+ protected abstract ClosableIterator getKeyIteratorForFile(String file)
+ throws RocksDBException, NativeLibraryNotLoadedException,
+ IOException;
+
@Override
public boolean hasNext() {
try {
do {
- if (currentFileIterator.isValid()) {
+ if (currentFileIterator.hasNext()) {
return true;
}
} while (moveToNextFile());
- } catch (RocksDBException e) {
- // TODO: This exception has to be handled by the caller.
+ } catch (IOException | RocksDBException |
+ NativeLibraryNotLoadedException e) {
+ // TODO: [SNAPSHOT] This exception has to be handled by the caller.
// We have to do better exception handling.
throw new RuntimeException(e);
}
@@ -89,37 +206,36 @@ public boolean hasNext() {
}
@Override
- public String next() {
+ public T next() {
if (hasNext()) {
- final String value = new String(currentFileIterator.key(), UTF_8);
- currentFileIterator.next();
- return value;
+ return currentFileIterator.next();
}
- throw new NoSuchElementException("No more keys");
+ throw new NoSuchElementException("No more elements found.");
}
@Override
- public void close() {
- closeCurrentFile();
+ public void close() throws UncheckedIOException {
+ try {
+ closeCurrentFile();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
}
- private boolean moveToNextFile() throws RocksDBException {
+ private boolean moveToNextFile() throws IOException, RocksDBException,
+ NativeLibraryNotLoadedException {
if (fileNameIterator.hasNext()) {
closeCurrentFile();
currentFile = fileNameIterator.next();
- currentFileReader = new SstFileReader(options);
- currentFileReader.open(currentFile);
- currentFileIterator = currentFileReader.newIterator(readOptions);
- currentFileIterator.seekToFirst();
+ this.currentFileIterator = getKeyIteratorForFile(currentFile);
return true;
}
return false;
}
- private void closeCurrentFile() {
+ private void closeCurrentFile() throws IOException {
if (currentFile != null) {
currentFileIterator.close();
- currentFileReader.close();
currentFile = null;
}
}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 937835fdb774..34e17519e0a1 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -28,6 +28,17 @@
* Ozone Manager Constants.
*/
public final class OMConfigKeys {
+ public static final String
+ OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_POOL_SIZE =
+ "ozone.om.snapshot.sst_dumptool.pool.size";
+ public static final int
+ OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_POOL_SIZE_DEFAULT = 1;
+ public static final String
+ OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_BUFFER_SIZE =
+ "ozone.om.snapshot.sst_dumptool.buffer.size";
+ public static final String
+ OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_BUFFER_SIZE_DEFAULT = "8KB";
+
/**
* Never constructed.
*/
diff --git a/hadoop-ozone/ozone-manager/pom.xml b/hadoop-ozone/ozone-manager/pom.xml
index 974565206d92..a2b8b284d05d 100644
--- a/hadoop-ozone/ozone-manager/pom.xml
+++ b/hadoop-ozone/ozone-manager/pom.xml
@@ -254,6 +254,10 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
org.junit.jupiter
junit-jupiter-params
+
+ org.apache.ozone
+ hdds-rocks-native
+
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
index 60e7ea7fd0d5..09a828e33dd3 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
@@ -30,6 +30,14 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.utils.NativeConstants;
+import org.apache.hadoop.hdds.utils.NativeLibraryLoader;
+import org.apache.hadoop.hdds.utils.NativeLibraryNotLoadedException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -41,9 +49,11 @@
import org.apache.hadoop.hdds.utils.db.CodecRegistry;
import org.apache.hadoop.hdds.utils.db.IntegerCodec;
import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedSSTDumpTool;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.OmSnapshot;
@@ -73,6 +83,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.SynchronousQueue;
+
import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
import static org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.getSnapshotInfo;
import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.DONE;
@@ -124,6 +136,9 @@ public class SnapshotDiffManager implements AutoCloseable {
private final PersistentMap snapDiffJobTable;
private final ExecutorService executorService;
+ private boolean isNativeRocksToolsLoaded;
+
+ private ManagedSSTDumpTool sstDumpTool;
public SnapshotDiffManager(ManagedRocksDB db,
RocksDBCheckpointDiffer differ,
@@ -170,6 +185,36 @@ public SnapshotDiffManager(ManagedRocksDB db,
// TODO: [SNAPSHOT] Load jobs only if it is leader node.
// It could a event-triggered form OM when node is leader and up.
this.loadJobsOnStartUp();
+ isNativeRocksToolsLoaded = NativeLibraryLoader.getInstance()
+ .loadLibrary(NativeConstants.ROCKS_TOOLS_NATIVE_LIBRARY_NAME);
+ if (isNativeRocksToolsLoaded) {
+ isNativeRocksToolsLoaded = initSSTDumpTool(
+ ozoneManager.getConfiguration());
+ }
+ }
+
+ private boolean initSSTDumpTool(OzoneConfiguration conf) {
+ try {
+ int threadPoolSize = conf.getInt(
+ OMConfigKeys.OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_POOL_SIZE,
+ OMConfigKeys
+ .OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_POOL_SIZE_DEFAULT);
+ int bufferSize = (int) conf.getStorageSize(
+ OMConfigKeys.OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_BUFFER_SIZE,
+ OMConfigKeys
+ .OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_BUFFER_SIZE_DEFAULT,
+ StorageUnit.BYTES);
+ ExecutorService execService = new ThreadPoolExecutor(0,
+ threadPoolSize, 60, TimeUnit.SECONDS,
+ new SynchronousQueue<>(), new ThreadFactoryBuilder()
+ .setNameFormat("snapshot-diff-manager-sst-dump-tool-TID-%d")
+ .build(),
+ new ThreadPoolExecutor.DiscardPolicy());
+ sstDumpTool = new ManagedSSTDumpTool(execService, bufferSize);
+ } catch (NativeLibraryNotLoadedException e) {
+ return false;
+ }
+ return true;
}
private Map getTablePrefixes(
@@ -209,6 +254,13 @@ private DifferSnapshotInfo getDSIFromSI(SnapshotInfo snapshotInfo,
getTablePrefixes(snapshotOMMM, volumeName, bucketName));
}
+ private Set getSSTFileListForSnapshot(OmSnapshot snapshot,
+ List tablesToLookUp) throws RocksDBException {
+ return RdbUtil.getSSTFilesForComparison(snapshot
+ .getMetadataManager().getStore().getDbLocation()
+ .getPath(), tablesToLookUp);
+ }
+
@SuppressWarnings("parameternumber")
public SnapshotDiffResponse getSnapshotDiffReport(
final String volume,
@@ -408,7 +460,6 @@ private void generateSnapshotDiffReport(final String jobKey,
ColumnFamilyHandle fromSnapshotColumnFamily = null;
ColumnFamilyHandle toSnapshotColumnFamily = null;
ColumnFamilyHandle objectIDsColumnFamily = null;
-
try {
// JobId is prepended to column families name to make them unique
// for request.
@@ -428,30 +479,19 @@ private void generateSnapshotDiffReport(final String jobKey,
// Note: Store objectId and keyName as byte array to reduce unnecessary
// serialization and deserialization.
final PersistentMap objectIdToKeyNameMapForFromSnapshot =
- new RocksDbPersistentMap<>(db,
- fromSnapshotColumnFamily,
- codecRegistry,
- byte[].class,
- byte[].class);
-
+ new RocksDbPersistentMap<>(db, fromSnapshotColumnFamily,
+ codecRegistry, byte[].class, byte[].class);
// ObjectId to keyName map to keep key info for toSnapshot.
final PersistentMap objectIdToKeyNameMapForToSnapshot =
- new RocksDbPersistentMap<>(db,
- toSnapshotColumnFamily,
- codecRegistry,
- byte[].class,
- byte[].class);
-
+ new RocksDbPersistentMap<>(db, toSnapshotColumnFamily, codecRegistry,
+ byte[].class, byte[].class);
// Set of unique objectId between fromSnapshot and toSnapshot.
final PersistentSet objectIDsToCheckMap =
- new RocksDbPersistentSet<>(db,
- objectIDsColumnFamily,
- codecRegistry,
+ new RocksDbPersistentSet<>(db, objectIDsColumnFamily, codecRegistry,
byte[].class);
final BucketLayout bucketLayout = getBucketLayout(volume, bucket,
fromSnapshot.getMetadataManager());
-
final Table fsKeyTable =
fromSnapshot.getMetadataManager().getKeyTable(bucketLayout);
final Table tsKeyTable =
@@ -466,40 +506,82 @@ private void generateSnapshotDiffReport(final String jobKey,
Map tablePrefixes =
getTablePrefixes(toSnapshot.getMetadataManager(), volume, bucket);
-
- final Set deltaFilesForKeyOrFileTable =
- getDeltaFiles(fromSnapshot, toSnapshot,
- Collections.singletonList(fsKeyTable.getName()), fsInfo, tsInfo,
+ List tablesToLookUp =
+ Collections.singletonList(fsKeyTable.getName());
+ final Set deltaFilesForKeyOrFileTable = getDeltaFiles(
+ fromSnapshot, toSnapshot, tablesToLookUp, fsInfo, tsInfo,
useFullDiff, tablePrefixes);
- addToObjectIdMap(fsKeyTable,
- tsKeyTable,
- deltaFilesForKeyOrFileTable,
- objectIdToKeyNameMapForFromSnapshot,
- objectIdToKeyNameMapForToSnapshot,
- objectIDsToCheckMap,
- tablePrefixes);
+ // Workaround to handle deletes if native rockstools for reading
+ // tombstone is not loaded.
+ // TODO: [SNAPSHOT] Update Rocksdb SSTFileIterator to read tombstone
+ if (!isNativeRocksToolsLoaded) {
+ deltaFilesForKeyOrFileTable.addAll(getSSTFileListForSnapshot(
+ fromSnapshot, tablesToLookUp));
+ }
+ try {
+ addToObjectIdMap(fsKeyTable, tsKeyTable,
+ Pair.of(isNativeRocksToolsLoaded, deltaFilesForKeyOrFileTable),
+ objectIdToKeyNameMapForFromSnapshot,
+ objectIdToKeyNameMapForToSnapshot, objectIDsToCheckMap,
+ tablePrefixes);
+ } catch (NativeLibraryNotLoadedException e) {
+ // Workaround to handle deletes if use of native rockstools for reading
+ // tombstone fails.
+ // TODO: [SNAPSHOT] Update Rocksdb SSTFileIterator to read tombstone
+ deltaFilesForKeyOrFileTable.addAll(getSSTFileListForSnapshot(
+ fromSnapshot, tablesToLookUp));
+ try {
+ addToObjectIdMap(fsKeyTable, tsKeyTable,
+ Pair.of(false, deltaFilesForKeyOrFileTable),
+ objectIdToKeyNameMapForFromSnapshot,
+ objectIdToKeyNameMapForToSnapshot, objectIDsToCheckMap,
+ tablePrefixes);
+ } catch (NativeLibraryNotLoadedException ex) {
+ // This code should be never executed.
+ throw new IllegalStateException(ex);
+ }
+ }
if (bucketLayout.isFileSystemOptimized()) {
final Table fsDirTable =
fromSnapshot.getMetadataManager().getDirectoryTable();
final Table tsDirTable =
toSnapshot.getMetadataManager().getDirectoryTable();
+ tablesToLookUp = Collections.singletonList(fsDirTable.getName());
final Set deltaFilesForDirTable =
- getDeltaFiles(fromSnapshot, toSnapshot,
- Collections.singletonList(fsDirTable.getName()), fsInfo, tsInfo,
- useFullDiff, tablePrefixes);
- addToObjectIdMap(fsDirTable,
- tsDirTable,
- deltaFilesForDirTable,
- objectIdToKeyNameMapForFromSnapshot,
- objectIdToKeyNameMapForToSnapshot,
- objectIDsToCheckMap,
- tablePrefixes);
+ getDeltaFiles(fromSnapshot, toSnapshot, tablesToLookUp, fsInfo,
+ tsInfo, useFullDiff, tablePrefixes);
+ if (!isNativeRocksToolsLoaded) {
+ deltaFilesForDirTable.addAll(getSSTFileListForSnapshot(
+ fromSnapshot, tablesToLookUp));
+ }
+ try {
+ addToObjectIdMap(fsDirTable, tsDirTable,
+ Pair.of(isNativeRocksToolsLoaded, deltaFilesForDirTable),
+ objectIdToKeyNameMapForFromSnapshot,
+ objectIdToKeyNameMapForToSnapshot,
+ objectIDsToCheckMap,
+ tablePrefixes);
+ } catch (NativeLibraryNotLoadedException e) {
+ try {
+ // Workaround to handle deletes if use of native rockstools for
+ // reading tombstone fails.
+ // TODO: [SNAPSHOT] Update Rocksdb SSTFileIterator to read tombstone
+ deltaFilesForDirTable.addAll(getSSTFileListForSnapshot(
+ fromSnapshot, tablesToLookUp));
+ addToObjectIdMap(fsDirTable, tsDirTable,
+ Pair.of(false, deltaFilesForDirTable),
+ objectIdToKeyNameMapForFromSnapshot,
+ objectIdToKeyNameMapForToSnapshot, objectIDsToCheckMap,
+ tablePrefixes);
+ } catch (NativeLibraryNotLoadedException ex) {
+ // This code should be never executed.
+ throw new IllegalStateException(ex);
+ }
+ }
}
-
- generateDiffReport(jobId,
- objectIDsToCheckMap,
+ generateDiffReport(jobId, objectIDsToCheckMap,
objectIdToKeyNameMapForFromSnapshot,
objectIdToKeyNameMapForToSnapshot);
updateJobStatus(jobKey, IN_PROGRESS, DONE);
@@ -517,22 +599,26 @@ private void generateSnapshotDiffReport(final String jobKey,
private void addToObjectIdMap(Table fsTable,
Table tsTable,
- Set deltaFiles,
+ Pair>
+ isNativeRocksToolsLoadedDeltaFilesPair,
PersistentMap oldObjIdToKeyMap,
PersistentMap newObjIdToKeyMap,
PersistentSet objectIDsToCheck,
Map tablePrefixes)
- throws IOException {
+ throws IOException, NativeLibraryNotLoadedException {
+ Set deltaFiles = isNativeRocksToolsLoadedDeltaFilesPair.getRight();
if (deltaFiles.isEmpty()) {
return;
}
-
+ boolean nativeRocksToolsLoaded =
+ isNativeRocksToolsLoadedDeltaFilesPair.getLeft();
boolean isDirectoryTable =
fsTable.getName().equals(OmMetadataManagerImpl.DIRECTORY_TABLE);
-
- try (Stream keysToCheck = new ManagedSstFileReader(deltaFiles)
- .getKeyStream()) {
+ ManagedSstFileReader sstFileReader = new ManagedSstFileReader(deltaFiles);
+ try (Stream keysToCheck = nativeRocksToolsLoaded
+ ? sstFileReader.getKeyStreamWithTombstone(sstDumpTool)
+ : sstFileReader.getKeyStream()) {
keysToCheck.forEach(key -> {
try {
final WithObjectID oldKey = fsTable.get(key);
@@ -602,19 +688,6 @@ private Set getDeltaFiles(OmSnapshot fromSnapshot,
List sstDiffList =
differ.getSSTDiffListWithFullPath(toDSI, fromDSI);
deltaFiles.addAll(sstDiffList);
-
- // TODO: [SNAPSHOT] Remove the workaround below when the SnapDiff logic
- // can read tombstones in SST files.
- // Workaround: Append "From DB" SST files to the deltaFiles list so that
- // the current SnapDiff logic correctly handles deleted keys.
- if (!deltaFiles.isEmpty()) {
- Set fromSnapshotFiles = RdbUtil.getSSTFilesForComparison(
- fromSnapshot.getMetadataManager()
- .getStore().getDbLocation().getPath(),
- tablesToLookUp);
- deltaFiles.addAll(fromSnapshotFiles);
- }
- // End of Workaround
}
if (useFullDiff || deltaFiles.isEmpty()) {