diff --git a/include/envoy/buffer/buffer.h b/include/envoy/buffer/buffer.h index d00130d48d901..2ac7674e37668 100644 --- a/include/envoy/buffer/buffer.h +++ b/include/envoy/buffer/buffer.h @@ -131,7 +131,8 @@ class Instance { /** * Binds the account to be charged for resources used by the buffer. This - * should only be called once. + * should only be called when the buffer is empty as existing slices + * within the buffer won't retroactively get tagged. * * @param account a shared_ptr to the account to charge. */ @@ -449,6 +450,7 @@ class Instance { * @param watermark supplies the buffer high watermark size threshold, in bytes. */ virtual void setWatermarks(uint32_t watermark) PURE; + /** * Returns the configured high watermark. A return value of 0 indicates that watermark * functionality is disabled. diff --git a/include/envoy/http/codec.h b/include/envoy/http/codec.h index 9e91a4d9df4a0..1fd8d2e6d42f3 100644 --- a/include/envoy/http/codec.h +++ b/include/envoy/http/codec.h @@ -379,6 +379,12 @@ class Stream { * small window updates as satisfying the idle timeout as this is a potential DoS vector. */ virtual void setFlushTimeout(std::chrono::milliseconds timeout) PURE; + + /** + * Sets the account for this stream, propagating it to all of its buffers. + * @param the account to assign this stream. + */ + virtual void setAccount(Buffer::BufferMemoryAccountSharedPtr account) PURE; }; /** diff --git a/include/envoy/http/filter.h b/include/envoy/http/filter.h index 48a23a7e8e7b0..3d15d9735da19 100644 --- a/include/envoy/http/filter.h +++ b/include/envoy/http/filter.h @@ -6,6 +6,7 @@ #include #include "envoy/access_log/access_log.h" +#include "envoy/buffer/buffer.h" #include "envoy/common/scope_tracker.h" #include "envoy/event/dispatcher.h" #include "envoy/grpc/status.h" @@ -556,6 +557,11 @@ class StreamDecoderFilterCallbacks : public virtual StreamFilterCallbacks { */ virtual uint32_t decoderBufferLimit() PURE; + /** + * @return the account, if any, used by this stream. + */ + virtual Buffer::BufferMemoryAccountSharedPtr account() const PURE; + /** * Takes a stream, and acts as if the headers are newly arrived. * On success, this will result in a creating a new filter chain and likely diff --git a/include/envoy/router/router.h b/include/envoy/router/router.h index 95944beddb2b0..a143cd11490b1 100644 --- a/include/envoy/router/router.h +++ b/include/envoy/router/router.h @@ -1327,6 +1327,12 @@ class GenericUpstream { * @param reason supplies the reset reason. */ virtual void resetStream() PURE; + + /** + * Sets the upstream to use the following account. + * @param the account to assign the generic upstream. + */ + virtual void setAccount(Buffer::BufferMemoryAccountSharedPtr account) PURE; }; using GenericConnPoolPtr = std::unique_ptr; diff --git a/source/common/buffer/buffer_impl.cc b/source/common/buffer/buffer_impl.cc index 6d2241171a135..3187cae8abe52 100644 --- a/source/common/buffer/buffer_impl.cc +++ b/source/common/buffer/buffer_impl.cc @@ -43,11 +43,11 @@ void OwnedImpl::addDrainTracker(std::function drain_tracker) { void OwnedImpl::bindAccount(BufferMemoryAccountSharedPtr account) { ASSERT(slices_.empty()); - // We don't yet have an account bound. - ASSERT(!account_); account_ = std::move(account); } +BufferMemoryAccountSharedPtr OwnedImpl::getAccountForTest() { return account_; } + void OwnedImpl::add(const void* data, uint64_t size) { addImpl(data, size); } void OwnedImpl::addBufferFragment(BufferFragment& fragment) { diff --git a/source/common/buffer/buffer_impl.h b/source/common/buffer/buffer_impl.h index a794b0e020514..35d2c16b8079c 100644 --- a/source/common/buffer/buffer_impl.h +++ b/source/common/buffer/buffer_impl.h @@ -710,6 +710,11 @@ class OwnedImpl : public LibEventInstance { */ virtual void appendSliceForTest(absl::string_view data); + /** + * @return the BufferMemoryAccount bound to this buffer, if any. + */ + BufferMemoryAccountSharedPtr getAccountForTest(); + // Does not implement watermarking. // TODO(antoniovicente) Implement watermarks by merging the OwnedImpl and WatermarkBuffer // implementations. Also, make high-watermark config a constructor argument. diff --git a/source/common/buffer/watermark_buffer.h b/source/common/buffer/watermark_buffer.h index 68d260b155320..c8700cc0ca151 100644 --- a/source/common/buffer/watermark_buffer.h +++ b/source/common/buffer/watermark_buffer.h @@ -47,7 +47,7 @@ class WatermarkBuffer : public OwnedImpl { protected: virtual void checkHighAndOverflowWatermarks(); - void checkLowWatermark(); + virtual void checkLowWatermark(); private: void commit(uint64_t length, absl::Span slices, diff --git a/source/common/http/async_client_impl.h b/source/common/http/async_client_impl.h index 798907b0763be..a37babf467f98 100644 --- a/source/common/http/async_client_impl.h +++ b/source/common/http/async_client_impl.h @@ -365,6 +365,8 @@ class AsyncStreamImpl : public AsyncClient::Stream, Upstream::ClusterInfoConstSharedPtr clusterInfo() override { return parent_.cluster_; } void clearRouteCache() override {} uint64_t streamId() const override { return stream_id_; } + // TODO(kbaichoo): Plumb account from owning request filter. + Buffer::BufferMemoryAccountSharedPtr account() const override { return nullptr; } Tracing::Span& activeSpan() override { return active_span_; } const Tracing::Config& tracingConfig() override { return tracing_config_; } void continueDecoding() override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; } diff --git a/source/common/http/conn_manager_impl.cc b/source/common/http/conn_manager_impl.cc index 87fd8073de1fb..b18102f31b5b7 100644 --- a/source/common/http/conn_manager_impl.cc +++ b/source/common/http/conn_manager_impl.cc @@ -259,7 +259,18 @@ RequestDecoder& ConnectionManagerImpl::newStream(ResponseEncoder& response_encod } ENVOY_CONN_LOG(debug, "new stream", read_callbacks_->connection()); - ActiveStreamPtr new_stream(new ActiveStream(*this, response_encoder.getStream().bufferLimit())); + + // Set the account to start accounting if enabled. This is still a + // work-in-progress, and will be removed when other features using the + // accounting are implemented. + Buffer::BufferMemoryAccountSharedPtr downstream_request_account; + if (Runtime::runtimeFeatureEnabled("envoy.test_only.per_stream_buffer_accounting")) { + downstream_request_account = std::make_shared(); + response_encoder.getStream().setAccount(downstream_request_account); + } + ActiveStreamPtr new_stream(new ActiveStream(*this, response_encoder.getStream().bufferLimit(), + std::move(downstream_request_account))); + new_stream->state_.is_internally_created_ = is_internally_created; new_stream->response_encoder_ = &response_encoder; new_stream->response_encoder_->getStream().addCallbacks(*new_stream); @@ -571,13 +582,14 @@ void ConnectionManagerImpl::RdsRouteConfigUpdateRequester::requestSrdsUpdate( } ConnectionManagerImpl::ActiveStream::ActiveStream(ConnectionManagerImpl& connection_manager, - uint32_t buffer_limit) + uint32_t buffer_limit, + Buffer::BufferMemoryAccountSharedPtr account) : connection_manager_(connection_manager), stream_id_(connection_manager.random_generator_.random()), filter_manager_(*this, connection_manager_.read_callbacks_->connection().dispatcher(), connection_manager_.read_callbacks_->connection(), stream_id_, - connection_manager_.config_.proxy100Continue(), buffer_limit, - connection_manager_.config_.filterFactory(), + std::move(account), connection_manager_.config_.proxy100Continue(), + buffer_limit, connection_manager_.config_.filterFactory(), connection_manager_.config_.localReply(), connection_manager_.codec_->protocol(), connection_manager_.timeSource(), connection_manager_.read_callbacks_->connection().streamInfo().filterState(), diff --git a/source/common/http/conn_manager_impl.h b/source/common/http/conn_manager_impl.h index ce8c6648a84e7..7a9156c787101 100644 --- a/source/common/http/conn_manager_impl.h +++ b/source/common/http/conn_manager_impl.h @@ -156,7 +156,8 @@ class ConnectionManagerImpl : Logger::Loggable, public Tracing::Config, public ScopeTrackedObject, public FilterManagerCallbacks { - ActiveStream(ConnectionManagerImpl& connection_manager, uint32_t buffer_limit); + ActiveStream(ConnectionManagerImpl& connection_manager, uint32_t buffer_limit, + Buffer::BufferMemoryAccountSharedPtr account); void completeRequest(); const Network::Connection* connection(); diff --git a/source/common/http/filter_manager.cc b/source/common/http/filter_manager.cc index 982dd80a864cf..a962365e10f36 100644 --- a/source/common/http/filter_manager.cc +++ b/source/common/http/filter_manager.cc @@ -1578,5 +1578,9 @@ void ActiveStreamFilterBase::resetStream() { parent_.filter_manager_callbacks_.r uint64_t ActiveStreamFilterBase::streamId() const { return parent_.streamId(); } +Buffer::BufferMemoryAccountSharedPtr ActiveStreamDecoderFilter::account() const { + return parent_.account(); +} + } // namespace Http } // namespace Envoy diff --git a/source/common/http/filter_manager.h b/source/common/http/filter_manager.h index c861d08a228fc..2f65055c77839 100644 --- a/source/common/http/filter_manager.h +++ b/source/common/http/filter_manager.h @@ -3,6 +3,7 @@ #include #include +#include "envoy/buffer/buffer.h" #include "envoy/common/optref.h" #include "envoy/extensions/filters/common/matcher/action/v3/skip_action.pb.h" #include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.pb.h" @@ -277,6 +278,7 @@ struct ActiveStreamDecoderFilter : public ActiveStreamFilterBase, void addUpstreamSocketOptions(const Network::Socket::OptionsSharedPtr& options) override; Network::Socket::OptionsSharedPtr getUpstreamSocketOptions() const override; + Buffer::BufferMemoryAccountSharedPtr account() const override; // Each decoder filter instance checks if the request passed to the filter is gRPC // so that we can issue gRPC local responses to gRPC requests. Filter's decodeHeaders() @@ -640,15 +642,16 @@ class FilterManager : public ScopeTrackedObject, Logger::Loggable { public: FilterManager(FilterManagerCallbacks& filter_manager_callbacks, Event::Dispatcher& dispatcher, - const Network::Connection& connection, uint64_t stream_id, bool proxy_100_continue, + const Network::Connection& connection, uint64_t stream_id, + Buffer::BufferMemoryAccountSharedPtr account, bool proxy_100_continue, uint32_t buffer_limit, FilterChainFactory& filter_chain_factory, const LocalReply::LocalReply& local_reply, Http::Protocol protocol, TimeSource& time_source, StreamInfo::FilterStateSharedPtr parent_filter_state, StreamInfo::FilterState::LifeSpan filter_state_life_span) : filter_manager_callbacks_(filter_manager_callbacks), dispatcher_(dispatcher), - connection_(connection), stream_id_(stream_id), proxy_100_continue_(proxy_100_continue), - buffer_limit_(buffer_limit), filter_chain_factory_(filter_chain_factory), - local_reply_(local_reply), + connection_(connection), stream_id_(stream_id), account_(std::move(account)), + proxy_100_continue_(proxy_100_continue), buffer_limit_(buffer_limit), + filter_chain_factory_(filter_chain_factory), local_reply_(local_reply), stream_info_(protocol, time_source, connection.addressProviderSharedPtr(), parent_filter_state, filter_state_life_span) {} ~FilterManager() override { @@ -913,6 +916,7 @@ class FilterManager : public ScopeTrackedObject, const Network::Connection* connection() const { return &connection_; } uint64_t streamId() const { return stream_id_; } + Buffer::BufferMemoryAccountSharedPtr account() const { return account_; } Buffer::InstancePtr& bufferedRequestData() { return buffered_request_data_; } @@ -986,6 +990,7 @@ class FilterManager : public ScopeTrackedObject, Event::Dispatcher& dispatcher_; const Network::Connection& connection_; const uint64_t stream_id_; + Buffer::BufferMemoryAccountSharedPtr account_; const bool proxy_100_continue_; std::list decoder_filters_; diff --git a/source/common/http/http1/codec_impl.h b/source/common/http/http1/codec_impl.h index 031d972a5a08b..4cfd38b5b1ec9 100644 --- a/source/common/http/http1/codec_impl.h +++ b/source/common/http/http1/codec_impl.h @@ -69,6 +69,10 @@ class StreamEncoderImpl : public virtual StreamEncoder, // require a flush timeout not already covered by other timeouts. } + void setAccount(Buffer::BufferMemoryAccountSharedPtr) override { + // TODO(kbaichoo): implement account tracking for H1. + } + void setIsResponseToHeadRequest(bool value) { is_response_to_head_request_ = value; } void setIsResponseToConnectRequest(bool value) { is_response_to_connect_request_ = value; } void setDetails(absl::string_view details) { details_ = details; } diff --git a/source/common/http/http2/codec_impl.cc b/source/common/http/http2/codec_impl.cc index 758dea9d326fc..5f3cc901a7684 100644 --- a/source/common/http/http2/codec_impl.cc +++ b/source/common/http/http2/codec_impl.cc @@ -130,8 +130,17 @@ template static T* removeConst(const void* object) { } ConnectionImpl::StreamImpl::StreamImpl(ConnectionImpl& parent, uint32_t buffer_limit) - : parent_(parent), local_end_stream_sent_(false), remote_end_stream_(false), - data_deferred_(false), received_noninformational_headers_(false), + : parent_(parent), + pending_recv_data_(parent_.connection_.dispatcher().getWatermarkFactory().create( + [this]() -> void { this->pendingRecvBufferLowWatermark(); }, + [this]() -> void { this->pendingRecvBufferHighWatermark(); }, + []() -> void { /* TODO(adisuissa): Handle overflow watermark */ })), + pending_send_data_(parent_.connection_.dispatcher().getWatermarkFactory().create( + [this]() -> void { this->pendingSendBufferLowWatermark(); }, + [this]() -> void { this->pendingSendBufferHighWatermark(); }, + []() -> void { /* TODO(adisuissa): Handle overflow watermark */ })), + local_end_stream_sent_(false), remote_end_stream_(false), data_deferred_(false), + received_noninformational_headers_(false), pending_receive_buffer_high_watermark_called_(false), pending_send_buffer_high_watermark_called_(false), reset_due_to_messaging_error_(false) { parent_.stats_.streams_active_.inc(); @@ -145,7 +154,7 @@ ConnectionImpl::StreamImpl::~StreamImpl() { ASSERT(stream_idle_timer_ == nullptr void ConnectionImpl::StreamImpl::destroy() { disarmStreamIdleTimer(); parent_.stats_.streams_active_.dec(); - parent_.stats_.pending_send_bytes_.sub(pending_send_data_.length()); + parent_.stats_.pending_send_bytes_.sub(pending_send_data_->length()); } static void insertHeader(std::vector& headers, const HeaderEntry& header) { @@ -253,7 +262,7 @@ void ConnectionImpl::ServerStreamImpl::encodeHeaders(const ResponseHeaderMap& he void ConnectionImpl::StreamImpl::encodeTrailersBase(const HeaderMap& trailers) { ASSERT(!local_end_stream_); local_end_stream_ = true; - if (pending_send_data_.length() > 0) { + if (pending_send_data_->length() > 0) { // In this case we want trailers to come after we release all pending body data that is // waiting on window updates. We need to save the trailers so that we can emit them later. // However, for empty trailers, we don't need to to save the trailers. @@ -409,13 +418,13 @@ void ConnectionImpl::StreamImpl::submitMetadata(uint8_t flags) { } ssize_t ConnectionImpl::StreamImpl::onDataSourceRead(uint64_t length, uint32_t* data_flags) { - if (pending_send_data_.length() == 0 && !local_end_stream_) { + if (pending_send_data_->length() == 0 && !local_end_stream_) { ASSERT(!data_deferred_); data_deferred_ = true; return NGHTTP2_ERR_DEFERRED; } else { *data_flags |= NGHTTP2_DATA_FLAG_NO_COPY; - if (local_end_stream_ && pending_send_data_.length() <= length) { + if (local_end_stream_ && pending_send_data_->length() <= length) { *data_flags |= NGHTTP2_DATA_FLAG_EOF; if (pending_trailers_to_encode_) { // We need to tell the library to not set end stream so that we can emit the trailers. @@ -425,7 +434,7 @@ ssize_t ConnectionImpl::StreamImpl::onDataSourceRead(uint64_t length, uint32_t* } } - return std::min(length, pending_send_data_.length()); + return std::min(length, pending_send_data_->length()); } } @@ -446,7 +455,7 @@ void ConnectionImpl::StreamImpl::onDataSourceSend(const uint8_t* framehd, size_t } parent_.stats_.pending_send_bytes_.sub(length); - output.move(pending_send_data_, length); + output.move(*pending_send_data_, length); parent_.connection_.write(output, false); } @@ -491,7 +500,9 @@ void ConnectionImpl::StreamImpl::onPendingFlushTimer() { void ConnectionImpl::StreamImpl::encodeData(Buffer::Instance& data, bool end_stream) { ASSERT(!local_end_stream_); - encodeDataHelper(data, end_stream, /*skip_encoding_empty_trailers=*/false); + encodeDataHelper(data, end_stream, + /*skip_encoding_empty_trailers=*/ + false); } void ConnectionImpl::StreamImpl::encodeDataHelper(Buffer::Instance& data, bool end_stream, @@ -502,7 +513,7 @@ void ConnectionImpl::StreamImpl::encodeDataHelper(Buffer::Instance& data, bool e local_end_stream_ = end_stream; parent_.stats_.pending_send_bytes_.add(data.length()); - pending_send_data_.move(data); + pending_send_data_->move(data); if (data_deferred_) { int rc = nghttp2_session_resume_data(parent_.session_, stream_id_); ASSERT(rc == 0); @@ -514,7 +525,7 @@ void ConnectionImpl::StreamImpl::encodeDataHelper(Buffer::Instance& data, bool e // Intended to check through coverage that this error case is tested return; } - if (local_end_stream_ && pending_send_data_.length() > 0) { + if (local_end_stream_ && pending_send_data_->length() > 0) { createPendingFlushTimer(); } } @@ -576,6 +587,12 @@ void ConnectionImpl::StreamImpl::onMetadataDecoded(MetadataMapPtr&& metadata_map } } +void ConnectionImpl::StreamImpl::setAccount(Buffer::BufferMemoryAccountSharedPtr account) { + buffer_memory_account_ = account; + pending_recv_data_->bindAccount(buffer_memory_account_); + pending_send_data_->bindAccount(buffer_memory_account_); +} + ConnectionImpl::ConnectionImpl(Network::Connection& connection, CodecStats& stats, Random::RandomGenerator& random_generator, const envoy::config::core::v3::Http2ProtocolOptions& http2_options, @@ -707,7 +724,7 @@ int ConnectionImpl::onData(int32_t stream_id, const uint8_t* data, size_t len) { StreamImpl* stream = getStream(stream_id); // If this results in buffering too much data, the watermark buffer will call // pendingRecvBufferHighWatermark, resulting in ++read_disable_count_ - stream->pending_recv_data_.add(data, len); + stream->pending_recv_data_->add(data, len); // Update the window to the peer unless some consumer of this stream's data has hit a flow control // limit and disabled reads on this stream if (!stream->buffersOverrun()) { @@ -862,10 +879,10 @@ Status ConnectionImpl::onFrameReceived(const nghttp2_frame* frame) { // It's possible that we are waiting to send a deferred reset, so only raise data if local // is not complete. if (!stream->deferred_reset_) { - stream->decoder().decodeData(stream->pending_recv_data_, stream->remote_end_stream_); + stream->decoder().decodeData(*stream->pending_recv_data_, stream->remote_end_stream_); } - stream->pending_recv_data_.drain(stream->pending_recv_data_.length()); + stream->pending_recv_data_->drain(stream->pending_recv_data_->length()); break; } case NGHTTP2_RST_STREAM: { diff --git a/source/common/http/http2/codec_impl.h b/source/common/http/http2/codec_impl.h index 6f4a3a6f0270e..9a767e1c8337b 100644 --- a/source/common/http/http2/codec_impl.h +++ b/source/common/http/http2/codec_impl.h @@ -8,6 +8,7 @@ #include #include +#include "envoy/buffer/buffer.h" #include "envoy/common/random_generator.h" #include "envoy/common/scope_tracker.h" #include "envoy/config/core/v3/protocol.pb.h" @@ -17,6 +18,7 @@ #include "common/buffer/buffer_impl.h" #include "common/buffer/watermark_buffer.h" +#include "common/common/assert.h" #include "common/common/linked_object.h" #include "common/common/logger.h" #include "common/common/statusor.h" @@ -229,7 +231,7 @@ class ConnectionImpl : public virtual Connection, void removeCallbacks(StreamCallbacks& callbacks) override { removeCallbacksHelper(callbacks); } void resetStream(StreamResetReason reason) override; void readDisable(bool disable) override; - uint32_t bufferLimit() override { return pending_recv_data_.highWatermark(); } + uint32_t bufferLimit() override { return pending_recv_data_->highWatermark(); } const Network::Address::InstanceConstSharedPtr& connectionLocalAddress() override { return parent_.connection_.addressProvider().localAddress(); } @@ -237,6 +239,7 @@ class ConnectionImpl : public virtual Connection, void setFlushTimeout(std::chrono::milliseconds timeout) override { stream_idle_timeout_ = timeout; } + void setAccount(Buffer::BufferMemoryAccountSharedPtr account) override; // ScopeTrackedObject void dumpState(std::ostream& os, int indent_level) const override; @@ -258,8 +261,8 @@ class ConnectionImpl : public virtual Connection, } void setWriteBufferWatermarks(uint32_t high_watermark) { - pending_recv_data_.setWatermarks(high_watermark); - pending_send_data_.setWatermarks(high_watermark); + pending_recv_data_->setWatermarks(high_watermark); + pending_send_data_->setWatermarks(high_watermark); } // If the receive buffer encounters watermark callbacks, enable/disable reads on this stream. @@ -293,19 +296,14 @@ class ConnectionImpl : public virtual Connection, uint32_t unconsumed_bytes_{0}; uint32_t read_disable_count_{0}; + Buffer::BufferMemoryAccountSharedPtr buffer_memory_account_; // Note that in current implementation the watermark callbacks of the pending_recv_data_ are // never called. The watermark value is set to the size of the stream window. As a result this // watermark can never overflow because the peer can never send more bytes than the stream // window without triggering protocol error and this buffer is drained after each DATA frame was // dispatched through the filter chain. See source/docs/flow_control.md for more information. - Buffer::WatermarkBuffer pending_recv_data_{ - [this]() -> void { this->pendingRecvBufferLowWatermark(); }, - [this]() -> void { this->pendingRecvBufferHighWatermark(); }, - []() -> void { /* TODO(adisuissa): Handle overflow watermark */ }}; - Buffer::WatermarkBuffer pending_send_data_{ - [this]() -> void { this->pendingSendBufferLowWatermark(); }, - [this]() -> void { this->pendingSendBufferHighWatermark(); }, - []() -> void { /* TODO(adisuissa): Handle overflow watermark */ }}; + Buffer::InstancePtr pending_recv_data_; + Buffer::InstancePtr pending_send_data_; HeaderMapPtr pending_trailers_to_encode_; std::unique_ptr metadata_decoder_; std::unique_ptr metadata_encoder_; diff --git a/source/common/quic/envoy_quic_client_stream.h b/source/common/quic/envoy_quic_client_stream.h index a722accc6bfc2..b7e3f21d0d05f 100644 --- a/source/common/quic/envoy_quic_client_stream.h +++ b/source/common/quic/envoy_quic_client_stream.h @@ -1,5 +1,7 @@ #pragma once +#include "envoy/buffer/buffer.h" + #if defined(__GNUC__) #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wunused-parameter" @@ -46,6 +48,10 @@ class EnvoyQuicClientStream : public quic::QuicSpdyClientStream, // Http::Stream void resetStream(Http::StreamResetReason reason) override; void setFlushTimeout(std::chrono::milliseconds) override {} + + void setAccount(Buffer::BufferMemoryAccountSharedPtr) override { + // TODO(kbaichoo): implement account tracking for QUIC. + } // quic::QuicSpdyStream void OnBodyAvailable() override; void OnStreamReset(const quic::QuicRstStreamFrame& frame) override; diff --git a/source/common/quic/envoy_quic_server_stream.h b/source/common/quic/envoy_quic_server_stream.h index f86857cc800df..ac44a083b1f4b 100644 --- a/source/common/quic/envoy_quic_server_stream.h +++ b/source/common/quic/envoy_quic_server_stream.h @@ -54,6 +54,10 @@ class EnvoyQuicServerStream : public quic::QuicSpdyServerStreamBase, void setFlushTimeout(std::chrono::milliseconds) override { // TODO(mattklein123): Actually implement this for HTTP/3 similar to HTTP/2. } + + void setAccount(Buffer::BufferMemoryAccountSharedPtr) override { + // TODO(kbaichoo): implement account tracking for QUIC. + } // quic::QuicSpdyStream void OnBodyAvailable() override; bool OnStopSending(quic::QuicRstStreamErrorCode error) override; diff --git a/source/common/router/upstream_request.cc b/source/common/router/upstream_request.cc index 3213f156aa812..355f577a38bc7 100644 --- a/source/common/router/upstream_request.cc +++ b/source/common/router/upstream_request.cc @@ -371,6 +371,9 @@ void UpstreamRequest::onPoolReady( ENVOY_STREAM_LOG(debug, "pool ready", *parent_.callbacks()); upstream_ = std::move(upstream); + // Have the upstream use the account of the downstream. + upstream_->setAccount(parent_.callbacks()->account()); + if (parent_.requestVcluster()) { // The cluster increases its upstream_rq_total_ counter right before firing this onPoolReady // callback. Hence, the upstream request increases the virtual cluster's upstream_rq_total_ stat diff --git a/source/common/runtime/runtime_features.cc b/source/common/runtime/runtime_features.cc index 0f8995fdb3f12..542eb056ca251 100644 --- a/source/common/runtime/runtime_features.cc +++ b/source/common/runtime/runtime_features.cc @@ -110,6 +110,8 @@ constexpr const char* disabled_runtime_features[] = { "envoy.reloadable_features.remove_legacy_json", // Sentinel and test flag. "envoy.reloadable_features.test_feature_false", + // TODO(kbaichoo): Remove when this is no longer test only. + "envoy.test_only.per_stream_buffer_accounting", // Allows the use of ExtensionWithMatcher to wrap a HTTP filter with a match tree. "envoy.reloadable_features.experimental_matching_api", }; diff --git a/source/extensions/upstreams/http/http/upstream_request.h b/source/extensions/upstreams/http/http/upstream_request.h index 94f5a0a0fda6f..c36dbb4f1a97a 100644 --- a/source/extensions/upstreams/http/http/upstream_request.h +++ b/source/extensions/upstreams/http/http/upstream_request.h @@ -82,6 +82,10 @@ class HttpUpstream : public Router::GenericUpstream, public Envoy::Http::StreamC request_encoder_->getStream().resetStream(Envoy::Http::StreamResetReason::LocalReset); } + void setAccount(Buffer::BufferMemoryAccountSharedPtr account) override { + request_encoder_->getStream().setAccount(std::move(account)); + } + // Http::StreamCallbacks void onResetStream(Envoy::Http::StreamResetReason reason, absl::string_view transport_failure_reason) override { diff --git a/source/extensions/upstreams/http/tcp/upstream_request.h b/source/extensions/upstreams/http/tcp/upstream_request.h index 3a002f4b7a128..bffc7ee447d1d 100644 --- a/source/extensions/upstreams/http/tcp/upstream_request.h +++ b/source/extensions/upstreams/http/tcp/upstream_request.h @@ -74,6 +74,7 @@ class TcpUpstream : public Router::GenericUpstream, void encodeTrailers(const Envoy::Http::RequestTrailerMap&) override; void readDisable(bool disable) override; void resetStream() override; + void setAccount(Buffer::BufferMemoryAccountSharedPtr) override {} // Tcp::ConnectionPool::UpstreamCallbacks void onUpstreamData(Buffer::Instance& data, bool end_stream) override; diff --git a/test/common/buffer/buffer_fuzz.cc b/test/common/buffer/buffer_fuzz.cc index 493f31b8b31db..1e8faab75182f 100644 --- a/test/common/buffer/buffer_fuzz.cc +++ b/test/common/buffer/buffer_fuzz.cc @@ -193,6 +193,7 @@ class StringBuffer : public Buffer::Instance { // WatermarkBuffer implementations. ASSERT(false); } + uint32_t highWatermark() const override { return 0; } bool highWatermarkTriggered() const override { return false; } diff --git a/test/common/http/conn_manager_impl_test_base.h b/test/common/http/conn_manager_impl_test_base.h index c8e174853fb4e..674b7818d53a1 100644 --- a/test/common/http/conn_manager_impl_test_base.h +++ b/test/common/http/conn_manager_impl_test_base.h @@ -197,7 +197,7 @@ class HttpConnectionManagerImplTest : public testing::Test, public ConnectionMan std::make_shared>()}; TracingConnectionManagerConfigPtr tracing_config_; SlowDateProviderImpl date_provider_{test_time_.timeSystem()}; - MockStream stream_; + NiceMock stream_; Http::StreamCallbacks* stream_callbacks_{nullptr}; NiceMock cluster_manager_; NiceMock overload_manager_; diff --git a/test/common/http/filter_manager_test.cc b/test/common/http/filter_manager_test.cc index b90a099367efb..477184462d001 100644 --- a/test/common/http/filter_manager_test.cc +++ b/test/common/http/filter_manager_test.cc @@ -28,8 +28,8 @@ class FilterManagerTest : public testing::Test { public: void initialize() { filter_manager_ = std::make_unique( - filter_manager_callbacks_, dispatcher_, connection_, 0, true, 10000, filter_factory_, - local_reply_, protocol_, time_source_, filter_state_, + filter_manager_callbacks_, dispatcher_, connection_, 0, nullptr, true, 10000, + filter_factory_, local_reply_, protocol_, time_source_, filter_state_, StreamInfo::FilterState::LifeSpan::Connection); } diff --git a/test/common/http/http2/codec_impl_test.cc b/test/common/http/http2/codec_impl_test.cc index 5572f3f5dcb09..0752c0796fff6 100644 --- a/test/common/http/http2/codec_impl_test.cc +++ b/test/common/http/http2/codec_impl_test.cc @@ -1278,7 +1278,7 @@ TEST_P(Http2CodecImplFlowControlTest, TestFlowControlInPendingSendData) { // Now that the flow control window is full, further data causes the send buffer to back up. Buffer::OwnedImpl more_long_data(std::string(initial_stream_window, 'a')); request_encoder_->encodeData(more_long_data, false); - EXPECT_EQ(initial_stream_window, client_->getStream(1)->pending_send_data_.length()); + EXPECT_EQ(initial_stream_window, client_->getStream(1)->pending_send_data_->length()); EXPECT_EQ(initial_stream_window, TestUtility::findGauge(client_stats_store_, "http2.pending_send_bytes")->value()); EXPECT_EQ(initial_stream_window, server_->getStream(1)->unconsumed_bytes_); @@ -1287,7 +1287,7 @@ TEST_P(Http2CodecImplFlowControlTest, TestFlowControlInPendingSendData) { EXPECT_CALL(callbacks, onAboveWriteBufferHighWatermark()); Buffer::OwnedImpl last_byte("!"); request_encoder_->encodeData(last_byte, false); - EXPECT_EQ(initial_stream_window + 1, client_->getStream(1)->pending_send_data_.length()); + EXPECT_EQ(initial_stream_window + 1, client_->getStream(1)->pending_send_data_->length()); EXPECT_EQ(initial_stream_window + 1, TestUtility::findGauge(client_stats_store_, "http2.pending_send_bytes")->value()); @@ -1332,7 +1332,7 @@ TEST_P(Http2CodecImplFlowControlTest, TestFlowControlInPendingSendData) { EXPECT_CALL(callbacks2, onBelowWriteBufferLowWatermark()).Times(0); EXPECT_CALL(callbacks3, onBelowWriteBufferLowWatermark()); server_->getStream(1)->readDisable(false); - EXPECT_EQ(0, client_->getStream(1)->pending_send_data_.length()); + EXPECT_EQ(0, client_->getStream(1)->pending_send_data_->length()); EXPECT_EQ(0, TestUtility::findGauge(client_stats_store_, "http2.pending_send_bytes")->value()); // The extra 1 byte sent won't trigger another window update, so the final window should be the // initial window minus the last 1 byte flush from the client to server. diff --git a/test/integration/BUILD b/test/integration/BUILD index 41c7e0d44e2e6..1dfbc527eae75 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -320,12 +320,12 @@ envoy_cc_test( deps = [ ":autonomous_upstream_lib", ":http_integration_lib", + ":socket_interface_swap_lib", ":tracked_watermark_buffer_lib", "//test/common/http/http2:http2_frame", "//test/integration/filters:backpressure_filter_config_lib", "//test/integration/filters:set_response_code_filter_config_proto_cc_proto", "//test/integration/filters:set_response_code_filter_lib", - "//test/integration/filters:test_socket_interface_lib", "//test/mocks/http:http_mocks", "//test/test_common:utility_lib", "@com_google_absl//absl/synchronization", @@ -366,6 +366,37 @@ envoy_cc_test( ], ) +envoy_cc_test( + name = "buffer_accounting_integration_test", + srcs = [ + "buffer_accounting_integration_test.cc", + ], + deps = [ + ":http_integration_lib", + ":socket_interface_swap_lib", + ":tracked_watermark_buffer_lib", + "//test/mocks/http:http_mocks", + "//test/test_common:utility_lib", + "@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto", + "@envoy_api//envoy/config/cluster/v3:pkg_cc_proto", + "@envoy_api//envoy/extensions/filters/network/http_connection_manager/v3:pkg_cc_proto", + ], +) + +envoy_cc_test_library( + name = "socket_interface_swap_lib", + srcs = [ + "socket_interface_swap.cc", + ], + hdrs = [ + "socket_interface_swap.h", + ], + deps = [ + "//test/integration/filters:test_socket_interface_lib", + "@com_google_absl//absl/synchronization", + ], +) + envoy_cc_test( name = "http_subset_lb_integration_test", srcs = [ @@ -1295,6 +1326,7 @@ envoy_cc_test_library( ], deps = [ "//source/common/buffer:watermark_buffer_lib", + "//test/test_common:utility_lib", ], ) diff --git a/test/integration/buffer_accounting_integration_test.cc b/test/integration/buffer_accounting_integration_test.cc new file mode 100644 index 0000000000000..1ccfdb1121336 --- /dev/null +++ b/test/integration/buffer_accounting_integration_test.cc @@ -0,0 +1,227 @@ +#include + +#include "envoy/config/bootstrap/v3/bootstrap.pb.h" +#include "envoy/config/cluster/v3/cluster.pb.h" +#include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.pb.h" +#include "envoy/network/address.h" + +#include "common/buffer/buffer_impl.h" + +#include "test/integration/autonomous_upstream.h" +#include "test/integration/tracked_watermark_buffer.h" +#include "test/integration/utility.h" +#include "test/mocks/http/mocks.h" + +#include "fake_upstream.h" +#include "gtest/gtest.h" +#include "http_integration.h" +#include "integration_stream_decoder.h" +#include "socket_interface_swap.h" + +namespace Envoy { +namespace { +std::string ipVersionAndBufferAccountingTestParamsToString( + const ::testing::TestParamInfo>& params) { + return fmt::format( + "{}_{}", + TestUtility::ipTestParamsToString(::testing::TestParamInfo( + std::get<0>(params.param), params.index)), + std::get<1>(params.param) ? "with_per_stream_buffer_accounting" + : "without_per_stream_buffer_accounting"); +} +} // namespace + +class HttpBufferWatermarksTest + : public SocketInterfaceSwap, + public testing::TestWithParam>, + public HttpIntegrationTest { +public: + std::vector + sendRequests(uint32_t num_responses, uint32_t request_body_size, uint32_t response_body_size) { + std::vector responses; + + Http::TestRequestHeaderMapImpl header_map{ + {"response_data_blocks", absl::StrCat(1)}, + {"response_size_bytes", absl::StrCat(response_body_size)}, + {"no_trailers", "0"}}; + header_map.copyFrom(default_request_headers_); + header_map.setContentLength(request_body_size); + + for (uint32_t idx = 0; idx < num_responses; ++idx) { + responses.emplace_back(codec_client_->makeRequestWithBody(header_map, request_body_size)); + } + + return responses; + } + + // TODO(kbaichoo): Parameterize on the client codec type when other protocols + // (H1, H3) support buffer accounting. + HttpBufferWatermarksTest() + : HttpIntegrationTest(Http::CodecClient::Type::HTTP2, std::get<0>(GetParam())) { + config_helper_.addRuntimeOverride("envoy.test_only.per_stream_buffer_accounting", + streamBufferAccounting() ? "true" : "false"); + setServerBufferFactory(buffer_factory_); + setDownstreamProtocol(Http::CodecClient::Type::HTTP2); + setUpstreamProtocol(FakeHttpConnection::Type::HTTP2); + } + +protected: + std::shared_ptr buffer_factory_ = + std::make_shared(); + + bool streamBufferAccounting() { return std::get<1>(GetParam()); } + + std::string printAccounts() { + std::stringstream stream; + auto print_map = + [&stream](Buffer::TrackedWatermarkBufferFactory::AccountToBoundBuffersMap& map) { + stream << "Printing Account map. Size: " << map.size() << '\n'; + for (auto& entry : map) { + // This runs in the context of the worker thread, so we can access + // the balance. + stream << " Account: " << entry.first << '\n'; + stream << " Balance:" + << static_cast(entry.first.get())->balance() + << '\n'; + stream << " Number of associated buffers: " << entry.second.size() << '\n'; + } + }; + buffer_factory_->inspectAccounts(print_map, test_server_->server()); + return stream.str(); + } +}; + +INSTANTIATE_TEST_SUITE_P( + IpVersions, HttpBufferWatermarksTest, + testing::Combine(testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), testing::Bool()), + ipVersionAndBufferAccountingTestParamsToString); + +// We should create four buffers each billing the same downstream request's +// account which originated the chain. +TEST_P(HttpBufferWatermarksTest, ShouldCreateFourBuffersPerAccount) { + FakeStreamPtr upstream_request1; + FakeStreamPtr upstream_request2; + default_request_headers_.setContentLength(1000); + + initialize(); + codec_client_ = makeHttpConnection(lookupPort("http")); + + // Sends the first request. + auto response1 = codec_client_->makeRequestWithBody(default_request_headers_, 1000); + waitForNextUpstreamRequest(); + upstream_request1 = std::move(upstream_request_); + + // Check the expected number of buffers per account + if (streamBufferAccounting()) { + EXPECT_TRUE(buffer_factory_->waitUntilExpectedNumberOfAccountsAndBoundBuffers(1, 4)); + } else { + EXPECT_TRUE(buffer_factory_->waitUntilExpectedNumberOfAccountsAndBoundBuffers(0, 0)); + } + + // Send the second request. + auto response2 = codec_client_->makeRequestWithBody(default_request_headers_, 1000); + waitForNextUpstreamRequest(); + upstream_request2 = std::move(upstream_request_); + + // Check the expected number of buffers per account + if (streamBufferAccounting()) { + EXPECT_TRUE(buffer_factory_->waitUntilExpectedNumberOfAccountsAndBoundBuffers(2, 8)); + } else { + EXPECT_TRUE(buffer_factory_->waitUntilExpectedNumberOfAccountsAndBoundBuffers(0, 0)); + } + + // Respond to the first request and wait for complete + upstream_request1->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); + upstream_request1->encodeData(1000, true); + ASSERT_TRUE(response1->waitForEndStream()); + ASSERT_TRUE(upstream_request1->complete()); + + // Check the expected number of buffers per account + if (streamBufferAccounting()) { + EXPECT_TRUE(buffer_factory_->waitUntilExpectedNumberOfAccountsAndBoundBuffers(1, 4)); + } else { + EXPECT_TRUE(buffer_factory_->waitUntilExpectedNumberOfAccountsAndBoundBuffers(0, 0)); + } + + // Respond to the second request and wait for complete + upstream_request2->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); + upstream_request2->encodeData(1000, true); + ASSERT_TRUE(response2->waitForEndStream()); + ASSERT_TRUE(upstream_request2->complete()); + + // Check the expected number of buffers per account + EXPECT_TRUE(buffer_factory_->waitUntilExpectedNumberOfAccountsAndBoundBuffers(0, 0)); +} + +TEST_P(HttpBufferWatermarksTest, ShouldTrackAllocatedBytesToUpstream) { + const int num_requests = 5; + const uint32_t request_body_size = 4096; + const uint32_t response_body_size = 4096; + + autonomous_upstream_ = true; + autonomous_allow_incomplete_streams_ = true; + initialize(); + + buffer_factory_->setExpectedAccountBalance(request_body_size, num_requests); + + // Makes us have Envoy's writes to upstream return EAGAIN + writev_matcher_->setDestinationPort(fake_upstreams_[0]->localAddress()->ip()->port()); + writev_matcher_->setWritevReturnsEgain(); + + codec_client_ = makeHttpConnection(lookupPort("http")); + + auto responses = sendRequests(num_requests, request_body_size, response_body_size); + + // Wait for all requests to have accounted for the requests we've sent. + if (streamBufferAccounting()) { + EXPECT_TRUE( + buffer_factory_->waitForExpectedAccountBalanceWithTimeout(TestUtility::DefaultTimeout)) + << "buffer total: " << buffer_factory_->totalBufferSize() + << " buffer max: " << buffer_factory_->maxBufferSize() << printAccounts(); + } + + writev_matcher_->setResumeWrites(); + + for (auto& response : responses) { + ASSERT_TRUE(response->waitForEndStream()); + ASSERT_TRUE(response->complete()); + } +} + +TEST_P(HttpBufferWatermarksTest, ShouldTrackAllocatedBytesToDownstream) { + const int num_requests = 5; + const uint32_t request_body_size = 4096; + const uint32_t response_body_size = 16384; + + autonomous_upstream_ = true; + autonomous_allow_incomplete_streams_ = true; + initialize(); + + buffer_factory_->setExpectedAccountBalance(response_body_size, num_requests); + writev_matcher_->setSourcePort(lookupPort("http")); + codec_client_ = makeHttpConnection(lookupPort("http")); + + // Simulate TCP push back on the Envoy's downstream network socket, so that outbound frames + // start to accumulate in the transport socket buffer. + writev_matcher_->setWritevReturnsEgain(); + + auto responses = sendRequests(num_requests, request_body_size, response_body_size); + + // Wait for all requests to buffered the response from upstream. + if (streamBufferAccounting()) { + EXPECT_TRUE( + buffer_factory_->waitForExpectedAccountBalanceWithTimeout(TestUtility::DefaultTimeout)) + << "buffer total: " << buffer_factory_->totalBufferSize() + << " buffer max: " << buffer_factory_->maxBufferSize() << printAccounts(); + } + + writev_matcher_->setResumeWrites(); + + // Wait for streams to terminate. + for (auto& response : responses) { + ASSERT_TRUE(response->waitForEndStream()); + ASSERT_TRUE(response->complete()); + } +} + +} // namespace Envoy diff --git a/test/integration/filters/test_socket_interface.h b/test/integration/filters/test_socket_interface.h index e32f8fe4a773d..07980b5a9e59a 100644 --- a/test/integration/filters/test_socket_interface.h +++ b/test/integration/filters/test_socket_interface.h @@ -27,12 +27,30 @@ class TestIoSocketHandle : public IoSocketHandleImpl { bool socket_v6only = false, absl::optional domain = absl::nullopt) : IoSocketHandleImpl(fd, socket_v6only, domain), writev_override_(writev_override_proc) {} + void initializeFileEvent(Event::Dispatcher& dispatcher, Event::FileReadyCb cb, + Event::FileTriggerType trigger, uint32_t events) override { + absl::MutexLock lock(&mutex_); + dispatcher_ = &dispatcher; + IoSocketHandleImpl::initializeFileEvent(dispatcher, cb, trigger, events); + } + + // Schedule resumption on the IoHandle by posting a callback to the IoHandle's dispatcher. Note + // that this operation is inherently racy, nothing guarantees that the TestIoSocketHandle is not + // deleted before the posted callback executes. + void activateInDispatcherThread(uint32_t events) { + absl::MutexLock lock(&mutex_); + RELEASE_ASSERT(dispatcher_ != nullptr, "null dispatcher"); + dispatcher_->post([this, events]() { activateFileEvents(events); }); + } + private: IoHandlePtr accept(struct sockaddr* addr, socklen_t* addrlen) override; Api::IoCallUint64Result writev(const Buffer::RawSlice* slices, uint64_t num_slice) override; IoHandlePtr duplicate() override; const WritevOverrideProc writev_override_; + absl::Mutex mutex_; + Event::Dispatcher* dispatcher_ ABSL_GUARDED_BY(mutex_) = nullptr; }; /** diff --git a/test/integration/http2_flood_integration_test.cc b/test/integration/http2_flood_integration_test.cc index 9d900353cb615..29875cbbec408 100644 --- a/test/integration/http2_flood_integration_test.cc +++ b/test/integration/http2_flood_integration_test.cc @@ -11,8 +11,8 @@ #include "common/network/socket_option_impl.h" #include "test/integration/autonomous_upstream.h" -#include "test/integration/filters/test_socket_interface.h" #include "test/integration/http_integration.h" +#include "test/integration/socket_interface_swap.h" #include "test/integration/tracked_watermark_buffer.h" #include "test/integration/utility.h" #include "test/mocks/http/mocks.h" @@ -31,70 +31,6 @@ const uint32_t ControlFrameFloodLimit = 100; const uint32_t AllFrameFloodLimit = 1000; } // namespace -class SocketInterfaceSwap { -public: - // Object of this class hold the state determining the IoHandle which - // should return EAGAIN from the `writev` call. - struct IoHandleMatcher { - bool shouldReturnEgain(uint32_t src_port, uint32_t dst_port) const { - absl::ReaderMutexLock lock(&mutex_); - return writev_returns_egain_ && (src_port == src_port_ || dst_port == dst_port_); - } - - // Source port to match. The port specified should be associated with a listener. - void setSourcePort(uint32_t port) { - absl::WriterMutexLock lock(&mutex_); - src_port_ = port; - } - - // Destination port to match. The port specified should be associated with a listener. - void setDestinationPort(uint32_t port) { - absl::WriterMutexLock lock(&mutex_); - dst_port_ = port; - } - - void setWritevReturnsEgain() { - absl::WriterMutexLock lock(&mutex_); - ASSERT(src_port_ != 0 || dst_port_ != 0); - writev_returns_egain_ = true; - } - - private: - mutable absl::Mutex mutex_; - uint32_t src_port_ ABSL_GUARDED_BY(mutex_) = 0; - uint32_t dst_port_ ABSL_GUARDED_BY(mutex_) = 0; - bool writev_returns_egain_ ABSL_GUARDED_BY(mutex_) = false; - }; - - SocketInterfaceSwap() { - Envoy::Network::SocketInterfaceSingleton::clear(); - test_socket_interface_loader_ = std::make_unique( - std::make_unique( - [writev_matcher = writev_matcher_]( - Envoy::Network::TestIoSocketHandle* io_handle, const Buffer::RawSlice*, - uint64_t) -> absl::optional { - if (writev_matcher->shouldReturnEgain(io_handle->localAddress()->ip()->port(), - io_handle->peerAddress()->ip()->port())) { - return Api::IoCallUint64Result( - 0, Api::IoErrorPtr(Network::IoSocketError::getIoSocketEagainInstance(), - Network::IoSocketError::deleteIoError)); - } - return absl::nullopt; - })); - } - - ~SocketInterfaceSwap() { - test_socket_interface_loader_.reset(); - Envoy::Network::SocketInterfaceSingleton::initialize(previous_socket_interface_); - } - -protected: - Envoy::Network::SocketInterface* const previous_socket_interface_{ - Envoy::Network::SocketInterfaceSingleton::getExisting()}; - std::shared_ptr writev_matcher_{std::make_shared()}; - std::unique_ptr test_socket_interface_loader_; -}; - // It is important that the new socket interface is installed before any I/O activity starts and // the previous one is restored after all I/O activity stops. Since the HttpIntegrationTest // destructor stops Envoy the SocketInterfaceSwap destructor needs to run after it. This order of @@ -396,7 +332,7 @@ TEST_P(Http2FloodMitigationTest, Data) { // The factory will be used to create 4 buffers: the input and output buffers for request and // response pipelines. - EXPECT_EQ(4, buffer_factory->numBuffersCreated()); + EXPECT_EQ(8, buffer_factory->numBuffersCreated()); // Expect at least 1000 1 byte data frames in the output buffer. Each data frame comes with a // 9-byte frame header; 10 bytes per data frame, 10000 bytes total. The output buffer should also @@ -411,7 +347,7 @@ TEST_P(Http2FloodMitigationTest, Data) { EXPECT_GE(22000, buffer_factory->sumMaxBufferSizes()); // Verify that all buffers have watermarks set. EXPECT_THAT(buffer_factory->highWatermarkRange(), - testing::Pair(1024 * 1024 * 1024, 1024 * 1024 * 1024)); + testing::Pair(256 * 1024 * 1024, 1024 * 1024 * 1024)); } // Verify that the server can detect flood triggered by a DATA frame from a decoder filter call diff --git a/test/integration/socket_interface_swap.cc b/test/integration/socket_interface_swap.cc new file mode 100644 index 0000000000000..521153817a256 --- /dev/null +++ b/test/integration/socket_interface_swap.cc @@ -0,0 +1,30 @@ +#include "test/integration/socket_interface_swap.h" + +namespace Envoy { + +SocketInterfaceSwap::SocketInterfaceSwap() { + Envoy::Network::SocketInterfaceSingleton::clear(); + test_socket_interface_loader_ = std::make_unique( + std::make_unique( + [writev_matcher = writev_matcher_](Envoy::Network::TestIoSocketHandle* io_handle, + const Buffer::RawSlice*, + uint64_t) -> absl::optional { + if (writev_matcher->shouldReturnEgain(io_handle)) { + return Api::IoCallUint64Result( + 0, Api::IoErrorPtr(Network::IoSocketError::getIoSocketEagainInstance(), + Network::IoSocketError::deleteIoError)); + } + return absl::nullopt; + })); +} + +void SocketInterfaceSwap::IoHandleMatcher::setResumeWrites() { + absl::MutexLock lock(&mutex_); + mutex_.Await(absl::Condition( + +[](Network::TestIoSocketHandle** matched_iohandle) { return *matched_iohandle != nullptr; }, + &matched_iohandle_)); + writev_returns_egain_ = false; + matched_iohandle_->activateInDispatcherThread(Event::FileReadyType::Write); +} + +} // namespace Envoy diff --git a/test/integration/socket_interface_swap.h b/test/integration/socket_interface_swap.h new file mode 100644 index 0000000000000..77e4fbca342dd --- /dev/null +++ b/test/integration/socket_interface_swap.h @@ -0,0 +1,73 @@ +#pragma once + +#include "common/network/socket_interface.h" + +#include "test/integration/filters/test_socket_interface.h" + +namespace Envoy { + +// Enables control at the socket level to stop and resume writes. +// Useful for tests want to temporarily stop Envoy from draining data. + +class SocketInterfaceSwap { +public: + // Object of this class hold the state determining the IoHandle which + // should return EAGAIN from the `writev` call. + struct IoHandleMatcher { + bool shouldReturnEgain(Envoy::Network::TestIoSocketHandle* io_handle) { + absl::MutexLock lock(&mutex_); + if (writev_returns_egain_ && (io_handle->localAddress()->ip()->port() == src_port_ || + io_handle->peerAddress()->ip()->port() == dst_port_)) { + ASSERT(matched_iohandle_ == nullptr || matched_iohandle_ == io_handle, + "Matched multiple io_handles, expected at most one to match."); + matched_iohandle_ = io_handle; + return true; + } + return false; + } + + // Source port to match. The port specified should be associated with a listener. + void setSourcePort(uint32_t port) { + absl::WriterMutexLock lock(&mutex_); + dst_port_ = 0; + src_port_ = port; + } + + // Destination port to match. The port specified should be associated with a listener. + void setDestinationPort(uint32_t port) { + absl::WriterMutexLock lock(&mutex_); + src_port_ = 0; + dst_port_ = port; + } + + void setWritevReturnsEgain() { + absl::WriterMutexLock lock(&mutex_); + ASSERT(src_port_ != 0 || dst_port_ != 0); + writev_returns_egain_ = true; + } + + void setResumeWrites(); + + private: + mutable absl::Mutex mutex_; + uint32_t src_port_ ABSL_GUARDED_BY(mutex_) = 0; + uint32_t dst_port_ ABSL_GUARDED_BY(mutex_) = 0; + bool writev_returns_egain_ ABSL_GUARDED_BY(mutex_) = false; + Network::TestIoSocketHandle* matched_iohandle_{}; + }; + + SocketInterfaceSwap(); + + ~SocketInterfaceSwap() { + test_socket_interface_loader_.reset(); + Envoy::Network::SocketInterfaceSingleton::initialize(previous_socket_interface_); + } + +protected: + Envoy::Network::SocketInterface* const previous_socket_interface_{ + Envoy::Network::SocketInterfaceSingleton::getExisting()}; + std::shared_ptr writev_matcher_{std::make_shared()}; + std::unique_ptr test_socket_interface_loader_; +}; + +} // namespace Envoy diff --git a/test/integration/tracked_watermark_buffer.cc b/test/integration/tracked_watermark_buffer.cc index e67acf088f3ae..3bc07cb7a9252 100644 --- a/test/integration/tracked_watermark_buffer.cc +++ b/test/integration/tracked_watermark_buffer.cc @@ -1,5 +1,11 @@ #include "test/integration/tracked_watermark_buffer.h" +#include "envoy/thread/thread.h" +#include "envoy/thread_local/thread_local.h" +#include "envoy/thread_local/thread_local_object.h" + +#include "common/common/assert.h" + namespace Envoy { namespace Buffer { @@ -18,18 +24,52 @@ TrackedWatermarkBufferFactory::create(std::function below_low_watermark, return std::make_unique( [this, &buffer_info](uint64_t current_size) { absl::MutexLock lock(&mutex_); + total_buffer_size_ = total_buffer_size_ + current_size - buffer_info.current_size_; if (buffer_info.max_size_ < current_size) { buffer_info.max_size_ = current_size; } + buffer_info.current_size_ = current_size; + + checkIfExpectedBalancesMet(); }, [this, &buffer_info](uint32_t watermark) { absl::MutexLock lock(&mutex_); buffer_info.watermark_ = watermark; }, - [this]() { + [this, &buffer_info](TrackedWatermarkBuffer* buffer) { absl::MutexLock lock(&mutex_); ASSERT(active_buffer_count_ > 0); --active_buffer_count_; + total_buffer_size_ -= buffer_info.current_size_; + buffer_info.current_size_ = 0; + + // Remove bound account tracking. + auto account = buffer->getAccountForTest(); + if (account) { + auto& set = account_infos_[account]; + RELEASE_ASSERT(set.erase(buffer) == 1, "Expected to remove buffer from account_infos."); + RELEASE_ASSERT(actively_bound_buffers_.erase(buffer) == 1, + "Did not find buffer in actively_bound_buffers_."); + // Erase account entry if there are no active bound buffers, and + // there's no other pointers to the account besides the local account + // pointer and within the map. + // + // It's possible for an account to no longer be bound to a buffer in + // the case that the H2 stream completes, but the data hasn't flushed + // at TCP. + if (set.empty() && account.use_count() == 2) { + RELEASE_ASSERT(account_infos_.erase(account) == 1, + "Expected to remove account from account_infos."); + } + } + }, + [this](BufferMemoryAccountSharedPtr& account, TrackedWatermarkBuffer* buffer) { + absl::MutexLock lock(&mutex_); + // Only track non-null accounts. + if (account) { + account_infos_[account].emplace(buffer); + actively_bound_buffers_.emplace(buffer); + } }, below_low_watermark, above_high_watermark, above_overflow_watermark); } @@ -44,6 +84,11 @@ uint64_t TrackedWatermarkBufferFactory::numBuffersActive() const { return active_buffer_count_; } +uint64_t TrackedWatermarkBufferFactory::totalBufferSize() const { + absl::MutexLock lock(&mutex_); + return total_buffer_size_; +} + uint64_t TrackedWatermarkBufferFactory::maxBufferSize() const { absl::MutexLock lock(&mutex_); uint64_t val = 0; @@ -94,5 +139,107 @@ std::pair TrackedWatermarkBufferFactory::highWatermarkRange( return std::make_pair(min_watermark, max_watermark); } +bool TrackedWatermarkBufferFactory::waitUntilTotalBufferedExceeds( + uint64_t byte_size, std::chrono::milliseconds timeout) { + absl::MutexLock lock(&mutex_); + auto predicate = [this, byte_size]() ABSL_SHARED_LOCKS_REQUIRED(mutex_) { + mutex_.AssertHeld(); + return total_buffer_size_ >= byte_size; + }; + return mutex_.AwaitWithTimeout(absl::Condition(&predicate), absl::Milliseconds(timeout.count())); +} + +void TrackedWatermarkBufferFactory::removeDanglingAccounts() { + auto accounts_it = account_infos_.begin(); + while (accounts_it != account_infos_.end()) { + auto next = std::next(accounts_it); + + // Remove all "dangling" accounts. + if (accounts_it->first.use_count() == 1) { + ASSERT(accounts_it->second.empty()); + account_infos_.erase(accounts_it); + } + + accounts_it = next; + } +} + +void TrackedWatermarkBufferFactory::inspectAccounts( + std::function func, Server::Instance& server) { + absl::Notification done_notification; + ThreadLocal::TypedSlotPtr<> slot; + Envoy::Thread::ThreadId main_tid; + + server.dispatcher().post([&] { + slot = ThreadLocal::TypedSlot<>::makeUnique(server.threadLocal()); + slot->set( + [](Envoy::Event::Dispatcher&) -> std::shared_ptr { + return nullptr; + }); + + main_tid = server.api().threadFactory().currentThreadId(); + + slot->runOnAllThreads( + [main_tid, &server, &func, this](OptRef) { + // Run on the worker thread. + if (server.api().threadFactory().currentThreadId() != main_tid) { + absl::MutexLock lock(&(this->mutex_)); + func(this->account_infos_); + } + }, + [&slot, &done_notification] { + slot.reset(nullptr); + done_notification.Notify(); + }); + }); + + done_notification.WaitForNotification(); +} + +void TrackedWatermarkBufferFactory::setExpectedAccountBalance(uint64_t byte_size_per_account, + uint32_t num_accounts) { + absl::MutexLock lock(&mutex_); + ASSERT(!expected_balances_.has_value()); + expected_balances_.emplace(byte_size_per_account, num_accounts); +} + +bool TrackedWatermarkBufferFactory::waitForExpectedAccountBalanceWithTimeout( + std::chrono::milliseconds timeout) { + return expected_balances_met_.WaitForNotificationWithTimeout(absl::FromChrono(timeout)); +} + +bool TrackedWatermarkBufferFactory::waitUntilExpectedNumberOfAccountsAndBoundBuffers( + uint32_t num_accounts, uint32_t num_bound_buffers, std::chrono::milliseconds timeout) { + absl::MutexLock lock(&mutex_); + auto predicate = [this, num_accounts, num_bound_buffers]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) { + mutex_.AssertHeld(); + removeDanglingAccounts(); + return num_bound_buffers == actively_bound_buffers_.size() && + num_accounts == account_infos_.size(); + }; + return mutex_.AwaitWithTimeout(absl::Condition(&predicate), absl::FromChrono(timeout)); +} + +void TrackedWatermarkBufferFactory::checkIfExpectedBalancesMet() { + if (!expected_balances_ || expected_balances_met_.HasBeenNotified()) { + return; + } + + removeDanglingAccounts(); + + if (account_infos_.size() == expected_balances_->num_accounts_) { + // This is thread safe since this function should run on the only Envoy worker + // thread. + for (auto& acc : account_infos_) { + if (static_cast(acc.first.get())->balance() < + expected_balances_->balance_per_account_) { + return; + } + } + + expected_balances_met_.Notify(); + } +} + } // namespace Buffer } // namespace Envoy diff --git a/test/integration/tracked_watermark_buffer.h b/test/integration/tracked_watermark_buffer.h index 64d8473dbf104..bc05db5d4ca03 100644 --- a/test/integration/tracked_watermark_buffer.h +++ b/test/integration/tracked_watermark_buffer.h @@ -1,9 +1,16 @@ #pragma once +#include "envoy/buffer/buffer.h" +#include "envoy/server/instance.h" + +#include "common/buffer/buffer_impl.h" #include "common/buffer/watermark_buffer.h" +#include "test/test_common/utility.h" + #include "absl/container/node_hash_map.h" #include "absl/synchronization/mutex.h" +#include "absl/synchronization/notification.h" namespace Envoy { namespace Buffer { @@ -11,31 +18,44 @@ namespace Buffer { // WatermarkBuffer subclass that hooks into updates to buffer size and buffer high watermark config. class TrackedWatermarkBuffer : public Buffer::WatermarkBuffer { public: - TrackedWatermarkBuffer(std::function update_max_size, - std::function update_high_watermark, - std::function on_delete, std::function below_low_watermark, - std::function above_high_watermark, - std::function above_overflow_watermark) + TrackedWatermarkBuffer( + std::function update_size, + std::function update_high_watermark, + std::function on_delete, + std::function on_bind, + std::function below_low_watermark, std::function above_high_watermark, + std::function above_overflow_watermark) : WatermarkBuffer(below_low_watermark, above_high_watermark, above_overflow_watermark), - update_max_size_(update_max_size), update_high_watermark_(update_high_watermark), - on_delete_(on_delete) {} - ~TrackedWatermarkBuffer() override { on_delete_(); } + update_size_(update_size), update_high_watermark_(update_high_watermark), + on_delete_(on_delete), on_bind_(on_bind) {} + ~TrackedWatermarkBuffer() override { on_delete_(this); } void setWatermarks(uint32_t watermark) override { update_high_watermark_(watermark); WatermarkBuffer::setWatermarks(watermark); } + void bindAccount(BufferMemoryAccountSharedPtr account) override { + on_bind_(account, this); + WatermarkBuffer::bindAccount(account); + } + protected: void checkHighAndOverflowWatermarks() override { - update_max_size_(length()); + update_size_(length()); WatermarkBuffer::checkHighAndOverflowWatermarks(); } + void checkLowWatermark() override { + update_size_(length()); + WatermarkBuffer::checkLowWatermark(); + } + private: - std::function update_max_size_; + std::function update_size_; std::function update_high_watermark_; - std::function on_delete_; + std::function on_delete_; + std::function on_bind_; }; // Factory that tracks how the created buffers are used. @@ -52,6 +72,8 @@ class TrackedWatermarkBufferFactory : public Buffer::WatermarkFactory { uint64_t numBuffersCreated() const; // Number of buffers still in use. uint64_t numBuffersActive() const; + // Total bytes buffered. + uint64_t totalBufferSize() const; // Size of the largest buffer. uint64_t maxBufferSize() const; // Sum of the max size of all known buffers. @@ -60,19 +82,84 @@ class TrackedWatermarkBufferFactory : public Buffer::WatermarkFactory { // functionality is disabled. std::pair highWatermarkRange() const; + // Total bytes currently buffered across all known buffers. + uint64_t totalBytesBuffered() const { + absl::MutexLock lock(&mutex_); + return total_buffer_size_; + } + + // Wait until total bytes buffered exceeds the a given size. + bool + waitUntilTotalBufferedExceeds(uint64_t byte_size, + std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); + + // Set the expected account balance, prior to sending requests. + // The test thread can then wait for this condition to be true. + // This is separated so that the test thread can spin up requests however it + // desires in between. + // + // The Envoy worker thread will notify the test thread once the condition is + // met. + void setExpectedAccountBalance(uint64_t byte_size_per_account, uint32_t num_accounts); + bool waitForExpectedAccountBalanceWithTimeout( + std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); + + // Wait for the expected number of accounts and number of bound buffers. + // + // Due to deferred deletion, it possible that the Envoy hasn't cleaned up on + // its end, but the stream has been completed. This avoids that by awaiting + // for the side effect of the deletion to have occurred. + bool waitUntilExpectedNumberOfAccountsAndBoundBuffers( + uint32_t num_accounts, uint32_t num_bound_buffers, + std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); + + using AccountToBoundBuffersMap = + absl::flat_hash_map>; + void inspectAccounts(std::function func, + Server::Instance& server); + private: + // Remove "dangling" accounts; accounts where the account_info map is the only + // entity still pointing to the account. + void removeDanglingAccounts() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_); + // Should only be executed on the Envoy's worker thread. Otherwise, we have a + // possible race condition. + void checkIfExpectedBalancesMet() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_); + struct BufferInfo { uint32_t watermark_ = 0; + uint64_t current_size_ = 0; uint64_t max_size_ = 0; }; + struct AccountBalanceExpectations { + AccountBalanceExpectations(uint64_t balance_per_account, uint32_t num_accounts) + : balance_per_account_(balance_per_account), num_accounts_(num_accounts) {} + + uint64_t balance_per_account_ = 0; + uint32_t num_accounts_ = 0; + }; + mutable absl::Mutex mutex_; // Id of the next buffer to create. uint64_t next_idx_ ABSL_GUARDED_BY(mutex_) = 0; // Number of buffers currently in existence. uint64_t active_buffer_count_ ABSL_GUARDED_BY(mutex_) = 0; + // total bytes buffered across all buffers. + uint64_t total_buffer_size_ ABSL_GUARDED_BY(mutex_) = 0; // Info about the buffer, by buffer idx. absl::node_hash_map buffer_infos_ ABSL_GUARDED_BY(mutex_); + // The expected balances for the accounts. If set, when a buffer updates its + // size, it also checks whether the expected_balances has been satisfied, and + // notifies waiters of expected_balances_met_. + absl::optional expected_balances_ ABSL_GUARDED_BY(mutex_); + absl::Notification expected_balances_met_; + // Map from accounts to buffers bound to that account. + AccountToBoundBuffersMap account_infos_ ABSL_GUARDED_BY(mutex_); + // Set of actively bound buffers. Used for asserting that buffers are bound + // only once. + absl::flat_hash_set actively_bound_buffers_ ABSL_GUARDED_BY(mutex_); }; } // namespace Buffer diff --git a/test/integration/tracked_watermark_buffer_test.cc b/test/integration/tracked_watermark_buffer_test.cc index 9d4eb6110d9ea..f38a2968dcc37 100644 --- a/test/integration/tracked_watermark_buffer_test.cc +++ b/test/integration/tracked_watermark_buffer_test.cc @@ -1,6 +1,14 @@ +#include +#include + +#include "envoy/buffer/buffer.h" + +#include "common/buffer/buffer_impl.h" + #include "test/integration/tracked_watermark_buffer.h" #include "test/mocks/common.h" #include "test/test_common/test_runtime.h" +#include "test/test_common/thread_factory_for_test.h" #include "gtest/gtest.h" @@ -59,29 +67,31 @@ TEST_F(TrackedWatermarkBufferTest, BufferSizes) { auto buffer = factory_.create([]() {}, []() {}, []() {}); buffer->setWatermarks(100); auto buffer2 = factory_.create([]() {}, []() {}, []() {}); - EXPECT_EQ(2, factory_.numBuffersCreated()); EXPECT_EQ(2, factory_.numBuffersActive()); - // Add some bytes to the buffers, and verify max and sum(max). buffer->add("abcde"); buffer2->add("a"); EXPECT_EQ(5, factory_.maxBufferSize()); EXPECT_EQ(6, factory_.sumMaxBufferSizes()); + EXPECT_EQ(6, factory_.totalBytesBuffered()); // Add more bytes and drain the buffer. Verify that max is latched. buffer->add(std::string(1000, 'a')); EXPECT_TRUE(buffer->highWatermarkTriggered()); + EXPECT_EQ(1006, factory_.totalBytesBuffered()); buffer->drain(1005); EXPECT_EQ(0, buffer->length()); EXPECT_FALSE(buffer->highWatermarkTriggered()); EXPECT_EQ(1005, factory_.maxBufferSize()); EXPECT_EQ(1006, factory_.sumMaxBufferSizes()); + EXPECT_EQ(1, factory_.totalBytesBuffered()); buffer2->add("a"); EXPECT_EQ(1005, factory_.maxBufferSize()); EXPECT_EQ(1007, factory_.sumMaxBufferSizes()); + EXPECT_EQ(2, factory_.totalBytesBuffered()); // Verify cleanup tracking. buffer.reset(); @@ -90,12 +100,101 @@ TEST_F(TrackedWatermarkBufferTest, BufferSizes) { buffer2.reset(); EXPECT_EQ(2, factory_.numBuffersCreated()); EXPECT_EQ(0, factory_.numBuffersActive()); + // Bytes in deleted buffers are removed from the total. + EXPECT_EQ(0, factory_.totalBytesBuffered()); // Max sizes are remembered even after buffers are deleted. EXPECT_EQ(1005, factory_.maxBufferSize()); EXPECT_EQ(1007, factory_.sumMaxBufferSizes()); } +TEST_F(TrackedWatermarkBufferTest, WaitUntilTotalBufferedExceeds) { + auto buffer1 = factory_.create([]() {}, []() {}, []() {}); + auto buffer2 = factory_.create([]() {}, []() {}, []() {}); + auto buffer3 = factory_.create([]() {}, []() {}, []() {}); + + auto thread1 = Thread::threadFactoryForTest().createThread([&]() { buffer1->add("a"); }); + auto thread2 = Thread::threadFactoryForTest().createThread([&]() { buffer2->add("b"); }); + auto thread3 = Thread::threadFactoryForTest().createThread([&]() { buffer3->add("c"); }); + + factory_.waitUntilTotalBufferedExceeds(2, std::chrono::milliseconds(10000)); + thread1->join(); + thread2->join(); + thread3->join(); + + EXPECT_EQ(3, factory_.totalBytesBuffered()); + EXPECT_EQ(1, factory_.maxBufferSize()); +} + +TEST_F(TrackedWatermarkBufferTest, TracksNumberOfBuffersActivelyBound) { + auto buffer1 = factory_.create([]() {}, []() {}, []() {}); + auto buffer2 = factory_.create([]() {}, []() {}, []() {}); + auto buffer3 = factory_.create([]() {}, []() {}, []() {}); + BufferMemoryAccountSharedPtr account = std::make_shared(); + ASSERT_TRUE(factory_.waitUntilExpectedNumberOfAccountsAndBoundBuffers(0, 0)); + + buffer1->bindAccount(account); + EXPECT_TRUE(factory_.waitUntilExpectedNumberOfAccountsAndBoundBuffers(1, 1)); + buffer2->bindAccount(account); + EXPECT_TRUE(factory_.waitUntilExpectedNumberOfAccountsAndBoundBuffers(1, 2)); + buffer3->bindAccount(account); + EXPECT_TRUE(factory_.waitUntilExpectedNumberOfAccountsAndBoundBuffers(1, 3)); + + // Release test access to the account. + account.reset(); + + buffer3.reset(); + EXPECT_TRUE(factory_.waitUntilExpectedNumberOfAccountsAndBoundBuffers(1, 2)); + buffer2.reset(); + EXPECT_TRUE(factory_.waitUntilExpectedNumberOfAccountsAndBoundBuffers(1, 1)); + buffer1.reset(); + EXPECT_TRUE(factory_.waitUntilExpectedNumberOfAccountsAndBoundBuffers(0, 0)); +} + +TEST_F(TrackedWatermarkBufferTest, TracksNumberOfAccountsActive) { + auto buffer1 = factory_.create([]() {}, []() {}, []() {}); + auto buffer2 = factory_.create([]() {}, []() {}, []() {}); + auto buffer3 = factory_.create([]() {}, []() {}, []() {}); + BufferMemoryAccountSharedPtr account1 = std::make_shared(); + ASSERT_TRUE(factory_.waitUntilExpectedNumberOfAccountsAndBoundBuffers(0, 0)); + + buffer1->bindAccount(account1); + EXPECT_TRUE(factory_.waitUntilExpectedNumberOfAccountsAndBoundBuffers(1, 1)); + buffer2->bindAccount(account1); + EXPECT_TRUE(factory_.waitUntilExpectedNumberOfAccountsAndBoundBuffers(1, 2)); + + // Release test access to the account. + account1.reset(); + + buffer3->bindAccount(std::make_shared()); + EXPECT_TRUE(factory_.waitUntilExpectedNumberOfAccountsAndBoundBuffers(2, 3)); + + buffer2.reset(); + EXPECT_TRUE(factory_.waitUntilExpectedNumberOfAccountsAndBoundBuffers(2, 2)); + buffer1.reset(); + EXPECT_TRUE(factory_.waitUntilExpectedNumberOfAccountsAndBoundBuffers(1, 1)); + + buffer3.reset(); + EXPECT_TRUE(factory_.waitUntilExpectedNumberOfAccountsAndBoundBuffers(0, 0)); +} + +TEST_F(TrackedWatermarkBufferTest, WaitForExpectedAccountBalanceShouldReturnTrueWhenConditionsMet) { + auto buffer1 = factory_.create([]() {}, []() {}, []() {}); + auto buffer2 = factory_.create([]() {}, []() {}, []() {}); + BufferMemoryAccountSharedPtr account1 = std::make_shared(); + BufferMemoryAccountSharedPtr account2 = std::make_shared(); + buffer1->bindAccount(account1); + buffer2->bindAccount(account2); + + factory_.setExpectedAccountBalance(4096, 2); + + buffer1->add("Need to wait on the other buffer to get data."); + EXPECT_FALSE(factory_.waitForExpectedAccountBalanceWithTimeout(std::chrono::seconds(0))); + + buffer2->add("Now we have expected balances!"); + EXPECT_TRUE(factory_.waitForExpectedAccountBalanceWithTimeout(std::chrono::seconds(0))); +} + } // namespace } // namespace Buffer } // namespace Envoy diff --git a/test/mocks/http/mocks.h b/test/mocks/http/mocks.h index 366170ba33b27..bf8eacb3c5218 100644 --- a/test/mocks/http/mocks.h +++ b/test/mocks/http/mocks.h @@ -270,6 +270,7 @@ class MockStreamDecoderFilterCallbacks : public StreamDecoderFilterCallbacks, std::function modify_headers, const absl::optional grpc_status, absl::string_view details)); + MOCK_METHOD(Buffer::BufferMemoryAccountSharedPtr, account, (), (const)); Buffer::InstancePtr buffer_; std::list callbacks_{}; diff --git a/test/mocks/http/stream.cc b/test/mocks/http/stream.cc index 25e8e14b25441..1a3d4e8bcae67 100644 --- a/test/mocks/http/stream.cc +++ b/test/mocks/http/stream.cc @@ -23,6 +23,10 @@ MockStream::MockStream() { })); ON_CALL(*this, connectionLocalAddress()).WillByDefault(ReturnRef(connection_local_address_)); + + ON_CALL(*this, setAccount(_)) + .WillByDefault(Invoke( + [this](Buffer::BufferMemoryAccountSharedPtr account) -> void { account_ = account; })); } MockStream::~MockStream() = default; diff --git a/test/mocks/http/stream.h b/test/mocks/http/stream.h index ce6579f13ad4e..f456ade6fc96b 100644 --- a/test/mocks/http/stream.h +++ b/test/mocks/http/stream.h @@ -21,9 +21,11 @@ class MockStream : public Stream { MOCK_METHOD(uint32_t, bufferLimit, ()); MOCK_METHOD(const Network::Address::InstanceConstSharedPtr&, connectionLocalAddress, ()); MOCK_METHOD(void, setFlushTimeout, (std::chrono::milliseconds timeout)); + MOCK_METHOD(void, setAccount, (Buffer::BufferMemoryAccountSharedPtr)); std::list callbacks_{}; Network::Address::InstanceConstSharedPtr connection_local_address_; + Buffer::BufferMemoryAccountSharedPtr account_; void runHighWatermarkCallbacks() { for (auto* callback : callbacks_) {