diff --git a/include/envoy/buffer/buffer.h b/include/envoy/buffer/buffer.h index ca1b26f342281..c3bb76da80871 100644 --- a/include/envoy/buffer/buffer.h +++ b/include/envoy/buffer/buffer.h @@ -394,6 +394,26 @@ class Instance { template void writeBEInt(T value) { writeInt(value); } + + /** + * Set the buffer's high watermark. The buffer's low watermark is implicitly set to half the high + * watermark. Setting the high watermark to 0 disables watermark functionality. + * @param watermark supplies the buffer high watermark size threshold, in bytes. + */ + virtual void setWatermarks(uint32_t watermark) PURE; + /** + * Returns the configured high watermark. A return value of 0 indicates that watermark + * functionality is disabled. + */ + virtual uint32_t highWatermark() const PURE; + /** + * Determine if the buffer watermark trigger condition is currently set. The watermark trigger is + * set when the buffer size exceeds the configured high watermark and is cleared once the buffer + * size drops to the low watermark. + * @return true if the buffer size once exceeded the high watermark and hasn't since dropped to + * the low watermark. + */ + virtual bool highWatermarkTriggered() const PURE; }; using InstancePtr = std::unique_ptr; diff --git a/source/common/buffer/buffer_impl.h b/source/common/buffer/buffer_impl.h index be8018b22857c..40a43608bd27c 100644 --- a/source/common/buffer/buffer_impl.h +++ b/source/common/buffer/buffer_impl.h @@ -590,6 +590,13 @@ class OwnedImpl : public LibEventInstance { */ virtual void appendSliceForTest(absl::string_view data); + // Does not implement watermarking. + // TODO(antoniovicente) Implement watermarks by merging the OwnedImpl and WatermarkBuffer + // implementations. Also, make high-watermark config a constructor argument. + void setWatermarks(uint32_t) override { ASSERT(false, "watermarks not implemented."); } + uint32_t highWatermark() const override { return 0; } + bool highWatermarkTriggered() const override { return false; } + /** * Describe the in-memory representation of the slices in the buffer. For use * in tests that want to make assertions about the specific arrangement of diff --git a/source/common/buffer/watermark_buffer.h b/source/common/buffer/watermark_buffer.h index 23c7c32854d23..9fd568b98f5d5 100644 --- a/source/common/buffer/watermark_buffer.h +++ b/source/common/buffer/watermark_buffer.h @@ -40,12 +40,12 @@ class WatermarkBuffer : public OwnedImpl { void appendSliceForTest(const void* data, uint64_t size) override; void appendSliceForTest(absl::string_view data) override; - void setWatermarks(uint32_t watermark) { setWatermarks(watermark / 2, watermark); } + void setWatermarks(uint32_t watermark) override { setWatermarks(watermark / 2, watermark); } void setWatermarks(uint32_t low_watermark, uint32_t high_watermark); - uint32_t highWatermark() const { return high_watermark_; } + uint32_t highWatermark() const override { return high_watermark_; } // Returns true if the high watermark callbacks have been called more recently // than the low watermark callbacks. - bool highWatermarkTriggered() const { return above_high_watermark_called_; } + bool highWatermarkTriggered() const override { return above_high_watermark_called_; } private: void checkHighAndOverflowWatermarks(); diff --git a/source/common/http/filter_manager.cc b/source/common/http/filter_manager.cc index 36aa653a90062..78096043a6f19 100644 --- a/source/common/http/filter_manager.cc +++ b/source/common/http/filter_manager.cc @@ -250,8 +250,8 @@ bool ActiveStreamDecoderFilter::canContinue() { return !parent_.state_.local_complete_; } -Buffer::WatermarkBufferPtr ActiveStreamDecoderFilter::createBuffer() { - auto buffer = std::make_unique( +Buffer::InstancePtr ActiveStreamDecoderFilter::createBuffer() { + auto buffer = dispatcher().getWatermarkFactory().create( [this]() -> void { this->requestDataDrained(); }, [this]() -> void { this->requestDataTooLarge(); }, []() -> void { /* TODO(adisuissa): Handle overflow watermark */ }); @@ -259,7 +259,7 @@ Buffer::WatermarkBufferPtr ActiveStreamDecoderFilter::createBuffer() { return buffer; } -Buffer::WatermarkBufferPtr& ActiveStreamDecoderFilter::bufferedData() { +Buffer::InstancePtr& ActiveStreamDecoderFilter::bufferedData() { return parent_.buffered_request_data_; } @@ -1302,15 +1302,15 @@ absl::optional ActiveStreamDecoderFilter::routeCon return parent_.filter_manager_callbacks_.routeConfig(); } -Buffer::WatermarkBufferPtr ActiveStreamEncoderFilter::createBuffer() { - auto buffer = new Buffer::WatermarkBuffer( +Buffer::InstancePtr ActiveStreamEncoderFilter::createBuffer() { + auto buffer = dispatcher().getWatermarkFactory().create( [this]() -> void { this->responseDataDrained(); }, [this]() -> void { this->responseDataTooLarge(); }, []() -> void { /* TODO(adisuissa): Handle overflow watermark */ }); buffer->setWatermarks(parent_.buffer_limit_); - return Buffer::WatermarkBufferPtr{buffer}; + return buffer; } -Buffer::WatermarkBufferPtr& ActiveStreamEncoderFilter::bufferedData() { +Buffer::InstancePtr& ActiveStreamEncoderFilter::bufferedData() { return parent_.buffered_response_data_; } bool ActiveStreamEncoderFilter::complete() { return parent_.state_.local_complete_; } diff --git a/source/common/http/filter_manager.h b/source/common/http/filter_manager.h index 9f1de48ef3a4a..4920db67bc15c 100644 --- a/source/common/http/filter_manager.h +++ b/source/common/http/filter_manager.h @@ -45,8 +45,8 @@ struct ActiveStreamFilterBase : public virtual StreamFilterCallbacks, void commonContinue(); virtual bool canContinue() PURE; - virtual Buffer::WatermarkBufferPtr createBuffer() PURE; - virtual Buffer::WatermarkBufferPtr& bufferedData() PURE; + virtual Buffer::InstancePtr createBuffer() PURE; + virtual Buffer::InstancePtr& bufferedData() PURE; virtual bool complete() PURE; virtual bool has100Continueheaders() PURE; virtual void do100ContinueHeaders() PURE; @@ -138,8 +138,8 @@ struct ActiveStreamDecoderFilter : public ActiveStreamFilterBase, // ActiveStreamFilterBase bool canContinue() override; - Buffer::WatermarkBufferPtr createBuffer() override; - Buffer::WatermarkBufferPtr& bufferedData() override; + Buffer::InstancePtr createBuffer() override; + Buffer::InstancePtr& bufferedData() override; bool complete() override; bool has100Continueheaders() override { return false; } void do100ContinueHeaders() override { NOT_REACHED_GCOVR_EXCL_LINE; } @@ -224,8 +224,8 @@ struct ActiveStreamEncoderFilter : public ActiveStreamFilterBase, // ActiveStreamFilterBase bool canContinue() override { return true; } - Buffer::WatermarkBufferPtr createBuffer() override; - Buffer::WatermarkBufferPtr& bufferedData() override; + Buffer::InstancePtr createBuffer() override; + Buffer::InstancePtr& bufferedData() override; bool complete() override; bool has100Continueheaders() override; void do100ContinueHeaders() override; @@ -776,8 +776,8 @@ class FilterManager : public ScopeTrackedObject, // processing the next filter. The storage is created on demand. We need to store metadata // temporarily in the filter in case the filter has stopped all while processing headers. std::unique_ptr request_metadata_map_vector_; - Buffer::WatermarkBufferPtr buffered_response_data_; - Buffer::WatermarkBufferPtr buffered_request_data_; + Buffer::InstancePtr buffered_response_data_; + Buffer::InstancePtr buffered_request_data_; uint32_t buffer_limit_{0}; uint32_t high_watermark_count_{0}; std::list watermark_callbacks_; diff --git a/source/common/http/http1/codec_impl.cc b/source/common/http/http1/codec_impl.cc index ea8c464da5723..0e8a38eb0fce7 100644 --- a/source/common/http/http1/codec_impl.cc +++ b/source/common/http/http1/codec_impl.cc @@ -267,7 +267,7 @@ void StreamEncoderImpl::endEncode() { } } -void ServerConnectionImpl::maybeAddSentinelBufferFragment(Buffer::WatermarkBuffer& output_buffer) { +void ServerConnectionImpl::maybeAddSentinelBufferFragment(Buffer::Instance& output_buffer) { // It's messy and complicated to try to tag the final write of an HTTP response for response // tracking for flood protection. Instead, write an empty buffer fragment after the response, // to allow for tracking. @@ -297,20 +297,20 @@ void ConnectionImpl::flushOutput(bool end_encode) { if (end_encode) { // If this is an HTTP response in ServerConnectionImpl, track outbound responses for flood // protection - maybeAddSentinelBufferFragment(output_buffer_); + maybeAddSentinelBufferFragment(*output_buffer_); } - connection().write(output_buffer_, false); - ASSERT(0UL == output_buffer_.length()); + connection().write(*output_buffer_, false); + ASSERT(0UL == output_buffer_->length()); } -void ConnectionImpl::addToBuffer(absl::string_view data) { output_buffer_.add(data); } +void ConnectionImpl::addToBuffer(absl::string_view data) { output_buffer_->add(data); } -void ConnectionImpl::addCharToBuffer(char c) { output_buffer_.add(&c, 1); } +void ConnectionImpl::addCharToBuffer(char c) { output_buffer_->add(&c, 1); } -void ConnectionImpl::addIntToBuffer(uint64_t i) { output_buffer_.add(absl::StrCat(i)); } +void ConnectionImpl::addIntToBuffer(uint64_t i) { output_buffer_->add(absl::StrCat(i)); } void ConnectionImpl::copyToBuffer(const char* data, uint64_t length) { - output_buffer_.add(data, length); + output_buffer_->add(data, length); } void StreamEncoderImpl::resetStream(StreamResetReason reason) { @@ -477,12 +477,12 @@ ConnectionImpl::ConnectionImpl(Network::Connection& connection, CodecStats& stat handling_upgrade_(false), reset_stream_called_(false), deferred_end_stream_headers_(false), strict_1xx_and_204_headers_(Runtime::runtimeFeatureEnabled( "envoy.reloadable_features.strict_1xx_and_204_response_headers")), - dispatching_(false), - output_buffer_([&]() -> void { this->onBelowLowWatermark(); }, - [&]() -> void { this->onAboveHighWatermark(); }, - []() -> void { /* TODO(adisuissa): Handle overflow watermark */ }), + dispatching_(false), output_buffer_(connection.dispatcher().getWatermarkFactory().create( + [&]() -> void { this->onBelowLowWatermark(); }, + [&]() -> void { this->onAboveHighWatermark(); }, + []() -> void { /* TODO(adisuissa): Handle overflow watermark */ })), max_headers_kb_(max_headers_kb), max_headers_count_(max_headers_count) { - output_buffer_.setWatermarks(connection.bufferLimit()); + output_buffer_->setWatermarks(connection.bufferLimit()); http_parser_init(&parser_, type); parser_.allow_chunked_length = 1; parser_.data = this; diff --git a/source/common/http/http1/codec_impl.h b/source/common/http/http1/codec_impl.h index 6b5bfee9cc5c0..deb08526a3ad6 100644 --- a/source/common/http/http1/codec_impl.h +++ b/source/common/http/http1/codec_impl.h @@ -197,7 +197,7 @@ class ConnectionImpl : public virtual Connection, protected Logger::Loggablelength()); } -void ConnectionImpl::addToBuffer(absl::string_view data) { output_buffer_.add(data); } +void ConnectionImpl::addToBuffer(absl::string_view data) { output_buffer_->add(data); } -void ConnectionImpl::addCharToBuffer(char c) { output_buffer_.add(&c, 1); } +void ConnectionImpl::addCharToBuffer(char c) { output_buffer_->add(&c, 1); } -void ConnectionImpl::addIntToBuffer(uint64_t i) { output_buffer_.add(absl::StrCat(i)); } +void ConnectionImpl::addIntToBuffer(uint64_t i) { output_buffer_->add(absl::StrCat(i)); } void ConnectionImpl::copyToBuffer(const char* data, uint64_t length) { - output_buffer_.add(data, length); + output_buffer_->add(data, length); } void StreamEncoderImpl::resetStream(StreamResetReason reason) { @@ -455,11 +455,12 @@ ConnectionImpl::ConnectionImpl(Network::Connection& connection, CodecStats& stat handling_upgrade_(false), reset_stream_called_(false), deferred_end_stream_headers_(false), strict_1xx_and_204_headers_(Runtime::runtimeFeatureEnabled( "envoy.reloadable_features.strict_1xx_and_204_response_headers")), - output_buffer_([&]() -> void { this->onBelowLowWatermark(); }, - [&]() -> void { this->onAboveHighWatermark(); }, - []() -> void { /* TODO(adisuissa): Handle overflow watermark */ }), + output_buffer_(connection.dispatcher().getWatermarkFactory().create( + [&]() -> void { this->onBelowLowWatermark(); }, + [&]() -> void { this->onAboveHighWatermark(); }, + []() -> void { /* TODO(adisuissa): Handle overflow watermark */ })), max_headers_kb_(max_headers_kb), max_headers_count_(max_headers_count) { - output_buffer_.setWatermarks(connection.bufferLimit()); + output_buffer_->setWatermarks(connection.bufferLimit()); http_parser_init(&parser_, type); parser_.allow_chunked_length = 1; parser_.data = this; diff --git a/source/common/http/http1/codec_impl_legacy.h b/source/common/http/http1/codec_impl_legacy.h index 0a468d286b53e..703de320637e7 100644 --- a/source/common/http/http1/codec_impl_legacy.h +++ b/source/common/http/http1/codec_impl_legacy.h @@ -201,7 +201,7 @@ class ConnectionImpl : public virtual Connection, protected Logger::Loggable void { this->onReadBufferLowWatermark(); }, - [this]() -> void { this->onReadBufferHighWatermark(); }, - []() -> void { /* TODO(adisuissa): Handle overflow watermark */ }), write_buffer_(dispatcher.getWatermarkFactory().create( [this]() -> void { this->onWriteBufferLowWatermark(); }, [this]() -> void { this->onWriteBufferHighWatermark(); }, []() -> void { /* TODO(adisuissa): Handle overflow watermark */ })), + read_buffer_(dispatcher.getWatermarkFactory().create( + [this]() -> void { this->onReadBufferLowWatermark(); }, + [this]() -> void { this->onReadBufferHighWatermark(); }, + []() -> void { /* TODO(adisuissa): Handle overflow watermark */ })), write_buffer_above_high_watermark_(false), detect_early_close_(true), enable_half_close_(false), read_end_stream_raised_(false), read_end_stream_(false), write_end_stream_(false), current_write_end_stream_(false), dispatch_buffered_data_(false), @@ -209,7 +210,7 @@ void ConnectionImpl::setTransportSocketIsReadable() { bool ConnectionImpl::filterChainWantsData() { return read_disable_count_ == 0 || - (read_disable_count_ == 1 && read_buffer_.highWatermarkTriggered()); + (read_disable_count_ == 1 && read_buffer_->highWatermarkTriggered()); } void ConnectionImpl::closeSocket(ConnectionEvent close_type) { @@ -328,7 +329,7 @@ void ConnectionImpl::readDisable(bool disable) { ASSERT(state() == State::Open); ENVOY_CONN_LOG(trace, "readDisable: disable={} disable_count={} state={} buffer_length={}", *this, - disable, read_disable_count_, static_cast(state()), read_buffer_.length()); + disable, read_disable_count_, static_cast(state()), read_buffer_->length()); // When we disable reads, we still allow for early close notifications (the equivalent of // `EPOLLRDHUP` for an epoll backend). For backends that support it, this allows us to apply @@ -370,10 +371,10 @@ void ConnectionImpl::readDisable(bool disable) { ioHandle().enableFileEvents(Event::FileReadyType::Read | Event::FileReadyType::Write); } - if (filterChainWantsData() && (read_buffer_.length() > 0 || transport_wants_read_)) { + if (filterChainWantsData() && (read_buffer_->length() > 0 || transport_wants_read_)) { // Sanity check: resumption with read_disable_count_ > 0 should only happen if the read // buffer's high watermark has triggered. - ASSERT(read_buffer_.length() > 0 || read_disable_count_ == 0); + ASSERT(read_buffer_->length() > 0 || read_disable_count_ == 0); // If the read_buffer_ is not empty or transport_wants_read_ is true, the connection may be // able to process additional bytes even if there is no data in the kernel to kick off the @@ -487,8 +488,8 @@ void ConnectionImpl::setBufferLimits(uint32_t limit) { // bytes) would not trigger watermarks but a blocked socket (move |limit| bytes, flush 0 bytes) // would result in respecting the exact buffer limit. if (limit > 0) { - static_cast(write_buffer_.get())->setWatermarks(limit + 1); - read_buffer_.setWatermarks(limit + 1); + write_buffer_->setWatermarks(limit + 1); + read_buffer_->setWatermarks(limit + 1); } } @@ -583,7 +584,7 @@ void ConnectionImpl::onReadReady() { // Do not clear transport_wants_read_ when returning early; the early return skips the transport // socket doRead call. if (latched_dispatch_buffered_data && filterChainWantsData()) { - onRead(read_buffer_.length()); + onRead(read_buffer_->length()); } return; } @@ -593,8 +594,8 @@ void ConnectionImpl::onReadReady() { // reading from the transport if the read buffer is above high watermark at the start of the // method. transport_wants_read_ = false; - IoResult result = transport_socket_->doRead(read_buffer_); - uint64_t new_buffer_size = read_buffer_.length(); + IoResult result = transport_socket_->doRead(*read_buffer_); + uint64_t new_buffer_size = read_buffer_->length(); updateReadBufferStats(result.bytes_processed_, new_buffer_size); // If this connection doesn't have half-close semantics, translate end_stream into @@ -606,7 +607,7 @@ void ConnectionImpl::onReadReady() { read_end_stream_ |= result.end_stream_read_; if (result.bytes_processed_ != 0 || result.end_stream_read_ || - (latched_dispatch_buffered_data && read_buffer_.length() > 0)) { + (latched_dispatch_buffered_data && read_buffer_->length() > 0)) { // Skip onRead if no bytes were processed unless we explicitly want to force onRead for // buffered data. For instance, skip onRead if the connection was closed without producing // more data. diff --git a/source/common/network/connection_impl.h b/source/common/network/connection_impl.h index 19553cf1c70ee..bde423cff1ec9 100644 --- a/source/common/network/connection_impl.h +++ b/source/common/network/connection_impl.h @@ -99,7 +99,7 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback void rawWrite(Buffer::Instance& data, bool end_stream) override; // Network::ReadBufferSource - StreamBuffer getReadBuffer() override { return {read_buffer_, read_end_stream_}; } + StreamBuffer getReadBuffer() override { return {*read_buffer_, read_end_stream_}; } // Network::WriteBufferSource StreamBuffer getWriteBuffer() override { return {*current_write_buffer_, current_write_end_stream_}; @@ -112,7 +112,7 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback void raiseEvent(ConnectionEvent event) override; // Should the read buffer be drained? bool shouldDrainReadBuffer() override { - return read_buffer_limit_ > 0 && read_buffer_.length() >= read_buffer_limit_; + return read_buffer_limit_ > 0 && read_buffer_->length() >= read_buffer_limit_; } // Mark read buffer ready to read in the event loop. This is used when yielding following // shouldDrainReadBuffer(). @@ -149,14 +149,16 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback StreamInfo::StreamInfo& stream_info_; FilterManagerImpl filter_manager_; - // Ensure that if the consumer of the data from this connection isn't - // consuming, that the connection eventually stops reading from the wire. - Buffer::WatermarkBuffer read_buffer_; // This must be a WatermarkBuffer, but as it is created by a factory the ConnectionImpl only has // a generic pointer. // It MUST be defined after the filter_manager_ as some filters may have callbacks that // write_buffer_ invokes during its clean up. + // This buffer is always allocated, never nullptr. Buffer::InstancePtr write_buffer_; + // Ensure that if the consumer of the data from this connection isn't + // consuming, that the connection eventually stops reading from the wire. + // This buffer is always allocated, never nullptr. + Buffer::InstancePtr read_buffer_; uint32_t read_buffer_limit_ = 0; bool connecting_{false}; ConnectionEvent immediate_error_event_{ConnectionEvent::Connected}; diff --git a/source/common/router/upstream_request.cc b/source/common/router/upstream_request.cc index f44825323bf34..e63d30ff59c79 100644 --- a/source/common/router/upstream_request.cc +++ b/source/common/router/upstream_request.cc @@ -207,7 +207,7 @@ void UpstreamRequest::encodeData(Buffer::Instance& data, bool end_stream) { if (!upstream_ || paused_for_connect_) { ENVOY_STREAM_LOG(trace, "buffering {} bytes", *parent_.callbacks(), data.length()); if (!buffered_request_body_) { - buffered_request_body_ = std::make_unique( + buffered_request_body_ = parent_.callbacks()->dispatcher().getWatermarkFactory().create( [this]() -> void { this->enableDataFromDownstreamForFlowControl(); }, [this]() -> void { this->disableDataFromDownstreamForFlowControl(); }, []() -> void { /* TODO(adisuissa): Handle overflow watermark */ }); diff --git a/source/common/router/upstream_request.h b/source/common/router/upstream_request.h index 5fc63dc212bf5..b22f90195c691 100644 --- a/source/common/router/upstream_request.h +++ b/source/common/router/upstream_request.h @@ -132,7 +132,7 @@ class UpstreamRequest : public Logger::Loggable, Event::TimerPtr per_try_timeout_; std::unique_ptr upstream_; absl::optional deferred_reset_reason_; - Buffer::WatermarkBufferPtr buffered_request_body_; + Buffer::InstancePtr buffered_request_body_; Upstream::HostDescriptionConstSharedPtr upstream_host_; DownstreamWatermarkManager downstream_watermark_manager_{*this}; Tracing::SpanPtr span_; diff --git a/test/common/buffer/buffer_fuzz.cc b/test/common/buffer/buffer_fuzz.cc index 7dc34ca3e58fa..8fa5a2098dcc7 100644 --- a/test/common/buffer/buffer_fuzz.cc +++ b/test/common/buffer/buffer_fuzz.cc @@ -165,6 +165,15 @@ class StringBuffer : public Buffer::Instance { std::string toString() const override { return std::string(data_.data() + start_, size_); } + void setWatermarks(uint32_t) override { + // Not implemented. + // TODO(antoniovicente) Implement and add fuzz coverage as we merge the Buffer::OwnedImpl and + // WatermarkBuffer implementations. + ASSERT(false); + } + uint32_t highWatermark() const override { return 0; } + bool highWatermarkTriggered() const override { return false; } + absl::string_view asStringView() const { return {start(), size_}; } char* mutableStart() { return data_.data() + start_; } diff --git a/test/common/network/connection_impl_test.cc b/test/common/network/connection_impl_test.cc index d5185314e26a5..20e2dcb7cd6f0 100644 --- a/test/common/network/connection_impl_test.cc +++ b/test/common/network/connection_impl_test.cc @@ -119,7 +119,7 @@ TEST_P(ConnectionImplDeathTest, BadFd) { class TestClientConnectionImpl : public Network::ClientConnectionImpl { public: using ClientConnectionImpl::ClientConnectionImpl; - Buffer::WatermarkBuffer& readBuffer() { return read_buffer_; } + Buffer::Instance& readBuffer() { return *read_buffer_; } }; class ConnectionImplTest : public testing::TestWithParam { diff --git a/test/extensions/filters/network/postgres_proxy/postgres_decoder_test.cc b/test/extensions/filters/network/postgres_proxy/postgres_decoder_test.cc index 3398b1d70ad68..9f234951f8137 100644 --- a/test/extensions/filters/network/postgres_proxy/postgres_decoder_test.cc +++ b/test/extensions/filters/network/postgres_proxy/postgres_decoder_test.cc @@ -533,6 +533,9 @@ class FakeBuffer : public Buffer::Instance { MOCK_METHOD(ssize_t, search, (const void*, uint64_t, size_t, size_t), (const, override)); MOCK_METHOD(bool, startsWith, (absl::string_view), (const, override)); MOCK_METHOD(std::string, toString, (), (const, override)); + MOCK_METHOD(void, setWatermarks, (uint32_t), (override)); + MOCK_METHOD(uint32_t, highWatermark, (), (const, override)); + MOCK_METHOD(bool, highWatermarkTriggered, (), (const, override)); }; // Test verifies that decoder calls Buffer::linearize method diff --git a/test/extensions/transport_sockets/tls/ssl_socket_test.cc b/test/extensions/transport_sockets/tls/ssl_socket_test.cc index 3d4279edd82e6..32eed8ca68801 100644 --- a/test/extensions/transport_sockets/tls/ssl_socket_test.cc +++ b/test/extensions/transport_sockets/tls/ssl_socket_test.cc @@ -4906,7 +4906,7 @@ class SslReadBufferLimitTest : public SslSocketTest { // By default, expect 4 buffers to be created - the client and server read and write buffers. EXPECT_CALL(*factory, create_(_, _, _)) - .Times(2) + .Times(4) .WillOnce(Invoke([&](std::function below_low, std::function above_high, std::function above_overflow) -> Buffer::Instance* { client_write_buffer = new MockWatermarkBuffer(below_low, above_high, above_overflow); diff --git a/test/integration/integration_tcp_client.cc b/test/integration/integration_tcp_client.cc index 39c040d4b3b09..500d26d42aece 100644 --- a/test/integration/integration_tcp_client.cc +++ b/test/integration/integration_tcp_client.cc @@ -42,6 +42,7 @@ IntegrationTcpClient::IntegrationTcpClient( : payload_reader_(new WaitForPayloadReader(dispatcher)), callbacks_(new ConnectionCallbacks(*this)) { EXPECT_CALL(factory, create_(_, _, _)) + .Times(AtLeast(1)) .WillOnce(Invoke([&](std::function below_low, std::function above_high, std::function above_overflow) -> Buffer::Instance* { client_write_buffer_ = diff --git a/test/integration/tcp_proxy_integration_test.cc b/test/integration/tcp_proxy_integration_test.cc index d55765f66950b..5bb07ac890930 100644 --- a/test/integration/tcp_proxy_integration_test.cc +++ b/test/integration/tcp_proxy_integration_test.cc @@ -24,6 +24,7 @@ #include "gtest/gtest.h" using testing::_; +using testing::AtLeast; using testing::Invoke; using testing::MatchesRegex; using testing::NiceMock; @@ -1067,7 +1068,7 @@ void TcpProxySslIntegrationTest::setupConnections() { // buffer. This allows us to track the bytes actually written to the socket. EXPECT_CALL(*mock_buffer_factory_, create_(_, _, _)) - .Times(1) + .Times(AtLeast(1)) .WillOnce(Invoke([&](std::function below_low, std::function above_high, std::function above_overflow) -> Buffer::Instance* { client_write_buffer_ = diff --git a/test/mocks/buffer/mocks.h b/test/mocks/buffer/mocks.h index 87d9c11f29426..18465118c4baf 100644 --- a/test/mocks/buffer/mocks.h +++ b/test/mocks/buffer/mocks.h @@ -76,7 +76,9 @@ class MockBufferFactory : public Buffer::WatermarkFactory { Buffer::InstancePtr create(std::function below_low, std::function above_high, std::function above_overflow) override { - return Buffer::InstancePtr{create_(below_low, above_high, above_overflow)}; + auto buffer = Buffer::InstancePtr{create_(below_low, above_high, above_overflow)}; + ASSERT(buffer != nullptr); + return buffer; } MOCK_METHOD(Buffer::Instance*, create_, diff --git a/test/mocks/event/mocks.cc b/test/mocks/event/mocks.cc index 215e7cafbdb4a..a8db4995abb3a 100644 --- a/test/mocks/event/mocks.cc +++ b/test/mocks/event/mocks.cc @@ -25,6 +25,12 @@ MockDispatcher::MockDispatcher(const std::string& name) : name_(name) { })); ON_CALL(*this, createTimer_(_)).WillByDefault(ReturnNew>()); ON_CALL(*this, post(_)).WillByDefault(Invoke([](PostCb cb) -> void { cb(); })); + + ON_CALL(buffer_factory_, create_(_, _, _)) + .WillByDefault(Invoke([](std::function below_low, std::function above_high, + std::function above_overflow) -> Buffer::Instance* { + return new Buffer::WatermarkBuffer(below_low, above_high, above_overflow); + })); } MockDispatcher::~MockDispatcher() = default; diff --git a/test/mocks/event/mocks.h b/test/mocks/event/mocks.h index 8aad73db5a324..8e29e84c3b326 100644 --- a/test/mocks/event/mocks.h +++ b/test/mocks/event/mocks.h @@ -139,7 +139,7 @@ class MockDispatcher : public Dispatcher { GlobalTimeSystem time_system_; std::list to_delete_; - MockBufferFactory buffer_factory_; + testing::NiceMock buffer_factory_; private: const std::string name_;