From 8c99d4f0d7ae30e57b537e9ba8ce7c829d1a756d Mon Sep 17 00:00:00 2001 From: Kevin Baichoo Date: Tue, 13 Apr 2021 18:01:39 +0000 Subject: [PATCH 01/14] Implemented H2 stream level buffer accounting. Signed-off-by: Kevin Baichoo --- include/envoy/http/codec.h | 13 + include/envoy/http/filter.h | 6 + include/envoy/router/router.h | 6 + source/common/buffer/buffer_impl.cc | 2 + source/common/buffer/buffer_impl.h | 5 + source/common/buffer/watermark_buffer.cc | 5 +- source/common/buffer/watermark_buffer.h | 5 +- source/common/http/async_client_impl.h | 2 + source/common/http/conn_manager_impl.cc | 10 +- source/common/http/conn_manager_impl.h | 3 +- source/common/http/filter_manager.cc | 4 + source/common/http/filter_manager.h | 13 +- source/common/http/http1/codec_impl.h | 8 + source/common/http/http2/codec_impl.cc | 52 ++-- source/common/http/http2/codec_impl.h | 43 +-- source/common/quic/envoy_quic_client_stream.h | 10 + source/common/quic/envoy_quic_server_stream.h | 8 + source/common/router/upstream_request.cc | 3 + .../upstreams/http/http/upstream_request.h | 4 + .../upstreams/http/tcp/upstream_request.h | 1 + .../common/http/conn_manager_impl_test_base.h | 2 +- test/common/http/filter_manager_test.cc | 4 +- test/integration/BUILD | 33 ++- .../buffer_accounting_integration_test.cc | 254 ++++++++++++++++++ .../filters/test_socket_interface.h | 18 ++ .../http2_flood_integration_test.cc | 70 +---- test/integration/socket_interface_swap.cc | 30 +++ test/integration/socket_interface_swap.h | 73 +++++ test/integration/tracked_watermark_buffer.cc | 95 ++++++- test/integration/tracked_watermark_buffer.h | 77 +++++- .../tracked_watermark_buffer_test.cc | 85 +++++- test/mocks/http/mocks.h | 1 + test/mocks/http/stream.cc | 6 + test/mocks/http/stream.h | 3 + 34 files changed, 823 insertions(+), 131 deletions(-) create mode 100644 test/integration/buffer_accounting_integration_test.cc create mode 100644 test/integration/socket_interface_swap.cc create mode 100644 test/integration/socket_interface_swap.h diff --git a/include/envoy/http/codec.h b/include/envoy/http/codec.h index 70544787ca44d..6dfea0f3c51b9 100644 --- a/include/envoy/http/codec.h +++ b/include/envoy/http/codec.h @@ -380,6 +380,19 @@ class Stream { * small window updates as satisfying the idle timeout as this is a potential DoS vector. */ virtual void setFlushTimeout(std::chrono::milliseconds timeout) PURE; + + /** + * @return the account associated with this stream. + */ + virtual Buffer::BufferMemoryAccountSharedPtr getAccount() const PURE; + + /** + * Sets the account for this stream, propagating it to all of its buffers. + * This should only be called on client streams since server streams create + * their own account. + * @param the account to assign this stream. + */ + virtual void setAccount(Buffer::BufferMemoryAccountSharedPtr account) PURE; }; /** diff --git a/include/envoy/http/filter.h b/include/envoy/http/filter.h index f26348cd5e0a3..b1fc1e6ee1354 100644 --- a/include/envoy/http/filter.h +++ b/include/envoy/http/filter.h @@ -6,6 +6,7 @@ #include #include "envoy/access_log/access_log.h" +#include "envoy/buffer/buffer.h" #include "envoy/common/scope_tracker.h" #include "envoy/event/dispatcher.h" #include "envoy/grpc/status.h" @@ -544,6 +545,11 @@ class StreamDecoderFilterCallbacks : public virtual StreamFilterCallbacks { */ virtual uint32_t decoderBufferLimit() PURE; + /** + * @return the account, if any, used by this stream. + */ + virtual Buffer::BufferMemoryAccountSharedPtr account() const PURE; + /** * Takes a stream, and acts as if the headers are newly arrived. * On success, this will result in a creating a new filter chain and likely diff --git a/include/envoy/router/router.h b/include/envoy/router/router.h index 95944beddb2b0..a143cd11490b1 100644 --- a/include/envoy/router/router.h +++ b/include/envoy/router/router.h @@ -1327,6 +1327,12 @@ class GenericUpstream { * @param reason supplies the reset reason. */ virtual void resetStream() PURE; + + /** + * Sets the upstream to use the following account. + * @param the account to assign the generic upstream. + */ + virtual void setAccount(Buffer::BufferMemoryAccountSharedPtr account) PURE; }; using GenericConnPoolPtr = std::unique_ptr; diff --git a/source/common/buffer/buffer_impl.cc b/source/common/buffer/buffer_impl.cc index 6d2241171a135..de5e923d2a4ad 100644 --- a/source/common/buffer/buffer_impl.cc +++ b/source/common/buffer/buffer_impl.cc @@ -48,6 +48,8 @@ void OwnedImpl::bindAccount(BufferMemoryAccountSharedPtr account) { account_ = std::move(account); } +BufferMemoryAccountSharedPtr OwnedImpl::getAccountForTest() { return account_; } + void OwnedImpl::add(const void* data, uint64_t size) { addImpl(data, size); } void OwnedImpl::addBufferFragment(BufferFragment& fragment) { diff --git a/source/common/buffer/buffer_impl.h b/source/common/buffer/buffer_impl.h index a794b0e020514..35d2c16b8079c 100644 --- a/source/common/buffer/buffer_impl.h +++ b/source/common/buffer/buffer_impl.h @@ -710,6 +710,11 @@ class OwnedImpl : public LibEventInstance { */ virtual void appendSliceForTest(absl::string_view data); + /** + * @return the BufferMemoryAccount bound to this buffer, if any. + */ + BufferMemoryAccountSharedPtr getAccountForTest(); + // Does not implement watermarking. // TODO(antoniovicente) Implement watermarks by merging the OwnedImpl and WatermarkBuffer // implementations. Also, make high-watermark config a constructor argument. diff --git a/source/common/buffer/watermark_buffer.cc b/source/common/buffer/watermark_buffer.cc index f7d95e5183f4e..eefa6022c8ef0 100644 --- a/source/common/buffer/watermark_buffer.cc +++ b/source/common/buffer/watermark_buffer.cc @@ -90,8 +90,7 @@ void WatermarkBuffer::appendSliceForTest(absl::string_view data) { appendSliceForTest(data.data(), data.size()); } -void WatermarkBuffer::setWatermarks(uint32_t low_watermark, uint32_t high_watermark) { - ASSERT(low_watermark < high_watermark || (high_watermark == 0 && low_watermark == 0)); +void WatermarkBuffer::setWatermarks(uint32_t high_watermark) { uint32_t overflow_watermark_multiplier = Runtime::getInteger("envoy.buffer.overflow_multiplier", 0); if (overflow_watermark_multiplier > 0 && @@ -101,7 +100,7 @@ void WatermarkBuffer::setWatermarks(uint32_t low_watermark, uint32_t high_waterm "high_watermark is overflowing. Disabling overflow watermark."); overflow_watermark_multiplier = 0; } - low_watermark_ = low_watermark; + low_watermark_ = high_watermark / 2; high_watermark_ = high_watermark; overflow_watermark_ = overflow_watermark_multiplier * high_watermark; checkHighAndOverflowWatermarks(); diff --git a/source/common/buffer/watermark_buffer.h b/source/common/buffer/watermark_buffer.h index 9150cdaf54f94..046173d49159b 100644 --- a/source/common/buffer/watermark_buffer.h +++ b/source/common/buffer/watermark_buffer.h @@ -39,8 +39,7 @@ class WatermarkBuffer : public OwnedImpl { void appendSliceForTest(const void* data, uint64_t size) override; void appendSliceForTest(absl::string_view data) override; - void setWatermarks(uint32_t watermark) override { setWatermarks(watermark / 2, watermark); } - void setWatermarks(uint32_t low_watermark, uint32_t high_watermark); + void setWatermarks(uint32_t watermark) override; uint32_t highWatermark() const override { return high_watermark_; } // Returns true if the high watermark callbacks have been called more recently // than the low watermark callbacks. @@ -48,7 +47,7 @@ class WatermarkBuffer : public OwnedImpl { protected: virtual void checkHighAndOverflowWatermarks(); - void checkLowWatermark(); + virtual void checkLowWatermark(); private: void commit(uint64_t length, absl::Span slices, diff --git a/source/common/http/async_client_impl.h b/source/common/http/async_client_impl.h index 5aea06d97056b..7055253f2af29 100644 --- a/source/common/http/async_client_impl.h +++ b/source/common/http/async_client_impl.h @@ -365,6 +365,8 @@ class AsyncStreamImpl : public AsyncClient::Stream, Upstream::ClusterInfoConstSharedPtr clusterInfo() override { return parent_.cluster_; } void clearRouteCache() override {} uint64_t streamId() const override { return stream_id_; } + // TODO(kbaichoo): Implement this? + Buffer::BufferMemoryAccountSharedPtr account() const override { return nullptr; } Tracing::Span& activeSpan() override { return active_span_; } const Tracing::Config& tracingConfig() override { return tracing_config_; } void continueDecoding() override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; } diff --git a/source/common/http/conn_manager_impl.cc b/source/common/http/conn_manager_impl.cc index e85b4b07b3c4e..f2dffbe1746be 100644 --- a/source/common/http/conn_manager_impl.cc +++ b/source/common/http/conn_manager_impl.cc @@ -259,7 +259,8 @@ RequestDecoder& ConnectionManagerImpl::newStream(ResponseEncoder& response_encod } ENVOY_CONN_LOG(debug, "new stream", read_callbacks_->connection()); - ActiveStreamPtr new_stream(new ActiveStream(*this, response_encoder.getStream().bufferLimit())); + ActiveStreamPtr new_stream(new ActiveStream(*this, response_encoder.getStream().bufferLimit(), + response_encoder.getStream().getAccount())); new_stream->state_.is_internally_created_ = is_internally_created; new_stream->response_encoder_ = &response_encoder; new_stream->response_encoder_->getStream().addCallbacks(*new_stream); @@ -571,13 +572,14 @@ void ConnectionManagerImpl::RdsRouteConfigUpdateRequester::requestSrdsUpdate( } ConnectionManagerImpl::ActiveStream::ActiveStream(ConnectionManagerImpl& connection_manager, - uint32_t buffer_limit) + uint32_t buffer_limit, + Buffer::BufferMemoryAccountSharedPtr account) : connection_manager_(connection_manager), stream_id_(connection_manager.random_generator_.random()), filter_manager_(*this, connection_manager_.read_callbacks_->connection().dispatcher(), connection_manager_.read_callbacks_->connection(), stream_id_, - connection_manager_.config_.proxy100Continue(), buffer_limit, - connection_manager_.config_.filterFactory(), + std::move(account), connection_manager_.config_.proxy100Continue(), + buffer_limit, connection_manager_.config_.filterFactory(), connection_manager_.config_.localReply(), connection_manager_.codec_->protocol(), connection_manager_.timeSource(), connection_manager_.read_callbacks_->connection().streamInfo().filterState(), diff --git a/source/common/http/conn_manager_impl.h b/source/common/http/conn_manager_impl.h index 033947b6f729d..9b6a6f10825a6 100644 --- a/source/common/http/conn_manager_impl.h +++ b/source/common/http/conn_manager_impl.h @@ -156,7 +156,8 @@ class ConnectionManagerImpl : Logger::Loggable, public Tracing::Config, public ScopeTrackedObject, public FilterManagerCallbacks { - ActiveStream(ConnectionManagerImpl& connection_manager, uint32_t buffer_limit); + ActiveStream(ConnectionManagerImpl& connection_manager, uint32_t buffer_limit, + Buffer::BufferMemoryAccountSharedPtr account); void completeRequest(); const Network::Connection* connection(); diff --git a/source/common/http/filter_manager.cc b/source/common/http/filter_manager.cc index 8101dc2007cff..b1809df50eceb 100644 --- a/source/common/http/filter_manager.cc +++ b/source/common/http/filter_manager.cc @@ -1558,5 +1558,9 @@ void ActiveStreamFilterBase::resetStream() { parent_.filter_manager_callbacks_.r uint64_t ActiveStreamFilterBase::streamId() const { return parent_.streamId(); } +Buffer::BufferMemoryAccountSharedPtr ActiveStreamDecoderFilter::account() const { + return parent_.account(); +} + } // namespace Http } // namespace Envoy diff --git a/source/common/http/filter_manager.h b/source/common/http/filter_manager.h index 322e4026adb42..726f5c6332e52 100644 --- a/source/common/http/filter_manager.h +++ b/source/common/http/filter_manager.h @@ -2,6 +2,7 @@ #include +#include "envoy/buffer/buffer.h" #include "envoy/common/optref.h" #include "envoy/extensions/filters/common/matcher/action/v3/skip_action.pb.h" #include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.pb.h" @@ -275,6 +276,7 @@ struct ActiveStreamDecoderFilter : public ActiveStreamFilterBase, void addUpstreamSocketOptions(const Network::Socket::OptionsSharedPtr& options) override; Network::Socket::OptionsSharedPtr getUpstreamSocketOptions() const override; + Buffer::BufferMemoryAccountSharedPtr account() const override; // Each decoder filter instance checks if the request passed to the filter is gRPC // so that we can issue gRPC local responses to gRPC requests. Filter's decodeHeaders() @@ -638,15 +640,16 @@ class FilterManager : public ScopeTrackedObject, Logger::Loggable { public: FilterManager(FilterManagerCallbacks& filter_manager_callbacks, Event::Dispatcher& dispatcher, - const Network::Connection& connection, uint64_t stream_id, bool proxy_100_continue, + const Network::Connection& connection, uint64_t stream_id, + Buffer::BufferMemoryAccountSharedPtr account, bool proxy_100_continue, uint32_t buffer_limit, FilterChainFactory& filter_chain_factory, const LocalReply::LocalReply& local_reply, Http::Protocol protocol, TimeSource& time_source, StreamInfo::FilterStateSharedPtr parent_filter_state, StreamInfo::FilterState::LifeSpan filter_state_life_span) : filter_manager_callbacks_(filter_manager_callbacks), dispatcher_(dispatcher), - connection_(connection), stream_id_(stream_id), proxy_100_continue_(proxy_100_continue), - buffer_limit_(buffer_limit), filter_chain_factory_(filter_chain_factory), - local_reply_(local_reply), + connection_(connection), stream_id_(stream_id), account_(std::move(account)), + proxy_100_continue_(proxy_100_continue), buffer_limit_(buffer_limit), + filter_chain_factory_(filter_chain_factory), local_reply_(local_reply), stream_info_(protocol, time_source, connection.addressProviderSharedPtr(), parent_filter_state, filter_state_life_span) {} ~FilterManager() override { @@ -911,6 +914,7 @@ class FilterManager : public ScopeTrackedObject, const Network::Connection* connection() const { return &connection_; } uint64_t streamId() const { return stream_id_; } + Buffer::BufferMemoryAccountSharedPtr account() const { return account_; } Buffer::InstancePtr& bufferedRequestData() { return buffered_request_data_; } @@ -982,6 +986,7 @@ class FilterManager : public ScopeTrackedObject, Event::Dispatcher& dispatcher_; const Network::Connection& connection_; const uint64_t stream_id_; + Buffer::BufferMemoryAccountSharedPtr account_; const bool proxy_100_continue_; std::list decoder_filters_; diff --git a/source/common/http/http1/codec_impl.h b/source/common/http/http1/codec_impl.h index 031d972a5a08b..ac7d7564a1173 100644 --- a/source/common/http/http1/codec_impl.h +++ b/source/common/http/http1/codec_impl.h @@ -68,6 +68,14 @@ class StreamEncoderImpl : public virtual StreamEncoder, // connection, invoking any watermarks as necessary. There is no internal buffering that would // require a flush timeout not already covered by other timeouts. } + Buffer::BufferMemoryAccountSharedPtr getAccount() const override { + // TODO(kbaichoo): implement account tracking for H1. + return nullptr; + } + + void setAccount(Buffer::BufferMemoryAccountSharedPtr) override { + // TODO(kbaichoo): implement account tracking for H1. + } void setIsResponseToHeadRequest(bool value) { is_response_to_head_request_ = value; } void setIsResponseToConnectRequest(bool value) { is_response_to_connect_request_ = value; } diff --git a/source/common/http/http2/codec_impl.cc b/source/common/http/http2/codec_impl.cc index 574d9ef9d40c6..bdb8b3892d912 100644 --- a/source/common/http/http2/codec_impl.cc +++ b/source/common/http/http2/codec_impl.cc @@ -130,13 +130,23 @@ template static T* removeConst(const void* object) { } ConnectionImpl::StreamImpl::StreamImpl(ConnectionImpl& parent, uint32_t buffer_limit) - : parent_(parent), local_end_stream_sent_(false), remote_end_stream_(false), - data_deferred_(false), received_noninformational_headers_(false), + : parent_(parent), + pending_recv_data_(parent_.connection_.dispatcher().getWatermarkFactory().create( + [this]() -> void { this->pendingRecvBufferLowWatermark(); }, + [this]() -> void { this->pendingRecvBufferHighWatermark(); }, + []() -> void { /* TODO(adisuissa): Handle overflow watermark */ })), + pending_send_data_(parent_.connection_.dispatcher().getWatermarkFactory().create( + [this]() -> void { this->pendingSendBufferLowWatermark(); }, + [this]() -> void { this->pendingSendBufferHighWatermark(); }, + []() -> void { /* TODO(adisuissa): Handle overflow watermark */ })), + local_end_stream_sent_(false), remote_end_stream_(false), data_deferred_(false), + received_noninformational_headers_(false), pending_receive_buffer_high_watermark_called_(false), pending_send_buffer_high_watermark_called_(false), reset_due_to_messaging_error_(false) { parent_.stats_.streams_active_.inc(); if (buffer_limit > 0) { - setWriteBufferWatermarks(buffer_limit / 2, buffer_limit); + pending_recv_data_->setWatermarks(buffer_limit); + pending_send_data_->setWatermarks(buffer_limit); } } @@ -145,7 +155,7 @@ ConnectionImpl::StreamImpl::~StreamImpl() { ASSERT(stream_idle_timer_ == nullptr void ConnectionImpl::StreamImpl::destroy() { disarmStreamIdleTimer(); parent_.stats_.streams_active_.dec(); - parent_.stats_.pending_send_bytes_.sub(pending_send_data_.length()); + parent_.stats_.pending_send_bytes_.sub(pending_send_data_->length()); } static void insertHeader(std::vector& headers, const HeaderEntry& header) { @@ -253,7 +263,7 @@ void ConnectionImpl::ServerStreamImpl::encodeHeaders(const ResponseHeaderMap& he void ConnectionImpl::StreamImpl::encodeTrailersBase(const HeaderMap& trailers) { ASSERT(!local_end_stream_); local_end_stream_ = true; - if (pending_send_data_.length() > 0) { + if (pending_send_data_->length() > 0) { // In this case we want trailers to come after we release all pending body data that is // waiting on window updates. We need to save the trailers so that we can emit them later. // However, for empty trailers, we don't need to to save the trailers. @@ -390,7 +400,9 @@ void ConnectionImpl::StreamImpl::submitTrailers(const HeaderMap& trailers) { // Instead of submitting empty trailers, we send empty data instead. Buffer::OwnedImpl empty_buffer; - encodeDataHelper(empty_buffer, /*end_stream=*/true, skip_encoding_empty_trailers); + encodeDataHelper(empty_buffer, + /*end_stream=*/ + true, skip_encoding_empty_trailers); return; } @@ -409,13 +421,13 @@ void ConnectionImpl::StreamImpl::submitMetadata(uint8_t flags) { } ssize_t ConnectionImpl::StreamImpl::onDataSourceRead(uint64_t length, uint32_t* data_flags) { - if (pending_send_data_.length() == 0 && !local_end_stream_) { + if (pending_send_data_->length() == 0 && !local_end_stream_) { ASSERT(!data_deferred_); data_deferred_ = true; return NGHTTP2_ERR_DEFERRED; } else { *data_flags |= NGHTTP2_DATA_FLAG_NO_COPY; - if (local_end_stream_ && pending_send_data_.length() <= length) { + if (local_end_stream_ && pending_send_data_->length() <= length) { *data_flags |= NGHTTP2_DATA_FLAG_EOF; if (pending_trailers_to_encode_) { // We need to tell the library to not set end stream so that we can emit the trailers. @@ -425,7 +437,7 @@ ssize_t ConnectionImpl::StreamImpl::onDataSourceRead(uint64_t length, uint32_t* } } - return std::min(length, pending_send_data_.length()); + return std::min(length, pending_send_data_->length()); } } @@ -446,7 +458,7 @@ void ConnectionImpl::StreamImpl::onDataSourceSend(const uint8_t* framehd, size_t } parent_.stats_.pending_send_bytes_.sub(length); - output.move(pending_send_data_, length); + output.move(*pending_send_data_, length); parent_.connection_.write(output, false); } @@ -491,7 +503,9 @@ void ConnectionImpl::StreamImpl::onPendingFlushTimer() { void ConnectionImpl::StreamImpl::encodeData(Buffer::Instance& data, bool end_stream) { ASSERT(!local_end_stream_); - encodeDataHelper(data, end_stream, /*skip_encoding_empty_trailers=*/false); + encodeDataHelper(data, end_stream, + /*skip_encoding_empty_trailers=*/ + false); } void ConnectionImpl::StreamImpl::encodeDataHelper(Buffer::Instance& data, bool end_stream, @@ -502,7 +516,7 @@ void ConnectionImpl::StreamImpl::encodeDataHelper(Buffer::Instance& data, bool e local_end_stream_ = end_stream; parent_.stats_.pending_send_bytes_.add(data.length()); - pending_send_data_.move(data); + pending_send_data_->move(data); if (data_deferred_) { int rc = nghttp2_session_resume_data(parent_.session_, stream_id_); ASSERT(rc == 0); @@ -514,7 +528,7 @@ void ConnectionImpl::StreamImpl::encodeDataHelper(Buffer::Instance& data, bool e // Intended to check through coverage that this error case is tested return; } - if (local_end_stream_ && pending_send_data_.length() > 0) { + if (local_end_stream_ && pending_send_data_->length() > 0) { createPendingFlushTimer(); } } @@ -707,7 +721,7 @@ int ConnectionImpl::onData(int32_t stream_id, const uint8_t* data, size_t len) { StreamImpl* stream = getStream(stream_id); // If this results in buffering too much data, the watermark buffer will call // pendingRecvBufferHighWatermark, resulting in ++read_disable_count_ - stream->pending_recv_data_.add(data, len); + stream->pending_recv_data_->add(data, len); // Update the window to the peer unless some consumer of this stream's data has hit a flow control // limit and disabled reads on this stream if (!stream->buffersOverrun()) { @@ -862,10 +876,10 @@ Status ConnectionImpl::onFrameReceived(const nghttp2_frame* frame) { // It's possible that we are waiting to send a deferred reset, so only raise data if local // is not complete. if (!stream->deferred_reset_) { - stream->decoder().decodeData(stream->pending_recv_data_, stream->remote_end_stream_); + stream->decoder().decodeData(*stream->pending_recv_data_, stream->remote_end_stream_); } - stream->pending_recv_data_.drain(stream->pending_recv_data_.length()); + stream->pending_recv_data_->drain(stream->pending_recv_data_->length()); break; } case NGHTTP2_RST_STREAM: { @@ -1530,6 +1544,12 @@ void ConnectionImpl::ServerStreamImpl::dumpState(std::ostream& os, int indent_le } } +void ConnectionImpl::ClientStreamImpl::setAccount(Buffer::BufferMemoryAccountSharedPtr account) { + buffer_memory_account_ = account; + pending_recv_data_->bindAccount(buffer_memory_account_); + pending_send_data_->bindAccount(buffer_memory_account_); +} + ClientConnectionImpl::ClientConnectionImpl( Network::Connection& connection, Http::ConnectionCallbacks& callbacks, CodecStats& stats, Random::RandomGenerator& random_generator, diff --git a/source/common/http/http2/codec_impl.h b/source/common/http/http2/codec_impl.h index 1f73d14f423f5..e95ba799bb1b1 100644 --- a/source/common/http/http2/codec_impl.h +++ b/source/common/http/http2/codec_impl.h @@ -8,6 +8,7 @@ #include #include +#include "envoy/buffer/buffer.h" #include "envoy/common/random_generator.h" #include "envoy/common/scope_tracker.h" #include "envoy/config/core/v3/protocol.pb.h" @@ -17,6 +18,7 @@ #include "common/buffer/buffer_impl.h" #include "common/buffer/watermark_buffer.h" +#include "common/common/assert.h" #include "common/common/linked_object.h" #include "common/common/logger.h" #include "common/common/statusor.h" @@ -229,7 +231,7 @@ class ConnectionImpl : public virtual Connection, void removeCallbacks(StreamCallbacks& callbacks) override { removeCallbacksHelper(callbacks); } void resetStream(StreamResetReason reason) override; void readDisable(bool disable) override; - uint32_t bufferLimit() override { return pending_recv_data_.highWatermark(); } + uint32_t bufferLimit() override { return pending_recv_data_->highWatermark(); } const Network::Address::InstanceConstSharedPtr& connectionLocalAddress() override { return parent_.connection_.addressProvider().localAddress(); } @@ -257,11 +259,6 @@ class ConnectionImpl : public virtual Connection, } } - void setWriteBufferWatermarks(uint32_t low_watermark, uint32_t high_watermark) { - pending_recv_data_.setWatermarks(low_watermark, high_watermark); - pending_send_data_.setWatermarks(low_watermark, high_watermark); - } - // If the receive buffer encounters watermark callbacks, enable/disable reads on this stream. void pendingRecvBufferHighWatermark(); void pendingRecvBufferLowWatermark(); @@ -298,14 +295,8 @@ class ConnectionImpl : public virtual Connection, // watermark can never overflow because the peer can never send more bytes than the stream // window without triggering protocol error and this buffer is drained after each DATA frame was // dispatched through the filter chain. See source/docs/flow_control.md for more information. - Buffer::WatermarkBuffer pending_recv_data_{ - [this]() -> void { this->pendingRecvBufferLowWatermark(); }, - [this]() -> void { this->pendingRecvBufferHighWatermark(); }, - []() -> void { /* TODO(adisuissa): Handle overflow watermark */ }}; - Buffer::WatermarkBuffer pending_send_data_{ - [this]() -> void { this->pendingSendBufferLowWatermark(); }, - [this]() -> void { this->pendingSendBufferHighWatermark(); }, - []() -> void { /* TODO(adisuissa): Handle overflow watermark */ }}; + Buffer::InstancePtr pending_recv_data_; + Buffer::InstancePtr pending_send_data_; HeaderMapPtr pending_trailers_to_encode_; std::unique_ptr metadata_decoder_; std::unique_ptr metadata_encoder_; @@ -375,6 +366,13 @@ class ConnectionImpl : public virtual Connection, // ScopeTrackedObject void dumpState(std::ostream& os, int indent_level) const override; + // Http::Stream + Buffer::BufferMemoryAccountSharedPtr getAccount() const override { + return buffer_memory_account_; + } + void setAccount(Buffer::BufferMemoryAccountSharedPtr account) override; + + Buffer::BufferMemoryAccountSharedPtr buffer_memory_account_; ResponseDecoder& response_decoder_; absl::variant headers_or_trailers_; std::string upgrade_type_; @@ -387,7 +385,11 @@ class ConnectionImpl : public virtual Connection, */ struct ServerStreamImpl : public StreamImpl, public ResponseEncoder { ServerStreamImpl(ConnectionImpl& parent, uint32_t buffer_limit) - : StreamImpl(parent, buffer_limit), headers_or_trailers_(RequestHeaderMapImpl::create()) {} + : StreamImpl(parent, buffer_limit), headers_or_trailers_(RequestHeaderMapImpl::create()), + buffer_memory_account_(std::make_shared()) { + pending_recv_data_->bindAccount(buffer_memory_account_); + pending_send_data_->bindAccount(buffer_memory_account_); + } // StreamImpl void submitHeaders(const std::vector& final_headers, @@ -420,8 +422,19 @@ class ConnectionImpl : public virtual Connection, // ScopeTrackedObject void dumpState(std::ostream& os, int indent_level) const override; + // Http::Stream + Buffer::BufferMemoryAccountSharedPtr getAccount() const override { + return buffer_memory_account_; + } + void setAccount(Buffer::BufferMemoryAccountSharedPtr) override { + RELEASE_ASSERT( + false, + "Server Stream creates an account during construction. This should not be called."); + } + RequestDecoder* request_decoder_{}; absl::variant headers_or_trailers_; + Buffer::BufferMemoryAccountSharedPtr buffer_memory_account_; bool streamErrorOnInvalidHttpMessage() const override { return parent_.stream_error_on_invalid_http_messaging_; diff --git a/source/common/quic/envoy_quic_client_stream.h b/source/common/quic/envoy_quic_client_stream.h index 73b162feabe85..a43a854fa5980 100644 --- a/source/common/quic/envoy_quic_client_stream.h +++ b/source/common/quic/envoy_quic_client_stream.h @@ -1,5 +1,7 @@ #pragma once +#include "envoy/buffer/buffer.h" + #if defined(__GNUC__) #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wunused-parameter" @@ -46,6 +48,14 @@ class EnvoyQuicClientStream : public quic::QuicSpdyClientStream, // Http::Stream void resetStream(Http::StreamResetReason reason) override; void setFlushTimeout(std::chrono::milliseconds) override {} + Buffer::BufferMemoryAccountSharedPtr getAccount() const override { + // TODO(kbaichoo): implement account tracking for QUIC. + return nullptr; + } + + void setAccount(Buffer::BufferMemoryAccountSharedPtr) override { + // TODO(kbaichoo): implement account tracking for QUIC. + } // quic::QuicSpdyStream void OnBodyAvailable() override; void OnStreamReset(const quic::QuicRstStreamFrame& frame) override; diff --git a/source/common/quic/envoy_quic_server_stream.h b/source/common/quic/envoy_quic_server_stream.h index e9059861c5e28..b9a09a831f1eb 100644 --- a/source/common/quic/envoy_quic_server_stream.h +++ b/source/common/quic/envoy_quic_server_stream.h @@ -54,6 +54,14 @@ class EnvoyQuicServerStream : public quic::QuicSpdyServerStreamBase, void setFlushTimeout(std::chrono::milliseconds) override { // TODO(mattklein123): Actually implement this for HTTP/3 similar to HTTP/2. } + Buffer::BufferMemoryAccountSharedPtr getAccount() const override { + // TODO(kbaichoo): implement account tracking for QUIC. + return nullptr; + } + + void setAccount(Buffer::BufferMemoryAccountSharedPtr) override { + // TODO(kbaichoo): implement account tracking for QUIC. + } // quic::QuicSpdyStream void OnBodyAvailable() override; bool OnStopSending(quic::QuicRstStreamErrorCode error) override; diff --git a/source/common/router/upstream_request.cc b/source/common/router/upstream_request.cc index 3213f156aa812..355f577a38bc7 100644 --- a/source/common/router/upstream_request.cc +++ b/source/common/router/upstream_request.cc @@ -371,6 +371,9 @@ void UpstreamRequest::onPoolReady( ENVOY_STREAM_LOG(debug, "pool ready", *parent_.callbacks()); upstream_ = std::move(upstream); + // Have the upstream use the account of the downstream. + upstream_->setAccount(parent_.callbacks()->account()); + if (parent_.requestVcluster()) { // The cluster increases its upstream_rq_total_ counter right before firing this onPoolReady // callback. Hence, the upstream request increases the virtual cluster's upstream_rq_total_ stat diff --git a/source/extensions/upstreams/http/http/upstream_request.h b/source/extensions/upstreams/http/http/upstream_request.h index 94f5a0a0fda6f..c36dbb4f1a97a 100644 --- a/source/extensions/upstreams/http/http/upstream_request.h +++ b/source/extensions/upstreams/http/http/upstream_request.h @@ -82,6 +82,10 @@ class HttpUpstream : public Router::GenericUpstream, public Envoy::Http::StreamC request_encoder_->getStream().resetStream(Envoy::Http::StreamResetReason::LocalReset); } + void setAccount(Buffer::BufferMemoryAccountSharedPtr account) override { + request_encoder_->getStream().setAccount(std::move(account)); + } + // Http::StreamCallbacks void onResetStream(Envoy::Http::StreamResetReason reason, absl::string_view transport_failure_reason) override { diff --git a/source/extensions/upstreams/http/tcp/upstream_request.h b/source/extensions/upstreams/http/tcp/upstream_request.h index 3a002f4b7a128..bffc7ee447d1d 100644 --- a/source/extensions/upstreams/http/tcp/upstream_request.h +++ b/source/extensions/upstreams/http/tcp/upstream_request.h @@ -74,6 +74,7 @@ class TcpUpstream : public Router::GenericUpstream, void encodeTrailers(const Envoy::Http::RequestTrailerMap&) override; void readDisable(bool disable) override; void resetStream() override; + void setAccount(Buffer::BufferMemoryAccountSharedPtr) override {} // Tcp::ConnectionPool::UpstreamCallbacks void onUpstreamData(Buffer::Instance& data, bool end_stream) override; diff --git a/test/common/http/conn_manager_impl_test_base.h b/test/common/http/conn_manager_impl_test_base.h index 76f9fc86740ea..841d95c483dbe 100644 --- a/test/common/http/conn_manager_impl_test_base.h +++ b/test/common/http/conn_manager_impl_test_base.h @@ -186,7 +186,7 @@ class HttpConnectionManagerImplTest : public testing::Test, public ConnectionMan std::make_shared>()}; TracingConnectionManagerConfigPtr tracing_config_; SlowDateProviderImpl date_provider_{test_time_.timeSystem()}; - MockStream stream_; + NiceMock stream_; Http::StreamCallbacks* stream_callbacks_{nullptr}; NiceMock cluster_manager_; NiceMock overload_manager_; diff --git a/test/common/http/filter_manager_test.cc b/test/common/http/filter_manager_test.cc index daf8a95a12ea5..f41f1867eaea1 100644 --- a/test/common/http/filter_manager_test.cc +++ b/test/common/http/filter_manager_test.cc @@ -28,8 +28,8 @@ class FilterManagerTest : public testing::Test { public: void initialize() { filter_manager_ = std::make_unique( - filter_manager_callbacks_, dispatcher_, connection_, 0, true, 10000, filter_factory_, - local_reply_, protocol_, time_source_, filter_state_, + filter_manager_callbacks_, dispatcher_, connection_, 0, nullptr, true, 10000, + filter_factory_, local_reply_, protocol_, time_source_, filter_state_, StreamInfo::FilterState::LifeSpan::Connection); } diff --git a/test/integration/BUILD b/test/integration/BUILD index 8ab9096c0c194..4374b4e5d31df 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -319,12 +319,12 @@ envoy_cc_test( deps = [ ":autonomous_upstream_lib", ":http_integration_lib", + ":socket_interface_swap_lib", ":tracked_watermark_buffer_lib", "//test/common/http/http2:http2_frame", "//test/integration/filters:backpressure_filter_config_lib", "//test/integration/filters:set_response_code_filter_config_proto_cc_proto", "//test/integration/filters:set_response_code_filter_lib", - "//test/integration/filters:test_socket_interface_lib", "//test/mocks/http:http_mocks", "//test/test_common:utility_lib", "@com_google_absl//absl/synchronization", @@ -365,6 +365,37 @@ envoy_cc_test( ], ) +envoy_cc_test( + name = "buffer_accounting_integration_test", + srcs = [ + "buffer_accounting_integration_test.cc", + ], + deps = [ + ":http_integration_lib", + ":socket_interface_swap_lib", + ":tracked_watermark_buffer_lib", + "//test/mocks/http:http_mocks", + "//test/test_common:utility_lib", + "@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto", + "@envoy_api//envoy/config/cluster/v3:pkg_cc_proto", + "@envoy_api//envoy/extensions/filters/network/http_connection_manager/v3:pkg_cc_proto", + ], +) + +envoy_cc_test_library( + name = "socket_interface_swap_lib", + srcs = [ + "socket_interface_swap.cc", + ], + hdrs = [ + "socket_interface_swap.h", + ], + deps = [ + "//test/integration/filters:test_socket_interface_lib", + "@com_google_absl//absl/synchronization", + ], +) + envoy_cc_test( name = "http_subset_lb_integration_test", srcs = [ diff --git a/test/integration/buffer_accounting_integration_test.cc b/test/integration/buffer_accounting_integration_test.cc new file mode 100644 index 0000000000000..ca9af0c5f9350 --- /dev/null +++ b/test/integration/buffer_accounting_integration_test.cc @@ -0,0 +1,254 @@ +#include + +#include "envoy/config/bootstrap/v3/bootstrap.pb.h" +#include "envoy/config/cluster/v3/cluster.pb.h" +#include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.pb.h" +#include "envoy/network/address.h" + +#include "common/buffer/buffer_impl.h" + +#include "test/integration/autonomous_upstream.h" +#include "test/integration/tracked_watermark_buffer.h" +#include "test/integration/utility.h" +#include "test/mocks/http/mocks.h" + +#include "fake_upstream.h" +#include "gtest/gtest.h" +#include "http_integration.h" +#include "integration_stream_decoder.h" +#include "socket_interface_swap.h" + +namespace Envoy { + +class HttpBufferWatermarksTest : public SocketInterfaceSwap, + public testing::TestWithParam, + public HttpIntegrationTest { +public: + struct BufferParams { + const uint32_t connection_watermark; + const uint32_t downstream_h2_stream_window; + const uint32_t downstream_h2_conn_window; + const uint32_t upstream_h2_stream_window; + const uint32_t upstream_h2_conn_window; + }; + + // Configures the buffers with the given parameters. + void initializeWithBufferConfig(const BufferParams& buffer_params, uint32_t num_responses) { + config_helper_.setBufferLimits(buffer_params.connection_watermark, + buffer_params.connection_watermark); + + config_helper_.addConfigModifier( + [&](envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager& + hcm) -> void { + auto* h2_options = hcm.mutable_http2_protocol_options(); + h2_options->mutable_max_concurrent_streams()->set_value(num_responses); + h2_options->mutable_initial_stream_window_size()->set_value( + buffer_params.downstream_h2_stream_window); + h2_options->mutable_initial_connection_window_size()->set_value( + buffer_params.downstream_h2_conn_window); + }); + + config_helper_.addConfigModifier( + [&](envoy::config::bootstrap::v3::Bootstrap& bootstrap) -> void { + ConfigHelper::HttpProtocolOptions protocol_options; + auto* upstream_h2_options = + protocol_options.mutable_explicit_http_config()->mutable_http2_protocol_options(); + upstream_h2_options->mutable_max_concurrent_streams()->set_value(100); + upstream_h2_options->mutable_initial_stream_window_size()->set_value( + buffer_params.upstream_h2_stream_window); + upstream_h2_options->mutable_initial_connection_window_size()->set_value( + buffer_params.upstream_h2_conn_window); + for (auto& cluster_config : *bootstrap.mutable_static_resources()->mutable_clusters()) { + ConfigHelper::setProtocolOptions(cluster_config, protocol_options); + } + }); + + autonomous_upstream_ = true; + autonomous_allow_incomplete_streams_ = true; + + initialize(); + } + + std::vector + sendRequests(uint32_t num_responses, uint32_t request_body_size, uint32_t response_body_size) { + std::vector responses; + + Http::TestRequestHeaderMapImpl header_map{ + {"response_data_blocks", absl::StrCat(1)}, + {"response_size_bytes", absl::StrCat(response_body_size)}, + {"no_trailers", "0"}}; + header_map.copyFrom(default_request_headers_); + header_map.setContentLength(request_body_size); + + for (uint32_t idx = 0; idx < num_responses; ++idx) { + responses.emplace_back(codec_client_->makeRequestWithBody(header_map, request_body_size)); + } + + return responses; + } + + // TODO(kbaichoo): Parameterize on the client codec type when other protocols + // (H1, H3) support buffer accounting. + HttpBufferWatermarksTest() : HttpIntegrationTest(Http::CodecClient::Type::HTTP2, GetParam()) { + setServerBufferFactory(buffer_factory_); + setDownstreamProtocol(Http::CodecClient::Type::HTTP2); + setUpstreamProtocol(FakeHttpConnection::Type::HTTP2); + } + +protected: + std::shared_ptr buffer_factory_ = + std::make_shared(); + + std::string printAccounts() { + std::stringstream stream; + auto print_map = + [&stream](Buffer::TrackedWatermarkBufferFactory::AccountToBoundBuffersMap& map) { + stream << "Printing Account map. Size: " << map.size() << '\n'; + for (auto& entry : map) { + stream << " Account: " << entry.first << " Charged Amount: " + << static_cast(entry.first.get())->balance() + << '\n'; + for (auto& buffer : entry.second) { + stream << " Buffer: " << buffer << '\n'; + } + } + }; + + buffer_factory_->inspectAccounts(print_map); + return stream.str(); + } +}; + +INSTANTIATE_TEST_SUITE_P(IpVersions, HttpBufferWatermarksTest, + testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), + TestUtility::ipTestParamsToString); + +// We should create four buffers each billing the same downstream request's +// account which originated the chain. +TEST_P(HttpBufferWatermarksTest, ShouldCreateFourBuffersPerAccount) { + FakeStreamPtr upstream_request1; + FakeStreamPtr upstream_request2; + default_request_headers_.setContentLength(1000); + + initialize(); + codec_client_ = makeHttpConnection(lookupPort("http")); + + // Sends the first request. + auto response1 = codec_client_->makeRequestWithBody(default_request_headers_, 1000); + waitForNextUpstreamRequest(); + upstream_request1 = std::move(upstream_request_); + + // Check the expected number of buffers per account + EXPECT_EQ(buffer_factory_->numBuffersActivelyBound(), 4); + EXPECT_EQ(buffer_factory_->numAccountsActive(), 1); + + // Send the second request. + auto response2 = codec_client_->makeRequestWithBody(default_request_headers_, 1000); + waitForNextUpstreamRequest(); + upstream_request2 = std::move(upstream_request_); + + // Check the expected number of buffers per account + EXPECT_EQ(buffer_factory_->numBuffersActivelyBound(), 8); + EXPECT_EQ(buffer_factory_->numAccountsActive(), 2); + + // Respond to the first request and wait for complete + upstream_request1->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); + upstream_request1->encodeData(1000, true); + ASSERT_TRUE(response1->waitForEndStream()); + ASSERT_TRUE(upstream_request1->complete()); + + // Check the expected number of buffers per account + EXPECT_EQ(buffer_factory_->numBuffersActivelyBound(), 4); + EXPECT_EQ(buffer_factory_->numAccountsActive(), 1); + + // Respond to the second request and wait for complete + upstream_request2->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); + upstream_request2->encodeData(1000, true); + ASSERT_TRUE(response2->waitForEndStream()); + ASSERT_TRUE(upstream_request2->complete()); + + // Check the expected number of buffers per account + EXPECT_EQ(buffer_factory_->numBuffersActivelyBound(), 0); + EXPECT_EQ(buffer_factory_->numAccountsActive(), 0); +} + +TEST_P(HttpBufferWatermarksTest, ShouldTrackAllocatedBytesToUpstream) { + const int num_requests = 5; + const uint32_t connection_watermark = 32768; + const uint32_t downstream_h2_stream_window = 512 * 1024; + const uint32_t downstream_h2_conn_window = 64 * 1024; + const uint32_t upstream_h2_stream_window = 64 * 1024; + const uint32_t upstream_h2_conn_window = 1024 * 1024 * 1024; // Effectively unlimited + + initializeWithBufferConfig({connection_watermark, downstream_h2_stream_window, + downstream_h2_conn_window, upstream_h2_stream_window, + upstream_h2_conn_window}, + num_requests); + + // Makes us have Envoy's writes to upstream return EAGAIN + writev_matcher_->setDestinationPort(fake_upstreams_[0]->localAddress()->ip()->port()); + writev_matcher_->setWritevReturnsEgain(); + + codec_client_ = makeHttpConnection(lookupPort("http")); + + const uint32_t request_body_size = 4096; + const uint32_t response_body_size = 4096; + auto responses = sendRequests(num_requests, request_body_size, response_body_size); + + // Wait for all requests to have accounted for the requests we've sent. + ASSERT_TRUE(buffer_factory_->waitUntilEachAccountChargedAtleast(request_body_size, num_requests, + TestUtility::DefaultTimeout)) + << "buffer total: " << buffer_factory_->totalBufferSize() + << " buffer max: " << buffer_factory_->maxBufferSize() + << " active accounts: " << buffer_factory_->numAccountsActive() + << " active bound buffers: " << buffer_factory_->numBuffersActivelyBound() << printAccounts(); + + writev_matcher_->setResumeWrites(); + + for (auto& response : responses) { + ASSERT_TRUE(response->waitForEndStream()); + ASSERT_TRUE(response->complete()); + } +} + +TEST_P(HttpBufferWatermarksTest, ShouldTrackAllocatedBytesToDownstream) { + const int num_requests = 5; + const uint32_t connection_watermark = 32768; + const uint32_t downstream_h2_stream_window = 512 * 1024; + const uint32_t downstream_h2_conn_window = 64 * 1024; + const uint32_t upstream_h2_stream_window = 64 * 1024; + const uint32_t upstream_h2_conn_window = 1024 * 1024 * 1024; // Effectively unlimited + + initializeWithBufferConfig({connection_watermark, downstream_h2_stream_window, + downstream_h2_conn_window, upstream_h2_stream_window, + upstream_h2_conn_window}, + num_requests); + writev_matcher_->setSourcePort(lookupPort("http")); + codec_client_ = makeHttpConnection(lookupPort("http")); + + // Simulate TCP push back on the Envoy's downstream network socket, so that outbound frames + // start to accumulate in the transport socket buffer. + writev_matcher_->setWritevReturnsEgain(); + + const uint32_t request_body_size = 4096; + const uint32_t response_body_size = 16384; + auto responses = sendRequests(num_requests, request_body_size, response_body_size); + + // Wait for all requests to buffered the response from upstream. + ASSERT_TRUE(buffer_factory_->waitUntilEachAccountChargedAtleast(response_body_size, num_requests, + TestUtility::DefaultTimeout)) + << "buffer total: " << buffer_factory_->totalBufferSize() + << " buffer max: " << buffer_factory_->maxBufferSize() + << " active accounts: " << buffer_factory_->numAccountsActive() + << " active bound buffers: " << buffer_factory_->numBuffersActivelyBound() << printAccounts(); + + writev_matcher_->setResumeWrites(); + + // Wait for streams to terminate. + for (auto& response : responses) { + ASSERT_TRUE(response->waitForEndStream()); + ASSERT_TRUE(response->complete()); + } +} + +} // namespace Envoy diff --git a/test/integration/filters/test_socket_interface.h b/test/integration/filters/test_socket_interface.h index e32f8fe4a773d..07980b5a9e59a 100644 --- a/test/integration/filters/test_socket_interface.h +++ b/test/integration/filters/test_socket_interface.h @@ -27,12 +27,30 @@ class TestIoSocketHandle : public IoSocketHandleImpl { bool socket_v6only = false, absl::optional domain = absl::nullopt) : IoSocketHandleImpl(fd, socket_v6only, domain), writev_override_(writev_override_proc) {} + void initializeFileEvent(Event::Dispatcher& dispatcher, Event::FileReadyCb cb, + Event::FileTriggerType trigger, uint32_t events) override { + absl::MutexLock lock(&mutex_); + dispatcher_ = &dispatcher; + IoSocketHandleImpl::initializeFileEvent(dispatcher, cb, trigger, events); + } + + // Schedule resumption on the IoHandle by posting a callback to the IoHandle's dispatcher. Note + // that this operation is inherently racy, nothing guarantees that the TestIoSocketHandle is not + // deleted before the posted callback executes. + void activateInDispatcherThread(uint32_t events) { + absl::MutexLock lock(&mutex_); + RELEASE_ASSERT(dispatcher_ != nullptr, "null dispatcher"); + dispatcher_->post([this, events]() { activateFileEvents(events); }); + } + private: IoHandlePtr accept(struct sockaddr* addr, socklen_t* addrlen) override; Api::IoCallUint64Result writev(const Buffer::RawSlice* slices, uint64_t num_slice) override; IoHandlePtr duplicate() override; const WritevOverrideProc writev_override_; + absl::Mutex mutex_; + Event::Dispatcher* dispatcher_ ABSL_GUARDED_BY(mutex_) = nullptr; }; /** diff --git a/test/integration/http2_flood_integration_test.cc b/test/integration/http2_flood_integration_test.cc index 9d900353cb615..29875cbbec408 100644 --- a/test/integration/http2_flood_integration_test.cc +++ b/test/integration/http2_flood_integration_test.cc @@ -11,8 +11,8 @@ #include "common/network/socket_option_impl.h" #include "test/integration/autonomous_upstream.h" -#include "test/integration/filters/test_socket_interface.h" #include "test/integration/http_integration.h" +#include "test/integration/socket_interface_swap.h" #include "test/integration/tracked_watermark_buffer.h" #include "test/integration/utility.h" #include "test/mocks/http/mocks.h" @@ -31,70 +31,6 @@ const uint32_t ControlFrameFloodLimit = 100; const uint32_t AllFrameFloodLimit = 1000; } // namespace -class SocketInterfaceSwap { -public: - // Object of this class hold the state determining the IoHandle which - // should return EAGAIN from the `writev` call. - struct IoHandleMatcher { - bool shouldReturnEgain(uint32_t src_port, uint32_t dst_port) const { - absl::ReaderMutexLock lock(&mutex_); - return writev_returns_egain_ && (src_port == src_port_ || dst_port == dst_port_); - } - - // Source port to match. The port specified should be associated with a listener. - void setSourcePort(uint32_t port) { - absl::WriterMutexLock lock(&mutex_); - src_port_ = port; - } - - // Destination port to match. The port specified should be associated with a listener. - void setDestinationPort(uint32_t port) { - absl::WriterMutexLock lock(&mutex_); - dst_port_ = port; - } - - void setWritevReturnsEgain() { - absl::WriterMutexLock lock(&mutex_); - ASSERT(src_port_ != 0 || dst_port_ != 0); - writev_returns_egain_ = true; - } - - private: - mutable absl::Mutex mutex_; - uint32_t src_port_ ABSL_GUARDED_BY(mutex_) = 0; - uint32_t dst_port_ ABSL_GUARDED_BY(mutex_) = 0; - bool writev_returns_egain_ ABSL_GUARDED_BY(mutex_) = false; - }; - - SocketInterfaceSwap() { - Envoy::Network::SocketInterfaceSingleton::clear(); - test_socket_interface_loader_ = std::make_unique( - std::make_unique( - [writev_matcher = writev_matcher_]( - Envoy::Network::TestIoSocketHandle* io_handle, const Buffer::RawSlice*, - uint64_t) -> absl::optional { - if (writev_matcher->shouldReturnEgain(io_handle->localAddress()->ip()->port(), - io_handle->peerAddress()->ip()->port())) { - return Api::IoCallUint64Result( - 0, Api::IoErrorPtr(Network::IoSocketError::getIoSocketEagainInstance(), - Network::IoSocketError::deleteIoError)); - } - return absl::nullopt; - })); - } - - ~SocketInterfaceSwap() { - test_socket_interface_loader_.reset(); - Envoy::Network::SocketInterfaceSingleton::initialize(previous_socket_interface_); - } - -protected: - Envoy::Network::SocketInterface* const previous_socket_interface_{ - Envoy::Network::SocketInterfaceSingleton::getExisting()}; - std::shared_ptr writev_matcher_{std::make_shared()}; - std::unique_ptr test_socket_interface_loader_; -}; - // It is important that the new socket interface is installed before any I/O activity starts and // the previous one is restored after all I/O activity stops. Since the HttpIntegrationTest // destructor stops Envoy the SocketInterfaceSwap destructor needs to run after it. This order of @@ -396,7 +332,7 @@ TEST_P(Http2FloodMitigationTest, Data) { // The factory will be used to create 4 buffers: the input and output buffers for request and // response pipelines. - EXPECT_EQ(4, buffer_factory->numBuffersCreated()); + EXPECT_EQ(8, buffer_factory->numBuffersCreated()); // Expect at least 1000 1 byte data frames in the output buffer. Each data frame comes with a // 9-byte frame header; 10 bytes per data frame, 10000 bytes total. The output buffer should also @@ -411,7 +347,7 @@ TEST_P(Http2FloodMitigationTest, Data) { EXPECT_GE(22000, buffer_factory->sumMaxBufferSizes()); // Verify that all buffers have watermarks set. EXPECT_THAT(buffer_factory->highWatermarkRange(), - testing::Pair(1024 * 1024 * 1024, 1024 * 1024 * 1024)); + testing::Pair(256 * 1024 * 1024, 1024 * 1024 * 1024)); } // Verify that the server can detect flood triggered by a DATA frame from a decoder filter call diff --git a/test/integration/socket_interface_swap.cc b/test/integration/socket_interface_swap.cc new file mode 100644 index 0000000000000..521153817a256 --- /dev/null +++ b/test/integration/socket_interface_swap.cc @@ -0,0 +1,30 @@ +#include "test/integration/socket_interface_swap.h" + +namespace Envoy { + +SocketInterfaceSwap::SocketInterfaceSwap() { + Envoy::Network::SocketInterfaceSingleton::clear(); + test_socket_interface_loader_ = std::make_unique( + std::make_unique( + [writev_matcher = writev_matcher_](Envoy::Network::TestIoSocketHandle* io_handle, + const Buffer::RawSlice*, + uint64_t) -> absl::optional { + if (writev_matcher->shouldReturnEgain(io_handle)) { + return Api::IoCallUint64Result( + 0, Api::IoErrorPtr(Network::IoSocketError::getIoSocketEagainInstance(), + Network::IoSocketError::deleteIoError)); + } + return absl::nullopt; + })); +} + +void SocketInterfaceSwap::IoHandleMatcher::setResumeWrites() { + absl::MutexLock lock(&mutex_); + mutex_.Await(absl::Condition( + +[](Network::TestIoSocketHandle** matched_iohandle) { return *matched_iohandle != nullptr; }, + &matched_iohandle_)); + writev_returns_egain_ = false; + matched_iohandle_->activateInDispatcherThread(Event::FileReadyType::Write); +} + +} // namespace Envoy diff --git a/test/integration/socket_interface_swap.h b/test/integration/socket_interface_swap.h new file mode 100644 index 0000000000000..77e4fbca342dd --- /dev/null +++ b/test/integration/socket_interface_swap.h @@ -0,0 +1,73 @@ +#pragma once + +#include "common/network/socket_interface.h" + +#include "test/integration/filters/test_socket_interface.h" + +namespace Envoy { + +// Enables control at the socket level to stop and resume writes. +// Useful for tests want to temporarily stop Envoy from draining data. + +class SocketInterfaceSwap { +public: + // Object of this class hold the state determining the IoHandle which + // should return EAGAIN from the `writev` call. + struct IoHandleMatcher { + bool shouldReturnEgain(Envoy::Network::TestIoSocketHandle* io_handle) { + absl::MutexLock lock(&mutex_); + if (writev_returns_egain_ && (io_handle->localAddress()->ip()->port() == src_port_ || + io_handle->peerAddress()->ip()->port() == dst_port_)) { + ASSERT(matched_iohandle_ == nullptr || matched_iohandle_ == io_handle, + "Matched multiple io_handles, expected at most one to match."); + matched_iohandle_ = io_handle; + return true; + } + return false; + } + + // Source port to match. The port specified should be associated with a listener. + void setSourcePort(uint32_t port) { + absl::WriterMutexLock lock(&mutex_); + dst_port_ = 0; + src_port_ = port; + } + + // Destination port to match. The port specified should be associated with a listener. + void setDestinationPort(uint32_t port) { + absl::WriterMutexLock lock(&mutex_); + src_port_ = 0; + dst_port_ = port; + } + + void setWritevReturnsEgain() { + absl::WriterMutexLock lock(&mutex_); + ASSERT(src_port_ != 0 || dst_port_ != 0); + writev_returns_egain_ = true; + } + + void setResumeWrites(); + + private: + mutable absl::Mutex mutex_; + uint32_t src_port_ ABSL_GUARDED_BY(mutex_) = 0; + uint32_t dst_port_ ABSL_GUARDED_BY(mutex_) = 0; + bool writev_returns_egain_ ABSL_GUARDED_BY(mutex_) = false; + Network::TestIoSocketHandle* matched_iohandle_{}; + }; + + SocketInterfaceSwap(); + + ~SocketInterfaceSwap() { + test_socket_interface_loader_.reset(); + Envoy::Network::SocketInterfaceSingleton::initialize(previous_socket_interface_); + } + +protected: + Envoy::Network::SocketInterface* const previous_socket_interface_{ + Envoy::Network::SocketInterfaceSingleton::getExisting()}; + std::shared_ptr writev_matcher_{std::make_shared()}; + std::unique_ptr test_socket_interface_loader_; +}; + +} // namespace Envoy diff --git a/test/integration/tracked_watermark_buffer.cc b/test/integration/tracked_watermark_buffer.cc index e67acf088f3ae..e4d9c06881ee8 100644 --- a/test/integration/tracked_watermark_buffer.cc +++ b/test/integration/tracked_watermark_buffer.cc @@ -18,18 +18,49 @@ TrackedWatermarkBufferFactory::create(std::function below_low_watermark, return std::make_unique( [this, &buffer_info](uint64_t current_size) { absl::MutexLock lock(&mutex_); + total_buffer_size_ = total_buffer_size_ + current_size - buffer_info.current_size_; if (buffer_info.max_size_ < current_size) { buffer_info.max_size_ = current_size; } + buffer_info.current_size_ = current_size; }, [this, &buffer_info](uint32_t watermark) { absl::MutexLock lock(&mutex_); buffer_info.watermark_ = watermark; }, - [this]() { + [this, &buffer_info](TrackedWatermarkBuffer* buffer) { absl::MutexLock lock(&mutex_); ASSERT(active_buffer_count_ > 0); --active_buffer_count_; + total_buffer_size_ -= buffer_info.current_size_; + buffer_info.current_size_ = 0; + + // Remove bound account tracking. + auto account = buffer->getAccountForTest(); + if (account) { + auto& set = account_infos_[account]; + // Erase buffer, one entry should be removed. + ASSERT(set.erase(buffer) == 1); + ASSERT(actively_bound_buffers_.erase(buffer) == 1); + + // Erase account entry if there are no active bound buffers, and + // there's no other pointers to the account besides the local account + // pointer and within the map. + // + // It's possible for an account to no longer be bound to a buffer in + // the case that the H2 stream completes, but the data hasn't flushed + // at TCP. + if (set.empty() && account.use_count() == 2) { + ASSERT(account_infos_.erase(account) == 1); + } + } + }, + [this](BufferMemoryAccountSharedPtr& account, TrackedWatermarkBuffer* buffer) { + absl::MutexLock lock(&mutex_); + // Buffers should only be bound once. + ASSERT(actively_bound_buffers_.find(buffer) == actively_bound_buffers_.end()); + account_infos_[account].emplace(buffer); + actively_bound_buffers_.emplace(buffer); }, below_low_watermark, above_high_watermark, above_overflow_watermark); } @@ -44,6 +75,11 @@ uint64_t TrackedWatermarkBufferFactory::numBuffersActive() const { return active_buffer_count_; } +uint64_t TrackedWatermarkBufferFactory::totalBufferSize() const { + absl::MutexLock lock(&mutex_); + return total_buffer_size_; +} + uint64_t TrackedWatermarkBufferFactory::maxBufferSize() const { absl::MutexLock lock(&mutex_); uint64_t val = 0; @@ -94,5 +130,62 @@ std::pair TrackedWatermarkBufferFactory::highWatermarkRange( return std::make_pair(min_watermark, max_watermark); } +bool TrackedWatermarkBufferFactory::waitUntilTotalBufferedExceeds( + uint64_t byte_size, std::chrono::milliseconds timeout) { + absl::MutexLock lock(&mutex_); + auto predicate = [this, byte_size]() ABSL_SHARED_LOCKS_REQUIRED(mutex_) { + mutex_.AssertHeld(); + return total_buffer_size_ >= byte_size; + }; + return mutex_.AwaitWithTimeout(absl::Condition(&predicate), absl::Milliseconds(timeout.count())); +} + +void TrackedWatermarkBufferFactory::removeDanglingAccounts() { + auto accounts_it = account_infos_.begin(); + while (accounts_it != account_infos_.end()) { + auto next = std::next(accounts_it); + + // Remove all "dangling" accounts. + if (accounts_it->first.use_count() == 1) { + account_infos_.erase(accounts_it); + } + + accounts_it = next; + } +} + +bool TrackedWatermarkBufferFactory::waitUntilEachAccountChargedAtleast( + uint64_t byte_size, uint32_t expected_num_accounts, std::chrono::milliseconds timeout) { + absl::MutexLock lock(&mutex_); + auto predicate = [this, byte_size, + expected_num_accounts]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) { + mutex_.AssertHeld(); + + removeDanglingAccounts(); + if (account_infos_.size() < expected_num_accounts) { + return false; + } + + for (auto& acc : account_infos_) { + if (static_cast(acc.first.get())->balance() < byte_size) { + return false; + } + } + return true; + }; + return mutex_.AwaitWithTimeout(absl::Condition(&predicate), absl::Milliseconds(timeout.count())); +} + +uint64_t TrackedWatermarkBufferFactory::numAccountsActive() { + absl::MutexLock lock(&mutex_); + removeDanglingAccounts(); + return account_infos_.size(); +} + +uint64_t TrackedWatermarkBufferFactory::numBuffersActivelyBound() const { + absl::MutexLock lock(&mutex_); + return actively_bound_buffers_.size(); +} + } // namespace Buffer } // namespace Envoy diff --git a/test/integration/tracked_watermark_buffer.h b/test/integration/tracked_watermark_buffer.h index 64d8473dbf104..455b846f01e72 100644 --- a/test/integration/tracked_watermark_buffer.h +++ b/test/integration/tracked_watermark_buffer.h @@ -1,5 +1,8 @@ #pragma once +#include "envoy/buffer/buffer.h" + +#include "common/buffer/buffer_impl.h" #include "common/buffer/watermark_buffer.h" #include "absl/container/node_hash_map.h" @@ -11,31 +14,44 @@ namespace Buffer { // WatermarkBuffer subclass that hooks into updates to buffer size and buffer high watermark config. class TrackedWatermarkBuffer : public Buffer::WatermarkBuffer { public: - TrackedWatermarkBuffer(std::function update_max_size, - std::function update_high_watermark, - std::function on_delete, std::function below_low_watermark, - std::function above_high_watermark, - std::function above_overflow_watermark) + TrackedWatermarkBuffer( + std::function update_size, + std::function update_high_watermark, + std::function on_delete, + std::function on_bind, + std::function below_low_watermark, std::function above_high_watermark, + std::function above_overflow_watermark) : WatermarkBuffer(below_low_watermark, above_high_watermark, above_overflow_watermark), - update_max_size_(update_max_size), update_high_watermark_(update_high_watermark), - on_delete_(on_delete) {} - ~TrackedWatermarkBuffer() override { on_delete_(); } + update_size_(update_size), update_high_watermark_(update_high_watermark), + on_delete_(on_delete), on_bind_(on_bind) {} + ~TrackedWatermarkBuffer() override { on_delete_(this); } void setWatermarks(uint32_t watermark) override { update_high_watermark_(watermark); WatermarkBuffer::setWatermarks(watermark); } + void bindAccount(BufferMemoryAccountSharedPtr account) override { + on_bind_(account, this); + WatermarkBuffer::bindAccount(account); + } + protected: void checkHighAndOverflowWatermarks() override { - update_max_size_(length()); + update_size_(length()); WatermarkBuffer::checkHighAndOverflowWatermarks(); } + void checkLowWatermark() override { + update_size_(length()); + WatermarkBuffer::checkLowWatermark(); + } + private: - std::function update_max_size_; + std::function update_size_; std::function update_high_watermark_; - std::function on_delete_; + std::function on_delete_; + std::function on_bind_; }; // Factory that tracks how the created buffers are used. @@ -52,6 +68,8 @@ class TrackedWatermarkBufferFactory : public Buffer::WatermarkFactory { uint64_t numBuffersCreated() const; // Number of buffers still in use. uint64_t numBuffersActive() const; + // Total bytes buffered. + uint64_t totalBufferSize() const; // Size of the largest buffer. uint64_t maxBufferSize() const; // Sum of the max size of all known buffers. @@ -60,9 +78,39 @@ class TrackedWatermarkBufferFactory : public Buffer::WatermarkFactory { // functionality is disabled. std::pair highWatermarkRange() const; + // Total bytes currently buffered across all known buffers. + uint64_t totalBytesBuffered() const { + absl::MutexLock lock(&mutex_); + return total_buffer_size_; + } + + // Wait until total bytes buffered exceeds the a given size. + bool waitUntilTotalBufferedExceeds(uint64_t byte_size, std::chrono::milliseconds timeout); + + bool waitUntilEachAccountChargedAtleast(uint64_t byte_size, uint32_t expected_num_accounts, + std::chrono::milliseconds timeout); + + // Number of accounts bound to a buffer that's still in use. + uint64_t numAccountsActive(); + // Number of active buffers that had a call to bind. + uint64_t numBuffersActivelyBound() const; + + using AccountToBoundBuffersMap = + absl::flat_hash_map>; + void inspectAccounts(std::function func) { + absl::MutexLock lock(&mutex_); + func(account_infos_); + } + private: + // Remove "dangling" accounts; accounts where the account_info map is the only + // entity still pointing to the account. + void removeDanglingAccounts() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_); + struct BufferInfo { uint32_t watermark_ = 0; + uint64_t current_size_ = 0; uint64_t max_size_ = 0; }; @@ -71,8 +119,15 @@ class TrackedWatermarkBufferFactory : public Buffer::WatermarkFactory { uint64_t next_idx_ ABSL_GUARDED_BY(mutex_) = 0; // Number of buffers currently in existence. uint64_t active_buffer_count_ ABSL_GUARDED_BY(mutex_) = 0; + // total bytes buffered across all buffers. + uint64_t total_buffer_size_ ABSL_GUARDED_BY(mutex_) = 0; // Info about the buffer, by buffer idx. absl::node_hash_map buffer_infos_ ABSL_GUARDED_BY(mutex_); + // Map from accounts to buffers bound to that account. + AccountToBoundBuffersMap account_infos_ ABSL_GUARDED_BY(mutex_); + // Set of actively bound buffers. Used for asserting that buffers are bound + // only once. + absl::flat_hash_set actively_bound_buffers_ ABSL_GUARDED_BY(mutex_); }; } // namespace Buffer diff --git a/test/integration/tracked_watermark_buffer_test.cc b/test/integration/tracked_watermark_buffer_test.cc index 9d4eb6110d9ea..2eff1b36ea183 100644 --- a/test/integration/tracked_watermark_buffer_test.cc +++ b/test/integration/tracked_watermark_buffer_test.cc @@ -1,6 +1,13 @@ +#include + +#include "envoy/buffer/buffer.h" + +#include "common/buffer/buffer_impl.h" + #include "test/integration/tracked_watermark_buffer.h" #include "test/mocks/common.h" #include "test/test_common/test_runtime.h" +#include "test/test_common/thread_factory_for_test.h" #include "gtest/gtest.h" @@ -59,29 +66,31 @@ TEST_F(TrackedWatermarkBufferTest, BufferSizes) { auto buffer = factory_.create([]() {}, []() {}, []() {}); buffer->setWatermarks(100); auto buffer2 = factory_.create([]() {}, []() {}, []() {}); - EXPECT_EQ(2, factory_.numBuffersCreated()); EXPECT_EQ(2, factory_.numBuffersActive()); - // Add some bytes to the buffers, and verify max and sum(max). buffer->add("abcde"); buffer2->add("a"); EXPECT_EQ(5, factory_.maxBufferSize()); EXPECT_EQ(6, factory_.sumMaxBufferSizes()); + EXPECT_EQ(6, factory_.totalBytesBuffered()); // Add more bytes and drain the buffer. Verify that max is latched. buffer->add(std::string(1000, 'a')); EXPECT_TRUE(buffer->highWatermarkTriggered()); + EXPECT_EQ(1006, factory_.totalBytesBuffered()); buffer->drain(1005); EXPECT_EQ(0, buffer->length()); EXPECT_FALSE(buffer->highWatermarkTriggered()); EXPECT_EQ(1005, factory_.maxBufferSize()); EXPECT_EQ(1006, factory_.sumMaxBufferSizes()); + EXPECT_EQ(1, factory_.totalBytesBuffered()); buffer2->add("a"); EXPECT_EQ(1005, factory_.maxBufferSize()); EXPECT_EQ(1007, factory_.sumMaxBufferSizes()); + EXPECT_EQ(2, factory_.totalBytesBuffered()); // Verify cleanup tracking. buffer.reset(); @@ -90,12 +99,84 @@ TEST_F(TrackedWatermarkBufferTest, BufferSizes) { buffer2.reset(); EXPECT_EQ(2, factory_.numBuffersCreated()); EXPECT_EQ(0, factory_.numBuffersActive()); + // Bytes in deleted buffers are removed from the total. + EXPECT_EQ(0, factory_.totalBytesBuffered()); // Max sizes are remembered even after buffers are deleted. EXPECT_EQ(1005, factory_.maxBufferSize()); EXPECT_EQ(1007, factory_.sumMaxBufferSizes()); } +TEST_F(TrackedWatermarkBufferTest, WaitUntilTotalBufferedExceeds) { + auto buffer1 = factory_.create([]() {}, []() {}, []() {}); + auto buffer2 = factory_.create([]() {}, []() {}, []() {}); + auto buffer3 = factory_.create([]() {}, []() {}, []() {}); + + auto thread1 = Thread::threadFactoryForTest().createThread([&]() { buffer1->add("a"); }); + auto thread2 = Thread::threadFactoryForTest().createThread([&]() { buffer2->add("b"); }); + auto thread3 = Thread::threadFactoryForTest().createThread([&]() { buffer3->add("c"); }); + + factory_.waitUntilTotalBufferedExceeds(2, std::chrono::milliseconds(10000)); + thread1->join(); + thread2->join(); + thread3->join(); + + EXPECT_EQ(3, factory_.totalBytesBuffered()); + EXPECT_EQ(1, factory_.maxBufferSize()); +} + +TEST_F(TrackedWatermarkBufferTest, TracksNumberOfBuffersActivelyBound) { + auto buffer1 = factory_.create([]() {}, []() {}, []() {}); + auto buffer2 = factory_.create([]() {}, []() {}, []() {}); + auto buffer3 = factory_.create([]() {}, []() {}, []() {}); + BufferMemoryAccountSharedPtr account = std::make_shared(); + ASSERT_EQ(factory_.numBuffersActivelyBound(), 0); + + buffer1->bindAccount(account); + EXPECT_EQ(factory_.numBuffersActivelyBound(), 1); + buffer2->bindAccount(account); + EXPECT_EQ(factory_.numBuffersActivelyBound(), 2); + buffer3->bindAccount(account); + EXPECT_EQ(factory_.numBuffersActivelyBound(), 3); + + // Release test access to the account. + account.reset(); + + buffer3.reset(); + EXPECT_EQ(factory_.numBuffersActivelyBound(), 2); + buffer2.reset(); + EXPECT_EQ(factory_.numBuffersActivelyBound(), 1); + buffer1.reset(); + EXPECT_EQ(factory_.numBuffersActivelyBound(), 0); +} + +TEST_F(TrackedWatermarkBufferTest, TracksNumberOfAccountsActive) { + auto buffer1 = factory_.create([]() {}, []() {}, []() {}); + auto buffer2 = factory_.create([]() {}, []() {}, []() {}); + auto buffer3 = factory_.create([]() {}, []() {}, []() {}); + BufferMemoryAccountSharedPtr account1 = std::make_shared(); + ASSERT_EQ(factory_.numAccountsActive(), 0); + + buffer1->bindAccount(account1); + EXPECT_EQ(factory_.numAccountsActive(), 1); + buffer2->bindAccount(account1); + EXPECT_EQ(factory_.numAccountsActive(), 1); + + // Release test access to the account. + account1.reset(); + + buffer3->bindAccount(std::make_shared()); + EXPECT_EQ(factory_.numAccountsActive(), 2); + + buffer2.reset(); + EXPECT_EQ(factory_.numAccountsActive(), 2); + buffer1.reset(); + EXPECT_EQ(factory_.numAccountsActive(), 1); + + buffer3.reset(); + EXPECT_EQ(factory_.numAccountsActive(), 0); +} + } // namespace } // namespace Buffer } // namespace Envoy diff --git a/test/mocks/http/mocks.h b/test/mocks/http/mocks.h index 4a2d9997932cf..fe5b0e6a89d87 100644 --- a/test/mocks/http/mocks.h +++ b/test/mocks/http/mocks.h @@ -268,6 +268,7 @@ class MockStreamDecoderFilterCallbacks : public StreamDecoderFilterCallbacks, std::function modify_headers, const absl::optional grpc_status, absl::string_view details)); + MOCK_METHOD(Buffer::BufferMemoryAccountSharedPtr, account, (), (const)); Buffer::InstancePtr buffer_; std::list callbacks_{}; diff --git a/test/mocks/http/stream.cc b/test/mocks/http/stream.cc index 25e8e14b25441..22dbb411aeade 100644 --- a/test/mocks/http/stream.cc +++ b/test/mocks/http/stream.cc @@ -2,6 +2,7 @@ using testing::_; using testing::Invoke; +using testing::Return; using testing::ReturnRef; namespace Envoy { @@ -23,6 +24,11 @@ MockStream::MockStream() { })); ON_CALL(*this, connectionLocalAddress()).WillByDefault(ReturnRef(connection_local_address_)); + + ON_CALL(*this, getAccount()).WillByDefault(Return(account_)); + ON_CALL(*this, setAccount(_)) + .WillByDefault(Invoke( + [this](Buffer::BufferMemoryAccountSharedPtr account) -> void { account_ = account; })); } MockStream::~MockStream() = default; diff --git a/test/mocks/http/stream.h b/test/mocks/http/stream.h index b155af4a121db..5822af169ce51 100644 --- a/test/mocks/http/stream.h +++ b/test/mocks/http/stream.h @@ -21,9 +21,12 @@ class MockStream : public Stream { MOCK_METHOD(uint32_t, bufferLimit, ()); MOCK_METHOD(const Network::Address::InstanceConstSharedPtr&, connectionLocalAddress, ()); MOCK_METHOD(void, setFlushTimeout, (std::chrono::milliseconds timeout)); + MOCK_METHOD(Buffer::BufferMemoryAccountSharedPtr, getAccount, (), (const)); + MOCK_METHOD(void, setAccount, (Buffer::BufferMemoryAccountSharedPtr)); std::list callbacks_{}; Network::Address::InstanceConstSharedPtr connection_local_address_; + Buffer::BufferMemoryAccountSharedPtr account_; void runHighWatermarkCallbacks() { for (auto* callback : callbacks_) { From 44a5e05d8a7d67b22a3bf70c8953627998812c0e Mon Sep 17 00:00:00 2001 From: Kevin Baichoo Date: Fri, 30 Apr 2021 13:14:27 +0000 Subject: [PATCH 02/14] Simplified stream API, H2 codec impl. Reverted setWatermark as change was unnecessary. Fixed potential race condition in checking balances. Signed-off-by: Kevin Baichoo --- include/envoy/http/codec.h | 5 -- source/common/buffer/watermark_buffer.cc | 5 +- source/common/buffer/watermark_buffer.h | 3 +- source/common/http/conn_manager_impl.cc | 8 +++- source/common/http/http1/codec_impl.h | 4 -- source/common/http/http2/codec_impl.cc | 6 +-- source/common/http/http2/codec_impl.h | 25 ++-------- source/common/quic/envoy_quic_client_stream.h | 4 -- source/common/quic/envoy_quic_server_stream.h | 4 -- .../buffer_accounting_integration_test.cc | 23 +++++----- test/integration/tracked_watermark_buffer.cc | 46 ++++++++++++------- test/integration/tracked_watermark_buffer.h | 28 ++++++++++- test/mocks/http/stream.cc | 1 - test/mocks/http/stream.h | 1 - 14 files changed, 85 insertions(+), 78 deletions(-) diff --git a/include/envoy/http/codec.h b/include/envoy/http/codec.h index 6dfea0f3c51b9..00d243f185d6a 100644 --- a/include/envoy/http/codec.h +++ b/include/envoy/http/codec.h @@ -381,11 +381,6 @@ class Stream { */ virtual void setFlushTimeout(std::chrono::milliseconds timeout) PURE; - /** - * @return the account associated with this stream. - */ - virtual Buffer::BufferMemoryAccountSharedPtr getAccount() const PURE; - /** * Sets the account for this stream, propagating it to all of its buffers. * This should only be called on client streams since server streams create diff --git a/source/common/buffer/watermark_buffer.cc b/source/common/buffer/watermark_buffer.cc index eefa6022c8ef0..f7d95e5183f4e 100644 --- a/source/common/buffer/watermark_buffer.cc +++ b/source/common/buffer/watermark_buffer.cc @@ -90,7 +90,8 @@ void WatermarkBuffer::appendSliceForTest(absl::string_view data) { appendSliceForTest(data.data(), data.size()); } -void WatermarkBuffer::setWatermarks(uint32_t high_watermark) { +void WatermarkBuffer::setWatermarks(uint32_t low_watermark, uint32_t high_watermark) { + ASSERT(low_watermark < high_watermark || (high_watermark == 0 && low_watermark == 0)); uint32_t overflow_watermark_multiplier = Runtime::getInteger("envoy.buffer.overflow_multiplier", 0); if (overflow_watermark_multiplier > 0 && @@ -100,7 +101,7 @@ void WatermarkBuffer::setWatermarks(uint32_t high_watermark) { "high_watermark is overflowing. Disabling overflow watermark."); overflow_watermark_multiplier = 0; } - low_watermark_ = high_watermark / 2; + low_watermark_ = low_watermark; high_watermark_ = high_watermark; overflow_watermark_ = overflow_watermark_multiplier * high_watermark; checkHighAndOverflowWatermarks(); diff --git a/source/common/buffer/watermark_buffer.h b/source/common/buffer/watermark_buffer.h index 046173d49159b..7405928034e88 100644 --- a/source/common/buffer/watermark_buffer.h +++ b/source/common/buffer/watermark_buffer.h @@ -39,7 +39,8 @@ class WatermarkBuffer : public OwnedImpl { void appendSliceForTest(const void* data, uint64_t size) override; void appendSliceForTest(absl::string_view data) override; - void setWatermarks(uint32_t watermark) override; + void setWatermarks(uint32_t watermark) override { setWatermarks(watermark / 2, watermark); } + void setWatermarks(uint32_t low_watermark, uint32_t high_watermark); uint32_t highWatermark() const override { return high_watermark_; } // Returns true if the high watermark callbacks have been called more recently // than the low watermark callbacks. diff --git a/source/common/http/conn_manager_impl.cc b/source/common/http/conn_manager_impl.cc index f2dffbe1746be..74290714e894d 100644 --- a/source/common/http/conn_manager_impl.cc +++ b/source/common/http/conn_manager_impl.cc @@ -259,8 +259,14 @@ RequestDecoder& ConnectionManagerImpl::newStream(ResponseEncoder& response_encod } ENVOY_CONN_LOG(debug, "new stream", read_callbacks_->connection()); + + // Set the account to start accounting. + Buffer::BufferMemoryAccountSharedPtr downstream_request_account = + std::make_shared(); + response_encoder.getStream().setAccount(downstream_request_account); + ActiveStreamPtr new_stream(new ActiveStream(*this, response_encoder.getStream().bufferLimit(), - response_encoder.getStream().getAccount())); + std::move(downstream_request_account))); new_stream->state_.is_internally_created_ = is_internally_created; new_stream->response_encoder_ = &response_encoder; new_stream->response_encoder_->getStream().addCallbacks(*new_stream); diff --git a/source/common/http/http1/codec_impl.h b/source/common/http/http1/codec_impl.h index ac7d7564a1173..4cfd38b5b1ec9 100644 --- a/source/common/http/http1/codec_impl.h +++ b/source/common/http/http1/codec_impl.h @@ -68,10 +68,6 @@ class StreamEncoderImpl : public virtual StreamEncoder, // connection, invoking any watermarks as necessary. There is no internal buffering that would // require a flush timeout not already covered by other timeouts. } - Buffer::BufferMemoryAccountSharedPtr getAccount() const override { - // TODO(kbaichoo): implement account tracking for H1. - return nullptr; - } void setAccount(Buffer::BufferMemoryAccountSharedPtr) override { // TODO(kbaichoo): implement account tracking for H1. diff --git a/source/common/http/http2/codec_impl.cc b/source/common/http/http2/codec_impl.cc index bdb8b3892d912..96c6889665ed8 100644 --- a/source/common/http/http2/codec_impl.cc +++ b/source/common/http/http2/codec_impl.cc @@ -400,9 +400,7 @@ void ConnectionImpl::StreamImpl::submitTrailers(const HeaderMap& trailers) { // Instead of submitting empty trailers, we send empty data instead. Buffer::OwnedImpl empty_buffer; - encodeDataHelper(empty_buffer, - /*end_stream=*/ - true, skip_encoding_empty_trailers); + encodeDataHelper(empty_buffer, /*end_stream=*/true, skip_encoding_empty_trailers); return; } @@ -1544,7 +1542,7 @@ void ConnectionImpl::ServerStreamImpl::dumpState(std::ostream& os, int indent_le } } -void ConnectionImpl::ClientStreamImpl::setAccount(Buffer::BufferMemoryAccountSharedPtr account) { +void ConnectionImpl::StreamImpl::setAccount(Buffer::BufferMemoryAccountSharedPtr account) { buffer_memory_account_ = account; pending_recv_data_->bindAccount(buffer_memory_account_); pending_send_data_->bindAccount(buffer_memory_account_); diff --git a/source/common/http/http2/codec_impl.h b/source/common/http/http2/codec_impl.h index e95ba799bb1b1..d8f28be2a7bb2 100644 --- a/source/common/http/http2/codec_impl.h +++ b/source/common/http/http2/codec_impl.h @@ -239,6 +239,7 @@ class ConnectionImpl : public virtual Connection, void setFlushTimeout(std::chrono::milliseconds timeout) override { stream_idle_timeout_ = timeout; } + void setAccount(Buffer::BufferMemoryAccountSharedPtr account) override; // ScopeTrackedObject void dumpState(std::ostream& os, int indent_level) const override; @@ -290,6 +291,7 @@ class ConnectionImpl : public virtual Connection, uint32_t unconsumed_bytes_{0}; uint32_t read_disable_count_{0}; + Buffer::BufferMemoryAccountSharedPtr buffer_memory_account_; // Note that in current implementation the watermark callbacks of the pending_recv_data_ are // never called. The watermark value is set to the size of the stream window. As a result this // watermark can never overflow because the peer can never send more bytes than the stream @@ -366,13 +368,6 @@ class ConnectionImpl : public virtual Connection, // ScopeTrackedObject void dumpState(std::ostream& os, int indent_level) const override; - // Http::Stream - Buffer::BufferMemoryAccountSharedPtr getAccount() const override { - return buffer_memory_account_; - } - void setAccount(Buffer::BufferMemoryAccountSharedPtr account) override; - - Buffer::BufferMemoryAccountSharedPtr buffer_memory_account_; ResponseDecoder& response_decoder_; absl::variant headers_or_trailers_; std::string upgrade_type_; @@ -385,10 +380,7 @@ class ConnectionImpl : public virtual Connection, */ struct ServerStreamImpl : public StreamImpl, public ResponseEncoder { ServerStreamImpl(ConnectionImpl& parent, uint32_t buffer_limit) - : StreamImpl(parent, buffer_limit), headers_or_trailers_(RequestHeaderMapImpl::create()), - buffer_memory_account_(std::make_shared()) { - pending_recv_data_->bindAccount(buffer_memory_account_); - pending_send_data_->bindAccount(buffer_memory_account_); + : StreamImpl(parent, buffer_limit), headers_or_trailers_(RequestHeaderMapImpl::create()) { } // StreamImpl @@ -422,19 +414,8 @@ class ConnectionImpl : public virtual Connection, // ScopeTrackedObject void dumpState(std::ostream& os, int indent_level) const override; - // Http::Stream - Buffer::BufferMemoryAccountSharedPtr getAccount() const override { - return buffer_memory_account_; - } - void setAccount(Buffer::BufferMemoryAccountSharedPtr) override { - RELEASE_ASSERT( - false, - "Server Stream creates an account during construction. This should not be called."); - } - RequestDecoder* request_decoder_{}; absl::variant headers_or_trailers_; - Buffer::BufferMemoryAccountSharedPtr buffer_memory_account_; bool streamErrorOnInvalidHttpMessage() const override { return parent_.stream_error_on_invalid_http_messaging_; diff --git a/source/common/quic/envoy_quic_client_stream.h b/source/common/quic/envoy_quic_client_stream.h index a43a854fa5980..894ec1dbf030c 100644 --- a/source/common/quic/envoy_quic_client_stream.h +++ b/source/common/quic/envoy_quic_client_stream.h @@ -48,10 +48,6 @@ class EnvoyQuicClientStream : public quic::QuicSpdyClientStream, // Http::Stream void resetStream(Http::StreamResetReason reason) override; void setFlushTimeout(std::chrono::milliseconds) override {} - Buffer::BufferMemoryAccountSharedPtr getAccount() const override { - // TODO(kbaichoo): implement account tracking for QUIC. - return nullptr; - } void setAccount(Buffer::BufferMemoryAccountSharedPtr) override { // TODO(kbaichoo): implement account tracking for QUIC. diff --git a/source/common/quic/envoy_quic_server_stream.h b/source/common/quic/envoy_quic_server_stream.h index b9a09a831f1eb..f9239ba14ba97 100644 --- a/source/common/quic/envoy_quic_server_stream.h +++ b/source/common/quic/envoy_quic_server_stream.h @@ -54,10 +54,6 @@ class EnvoyQuicServerStream : public quic::QuicSpdyServerStreamBase, void setFlushTimeout(std::chrono::milliseconds) override { // TODO(mattklein123): Actually implement this for HTTP/3 similar to HTTP/2. } - Buffer::BufferMemoryAccountSharedPtr getAccount() const override { - // TODO(kbaichoo): implement account tracking for QUIC. - return nullptr; - } void setAccount(Buffer::BufferMemoryAccountSharedPtr) override { // TODO(kbaichoo): implement account tracking for QUIC. diff --git a/test/integration/buffer_accounting_integration_test.cc b/test/integration/buffer_accounting_integration_test.cc index ca9af0c5f9350..6d518a9ca3ea9 100644 --- a/test/integration/buffer_accounting_integration_test.cc +++ b/test/integration/buffer_accounting_integration_test.cc @@ -105,9 +105,8 @@ class HttpBufferWatermarksTest : public SocketInterfaceSwap, [&stream](Buffer::TrackedWatermarkBufferFactory::AccountToBoundBuffersMap& map) { stream << "Printing Account map. Size: " << map.size() << '\n'; for (auto& entry : map) { - stream << " Account: " << entry.first << " Charged Amount: " - << static_cast(entry.first.get())->balance() - << '\n'; + // We can't access the accounts balance in a thread safe way here. + stream << " Account: " << entry.first << '\n'; for (auto& buffer : entry.second) { stream << " Buffer: " << buffer << '\n'; } @@ -179,11 +178,14 @@ TEST_P(HttpBufferWatermarksTest, ShouldTrackAllocatedBytesToUpstream) { const uint32_t downstream_h2_conn_window = 64 * 1024; const uint32_t upstream_h2_stream_window = 64 * 1024; const uint32_t upstream_h2_conn_window = 1024 * 1024 * 1024; // Effectively unlimited + const uint32_t request_body_size = 4096; + const uint32_t response_body_size = 4096; initializeWithBufferConfig({connection_watermark, downstream_h2_stream_window, downstream_h2_conn_window, upstream_h2_stream_window, upstream_h2_conn_window}, num_requests); + buffer_factory_->setExpectedAccountBalance(request_body_size, num_requests); // Makes us have Envoy's writes to upstream return EAGAIN writev_matcher_->setDestinationPort(fake_upstreams_[0]->localAddress()->ip()->port()); @@ -191,13 +193,11 @@ TEST_P(HttpBufferWatermarksTest, ShouldTrackAllocatedBytesToUpstream) { codec_client_ = makeHttpConnection(lookupPort("http")); - const uint32_t request_body_size = 4096; - const uint32_t response_body_size = 4096; auto responses = sendRequests(num_requests, request_body_size, response_body_size); // Wait for all requests to have accounted for the requests we've sent. - ASSERT_TRUE(buffer_factory_->waitUntilEachAccountChargedAtleast(request_body_size, num_requests, - TestUtility::DefaultTimeout)) + ASSERT_TRUE( + buffer_factory_->waitForExpectedAccountBalanceWithTimeout(TestUtility::DefaultTimeout)) << "buffer total: " << buffer_factory_->totalBufferSize() << " buffer max: " << buffer_factory_->maxBufferSize() << " active accounts: " << buffer_factory_->numAccountsActive() @@ -218,11 +218,14 @@ TEST_P(HttpBufferWatermarksTest, ShouldTrackAllocatedBytesToDownstream) { const uint32_t downstream_h2_conn_window = 64 * 1024; const uint32_t upstream_h2_stream_window = 64 * 1024; const uint32_t upstream_h2_conn_window = 1024 * 1024 * 1024; // Effectively unlimited + const uint32_t request_body_size = 4096; + const uint32_t response_body_size = 16384; initializeWithBufferConfig({connection_watermark, downstream_h2_stream_window, downstream_h2_conn_window, upstream_h2_stream_window, upstream_h2_conn_window}, num_requests); + buffer_factory_->setExpectedAccountBalance(response_body_size, num_requests); writev_matcher_->setSourcePort(lookupPort("http")); codec_client_ = makeHttpConnection(lookupPort("http")); @@ -230,13 +233,11 @@ TEST_P(HttpBufferWatermarksTest, ShouldTrackAllocatedBytesToDownstream) { // start to accumulate in the transport socket buffer. writev_matcher_->setWritevReturnsEgain(); - const uint32_t request_body_size = 4096; - const uint32_t response_body_size = 16384; auto responses = sendRequests(num_requests, request_body_size, response_body_size); // Wait for all requests to buffered the response from upstream. - ASSERT_TRUE(buffer_factory_->waitUntilEachAccountChargedAtleast(response_body_size, num_requests, - TestUtility::DefaultTimeout)) + ASSERT_TRUE( + buffer_factory_->waitForExpectedAccountBalanceWithTimeout(TestUtility::DefaultTimeout)) << "buffer total: " << buffer_factory_->totalBufferSize() << " buffer max: " << buffer_factory_->maxBufferSize() << " active accounts: " << buffer_factory_->numAccountsActive() diff --git a/test/integration/tracked_watermark_buffer.cc b/test/integration/tracked_watermark_buffer.cc index e4d9c06881ee8..1265ff88148ef 100644 --- a/test/integration/tracked_watermark_buffer.cc +++ b/test/integration/tracked_watermark_buffer.cc @@ -23,6 +23,8 @@ TrackedWatermarkBufferFactory::create(std::function below_low_watermark, buffer_info.max_size_ = current_size; } buffer_info.current_size_ = current_size; + + checkIfExpectedBalancesMet(); }, [this, &buffer_info](uint32_t watermark) { absl::MutexLock lock(&mutex_); @@ -154,26 +156,38 @@ void TrackedWatermarkBufferFactory::removeDanglingAccounts() { } } -bool TrackedWatermarkBufferFactory::waitUntilEachAccountChargedAtleast( - uint64_t byte_size, uint32_t expected_num_accounts, std::chrono::milliseconds timeout) { +void TrackedWatermarkBufferFactory::setExpectedAccountBalance(uint64_t byte_size, + uint32_t num_accounts) { absl::MutexLock lock(&mutex_); - auto predicate = [this, byte_size, - expected_num_accounts]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) { - mutex_.AssertHeld(); + ASSERT(!expected_balances_.has_value()); + expected_balances_.emplace(byte_size, num_accounts); +} - removeDanglingAccounts(); - if (account_infos_.size() < expected_num_accounts) { - return false; - } +bool TrackedWatermarkBufferFactory::waitForExpectedAccountBalanceWithTimeout( + std::chrono::milliseconds timeout) { + return expected_balances_met_.WaitForNotificationWithTimeout(absl::FromChrono(timeout)); +} - for (auto& acc : account_infos_) { - if (static_cast(acc.first.get())->balance() < byte_size) { - return false; - } +void TrackedWatermarkBufferFactory::checkIfExpectedBalancesMet() { + if (!expected_balances_ || expected_balances_met_.HasBeenNotified()) { + return; + } + + removeDanglingAccounts(); + if (account_infos_.size() < expected_balances_->num_accounts_) { + return; + } + + // This is thread safe since this function should run on the only Envoy worker + // thread. + for (auto& acc : account_infos_) { + if (static_cast(acc.first.get())->balance() < + expected_balances_->balance_per_account_) { + return; } - return true; - }; - return mutex_.AwaitWithTimeout(absl::Condition(&predicate), absl::Milliseconds(timeout.count())); + } + + expected_balances_met_.Notify(); } uint64_t TrackedWatermarkBufferFactory::numAccountsActive() { diff --git a/test/integration/tracked_watermark_buffer.h b/test/integration/tracked_watermark_buffer.h index 455b846f01e72..67521f933152c 100644 --- a/test/integration/tracked_watermark_buffer.h +++ b/test/integration/tracked_watermark_buffer.h @@ -7,6 +7,7 @@ #include "absl/container/node_hash_map.h" #include "absl/synchronization/mutex.h" +#include "absl/synchronization/notification.h" namespace Envoy { namespace Buffer { @@ -87,8 +88,15 @@ class TrackedWatermarkBufferFactory : public Buffer::WatermarkFactory { // Wait until total bytes buffered exceeds the a given size. bool waitUntilTotalBufferedExceeds(uint64_t byte_size, std::chrono::milliseconds timeout); - bool waitUntilEachAccountChargedAtleast(uint64_t byte_size, uint32_t expected_num_accounts, - std::chrono::milliseconds timeout); + // Set the expected account balance, prior to sending requests. + // The test thread can then wait for this condition to be true. + // This is separated so that the test thread can spin up requests however it + // desires in between. + // + // The Envoy worker thread will notify the test thread once the condition is + // met. + void setExpectedAccountBalance(uint64_t byte_size, uint32_t num_accounts); + bool waitForExpectedAccountBalanceWithTimeout(std::chrono::milliseconds timeout); // Number of accounts bound to a buffer that's still in use. uint64_t numAccountsActive(); @@ -107,6 +115,9 @@ class TrackedWatermarkBufferFactory : public Buffer::WatermarkFactory { // Remove "dangling" accounts; accounts where the account_info map is the only // entity still pointing to the account. void removeDanglingAccounts() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_); + // Should only be executed on the Envoy's worker thread. Otherwise, we have a + // possible race condition. + void checkIfExpectedBalancesMet() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_); struct BufferInfo { uint32_t watermark_ = 0; @@ -114,6 +125,14 @@ class TrackedWatermarkBufferFactory : public Buffer::WatermarkFactory { uint64_t max_size_ = 0; }; + struct AccountBalanceExpectations { + AccountBalanceExpectations(uint64_t balance_per_account, uint32_t num_accounts) + : balance_per_account_(balance_per_account), num_accounts_(num_accounts) {} + + uint64_t balance_per_account_ = 0; + uint32_t num_accounts_ = 0; + }; + mutable absl::Mutex mutex_; // Id of the next buffer to create. uint64_t next_idx_ ABSL_GUARDED_BY(mutex_) = 0; @@ -123,6 +142,11 @@ class TrackedWatermarkBufferFactory : public Buffer::WatermarkFactory { uint64_t total_buffer_size_ ABSL_GUARDED_BY(mutex_) = 0; // Info about the buffer, by buffer idx. absl::node_hash_map buffer_infos_ ABSL_GUARDED_BY(mutex_); + // The expected balances for the accounts. If set, when a buffer updates its + // size, it also checks whether the expected_balances has been satisfied, and + // notifies waiters of expected_balances_met_. + absl::optional expected_balances_ ABSL_GUARDED_BY(mutex_); + absl::Notification expected_balances_met_; // Map from accounts to buffers bound to that account. AccountToBoundBuffersMap account_infos_ ABSL_GUARDED_BY(mutex_); // Set of actively bound buffers. Used for asserting that buffers are bound diff --git a/test/mocks/http/stream.cc b/test/mocks/http/stream.cc index 22dbb411aeade..4d29cb61787b2 100644 --- a/test/mocks/http/stream.cc +++ b/test/mocks/http/stream.cc @@ -25,7 +25,6 @@ MockStream::MockStream() { ON_CALL(*this, connectionLocalAddress()).WillByDefault(ReturnRef(connection_local_address_)); - ON_CALL(*this, getAccount()).WillByDefault(Return(account_)); ON_CALL(*this, setAccount(_)) .WillByDefault(Invoke( [this](Buffer::BufferMemoryAccountSharedPtr account) -> void { account_ = account; })); diff --git a/test/mocks/http/stream.h b/test/mocks/http/stream.h index 5822af169ce51..6fada557adef5 100644 --- a/test/mocks/http/stream.h +++ b/test/mocks/http/stream.h @@ -21,7 +21,6 @@ class MockStream : public Stream { MOCK_METHOD(uint32_t, bufferLimit, ()); MOCK_METHOD(const Network::Address::InstanceConstSharedPtr&, connectionLocalAddress, ()); MOCK_METHOD(void, setFlushTimeout, (std::chrono::milliseconds timeout)); - MOCK_METHOD(Buffer::BufferMemoryAccountSharedPtr, getAccount, (), (const)); MOCK_METHOD(void, setAccount, (Buffer::BufferMemoryAccountSharedPtr)); std::list callbacks_{}; From 6956e95150895e807760ee44066f830e7b642127 Mon Sep 17 00:00:00 2001 From: Kevin Baichoo Date: Fri, 30 Apr 2021 20:55:48 +0000 Subject: [PATCH 03/14] Added test for Set and Wait for Expected Account Balance. Signed-off-by: Kevin Baichoo --- source/common/http/http2/codec_impl.h | 3 +-- .../tracked_watermark_buffer_test.cc | 18 ++++++++++++++++++ 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/source/common/http/http2/codec_impl.h b/source/common/http/http2/codec_impl.h index d8f28be2a7bb2..fa253a8b4677c 100644 --- a/source/common/http/http2/codec_impl.h +++ b/source/common/http/http2/codec_impl.h @@ -380,8 +380,7 @@ class ConnectionImpl : public virtual Connection, */ struct ServerStreamImpl : public StreamImpl, public ResponseEncoder { ServerStreamImpl(ConnectionImpl& parent, uint32_t buffer_limit) - : StreamImpl(parent, buffer_limit), headers_or_trailers_(RequestHeaderMapImpl::create()) { - } + : StreamImpl(parent, buffer_limit), headers_or_trailers_(RequestHeaderMapImpl::create()) {} // StreamImpl void submitHeaders(const std::vector& final_headers, diff --git a/test/integration/tracked_watermark_buffer_test.cc b/test/integration/tracked_watermark_buffer_test.cc index 2eff1b36ea183..73036d662ed96 100644 --- a/test/integration/tracked_watermark_buffer_test.cc +++ b/test/integration/tracked_watermark_buffer_test.cc @@ -1,3 +1,4 @@ +#include #include #include "envoy/buffer/buffer.h" @@ -177,6 +178,23 @@ TEST_F(TrackedWatermarkBufferTest, TracksNumberOfAccountsActive) { EXPECT_EQ(factory_.numAccountsActive(), 0); } +TEST_F(TrackedWatermarkBufferTest, WaitForExpectedAccountBalanceShouldReturnTrueWhenConditionsMet) { + auto buffer1 = factory_.create([]() {}, []() {}, []() {}); + auto buffer2 = factory_.create([]() {}, []() {}, []() {}); + BufferMemoryAccountSharedPtr account1 = std::make_shared(); + BufferMemoryAccountSharedPtr account2 = std::make_shared(); + buffer1->bindAccount(account1); + buffer2->bindAccount(account2); + + factory_.setExpectedAccountBalance(4096, 2); + + buffer1->add("Need to wait on the other buffer to get data."); + EXPECT_FALSE(factory_.waitForExpectedAccountBalanceWithTimeout(std::chrono::seconds(0))); + + buffer2->add("Now we have expected balances!"); + EXPECT_TRUE(factory_.waitForExpectedAccountBalanceWithTimeout(std::chrono::seconds(0))); +} + } // namespace } // namespace Buffer } // namespace Envoy From dbc66ad548e254661ca33499fe3e17b4827eac6c Mon Sep 17 00:00:00 2001 From: Kevin Baichoo Date: Mon, 3 May 2021 13:36:12 +0000 Subject: [PATCH 04/14] Fixing broken tests. Signed-off-by: Kevin Baichoo --- include/envoy/buffer/buffer.h | 10 ++++++++++ source/common/buffer/buffer_impl.h | 1 + source/common/buffer/watermark_buffer.h | 2 +- source/common/http/http2/codec_impl.cc | 3 +-- source/common/http/http2/codec_impl.h | 5 +++++ test/common/buffer/buffer_fuzz.cc | 6 ++++++ test/common/http/http2/codec_impl_test.cc | 6 +++--- 7 files changed, 27 insertions(+), 6 deletions(-) diff --git a/include/envoy/buffer/buffer.h b/include/envoy/buffer/buffer.h index d00130d48d901..8b283cd754703 100644 --- a/include/envoy/buffer/buffer.h +++ b/include/envoy/buffer/buffer.h @@ -449,6 +449,16 @@ class Instance { * @param watermark supplies the buffer high watermark size threshold, in bytes. */ virtual void setWatermarks(uint32_t watermark) PURE; + + /** + * Set the buffer's low and high watermark. + * The low_watermark should be < high_watermark if these are set. Set both + * watermarks to 0 to disable watermark functionality. + * @param low_watermark supplies the buffer low watermark size threshold, in bytes. + * @param high_watermark supplies the buffer high watermark size threshold, in bytes. + */ + virtual void setWatermarks(uint32_t low_watermark, uint32_t high_watermark) PURE; + /** * Returns the configured high watermark. A return value of 0 indicates that watermark * functionality is disabled. diff --git a/source/common/buffer/buffer_impl.h b/source/common/buffer/buffer_impl.h index 35d2c16b8079c..50965b43dc651 100644 --- a/source/common/buffer/buffer_impl.h +++ b/source/common/buffer/buffer_impl.h @@ -719,6 +719,7 @@ class OwnedImpl : public LibEventInstance { // TODO(antoniovicente) Implement watermarks by merging the OwnedImpl and WatermarkBuffer // implementations. Also, make high-watermark config a constructor argument. void setWatermarks(uint32_t) override { ASSERT(false, "watermarks not implemented."); } + void setWatermarks(uint32_t, uint32_t) override { ASSERT(false, "watermarks not implemented."); } uint32_t highWatermark() const override { return 0; } bool highWatermarkTriggered() const override { return false; } diff --git a/source/common/buffer/watermark_buffer.h b/source/common/buffer/watermark_buffer.h index 7405928034e88..c110eea761228 100644 --- a/source/common/buffer/watermark_buffer.h +++ b/source/common/buffer/watermark_buffer.h @@ -40,7 +40,7 @@ class WatermarkBuffer : public OwnedImpl { void appendSliceForTest(absl::string_view data) override; void setWatermarks(uint32_t watermark) override { setWatermarks(watermark / 2, watermark); } - void setWatermarks(uint32_t low_watermark, uint32_t high_watermark); + void setWatermarks(uint32_t low_watermark, uint32_t high_watermark) override; uint32_t highWatermark() const override { return high_watermark_; } // Returns true if the high watermark callbacks have been called more recently // than the low watermark callbacks. diff --git a/source/common/http/http2/codec_impl.cc b/source/common/http/http2/codec_impl.cc index 96c6889665ed8..1f26fbe1c43e5 100644 --- a/source/common/http/http2/codec_impl.cc +++ b/source/common/http/http2/codec_impl.cc @@ -145,8 +145,7 @@ ConnectionImpl::StreamImpl::StreamImpl(ConnectionImpl& parent, uint32_t buffer_l pending_send_buffer_high_watermark_called_(false), reset_due_to_messaging_error_(false) { parent_.stats_.streams_active_.inc(); if (buffer_limit > 0) { - pending_recv_data_->setWatermarks(buffer_limit); - pending_send_data_->setWatermarks(buffer_limit); + setWriteBufferWatermarks(buffer_limit / 2, buffer_limit); } } diff --git a/source/common/http/http2/codec_impl.h b/source/common/http/http2/codec_impl.h index fa253a8b4677c..4411728cf7363 100644 --- a/source/common/http/http2/codec_impl.h +++ b/source/common/http/http2/codec_impl.h @@ -260,6 +260,11 @@ class ConnectionImpl : public virtual Connection, } } + void setWriteBufferWatermarks(uint32_t low_watermark, uint32_t high_watermark) { + pending_recv_data_->setWatermarks(low_watermark, high_watermark); + pending_send_data_->setWatermarks(low_watermark, high_watermark); + } + // If the receive buffer encounters watermark callbacks, enable/disable reads on this stream. void pendingRecvBufferHighWatermark(); void pendingRecvBufferLowWatermark(); diff --git a/test/common/buffer/buffer_fuzz.cc b/test/common/buffer/buffer_fuzz.cc index 493f31b8b31db..231e8025bcd23 100644 --- a/test/common/buffer/buffer_fuzz.cc +++ b/test/common/buffer/buffer_fuzz.cc @@ -193,6 +193,12 @@ class StringBuffer : public Buffer::Instance { // WatermarkBuffer implementations. ASSERT(false); } + void setWatermarks(uint32_t, uint32_t) override { + // Not implemented. + // TODO(antoniovicente) Implement and add fuzz coverage as we merge the Buffer::OwnedImpl and + // WatermarkBuffer implementations. + ASSERT(false); + } uint32_t highWatermark() const override { return 0; } bool highWatermarkTriggered() const override { return false; } diff --git a/test/common/http/http2/codec_impl_test.cc b/test/common/http/http2/codec_impl_test.cc index f6327caaa71bf..762daeb7043ae 100644 --- a/test/common/http/http2/codec_impl_test.cc +++ b/test/common/http/http2/codec_impl_test.cc @@ -1278,7 +1278,7 @@ TEST_P(Http2CodecImplFlowControlTest, TestFlowControlInPendingSendData) { // Now that the flow control window is full, further data causes the send buffer to back up. Buffer::OwnedImpl more_long_data(std::string(initial_stream_window, 'a')); request_encoder_->encodeData(more_long_data, false); - EXPECT_EQ(initial_stream_window, client_->getStream(1)->pending_send_data_.length()); + EXPECT_EQ(initial_stream_window, client_->getStream(1)->pending_send_data_->length()); EXPECT_EQ(initial_stream_window, TestUtility::findGauge(client_stats_store_, "http2.pending_send_bytes")->value()); EXPECT_EQ(initial_stream_window, server_->getStream(1)->unconsumed_bytes_); @@ -1287,7 +1287,7 @@ TEST_P(Http2CodecImplFlowControlTest, TestFlowControlInPendingSendData) { EXPECT_CALL(callbacks, onAboveWriteBufferHighWatermark()); Buffer::OwnedImpl last_byte("!"); request_encoder_->encodeData(last_byte, false); - EXPECT_EQ(initial_stream_window + 1, client_->getStream(1)->pending_send_data_.length()); + EXPECT_EQ(initial_stream_window + 1, client_->getStream(1)->pending_send_data_->length()); EXPECT_EQ(initial_stream_window + 1, TestUtility::findGauge(client_stats_store_, "http2.pending_send_bytes")->value()); @@ -1332,7 +1332,7 @@ TEST_P(Http2CodecImplFlowControlTest, TestFlowControlInPendingSendData) { EXPECT_CALL(callbacks2, onBelowWriteBufferLowWatermark()).Times(0); EXPECT_CALL(callbacks3, onBelowWriteBufferLowWatermark()); server_->getStream(1)->readDisable(false); - EXPECT_EQ(0, client_->getStream(1)->pending_send_data_.length()); + EXPECT_EQ(0, client_->getStream(1)->pending_send_data_->length()); EXPECT_EQ(0, TestUtility::findGauge(client_stats_store_, "http2.pending_send_bytes")->value()); // The extra 1 byte sent won't trigger another window update, so the final window should be the // initial window minus the last 1 byte flush from the client to server. From 8abaa721070d4b394858795e2bd9768ebfaeab19 Mon Sep 17 00:00:00 2001 From: Kevin Baichoo Date: Mon, 3 May 2021 16:14:35 +0000 Subject: [PATCH 05/14] Fixed broken extension test. Signed-off-by: Kevin Baichoo --- .../filters/network/postgres_proxy/postgres_decoder_test.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/test/extensions/filters/network/postgres_proxy/postgres_decoder_test.cc b/test/extensions/filters/network/postgres_proxy/postgres_decoder_test.cc index 9170a69e84844..f2c8a5737eaab 100644 --- a/test/extensions/filters/network/postgres_proxy/postgres_decoder_test.cc +++ b/test/extensions/filters/network/postgres_proxy/postgres_decoder_test.cc @@ -562,6 +562,7 @@ class FakeBuffer : public Buffer::Instance { MOCK_METHOD(bool, startsWith, (absl::string_view), (const, override)); MOCK_METHOD(std::string, toString, (), (const, override)); MOCK_METHOD(void, setWatermarks, (uint32_t), (override)); + MOCK_METHOD(void, setWatermarks, (uint32_t, uint32_t), (override)); MOCK_METHOD(uint32_t, highWatermark, (), (const, override)); MOCK_METHOD(bool, highWatermarkTriggered, (), (const, override)); }; From f7ca7dd3b913c88676efbd510e2885799c9c0a1a Mon Sep 17 00:00:00 2001 From: Kevin Baichoo Date: Tue, 4 May 2021 14:03:25 +0000 Subject: [PATCH 06/14] Moved location of method. Signed-off-by: Kevin Baichoo --- source/common/http/http2/codec_impl.cc | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/source/common/http/http2/codec_impl.cc b/source/common/http/http2/codec_impl.cc index 1f26fbe1c43e5..ae7e32a1a664d 100644 --- a/source/common/http/http2/codec_impl.cc +++ b/source/common/http/http2/codec_impl.cc @@ -587,6 +587,12 @@ void ConnectionImpl::StreamImpl::onMetadataDecoded(MetadataMapPtr&& metadata_map } } +void ConnectionImpl::StreamImpl::setAccount(Buffer::BufferMemoryAccountSharedPtr account) { + buffer_memory_account_ = account; + pending_recv_data_->bindAccount(buffer_memory_account_); + pending_send_data_->bindAccount(buffer_memory_account_); +} + ConnectionImpl::ConnectionImpl(Network::Connection& connection, CodecStats& stats, Random::RandomGenerator& random_generator, const envoy::config::core::v3::Http2ProtocolOptions& http2_options, @@ -1541,12 +1547,6 @@ void ConnectionImpl::ServerStreamImpl::dumpState(std::ostream& os, int indent_le } } -void ConnectionImpl::StreamImpl::setAccount(Buffer::BufferMemoryAccountSharedPtr account) { - buffer_memory_account_ = account; - pending_recv_data_->bindAccount(buffer_memory_account_); - pending_send_data_->bindAccount(buffer_memory_account_); -} - ClientConnectionImpl::ClientConnectionImpl( Network::Connection& connection, Http::ConnectionCallbacks& callbacks, CodecStats& stats, Random::RandomGenerator& random_generator, From 0295ac9ea36551a6da780f3c044f3fd84a9575ba Mon Sep 17 00:00:00 2001 From: Kevin Baichoo Date: Thu, 6 May 2021 22:17:09 +0000 Subject: [PATCH 07/14] Use Release Assert in test code with side effects (i.e. erase). Signed-off-by: Kevin Baichoo --- test/integration/tracked_watermark_buffer.cc | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/test/integration/tracked_watermark_buffer.cc b/test/integration/tracked_watermark_buffer.cc index 1265ff88148ef..0862dc6a8360a 100644 --- a/test/integration/tracked_watermark_buffer.cc +++ b/test/integration/tracked_watermark_buffer.cc @@ -1,5 +1,7 @@ #include "test/integration/tracked_watermark_buffer.h" +#include "common/common/assert.h" + namespace Envoy { namespace Buffer { @@ -41,10 +43,9 @@ TrackedWatermarkBufferFactory::create(std::function below_low_watermark, auto account = buffer->getAccountForTest(); if (account) { auto& set = account_infos_[account]; - // Erase buffer, one entry should be removed. - ASSERT(set.erase(buffer) == 1); - ASSERT(actively_bound_buffers_.erase(buffer) == 1); - + RELEASE_ASSERT(set.erase(buffer) == 1, "Expected to remove buffer from account_infos."); + RELEASE_ASSERT(actively_bound_buffers_.erase(buffer) == 1, + "Did not find buffer in actively_bound_buffers_."); // Erase account entry if there are no active bound buffers, and // there's no other pointers to the account besides the local account // pointer and within the map. @@ -53,7 +54,8 @@ TrackedWatermarkBufferFactory::create(std::function below_low_watermark, // the case that the H2 stream completes, but the data hasn't flushed // at TCP. if (set.empty() && account.use_count() == 2) { - ASSERT(account_infos_.erase(account) == 1); + RELEASE_ASSERT(account_infos_.erase(account) == 1, + "Expected to remove account from account_infos."); } } }, From 1644e660eaba8a53a7ae2fd42d4b001f7d3682ba Mon Sep 17 00:00:00 2001 From: Kevin Baichoo Date: Fri, 7 May 2021 20:08:07 +0000 Subject: [PATCH 08/14] Minor Fixes. Signed-off-by: Kevin Baichoo --- include/envoy/buffer/buffer.h | 3 +- source/common/buffer/buffer_impl.cc | 2 -- test/integration/BUILD | 1 + .../buffer_accounting_integration_test.cc | 24 ++++++---------- test/integration/tracked_watermark_buffer.cc | 23 +++++++-------- test/integration/tracked_watermark_buffer.h | 21 ++++++++++---- .../tracked_watermark_buffer_test.cc | 28 +++++++++---------- test/mocks/http/stream.cc | 1 - 8 files changed, 52 insertions(+), 51 deletions(-) diff --git a/include/envoy/buffer/buffer.h b/include/envoy/buffer/buffer.h index 88b1aed0b7734..2ac7674e37668 100644 --- a/include/envoy/buffer/buffer.h +++ b/include/envoy/buffer/buffer.h @@ -131,7 +131,8 @@ class Instance { /** * Binds the account to be charged for resources used by the buffer. This - * should only be called once. + * should only be called when the buffer is empty as existing slices + * within the buffer won't retroactively get tagged. * * @param account a shared_ptr to the account to charge. */ diff --git a/source/common/buffer/buffer_impl.cc b/source/common/buffer/buffer_impl.cc index de5e923d2a4ad..3187cae8abe52 100644 --- a/source/common/buffer/buffer_impl.cc +++ b/source/common/buffer/buffer_impl.cc @@ -43,8 +43,6 @@ void OwnedImpl::addDrainTracker(std::function drain_tracker) { void OwnedImpl::bindAccount(BufferMemoryAccountSharedPtr account) { ASSERT(slices_.empty()); - // We don't yet have an account bound. - ASSERT(!account_); account_ = std::move(account); } diff --git a/test/integration/BUILD b/test/integration/BUILD index 5171d1ac086be..bfb87dc77920e 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -1324,6 +1324,7 @@ envoy_cc_test_library( ], deps = [ "//source/common/buffer:watermark_buffer_lib", + "//test/test_common:utility_lib", ], ) diff --git a/test/integration/buffer_accounting_integration_test.cc b/test/integration/buffer_accounting_integration_test.cc index 6d518a9ca3ea9..5f90352720a45 100644 --- a/test/integration/buffer_accounting_integration_test.cc +++ b/test/integration/buffer_accounting_integration_test.cc @@ -138,8 +138,7 @@ TEST_P(HttpBufferWatermarksTest, ShouldCreateFourBuffersPerAccount) { upstream_request1 = std::move(upstream_request_); // Check the expected number of buffers per account - EXPECT_EQ(buffer_factory_->numBuffersActivelyBound(), 4); - EXPECT_EQ(buffer_factory_->numAccountsActive(), 1); + EXPECT_TRUE(buffer_factory_->waitUntilExpectedNumberOfAccountsAndBoundBuffers(1, 4)); // Send the second request. auto response2 = codec_client_->makeRequestWithBody(default_request_headers_, 1000); @@ -147,8 +146,7 @@ TEST_P(HttpBufferWatermarksTest, ShouldCreateFourBuffersPerAccount) { upstream_request2 = std::move(upstream_request_); // Check the expected number of buffers per account - EXPECT_EQ(buffer_factory_->numBuffersActivelyBound(), 8); - EXPECT_EQ(buffer_factory_->numAccountsActive(), 2); + EXPECT_TRUE(buffer_factory_->waitUntilExpectedNumberOfAccountsAndBoundBuffers(2, 8)); // Respond to the first request and wait for complete upstream_request1->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); @@ -157,8 +155,7 @@ TEST_P(HttpBufferWatermarksTest, ShouldCreateFourBuffersPerAccount) { ASSERT_TRUE(upstream_request1->complete()); // Check the expected number of buffers per account - EXPECT_EQ(buffer_factory_->numBuffersActivelyBound(), 4); - EXPECT_EQ(buffer_factory_->numAccountsActive(), 1); + EXPECT_TRUE(buffer_factory_->waitUntilExpectedNumberOfAccountsAndBoundBuffers(1, 4)); // Respond to the second request and wait for complete upstream_request2->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); @@ -167,8 +164,7 @@ TEST_P(HttpBufferWatermarksTest, ShouldCreateFourBuffersPerAccount) { ASSERT_TRUE(upstream_request2->complete()); // Check the expected number of buffers per account - EXPECT_EQ(buffer_factory_->numBuffersActivelyBound(), 0); - EXPECT_EQ(buffer_factory_->numAccountsActive(), 0); + EXPECT_TRUE(buffer_factory_->waitUntilExpectedNumberOfAccountsAndBoundBuffers(0, 0)); } TEST_P(HttpBufferWatermarksTest, ShouldTrackAllocatedBytesToUpstream) { @@ -196,12 +192,10 @@ TEST_P(HttpBufferWatermarksTest, ShouldTrackAllocatedBytesToUpstream) { auto responses = sendRequests(num_requests, request_body_size, response_body_size); // Wait for all requests to have accounted for the requests we've sent. - ASSERT_TRUE( + EXPECT_TRUE( buffer_factory_->waitForExpectedAccountBalanceWithTimeout(TestUtility::DefaultTimeout)) << "buffer total: " << buffer_factory_->totalBufferSize() - << " buffer max: " << buffer_factory_->maxBufferSize() - << " active accounts: " << buffer_factory_->numAccountsActive() - << " active bound buffers: " << buffer_factory_->numBuffersActivelyBound() << printAccounts(); + << " buffer max: " << buffer_factory_->maxBufferSize() << printAccounts(); writev_matcher_->setResumeWrites(); @@ -236,12 +230,10 @@ TEST_P(HttpBufferWatermarksTest, ShouldTrackAllocatedBytesToDownstream) { auto responses = sendRequests(num_requests, request_body_size, response_body_size); // Wait for all requests to buffered the response from upstream. - ASSERT_TRUE( + EXPECT_TRUE( buffer_factory_->waitForExpectedAccountBalanceWithTimeout(TestUtility::DefaultTimeout)) << "buffer total: " << buffer_factory_->totalBufferSize() - << " buffer max: " << buffer_factory_->maxBufferSize() - << " active accounts: " << buffer_factory_->numAccountsActive() - << " active bound buffers: " << buffer_factory_->numBuffersActivelyBound() << printAccounts(); + << " buffer max: " << buffer_factory_->maxBufferSize() << printAccounts(); writev_matcher_->setResumeWrites(); diff --git a/test/integration/tracked_watermark_buffer.cc b/test/integration/tracked_watermark_buffer.cc index 0862dc6a8360a..b35e5ade77dec 100644 --- a/test/integration/tracked_watermark_buffer.cc +++ b/test/integration/tracked_watermark_buffer.cc @@ -170,6 +170,18 @@ bool TrackedWatermarkBufferFactory::waitForExpectedAccountBalanceWithTimeout( return expected_balances_met_.WaitForNotificationWithTimeout(absl::FromChrono(timeout)); } +bool TrackedWatermarkBufferFactory::waitUntilExpectedNumberOfAccountsAndBoundBuffers( + uint32_t num_accounts, uint32_t num_bound_buffers, std::chrono::milliseconds timeout) { + absl::MutexLock lock(&mutex_); + auto predicate = [this, num_accounts, num_bound_buffers]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) { + mutex_.AssertHeld(); + removeDanglingAccounts(); + return num_bound_buffers == actively_bound_buffers_.size() && + num_accounts == account_infos_.size(); + }; + return mutex_.AwaitWithTimeout(absl::Condition(&predicate), absl::FromChrono(timeout)); +} + void TrackedWatermarkBufferFactory::checkIfExpectedBalancesMet() { if (!expected_balances_ || expected_balances_met_.HasBeenNotified()) { return; @@ -192,16 +204,5 @@ void TrackedWatermarkBufferFactory::checkIfExpectedBalancesMet() { expected_balances_met_.Notify(); } -uint64_t TrackedWatermarkBufferFactory::numAccountsActive() { - absl::MutexLock lock(&mutex_); - removeDanglingAccounts(); - return account_infos_.size(); -} - -uint64_t TrackedWatermarkBufferFactory::numBuffersActivelyBound() const { - absl::MutexLock lock(&mutex_); - return actively_bound_buffers_.size(); -} - } // namespace Buffer } // namespace Envoy diff --git a/test/integration/tracked_watermark_buffer.h b/test/integration/tracked_watermark_buffer.h index 67521f933152c..27166904f654b 100644 --- a/test/integration/tracked_watermark_buffer.h +++ b/test/integration/tracked_watermark_buffer.h @@ -5,6 +5,8 @@ #include "common/buffer/buffer_impl.h" #include "common/buffer/watermark_buffer.h" +#include "test/test_common/utility.h" + #include "absl/container/node_hash_map.h" #include "absl/synchronization/mutex.h" #include "absl/synchronization/notification.h" @@ -86,7 +88,9 @@ class TrackedWatermarkBufferFactory : public Buffer::WatermarkFactory { } // Wait until total bytes buffered exceeds the a given size. - bool waitUntilTotalBufferedExceeds(uint64_t byte_size, std::chrono::milliseconds timeout); + bool + waitUntilTotalBufferedExceeds(uint64_t byte_size, + std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); // Set the expected account balance, prior to sending requests. // The test thread can then wait for this condition to be true. @@ -96,12 +100,17 @@ class TrackedWatermarkBufferFactory : public Buffer::WatermarkFactory { // The Envoy worker thread will notify the test thread once the condition is // met. void setExpectedAccountBalance(uint64_t byte_size, uint32_t num_accounts); - bool waitForExpectedAccountBalanceWithTimeout(std::chrono::milliseconds timeout); + bool waitForExpectedAccountBalanceWithTimeout( + std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); - // Number of accounts bound to a buffer that's still in use. - uint64_t numAccountsActive(); - // Number of active buffers that had a call to bind. - uint64_t numBuffersActivelyBound() const; + // Wait for the expected number of accounts and number of bound buffers. + // + // Due to deferred deletion, it possible that the Envoy hasn't cleaned up on + // its end, but the stream has been completed. This avoids that by awaiting + // for the side effect of the deletion to have occurred. + bool waitUntilExpectedNumberOfAccountsAndBoundBuffers( + uint32_t num_accounts, uint32_t num_bound_buffers, + std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); using AccountToBoundBuffersMap = absl::flat_hash_map(); - ASSERT_EQ(factory_.numBuffersActivelyBound(), 0); + ASSERT_TRUE(factory_.waitUntilExpectedNumberOfAccountsAndBoundBuffers(0, 0)); buffer1->bindAccount(account); - EXPECT_EQ(factory_.numBuffersActivelyBound(), 1); + EXPECT_TRUE(factory_.waitUntilExpectedNumberOfAccountsAndBoundBuffers(1, 1)); buffer2->bindAccount(account); - EXPECT_EQ(factory_.numBuffersActivelyBound(), 2); + EXPECT_TRUE(factory_.waitUntilExpectedNumberOfAccountsAndBoundBuffers(1, 2)); buffer3->bindAccount(account); - EXPECT_EQ(factory_.numBuffersActivelyBound(), 3); + EXPECT_TRUE(factory_.waitUntilExpectedNumberOfAccountsAndBoundBuffers(1, 3)); // Release test access to the account. account.reset(); buffer3.reset(); - EXPECT_EQ(factory_.numBuffersActivelyBound(), 2); + EXPECT_TRUE(factory_.waitUntilExpectedNumberOfAccountsAndBoundBuffers(1, 2)); buffer2.reset(); - EXPECT_EQ(factory_.numBuffersActivelyBound(), 1); + EXPECT_TRUE(factory_.waitUntilExpectedNumberOfAccountsAndBoundBuffers(1, 1)); buffer1.reset(); - EXPECT_EQ(factory_.numBuffersActivelyBound(), 0); + EXPECT_TRUE(factory_.waitUntilExpectedNumberOfAccountsAndBoundBuffers(0, 0)); } TEST_F(TrackedWatermarkBufferTest, TracksNumberOfAccountsActive) { @@ -156,26 +156,26 @@ TEST_F(TrackedWatermarkBufferTest, TracksNumberOfAccountsActive) { auto buffer2 = factory_.create([]() {}, []() {}, []() {}); auto buffer3 = factory_.create([]() {}, []() {}, []() {}); BufferMemoryAccountSharedPtr account1 = std::make_shared(); - ASSERT_EQ(factory_.numAccountsActive(), 0); + ASSERT_TRUE(factory_.waitUntilExpectedNumberOfAccountsAndBoundBuffers(0, 0)); buffer1->bindAccount(account1); - EXPECT_EQ(factory_.numAccountsActive(), 1); + EXPECT_TRUE(factory_.waitUntilExpectedNumberOfAccountsAndBoundBuffers(1, 1)); buffer2->bindAccount(account1); - EXPECT_EQ(factory_.numAccountsActive(), 1); + EXPECT_TRUE(factory_.waitUntilExpectedNumberOfAccountsAndBoundBuffers(1, 2)); // Release test access to the account. account1.reset(); buffer3->bindAccount(std::make_shared()); - EXPECT_EQ(factory_.numAccountsActive(), 2); + EXPECT_TRUE(factory_.waitUntilExpectedNumberOfAccountsAndBoundBuffers(2, 3)); buffer2.reset(); - EXPECT_EQ(factory_.numAccountsActive(), 2); + EXPECT_TRUE(factory_.waitUntilExpectedNumberOfAccountsAndBoundBuffers(2, 2)); buffer1.reset(); - EXPECT_EQ(factory_.numAccountsActive(), 1); + EXPECT_TRUE(factory_.waitUntilExpectedNumberOfAccountsAndBoundBuffers(1, 1)); buffer3.reset(); - EXPECT_EQ(factory_.numAccountsActive(), 0); + EXPECT_TRUE(factory_.waitUntilExpectedNumberOfAccountsAndBoundBuffers(0, 0)); } TEST_F(TrackedWatermarkBufferTest, WaitForExpectedAccountBalanceShouldReturnTrueWhenConditionsMet) { diff --git a/test/mocks/http/stream.cc b/test/mocks/http/stream.cc index 4d29cb61787b2..1a3d4e8bcae67 100644 --- a/test/mocks/http/stream.cc +++ b/test/mocks/http/stream.cc @@ -2,7 +2,6 @@ using testing::_; using testing::Invoke; -using testing::Return; using testing::ReturnRef; namespace Envoy { From 091dc3e8df231eae226c568e6579215310342c60 Mon Sep 17 00:00:00 2001 From: Kevin Baichoo Date: Mon, 10 May 2021 22:33:49 +0000 Subject: [PATCH 09/14] Added disabled by default test only runtime guard. Signed-off-by: Kevin Baichoo --- source/common/http/async_client_impl.h | 2 +- source/common/http/conn_manager_impl.cc | 14 ++-- source/common/runtime/runtime_features.cc | 2 + .../buffer_accounting_integration_test.cc | 70 ++++++++++++++----- test/integration/tracked_watermark_buffer.cc | 9 +-- 5 files changed, 69 insertions(+), 28 deletions(-) diff --git a/source/common/http/async_client_impl.h b/source/common/http/async_client_impl.h index 7055253f2af29..0584e71c0f9c0 100644 --- a/source/common/http/async_client_impl.h +++ b/source/common/http/async_client_impl.h @@ -365,7 +365,7 @@ class AsyncStreamImpl : public AsyncClient::Stream, Upstream::ClusterInfoConstSharedPtr clusterInfo() override { return parent_.cluster_; } void clearRouteCache() override {} uint64_t streamId() const override { return stream_id_; } - // TODO(kbaichoo): Implement this? + // TODO(kbaichoo): Plumb account from owning request filter. Buffer::BufferMemoryAccountSharedPtr account() const override { return nullptr; } Tracing::Span& activeSpan() override { return active_span_; } const Tracing::Config& tracingConfig() override { return tracing_config_; } diff --git a/source/common/http/conn_manager_impl.cc b/source/common/http/conn_manager_impl.cc index 5f521ec10b14c..84494860949fd 100644 --- a/source/common/http/conn_manager_impl.cc +++ b/source/common/http/conn_manager_impl.cc @@ -260,13 +260,17 @@ RequestDecoder& ConnectionManagerImpl::newStream(ResponseEncoder& response_encod ENVOY_CONN_LOG(debug, "new stream", read_callbacks_->connection()); - // Set the account to start accounting. - Buffer::BufferMemoryAccountSharedPtr downstream_request_account = - std::make_shared(); - response_encoder.getStream().setAccount(downstream_request_account); - + // Set the account to start accounting if enabled. This is still a + // work-in-progress, and will be removed when other features using the + // accounting are implemented. + Buffer::BufferMemoryAccountSharedPtr downstream_request_account; + if (Runtime::runtimeFeatureEnabled("envoy.test_only.per_stream_buffer_accounting")) { + downstream_request_account = std::make_shared(); + response_encoder.getStream().setAccount(downstream_request_account); + } ActiveStreamPtr new_stream(new ActiveStream(*this, response_encoder.getStream().bufferLimit(), std::move(downstream_request_account))); + new_stream->state_.is_internally_created_ = is_internally_created; new_stream->response_encoder_ = &response_encoder; new_stream->response_encoder_->getStream().addCallbacks(*new_stream); diff --git a/source/common/runtime/runtime_features.cc b/source/common/runtime/runtime_features.cc index cbabc221f4b39..09d1380068cd4 100644 --- a/source/common/runtime/runtime_features.cc +++ b/source/common/runtime/runtime_features.cc @@ -114,6 +114,8 @@ constexpr const char* disabled_runtime_features[] = { "envoy.reloadable_features.remove_legacy_json", // Sentinel and test flag. "envoy.reloadable_features.test_feature_false", + // TODO(kbaichoo): Remove when this is no longer test only. + "envoy.test_only.per_stream_buffer_accounting", }; RuntimeFeatures::RuntimeFeatures() { diff --git a/test/integration/buffer_accounting_integration_test.cc b/test/integration/buffer_accounting_integration_test.cc index 5f90352720a45..e640bea2e52c7 100644 --- a/test/integration/buffer_accounting_integration_test.cc +++ b/test/integration/buffer_accounting_integration_test.cc @@ -19,10 +19,22 @@ #include "socket_interface_swap.h" namespace Envoy { +namespace { +std::string IpVersionAndBufferAccountingTestParamsToString( + const ::testing::TestParamInfo>& params) { + return fmt::format( + "{}_{}", + TestUtility::ipTestParamsToString(::testing::TestParamInfo( + std::get<0>(params.param), params.index)), + std::get<1>(params.param) ? "with_per_stream_buffer_accounting" + : "without_per_stream_buffer_accounting"); +} +} // namespace -class HttpBufferWatermarksTest : public SocketInterfaceSwap, - public testing::TestWithParam, - public HttpIntegrationTest { +class HttpBufferWatermarksTest + : public SocketInterfaceSwap, + public testing::TestWithParam>, + public HttpIntegrationTest { public: struct BufferParams { const uint32_t connection_watermark; @@ -89,7 +101,10 @@ class HttpBufferWatermarksTest : public SocketInterfaceSwap, // TODO(kbaichoo): Parameterize on the client codec type when other protocols // (H1, H3) support buffer accounting. - HttpBufferWatermarksTest() : HttpIntegrationTest(Http::CodecClient::Type::HTTP2, GetParam()) { + HttpBufferWatermarksTest() + : HttpIntegrationTest(Http::CodecClient::Type::HTTP2, std::get<0>(GetParam())) { + config_helper_.addRuntimeOverride("envoy.test_only.per_stream_buffer_accounting", + streamBufferAccounting() ? "true" : "false"); setServerBufferFactory(buffer_factory_); setDownstreamProtocol(Http::CodecClient::Type::HTTP2); setUpstreamProtocol(FakeHttpConnection::Type::HTTP2); @@ -99,6 +114,8 @@ class HttpBufferWatermarksTest : public SocketInterfaceSwap, std::shared_ptr buffer_factory_ = std::make_shared(); + bool streamBufferAccounting() { return std::get<1>(GetParam()); } + std::string printAccounts() { std::stringstream stream; auto print_map = @@ -118,9 +135,10 @@ class HttpBufferWatermarksTest : public SocketInterfaceSwap, } }; -INSTANTIATE_TEST_SUITE_P(IpVersions, HttpBufferWatermarksTest, - testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), - TestUtility::ipTestParamsToString); +INSTANTIATE_TEST_SUITE_P( + IpVersions, HttpBufferWatermarksTest, + testing::Combine(testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), testing::Bool()), + IpVersionAndBufferAccountingTestParamsToString); // We should create four buffers each billing the same downstream request's // account which originated the chain. @@ -138,7 +156,11 @@ TEST_P(HttpBufferWatermarksTest, ShouldCreateFourBuffersPerAccount) { upstream_request1 = std::move(upstream_request_); // Check the expected number of buffers per account - EXPECT_TRUE(buffer_factory_->waitUntilExpectedNumberOfAccountsAndBoundBuffers(1, 4)); + if (streamBufferAccounting()) { + EXPECT_TRUE(buffer_factory_->waitUntilExpectedNumberOfAccountsAndBoundBuffers(1, 4)); + } else { + EXPECT_TRUE(buffer_factory_->waitUntilExpectedNumberOfAccountsAndBoundBuffers(0, 0)); + } // Send the second request. auto response2 = codec_client_->makeRequestWithBody(default_request_headers_, 1000); @@ -146,7 +168,11 @@ TEST_P(HttpBufferWatermarksTest, ShouldCreateFourBuffersPerAccount) { upstream_request2 = std::move(upstream_request_); // Check the expected number of buffers per account - EXPECT_TRUE(buffer_factory_->waitUntilExpectedNumberOfAccountsAndBoundBuffers(2, 8)); + if (streamBufferAccounting()) { + EXPECT_TRUE(buffer_factory_->waitUntilExpectedNumberOfAccountsAndBoundBuffers(2, 8)); + } else { + EXPECT_TRUE(buffer_factory_->waitUntilExpectedNumberOfAccountsAndBoundBuffers(0, 0)); + } // Respond to the first request and wait for complete upstream_request1->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); @@ -155,7 +181,11 @@ TEST_P(HttpBufferWatermarksTest, ShouldCreateFourBuffersPerAccount) { ASSERT_TRUE(upstream_request1->complete()); // Check the expected number of buffers per account - EXPECT_TRUE(buffer_factory_->waitUntilExpectedNumberOfAccountsAndBoundBuffers(1, 4)); + if (streamBufferAccounting()) { + EXPECT_TRUE(buffer_factory_->waitUntilExpectedNumberOfAccountsAndBoundBuffers(1, 4)); + } else { + EXPECT_TRUE(buffer_factory_->waitUntilExpectedNumberOfAccountsAndBoundBuffers(0, 0)); + } // Respond to the second request and wait for complete upstream_request2->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); @@ -192,10 +222,12 @@ TEST_P(HttpBufferWatermarksTest, ShouldTrackAllocatedBytesToUpstream) { auto responses = sendRequests(num_requests, request_body_size, response_body_size); // Wait for all requests to have accounted for the requests we've sent. - EXPECT_TRUE( - buffer_factory_->waitForExpectedAccountBalanceWithTimeout(TestUtility::DefaultTimeout)) - << "buffer total: " << buffer_factory_->totalBufferSize() - << " buffer max: " << buffer_factory_->maxBufferSize() << printAccounts(); + if (streamBufferAccounting()) { + EXPECT_TRUE( + buffer_factory_->waitForExpectedAccountBalanceWithTimeout(TestUtility::DefaultTimeout)) + << "buffer total: " << buffer_factory_->totalBufferSize() + << " buffer max: " << buffer_factory_->maxBufferSize() << printAccounts(); + } writev_matcher_->setResumeWrites(); @@ -230,10 +262,12 @@ TEST_P(HttpBufferWatermarksTest, ShouldTrackAllocatedBytesToDownstream) { auto responses = sendRequests(num_requests, request_body_size, response_body_size); // Wait for all requests to buffered the response from upstream. - EXPECT_TRUE( - buffer_factory_->waitForExpectedAccountBalanceWithTimeout(TestUtility::DefaultTimeout)) - << "buffer total: " << buffer_factory_->totalBufferSize() - << " buffer max: " << buffer_factory_->maxBufferSize() << printAccounts(); + if (streamBufferAccounting()) { + EXPECT_TRUE( + buffer_factory_->waitForExpectedAccountBalanceWithTimeout(TestUtility::DefaultTimeout)) + << "buffer total: " << buffer_factory_->totalBufferSize() + << " buffer max: " << buffer_factory_->maxBufferSize() << printAccounts(); + } writev_matcher_->setResumeWrites(); diff --git a/test/integration/tracked_watermark_buffer.cc b/test/integration/tracked_watermark_buffer.cc index b35e5ade77dec..91b0f115c8e65 100644 --- a/test/integration/tracked_watermark_buffer.cc +++ b/test/integration/tracked_watermark_buffer.cc @@ -61,10 +61,11 @@ TrackedWatermarkBufferFactory::create(std::function below_low_watermark, }, [this](BufferMemoryAccountSharedPtr& account, TrackedWatermarkBuffer* buffer) { absl::MutexLock lock(&mutex_); - // Buffers should only be bound once. - ASSERT(actively_bound_buffers_.find(buffer) == actively_bound_buffers_.end()); - account_infos_[account].emplace(buffer); - actively_bound_buffers_.emplace(buffer); + // Only track non-null accounts. + if (account) { + account_infos_[account].emplace(buffer); + actively_bound_buffers_.emplace(buffer); + } }, below_low_watermark, above_high_watermark, above_overflow_watermark); } From 3e5f947dcac796276705e7ba3817141194089ce2 Mon Sep 17 00:00:00 2001 From: Kevin Baichoo Date: Tue, 11 May 2021 12:35:39 +0000 Subject: [PATCH 10/14] Clangtidy. Signed-off-by: Kevin Baichoo --- test/integration/buffer_accounting_integration_test.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/integration/buffer_accounting_integration_test.cc b/test/integration/buffer_accounting_integration_test.cc index e640bea2e52c7..424884671eb48 100644 --- a/test/integration/buffer_accounting_integration_test.cc +++ b/test/integration/buffer_accounting_integration_test.cc @@ -20,7 +20,7 @@ namespace Envoy { namespace { -std::string IpVersionAndBufferAccountingTestParamsToString( +std::string ipVersionAndBufferAccountingTestParamsToString( const ::testing::TestParamInfo>& params) { return fmt::format( "{}_{}", @@ -138,7 +138,7 @@ class HttpBufferWatermarksTest INSTANTIATE_TEST_SUITE_P( IpVersions, HttpBufferWatermarksTest, testing::Combine(testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), testing::Bool()), - IpVersionAndBufferAccountingTestParamsToString); + ipVersionAndBufferAccountingTestParamsToString); // We should create four buffers each billing the same downstream request's // account which originated the chain. From 2e65c5d73f0e17a0cbd5765c1af03e3d48a8d0e3 Mon Sep 17 00:00:00 2001 From: Kevin Baichoo Date: Wed, 19 May 2021 20:40:35 +0000 Subject: [PATCH 11/14] Cleanup tests. Signed-off-by: Kevin Baichoo --- include/envoy/http/codec.h | 2 - .../buffer_accounting_integration_test.cc | 71 +++---------------- test/integration/tracked_watermark_buffer.cc | 26 +++---- test/integration/tracked_watermark_buffer.h | 2 +- 4 files changed, 22 insertions(+), 79 deletions(-) diff --git a/include/envoy/http/codec.h b/include/envoy/http/codec.h index 00d243f185d6a..e878f79d49202 100644 --- a/include/envoy/http/codec.h +++ b/include/envoy/http/codec.h @@ -383,8 +383,6 @@ class Stream { /** * Sets the account for this stream, propagating it to all of its buffers. - * This should only be called on client streams since server streams create - * their own account. * @param the account to assign this stream. */ virtual void setAccount(Buffer::BufferMemoryAccountSharedPtr account) PURE; diff --git a/test/integration/buffer_accounting_integration_test.cc b/test/integration/buffer_accounting_integration_test.cc index 424884671eb48..9a0ab7a26fffe 100644 --- a/test/integration/buffer_accounting_integration_test.cc +++ b/test/integration/buffer_accounting_integration_test.cc @@ -36,51 +36,6 @@ class HttpBufferWatermarksTest public testing::TestWithParam>, public HttpIntegrationTest { public: - struct BufferParams { - const uint32_t connection_watermark; - const uint32_t downstream_h2_stream_window; - const uint32_t downstream_h2_conn_window; - const uint32_t upstream_h2_stream_window; - const uint32_t upstream_h2_conn_window; - }; - - // Configures the buffers with the given parameters. - void initializeWithBufferConfig(const BufferParams& buffer_params, uint32_t num_responses) { - config_helper_.setBufferLimits(buffer_params.connection_watermark, - buffer_params.connection_watermark); - - config_helper_.addConfigModifier( - [&](envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager& - hcm) -> void { - auto* h2_options = hcm.mutable_http2_protocol_options(); - h2_options->mutable_max_concurrent_streams()->set_value(num_responses); - h2_options->mutable_initial_stream_window_size()->set_value( - buffer_params.downstream_h2_stream_window); - h2_options->mutable_initial_connection_window_size()->set_value( - buffer_params.downstream_h2_conn_window); - }); - - config_helper_.addConfigModifier( - [&](envoy::config::bootstrap::v3::Bootstrap& bootstrap) -> void { - ConfigHelper::HttpProtocolOptions protocol_options; - auto* upstream_h2_options = - protocol_options.mutable_explicit_http_config()->mutable_http2_protocol_options(); - upstream_h2_options->mutable_max_concurrent_streams()->set_value(100); - upstream_h2_options->mutable_initial_stream_window_size()->set_value( - buffer_params.upstream_h2_stream_window); - upstream_h2_options->mutable_initial_connection_window_size()->set_value( - buffer_params.upstream_h2_conn_window); - for (auto& cluster_config : *bootstrap.mutable_static_resources()->mutable_clusters()) { - ConfigHelper::setProtocolOptions(cluster_config, protocol_options); - } - }); - - autonomous_upstream_ = true; - autonomous_allow_incomplete_streams_ = true; - - initialize(); - } - std::vector sendRequests(uint32_t num_responses, uint32_t request_body_size, uint32_t response_body_size) { std::vector responses; @@ -199,18 +154,13 @@ TEST_P(HttpBufferWatermarksTest, ShouldCreateFourBuffersPerAccount) { TEST_P(HttpBufferWatermarksTest, ShouldTrackAllocatedBytesToUpstream) { const int num_requests = 5; - const uint32_t connection_watermark = 32768; - const uint32_t downstream_h2_stream_window = 512 * 1024; - const uint32_t downstream_h2_conn_window = 64 * 1024; - const uint32_t upstream_h2_stream_window = 64 * 1024; - const uint32_t upstream_h2_conn_window = 1024 * 1024 * 1024; // Effectively unlimited const uint32_t request_body_size = 4096; const uint32_t response_body_size = 4096; - initializeWithBufferConfig({connection_watermark, downstream_h2_stream_window, - downstream_h2_conn_window, upstream_h2_stream_window, - upstream_h2_conn_window}, - num_requests); + autonomous_upstream_ = true; + autonomous_allow_incomplete_streams_ = true; + initialize(); + buffer_factory_->setExpectedAccountBalance(request_body_size, num_requests); // Makes us have Envoy's writes to upstream return EAGAIN @@ -239,18 +189,13 @@ TEST_P(HttpBufferWatermarksTest, ShouldTrackAllocatedBytesToUpstream) { TEST_P(HttpBufferWatermarksTest, ShouldTrackAllocatedBytesToDownstream) { const int num_requests = 5; - const uint32_t connection_watermark = 32768; - const uint32_t downstream_h2_stream_window = 512 * 1024; - const uint32_t downstream_h2_conn_window = 64 * 1024; - const uint32_t upstream_h2_stream_window = 64 * 1024; - const uint32_t upstream_h2_conn_window = 1024 * 1024 * 1024; // Effectively unlimited const uint32_t request_body_size = 4096; const uint32_t response_body_size = 16384; - initializeWithBufferConfig({connection_watermark, downstream_h2_stream_window, - downstream_h2_conn_window, upstream_h2_stream_window, - upstream_h2_conn_window}, - num_requests); + autonomous_upstream_ = true; + autonomous_allow_incomplete_streams_ = true; + initialize(); + buffer_factory_->setExpectedAccountBalance(response_body_size, num_requests); writev_matcher_->setSourcePort(lookupPort("http")); codec_client_ = makeHttpConnection(lookupPort("http")); diff --git a/test/integration/tracked_watermark_buffer.cc b/test/integration/tracked_watermark_buffer.cc index 91b0f115c8e65..5a49e24ea9a78 100644 --- a/test/integration/tracked_watermark_buffer.cc +++ b/test/integration/tracked_watermark_buffer.cc @@ -152,6 +152,7 @@ void TrackedWatermarkBufferFactory::removeDanglingAccounts() { // Remove all "dangling" accounts. if (accounts_it->first.use_count() == 1) { + ASSERT(accounts_it->second.empty()); account_infos_.erase(accounts_it); } @@ -159,11 +160,11 @@ void TrackedWatermarkBufferFactory::removeDanglingAccounts() { } } -void TrackedWatermarkBufferFactory::setExpectedAccountBalance(uint64_t byte_size, +void TrackedWatermarkBufferFactory::setExpectedAccountBalance(uint64_t byte_size_per_account, uint32_t num_accounts) { absl::MutexLock lock(&mutex_); ASSERT(!expected_balances_.has_value()); - expected_balances_.emplace(byte_size, num_accounts); + expected_balances_.emplace(byte_size_per_account, num_accounts); } bool TrackedWatermarkBufferFactory::waitForExpectedAccountBalanceWithTimeout( @@ -189,20 +190,19 @@ void TrackedWatermarkBufferFactory::checkIfExpectedBalancesMet() { } removeDanglingAccounts(); - if (account_infos_.size() < expected_balances_->num_accounts_) { - return; - } - // This is thread safe since this function should run on the only Envoy worker - // thread. - for (auto& acc : account_infos_) { - if (static_cast(acc.first.get())->balance() < - expected_balances_->balance_per_account_) { - return; + if (account_infos_.size() == expected_balances_->num_accounts_) { + // This is thread safe since this function should run on the only Envoy worker + // thread. + for (auto& acc : account_infos_) { + if (static_cast(acc.first.get())->balance() < + expected_balances_->balance_per_account_) { + return; + } } - } - expected_balances_met_.Notify(); + expected_balances_met_.Notify(); + } } } // namespace Buffer diff --git a/test/integration/tracked_watermark_buffer.h b/test/integration/tracked_watermark_buffer.h index 27166904f654b..8963de177e43e 100644 --- a/test/integration/tracked_watermark_buffer.h +++ b/test/integration/tracked_watermark_buffer.h @@ -99,7 +99,7 @@ class TrackedWatermarkBufferFactory : public Buffer::WatermarkFactory { // // The Envoy worker thread will notify the test thread once the condition is // met. - void setExpectedAccountBalance(uint64_t byte_size, uint32_t num_accounts); + void setExpectedAccountBalance(uint64_t byte_size_per_account, uint32_t num_accounts); bool waitForExpectedAccountBalanceWithTimeout( std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); From c06b5d5285c7ecb7bc5908bd23b05d8713830692 Mon Sep 17 00:00:00 2001 From: Kevin Baichoo Date: Thu, 20 May 2021 19:16:42 +0000 Subject: [PATCH 12/14] Additional test infra changes. Signed-off-by: Kevin Baichoo --- .../buffer_accounting_integration_test.cc | 13 +++---- test/integration/tracked_watermark_buffer.cc | 36 +++++++++++++++++++ test/integration/tracked_watermark_buffer.h | 7 ++-- test/per_file_coverage.sh | 2 +- 4 files changed, 47 insertions(+), 11 deletions(-) diff --git a/test/integration/buffer_accounting_integration_test.cc b/test/integration/buffer_accounting_integration_test.cc index 9a0ab7a26fffe..1ccfdb1121336 100644 --- a/test/integration/buffer_accounting_integration_test.cc +++ b/test/integration/buffer_accounting_integration_test.cc @@ -77,15 +77,16 @@ class HttpBufferWatermarksTest [&stream](Buffer::TrackedWatermarkBufferFactory::AccountToBoundBuffersMap& map) { stream << "Printing Account map. Size: " << map.size() << '\n'; for (auto& entry : map) { - // We can't access the accounts balance in a thread safe way here. + // This runs in the context of the worker thread, so we can access + // the balance. stream << " Account: " << entry.first << '\n'; - for (auto& buffer : entry.second) { - stream << " Buffer: " << buffer << '\n'; - } + stream << " Balance:" + << static_cast(entry.first.get())->balance() + << '\n'; + stream << " Number of associated buffers: " << entry.second.size() << '\n'; } }; - - buffer_factory_->inspectAccounts(print_map); + buffer_factory_->inspectAccounts(print_map, test_server_->server()); return stream.str(); } }; diff --git a/test/integration/tracked_watermark_buffer.cc b/test/integration/tracked_watermark_buffer.cc index 5a49e24ea9a78..a6bbeb1b00816 100644 --- a/test/integration/tracked_watermark_buffer.cc +++ b/test/integration/tracked_watermark_buffer.cc @@ -1,5 +1,9 @@ #include "test/integration/tracked_watermark_buffer.h" +#include "envoy/thread/thread.h" +#include "envoy/thread_local/thread_local.h" +#include "envoy/thread_local/thread_local_object.h" + #include "common/common/assert.h" namespace Envoy { @@ -160,6 +164,38 @@ void TrackedWatermarkBufferFactory::removeDanglingAccounts() { } } +void TrackedWatermarkBufferFactory::inspectAccounts( + std::function func, Server::Instance& server) { + absl::Notification done_notification; + ThreadLocal::TypedSlotPtr<> slot; + Envoy::Thread::ThreadId main_tid; + + server.dispatcher().post([&] { + slot = ThreadLocal::TypedSlot<>::makeUnique(server.threadLocal()); + slot->set( + [](Envoy::Event::Dispatcher&) -> std::shared_ptr { + return nullptr; + }); + + main_tid = server.api().threadFactory().currentThreadId(); + + slot->runOnAllThreads( + [&main_tid, &server, &func, this](OptRef) { + // Run on the worker thread. + if (server.api().threadFactory().currentThreadId() != main_tid) { + absl::MutexLock lock(&(this->mutex_)); + func(this->account_infos_); + } + }, + [&slot, &done_notification] { + slot.reset(nullptr); + done_notification.Notify(); + }); + }); + + done_notification.WaitForNotification(); +} + void TrackedWatermarkBufferFactory::setExpectedAccountBalance(uint64_t byte_size_per_account, uint32_t num_accounts) { absl::MutexLock lock(&mutex_); diff --git a/test/integration/tracked_watermark_buffer.h b/test/integration/tracked_watermark_buffer.h index 8963de177e43e..bc05db5d4ca03 100644 --- a/test/integration/tracked_watermark_buffer.h +++ b/test/integration/tracked_watermark_buffer.h @@ -1,6 +1,7 @@ #pragma once #include "envoy/buffer/buffer.h" +#include "envoy/server/instance.h" #include "common/buffer/buffer_impl.h" #include "common/buffer/watermark_buffer.h" @@ -115,10 +116,8 @@ class TrackedWatermarkBufferFactory : public Buffer::WatermarkFactory { using AccountToBoundBuffersMap = absl::flat_hash_map>; - void inspectAccounts(std::function func) { - absl::MutexLock lock(&mutex_); - func(account_infos_); - } + void inspectAccounts(std::function func, + Server::Instance& server); private: // Remove "dangling" accounts; accounts where the account_info map is the only diff --git a/test/per_file_coverage.sh b/test/per_file_coverage.sh index b0b6f6a11f3e4..b4e80d79304d0 100755 --- a/test/per_file_coverage.sh +++ b/test/per_file_coverage.sh @@ -18,7 +18,7 @@ declare -a KNOWN_LOW_COVERAGE=( "source/common/singleton:95.1" "source/common/thread:0.0" # Death tests don't report LCOV "source/common/matcher:93.3" -"source/common/quic:87.8" +"source/common/quic:87.7" "source/common/tracing:95.7" "source/common/watchdog:42.9" # Death tests don't report LCOV "source/exe:94.3" From 7b32a8df061c4d4b5d2e3e92785c4fc20fc83494 Mon Sep 17 00:00:00 2001 From: Kevin Baichoo Date: Fri, 21 May 2021 13:06:30 +0000 Subject: [PATCH 13/14] Nit fix. Signed-off-by: Kevin Baichoo --- test/integration/tracked_watermark_buffer.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/tracked_watermark_buffer.cc b/test/integration/tracked_watermark_buffer.cc index a6bbeb1b00816..3bc07cb7a9252 100644 --- a/test/integration/tracked_watermark_buffer.cc +++ b/test/integration/tracked_watermark_buffer.cc @@ -180,7 +180,7 @@ void TrackedWatermarkBufferFactory::inspectAccounts( main_tid = server.api().threadFactory().currentThreadId(); slot->runOnAllThreads( - [&main_tid, &server, &func, this](OptRef) { + [main_tid, &server, &func, this](OptRef) { // Run on the worker thread. if (server.api().threadFactory().currentThreadId() != main_tid) { absl::MutexLock lock(&(this->mutex_)); From 03e7a137f7ae0d830e253fabc1f767c95a5245b0 Mon Sep 17 00:00:00 2001 From: Kevin Baichoo Date: Fri, 21 May 2021 13:07:56 +0000 Subject: [PATCH 14/14] Coverage revert. Signed-off-by: Kevin Baichoo --- test/per_file_coverage.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/per_file_coverage.sh b/test/per_file_coverage.sh index b4e80d79304d0..b0b6f6a11f3e4 100755 --- a/test/per_file_coverage.sh +++ b/test/per_file_coverage.sh @@ -18,7 +18,7 @@ declare -a KNOWN_LOW_COVERAGE=( "source/common/singleton:95.1" "source/common/thread:0.0" # Death tests don't report LCOV "source/common/matcher:93.3" -"source/common/quic:87.7" +"source/common/quic:87.8" "source/common/tracing:95.7" "source/common/watchdog:42.9" # Death tests don't report LCOV "source/exe:94.3"