Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
4cd810e
redis: upstream client draining and temp host connection limit and cl…
msukalski May 29, 2019
4b26c56
redis: upstream client draining and temp host connection handling (#7…
msukalski May 29, 2019
8d1e739
redis: upstream client draining and temp host connection handling (#7…
msukalski Jun 14, 2019
da3a916
Merge remote-tracking branch 'upstream/master' into msukalski/redirec…
msukalski Jun 14, 2019
8f4209d
redis: upstream client draining and temp host connection handling (#7…
msukalski Jun 15, 2019
7da57ef
redis: upstream client draining and temp host connection handling (#7…
msukalski Jun 15, 2019
1e394b8
redis: upstream client draining and temp host connection handling (#7…
msukalski Jun 16, 2019
8361353
Merge remote-tracking branch 'upstream/master' into msukalski/redirec…
msukalski Jun 21, 2019
e53d68c
redis: upstream client draining and temp host connection handling (#7…
msukalski Jun 26, 2019
b064fa8
redis: upstream client draining and temp host connection handling (#7…
msukalski Jul 10, 2019
e85851b
Merge remote-tracking branch 'upstream/master' into msukalski/redirec…
msukalski Jul 10, 2019
3fd4f0d
redis: upstream client draining and temp host connection handling (#7…
msukalski Jul 11, 2019
5353969
redis: upstream client draining and temp host connection handling (#7…
msukalski Jul 11, 2019
2a2b88f
redis: upstream client draining and temp host connection handling (#7…
msukalski Jul 12, 2019
0b2ea71
redis: upstream client draining and temp host connection handling (#7…
msukalski Jul 12, 2019
c91ba8a
redis: upstream client draining and temp host connection handling (#7…
msukalski Jul 12, 2019
7405861
Merge remote-tracking branch 'upstream/master' into msukalski/redirec…
msukalski Jul 13, 2019
cb997ef
redis: upstream client draining and temp host connection handling (#7…
msukalski Jul 13, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ option go_package = "v2";
import "envoy/api/v2/core/base.proto";

import "google/protobuf/duration.proto";
import "google/protobuf/wrappers.proto";

import "validate/validate.proto";
import "gogoproto/gogo.proto";
Expand Down Expand Up @@ -82,6 +83,13 @@ message RedisProxy {
// If `max_buffer_size_before_flush` is set, but `buffer_flush_timeout` is not, the latter
// defaults to 3ms.
google.protobuf.Duration buffer_flush_timeout = 5 [(gogoproto.stdduration) = true];

// `max_upstream_unknown_connections` controls how many upstream connections to unknown hosts
// can be created at any given time by any given worker thread (see `enable_redirection` for
// more details). If the host is unknown and a connection cannot be created due to enforcing
// this limit, then redirection will fail and the original redirection error will be passed
// downstream unchanged. This limit defaults to 100.
google.protobuf.UInt32Value max_upstream_unknown_connections = 6;
}

// Network settings for the connection pool to the upstream clusters.
Expand Down
9 changes: 9 additions & 0 deletions docs/root/intro/arch_overview/other_protocols/redis.rst
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,15 @@ following information:
For topology configuration details, see the Redis Cluster
:ref:`v2 API reference <envoy_api_msg_config.cluster.redis.RedisClusterConfig>`.

Every Redis cluster has its own extra statistics tree rooted at *cluster.<name>.redis_cluster.* with the following statistics:

.. csv-table::
:header: Name, Type, Description
:widths: 1, 1, 2

max_upstream_unknown_connections_reached, Counter, Total number of times that an upstream connection to an unknown host is not created after redirection having reached the connection pool's max_upstream_unknown_connections limit
upstream_cx_drained, Counter, Total number of upstream connections drained of active requests before being closed

Supported commands
------------------

Expand Down
1 change: 1 addition & 0 deletions source/extensions/clusters/redis/redis_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ class RedisCluster : public Upstream::BaseDynamicClusterImpl {
bool enableRedirection() const override { return false; }
uint32_t maxBufferSizeBeforeFlush() const override { return 0; }
std::chrono::milliseconds bufferFlushTimeoutInMs() const override { return buffer_timeout_; }
uint32_t maxUpstreamUnknownConnections() const override { return 0; }

// Extensions::NetworkFilters::Common::Redis::Client::PoolCallbacks
void onResponse(NetworkFilters::Common::Redis::RespValuePtr&& value) override;
Expand Down
1 change: 1 addition & 0 deletions source/extensions/filters/network/common/redis/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ envoy_cc_library(
"//source/common/network:filter_lib",
"//source/common/protobuf:utility_lib",
"//source/common/upstream:load_balancer_lib",
"//source/common/upstream:upstream_lib",
"@envoy_api//envoy/config/filter/network/redis_proxy/v2:redis_proxy_cc",
],
)
Expand Down
23 changes: 23 additions & 0 deletions source/extensions/filters/network/common/redis/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ class Client : public Event::DeferredDeletable {
*/
virtual void addConnectionCallbacks(Network::ConnectionCallbacks& callbacks) PURE;

/**
* Called to determine if the client has pending requests.
* @return bool true if the client is processing requests or false if it is currently idle.
*/
virtual bool active() PURE;

/**
* Closes the underlying network connection.
*/
Expand Down Expand Up @@ -134,6 +140,23 @@ class Config {
* @return timeout for batching commands for a single upstream host.
*/
virtual std::chrono::milliseconds bufferFlushTimeoutInMs() const PURE;

/**
* @return the maximum number of upstream connections to unknown hosts when enableRedirection() is
* true.
*
* This value acts as an upper bound on the number of servers in a cluster if only a subset
* of the cluster's servers are known via configuration (cluster size - number of servers in
* cluster known to cluster manager <= maxUpstreamUnknownConnections() for proper operation).
* Redirection errors are processed if enableRedirection() is true, and a new upstream connection
* to a previously unknown server will be made as a result of redirection if the number of unknown
* server connections is currently less than maxUpstreamUnknownConnections(). If a connection
* cannot be made, then the original redirection error will be passed though unchanged to the
* downstream client. If a cluster is using the Redis cluster protocol (RedisCluster), then the
* cluster logic will periodically discover all of the servers in the cluster; this should
* minimize the need for a large maxUpstreamUnknownConnections() value.
*/
virtual uint32_t maxUpstreamUnknownConnections() const PURE;
};

/**
Expand Down
15 changes: 7 additions & 8 deletions source/extensions/filters/network/common/redis/client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ ConfigImpl::ConfigImpl(
config.max_buffer_size_before_flush()), // This is a scalar, so default is zero.
buffer_flush_timeout_(PROTOBUF_GET_MS_OR_DEFAULT(
config, buffer_flush_timeout,
3)) // Default timeout is 3ms. If max_buffer_size_before_flush is zero, this is not used
// as the buffer is flushed on each request immediately.
{}
3)), // Default timeout is 3ms. If max_buffer_size_before_flush is zero, this is not used
// as the buffer is flushed on each request immediately.
max_upstream_unknown_connections_(
PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, max_upstream_unknown_connections, 100)) {}

ClientPtr ClientImpl::create(Upstream::HostConstSharedPtr host, Event::Dispatcher& dispatcher,
EncoderPtr&& encoder, DecoderFactory& decoder_factory,
Expand Down Expand Up @@ -124,14 +125,12 @@ void ClientImpl::putOutlierEvent(Upstream::Outlier::Result result) {
void ClientImpl::onEvent(Network::ConnectionEvent event) {
if (event == Network::ConnectionEvent::RemoteClose ||
event == Network::ConnectionEvent::LocalClose) {

Upstream::reportUpstreamCxDestroy(host_, event);
if (!pending_requests_.empty()) {
host_->cluster().stats().upstream_cx_destroy_with_active_rq_.inc();
Upstream::reportUpstreamCxDestroyActiveRequest(host_, event);
if (event == Network::ConnectionEvent::RemoteClose) {
putOutlierEvent(Upstream::Outlier::Result::LOCAL_ORIGIN_CONNECT_FAILED);
host_->cluster().stats().upstream_cx_destroy_remote_with_active_rq_.inc();
}
if (event == Network::ConnectionEvent::LocalClose) {
host_->cluster().stats().upstream_cx_destroy_local_with_active_rq_.inc();
}
}

Expand Down
6 changes: 6 additions & 0 deletions source/extensions/filters/network/common/redis/client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "common/protobuf/utility.h"
#include "common/singleton/const_singleton.h"
#include "common/upstream/load_balancer_impl.h"
#include "common/upstream/upstream_impl.h"

#include "extensions/filters/network/common/redis/client.h"

Expand Down Expand Up @@ -45,13 +46,17 @@ class ConfigImpl : public Config {
std::chrono::milliseconds bufferFlushTimeoutInMs() const override {
return buffer_flush_timeout_;
}
uint32_t maxUpstreamUnknownConnections() const override {
return max_upstream_unknown_connections_;
}

private:
const std::chrono::milliseconds op_timeout_;
const bool enable_hashtagging_;
const bool enable_redirection_;
const uint32_t max_buffer_size_before_flush_;
const std::chrono::milliseconds buffer_flush_timeout_;
const uint32_t max_upstream_unknown_connections_;
};

class ClientImpl : public Client, public DecoderCallbacks, public Network::ConnectionCallbacks {
Expand All @@ -68,6 +73,7 @@ class ClientImpl : public Client, public DecoderCallbacks, public Network::Conne
}
void close() override;
PoolRequest* makeRequest(const RespValue& request, PoolCallbacks& callbacks) override;
bool active() override { return !pending_requests_.empty(); }
void flushBufferAndResetTimer();

private:
Expand Down
1 change: 1 addition & 0 deletions source/extensions/filters/network/redis_proxy/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ envoy_cc_library(
deps = [
":config_interface",
":conn_pool_interface",
"//include/envoy/stats:stats_macros",
"//include/envoy/thread_local:thread_local_interface",
"//include/envoy/upstream:cluster_manager_interface",
"//source/common/buffer:buffer_lib",
Expand Down
4 changes: 3 additions & 1 deletion source/extensions/filters/network/redis_proxy/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,13 @@ Network::FilterFactoryCb RedisProxyFilterConfigFactory::createFilterFactoryFromP

Upstreams upstreams;
for (auto& cluster : unique_clusters) {
Stats::ScopePtr stats_scope =
context.scope().createScope(fmt::format("cluster.{}.redis_cluster", cluster));
upstreams.emplace(cluster, std::make_shared<ConnPool::InstanceImpl>(
cluster, context.clusterManager(),
Common::Redis::Client::ClientFactoryImpl::instance_,
context.threadLocal(), proto_config.settings(), context.api(),
context.scope().symbolTable()));
std::move(stats_scope)));
}

auto router =
Expand Down
Loading