diff --git a/build.sh b/build.sh index a572036..f7c1c0e 100755 --- a/build.sh +++ b/build.sh @@ -37,7 +37,7 @@ cd $BUILD_PATH && wget https://github.com/facebook/zstd/archive/v${zstd_version} # Note: if you don't have a good reason, please do not set -DPORTABLE=ON # This one is set here on purpose of compatibility with github action runtime processor -rocksdb_version="8.11.4" +rocksdb_version="9.1.1" cd $BUILD_PATH && wget https://github.com/facebook/rocksdb/archive/v${rocksdb_version}.tar.gz && tar xzf v${rocksdb_version}.tar.gz && cd rocksdb-${rocksdb_version}/ && \ mkdir -p build_place && cd build_place && cmake -DCMAKE_BUILD_TYPE=Release $CMAKE_REQUIRED_PARAMS -DCMAKE_PREFIX_PATH=$INSTALL_PREFIX -DWITH_TESTS=OFF -DWITH_GFLAGS=OFF \ -DWITH_BENCHMARK_TOOLS=OFF -DWITH_TOOLS=OFF -DWITH_MD_LIBRARY=OFF -DWITH_RUNTIME_DEBUG=OFF -DROCKSDB_BUILD_SHARED=OFF -DWITH_SNAPPY=ON -DWITH_LZ4=ON -DWITH_ZLIB=ON -DWITH_LIBURING=OFF \ diff --git a/c.h b/c.h index 54c770a..9de8770 100644 --- a/c.h +++ b/c.h @@ -608,6 +608,9 @@ extern ROCKSDB_LIBRARY_API const rocksdb_snapshot_t* rocksdb_create_snapshot( extern ROCKSDB_LIBRARY_API void rocksdb_release_snapshot( rocksdb_t* db, const rocksdb_snapshot_t* snapshot); +extern ROCKSDB_LIBRARY_API uint64_t +rocksdb_snapshot_get_sequence_number(const rocksdb_snapshot_t* snapshot); + /* Returns NULL if property name is unknown. Else returns a pointer to a malloc()-ed null-terminated value. */ extern ROCKSDB_LIBRARY_API char* rocksdb_property_value(rocksdb_t* db, @@ -691,8 +694,8 @@ extern ROCKSDB_LIBRARY_API void rocksdb_flush_wal(rocksdb_t* db, extern ROCKSDB_LIBRARY_API void rocksdb_disable_file_deletions(rocksdb_t* db, char** errptr); -extern ROCKSDB_LIBRARY_API void rocksdb_enable_file_deletions( - rocksdb_t* db, unsigned char force, char** errptr); +extern ROCKSDB_LIBRARY_API void rocksdb_enable_file_deletions(rocksdb_t* db, + char** errptr); /* Management operations */ @@ -1152,10 +1155,16 @@ extern ROCKSDB_LIBRARY_API void rocksdb_options_set_env(rocksdb_options_t*, rocksdb_env_t*); extern ROCKSDB_LIBRARY_API void rocksdb_options_set_info_log(rocksdb_options_t*, rocksdb_logger_t*); +extern ROCKSDB_LIBRARY_API rocksdb_logger_t* rocksdb_options_get_info_log( + rocksdb_options_t* opt); extern ROCKSDB_LIBRARY_API void rocksdb_options_set_info_log_level( rocksdb_options_t*, int); extern ROCKSDB_LIBRARY_API int rocksdb_options_get_info_log_level( rocksdb_options_t*); +extern ROCKSDB_LIBRARY_API rocksdb_logger_t* +rocksdb_logger_create_stderr_logger(int log_level, const char* prefix); +extern ROCKSDB_LIBRARY_API void rocksdb_logger_destroy( + rocksdb_logger_t* logger); extern ROCKSDB_LIBRARY_API void rocksdb_options_set_write_buffer_size( rocksdb_options_t*, size_t); extern ROCKSDB_LIBRARY_API size_t @@ -1499,10 +1508,6 @@ extern ROCKSDB_LIBRARY_API void rocksdb_options_set_advise_random_on_open( rocksdb_options_t*, unsigned char); extern ROCKSDB_LIBRARY_API unsigned char rocksdb_options_get_advise_random_on_open(rocksdb_options_t*); -extern ROCKSDB_LIBRARY_API void -rocksdb_options_set_access_hint_on_compaction_start(rocksdb_options_t*, int); -extern ROCKSDB_LIBRARY_API int -rocksdb_options_get_access_hint_on_compaction_start(rocksdb_options_t*); extern ROCKSDB_LIBRARY_API void rocksdb_options_set_use_adaptive_mutex( rocksdb_options_t*, unsigned char); extern ROCKSDB_LIBRARY_API unsigned char rocksdb_options_get_use_adaptive_mutex( @@ -1684,6 +1689,10 @@ extern ROCKSDB_LIBRARY_API rocksdb_ratelimiter_t* rocksdb_ratelimiter_create_auto_tuned(int64_t rate_bytes_per_sec, int64_t refill_period_us, int32_t fairness); +extern ROCKSDB_LIBRARY_API rocksdb_ratelimiter_t* +rocksdb_ratelimiter_create_with_mode(int64_t rate_bytes_per_sec, + int64_t refill_period_us, int32_t fairness, + int mode, bool auto_tuned); extern ROCKSDB_LIBRARY_API void rocksdb_ratelimiter_destroy( rocksdb_ratelimiter_t*); diff --git a/cf_ts_test.go b/cf_ts_test.go index f62de31..7262371 100644 --- a/cf_ts_test.go +++ b/cf_ts_test.go @@ -112,7 +112,7 @@ func TestColumnFamilyMultiGetWithTS(t *testing.T) { givenVal1 = []byte("world1") givenVal2 = []byte("world2") givenVal3 = []byte("world3") - givenTs1 = marshalTimestamp(1) + givenTs1 = marshalTimestamp(0) givenTs2 = marshalTimestamp(2) givenTs3 = marshalTimestamp(3) ) @@ -177,7 +177,7 @@ func TestColumnFamilyMultiGetWithTS(t *testing.T) { require.EqualValues(t, values[1].Data(), givenVal2) require.EqualValues(t, values[2].Data(), givenVal3) - require.EqualValues(t, times[0].Data(), givenTs1) + require.EqualValues(t, times[0].Data(), []byte{}) require.EqualValues(t, times[1].Data(), givenTs2) require.EqualValues(t, times[2].Data(), givenTs3) } diff --git a/db.go b/db.go index 685f496..f21efe7 100644 --- a/db.go +++ b/db.go @@ -1017,7 +1017,7 @@ func (db *DB) SingleDeleteCFWithTS(opts *WriteOptions, cf *ColumnFamilyHandle, k } // DeleteRangeCF deletes keys that are between [startKey, endKey) -func (db *DB) DeleteRangeCF(opts *WriteOptions, cf *ColumnFamilyHandle, startKey []byte, endKey []byte) (err error) { +func (db *DB) DeleteRangeCF(opts *WriteOptions, cf *ColumnFamilyHandle, startKey, endKey []byte) (err error) { var ( cErr *C.char cStartKey = refGoBytes(startKey) @@ -1087,7 +1087,7 @@ func (db *DB) SingleDeleteCF(opts *WriteOptions, cf *ColumnFamilyHandle, key []b } // Merge merges the data associated with the key with the actual data in the database. -func (db *DB) Merge(opts *WriteOptions, key []byte, value []byte) (err error) { +func (db *DB) Merge(opts *WriteOptions, key, value []byte) (err error) { var ( cErr *C.char cKey = refGoBytes(key) @@ -1102,7 +1102,7 @@ func (db *DB) Merge(opts *WriteOptions, key []byte, value []byte) (err error) { // MergeCF merges the data associated with the key with the actual data in the // database and column family. -func (db *DB) MergeCF(opts *WriteOptions, cf *ColumnFamilyHandle, key []byte, value []byte) (err error) { +func (db *DB) MergeCF(opts *WriteOptions, cf *ColumnFamilyHandle, key, value []byte) (err error) { var ( cErr *C.char cKey = refGoBytes(key) @@ -1662,10 +1662,10 @@ func (db *DB) DisableFileDeletions() (err error) { } // EnableFileDeletions enables file deletions for the database. -func (db *DB) EnableFileDeletions(force bool) (err error) { +func (db *DB) EnableFileDeletions() (err error) { var cErr *C.char - C.rocksdb_enable_file_deletions(db.c, boolToChar(force), &cErr) + C.rocksdb_enable_file_deletions(db.c, &cErr) err = fromCError(cErr) return diff --git a/db_test.go b/db_test.go index 397b350..e07b0ea 100644 --- a/db_test.go +++ b/db_test.go @@ -222,6 +222,8 @@ func newTestDBMultiCF(t *testing.T, columns []string, applyOpts func(opts *Optio dir := t.TempDir() opts := NewDefaultOptions() + rateLimiter := NewGenericRateLimiter(1024, 100*1000, 10, RateLimiterModeAllIo, true) + opts.SetRateLimiter(rateLimiter) opts.SetCreateIfMissingColumnFamilies(true) opts.SetCreateIfMissing(true) opts.SetCompression(ZSTDCompression) diff --git a/logger.go b/logger.go new file mode 100644 index 0000000..e1db9dc --- /dev/null +++ b/logger.go @@ -0,0 +1,26 @@ +package grocksdb + +// #include "rocksdb/c.h" +// #include "grocksdb.h" +import "C" +import "unsafe" + +// Logger struct. +type Logger struct { + c *C.rocksdb_logger_t +} + +func NewStderrLogger(level InfoLogLevel, prefix string) *Logger { + prefix_ := C.CString(prefix) + defer C.free(unsafe.Pointer(prefix_)) + + return &Logger{ + c: C.rocksdb_logger_create_stderr_logger(C.int(level), prefix_), + } +} + +// Destroy Logger. +func (l *Logger) Destroy() { + C.rocksdb_logger_destroy(l.c) + l.c = nil +} diff --git a/options.go b/options.go index 442c907..3b373c7 100644 --- a/options.go +++ b/options.go @@ -353,6 +353,18 @@ func (opts *Options) SetEnv(env *Env) { env.c = nil } +// SetInfoLog sets info logger. +func (opts *Options) SetInfoLog(logger *Logger) { + C.rocksdb_options_set_info_log(opts.c, logger.c) +} + +// GetInfoLog gets info logger. +func (opts *Options) GetInfoLog() *Logger { + return &Logger{ + c: C.rocksdb_options_get_info_log(opts.c), + } +} + // SetInfoLogLevel sets the info log level. // // Default: InfoInfoLogLevel @@ -1451,20 +1463,21 @@ func (opts *Options) GetDbWriteBufferSize() uint64 { return uint64(C.rocksdb_options_get_db_write_buffer_size(opts.c)) } -// SetAccessHintOnCompactionStart specifies the file access pattern -// once a compaction is started. -// -// It will be applied to all input files of a compaction. -// Default: NormalCompactionAccessPattern -func (opts *Options) SetAccessHintOnCompactionStart(value CompactionAccessPattern) { - C.rocksdb_options_set_access_hint_on_compaction_start(opts.c, C.int(value)) -} - -// GetAccessHintOnCompactionStart returns the file access pattern -// once a compaction is started. -func (opts *Options) GetAccessHintOnCompactionStart() CompactionAccessPattern { - return CompactionAccessPattern(C.rocksdb_options_get_access_hint_on_compaction_start(opts.c)) -} +// Deprecation in rocksdb v9.x +// // SetAccessHintOnCompactionStart specifies the file access pattern +// // once a compaction is started. +// // +// // It will be applied to all input files of a compaction. +// // Default: NormalCompactionAccessPattern +// func (opts *Options) SetAccessHintOnCompactionStart(value CompactionAccessPattern) { +// C.rocksdb_options_set_access_hint_on_compaction_start(opts.c, C.int(value)) +// } +// +// // GetAccessHintOnCompactionStart returns the file access pattern +// // once a compaction is started. +// func (opts *Options) GetAccessHintOnCompactionStart() CompactionAccessPattern { +// return CompactionAccessPattern(C.rocksdb_options_get_access_hint_on_compaction_start(opts.c)) +// } // SetUseAdaptiveMutex enable/disable adaptive mutex, which spins // in the user space before resorting to kernel. diff --git a/options_test.go b/options_test.go index 5a23d0c..3aaa0c9 100644 --- a/options_test.go +++ b/options_test.go @@ -101,8 +101,8 @@ func TestOptions(t *testing.T) { opts.SetAdviseRandomOnOpen(true) require.EqualValues(t, true, opts.AdviseRandomOnOpen()) - opts.SetAccessHintOnCompactionStart(SequentialCompactionAccessPattern) - require.EqualValues(t, SequentialCompactionAccessPattern, opts.GetAccessHintOnCompactionStart()) + // opts.SetAccessHintOnCompactionStart(SequentialCompactionAccessPattern) + // require.EqualValues(t, SequentialCompactionAccessPattern, opts.GetAccessHintOnCompactionStart()) opts.SetDbWriteBufferSize(1 << 30) require.EqualValues(t, 1<<30, opts.GetDbWriteBufferSize()) @@ -401,6 +401,10 @@ func TestOptions(t *testing.T) { opts.SetWriteBufferManager(wbm) + lg := NewStderrLogger(InfoInfoLogLevel, "prefix") + opts.SetInfoLog(lg) + require.NotNil(t, opts.GetInfoLog()) + // cloning cl := opts.Clone() require.EqualValues(t, 5, cl.GetTableCacheNumshardbits()) diff --git a/ratelimiter.go b/ratelimiter.go index 833e511..76b4eaa 100644 --- a/ratelimiter.go +++ b/ratelimiter.go @@ -4,6 +4,14 @@ package grocksdb // #include "rocksdb/c.h" import "C" +type RateLimiterMode int + +const ( + RateLimiterModeReadsOnly RateLimiterMode = iota + RateLimiterModeWritesOnly + RateLimiterModeAllIo +) + // RateLimiter is used to control write rate of flush and // compaction. type RateLimiter struct { @@ -52,6 +60,47 @@ func NewAutoTunedRateLimiter(rateBytesPerSec, refillPeriodMicros int64, fairness return newNativeRateLimiter(cR) } +// NewGenericRateLimiter creates a RateLimiter object, which can be shared among RocksDB instances to +// control write rate of flush and compaction. +// +// @rate_bytes_per_sec: this is the only parameter you want to set most of the +// time. It controls the total write rate of compaction and flush in bytes per +// second. Currently, RocksDB does not enforce rate limit for anything other +// than flush and compaction, e.g. write to WAL. +// +// @refill_period_us: this controls how often tokens are refilled. For example, +// when rate_bytes_per_sec is set to 10MB/s and refill_period_us is set to +// 100ms, then 1MB is refilled every 100ms internally. Larger value can lead to +// burstier writes while smaller value introduces more CPU overhead. +// The default should work for most cases. +// +// @fairness: RateLimiter accepts high-pri requests and low-pri requests. +// A low-pri request is usually blocked in favor of hi-pri request. Currently, +// RocksDB assigns low-pri to request from compaction and high-pri to request +// from flush. Low-pri requests can get blocked if flush requests come in +// continuously. This fairness parameter grants low-pri requests permission by +// 1/fairness chance even though high-pri requests exist to avoid starvation. +// You should be good by leaving it at default 10. +// +// @mode: Mode indicates which types of operations count against the limit. +// +// @auto_tuned: Enables dynamic adjustment of rate limit within the range +// `[rate_bytes_per_sec / 20, rate_bytes_per_sec]`, according to +// the recent demand for background I/O. +func NewGenericRateLimiter( + rateBytesPerSec, refillPeriodMicros int64, fairness int32, + mode RateLimiterMode, autoTuned bool, +) *RateLimiter { + cR := C.rocksdb_ratelimiter_create_with_mode( + C.int64_t(rateBytesPerSec), + C.int64_t(refillPeriodMicros), + C.int32_t(fairness), + C.int(mode), + C.bool(autoTuned), + ) + return newNativeRateLimiter(cR) +} + // NewNativeRateLimiter creates a native RateLimiter object. func newNativeRateLimiter(c *C.rocksdb_ratelimiter_t) *RateLimiter { return &RateLimiter{c: c} diff --git a/snapshot.go b/snapshot.go index 7853d06..99daf66 100644 --- a/snapshot.go +++ b/snapshot.go @@ -14,6 +14,11 @@ func newNativeSnapshot(c *C.rocksdb_snapshot_t) *Snapshot { return &Snapshot{c: c} } +// GetSequenceNumber gets sequence number of the Snapshot. +func (snapshot *Snapshot) GetSequenceNumber() uint64 { + return uint64(C.rocksdb_snapshot_get_sequence_number(snapshot.c)) +} + // Destroy deallocates the Snapshot object. func (snapshot *Snapshot) Destroy() { C.rocksdb_free(unsafe.Pointer(snapshot.c))