diff --git a/include/rocksdb/sst_file_reader.h b/include/rocksdb/sst_file_reader.h index 842425f98fdd..105b63a3c54a 100644 --- a/include/rocksdb/sst_file_reader.h +++ b/include/rocksdb/sst_file_reader.h @@ -35,6 +35,15 @@ class SstFileReader { const std::vector& keys, std::vector* values); + // Returns a new iterator over the table contents as a raw table iterator, + // a.k.a a `TableIterator`that iterates all point data entries in the table + // including logically invisible entries like delete entries lying in a range inclusive of from_key and + // exclusive of to_key. + // This API is intended to provide a programmatic way to observe SST files + // created by a DB, to be used by third party tools. DB optimization + // capabilities like filling cache, read ahead are disabled. + std::unique_ptr NewTableIterator(const Slice* from_key, const Slice* to_key); + // Returns a new iterator over the table contents as a raw table iterator, // a.k.a a `TableIterator`that iterates all point data entries in the table // including logically invisible entries like delete entries. diff --git a/include/rocksdb/types.h b/include/rocksdb/types.h index 982f497fdf55..59bb51d10a17 100644 --- a/include/rocksdb/types.h +++ b/include/rocksdb/types.h @@ -81,6 +81,7 @@ struct ParsedEntryInfo { Slice timestamp; SequenceNumber sequence; EntryType type; + bool copied_user_key; }; enum class WriteStallCause { diff --git a/include/rocksdb/utilities/types_util.h b/include/rocksdb/utilities/types_util.h index d1531cf12fdd..ba78d7a0214b 100644 --- a/include/rocksdb/utilities/types_util.h +++ b/include/rocksdb/utilities/types_util.h @@ -27,6 +27,12 @@ Status GetInternalKeyForSeekForPrev(const Slice& user_key, const Comparator* comparator, std::string* buf); +// Util method that takes an internal key and parse it to get `ParsedEntryInfo`. +// Such an internal key usually comes from a table iterator. +// `comparator`: see doc for `GetInternalKeyForSeek`. +Status ParseEntry(const Slice& internal_key, const Comparator* comparator, + ParsedEntryInfo* parsed_entry, bool copied_user_key); + // Util method that takes an internal key and parse it to get `ParsedEntryInfo`. // Such an internal key usually comes from a table iterator. // `comparator`: see doc for `GetInternalKeyForSeek`. diff --git a/java/CMakeLists.txt b/java/CMakeLists.txt index ffc374102699..eb4342b322f8 100644 --- a/java/CMakeLists.txt +++ b/java/CMakeLists.txt @@ -99,6 +99,8 @@ set(JNI_NATIVE_SOURCES rocksjni/write_batch_with_index.cc rocksjni/write_buffer_manager.cc rocksjni/writebatchhandlerjnicallback.cc + rocksjni/parsed_entry_info.cc + rocksjni/type_util.cc ) set(JAVA_MAIN_CLASSES @@ -308,6 +310,9 @@ set(JAVA_MAIN_CLASSES src/test/java/org/rocksdb/test/TestableEventListener.java src/test/java/org/rocksdb/util/CapturingWriteBatchHandler.java src/test/java/org/rocksdb/util/WriteBatchGetter.java + src/main/java/org/rocksdb/ParsedEntryInfo.java + src/main/java/org/rocksdb/EntryType.java + src/main/java/org/rocksdb/TypeUtil.java ) set(JAVA_TEST_CLASSES diff --git a/java/rocksjni/parsed_entry_info.cc b/java/rocksjni/parsed_entry_info.cc new file mode 100644 index 000000000000..3341e8474e18 --- /dev/null +++ b/java/rocksjni/parsed_entry_info.cc @@ -0,0 +1,199 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// This file implements the "bridge" between Java and C++ and enables +// calling c++ ROCKSDB_NAMESPACE::Iterator methods from Java side. + +#include +#include + +#include "include/org_rocksdb_ParsedEntryInfo.h" +#include "rocksdb/options.h" +#include "rocksdb/types.h" +#include "rocksdb/utilities/types_util.h" +#include "rocksjni/cplusplus_to_java_convert.h" +#include "rocksjni/portal.h" + +/* + * Class: org_rocksdb_ParsedEntryInfo + * Method: newParseEntryInstance + * Signature: ()J + */ +jlong JNICALL Java_org_rocksdb_ParsedEntryInfo_newParseEntryInstance( + JNIEnv * /*env*/, jclass /*cls*/) { + ROCKSDB_NAMESPACE::ParsedEntryInfo *parsed_entry_info = + new ROCKSDB_NAMESPACE::ParsedEntryInfo(); + return GET_CPLUSPLUS_POINTER(parsed_entry_info); +} + +/* + * Class: org_rocksdb_ParsedEntryInfo + * Method: parseEntry + * Signature: (JJ[BI)V + */ +void JNICALL Java_org_rocksdb_ParsedEntryInfo_parseEntry( + JNIEnv *env, jclass /*cls*/, jlong handle, jlong options_handle, + jbyteArray jtarget, jint len) { + auto *options = + reinterpret_cast(options_handle); + auto *parsed_entry_info = + reinterpret_cast(handle); + jbyte* target = new jbyte[len]; + env->GetByteArrayRegion(jtarget, 0, len, target); + if (env->ExceptionCheck()) { + return; + } + ROCKSDB_NAMESPACE::Slice target_slice(reinterpret_cast(target), len); + ROCKSDB_NAMESPACE::Status s = ROCKSDB_NAMESPACE::ParseEntry(target_slice, options->comparator, + parsed_entry_info, true); + if (!s.ok()) { + ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew(env, s); + } +} + +/* + * Class: org_rocksdb_ParsedEntryInfo + * Method: parseEntryDirect + * Signature: (JJLjava/nio/ByteBuffer;II)V + */ +void JNICALL Java_org_rocksdb_ParsedEntryInfo_parseEntryDirect( + JNIEnv *env, jclass /*clz*/, jlong handle, jlong options_handle, + jobject jbuffer, jint jbuffer_off, jint jbuffer_len) { + auto *options = + reinterpret_cast(options_handle); + auto *parsed_entry_info = + reinterpret_cast(handle); + auto parse = [&env, &parsed_entry_info, + &options](ROCKSDB_NAMESPACE::Slice &target_slice) { + ROCKSDB_NAMESPACE::Status s = ROCKSDB_NAMESPACE::ParseEntry(target_slice, options->comparator, + parsed_entry_info); + if (!s.ok()) { + ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew(env, s); + return; + } + }; + ROCKSDB_NAMESPACE::JniUtil::k_op_direct(parse, env, jbuffer, jbuffer_off, + jbuffer_len); +} + +/* + * Class: org_rocksdb_ParsedEntryInfo + * Method: parseEntryByteArray + * Signature: (JJ[BII)V + */ +void JNICALL Java_org_rocksdb_ParsedEntryInfo_parseEntryByteArray( + JNIEnv *env, jclass /*clz*/, jlong handle, jlong options_handle, + jbyteArray jtarget, jint joff, jint jlen) { + auto *options = + reinterpret_cast(options_handle); + auto *parsed_entry_info = + reinterpret_cast(handle); + auto parse = [&env, &parsed_entry_info, + &options](ROCKSDB_NAMESPACE::Slice &target_slice) { + ROCKSDB_NAMESPACE::Status s = ROCKSDB_NAMESPACE::ParseEntry(target_slice, options->comparator, + parsed_entry_info); + if (!s.ok()) { + ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew(env, s); + return true; + } + return false; + }; + ROCKSDB_NAMESPACE::JniUtil::k_op_indirect(parse, env, jtarget, joff, jlen); +} + +/* + * Class: org_rocksdb_ParsedEntryInfo + * Method: userKeyDirect + * Signature: (JLjava/nio/ByteBuffer;II)I + */ +jint JNICALL Java_org_rocksdb_ParsedEntryInfo_userKeyDirect( + JNIEnv *env, jclass /*clz*/, jlong handle, jobject jtarget, jint joffset, + jint jlen) { + auto *parsed_entry_info = + reinterpret_cast(handle); + ROCKSDB_NAMESPACE::Slice key_slice = parsed_entry_info->user_key; + return ROCKSDB_NAMESPACE::JniUtil::copyToDirect(env, key_slice, jtarget, + joffset, jlen); +} + +/* + * Class: org_rocksdb_ParsedEntryInfo + * Method: userKeyByteArray + * Signature: (J[BII)I + */ +jint JNICALL Java_org_rocksdb_ParsedEntryInfo_userKeyByteArray( + JNIEnv *env, jclass /*clz*/, jlong handle, jbyteArray jtarget, jint joffset, + jint jlen) { + auto *parsed_entry_info = + reinterpret_cast(handle); + ROCKSDB_NAMESPACE::Slice key_slice = parsed_entry_info->user_key; + return ROCKSDB_NAMESPACE::JniUtil::copyBytes(env, key_slice, jtarget, joffset, + jlen); +} + +/* + * Class: org_rocksdb_ParsedEntryInfo + * Method: userKeyJni + * Signature: (J)[B + */ +jbyteArray JNICALL Java_org_rocksdb_ParsedEntryInfo_userKeyJni(JNIEnv *env, + jclass /*clz*/, + jlong handle) { + auto *parsed_entry_info = + reinterpret_cast(handle); + ROCKSDB_NAMESPACE::Slice key_slice = parsed_entry_info->user_key; + jbyteArray jkey = env->NewByteArray(static_cast(key_slice.size())); + if (jkey == nullptr) { + // exception thrown: OutOfMemoryError + ROCKSDB_NAMESPACE::OutOfMemoryErrorJni::ThrowNew(env, "Memory allocation failed in RocksDB JNI function"); + return nullptr; + } + ROCKSDB_NAMESPACE::JniUtil::copyBytes(env, key_slice, jkey, 0, + static_cast(key_slice.size())); + return jkey; +} + +/* + * Class: org_rocksdb_ParsedEntryInfo + * Method: getSequenceNumberJni + * Signature: (J)J + */ +jlong JNICALL Java_org_rocksdb_ParsedEntryInfo_getSequenceNumberJni( + JNIEnv * /*env*/, jclass /*clz*/, jlong handle) { + auto *parsed_entry_info = + reinterpret_cast(handle); + uint64_t sequence_number = parsed_entry_info->sequence; + return static_cast(sequence_number); +} + +/* + * Class: org_rocksdb_ParsedEntryInfo + * Method: getValueTypeJni + * Signature: (J)B + */ +jbyte JNICALL Java_org_rocksdb_ParsedEntryInfo_getEntryTypeJni(JNIEnv * /*env*/, + jclass /*clz*/, + jlong handle) { + auto *parsed_entry_info = + reinterpret_cast(handle); + ROCKSDB_NAMESPACE::EntryType type = parsed_entry_info->type; + return ROCKSDB_NAMESPACE::EntryTypeJni::toJavaEntryType(type); +} + +/* + * Class: org_rocksdb_ParsedEntryInfo + * Method: disposeInternalJni + * Signature: (J)V + */ +void JNICALL Java_org_rocksdb_ParsedEntryInfo_disposeInternalJni( + JNIEnv * /*env*/, jclass /*clz*/, jlong handle) { + auto *parsed_entry_info = + reinterpret_cast(handle); + assert(parsed_entry_info != nullptr); + if (parsed_entry_info->copied_user_key) { + delete parsed_entry_info->user_key.data(); + } + delete parsed_entry_info; +} diff --git a/java/rocksjni/portal.h b/java/rocksjni/portal.h index 094ac379b174..48802836f99f 100644 --- a/java/rocksjni/portal.h +++ b/java/rocksjni/portal.h @@ -24,6 +24,7 @@ #include #include +#include "db/dbformat.h" #include "rocksdb/convenience.h" #include "rocksdb/db.h" #include "rocksdb/filter_policy.h" @@ -2477,6 +2478,42 @@ class JniUtil { return op(key_slice); } + /* + * Helper for operations on a key on java byte array + * Copies values from jbyte array to slice and performs op on the slice. + * for example WriteBatch->Delete + * + * from `op` and used for RocksDB->Delete etc. + */ + static void k_op_indirect(std::function op, + JNIEnv* env, jbyteArray jkey, jint jkey_off, + jint jkey_len) { + if (jkey == nullptr || env->GetArrayLength(jkey) < (jkey_off + jkey_len)) { + ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew(env, + "Invalid key argument"); + return; + } + char* target = new char[jkey_len]; + if (target == nullptr) { + ROCKSDB_NAMESPACE::OutOfMemoryErrorJni::ThrowNew(env, + "Memory allocation failed in RocksDB JNI function"); + return; + } + env->GetByteArrayRegion(jkey, jkey_off, jkey_len, + reinterpret_cast(target)); + if (env->ExceptionCheck()) { + // exception thrown: ArrayIndexOutOfBoundsException + ROCKSDB_NAMESPACE::IllegalArgumentExceptionJni::ThrowNew(env, + "Failed to get byte array region: Out of bounds or invalid array."); + return; + } + ROCKSDB_NAMESPACE::Slice target_slice(target, jkey_len); + bool release_copy = op(target_slice); + if (release_copy) { + delete[] target; + } + } + template static jint copyToDirect(JNIEnv* env, T& source, jobject jtarget, jint jtarget_off, jint jtarget_len) { @@ -2498,6 +2535,31 @@ class JniUtil { return cvalue_len; } + + /* Helper for copying value in source into a byte array. + */ + template + static jint copyBytes(JNIEnv* env, T& source, jbyteArray jtarget, + jint jtarget_off, jint jtarget_len) { + if (jtarget == nullptr || + env->GetArrayLength(jtarget) < (jtarget_off + jtarget_len)) { + ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew( + env, "Invalid target argument"); + return 0; + } + + const jint cvalue_len = static_cast(source.size()); + const jint length = std::min(jtarget_len, cvalue_len); + env->SetByteArrayRegion( + jtarget, jtarget_off, length, + const_cast(reinterpret_cast(source.data()))); + if (env->ExceptionCheck()) { + ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew( + env, "Failed to copy data to byte array."); + return 0; + } + return cvalue_len; + } }; class MapJni : public JavaClass { @@ -6444,6 +6506,35 @@ class TxnDBWritePolicyJni { } }; +// The portal class for org.rocksdb.EntryType +class EntryTypeJni { + public: + static jbyte toJavaEntryType(const ROCKSDB_NAMESPACE::EntryType& entry_type) { + switch (entry_type) { + case ROCKSDB_NAMESPACE::EntryType::kEntryPut: + return 0x0; + case ROCKSDB_NAMESPACE::EntryType::kEntryDelete: + return 0x1; + case ROCKSDB_NAMESPACE::EntryType::kEntrySingleDelete: + return 0x2; + case ROCKSDB_NAMESPACE::EntryType::kEntryMerge: + return 0x3; + case ROCKSDB_NAMESPACE::EntryType::kEntryRangeDeletion: + return 0x4; + case ROCKSDB_NAMESPACE::EntryType::kEntryBlobIndex: + return 0x5; + case ROCKSDB_NAMESPACE::EntryType::kEntryDeleteWithTimestamp: + return 0x6; + case ROCKSDB_NAMESPACE::EntryType::kEntryWideColumnEntity: + return 0x7; + case ROCKSDB_NAMESPACE::EntryType::kEntryOther: + return 0x8; + default: + return 0x9; + } + } +}; + // The portal class for org.rocksdb.TransactionDB.KeyLockInfo class KeyLockInfoJni : public JavaClass { public: diff --git a/java/rocksjni/sst_file_readerjni.cc b/java/rocksjni/sst_file_readerjni.cc index 4af472ecfb1c..dd887a9f341d 100644 --- a/java/rocksjni/sst_file_readerjni.cc +++ b/java/rocksjni/sst_file_readerjni.cc @@ -71,6 +71,46 @@ jlong Java_org_rocksdb_SstFileReader_newIterator(JNIEnv * /*env*/, return GET_CPLUSPLUS_POINTER(sst_file_reader->NewIterator(*read_options)); } +/* + * Class: org_rocksdb_SstFileReader + * Method: newTableIterator + * Signature: (JJ)J + */ +jlong Java_org_rocksdb_SstFileReader_newTableIterator0(JNIEnv * /*env*/, + jclass /*jcls*/, + jlong jhandle, + jboolean jhas_from, + jlong from_slice_handle, + jboolean jhas_to, + jlong to_slice_handle) { + auto *sst_file_reader = + reinterpret_cast(jhandle); + ROCKSDB_NAMESPACE::Slice* from_slice = nullptr; + ROCKSDB_NAMESPACE::Slice* to_slice = nullptr; + bool has_from = static_cast(jhas_from); + bool has_to = static_cast(jhas_to); + if (has_from) { + from_slice = reinterpret_cast(from_slice_handle); + } + if (has_to) { + to_slice = reinterpret_cast(to_slice_handle); + } + return GET_CPLUSPLUS_POINTER(sst_file_reader->NewTableIterator(from_slice, to_slice).release()); +} + +/* + * Class: org_rocksdb_SstFileReader + * Method: newTableIterator + * Signature: (JJ)J + */ +jlong Java_org_rocksdb_SstFileReader_newTableIterator1(JNIEnv * /*env*/, + jclass /*jcls*/, + jlong jhandle) { + auto *sst_file_reader = + reinterpret_cast(jhandle); + return GET_CPLUSPLUS_POINTER(sst_file_reader->NewTableIterator().release()); +} + /* * Class: org_rocksdb_SstFileReader * Method: disposeInternal diff --git a/java/rocksjni/type_util.cc b/java/rocksjni/type_util.cc new file mode 100644 index 000000000000..4ee3d8547e21 --- /dev/null +++ b/java/rocksjni/type_util.cc @@ -0,0 +1,327 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// This file implements the "bridge" between Java and C++ and enables +// calling c++ ROCKSDB_NAMESPACE::Iterator methods from Java side. + +#include +#include + +#include "include/org_rocksdb_TypeUtil.h" +#include "rocksdb/options.h" +#include "rocksdb/utilities/types_util.h" +#include "rocksjni/portal.h" + +/* + * Class: org_rocksdb_TypeUtil + * Method: getInternalKeyDirect0 + * Signature: (Ljava/nio/ByteBuffer;IILjava/nio/ByteBuffer;II)I + */ +jint JNICALL Java_org_rocksdb_TypeUtil_getInternalKeyDirect0( + JNIEnv *env, jclass /*clz*/, jobject user_key, jint user_key_off, + jint user_key_len, jobject int_key, jint int_key_off, jint int_key_len, + jlong options_handle) { + auto *options = + reinterpret_cast(options_handle); + std::string seek_key_buf; + auto getInternalKeySeek = [&env, &seek_key_buf, + &options](ROCKSDB_NAMESPACE::Slice &target_slice) { + ROCKSDB_NAMESPACE::Status s = ROCKSDB_NAMESPACE::GetInternalKeyForSeek(target_slice, options->comparator, + &seek_key_buf); + if (!s.ok()) { + ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew(env, s); + return; + } + }; + ROCKSDB_NAMESPACE::JniUtil::k_op_direct(getInternalKeySeek, env, user_key, + user_key_off, user_key_len); + if (env->ExceptionCheck()) { + return 0; + } + ROCKSDB_NAMESPACE::Slice key_slice = seek_key_buf; + return ROCKSDB_NAMESPACE::JniUtil::copyToDirect(env, key_slice, int_key, + int_key_off, int_key_len); +} + +/* + * Class: org_rocksdb_TypeUtil + * Method: getInternalKeyByteArray0 + * Signature: ([BIILjava/nio/ByteBuffer;IIJ)I + */ +jint JNICALL Java_org_rocksdb_TypeUtil_getInternalKeyByteArray0( + JNIEnv *env, jclass /*clz*/, jbyteArray user_key, jint user_key_off, + jint user_key_len, jobject int_key, jint int_key_off, jint int_key_len, + jlong options_handle) { + auto *options = + reinterpret_cast(options_handle); + std::string seek_key_buf; + auto getInternalKeySeek = [&env, &seek_key_buf, + &options](ROCKSDB_NAMESPACE::Slice &target_slice) { + ROCKSDB_NAMESPACE::Status s = ROCKSDB_NAMESPACE::GetInternalKeyForSeek(target_slice, options->comparator, + &seek_key_buf); + if (!s.ok()) { + ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew(env, s); + } + return true; + }; + ROCKSDB_NAMESPACE::JniUtil::k_op_indirect(getInternalKeySeek, env, user_key, + user_key_off, user_key_len); + if (env->ExceptionCheck()) { + return 0; + } + ROCKSDB_NAMESPACE::Slice key_slice = seek_key_buf; + return ROCKSDB_NAMESPACE::JniUtil::copyToDirect(env, key_slice, int_key, + int_key_off, int_key_len); +} + +/* + * Class: org_rocksdb_TypeUtil + * Method: getInternalKeyDirect1 + * Signature: (Ljava/nio/ByteBuffer;II[BIIJ)I + */ +jint JNICALL Java_org_rocksdb_TypeUtil_getInternalKeyDirect1( + JNIEnv *env, jclass /*cls*/, jobject user_key, jint user_key_off, + jint user_key_len, jbyteArray int_key, jint int_key_off, jint int_key_len, + jlong options_handle) { + auto *options = + reinterpret_cast(options_handle); + std::string seek_key_buf; + auto getInternalKeySeek = [&env, &seek_key_buf, + &options](ROCKSDB_NAMESPACE::Slice &target_slice) { + ROCKSDB_NAMESPACE::Status s = ROCKSDB_NAMESPACE::GetInternalKeyForSeek(target_slice, options->comparator, + &seek_key_buf); + if (!s.ok()) { + ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew(env, s); + return; + } + }; + ROCKSDB_NAMESPACE::JniUtil::k_op_direct(getInternalKeySeek, env, user_key, + user_key_off, user_key_len); + if (env->ExceptionCheck()) { + return 0; + } + ROCKSDB_NAMESPACE::Slice key_slice = seek_key_buf; + return ROCKSDB_NAMESPACE::JniUtil::copyBytes(env, key_slice, int_key, + int_key_off, int_key_len); +} + +/* + * Class: org_rocksdb_TypeUtil + * Method: getInternalKeyByteArray1 + * Signature: ([BII[BIIJ)I + */ +jint JNICALL Java_org_rocksdb_TypeUtil_getInternalKeyByteArray1( + JNIEnv *env, jclass /*cls*/, jbyteArray user_key, jint user_key_off, + jint user_key_len, jbyteArray int_key, jint int_key_off, jint int_key_len, + jlong options_handle) { + auto *options = + reinterpret_cast(options_handle); + + std::string seek_key_buf; + auto getInternalKeySeek = [&env, &seek_key_buf, + &options](ROCKSDB_NAMESPACE::Slice &target_slice) { + ROCKSDB_NAMESPACE::Status s = ROCKSDB_NAMESPACE::GetInternalKeyForSeek(target_slice, options->comparator, + &seek_key_buf); + if (!s.ok()) { + ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew(env, s); + } + return true; + }; + ROCKSDB_NAMESPACE::JniUtil::k_op_indirect(getInternalKeySeek, env, user_key, + user_key_off, user_key_len); + if (env->ExceptionCheck()) { + return 0; + } + ROCKSDB_NAMESPACE::Slice key_slice = seek_key_buf; + return ROCKSDB_NAMESPACE::JniUtil::copyBytes(env, key_slice, int_key, + int_key_off, int_key_len); +} + +/* + * Class: org_rocksdb_TypeUtil + * Method: getInternalKeyJni + * Signature: ([BIJ)[B + */ +jbyteArray JNICALL Java_org_rocksdb_TypeUtil_getInternalKeyJni( + JNIEnv *env, jclass /*cls*/, jbyteArray user_key, jint user_key_len, + jlong options_handle) { + jbyte* target = new jbyte[user_key_len]; + env->GetByteArrayRegion(user_key, 0, user_key_len, target); + if (env->ExceptionCheck()) { + // exception thrown: ArrayIndexOutOfBoundsException + delete[] target; + return nullptr; + } + auto *options = + reinterpret_cast(options_handle); + ROCKSDB_NAMESPACE::Slice target_slice(reinterpret_cast(target), + user_key_len); + std::string seek_key_buf; + ROCKSDB_NAMESPACE::Status s = ROCKSDB_NAMESPACE::GetInternalKeyForSeek(target_slice, options->comparator, + &seek_key_buf); + delete[] target; + if (!s.ok()) { + ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew(env, s); + return nullptr; + } + ROCKSDB_NAMESPACE::Slice key_slice = seek_key_buf; + return ROCKSDB_NAMESPACE::JniUtil::copyBytes(env, key_slice); +} + +/* + * Class: org_rocksdb_TypeUtil + * Method: getInternalKeyDirectForPrev0 + * Signature: (Ljava/nio/ByteBuffer;IILjava/nio/ByteBuffer;IIJ)I + */ +jint JNICALL Java_org_rocksdb_TypeUtil_getInternalKeyDirectForPrev0( + JNIEnv *env, jclass /*clz*/, jobject user_key, jint user_key_off, + jint user_key_len, jobject int_key, jint int_key_off, jint int_key_len, + jlong options_handle) { + auto *options = + reinterpret_cast(options_handle); + std::string seek_key_buf; + auto getInternalKeySeek = [&env, &seek_key_buf, + &options](ROCKSDB_NAMESPACE::Slice &target_slice) { + ROCKSDB_NAMESPACE::Status s = ROCKSDB_NAMESPACE::GetInternalKeyForSeekForPrev( + target_slice, options->comparator, &seek_key_buf); + if (!s.ok()) { + ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew(env, s); + return; + } + }; + ROCKSDB_NAMESPACE::JniUtil::k_op_direct(getInternalKeySeek, env, user_key, + user_key_off, user_key_len); + if (env->ExceptionCheck()) { + return 0; + } + ROCKSDB_NAMESPACE::Slice key_slice = seek_key_buf; + return ROCKSDB_NAMESPACE::JniUtil::copyToDirect(env, key_slice, int_key, + int_key_off, int_key_len); +} + +/* + * Class: org_rocksdb_TypeUtil + * Method: getInternalKeyByteArrayForPrev0 + * Signature: ([BIILjava/nio/ByteBuffer;IIJ)I + */ +jint JNICALL Java_org_rocksdb_TypeUtil_getInternalKeyByteArrayForPrev0( + JNIEnv *env, jclass /*clz*/, jbyteArray user_key, jint user_key_off, + jint user_key_len, jobject int_key, jint int_key_off, jint int_key_len, + jlong options_handle) { + auto *options = + reinterpret_cast(options_handle); + + std::string seek_key_buf; + auto getInternalKeySeek = [&env, &seek_key_buf, + &options](ROCKSDB_NAMESPACE::Slice &target_slice) { + ROCKSDB_NAMESPACE::Status s = ROCKSDB_NAMESPACE::GetInternalKeyForSeekForPrev( + target_slice, options->comparator, &seek_key_buf); + if (!s.ok()) { + ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew(env, s); + } + return true; + }; + ROCKSDB_NAMESPACE::JniUtil::k_op_indirect(getInternalKeySeek, env, user_key, + user_key_off, user_key_len); + if (env->ExceptionCheck()) { + return 0; + } + ROCKSDB_NAMESPACE::Slice key_slice = seek_key_buf; + return ROCKSDB_NAMESPACE::JniUtil::copyToDirect(env, key_slice, int_key, + int_key_off, int_key_len); +} + +/* + * Class: org_rocksdb_TypeUtil + * Method: getInternalKeyDirectForPrev1 + * Signature: (Ljava/nio/ByteBuffer;II[BIIJ)I + */ +jint JNICALL Java_org_rocksdb_TypeUtil_getInternalKeyDirectForPrev1( + JNIEnv *env, jclass /*cls*/, jobject user_key, jint user_key_off, + jint user_key_len, jbyteArray int_key, jint int_key_off, jint int_key_len, + jlong options_handle) { + auto *options = + reinterpret_cast(options_handle); + std::string seek_key_buf; + auto getInternalKeySeek = [&env, &seek_key_buf, + &options](ROCKSDB_NAMESPACE::Slice &target_slice) { + ROCKSDB_NAMESPACE::Status s = ROCKSDB_NAMESPACE::GetInternalKeyForSeekForPrev( + target_slice, options->comparator, &seek_key_buf); + if (!s.ok()) { + ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew(env, s); + return; + } + }; + ROCKSDB_NAMESPACE::JniUtil::k_op_direct(getInternalKeySeek, env, user_key, + user_key_off, user_key_len); + if (env->ExceptionCheck()) { + return 0; + } + ROCKSDB_NAMESPACE::Slice key_slice = seek_key_buf; + return ROCKSDB_NAMESPACE::JniUtil::copyBytes(env, key_slice, int_key, + int_key_off, int_key_len); +} + +/* + * Class: org_rocksdb_TypeUtil + * Method: getInternalKeyByteArrayForPrev1 + * Signature: ([BII[BIIJ)I + */ +jint JNICALL Java_org_rocksdb_TypeUtil_getInternalKeyByteArrayForPrev1( + JNIEnv *env, jclass /*cls*/, jbyteArray user_key, jint user_key_off, + jint user_key_len, jbyteArray int_key, jint int_key_off, jint int_key_len, + jlong options_handle) { + auto *options = + reinterpret_cast(options_handle); + std::string seek_key_buf; + auto getInternalKeySeek = [&env, &seek_key_buf, + &options](ROCKSDB_NAMESPACE::Slice &target_slice) { + ROCKSDB_NAMESPACE::Status s = ROCKSDB_NAMESPACE::GetInternalKeyForSeekForPrev( + target_slice, options->comparator, &seek_key_buf); + if (!s.ok()) { + ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew(env, s); + } + return true; + }; + ROCKSDB_NAMESPACE::JniUtil::k_op_indirect(getInternalKeySeek, env, user_key, + user_key_off, user_key_len); + if (env->ExceptionCheck()) { + return 0; + } + ROCKSDB_NAMESPACE::Slice key_slice = seek_key_buf; + return ROCKSDB_NAMESPACE::JniUtil::copyBytes(env, key_slice, int_key, + int_key_off, int_key_len); +} + +/* + * Class: org_rocksdb_TypeUtil + * Method: getInternalKeyForPrevJni + * Signature: ([BIJ)[B + */ +jbyteArray JNICALL Java_org_rocksdb_TypeUtil_getInternalKeyForPrevJni( + JNIEnv *env, jclass /*cls*/, jbyteArray user_key, jint user_key_len, + jlong options_handle) { + jbyte* target = new jbyte[user_key_len]; + env->GetByteArrayRegion(user_key, 0, user_key_len, target); + if (env->ExceptionCheck()) { + // exception thrown: ArrayIndexOutOfBoundsException + delete[] target; + return nullptr; + } + auto *options = + reinterpret_cast(options_handle); + ROCKSDB_NAMESPACE::Slice target_slice(reinterpret_cast(target), + user_key_len); + std::string seek_key_buf; + ROCKSDB_NAMESPACE::Status s = ROCKSDB_NAMESPACE::GetInternalKeyForSeekForPrev( + target_slice, options->comparator, &seek_key_buf); + delete[] target; + if (!s.ok()) { + ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew(env, s); + return nullptr; + } + ROCKSDB_NAMESPACE::Slice key_slice = seek_key_buf; + return ROCKSDB_NAMESPACE::JniUtil::copyBytes(env, key_slice); +} diff --git a/java/src/main/java/org/rocksdb/EntryType.java b/java/src/main/java/org/rocksdb/EntryType.java new file mode 100644 index 000000000000..692da8058c0f --- /dev/null +++ b/java/src/main/java/org/rocksdb/EntryType.java @@ -0,0 +1,36 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +package org.rocksdb; + +/** + * Value types of entry in SST file. + */ +public enum EntryType { + kEntryPut((byte) 0x0), + kEntryDelete((byte) 0x1), + kEntrySingleDelete((byte) 0x2), + kEntryMerge((byte) 0x3), + kEntryRangeDeletion((byte) 0x4), + kEntryBlobIndex((byte) 0x5), + kEntryDeleteWithTimestamp((byte) 0x6), + kEntryWideColumnEntity((byte) 0x7), + kEntryOther((byte) 0x8), + kInvalid((byte) 0x9); + private final byte value; + + EntryType(final byte value) { + this.value = value; + } + + public static EntryType getEntryType(final byte value) { + for (final EntryType entryType : EntryType.values()) { + if (value == entryType.value) { + return entryType; + } + } + throw new IllegalArgumentException("Invalid EntryType byte " + value); + } +} diff --git a/java/src/main/java/org/rocksdb/ParsedEntryInfo.java b/java/src/main/java/org/rocksdb/ParsedEntryInfo.java new file mode 100644 index 000000000000..4f26b9abfbed --- /dev/null +++ b/java/src/main/java/org/rocksdb/ParsedEntryInfo.java @@ -0,0 +1,118 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +package org.rocksdb; + +import java.nio.ByteBuffer; + +/** + * Class to parse internal key to extract user key, entry type, sequence number. + */ +public class ParsedEntryInfo extends RocksObject { + protected ParsedEntryInfo() { + super(newParseEntryInstance()); + } + + @Override + protected void disposeInternal(final long handle) { + disposeInternalJni(handle); + } + + /** + * Returns the entryType of record in the sstFile. + */ + public EntryType getEntryType() { + assert (isOwningHandle()); + return EntryType.getEntryType(getEntryTypeJni(nativeHandle_)); + } + + /** + * Returns the sequence number of the record in the sstFile. + */ + public long getSequenceNumber() { + return getSequenceNumberJni(nativeHandle_); + } + + /** + * Returns the user key of the record in the sstFile. + */ + public byte[] getUserKey() { + assert (isOwningHandle()); + return userKeyJni(nativeHandle_); + } + + /** + * + * @param key Byte buffer to write the user key into. + * @return length of the key. + */ + public int userKey(final ByteBuffer key) { + if (key == null) { + throw new IllegalArgumentException("ByteBuffer parameters must not be null"); + } + assert (isOwningHandle()); + final int result; + if (key.isDirect()) { + result = userKeyDirect(nativeHandle_, key, key.position(), key.remaining()); + } else { + result = userKeyByteArray( + nativeHandle_, key.array(), key.arrayOffset() + key.position(), key.remaining()); + } + key.limit(Math.min(key.position() + result, key.limit())); + return result; + } + + /** + * Parses internal key to get initialize the values in this class. + * @param options options used while writing the sst file. + * @param internalKey byte array containing the internal key. + */ + public final void parseEntry(final Options options, final byte[] internalKey) { + if (options == null || internalKey == null) { + throw new IllegalArgumentException("ByteBuffer and options parameters must not be null"); + } + assert (isOwningHandle()); + parseEntry(nativeHandle_, options.getNativeHandle(), internalKey, internalKey.length); + } + + /** + * Parses internal key to get initialize the values in this class. + * @param options options used while writing the sst file. + * @param internalKey ByteBuffer containing the internal key. + */ + public final void parseEntry(final Options options, final ByteBuffer internalKey) { + if (options == null || internalKey == null) { + throw new IllegalArgumentException("ByteBuffer and options parameters must not be null"); + } + assert (isOwningHandle()); + if (internalKey.isDirect()) { + parseEntryDirect(nativeHandle_, options.getNativeHandle(), internalKey, + internalKey.position(), internalKey.remaining()); + } else { + parseEntryByteArray(nativeHandle_, options.getNativeHandle(), internalKey.array(), + internalKey.arrayOffset() + internalKey.position(), internalKey.remaining()); + } + internalKey.position(internalKey.limit()); + } + + private static native long newParseEntryInstance(); + + private static native void parseEntry( + final long handle, final long optionsHandle, final byte[] buffer, final int bufferLen); + + private static native void parseEntryDirect( + final long handle, final long optionsHandle, final ByteBuffer buffer, final int bufferOffset, + final int bufferLen); + private static native void parseEntryByteArray( + final long handle, final long optionsHandle, final byte[] buffer, final int bufferOffset, final int bufferLen); + private static native int userKeyDirect( + final long handle, final ByteBuffer target, final int bufferOffset, final int bufferLen); + private static native int userKeyByteArray( + final long handle, final byte[] target, final int bufferOffset, final int bufferLen); + private static native byte[] userKeyJni(final long handle); + private static native long getSequenceNumberJni(final long handle); + private static native byte getEntryTypeJni(final long handle); + private static native void disposeInternalJni(final long handle); +} diff --git a/java/src/main/java/org/rocksdb/SstFileReader.java b/java/src/main/java/org/rocksdb/SstFileReader.java index 46bebf1dd2e3..ce871a845f98 100644 --- a/java/src/main/java/org/rocksdb/SstFileReader.java +++ b/java/src/main/java/org/rocksdb/SstFileReader.java @@ -32,6 +32,38 @@ public SstFileReaderIterator newIterator(final ReadOptions readOptions) { return new SstFileReaderIterator(this, iter); } + /** + * Returns an iterator that will iterate on all keys(including tombstones) in the default + * column family including both keys in the DB and uncommitted keys in this + * transaction. + * Caller is responsible for deleting the returned Iterator. + * + * @return instance of iterator object. + */ + public SstFileReaderIterator newTableIterator(final AbstractSlice fromKey, final AbstractSlice toKey) { + assert (isOwningHandle() && (fromKey == null || fromKey.isOwningHandle()) && + (toKey == null || toKey.isOwningHandle())); + long fromNativeHandle = fromKey == null ? 0 : fromKey.getNativeHandle(); + long toNativeHandle = toKey == null ? 0 : toKey.getNativeHandle(); + final long iter = newTableIterator0(nativeHandle_, fromKey != null, fromNativeHandle, + toKey != null, toNativeHandle); + return new SstFileReaderIterator(this, iter); + } + + /** + * Returns an iterator that will iterate on all keys(including tombstones) in the default + * column family including both keys in the DB and uncommitted keys in this + * transaction. + * Caller is responsible for deleting the returned Iterator. + * + * @return instance of iterator object. + */ + public SstFileReaderIterator newTableIterator() { + assert (isOwningHandle()); + final long iter = newTableIterator1(nativeHandle_); + return new SstFileReaderIterator(this, iter); + } + /** * Prepare SstFileReader to read a file. * @@ -71,7 +103,9 @@ protected final void disposeInternal(final long handle) { } private static native void disposeInternalJni(final long handle); private static native long newIterator(final long handle, final long readOptionsHandle); - + private static native long newTableIterator0(final long handle, final boolean hasFromKey, + final long readOptionsHandle, final boolean hasToKey, final long toKeyHandle); + private static native long newTableIterator1(final long handle); private static native void open(final long handle, final String filePath) throws RocksDBException; private static native long newSstFileReader(final long optionsHandle); diff --git a/java/src/main/java/org/rocksdb/TypeUtil.java b/java/src/main/java/org/rocksdb/TypeUtil.java new file mode 100644 index 000000000000..769f5c3effd9 --- /dev/null +++ b/java/src/main/java/org/rocksdb/TypeUtil.java @@ -0,0 +1,121 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +package org.rocksdb; + +import java.nio.ByteBuffer; + +/** + * Util class to get internal key for seeking from user key. + */ +public class TypeUtil { + public static byte[] getInternalKey(final byte[] userKey, final Options options) { + if (options == null || userKey == null) { + throw new IllegalArgumentException("ByteBuffer and options parameters must not be null"); + } + return getInternalKeyJni(userKey, userKey.length, options.getNativeHandle()); + } + + public static int getInternalKey(final ByteBuffer userKey, final ByteBuffer internalKey, final Options options) { + int result; + if (options == null || userKey == null || internalKey == null) { + throw new IllegalArgumentException("ByteBuffer and options parameters must not be null"); + } + if (userKey.isDirect()) { + if (internalKey.isDirect()) { + result = + getInternalKeyDirect0(userKey, userKey.position(), userKey.remaining(), internalKey, + internalKey.position(), internalKey.remaining(), options.getNativeHandle()); + } else { + result = getInternalKeyDirect1(userKey, userKey.position(), userKey.remaining(), + internalKey.array(), internalKey.arrayOffset() + internalKey.position(), + internalKey.remaining(), options.getNativeHandle()); + } + } else { + if (internalKey.isDirect()) { + result = getInternalKeyByteArray0(userKey.array(), + userKey.arrayOffset() + userKey.position(), userKey.remaining(), internalKey, + internalKey.position(), internalKey.remaining(), options.getNativeHandle()); + } else { + result = getInternalKeyByteArray1(userKey.array(), + userKey.arrayOffset() + userKey.position(), userKey.remaining(), internalKey.array(), + internalKey.arrayOffset() + internalKey.position(), internalKey.remaining(), + options.getNativeHandle()); + } + } + userKey.position(userKey.limit()); + internalKey.limit(Math.min(internalKey.position() + result, internalKey.limit())); + return result; + } + + public static byte[] getInternalKeyForPrev(final byte[] userKey, final Options options) { + if (options == null || userKey == null) { + throw new IllegalArgumentException("Byte array and options parameters must not be null"); + } + return getInternalKeyForPrevJni(userKey, userKey.length, options.getNativeHandle()); + } + + public static int getInternalKeyForPrev( + final ByteBuffer userKey, final ByteBuffer internalKey, final Options options) { + if (options == null || userKey == null || internalKey == null) { + throw new IllegalArgumentException("ByteBuffer and options parameters must not be null"); + } + int result; + if (userKey.isDirect()) { + if (internalKey.isDirect()) { + result = getInternalKeyDirectForPrev0(userKey, userKey.position(), userKey.remaining(), + internalKey, internalKey.position(), internalKey.remaining(), + options.getNativeHandle()); + } else { + result = getInternalKeyDirectForPrev1(userKey, userKey.position(), userKey.remaining(), + internalKey.array(), internalKey.arrayOffset() + internalKey.position(), + internalKey.remaining(), options.getNativeHandle()); + } + } else { + if (internalKey.isDirect()) { + result = getInternalKeyByteArrayForPrev0(userKey.array(), + userKey.arrayOffset() + userKey.position(), userKey.remaining(), internalKey, + internalKey.position(), internalKey.remaining(), options.getNativeHandle()); + } else { + result = getInternalKeyByteArrayForPrev1(userKey.array(), + userKey.arrayOffset() + userKey.position(), userKey.remaining(), internalKey.array(), + internalKey.arrayOffset() + internalKey.position(), internalKey.remaining(), + options.getNativeHandle()); + } + } + userKey.position(userKey.limit()); + internalKey.limit(Math.min(internalKey.position() + result, internalKey.limit())); + return result; + } + + private static native int getInternalKeyDirect0(final ByteBuffer userKey, final int userKeyOffset, + int userKeyLen, ByteBuffer internalKey, int internalKeyOffset, int internalKeyLen, + long optionsHandle); + private static native int getInternalKeyByteArray0(final byte[] userKey, final int userKeyOffset, + final int userKeyLen, final ByteBuffer internalKey, final int internalKeyOffset, final int internalKeyLen, + final long optionsHandle); + private static native int getInternalKeyDirect1(final ByteBuffer userKey, final int userKeyOffset, + final int userKeyLen, final byte[] internalKey, final int internalKeyOffset, final int internalKeyLen, + final long optionsHandle); + private static native int getInternalKeyByteArray1(final byte[] userKey, final int userKeyOffset, + final int userKeyLen, final byte[] internalKey, final int internalKeyOffset, final int internalKeyLen, + final long optionsHandle); + private static native byte[] getInternalKeyJni( + final byte[] userKey, final int userKeyLen, final long optionsHandle); + + private static native int getInternalKeyDirectForPrev0(final ByteBuffer userKey, final int userKeyOffset, + final int userKeyLen, final ByteBuffer internalKey, final int internalKeyOffset, final int internalKeyLen, + final long optionsHandle); + private static native int getInternalKeyByteArrayForPrev0(final byte[] userKey, final int userKeyOffset, + final int userKeyLen, final ByteBuffer internalKey, final int internalKeyOffset, final int internalKeyLen, + final long optionsHandle); + private static native int getInternalKeyDirectForPrev1(final ByteBuffer userKey, final int userKeyOffset, + final int userKeyLen, final byte[] internalKey, final int internalKeyOffset, final int internalKeyLen, + final long optionsHandle); + private static native int getInternalKeyByteArrayForPrev1(final byte[] userKey, final int userKeyOffset, + final int userKeyLen, final byte[] internalKey, final int internalKeyOffset, final int internalKeyLen, + final long optionsHandle); + private static native byte[] getInternalKeyForPrevJni( + final byte[] userKey, final int userKeyLen, final long optionsHandle); +} diff --git a/java/src/test/java/org/rocksdb/SstFileReaderTest.java b/java/src/test/java/org/rocksdb/SstFileReaderTest.java index 27934e0f80b6..6923ec974831 100644 --- a/java/src/test/java/org/rocksdb/SstFileReaderTest.java +++ b/java/src/test/java/org/rocksdb/SstFileReaderTest.java @@ -8,6 +8,9 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; +import static org.rocksdb.EntryType.kEntryDelete; +import static org.rocksdb.EntryType.kEntryMerge; +import static org.rocksdb.EntryType.kEntryPut; import java.io.File; import java.io.IOException; @@ -62,9 +65,25 @@ public static Iterable parameters() { @Parameterized.Parameter(1) public ByteBufferAllocator byteBufferAllocator; - enum OpType { PUT, PUT_BYTES, MERGE, MERGE_BYTES, DELETE, DELETE_BYTES } + enum OpType { + PUT(kEntryPut), + PUT_BYTES(kEntryPut), + MERGE(kEntryMerge), + MERGE_BYTES(kEntryMerge), + DELETE(kEntryDelete), + DELETE_BYTES(kEntryDelete); - private File newSstFile(final List keyValues) + private final EntryType entryType; + + private OpType(EntryType entryType) { + this.entryType = entryType; + } + public EntryType getEntryType() { + return entryType; + } + } + + static File newSstFile(TemporaryFolder parentFolder, final List keyValues) throws IOException, RocksDBException { final EnvOptions envOptions = new EnvOptions(); final StringAppendOperator stringAppendOperator = new StringAppendOperator(); @@ -122,14 +141,14 @@ public void readSstFile() throws RocksDBException, IOException { keyValues.add(new KeyValueWithOp("key2", "value2", OpType.PUT)); keyValues.add(new KeyValueWithOp("key3", "value3", OpType.PUT)); - final File sstFile = newSstFile(keyValues); + final File sstFile = newSstFile(parentFolder, keyValues); try (final StringAppendOperator stringAppendOperator = new StringAppendOperator(); final Options options = new Options().setCreateIfMissing(true).setMergeOperator(stringAppendOperator); - final SstFileReader reader = new SstFileReader(options)) { + final SstFileReader reader = new SstFileReader(options); + final ReadOptions readOptions = new ReadOptions()) { // Open the sst file and iterator reader.open(sstFile.getAbsolutePath()); - final ReadOptions readOptions = new ReadOptions(); final SstFileReaderIterator iterator = reader.newIterator(readOptions); // Use the iterator to read sst file diff --git a/java/src/test/java/org/rocksdb/SstTableReaderIteratorTest.java b/java/src/test/java/org/rocksdb/SstTableReaderIteratorTest.java new file mode 100644 index 000000000000..28bc64daf0b1 --- /dev/null +++ b/java/src/test/java/org/rocksdb/SstTableReaderIteratorTest.java @@ -0,0 +1,350 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +package org.rocksdb; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.rocksdb.SstFileReaderTest.newSstFile; +import static org.rocksdb.util.ByteBufferAllocator.DIRECT; +import static org.rocksdb.util.ByteBufferAllocator.HEAP; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.Stack; +import java.util.stream.Collectors; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.rocksdb.SstFileReaderTest.KeyValueWithOp; +import org.rocksdb.SstFileReaderTest.OpType; +import org.rocksdb.util.ByteBufferAllocator; + +@RunWith(Parameterized.class) +public class SstTableReaderIteratorTest { + + private static File sstFile; + private static List kvs; + + @ClassRule + public static TemporaryFolder parentFolder = new TemporaryFolder(); + + @Parameterized.Parameters(name = "{0}") + public static Iterable parameters() { + return Arrays.asList(new Object[][] { + {"direct-direct", DIRECT, DIRECT}, + {"direct-indirect", DIRECT, HEAP}, + {"indirect-direct", HEAP, DIRECT}, + {"indirect-indirect", HEAP, HEAP}, + }); + } + + public SstTableReaderIteratorTest(String name, ByteBufferAllocator userByteBufferAllocator, + ByteBufferAllocator internalByteBufferAllocator) throws RocksDBException, IOException { + this.name = name; + this.userByteBufferAllocator = userByteBufferAllocator; + this.internalByteBufferAllocator = internalByteBufferAllocator; + } + + @BeforeClass + public static void setUp() throws Exception { + kvs = new ArrayList<>(); + sstFile = createSstFileWithKeys(kvs); + } + + + private static File createSstFileWithKeys(List keyValues) throws IOException, + RocksDBException { + keyValues.add(new KeyValueWithOp("key1", "value1", OpType.PUT)); + keyValues.add(new KeyValueWithOp("key11", "", OpType.DELETE)); + keyValues.add(new KeyValueWithOp("key12", "", OpType.DELETE)); + keyValues.add(new KeyValueWithOp("key2", "value2", OpType.PUT)); + keyValues.add(new KeyValueWithOp("key21", "", OpType.DELETE)); + keyValues.add(new KeyValueWithOp("key22", "", OpType.DELETE)); + keyValues.add(new KeyValueWithOp("key3", "value3", OpType.PUT)); + keyValues.add(new KeyValueWithOp("key31", "", OpType.DELETE)); + keyValues.add(new KeyValueWithOp("key32", "", OpType.DELETE)); + keyValues.add(new KeyValueWithOp("key33", "value33_merge", OpType.MERGE)); + return newSstFile(parentFolder, keyValues); + } + + private final String name; + + private final ByteBufferAllocator userByteBufferAllocator; + + private final ByteBufferAllocator internalByteBufferAllocator; + + private void assertInternalKey(final Options options, final ParsedEntryInfo parsedEntryInfo, + final SstFileReaderIterator sstFileReaderIterator, final ByteBuffer internalKey, ByteBuffer userKey, + final String expectedUserKey, final EntryType expectedEntryType) { + // Adding random 4 bytes before getting the internal key length. + internalKey.putInt(1056); + sstFileReaderIterator.key(internalKey); + userKey.clear(); + // Adding random 4 bytes before getting the user key length. + userKey.putInt(1039); + assertEquals(4, userKey.position()); + assertEquals(128 - 4, userKey.remaining()); + parsedEntryInfo.parseEntry(options, internalKey); + assertEquals(0, internalKey.remaining()); + byte[] expectedUserKeyBytes = expectedUserKey.getBytes(); + assertThat(expectedUserKeyBytes.length).isEqualTo(parsedEntryInfo.userKey(userKey)); + assertThat(userKey.position()).isEqualTo(4); + assertThat(userKey.remaining()).isEqualTo(expectedUserKeyBytes.length); + byte[] dst = new byte[expectedUserKeyBytes.length]; + userKey.get(dst); + assertEquals(new String(expectedUserKeyBytes), new String(dst)); + assertEquals(expectedEntryType, parsedEntryInfo.getEntryType()); + internalKey.clear(); + userKey.clear(); + assertInternalKeyByteArray(options, parsedEntryInfo, sstFileReaderIterator.key(), expectedUserKey, + expectedEntryType); + } + + private void assertInternalKeyByteArray(final Options options, final ParsedEntryInfo parsedEntryInfo, + final byte[] internalKey, final String expectedUserKey, final EntryType expectedEntryType) { + parsedEntryInfo.parseEntry(options, internalKey); + byte[] expectedUserKeyBytes = expectedUserKey.getBytes(); + assertArrayEquals(expectedUserKeyBytes, parsedEntryInfo.getUserKey()); + assertEquals(expectedEntryType, parsedEntryInfo.getEntryType()); + } + + private void seekTableIterator(final SstFileReaderIterator iterator, final ByteBuffer userKey, + final ByteBuffer internalKey, final Options options) { + byte[] userKeyArray = new byte[userKey.remaining()]; + // Adding random 4 bytes before getting the user key length. + internalKey.putInt(1540); + userKey.asReadOnlyBuffer().get(userKeyArray); + int len = TypeUtil.getInternalKey(userKey, internalKey, options); + byte[] internalKeyArray = TypeUtil.getInternalKey(userKeyArray, options); + assertEquals(4, internalKey.position()); + assertEquals(len, internalKey.remaining()); + byte[] internalKeyArrayFromByteBuffer = new byte[internalKey.remaining()]; + internalKey.asReadOnlyBuffer().get(internalKeyArrayFromByteBuffer); + assertArrayEquals(internalKeyArray, internalKeyArrayFromByteBuffer); + iterator.seek(internalKey); + assertThat(internalKey.position()).isEqualTo(len + 4); + assertThat(internalKey.limit()).isEqualTo(len + 4); + internalKey.clear(); + } + + private void seekTableIteratorForPrev(final SstFileReaderIterator iterator, final ByteBuffer userKey, + final ByteBuffer internalKey, final Options options) { + byte[] userKeyArray = new byte[userKey.remaining()]; + userKey.asReadOnlyBuffer().get(userKeyArray); + internalKey.putInt(1540); + int len = TypeUtil.getInternalKeyForPrev(userKey, internalKey, options); + byte[] internalKeyArray = TypeUtil.getInternalKeyForPrev(userKeyArray, options); + assertEquals(4, internalKey.position()); + assertEquals(len, internalKey.remaining()); + byte[] internalKeyArrayFromByteBuffer = new byte[internalKey.remaining()]; + internalKey.asReadOnlyBuffer().get(internalKeyArrayFromByteBuffer); + assertArrayEquals(internalKeyArray, internalKeyArrayFromByteBuffer); + iterator.seekForPrev(internalKey); + assertThat(internalKey.position()).isEqualTo(len + 4); + assertThat(internalKey.limit()).isEqualTo(len + 4); + internalKey.clear(); + } + + @Test + public void readSstFileTableIterator() throws RocksDBException { + SstFileReaderIterator iterator = null; + try (final StringAppendOperator stringAppendOperator = new StringAppendOperator(); + final Options options = + new Options().setCreateIfMissing(true).setMergeOperator(stringAppendOperator); + final SstFileReader reader = new SstFileReader(options); + final ParsedEntryInfo parsedEntryInfo = new ParsedEntryInfo()) { + // Open the sst file and iterator + reader.open(sstFile.getAbsolutePath()); + iterator = reader.newTableIterator(); + + // Use the iterator to read sst file + iterator.seekToFirst(); + + // Verify Checksum + reader.verifyChecksum(); + + // Verify Table Properties + assertEquals(reader.getTableProperties().getNumEntries(), 10); + final ByteBuffer userByteBuffer = userByteBufferAllocator.allocate(128); + final ByteBuffer internalKeyByteBuffer = internalByteBufferAllocator.allocate(128); + + // Check key and value + assertInternalKey(options, parsedEntryInfo, iterator, internalKeyByteBuffer, userByteBuffer, + "key1", EntryType.kEntryPut); + assertThat(iterator.value()).isEqualTo("value1".getBytes()); + + + userByteBuffer.put("key1".getBytes()).flip(); + seekTableIterator(iterator, userByteBuffer, internalKeyByteBuffer, options); + assertThat(iterator.isValid()).isTrue(); + assertInternalKey(options, parsedEntryInfo, iterator, internalKeyByteBuffer, userByteBuffer, + "key1", EntryType.kEntryPut); + assertThat(iterator.value()).isEqualTo("value1".getBytes()); + + { + userByteBuffer.clear(); + int length = iterator.key(userByteBuffer); + final byte[] dst = new byte[length]; + userByteBuffer.get(dst); + assertInternalKeyByteArray(options, parsedEntryInfo, dst, "key1", EntryType.kEntryPut); + } + + { + userByteBuffer.clear(); + userByteBuffer.put("PREFIX".getBytes()); + final ByteBuffer slice = userByteBuffer.slice(); + final byte[] dst = new byte[iterator.key(userByteBuffer)]; + slice.get(dst); + assertInternalKeyByteArray(options, parsedEntryInfo, dst, "key1", EntryType.kEntryPut); + } + + { + userByteBuffer.clear(); + assertThat(iterator.value(userByteBuffer)).isEqualTo("value1".getBytes().length); + final byte[] dst = new byte["value1".getBytes().length]; + userByteBuffer.get(dst); + assertThat(new String(dst)).isEqualTo("value1"); + } + + userByteBuffer.clear(); + userByteBuffer.put("key10".getBytes()).flip(); + seekTableIterator(iterator, userByteBuffer, internalKeyByteBuffer, options); + assertThat(iterator.isValid()).isTrue(); + assertInternalKey(options, parsedEntryInfo, iterator, internalKeyByteBuffer, userByteBuffer, "key11", EntryType.kEntryDelete); + + userByteBuffer.clear(); + userByteBuffer.put("key1point5".getBytes()).flip(); + seekTableIteratorForPrev(iterator, userByteBuffer, internalKeyByteBuffer, options); + assertThat(iterator.isValid()).isTrue(); + assertInternalKey(options, parsedEntryInfo, iterator, internalKeyByteBuffer, userByteBuffer, "key12", EntryType.kEntryDelete); + + userByteBuffer.clear(); + userByteBuffer.put("key2point5".getBytes()).flip(); + seekTableIterator(iterator, userByteBuffer, internalKeyByteBuffer, options); + assertThat(iterator.isValid()).isTrue(); + assertInternalKey(options, parsedEntryInfo, iterator, internalKeyByteBuffer, userByteBuffer, "key3", EntryType.kEntryPut); + assertThat(iterator.value()).isEqualTo("value3".getBytes()); + + userByteBuffer.clear(); + userByteBuffer.put("key2point5".getBytes()).flip(); + seekTableIteratorForPrev(iterator, userByteBuffer, internalKeyByteBuffer, options); + assertThat(iterator.isValid()).isTrue(); + assertInternalKey(options, parsedEntryInfo, iterator, internalKeyByteBuffer, userByteBuffer, "key22", EntryType.kEntryDelete); + + userByteBuffer.clear(); + internalKeyByteBuffer.put("PREFIX".getBytes()); + final ByteBuffer slice = internalKeyByteBuffer.slice(); + userByteBuffer.put("key1point5".getBytes()).flip(); + seekTableIteratorForPrev(iterator, userByteBuffer, slice, options); + assertThat(iterator.isValid()).isTrue(); + assertInternalKey(options, parsedEntryInfo, iterator, internalKeyByteBuffer, userByteBuffer, "key12", EntryType.kEntryDelete); + + userByteBuffer.clear(); + userByteBuffer.put("key3point5".getBytes()).flip(); + seekTableIteratorForPrev(iterator, userByteBuffer, internalKeyByteBuffer, options); + assertThat(iterator.isValid()).isTrue(); + assertInternalKey(options, parsedEntryInfo, iterator, internalKeyByteBuffer, userByteBuffer, "key33", + EntryType.kEntryMerge); + assertThat(iterator.value()).isEqualTo("value33_merge".getBytes()); + } finally { + if (iterator != null) { + iterator.close(); + } + } + } + + @Test + public void testReadTableForwardIteratorWithLimits() throws RocksDBException { + SstFileReaderIterator iterator = null; + String lowerBound = "key1point5"; + String upperBound = "key3"; + ByteBuffer lowerBoundBuffer = ByteBuffer.allocateDirect(128); + ByteBuffer upperBoundBuffer = ByteBuffer.allocateDirect(128); + lowerBoundBuffer.putInt(10294); + upperBoundBuffer.putInt(15948); + try (final StringAppendOperator stringAppendOperator = new StringAppendOperator(); + final Options options = + new Options().setCreateIfMissing(true).setMergeOperator(stringAppendOperator); + final SstFileReader reader = new SstFileReader(options); + final ParsedEntryInfo parsedEntryInfo = new ParsedEntryInfo(); + DirectSlice lowerSliceBound = new DirectSlice(lowerBoundBuffer); + DirectSlice upperSliceBound = new DirectSlice(upperBoundBuffer)) { + lowerBoundBuffer.put(TypeUtil.getInternalKey(lowerBound.getBytes(), options)); + upperBoundBuffer.put(TypeUtil.getInternalKey(upperBound.getBytes(), options)); + lowerBoundBuffer.flip(); + upperBoundBuffer.flip(); + lowerBoundBuffer.position(4); + upperBoundBuffer.position(4); + lowerSliceBound.removePrefix(4); + upperSliceBound.removePrefix(4); + lowerSliceBound.setLength(lowerBoundBuffer.remaining()); + upperSliceBound.setLength(upperBoundBuffer.remaining()); + final ByteBuffer userByteBuffer = userByteBufferAllocator.allocate(128); + final ByteBuffer internalKeyByteBuffer = internalByteBufferAllocator.allocate(128); + // Open the sst file and iterator + reader.open(sstFile.getAbsolutePath()); + iterator = reader.newTableIterator(lowerSliceBound, upperSliceBound); + iterator.seekToFirst(); + Queue expectedKeys = kvs.stream() + .filter(kv -> lowerBound.compareTo(kv.getKey()) <= 0 && + upperBound.compareTo(kv.getKey()) > 0).collect(Collectors.toCollection(LinkedList::new)); + while (!expectedKeys.isEmpty()) { + KeyValueWithOp expectedKey = expectedKeys.poll(); + assertThat(iterator.isValid()).isTrue(); + assertInternalKey(options, parsedEntryInfo, iterator, internalKeyByteBuffer, userByteBuffer, + expectedKey.getKey(), expectedKey.getOpType().getEntryType()); + assertArrayEquals(expectedKey.getValue().getBytes(), iterator.value()); + iterator.next(); + } + assertFalse(iterator.isValid()); + } + } + + @Test + public void testReadTableReverseIteratorWithLimits() throws RocksDBException { + SstFileReaderIterator iterator = null; + String lowerBound = "key1point5"; + String upperBound = "key3"; + try (final StringAppendOperator stringAppendOperator = new StringAppendOperator(); + final Options options = + new Options().setCreateIfMissing(true).setMergeOperator(stringAppendOperator); + final SstFileReader reader = new SstFileReader(options); + final ParsedEntryInfo parsedEntryInfo = new ParsedEntryInfo(); + Slice lowerSliceBound = new Slice(TypeUtil.getInternalKey(lowerBound.getBytes(), options)); + Slice upperSliceBound = new Slice(TypeUtil.getInternalKey(upperBound.getBytes(), options));) { + + final ByteBuffer userByteBuffer = userByteBufferAllocator.allocate(128); + final ByteBuffer internalKeyByteBuffer = internalByteBufferAllocator.allocate(128); + // Open the sst file and iterator + reader.open(sstFile.getAbsolutePath()); + iterator = reader.newTableIterator(lowerSliceBound, upperSliceBound); + iterator.seekToLast(); + Stack expectedKeys = kvs.stream() + .filter(kv -> lowerBound.compareTo(kv.getKey()) <= 0 && + upperBound.compareTo(kv.getKey()) > 0).collect(Collectors.toCollection(Stack::new)); + while (!expectedKeys.isEmpty()) { + KeyValueWithOp expectedKey = expectedKeys.pop(); + assertThat(iterator.isValid()).isTrue(); + assertInternalKey(options, parsedEntryInfo, iterator, internalKeyByteBuffer, userByteBuffer, + expectedKey.getKey(), expectedKey.getOpType().getEntryType()); + assertArrayEquals(expectedKey.getValue().getBytes(), iterator.value()); + iterator.prev(); + } + assertFalse(iterator.isValid()); + } + } +} diff --git a/src.mk b/src.mk index 5eac640572d1..2fa77e361049 100644 --- a/src.mk +++ b/src.mk @@ -761,4 +761,7 @@ JNI_NATIVE_SOURCES = \ java/rocksjni/writebatchhandlerjnicallback.cc \ java/rocksjni/write_batch_test.cc \ java/rocksjni/write_batch_with_index.cc \ - java/rocksjni/write_buffer_manager.cc + java/rocksjni/write_buffer_manager.cc \ + java/rocksjni/parsed_entry_info.cc \ + java/rocksjni/type_util.cc + diff --git a/table/sst_file_reader.cc b/table/sst_file_reader.cc index 11013712e281..f31e53a40987 100644 --- a/table/sst_file_reader.cc +++ b/table/sst_file_reader.cc @@ -180,19 +180,23 @@ Iterator* SstFileReader::NewIterator(const ReadOptions& roptions) { return res; } -std::unique_ptr SstFileReader::NewTableIterator() { +std::unique_ptr SstFileReader::NewTableIterator(const Slice* from_key, const Slice* to_key) { auto r = rep_.get(); InternalIterator* internal_iter = r->table_reader->NewIterator( - r->roptions_for_table_iter, r->moptions.prefix_extractor.get(), - /*arena*/ nullptr, false /* skip_filters */, - TableReaderCaller::kSSTFileReader); + r->roptions_for_table_iter, + r->moptions.prefix_extractor.get(),/*arena*/ nullptr, false /* skip_filters */, + TableReaderCaller::kSSTFileReader); assert(internal_iter); if (internal_iter == nullptr) { // Do not attempt to create a TableIterator if we cannot get a valid // InternalIterator. return nullptr; } - return std::make_unique(internal_iter); + return std::make_unique(internal_iter, from_key, to_key, &r->ioptions.internal_comparator); +} + +std::unique_ptr SstFileReader::NewTableIterator() { + return NewTableIterator(nullptr, nullptr); } std::shared_ptr SstFileReader::GetTableProperties() diff --git a/table/table_iterator.h b/table/table_iterator.h index 7d18924e2d8e..448d3270ede5 100644 --- a/table/table_iterator.h +++ b/table/table_iterator.h @@ -16,39 +16,83 @@ namespace ROCKSDB_NAMESPACE { // NOTE: Callers should ensure the wrapped `InternalIterator*` is a valid // pointer before constructing a `TableIterator` with it. class TableIterator : public Iterator { - void reset(InternalIterator* iter) noexcept { + void reset(InternalIterator* iter, const Slice* from_key, const Slice* to_key, + const InternalKeyComparator* internalKeyComparator) noexcept { if (iter_ != nullptr) { delete iter_; } iter_ = iter; + from_key_ = from_key; + to_key_ = to_key; + internal_key_comparator_ = internalKeyComparator; } public: - explicit TableIterator(InternalIterator* iter) : iter_(iter) {} + explicit TableIterator(InternalIterator* iter, const Slice* from_key, + const Slice* to_key, const InternalKeyComparator* internalKeyComparator) + : iter_(iter), + from_key_(from_key), + to_key_(to_key), + internal_key_comparator_(internalKeyComparator) { + + } TableIterator(const TableIterator&) = delete; TableIterator& operator=(const TableIterator&) = delete; TableIterator(TableIterator&& o) noexcept { iter_ = o.iter_; + from_key_ = o.from_key_; + to_key_ = o.to_key_; + internal_key_comparator_ = o.internal_key_comparator_; o.iter_ = nullptr; + o.to_key_ = nullptr; + o.from_key_ = nullptr; + o.internal_key_comparator_ = nullptr; } TableIterator& operator=(TableIterator&& o) noexcept { - reset(o.iter_); + reset(o.iter_, o.from_key_, o.to_key_, o.internal_key_comparator_); o.iter_ = nullptr; + o.from_key_ = nullptr; + o.to_key_ = nullptr; + o.internal_key_comparator_ = nullptr; return *this; } InternalIterator* operator->() { return iter_; } InternalIterator* get() { return iter_; } - ~TableIterator() override { reset(nullptr); } + ~TableIterator() override { reset(nullptr, nullptr, nullptr, nullptr); } + + bool Valid() const override { + return iter_->Valid() && + (!has_from_key_() || internal_key_comparator_->Compare(*from_key_, key()) <= 0) && + (!has_to_key_() || internal_key_comparator_->Compare(*to_key_, key()) > 0); + } + + void SeekToFirst() override { + if (has_from_key_()) { + return Seek(*from_key_); + } else { + return iter_->SeekToFirst(); + } + } - bool Valid() const override { return iter_->Valid(); } - void SeekToFirst() override { return iter_->SeekToFirst(); } - void SeekToLast() override { return iter_->SeekToLast(); } - void Seek(const Slice& target) override { return iter_->Seek(target); } + void SeekToLast() override { + if (has_to_key_()) { + SeekForPrev(*to_key_); + if (iter_->Valid() && internal_key_comparator_->Compare(*to_key_, key()) == 0) { + Prev(); + } + } else { + return iter_->SeekToLast(); + } + } + + void Seek(const Slice& target) override { + return iter_->Seek(target); + } void SeekForPrev(const Slice& target) override { return iter_->SeekForPrev(target); } @@ -65,5 +109,16 @@ class TableIterator : public Iterator { private: InternalIterator* iter_; + const Slice* from_key_; + const Slice* to_key_; + const InternalKeyComparator* internal_key_comparator_; + + bool has_from_key_() const { + return from_key_ != nullptr; + } + + bool has_to_key_() const { + return to_key_ != nullptr; + } }; } // namespace ROCKSDB_NAMESPACE diff --git a/utilities/types_util.cc b/utilities/types_util.cc index cbd427425ed7..8b61a9d68082 100644 --- a/utilities/types_util.cc +++ b/utilities/types_util.cc @@ -57,6 +57,11 @@ Status GetInternalKeyForSeekForPrev(const Slice& user_key, Status ParseEntry(const Slice& internal_key, const Comparator* comparator, ParsedEntryInfo* parsed_entry) { + return ParseEntry(internal_key, comparator, parsed_entry, false); +} + +Status ParseEntry(const Slice& internal_key, const Comparator* comparator, + ParsedEntryInfo* parsed_entry, bool copied_user_key) { if (internal_key.size() < kNumInternalBytes) { return Status::InvalidArgument("Internal key size invalid."); } @@ -74,6 +79,10 @@ Status ParseEntry(const Slice& internal_key, const Comparator* comparator, if (pikey.user_key.size() < ts_sz) { return Status::InvalidArgument("User key(with timestamp) size invalid."); } + if (parsed_entry->copied_user_key) { + delete parsed_entry->user_key.data(); + } + if (ts_sz == 0) { parsed_entry->user_key = pikey.user_key; } else { @@ -83,6 +92,7 @@ Status ParseEntry(const Slice& internal_key, const Comparator* comparator, } parsed_entry->sequence = pikey.sequence; parsed_entry->type = ROCKSDB_NAMESPACE::GetEntryType(pikey.type); + parsed_entry->copied_user_key = copied_user_key; return Status::OK(); } } // namespace ROCKSDB_NAMESPACE