diff --git a/api/envoy/config/filter/network/redis_proxy/v2/BUILD b/api/envoy/config/filter/network/redis_proxy/v2/BUILD index 55ba69f1c4c22..8dbda11b1e18e 100644 --- a/api/envoy/config/filter/network/redis_proxy/v2/BUILD +++ b/api/envoy/config/filter/network/redis_proxy/v2/BUILD @@ -5,4 +5,7 @@ licenses(["notice"]) # Apache 2 api_proto_library_internal( name = "redis_proxy", srcs = ["redis_proxy.proto"], + deps = [ + "//envoy/api/v2/core:base", + ], ) diff --git a/api/envoy/config/filter/network/redis_proxy/v2/redis_proxy.proto b/api/envoy/config/filter/network/redis_proxy/v2/redis_proxy.proto index eec8c3f409544..89946341426f6 100644 --- a/api/envoy/config/filter/network/redis_proxy/v2/redis_proxy.proto +++ b/api/envoy/config/filter/network/redis_proxy/v2/redis_proxy.proto @@ -7,6 +7,8 @@ option java_multiple_files = true; option java_package = "io.envoyproxy.envoy.config.filter.network.redis_proxy.v2"; option go_package = "v2"; +import "envoy/api/v2/core/base.proto"; + import "google/protobuf/duration.proto"; import "validate/validate.proto"; @@ -141,4 +143,23 @@ message RedisProxy { // ` of the architecture overview for recommendations on // configuring the backing clusters. PrefixRoutes prefix_routes = 5 [(gogoproto.nullable) = false]; + + // Authenticate Redis client connections locally by forcing downstream clients to issue a 'Redis + // AUTH command `_ with this password before enabling any other + // command. If an AUTH command's password matches this password, an "OK" response will be returned + // to the client. If the AUTH command password does not match this password, then an "ERR invalid + // password" error will be returned. If any other command is received before AUTH when this + // password is set, then a "NOAUTH Authentication required." error response will be sent to the + // client. If an AUTH command is received when the password is not set, then an "ERR Client sent + // AUTH, but no password is set" error will be returned. + envoy.api.v2.core.DataSource downstream_auth_password = 6; } + +// RedisProtocolOptions specifies Redis upstream protocol options. This object is used in +// :ref:`extension_protocol_options`, keyed +// by the name `envoy.redis_proxy`. +message RedisProtocolOptions { + // Upstream server password as defined by the `requirepass directive + // `_ in the server's configuration file. + envoy.api.v2.core.DataSource auth_password = 1; +} \ No newline at end of file diff --git a/docs/root/intro/arch_overview/redis.rst b/docs/root/intro/arch_overview/redis.rst index 866f2ad01ad9a..a3b159ff97517 100644 --- a/docs/root/intro/arch_overview/redis.rst +++ b/docs/root/intro/arch_overview/redis.rst @@ -25,6 +25,7 @@ The Redis project offers a thorough reference on partitioning as it relates to R * Active and passive healthchecking. * Hash tagging. * Prefix routing. +* Separate downstream client and upstream server authentication. **Planned future enhancements**: @@ -85,10 +86,13 @@ Supported commands At the protocol level, pipelines are supported. MULTI (transaction block) is not. Use pipelining wherever possible for the best performance. -At the command level, Envoy only supports commands that can be reliably hashed to a server. PING -is the only exception, which Envoy responds to immediately with PONG. Arguments to PING are not -allowed. All other supported commands must contain a key. Supported commands are functionally -identical to the original Redis command except possibly in failure scenarios. +At the command level, Envoy only supports commands that can be reliably hashed to a server. AUTH and PING +are the only exceptions. AUTH is processed locally by Envoy if a downstream password has been configured, +and no other commands will be processed until authentication is successful when a password has been +configured. Envoy will transparently issue AUTH commands upon connecting to upstream servers, if upstream +authentication passwords are configured for the cluster. Envoy responds to PING immediately with PONG. +Arguments to PING are not allowed. All other supported commands must contain a key. Supported commands are +functionally identical to the original Redis command except possibly in failure scenarios. For details on each command's usage see the official `Redis command reference `_. @@ -97,6 +101,7 @@ For details on each command's usage see the official :header: Command, Group :widths: 1, 1 + AUTH, Authentication PING, Connection DEL, Generic DUMP, Generic @@ -227,6 +232,12 @@ Envoy can also generate its own errors in response to the client. responded with a response that not conform to the Redis protocol." wrong number of arguments for command, "Certain commands check in Envoy that the number of arguments is correct." + "NOAUTH Authentication required.", "The command was rejected because a downstream authentication + password has been set and the client has not successfully authenticated." + ERR invalid password, "The authentication command failed due to an invalid password." + "ERR Client sent AUTH, but no password is set", "An authentication command was received, but no + downstream authentication password has been configured." + In the case of MGET, each individual key that cannot be fetched will generate an error response. For example, if we fetch five keys and two of the keys' backends time out, we would get an error diff --git a/docs/root/intro/version_history.rst b/docs/root/intro/version_history.rst index 0ba3ca7a27c1a..01013221d5608 100644 --- a/docs/root/intro/version_history.rst +++ b/docs/root/intro/version_history.rst @@ -31,6 +31,7 @@ Version history * redis: added :ref:`max_buffer_size_before_flush ` to batch commands together until the encoder buffer hits a certain size, and :ref:`buffer_flush_timeout ` to control how quickly the buffer is flushed if it is not full. +* redis: added auth support :ref:`downstream_auth_password ` for downstream client authentication, and :ref:`auth_password ` to configure authentication passwords for upstream server clusters. * router: add support for configuring a :ref:`grpc timeout offset ` on incoming requests. * router: added ability to control retry back-off intervals via :ref:`retry policy `. * router: added ability to issue a hedged retry in response to a per try timeout via a :ref:`hedge policy `. diff --git a/source/extensions/clusters/redis/BUILD b/source/extensions/clusters/redis/BUILD index 7248e397168b6..cbea929dc6000 100644 --- a/source/extensions/clusters/redis/BUILD +++ b/source/extensions/clusters/redis/BUILD @@ -20,6 +20,7 @@ envoy_cc_library( "//include/envoy/upstream:cluster_factory_interface", "//include/envoy/upstream:cluster_manager_interface", "//include/envoy/upstream:upstream_interface", + "//source/common/config:datasource_lib", "//source/common/config:metadata_lib", "//source/common/event:dispatcher_lib", "//source/common/json:config_schemas_lib", @@ -33,6 +34,8 @@ envoy_cc_library( "//source/extensions/filters/network/common/redis:client_interface", "//source/extensions/filters/network/common/redis:client_lib", "//source/extensions/filters/network/common/redis:codec_interface", + "//source/extensions/filters/network/common/redis:utility_lib", + "//source/extensions/filters/network/redis_proxy:config_interface", "//source/extensions/transport_sockets/raw_buffer:config", "//source/server:transport_socket_config_lib", "@envoy_api//envoy/config/cluster/redis:redis_cluster_cc", diff --git a/source/extensions/clusters/redis/redis_cluster.cc b/source/extensions/clusters/redis/redis_cluster.cc index 96c64714211c9..4a141e04a89e4 100644 --- a/source/extensions/clusters/redis/redis_cluster.cc +++ b/source/extensions/clusters/redis/redis_cluster.cc @@ -7,11 +7,15 @@ namespace Extensions { namespace Clusters { namespace Redis { +namespace { +Extensions::NetworkFilters::Common::Redis::Client::DoNothingPoolCallbacks null_pool_callbacks; +} // namespace + RedisCluster::RedisCluster( const envoy::api::v2::Cluster& cluster, const envoy::config::cluster::redis::RedisClusterConfig& redisCluster, NetworkFilters::Common::Redis::Client::ClientFactory& redis_client_factory, - Upstream::ClusterManager& clusterManager, Runtime::Loader& runtime, + Upstream::ClusterManager& clusterManager, Runtime::Loader& runtime, Api::Api& api, Network::DnsResolverSharedPtr dns_resolver, Server::Configuration::TransportSocketFactoryContext& factory_context, Stats::ScopePtr&& stats_scope, bool added_via_api) @@ -28,7 +32,7 @@ RedisCluster::RedisCluster( ? cluster.load_assignment() : Config::Utility::translateClusterHosts(cluster.hosts())), local_info_(factory_context.localInfo()), random_(factory_context.random()), - redis_discovery_session_(*this, redis_client_factory) { + redis_discovery_session_(*this, redis_client_factory), api_(api) { const auto& locality_lb_endpoints = load_assignment_.endpoints(); for (const auto& locality_lb_endpoint : locality_lb_endpoints) { for (const auto& lb_endpoint : locality_lb_endpoint.lb_endpoints()) { @@ -38,7 +42,14 @@ RedisCluster::RedisCluster( locality_lb_endpoint, lb_endpoint)); } } -}; + + auto options = + info()->extensionProtocolOptionsTyped( + NetworkFilters::NetworkFilterNames::get().RedisProxy); + if (options) { + auth_password_datasource_ = options->auth_password_datasource(); + } +} void RedisCluster::startPreInit() { for (const DnsDiscoveryResolveTargetPtr& target : dns_discovery_resolve_targets_) { @@ -221,6 +232,14 @@ void RedisCluster::RedisDiscoverySession::startResolve() { client->host_ = current_host_address_; client->client_ = client_factory_.create(host, dispatcher_, *this); client->client_->addConnectionCallbacks(*client); + std::string auth_password = + Envoy::Config::DataSource::read(parent_.auth_password_datasource_, true, parent_.api_); + if (!auth_password.empty()) { + // Send an AUTH command to the upstream server. + client->client_->makeRequest( + Extensions::NetworkFilters::Common::Redis::Utility::makeAuthCommand(auth_password), + null_pool_callbacks); + } } current_request_ = client->client_->makeRequest(ClusterSlotsRequest::instance_, *this); @@ -301,8 +320,9 @@ Upstream::ClusterImplBaseSharedPtr RedisClusterFactory::createClusterWithConfig( } return std::make_shared( cluster, proto_config, NetworkFilters::Common::Redis::Client::ClientFactoryImpl::instance_, - context.clusterManager(), context.runtime(), selectDnsResolver(cluster, context), - socket_factory_context, std::move(stats_scope), context.addedViaApi()); + context.clusterManager(), context.runtime(), context.api(), + selectDnsResolver(cluster, context), socket_factory_context, std::move(stats_scope), + context.addedViaApi()); } REGISTER_FACTORY(RedisClusterFactory, Upstream::ClusterFactory); diff --git a/source/extensions/clusters/redis/redis_cluster.h b/source/extensions/clusters/redis/redis_cluster.h index 9c432298acda9..e0872bb27b698 100644 --- a/source/extensions/clusters/redis/redis_cluster.h +++ b/source/extensions/clusters/redis/redis_cluster.h @@ -39,6 +39,7 @@ #include "common/common/callback_impl.h" #include "common/common/enum_to_int.h" #include "common/common/logger.h" +#include "common/config/datasource.h" #include "common/config/metadata.h" #include "common/config/well_known_names.h" #include "common/network/address_impl.h" @@ -56,6 +57,8 @@ #include "extensions/filters/network/common/redis/client.h" #include "extensions/filters/network/common/redis/client_impl.h" #include "extensions/filters/network/common/redis/codec.h" +#include "extensions/filters/network/common/redis/utility.h" +#include "extensions/filters/network/redis_proxy/config.h" namespace Envoy { namespace Extensions { @@ -92,7 +95,7 @@ class RedisCluster : public Upstream::BaseDynamicClusterImpl { RedisCluster(const envoy::api::v2::Cluster& cluster, const envoy::config::cluster::redis::RedisClusterConfig& redisCluster, NetworkFilters::Common::Redis::Client::ClientFactory& client_factory, - Upstream::ClusterManager& clusterManager, Runtime::Loader& runtime, + Upstream::ClusterManager& clusterManager, Runtime::Loader& runtime, Api::Api& api, Network::DnsResolverSharedPtr dns_resolver, Server::Configuration::TransportSocketFactoryContext& factory_context, Stats::ScopePtr&& stats_scope, bool added_via_api); @@ -261,6 +264,9 @@ class RedisCluster : public Upstream::BaseDynamicClusterImpl { Upstream::HostVector hosts_; Upstream::HostMap all_hosts_; + + envoy::api::v2::core::DataSource auth_password_datasource_; + Api::Api& api_; }; class RedisClusterFactory : public Upstream::ConfigurableClusterFactoryBase< diff --git a/source/extensions/filters/network/common/redis/BUILD b/source/extensions/filters/network/common/redis/BUILD index c8b07c4ceb5bf..f74e283743c69 100644 --- a/source/extensions/filters/network/common/redis/BUILD +++ b/source/extensions/filters/network/common/redis/BUILD @@ -14,6 +14,12 @@ envoy_cc_library( deps = ["//include/envoy/buffer:buffer_interface"], ) +envoy_cc_library( + name = "utility_interface", + hdrs = ["utility.h"], + deps = [":codec_interface"], +) + envoy_cc_library( name = "codec_lib", srcs = ["codec_impl.cc"], @@ -62,3 +68,12 @@ envoy_cc_library( "@envoy_api//envoy/config/filter/network/redis_proxy/v2:redis_proxy_cc", ], ) + +envoy_cc_library( + name = "utility_lib", + srcs = ["utility.cc"], + hdrs = ["utility.h"], + deps = [ + ":codec_lib", + ], +) diff --git a/source/extensions/filters/network/common/redis/client.h b/source/extensions/filters/network/common/redis/client.h index 4a7c53912afc3..24170c127b9cf 100644 --- a/source/extensions/filters/network/common/redis/client.h +++ b/source/extensions/filters/network/common/redis/client.h @@ -52,6 +52,18 @@ class PoolCallbacks { virtual bool onRedirection(const Common::Redis::RespValue& value) PURE; }; +/** + * DoNothingPoolCallbacks is used for internally generated commands whose response is + * transparently filtered, and redirection never occurs (e.g., "asking", "auth", etc.). + */ +class DoNothingPoolCallbacks : public PoolCallbacks { +public: + // PoolCallbacks + void onResponse(Common::Redis::RespValuePtr&&) override {} + void onFailure() override {} + bool onRedirection(const Common::Redis::RespValue&) override { return false; } +}; + /** * A single redis client connection. */ diff --git a/source/extensions/filters/network/common/redis/client_impl.h b/source/extensions/filters/network/common/redis/client_impl.h index fd9b7b7af7b80..377d54268d838 100644 --- a/source/extensions/filters/network/common/redis/client_impl.h +++ b/source/extensions/filters/network/common/redis/client_impl.h @@ -71,6 +71,8 @@ class ClientImpl : public Client, public DecoderCallbacks, public Network::Conne void flushBufferAndResetTimer(); private: + friend class RedisClientImplTest; + struct UpstreamReadFilter : public Network::ReadFilterBaseImpl { UpstreamReadFilter(ClientImpl& parent) : parent_(parent) {} diff --git a/source/extensions/filters/network/common/redis/supported_commands.h b/source/extensions/filters/network/common/redis/supported_commands.h index 13210d62abb72..91e9ddbd05dc0 100644 --- a/source/extensions/filters/network/common/redis/supported_commands.h +++ b/source/extensions/filters/network/common/redis/supported_commands.h @@ -45,6 +45,11 @@ struct SupportedCommands { CONSTRUCT_ON_FIRST_USE(std::vector, "del", "exists", "touch", "unlink"); } + /** + * @return auth command + */ + static const std::string& auth() { CONSTRUCT_ON_FIRST_USE(std::string, "auth"); } + /** * @return mget command */ diff --git a/source/extensions/filters/network/common/redis/utility.cc b/source/extensions/filters/network/common/redis/utility.cc new file mode 100644 index 0000000000000..81f69e20b0ada --- /dev/null +++ b/source/extensions/filters/network/common/redis/utility.cc @@ -0,0 +1,26 @@ +#include "extensions/filters/network/common/redis/utility.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace Common { +namespace Redis { +namespace Utility { + +Redis::RespValue makeAuthCommand(const std::string& password) { + Redis::RespValue auth_command, value; + auth_command.type(Redis::RespType::Array); + value.type(Redis::RespType::BulkString); + value.asString() = "auth"; + auth_command.asArray().push_back(value); + value.asString() = password; + auth_command.asArray().push_back(value); + return auth_command; +} + +} // namespace Utility +} // namespace Redis +} // namespace Common +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/network/common/redis/utility.h b/source/extensions/filters/network/common/redis/utility.h new file mode 100644 index 0000000000000..619c470b359eb --- /dev/null +++ b/source/extensions/filters/network/common/redis/utility.h @@ -0,0 +1,21 @@ +#pragma once + +#include + +#include "extensions/filters/network/common/redis/codec.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace Common { +namespace Redis { +namespace Utility { + +Redis::RespValue makeAuthCommand(const std::string& password); + +} // namespace Utility +} // namespace Redis +} // namespace Common +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/network/redis_proxy/BUILD b/source/extensions/filters/network/redis_proxy/BUILD index 9825a435144e7..fa941441908f2 100644 --- a/source/extensions/filters/network/redis_proxy/BUILD +++ b/source/extensions/filters/network/redis_proxy/BUILD @@ -20,6 +20,17 @@ envoy_cc_library( ], ) +envoy_cc_library( + name = "config_interface", + hdrs = ["config.h"], + deps = [ + "//source/common/config:datasource_lib", + "//source/extensions/filters/network:well_known_names", + "//source/extensions/filters/network/common:factory_base_lib", + "@envoy_api//envoy/config/filter/network/redis_proxy/v2:redis_proxy_cc", + ], +) + envoy_cc_library( name = "conn_pool_interface", hdrs = ["conn_pool.h"], @@ -62,6 +73,7 @@ envoy_cc_library( srcs = ["conn_pool_impl.cc"], hdrs = ["conn_pool_impl.h"], deps = [ + ":config_interface", ":conn_pool_interface", "//include/envoy/thread_local:thread_local_interface", "//include/envoy/upstream:cluster_manager_interface", @@ -73,6 +85,7 @@ envoy_cc_library( "//source/common/upstream:load_balancer_lib", "//source/common/upstream:upstream_lib", "//source/extensions/filters/network/common/redis:client_lib", + "//source/extensions/filters/network/common/redis:utility_lib", "@envoy_api//envoy/config/filter/network/redis_proxy/v2:redis_proxy_cc", ], ) @@ -88,6 +101,7 @@ envoy_cc_library( "//include/envoy/upstream:cluster_manager_interface", "//source/common/buffer:buffer_lib", "//source/common/common:assert_lib", + "//source/common/config:datasource_lib", "//source/common/config:utility_lib", "//source/extensions/filters/network/common/redis:codec_interface", "@envoy_api//envoy/config/filter/network/redis_proxy/v2:redis_proxy_cc", @@ -105,6 +119,7 @@ envoy_cc_library( "//source/extensions/filters/network/common:factory_base_lib", "//source/extensions/filters/network/common/redis:codec_lib", "//source/extensions/filters/network/redis_proxy:command_splitter_lib", + "//source/extensions/filters/network/redis_proxy:conn_pool_lib", "//source/extensions/filters/network/redis_proxy:proxy_filter_lib", "//source/extensions/filters/network/redis_proxy:router_lib", ], diff --git a/source/extensions/filters/network/redis_proxy/command_splitter.h b/source/extensions/filters/network/redis_proxy/command_splitter.h index 678a9e9807907..53bb929a2dd8d 100644 --- a/source/extensions/filters/network/redis_proxy/command_splitter.h +++ b/source/extensions/filters/network/redis_proxy/command_splitter.h @@ -17,7 +17,7 @@ namespace CommandSplitter { */ class SplitRequest { public: - virtual ~SplitRequest() {} + virtual ~SplitRequest() = default; /** * Cancel the request. No further request callbacks will be called. @@ -32,7 +32,19 @@ typedef std::unique_ptr SplitRequestPtr; */ class SplitCallbacks { public: - virtual ~SplitCallbacks() {} + virtual ~SplitCallbacks() = default; + + /** + * Called to verify that commands should be processed. + * @return bool true if commands from this client connection can be processed, false if not. + */ + virtual bool connectionAllowed() PURE; + + /** + * Called when an authentication command has been received. + * @param password supplies the AUTH password provided by the downstream client. + */ + virtual void onAuth(const std::string& password) PURE; /** * Called when the response is ready. @@ -47,7 +59,7 @@ class SplitCallbacks { */ class Instance { public: - virtual ~Instance() {} + virtual ~Instance() = default; /** * Make a split redis request capable of being retried/redirected. diff --git a/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc b/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc index e41440ec7da28..d1a19a4a6a96f 100644 --- a/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc +++ b/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc @@ -29,7 +29,7 @@ namespace { // null_pool_callbacks is used for requests that must be filtered and not redirected such as // "asking". -DoNothingPoolCallbacks null_pool_callbacks; +Common::Redis::Client::DoNothingPoolCallbacks null_pool_callbacks; // Create an asking command request. const Common::Redis::RespValue& askingRequest() { @@ -558,14 +558,35 @@ InstanceImpl::InstanceImpl(RouterPtr&& router, Stats::Scope& scope, const std::s SplitRequestPtr InstanceImpl::makeRequest(Common::Redis::RespValuePtr&& request, SplitCallbacks& callbacks) { - if (request->type() != Common::Redis::RespType::Array) { + if ((request->type() != Common::Redis::RespType::Array) || request->asArray().empty()) { onInvalidRequest(callbacks); return nullptr; } + for (const Common::Redis::RespValue& value : request->asArray()) { + if (value.type() != Common::Redis::RespType::BulkString) { + onInvalidRequest(callbacks); + return nullptr; + } + } + std::string to_lower_string(request->asArray()[0].asString()); to_lower_table_.toLowerCase(to_lower_string); + if (to_lower_string == Common::Redis::SupportedCommands::auth()) { + if (request->asArray().size() < 2) { + onInvalidRequest(callbacks); + return nullptr; + } + callbacks.onAuth(request->asArray()[1].asString()); + return nullptr; + } + + if (!callbacks.connectionAllowed()) { + callbacks.onResponse(Utility::makeError(Response::get().AuthRequiredError)); + return nullptr; + } + if (to_lower_string == Common::Redis::SupportedCommands::ping()) { // Respond to PING locally. Common::Redis::RespValuePtr pong(new Common::Redis::RespValue()); @@ -581,13 +602,6 @@ SplitRequestPtr InstanceImpl::makeRequest(Common::Redis::RespValuePtr&& request, return nullptr; } - for (const Common::Redis::RespValue& value : request->asArray()) { - if (value.type() != Common::Redis::RespType::BulkString) { - onInvalidRequest(callbacks); - return nullptr; - } - } - auto handler = handler_lookup_table_.find(to_lower_string.c_str()); if (handler == nullptr) { stats_.unsupported_command_.inc(); diff --git a/source/extensions/filters/network/redis_proxy/command_splitter_impl.h b/source/extensions/filters/network/redis_proxy/command_splitter_impl.h index 5ca017ca8fdb9..6c2233fdc9642 100644 --- a/source/extensions/filters/network/redis_proxy/command_splitter_impl.h +++ b/source/extensions/filters/network/redis_proxy/command_splitter_impl.h @@ -31,9 +31,10 @@ struct ResponseValues { const std::string NoUpstreamHost = "no upstream host"; const std::string UpstreamFailure = "upstream failure"; const std::string UpstreamProtocolError = "upstream protocol error"; + const std::string AuthRequiredError = "NOAUTH Authentication required."; }; -typedef ConstSingleton Response; +using Response = ConstSingleton; class Utility { public: @@ -332,18 +333,6 @@ class InstanceImpl : public Instance, Logger::Loggable { TimeSource& time_source_; }; -/** - * DoNothingPoolCallbacks is used for internally generated commands whose response is - * transparently filtered, and redirection never occurs (e.g., "asking", etc.). - */ -class DoNothingPoolCallbacks : public Common::Redis::Client::PoolCallbacks { -public: - // Common::Redis::Client::PoolCallbacks - void onResponse(Common::Redis::RespValuePtr&&) override {} - void onFailure() override {} - bool onRedirection(const Common::Redis::RespValue&) override { return false; } -}; - } // namespace CommandSplitter } // namespace RedisProxy } // namespace NetworkFilters diff --git a/source/extensions/filters/network/redis_proxy/config.cc b/source/extensions/filters/network/redis_proxy/config.cc index 137886868cbfc..dded28114a517 100644 --- a/source/extensions/filters/network/redis_proxy/config.cc +++ b/source/extensions/filters/network/redis_proxy/config.cc @@ -27,7 +27,7 @@ Network::FilterFactoryCb RedisProxyFilterConfigFactory::createFilterFactoryFromP ASSERT(proto_config.has_settings()); ProxyFilterConfigSharedPtr filter_config(std::make_shared( - proto_config, context.scope(), context.drainDecision(), context.runtime())); + proto_config, context.scope(), context.drainDecision(), context.runtime(), context.api())); envoy::config::filter::network::redis_proxy::v2::RedisProxy::PrefixRoutes prefix_routes( proto_config.prefix_routes()); @@ -52,7 +52,7 @@ Network::FilterFactoryCb RedisProxyFilterConfigFactory::createFilterFactoryFromP upstreams.emplace(cluster, std::make_shared( cluster, context.clusterManager(), Common::Redis::Client::ClientFactoryImpl::instance_, - context.threadLocal(), proto_config.settings(), + context.threadLocal(), proto_config.settings(), context.api(), context.scope().symbolTable())); } diff --git a/source/extensions/filters/network/redis_proxy/config.h b/source/extensions/filters/network/redis_proxy/config.h index 51562452b0bb7..6a6bf7914e8b6 100644 --- a/source/extensions/filters/network/redis_proxy/config.h +++ b/source/extensions/filters/network/redis_proxy/config.h @@ -5,6 +5,8 @@ #include "envoy/config/filter/network/redis_proxy/v2/redis_proxy.pb.h" #include "envoy/config/filter/network/redis_proxy/v2/redis_proxy.pb.validate.h" +#include "common/config/datasource.h" + #include "extensions/filters/network/common/factory_base.h" #include "extensions/filters/network/well_known_names.h" @@ -13,11 +15,31 @@ namespace Extensions { namespace NetworkFilters { namespace RedisProxy { +class ProtocolOptionsConfigImpl : public Upstream::ProtocolOptionsConfig { +public: + ProtocolOptionsConfigImpl( + const envoy::config::filter::network::redis_proxy::v2::RedisProtocolOptions& proto_config) + : auth_password_(proto_config.auth_password()) {} + + std::string auth_password(Api::Api& api) const { + return Config::DataSource::read(auth_password_, true, api); + } + + const envoy::api::v2::core::DataSource& auth_password_datasource() const { + return auth_password_; + } + +private: + envoy::api::v2::core::DataSource auth_password_; +}; + /** * Config registration for the redis proxy filter. @see NamedNetworkFilterConfigFactory. */ class RedisProxyFilterConfigFactory - : public Common::FactoryBase { + : public Common::FactoryBase< + envoy::config::filter::network::redis_proxy::v2::RedisProxy, + envoy::config::filter::network::redis_proxy::v2::RedisProtocolOptions> { public: RedisProxyFilterConfigFactory() : FactoryBase(NetworkFilterNames::get().RedisProxy) {} @@ -30,6 +52,12 @@ class RedisProxyFilterConfigFactory Network::FilterFactoryCb createFilterFactoryFromProtoTyped( const envoy::config::filter::network::redis_proxy::v2::RedisProxy& proto_config, Server::Configuration::FactoryContext& context) override; + + Upstream::ProtocolOptionsConfigConstSharedPtr createProtocolOptionsTyped( + const envoy::config::filter::network::redis_proxy::v2::RedisProtocolOptions& proto_config) + override { + return std::make_shared(proto_config); + } }; } // namespace RedisProxy diff --git a/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc b/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc index be62ad9cdfaca..2af267ce287cc 100644 --- a/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc +++ b/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc @@ -7,19 +7,26 @@ #include "common/common/assert.h" +#include "extensions/filters/network/redis_proxy/config.h" +#include "extensions/filters/network/well_known_names.h" + namespace Envoy { namespace Extensions { namespace NetworkFilters { namespace RedisProxy { namespace ConnPool { +namespace { +Common::Redis::Client::DoNothingPoolCallbacks null_pool_callbacks; +} // namespace + InstanceImpl::InstanceImpl( const std::string& cluster_name, Upstream::ClusterManager& cm, Common::Redis::Client::ClientFactory& client_factory, ThreadLocal::SlotAllocator& tls, const envoy::config::filter::network::redis_proxy::v2::RedisProxy::ConnPoolSettings& config, - Stats::SymbolTable& symbol_table) + Api::Api& api, Stats::SymbolTable& symbol_table) : cm_(cm), client_factory_(client_factory), tls_(tls.allocateSlot()), config_(config), - symbol_table_(symbol_table) { + api_(api), symbol_table_(symbol_table) { tls_->set([this, cluster_name]( Event::Dispatcher& dispatcher) -> ThreadLocal::ThreadLocalObjectSharedPtr { return std::make_shared(*this, dispatcher, cluster_name); @@ -46,6 +53,11 @@ InstanceImpl::ThreadLocalPool::ThreadLocalPool(InstanceImpl& parent, Event::Disp cluster_update_handle_ = parent_.cm_.addThreadLocalClusterUpdateCallbacks(*this); Upstream::ThreadLocalCluster* cluster = parent_.cm_.get(cluster_name_); if (cluster != nullptr) { + auto options = cluster->info()->extensionProtocolOptionsTyped( + NetworkFilterNames::get().RedisProxy); + if (options) { + auth_password_ = options->auth_password(parent_.api_); + } onClusterAddOrUpdateNonVirtual(*cluster); } } @@ -116,6 +128,23 @@ void InstanceImpl::ThreadLocalPool::onHostsRemoved( } } +InstanceImpl::ThreadLocalActiveClientPtr& +InstanceImpl::ThreadLocalPool::threadLocalActiveClient(Upstream::HostConstSharedPtr host) { + ThreadLocalActiveClientPtr& client = client_map_[host]; + if (!client) { + client = std::make_unique(*this); + client->host_ = host; + client->redis_client_ = parent_.client_factory_.create(host, dispatcher_, parent_.config_); + client->redis_client_->addConnectionCallbacks(*client); + if (!auth_password_.empty()) { + // Send an AUTH command to the upstream server. + client->redis_client_->makeRequest(Common::Redis::Utility::makeAuthCommand(auth_password_), + null_pool_callbacks); + } + } + return client; +} + Common::Redis::Client::PoolRequest* InstanceImpl::ThreadLocalPool::makeRequest(const std::string& key, const Common::Redis::RespValue& request, @@ -132,13 +161,7 @@ InstanceImpl::ThreadLocalPool::makeRequest(const std::string& key, return nullptr; } - ThreadLocalActiveClientPtr& client = client_map_[host]; - if (!client) { - client = std::make_unique(*this); - client->host_ = host; - client->redis_client_ = parent_.client_factory_.create(host, dispatcher_, parent_.config_); - client->redis_client_->addConnectionCallbacks(*client); - } + ThreadLocalActiveClientPtr& client = threadLocalActiveClient(host); // Keep host_address_map_ in sync with client_map_. auto host_cached_by_address = host_address_map_.find(host->address()->asString()); @@ -213,14 +236,7 @@ InstanceImpl::ThreadLocalPool::makeRequestToHost(const std::string& host_address it = host_address_map_.find(host_address_map_key); } - ThreadLocalActiveClientPtr& client = client_map_[it->second]; - if (!client) { - client = std::make_unique(*this); - client->host_ = it->second; - client->redis_client_ = - parent_.client_factory_.create(it->second, dispatcher_, parent_.config_); - client->redis_client_->addConnectionCallbacks(*client); - } + ThreadLocalActiveClientPtr& client = threadLocalActiveClient(it->second); return client->redis_client_->makeRequest(request, callbacks); } diff --git a/source/extensions/filters/network/redis_proxy/conn_pool_impl.h b/source/extensions/filters/network/redis_proxy/conn_pool_impl.h index 2825cfe6a5fff..979ec7aa8c534 100644 --- a/source/extensions/filters/network/redis_proxy/conn_pool_impl.h +++ b/source/extensions/filters/network/redis_proxy/conn_pool_impl.h @@ -23,6 +23,7 @@ #include "extensions/filters/network/common/redis/client_impl.h" #include "extensions/filters/network/common/redis/codec_impl.h" +#include "extensions/filters/network/common/redis/utility.h" #include "extensions/filters/network/redis_proxy/conn_pool.h" namespace Envoy { @@ -40,7 +41,7 @@ class InstanceImpl : public Instance { const std::string& cluster_name, Upstream::ClusterManager& cm, Common::Redis::Client::ClientFactory& client_factory, ThreadLocal::SlotAllocator& tls, const envoy::config::filter::network::redis_proxy::v2::RedisProxy::ConnPoolSettings& config, - Stats::SymbolTable& symbol_table); + Api::Api& api, Stats::SymbolTable& symbol_table); // RedisProxy::ConnPool::Instance Common::Redis::Client::PoolRequest* makeRequest(const std::string& key, const Common::Redis::RespValue& request, @@ -48,9 +49,11 @@ class InstanceImpl : public Instance { Common::Redis::Client::PoolRequest* makeRequestToHost(const std::string& host_address, const Common::Redis::RespValue& request, Common::Redis::Client::PoolCallbacks& callbacks) override; - Stats::SymbolTable& symbolTable() { return symbol_table_; } + // Allow the unit test to have access to private members. + friend class RedisConnPoolImplTest; + private: struct ThreadLocalPool; @@ -73,6 +76,7 @@ class InstanceImpl : public Instance { public Upstream::ClusterUpdateCallbacks { ThreadLocalPool(InstanceImpl& parent, Event::Dispatcher& dispatcher, std::string cluster_name); ~ThreadLocalPool(); + ThreadLocalActiveClientPtr& threadLocalActiveClient(Upstream::HostConstSharedPtr host); Common::Redis::Client::PoolRequest* makeRequest(const std::string& key, const Common::Redis::RespValue& request, Common::Redis::Client::PoolCallbacks& callbacks); @@ -96,6 +100,7 @@ class InstanceImpl : public Instance { std::unordered_map client_map_; Envoy::Common::CallbackHandle* host_set_member_update_cb_handle_{}; std::unordered_map host_address_map_; + std::string auth_password_; }; struct LbContextImpl : public Upstream::LoadBalancerContextBase { @@ -113,6 +118,7 @@ class InstanceImpl : public Instance { Common::Redis::Client::ClientFactory& client_factory_; ThreadLocal::SlotPtr tls_; Common::Redis::Client::ConfigImpl config_; + Api::Api& api_; Stats::SymbolTable& symbol_table_; }; diff --git a/source/extensions/filters/network/redis_proxy/proxy_filter.cc b/source/extensions/filters/network/redis_proxy/proxy_filter.cc index acc5ccca0e211..58628bab6f624 100644 --- a/source/extensions/filters/network/redis_proxy/proxy_filter.cc +++ b/source/extensions/filters/network/redis_proxy/proxy_filter.cc @@ -7,6 +7,7 @@ #include "common/common/assert.h" #include "common/common/fmt.h" +#include "common/config/datasource.h" #include "common/config/utility.h" namespace Envoy { @@ -16,10 +17,12 @@ namespace RedisProxy { ProxyFilterConfig::ProxyFilterConfig( const envoy::config::filter::network::redis_proxy::v2::RedisProxy& config, Stats::Scope& scope, - const Network::DrainDecision& drain_decision, Runtime::Loader& runtime) + const Network::DrainDecision& drain_decision, Runtime::Loader& runtime, Api::Api& api) : drain_decision_(drain_decision), runtime_(runtime), stat_prefix_(fmt::format("redis.{}.", config.stat_prefix())), - stats_(generateStats(stat_prefix_, scope)) {} + stats_(generateStats(stat_prefix_, scope)), + downstream_auth_password_( + Config::DataSource::read(config.downstream_auth_password(), true, api)) {} ProxyStats ProxyFilterConfig::generateStats(const std::string& prefix, Stats::Scope& scope) { return { @@ -33,6 +36,7 @@ ProxyFilter::ProxyFilter(Common::Redis::DecoderFactory& factory, config_(config) { config_->stats_.downstream_cx_total_.inc(); config_->stats_.downstream_cx_active_.inc(); + connection_allowed_ = config_->downstream_auth_password_.empty(); } ProxyFilter::~ProxyFilter() { @@ -73,6 +77,23 @@ void ProxyFilter::onEvent(Network::ConnectionEvent event) { } } +void ProxyFilter::onAuth(PendingRequest& request, const std::string& password) { + Common::Redis::RespValuePtr response{new Common::Redis::RespValue()}; + if (config_->downstream_auth_password_.empty()) { + response->type(Common::Redis::RespType::Error); + response->asString() = "ERR Client sent AUTH, but no password is set"; + } else if (password == config_->downstream_auth_password_) { + response->type(Common::Redis::RespType::SimpleString); + response->asString() = "OK"; + connection_allowed_ = true; + } else { + response->type(Common::Redis::RespType::Error); + response->asString() = "ERR invalid password"; + connection_allowed_ = false; + } + request.onResponse(std::move(response)); +} + void ProxyFilter::onResponse(PendingRequest& request, Common::Redis::RespValuePtr&& value) { ASSERT(!pending_requests_.empty()); request.pending_response_ = std::move(value); diff --git a/source/extensions/filters/network/redis_proxy/proxy_filter.h b/source/extensions/filters/network/redis_proxy/proxy_filter.h index ae2141a322d94..06afd84bc45a9 100644 --- a/source/extensions/filters/network/redis_proxy/proxy_filter.h +++ b/source/extensions/filters/network/redis_proxy/proxy_filter.h @@ -52,13 +52,14 @@ class ProxyFilterConfig { public: ProxyFilterConfig(const envoy::config::filter::network::redis_proxy::v2::RedisProxy& config, Stats::Scope& scope, const Network::DrainDecision& drain_decision, - Runtime::Loader& runtime); + Runtime::Loader& runtime, Api::Api& api); const Network::DrainDecision& drain_decision_; Runtime::Loader& runtime_; const std::string stat_prefix_; const std::string redis_drain_close_runtime_key_{"redis.drain_close_enabled"}; ProxyStats stats_; + const std::string downstream_auth_password_; private: static ProxyStats generateStats(const std::string& prefix, Stats::Scope& scope); @@ -91,12 +92,16 @@ class ProxyFilter : public Network::ReadFilter, // Common::Redis::DecoderCallbacks void onRespValue(Common::Redis::RespValuePtr&& value) override; + bool connectionAllowed() { return connection_allowed_; } + private: struct PendingRequest : public CommandSplitter::SplitCallbacks { PendingRequest(ProxyFilter& parent); ~PendingRequest(); // RedisProxy::CommandSplitter::SplitCallbacks + bool connectionAllowed() override { return parent_.connectionAllowed(); } + void onAuth(const std::string& password) override { parent_.onAuth(*this, password); } void onResponse(Common::Redis::RespValuePtr&& value) override { parent_.onResponse(*this, std::move(value)); } @@ -106,6 +111,7 @@ class ProxyFilter : public Network::ReadFilter, CommandSplitter::SplitRequestPtr request_handle_; }; + void onAuth(PendingRequest& request, const std::string& password); void onResponse(PendingRequest& request, Common::Redis::RespValuePtr&& value); Common::Redis::DecoderPtr decoder_; @@ -115,6 +121,7 @@ class ProxyFilter : public Network::ReadFilter, Buffer::OwnedImpl encoder_buffer_; Network::ReadFilterCallbacks* callbacks_{}; std::list pending_requests_; + bool connection_allowed_; }; } // namespace RedisProxy diff --git a/source/extensions/health_checkers/redis/redis.h b/source/extensions/health_checkers/redis/redis.h index f43aabe7622a6..f0be06351adfc 100644 --- a/source/extensions/health_checkers/redis/redis.h +++ b/source/extensions/health_checkers/redis/redis.h @@ -43,6 +43,8 @@ class RedisHealthChecker : public Upstream::HealthCheckerImplBase { } private: + friend class RedisHealthCheckerTest; + struct RedisActiveHealthCheckSession : public ActiveHealthCheckSession, public Extensions::NetworkFilters::Common::Redis::Client::Config, diff --git a/test/extensions/clusters/redis/redis_cluster_integration_test.cc b/test/extensions/clusters/redis/redis_cluster_integration_test.cc index 28d0a67fa0236..56737d3a0331a 100644 --- a/test/extensions/clusters/redis/redis_cluster_integration_test.cc +++ b/test/extensions/clusters/redis/redis_cluster_integration_test.cc @@ -22,20 +22,6 @@ const std::string CONFIG = R"EOF( address: 127.0.0.1 port_value: 0 static_resources: - clusters: - - name: cluster_0 - lb_policy: RANDOM - hosts: - - socket_address: - address: 127.0.0.1 - port_value: 0 - cluster_type: - name: envoy.clusters.redis - typed_config: - "@type": type.googleapis.com/google.protobuf.Struct - value: - cluster_refresh_rate: 1s - cluster_refresh_timeout: 4s listeners: name: listener_0 address: @@ -50,6 +36,28 @@ const std::string CONFIG = R"EOF( cluster: cluster_0 settings: op_timeout: 5s + clusters: + - name: cluster_0 + lb_policy: RANDOM + hosts: + - socket_address: + address: 127.0.0.1 + port_value: 0 + cluster_type: + name: envoy.clusters.redis + typed_config: + "@type": type.googleapis.com/google.protobuf.Struct + value: + cluster_refresh_rate: 1s + cluster_refresh_timeout: 4s +)EOF"; + +// This is the basic redis_proxy configuration with an upstream +// authentication password specified. + +const std::string CONFIG_WITH_AUTH = CONFIG + R"EOF( + extension_protocol_options: + envoy.redis_proxy: { auth_password: { inline_string: somepassword }} )EOF"; // This function encodes commands as an array of bulkstrings as transmitted by Redis clients to @@ -111,32 +119,71 @@ class RedisClusterIntegrationTest : public testing::TestWithParamclearData(); redis_client->write(request); - FakeRawConnectionPtr fake_upstream_connection; - EXPECT_TRUE(fake_upstreams_[stream_index]->waitForRawConnection(fake_upstream_connection)); - EXPECT_TRUE(fake_upstream_connection->waitForData(request.size(), &proxy_to_server)); - // The original request should be the same as the data received by the server. - EXPECT_EQ(request, proxy_to_server); + if (fake_upstream_connection.get() == nullptr) { + expect_auth_command = (!auth_password.empty()); + EXPECT_TRUE(upstream->waitForRawConnection(fake_upstream_connection)); + } + + if (expect_auth_command) { + std::string auth_command = makeBulkStringArray({"auth", auth_password}); + EXPECT_TRUE(fake_upstream_connection->waitForData(auth_command.size() + request.size(), + &proxy_to_server)); + // The original request should be the same as the data received by the server. + EXPECT_EQ(auth_command + request, proxy_to_server); + // Send back an OK for the auth command. + EXPECT_TRUE(fake_upstream_connection->write(ok)); + + } else { + EXPECT_TRUE(fake_upstream_connection->waitForData(request.size(), &proxy_to_server)); + // The original request should be the same as the data received by the server. + EXPECT_EQ(request, proxy_to_server); + } EXPECT_TRUE(fake_upstream_connection->write(response)); redis_client->waitForData(response); // The original response should be received by the fake Redis client. EXPECT_EQ(response, redis_client->data()); + } + + /** + * Simple bi-directional test between a fake Redis client and Redis server. + * @param request supplies Redis client data to transmit to the Redis server. + * @param response supplies Redis server data to transmit to the client. + */ + void simpleRequestAndResponse(const int stream_index, const std::string& request, + const std::string& response) { + IntegrationTcpClientPtr redis_client = makeTcpConnection(lookupPort("redis_proxy")); + FakeRawConnectionPtr fake_upstream_connection; + + roundtripToUpstreamStep(fake_upstreams_[stream_index], request, response, redis_client, + fake_upstream_connection, ""); redis_client->close(); EXPECT_TRUE(fake_upstream_connection->close()); } - void expectCallClusterSlot(int stream_index, std::string& response) { + void expectCallClusterSlot(int stream_index, std::string& response, + const std::string& auth_password = "") { std::string cluster_slot_request = makeBulkStringArray({"CLUSTER", "SLOTS"}); fake_upstreams_[stream_index]->set_allow_unexpected_disconnects(true); @@ -145,10 +192,19 @@ class RedisClusterIntegrationTest : public testing::TestWithParamwaitForRawConnection(fake_upstream_connection_)); - EXPECT_TRUE(fake_upstream_connection_->waitForData(cluster_slot_request.size(), - &proxied_cluster_slot_request)); - - EXPECT_EQ(cluster_slot_request, proxied_cluster_slot_request); + if (auth_password.empty()) { + EXPECT_TRUE(fake_upstream_connection_->waitForData(cluster_slot_request.size(), + &proxied_cluster_slot_request)); + EXPECT_EQ(cluster_slot_request, proxied_cluster_slot_request); + } else { + std::string auth_request = makeBulkStringArray({"auth", auth_password}); + std::string ok = "+OK\r\n"; + + EXPECT_TRUE(fake_upstream_connection_->waitForData( + auth_request.size() + cluster_slot_request.size(), &proxied_cluster_slot_request)); + EXPECT_EQ(auth_request + cluster_slot_request, proxied_cluster_slot_request); + EXPECT_TRUE(fake_upstream_connection_->write(ok)); + } EXPECT_TRUE(fake_upstream_connection_->write(response)); EXPECT_TRUE(fake_upstream_connection_->close()); @@ -210,10 +266,21 @@ class RedisClusterIntegrationTest : public testing::TestWithParamlocalAddress()->ip(), fake_upstreams_[1]->localAddress()->ip()); + expectCallClusterSlot(0, cluster_slot_response, "somepassword"); + }; + + initialize(); + + IntegrationTcpClientPtr redis_client = makeTcpConnection(lookupPort("redis_proxy")); + FakeRawConnectionPtr fake_upstream_connection; + + roundtripToUpstreamStep(fake_upstreams_[random_index_], makeBulkStringArray({"get", "foo"}), + "$3\r\nbar\r\n", redis_client, fake_upstream_connection, "somepassword"); + + redis_client->close(); + EXPECT_TRUE(fake_upstream_connection->close()); +} + } // namespace } // namespace Envoy diff --git a/test/extensions/clusters/redis/redis_cluster_test.cc b/test/extensions/clusters/redis/redis_cluster_test.cc index ce4da493a5f8c..c1df531047ad9 100644 --- a/test/extensions/clusters/redis/redis_cluster_test.cc +++ b/test/extensions/clusters/redis/redis_cluster_test.cc @@ -81,7 +81,7 @@ class RedisClusterTest : public testing::Test, cluster_config, MessageUtil::downcastAndValidate( config), - *this, cm, runtime_, dns_resolver_, factory_context, std::move(scope), false)); + *this, cm, runtime_, *api_, dns_resolver_, factory_context, std::move(scope), false)); // This allows us to create expectation on cluster slot response without waiting for // makeRequest. pool_callbacks_ = &cluster_->redis_discovery_session_; @@ -370,6 +370,22 @@ class RedisClusterTest : public testing::Test, expectHealthyHosts(std::list({"127.0.0.1:22120"})); } + void exerciseStubs() { + EXPECT_CALL(dispatcher_, createTimer_(_)); + RedisCluster::RedisDiscoverySession discovery_session(*cluster_, *this); + EXPECT_FALSE(discovery_session.enableHashtagging()); + EXPECT_EQ(discovery_session.bufferFlushTimeoutInMs(), std::chrono::milliseconds(0)); + + NetworkFilters::Common::Redis::RespValue dummy_value; + dummy_value.type(NetworkFilters::Common::Redis::RespType::Error); + dummy_value.asString() = "dummy text"; + EXPECT_TRUE(discovery_session.onRedirection(dummy_value)); + + RedisCluster::RedisDiscoveryClient discovery_client(discovery_session); + EXPECT_NO_THROW(discovery_client.onAboveWriteBufferHighWatermark()); + EXPECT_NO_THROW(discovery_client.onBelowWriteBufferLowWatermark()); + } + Stats::IsolatedStoreImpl stats_store_; Ssl::MockContextManager ssl_context_manager_; std::shared_ptr> dns_resolver_{ @@ -522,6 +538,9 @@ TEST_F(RedisClusterTest, Basic) { testBasicSetup(basic_yaml_hosts, "foo.bar.com"); testBasicSetup(basic_yaml_load_assignment, "foo.bar.com"); + + // Exercise stubbed out interfaces for coverage. + exerciseStubs(); } TEST_F(RedisClusterTest, RedisResolveFailure) { diff --git a/test/extensions/filters/network/common/redis/client_impl_test.cc b/test/extensions/filters/network/common/redis/client_impl_test.cc index c1d8269f5b024..bdd12408c0145 100644 --- a/test/extensions/filters/network/common/redis/client_impl_test.cc +++ b/test/extensions/filters/network/common/redis/client_impl_test.cc @@ -91,6 +91,15 @@ class RedisClientImplTest : public testing::Test, public Common::Redis::DecoderF upstream_connection_->raiseEvent(Network::ConnectionEvent::Connected); } + void respond() { + Common::Redis::RespValuePtr response1{new Common::Redis::RespValue()}; + response1->type(Common::Redis::RespType::SimpleString); + response1->asString() = "OK"; + ClientImpl* client_impl = dynamic_cast(client_.get()); + EXPECT_NE(client_impl, nullptr); + client_impl->onRespValue(std::move(response1)); + } + const std::string cluster_name_{"foo"}; std::shared_ptr host_{new NiceMock()}; Event::MockDispatcher dispatcher_; @@ -505,6 +514,23 @@ TEST_F(RedisClientImplTest, OpTimeout) { onConnected(); + EXPECT_EQ(1UL, host_->cluster_.stats_.upstream_rq_total_.value()); + EXPECT_EQ(1UL, host_->cluster_.stats_.upstream_rq_active_.value()); + + EXPECT_CALL(callbacks1, onResponse_(_)); + EXPECT_CALL(*connect_or_op_timer_, disableTimer()); + EXPECT_CALL(host_->outlier_detector_, putResult(Upstream::Outlier::Result::SUCCESS)); + respond(); + + EXPECT_EQ(1UL, host_->cluster_.stats_.upstream_rq_total_.value()); + EXPECT_EQ(0UL, host_->cluster_.stats_.upstream_rq_active_.value()); + + EXPECT_CALL(*encoder_, encode(Ref(request1), _)); + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); + EXPECT_CALL(*connect_or_op_timer_, enableTimer(_)); + handle1 = client_->makeRequest(request1, callbacks1); + EXPECT_NE(nullptr, handle1); + EXPECT_CALL(host_->outlier_detector_, putResult(Upstream::Outlier::Result::TIMEOUT)); EXPECT_CALL(*upstream_connection_, close(Network::ConnectionCloseType::NoFlush)); EXPECT_CALL(callbacks1, onFailure()); @@ -513,6 +539,8 @@ TEST_F(RedisClientImplTest, OpTimeout) { EXPECT_EQ(1UL, host_->cluster_.stats_.upstream_rq_timeout_.value()); EXPECT_EQ(1UL, host_->stats_.rq_timeout_.value()); + EXPECT_EQ(2UL, host_->cluster_.stats_.upstream_rq_total_.value()); + EXPECT_EQ(0UL, host_->cluster_.stats_.upstream_rq_active_.value()); } TEST_F(RedisClientImplTest, AskRedirection) { diff --git a/test/extensions/filters/network/redis_proxy/BUILD b/test/extensions/filters/network/redis_proxy/BUILD index eb6d003e5927e..a6494ab5a05bf 100644 --- a/test/extensions/filters/network/redis_proxy/BUILD +++ b/test/extensions/filters/network/redis_proxy/BUILD @@ -39,9 +39,11 @@ envoy_extension_cc_test( "//source/common/network:utility_lib", "//source/common/upstream:upstream_includes", "//source/common/upstream:upstream_lib", + "//source/extensions/filters/network/common/redis:utility_lib", "//source/extensions/filters/network/redis_proxy:conn_pool_lib", "//test/extensions/filters/network/common/redis:redis_mocks", "//test/extensions/filters/network/common/redis:test_utils_lib", + "//test/mocks/api:api_mocks", "//test/mocks/network:network_mocks", "//test/mocks/thread_local:thread_local_mocks", "//test/mocks/upstream:upstream_mocks", @@ -54,11 +56,12 @@ envoy_extension_cc_test( extension_name = "envoy.filters.network.redis_proxy", deps = [ ":redis_mocks", - "//source/common/config:filter_json_lib", + "//source/common/config:json_utility_lib", "//source/common/event:dispatcher_lib", "//source/extensions/filters/network/redis_proxy:proxy_filter_lib", "//test/extensions/filters/network/common/redis:redis_mocks", "//test/mocks:common_lib", + "//test/mocks/api:api_mocks", "//test/mocks/network:network_mocks", "//test/mocks/upstream:upstream_mocks", "//test/test_common:utility_lib", diff --git a/test/extensions/filters/network/redis_proxy/command_lookup_speed_test.cc b/test/extensions/filters/network/redis_proxy/command_lookup_speed_test.cc index 555088f075731..d6b471e948aa4 100644 --- a/test/extensions/filters/network/redis_proxy/command_lookup_speed_test.cc +++ b/test/extensions/filters/network/redis_proxy/command_lookup_speed_test.cc @@ -26,9 +26,11 @@ namespace RedisProxy { class NoOpSplitCallbacks : public CommandSplitter::SplitCallbacks { public: - NoOpSplitCallbacks() {} - ~NoOpSplitCallbacks() {} + NoOpSplitCallbacks() = default; + ~NoOpSplitCallbacks() = default; + bool connectionAllowed() override { return true; } + void onAuth(const std::string&) override {} void onResponse(Common::Redis::RespValuePtr&&) override {} }; diff --git a/test/extensions/filters/network/redis_proxy/command_splitter_impl_test.cc b/test/extensions/filters/network/redis_proxy/command_splitter_impl_test.cc index 207f6871aed8e..79df378c5fbfb 100644 --- a/test/extensions/filters/network/redis_proxy/command_splitter_impl_test.cc +++ b/test/extensions/filters/network/redis_proxy/command_splitter_impl_test.cc @@ -71,6 +71,29 @@ class RedisCommandSplitterImplTest : public testing::Test { SplitRequestPtr handle_; }; +TEST_F(RedisCommandSplitterImplTest, AuthWithNoPassword) { + Common::Redis::RespValue response; + response.type(Common::Redis::RespType::Error); + response.asString() = Response::get().InvalidRequest; + EXPECT_CALL(callbacks_, onResponse_(PointeesEq(&response))); + Common::Redis::RespValuePtr request{new Common::Redis::RespValue()}; + makeBulkStringArray(*request, {"auth"}); + EXPECT_EQ(nullptr, splitter_.makeRequest(std::move(request), callbacks_)); + + EXPECT_EQ(1UL, store_.counter("redis.foo.splitter.invalid_request").value()); +} + +TEST_F(RedisCommandSplitterImplTest, CommandWhenAuthStillNeeded) { + Common::Redis::RespValue response; + response.type(Common::Redis::RespType::Error); + response.asString() = "NOAUTH Authentication required."; + EXPECT_CALL(callbacks_, connectionAllowed()).WillOnce(Return(false)); + EXPECT_CALL(callbacks_, onResponse_(PointeesEq(&response))); + Common::Redis::RespValuePtr request{new Common::Redis::RespValue()}; + makeBulkStringArray(*request, {"get", "foo"}); + EXPECT_EQ(nullptr, splitter_.makeRequest(std::move(request), callbacks_)); +} + TEST_F(RedisCommandSplitterImplTest, InvalidRequestNotArray) { Common::Redis::RespValue response; response.type(Common::Redis::RespType::Error); @@ -82,10 +105,23 @@ TEST_F(RedisCommandSplitterImplTest, InvalidRequestNotArray) { EXPECT_EQ(1UL, store_.counter("redis.foo.splitter.invalid_request").value()); } +TEST_F(RedisCommandSplitterImplTest, InvalidRequestEmptyArray) { + Common::Redis::RespValue response; + response.type(Common::Redis::RespType::Error); + response.asString() = Response::get().InvalidRequest; + EXPECT_CALL(callbacks_, onResponse_(PointeesEq(&response))); + Common::Redis::RespValuePtr request{new Common::Redis::RespValue()}; + request->type(Common::Redis::RespType::Array); + EXPECT_EQ(nullptr, splitter_.makeRequest(std::move(request), callbacks_)); + + EXPECT_EQ(1UL, store_.counter("redis.foo.splitter.invalid_request").value()); +} + TEST_F(RedisCommandSplitterImplTest, InvalidRequestArrayTooSmall) { Common::Redis::RespValue response; response.type(Common::Redis::RespType::Error); response.asString() = Response::get().InvalidRequest; + EXPECT_CALL(callbacks_, connectionAllowed()).WillOnce(Return(true)); EXPECT_CALL(callbacks_, onResponse_(PointeesEq(&response))); Common::Redis::RespValuePtr request{new Common::Redis::RespValue()}; makeBulkStringArray(*request, {"incr"}); @@ -111,6 +147,7 @@ TEST_F(RedisCommandSplitterImplTest, UnsupportedCommand) { Common::Redis::RespValue response; response.type(Common::Redis::RespType::Error); response.asString() = "unsupported command 'newcommand'"; + EXPECT_CALL(callbacks_, connectionAllowed()).WillOnce(Return(true)); EXPECT_CALL(callbacks_, onResponse_(PointeesEq(&response))); Common::Redis::RespValuePtr request{new Common::Redis::RespValue()}; makeBulkStringArray(*request, {"newcommand", "hello"}); @@ -123,6 +160,7 @@ class RedisSingleServerRequestTest : public RedisCommandSplitterImplTest, public testing::WithParamInterface { public: void makeRequest(const std::string& hash_key, Common::Redis::RespValuePtr&& request) { + EXPECT_CALL(callbacks_, connectionAllowed()).WillOnce(Return(true)); EXPECT_CALL(*conn_pool_, makeRequest(hash_key, Ref(*request), _)) .WillOnce(DoAll(WithArg<2>(SaveArgAddress(&pool_callbacks_)), Return(&pool_request_))); handle_ = splitter_.makeRequest(std::move(request), callbacks_); @@ -233,6 +271,7 @@ TEST_P(RedisSingleServerRequestTest, Cancel) { TEST_P(RedisSingleServerRequestTest, NoUpstream) { InSequence s; + EXPECT_CALL(callbacks_, connectionAllowed()).WillOnce(Return(true)); Common::Redis::RespValuePtr request{new Common::Redis::RespValue()}; makeBulkStringArray(*request, {GetParam(), "hello"}); EXPECT_CALL(*conn_pool_, makeRequest("hello", Ref(*request), _)).WillOnce(Return(nullptr)); @@ -261,6 +300,7 @@ TEST_F(RedisSingleServerRequestTest, PingSuccess) { response.type(Common::Redis::RespType::SimpleString); response.asString() = "PONG"; + EXPECT_CALL(callbacks_, connectionAllowed()).WillOnce(Return(true)); EXPECT_CALL(callbacks_, onResponse_(PointeesEq(&response))); handle_ = splitter_.makeRequest(std::move(request), callbacks_); EXPECT_EQ(nullptr, handle_); @@ -323,11 +363,13 @@ TEST_F(RedisSingleServerRequestTest, EvalWrongNumberOfArgs) { response.type(Common::Redis::RespType::Error); response.asString() = "wrong number of arguments for 'eval' command"; + EXPECT_CALL(callbacks_, connectionAllowed()).WillOnce(Return(true)); EXPECT_CALL(callbacks_, onResponse_(PointeesEq(&response))); makeBulkStringArray(*request1, {"eval", "return {ARGV[1]}"}); EXPECT_EQ(nullptr, splitter_.makeRequest(std::move(request1), callbacks_)); response.asString() = "wrong number of arguments for 'evalsha' command"; + EXPECT_CALL(callbacks_, connectionAllowed()).WillOnce(Return(true)); EXPECT_CALL(callbacks_, onResponse_(PointeesEq(&response))); makeBulkStringArray(*request2, {"evalsha", "return {ARGV[1]}", "1"}); EXPECT_EQ(nullptr, splitter_.makeRequest(std::move(request2), callbacks_)); @@ -336,6 +378,7 @@ TEST_F(RedisSingleServerRequestTest, EvalWrongNumberOfArgs) { TEST_F(RedisSingleServerRequestTest, EvalNoUpstream) { InSequence s; + EXPECT_CALL(callbacks_, connectionAllowed()).WillOnce(Return(true)); Common::Redis::RespValuePtr request{new Common::Redis::RespValue()}; makeBulkStringArray(*request, {"eval", "return {ARGV[1]}", "1", "key", "arg"}); EXPECT_CALL(*conn_pool_, makeRequest("key", Ref(*request), _)).WillOnce(Return(nullptr)); @@ -565,6 +608,9 @@ class RedisMGETCommandHandlerTest : public RedisCommandSplitterImplTest { pool_callbacks_.resize(num_gets); std::vector tmp_pool_requests(num_gets); pool_requests_.swap(tmp_pool_requests); + + EXPECT_CALL(callbacks_, connectionAllowed()).WillOnce(Return(true)); + for (uint32_t i = 0; i < num_gets; i++) { makeBulkStringArray(expected_requests_[i], {"get", std::to_string(i)}); Common::Redis::Client::PoolRequest* request_to_use = nullptr; @@ -959,6 +1005,9 @@ class RedisMSETCommandHandlerTest : public RedisCommandSplitterImplTest { pool_callbacks_.resize(num_sets); std::vector tmp_pool_requests(num_sets); pool_requests_.swap(tmp_pool_requests); + + EXPECT_CALL(callbacks_, connectionAllowed()).WillOnce(Return(true)); + for (uint32_t i = 0; i < num_sets; i++) { makeBulkStringArray(expected_requests_[i], {"set", std::to_string(i), std::to_string(i)}); Common::Redis::Client::PoolRequest* request_to_use = nullptr; @@ -1057,6 +1106,7 @@ TEST_F(RedisMSETCommandHandlerTest, WrongNumberOfArgs) { Common::Redis::RespValue response; response.type(Common::Redis::RespType::Error); response.asString() = "wrong number of arguments for 'mset' command"; + EXPECT_CALL(callbacks_, connectionAllowed()).WillOnce(Return(true)); EXPECT_CALL(callbacks_, onResponse_(PointeesEq(&response))); Common::Redis::RespValuePtr request{new Common::Redis::RespValue()}; makeBulkStringArray(*request, {"mset", "foo", "bar", "fizz"}); @@ -1212,6 +1262,9 @@ class RedisSplitKeysSumResultHandlerTest : public RedisCommandSplitterImplTest, pool_callbacks_.resize(num_commands); std::vector tmp_pool_requests(num_commands); pool_requests_.swap(tmp_pool_requests); + + EXPECT_CALL(callbacks_, connectionAllowed()).WillOnce(Return(true)); + for (uint32_t i = 0; i < num_commands; i++) { makeBulkStringArray(expected_requests_[i], {GetParam(), std::to_string(i)}); Common::Redis::Client::PoolRequest* request_to_use = nullptr; @@ -1435,6 +1488,7 @@ INSTANTIATE_TEST_SUITE_P( class RedisSingleServerRequestWithLatencyMicrosTest : public RedisSingleServerRequestTest { public: void makeRequest(const std::string& hash_key, Common::Redis::RespValuePtr&& request) { + EXPECT_CALL(callbacks_, connectionAllowed()).WillOnce(Return(true)); EXPECT_CALL(*conn_pool_, makeRequest(hash_key, Ref(*request), _)) .WillOnce(DoAll(WithArg<2>(SaveArgAddress(&pool_callbacks_)), Return(&pool_request_))); handle_ = splitter_.makeRequest(std::move(request), callbacks_); diff --git a/test/extensions/filters/network/redis_proxy/conn_pool_impl_test.cc b/test/extensions/filters/network/redis_proxy/conn_pool_impl_test.cc index c002c19836e3e..8536c823f7630 100644 --- a/test/extensions/filters/network/redis_proxy/conn_pool_impl_test.cc +++ b/test/extensions/filters/network/redis_proxy/conn_pool_impl_test.cc @@ -5,11 +5,13 @@ #include "common/stats/fake_symbol_table_impl.h" #include "common/upstream/upstream_impl.h" +#include "extensions/filters/network/common/redis/utility.h" #include "extensions/filters/network/redis_proxy/conn_pool_impl.h" #include "test/extensions/filters/network/common/redis/mocks.h" #include "test/extensions/filters/network/common/redis/test_utils.h" #include "test/extensions/filters/network/redis_proxy/mocks.h" +#include "test/mocks/api/mocks.h" #include "test/mocks/network/mocks.h" #include "test/mocks/thread_local/mocks.h" #include "test/mocks/upstream/mocks.h" @@ -46,9 +48,12 @@ class RedisConnPoolImplTest : public testing::Test, public Common::Redis::Client EXPECT_CALL(cm_, get("fake_cluster")).WillOnce(Return(nullptr)); } - conn_pool_ = std::make_unique( + std::unique_ptr conn_pool_impl = std::make_unique( cluster_name_, cm_, *this, tls_, - Common::Redis::Client::createConnPoolSettings(20, hashtagging, true), *symbol_table_); + Common::Redis::Client::createConnPoolSettings(20, hashtagging, true), api_, *symbol_table_); + // Set the authentication password for this connection pool. + conn_pool_impl->tls_->getTyped().auth_password_ = auth_password_; + conn_pool_ = std::move(conn_pool_impl); test_address_ = Network::Utility::resolveUrl("tcp://127.0.0.1:3000"); } @@ -61,6 +66,9 @@ class RedisConnPoolImplTest : public testing::Test, public Common::Redis::Client Common::Redis::RespValue value; Common::Redis::Client::MockPoolCallbacks callbacks; Common::Redis::Client::MockPoolRequest active_request; + if (create_client && !auth_password_.empty()) { + EXPECT_CALL(*client_, makeRequest(_, _)).WillOnce(Return(nullptr)); + } EXPECT_CALL(*cm_.thread_local_cluster_.lb_.host_, address()) .WillRepeatedly(Return(test_address_)); EXPECT_CALL(*client_, makeRequest(Ref(value), Ref(callbacks))) @@ -86,6 +94,8 @@ class RedisConnPoolImplTest : public testing::Test, public Common::Redis::Client Upstream::ClusterUpdateCallbacks* update_callbacks_{}; Common::Redis::Client::MockClient* client_{}; Network::Address::InstanceConstSharedPtr test_address_; + std::string auth_password_; + NiceMock api_; }; TEST_F(RedisConnPoolImplTest, Basic) { @@ -117,6 +127,40 @@ TEST_F(RedisConnPoolImplTest, Basic) { tls_.shutdownThread(); }; +TEST_F(RedisConnPoolImplTest, BasicWithAuthPassword) { + InSequence s; + + auth_password_ = "testing password"; + setup(); + + Common::Redis::RespValue value; + Common::Redis::Client::MockPoolRequest auth_request, active_request; + Common::Redis::Client::MockPoolCallbacks callbacks; + Common::Redis::Client::MockClient* client = new NiceMock(); + + EXPECT_CALL(cm_.thread_local_cluster_.lb_, chooseHost(_)) + .WillOnce(Invoke([&](Upstream::LoadBalancerContext* context) -> Upstream::HostConstSharedPtr { + EXPECT_EQ(context->computeHashKey().value(), MurmurHash::murmurHash2_64("hash_key")); + EXPECT_EQ(context->metadataMatchCriteria(), nullptr); + EXPECT_EQ(context->downstreamConnection(), nullptr); + return cm_.thread_local_cluster_.lb_.host_; + })); + EXPECT_CALL(*this, create_(_)).WillOnce(Return(client)); + EXPECT_CALL( + *client, + makeRequest(Eq(NetworkFilters::Common::Redis::Utility::makeAuthCommand(auth_password_)), _)) + .WillOnce(Return(&auth_request)); + EXPECT_CALL(*cm_.thread_local_cluster_.lb_.host_, address()) + .WillRepeatedly(Return(test_address_)); + EXPECT_CALL(*client, makeRequest(Ref(value), Ref(callbacks))).WillOnce(Return(&active_request)); + Common::Redis::Client::PoolRequest* request = + conn_pool_->makeRequest("hash_key", value, callbacks); + EXPECT_EQ(&active_request, request); + + EXPECT_CALL(*client, close()); + tls_.shutdownThread(); +}; + TEST_F(RedisConnPoolImplTest, Hashtagging) { InSequence s; @@ -381,6 +425,59 @@ TEST_F(RedisConnPoolImplTest, makeRequestToHost) { tls_.shutdownThread(); } +TEST_F(RedisConnPoolImplTest, makeRequestToHostWithAuthPassword) { + InSequence s; + + auth_password_ = "superduperpassword"; + setup(false); + + Common::Redis::RespValue value; + Common::Redis::Client::MockPoolRequest auth_request1, active_request1; + Common::Redis::Client::MockPoolRequest auth_request2, active_request2; + Common::Redis::Client::MockPoolCallbacks callbacks1; + Common::Redis::Client::MockPoolCallbacks callbacks2; + Common::Redis::Client::MockClient* client1 = new NiceMock(); + Common::Redis::Client::MockClient* client2 = new NiceMock(); + Upstream::HostConstSharedPtr host1; + Upstream::HostConstSharedPtr host2; + + // There is no cluster yet, so makeRequestToHost() should fail. + EXPECT_EQ(nullptr, conn_pool_->makeRequestToHost("10.0.0.1:3000", value, callbacks1)); + // Add the cluster now. + update_callbacks_->onClusterAddOrUpdate(cm_.thread_local_cluster_); + + EXPECT_CALL(*this, create_(_)).WillOnce(DoAll(SaveArg<0>(&host1), Return(client1))); + EXPECT_CALL( + *client1, + makeRequest(Eq(NetworkFilters::Common::Redis::Utility::makeAuthCommand(auth_password_)), _)) + .WillOnce(Return(&auth_request1)); + EXPECT_CALL(*client1, makeRequest(Ref(value), Ref(callbacks1))) + .WillOnce(Return(&active_request1)); + Common::Redis::Client::PoolRequest* request1 = + conn_pool_->makeRequestToHost("10.0.0.1:3000", value, callbacks1); + EXPECT_EQ(&active_request1, request1); + EXPECT_EQ(host1->address()->asString(), "10.0.0.1:3000"); + + // IPv6 address returned from Redis server will not have square brackets + // around it, while Envoy represents Address::Ipv6Instance addresses with square brackets around + // the address. + EXPECT_CALL(*this, create_(_)).WillOnce(DoAll(SaveArg<0>(&host2), Return(client2))); + EXPECT_CALL( + *client2, + makeRequest(Eq(NetworkFilters::Common::Redis::Utility::makeAuthCommand(auth_password_)), _)) + .WillOnce(Return(&auth_request2)); + EXPECT_CALL(*client2, makeRequest(Ref(value), Ref(callbacks2))) + .WillOnce(Return(&active_request2)); + Common::Redis::Client::PoolRequest* request2 = + conn_pool_->makeRequestToHost("2001:470:813B:0:0:0:0:1:3333", value, callbacks2); + EXPECT_EQ(&active_request2, request2); + EXPECT_EQ(host2->address()->asString(), "[2001:470:813b::1]:3333"); + + EXPECT_CALL(*client2, close()); + EXPECT_CALL(*client1, close()); + tls_.shutdownThread(); +} + } // namespace ConnPool } // namespace RedisProxy } // namespace NetworkFilters diff --git a/test/extensions/filters/network/redis_proxy/mocks.h b/test/extensions/filters/network/redis_proxy/mocks.h index 381ef0a19bf59..e194374f1e240 100644 --- a/test/extensions/filters/network/redis_proxy/mocks.h +++ b/test/extensions/filters/network/redis_proxy/mocks.h @@ -61,6 +61,9 @@ class MockSplitCallbacks : public SplitCallbacks { MockSplitCallbacks(); ~MockSplitCallbacks(); + MOCK_METHOD0(connectionAllowed, bool()); + MOCK_METHOD1(onAuth, void(const std::string& password)); + void onResponse(Common::Redis::RespValuePtr&& value) override { onResponse_(value); } MOCK_METHOD1(onResponse_, void(Common::Redis::RespValuePtr& value)); diff --git a/test/extensions/filters/network/redis_proxy/proxy_filter_test.cc b/test/extensions/filters/network/redis_proxy/proxy_filter_test.cc index 4cb73b89186b2..934d472e9c2c7 100644 --- a/test/extensions/filters/network/redis_proxy/proxy_filter_test.cc +++ b/test/extensions/filters/network/redis_proxy/proxy_filter_test.cc @@ -1,12 +1,13 @@ #include #include -#include "common/config/filter_json.h" +#include "common/config/json_utility.h" #include "extensions/filters/network/redis_proxy/proxy_filter.h" #include "test/extensions/filters/network/common/redis/mocks.h" #include "test/extensions/filters/network/redis_proxy/mocks.h" +#include "test/mocks/api/mocks.h" #include "test/mocks/common.h" #include "test/mocks/network/mocks.h" #include "test/mocks/upstream/mocks.h" @@ -36,7 +37,23 @@ envoy::config::filter::network::redis_proxy::v2::RedisProxy parseProtoFromJson(const std::string& json_string) { envoy::config::filter::network::redis_proxy::v2::RedisProxy config; auto json_object_ptr = Json::Factory::loadFromString(json_string); - Config::FilterJson::translateRedisProxy(*json_object_ptr, config); + + config.set_cluster(json_object_ptr->getString("cluster_name")); + config.set_stat_prefix(json_object_ptr->getString("stat_prefix", "")); + + const auto json_conn_pool = json_object_ptr->getObject("conn_pool"); + auto* conn_pool = config.mutable_settings(); + JSON_UTIL_SET_DURATION(*json_conn_pool, *conn_pool, op_timeout); + + if (json_object_ptr->hasObject("downstream_auth_password")) { + auto downstream_auth_obj = json_object_ptr->getObject("downstream_auth_password"); + // Process only the inline_string specifier for DataSource downstream_auth_password, + // since that is the only form of the DataSource used in this test. + if (downstream_auth_obj->hasObject("inline_string")) { + const std::string password = downstream_auth_obj->getString("inline_string"); + config.mutable_downstream_auth_password()->set_inline_string(password); + } + } return config; } @@ -46,6 +63,7 @@ class RedisProxyFilterConfigTest : public testing::Test { Stats::IsolatedStoreImpl store_; Network::MockDrainDecision drain_decision_; Runtime::MockLoader runtime_; + NiceMock api_; }; TEST_F(RedisProxyFilterConfigTest, Normal) { @@ -59,8 +77,9 @@ TEST_F(RedisProxyFilterConfigTest, Normal) { envoy::config::filter::network::redis_proxy::v2::RedisProxy proto_config = parseProtoFromJson(json_string); - ProxyFilterConfig config(proto_config, store_, drain_decision_, runtime_); + ProxyFilterConfig config(proto_config, store_, drain_decision_, runtime_, api_); EXPECT_EQ("redis.foo.", config.stat_prefix_); + EXPECT_TRUE(config.downstream_auth_password_.empty()); } TEST_F(RedisProxyFilterConfigTest, BadRedisProxyConfig) { @@ -74,10 +93,25 @@ TEST_F(RedisProxyFilterConfigTest, BadRedisProxyConfig) { EXPECT_THROW(parseProtoFromJson(json_string), Json::Exception); } +TEST_F(RedisProxyFilterConfigTest, DownstreamAuthPasswordSet) { + std::string json_string = R"EOF( + { + "cluster_name": "fake_cluster", + "stat_prefix": "foo", + "conn_pool": { "op_timeout_ms" : 10 }, + "downstream_auth_password": { "inline_string": "somepassword" } + } + )EOF"; + + envoy::config::filter::network::redis_proxy::v2::RedisProxy proto_config = + parseProtoFromJson(json_string); + ProxyFilterConfig config(proto_config, store_, drain_decision_, runtime_, api_); + EXPECT_EQ(config.downstream_auth_password_, "somepassword"); +} + class RedisProxyFilterTest : public testing::Test, public Common::Redis::DecoderFactory { public: - RedisProxyFilterTest() { - std::string json_string = R"EOF( + const std::string default_config = R"EOF( { "cluster_name": "fake_cluster", "stat_prefix": "foo", @@ -85,9 +119,10 @@ class RedisProxyFilterTest : public testing::Test, public Common::Redis::Decoder } )EOF"; + RedisProxyFilterTest(const std::string& json_string) { envoy::config::filter::network::redis_proxy::v2::RedisProxy proto_config = parseProtoFromJson(json_string); - config_.reset(new ProxyFilterConfig(proto_config, store_, drain_decision_, runtime_)); + config_.reset(new ProxyFilterConfig(proto_config, store_, drain_decision_, runtime_, api_)); filter_ = std::make_unique(*this, Common::Redis::EncoderPtr{encoder_}, splitter_, config_); filter_->initializeReadFilterCallbacks(filter_callbacks_); @@ -100,7 +135,9 @@ class RedisProxyFilterTest : public testing::Test, public Common::Redis::Decoder filter_->onBelowWriteBufferLowWatermark(); } - ~RedisProxyFilterTest() { + RedisProxyFilterTest() : RedisProxyFilterTest(default_config) {} + + ~RedisProxyFilterTest() override { filter_.reset(); for (const Stats::GaugeSharedPtr& gauge : store_.gauges()) { EXPECT_EQ(0U, gauge->value()); @@ -123,6 +160,7 @@ class RedisProxyFilterTest : public testing::Test, public Common::Redis::Decoder ProxyFilterConfigSharedPtr config_; std::unique_ptr filter_; NiceMock filter_callbacks_; + NiceMock api_; }; TEST_F(RedisProxyFilterTest, OutOfOrderResponseWithDrainClose) { @@ -259,6 +297,101 @@ TEST_F(RedisProxyFilterTest, ProtocolError) { EXPECT_EQ(1UL, store_.counter("redis.foo.downstream_cx_protocol_error").value()); } +TEST_F(RedisProxyFilterTest, AuthWhenNotRequired) { + InSequence s; + + Buffer::OwnedImpl fake_data; + Common::Redis::RespValuePtr request(new Common::Redis::RespValue()); + EXPECT_CALL(*decoder_, decode(Ref(fake_data))).WillOnce(Invoke([&](Buffer::Instance&) -> void { + decoder_callbacks_->onRespValue(std::move(request)); + })); + EXPECT_CALL(splitter_, makeRequest_(Ref(*request), _)) + .WillOnce( + Invoke([&](const Common::Redis::RespValue&, + CommandSplitter::SplitCallbacks& callbacks) -> CommandSplitter::SplitRequest* { + EXPECT_TRUE(callbacks.connectionAllowed()); + Common::Redis::RespValuePtr error(new Common::Redis::RespValue()); + error->type(Common::Redis::RespType::Error); + error->asString() = "ERR Client sent AUTH, but no password is set"; + EXPECT_CALL(*encoder_, encode(Eq(ByRef(*error)), _)); + EXPECT_CALL(filter_callbacks_.connection_, write(_, _)); + callbacks.onAuth("foo"); + // callbacks cannot be accessed now. + EXPECT_TRUE(filter_->connectionAllowed()); + return nullptr; + })); + + EXPECT_EQ(Network::FilterStatus::Continue, filter_->onData(fake_data, false)); +} + +const std::string downstream_auth_password_config = R"EOF( + { + "cluster_name": "fake_cluster", + "stat_prefix": "foo", + "conn_pool": { "op_timeout_ms" : 10 }, + "downstream_auth_password": { "inline_string": "somepassword" } + } + )EOF"; + +class RedisProxyFilterWithAuthPasswordTest : public RedisProxyFilterTest { +public: + RedisProxyFilterWithAuthPasswordTest() : RedisProxyFilterTest(downstream_auth_password_config) {} +}; + +TEST_F(RedisProxyFilterWithAuthPasswordTest, AuthPasswordCorrect) { + InSequence s; + + Buffer::OwnedImpl fake_data; + Common::Redis::RespValuePtr request(new Common::Redis::RespValue()); + EXPECT_CALL(*decoder_, decode(Ref(fake_data))).WillOnce(Invoke([&](Buffer::Instance&) -> void { + decoder_callbacks_->onRespValue(std::move(request)); + })); + EXPECT_CALL(splitter_, makeRequest_(Ref(*request), _)) + .WillOnce( + Invoke([&](const Common::Redis::RespValue&, + CommandSplitter::SplitCallbacks& callbacks) -> CommandSplitter::SplitRequest* { + EXPECT_FALSE(callbacks.connectionAllowed()); + Common::Redis::RespValuePtr reply(new Common::Redis::RespValue()); + reply->type(Common::Redis::RespType::SimpleString); + reply->asString() = "OK"; + EXPECT_CALL(*encoder_, encode(Eq(ByRef(*reply)), _)); + EXPECT_CALL(filter_callbacks_.connection_, write(_, _)); + callbacks.onAuth("somepassword"); + // callbacks cannot be accessed now. + EXPECT_TRUE(filter_->connectionAllowed()); + return nullptr; + })); + + EXPECT_EQ(Network::FilterStatus::Continue, filter_->onData(fake_data, false)); +} + +TEST_F(RedisProxyFilterWithAuthPasswordTest, AuthPasswordIncorrect) { + InSequence s; + + Buffer::OwnedImpl fake_data; + Common::Redis::RespValuePtr request(new Common::Redis::RespValue()); + EXPECT_CALL(*decoder_, decode(Ref(fake_data))).WillOnce(Invoke([&](Buffer::Instance&) -> void { + decoder_callbacks_->onRespValue(std::move(request)); + })); + EXPECT_CALL(splitter_, makeRequest_(Ref(*request), _)) + .WillOnce( + Invoke([&](const Common::Redis::RespValue&, + CommandSplitter::SplitCallbacks& callbacks) -> CommandSplitter::SplitRequest* { + EXPECT_FALSE(callbacks.connectionAllowed()); + Common::Redis::RespValuePtr reply(new Common::Redis::RespValue()); + reply->type(Common::Redis::RespType::Error); + reply->asString() = "ERR invalid password"; + EXPECT_CALL(*encoder_, encode(Eq(ByRef(*reply)), _)); + EXPECT_CALL(filter_callbacks_.connection_, write(_, _)); + callbacks.onAuth("wrongpassword"); + // callbacks cannot be accessed now. + EXPECT_FALSE(filter_->connectionAllowed()); + return nullptr; + })); + + EXPECT_EQ(Network::FilterStatus::Continue, filter_->onData(fake_data, false)); +} + } // namespace RedisProxy } // namespace NetworkFilters } // namespace Extensions diff --git a/test/extensions/filters/network/redis_proxy/redis_proxy_integration_test.cc b/test/extensions/filters/network/redis_proxy/redis_proxy_integration_test.cc index a51671035e807..c2f521dcaa43e 100644 --- a/test/extensions/filters/network/redis_proxy/redis_proxy_integration_test.cc +++ b/test/extensions/filters/network/redis_proxy/redis_proxy_integration_test.cc @@ -153,6 +153,83 @@ const std::string CONFIG_WITH_ROUTES = R"EOF( cluster: cluster_2 )EOF"; +const std::string CONFIG_WITH_DOWNSTREAM_AUTH_PASSWORD_SET = CONFIG + R"EOF( + downstream_auth_password: { inline_string: somepassword } +)EOF"; + +const std::string CONFIG_WITH_ROUTES_AND_AUTH_PASSWORDS = R"EOF( +admin: + access_log_path: /dev/null + address: + socket_address: + address: 127.0.0.1 + port_value: 0 +static_resources: + clusters: + - name: cluster_0 + type: STATIC + extension_protocol_options: + envoy.redis_proxy: { auth_password: { inline_string: cluster_0_password }} + lb_policy: RANDOM + load_assignment: + cluster_name: cluster_0 + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 0 + - name: cluster_1 + type: STATIC + lb_policy: RANDOM + extension_protocol_options: + envoy.redis_proxy: { auth_password: { inline_string: cluster_1_password }} + load_assignment: + cluster_name: cluster_1 + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 1 + - name: cluster_2 + type: STATIC + extension_protocol_options: + envoy.redis_proxy: { auth_password: { inline_string: cluster_2_password }} + lb_policy: RANDOM + load_assignment: + cluster_name: cluster_2 + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 2 + listeners: + name: listener_0 + address: + socket_address: + address: 127.0.0.1 + port_value: 0 + filter_chains: + filters: + name: envoy.redis_proxy + config: + stat_prefix: redis_stats + settings: + op_timeout: 5s + prefix_routes: + catch_all_cluster: cluster_0 + routes: + - prefix: "foo:" + cluster: cluster_1 + - prefix: "baz:" + cluster: cluster_2 +)EOF"; + // This function encodes commands as an array of bulkstrings as transmitted by Redis clients to // Redis servers, according to the Redis protocol. std::string makeBulkStringArray(std::vector&& command_strings) { @@ -223,6 +300,30 @@ class RedisProxyIntegrationTest : public testing::TestWithParamclearData(); redis_client->write(request); - FakeRawConnectionPtr fake_upstream_connection; - EXPECT_TRUE(upstream->waitForRawConnection(fake_upstream_connection)); - EXPECT_TRUE(fake_upstream_connection->waitForData(request.size(), &proxy_to_server)); - // The original request should be the same as the data received by the server. - EXPECT_EQ(request, proxy_to_server); + if (fake_upstream_connection.get() == nullptr) { + expect_auth_command = (!auth_password.empty()); + EXPECT_TRUE(upstream->waitForRawConnection(fake_upstream_connection)); + } + + if (expect_auth_command) { + std::string auth_command = makeBulkStringArray({"auth", auth_password}); + EXPECT_TRUE(fake_upstream_connection->waitForData(auth_command.size() + request.size(), + &proxy_to_server)); + // The original request should be the same as the data received by the server. + EXPECT_EQ(auth_command + request, proxy_to_server); + // Send back an OK for the auth command. + EXPECT_TRUE(fake_upstream_connection->write(ok)); + + } else { + EXPECT_TRUE(fake_upstream_connection->waitForData(request.size(), &proxy_to_server)); + // The original request should be the same as the data received by the server. + EXPECT_EQ(request, proxy_to_server); + } EXPECT_TRUE(fake_upstream_connection->write(response)); redis_client->waitForData(response); // The original response should be received by the fake Redis client. EXPECT_EQ(response, redis_client->data()); +} + +void RedisProxyIntegrationTest::simpleRoundtripToUpstream(FakeUpstreamPtr& upstream, + const std::string& request, + const std::string& response) { + IntegrationTcpClientPtr redis_client = makeTcpConnection(lookupPort("redis_proxy")); + FakeRawConnectionPtr fake_upstream_connection; + + roundtripToUpstreamStep(upstream, request, response, redis_client, fake_upstream_connection, ""); redis_client->close(); EXPECT_TRUE(fake_upstream_connection->close()); } -void RedisProxyIntegrationTest::simpleProxyResponse(const std::string& request, - const std::string& proxy_response) { - IntegrationTcpClientPtr redis_client = makeTcpConnection(lookupPort("redis_proxy")); +void RedisProxyIntegrationTest::proxyResponseStep(const std::string& request, + const std::string& proxy_response, + IntegrationTcpClientPtr& redis_client) { + redis_client->clearData(); redis_client->write(request); redis_client->waitForData(proxy_response); // After sending the request to the proxy, the fake redis client should receive proxy_response. EXPECT_EQ(proxy_response, redis_client->data()); +} + +void RedisProxyIntegrationTest::simpleProxyResponse(const std::string& request, + const std::string& proxy_response) { + IntegrationTcpClientPtr redis_client = makeTcpConnection(lookupPort("redis_proxy")); + proxyResponseStep(request, proxy_response, redis_client); redis_client->close(); } @@ -408,6 +563,16 @@ TEST_P(RedisProxyIntegrationTest, RedirectWhenNotEnabled) { } } +// This test sends an AUTH command from the fake downstream client to +// the Envoy proxy. Envoy will respond with a no-password-set error since +// no downstream_auth_password has been set for the filter. + +TEST_P(RedisProxyIntegrationTest, DownstreamAuthWhenNoPasswordSet) { + initialize(); + simpleProxyResponse(makeBulkStringArray({"auth", "somepassword"}), + "-ERR Client sent AUTH, but no password is set\r\n"); +} + // This test sends a simple Redis command to a sequence of fake upstream // Redis servers. The first server replies with a MOVED or ASK redirection // error that specifies the second upstream server in the static configuration @@ -618,5 +783,67 @@ TEST_P(RedisProxyWithRoutesIntegrationTest, SimpleRequestAndResponseRoutedByPref "$3\r\nbar\r\n"); } +// This test verifies that a client connection cannot issue a command to an upstream +// server until it supplies a valid Redis AUTH command when downstream_auth_password +// is set for the redis_proxy filter. It also verifies the errors sent by the proxy +// when no password or the wrong password is received. + +TEST_P(RedisProxyWithDownstreamAuthIntegrationTest, ErrorsUntilCorrectPasswordSent) { + initialize(); + + IntegrationTcpClientPtr redis_client = makeTcpConnection(lookupPort("redis_proxy")); + FakeRawConnectionPtr fake_upstream_connection; + + proxyResponseStep(makeBulkStringArray({"get", "foo"}), "-NOAUTH Authentication required.\r\n", + redis_client); + + std::stringstream error_response; + error_response << "-" << RedisCmdSplitter::Response::get().InvalidRequest << "\r\n"; + proxyResponseStep(makeBulkStringArray({"auth"}), error_response.str(), redis_client); + + proxyResponseStep(makeBulkStringArray({"auth", "wrongpassword"}), "-ERR invalid password\r\n", + redis_client); + + proxyResponseStep(makeBulkStringArray({"get", "foo"}), "-NOAUTH Authentication required.\r\n", + redis_client); + + proxyResponseStep(makeBulkStringArray({"auth", "somepassword"}), "+OK\r\n", redis_client); + + roundtripToUpstreamStep(fake_upstreams_[0], makeBulkStringArray({"get", "foo"}), "$3\r\nbar\r\n", + redis_client, fake_upstream_connection, ""); + + redis_client->close(); + EXPECT_TRUE(fake_upstream_connection->close()); +} + +// This test verifies that upstream server connections are transparently authenticated if an +// auth_password is specified for each cluster. + +TEST_P(RedisProxyWithRoutesAndAuthPasswordsIntegrationTest, TransparentAuthentication) { + initialize(); + + IntegrationTcpClientPtr redis_client = makeTcpConnection(lookupPort("redis_proxy")); + std::array fake_upstream_connection; + + // roundtrip to cluster_0 (catch_all route) + roundtripToUpstreamStep(fake_upstreams_[0], makeBulkStringArray({"get", "toto"}), "$3\r\nbar\r\n", + redis_client, fake_upstream_connection[0], "cluster_0_password"); + + // roundtrip to cluster_1 (prefix "foo:" route) + roundtripToUpstreamStep(fake_upstreams_[1], makeBulkStringArray({"get", "foo:123"}), + "$3\r\nbar\r\n", redis_client, fake_upstream_connection[1], + "cluster_1_password"); + + // roundtrip to cluster_2 (prefix "baz:" route) + roundtripToUpstreamStep(fake_upstreams_[2], makeBulkStringArray({"get", "baz:123"}), + "$3\r\nbar\r\n", redis_client, fake_upstream_connection[2], + "cluster_2_password"); + + redis_client->close(); + EXPECT_TRUE(fake_upstream_connection[0]->close()); + EXPECT_TRUE(fake_upstream_connection[1]->close()); + EXPECT_TRUE(fake_upstream_connection[2]->close()); +} + } // namespace } // namespace Envoy diff --git a/test/extensions/health_checkers/redis/redis_test.cc b/test/extensions/health_checkers/redis/redis_test.cc index 9b572665021ec..362285be775f0 100644 --- a/test/extensions/health_checkers/redis/redis_test.cc +++ b/test/extensions/health_checkers/redis/redis_test.cc @@ -24,7 +24,6 @@ namespace Envoy { namespace Extensions { namespace HealthCheckers { namespace RedisHealthChecker { -namespace { class RedisHealthCheckerTest : public testing::Test, @@ -152,6 +151,21 @@ class RedisHealthCheckerTest EXPECT_CALL(*timeout_timer_, enableTimer(_)); } + void exerciseStubs() { + Upstream::HostSharedPtr host = Upstream::makeTestHost(cluster_->info_, "tcp://127.0.0.1:100"); + RedisHealthChecker::RedisActiveHealthCheckSessionPtr session = + std::make_unique(*health_checker_, host); + + EXPECT_TRUE(session->disableOutlierEvents()); + EXPECT_EQ(session->opTimeout(), + std::chrono::milliseconds(2000)); // Timeout is 1s is test configurations. + EXPECT_FALSE(session->enableHashtagging()); + EXPECT_TRUE(session->enableRedirection()); + EXPECT_EQ(session->maxBufferSizeBeforeFlush(), 0); + EXPECT_EQ(session->bufferFlushTimeoutInMs(), std::chrono::milliseconds(1)); + session->onDeferredDeleteBase(); // This must be called to pass assertions in the destructor. + } + std::shared_ptr cluster_; NiceMock dispatcher_; NiceMock runtime_; @@ -169,6 +183,9 @@ TEST_F(RedisHealthCheckerTest, PingAndVariousFailures) { InSequence s; setup(); + // Exercise stubbed out interfaces for coverage. + exerciseStubs(); + cluster_->prioritySet().getMockHostSet(0)->hosts_ = { Upstream::makeTestHost(cluster_->info_, "tcp://127.0.0.1:80")}; @@ -505,7 +522,6 @@ TEST_F(RedisHealthCheckerTest, NoConnectionReuse) { EXPECT_EQ(2UL, cluster_->info_->stats_store_.counter("health_check.network_failure").value()); } -} // namespace } // namespace RedisHealthChecker } // namespace HealthCheckers } // namespace Extensions diff --git a/test/integration/integration.h b/test/integration/integration.h index 90933d2940347..20ef89b818eda 100644 --- a/test/integration/integration.h +++ b/test/integration/integration.h @@ -98,6 +98,7 @@ class IntegrationTcpClient { void write(const std::string& data, bool end_stream = false, bool verify = true); const std::string& data() { return payload_reader_->data(); } bool connected() const { return !disconnected_; } + void clearData() { payload_reader_->clearData(); } private: struct ConnectionCallbacks : public Network::ConnectionCallbacks { diff --git a/test/integration/utility.h b/test/integration/utility.h index e39a295f682fe..2a3a3f1c016b7 100644 --- a/test/integration/utility.h +++ b/test/integration/utility.h @@ -183,6 +183,7 @@ class WaitForPayloadReader : public Network::ReadFilterBaseImpl { } const std::string& data() { return data_; } bool readLastByte() { return read_end_stream_; } + void clearData() { data_.clear(); } private: Event::Dispatcher& dispatcher_; diff --git a/tools/spelling_dictionary.txt b/tools/spelling_dictionary.txt index 3c75af4f8f1f6..7cc14ac27e390 100644 --- a/tools/spelling_dictionary.txt +++ b/tools/spelling_dictionary.txt @@ -157,6 +157,7 @@ NBF NBSP NDEBUG NGHTTP +NOAUTH NOLINT NOLINTNEXTLINE NONCES