Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3656,4 +3656,22 @@
without using the optimised DAG based pruning approach
</description>
</property>

<property>
<name>ozone.om.snapshot.sst_dumptool.pool.size</name>
<value>1</value>
<tag>OZONE, OM</tag>
<description>
Thread pool size for the SST dump tool, which is used for computing snapdiff when the native library is enabled.
</description>
</property>

<property>
<name>ozone.om.snapshot.sst_dumptool.buffer.size</name>
<value>8KB</value>
<tag>OZONE, OM</tag>
<description>
Buffer size of the SST dump tool pipe, which is used for computing snapdiff when the native library is enabled.
</description>
</property>
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -17,34 +17,39 @@

package org.apache.hadoop.hdds.utils.db.managed;

import org.apache.hadoop.util.ClosableIterator;
import org.apache.hadoop.hdds.utils.NativeLibraryNotLoadedException;
import org.eclipse.jetty.io.RuntimeIOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Arrays;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
* Iterator to Parse output of RocksDBSSTDumpTool.
*/
public class ManagedSSTDumpIterator implements
Iterator<ManagedSSTDumpIterator.KeyValue>, AutoCloseable {
private static final String SST_DUMP_TOOL_CLASS =
"org.apache.hadoop.hdds.utils.db.managed.ManagedSSTDumpTool";
public abstract class ManagedSSTDumpIterator<T> implements ClosableIterator<T> {

private static final Logger LOG =
LoggerFactory.getLogger(ManagedSSTDumpIterator.class);
private static final String PATTERN_REGEX =
"'([^=>]+)' seq:([0-9]+), type:([0-9]+) => ";
"'([^=>]+)' seq:([0-9]+), type:([0-9]+) => ";

public static final int PATTERN_KEY_GROUP_NUMBER = 1;
public static final int PATTERN_SEQ_GROUP_NUMBER = 2;
public static final int PATTERN_TYPE_GROUP_NUMBER = 3;
private static final Pattern PATTERN_MATCHER =
Pattern.compile(PATTERN_REGEX);
private static final Pattern PATTERN_MATCHER = Pattern.compile(PATTERN_REGEX);
private BufferedReader processOutput;
private StringBuilder stdoutString;

Expand All @@ -56,32 +61,33 @@ public class ManagedSSTDumpIterator implements

private ManagedSSTDumpTool.SSTDumpToolTask sstDumpToolTask;
private AtomicBoolean open;
private StackTraceElement[] stackTrace;


public ManagedSSTDumpIterator(ManagedSSTDumpTool sstDumpTool,
String sstFilePath,
ManagedOptions options) throws IOException,
NativeLibraryNotLoadedException {
String sstFilePath, ManagedOptions options)
throws IOException, NativeLibraryNotLoadedException {
File sstFile = new File(sstFilePath);
if (!sstFile.exists()) {
throw new IOException(String.format("File in path : %s doesn't exist",
sstFile.getAbsolutePath()));
sstFile.getAbsolutePath()));
}
if (!sstFile.isFile()) {
throw new IOException(String.format("Path given: %s is not a file",
sstFile.getAbsolutePath()));
sstFile.getAbsolutePath()));
}
init(sstDumpTool, sstFile, options);
this.stackTrace = Thread.currentThread().getStackTrace();
}

private void init(ManagedSSTDumpTool sstDumpTool, File sstFile,
ManagedOptions options)
throws NativeLibraryNotLoadedException {
String[] args = {"--file=" + sstFile.getAbsolutePath(),
"--command=scan"};
throws NativeLibraryNotLoadedException {
String[] args = {"--file=" + sstFile.getAbsolutePath(), "--command=scan"};
this.sstDumpToolTask = sstDumpTool.run(args, options);
processOutput = new BufferedReader(new InputStreamReader(
sstDumpToolTask.getPipedOutput(), StandardCharsets.UTF_8));
processOutput = new BufferedReader(
new InputStreamReader(sstDumpToolTask.getPipedOutput(),
StandardCharsets.UTF_8));
stdoutString = new StringBuilder();
currentMatcher = PATTERN_MATCHER.matcher(stdoutString);
charBuffer = new char[8192];
Expand All @@ -97,15 +103,16 @@ private void checkSanityOfProcess() {
if (!this.open.get()) {
throw new RuntimeException("Iterator has been closed");
}
if (sstDumpToolTask.getFuture().isDone()
&& sstDumpToolTask.exitValue() != 0) {
if (sstDumpToolTask.getFuture().isDone() &&
sstDumpToolTask.exitValue() != 0) {
throw new RuntimeException("Process Terminated with non zero " +
String.format("exit value %d", sstDumpToolTask.exitValue()));
String.format("exit value %d", sstDumpToolTask.exitValue()));
}
}

/**
* Checks the status of the process & sees if there is another record.
*
* @return True if next exists & false otherwise
* Throws Runtime Exception in case of SST File read failure
*/
Expand All @@ -116,31 +123,41 @@ public boolean hasNext() {
return nextKey != null;
}

/**
* Converts a record parsed from the SST dump output into the element type
* returned by this iterator. {@link #next()} passes every record through
* this method before returning it.
*
* @param value the parsed record to convert
* @return transformed Value
*/
protected abstract T getTransformedValue(KeyValue value);

/**
* Returns the next record from SSTDumpTool.
*
* @return next Key
* Throws Runtime Exception in case of failure.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be changed to NoSuchElementException?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function checkSanityOfProcess() checks whether the exit code is non-zero; in that case it throws a RuntimeException.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, the whole comment is unnecessary. @Override will pick up the Javadoc comment from the Iterator interface, so there is no need to specify the same thing again. As for the RTE, anything can cause that, and stating it doesn't make much difference.

*/
@Override
public KeyValue next() {
public T next() {
checkSanityOfProcess();
currentKey = nextKey;
nextKey = null;
while (!currentMatcher.find()) {
try {
if (prevMatchEndIndex != 0) {
stdoutString = new StringBuilder(stdoutString.substring(
prevMatchEndIndex, stdoutString.length()));
stdoutString = new StringBuilder(
stdoutString.substring(prevMatchEndIndex, stdoutString.length()));
prevMatchEndIndex = 0;
currentMatcher = PATTERN_MATCHER.matcher(stdoutString);
}
int numberOfCharsRead = processOutput.read(charBuffer);
if (numberOfCharsRead < 0) {
if (currentKey != null) {
currentKey.setValue(stdoutString.substring(0,
Math.max(stdoutString.length() - 1, 0)));
Math.max(stdoutString.length() - 1, 0)));
return getTransformedValue(currentKey);
}
return currentKey;
throw new NoSuchElementException("No more elements found");
}
stdoutString.append(charBuffer, 0, numberOfCharsRead);
currentMatcher.reset();
Expand All @@ -150,30 +167,42 @@ public KeyValue next() {
}
if (currentKey != null) {
currentKey.setValue(stdoutString.substring(prevMatchEndIndex,
currentMatcher.start() - 1));
currentMatcher.start() - 1));
}
prevMatchEndIndex = currentMatcher.end();
nextKey = new KeyValue(
currentMatcher.group(PATTERN_KEY_GROUP_NUMBER),
currentMatcher.group(PATTERN_SEQ_GROUP_NUMBER),
currentMatcher.group(PATTERN_TYPE_GROUP_NUMBER));
return currentKey;
nextKey = new KeyValue(currentMatcher.group(PATTERN_KEY_GROUP_NUMBER),
currentMatcher.group(PATTERN_SEQ_GROUP_NUMBER),
currentMatcher.group(PATTERN_TYPE_GROUP_NUMBER));
return getTransformedValue(currentKey);
}

@Override
public synchronized void close() throws Exception {
public synchronized void close() throws UncheckedIOException {
if (this.sstDumpToolTask != null) {
if (!this.sstDumpToolTask.getFuture().isDone()) {
this.sstDumpToolTask.getFuture().cancel(true);
}
this.processOutput.close();
try {
this.processOutput.close();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
open.compareAndSet(true, false);
}

/**
 * Last-resort cleanup: logs a warning, including the stack trace captured by
 * the constructor, when this iterator is collected while still open, and
 * then closes it.
 *
 * @throws Throwable if closing the iterator or the superclass finalizer fails
 */
@Override
protected void finalize() throws Throwable {
  try {
    // The constructor assigns stackTrace only after init(...) returns, so a
    // partially constructed instance can reach this point with it unset.
    // NOTE(review): open is presumably assigned inside init(...) as well;
    // it is null-checked here for the same reason - confirm.
    if (open != null && open.get()) {
      String creationTrace = stackTrace == null
          ? "<stack trace not captured>"
          : Arrays.stream(stackTrace)
              .map(StackTraceElement::toString)
              .collect(Collectors.joining("\n"));
      LOG.warn("{} is not closed properly." +
              " StackTrace for unclosed instance: {}",
          this.getClass().getName(), creationTrace);
    }
    this.close();
  } finally {
    // Run the superclass finalizer even when logging or close() fails.
    super.finalize();
  }
}

/**
Expand Down Expand Up @@ -214,12 +243,8 @@ public String getValue() {

@Override
public String toString() {
return "KeyValue{" +
"key='" + key + '\'' +
", sequence=" + sequence +
", type=" + type +
", value='" + value + '\'' +
'}';
return "KeyValue{" + "key='" + key + '\'' + ", sequence=" + sequence +
", type=" + type + ", value='" + value + '\'' + '}';
}
}
}
4 changes: 4 additions & 0 deletions hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.ozone</groupId>
<artifactId>hdds-rocks-native</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Loading