diff --git a/CMakeLists.txt b/CMakeLists.txt index f9c8f3346..796656625 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -741,6 +741,7 @@ set(SOURCES utilities/debug.cc utilities/env_mirror.cc utilities/env_timed.cc + utilities/flink/flink_compaction_filter.cc utilities/leveldb_options/leveldb_options.cc utilities/memory/memory_util.cc utilities/merge_operators/bytesxor.cc @@ -1122,6 +1123,7 @@ if(WITH_TESTS) utilities/cassandra/cassandra_format_test.cc utilities/cassandra/cassandra_row_merge_test.cc utilities/cassandra/cassandra_serialize_test.cc + utilities/flink/flink_compaction_filter_test.cc utilities/checkpoint/checkpoint_test.cc utilities/memory/memory_test.cc utilities/merge_operators/string_append/stringappend_test.cc diff --git a/HISTORY.md b/HISTORY.md index b2bcbe6e0..03807e31e 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,3 +1,11 @@ +# FRocksDB Change Log +## 6.11.6-veverica-1.0 (10/22/2020) +### Features +* [Flink TTL] Compaction filter for background cleanup of state with time-to-live +* [Flink ListState] Separator-free merge operator +### Bug Fixes +* Fix a bug that may cause MultiGet to be slow because it may read more data than requested; this does not affect correctness. The bug was introduced in the 6.10 release. + # Rocksdb Change Log ## 6.11.6 (10/12/2020) ### Bug Fixes diff --git a/Makefile b/Makefile index 332b7fa18..081bd96f6 100644 --- a/Makefile +++ b/Makefile @@ -6,6 +6,8 @@ #----------------------------------------------- +FROCKSDB_VERSION ?= 1.0 + BASH_EXISTS := $(shell which bash) SHELL := $(shell which bash) # Default to python3. Some distros like CentOS 8 do not have `python`. @@ -585,6 +587,7 @@ TESTS = \ compaction_picker_test \ version_builder_test \ file_indexer_test \ + flink_compaction_filter_test \ write_batch_test \ write_batch_with_index_test \ write_controller_test\ @@ -1407,6 +1410,9 @@ cassandra_serialize_test: utilities/cassandra/cassandra_serialize_test.o $(LIBOB hash_table_test: utilities/persistent_cache/hash_table_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +flink_compaction_filter_test: utilities/flink/flink_compaction_filter_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + histogram_test: monitoring/histogram_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) @@ -2121,17 +2127,72 @@ rocksdbjavastatic: $(java_static_all_libobjects) strip $(STRIPFLAGS) $(ROCKSDBJNILIB); \ fi cd java;jar -cf target/$(ROCKSDB_JAR) HISTORY*.md + jar -uf java/target/$(ROCKSDB_JAR) HISTORY*.md cd java/target;jar -uf $(ROCKSDB_JAR) $(ROCKSDBJNILIB) cd java/target/classes;jar -uf ../$(ROCKSDB_JAR) org/rocksdb/*.class org/rocksdb/util/*.class cd java/target/apidocs;jar -cf ../$(ROCKSDB_JAVADOCS_JAR) * cd java/src/main/java;jar -cf ../../../target/$(ROCKSDB_SOURCES_JAR) org + mkdir -p java/target/META-INF + cp LICENSE.Apache java/target/META-INF/LICENSE + cd java/target;jar -uf $(ROCKSDB_JAR) META-INF/LICENSE + rocksdbjavastaticrelease: rocksdbjavastatic cd java/crossbuild && (vagrant destroy -f || true) && vagrant up linux32 && vagrant halt linux32 && vagrant up linux64 && vagrant halt linux64 && vagrant up linux64-musl && vagrant halt linux64-musl cd java;jar -cf target/$(ROCKSDB_JAR_ALL) HISTORY*.md - cd java/target;jar -uf $(ROCKSDB_JAR_ALL) librocksdbjni-*.so librocksdbjni-*.jnilib + jar -uf java/target/$(ROCKSDB_JAR_ALL) HISTORY*.md + cd java/target;jar -uf $(ROCKSDB_JAR_ALL) librocksdbjni-*.so librocksdbjni-*.jnilib librocksdbjni-win64.dll cd java/target/classes;jar -uf ../$(ROCKSDB_JAR_ALL) org/rocksdb/*.class org/rocksdb/util/*.class
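The changelog entries above summarize the two user-facing additions that the rest of this diff implements. For orientation, here is a minimal sketch of how the new Flink TTL filter is wired up from Java, mirroring `FlinkCompactionFilterTest` further down in this diff (the class name and the TTL/entry-count values are illustrative):

```java
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.FlinkCompactionFilter;

public class FlinkTtlSetup {
  public static void main(String[] args) {
    // TimeProvider is a single-method interface, so a method reference works here.
    FlinkCompactionFilter.TimeProvider clock = System::currentTimeMillis;
    try (FlinkCompactionFilter.FlinkCompactionFilterFactory factory =
             new FlinkCompactionFilter.FlinkCompactionFilterFactory(clock)) {
      // Value state with a 100 ms TTL; the filter re-queries the time provider
      // after every 100 processed state entries.
      factory.configure(FlinkCompactionFilter.Config.createForValue(100, 100));
      try (ColumnFamilyOptions cfOptions =
               new ColumnFamilyOptions().setCompactionFilterFactory(factory)) {
        // pass cfOptions to a ColumnFamilyDescriptor when opening the database
      }
    }
  }
}
```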
+frocksdbjavastaticrelease: rocksdbjavastaticrelease + # update license + mkdir -p java/target/META-INF + cp LICENSE.Apache java/target/META-INF/LICENSE + cd java/target;jar -uf $(ROCKSDB_JAR_ALL) META-INF/LICENSE + + # platform jars + $(eval JAR_PREF=rocksdbjni-$(ROCKSDB_MAJOR).$(ROCKSDB_MINOR).$(ROCKSDB_PATCH)) + $(eval JAR_DOCS=$(JAR_PREF)-javadoc.jar) + $(eval JAR_SOURCES=$(JAR_PREF)-sources.jar) + $(eval OSX_JAR=$(JAR_PREF)-osx.jar) + $(eval WIN_JAR=$(JAR_PREF)-win64.jar) + $(eval LINUX32_JAR=$(JAR_PREF)-linux32.jar) + $(eval LINUX64_JAR=$(JAR_PREF)-linux64.jar) + + # update windows jar + cd java/target;cp rocksdbjni_classes.jar $(WIN_JAR) + cd java;jar -uf target/$(WIN_JAR) HISTORY*.md + jar -uf java/target/$(WIN_JAR) HISTORY*.md + cd java/target;jar -uf $(WIN_JAR) librocksdbjni-win64.dll + cd java/target;jar -uf $(WIN_JAR) META-INF/LICENSE + + # update linux 64 jar with ppc64 lib + cd java/target;jar -uf $(LINUX64_JAR) librocksdbjni-linux-ppc64le.so + + cd java/target;jar -uf $(JAR_DOCS) META-INF/LICENSE + cd java/target;jar -uf $(JAR_SOURCES) META-INF/LICENSE + + # prepare frocksdb release + cd java/target;mkdir -p frocksdb-release + + $(eval FJAR_PREF=frocksdbjni-$(ROCKSDB_MAJOR).$(ROCKSDB_MINOR).$(ROCKSDB_PATCH)-artisans-$(FROCKSDB_VERSION)) + $(eval FJAR=$(FJAR_PREF).jar) + $(eval FJAR_DOCS=$(FJAR_PREF)-javadoc.jar) + $(eval FJAR_SOURCES=$(FJAR_PREF)-sources.jar) + $(eval OSX_FJAR=$(FJAR_PREF)-osx.jar) + $(eval WIN_FJAR=$(FJAR_PREF)-win64.jar) + $(eval LINUX32_FJAR=$(FJAR_PREF)-linux32.jar) + $(eval LINUX64_FJAR=$(FJAR_PREF)-linux64.jar) + + cd java/target;cp $(ROCKSDB_JAR_ALL) frocksdb-release/$(FJAR) + cd java/target;cp $(JAR_DOCS) frocksdb-release/$(FJAR_DOCS) + cd java/target;cp $(JAR_SOURCES) frocksdb-release/$(FJAR_SOURCES) + cd java/target;cp $(OSX_JAR) frocksdb-release/$(OSX_FJAR) + cd java/target;cp $(WIN_JAR) frocksdb-release/$(WIN_FJAR) + cd java/target;cp $(LINUX32_JAR) frocksdb-release/$(LINUX32_FJAR) + cd java/target;cp $(LINUX64_JAR) frocksdb-release/$(LINUX64_FJAR) + cd java;cp rocksjni.pom target/frocksdb-release/$(FJAR_PREF).pom + rocksdbjavastaticreleasedocker: rocksdbjavastatic rocksdbjavastaticdockerx86 rocksdbjavastaticdockerx86_64 rocksdbjavastaticdockerx86musl rocksdbjavastaticdockerx86_64musl cd java;jar -cf target/$(ROCKSDB_JAR_ALL) HISTORY*.md cd java/target;jar -uf $(ROCKSDB_JAR_ALL) librocksdbjni-*.so librocksdbjni-*.jnilib diff --git a/RELEASE-FROCKSDB.md b/RELEASE-FROCKSDB.md new file mode 100644 index 000000000..0e77696a5 --- /dev/null +++ b/RELEASE-FROCKSDB.md @@ -0,0 +1,106 @@ +## Build on Windows + +Use a 64-bit Windows machine (e.g. a base AWS Windows instance: 4 cores, 16GB RAM, 40GB storage for the build). + +Install: + * git + * java 8 + * maven + * Visual Studio Community 15 (2017) + +With [chocolatey](https://chocolatey.org/install): + + choco install git.install jdk8 maven visualstudio2017community + +Optionally: + + choco install intellijidea-community vscode + +Open the Developer Command Prompt for VS 2017 and run: + + git clone git@github.com:dataArtisans/frocksdb.git + cd frocksdb + git checkout FRocksDB-6.11.6 # release branch + java\crossbuild\build-win.bat + +The resulting native library is `build\java\Release\rocksdbjni-shared.dll`. +The resulting Windows jar is `build\java\rocksdbjni_classes.jar`. + +There is also a how-to in CMakeLists.txt. + +## Build on PPC64LE + +Use Ubuntu 16.04 (e.g. an AWS instance with 4 cores, 16GB RAM, and 40GB storage for the build). +Install git if it is not installed.
If docker is installed, it might need to be removed. + +Set up a ppc64le docker machine ([source](https://developer.ibm.com/linuxonpower/2017/06/08/build-test-ppc64le-docker-images-intel/)): + + wget http://ftp.unicamp.br/pub/ppc64el/boot2docker/install.sh && chmod +x ./install.sh && ./install.sh -s + docker-machine create -d qemu \ + --qemu-boot2docker-url=/home/ubuntu/.docker/machine/boot2docker.iso \ + --qemu-memory 8192 \ + --qemu-cache-mode none \ + --qemu-arch ppc64le \ + vm-ppc64le + +If it does not work on the first attempt, regenerate the certificates as suggested. + +Prepare the docker machine to run the rocksdbjni docker image for the ppc64le build: + + eval $(docker-machine env vm-ppc64le) + git clone git@github.com:dataArtisans/frocksdb.git + cd frocksdb + git checkout FRocksDB-6.11.6 # release branch + docker-machine ssh vm-ppc64le mkdir -p `pwd` + docker-machine scp -r . vm-ppc64le:`pwd` + +Build frocksdb: + + make rocksdbjavastaticdockerppc64le + docker-machine scp vm-ppc64le:`pwd`/java/target/librocksdbjni-linux-ppc64le.so java/target/. + +The resulting native library is `java/target/librocksdbjni-linux-ppc64le.so`. + +## Final crossbuild on Mac OSX + +Read how to build the cross jar for Mac OSX and Linux as described in java/RELEASE.md, but do not run it yet. + +Run commands: + + make jclean clean + mkdir -p java/target + cp /rocksdbjni-shared.dll java/target/librocksdbjni-win64.dll + cp /rocksdbjni_classes.jar java/target/rocksdbjni_classes.jar + cp /librocksdbjni-linux-ppc64le.so java/target/librocksdbjni-linux-ppc64le.so + FROCKSDB_VERSION=1.0 make frocksdbjavastaticrelease + +## Push to maven central + +Edit the `frocksdbjni-<version>.pom` file and replace the `<version>-</version>` placeholder with `<version>VERSION</version>`, where `VERSION` is e.g. `6.11.6-veverica-1.0`. + +Run: +```bash +VERSION=<version> \ +USER=<sonatype-user> \ +PASSWORD=<sonatype-password> \ +KEYNAME=<gpg-key-name> \ +PASSPHRASE=<gpg-passphrase> \ +java/publish-frocksdbjni.sh +``` + +Go to the staging repositories on Sonatype: + +https://oss.sonatype.org/#stagingRepositories + +Select the open staging repository and click on "Close". + +Test the files in the staging repository, whose URL will look something like this: `https://oss.sonatype.org/content/repositories/comdata-artisans-1020`. + +Press the "Release" button (WARNING: this cannot be undone). diff --git a/TARGETS b/TARGETS index 42785f38e..45fc890f8 100644 --- a/TARGETS +++ b/TARGETS @@ -1,4 +1,4 @@ # This file @generated by `python3 buckifier/buckify_rocksdb.py` # --> DO NOT EDIT MANUALLY <-- # This file is a Facebook-specific integration for buck builds, so can # only be validated by Facebook employees.
@@ -330,6 +330,7 @@ cpp_library( "utilities/debug.cc", "utilities/env_mirror.cc", "utilities/env_timed.cc", + "utilities/flink/flink_compaction_filter.cc", "utilities/leveldb_options/leveldb_options.cc", "utilities/memory/memory_util.cc", "utilities/merge_operators/bytesxor.cc", @@ -390,6 +391,7 @@ cpp_library( "tools/block_cache_analyzer/block_cache_trace_analyzer.cc", "tools/trace_analyzer_tool.cc", "utilities/cassandra/test_utils.cc", + "utilities/flink/flink_compaction_filter.cc", ], auto_headers = AutoHeaders.RECURSIVE_GLOB, arch_preprocessor_flags = ROCKSDB_ARCH_PREPROCESSOR_FLAGS, @@ -1106,6 +1108,11 @@ ROCKS_TESTS = [ [], [], ], + [ + "flink_compaction_filter_test", + "utilities/flink/flink_compaction_filter_test.cc", + "serial", + ], [ "flush_job_test", "db/flush_job_test.cc", diff --git a/Vagrantfile b/Vagrantfile index 07f2e99fd..3dcedaf76 100644 --- a/Vagrantfile +++ b/Vagrantfile @@ -2,7 +2,7 @@ Vagrant.configure("2") do |config| config.vm.provider "virtualbox" do |v| - v.memory = 4096 + v.memory = 6096 v.cpus = 2 end diff --git a/file/random_access_file_reader.cc b/file/random_access_file_reader.cc index 0be3e0546..5e56ea4ce 100644 --- a/file/random_access_file_reader.cc +++ b/file/random_access_file_reader.cc @@ -194,7 +194,7 @@ bool TryMerge(FSReadRequest* dest, const FSReadRequest& src) { size_t src_offset = static_cast<size_t>(src.offset); size_t dest_end = End(*dest); size_t src_end = End(src); - if (std::max(dest_offset, dest_offset) > std::min(dest_end, src_end)) { + if (std::max(dest_offset, src_offset) > std::min(dest_end, src_end)) { return false; } dest->offset = static_cast<uint64_t>(std::min(dest_offset, src_offset)); diff --git a/java/CMakeLists.txt b/java/CMakeLists.txt index 828803bfb..774706316 100644 --- a/java/CMakeLists.txt +++ b/java/CMakeLists.txt @@ -29,6 +29,7 @@ set(JNI_NATIVE_SOURCES rocksjni/config_options.cc rocksjni/env.cc rocksjni/env_options.cc + rocksjni/flink_compactionfilterjni.cc rocksjni/filter.cc rocksjni/ingest_external_file_options.cc rocksjni/iterator.cc @@ -140,6 +141,7 @@ set(JAVA_MAIN_CLASSES src/main/java/org/rocksdb/EnvOptions.java src/main/java/org/rocksdb/Experimental.java src/main/java/org/rocksdb/Filter.java + src/main/java/org/rocksdb/FlinkCompactionFilter.java src/main/java/org/rocksdb/FlushOptions.java src/main/java/org/rocksdb/HashLinkedListMemTableConfig.java src/main/java/org/rocksdb/HashSkipListMemTableConfig.java @@ -211,6 +213,7 @@ set(JAVA_MAIN_CLASSES src/main/java/org/rocksdb/StatsLevel.java src/main/java/org/rocksdb/Status.java src/main/java/org/rocksdb/StringAppendOperator.java + src/main/java/org/rocksdb/StringAppendOperatorWithVariableDelimitor.java src/main/java/org/rocksdb/TableFilter.java src/main/java/org/rocksdb/TableProperties.java src/main/java/org/rocksdb/TableFormatConfig.java @@ -415,6 +418,7 @@ if(${CMAKE_VERSION} VERSION_LESS "3.11.4" OR (${Java_VERSION_MINOR} STREQUAL "7" org.rocksdb.Env org.rocksdb.EnvOptions org.rocksdb.Filter + org.rocksdb.FlinkCompactionFilter org.rocksdb.FlushOptions org.rocksdb.HashLinkedListMemTableConfig org.rocksdb.HashSkipListMemTableConfig @@ -453,6 +457,7 @@ if(${CMAKE_VERSION} VERSION_LESS "3.11.4" OR (${Java_VERSION_MINOR} STREQUAL "7" org.rocksdb.SstFileReaderIterator org.rocksdb.Statistics org.rocksdb.StringAppendOperator + org.rocksdb.StringAppendOperatorWithVariableDelimitor org.rocksdb.TableFormatConfig org.rocksdb.ThreadStatus org.rocksdb.TimedEnv diff --git a/java/Makefile b/java/Makefile index 12eb95f03..9869165a2 100644 --- a/java/Makefile +++ b/java/Makefile @@ -30,6 +30,7 @@
NATIVE_JAVA_CLASSES = \ org.rocksdb.DirectSlice\ org.rocksdb.Env\ org.rocksdb.EnvOptions\ + org.rocksdb.FlinkCompactionFilter\ org.rocksdb.FlushOptions\ org.rocksdb.Filter\ org.rocksdb.IngestExternalFileOptions\ @@ -75,6 +76,7 @@ NATIVE_JAVA_CLASSES = \ org.rocksdb.VectorMemTableConfig\ org.rocksdb.Snapshot\ org.rocksdb.StringAppendOperator\ + org.rocksdb.StringAppendOperatorWithVariableDelimitor\ org.rocksdb.UInt64AddOperator\ org.rocksdb.WriteBatch\ org.rocksdb.WriteBatch.Handler\ diff --git a/java/crossbuild/Vagrantfile b/java/crossbuild/Vagrantfile index 0ee50de2c..a3035e683 100644 --- a/java/crossbuild/Vagrantfile +++ b/java/crossbuild/Vagrantfile @@ -33,7 +33,7 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config| end config.vm.provider "virtualbox" do |v| - v.memory = 2048 + v.memory = 6048 v.cpus = 4 v.customize ["modifyvm", :id, "--nictype1", "virtio" ] end diff --git a/java/crossbuild/build-win.bat b/java/crossbuild/build-win.bat new file mode 100644 index 000000000..963f050b8 --- /dev/null +++ b/java/crossbuild/build-win.bat @@ -0,0 +1,16 @@ +:: install git, java 8, maven, visual studio community 15 (2017) + +set MSBUILD=C:\Program Files (x86)\Microsoft Visual Studio\2017\Community\MSBuild\15.0\Bin\MSBuild.exe + +if exist build rd /s /q build +if exist librocksdbjni-win64.dll del librocksdbjni-win64.dll +mkdir build && cd build + +cmake -G "Visual Studio 15 Win64" -DWITH_JNI=1 .. + +"%MSBUILD%" rocksdb.sln /p:Configuration=Release /m + +cd .. + +copy build\java\Release\rocksdbjni-shared.dll librocksdbjni-win64.dll +echo Result is in librocksdbjni-win64.dll diff --git a/java/deploysettings.xml b/java/deploysettings.xml new file mode 100644 index 000000000..02528b646 --- /dev/null +++ b/java/deploysettings.xml @@ -0,0 +1,12 @@ +<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 + http://maven.apache.org/xsd/settings-1.0.0.xsd"> + <servers> + <server> + <id>sonatype-nexus-staging</id> + <username>${sonatype_user}</username> + <password>${sonatype_pw}</password> + </server> + </servers> +</settings> diff --git a/java/publish-frocksdbjni.sh b/java/publish-frocksdbjni.sh new file mode 100644 index 000000000..ddd14a98f --- /dev/null +++ b/java/publish-frocksdbjni.sh @@ -0,0 +1,42 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+################################################################################ + +# fail on errors +set -e + +PREFIX=java/target/frocksdb-release/frocksdbjni-${VERSION} + +function deploy() { + FILE=$1 + CLASSIFIER=$2 + echo "Deploying file=${FILE} with classifier=${CLASSIFIER} to sonatype with prefix=${PREFIX}" + sonatype_user=${USER} sonatype_pw=${PASSWORD} mvn gpg:sign-and-deploy-file \ + --settings java/deploysettings.xml \ + -Durl=https://oss.sonatype.org/service/local/staging/deploy/maven2/ \ + -DrepositoryId=sonatype-nexus-staging \ + -DpomFile=${PREFIX}.pom \ + -Dfile=$FILE \ + -Dclassifier=$CLASSIFIER \ + -Dgpg.keyname=${KEYNAME} \ + -Dgpg.passphrase=${PASSPHRASE} +} + +deploy ${PREFIX}-sources.jar sources +deploy ${PREFIX}-javadoc.jar javadoc +deploy ${PREFIX}.jar diff --git a/java/rocksjni.pom b/java/rocksjni.pom index 5defdca7d..122ffda7b 100644 --- a/java/rocksjni.pom +++ b/java/rocksjni.pom @@ -4,14 +4,14 @@ xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> <modelVersion>4.0.0</modelVersion> - <name>RocksDB JNI</name> - <url>http://rocksdb.org/</url> - <groupId>org.rocksdb</groupId> - <artifactId>rocksdbjni</artifactId> + <name>FRocksDB JNI</name> + <url>https://github.com/dataArtisans/frocksdb</url> + <groupId>com.data-artisans</groupId> + <artifactId>frocksdbjni</artifactId> <version>-</version> - <description>RocksDB fat jar that contains .so files for linux32 and linux64 (glibc and musl-libc), jnilib files - for Mac OSX, and a .dll for Windows x64.</description> + <description>RocksDB fat jar with modifications specific for Apache Flink + that contains .so files for linux32 and linux64, jnilib files for Mac OSX, and a .dll for Windows x64.</description> @@ -19,11 +19,6 @@ <url>http://www.apache.org/licenses/LICENSE-2.0.html</url> <distribution>repo</distribution> </license> - <license> - <name>GNU General Public License, version 2</name> - <url>http://www.gnu.org/licenses/gpl-2.0.html</url> - <distribution>repo</distribution> - </license> <connection>scm:git:git://github.com/dropwizard/metrics.git</connection> diff --git a/java/rocksjni/flink_compactionfilterjni.cc b/java/rocksjni/flink_compactionfilterjni.cc new file mode 100644 index 000000000..9d8e36807 --- /dev/null +++ b/java/rocksjni/flink_compactionfilterjni.cc @@ -0,0 +1,193 @@ +#include <jni.h> // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include <cassert> + +#include "include/org_rocksdb_FlinkCompactionFilter.h" +#include <memory> +#include "utilities/flink/flink_compaction_filter.h" +#include "rocksjni/jnicallback.h" +#include "loggerjnicallback.h" +#include "portal.h" + +using namespace ROCKSDB_NAMESPACE::flink; + +class JniCallbackBase : public ROCKSDB_NAMESPACE::JniCallback { +public: + JniCallbackBase(JNIEnv *env, jobject jcallback_obj) : JniCallback(env, jcallback_obj) {} +protected: + inline void CheckAndRethrowException(JNIEnv* env) const { + if (env->ExceptionCheck()) { + env->ExceptionDescribe(); + env->Throw(env->ExceptionOccurred()); + } + } +}; + +// This list element filter operates on list state for which the byte length of elements is unknown (variable); +// in this case the list element serializer has to be used to compute the offset of the next element. +// The filter wraps a Java object implemented in Flink. The Java object holds the element serializer and performs the filtering.
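The Java counterpart that the wrapper class below calls is implemented in Flink; `FlinkCompactionFilterTest` later in this diff contains one. A sketch of such a filter, assuming the test's element encoding of an 8-byte timestamp, a 4-byte length, the payload, and a 1-byte delimiter per element (the class name is illustrative):

```java
import java.nio.ByteBuffer;
import org.rocksdb.FlinkCompactionFilter;

class VariableLengthListElementFilter implements FlinkCompactionFilter.ListElementFilter {
  @Override
  public int nextUnexpiredOffset(byte[] list, long ttl, long currentTimestamp) {
    int offset = 0;
    while (offset < list.length) {
      ByteBuffer buf = ByteBuffer.wrap(list, offset, list.length - offset);
      long timestamp = buf.getLong();       // element's last-access timestamp
      if (timestamp + ttl > currentTimestamp) {
        break;                              // first unexpired element found
      }
      int elementLength = buf.getInt();     // length of the serialized payload
      offset += 8 + 4 + elementLength + 1;  // timestamp + length + payload + delimiter
    }
    return offset;
  }
}
```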
+class JavaListElementFilter : public ROCKSDB_NAMESPACE::flink::FlinkCompactionFilter::ListElementFilter, JniCallbackBase { +public: + JavaListElementFilter(JNIEnv* env, jobject jlist_filter) : JniCallbackBase(env, jlist_filter) { + jclass jclazz = ROCKSDB_NAMESPACE::JavaClass::getJClass(env, "org/rocksdb/FlinkCompactionFilter$ListElementFilter"); + if(jclazz == nullptr) { + // exception occurred accessing class + return; + } + m_jnext_unexpired_offset_methodid = env->GetMethodID(jclazz, "nextUnexpiredOffset", "([BJJ)I"); + assert(m_jnext_unexpired_offset_methodid != nullptr); + + } + + std::size_t NextUnexpiredOffset(const ROCKSDB_NAMESPACE::Slice& list, int64_t ttl, int64_t current_timestamp) const override { + jboolean attached_thread = JNI_FALSE; + JNIEnv* env = getJniEnv(&attached_thread); + jbyteArray jlist = ROCKSDB_NAMESPACE::JniUtil::copyBytes(env, list); + CheckAndRethrowException(env); + if (jlist == nullptr) { + return static_cast<std::size_t>(-1); + } + auto jl_ttl = static_cast<jlong>(ttl); + auto jl_current_timestamp = static_cast<jlong>(current_timestamp); + jint next_offset = env->CallIntMethod(m_jcallback_obj, m_jnext_unexpired_offset_methodid, jlist, jl_ttl, jl_current_timestamp); + CheckAndRethrowException(env); + env->DeleteLocalRef(jlist); + releaseJniEnv(attached_thread); + return static_cast<std::size_t>(next_offset); + }; + +private: + jmethodID m_jnext_unexpired_offset_methodid; +}; + +class JavaListElemenFilterFactory : public ROCKSDB_NAMESPACE::flink::FlinkCompactionFilter::ListElementFilterFactory, JniCallbackBase { +public: + JavaListElemenFilterFactory(JNIEnv* env, jobject jlist_filter_factory) : JniCallbackBase(env, jlist_filter_factory) { + jclass jclazz = ROCKSDB_NAMESPACE::JavaClass::getJClass(env, "org/rocksdb/FlinkCompactionFilter$ListElementFilterFactory"); + if(jclazz == nullptr) { + // exception occurred accessing class + return; + } + m_jcreate_filter_methodid = env->GetMethodID(jclazz, + "createListElementFilter", "()Lorg/rocksdb/FlinkCompactionFilter$ListElementFilter;"); + assert(m_jcreate_filter_methodid != nullptr); + } + + FlinkCompactionFilter::ListElementFilter* CreateListElementFilter(std::shared_ptr<ROCKSDB_NAMESPACE::Logger> /*logger*/) const override { + jboolean attached_thread = JNI_FALSE; + JNIEnv* env = getJniEnv(&attached_thread); + auto jlist_filter = env->CallObjectMethod(m_jcallback_obj, m_jcreate_filter_methodid); + auto list_filter = new JavaListElementFilter(env, jlist_filter); + CheckAndRethrowException(env); + releaseJniEnv(attached_thread); + return list_filter; + }; + +private: + jmethodID m_jcreate_filter_methodid; +}; + +class JavaTimeProvider : public ROCKSDB_NAMESPACE::flink::FlinkCompactionFilter::TimeProvider, JniCallbackBase { +public: + JavaTimeProvider(JNIEnv* env, jobject jtime_provider) : JniCallbackBase(env, jtime_provider) { + jclass jclazz = ROCKSDB_NAMESPACE::JavaClass::getJClass(env, "org/rocksdb/FlinkCompactionFilter$TimeProvider"); + if(jclazz == nullptr) { + // exception occurred accessing class + return; + } + m_jcurrent_timestamp_methodid = env->GetMethodID(jclazz, "currentTimestamp", "()J"); + assert(m_jcurrent_timestamp_methodid != nullptr); + } + + int64_t CurrentTimestamp() const override { + jboolean attached_thread = JNI_FALSE; + JNIEnv* env = getJniEnv(&attached_thread); + auto jtimestamp = env->CallLongMethod(m_jcallback_obj, m_jcurrent_timestamp_methodid); + CheckAndRethrowException(env); + releaseJniEnv(attached_thread); + return static_cast<int64_t>(jtimestamp); + }; + +private: + jmethodID m_jcurrent_timestamp_methodid; +}; + +static
FlinkCompactionFilter::ListElementFilterFactory* createListElementFilterFactory( + JNIEnv *env, jint ji_list_elem_len, jobject jlist_filter_factory) { + FlinkCompactionFilter::ListElementFilterFactory* list_filter_factory = nullptr; + if (ji_list_elem_len > 0) { + auto fixed_size = static_cast<std::size_t>(ji_list_elem_len); + list_filter_factory = new FlinkCompactionFilter::FixedListElementFilterFactory(fixed_size, static_cast<std::size_t>(0)); + } else if (jlist_filter_factory != nullptr) { + list_filter_factory = new JavaListElemenFilterFactory(env, jlist_filter_factory); + } + return list_filter_factory; +} + +/* + * Class: org_rocksdb_FlinkCompactionFilter + * Method: createNewFlinkCompactionFilterConfigHolder + * Signature: ()J + */ +jlong Java_org_rocksdb_FlinkCompactionFilter_createNewFlinkCompactionFilterConfigHolder( + JNIEnv* /* env */, jclass /* jcls */) { + using namespace ROCKSDB_NAMESPACE::flink; + return reinterpret_cast<jlong>( + new std::shared_ptr<FlinkCompactionFilter::ConfigHolder>(new FlinkCompactionFilter::ConfigHolder())); +} + +/* + * Class: org_rocksdb_FlinkCompactionFilter + * Method: disposeFlinkCompactionFilterConfigHolder + * Signature: (J)V + */ +void Java_org_rocksdb_FlinkCompactionFilter_disposeFlinkCompactionFilterConfigHolder( + JNIEnv* /* env */, jclass /* jcls */, jlong handle) { + using namespace ROCKSDB_NAMESPACE::flink; + auto* config_holder = reinterpret_cast<std::shared_ptr<FlinkCompactionFilter::ConfigHolder>*>(handle); + delete config_holder; +} + +/* + * Class: org_rocksdb_FlinkCompactionFilter + * Method: createNewFlinkCompactionFilter0 + * Signature: (JJJ)J + */ +jlong Java_org_rocksdb_FlinkCompactionFilter_createNewFlinkCompactionFilter0( + JNIEnv* env, jclass /* jcls */, jlong config_holder_handle, jobject jtime_provider, jlong logger_handle) { + using namespace ROCKSDB_NAMESPACE::flink; + auto config_holder = *(reinterpret_cast<std::shared_ptr<FlinkCompactionFilter::ConfigHolder>*>(config_holder_handle)); + auto time_provider = new JavaTimeProvider(env, jtime_provider); + auto logger = logger_handle == 0 ?
nullptr : + *(reinterpret_cast<std::shared_ptr<ROCKSDB_NAMESPACE::LoggerJniCallback>*>(logger_handle)); + return reinterpret_cast<jlong>(new FlinkCompactionFilter( + config_holder, std::unique_ptr<FlinkCompactionFilter::TimeProvider>(time_provider), logger)); +} + +/* + * Class: org_rocksdb_FlinkCompactionFilter + * Method: configureFlinkCompactionFilter + * Signature: (JIIJJILorg/rocksdb/FlinkCompactionFilter$ListElementFilter;)Z + */ +jboolean Java_org_rocksdb_FlinkCompactionFilter_configureFlinkCompactionFilter( + JNIEnv* env, jclass /* jcls */, + jlong handle, + jint ji_state_type, + jint ji_timestamp_offset, + jlong jl_ttl_milli, + jlong jquery_time_after_num_entries, + jint ji_list_elem_len, + jobject jlist_filter_factory) { + auto state_type = static_cast<FlinkCompactionFilter::StateType>(ji_state_type); + auto timestamp_offset = static_cast<std::size_t>(ji_timestamp_offset); + auto ttl = static_cast<int64_t>(jl_ttl_milli); + auto query_time_after_num_entries = static_cast<int64_t>(jquery_time_after_num_entries); + auto config_holder = *(reinterpret_cast<std::shared_ptr<FlinkCompactionFilter::ConfigHolder>*>(handle)); + auto list_filter_factory = createListElementFilterFactory(env, ji_list_elem_len, jlist_filter_factory); + auto config = new FlinkCompactionFilter::Config{state_type, timestamp_offset, ttl, query_time_after_num_entries, + std::unique_ptr<FlinkCompactionFilter::ListElementFilterFactory>(list_filter_factory)}; + return static_cast<jboolean>(config_holder->Configure(config)); +} \ No newline at end of file diff --git a/java/rocksjni/merge_operator.cc b/java/rocksjni/merge_operator.cc index edc3e7231..bee41bb7d 100644 --- a/java/rocksjni/merge_operator.cc +++ b/java/rocksjni/merge_operator.cc @@ -14,6 +14,7 @@ #include #include "include/org_rocksdb_StringAppendOperator.h" +#include "include/org_rocksdb_StringAppendOperatorWithVariableDelimitor.h" #include "include/org_rocksdb_UInt64AddOperator.h" #include "rocksdb/db.h" #include "rocksdb/memtablerep.h" @@ -79,3 +80,38 @@ void Java_org_rocksdb_UInt64AddOperator_disposeInternal(JNIEnv* /*env*/, jhandle); delete sptr_uint64_add_op; // delete std::shared_ptr +} + +/* + * Class: org_rocksdb_StringAppendOperatorWithVariableDelimitor + * Method: newSharedStringAppendTESTOperator + * Signature: ([B)J + */ +jlong Java_org_rocksdb_StringAppendOperatorWithVariableDelimitor_newSharedStringAppendTESTOperator( + JNIEnv* env, jclass /*jclazz*/, jbyteArray jdelim) { + jboolean has_exception = JNI_FALSE; + std::string delim = ROCKSDB_NAMESPACE::JniUtil::byteString( + env, jdelim, + [](const char* str, const size_t len) { return std::string(str, len); }, + &has_exception); + if (has_exception == JNI_TRUE) { + // exception occurred + return 0; + } + + auto* sptr_string_append_test_op = new std::shared_ptr<ROCKSDB_NAMESPACE::MergeOperator>( + ROCKSDB_NAMESPACE::MergeOperators::CreateStringAppendTESTOperator(delim)); + return reinterpret_cast<jlong>(sptr_string_append_test_op); +} + +/* + * Class: org_rocksdb_StringAppendOperatorWithVariableDelimitor + * Method: disposeInternal + * Signature: (J)V + */ +void Java_org_rocksdb_StringAppendOperatorWithVariableDelimitor_disposeInternal(JNIEnv* /*env*/, + jobject /*jobj*/, + jlong jhandle) { + auto* sptr_string_append_test_op = + reinterpret_cast<std::shared_ptr<ROCKSDB_NAMESPACE::MergeOperator>*>(jhandle); + delete sptr_string_append_test_op; // delete std::shared_ptr +} diff --git a/java/src/main/java/org/rocksdb/FlinkCompactionFilter.java b/java/src/main/java/org/rocksdb/FlinkCompactionFilter.java new file mode 100644 index 000000000..966449adf --- /dev/null +++ b/java/src/main/java/org/rocksdb/FlinkCompactionFilter.java @@ -0,0 +1,167 @@ +// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +package org.rocksdb; + +/** + * Just a Java wrapper around FlinkCompactionFilter implemented in C++. + * + * Note: this compaction filter is a special implementation, designed for usage only in the Apache Flink project. + */ +public class FlinkCompactionFilter + extends AbstractCompactionFilter<Slice> { + public enum StateType { + // WARNING!!! Do not change the order of enum entries as it is important for jni translation + Disabled, + Value, + List + } + + public FlinkCompactionFilter(ConfigHolder configHolder, TimeProvider timeProvider) { + this(configHolder, timeProvider, null); + } + + public FlinkCompactionFilter(ConfigHolder configHolder, TimeProvider timeProvider, Logger logger) { + super(createNewFlinkCompactionFilter0(configHolder.nativeHandle_, timeProvider, logger == null ? 0 : logger.nativeHandle_)); + } + + private native static long createNewFlinkCompactionFilter0(long configHolderHandle, TimeProvider timeProvider, long loggerHandle); + private native static long createNewFlinkCompactionFilterConfigHolder(); + private native static void disposeFlinkCompactionFilterConfigHolder(long configHolderHandle); + private native static boolean configureFlinkCompactionFilter( + long configHolderHandle, int stateType, int timestampOffset, long ttl, long queryTimeAfterNumEntries, + int fixedElementLength, ListElementFilterFactory listElementFilterFactory); + + public interface ListElementFilter { + /** + * Gets the offset of the first unexpired element in the list. + * + *

<p>Native code wraps this Java object and calls it for list state + * whose element byte length is unknown, so a Flink custom type serializer has to be used + * to compute the offset of the next element in serialized form. + * + * @param list serialized list of elements with timestamps + * @param ttl time-to-live of the list elements + * @param currentTimestamp current timestamp to check expiration against + * @return offset of the first unexpired element in the list + */ + @SuppressWarnings("unused") + int nextUnexpiredOffset(byte[] list, long ttl, long currentTimestamp); + } + + public interface ListElementFilterFactory { + @SuppressWarnings("unused") + ListElementFilter createListElementFilter(); + } + + public static class Config { + final StateType stateType; + final int timestampOffset; + final long ttl; + /** Number of state entries the compaction filter processes before updating the current timestamp. */ + final long queryTimeAfterNumEntries; + final int fixedElementLength; + final ListElementFilterFactory listElementFilterFactory; + + private Config( + StateType stateType, int timestampOffset, long ttl, long queryTimeAfterNumEntries, + int fixedElementLength, ListElementFilterFactory listElementFilterFactory) { + this.stateType = stateType; + this.timestampOffset = timestampOffset; + this.ttl = ttl; + this.queryTimeAfterNumEntries = queryTimeAfterNumEntries; + this.fixedElementLength = fixedElementLength; + this.listElementFilterFactory = listElementFilterFactory; + } + + @SuppressWarnings("WeakerAccess") + public static Config createNotList(StateType stateType, int timestampOffset, long ttl, long queryTimeAfterNumEntries) { + return new Config(stateType, timestampOffset, ttl, queryTimeAfterNumEntries, -1, null); + } + + @SuppressWarnings("unused") + public static Config createForValue(long ttl, long queryTimeAfterNumEntries) { + return createNotList(StateType.Value, 0, ttl, queryTimeAfterNumEntries); + } + + @SuppressWarnings("unused") + public static Config createForMap(long ttl, long queryTimeAfterNumEntries) { + return createNotList(StateType.Value, 1, ttl, queryTimeAfterNumEntries); + } + + @SuppressWarnings("WeakerAccess") + public static Config createForFixedElementList(long ttl, long queryTimeAfterNumEntries, int fixedElementLength) { + return new Config(StateType.List, 0, ttl, queryTimeAfterNumEntries, fixedElementLength, null); + } + + @SuppressWarnings("WeakerAccess") + public static Config createForList(long ttl, long queryTimeAfterNumEntries, ListElementFilterFactory listElementFilterFactory) { + return new Config(StateType.List, 0, ttl, queryTimeAfterNumEntries, -1, listElementFilterFactory); + } + } + + private static class ConfigHolder extends RocksObject { + ConfigHolder() { + super(createNewFlinkCompactionFilterConfigHolder()); + } + + @Override + protected void disposeInternal(long handle) { + disposeFlinkCompactionFilterConfigHolder(handle); + } + } + + /** Provides the current timestamp to check expiration; it must be thread-safe.
*/ + public interface TimeProvider { + long currentTimestamp(); + } + + public static class FlinkCompactionFilterFactory extends AbstractCompactionFilterFactory<FlinkCompactionFilter> { + private final ConfigHolder configHolder; + private final TimeProvider timeProvider; + private final Logger logger; + + @SuppressWarnings("unused") + public FlinkCompactionFilterFactory(TimeProvider timeProvider) { + this(timeProvider, null); + } + + @SuppressWarnings("WeakerAccess") + public FlinkCompactionFilterFactory(TimeProvider timeProvider, Logger logger) { + this.configHolder = new ConfigHolder(); + this.timeProvider = timeProvider; + this.logger = logger; + } + + @Override + public void close() { + super.close(); + configHolder.close(); + if (logger != null) { + logger.close(); + } + } + + @Override + public FlinkCompactionFilter createCompactionFilter(Context context) { + return new FlinkCompactionFilter(configHolder, timeProvider, logger); + } + + @Override + public String name() { + return "FlinkCompactionFilterFactory"; + } + + @SuppressWarnings("WeakerAccess") + public void configure(Config config) { + boolean already_configured = !configureFlinkCompactionFilter( + configHolder.nativeHandle_, config.stateType.ordinal(), config.timestampOffset, + config.ttl, config.queryTimeAfterNumEntries, config.fixedElementLength, config.listElementFilterFactory); + if (already_configured) { + throw new IllegalStateException("Compaction filter is already configured"); + } + } + } +} diff --git a/java/src/main/java/org/rocksdb/StringAppendOperatorWithVariableDelimitor.java b/java/src/main/java/org/rocksdb/StringAppendOperatorWithVariableDelimitor.java new file mode 100644 index 000000000..0c35e103a --- /dev/null +++ b/java/src/main/java/org/rocksdb/StringAppendOperatorWithVariableDelimitor.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.rocksdb; + +import java.nio.charset.Charset; + +/** Merge operator that concatenates two strings with a variable-length delimiter, given as a string or byte array.
*/ +public class StringAppendOperatorWithVariableDelimitor extends MergeOperator { + public StringAppendOperatorWithVariableDelimitor() { + this(','); + } + + public StringAppendOperatorWithVariableDelimitor(char delim) { + this(Character.toString(delim)); + } + + public StringAppendOperatorWithVariableDelimitor(byte[] delim) { + super(newSharedStringAppendTESTOperator(delim)); + } + + public StringAppendOperatorWithVariableDelimitor(String delim) { + this(delim.getBytes()); + } + + public StringAppendOperatorWithVariableDelimitor(String delim, Charset charset) { + this(delim.getBytes(charset)); + } + + private native static long newSharedStringAppendTESTOperator(final byte[] delim); + @Override protected final native void disposeInternal(final long handle); +} diff --git a/java/src/test/java/org/rocksdb/FilterTest.java b/java/src/test/java/org/rocksdb/FilterTest.java index dc5c19fbc..e308ffefb 100644 --- a/java/src/test/java/org/rocksdb/FilterTest.java +++ b/java/src/test/java/org/rocksdb/FilterTest.java @@ -16,7 +16,7 @@ public class FilterTest { @Test public void filter() { - // new Bloom filter + // new Bloom filterFactory final BlockBasedTableConfig blockConfig = new BlockBasedTableConfig(); try(final Options options = new Options()) { diff --git a/java/src/test/java/org/rocksdb/FlinkCompactionFilterTest.java b/java/src/test/java/org/rocksdb/FlinkCompactionFilterTest.java new file mode 100644 index 000000000..0804fc720 --- /dev/null +++ b/java/src/test/java/org/rocksdb/FlinkCompactionFilterTest.java @@ -0,0 +1,357 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.rocksdb; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.rocksdb.FlinkCompactionFilter.StateType; +import org.rocksdb.FlinkCompactionFilter.TimeProvider; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; + +import static org.assertj.core.api.Assertions.assertThat; + +public class FlinkCompactionFilterTest { + private static final int LONG_LENGTH = 8; + private static final int INT_LENGTH = 4; + private static final String MERGE_OPERATOR_NAME = "stringappendtest"; + private static final byte DELIMITER = ','; + private static final long TTL = 100; + private static final long QUERY_TIME_AFTER_NUM_ENTRIES = 100; + private static final int TEST_TIMESTAMP_OFFSET = 2; + private static final Random rnd = new Random(); + + private TestTimeProvider timeProvider; + private List<StateContext> stateContexts; + private List<ColumnFamilyDescriptor> cfDescs; + private List<ColumnFamilyHandle> cfHandles; + + @Rule + public TemporaryFolder dbFolder = new TemporaryFolder(); + + @Before + public void init() { + timeProvider = new TestTimeProvider(); + timeProvider.time = rnd.nextLong(); + stateContexts = Arrays.asList( + new StateContext(StateType.Value, timeProvider, TEST_TIMESTAMP_OFFSET), + new FixedElementListStateContext(timeProvider), + new NonFixedElementListStateContext(timeProvider) + ); + cfDescs = new ArrayList<>(); + cfHandles = new ArrayList<>(); + cfDescs.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY)); + for (StateContext stateContext : stateContexts) { + cfDescs.add(stateContext.getCfDesc()); + } + } + + @After + public void cleanup() { + for (StateContext stateContext : stateContexts) { + stateContext.cfDesc.getOptions().close(); + stateContext.filterFactory.close(); + } + } + + @Test + public void checkStateTypeEnumOrder() { + // if the order changes it also needs to be adjusted + // in utilities/flink/flink_compaction_filter.h + // and in utilities/flink/flink_compaction_filter_test.cc + assertThat(StateType.Disabled.ordinal()).isEqualTo(0); + assertThat(StateType.Value.ordinal()).isEqualTo(1); + assertThat(StateType.List.ordinal()).isEqualTo(2); + } + + @Test + public void testCompactionFilter() throws RocksDBException { + try(DBOptions options = createDbOptions(); + RocksDB rocksDb = setupDb(options)) { + try { + for (StateContext stateContext : stateContexts) { + stateContext.updateValueWithTimestamp(rocksDb); + stateContext.checkUnexpired(rocksDb); + rocksDb.compactRange(stateContext.columnFamilyHandle); + stateContext.checkUnexpired(rocksDb); + } + + timeProvider.time += TTL + TTL / 2; // expire state + + for (StateContext stateContext : stateContexts) { + stateContext.checkUnexpired(rocksDb); + rocksDb.compactRange(stateContext.columnFamilyHandle); + stateContext.checkExpired(rocksDb); + rocksDb.compactRange(stateContext.columnFamilyHandle); + } + } finally { + for (ColumnFamilyHandle cfHandle : cfHandles) { + cfHandle.close(); + } + } + } + } + + private static DBOptions createDbOptions() { + return new DBOptions() + .setCreateIfMissing(true) + .setCreateMissingColumnFamilies(true); + } + + private RocksDB setupDb(DBOptions options) throws RocksDBException { + RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath(), cfDescs, cfHandles); + for (int i = 0; i < stateContexts.size(); i++) { + stateContexts.get(i).columnFamilyHandle = cfHandles.get(i + 1); + } +
return db; + } + + private static class StateContext { + private final String cf; + final String key; + final ColumnFamilyDescriptor cfDesc; + final String userValue; + final long currentTime; + final FlinkCompactionFilter.FlinkCompactionFilterFactory filterFactory; + + ColumnFamilyHandle columnFamilyHandle; + + private StateContext(StateType type, TimeProvider timeProvider, int timestampOffset) { + this.currentTime = timeProvider.currentTimestamp(); + userValue = type.name() + "StateValue"; + cf = getClass().getSimpleName() + "StateCf"; + key = type.name() + "StateKey"; + filterFactory = new FlinkCompactionFilter.FlinkCompactionFilterFactory(timeProvider, createLogger()); + filterFactory.configure(createConfig(type, timestampOffset)); + cfDesc = new ColumnFamilyDescriptor(getASCII(cf), getOptionsWithFilter(filterFactory)); + } + + private Logger createLogger() { + try (DBOptions opts = new DBOptions().setInfoLogLevel(InfoLogLevel.DEBUG_LEVEL)) { + return new Logger(opts) { + @Override + protected void log(InfoLogLevel infoLogLevel, String logMsg) { + System.out.println(infoLogLevel + ": " + logMsg); + } + }; + } + } + + FlinkCompactionFilter.Config createConfig(StateType type, int timestampOffset) { + return FlinkCompactionFilter.Config.createNotList(type, timestampOffset, TTL, QUERY_TIME_AFTER_NUM_ENTRIES); + } + + private static ColumnFamilyOptions getOptionsWithFilter( + FlinkCompactionFilter.FlinkCompactionFilterFactory filterFactory) { + return new ColumnFamilyOptions() + .setCompactionFilterFactory(filterFactory) + .setMergeOperatorName(MERGE_OPERATOR_NAME); + } + + public String getKey() { + return key; + } + + ColumnFamilyDescriptor getCfDesc() { + return cfDesc; + } + + byte[] getValueWithTimestamp(RocksDB db) throws RocksDBException { + return db.get(columnFamilyHandle, getASCII(key)); + } + + void updateValueWithTimestamp(RocksDB db) throws RocksDBException { + db.put(columnFamilyHandle, getASCII(key), valueWithTimestamp()); + } + + byte[] valueWithTimestamp() { + return valueWithTimestamp(TEST_TIMESTAMP_OFFSET); + } + + byte[] valueWithTimestamp(@SuppressWarnings("SameParameterValue") int offset) { + return valueWithTimestamp(offset, currentTime); + } + + byte[] valueWithTimestamp(int offset, long timestamp) { + ByteBuffer buffer = getByteBuffer(offset); + buffer.put(new byte[offset]); + appendValueWithTimestamp(buffer, userValue, timestamp); + return buffer.array(); + } + + void appendValueWithTimestamp(ByteBuffer buffer, String value, long timestamp) { + buffer.putLong(timestamp); + buffer.putInt(value.length()); + buffer.put(getASCII(value)); + } + + ByteBuffer getByteBuffer(int offset) { + int length = offset + LONG_LENGTH + INT_LENGTH + userValue.length(); + return ByteBuffer.allocate(length); + } + + byte[] unexpiredValue() { + return valueWithTimestamp(); + } + + byte[] expiredValue() { + return null; + } + + void checkUnexpired(RocksDB db) throws RocksDBException { + assertThat(getValueWithTimestamp(db)).isEqualTo(unexpiredValue()); + } + + void checkExpired(RocksDB db) throws RocksDBException { + assertThat(getValueWithTimestamp(db)).isEqualTo(expiredValue()); + } + } + + private static class FixedElementListStateContext extends StateContext { + private FixedElementListStateContext(TimeProvider timeProvider) { + super(StateType.List, timeProvider,0); + } + + @Override + FlinkCompactionFilter.Config createConfig(StateType type, int timestampOffset) { + //return FlinkCompactionFilter.Config.createForList(TTL, QUERY_TIME_AFTER_NUM_ENTRIES, ELEM_FILTER_FACTORY); + return 
FlinkCompactionFilter.Config.createForFixedElementList(TTL, QUERY_TIME_AFTER_NUM_ENTRIES, 13 + userValue.getBytes().length); + } + + @Override + void updateValueWithTimestamp(RocksDB db) throws RocksDBException { + db.merge(columnFamilyHandle, getASCII(key), listExpired(3)); + db.merge(columnFamilyHandle, getASCII(key), mixedList(2, 3)); + db.merge(columnFamilyHandle, getASCII(key), listUnexpired(4)); + } + + @Override + byte[] unexpiredValue() { + return mixedList(5, 7); + } + + byte[] mergeBytes(byte[] ... bytes) { + int length = 0; + for (byte[] a : bytes) { + length += a.length; + } + ByteBuffer buffer = ByteBuffer.allocate(length); + for (byte[] a : bytes) { + buffer.put(a); + } + return buffer.array(); + } + + @Override + byte[] expiredValue() { + return listUnexpired(7); + } + + private byte[] mixedList(int numberOfExpiredElements, int numberOfUnexpiredElements) { + assert numberOfExpiredElements > 0; + assert numberOfUnexpiredElements > 0; + return mergeBytes( + listExpired(numberOfExpiredElements), + new byte[] {DELIMITER}, + listUnexpired(numberOfUnexpiredElements)); + } + + private byte[] listExpired(int numberOfElements) { + return list(numberOfElements, currentTime); + } + + private byte[] listUnexpired(int numberOfElements) { + return list(numberOfElements, currentTime + TTL); + } + + private byte[] list(int numberOfElements, long timestamp) { + ByteBuffer buffer = getByteBufferForList(numberOfElements); + for (int i = 0; i < numberOfElements; i++) { + appendValueWithTimestamp(buffer, userValue, timestamp); + if (i < numberOfElements - 1) { + buffer.put(DELIMITER); + } + } + return buffer.array(); + } + + private ByteBuffer getByteBufferForList(int numberOfElements) { + int length = ((LONG_LENGTH + INT_LENGTH + userValue.length() + 1) * numberOfElements) - 1; + return ByteBuffer.allocate(length); + } + } + + private static class NonFixedElementListStateContext extends FixedElementListStateContext { + private static FlinkCompactionFilter.ListElementFilterFactory ELEM_FILTER_FACTORY = new ListElementFilterFactory(); + + private NonFixedElementListStateContext(TimeProvider timeProvider) { + super(timeProvider); + } + + @Override + FlinkCompactionFilter.Config createConfig(StateType type, int timestampOffset) { + //return FlinkCompactionFilter.Config.createForList(TTL, QUERY_TIME_AFTER_NUM_ENTRIES, ELEM_FILTER_FACTORY); + return FlinkCompactionFilter.Config.createForList(TTL, QUERY_TIME_AFTER_NUM_ENTRIES, ELEM_FILTER_FACTORY); + } + + private static class ListElementFilterFactory implements FlinkCompactionFilter.ListElementFilterFactory { + @Override + public FlinkCompactionFilter.ListElementFilter createListElementFilter() { + return new FlinkCompactionFilter.ListElementFilter() { + @Override + public int nextUnexpiredOffset(byte[] list, long ttl, long currentTimestamp) { + int currentOffset = 0; + while (currentOffset < list.length) { + ByteBuffer bf = ByteBuffer + .wrap(list, currentOffset, list.length - currentOffset); + long timestamp = bf.getLong(); + if (timestamp + ttl > currentTimestamp) { + break; + } + int elemLen = bf.getInt(8); + currentOffset += 13 + elemLen; + } + return currentOffset; + } + }; + } + } + } + + private static byte[] getASCII(String str) { + return str.getBytes(StandardCharsets.US_ASCII); + } + + private static class TestTimeProvider implements TimeProvider { + private long time; + + @Override + public long currentTimestamp() { + return time; + } + } +} \ No newline at end of file diff --git a/java/src/test/java/org/rocksdb/MergeTest.java 
b/java/src/test/java/org/rocksdb/MergeTest.java index 128d694bf..661c8dfcc 100644 --- a/java/src/test/java/org/rocksdb/MergeTest.java +++ b/java/src/test/java/org/rocksdb/MergeTest.java @@ -205,6 +205,34 @@ public void uint64AddOperatorOption() } } + @Test + public void stringDelimiter() throws RocksDBException { + stringDelimiter("DELIM"); + stringDelimiter(""); + } + + private void stringDelimiter(String delim) throws RocksDBException { + try (final MergeOperator stringAppendOperator = new StringAppendOperatorWithVariableDelimitor(delim.getBytes()); + final Options opt = new Options() + .setCreateIfMissing(true) + .setMergeOperator(stringAppendOperator); + final RocksDB db = RocksDB.open(opt, dbFolder.getRoot().getAbsolutePath())) { + // Writing aa under key + db.put("key".getBytes(), "aa".getBytes()); + + // Writing bb under key + db.merge("key".getBytes(), "bb".getBytes()); + + // Writing empty under key + db.merge("key".getBytes(), "".getBytes()); + + final byte[] value = db.get("key".getBytes()); + final String strValue = new String(value); + + assertThat(strValue).isEqualTo("aa" + delim + "bb" + delim); + } + } + @Test public void cFOperatorOption() throws InterruptedException, RocksDBException { diff --git a/src.mk b/src.mk index 81c1039e4..7e423f64f 100644 --- a/src.mk +++ b/src.mk @@ -210,6 +210,7 @@ LIB_SOURCES = \ utilities/debug.cc \ utilities/env_mirror.cc \ utilities/env_timed.cc \ + utilities/flink/flink_compaction_filter.cc \ utilities/leveldb_options/leveldb_options.cc \ utilities/memory/memory_util.cc \ utilities/merge_operators/max.cc \ @@ -464,6 +465,7 @@ MAIN_SOURCES = \ utilities/cassandra/cassandra_row_merge_test.cc \ utilities/cassandra/cassandra_serialize_test.cc \ utilities/checkpoint/checkpoint_test.cc \ + utilities/flink/flink_compaction_filter_test.cc \ utilities/memory/memory_test.cc \ utilities/merge_operators/string_append/stringappend_test.cc \ utilities/object_registry_test.cc \ @@ -501,6 +503,7 @@ JNI_NATIVE_SOURCES = \ java/rocksjni/config_options.cc \ java/rocksjni/env.cc \ java/rocksjni/env_options.cc \ + java/rocksjni/flink_compactionfilterjni.cc \ java/rocksjni/ingest_external_file_options.cc \ java/rocksjni/filter.cc \ java/rocksjni/iterator.cc \ diff --git a/utilities/flink/flink_compaction_filter.cc b/utilities/flink/flink_compaction_filter.cc new file mode 100644 index 000000000..4510ea456 --- /dev/null +++ b/utilities/flink/flink_compaction_filter.cc @@ -0,0 +1,185 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include <algorithm> +#include <cinttypes> +#include "utilities/flink/flink_compaction_filter.h" + +namespace ROCKSDB_NAMESPACE { +namespace flink { + +int64_t DeserializeTimestamp(const char *src, std::size_t offset) { + uint64_t result = 0; + for (unsigned long i = 0; i < sizeof(uint64_t); i++) { + result |= static_cast<uint64_t>(static_cast<unsigned char>(src[offset + i])) + << ((sizeof(int64_t) - 1 - i) * BITS_PER_BYTE); + } + return static_cast<int64_t>(result); +} + +CompactionFilter::Decision Decide( + const char* ts_bytes, + const int64_t ttl, + const std::size_t timestamp_offset, + const int64_t current_timestamp, + const std::shared_ptr<Logger> &logger) { + int64_t timestamp = DeserializeTimestamp(ts_bytes, timestamp_offset); + const int64_t ttlWithoutOverflow = timestamp > 0 ?
std::min(JAVA_MAX_LONG - timestamp, ttl) : ttl; + Debug(logger.get(), "Last access timestamp: %" PRId64 " ms, ttlWithoutOverflow: %" PRId64 " ms, Current timestamp: %" PRId64 " ms", + timestamp, ttlWithoutOverflow, current_timestamp); + return timestamp + ttlWithoutOverflow <= current_timestamp ? + CompactionFilter::Decision::kRemove : CompactionFilter::Decision::kKeep; +} + +FlinkCompactionFilter::ConfigHolder::ConfigHolder() : config_(const_cast<FlinkCompactionFilter::Config*>(&DISABLED_CONFIG)) {}; + +FlinkCompactionFilter::ConfigHolder::~ConfigHolder() { + Config* config = config_.load(); + if (config != &DISABLED_CONFIG) { + delete config; + } +} + +// At the moment Flink configures filters (which can already be created) only once, when the user creates state. +// Configuring again later could otherwise lead to a ListElementFilter leak in Config, +// or to a race between its deletion in Configure() and its usage in FilterV2(). +// The method returns true if the filter was not configured before (i.e. the new config was accepted). +bool FlinkCompactionFilter::ConfigHolder::Configure(Config* config) { + bool not_configured = GetConfig() == &DISABLED_CONFIG; + if (not_configured) { + assert(config->query_time_after_num_entries_ >= 0); + config_ = config; + } + return not_configured; +} + +FlinkCompactionFilter::Config* FlinkCompactionFilter::ConfigHolder::GetConfig() { + return config_.load(); +} + +std::size_t FlinkCompactionFilter::FixedListElementFilter::NextUnexpiredOffset( + const Slice& list, int64_t ttl, int64_t current_timestamp) const { + std::size_t offset = 0; + while (offset < list.size()) { + Decision decision = Decide(list.data(), ttl, offset + timestamp_offset_, current_timestamp, logger_); + if (decision != Decision::kKeep) { + std::size_t new_offset = offset + fixed_size_; + if (new_offset >= JAVA_MAX_SIZE || new_offset < offset) { + return JAVA_MAX_SIZE; + } + offset = new_offset; + } else { + break; + } + } + return offset; +} + +const char* FlinkCompactionFilter::Name() const { + return "FlinkCompactionFilter"; +} + +FlinkCompactionFilter::FlinkCompactionFilter(std::shared_ptr<ConfigHolder> config_holder, + std::unique_ptr<TimeProvider> time_provider) : + FlinkCompactionFilter(std::move(config_holder), std::move(time_provider), nullptr) {}; + +FlinkCompactionFilter::FlinkCompactionFilter(std::shared_ptr<ConfigHolder> config_holder, + std::unique_ptr<TimeProvider> time_provider, + std::shared_ptr<Logger> logger) : + config_holder_(std::move(config_holder)), + time_provider_(std::move(time_provider)), + logger_(std::move(logger)), + config_cached_(const_cast<Config*>(&DISABLED_CONFIG)) {}; + +inline void FlinkCompactionFilter::InitConfigIfNotYet() const { + const_cast<FlinkCompactionFilter*>(this)->config_cached_ = + config_cached_ == &DISABLED_CONFIG ?
config_holder_->GetConfig() : config_cached_; +} + +CompactionFilter::Decision FlinkCompactionFilter::FilterV2( + int /*level*/, const Slice& key, ValueType value_type, + const Slice& existing_value, std::string* new_value, + std::string* /*skip_until*/) const { + InitConfigIfNotYet(); + CreateListElementFilterIfNull(); + UpdateCurrentTimestampIfStale(); + + const char* data = existing_value.data(); + + Debug(logger_.get(), + "Call FlinkCompactionFilter::FilterV2 - Key: %s, Data: %s, Value type: %d, " + "State type: %d, TTL: %" PRId64 " ms, timestamp_offset: %lu", + key.ToString().c_str(), existing_value.ToString(true).c_str(), value_type, + config_cached_->state_type_, config_cached_->ttl_, config_cached_->timestamp_offset_); + + // too short value to have timestamp at all + const bool tooShortValue = existing_value.size() < config_cached_->timestamp_offset_ + TIMESTAMP_BYTE_SIZE; + + const StateType state_type = config_cached_->state_type_; + const bool value_or_merge = value_type == ValueType::kValue || value_type == ValueType::kMergeOperand; + const bool value_state = state_type == StateType::Value && value_type == ValueType::kValue; + const bool list_entry = state_type == StateType::List && value_or_merge; + const bool toDecide = value_state || list_entry; + const bool list_filter = list_entry && list_element_filter_; + + Decision decision = Decision::kKeep; + if (!tooShortValue && toDecide) { + decision = list_filter ? + ListDecide(existing_value, new_value) : + Decide(data, config_cached_->ttl_, config_cached_->timestamp_offset_, current_timestamp_, logger_); + } + Debug(logger_.get(), "Decision: %d", static_cast(decision)); + return decision; +} + +CompactionFilter::Decision FlinkCompactionFilter::ListDecide( + const Slice& existing_value, std::string* new_value) const { + std::size_t offset = 0; + if (offset < existing_value.size()) { + Decision decision = Decide(existing_value.data(), config_cached_->ttl_, offset + config_cached_->timestamp_offset_, current_timestamp_, logger_); + if (decision != Decision::kKeep) { + offset = ListNextUnexpiredOffset(existing_value, offset, config_cached_->ttl_); + if (offset >= JAVA_MAX_SIZE) { + return Decision::kKeep; + } + } + } + if (offset >= existing_value.size()) { + return Decision::kRemove; + } else if (offset > 0) { + SetUnexpiredListValue(existing_value, offset, new_value); + return Decision::kChangeValue; + } + return Decision::kKeep; +} + +std::size_t FlinkCompactionFilter::ListNextUnexpiredOffset( + const Slice &existing_value, size_t offset, int64_t ttl) const { + std::size_t new_offset = list_element_filter_->NextUnexpiredOffset(existing_value, ttl, current_timestamp_); + if (new_offset >= JAVA_MAX_SIZE || new_offset < offset) { + Error(logger_.get(), "Wrong next offset in list filter: %zu -> %lu", + offset, new_offset); + new_offset = JAVA_MAX_SIZE; + } else { + Debug(logger_.get(), "Next unexpired offset: %zu -> %lu", + offset, new_offset); + } + return new_offset; +} + +void FlinkCompactionFilter::SetUnexpiredListValue( + const Slice& existing_value, std::size_t offset, std::string* new_value) const { + new_value->clear(); + auto new_value_char = existing_value.data() + offset; + auto new_value_size = existing_value.size() - offset; + new_value->assign(new_value_char, new_value_size); + Logger* logger = logger_.get(); + if (logger && logger->GetInfoLogLevel() <= InfoLogLevel::DEBUG_LEVEL) { + Slice new_value_slice = Slice(new_value_char, new_value_size); + Debug(logger, "New list value: %s", 
diff --git a/utilities/flink/flink_compaction_filter.h b/utilities/flink/flink_compaction_filter.h
new file mode 100644
index 000000000..1d94b4301
--- /dev/null
+++ b/utilities/flink/flink_compaction_filter.h
@@ -0,0 +1,162 @@
+// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+#include <cstddef>
+#include <cstdint>
+#include "rocksdb/compaction_filter.h"
+#include "rocksdb/slice.h"
+#include <atomic>
+#include <limits>
+#include <memory>
+#include <string>
+
+namespace ROCKSDB_NAMESPACE {
+namespace flink {
+
+static const std::size_t BITS_PER_BYTE = static_cast<std::size_t>(8);
+static const std::size_t TIMESTAMP_BYTE_SIZE = static_cast<std::size_t>(8);
+static const int64_t JAVA_MIN_LONG = static_cast<int64_t>(0x8000000000000000);
+static const int64_t JAVA_MAX_LONG = static_cast<int64_t>(0x7fffffffffffffff);
+static const std::size_t JAVA_MAX_SIZE = static_cast<std::size_t>(0x7fffffff);
+
+/**
+ * Compaction filter for removing expired Flink state entries with TTL.
+ *
+ * Note: this compaction filter is a special implementation, designed for
+ * usage only in the Apache Flink project.
+ */
+class FlinkCompactionFilter : public CompactionFilter {
+ public:
+  enum StateType {
+    // WARNING!!! Do not change the order of the enum entries,
+    // as it is important for the JNI translation
+    Disabled,
+    Value,
+    List
+  };
+
+  // Provides the current timestamp used to check expiration;
+  // it must be thread-safe.
+  class TimeProvider {
+   public:
+    virtual ~TimeProvider() = default;
+    virtual int64_t CurrentTimestamp() const = 0;
+  };
+
+  // Accepts a serialized list state and checks its elements for expiration,
+  // starting from the head. It stops upon discovering the first unexpired
+  // element and returns its offset, or returns an offset greater than or
+  // equal to the list byte length if all elements are expired.
+  class ListElementFilter {
+   public:
+    virtual ~ListElementFilter() = default;
+    virtual std::size_t NextUnexpiredOffset(const Slice& list, int64_t ttl,
+                                            int64_t current_timestamp) const = 0;
+  };
+
+  // This filter can operate directly on the list state bytes,
+  // because the byte length of each list element and the position of its
+  // last access timestamp are known.
+  class FixedListElementFilter : public ListElementFilter {
+   public:
+    explicit FixedListElementFilter(std::size_t fixed_size,
+                                    std::size_t timestamp_offset,
+                                    std::shared_ptr<Logger> logger)
+        : fixed_size_(fixed_size),
+          timestamp_offset_(timestamp_offset),
+          logger_(std::move(logger)) {}
+    std::size_t NextUnexpiredOffset(const Slice& list, int64_t ttl,
+                                    int64_t current_timestamp) const override;
+
+   private:
+    std::size_t fixed_size_;
+    std::size_t timestamp_offset_;
+    std::shared_ptr<Logger> logger_;
+  };
+
+  // A factory is needed to create one list element filter per compaction
+  // filter (per thread) and to avoid concurrent access to the filter state.
+  class ListElementFilterFactory {
+   public:
+    virtual ~ListElementFilterFactory() = default;
+    virtual ListElementFilter* CreateListElementFilter(
+        std::shared_ptr<Logger> logger) const = 0;
+  };
+
+  class FixedListElementFilterFactory : public ListElementFilterFactory {
+   public:
+    explicit FixedListElementFilterFactory(std::size_t fixed_size,
+                                           std::size_t timestamp_offset)
+        : fixed_size_(fixed_size), timestamp_offset_(timestamp_offset) {}
+    FixedListElementFilter* CreateListElementFilter(
+        std::shared_ptr<Logger> logger) const override {
+      return new FixedListElementFilter(fixed_size_, timestamp_offset_, logger);
+    }
+
+   private:
+    std::size_t fixed_size_;
+    std::size_t timestamp_offset_;
+  };
+
+  struct Config {
+    StateType state_type_;
+    std::size_t timestamp_offset_;
+    int64_t ttl_;
+    // Number of state entries the compaction filter processes
+    // before it refreshes the cached current timestamp.
+    int64_t query_time_after_num_entries_;
+    std::unique_ptr<ListElementFilterFactory> list_element_filter_factory_;
+  };
+
+  // Allows configuring all FlinkCompactionFilters created by the factory
+  // at once. The ConfigHolder holds the shared Config.
+  class ConfigHolder {
+   public:
+    explicit ConfigHolder();
+    ~ConfigHolder();
+    bool Configure(Config* config);
+    Config* GetConfig();
+
+   private:
+    std::atomic<Config*> config_;
+  };
+
+  explicit FlinkCompactionFilter(std::shared_ptr<ConfigHolder> config_holder,
+                                 std::unique_ptr<TimeProvider> time_provider);
+
+  explicit FlinkCompactionFilter(std::shared_ptr<ConfigHolder> config_holder,
+                                 std::unique_ptr<TimeProvider> time_provider,
+                                 std::shared_ptr<Logger> logger);
+
+  const char* Name() const override;
+  Decision FilterV2(int level, const Slice& key, ValueType value_type,
+                    const Slice& existing_value, std::string* new_value,
+                    std::string* skip_until) const override;
+
+  bool IgnoreSnapshots() const override { return true; }
+
+ private:
+  inline void InitConfigIfNotYet() const;
+
+  Decision ListDecide(const Slice& existing_value, std::string* new_value) const;
+
+  inline std::size_t ListNextUnexpiredOffset(const Slice& existing_value,
+                                             std::size_t offset,
+                                             int64_t ttl) const;
+
+  inline void SetUnexpiredListValue(const Slice& existing_value,
+                                    std::size_t offset,
+                                    std::string* new_value) const;
+
+  inline void CreateListElementFilterIfNull() const {
+    if (!list_element_filter_ && config_cached_->list_element_filter_factory_) {
+      const_cast<FlinkCompactionFilter*>(this)->list_element_filter_ =
+          std::unique_ptr<ListElementFilter>(
+              config_cached_->list_element_filter_factory_
+                  ->CreateListElementFilter(logger_));
+    }
+  }
+
+  inline void UpdateCurrentTimestampIfStale() const {
+    bool is_stale = record_counter_ >= config_cached_->query_time_after_num_entries_;
+    if (is_stale) {
+      const_cast<FlinkCompactionFilter*>(this)->record_counter_ = 0;
+      const_cast<FlinkCompactionFilter*>(this)->current_timestamp_ =
+          time_provider_->CurrentTimestamp();
+    }
+    const_cast<FlinkCompactionFilter*>(this)->record_counter_ = record_counter_ + 1;
+  }
+
+  std::shared_ptr<ConfigHolder> config_holder_;
+  std::unique_ptr<TimeProvider> time_provider_;
+  std::shared_ptr<Logger> logger_;
+  Config* config_cached_;
+  std::unique_ptr<ListElementFilter> list_element_filter_;
+  int64_t current_timestamp_ = std::numeric_limits<int64_t>::max();
+  int64_t record_counter_ = std::numeric_limits<int64_t>::max();
+};
+
+static const FlinkCompactionFilter::Config DISABLED_CONFIG =
+    FlinkCompactionFilter::Config{FlinkCompactionFilter::StateType::Disabled,
+                                  0, std::numeric_limits<int64_t>::max(),
+                                  std::numeric_limits<int64_t>::max(), nullptr};
+
+}  // namespace flink
+}  // namespace ROCKSDB_NAMESPACE
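A minimal wiring sketch against the header above, showing the configure-once contract of `ConfigHolder::Configure`. Names such as `SystemTimeProvider` and the TTL values are our assumptions, not part of the patch:

    #include <chrono>
    #include <cstdint>
    #include <memory>
    #include "utilities/flink/flink_compaction_filter.h"

    using ROCKSDB_NAMESPACE::flink::FlinkCompactionFilter;

    // Hypothetical TimeProvider for this sketch; Flink supplies its own.
    class SystemTimeProvider : public FlinkCompactionFilter::TimeProvider {
     public:
      int64_t CurrentTimestamp() const override {
        using namespace std::chrono;
        return duration_cast<milliseconds>(
                   system_clock::now().time_since_epoch()).count();
      }
    };

    int main() {
      auto holder = std::make_shared<FlinkCompactionFilter::ConfigHolder>();
      FlinkCompactionFilter filter(
          holder, std::unique_ptr<FlinkCompactionFilter::TimeProvider>(
                      new SystemTimeProvider()));

      // Configure() only succeeds while the holder still points at
      // DISABLED_CONFIG; a second call is a no-op that returns false.
      auto* config = new FlinkCompactionFilter::Config{
          FlinkCompactionFilter::StateType::Value,
          /*timestamp_offset_=*/0, /*ttl_=*/60000,
          /*query_time_after_num_entries_=*/1000, nullptr};
      bool applied = holder->Configure(config);  // true: config installed
      bool again = holder->Configure(config);    // false: already configured
      (void)filter; (void)applied; (void)again;
      return 0;
    }

Note that the ConfigHolder takes ownership of an accepted config and deletes it in its destructor; a config rejected by `Configure` remains the caller's responsibility.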
diff --git a/utilities/flink/flink_compaction_filter_test.cc b/utilities/flink/flink_compaction_filter_test.cc
new file mode 100644
index 000000000..5174d6619
--- /dev/null
+++ b/utilities/flink/flink_compaction_filter_test.cc
@@ -0,0 +1,218 @@
+// Copyright (c) 2017-present, Facebook, Inc.  All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include <random>
+#include "test_util/testharness.h"
+#include "utilities/flink/flink_compaction_filter.h"
+
+namespace ROCKSDB_NAMESPACE {
+namespace flink {
+
+#define DISABLED FlinkCompactionFilter::StateType::Disabled
+#define VALUE FlinkCompactionFilter::StateType::Value
+#define LIST FlinkCompactionFilter::StateType::List
+
+#define KVALUE CompactionFilter::ValueType::kValue
+#define KMERGE CompactionFilter::ValueType::kMergeOperand
+#define KBLOB CompactionFilter::ValueType::kBlobIndex
+
+#define KKEEP CompactionFilter::Decision::kKeep
+#define KREMOVE CompactionFilter::Decision::kRemove
+#define KCHANGE CompactionFilter::Decision::kChangeValue
+
+#define EXPIRE (time += ttl + 20)
+
+#define EXPECT_ARR_EQ(arr1, arr2, num) EXPECT_TRUE(0 == memcmp(arr1, arr2, num));
+
+static const std::size_t TEST_TIMESTAMP_OFFSET = static_cast<std::size_t>(2);
+
+static const std::size_t LIST_ELEM_FIXED_LEN = static_cast<std::size_t>(8 + 4);
+
+static const int64_t QUERY_TIME_AFTER_NUM_ENTRIES = static_cast<int64_t>(10);
+
+class ConsoleLogger : public Logger {
+ public:
+  using Logger::Logv;
+  ConsoleLogger() : Logger(InfoLogLevel::DEBUG_LEVEL) {}
+
+  void Logv(const char* format, va_list ap) override {
+    vprintf(format, ap);
+    printf("\n");
+  }
+};
+
+int64_t time = 0;
+
+class TestTimeProvider : public FlinkCompactionFilter::TimeProvider {
+ public:
+  int64_t CurrentTimestamp() const override { return time; }
+};
+
+std::random_device rd;  // NOLINT
+std::mt19937 mt(rd());  // NOLINT
+std::uniform_int_distribution<int64_t> rnd(JAVA_MIN_LONG, JAVA_MAX_LONG);  // NOLINT
+
+int64_t ttl = 100;
+
+Slice key = Slice("key");  // NOLINT
+char data[24];
+std::string new_list = "";  // NOLINT
+std::string stub = "";  // NOLINT
+
+FlinkCompactionFilter::StateType state_type;
+CompactionFilter::ValueType value_type;
+FlinkCompactionFilter* filter;  // NOLINT
+
+void SetTimestamp(int64_t timestamp, size_t offset = 0, char* value = data) {
+  for (unsigned long i = 0; i < sizeof(uint64_t); i++) {
+    value[offset + i] =
+        static_cast<char>(static_cast<uint64_t>(timestamp) >>
+                          ((sizeof(int64_t) - 1 - i) * BITS_PER_BYTE));
+  }
+}
+
+CompactionFilter::Decision decide(size_t data_size = sizeof(data)) {
+  return filter->FilterV2(0, key, value_type, Slice(data, data_size),
+                          &new_list, &stub);
+}
+
+void Init(FlinkCompactionFilter::StateType stype,
+          CompactionFilter::ValueType vtype,
+          FlinkCompactionFilter::ListElementFilterFactory* fixed_len_filter_factory,
+          size_t timestamp_offset, bool expired = false) {
+  time = expired ? time + ttl + 20 : time;
+  state_type = stype;
+  value_type = vtype;
+
+  auto config_holder = std::make_shared<FlinkCompactionFilter::ConfigHolder>();
+  auto time_provider = new TestTimeProvider();
+  auto logger = std::make_shared<ConsoleLogger>();
+
+  filter = new FlinkCompactionFilter(
+      config_holder,
+      std::unique_ptr<FlinkCompactionFilter::TimeProvider>(time_provider),
+      logger);
+  auto config = new FlinkCompactionFilter::Config{
+      state_type, timestamp_offset, ttl, QUERY_TIME_AFTER_NUM_ENTRIES,
+      std::unique_ptr<FlinkCompactionFilter::ListElementFilterFactory>(
+          fixed_len_filter_factory)};
+  EXPECT_EQ(decide(), KKEEP);  // test the disabled config
+  EXPECT_TRUE(config_holder->Configure(config));
+  EXPECT_FALSE(config_holder->Configure(config));
+}
+
+void InitValue(FlinkCompactionFilter::StateType stype,
+               CompactionFilter::ValueType vtype, bool expired = false,
+               size_t timestamp_offset = TEST_TIMESTAMP_OFFSET) {
+  time = rnd(mt);
+  SetTimestamp(time, timestamp_offset);
+  Init(stype, vtype, nullptr, timestamp_offset, expired);
+}
+
+void InitList(CompactionFilter::ValueType vtype, bool all_expired = false,
+              bool first_elem_expired = false, size_t timestamp_offset = 0) {
+  time = rnd(mt);
+  SetTimestamp(first_elem_expired ? time - ttl - 20 : time,
+               timestamp_offset);  // elem 1 ts
+  SetTimestamp(time, LIST_ELEM_FIXED_LEN + timestamp_offset);  // elem 2 ts
+  auto fixed_len_filter_factory =
+      new FlinkCompactionFilter::FixedListElementFilterFactory(
+          LIST_ELEM_FIXED_LEN, static_cast<std::size_t>(0));
+  Init(LIST, vtype, fixed_len_filter_factory, timestamp_offset, all_expired);
+}
+
+void Deinit() { delete filter; }
+
+TEST(FlinkStateTtlTest, CheckStateTypeEnumOrder) {  // NOLINT
+  // if the order changes, it also needs to be adjusted in the Java client:
+  // in org.rocksdb.FlinkCompactionFilter
+  // and in org.rocksdb.FlinkCompactionFilterTest
+  EXPECT_EQ(DISABLED, 0);
+  EXPECT_EQ(VALUE, 1);
+  EXPECT_EQ(LIST, 2);
+}
+
+TEST(FlinkStateTtlTest, SkipShortDataWithoutTimestamp) {  // NOLINT
+  InitValue(VALUE, KVALUE, true);
+  EXPECT_EQ(decide(TIMESTAMP_BYTE_SIZE - 1), KKEEP);
+  Deinit();
+}
+
+TEST(FlinkValueStateTtlTest, Unexpired) {  // NOLINT
+  InitValue(VALUE, KVALUE);
+  EXPECT_EQ(decide(), KKEEP);
+  Deinit();
+}
+
+TEST(FlinkValueStateTtlTest, Expired) {  // NOLINT
+  InitValue(VALUE, KVALUE, true);
+  EXPECT_EQ(decide(), KREMOVE);
+  Deinit();
+}
+
+TEST(FlinkValueStateTtlTest, CachedTimeUpdate) {  // NOLINT
+  InitValue(VALUE, KVALUE);
+  EXPECT_EQ(decide(), KKEEP);  // also implicitly caches the current timestamp
+  EXPIRE;  // advance the current timestamp to expire the state,
+           // but the cached timestamp should still be used
+  // QUERY_TIME_AFTER_NUM_ENTRIES - 2:
+  // -1 -> for the decide() call against the disabled config in Init()
+  // and -1 -> for the decide() call right after InitValue()
+  for (int64_t i = 0; i < QUERY_TIME_AFTER_NUM_ENTRIES - 2; i++) {
+    EXPECT_EQ(decide(), KKEEP);
+  }
+  // the advanced current timestamp should now be updated in the cache
+  // and expire the state
+  EXPECT_EQ(decide(), KREMOVE);
+  Deinit();
+}
+
+TEST(FlinkValueStateTtlTest, WrongFilterValueType) {  // NOLINT
+  InitValue(VALUE, KMERGE, true);
+  EXPECT_EQ(decide(), KKEEP);
+  Deinit();
+}
+
+TEST(FlinkListStateTtlTest, Unexpired) {  // NOLINT
+  InitList(KMERGE);
+  EXPECT_EQ(decide(), KKEEP);
+  Deinit();
+
+  InitList(KVALUE);
+  EXPECT_EQ(decide(), KKEEP);
+  Deinit();
+}
+
+TEST(FlinkListStateTtlTest, Expired) {  // NOLINT
+  InitList(KMERGE, true);
+  EXPECT_EQ(decide(), KREMOVE);
+  Deinit();
+
+  InitList(KVALUE, true);
+  EXPECT_EQ(decide(), KREMOVE);
+  Deinit();
+}
+
+TEST(FlinkListStateTtlTest, HalfExpired) {  // NOLINT
+  InitList(KMERGE, false, true);
+  EXPECT_EQ(decide(), KCHANGE);
+  EXPECT_ARR_EQ(new_list.data(), data + LIST_ELEM_FIXED_LEN, LIST_ELEM_FIXED_LEN);
+  Deinit();
+
+  InitList(KVALUE, false, true);
+  EXPECT_EQ(decide(), KCHANGE);
+  EXPECT_ARR_EQ(new_list.data(), data + LIST_ELEM_FIXED_LEN, LIST_ELEM_FIXED_LEN);
+  Deinit();
+}
+
+TEST(FlinkListStateTtlTest, WrongFilterValueType) {  // NOLINT
+  InitList(KBLOB, true);
+  EXPECT_EQ(decide(), KKEEP);
+  Deinit();
+}
+
+}  // namespace flink
+}  // namespace ROCKSDB_NAMESPACE
+
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}
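The CachedTimeUpdate test above hinges on UpdateCurrentTimestampIfStale: the filter consults its TimeProvider only every `query_time_after_num_entries_` records and otherwise reuses a cached timestamp. Below is a standalone model of that counter logic (our own sketch, not the test itself):

    #include <cstdint>
    #include <cstdio>

    struct CachedClock {
      int64_t query_after;                 // config.query_time_after_num_entries_
      int64_t record_counter = INT64_MAX;  // forces a refresh on the first record
      int64_t cached_now = INT64_MAX;

      int64_t Now(int64_t real_now) {
        if (record_counter >= query_after) {
          record_counter = 0;
          cached_now = real_now;  // refresh from the TimeProvider
        }
        record_counter++;
        return cached_now;
      }
    };

    int main() {
      CachedClock clock{/*query_after=*/3};
      // real time jumps to 1000, but the cached value is reused until the
      // counter reaches the threshold and triggers a refresh
      std::printf("%lld\n", (long long)clock.Now(10));    // 10 (refresh)
      std::printf("%lld\n", (long long)clock.Now(1000));  // 10 (cached)
      std::printf("%lld\n", (long long)clock.Now(1000));  // 10 (cached)
      std::printf("%lld\n", (long long)clock.Now(1000));  // 1000 (refresh)
      return 0;
    }

This is why the test needs exactly QUERY_TIME_AFTER_NUM_ENTRIES - 2 keep-decisions before the advanced timestamp becomes visible and the state expires.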
diff --git a/utilities/merge_operators.h b/utilities/merge_operators.h
index 018d097b1..88cd8188d 100644
--- a/utilities/merge_operators.h
+++ b/utilities/merge_operators.h
@@ -21,6 +21,7 @@ class MergeOperators {
   static std::shared_ptr<MergeOperator> CreateStringAppendOperator();
   static std::shared_ptr<MergeOperator> CreateStringAppendOperator(char delim_char);
   static std::shared_ptr<MergeOperator> CreateStringAppendTESTOperator();
+  static std::shared_ptr<MergeOperator> CreateStringAppendTESTOperator(std::string delim);
   static std::shared_ptr<MergeOperator> CreateMaxOperator();
   static std::shared_ptr<MergeOperator> CreateBytesXOROperator();
   static std::shared_ptr<MergeOperator> CreateSortOperator();
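The new overload can be exercised as below. The sketch is not part of the patch and the DB path is a throwaway assumption; passing an empty delimiter yields the separator-free list merge referenced in HISTORY.md:

    #include <cassert>
    #include <string>
    #include "rocksdb/db.h"
    #include "rocksdb/options.h"
    #include "utilities/merge_operators.h"

    int main() {
      ROCKSDB_NAMESPACE::Options options;
      options.create_if_missing = true;
      // empty delimiter: merge operands are concatenated as-is
      options.merge_operator =
          ROCKSDB_NAMESPACE::MergeOperators::CreateStringAppendTESTOperator(
              std::string(""));

      ROCKSDB_NAMESPACE::DB* db = nullptr;
      auto s = ROCKSDB_NAMESPACE::DB::Open(options, "/tmp/frocksdb-merge-demo", &db);
      assert(s.ok());
      s = db->Merge(ROCKSDB_NAMESPACE::WriteOptions(), "list", "elem1");
      assert(s.ok());
      s = db->Merge(ROCKSDB_NAMESPACE::WriteOptions(), "list", "elem2");
      assert(s.ok());
      std::string value;
      s = db->Get(ROCKSDB_NAMESPACE::ReadOptions(), "list", &value);
      assert(s.ok());
      // value == "elem1elem2" with the empty delimiter
      delete db;
      return 0;
    }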
diff --git a/utilities/merge_operators/string_append/stringappend2.cc b/utilities/merge_operators/string_append/stringappend2.cc
index b8c676ee5..9c84dbf49 100644
--- a/utilities/merge_operators/string_append/stringappend2.cc
+++ b/utilities/merge_operators/string_append/stringappend2.cc
@@ -15,11 +15,6 @@
 namespace ROCKSDB_NAMESPACE {
 
-// Constructor: also specify the delimiter character.
-StringAppendTESTOperator::StringAppendTESTOperator(char delim_char)
-    : delim_(delim_char) {
-}
-
 // Implementation for the merge operation (concatenates two strings)
 bool StringAppendTESTOperator::FullMergeV2(
     const MergeOperationInput& merge_in,
@@ -37,7 +32,7 @@ bool StringAppendTESTOperator::FullMergeV2(
   size_t numBytes = 0;
   for (auto it = merge_in.operand_list.begin();
        it != merge_in.operand_list.end(); ++it) {
-    numBytes += it->size() + 1;   // Plus 1 for the delimiter
+    numBytes += it->size() + delim_.size();  // Plus one delimiter
   }
 
   // Only print the delimiter after the first entry has been printed
@@ -48,20 +43,20 @@ bool StringAppendTESTOperator::FullMergeV2(
     merge_out->new_value.reserve(numBytes + merge_in.existing_value->size());
     merge_out->new_value.append(merge_in.existing_value->data(),
                                 merge_in.existing_value->size());
-    printDelim = true;
+    printDelim = !delim_.empty();
   } else if (numBytes) {
     merge_out->new_value.reserve(
-        numBytes - 1);  // Minus 1 since we have one less delimiter
+        numBytes - delim_.size());  // Minus one delimiter: one less than operands
   }
 
   // Concatenate the sequence of strings (and add a delimiter between each)
   for (auto it = merge_in.operand_list.begin();
        it != merge_in.operand_list.end(); ++it) {
     if (printDelim) {
-      merge_out->new_value.append(1, delim_);
+      merge_out->new_value.append(delim_);
     }
     merge_out->new_value.append(it->data(), it->size());
-    printDelim = true;
+    printDelim = !delim_.empty();
   }
 
   return true;
@@ -87,17 +82,17 @@ bool StringAppendTESTOperator::_AssocPartialMergeMulti(
   // Determine and reserve correct size for *new_value.
   size_t size = 0;
   for (const auto& operand : operand_list) {
-    size += operand.size();
+    size += operand.size() + delim_.size();
   }
-  size += operand_list.size() - 1;  // Delimiters
+  size -= delim_.size();  // one less delimiter than operands
   new_value->reserve(size);
 
   // Apply concatenation
   new_value->assign(operand_list.front().data(), operand_list.front().size());
 
-  for (std::deque<Slice>::const_iterator it = operand_list.begin() + 1;
+  for (auto it = operand_list.begin() + 1;
        it != operand_list.end(); ++it) {
-    new_value->append(1, delim_);
+    new_value->append(delim_);
     new_value->append(it->data(), it->size());
   }
 
@@ -114,4 +109,9 @@ MergeOperators::CreateStringAppendTESTOperator() {
   return std::make_shared<StringAppendTESTOperator>(',');
 }
 
-}  // namespace ROCKSDB_NAMESPACE
+std::shared_ptr<MergeOperator>
+MergeOperators::CreateStringAppendTESTOperator(std::string delim) {
+  return std::make_shared<StringAppendTESTOperator>(std::move(delim));
+}
+
+}  // namespace ROCKSDB_NAMESPACE
diff --git a/utilities/merge_operators/string_append/stringappend2.h b/utilities/merge_operators/string_append/stringappend2.h
index 452164d8e..c2841245d 100644
--- a/utilities/merge_operators/string_append/stringappend2.h
+++ b/utilities/merge_operators/string_append/stringappend2.h
@@ -14,7 +14,7 @@
 #pragma once
 #include <deque>
 #include <string>
-
+#include <utility>
 #include "rocksdb/merge_operator.h"
 #include "rocksdb/slice.h"
 
@@ -22,18 +22,20 @@ namespace ROCKSDB_NAMESPACE {
 
 class StringAppendTESTOperator : public MergeOperator {
  public:
-  // Constructor with delimiter
-  explicit StringAppendTESTOperator(char delim_char);
+  // Constructor with string delimiter
+  explicit StringAppendTESTOperator(std::string delim_str)
+      : delim_(std::move(delim_str)) {}
+
+  // Constructor with char delimiter
+  explicit StringAppendTESTOperator(char delim_char)
+      : delim_(std::string(1, delim_char)) {}
 
-  virtual bool FullMergeV2(const MergeOperationInput& merge_in,
-                           MergeOperationOutput* merge_out) const override;
+  bool FullMergeV2(const MergeOperationInput& merge_in,
+                   MergeOperationOutput* merge_out) const override;
 
-  virtual bool PartialMergeMulti(const Slice& key,
-                                 const std::deque<Slice>& operand_list,
-                                 std::string* new_value, Logger* logger) const
-      override;
+  bool PartialMergeMulti(const Slice& key,
+                         const std::deque<Slice>& operand_list,
+                         std::string* new_value, Logger* logger) const override;
 
-  virtual const char* Name() const override;
+  const char* Name() const override;
 
  private:
   // A version of PartialMerge that actually performs "partial merging".
@@ -42,7 +44,7 @@ class StringAppendTESTOperator : public MergeOperator {
       const std::deque<Slice>& operand_list,
       std::string* new_value, Logger* logger) const;
 
-  char delim_;         // The delimiter is inserted between elements
+  std::string delim_;  // The delimiter is inserted between elements
 };
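Finally, the size accounting that replaces the old "plus 1 / minus 1" logic in stringappend2.cc generalizes to any delimiter length: n operands joined with a delimiter need sum(sizes) + (n - 1) * delim.size() bytes, hence "add one delimiter per operand, then subtract one". A self-contained check of that arithmetic (our sketch, not the patch's code):

    #include <cassert>
    #include <string>
    #include <vector>

    std::string Join(const std::vector<std::string>& operands,
                     const std::string& delim) {
      size_t size = 0;
      for (const auto& op : operands) size += op.size() + delim.size();
      size -= delim.size();  // one less delimiter than operands
      std::string out;
      out.reserve(size);
      out = operands.front();
      for (size_t i = 1; i < operands.size(); i++) {
        out += delim;  // append(delim_) in the patch; a no-op when delim is empty
        out += operands[i];
      }
      assert(out.size() == size);
      return out;
    }

    int main() {
      assert(Join({"a", "b", "c"}, ",") == "a,b,c");
      assert(Join({"a", "b", "c"}, "") == "abc");  // separator-free Flink mode
      return 0;
    }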