Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ option(ENABLE_STATIC_LIBSTDCXX "link kvrocks with static library of libstd++ ins
option(ENABLE_LUAJIT "enable use of luaJIT instead of lua" ON)
option(ENABLE_OPENSSL "enable openssl to support tls connection" OFF)
option(ENABLE_IPO "enable interprocedural optimization" ON)
option(ENABLE_HISTOGRAMS "enable histograms to view the operation latencies" OFF)
set(SYMBOLIZE_BACKEND "" CACHE STRING "symbolization backend library for cpptrace (libbacktrace, libdwarf, or empty)")
set(PORTABLE 0 CACHE STRING "build a portable binary (disable arch-specific optimizations)")
# TODO: set ENABLE_NEW_ENCODING to ON when we are ready
Expand Down Expand Up @@ -288,6 +289,11 @@ if(ENABLE_IPO)
endif()
endif()

if(ENABLE_HISTOGRAMS)
target_compile_definitions(kvrocks_objs PUBLIC ENABLE_HISTOGRAMS)
endif()


# kvrocks main target
add_executable(kvrocks src/cli/main.cc)
target_link_libraries(kvrocks PRIVATE kvrocks_objs ${EXTERNAL_LIBS})
Expand Down
12 changes: 12 additions & 0 deletions kvrocks.conf
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,18 @@ json-storage-format json
# Default: no
txn-context-enabled no

# Define the histogram bucket values.
#
# If enabled, those values will be used to store the command execution latency values
# in buckets defined below. The values should be integers and must be sorted.
# An implicit bucket (+Inf in prometheus jargon) will be added to track the highest values
# that are beyond the bucket limits.

# NOTE: This is an experimental feature. There might be some performance overhead when using this
# feature, please be aware.
# Default: 10,20,40,60,80,100,150,250,350,500,750,1000,1500,2000,4000,8000
# histogram-bucket-boundaries 10,20,40,60,80,100,150,250,350,500,750,1000,1500,2000,4000,8000

################################## TLS ###################################

# By default, TLS/SSL is disabled, i.e. `tls-port` is set to 0.
Expand Down
27 changes: 26 additions & 1 deletion src/config/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,10 @@ Config::Config() {
new EnumField<JsonStorageFormat>(&json_storage_format, json_storage_formats, JsonStorageFormat::JSON)},
{"txn-context-enabled", true, new YesNoField(&txn_context_enabled, false)},
{"skip-block-cache-deallocation-on-close", false, new YesNoField(&skip_block_cache_deallocation_on_close, false)},

#ifdef ENABLE_HISTOGRAMS
{"histogram-bucket-boundaries", true, new StringField(&histogram_bucket_boundaries_str_,
"10,20,40,60,80,100,150,250,350,500,750,1000,1500,2000,4000,8000")},
#endif
/* rocksdb options */
{"rocksdb.compression", false,
new EnumField<rocksdb::CompressionType>(&rocks_db.compression, compression_types,
Expand Down Expand Up @@ -754,6 +757,28 @@ void Config::initFieldCallback() {
{"tls-session-caching", set_tls_option},
{"tls-session-cache-size", set_tls_option},
{"tls-session-cache-timeout", set_tls_option},
#endif
#ifdef ENABLE_HISTOGRAMS
{"histogram-bucket-boundaries",
[this]([[maybe_unused]] Server *srv, [[maybe_unused]] const std::string &k, const std::string &v) -> Status {
std::vector<std::string> buckets = util::Split(v, ",");
histogram_bucket_boundaries.clear();
if (buckets.size() < 1) {
return {Status::NotOK, "Please provide at least 1 bucket value for histogram"};
}
std::transform(buckets.begin(), buckets.end(), std::back_inserter(histogram_bucket_boundaries), [](const std::string& val)
{
return std::stod(val);
});
if (histogram_bucket_boundaries.size() != buckets.size()) {
return {Status::NotOK, "All values for the bucket must be double or integer values"};
}

if (!std::is_sorted(histogram_bucket_boundaries.begin(), histogram_bucket_boundaries.end())) {
return {Status::NotOK, "The values for the histogram must be sorted"};
}
return Status::OK();
}},
#endif
};
for (const auto &iter : callbacks) {
Expand Down
8 changes: 8 additions & 0 deletions src/config/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ struct Config {

bool skip_block_cache_deallocation_on_close = false;

#ifdef ENABLE_HISTOGRAMS
std::vector<double> histogram_bucket_boundaries;
#endif

struct RocksDB {
int block_size;
bool cache_index_and_filter_blocks;
Expand Down Expand Up @@ -260,6 +264,10 @@ struct Config {
std::string profiling_sample_commands_str_;
std::map<std::string, std::unique_ptr<ConfigField>> fields_;
std::vector<std::string> rename_command_;
#ifdef ENABLE_HISTOGRAMS
std::string histogram_bucket_boundaries_str_;
#endif


void initFieldValidator();
void initFieldCallback();
Expand Down
38 changes: 37 additions & 1 deletion src/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,11 @@
#include "worker.h"

Server::Server(engine::Storage *storage, Config *config)
: storage(storage),
:
#ifdef ENABLE_HISTOGRAMS
stats(config),
#endif
storage(storage),
indexer(storage),
index_mgr(&indexer, storage),
start_time_secs_(util::GetTimeStamp()),
Expand All @@ -63,6 +67,17 @@ Server::Server(engine::Storage *storage, Config *config)
for (const auto &iter : *commands) {
stats.commands_stats[iter.first].calls = 0;
stats.commands_stats[iter.first].latency = 0;

#ifdef ENABLE_HISTOGRAMS
//NB: Extra index for the last bucket (Inf)
for (std::size_t i{0}; i <= stats.bucket_boundaries.size(); ++i) {
auto bucket_ptr = std::shared_ptr<std::atomic<uint64_t>>(new std::atomic<uint64_t>(0));

stats.commands_histogram[iter.first].buckets.push_back(bucket_ptr);
}
stats.commands_histogram[iter.first].calls = 0;
stats.commands_histogram[iter.first].sum = 0;
#endif
}

// init cursor_dict_
Expand Down Expand Up @@ -1165,6 +1180,27 @@ void Server::GetCommandsStatsInfo(std::string *info) {
<< ",usec_per_call=" << static_cast<float>(latency / calls) << "\r\n";
}

#ifdef ENABLE_HISTOGRAMS
for (const auto &cmd_hist : stats.commands_histogram) {
auto command_name = cmd_hist.first;
auto calls = stats.commands_histogram[command_name].calls.load();
if (calls == 0) continue;

auto sum = stats.commands_histogram[command_name].sum.load();
string_stream << "cmdstathist_" << command_name << ":";
for (std::size_t i{0}; i < stats.commands_histogram[command_name].buckets.size(); ++i) {
auto bucket_value = stats.commands_histogram[command_name].buckets[i]->load();
auto bucket_bound = std::numeric_limits<double>::infinity();
if (i < stats.bucket_boundaries.size()) {
bucket_bound = stats.bucket_boundaries[i];
}

string_stream << bucket_bound << "=" << bucket_value << ",";
}
string_stream << "sum=" << sum << ",count=" << calls << "\r\n";
}
#endif

*info = string_stream.str();
}

Expand Down
28 changes: 28 additions & 0 deletions src/stats/stats.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,23 @@
#include "fmt/format.h"
#include "time_util.h"


#ifdef ENABLE_HISTOGRAMS
Stats::Stats(Config *config)
: config_(config) {
for (int i = 0; i < STATS_METRIC_COUNT; i++) {
InstMetric im;
im.last_sample_time_ms = 0;
im.last_sample_count = 0;
im.idx = 0;
for (uint64_t &sample : im.samples) {
sample = 0;
}
inst_metrics.push_back(im);
}
bucket_boundaries = config_->histogram_bucket_boundaries;
}
#else
Stats::Stats() {
for (int i = 0; i < STATS_METRIC_COUNT; i++) {
InstMetric im;
Expand All @@ -38,6 +55,7 @@ Stats::Stats() {
inst_metrics.push_back(im);
}
}
#endif

#if defined(__APPLE__)
#include <mach/mach_init.h>
Expand Down Expand Up @@ -86,10 +104,20 @@ int64_t Stats::GetMemoryRSS() {
void Stats::IncrCalls(const std::string &command_name) {
total_calls.fetch_add(1, std::memory_order_relaxed);
commands_stats[command_name].calls.fetch_add(1, std::memory_order_relaxed);
#ifdef ENABLE_HISTOGRAMS
commands_histogram[command_name].calls.fetch_add(1, std::memory_order_relaxed);
#endif
}

void Stats::IncrLatency(uint64_t latency, const std::string &command_name) {
commands_stats[command_name].latency.fetch_add(latency, std::memory_order_relaxed);
#ifdef ENABLE_HISTOGRAMS
commands_histogram[command_name].sum.fetch_add(latency, std::memory_order_relaxed);

const auto bucket_index = static_cast<std::size_t>(std::distance(
bucket_boundaries.begin(), std::lower_bound(bucket_boundaries.begin(), bucket_boundaries.end(), latency)));
commands_histogram[command_name].buckets[bucket_index]->fetch_add(1, std::memory_order_relaxed);
#endif
}

void Stats::TrackInstantaneousMetric(int metric, uint64_t current_reading) {
Expand Down
27 changes: 27 additions & 0 deletions src/stats/stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
#include <shared_mutex>
#include <string>
#include <vector>
#ifdef ENABLE_HISTOGRAMS
#include <algorithm>
#include "config/config.h"
#endif

enum StatsMetricFlags {
STATS_METRIC_COMMAND = 0, // Number of commands executed
Expand All @@ -43,6 +47,15 @@ enum StatsMetricFlags {

constexpr int STATS_METRIC_SAMPLES = 16; // Number of samples per metric

#ifdef ENABLE_HISTOGRAMS
// Experimental part to support histograms for cmd statistics
struct CommandHistogram {
std::vector<std::shared_ptr<std::atomic<uint64_t>>> buckets;
std::atomic<uint64_t> calls;
std::atomic<uint64_t> sum;
};
#endif

struct CommandStat {
std::atomic<uint64_t> calls;
std::atomic<uint64_t> latency;
Expand All @@ -57,6 +70,14 @@ struct InstMetric {

class Stats {
public:
#ifdef ENABLE_HISTOGRAMS
using BucketBoundaries = std::vector<double>;
BucketBoundaries bucket_boundaries;
std::map<std::string, CommandHistogram> commands_histogram;

Config *config_ = nullptr;
#endif

std::atomic<uint64_t> total_calls = {0};
std::atomic<uint64_t> in_bytes = {0};
std::atomic<uint64_t> out_bytes = {0};
Expand All @@ -69,7 +90,13 @@ class Stats {
std::atomic<uint64_t> psync_ok_count = {0};
std::map<std::string, CommandStat> commands_stats;


#ifdef ENABLE_HISTOGRAMS
explicit Stats(Config *config);
#else
Stats();
#endif

void IncrCalls(const std::string &command_name);
void IncrLatency(uint64_t latency, const std::string &command_name);
void IncrInboundBytes(uint64_t bytes) { in_bytes.fetch_add(bytes, std::memory_order_relaxed); }
Expand Down
4 changes: 4 additions & 0 deletions tests/cppunit/config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ TEST(Config, GetAndSet) {
{"rocksdb.rate_limiter_auto_tuned", "yes"},
{"rocksdb.compression_level", "32767"},
{"rocksdb.wal_compression", "no"},
#ifdef ENABLE_HISTOGRAMS
{"histogram-bucket-boundaries", "10,100,1000,10000"},
#endif

};
for (const auto &iter : immutable_cases) {
s = config.Set(nullptr, iter.first, iter.second);
Expand Down
23 changes: 22 additions & 1 deletion tests/gocase/unit/info/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"context"
"fmt"
"strconv"
"strings"
"testing"
"time"

Expand All @@ -33,7 +34,10 @@ import (
)

func TestInfo(t *testing.T) {
srv0 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
srv0 := util.StartServer(t, map[string]string{
"cluster-enabled": "yes",
"histogram-bucket-boundaries": "10,20,30,50",
})
defer func() { srv0.Close() }()
rdb0 := srv0.NewClient()
defer func() { require.NoError(t, rdb0.Close()) }()
Expand Down Expand Up @@ -102,6 +106,23 @@ func TestInfo(t *testing.T) {
t.Run("get cluster information by INFO - cluster enabled", func(t *testing.T) {
require.Equal(t, "1", util.FindInfoEntry(rdb0, "cluster_enabled", "cluster"))
})

t.Run("get command latencies via histogram INFO - histogram-bucket-boundaries", func(t *testing.T) {
output := util.FindInfoEntry(rdb0, "cmdstathist", "cmdstathist_info")
if len(output) == 0 {
t.SkipNow()
}

splitValues := strings.FieldsFunc(output, func(r rune) bool {
return r == '=' || r == ','
})

// expected: 10=..,20=..,30=..,50=..,inf=..,sum=...,count=..
require.GreaterOrEqual(t, len(splitValues), 15)
require.Contains(t, splitValues, "sum")
require.Contains(t, splitValues, "count")
require.Contains(t, splitValues, "info")
})
}

func TestKeyspaceHitMiss(t *testing.T) {
Expand Down
Loading