Skip to content

Commit

Permalink
cassandra compaction filter for purge expired ttl columns and rows
Browse files Browse the repository at this point in the history
  • Loading branch information
wpc committed Jul 15, 2017
1 parent 1320133 commit 46e28bc
Show file tree
Hide file tree
Showing 22 changed files with 322 additions and 74 deletions.
17 changes: 9 additions & 8 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,9 @@ set(SOURCES
utilities/blob_db/blob_log_reader.cc
utilities/blob_db/blob_log_writer.cc
utilities/blob_db/blob_log_format.cc
utilities/cassandra/cassandra_compaction_filter.cc
utilities/cassandra/format.cc
utilities/cassandra/merge_operator.cc
utilities/checkpoint/checkpoint_impl.cc
utilities/col_buf_decoder.cc
utilities/col_buf_encoder.cc
Expand All @@ -501,8 +504,6 @@ set(SOURCES
utilities/memory/memory_util.cc
utilities/merge_operators/max.cc
utilities/merge_operators/put.cc
utilities/merge_operators/cassandra/format.cc
utilities/merge_operators/cassandra/merge_operator.cc
utilities/merge_operators/string_append/stringappend.cc
utilities/merge_operators/string_append/stringappend2.cc
utilities/merge_operators/uint64add.cc
Expand Down Expand Up @@ -706,18 +707,18 @@ set(TESTS
util/thread_local_test.cc
utilities/backupable/backupable_db_test.cc
utilities/blob_db/blob_db_test.cc
utilities/cassandra/cassandra_functional_test.cc
utilities/cassandra/cassandra_format_test.cc
utilities/cassandra/cassandra_row_merge_test.cc
utilities/cassandra/cassandra_serialize_test.cc
utilities/checkpoint/checkpoint_test.cc
utilities/column_aware_encoding_test.cc
utilities/date_tiered/date_tiered_test.cc
utilities/document/document_db_test.cc
utilities/document/json_document_test.cc
utilities/geodb/geodb_test.cc
utilities/lua/rocks_lua_test.cc
utilities/memory/memory_test.cc
utilities/merge_operators/cassandra/cassandra_merge_test.cc
utilities/merge_operators/cassandra/cassandra_format_test.cc
utilities/merge_operators/cassandra/cassandra_row_merge_test.cc
utilities/merge_operators/cassandra/cassandra_serialize_test.cc
utilities/memory/memory_test.cc
utilities/merge_operators/string_append/stringappend_test.cc
utilities/object_registry_test.cc
utilities/option_change_migration/option_change_migration_test.cc
Expand Down Expand Up @@ -758,7 +759,7 @@ set(TESTUTIL_SOURCE
monitoring/thread_status_updater_debug.cc
table/mock_table.cc
util/fault_injection_test_env.cc
utilities/merge_operators/cassandra/test_utils.cc
utilities/cassandra/test_utils.cc
)
# test utilities are only build in debug
enable_testing()
Expand Down
10 changes: 5 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ TESTS = \
write_buffer_manager_test \
stringappend_test \
cassandra_format_test \
cassandra_merge_test \
cassandra_functional_test \
cassandra_row_merge_test \
cassandra_serialize_test \
ttl_test \
Expand Down Expand Up @@ -1000,16 +1000,16 @@ option_change_migration_test: utilities/option_change_migration/option_change_mi
stringappend_test: utilities/merge_operators/string_append/stringappend_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

cassandra_format_test: utilities/merge_operators/cassandra/cassandra_format_test.o $(LIBOBJECTS) $(TESTHARNESS)
cassandra_format_test: utilities/cassandra/cassandra_format_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

cassandra_merge_test: utilities/merge_operators/cassandra/cassandra_merge_test.o utilities/merge_operators/cassandra/test_utils.o $(LIBOBJECTS) $(TESTHARNESS)
cassandra_functional_test: utilities/cassandra/cassandra_functional_test.o utilities/cassandra/test_utils.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

cassandra_row_merge_test: utilities/merge_operators/cassandra/cassandra_row_merge_test.o utilities/merge_operators/cassandra/test_utils.o $(LIBOBJECTS) $(TESTHARNESS)
cassandra_row_merge_test: utilities/cassandra/cassandra_row_merge_test.o utilities/merge_operators/cassandra/test_utils.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

cassandra_serialize_test: utilities/merge_operators/cassandra/cassandra_serialize_test.o $(LIBOBJECTS) $(TESTHARNESS)
cassandra_serialize_test: utilities/cassandra/cassandra_serialize_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

redis_test: utilities/redis/redis_lists_test.o $(LIBOBJECTS) $(TESTHARNESS)
Expand Down
17 changes: 9 additions & 8 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@ cpp_library(
"utilities/blob_db/blob_log_reader.cc",
"utilities/blob_db/blob_log_writer.cc",
"utilities/blob_db/blob_log_format.cc",
"utilities/cassandra/cassandra_compaction_filter.cc",
"utilities/cassandra/format.cc",
"utilities/cassandra/merge_operator.cc",
"utilities/checkpoint/checkpoint_impl.cc",
"utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc",
"utilities/convenience/info_log_finder.cc",
Expand All @@ -226,8 +229,6 @@ cpp_library(
"utilities/leveldb_options/leveldb_options.cc",
"utilities/lua/rocks_lua_compaction_filter.cc",
"utilities/memory/memory_util.cc",
"utilities/merge_operators/cassandra/format.cc",
"utilities/merge_operators/cassandra/merge_operator.cc",
"utilities/merge_operators/max.cc",
"utilities/merge_operators/put.cc",
"utilities/merge_operators/string_append/stringappend.cc",
Expand Down Expand Up @@ -275,7 +276,7 @@ cpp_library(
"util/testharness.cc",
"util/testutil.cc",
"db/db_test_util.cc",
"utilities/merge_operators/cassandra/test_utils.cc",
"utilities/cassandra/test_utils.cc",
"utilities/col_buf_encoder.cc",
"utilities/col_buf_decoder.cc",
"utilities/column_aware_encoding_util.cc",
Expand Down Expand Up @@ -325,16 +326,16 @@ ROCKS_TESTS = [['arena_test', 'util/arena_test.cc', 'serial'],
['c_test', 'db/c_test.c', 'serial'],
['cache_test', 'cache/cache_test.cc', 'serial'],
['cassandra_format_test',
'utilities/merge_operators/cassandra/cassandra_format_test.cc',
'utilities/cassandra/cassandra_format_test.cc',
'serial'],
['cassandra_merge_test',
'utilities/merge_operators/cassandra/cassandra_merge_test.cc',
['cassandra_functional_test',
'utilities/cassandra/cassandra_functional_test.cc',
'serial'],
['cassandra_row_merge_test',
'utilities/merge_operators/cassandra/cassandra_row_merge_test.cc',
'utilities/cassandra/cassandra_row_merge_test.cc',
'serial'],
['cassandra_serialize_test',
'utilities/merge_operators/cassandra/cassandra_serialize_test.cc',
'utilities/cassandra/cassandra_serialize_test.cc',
'serial'],
['checkpoint_test', 'utilities/checkpoint/checkpoint_test.cc', 'serial'],
['cleanable_test', 'table/cleanable_test.cc', 'serial'],
Expand Down
3 changes: 3 additions & 0 deletions java/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ set(JNI_NATIVE_SOURCES
rocksjni/options.cc
rocksjni/ratelimiterjni.cc
rocksjni/remove_emptyvalue_compactionfilterjni.cc
rocksjni/cassandra_compactionfilterjni.cc
rocksjni/restorejni.cc
rocksjni/rocksdb_exception_test.cc
rocksjni/rocksjni.cc
Expand Down Expand Up @@ -55,6 +56,8 @@ set(NATIVE_JAVA_CLASSES
org.rocksdb.BlockBasedTableConfig
org.rocksdb.BloomFilter
org.rocksdb.Cache
org.rocksdb.CassandraCompactionFilter
org.rocksdb.CassandraValueMergeOperator
org.rocksdb.Checkpoint
org.rocksdb.ClockCache
org.rocksdb.ColumnFamilyHandle
Expand Down
1 change: 1 addition & 0 deletions java/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ NATIVE_JAVA_CLASSES = org.rocksdb.AbstractCompactionFilter\
org.rocksdb.BloomFilter\
org.rocksdb.Checkpoint\
org.rocksdb.ClockCache\
org.rocksdb.CassandraCompactionFilter\
org.rocksdb.CassandraValueMergeOperator\
org.rocksdb.ColumnFamilyHandle\
org.rocksdb.ColumnFamilyOptions\
Expand Down
22 changes: 22 additions & 0 deletions java/rocksjni/cassandra_compactionfilterjni.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#include <jni.h>

#include "include/org_rocksdb_CassandraCompactionFilter.h"
#include "utilities/cassandra/cassandra_compaction_filter.h"

/*
* Class: org_rocksdb_CassandraCompactionFilter
* Method: createNewCassandraCompactionFilter0
* Signature: ()J
*/
jlong Java_org_rocksdb_CassandraCompactionFilter_createNewCassandraCompactionFilter0(
JNIEnv* env, jclass jcls) {
auto* compaction_filter =
new rocksdb::cassandra::CassandraCompactionFilter();
// set the native handle to our native compaction filter
return reinterpret_cast<jlong>(compaction_filter);
}
2 changes: 1 addition & 1 deletion java/rocksjni/cassandra_value_operator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#include "rocksdb/table.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/merge_operator.h"
#include "utilities/merge_operators/cassandra/merge_operator.h"
#include "utilities/cassandra/merge_operator.h"

/*
* Class: org_rocksdb_CassandraValueMergeOperator
Expand Down
18 changes: 18 additions & 0 deletions java/src/main/java/org/rocksdb/CassandraCompactionFilter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

package org.rocksdb;

/**
* Just a Java wrapper around CassandraCompactionFilter implemented in C++
*/
public class CassandraCompactionFilter
extends AbstractCompactionFilter<Slice> {
public CassandraCompactionFilter() {
super(createNewCassandraCompactionFilter0());
}

private native static long createNewCassandraCompactionFilter0();
}
15 changes: 9 additions & 6 deletions src.mk
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ LIB_SOURCES = \
utilities/blob_db/blob_log_reader.cc \
utilities/blob_db/blob_log_writer.cc \
utilities/blob_db/blob_log_format.cc \
utilities/cassandra/cassandra_compaction_filter.cc \
utilities/cassandra/format.cc \
utilities/cassandra/merge_operator.cc \
utilities/checkpoint/checkpoint_impl.cc \
utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc \
utilities/convenience/info_log_finder.cc \
Expand All @@ -173,8 +176,6 @@ LIB_SOURCES = \
utilities/leveldb_options/leveldb_options.cc \
utilities/lua/rocks_lua_compaction_filter.cc \
utilities/memory/memory_util.cc \
utilities/merge_operators/cassandra/format.cc \
utilities/merge_operators/cassandra/merge_operator.cc \
utilities/merge_operators/max.cc \
utilities/merge_operators/put.cc \
utilities/merge_operators/string_append/stringappend.cc \
Expand Down Expand Up @@ -329,6 +330,11 @@ MAIN_SOURCES = \
util/thread_local_test.cc \
utilities/backupable/backupable_db_test.cc \
utilities/blob_db/blob_db_test.cc \
utilities/cassandra/cassandra_format_test.cc \
utilities/cassandra/cassandra_functional_test.cc \
utilities/cassandra/cassandra_row_merge_test.cc \
utilities/cassandra/cassandra_serialize_test.cc \
utilities/cassandra/test_utils.cc \
utilities/checkpoint/checkpoint_test.cc \
utilities/column_aware_encoding_exp.cc \
utilities/column_aware_encoding_test.cc \
Expand All @@ -339,10 +345,6 @@ MAIN_SOURCES = \
utilities/lua/rocks_lua_test.cc \
utilities/memory/memory_test.cc \
utilities/merge_operators/string_append/stringappend_test.cc \
utilities/merge_operators/cassandra/cassandra_merge_test.cc \
utilities/merge_operators/cassandra/cassandra_format_test.cc \
utilities/merge_operators/cassandra/cassandra_row_merge_test.cc \
utilities/merge_operators/cassandra/cassandra_serialize_test.cc \
utilities/object_registry_test.cc \
utilities/option_change_migration/option_change_migration_test.cc \
utilities/options/options_util_test.cc \
Expand Down Expand Up @@ -379,6 +381,7 @@ JNI_NATIVE_SOURCES = \
java/rocksjni/options.cc \
java/rocksjni/ratelimiterjni.cc \
java/rocksjni/remove_emptyvalue_compactionfilterjni.cc \
java/rocksjni/cassandra_compactionfilterjni.cc \
java/rocksjni/restorejni.cc \
java/rocksjni/rocksjni.cc \
java/rocksjni/rocksdb_exception_test.cc \
Expand Down
49 changes: 49 additions & 0 deletions utilities/cassandra/cassandra_compaction_filter.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef ROCKSDB_LITE

#include <string>

#include "rocksdb/slice.h"
#include "utilities/cassandra/cassandra_compaction_filter.h"
#include "utilities/cassandra/format.h"


namespace rocksdb {
namespace cassandra {

const char* CassandraCompactionFilter::Name() const {
return "CassandraCompactionFilter";
}

CompactionFilter::Decision CassandraCompactionFilter::FilterV2(
int level,
const Slice& key,
ValueType value_type,
const Slice& existing_value,
std::string* new_value,
std::string* skip_until) const {

bool value_changed = false;
RowValue row_value = RowValue::Deserialize(existing_value.data(), existing_value.size());
RowValue compacted = row_value.GC(&value_changed);

if(compacted.Empty()) {
return Decision::kRemove;
}

if (value_changed) {
compacted.Serialize(new_value);
return Decision::kChangeValue;
}

return Decision::kKeep;
}


} // namespace cassandra
} // namespace rocksdb
#endif // !ROCKSDB_LITE
30 changes: 30 additions & 0 deletions utilities/cassandra/cassandra_compaction_filter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef ROCKSDB_LITE

#pragma once

#include <string>

#include "rocksdb/compaction_filter.h"
#include "rocksdb/slice.h"

namespace rocksdb {
namespace cassandra {

class CassandraCompactionFilter : public CompactionFilter {
public:
const char* Name() const override;
virtual Decision FilterV2(int level,
const Slice& key,
ValueType value_type,
const Slice& existing_value,
std::string* new_value,
std::string* skip_until) const override;
};
} // namespace cassandra
} // namespace rocksdb
#endif // !ROCKSDB_LITE
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
#include <cstring>
#include <memory>
#include "util/testharness.h"
#include "utilities/merge_operators/cassandra/format.h"
#include "utilities/merge_operators/cassandra/serialize.h"
#include "utilities/cassandra/format.h"
#include "utilities/cassandra/serialize.h"

using namespace rocksdb::cassandra;

Expand Down
Loading

0 comments on commit 46e28bc

Please sign in to comment.