diff --git a/api/envoy/config/filter/network/redis_proxy/v2/redis_proxy.proto b/api/envoy/config/filter/network/redis_proxy/v2/redis_proxy.proto index 16196cc07a3b1..eec8c3f409544 100644 --- a/api/envoy/config/filter/network/redis_proxy/v2/redis_proxy.proto +++ b/api/envoy/config/filter/network/redis_proxy/v2/redis_proxy.proto @@ -59,6 +59,27 @@ message RedisProxy { // need to be known to the cluster manager. If the command cannot be redirected, then the // original error is passed downstream unchanged. By default, this support is not enabled. bool enable_redirection = 3; + + // Maximum size of encoded request buffer before flush is triggered and encoded requests + // are sent upstream. If this is unset, the buffer flushes whenever it receives data + // and performs no batching. + // This feature makes it possible for multiple clients to send requests to Envoy and have + // them batched- for example if one is running several worker processes, each with its own + // Redis connection. There is no benefit to using this with a single downstream process. + // Recommended size (if enabled) is 1024 bytes. + uint32 max_buffer_size_before_flush = 4; + + // The encoded request buffer is flushed N milliseconds after the first request has been + // encoded, unless the buffer size has already exceeded `max_buffer_size_before_flush`. + // If `max_buffer_size_before_flush` is not set, this flush timer is not used. Otherwise, + // the timer should be set according to the number of clients, overall request rate and + // desired maximum latency for a single command. For example, if there are many requests + // being batched together at a high rate, the buffer will likely be filled before the timer + // fires. Alternatively, if the request rate is lower the buffer will not be filled as often + // before the timer fires. + // If `max_buffer_size_before_flush` is set, but `buffer_flush_timeout` is not, the latter + // defaults to 3ms. 
+ google.protobuf.Duration buffer_flush_timeout = 5 [(gogoproto.stdduration) = true]; } // Network settings for the connection pool to the upstream clusters. diff --git a/docs/root/intro/version_history.rst b/docs/root/intro/version_history.rst index 3458bd0d5b55a..43be66773e8d1 100644 --- a/docs/root/intro/version_history.rst +++ b/docs/root/intro/version_history.rst @@ -10,6 +10,9 @@ Version history * http: mitigated a race condition with the :ref:`delayed_close_timeout` where it could trigger while actively flushing a pending write buffer for a downstream connection. * redis: added :ref:`prefix routing ` to enable routing commands based on their key's prefix to different upstream. * redis: add support for zpopmax and zpopmin commands. +* redis: added + :ref:`max_buffer_size_before_flush ` to batch commands together until the encoder buffer hits a certain size, and + :ref:`buffer_flush_timeout ` to control how quickly the buffer is flushed if it is not full. * router: added ability to control retry back-off intervals via :ref:`retry policy `. * upstream: added :ref:`upstream_cx_pool_overflow ` for the connection pool circuit breaker. diff --git a/source/extensions/filters/network/common/redis/client.h b/source/extensions/filters/network/common/redis/client.h index 59c5c88080c9b..4a7c53912afc3 100644 --- a/source/extensions/filters/network/common/redis/client.h +++ b/source/extensions/filters/network/common/redis/client.h @@ -1,5 +1,7 @@ #pragma once +#include + #include "envoy/upstream/cluster_manager.h" #include "extensions/filters/network/common/redis/codec_impl.h" @@ -110,6 +112,16 @@ class Config { * processed. */ virtual bool enableRedirection() const PURE; + + /** + * @return buffer size for batching commands for a single upstream host. + */ + virtual uint32_t maxBufferSizeBeforeFlush() const PURE; + + /** + * @return timeout for batching commands for a single upstream host. 
+ */ + virtual std::chrono::milliseconds bufferFlushTimeoutInMs() const PURE; }; /** diff --git a/source/extensions/filters/network/common/redis/client_impl.cc b/source/extensions/filters/network/common/redis/client_impl.cc index 1040036560488..fa4bb4bb5c766 100644 --- a/source/extensions/filters/network/common/redis/client_impl.cc +++ b/source/extensions/filters/network/common/redis/client_impl.cc @@ -11,7 +11,14 @@ ConfigImpl::ConfigImpl( const envoy::config::filter::network::redis_proxy::v2::RedisProxy::ConnPoolSettings& config) : op_timeout_(PROTOBUF_GET_MS_REQUIRED(config, op_timeout)), enable_hashtagging_(config.enable_hashtagging()), - enable_redirection_(config.enable_redirection()) {} + enable_redirection_(config.enable_redirection()), + max_buffer_size_before_flush_( + config.max_buffer_size_before_flush()), // This is a scalar, so default is zero. + buffer_flush_timeout_(PROTOBUF_GET_MS_OR_DEFAULT( + config, buffer_flush_timeout, + 3)) // Default timeout is 3ms. If max_buffer_size_before_flush is zero, this is not used + // as the buffer is flushed on each request immediately. 
+{} ClientPtr ClientImpl::create(Upstream::HostConstSharedPtr host, Event::Dispatcher& dispatcher, EncoderPtr&& encoder, DecoderFactory& decoder_factory, @@ -31,7 +38,8 @@ ClientImpl::ClientImpl(Upstream::HostConstSharedPtr host, Event::Dispatcher& dis EncoderPtr&& encoder, DecoderFactory& decoder_factory, const Config& config) : host_(host), encoder_(std::move(encoder)), decoder_(decoder_factory.create(*this)), config_(config), - connect_or_op_timer_(dispatcher.createTimer([this]() -> void { onConnectOrOpTimeout(); })) { + connect_or_op_timer_(dispatcher.createTimer([this]() -> void { onConnectOrOpTimeout(); })), + flush_timer_(dispatcher.createTimer([this]() -> void { flushBufferAndResetTimer(); })) { host->cluster().stats().upstream_cx_total_.inc(); host->stats().cx_total_.inc(); host->cluster().stats().upstream_cx_active_.inc(); @@ -48,12 +56,27 @@ ClientImpl::~ClientImpl() { void ClientImpl::close() { connection_->close(Network::ConnectionCloseType::NoFlush); } +void ClientImpl::flushBufferAndResetTimer() { + if (flush_timer_->enabled()) { + flush_timer_->disableTimer(); + } + connection_->write(encoder_buffer_, false); +} + PoolRequest* ClientImpl::makeRequest(const RespValue& request, PoolCallbacks& callbacks) { ASSERT(connection_->state() == Network::Connection::State::Open); + const bool empty_buffer = encoder_buffer_.length() == 0; + pending_requests_.emplace_back(*this, callbacks); encoder_->encode(request, encoder_buffer_); - connection_->write(encoder_buffer_, false); + + // If buffer is full, flush. If the buffer was empty before the request, start the timer. + if (encoder_buffer_.length() >= config_.maxBufferSizeBeforeFlush()) { + flushBufferAndResetTimer(); + } else if (empty_buffer) { + flush_timer_->enableTimer(std::chrono::milliseconds(config_.bufferFlushTimeoutInMs())); + } // Only boost the op timeout if: // - We are not already connected. 
Otherwise, we are governed by the connect timeout and the timer diff --git a/source/extensions/filters/network/common/redis/client_impl.h b/source/extensions/filters/network/common/redis/client_impl.h index 5a44d39e82687..fd9b7b7af7b80 100644 --- a/source/extensions/filters/network/common/redis/client_impl.h +++ b/source/extensions/filters/network/common/redis/client_impl.h @@ -41,11 +41,17 @@ class ConfigImpl : public Config { std::chrono::milliseconds opTimeout() const override { return op_timeout_; } bool enableHashtagging() const override { return enable_hashtagging_; } bool enableRedirection() const override { return enable_redirection_; } + uint32_t maxBufferSizeBeforeFlush() const override { return max_buffer_size_before_flush_; } + std::chrono::milliseconds bufferFlushTimeoutInMs() const override { + return buffer_flush_timeout_; + } private: const std::chrono::milliseconds op_timeout_; const bool enable_hashtagging_; const bool enable_redirection_; + const uint32_t max_buffer_size_before_flush_; + const std::chrono::milliseconds buffer_flush_timeout_; }; class ClientImpl : public Client, public DecoderCallbacks, public Network::ConnectionCallbacks { @@ -62,6 +68,7 @@ class ClientImpl : public Client, public DecoderCallbacks, public Network::Conne } void close() override; PoolRequest* makeRequest(const RespValue& request, PoolCallbacks& callbacks) override; + void flushBufferAndResetTimer(); private: struct UpstreamReadFilter : public Network::ReadFilterBaseImpl { @@ -111,6 +118,7 @@ class ClientImpl : public Client, public DecoderCallbacks, public Network::Conne std::list pending_requests_; Event::TimerPtr connect_or_op_timer_; bool connected_{}; + Event::TimerPtr flush_timer_; }; class ClientFactoryImpl : public ClientFactory { diff --git a/source/extensions/health_checkers/redis/redis.h b/source/extensions/health_checkers/redis/redis.h index 7c93b017b5a90..791171d2d83e2 100644 --- a/source/extensions/health_checkers/redis/redis.h +++ 
b/source/extensions/health_checkers/redis/redis.h @@ -1,5 +1,7 @@ #pragma once +#include + #include "envoy/config/health_checker/redis/v2/redis.pb.validate.h" #include "common/upstream/health_checker_base_impl.h" @@ -63,6 +65,14 @@ class RedisHealthChecker : public Upstream::HealthCheckerImplBase { return true; } // Redirection errors are treated as check successes. + // Batching + unsigned int maxBufferSizeBeforeFlush() const override { + return 0; + } // Forces an immediate flush + std::chrono::milliseconds bufferFlushTimeoutInMs() const override { + return std::chrono::milliseconds(1); + } + // Extensions::NetworkFilters::Common::Redis::Client::PoolCallbacks void onResponse(NetworkFilters::Common::Redis::RespValuePtr&& value) override; void onFailure() override; diff --git a/test/extensions/filters/network/common/redis/client_impl_test.cc b/test/extensions/filters/network/common/redis/client_impl_test.cc index d5c0e2ac7fae6..c1d8269f5b024 100644 --- a/test/extensions/filters/network/common/redis/client_impl_test.cc +++ b/test/extensions/filters/network/common/redis/client_impl_test.cc @@ -64,6 +64,11 @@ class RedisClientImplTest : public testing::Test, public Common::Redis::DecoderF upstream_connection_ = new NiceMock(); Upstream::MockHost::MockCreateConnectionData conn_info; conn_info.connection_ = upstream_connection_; + + // Create timers in order they are created in client_impl.cc + connect_or_op_timer_ = new Event::MockTimer(&dispatcher_); + flush_timer_ = new Event::MockTimer(&dispatcher_); + EXPECT_CALL(*connect_or_op_timer_, enableTimer(_)); EXPECT_CALL(*host_, createConnection_(_, _)).WillOnce(Return(conn_info)); EXPECT_CALL(*upstream_connection_, addReadFilter(_)) @@ -89,7 +94,8 @@ class RedisClientImplTest : public testing::Test, public Common::Redis::DecoderF const std::string cluster_name_{"foo"}; std::shared_ptr host_{new NiceMock()}; Event::MockDispatcher dispatcher_; - Event::MockTimer* connect_or_op_timer_{new Event::MockTimer(&dispatcher_)}; + 
Event::MockTimer* flush_timer_{}; + Event::MockTimer* connect_or_op_timer_{}; MockEncoder* encoder_{new MockEncoder()}; MockDecoder* decoder_{new MockDecoder()}; Common::Redis::DecoderCallbacks* callbacks_{}; @@ -99,6 +105,138 @@ class RedisClientImplTest : public testing::Test, public Common::Redis::DecoderF ClientPtr client_; }; +TEST_F(RedisClientImplTest, BatchWithZeroBufferAndTimeout) { + // Basic test with a single request, default buffer size (0) and timeout (0). + // This means we do not batch requests, and thus the flush timer is never enabled. + InSequence s; + + setup(); + + // Make the dummy request + Common::Redis::RespValue request1; + MockPoolCallbacks callbacks1; + EXPECT_CALL(*encoder_, encode(Ref(request1), _)); + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); + PoolRequest* handle1 = client_->makeRequest(request1, callbacks1); + EXPECT_NE(nullptr, handle1); + + // Process the dummy request + Buffer::OwnedImpl fake_data; + EXPECT_CALL(*decoder_, decode(Ref(fake_data))).WillOnce(Invoke([&](Buffer::Instance&) -> void { + InSequence s; + Common::Redis::RespValuePtr response1(new Common::Redis::RespValue()); + EXPECT_CALL(callbacks1, onResponse_(Ref(response1))); + EXPECT_CALL(*connect_or_op_timer_, disableTimer()); + EXPECT_CALL(host_->outlier_detector_, putResult(Upstream::Outlier::Result::SUCCESS)); + callbacks_->onRespValue(std::move(response1)); + })); + upstream_read_filter_->onData(fake_data, false); + + EXPECT_CALL(*connect_or_op_timer_, disableTimer()); + client_->close(); +} + +class ConfigBufferSizeGTSingleRequest : public Config { + bool disableOutlierEvents() const override { return false; } + std::chrono::milliseconds opTimeout() const override { return std::chrono::milliseconds(25); } + bool enableHashtagging() const override { return false; } + bool enableRedirection() const override { return false; } + unsigned int maxBufferSizeBeforeFlush() const override { return 8; } + std::chrono::milliseconds 
bufferFlushTimeoutInMs() const override { + return std::chrono::milliseconds(1); + } +}; + +TEST_F(RedisClientImplTest, BatchWithTimerFiring) { + // With a flush buffer > single request length, the flush timer comes into play. + // In this test, we make a single request that doesn't fill the buffer, so we + // have to wait for the flush timer to fire. + InSequence s; + + setup(std::make_unique()); + + // Make the dummy request + Common::Redis::RespValue request1; + MockPoolCallbacks callbacks1; + EXPECT_CALL(*encoder_, encode(Ref(request1), _)); + EXPECT_CALL(*flush_timer_, enableTimer(_)); + PoolRequest* handle1 = client_->makeRequest(request1, callbacks1); + EXPECT_NE(nullptr, handle1); + + // Pretend the flush timer fires. + // The timer callback is the general-purpose flush function, also used when + // the buffer is filled. If the buffer fills before the timer fires, we need + // to check if the timer is active and cancel it. However, if the timer fires + // the callback, this internal check returns false as the timer is finished. + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); + flush_timer_->invokeCallback(); + + // Process the dummy request + Buffer::OwnedImpl fake_data; + EXPECT_CALL(*decoder_, decode(Ref(fake_data))).WillOnce(Invoke([&](Buffer::Instance&) -> void { + InSequence s; + Common::Redis::RespValuePtr response1(new Common::Redis::RespValue()); + EXPECT_CALL(callbacks1, onResponse_(Ref(response1))); + EXPECT_CALL(*connect_or_op_timer_, disableTimer()); + EXPECT_CALL(host_->outlier_detector_, putResult(Upstream::Outlier::Result::SUCCESS)); + callbacks_->onRespValue(std::move(response1)); + })); + upstream_read_filter_->onData(fake_data, false); + + EXPECT_CALL(*connect_or_op_timer_, disableTimer()); + client_->close(); +} + +TEST_F(RedisClientImplTest, BatchWithTimerCancelledByBufferFlush) { + // Expanding on the previous test, the flush buffer is filled by two requests. 
+ // In this test, we make a single request that doesn't fill the buffer, and the timer + // starts. However, a second request comes in, which should cancel the timer, such + // that it is never invoked. + InSequence s; + + setup(std::make_unique()); + + // Make the dummy request (doesn't fill buffer, starts timer) + Common::Redis::RespValue request1; + MockPoolCallbacks callbacks1; + EXPECT_CALL(*encoder_, encode(Ref(request1), _)); + EXPECT_CALL(*flush_timer_, enableTimer(_)); + PoolRequest* handle1 = client_->makeRequest(request1, callbacks1); + EXPECT_NE(nullptr, handle1); + + // Make a second dummy request (fills buffer, cancels timer) + Common::Redis::RespValue request2; + MockPoolCallbacks callbacks2; + EXPECT_CALL(*encoder_, encode(Ref(request2), _)); + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(true)); + ; + EXPECT_CALL(*flush_timer_, disableTimer()); + PoolRequest* handle2 = client_->makeRequest(request2, callbacks2); + EXPECT_NE(nullptr, handle2); + + // Process the dummy requests + Buffer::OwnedImpl fake_data; + EXPECT_CALL(*decoder_, decode(Ref(fake_data))).WillOnce(Invoke([&](Buffer::Instance&) -> void { + InSequence s; + Common::Redis::RespValuePtr response1(new Common::Redis::RespValue()); + EXPECT_CALL(callbacks1, onResponse_(Ref(response1))); + EXPECT_CALL(*connect_or_op_timer_, enableTimer(_)); + EXPECT_CALL(host_->outlier_detector_, putResult(Upstream::Outlier::Result::SUCCESS)); + callbacks_->onRespValue(std::move(response1)); + + Common::Redis::RespValuePtr response2(new Common::Redis::RespValue()); + EXPECT_CALL(callbacks2, onResponse_(Ref(response2))); + EXPECT_CALL(*connect_or_op_timer_, disableTimer()); + EXPECT_CALL(host_->outlier_detector_, putResult(Upstream::Outlier::Result::SUCCESS)); + callbacks_->onRespValue(std::move(response2)); + })); + upstream_read_filter_->onData(fake_data, false); + + EXPECT_CALL(*upstream_connection_, close(Network::ConnectionCloseType::NoFlush)); + EXPECT_CALL(*connect_or_op_timer_, 
disableTimer()); + client_->close(); +} + TEST_F(RedisClientImplTest, Basic) { InSequence s; @@ -107,6 +245,7 @@ TEST_F(RedisClientImplTest, Basic) { Common::Redis::RespValue request1; MockPoolCallbacks callbacks1; EXPECT_CALL(*encoder_, encode(Ref(request1), _)); + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); PoolRequest* handle1 = client_->makeRequest(request1, callbacks1); EXPECT_NE(nullptr, handle1); @@ -115,6 +254,7 @@ TEST_F(RedisClientImplTest, Basic) { Common::Redis::RespValue request2; MockPoolCallbacks callbacks2; EXPECT_CALL(*encoder_, encode(Ref(request2), _)); + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); PoolRequest* handle2 = client_->makeRequest(request2, callbacks2); EXPECT_NE(nullptr, handle2); @@ -153,6 +293,7 @@ TEST_F(RedisClientImplTest, Cancel) { Common::Redis::RespValue request1; MockPoolCallbacks callbacks1; EXPECT_CALL(*encoder_, encode(Ref(request1), _)); + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); PoolRequest* handle1 = client_->makeRequest(request1, callbacks1); EXPECT_NE(nullptr, handle1); @@ -161,6 +302,7 @@ TEST_F(RedisClientImplTest, Cancel) { Common::Redis::RespValue request2; MockPoolCallbacks callbacks2; EXPECT_CALL(*encoder_, encode(Ref(request2), _)); + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); PoolRequest* handle2 = client_->makeRequest(request2, callbacks2); EXPECT_NE(nullptr, handle2); @@ -202,6 +344,7 @@ TEST_F(RedisClientImplTest, FailAll) { Common::Redis::RespValue request1; MockPoolCallbacks callbacks1; EXPECT_CALL(*encoder_, encode(Ref(request1), _)); + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); PoolRequest* handle1 = client_->makeRequest(request1, callbacks1); EXPECT_NE(nullptr, handle1); @@ -228,6 +371,7 @@ TEST_F(RedisClientImplTest, FailAllWithCancel) { Common::Redis::RespValue request1; MockPoolCallbacks callbacks1; EXPECT_CALL(*encoder_, encode(Ref(request1), _)); + EXPECT_CALL(*flush_timer_, 
enabled()).WillOnce(Return(false)); PoolRequest* handle1 = client_->makeRequest(request1, callbacks1); EXPECT_NE(nullptr, handle1); @@ -252,6 +396,7 @@ TEST_F(RedisClientImplTest, ProtocolError) { Common::Redis::RespValue request1; MockPoolCallbacks callbacks1; EXPECT_CALL(*encoder_, encode(Ref(request1), _)); + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); PoolRequest* handle1 = client_->makeRequest(request1, callbacks1); EXPECT_NE(nullptr, handle1); @@ -279,6 +424,7 @@ TEST_F(RedisClientImplTest, ConnectFail) { Common::Redis::RespValue request1; MockPoolCallbacks callbacks1; EXPECT_CALL(*encoder_, encode(Ref(request1), _)); + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); PoolRequest* handle1 = client_->makeRequest(request1, callbacks1); EXPECT_NE(nullptr, handle1); @@ -296,6 +442,10 @@ class ConfigOutlierDisabled : public Config { std::chrono::milliseconds opTimeout() const override { return std::chrono::milliseconds(25); } bool enableHashtagging() const override { return false; } bool enableRedirection() const override { return false; } + unsigned int maxBufferSizeBeforeFlush() const override { return 0; } + std::chrono::milliseconds bufferFlushTimeoutInMs() const override { + return std::chrono::milliseconds(0); + } }; TEST_F(RedisClientImplTest, OutlierDisabled) { @@ -306,6 +456,7 @@ TEST_F(RedisClientImplTest, OutlierDisabled) { Common::Redis::RespValue request1; MockPoolCallbacks callbacks1; EXPECT_CALL(*encoder_, encode(Ref(request1), _)); + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); PoolRequest* handle1 = client_->makeRequest(request1, callbacks1); EXPECT_NE(nullptr, handle1); @@ -326,6 +477,7 @@ TEST_F(RedisClientImplTest, ConnectTimeout) { Common::Redis::RespValue request1; MockPoolCallbacks callbacks1; EXPECT_CALL(*encoder_, encode(Ref(request1), _)); + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); PoolRequest* handle1 = client_->makeRequest(request1, callbacks1); EXPECT_NE(nullptr, 
handle1); @@ -347,6 +499,7 @@ TEST_F(RedisClientImplTest, OpTimeout) { Common::Redis::RespValue request1; MockPoolCallbacks callbacks1; EXPECT_CALL(*encoder_, encode(Ref(request1), _)); + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); PoolRequest* handle1 = client_->makeRequest(request1, callbacks1); EXPECT_NE(nullptr, handle1); @@ -370,6 +523,7 @@ TEST_F(RedisClientImplTest, AskRedirection) { Common::Redis::RespValue request1; MockPoolCallbacks callbacks1; EXPECT_CALL(*encoder_, encode(Ref(request1), _)); + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); PoolRequest* handle1 = client_->makeRequest(request1, callbacks1); EXPECT_NE(nullptr, handle1); @@ -378,6 +532,7 @@ TEST_F(RedisClientImplTest, AskRedirection) { Common::Redis::RespValue request2; MockPoolCallbacks callbacks2; EXPECT_CALL(*encoder_, encode(Ref(request2), _)); + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); PoolRequest* handle2 = client_->makeRequest(request2, callbacks2); EXPECT_NE(nullptr, handle2); @@ -428,6 +583,7 @@ TEST_F(RedisClientImplTest, MovedRedirection) { Common::Redis::RespValue request1; MockPoolCallbacks callbacks1; EXPECT_CALL(*encoder_, encode(Ref(request1), _)); + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); PoolRequest* handle1 = client_->makeRequest(request1, callbacks1); EXPECT_NE(nullptr, handle1); @@ -436,6 +592,7 @@ TEST_F(RedisClientImplTest, MovedRedirection) { Common::Redis::RespValue request2; MockPoolCallbacks callbacks2; EXPECT_CALL(*encoder_, encode(Ref(request2), _)); + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); PoolRequest* handle2 = client_->makeRequest(request2, callbacks2); EXPECT_NE(nullptr, handle2); @@ -486,6 +643,7 @@ TEST_F(RedisClientImplTest, AskRedirectionNotEnabled) { Common::Redis::RespValue request1; MockPoolCallbacks callbacks1; EXPECT_CALL(*encoder_, encode(Ref(request1), _)); + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); PoolRequest* handle1 = 
client_->makeRequest(request1, callbacks1); EXPECT_NE(nullptr, handle1); @@ -494,6 +652,7 @@ TEST_F(RedisClientImplTest, AskRedirectionNotEnabled) { Common::Redis::RespValue request2; MockPoolCallbacks callbacks2; EXPECT_CALL(*encoder_, encode(Ref(request2), _)); + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); PoolRequest* handle2 = client_->makeRequest(request2, callbacks2); EXPECT_NE(nullptr, handle2); @@ -545,6 +704,7 @@ TEST_F(RedisClientImplTest, MovedRedirectionNotEnabled) { Common::Redis::RespValue request1; MockPoolCallbacks callbacks1; EXPECT_CALL(*encoder_, encode(Ref(request1), _)); + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); PoolRequest* handle1 = client_->makeRequest(request1, callbacks1); EXPECT_NE(nullptr, handle1); @@ -553,6 +713,7 @@ TEST_F(RedisClientImplTest, MovedRedirectionNotEnabled) { Common::Redis::RespValue request2; MockPoolCallbacks callbacks2; EXPECT_CALL(*encoder_, encode(Ref(request2), _)); + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); PoolRequest* handle2 = client_->makeRequest(request2, callbacks2); EXPECT_NE(nullptr, handle2); diff --git a/test/extensions/filters/network/redis_proxy/redis_proxy_integration_test.cc b/test/extensions/filters/network/redis_proxy/redis_proxy_integration_test.cc index 07997b76c0f1d..a51671035e807 100644 --- a/test/extensions/filters/network/redis_proxy/redis_proxy_integration_test.cc +++ b/test/extensions/filters/network/redis_proxy/redis_proxy_integration_test.cc @@ -65,6 +65,12 @@ const std::string CONFIG_WITH_REDIRECTION = CONFIG + R"EOF( enable_redirection: true )EOF"; +// This is a configuration with batching enabled. 
+const std::string CONFIG_WITH_BATCHING = CONFIG + R"EOF( + max_buffer_size_before_flush: 1024 + buffer_flush_timeout: 0.003s +)EOF"; + const std::string CONFIG_WITH_ROUTES = R"EOF( admin: access_log_path: /dev/null @@ -242,6 +248,11 @@ class RedisProxyWithRedirectionIntegrationTest : public RedisProxyIntegrationTes const std::string& asking_response = "+OK\r\n"); }; +class RedisProxyWithBatchingIntegrationTest : public RedisProxyIntegrationTest { +public: + RedisProxyWithBatchingIntegrationTest() : RedisProxyIntegrationTest(CONFIG_WITH_BATCHING, 2) {} +}; + class RedisProxyWithRoutesIntegrationTest : public RedisProxyIntegrationTest { public: RedisProxyWithRoutesIntegrationTest() : RedisProxyIntegrationTest(CONFIG_WITH_ROUTES, 6) {} @@ -255,6 +266,10 @@ INSTANTIATE_TEST_SUITE_P(IpVersions, RedisProxyWithRedirectionIntegrationTest, testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), TestUtility::ipTestParamsToString); +INSTANTIATE_TEST_SUITE_P(IpVersions, RedisProxyWithBatchingIntegrationTest, + testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), + TestUtility::ipTestParamsToString); + INSTANTIATE_TEST_SUITE_P(IpVersions, RedisProxyWithRoutesIntegrationTest, testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), TestUtility::ipTestParamsToString); @@ -549,6 +564,42 @@ TEST_P(RedisProxyWithRedirectionIntegrationTest, IgnoreRedirectionForAsking) { asking_response.str()); } +// This test verifies that batching works properly. If batching is enabled, when multiple +// clients make a request to a Redis server within a certain time window, they will be batched +// together. In the below example, two clients send "GET foo", and Redis receives those two as +// a single concatenated request. 
+ +TEST_P(RedisProxyWithBatchingIntegrationTest, SimpleBatching) { + initialize(); + + const std::string& request = makeBulkStringArray({"get", "foo"}); + const std::string& response = "$3\r\nbar\r\n"; + + std::string proxy_to_server; + IntegrationTcpClientPtr redis_client_1 = makeTcpConnection(lookupPort("redis_proxy")); + IntegrationTcpClientPtr redis_client_2 = makeTcpConnection(lookupPort("redis_proxy")); + redis_client_1->write(request); + redis_client_2->write(request); + + FakeRawConnectionPtr fake_upstream_connection; + EXPECT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection)); + EXPECT_TRUE(fake_upstream_connection->waitForData(request.size() * 2, &proxy_to_server)); + // The original request should be the same as the data received by the server. + EXPECT_EQ(request + request, proxy_to_server); + + EXPECT_TRUE(fake_upstream_connection->write(response + response)); + redis_client_1->waitForData(response); + redis_client_2->waitForData(response); + // The original response should be received by the fake Redis client. + EXPECT_EQ(response, redis_client_1->data()); + EXPECT_EQ(response, redis_client_2->data()); + + redis_client_1->close(); + EXPECT_TRUE(fake_upstream_connection->close()); + redis_client_2->close(); + EXPECT_TRUE(fake_upstream_connection->close()); +} + // This test verifies that it's possible to route keys to 3 different upstream pools. TEST_P(RedisProxyWithRoutesIntegrationTest, SimpleRequestAndResponseRoutedByPrefix) {