diff --git a/hadoop-hdds/pom.xml b/hadoop-hdds/pom.xml index 58ef24a1621f..6735b2b9785f 100644 --- a/hadoop-hdds/pom.xml +++ b/hadoop-hdds/pom.xml @@ -49,6 +49,7 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd"> config test-utils erasurecode + rocks-native @@ -248,6 +249,11 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd"> bcpkix-jdk15on ${bouncycastle.version} + + org.apache.ozone + hdds-rocks-native + ${hdds.version} + diff --git a/hadoop-hdds/rocks-native/pom.xml b/hadoop-hdds/rocks-native/pom.xml new file mode 100644 index 000000000000..6112326cc7cd --- /dev/null +++ b/hadoop-hdds/rocks-native/pom.xml @@ -0,0 +1,421 @@ + + + + + hdds + org.apache.ozone + 1.4.0-SNAPSHOT + + 4.0.0 + Apache Ozone HDDS RocksDB Tools + hdds-rocks-native + + + + org.apache.ozone + hdds-managed-rocksdb + + + org.eclipse.jetty + jetty-io + + + + 8 + 8 + + + + + rocks_tools_native + + + rocks_tools_native + + + + 23 + true + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + get-cpu-count + generate-sources + + cpu-count + + + system.numCores + + + + + + com.googlecode.maven-download-plugin + download-maven-plugin + + + rocksdb source download + generate-sources + + wget + + + https://github.com/facebook/rocksdb/archive/refs/tags/v${rocksdb.version}.tar.gz + rocksdb-v${rocksdb.version}.tar.gz + ${project.build.directory}/rocksdb + + + + zlib source download + generate-sources + + wget + + + https://zlib.net/zlib-${zlib.version}.tar.gz + zlib-${zlib.version}.tar.gz + ${project.build.directory}/zlib + + + + bzip2 source download + generate-sources + + wget + + + https://sourceware.org/pub/bzip2/bzip2-${bzip2.version}.tar.gz + bzip2-v${bzip2.version}.tar.gz + ${project.build.directory}/bzip2 + + + + lz4 source download + generate-sources + + wget + + + https://github.com/lz4/lz4/archive/refs/tags/v${lz4.version}.tar.gz + lz4-v${lz4.version}.tar.gz + ${project.build.directory}/lz4 + + + + snappy source download + generate-sources + + wget + + + https://github.com/google/snappy/archive/refs/tags/${snappy.version}.tar.gz + snappy-v${snappy.version}.tar.gz + ${project.build.directory}/snappy + + + + zstd source download + generate-sources + + wget + + + https://github.com/facebook/zstd/archive/refs/tags/v${zstd.version}.tar.gz + zstd-v${zstd.version}.tar.gz + ${project.build.directory}/zstd + + + + + + org.apache.maven.plugins + maven-patch-plugin + 1.1.1 + + ${basedir}/src/main/patches/rocks-native.patch + 1 + ${project.build.directory}/rocksdb/rocksdb-${rocksdb.version} + + + + patch + process-sources + + apply + + + + + + maven-antrun-plugin + + + unzip-artifact + generate-sources + + + + + + + + + + + + run + + + + build-zlib + process-sources + + + + + + + + + + + + run + + + + build-bzip2 + process-sources + + + + + + + + + run + + + + build-lz4 + process-sources + + + + + + + + + run + + + + build-zstd + process-sources + + + + + + + + + run + + + + build-snappy + process-sources + + + + + + + + + + + + + run + + + + build-rocksjava + process-resources + + + + + + + + + + + + + + + + + run + + + + build-rocks-tools + process-classes + + + + + + + + + + + + + + + + + + + + + + + + run + + + + copy-lib-file + prepare-package + + + + + + + + + run + + + + + + org.apache.maven.plugins + maven-jar-plugin + ${maven-jar-plugin.version} + + + **/*.class + **/lib*.dylib + **/lib*.so + **/lib*.jnilib + **/lib*.dll + + + + + + + + java-8 + + 1.8 + + rocks_tools_native + + + + + + org.codehaus.mojo + native-maven-plugin + + + compile + + javah + + + ${env.JAVA_HOME}/bin/javah + + 
org.apache.hadoop.hdds.utils.db.managed.ManagedSSTDumpTool + org.apache.hadoop.hdds.utils.db.managed.PipeInputStream + + ${project.build.directory}/native/javah + + + + + + + + + java-11 + + 11 + + rocks_tools_native + + + + + + org.codehaus.mojo + exec-maven-plugin + + + javach + + exec + + compile + + ${env.JAVA_HOME}/bin/javac + + -classpath + ${project.build.outputDirectory} + -h + ${project.build.directory}/native/javah + ${project.basedir}/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpTool.java + ${project.basedir}/src/main/java/org/apache/hadoop/hdds/utils/db/managed/PipeInputStream.java + + + + + + + + + + diff --git a/hadoop-hdds/rocks-native/src/CMakeLists.txt b/hadoop-hdds/rocks-native/src/CMakeLists.txt new file mode 100644 index 000000000000..84463fc169e1 --- /dev/null +++ b/hadoop-hdds/rocks-native/src/CMakeLists.txt @@ -0,0 +1,78 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# +# CMake configuration. +# + +cmake_minimum_required(VERSION 2.8) +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fPIC") +set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fPIC") +project(ozone_native) +set(CMAKE_BUILD_TYPE Release) +find_package(JNI REQUIRED) +include_directories(${JNI_INCLUDE_DIRS}) +set(CMAKE_CXX_STANDARD ${CMAKE_STANDARDS}) + +set(linked_libraries "") +if(NOT GENERATED_JAVAH) + message(FATAL_ERROR "You must set the CMake variable GENERATED_JAVAH") +endif() +include_directories(${GENERATED_JAVAH}) +if(${SST_DUMP_INCLUDE}) + include_directories(${ROCKSDB_HEADERS}) + set(SOURCE_FILES ${NATIVE_DIR}/SSTDumpTool.cpp ${NATIVE_DIR}/PipeInputStream.cpp ${NATIVE_DIR}/Pipe.h ${NATIVE_DIR}/Pipe.cpp ${NATIVE_DIR}/cplusplus_to_java_convert.h) + ADD_LIBRARY(rocksdb STATIC IMPORTED) + set_target_properties( + rocksdb + PROPERTIES + IMPORTED_LOCATION ${ROCKSDB_LIB}/librocksdb_debug.a) + ADD_LIBRARY(rocks_tools STATIC IMPORTED) + set_target_properties( + rocks_tools + PROPERTIES + IMPORTED_LOCATION ${ROCKSDB_LIB}/librocksdb_tools_debug.a) + ADD_LIBRARY(bz2 STATIC IMPORTED) + set_target_properties( + bz2 + PROPERTIES + IMPORTED_LOCATION ${BZIP2_LIB}/libbz2.a) + ADD_LIBRARY(zlib STATIC IMPORTED) + set_target_properties( + zlib + PROPERTIES + IMPORTED_LOCATION ${ZLIB_LIB}/libz.a) + ADD_LIBRARY(lz4 STATIC IMPORTED) + set_target_properties( + lz4 + PROPERTIES + IMPORTED_LOCATION ${LZ4_LIB}/liblz4.a) + ADD_LIBRARY(snappy STATIC IMPORTED) + set_target_properties( + snappy + PROPERTIES + IMPORTED_LOCATION ${SNAPPY_LIB}/libsnappy.a) + ADD_LIBRARY(zstd STATIC IMPORTED) + set_target_properties( + zstd + PROPERTIES + IMPORTED_LOCATION ${ZSTD_LIB}/libzstd.a) + set(linked_libraries ${linked_libraries} bz2 zlib rocks_tools rocksdb lz4 snappy zstd) +endif() +add_library(ozone_rocksdb_tools SHARED ${SOURCE_FILES}) +target_link_libraries(ozone_rocksdb_tools 
${linked_libraries}) diff --git a/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/NativeConstants.java b/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/NativeConstants.java new file mode 100644 index 000000000000..d3121144d37a --- /dev/null +++ b/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/NativeConstants.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils; + +/** + * Native Constants. + */ +public final class NativeConstants { + + private NativeConstants() { + + } + public static final String ROCKS_TOOLS_NATIVE_LIBRARY_NAME + = "ozone_rocksdb_tools"; +} diff --git a/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/NativeLibraryLoader.java b/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/NativeLibraryLoader.java new file mode 100644 index 000000000000..35427c822fd4 --- /dev/null +++ b/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/NativeLibraryLoader.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Class to load Native Libraries. 
+ */ +public class NativeLibraryLoader { + + private static final Logger LOG = + LoggerFactory.getLogger(NativeLibraryLoader.class); + private static final String OS = System.getProperty("os.name").toLowerCase(); + private Map librariesLoaded; + private static volatile NativeLibraryLoader instance; + + public NativeLibraryLoader(final Map librariesLoaded) { + this.librariesLoaded = librariesLoaded; + } + + private static synchronized void initNewInstance() { + if (instance == null) { + instance = new NativeLibraryLoader(new ConcurrentHashMap<>()); + } + } + + public static NativeLibraryLoader getInstance() { + if (instance == null) { + initNewInstance(); + } + return instance; + } + + public static String getJniLibraryFileName(String libraryName) { + return appendLibOsSuffix("lib" + libraryName); + } + + public static boolean isMac() { + return OS.startsWith("mac"); + } + + public static boolean isWindows() { + return OS.startsWith("win"); + } + + public static boolean isLinux() { + return OS.startsWith("linux"); + } + + private static String getLibOsSuffix() { + if (isMac()) { + return ".dylib"; + } else if (isWindows()) { + return ".dll"; + } else if (isLinux()) { + return ".so"; + } + throw new UnsatisfiedLinkError(String.format("Unsupported OS %s", OS)); + } + + private static String appendLibOsSuffix(String libraryFileName) { + return libraryFileName + getLibOsSuffix(); + } + + public static boolean isLibraryLoaded(final String libraryName) { + return getInstance().librariesLoaded + .getOrDefault(libraryName, false); + } + + public synchronized boolean loadLibrary(final String libraryName) { + if (isLibraryLoaded(libraryName)) { + return true; + } + boolean loaded = false; + try { + loaded = false; + try { + System.loadLibrary(libraryName); + loaded = true; + } catch (Throwable e) { + + } + if (!loaded) { + Optional file = copyResourceFromJarToTemp(libraryName); + if (file.isPresent()) { + System.load(file.get().getAbsolutePath()); + loaded = true; + } + } + } catch (Throwable e) { + LOG.warn("Unable to load library: {}", libraryName, e); + } + this.librariesLoaded.put(libraryName, loaded); + return isLibraryLoaded(libraryName); + } + + private Optional copyResourceFromJarToTemp(final String libraryName) + throws IOException { + final String libraryFileName = getJniLibraryFileName(libraryName); + InputStream is = null; + try { + is = getClass().getClassLoader().getResourceAsStream(libraryFileName); + if (is == null) { + return Optional.empty(); + } + + // create a temporary file to copy the library to + final File temp = File.createTempFile(libraryName, getLibOsSuffix()); + if (!temp.exists()) { + return Optional.empty(); + } else { + temp.deleteOnExit(); + } + + Files.copy(is, temp.toPath(), StandardCopyOption.REPLACE_EXISTING); + return Optional.ofNullable(temp); + } finally { + if (is != null) { + is.close(); + } + } + } +} diff --git a/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/NativeLibraryNotLoadedException.java b/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/NativeLibraryNotLoadedException.java new file mode 100644 index 000000000000..b4313a607dce --- /dev/null +++ b/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/NativeLibraryNotLoadedException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
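
A minimal sketch of the expected call pattern for the loader above (illustrative only and not part of the patch; the wrapper class NativeLoadCheck is hypothetical, while NativeLibraryLoader and ROCKS_TOOLS_NATIVE_LIBRARY_NAME are the names introduced in this change):

import org.apache.hadoop.hdds.utils.NativeLibraryLoader;
import static org.apache.hadoop.hdds.utils.NativeConstants.ROCKS_TOOLS_NATIVE_LIBRARY_NAME;

public final class NativeLoadCheck {
  public static void main(String[] args) {
    // loadLibrary() first tries System.loadLibrary(); if that fails it
    // extracts libozone_rocksdb_tools.(so|dylib|dll) from the jar into a
    // temp file and loads it from there. The result is cached per library.
    boolean loaded = NativeLibraryLoader.getInstance()
        .loadLibrary(ROCKS_TOOLS_NATIVE_LIBRARY_NAME);
    if (!loaded) {
      System.err.println("ozone_rocksdb_tools could not be loaded");
    }
  }
}
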
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils; + +/** + Exception when native library not loaded. + */ +public class NativeLibraryNotLoadedException extends Exception { + public NativeLibraryNotLoadedException(String libraryName) { + super(String.format("Unable to load library %s from both " + + "java.library.path & resource file %s from jar.", libraryName, + NativeLibraryLoader.getJniLibraryFileName(libraryName))); + } +} diff --git a/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpIterator.java b/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpIterator.java new file mode 100644 index 000000000000..35aaeb33b0a8 --- /dev/null +++ b/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpIterator.java @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.utils.db.managed; + +import org.apache.hadoop.hdds.utils.NativeLibraryNotLoadedException; +import org.eclipse.jetty.io.RuntimeIOException; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.Iterator; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Iterator to Parse output of RocksDBSSTDumpTool. + */ +public class ManagedSSTDumpIterator implements + Iterator, AutoCloseable { + private static final String SST_DUMP_TOOL_CLASS = + "org.apache.hadoop.hdds.utils.db.managed.ManagedSSTDumpTool"; + private static final String PATTERN_REGEX = + "'([^=>]+)' seq:([0-9]+), type:([0-9]+) => "; + + public static final int PATTERN_KEY_GROUP_NUMBER = 1; + public static final int PATTERN_SEQ_GROUP_NUMBER = 2; + public static final int PATTERN_TYPE_GROUP_NUMBER = 3; + private static final Pattern PATTERN_MATCHER = + Pattern.compile(PATTERN_REGEX); + private BufferedReader processOutput; + private StringBuilder stdoutString; + + private Matcher currentMatcher; + private int prevMatchEndIndex; + private KeyValue currentKey; + private char[] charBuffer; + private KeyValue nextKey; + + private ManagedSSTDumpTool.SSTDumpToolTask sstDumpToolTask; + private AtomicBoolean open; + + + public ManagedSSTDumpIterator(ManagedSSTDumpTool sstDumpTool, + String sstFilePath, + ManagedOptions options) throws IOException, + NativeLibraryNotLoadedException { + File sstFile = new File(sstFilePath); + if (!sstFile.exists()) { + throw new IOException(String.format("File in path : %s doesn't exist", + sstFile.getAbsolutePath())); + } + if (!sstFile.isFile()) { + throw new IOException(String.format("Path given: %s is not a file", + sstFile.getAbsolutePath())); + } + init(sstDumpTool, sstFile, options); + } + + private void init(ManagedSSTDumpTool sstDumpTool, File sstFile, + ManagedOptions options) + throws NativeLibraryNotLoadedException { + String[] args = {"--file=" + sstFile.getAbsolutePath(), + "--command=scan"}; + this.sstDumpToolTask = sstDumpTool.run(args, options); + processOutput = new BufferedReader(new InputStreamReader( + sstDumpToolTask.getPipedOutput(), StandardCharsets.UTF_8)); + stdoutString = new StringBuilder(); + currentMatcher = PATTERN_MATCHER.matcher(stdoutString); + charBuffer = new char[8192]; + open = new AtomicBoolean(true); + next(); + } + + /** + * Throws Runtime exception in the case iterator is closed or + * the native Dumptool exited with non zero exit value. + */ + private void checkSanityOfProcess() { + if (!this.open.get()) { + throw new RuntimeException("Iterator has been closed"); + } + if (sstDumpToolTask.getFuture().isDone() + && sstDumpToolTask.exitValue() != 0) { + throw new RuntimeException("Process Terminated with non zero " + + String.format("exit value %d", sstDumpToolTask.exitValue())); + } + } + + /** + * Checks the status of the process & sees if there is another record. 
+ * @return True if next exists & false otherwise + * Throws Runtime Exception in case of SST File read failure + */ + + @Override + public boolean hasNext() { + checkSanityOfProcess(); + return nextKey != null; + } + + /** + * Returns the next record from SSTDumpTool. + * @return next Key + * Throws Runtime Exception incase of failure. + */ + @Override + public KeyValue next() { + checkSanityOfProcess(); + currentKey = nextKey; + nextKey = null; + while (!currentMatcher.find()) { + try { + if (prevMatchEndIndex != 0) { + stdoutString = new StringBuilder(stdoutString.substring( + prevMatchEndIndex, stdoutString.length())); + prevMatchEndIndex = 0; + currentMatcher = PATTERN_MATCHER.matcher(stdoutString); + } + int numberOfCharsRead = processOutput.read(charBuffer); + if (numberOfCharsRead < 0) { + if (currentKey != null) { + currentKey.setValue(stdoutString.substring(0, + Math.max(stdoutString.length() - 1, 0))); + } + return currentKey; + } + stdoutString.append(charBuffer, 0, numberOfCharsRead); + currentMatcher.reset(); + } catch (IOException e) { + throw new RuntimeIOException(e); + } + } + if (currentKey != null) { + currentKey.setValue(stdoutString.substring(prevMatchEndIndex, + currentMatcher.start() - 1)); + } + prevMatchEndIndex = currentMatcher.end(); + nextKey = new KeyValue( + currentMatcher.group(PATTERN_KEY_GROUP_NUMBER), + currentMatcher.group(PATTERN_SEQ_GROUP_NUMBER), + currentMatcher.group(PATTERN_TYPE_GROUP_NUMBER)); + return currentKey; + } + + @Override + public synchronized void close() throws Exception { + if (this.sstDumpToolTask != null) { + if (!this.sstDumpToolTask.getFuture().isDone()) { + this.sstDumpToolTask.getFuture().cancel(true); + } + this.processOutput.close(); + } + open.compareAndSet(true, false); + } + + @Override + protected void finalize() throws Throwable { + this.close(); + } + + /** + * Class containing Parsed KeyValue Record from Sst Dumptool output. + */ + public static final class KeyValue { + private String key; + private Integer sequence; + private Integer type; + + private String value; + + private KeyValue(String key, String sequence, String type) { + this.key = key; + this.sequence = Integer.valueOf(sequence); + this.type = Integer.valueOf(type); + } + + private void setValue(String value) { + this.value = value; + } + + public String getKey() { + return key; + } + + public Integer getSequence() { + return sequence; + } + + public Integer getType() { + return type; + } + + public String getValue() { + return value; + } + + @Override + public String toString() { + return "KeyValue{" + + "key='" + key + '\'' + + ", sequence=" + sequence + + ", type=" + type + + ", value='" + value + '\'' + + '}'; + } + } +} diff --git a/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpTool.java b/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpTool.java new file mode 100644 index 000000000000..940438a45764 --- /dev/null +++ b/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSSTDumpTool.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils.db.managed; + +import org.apache.hadoop.hdds.utils.NativeLibraryLoader; +import org.apache.hadoop.hdds.utils.NativeLibraryNotLoadedException; + +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +import static org.apache.hadoop.hdds.utils.NativeConstants.ROCKS_TOOLS_NATIVE_LIBRARY_NAME; + +/** + * JNI for RocksDB SSTDumpTool. Pipes the output to an output stream + */ +public class ManagedSSTDumpTool { + + static { + NativeLibraryLoader.getInstance() + .loadLibrary(ROCKS_TOOLS_NATIVE_LIBRARY_NAME); + } + private int bufferCapacity; + private ExecutorService executorService; + + public ManagedSSTDumpTool(ExecutorService executorService, + int bufferCapacity) + throws NativeLibraryNotLoadedException { + if (!NativeLibraryLoader.isLibraryLoaded(ROCKS_TOOLS_NATIVE_LIBRARY_NAME)) { + throw new NativeLibraryNotLoadedException( + ROCKS_TOOLS_NATIVE_LIBRARY_NAME); + } + this.bufferCapacity = bufferCapacity; + this.executorService = executorService; + } + + public SSTDumpToolTask run(String[] args, ManagedOptions options) + throws NativeLibraryNotLoadedException { + PipeInputStream pipeInputStream = new PipeInputStream(bufferCapacity); + return new SSTDumpToolTask(this.executorService.submit(() -> + this.runInternal(args, options.getNativeHandle(), + pipeInputStream.getNativeHandle())), pipeInputStream); + } + + public SSTDumpToolTask run(Map args, ManagedOptions options) + throws NativeLibraryNotLoadedException { + return this.run(args.entrySet().stream().map(e -> "--" + + (e.getValue() == null || e.getValue().isEmpty() ? e.getKey() : + e.getKey() + "=" + e.getValue())).toArray(String[]::new), options); + } + + private native int runInternal(String[] args, long optionsHandle, + long pipeHandle); + + /** + * Class holding piped output of SST Dumptool & future of command. + */ + static class SSTDumpToolTask { + private Future future; + private PipeInputStream pipedOutput; + + SSTDumpToolTask(Future future, PipeInputStream pipedOutput) { + this.future = future; + this.pipedOutput = pipedOutput; + } + + public Future getFuture() { + return future; + } + + public PipeInputStream getPipedOutput() { + return pipedOutput; + } + + public int exitValue() { + if (this.future.isDone()) { + try { + return future.get(); + } catch (InterruptedException | ExecutionException e) { + return 1; + } + } + return 0; + } + } +} diff --git a/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/PipeInputStream.java b/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/PipeInputStream.java new file mode 100644 index 000000000000..741761ae49db --- /dev/null +++ b/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/PipeInputStream.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
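
To make the wiring between ManagedSSTDumpTool and ManagedSSTDumpIterator concrete, here is an illustrative usage sketch (not part of the patch). The SST file path is a placeholder, the buffer size, executor choice and error handling are assumptions, and ManagedOptions comes from the hdds-managed-rocksdb dependency:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.hadoop.hdds.utils.db.managed.ManagedOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedSSTDumpIterator;
import org.apache.hadoop.hdds.utils.db.managed.ManagedSSTDumpTool;

public final class SstDumpExample {
  public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try (ManagedOptions options = new ManagedOptions();
         ManagedSSTDumpIterator iterator = new ManagedSSTDumpIterator(
             new ManagedSSTDumpTool(executor, 8192),
             "/tmp/000123.sst", options)) {
      // Each record is parsed from the sst_dump "scan" output of the form
      //   'key' seq:42, type:1 => value
      // via the PATTERN_REGEX groups in ManagedSSTDumpIterator.
      while (iterator.hasNext()) {
        ManagedSSTDumpIterator.KeyValue kv = iterator.next();
        System.out.println(kv.getKey() + " seq=" + kv.getSequence()
            + " type=" + kv.getType());
      }
    } finally {
      executor.shutdown();
    }
  }
}
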
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils.db.managed; + +import org.apache.hadoop.hdds.utils.NativeLibraryLoader; +import org.apache.hadoop.hdds.utils.NativeLibraryNotLoadedException; +import java.io.InputStream; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.hadoop.hdds.utils.NativeConstants.ROCKS_TOOLS_NATIVE_LIBRARY_NAME; + +/** + * JNI for reading data from pipe. + */ +public class PipeInputStream extends InputStream { + + static { + NativeLibraryLoader.getInstance() + .loadLibrary(ROCKS_TOOLS_NATIVE_LIBRARY_NAME); + } + private byte[] byteBuffer; + private long nativeHandle; + private int numberOfBytesLeftToRead; + private int index = 0; + private int capacity; + + private AtomicBoolean cleanup; + + PipeInputStream(int capacity) throws NativeLibraryNotLoadedException { + if (!NativeLibraryLoader.isLibraryLoaded(ROCKS_TOOLS_NATIVE_LIBRARY_NAME)) { + throw new NativeLibraryNotLoadedException( + ROCKS_TOOLS_NATIVE_LIBRARY_NAME); + } + + this.byteBuffer = new byte[capacity]; + this.numberOfBytesLeftToRead = 0; + this.capacity = capacity; + this.nativeHandle = newPipe(); + this.cleanup = new AtomicBoolean(false); + } + + long getNativeHandle() { + return nativeHandle; + } + + @Override + public int read() { + if (numberOfBytesLeftToRead < 0) { + this.close(); + return -1; + } + if (numberOfBytesLeftToRead == 0) { + numberOfBytesLeftToRead = readInternal(byteBuffer, capacity, + nativeHandle); + index = 0; + return read(); + } + numberOfBytesLeftToRead--; + int ret = byteBuffer[index] & 0xFF; + index += 1; + return ret; + } + + private native long newPipe(); + + private native int readInternal(byte[] buff, int numberOfBytes, + long pipeHandle); + + private native void closeInternal(long pipeHandle); + + @Override + public void close() { + if (this.cleanup.compareAndSet(false, true)) { + closeInternal(this.nativeHandle); + } + } + + @Override + protected void finalize() throws Throwable { + close(); + super.finalize(); + } +} diff --git a/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/package-info.java b/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/package-info.java new file mode 100644 index 000000000000..2388b6ab083c --- /dev/null +++ b/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/db/managed/package-info.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
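
For completeness, a sketch (again illustrative, not in the patch) of consuming the piped sst_dump output directly rather than through the iterator; SSTDumpToolTask and PipeInputStream are the classes above, the file path and buffer size are assumptions, and the example lives in the same package because SSTDumpToolTask is package-private:

package org.apache.hadoop.hdds.utils.db.managed;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public final class RawSstDumpExample {
  public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    ManagedSSTDumpTool tool = new ManagedSSTDumpTool(executor, 8192);
    try (ManagedOptions options = new ManagedOptions()) {
      ManagedSSTDumpTool.SSTDumpToolTask task = tool.run(
          new String[]{"--file=/tmp/000123.sst", "--command=scan"}, options);
      // getPipedOutput() is the PipeInputStream fed by the native pipe;
      // read() returns -1 once the native side has closed its write end.
      try (BufferedReader reader = new BufferedReader(new InputStreamReader(
          task.getPipedOutput(), StandardCharsets.UTF_8))) {
        reader.lines().forEach(System.out::println);
      }
    } finally {
      executor.shutdown();
    }
  }
}
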
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Annotation processors used at compile time by the Ozone project to validate + * internal annotations and related code as needed, if needed. + */ + +package org.apache.hadoop.hdds.utils.db.managed; diff --git a/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/package-info.java b/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/package-info.java new file mode 100644 index 000000000000..4b605616617d --- /dev/null +++ b/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/package-info.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Annotation processors used at compile time by the Ozone project to validate + * internal annotations and related code as needed, if needed. + */ + +package org.apache.hadoop.hdds.utils; diff --git a/hadoop-hdds/rocks-native/src/main/native/Pipe.cpp b/hadoop-hdds/rocks-native/src/main/native/Pipe.cpp new file mode 100644 index 000000000000..f1dd54438700 --- /dev/null +++ b/hadoop-hdds/rocks-native/src/main/native/Pipe.cpp @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "Pipe.h" +#include + +const int Pipe::READ_FILE_DESCRIPTOR_IDX = 0; +const int Pipe::WRITE_FILE_DESCRIPTOR_IDX = 1; + +Pipe::Pipe() { + pipe(p); + open = true; +} + +Pipe::~Pipe() { + ::close(p[Pipe::READ_FILE_DESCRIPTOR_IDX]); + ::close(p[Pipe::WRITE_FILE_DESCRIPTOR_IDX]); +} + +void Pipe::close() { + open = false; +} diff --git a/hadoop-hdds/rocks-native/src/main/native/Pipe.h b/hadoop-hdds/rocks-native/src/main/native/Pipe.h new file mode 100644 index 000000000000..aa75c6311cbc --- /dev/null +++ b/hadoop-hdds/rocks-native/src/main/native/Pipe.h @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ROCKS_NATIVE_PIPE_H +#define ROCKS_NATIVE_PIPE_H + +#include + +class Pipe { + public: + static const int READ_FILE_DESCRIPTOR_IDX; + static const int WRITE_FILE_DESCRIPTOR_IDX; + Pipe(); + ~Pipe(); + void close(); + int getReadFd() { + return getPipeFileDescriptorIndex(READ_FILE_DESCRIPTOR_IDX); + } + + int getWriteFd() { + return getPipeFileDescriptorIndex(WRITE_FILE_DESCRIPTOR_IDX); + } + + int getPipeFileDescriptorIndex(int idx) { + return p[idx]; + } + + bool isOpen() { + return open; + } + + + private: + int p[2]; + FILE* wr; + bool open; + +}; + +#endif //ROCKS_NATIVE_PIPE_H diff --git a/hadoop-hdds/rocks-native/src/main/native/PipeInputStream.cpp b/hadoop-hdds/rocks-native/src/main/native/PipeInputStream.cpp new file mode 100644 index 000000000000..53f60cdd65af --- /dev/null +++ b/hadoop-hdds/rocks-native/src/main/native/PipeInputStream.cpp @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#include +#include "Pipe.h" +#include "cplusplus_to_java_convert.h" +#include "org_apache_hadoop_hdds_utils_db_managed_PipeInputStream.h" + + +jlong Java_org_apache_hadoop_hdds_utils_db_managed_PipeInputStream_newPipe(JNIEnv *, jobject) { + Pipe *pipe = new Pipe(); + return GET_CPLUSPLUS_POINTER(pipe); +} + +jint Java_org_apache_hadoop_hdds_utils_db_managed_PipeInputStream_readInternal(JNIEnv *env, jobject object, jbyteArray jbyteArray, jint capacity, jlong nativeHandle) { + int cap_int = capacity; + Pipe *pipe = reinterpret_cast(nativeHandle); + jbyte *b = (env)->GetByteArrayElements(jbyteArray, JNI_FALSE); + cap_int = read(pipe->getReadFd(), b, cap_int); + if (cap_int == 0) { + if (!pipe->isOpen()) { + cap_int = -1; + } + } + env->ReleaseByteArrayElements(jbyteArray, b, 0); + return cap_int; +} + +void Java_org_apache_hadoop_hdds_utils_db_managed_PipeInputStream_closeInternal(JNIEnv *env, jobject object, jlong nativeHandle) { + delete reinterpret_cast(nativeHandle); +} + diff --git a/hadoop-hdds/rocks-native/src/main/native/SSTDumpTool.cpp b/hadoop-hdds/rocks-native/src/main/native/SSTDumpTool.cpp new file mode 100644 index 000000000000..b200f49c0b06 --- /dev/null +++ b/hadoop-hdds/rocks-native/src/main/native/SSTDumpTool.cpp @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "org_apache_hadoop_hdds_utils_db_managed_ManagedSSTDumpTool.h" +#include "rocksdb/options.h" +#include "rocksdb/sst_dump_tool.h" +#include +#include "cplusplus_to_java_convert.h" +#include "Pipe.h" +#include + +jint Java_org_apache_hadoop_hdds_utils_db_managed_ManagedSSTDumpTool_runInternal(JNIEnv *env, jobject obj, + jobjectArray argsArray, jlong optionsHandle, jlong pipeHandle) { + ROCKSDB_NAMESPACE::SSTDumpTool dumpTool; + ROCKSDB_NAMESPACE::Options *options = reinterpret_cast(optionsHandle); + Pipe *pipe = reinterpret_cast(pipeHandle); + int length = env->GetArrayLength(argsArray); + char *args[length + 1]; + for (int i = 0; i < length; i++) { + jstring str_val = (jstring)env->GetObjectArrayElement(argsArray, (jsize)i); + char *utf_str = (char *)env->GetStringUTFChars(str_val, JNI_FALSE); + args[i + 1] = utf_str; + } + FILE *wr = fdopen(pipe->getWriteFd(), "w"); + int ret = dumpTool.Run(length + 1, args, *options, wr); + for (int i = 1; i < length + 1; i++) { + jstring str_val = (jstring)env->GetObjectArrayElement(argsArray, (jsize)(i - 1)); + env->ReleaseStringUTFChars(str_val, args[i]); + } + fclose(wr); + pipe->close(); + return ret; +} diff --git a/hadoop-hdds/rocks-native/src/main/native/cplusplus_to_java_convert.h b/hadoop-hdds/rocks-native/src/main/native/cplusplus_to_java_convert.h new file mode 100644 index 000000000000..efe9d4a5be24 --- /dev/null +++ b/hadoop-hdds/rocks-native/src/main/native/cplusplus_to_java_convert.h @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +/* + * This macro is used for 32 bit OS. In 32 bit OS, the result number is a + negative number if we use reinterpret_cast(pointer). + * For example, jlong ptr = reinterpret_cast(pointer), ptr is a negative + number in 32 bit OS. + * If we check ptr using ptr > 0, it fails. For example, the following code is + not correct. + * if (jblock_cache_handle > 0) { + std::shared_ptr *pCache = + reinterpret_cast *>( + jblock_cache_handle); + options.block_cache = *pCache; + } + * But the result number is positive number if we do + reinterpret_cast(pointer) first and then cast it to jlong. size_t is 4 + bytes long in 32 bit OS and 8 bytes long in 64 bit OS. + static_cast(reinterpret_cast(_pointer)) is also working in 64 + bit OS. + * + * We don't need an opposite cast because it works from jlong to c++ pointer in + both 32 bit and 64 bit OS. + * For example, the following code is working in both 32 bit and 64 bit OS. + jblock_cache_handle is jlong. 
+ * std::shared_ptr *pCache = + reinterpret_cast *>( + jblock_cache_handle); +*/ + +#define GET_CPLUSPLUS_POINTER(_pointer) \ + static_cast(reinterpret_cast(_pointer)) diff --git a/hadoop-hdds/rocks-native/src/main/patches/rocks-native.patch b/hadoop-hdds/rocks-native/src/main/patches/rocks-native.patch new file mode 100644 index 000000000000..c58a9a12f124 --- /dev/null +++ b/hadoop-hdds/rocks-native/src/main/patches/rocks-native.patch @@ -0,0 +1,522 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +diff --git a/include/rocksdb/sst_dump_tool.h b/include/rocksdb/sst_dump_tool.h +index 9261ba47d..09ed123e5 100644 +--- a/include/rocksdb/sst_dump_tool.h ++++ b/include/rocksdb/sst_dump_tool.h +@@ -11,7 +11,8 @@ namespace ROCKSDB_NAMESPACE { + + class SSTDumpTool { + public: +- int Run(int argc, char const* const* argv, Options options = Options()); ++ int Run(int argc, char const* const* argv,Options options = Options(), ++ FILE* out = stdout, FILE* err = stderr); + }; + + } // namespace ROCKSDB_NAMESPACE +diff --git a/table/sst_file_dumper.cc b/table/sst_file_dumper.cc +index eefbaaeee..2c8106298 100644 +--- a/table/sst_file_dumper.cc ++++ b/table/sst_file_dumper.cc +@@ -45,7 +45,7 @@ SstFileDumper::SstFileDumper(const Options& options, + Temperature file_temp, size_t readahead_size, + bool verify_checksum, bool output_hex, + bool decode_blob_index, const EnvOptions& soptions, +- bool silent) ++ bool silent, FILE* out, FILE* err) + : file_name_(file_path), + read_num_(0), + file_temp_(file_temp), +@@ -57,10 +57,13 @@ SstFileDumper::SstFileDumper(const Options& options, + ioptions_(options_), + moptions_(ColumnFamilyOptions(options_)), + read_options_(verify_checksum, false), +- internal_comparator_(BytewiseComparator()) { ++ internal_comparator_(BytewiseComparator()), ++ out(out), ++ err(err) ++ { + read_options_.readahead_size = readahead_size; + if (!silent_) { +- fprintf(stdout, "Process %s\n", file_path.c_str()); ++ fprintf(out, "Process %s\n", file_path.c_str()); + } + init_result_ = GetTableReader(file_name_); + } +@@ -253,17 +256,17 @@ Status SstFileDumper::ShowAllCompressionSizes( + int32_t compress_level_from, int32_t compress_level_to, + uint32_t max_dict_bytes, uint32_t zstd_max_train_bytes, + uint64_t max_dict_buffer_bytes, bool use_zstd_dict_trainer) { +- fprintf(stdout, "Block Size: %" ROCKSDB_PRIszt "\n", block_size); ++ fprintf(out, "Block Size: %" ROCKSDB_PRIszt "\n", block_size); + for (auto& i : compression_types) { + if (CompressionTypeSupported(i.first)) { +- fprintf(stdout, "Compression: %-24s\n", i.second); ++ fprintf(out, "Compression: %-24s\n", i.second); + CompressionOptions compress_opt; + compress_opt.max_dict_bytes = max_dict_bytes; + compress_opt.zstd_max_train_bytes = zstd_max_train_bytes; + 
compress_opt.max_dict_buffer_bytes = max_dict_buffer_bytes; + compress_opt.use_zstd_dict_trainer = use_zstd_dict_trainer; + for (int32_t j = compress_level_from; j <= compress_level_to; j++) { +- fprintf(stdout, "Compression level: %d", j); ++ fprintf(out, "Compression level: %d", j); + compress_opt.level = j; + Status s = ShowCompressionSize(block_size, i.first, compress_opt); + if (!s.ok()) { +@@ -271,7 +274,7 @@ Status SstFileDumper::ShowAllCompressionSizes( + } + } + } else { +- fprintf(stdout, "Unsupported compression type: %s.\n", i.second); ++ fprintf(out, "Unsupported compression type: %s.\n", i.second); + } + } + return Status::OK(); +@@ -307,9 +310,9 @@ Status SstFileDumper::ShowCompressionSize( + } + + std::chrono::steady_clock::time_point end = std::chrono::steady_clock::now(); +- fprintf(stdout, " Size: %10" PRIu64, file_size); +- fprintf(stdout, " Blocks: %6" PRIu64, num_data_blocks); +- fprintf(stdout, " Time Taken: %10s microsecs", ++ fprintf(out, " Size: %10" PRIu64, file_size); ++ fprintf(out, " Blocks: %6" PRIu64, num_data_blocks); ++ fprintf(out, " Time Taken: %10s microsecs", + std::to_string( + std::chrono::duration_cast(end - start) + .count()) +@@ -342,11 +345,11 @@ Status SstFileDumper::ShowCompressionSize( + : ((static_cast(not_compressed_blocks) / + static_cast(num_data_blocks)) * + 100.0); +- fprintf(stdout, " Compressed: %6" PRIu64 " (%5.1f%%)", compressed_blocks, ++ fprintf(out, " Compressed: %6" PRIu64 " (%5.1f%%)", compressed_blocks, + compressed_pcnt); +- fprintf(stdout, " Not compressed (ratio): %6" PRIu64 " (%5.1f%%)", ++ fprintf(out, " Not compressed (ratio): %6" PRIu64 " (%5.1f%%)", + ratio_not_compressed_blocks, ratio_not_compressed_pcnt); +- fprintf(stdout, " Not compressed (abort): %6" PRIu64 " (%5.1f%%)\n", ++ fprintf(out, " Not compressed (abort): %6" PRIu64 " (%5.1f%%)\n", + not_compressed_blocks, not_compressed_pcnt); + return Status::OK(); + } +@@ -362,7 +365,7 @@ Status SstFileDumper::ReadTableProperties(uint64_t table_magic_number, + /* memory_allocator= */ nullptr, prefetch_buffer); + if (!s.ok()) { + if (!silent_) { +- fprintf(stdout, "Not able to read table properties\n"); ++ fprintf(out, "Not able to read table properties\n"); + } + } + return s; +@@ -382,7 +385,7 @@ Status SstFileDumper::SetTableOptionsByMagicNumber( + + options_.table_factory.reset(bbtf); + if (!silent_) { +- fprintf(stdout, "Sst file format: block-based\n"); ++ fprintf(out, "Sst file format: block-based\n"); + } + + auto& props = table_properties_->user_collected_properties; +@@ -410,7 +413,7 @@ Status SstFileDumper::SetTableOptionsByMagicNumber( + + options_.table_factory.reset(NewPlainTableFactory(plain_table_options)); + if (!silent_) { +- fprintf(stdout, "Sst file format: plain table\n"); ++ fprintf(out, "Sst file format: plain table\n"); + } + } else { + char error_msg_buffer[80]; +@@ -427,7 +430,7 @@ Status SstFileDumper::SetOldTableOptions() { + assert(table_properties_ == nullptr); + options_.table_factory = std::make_shared(); + if (!silent_) { +- fprintf(stdout, "Sst file format: block-based(old version)\n"); ++ fprintf(out, "Sst file format: block-based(old version)\n"); + } + + return Status::OK(); +@@ -478,7 +481,7 @@ Status SstFileDumper::ReadSequential(bool print_kv, uint64_t read_num, + + if (print_kv) { + if (!decode_blob_index_ || ikey.type != kTypeBlobIndex) { +- fprintf(stdout, "%s => %s\n", ++ fprintf(out, "%s => %s\n", + ikey.DebugString(true, output_hex_).c_str(), + value.ToString(output_hex_).c_str()); + } else { +@@ -486,12 +489,12 @@ Status 
SstFileDumper::ReadSequential(bool print_kv, uint64_t read_num, + + const Status s = blob_index.DecodeFrom(value); + if (!s.ok()) { +- fprintf(stderr, "%s => error decoding blob index\n", ++ fprintf(err, "%s => error decoding blob index\n", + ikey.DebugString(true, output_hex_).c_str()); + continue; + } + +- fprintf(stdout, "%s => %s\n", ++ fprintf(out, "%s => %s\n", + ikey.DebugString(true, output_hex_).c_str(), + blob_index.DebugString(output_hex_).c_str()); + } +diff --git a/table/sst_file_dumper.h b/table/sst_file_dumper.h +index 7be876390..20e35ac2a 100644 +--- a/table/sst_file_dumper.h ++++ b/table/sst_file_dumper.h +@@ -22,7 +22,9 @@ class SstFileDumper { + bool verify_checksum, bool output_hex, + bool decode_blob_index, + const EnvOptions& soptions = EnvOptions(), +- bool silent = false); ++ bool silent = false, ++ FILE* out = stdout, ++ FILE* err = stderr); + + Status ReadSequential(bool print_kv, uint64_t read_num, bool has_from, + const std::string& from_key, bool has_to, +@@ -94,6 +96,8 @@ class SstFileDumper { + ReadOptions read_options_; + InternalKeyComparator internal_comparator_; + std::unique_ptr table_properties_; ++ FILE* out; ++ FILE* err; + }; + + } // namespace ROCKSDB_NAMESPACE +diff --git a/tools/sst_dump_tool.cc b/tools/sst_dump_tool.cc +index 7053366e7..b063f8e24 100644 +--- a/tools/sst_dump_tool.cc ++++ b/tools/sst_dump_tool.cc +@@ -31,7 +31,7 @@ static const std::vector> + + namespace { + +-void print_help(bool to_stderr) { ++void print_help(bool to_stderr, FILE* out, FILE* err) { + std::string supported_compressions; + for (CompressionType ct : GetSupportedCompressions()) { + if (!supported_compressions.empty()) { +@@ -43,7 +43,7 @@ void print_help(bool to_stderr) { + supported_compressions += str; + } + fprintf( +- to_stderr ? stderr : stdout, ++ to_stderr ? 
err : out, + R"(sst_dump --file= [--command=check|scan|raw|recompress|identify] + --file= + Path to SST file or directory containing SST files +@@ -149,7 +149,8 @@ bool ParseIntArg(const char* arg, const std::string arg_name, + } + } // namespace + +-int SSTDumpTool::Run(int argc, char const* const* argv, Options options) { ++int SSTDumpTool::Run(int argc, char const* const* argv, Options options, ++ FILE* out, FILE* err) { + std::string env_uri, fs_uri; + const char* dir_or_file = nullptr; + uint64_t read_num = std::numeric_limits::max(); +@@ -248,7 +249,7 @@ int SSTDumpTool::Run(int argc, char const* const* argv, Options options) { + return curr.second == compression_type; + }); + if (iter == kCompressions.end()) { +- fprintf(stderr, "%s is not a valid CompressionType\n", ++ fprintf(err, "%s is not a valid CompressionType\n", + compression_type.c_str()); + exit(1); + } +@@ -273,7 +274,7 @@ int SSTDumpTool::Run(int argc, char const* const* argv, Options options) { + std::cerr << pik_status.getState() << "\n"; + retc = -1; + } +- fprintf(stdout, "key=%s\n", ikey.DebugString(true, true).c_str()); ++ fprintf(out, "key=%s\n", ikey.DebugString(true, true).c_str()); + return retc; + } else if (ParseIntArg(argv[i], "--compression_level_from=", + "compression_level_from must be numeric", +@@ -288,9 +289,9 @@ int SSTDumpTool::Run(int argc, char const* const* argv, Options options) { + "compression_max_dict_bytes must be numeric", + &tmp_val)) { + if (tmp_val < 0 || tmp_val > std::numeric_limits::max()) { +- fprintf(stderr, "compression_max_dict_bytes must be a uint32_t: '%s'\n", ++ fprintf(err, "compression_max_dict_bytes must be a uint32_t: '%s'\n", + argv[i]); +- print_help(/*to_stderr*/ true); ++ print_help(/*to_stderr*/ true, out, err); + return 1; + } + compression_max_dict_bytes = static_cast(tmp_val); +@@ -298,10 +299,10 @@ int SSTDumpTool::Run(int argc, char const* const* argv, Options options) { + "compression_zstd_max_train_bytes must be numeric", + &tmp_val)) { + if (tmp_val < 0 || tmp_val > std::numeric_limits::max()) { +- fprintf(stderr, ++ fprintf(err, + "compression_zstd_max_train_bytes must be a uint32_t: '%s'\n", + argv[i]); +- print_help(/*to_stderr*/ true); ++ print_help(/*to_stderr*/ true, out, err); + return 1; + } + compression_zstd_max_train_bytes = static_cast(tmp_val); +@@ -309,41 +310,41 @@ int SSTDumpTool::Run(int argc, char const* const* argv, Options options) { + "compression_max_dict_buffer_bytes must be numeric", + &tmp_val)) { + if (tmp_val < 0) { +- fprintf(stderr, ++ fprintf(err, + "compression_max_dict_buffer_bytes must be positive: '%s'\n", + argv[i]); +- print_help(/*to_stderr*/ true); ++ print_help(/*to_stderr*/ true, out, err); + return 1; + } + compression_max_dict_buffer_bytes = static_cast(tmp_val); + } else if (strcmp(argv[i], "--compression_use_zstd_finalize_dict") == 0) { + compression_use_zstd_finalize_dict = true; + } else if (strcmp(argv[i], "--help") == 0) { +- print_help(/*to_stderr*/ false); ++ print_help(/*to_stderr*/ false, out, err); + return 0; + } else if (strcmp(argv[i], "--version") == 0) { + printf("%s\n", GetRocksBuildInfoAsString("sst_dump").c_str()); + return 0; + } else { +- fprintf(stderr, "Unrecognized argument '%s'\n\n", argv[i]); +- print_help(/*to_stderr*/ true); ++ fprintf(err, "Unrecognized argument '%s'\n\n", argv[i]); ++ print_help(/*to_stderr*/ true, out, err); + return 1; + } + } + + if(has_compression_level_from && has_compression_level_to) { + if(!has_specified_compression_types || compression_types.size() != 1) { +- 
fprintf(stderr, "Specify one compression type.\n\n"); ++ fprintf(err, "Specify one compression type.\n\n"); + exit(1); + } + } else if(has_compression_level_from || has_compression_level_to) { +- fprintf(stderr, "Specify both --compression_level_from and " ++ fprintf(err, "Specify both --compression_level_from and " + "--compression_level_to.\n\n"); + exit(1); + } + + if (use_from_as_prefix && has_from) { +- fprintf(stderr, "Cannot specify --prefix and --from\n\n"); ++ fprintf(err, "Cannot specify --prefix and --from\n\n"); + exit(1); + } + +@@ -357,8 +358,8 @@ int SSTDumpTool::Run(int argc, char const* const* argv, Options options) { + } + + if (dir_or_file == nullptr) { +- fprintf(stderr, "file or directory must be specified.\n\n"); +- print_help(/*to_stderr*/ true); ++ fprintf(err, "file or directory must be specified.\n\n"); ++ print_help(/*to_stderr*/ true, out, err); + exit(1); + } + +@@ -373,10 +374,10 @@ int SSTDumpTool::Run(int argc, char const* const* argv, Options options) { + Status s = Env::CreateFromUri(config_options, env_uri, fs_uri, &options.env, + &env_guard); + if (!s.ok()) { +- fprintf(stderr, "CreateEnvFromUri: %s\n", s.ToString().c_str()); ++ fprintf(err, "CreateEnvFromUri: %s\n", s.ToString().c_str()); + exit(1); + } else { +- fprintf(stdout, "options.env is %p\n", options.env); ++ fprintf(out, "options.env is %p\n", options.env); + } + } + +@@ -390,7 +391,7 @@ int SSTDumpTool::Run(int argc, char const* const* argv, Options options) { + Status s = env->FileExists(dir_or_file); + // dir_or_file does not exist + if (!s.ok()) { +- fprintf(stderr, "%s%s: No such file or directory\n", s.ToString().c_str(), ++ fprintf(err, "%s%s: No such file or directory\n", s.ToString().c_str(), + dir_or_file); + return 1; + } +@@ -418,13 +419,13 @@ int SSTDumpTool::Run(int argc, char const* const* argv, Options options) { + if (dir) { + filename = std::string(dir_or_file) + "/" + filename; + } +- + ROCKSDB_NAMESPACE::SstFileDumper dumper( + options, filename, Temperature::kUnknown, readahead_size, +- verify_checksum, output_hex, decode_blob_index); ++ verify_checksum, output_hex, decode_blob_index, EnvOptions(), ++ false,out, err); + // Not a valid SST + if (!dumper.getStatus().ok()) { +- fprintf(stderr, "%s: %s\n", filename.c_str(), ++ fprintf(err, "%s: %s\n", filename.c_str(), + dumper.getStatus().ToString().c_str()); + continue; + } else { +@@ -434,7 +435,7 @@ int SSTDumpTool::Run(int argc, char const* const* argv, Options options) { + if (valid_sst_files.size() == 1) { + // from_key and to_key are only used for "check", "scan", or "" + if (command == "check" || command == "scan" || command == "") { +- fprintf(stdout, "from [%s] to [%s]\n", ++ fprintf(out, "from [%s] to [%s]\n", + ROCKSDB_NAMESPACE::Slice(from_key).ToString(true).c_str(), + ROCKSDB_NAMESPACE::Slice(to_key).ToString(true).c_str()); + } +@@ -449,7 +450,7 @@ int SSTDumpTool::Run(int argc, char const* const* argv, Options options) { + compression_zstd_max_train_bytes, compression_max_dict_buffer_bytes, + !compression_use_zstd_finalize_dict); + if (!st.ok()) { +- fprintf(stderr, "Failed to recompress: %s\n", st.ToString().c_str()); ++ fprintf(err, "Failed to recompress: %s\n", st.ToString().c_str()); + exit(1); + } + return 0; +@@ -461,10 +462,10 @@ int SSTDumpTool::Run(int argc, char const* const* argv, Options options) { + + st = dumper.DumpTable(out_filename); + if (!st.ok()) { +- fprintf(stderr, "%s: %s\n", filename.c_str(), st.ToString().c_str()); ++ fprintf(err, "%s: %s\n", filename.c_str(), st.ToString().c_str()); 
+         exit(1);
+       } else {
+-        fprintf(stdout, "raw dump written to file %s\n", &out_filename[0]);
++        fprintf(out, "raw dump written to file %s\n", &out_filename[0]);
+       }
+       continue;
+     }
+@@ -476,7 +477,7 @@
+           has_from || use_from_as_prefix, from_key, has_to, to_key,
+           use_from_as_prefix);
+       if (!st.ok()) {
+-        fprintf(stderr, "%s: %s\n", filename.c_str(),
++        fprintf(err, "%s: %s\n", filename.c_str(),
+                 st.ToString().c_str());
+       }
+       total_read += dumper.GetReadNumber();
+@@ -488,10 +489,10 @@
+     if (command == "verify") {
+       st = dumper.VerifyChecksum();
+       if (!st.ok()) {
+-        fprintf(stderr, "%s is corrupted: %s\n", filename.c_str(),
++        fprintf(err, "%s is corrupted: %s\n", filename.c_str(),
+                 st.ToString().c_str());
+       } else {
+-        fprintf(stdout, "The file is ok\n");
++        fprintf(out, "The file is ok\n");
+       }
+       continue;
+     }
+@@ -503,15 +504,15 @@
+           table_properties_from_reader;
+       st = dumper.ReadTableProperties(&table_properties_from_reader);
+       if (!st.ok()) {
+-        fprintf(stderr, "%s: %s\n", filename.c_str(), st.ToString().c_str());
+-        fprintf(stderr, "Try to use initial table properties\n");
++        fprintf(err, "%s: %s\n", filename.c_str(), st.ToString().c_str());
++        fprintf(err, "Try to use initial table properties\n");
+         table_properties = dumper.GetInitTableProperties();
+       } else {
+         table_properties = table_properties_from_reader.get();
+       }
+       if (table_properties != nullptr) {
+         if (show_properties) {
+-          fprintf(stdout,
++          fprintf(out,
+                   "Table Properties:\n"
+                   "------------------------------\n"
+                   " %s",
+@@ -523,30 +524,30 @@
+         total_index_block_size += table_properties->index_size;
+         total_filter_block_size += table_properties->filter_size;
+         if (show_properties) {
+-          fprintf(stdout,
++          fprintf(out,
+                   "Raw user collected properties\n"
+                   "------------------------------\n");
+           for (const auto& kv : table_properties->user_collected_properties) {
+             std::string prop_name = kv.first;
+             std::string prop_val = Slice(kv.second).ToString(true);
+-            fprintf(stdout, " # %s: 0x%s\n", prop_name.c_str(),
++            fprintf(out, " # %s: 0x%s\n", prop_name.c_str(),
+                     prop_val.c_str());
+           }
+         }
+       } else {
+-        fprintf(stderr, "Reader unexpectedly returned null properties\n");
++        fprintf(err, "Reader unexpectedly returned null properties\n");
+       }
+     }
+   }
+   if (show_summary) {
+-    fprintf(stdout, "total number of files: %" PRIu64 "\n", total_num_files);
+-    fprintf(stdout, "total number of data blocks: %" PRIu64 "\n",
++    fprintf(out, "total number of files: %" PRIu64 "\n", total_num_files);
++    fprintf(out, "total number of data blocks: %" PRIu64 "\n",
+             total_num_data_blocks);
+-    fprintf(stdout, "total data block size: %" PRIu64 "\n",
++    fprintf(out, "total data block size: %" PRIu64 "\n",
+             total_data_block_size);
+-    fprintf(stdout, "total index block size: %" PRIu64 "\n",
++    fprintf(out, "total index block size: %" PRIu64 "\n",
+             total_index_block_size);
+-    fprintf(stdout, "total filter block size: %" PRIu64 "\n",
++    fprintf(out, "total filter block size: %" PRIu64 "\n",
+             total_filter_block_size);
+   }
+
+@@ -554,24 +555,24 @@
+   // No valid SST files are found
+   // Exit with an error state
+   if (dir) {
+-    fprintf(stdout, "------------------------------\n");
+-    fprintf(stderr, "No valid SST files found in %s\n", dir_or_file);
++    fprintf(out, "------------------------------\n");
++    fprintf(err, "No valid SST files found in %s\n", dir_or_file);
+   } else {
+-    fprintf(stderr, "%s is not a valid SST file\n", dir_or_file);
++    fprintf(err, "%s is not a valid SST file\n", dir_or_file);
+   }
+   return 1;
+ } else {
+   if (command == "identify") {
+     if (dir) {
+-      fprintf(stdout, "------------------------------\n");
+-      fprintf(stdout, "List of valid SST files found in %s:\n", dir_or_file);
++      fprintf(out, "------------------------------\n");
++      fprintf(out, "List of valid SST files found in %s:\n", dir_or_file);
+       for (const auto& f : valid_sst_files) {
+-        fprintf(stdout, "%s\n", f.c_str());
++        fprintf(out, "%s\n", f.c_str());
+       }
+-      fprintf(stdout, "Number of valid SST files: %zu\n",
++      fprintf(out, "Number of valid SST files: %zu\n",
+               valid_sst_files.size());
+     } else {
+-      fprintf(stdout, "%s is a valid SST file\n", dir_or_file);
++      fprintf(out, "%s is a valid SST file\n", dir_or_file);
+     }
+   }
+   // At least one valid SST
diff --git a/hadoop-ozone/dist/pom.xml b/hadoop-ozone/dist/pom.xml
index 40a55033f918..053d2c9fcd26 100644
--- a/hadoop-ozone/dist/pom.xml
+++ b/hadoop-ozone/dist/pom.xml
@@ -75,7 +75,7 @@
     *.classpath
     hdds-server-scm,ozone-common,ozone-csi,ozone-datanode,ozone-httpfsgateway,
-    ozone-insight,ozone-manager,ozone-recon,ozone-s3gateway,ozone-tools
+    ozone-insight,ozone-manager,ozone-recon,ozone-s3gateway,ozone-tools,hdds-rocks-native
@@ -227,6 +227,10 @@
       org.apache.ozone
       ozone-httpfsgateway
+
+      org.apache.ozone
+      hdds-rocks-native
+
diff --git a/hadoop-ozone/dist/src/main/license/bin/LICENSE.txt b/hadoop-ozone/dist/src/main/license/bin/LICENSE.txt
index bc0277fa3ca7..6bf182460195 100644
--- a/hadoop-ozone/dist/src/main/license/bin/LICENSE.txt
+++ b/hadoop-ozone/dist/src/main/license/bin/LICENSE.txt
@@ -514,7 +514,7 @@ WTFPL
 --------------------------------------------------------------------------------
-hdds-server-scm, ozone-manager, ozone-s3gateway and hdds-server-framework
+hdds-server-scm, ozone-manager, ozone-s3gateway, hdds-rocks-native and hdds-server-framework
 contains the source of the following javascript/css components (See licenses/ for
 text of these licenses):
 Apache Software Foundation License 2.0
diff --git a/hadoop-ozone/dist/src/main/license/jar-report.txt b/hadoop-ozone/dist/src/main/license/jar-report.txt
index f83427959ec5..e6da280004cb 100644
--- a/hadoop-ozone/dist/src/main/license/jar-report.txt
+++ b/hadoop-ozone/dist/src/main/license/jar-report.txt
@@ -76,6 +76,7 @@ share/ozone/lib/hdds-hadoop-dependency-server.jar
 share/ozone/lib/hdds-interface-admin.jar
 share/ozone/lib/hdds-interface-client.jar
 share/ozone/lib/hdds-interface-server.jar
+share/ozone/lib/hdds-rocks-native.jar
 share/ozone/lib/hdds-managed-rocksdb.jar
 share/ozone/lib/hdds-server-framework.jar
 share/ozone/lib/hdds-server-scm.jar
diff --git a/hadoop-ozone/pom.xml b/hadoop-ozone/pom.xml
index 3097140a4e3a..16ec897f6bd5 100644
--- a/hadoop-ozone/pom.xml
+++ b/hadoop-ozone/pom.xml
@@ -268,6 +268,11 @@
       bcprov-jdk15on
       ${bouncycastle.version}
+
+      org.apache.ozone
+      hdds-rocks-native
+      ${hdds.version}
+
diff --git a/pom.xml b/pom.xml
index 42c32cdf73e8..3f65500c1751 100644
--- a/pom.xml
+++ b/pom.xml
@@ -126,6 +126,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs
     2.6.0
     1.4
     1.6
+    1.6.8
     ${project.build.directory}/test-dir
     ${test.build.dir}
@@ -294,6 +295,11 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs
     1.14.0
     2.0.0
     4.2.0
+    1.0.8
+    1.2.13
+    1.9.3
+    1.1.8
+    1.4.9
@@ -1811,6 +1817,11 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs
       docker-maven-plugin
       ${docker-maven-plugin.version}
+
+      com.googlecode.maven-download-plugin
+      download-maven-plugin
+      ${download-maven-plugin.version}
+
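Note on the mechanism: the rocks-native.patch above only changes where SSTDumpTool writes, replacing each fprintf(stdout, ...) / fprintf(stderr, ...) with fprintf(out, ...) / fprintf(err, ...), so the tool emits output to whatever FILE* streams the caller supplies instead of the process console. The sketch below is a minimal, hypothetical illustration of why that matters (it is not the Ozone JNI wrapper): run_tool() stands in for the patched entry point, and a POSIX pipe captures its output the way a Java-side stream reader, such as the PipeInputStream class this module compiles headers for, presumably would. A real consumer drains the pipe concurrently rather than after the writer finishes.

// Sketch only, assuming a hypothetical run_tool(FILE*, FILE*) entry point.
#include <cstdio>
#include <string>
#include <unistd.h>

// Stand-in for the patched tool: writes only to the streams it is given,
// never to the global stdout/stderr.
static void run_tool(FILE* out, FILE* err) {
  fprintf(out, "total number of files: %d\n", 1);
  fprintf(err, "no errors\n");
}

int main() {
  int fds[2];
  if (pipe(fds) != 0) {
    return 1;
  }
  FILE* tool_out = fdopen(fds[1], "w");  // write end handed to the tool as 'out'
  run_tool(tool_out, stderr);
  fclose(tool_out);  // flush and close so the reader sees EOF

  // Drain the read end; this is the role a pipe-backed InputStream would play.
  // A production consumer reads concurrently so a large dump cannot fill the
  // pipe buffer and block the writer.
  std::string captured;
  char buf[4096];
  ssize_t n;
  while ((n = read(fds[0], buf, sizeof(buf))) > 0) {
    captured.append(buf, static_cast<size_t>(n));
  }
  close(fds[0]);
  fprintf(stdout, "captured: %s", captured.c_str());
  return 0;
}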