Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ The Redis filter will gather statistics for commands in the
total, Counter, Number of commands
success, Counter, Number of commands that were successful
error, Counter, Number of commands that returned a partial or complete error response

latency, Histogram, Command execution time in milliseconds

.. _config_network_filters_redis_proxy_per_command_stats:

Runtime
Expand Down
1 change: 1 addition & 0 deletions docs/root/intro/version_history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Version history
* http: added new grpc_http1_reverse_bridge filter for converting gRPC requests into HTTP/1.1 requests.
* mysql: added a MySQL proxy filter that is capable of parsing SQL queries over MySQL wire protocol. Refer to ::ref:`MySQL proxy<config_network_filters_mysql_proxy>` for more details.
* redis: added :ref:`success and error stats <config_network_filters_redis_proxy_per_command_stats>` for commands.
* redis: added :ref:`latency stats <config_network_filters_redis_proxy_per_command_stats>` for commands.
* router: added ability to configure a :ref:`retry policy <envoy_api_msg_route.RetryPolicy>` at the
virtual host level.
* tls: enabled TLS 1.3 on the server-side (non-FIPS builds).
Expand Down
2 changes: 2 additions & 0 deletions source/extensions/filters/network/redis_proxy/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ envoy_cc_library(
":command_splitter_interface",
":conn_pool_interface",
":supported_commands_lib",
"//include/envoy/stats:stats_macros",
"//include/envoy/stats:timespan",
"//source/common/common:assert_lib",
"//source/common/common:minimal_logger_lib",
"//source/common/common:to_lower_table_lib",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ void SplitRequestBase::updateStats(const bool success) {
} else {
command_stats_.error_.inc();
}
command_latency_ms_->complete();
}

SingleServerRequest::~SingleServerRequest() { ASSERT(!handle_); }
Expand All @@ -60,8 +61,9 @@ void SingleServerRequest::cancel() {

SplitRequestPtr SimpleRequest::create(ConnPool::Instance& conn_pool,
const RespValue& incoming_request, SplitCallbacks& callbacks,
CommandStats& command_stats) {
std::unique_ptr<SimpleRequest> request_ptr{new SimpleRequest(callbacks, command_stats)};
CommandStats& command_stats, TimeSource& time_source) {
std::unique_ptr<SimpleRequest> request_ptr{
new SimpleRequest(callbacks, command_stats, time_source)};

request_ptr->handle_ = conn_pool.makeRequest(incoming_request.asArray()[1].asString(),
incoming_request, *request_ptr);
Expand All @@ -75,7 +77,7 @@ SplitRequestPtr SimpleRequest::create(ConnPool::Instance& conn_pool,

SplitRequestPtr EvalRequest::create(ConnPool::Instance& conn_pool,
const RespValue& incoming_request, SplitCallbacks& callbacks,
CommandStats& command_stats) {
CommandStats& command_stats, TimeSource& time_source) {

// EVAL looks like: EVAL script numkeys key [key ...] arg [arg ...]
// Ensure there are at least three args to the command or it cannot be hashed.
Expand All @@ -85,7 +87,7 @@ SplitRequestPtr EvalRequest::create(ConnPool::Instance& conn_pool,
return nullptr;
}

std::unique_ptr<EvalRequest> request_ptr{new EvalRequest(callbacks, command_stats)};
std::unique_ptr<EvalRequest> request_ptr{new EvalRequest(callbacks, command_stats, time_source)};
request_ptr->handle_ = conn_pool.makeRequest(incoming_request.asArray()[3].asString(),
incoming_request, *request_ptr);
if (!request_ptr->handle_) {
Expand Down Expand Up @@ -120,8 +122,8 @@ void FragmentedRequest::onChildFailure(uint32_t index) {

SplitRequestPtr MGETRequest::create(ConnPool::Instance& conn_pool,
const RespValue& incoming_request, SplitCallbacks& callbacks,
CommandStats& command_stats) {
std::unique_ptr<MGETRequest> request_ptr{new MGETRequest(callbacks, command_stats)};
CommandStats& command_stats, TimeSource& time_source) {
std::unique_ptr<MGETRequest> request_ptr{new MGETRequest(callbacks, command_stats, time_source)};

request_ptr->num_pending_responses_ = incoming_request.asArray().size() - 1;
request_ptr->pending_requests_.reserve(request_ptr->num_pending_responses_);
Expand Down Expand Up @@ -190,13 +192,13 @@ void MGETRequest::onChildResponse(RespValuePtr&& value, uint32_t index) {

SplitRequestPtr MSETRequest::create(ConnPool::Instance& conn_pool,
const RespValue& incoming_request, SplitCallbacks& callbacks,
CommandStats& command_stats) {
CommandStats& command_stats, TimeSource& time_source) {
if ((incoming_request.asArray().size() - 1) % 2 != 0) {
onWrongNumberOfArguments(callbacks, incoming_request);
command_stats.error_.inc();
return nullptr;
}
std::unique_ptr<MSETRequest> request_ptr{new MSETRequest(callbacks, command_stats)};
std::unique_ptr<MSETRequest> request_ptr{new MSETRequest(callbacks, command_stats, time_source)};

request_ptr->num_pending_responses_ = (incoming_request.asArray().size() - 1) / 2;
request_ptr->pending_requests_.reserve(request_ptr->num_pending_responses_);
Expand Down Expand Up @@ -264,9 +266,10 @@ void MSETRequest::onChildResponse(RespValuePtr&& value, uint32_t index) {
SplitRequestPtr SplitKeysSumResultRequest::create(ConnPool::Instance& conn_pool,
const RespValue& incoming_request,
SplitCallbacks& callbacks,
CommandStats& command_stats) {
CommandStats& command_stats,
TimeSource& time_source) {
std::unique_ptr<SplitKeysSumResultRequest> request_ptr{
new SplitKeysSumResultRequest(callbacks, command_stats)};
new SplitKeysSumResultRequest(callbacks, command_stats, time_source)};

request_ptr->num_pending_responses_ = incoming_request.asArray().size() - 1;
request_ptr->pending_requests_.reserve(request_ptr->num_pending_responses_);
Expand Down Expand Up @@ -327,11 +330,12 @@ void SplitKeysSumResultRequest::onChildResponse(RespValuePtr&& value, uint32_t i
}

InstanceImpl::InstanceImpl(ConnPool::InstancePtr&& conn_pool, Stats::Scope& scope,
const std::string& stat_prefix)
const std::string& stat_prefix, TimeSource& time_source)
: conn_pool_(std::move(conn_pool)), simple_command_handler_(*conn_pool_),
eval_command_handler_(*conn_pool_), mget_handler_(*conn_pool_), mset_handler_(*conn_pool_),
split_keys_sum_result_handler_(*conn_pool_),
stats_{ALL_COMMAND_SPLITTER_STATS(POOL_COUNTER_PREFIX(scope, stat_prefix + "splitter."))} {
stats_{ALL_COMMAND_SPLITTER_STATS(POOL_COUNTER_PREFIX(scope, stat_prefix + "splitter."))},
time_source_(time_source) {
for (const std::string& command : SupportedCommands::simpleCommands()) {
addHandler(scope, stat_prefix, command, simple_command_handler_);
}
Expand Down Expand Up @@ -388,8 +392,8 @@ SplitRequestPtr InstanceImpl::makeRequest(const RespValue& request, SplitCallbac
}
ENVOY_LOG(debug, "redis: splitting '{}'", request.toString());
handler->command_stats_.total_.inc();
SplitRequestPtr request_ptr =
handler->handler_.get().startRequest(request, callbacks, handler->command_stats_);
SplitRequestPtr request_ptr = handler->handler_.get().startRequest(
request, callbacks, handler->command_stats_, time_source_);
return request_ptr;
}

Expand All @@ -402,13 +406,12 @@ void InstanceImpl::addHandler(Stats::Scope& scope, const std::string& stat_prefi
const std::string& name, CommandHandler& handler) {
std::string to_lower_name(name);
to_lower_table_.toLowerCase(to_lower_name);
const std::string command_stat_prefix = fmt::format("{}command.{}.", stat_prefix, to_lower_name);
handler_lookup_table_.add(
to_lower_name.c_str(),
std::make_shared<HandlerData>(HandlerData{
CommandStats{
scope.counter(fmt::format("{}command.{}.total", stat_prefix, to_lower_name)),
scope.counter(fmt::format("{}command.{}.success", stat_prefix, to_lower_name)),
scope.counter(fmt::format("{}command.{}.error", stat_prefix, to_lower_name))},
CommandStats{ALL_COMMAND_STATS(POOL_COUNTER_PREFIX(scope, command_stat_prefix),
POOL_HISTOGRAM_PREFIX(scope, command_stat_prefix))},
handler}));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#include <vector>

#include "envoy/stats/scope.h"
#include "envoy/stats/stats_macros.h"
#include "envoy/stats/timespan.h"

#include "common/common/logger.h"
#include "common/common/to_lower_table.h"
Expand Down Expand Up @@ -36,19 +38,30 @@ class Utility {
static RespValuePtr makeError(const std::string& error);
};

class CommandStats {
public:
Stats::Counter& total_;
Stats::Counter& success_;
Stats::Counter& error_;
/**
* All command level stats. @see stats_macros.h
*/
// clang-format off
#define ALL_COMMAND_STATS(COUNTER, HISTOGRAM) \
COUNTER(total) \
COUNTER(success) \
COUNTER(error) \
HISTOGRAM(latency) \
// clang-format on

/**
* Struct definition for all command stats. @see stats_macros.h
*/
struct CommandStats {
ALL_COMMAND_STATS(GENERATE_COUNTER_STRUCT,GENERATE_HISTOGRAM_STRUCT)
};

class CommandHandler {
public:
virtual ~CommandHandler() {}

virtual SplitRequestPtr startRequest(const RespValue& request, SplitCallbacks& callbacks,
CommandStats& command_stats) PURE;
CommandStats& command_stats, TimeSource& time_source) PURE;
};

class CommandHandlerBase {
Expand All @@ -61,11 +74,14 @@ class CommandHandlerBase {
class SplitRequestBase : public SplitRequest {
protected:
static void onWrongNumberOfArguments(SplitCallbacks& callbacks, const RespValue& request);

void updateStats(const bool success);

SplitRequestBase(CommandStats& command_stats) : command_stats_(command_stats) {}
SplitRequestBase(CommandStats& command_stats, TimeSource& time_source)
: command_stats_(command_stats) {
command_latency_ms_ = std::make_unique<Stats::Timespan>(command_stats_.latency_, time_source);
}
CommandStats& command_stats_;
Stats::TimespanPtr command_latency_ms_;
};

/**
Expand All @@ -83,8 +99,9 @@ class SingleServerRequest : public SplitRequestBase, public ConnPool::PoolCallba
void cancel() override;

protected:
SingleServerRequest(SplitCallbacks& callbacks, CommandStats& command_stats)
: SplitRequestBase(command_stats), callbacks_(callbacks) {}
SingleServerRequest(SplitCallbacks& callbacks, CommandStats& command_stats,
TimeSource& time_source)
: SplitRequestBase(command_stats, time_source), callbacks_(callbacks) {}

SplitCallbacks& callbacks_;
ConnPool::PoolRequest* handle_{};
Expand All @@ -96,11 +113,12 @@ class SingleServerRequest : public SplitRequestBase, public ConnPool::PoolCallba
class SimpleRequest : public SingleServerRequest {
public:
static SplitRequestPtr create(ConnPool::Instance& conn_pool, const RespValue& incoming_request,
SplitCallbacks& callbacks, CommandStats& command_stats);
SplitCallbacks& callbacks, CommandStats& command_stats,
TimeSource& time_source);

private:
SimpleRequest(SplitCallbacks& callbacks, CommandStats& command_stats)
: SingleServerRequest(callbacks, command_stats) {}
SimpleRequest(SplitCallbacks& callbacks, CommandStats& command_stats, TimeSource& time_source)
: SingleServerRequest(callbacks, command_stats, time_source) {}
};

/**
Expand All @@ -109,11 +127,12 @@ class SimpleRequest : public SingleServerRequest {
class EvalRequest : public SingleServerRequest {
public:
static SplitRequestPtr create(ConnPool::Instance& conn_pool, const RespValue& incoming_request,
SplitCallbacks& callbacks, CommandStats& command_stats);
SplitCallbacks& callbacks, CommandStats& command_stats,
TimeSource& time_source);

private:
EvalRequest(SplitCallbacks& callbacks, CommandStats& command_stats)
: SingleServerRequest(callbacks, command_stats) {}
EvalRequest(SplitCallbacks& callbacks, CommandStats& command_stats, TimeSource& time_source)
: SingleServerRequest(callbacks, command_stats, time_source) {}
};

/**
Expand All @@ -129,8 +148,8 @@ class FragmentedRequest : public SplitRequestBase {
void cancel() override;

protected:
FragmentedRequest(SplitCallbacks& callbacks, CommandStats& command_stats)
: SplitRequestBase(command_stats), callbacks_(callbacks) {}
FragmentedRequest(SplitCallbacks& callbacks, CommandStats& command_stats, TimeSource& time_source)
: SplitRequestBase(command_stats, time_source), callbacks_(callbacks) {}

struct PendingRequest : public ConnPool::PoolCallbacks {
PendingRequest(FragmentedRequest& parent, uint32_t index) : parent_(parent), index_(index) {}
Expand Down Expand Up @@ -163,11 +182,12 @@ class FragmentedRequest : public SplitRequestBase {
class MGETRequest : public FragmentedRequest, Logger::Loggable<Logger::Id::redis> {
public:
static SplitRequestPtr create(ConnPool::Instance& conn_pool, const RespValue& incoming_request,
SplitCallbacks& callbacks, CommandStats& command_stats);
SplitCallbacks& callbacks, CommandStats& command_stats,
TimeSource& time_source);

private:
MGETRequest(SplitCallbacks& callbacks, CommandStats& command_stats)
: FragmentedRequest(callbacks, command_stats) {}
MGETRequest(SplitCallbacks& callbacks, CommandStats& command_stats, TimeSource& time_source)
: FragmentedRequest(callbacks, command_stats, time_source) {}

// RedisProxy::CommandSplitter::FragmentedRequest
void onChildResponse(RespValuePtr&& value, uint32_t index) override;
Expand All @@ -182,11 +202,13 @@ class MGETRequest : public FragmentedRequest, Logger::Loggable<Logger::Id::redis
class SplitKeysSumResultRequest : public FragmentedRequest, Logger::Loggable<Logger::Id::redis> {
public:
static SplitRequestPtr create(ConnPool::Instance& conn_pool, const RespValue& incoming_request,
SplitCallbacks& callbacks, CommandStats& command_stats);
SplitCallbacks& callbacks, CommandStats& command_stats,
TimeSource& time_source);

private:
SplitKeysSumResultRequest(SplitCallbacks& callbacks, CommandStats& command_stats)
: FragmentedRequest(callbacks, command_stats) {}
SplitKeysSumResultRequest(SplitCallbacks& callbacks, CommandStats& command_stats,
TimeSource& time_source)
: FragmentedRequest(callbacks, command_stats, time_source) {}

// RedisProxy::CommandSplitter::FragmentedRequest
void onChildResponse(RespValuePtr&& value, uint32_t index) override;
Expand All @@ -202,11 +224,12 @@ class SplitKeysSumResultRequest : public FragmentedRequest, Logger::Loggable<Log
class MSETRequest : public FragmentedRequest, Logger::Loggable<Logger::Id::redis> {
public:
static SplitRequestPtr create(ConnPool::Instance& conn_pool, const RespValue& incoming_request,
SplitCallbacks& callbacks, CommandStats& command_stats);
SplitCallbacks& callbacks, CommandStats& command_stats,
TimeSource& time_source);

private:
MSETRequest(SplitCallbacks& callbacks, CommandStats& command_stats)
: FragmentedRequest(callbacks, command_stats) {}
MSETRequest(SplitCallbacks& callbacks, CommandStats& command_stats, TimeSource& time_source)
: FragmentedRequest(callbacks, command_stats, time_source) {}

// RedisProxy::CommandSplitter::FragmentedRequest
void onChildResponse(RespValuePtr&& value, uint32_t index) override;
Expand All @@ -221,8 +244,8 @@ class CommandHandlerFactory : public CommandHandler, CommandHandlerBase {
public:
CommandHandlerFactory(ConnPool::Instance& conn_pool) : CommandHandlerBase(conn_pool) {}
SplitRequestPtr startRequest(const RespValue& request, SplitCallbacks& callbacks,
CommandStats& command_stats) {
return RequestClass::create(conn_pool_, request, callbacks, command_stats);
CommandStats& command_stats, TimeSource& time_source) {
return RequestClass::create(conn_pool_, request, callbacks, command_stats, time_source);
}
};

Expand All @@ -245,7 +268,7 @@ struct InstanceStats {
class InstanceImpl : public Instance, Logger::Loggable<Logger::Id::redis> {
public:
InstanceImpl(ConnPool::InstancePtr&& conn_pool, Stats::Scope& scope,
const std::string& stat_prefix);
const std::string& stat_prefix, TimeSource& time_source);

// RedisProxy::CommandSplitter::Instance
SplitRequestPtr makeRequest(const RespValue& request, SplitCallbacks& callbacks) override;
Expand All @@ -271,6 +294,7 @@ class InstanceImpl : public Instance, Logger::Loggable<Logger::Id::redis> {
TrieLookupTable<HandlerDataPtr> handler_lookup_table_;
InstanceStats stats_;
const ToLowerTable to_lower_table_;
TimeSource& time_source_;
};

} // namespace CommandSplitter
Expand Down
2 changes: 1 addition & 1 deletion source/extensions/filters/network/redis_proxy/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ Network::FilterFactoryCb RedisProxyFilterConfigFactory::createFilterFactoryFromP
filter_config->cluster_name_, context.clusterManager(),
ConnPool::ClientFactoryImpl::instance_, context.threadLocal(), proto_config.settings()));
std::shared_ptr<CommandSplitter::Instance> splitter(new CommandSplitter::InstanceImpl(
std::move(conn_pool), context.scope(), filter_config->stat_prefix_));
std::move(conn_pool), context.scope(), filter_config->stat_prefix_, context.timeSource()));
return [splitter, filter_config](Network::FilterManager& filter_manager) -> void {
DecoderFactoryImpl factory;
filter_manager.addReadFilter(std::make_shared<ProxyFilter>(
Expand Down
3 changes: 3 additions & 0 deletions test/extensions/filters/network/redis_proxy/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ envoy_extension_cc_test(
"//source/common/stats:stats_lib",
"//source/extensions/filters/network/redis_proxy:command_splitter_lib",
"//test/mocks:common_lib",
"//test/mocks/stats:stats_mocks",
"//test/test_common:simulated_time_system_lib",
],
)

Expand Down Expand Up @@ -107,5 +109,6 @@ envoy_extension_cc_test_binary(
"//source/common/stats:stats_lib",
"//source/extensions/filters/network/redis_proxy:command_splitter_lib",
"//test/test_common:printers_lib",
"//test/test_common:simulated_time_system_lib",
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "extensions/filters/network/redis_proxy/supported_commands.h"

#include "test/test_common/printers.h"
#include "test/test_common/simulated_time_system.h"

#include "testing/base/public/benchmark.h"

Expand Down Expand Up @@ -64,7 +65,9 @@ class CommandLookUpSpeedTest {

ConnPool::Instance* conn_pool_{new NullInstanceImpl()};
Stats::IsolatedStoreImpl store_;
CommandSplitter::InstanceImpl splitter_{ConnPool::InstancePtr{conn_pool_}, store_, "redis.foo."};
Event::SimulatedTimeSystem time_system_;
CommandSplitter::InstanceImpl splitter_{ConnPool::InstancePtr{conn_pool_}, store_, "redis.foo.",
time_system_};
NoOpSplitCallbacks callbacks_;
CommandSplitter::SplitRequestPtr handle_;
};
Expand Down
Loading