From 93b58ed0bd897c16d42b30e2ef016a6d491e7e8b Mon Sep 17 00:00:00 2001 From: phlax Date: Fri, 30 Jul 2021 18:35:39 +0100 Subject: [PATCH 1/7] api: Revert #17536 as proto_format expects the header to be there (#17548) Signed-off-by: Ryan Northey --- api/BUILD | 2 ++ 1 file changed, 2 insertions(+) diff --git a/api/BUILD b/api/BUILD index 3e990fa9783a5..a70eae799d797 100644 --- a/api/BUILD +++ b/api/BUILD @@ -1,3 +1,5 @@ +# DO NOT EDIT. This file is generated by tools/proto_format/proto_sync.py. + load("@rules_proto//proto:defs.bzl", "proto_library") licenses(["notice"]) # Apache 2 From 252872bae41d5751f1176956e5588cca9c085ffb Mon Sep 17 00:00:00 2001 From: Yuchen Dai Date: Fri, 30 Jul 2021 13:52:35 -0700 Subject: [PATCH 2/7] listener: active connection move only (#17551) Signed-off-by: Yuchen Dai --- source/server/BUILD | 4 ++ source/server/active_stream_listener_base.cc | 59 +++++++++++++++++++ source/server/active_stream_listener_base.h | 44 ++++++++++++++ source/server/active_tcp_listener.cc | 62 +------------------- source/server/active_tcp_listener.h | 45 -------------- 5 files changed, 110 insertions(+), 104 deletions(-) diff --git a/source/server/BUILD b/source/server/BUILD index 26825d04a6de3..630e61710f15a 100644 --- a/source/server/BUILD +++ b/source/server/BUILD @@ -157,6 +157,7 @@ envoy_cc_library( name = "active_tcp_listener_headers", hdrs = [ "active_stream_listener_base.h", + "active_tcp_listener.h", "active_tcp_socket.h", ], deps = [ @@ -171,7 +172,10 @@ envoy_cc_library( "//envoy/network:listen_socket_interface", "//envoy/network:listener_interface", "//envoy/server:listener_manager_interface", + "//source/common/common:assert_lib", "//source/common/common:linked_object", + "//source/common/network:connection_lib", + "//source/common/stats:timespan_lib", ], ) diff --git a/source/server/active_stream_listener_base.cc b/source/server/active_stream_listener_base.cc index 39d336034c5fc..8bec6788040a9 100644 --- a/source/server/active_stream_listener_base.cc +++ b/source/server/active_stream_listener_base.cc @@ -2,6 +2,9 @@ #include "envoy/network/filter.h" +#include "source/common/stats/timespan_impl.h" +#include "source/server/active_tcp_listener.h" + namespace Envoy { namespace Server { @@ -58,5 +61,61 @@ void ActiveStreamListenerBase::newConnection(Network::ConnectionSocketPtr&& sock newActiveConnection(*filter_chain, std::move(server_conn_ptr), std::move(stream_info)); } +ActiveConnections::ActiveConnections(ActiveTcpListener& listener, + const Network::FilterChain& filter_chain) + : listener_(listener), filter_chain_(filter_chain) {} + +ActiveConnections::~ActiveConnections() { + // connections should be defer deleted already. + ASSERT(connections_.empty()); +} + +ActiveTcpConnection::ActiveTcpConnection(ActiveConnections& active_connections, + Network::ConnectionPtr&& new_connection, + TimeSource& time_source, + std::unique_ptr&& stream_info) + : stream_info_(std::move(stream_info)), active_connections_(active_connections), + connection_(std::move(new_connection)), + conn_length_(new Stats::HistogramCompletableTimespanImpl( + active_connections_.listener_.stats_.downstream_cx_length_ms_, time_source)) { + // We just universally set no delay on connections. Theoretically we might at some point want + // to make this configurable. + connection_->noDelay(true); + auto& listener = active_connections_.listener_; + listener.stats_.downstream_cx_total_.inc(); + listener.stats_.downstream_cx_active_.inc(); + listener.per_worker_stats_.downstream_cx_total_.inc(); + listener.per_worker_stats_.downstream_cx_active_.inc(); + + // Active connections on the handler (not listener). The per listener connections have already + // been incremented at this point either via the connection balancer or in the socket accept + // path if there is no configured balancer. + listener.parent_.incNumConnections(); +} + +ActiveTcpConnection::~ActiveTcpConnection() { + ActiveStreamListenerBase::emitLogs(*active_connections_.listener_.config_, *stream_info_); + auto& listener = active_connections_.listener_; + listener.stats_.downstream_cx_active_.dec(); + listener.stats_.downstream_cx_destroy_.inc(); + listener.per_worker_stats_.downstream_cx_active_.dec(); + conn_length_->complete(); + + // Active listener connections (not handler). + listener.decNumConnections(); + + // Active handler connections (not listener). + listener.parent_.decNumConnections(); +} + +void ActiveTcpConnection::onEvent(Network::ConnectionEvent event) { + ENVOY_LOG(trace, "[C{}] connection on event {}", connection_->id(), static_cast(event)); + // Any event leads to destruction of the connection. + if (event == Network::ConnectionEvent::LocalClose || + event == Network::ConnectionEvent::RemoteClose) { + active_connections_.listener_.removeConnection(*this); + } +} + } // namespace Server } // namespace Envoy diff --git a/source/server/active_stream_listener_base.h b/source/server/active_stream_listener_base.h index 2d99e3965bf69..f57a5a693c0eb 100644 --- a/source/server/active_stream_listener_base.h +++ b/source/server/active_stream_listener_base.h @@ -10,6 +10,7 @@ #include "envoy/network/connection.h" #include "envoy/network/connection_handler.h" #include "envoy/network/listener.h" +#include "envoy/stats/timespan.h" #include "envoy/stream_info/stream_info.h" #include "source/common/common/linked_object.h" @@ -135,5 +136,48 @@ class ActiveStreamListenerBase : public ActiveListenerImplBase, Event::Dispatcher& dispatcher_; }; +struct ActiveTcpConnection; +class ActiveTcpListener; + +/** + * Wrapper for a group of active connections which are attached to the same filter chain context. + */ +class ActiveConnections : public Event::DeferredDeletable { +public: + ActiveConnections(ActiveTcpListener& listener, const Network::FilterChain& filter_chain); + ~ActiveConnections() override; + + // listener filter chain pair is the owner of the connections + ActiveTcpListener& listener_; + const Network::FilterChain& filter_chain_; + // Owned connections + std::list> connections_; +}; + +/** + * Wrapper for an active TCP connection owned by this handler. + */ +struct ActiveTcpConnection : LinkedObject, + public Event::DeferredDeletable, + public Network::ConnectionCallbacks, + Logger::Loggable { + ActiveTcpConnection(ActiveConnections& active_connections, + Network::ConnectionPtr&& new_connection, TimeSource& time_system, + std::unique_ptr&& stream_info); + ~ActiveTcpConnection() override; + // Network::ConnectionCallbacks + void onEvent(Network::ConnectionEvent event) override; + void onAboveWriteBufferHighWatermark() override {} + void onBelowWriteBufferLowWatermark() override {} + + std::unique_ptr stream_info_; + ActiveConnections& active_connections_; + Network::ConnectionPtr connection_; + Stats::TimespanPtr conn_length_; +}; + +using ActiveConnectionPtr = std::unique_ptr; +using ActiveConnectionCollectionPtr = std::unique_ptr; + } // namespace Server } // namespace Envoy diff --git a/source/server/active_tcp_listener.cc b/source/server/active_tcp_listener.cc index cee5494394436..1d867d4a1033c 100644 --- a/source/server/active_tcp_listener.cc +++ b/source/server/active_tcp_listener.cc @@ -167,9 +167,9 @@ void ActiveTcpListener::newActiveConnection(const Network::FilterChain& filter_c Network::ServerConnectionPtr server_conn_ptr, std::unique_ptr stream_info) { auto& active_connections = getOrCreateActiveConnections(filter_chain); - ActiveTcpConnectionPtr active_connection( - new ActiveTcpConnection(active_connections, std::move(server_conn_ptr), - dispatcher().timeSource(), std::move(stream_info))); + auto active_connection = + std::make_unique(active_connections, std::move(server_conn_ptr), + dispatcher().timeSource(), std::move(stream_info)); // If the connection is already closed, we can just let this connection immediately die. if (active_connection->connection_->state() != Network::Connection::State::Closed) { ENVOY_CONN_LOG(debug, "new connection from {}", *active_connection->connection_, @@ -207,61 +207,5 @@ void ActiveTcpListener::post(Network::ConnectionSocketPtr&& socket) { }); } -ActiveConnections::ActiveConnections(ActiveTcpListener& listener, - const Network::FilterChain& filter_chain) - : listener_(listener), filter_chain_(filter_chain) {} - -ActiveConnections::~ActiveConnections() { - // connections should be defer deleted already. - ASSERT(connections_.empty()); -} - -ActiveTcpConnection::ActiveTcpConnection(ActiveConnections& active_connections, - Network::ConnectionPtr&& new_connection, - TimeSource& time_source, - std::unique_ptr&& stream_info) - : stream_info_(std::move(stream_info)), active_connections_(active_connections), - connection_(std::move(new_connection)), - conn_length_(new Stats::HistogramCompletableTimespanImpl( - active_connections_.listener_.stats_.downstream_cx_length_ms_, time_source)) { - // We just universally set no delay on connections. Theoretically we might at some point want - // to make this configurable. - connection_->noDelay(true); - auto& listener = active_connections_.listener_; - listener.stats_.downstream_cx_total_.inc(); - listener.stats_.downstream_cx_active_.inc(); - listener.per_worker_stats_.downstream_cx_total_.inc(); - listener.per_worker_stats_.downstream_cx_active_.inc(); - - // Active connections on the handler (not listener). The per listener connections have already - // been incremented at this point either via the connection balancer or in the socket accept - // path if there is no configured balancer. - listener.parent_.incNumConnections(); -} - -ActiveTcpConnection::~ActiveTcpConnection() { - ActiveStreamListenerBase::emitLogs(*active_connections_.listener_.config_, *stream_info_); - auto& listener = active_connections_.listener_; - listener.stats_.downstream_cx_active_.dec(); - listener.stats_.downstream_cx_destroy_.inc(); - listener.per_worker_stats_.downstream_cx_active_.dec(); - conn_length_->complete(); - - // Active listener connections (not handler). - listener.decNumConnections(); - - // Active handler connections (not listener). - listener.parent_.decNumConnections(); -} - -void ActiveTcpConnection::onEvent(Network::ConnectionEvent event) { - ENVOY_LOG(trace, "[C{}] connection on event {}", connection_->id(), static_cast(event)); - // Any event leads to destruction of the connection. - if (event == Network::ConnectionEvent::LocalClose || - event == Network::ConnectionEvent::RemoteClose) { - active_connections_.listener_.removeConnection(*this); - } -} - } // namespace Server } // namespace Envoy diff --git a/source/server/active_tcp_listener.h b/source/server/active_tcp_listener.h index 9ea378f445395..79344d5afd474 100644 --- a/source/server/active_tcp_listener.h +++ b/source/server/active_tcp_listener.h @@ -11,12 +11,6 @@ namespace Envoy { namespace Server { - -struct ActiveTcpConnection; -using ActiveTcpConnectionPtr = std::unique_ptr; -class ActiveConnections; -using ActiveConnectionCollectionPtr = std::unique_ptr; - namespace { // Structure used to allow a unique_ptr to be captured in a posted lambda. See below. struct RebalancedSocket { @@ -106,44 +100,5 @@ class ActiveTcpListener final : public Network::TcpListenerCallbacks, }; using ActiveTcpListenerOptRef = absl::optional>; - -/** - * Wrapper for a group of active connections which are attached to the same filter chain context. - */ -class ActiveConnections : public Event::DeferredDeletable { -public: - ActiveConnections(ActiveTcpListener& listener, const Network::FilterChain& filter_chain); - ~ActiveConnections() override; - - // listener filter chain pair is the owner of the connections - ActiveTcpListener& listener_; - const Network::FilterChain& filter_chain_; - // Owned connections - std::list connections_; -}; - -/** - * Wrapper for an active TCP connection owned by this handler. - */ -struct ActiveTcpConnection : LinkedObject, - public Event::DeferredDeletable, - public Network::ConnectionCallbacks, - Logger::Loggable { - ActiveTcpConnection(ActiveConnections& active_connections, - Network::ConnectionPtr&& new_connection, TimeSource& time_system, - std::unique_ptr&& stream_info); - ~ActiveTcpConnection() override; - - // Network::ConnectionCallbacks - void onEvent(Network::ConnectionEvent event) override; - void onAboveWriteBufferHighWatermark() override {} - void onBelowWriteBufferLowWatermark() override {} - - std::unique_ptr stream_info_; - ActiveConnections& active_connections_; - Network::ConnectionPtr connection_; - Stats::TimespanPtr conn_length_; -}; - } // namespace Server } // namespace Envoy From 38b6fd5a7f28e33b375f02e2b4546e47adefb5f0 Mon Sep 17 00:00:00 2001 From: Snow Pettersen Date: Fri, 30 Jul 2021 17:06:15 -0400 Subject: [PATCH 3/7] examples: pin version of go-control-plane (#17553) Signed-off-by: Snow Pettersen --- examples/dynamic-config-cp/Dockerfile-control-plane | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/dynamic-config-cp/Dockerfile-control-plane b/examples/dynamic-config-cp/Dockerfile-control-plane index 39c7f2ca4223b..3d475c7344219 100644 --- a/examples/dynamic-config-cp/Dockerfile-control-plane +++ b/examples/dynamic-config-cp/Dockerfile-control-plane @@ -8,5 +8,5 @@ RUN apt-get update \ RUN git clone https://github.com/envoyproxy/go-control-plane ADD ./resource.go /go/go-control-plane/internal/example/resource.go -RUN cd go-control-plane && make bin/example +RUN cd go-control-plane && git checkout b4adc3bb5fe5288bff01cd452dad418ef98c676e && make bin/example WORKDIR /go/go-control-plane From 2ba5beda1bb9718ab0da4e820f42e04a4aa42aa7 Mon Sep 17 00:00:00 2001 From: Ulf Adams Date: Fri, 30 Jul 2021 23:06:38 +0200 Subject: [PATCH 4/7] CI: avoid the legacy Bazel flag name auth_enabled (#17542) Risk Level: low Testing: n/a Docs Changes: n/a Release Notes: n/a Signed-off-by: Ulf Adams --- .bazelrc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.bazelrc b/.bazelrc index 2b2b0bad1f812..52382f91ec00b 100644 --- a/.bazelrc +++ b/.bazelrc @@ -220,7 +220,7 @@ build:remote --strategy=Javac=remote,sandboxed,local build:remote --strategy=Closure=remote,sandboxed,local build:remote --strategy=Genrule=remote,sandboxed,local build:remote --remote_timeout=7200 -build:remote --auth_enabled=true +build:remote --google_default_credentials=true build:remote --remote_download_toplevel # Windows bazel does not allow sandboxed as a spawn strategy @@ -229,7 +229,7 @@ build:remote-windows --strategy=Javac=remote,local build:remote-windows --strategy=Closure=remote,local build:remote-windows --strategy=Genrule=remote,local build:remote-windows --remote_timeout=7200 -build:remote-windows --auth_enabled=true +build:remote-windows --google_default_credentials=true build:remote-windows --remote_download_toplevel build:remote-clang --config=remote From 4506199dc7d735e0d30e0c9e3fe4805e8da56977 Mon Sep 17 00:00:00 2001 From: Kevin Baichoo Date: Fri, 30 Jul 2021 14:36:17 -0700 Subject: [PATCH 5/7] Buffer: Implement tracking for BufferMemoryAccounts in WatermarkFactory. (#17093) This PR tracks memory accounts using >1MB of allocated space, with feedback mechanisms based on credits and debits on accounts. It further creates the handle from which the BufferMemoryAccount can reset the stream, and has the WatermarkBufferFactory also produce the particular BufferMemoryAccountImpl used for tracking. Risk Level: Medium Testing: Unit and Integration test Docs Changes: NA Release Notes: NA -- not yet user facing Platform Specific Features: NA Runtime guard: Yes, envoy.test_only.per_stream_buffer_accounting from #16218 sufficient Related Issue #15791 Signed-off-by: Kevin Baichoo --- envoy/buffer/BUILD | 1 + envoy/buffer/buffer.h | 26 +- envoy/http/BUILD | 6 + envoy/http/codec.h | 37 +- envoy/http/stream_reset_handler.h | 50 ++ source/common/buffer/BUILD | 1 + source/common/buffer/buffer_impl.h | 35 +- source/common/buffer/watermark_buffer.cc | 99 ++++ source/common/buffer/watermark_buffer.h | 125 +++++ source/common/http/conn_manager_impl.cc | 19 +- source/common/http/http1/codec_impl.cc | 15 + source/common/http/http1/codec_impl.h | 24 +- source/common/http/http2/codec_impl.cc | 25 + source/common/http/http2/codec_impl.h | 4 +- source/common/quic/envoy_quic_client_stream.h | 3 - .../common/quic/envoy_quic_server_stream.cc | 19 + source/common/quic/envoy_quic_server_stream.h | 4 +- source/common/quic/envoy_quic_stream.h | 8 + source/common/quic/envoy_quic_utils.cc | 1 + test/common/buffer/BUILD | 11 + .../buffer/buffer_memory_account_test.cc | 481 ++++++++++++++++++ test/common/buffer/owned_impl_test.cc | 271 ---------- test/integration/BUILD | 2 + .../buffer_accounting_integration_test.cc | 211 +++++++- test/integration/tracked_watermark_buffer.cc | 37 +- test/integration/tracked_watermark_buffer.h | 27 +- .../tracked_watermark_buffer_test.cc | 26 +- test/mocks/buffer/mocks.h | 2 + test/mocks/http/BUILD | 8 + test/mocks/http/stream.cc | 6 +- test/mocks/http/stream_reset_handler.h | 19 + 31 files changed, 1217 insertions(+), 386 deletions(-) create mode 100644 envoy/http/stream_reset_handler.h create mode 100644 test/common/buffer/buffer_memory_account_test.cc create mode 100644 test/mocks/http/stream_reset_handler.h diff --git a/envoy/buffer/BUILD b/envoy/buffer/BUILD index 3b9157d06f5fd..24a2e516527cd 100644 --- a/envoy/buffer/BUILD +++ b/envoy/buffer/BUILD @@ -16,6 +16,7 @@ envoy_cc_library( ], deps = [ "//envoy/api:os_sys_calls_interface", + "//envoy/http:stream_reset_handler_interface", "//source/common/common:assert_lib", "//source/common/common:byte_order_lib", "//source/common/common:utility_lib", diff --git a/envoy/buffer/buffer.h b/envoy/buffer/buffer.h index c30cbd84f2ca4..3c22e7c78a3e2 100644 --- a/envoy/buffer/buffer.h +++ b/envoy/buffer/buffer.h @@ -9,6 +9,7 @@ #include "envoy/common/exception.h" #include "envoy/common/platform.h" #include "envoy/common/pure.h" +#include "envoy/http/stream_reset_handler.h" #include "source/common/common/assert.h" #include "source/common/common/byte_order.h" @@ -109,6 +110,19 @@ class BufferMemoryAccount { * @param amount the amount to credit. */ virtual void credit(uint64_t amount) PURE; + + /** + * Clears the associated downstream with this account. + * After this has been called, calls to reset the downstream become no-ops. + * Must be called before downstream is deleted. + */ + virtual void clearDownstream() PURE; + + /** + * Reset the downstream stream associated with this account. Resetting the downstream stream + * should trigger a reset of the corresponding upstream stream if it exists. + */ + virtual void resetDownstream() PURE; }; using BufferMemoryAccountSharedPtr = std::shared_ptr; @@ -480,7 +494,8 @@ class Instance { using InstancePtr = std::unique_ptr; /** - * A factory for creating buffers which call callbacks when reaching high and low watermarks. + * An abstract factory for creating watermarked buffers and buffer memory + * accounts. The factory also supports tracking active memory accounts. */ class WatermarkFactory { public: @@ -497,6 +512,15 @@ class WatermarkFactory { virtual InstancePtr createBuffer(std::function below_low_watermark, std::function above_high_watermark, std::function above_overflow_watermark) PURE; + + /** + * Create and returns a buffer memory account. + * + * @param reset_handler supplies the stream_reset_handler the account will + * invoke to reset the stream. + * @return a BufferMemoryAccountSharedPtr of the newly created account. + */ + virtual BufferMemoryAccountSharedPtr createAccount(Http::StreamResetHandler& reset_handler) PURE; }; using WatermarkFactoryPtr = std::unique_ptr; diff --git a/envoy/http/BUILD b/envoy/http/BUILD index 09d26b373967a..ee9e63a01d6c2 100644 --- a/envoy/http/BUILD +++ b/envoy/http/BUILD @@ -43,6 +43,7 @@ envoy_cc_library( ":header_map_interface", ":metadata_interface", ":protocol_interface", + ":stream_reset_handler_interface", "//envoy/buffer:buffer_interface", "//envoy/grpc:status", "//envoy/network:address_interface", @@ -51,6 +52,11 @@ envoy_cc_library( ], ) +envoy_cc_library( + name = "stream_reset_handler_interface", + hdrs = ["stream_reset_handler.h"], +) + envoy_cc_library( name = "codes_interface", hdrs = ["codes.h"], diff --git a/envoy/http/codec.h b/envoy/http/codec.h index 3674fd88c3123..023b6a129bed3 100644 --- a/envoy/http/codec.h +++ b/envoy/http/codec.h @@ -11,6 +11,7 @@ #include "envoy/http/header_map.h" #include "envoy/http/metadata_interface.h" #include "envoy/http/protocol.h" +#include "envoy/http/stream_reset_handler.h" #include "envoy/network/address.h" #include "envoy/stream_info/stream_info.h" @@ -263,32 +264,6 @@ class ResponseDecoder : public virtual StreamDecoder { virtual void dumpState(std::ostream& os, int indent_level = 0) const PURE; }; -/** - * Stream reset reasons. - */ -enum class StreamResetReason { - // If a local codec level reset was sent on the stream. - LocalReset, - // If a local codec level refused stream reset was sent on the stream (allowing for retry). - LocalRefusedStreamReset, - // If a remote codec level reset was received on the stream. - RemoteReset, - // If a remote codec level refused stream reset was received on the stream (allowing for retry). - RemoteRefusedStreamReset, - // If the stream was locally reset by a connection pool due to an initial connection failure. - ConnectionFailure, - // If the stream was locally reset due to connection termination. - ConnectionTermination, - // The stream was reset because of a resource overflow. - Overflow, - // Either there was an early TCP error for a CONNECT request or the peer reset with CONNECT_ERROR - ConnectError, - // Received payload did not conform to HTTP protocol. - ProtocolError, - // If the stream was locally reset by the Overload Manager. - OverloadManager -}; - /** * Callbacks that fire against a stream. */ @@ -319,10 +294,8 @@ class StreamCallbacks { /** * An HTTP stream (request, response, and push). */ -class Stream { +class Stream : public StreamResetHandler { public: - virtual ~Stream() = default; - /** * Add stream callbacks. * @param callbacks supplies the callbacks to fire on stream events. @@ -335,12 +308,6 @@ class Stream { */ virtual void removeCallbacks(StreamCallbacks& callbacks) PURE; - /** - * Reset the stream. No events will fire beyond this point. - * @param reason supplies the reset reason. - */ - virtual void resetStream(StreamResetReason reason) PURE; - /** * Enable/disable further data from this stream. * Cessation of data may not be immediate. For example, for HTTP/2 this may stop further flow diff --git a/envoy/http/stream_reset_handler.h b/envoy/http/stream_reset_handler.h new file mode 100644 index 0000000000000..7a6d23c5dac09 --- /dev/null +++ b/envoy/http/stream_reset_handler.h @@ -0,0 +1,50 @@ +#pragma once + +#include "envoy/common/pure.h" + +// Stream Reset is refactored from the codec to avoid cyclical dependencies with +// the BufferMemoryAccount interface. +namespace Envoy { +namespace Http { + +/** + * Stream reset reasons. + */ +enum class StreamResetReason { + // If a local codec level reset was sent on the stream. + LocalReset, + // If a local codec level refused stream reset was sent on the stream (allowing for retry). + LocalRefusedStreamReset, + // If a remote codec level reset was received on the stream. + RemoteReset, + // If a remote codec level refused stream reset was received on the stream (allowing for retry). + RemoteRefusedStreamReset, + // If the stream was locally reset by a connection pool due to an initial connection failure. + ConnectionFailure, + // If the stream was locally reset due to connection termination. + ConnectionTermination, + // The stream was reset because of a resource overflow. + Overflow, + // Either there was an early TCP error for a CONNECT request or the peer reset with CONNECT_ERROR + ConnectError, + // Received payload did not conform to HTTP protocol. + ProtocolError, + // If the stream was locally reset by the Overload Manager. + OverloadManager +}; + +/** + * Handler to reset an underlying HTTP stream. + */ +class StreamResetHandler { +public: + virtual ~StreamResetHandler() = default; + /** + * Reset the stream. No events will fire beyond this point. + * @param reason supplies the reset reason. + */ + virtual void resetStream(StreamResetReason reason) PURE; +}; + +} // namespace Http +} // namespace Envoy diff --git a/source/common/buffer/BUILD b/source/common/buffer/BUILD index 832d72c913128..9d531f683cf24 100644 --- a/source/common/buffer/BUILD +++ b/source/common/buffer/BUILD @@ -13,6 +13,7 @@ envoy_cc_library( srcs = ["watermark_buffer.cc"], hdrs = ["watermark_buffer.h"], deps = [ + "//envoy/http:stream_reset_handler_interface", "//source/common/buffer:buffer_lib", "//source/common/common:assert_lib", "//source/common/runtime:runtime_features_lib", diff --git a/source/common/buffer/buffer_impl.h b/source/common/buffer/buffer_impl.h index 8f7977f9c269c..12152a9273021 100644 --- a/source/common/buffer/buffer_impl.h +++ b/source/common/buffer/buffer_impl.h @@ -7,6 +7,7 @@ #include #include "envoy/buffer/buffer.h" +#include "envoy/http/stream_reset_handler.h" #include "source/common/common/assert.h" #include "source/common/common/non_copyable.h" @@ -842,39 +843,5 @@ class OwnedBufferFragmentImpl final : public BufferFragment, public InlineStorag using OwnedBufferFragmentImplPtr = std::unique_ptr; -/** - * A BufferMemoryAccountImpl tracks allocated bytes across associated buffers and - * slices that originate from those buffers, or are untagged and pass through an - * associated buffer. - */ -class BufferMemoryAccountImpl : public BufferMemoryAccount { -public: - BufferMemoryAccountImpl() = default; - ~BufferMemoryAccountImpl() override { ASSERT(buffer_memory_allocated_ == 0); } - - // Make not copyable - BufferMemoryAccountImpl(const BufferMemoryAccountImpl&) = delete; - BufferMemoryAccountImpl& operator=(const BufferMemoryAccountImpl&) = delete; - - // Make not movable. - BufferMemoryAccountImpl(BufferMemoryAccountImpl&&) = delete; - BufferMemoryAccountImpl& operator=(BufferMemoryAccountImpl&&) = delete; - - uint64_t balance() const { return buffer_memory_allocated_; } - void charge(uint64_t amount) override { - // Check overflow - ASSERT(std::numeric_limits::max() - buffer_memory_allocated_ >= amount); - buffer_memory_allocated_ += amount; - } - - void credit(uint64_t amount) override { - ASSERT(buffer_memory_allocated_ >= amount); - buffer_memory_allocated_ -= amount; - } - -private: - uint64_t buffer_memory_allocated_ = 0; -}; - } // namespace Buffer } // namespace Envoy diff --git a/source/common/buffer/watermark_buffer.cc b/source/common/buffer/watermark_buffer.cc index 781321f99dc44..1616afb542e0b 100644 --- a/source/common/buffer/watermark_buffer.cc +++ b/source/common/buffer/watermark_buffer.cc @@ -1,5 +1,9 @@ #include "source/common/buffer/watermark_buffer.h" +#include + +#include "envoy/buffer/buffer.h" + #include "source/common/common/assert.h" #include "source/common/runtime/runtime_features.h" @@ -136,5 +140,100 @@ void WatermarkBuffer::checkHighAndOverflowWatermarks() { } } +BufferMemoryAccountSharedPtr +WatermarkBufferFactory::createAccount(Http::StreamResetHandler& reset_handler) { + return BufferMemoryAccountImpl::createAccount(this, reset_handler); +} + +void WatermarkBufferFactory::updateAccountClass(const BufferMemoryAccountSharedPtr& account, + int current_class, int new_class) { + ASSERT(current_class != new_class, "Expected the current_class and new_class to be different"); + + if (current_class == -1 && new_class >= 0) { + // Start tracking + ASSERT(!size_class_account_sets_[new_class].contains(account)); + size_class_account_sets_[new_class].insert(account); + } else if (current_class >= 0 && new_class == -1) { + // No longer track + ASSERT(size_class_account_sets_[current_class].contains(account)); + size_class_account_sets_[current_class].erase(account); + } else { + // Moving between buckets + ASSERT(size_class_account_sets_[current_class].contains(account)); + ASSERT(!size_class_account_sets_[new_class].contains(account)); + size_class_account_sets_[new_class].insert( + std::move(size_class_account_sets_[current_class].extract(account).value())); + } +} + +void WatermarkBufferFactory::unregisterAccount(const BufferMemoryAccountSharedPtr& account, + int current_class) { + if (current_class >= 0) { + ASSERT(size_class_account_sets_[current_class].contains(account)); + size_class_account_sets_[current_class].erase(account); + } +} + +WatermarkBufferFactory::~WatermarkBufferFactory() { + for (auto& account_set : size_class_account_sets_) { + ASSERT(account_set.empty(), + "Expected all Accounts to have unregistered from the Watermark Factory."); + } +} + +BufferMemoryAccountSharedPtr +BufferMemoryAccountImpl::createAccount(WatermarkBufferFactory* factory, + Http::StreamResetHandler& reset_handler) { + // We use shared_ptr ctor directly rather than make shared since the + // constructor being invoked is private as we want users to use this static + // method to createAccounts. + auto account = + std::shared_ptr(new BufferMemoryAccountImpl(factory, reset_handler)); + // Set shared_this_ in the account. + static_cast(account.get())->shared_this_ = account; + return account; +} + +int BufferMemoryAccountImpl::balanceToClassIndex() { + const uint64_t shifted_balance = buffer_memory_allocated_ >> 20; // shift by 1MB. + + if (shifted_balance == 0) { + return -1; // Not worth tracking anything < 1MB. + } + + const int class_idx = absl::bit_width(shifted_balance) - 1; + return std::min(class_idx, NUM_MEMORY_CLASSES_ - 1); +} + +void BufferMemoryAccountImpl::updateAccountClass() { + const int new_class = balanceToClassIndex(); + if (shared_this_ && new_class != current_bucket_idx_) { + factory_->updateAccountClass(shared_this_, current_bucket_idx_, new_class); + current_bucket_idx_ = new_class; + } +} + +void BufferMemoryAccountImpl::credit(uint64_t amount) { + ASSERT(buffer_memory_allocated_ >= amount); + buffer_memory_allocated_ -= amount; + updateAccountClass(); +} + +void BufferMemoryAccountImpl::charge(uint64_t amount) { + // Check overflow + ASSERT(std::numeric_limits::max() - buffer_memory_allocated_ >= amount); + buffer_memory_allocated_ += amount; + updateAccountClass(); +} + +void BufferMemoryAccountImpl::clearDownstream() { + if (reset_handler_.has_value()) { + reset_handler_.reset(); + factory_->unregisterAccount(shared_this_, current_bucket_idx_); + current_bucket_idx_ = -1; + shared_this_ = nullptr; + } +} + } // namespace Buffer } // namespace Envoy diff --git a/source/common/buffer/watermark_buffer.h b/source/common/buffer/watermark_buffer.h index 6afc4d8602323..4c874c202f637 100644 --- a/source/common/buffer/watermark_buffer.h +++ b/source/common/buffer/watermark_buffer.h @@ -3,6 +3,9 @@ #include #include +#include "envoy/buffer/buffer.h" +#include "envoy/common/optref.h" + #include "source/common/buffer/buffer_impl.h" namespace Envoy { @@ -72,15 +75,137 @@ class WatermarkBuffer : public OwnedImpl { using WatermarkBufferPtr = std::unique_ptr; +class WatermarkBufferFactory; + +/** + * A BufferMemoryAccountImpl tracks allocated bytes across associated buffers and + * slices that originate from those buffers, or are untagged and pass through an + * associated buffer. + * + * This BufferMemoryAccount is produced by the *WatermarkBufferFactory*. + */ +class BufferMemoryAccountImpl : public BufferMemoryAccount { +public: + // Used to create the account, and complete wiring with the factory + // and shared_this_. + static BufferMemoryAccountSharedPtr createAccount(WatermarkBufferFactory* factory, + Http::StreamResetHandler& reset_handler); + ~BufferMemoryAccountImpl() override { + ASSERT(buffer_memory_allocated_ == 0); + ASSERT(!reset_handler_.has_value()); + } + + // Make not copyable + BufferMemoryAccountImpl(const BufferMemoryAccountImpl&) = delete; + BufferMemoryAccountImpl& operator=(const BufferMemoryAccountImpl&) = delete; + + // Make not movable. + BufferMemoryAccountImpl(BufferMemoryAccountImpl&&) = delete; + BufferMemoryAccountImpl& operator=(BufferMemoryAccountImpl&&) = delete; + + uint64_t balance() const { return buffer_memory_allocated_; } + void charge(uint64_t amount) override; + void credit(uint64_t amount) override; + + // Clear the associated downstream, preparing the account to be destroyed. + // This is idempotent. + void clearDownstream() override; + + void resetDownstream() override { + if (reset_handler_.has_value()) { + reset_handler_->resetStream(Http::StreamResetReason::OverloadManager); + } + } + + // The number of memory classes the Account expects to exists. See + // *WatermarkBufferFactory* for details on the memory classes. + static constexpr uint32_t NUM_MEMORY_CLASSES_ = 8; + +private: + BufferMemoryAccountImpl(WatermarkBufferFactory* factory, Http::StreamResetHandler& reset_handler) + : factory_(factory), reset_handler_(reset_handler) {} + + // Returns the class index based off of the buffer_memory_allocated_ + // This can differ with current_bucket_idx_ if buffer_memory_allocated_ was + // just modified. + // The class indexes returned are based on buckets of powers of two, if the + // account is above a minimum threshold. Returned class index range is [-1, + // NUM_MEMORY_CLASSES_). + int balanceToClassIndex(); + void updateAccountClass(); + + uint64_t buffer_memory_allocated_ = 0; + // Current bucket index where the account is being tracked in. + int current_bucket_idx_ = -1; + + WatermarkBufferFactory* factory_ = nullptr; + + OptRef reset_handler_; + // Keep a copy of the shared_ptr pointing to this account. We opted to go this + // route rather than enable_shared_from_this to avoid wasteful atomic + // operations e.g. when updating the tracking of the account. + // This is set through the createAccount static method which is the only way to + // instantiate an instance of this class. This should is cleared when + // unregistering from the factory. + BufferMemoryAccountSharedPtr shared_this_ = nullptr; +}; + +/** + * The WatermarkBufferFactory creates *WatermarkBuffer*s and + * *BufferMemoryAccountImpl* that can be used to bind to the created buffers + * from a given downstream (and corresponding upstream, if one exists). The + * accounts can then be used to reset the underlying stream. + * + * Any account produced by this factory might be tracked by the factory using the + * following scheme: + * + * 1) Is the account balance >= 1MB? If not don't track. + * 2) For all accounts above the minimum threshold for tracking, put the account + * into one of the *BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_* buckets. + * + * We keep buckets containing accounts within a "memory class", which are + * power of two buckets. For example, with a minimum threshold of 1MB, our + * first bucket contains [1MB, 2MB) accounts, the second bucket contains + * [2MB, 4MB), and so forth for + * *BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_* buckets. These buckets + * allow us to coarsely track accounts, and if overloaded we can easily + * target more expensive streams. + * + * As the account balance changes, the account informs the Watermark Factory + * if the bucket for that account has changed. See + * *BufferMemoryAccountImpl::balanceToClassIndex()* for details on the memory + * class for a given account balance. + * + * TODO(kbaichoo): Update this documentation when we make the minimum account + * threshold configurable. + * + */ class WatermarkBufferFactory : public WatermarkFactory { public: // Buffer::WatermarkFactory + ~WatermarkBufferFactory() override; InstancePtr createBuffer(std::function below_low_watermark, std::function above_high_watermark, std::function above_overflow_watermark) override { return std::make_unique(below_low_watermark, above_high_watermark, above_overflow_watermark); } + + BufferMemoryAccountSharedPtr createAccount(Http::StreamResetHandler& reset_handler) override; + + // Called by BufferMemoryAccountImpls created by the factory on account class + // updated. + void updateAccountClass(const BufferMemoryAccountSharedPtr& account, int current_class, + int new_class); + + // Unregister a buffer memory account. + virtual void unregisterAccount(const BufferMemoryAccountSharedPtr& account, int current_class); + +protected: + // Enable subclasses to inspect the mapping. + using MemoryClassesToAccountsSet = std::array, + BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_>; + MemoryClassesToAccountsSet size_class_account_sets_; }; } // namespace Buffer diff --git a/source/common/http/conn_manager_impl.cc b/source/common/http/conn_manager_impl.cc index 5641c155dcead..3321e1126726b 100644 --- a/source/common/http/conn_manager_impl.cc +++ b/source/common/http/conn_manager_impl.cc @@ -276,13 +276,15 @@ RequestDecoder& ConnectionManagerImpl::newStream(ResponseEncoder& response_encod // Set the account to start accounting if enabled. This is still a // work-in-progress, and will be removed when other features using the // accounting are implemented. - Buffer::BufferMemoryAccountSharedPtr downstream_request_account; + Buffer::BufferMemoryAccountSharedPtr downstream_stream_account; if (Runtime::runtimeFeatureEnabled("envoy.test_only.per_stream_buffer_accounting")) { - downstream_request_account = std::make_shared(); - response_encoder.getStream().setAccount(downstream_request_account); + // Create account, wiring the stream to use it. + auto& buffer_factory = read_callbacks_->connection().dispatcher().getWatermarkFactory(); + downstream_stream_account = buffer_factory.createAccount(response_encoder.getStream()); + response_encoder.getStream().setAccount(downstream_stream_account); } ActiveStreamPtr new_stream(new ActiveStream(*this, response_encoder.getStream().bufferLimit(), - std::move(downstream_request_account))); + std::move(downstream_stream_account))); accumulated_requests_++; if (config_.maxRequestsPerConnection() > 0 && @@ -1517,6 +1519,7 @@ void ConnectionManagerImpl::ActiveStream::onResetStream(StreamResetReason reset_ // 1) We TX an app level reset // 2) The codec TX a codec level reset // 3) The codec RX a reset + // 4) The overload manager reset the stream // If we need to differentiate we need to do it inside the codec. Can start with this. ENVOY_STREAM_LOG(debug, "stream reset", *this); connection_manager_.stats_.named_.downstream_rq_rx_reset_.inc(); @@ -1531,6 +1534,14 @@ void ConnectionManagerImpl::ActiveStream::onResetStream(StreamResetReason reset_ filter_manager_.streamInfo().setResponseCodeDetails(encoder_details); } + // Check if we're in the overload manager reset case. + // encoder_details should be empty in this case as we don't have a codec error. + if (encoder_details.empty() && reset_reason == StreamResetReason::OverloadManager) { + filter_manager_.streamInfo().setResponseFlag(StreamInfo::ResponseFlag::OverloadManager); + filter_manager_.streamInfo().setResponseCodeDetails( + StreamInfo::ResponseCodeDetails::get().Overload); + } + connection_manager_.doDeferredStreamDestroy(*this); } diff --git a/source/common/http/http1/codec_impl.cc b/source/common/http/http1/codec_impl.cc index cb0af92e83bc9..afd530d9df75d 100644 --- a/source/common/http/http1/codec_impl.cc +++ b/source/common/http/http1/codec_impl.cc @@ -350,6 +350,21 @@ void StreamEncoderImpl::resetStream(StreamResetReason reason) { connection_.onResetStreamBase(reason); } +void ResponseEncoderImpl::resetStream(StreamResetReason reason) { + // Clear the downstream on the account since we're resetting the downstream. + if (buffer_memory_account_) { + buffer_memory_account_->clearDownstream(); + } + + // For H1, we use idleTimeouts to cancel streams unless there was an + // explicit protocol error prior to sending a response to the downstream + // in which case we send a local reply. + // TODO(kbaichoo): If we want snappier resets of H1 streams we can + // 1) Send local reply if no response data sent yet + // 2) Invoke the idle timeout sooner to close underlying connection + StreamEncoderImpl::resetStream(reason); +} + void StreamEncoderImpl::readDisable(bool disable) { if (disable) { ++read_disable_calls_; diff --git a/source/common/http/http1/codec_impl.h b/source/common/http/http1/codec_impl.h index d67412706a712..b6f32a16d5f6b 100644 --- a/source/common/http/http1/codec_impl.h +++ b/source/common/http/http1/codec_impl.h @@ -69,8 +69,12 @@ class StreamEncoderImpl : public virtual StreamEncoder, // require a flush timeout not already covered by other timeouts. } - void setAccount(Buffer::BufferMemoryAccountSharedPtr) override { - // TODO(kbaichoo): implement account tracking for H1. + void setAccount(Buffer::BufferMemoryAccountSharedPtr account) override { + // TODO(kbaichoo): implement account tracking for H1. Particularly, binding + // the account to the buffers used. The current wiring is minimal, and used + // to ensure the memory_account gets notified that the downstream request is + // closing. + buffer_memory_account_ = account; } void setIsResponseToHeadRequest(bool value) { is_response_to_head_request_ = value; } @@ -88,6 +92,7 @@ class StreamEncoderImpl : public virtual StreamEncoder, static const std::string CRLF; static const std::string LAST_CHUNK; + Buffer::BufferMemoryAccountSharedPtr buffer_memory_account_; ConnectionImpl& connection_; uint32_t read_disable_calls_{}; bool disable_chunk_encoding_ : 1; @@ -134,6 +139,18 @@ class ResponseEncoderImpl : public StreamEncoderImpl, public ResponseEncoder { : StreamEncoderImpl(connection), stream_error_on_invalid_http_message_(stream_error_on_invalid_http_message) {} + ~ResponseEncoderImpl() override { + // Only the downstream stream should clear the downstream of the + // memory account. + // + // There are cases where a corresponding upstream stream dtor might + // be called, but the downstream stream isn't going to terminate soon + // such as StreamDecoderFilterCallbacks::recreateStream(). + if (buffer_memory_account_) { + buffer_memory_account_->clearDownstream(); + } + } + bool startedResponse() { return started_response_; } // Http::ResponseEncoder @@ -145,6 +162,9 @@ class ResponseEncoderImpl : public StreamEncoderImpl, public ResponseEncoder { return stream_error_on_invalid_http_message_; } + // Http1::StreamEncoderImpl + void resetStream(StreamResetReason reason) override; + private: bool started_response_{}; const bool stream_error_on_invalid_http_message_; diff --git a/source/common/http/http2/codec_impl.cc b/source/common/http/http2/codec_impl.cc index 0ae50eb547d27..f782d64d5daf5 100644 --- a/source/common/http/http2/codec_impl.cc +++ b/source/common/http/http2/codec_impl.cc @@ -157,6 +157,22 @@ void ConnectionImpl::StreamImpl::destroy() { parent_.stats_.pending_send_bytes_.sub(pending_send_data_->length()); } +void ConnectionImpl::ServerStreamImpl::destroy() { + // Only the downstream stream should clear the downstream of the + // memory account. + // This occurs in destroy as we want to ensure the Stream does not get + // reset called on it from the account. + // + // There are cases where a corresponding upstream stream dtor might + // be called, but the downstream stream isn't going to terminate soon + // such as StreamDecoderFilterCallbacks::recreateStream(). + if (buffer_memory_account_) { + buffer_memory_account_->clearDownstream(); + } + + StreamImpl::destroy(); +} + static void insertHeader(std::vector& headers, const HeaderEntry& header) { uint8_t flags = 0; if (header.key().isReference()) { @@ -530,6 +546,15 @@ void ConnectionImpl::StreamImpl::encodeDataHelper(Buffer::Instance& data, bool e } } +void ConnectionImpl::ServerStreamImpl::resetStream(StreamResetReason reason) { + // Clear the downstream on the account since we're resetting the downstream. + if (buffer_memory_account_) { + buffer_memory_account_->clearDownstream(); + } + + StreamImpl::resetStream(reason); +} + void ConnectionImpl::StreamImpl::resetStream(StreamResetReason reason) { // Higher layers expect calling resetStream() to immediately raise reset callbacks. runResetCallbacks(reason); diff --git a/source/common/http/http2/codec_impl.h b/source/common/http/http2/codec_impl.h index 6b86ee41b9aa2..95758a1a91193 100644 --- a/source/common/http/http2/codec_impl.h +++ b/source/common/http/http2/codec_impl.h @@ -192,7 +192,7 @@ class ConnectionImpl : public virtual Connection, // TODO(mattklein123): Optimally this would be done in the destructor but there are currently // deferred delete lifetime issues that need sorting out if the destructor of the stream is // going to be able to refer to the parent connection. - void destroy(); + virtual void destroy(); void disarmStreamIdleTimer() { if (stream_idle_timer_ != nullptr) { // To ease testing and the destructor assertion. @@ -388,6 +388,7 @@ class ConnectionImpl : public virtual Connection, : StreamImpl(parent, buffer_limit), headers_or_trailers_(RequestHeaderMapImpl::create()) {} // StreamImpl + void destroy() override; void submitHeaders(const std::vector& final_headers, nghttp2_data_provider* provider) override; StreamDecoder& decoder() override { return *request_decoder_; } @@ -407,6 +408,7 @@ class ConnectionImpl : public virtual Connection, return createHeaderMap(trailers); } void createPendingFlushTimer() override; + void resetStream(StreamResetReason reason) override; // ResponseEncoder void encode100ContinueHeaders(const ResponseHeaderMap& headers) override; diff --git a/source/common/quic/envoy_quic_client_stream.h b/source/common/quic/envoy_quic_client_stream.h index 0e82d8622319e..fd051332e44fb 100644 --- a/source/common/quic/envoy_quic_client_stream.h +++ b/source/common/quic/envoy_quic_client_stream.h @@ -49,9 +49,6 @@ class EnvoyQuicClientStream : public quic::QuicSpdyClientStream, void resetStream(Http::StreamResetReason reason) override; void setFlushTimeout(std::chrono::milliseconds) override {} - void setAccount(Buffer::BufferMemoryAccountSharedPtr) override { - // TODO(kbaichoo): implement account tracking for QUIC. - } // quic::QuicSpdyStream void OnBodyAvailable() override; void OnStreamReset(const quic::QuicRstStreamFrame& frame) override; diff --git a/source/common/quic/envoy_quic_server_stream.cc b/source/common/quic/envoy_quic_server_stream.cc index 86ac524b5e660..5dbf8f47351e7 100644 --- a/source/common/quic/envoy_quic_server_stream.cc +++ b/source/common/quic/envoy_quic_server_stream.cc @@ -111,6 +111,10 @@ void EnvoyQuicServerStream::encodeMetadata(const Http::MetadataMapVector& /*meta } void EnvoyQuicServerStream::resetStream(Http::StreamResetReason reason) { + if (buffer_memory_account_) { + buffer_memory_account_->clearDownstream(); + } + if (local_end_stream_ && !reading_stopped()) { // This is after 200 early response. Reset with QUIC_STREAM_NO_ERROR instead // of propagating original reset reason. In QUICHE if a stream stops reading @@ -298,6 +302,21 @@ void EnvoyQuicServerStream::OnConnectionClosed(quic::QuicErrorCode error, quic::QuicSpdyServerStreamBase::OnConnectionClosed(error, source); } +void EnvoyQuicServerStream::CloseWriteSide() { + // Clear the downstream since the stream should not write additional data + // after this is called, e.g. cannot reset the stream. + // Only the downstream stream should clear the downstream of the + // memory account. + // + // There are cases where a corresponding upstream stream dtor might + // be called, but the downstream stream isn't going to terminate soon + // such as StreamDecoderFilterCallbacks::recreateStream(). + if (buffer_memory_account_) { + buffer_memory_account_->clearDownstream(); + } + quic::QuicSpdyServerStreamBase::CloseWriteSide(); +} + void EnvoyQuicServerStream::OnClose() { quic::QuicSpdyServerStreamBase::OnClose(); if (isDoingWatermarkAccounting()) { diff --git a/source/common/quic/envoy_quic_server_stream.h b/source/common/quic/envoy_quic_server_stream.h index e0b98e835fdad..3cb8d1f004145 100644 --- a/source/common/quic/envoy_quic_server_stream.h +++ b/source/common/quic/envoy_quic_server_stream.h @@ -55,9 +55,6 @@ class EnvoyQuicServerStream : public quic::QuicSpdyServerStreamBase, // TODO(mattklein123): Actually implement this for HTTP/3 similar to HTTP/2. } - void setAccount(Buffer::BufferMemoryAccountSharedPtr) override { - // TODO(kbaichoo): implement account tracking for QUIC. - } // quic::QuicSpdyStream void OnBodyAvailable() override; bool OnStopSending(quic::QuicRstStreamErrorCode error) override; @@ -67,6 +64,7 @@ class EnvoyQuicServerStream : public quic::QuicSpdyServerStreamBase, void OnCanWrite() override; // quic::QuicSpdyServerStreamBase void OnConnectionClosed(quic::QuicErrorCode error, quic::ConnectionCloseSource source) override; + void CloseWriteSide() override; void clearWatermarkBuffer(); diff --git a/source/common/quic/envoy_quic_stream.h b/source/common/quic/envoy_quic_stream.h index f28cae4cf0119..f5c4be6900dd0 100644 --- a/source/common/quic/envoy_quic_stream.h +++ b/source/common/quic/envoy_quic_stream.h @@ -1,5 +1,6 @@ #pragma once +#include "envoy/buffer/buffer.h" #include "envoy/config/core/v3/protocol.pb.h" #include "envoy/event/dispatcher.h" #include "envoy/http/codec.h" @@ -79,6 +80,10 @@ class EnvoyQuicStream : public virtual Http::StreamEncoder, return connection()->addressProvider().localAddress(); } + void setAccount(Buffer::BufferMemoryAccountSharedPtr account) override { + buffer_memory_account_ = account; + } + // SendBufferMonitor void updateBytesBuffered(size_t old_buffered_bytes, size_t new_buffered_bytes) override { if (new_buffered_bytes == old_buffered_bytes) { @@ -130,6 +135,9 @@ class EnvoyQuicStream : public virtual Http::StreamEncoder, const envoy::config::core::v3::Http3ProtocolOptions& http3_options_; bool close_connection_upon_invalid_header_{false}; absl::string_view details_; + // TODO(kbaichoo): bind the account to the QUIC buffers to enable tracking of + // memory allocated within QUIC buffers. + Buffer::BufferMemoryAccountSharedPtr buffer_memory_account_ = nullptr; private: // Keeps track of bytes buffered in the stream send buffer in QUICHE and reacts diff --git a/source/common/quic/envoy_quic_utils.cc b/source/common/quic/envoy_quic_utils.cc index 0a42e22cbbe73..76291b3a161db 100644 --- a/source/common/quic/envoy_quic_utils.cc +++ b/source/common/quic/envoy_quic_utils.cc @@ -73,6 +73,7 @@ quic::QuicRstStreamErrorCode envoyResetReasonToQuicRstError(Http::StreamResetRea case Http::StreamResetReason::ConnectionTermination: return quic::QUIC_STREAM_CONNECTION_ERROR; case Http::StreamResetReason::LocalReset: + case Http::StreamResetReason::OverloadManager: return quic::QUIC_STREAM_CANCELLED; default: return quic::QUIC_BAD_APPLICATION_PAYLOAD; diff --git a/test/common/buffer/BUILD b/test/common/buffer/BUILD index bd01534ca6cae..80bad25f619a7 100644 --- a/test/common/buffer/BUILD +++ b/test/common/buffer/BUILD @@ -83,6 +83,17 @@ envoy_cc_test( ], ) +envoy_cc_test( + name = "buffer_memory_account_test", + srcs = ["buffer_memory_account_test.cc"], + deps = [ + "//source/common/buffer:buffer_lib", + "//test/integration:tracked_watermark_buffer_lib", + "//test/mocks/buffer:buffer_mocks", + "//test/mocks/http:stream_reset_handler_mock", + ], +) + envoy_cc_test( name = "zero_copy_input_stream_test", srcs = ["zero_copy_input_stream_test.cc"], diff --git a/test/common/buffer/buffer_memory_account_test.cc b/test/common/buffer/buffer_memory_account_test.cc new file mode 100644 index 0000000000000..62f16f14d1b0c --- /dev/null +++ b/test/common/buffer/buffer_memory_account_test.cc @@ -0,0 +1,481 @@ +#include "envoy/http/codec.h" + +#include "source/common/buffer/buffer_impl.h" + +#include "test/integration/tracked_watermark_buffer.h" +#include "test/mocks/http/stream_reset_handler.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace Buffer { +namespace { + +using testing::_; + +using MemoryClassesToAccountsSet = std::array, + BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_>; + +constexpr uint64_t kMinimumBalanceToTrack = 1024 * 1024; +constexpr uint64_t kThresholdForFinalBucket = 128 * 1024 * 1024; + +// Gets the balance of an account assuming it's a BufferMemoryAccountImpl. +static int getBalance(const BufferMemoryAccountSharedPtr& account) { + return static_cast(account.get())->balance(); +} + +// Check the memory_classes_to_account is empty. +static void noAccountsTracked(MemoryClassesToAccountsSet& memory_classes_to_account) { + for (const auto& set : memory_classes_to_account) { + EXPECT_TRUE(set.empty()); + } +} + +class BufferMemoryAccountTest : public testing::Test { +protected: + TrackedWatermarkBufferFactory factory_; + Http::MockStreamResetHandler mock_reset_handler_; +}; + +TEST_F(BufferMemoryAccountTest, ManagesAccountBalance) { + auto account = factory_.createAccount(mock_reset_handler_); + Buffer::OwnedImpl buffer(account); + ASSERT_EQ(getBalance(account), 0); + + // Check the balance increases as expected. + { + // New slice created + buffer.add("Hello"); + EXPECT_EQ(getBalance(account), 4096); + + // Should just be added to existing slice. + buffer.add(" World!"); + EXPECT_EQ(getBalance(account), 4096); + + // Trigger new slice creation with add. + const std::string long_string(4096, 'a'); + buffer.add(long_string); + EXPECT_EQ(getBalance(account), 8192); + + // AppendForTest also adds new slice. + buffer.appendSliceForTest("Extra Slice"); + EXPECT_EQ(getBalance(account), 12288); + } + + // Check the balance drains as slices are consumed. + { + // Shouldn't trigger slice free yet + buffer.drain(4095); + EXPECT_EQ(getBalance(account), 12288); + + // Trigger slice reclaim. + buffer.drain(1); + EXPECT_EQ(getBalance(account), 8192); + + // Reclaim next slice + buffer.drain(std::string("Hello World!").length()); + EXPECT_EQ(getBalance(account), 4096); + + // Reclaim remaining + buffer.drain(std::string("Extra Slice").length()); + EXPECT_EQ(getBalance(account), 0); + } + + account->clearDownstream(); +} + +TEST_F(BufferMemoryAccountTest, BufferAccountsForUnownedSliceMovedInto) { + auto account = factory_.createAccount(mock_reset_handler_); + Buffer::OwnedImpl accounted_buffer(account); + + Buffer::OwnedImpl unowned_buffer; + unowned_buffer.add("Unaccounted Slice"); + ASSERT_EQ(getBalance(account), 0); + + // Transfer over buffer + accounted_buffer.move(unowned_buffer); + EXPECT_EQ(getBalance(account), 4096); + + accounted_buffer.drain(accounted_buffer.length()); + EXPECT_EQ(getBalance(account), 0); + + account->clearDownstream(); +} + +TEST_F(BufferMemoryAccountTest, BufferFragmentsShouldNotHaveAnAssociatedAccount) { + auto buffer_one_account = factory_.createAccount(mock_reset_handler_); + Buffer::OwnedImpl buffer_one(buffer_one_account); + ASSERT_EQ(getBalance(buffer_one_account), 0); + + auto buffer_two_account = factory_.createAccount(mock_reset_handler_); + Buffer::OwnedImpl buffer_two(buffer_two_account); + ASSERT_EQ(getBalance(buffer_two_account), 0); + + const char data[] = "hello world"; + BufferFragmentImpl frag(data, 11, nullptr); + buffer_one.addBufferFragment(frag); + EXPECT_EQ(getBalance(buffer_one_account), 0); + EXPECT_EQ(buffer_one.length(), 11); + + // Transfer over buffer + buffer_two.move(buffer_one); + EXPECT_EQ(getBalance(buffer_two_account), 0); + EXPECT_EQ(buffer_two.length(), 11); + + buffer_two.drain(buffer_two.length()); + EXPECT_EQ(getBalance(buffer_two_account), 0); + EXPECT_EQ(buffer_two.length(), 0); + + buffer_one_account->clearDownstream(); + buffer_two_account->clearDownstream(); +} + +TEST_F(BufferMemoryAccountTest, SliceRemainsAttachToOriginalAccountWhenMoved) { + auto buffer_one_account = factory_.createAccount(mock_reset_handler_); + Buffer::OwnedImpl buffer_one(buffer_one_account); + ASSERT_EQ(getBalance(buffer_one_account), 0); + + auto buffer_two_account = factory_.createAccount(mock_reset_handler_); + Buffer::OwnedImpl buffer_two(buffer_two_account); + ASSERT_EQ(getBalance(buffer_two_account), 0); + + buffer_one.add("Charged to Account One"); + EXPECT_EQ(getBalance(buffer_one_account), 4096); + EXPECT_EQ(getBalance(buffer_two_account), 0); + + // Transfer over buffer, still tied to account one. + buffer_two.move(buffer_one); + EXPECT_EQ(getBalance(buffer_one_account), 4096); + EXPECT_EQ(getBalance(buffer_two_account), 0); + + buffer_two.drain(buffer_two.length()); + EXPECT_EQ(getBalance(buffer_one_account), 0); + EXPECT_EQ(getBalance(buffer_two_account), 0); + + buffer_one_account->clearDownstream(); + buffer_two_account->clearDownstream(); +} + +TEST_F(BufferMemoryAccountTest, + SliceRemainsAttachToOriginalAccountWhenMovedUnlessCoalescedIntoExistingSlice) { + auto buffer_one_account = factory_.createAccount(mock_reset_handler_); + Buffer::OwnedImpl buffer_one(buffer_one_account); + ASSERT_EQ(getBalance(buffer_one_account), 0); + + auto buffer_two_account = factory_.createAccount(mock_reset_handler_); + Buffer::OwnedImpl buffer_two(buffer_two_account); + ASSERT_EQ(getBalance(buffer_two_account), 0); + + buffer_one.add("Will Coalesce"); + buffer_two.add("To be Coalesce into:"); + EXPECT_EQ(getBalance(buffer_one_account), 4096); + EXPECT_EQ(getBalance(buffer_two_account), 4096); + + // Transfer over buffer, slices coalesce, crediting account one. + buffer_two.move(buffer_one); + EXPECT_EQ(getBalance(buffer_one_account), 0); + EXPECT_EQ(getBalance(buffer_two_account), 4096); + + buffer_two.drain(std::string("To be Coalesce into:Will Coalesce").length()); + EXPECT_EQ(getBalance(buffer_one_account), 0); + EXPECT_EQ(getBalance(buffer_two_account), 0); + + buffer_one_account->clearDownstream(); + buffer_two_account->clearDownstream(); +} + +TEST_F(BufferMemoryAccountTest, SliceCanRemainAttachedToOriginalAccountWhenMovedAndCoalescedInto) { + auto buffer_one_account = factory_.createAccount(mock_reset_handler_); + Buffer::OwnedImpl buffer_one(buffer_one_account); + ASSERT_EQ(getBalance(buffer_one_account), 0); + + auto buffer_two_account = factory_.createAccount(mock_reset_handler_); + Buffer::OwnedImpl buffer_two(buffer_two_account); + ASSERT_EQ(getBalance(buffer_two_account), 0); + + auto buffer_three_account = factory_.createAccount(mock_reset_handler_); + Buffer::OwnedImpl buffer_three(buffer_three_account); + ASSERT_EQ(getBalance(buffer_three_account), 0); + + buffer_one.add("Will Coalesce"); + buffer_two.add("To be Coalesce into:"); + EXPECT_EQ(getBalance(buffer_one_account), 4096); + EXPECT_EQ(getBalance(buffer_two_account), 4096); + + // Transfer buffers, leading to slice coalescing in third buffer. + buffer_three.move(buffer_two); + buffer_three.move(buffer_one); + EXPECT_EQ(getBalance(buffer_one_account), 0); + EXPECT_EQ(getBalance(buffer_two_account), 4096); + EXPECT_EQ(getBalance(buffer_three_account), 0); + + buffer_three.drain(std::string("To be Coalesce into:Will Coalesce").length()); + EXPECT_EQ(getBalance(buffer_two_account), 0); + + buffer_one_account->clearDownstream(); + buffer_two_account->clearDownstream(); + buffer_three_account->clearDownstream(); +} + +TEST_F(BufferMemoryAccountTest, LinearizedBufferShouldChargeItsAssociatedAccount) { + auto buffer_one_account = factory_.createAccount(mock_reset_handler_); + Buffer::OwnedImpl buffer_one(buffer_one_account); + ASSERT_EQ(getBalance(buffer_one_account), 0); + + auto buffer_two_account = factory_.createAccount(mock_reset_handler_); + Buffer::OwnedImpl buffer_two(buffer_two_account); + ASSERT_EQ(getBalance(buffer_two_account), 0); + + auto buffer_three_account = factory_.createAccount(mock_reset_handler_); + Buffer::OwnedImpl buffer_three(buffer_three_account); + ASSERT_EQ(getBalance(buffer_three_account), 0); + + const std::string long_string(4096, 'a'); + buffer_one.add(long_string); + buffer_two.add(long_string); + EXPECT_EQ(getBalance(buffer_one_account), 4096); + EXPECT_EQ(getBalance(buffer_two_account), 4096); + + // Move into the third buffer. + buffer_three.move(buffer_one); + buffer_three.move(buffer_two); + EXPECT_EQ(getBalance(buffer_one_account), 4096); + EXPECT_EQ(getBalance(buffer_two_account), 4096); + EXPECT_EQ(getBalance(buffer_three_account), 0); + + // Linearize, which does a copy out of the slices. + buffer_three.linearize(8192); + EXPECT_EQ(getBalance(buffer_one_account), 0); + EXPECT_EQ(getBalance(buffer_two_account), 0); + EXPECT_EQ(getBalance(buffer_three_account), 8192); + + buffer_one_account->clearDownstream(); + buffer_two_account->clearDownstream(); + buffer_three_account->clearDownstream(); +} + +TEST_F(BufferMemoryAccountTest, ManagesAccountBalanceWhenPrepending) { + auto prepend_to_account = factory_.createAccount(mock_reset_handler_); + Buffer::OwnedImpl buffer_to_prepend_to(prepend_to_account); + ASSERT_EQ(getBalance(prepend_to_account), 0); + + auto prepend_account = factory_.createAccount(mock_reset_handler_); + Buffer::OwnedImpl buffer_to_prepend(prepend_account); + ASSERT_EQ(getBalance(prepend_account), 0); + + Buffer::OwnedImpl unowned_buffer_to_prepend; + + unowned_buffer_to_prepend.add("World"); + buffer_to_prepend.add("Goodbye World"); + EXPECT_EQ(getBalance(prepend_account), 4096); + + // Prepend the buffers. + buffer_to_prepend_to.prepend(buffer_to_prepend); + EXPECT_EQ(getBalance(prepend_account), 4096); + EXPECT_EQ(getBalance(prepend_to_account), 0); + + buffer_to_prepend_to.prepend(unowned_buffer_to_prepend); + EXPECT_EQ(getBalance(prepend_to_account), 4096); + + // Prepend a string view. + buffer_to_prepend_to.prepend("Hello "); + EXPECT_EQ(getBalance(prepend_to_account), 8192); + + prepend_account->clearDownstream(); + prepend_to_account->clearDownstream(); +} + +TEST_F(BufferMemoryAccountTest, ExtractingSliceWithExistingStorageCreditsAccountOnce) { + auto buffer_account = factory_.createAccount(mock_reset_handler_); + Buffer::OwnedImpl buffer(buffer_account); + ASSERT_EQ(getBalance(buffer_account), 0); + + buffer.appendSliceForTest("Slice 1"); + buffer.appendSliceForTest("Slice 2"); + EXPECT_EQ(getBalance(buffer_account), 8192); + + // Account should only be credited when slice is extracted. + // Not on slice dtor. + { + auto slice = buffer.extractMutableFrontSlice(); + EXPECT_EQ(getBalance(buffer_account), 4096); + } + + EXPECT_EQ(getBalance(buffer_account), 4096); + + buffer_account->clearDownstream(); +} + +TEST_F(BufferMemoryAccountTest, NewReservationSlicesOnlyChargedAfterCommit) { + auto buffer_account = factory_.createAccount(mock_reset_handler_); + Buffer::OwnedImpl buffer(buffer_account); + ASSERT_EQ(getBalance(buffer_account), 0); + + auto reservation = buffer.reserveForRead(); + EXPECT_EQ(getBalance(buffer_account), 0); + + // We should only be charged for the slices committed. + reservation.commit(16384); + EXPECT_EQ(getBalance(buffer_account), 16384); + + buffer_account->clearDownstream(); +} + +TEST_F(BufferMemoryAccountTest, ReservationShouldNotChargeForExistingSlice) { + auto buffer_account = factory_.createAccount(mock_reset_handler_); + + Buffer::OwnedImpl buffer(buffer_account); + ASSERT_EQ(getBalance(buffer_account), 0); + + buffer.add("Many bytes remaining in this slice to use for reservation."); + EXPECT_EQ(getBalance(buffer_account), 4096); + + // The account shouldn't be charged again at commit since the commit + // uses memory from the slice already charged for. + auto reservation = buffer.reserveForRead(); + reservation.commit(2000); + EXPECT_EQ(getBalance(buffer_account), 4096); + + buffer_account->clearDownstream(); +} + +TEST_F(BufferMemoryAccountTest, AccountShouldNotBeTrackedByFactoryUnlessAboveMinimumBalance) { + auto account = factory_.createAccount(mock_reset_handler_); + + // Check not tracked + factory_.inspectMemoryClasses(noAccountsTracked); + + // Still below minimum + account->charge(2020); + factory_.inspectMemoryClasses(noAccountsTracked); + + account->charge(kMinimumBalanceToTrack); + + // Check now tracked + factory_.inspectMemoryClasses([](MemoryClassesToAccountsSet& memory_classes_to_account) { + EXPECT_EQ(memory_classes_to_account[0].size(), 1); + }); + + account->credit(getBalance(account)); + account->clearDownstream(); +} + +TEST_F(BufferMemoryAccountTest, ClearingDownstreamShouldUnregisterTrackedAccounts) { + auto account = factory_.createAccount(mock_reset_handler_); + account->charge(kMinimumBalanceToTrack); + + // Check tracked + factory_.inspectMemoryClasses([](MemoryClassesToAccountsSet& memory_classes_to_account) { + EXPECT_EQ(memory_classes_to_account[0].size(), 1); + }); + + account->clearDownstream(); + + // Check no longer tracked + factory_.inspectMemoryClasses(noAccountsTracked); + + account->credit(getBalance(account)); +} + +TEST_F(BufferMemoryAccountTest, AccountCanResetStream) { + auto account = factory_.createAccount(mock_reset_handler_); + + EXPECT_CALL(mock_reset_handler_, resetStream(_)); + account->resetDownstream(); + account->clearDownstream(); +} + +TEST_F(BufferMemoryAccountTest, FactoryTracksAccountCorrectlyAsBalanceIncreases) { + auto account = factory_.createAccount(mock_reset_handler_); + account->charge(kMinimumBalanceToTrack); + + factory_.inspectMemoryClasses([](MemoryClassesToAccountsSet& memory_classes_to_account) { + EXPECT_EQ(memory_classes_to_account[0].size(), 1); + }); + + for (size_t i = 0; i < BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_ - 1; ++i) { + // Double the balance to enter the higher buckets. + account->charge(getBalance(account)); + factory_.inspectMemoryClasses([i](MemoryClassesToAccountsSet& memory_classes_to_account) { + EXPECT_EQ(memory_classes_to_account[i].size(), 0); + EXPECT_EQ(memory_classes_to_account[i + 1].size(), 1); + }); + } + + account->credit(getBalance(account)); + account->clearDownstream(); +} + +TEST_F(BufferMemoryAccountTest, FactoryTracksAccountCorrectlyAsBalanceDecreases) { + auto account = factory_.createAccount(mock_reset_handler_); + account->charge(kThresholdForFinalBucket); + + factory_.inspectMemoryClasses([](MemoryClassesToAccountsSet& memory_classes_to_account) { + EXPECT_EQ(memory_classes_to_account[BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_ - 1].size(), + 1); + }); + + for (int i = BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_ - 2; i > 0; --i) { + // Halve the balance to enter the lower buckets. + account->credit(getBalance(account) / 2); + factory_.inspectMemoryClasses([i](MemoryClassesToAccountsSet& memory_classes_to_account) { + EXPECT_EQ(memory_classes_to_account[i + 1].size(), 0); + EXPECT_EQ(memory_classes_to_account[i].size(), 1); + }); + } + + account->credit(getBalance(account)); + account->clearDownstream(); +} + +TEST_F(BufferMemoryAccountTest, SizeSaturatesInLargestBucket) { + auto account = factory_.createAccount(mock_reset_handler_); + account->charge(kThresholdForFinalBucket); + + factory_.inspectMemoryClasses([](MemoryClassesToAccountsSet& memory_classes_to_account) { + EXPECT_EQ(memory_classes_to_account[BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_ - 1].size(), + 1); + }); + + account->charge(getBalance(account)); + + // Remains in final bucket. + factory_.inspectMemoryClasses([](MemoryClassesToAccountsSet& memory_classes_to_account) { + EXPECT_EQ(memory_classes_to_account[BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_ - 1].size(), + 1); + }); + + account->credit(getBalance(account)); + account->clearDownstream(); +} + +TEST_F(BufferMemoryAccountTest, RemainsInSameBucketIfChangesWithinThreshold) { + auto account = factory_.createAccount(mock_reset_handler_); + account->charge(kMinimumBalanceToTrack); + + factory_.inspectMemoryClasses([](MemoryClassesToAccountsSet& memory_classes_to_account) { + EXPECT_EQ(memory_classes_to_account[0].size(), 1); + }); + + // Charge to see in same bucket. + account->charge(kMinimumBalanceToTrack - 1); + + factory_.inspectMemoryClasses([](MemoryClassesToAccountsSet& memory_classes_to_account) { + EXPECT_EQ(memory_classes_to_account[0].size(), 1); + }); + + // Credit to see in same bucket. + account->credit(kMinimumBalanceToTrack - 1); + + factory_.inspectMemoryClasses([](MemoryClassesToAccountsSet& memory_classes_to_account) { + EXPECT_EQ(memory_classes_to_account[0].size(), 1); + }); + + account->credit(getBalance(account)); + account->clearDownstream(); +} + +} // namespace +} // namespace Buffer +} // namespace Envoy diff --git a/test/common/buffer/owned_impl_test.cc b/test/common/buffer/owned_impl_test.cc index 55a877a01370e..854be0f4dd5e1 100644 --- a/test/common/buffer/owned_impl_test.cc +++ b/test/common/buffer/owned_impl_test.cc @@ -1265,277 +1265,6 @@ TEST_F(OwnedImplTest, FrontSlice) { EXPECT_EQ(1, buffer.frontSlice().len_); } -TEST(BufferMemoryAccountTest, ManagesAccountBalance) { - auto account = std::make_shared(); - Buffer::OwnedImpl buffer(account); - ASSERT_EQ(account->balance(), 0); - - // Check the balance increases as expected. - { - // New slice created - buffer.add("Hello"); - EXPECT_EQ(account->balance(), 4096); - - // Should just be added to existing slice. - buffer.add(" World!"); - EXPECT_EQ(account->balance(), 4096); - - // Trigger new slice creation with add. - const std::string long_string(4096, 'a'); - buffer.add(long_string); - EXPECT_EQ(account->balance(), 8192); - - // AppendForTest also adds new slice. - buffer.appendSliceForTest("Extra Slice"); - EXPECT_EQ(account->balance(), 12288); - } - - // Check the balance drains as slices are consumed. - { - // Shouldn't trigger slice free yet - buffer.drain(4095); - EXPECT_EQ(account->balance(), 12288); - - // Trigger slice reclaim. - buffer.drain(1); - EXPECT_EQ(account->balance(), 8192); - - // Reclaim next slice - buffer.drain(std::string("Hello World!").length()); - EXPECT_EQ(account->balance(), 4096); - - // Reclaim remaining - buffer.drain(std::string("Extra Slice").length()); - EXPECT_EQ(account->balance(), 0); - } -} - -TEST(BufferMemoryAccountTest, BufferAccountsForUnownedSliceMovedInto) { - auto account = std::make_shared(); - Buffer::OwnedImpl accounted_buffer(account); - - Buffer::OwnedImpl unowned_buffer; - unowned_buffer.add("Unaccounted Slice"); - ASSERT_EQ(account->balance(), 0); - - // Transfer over buffer - accounted_buffer.move(unowned_buffer); - EXPECT_EQ(account->balance(), 4096); - - accounted_buffer.drain(accounted_buffer.length()); - EXPECT_EQ(account->balance(), 0); -} - -TEST(BufferMemoryAccountTest, BufferFragmentsShouldNotHaveAnAssociatedAccount) { - auto buffer_one_account = std::make_shared(); - Buffer::OwnedImpl buffer_one(buffer_one_account); - ASSERT_EQ(buffer_one_account->balance(), 0); - - auto buffer_two_account = std::make_shared(); - Buffer::OwnedImpl buffer_two(buffer_two_account); - ASSERT_EQ(buffer_two_account->balance(), 0); - - const char data[] = "hello world"; - BufferFragmentImpl frag(data, 11, nullptr); - buffer_one.addBufferFragment(frag); - EXPECT_EQ(buffer_one_account->balance(), 0); - EXPECT_EQ(buffer_one.length(), 11); - - // Transfer over buffer - buffer_two.move(buffer_one); - EXPECT_EQ(buffer_two_account->balance(), 0); - EXPECT_EQ(buffer_two.length(), 11); - - buffer_two.drain(buffer_two.length()); - EXPECT_EQ(buffer_two_account->balance(), 0); - EXPECT_EQ(buffer_two.length(), 0); -} - -TEST(BufferMemoryAccountTest, SliceRemainsAttachToOriginalAccountWhenMoved) { - auto buffer_one_account = std::make_shared(); - Buffer::OwnedImpl buffer_one(buffer_one_account); - ASSERT_EQ(buffer_one_account->balance(), 0); - - auto buffer_two_account = std::make_shared(); - Buffer::OwnedImpl buffer_two(buffer_two_account); - ASSERT_EQ(buffer_two_account->balance(), 0); - - buffer_one.add("Charged to Account One"); - EXPECT_EQ(buffer_one_account->balance(), 4096); - EXPECT_EQ(buffer_two_account->balance(), 0); - - // Transfer over buffer, still tied to account one. - buffer_two.move(buffer_one); - EXPECT_EQ(buffer_one_account->balance(), 4096); - EXPECT_EQ(buffer_two_account->balance(), 0); - - buffer_two.drain(buffer_two.length()); - EXPECT_EQ(buffer_one_account->balance(), 0); - EXPECT_EQ(buffer_two_account->balance(), 0); -} - -TEST(BufferMemoryAccountTest, - SliceRemainsAttachToOriginalAccountWhenMovedUnlessCoalescedIntoExistingSlice) { - auto buffer_one_account = std::make_shared(); - Buffer::OwnedImpl buffer_one(buffer_one_account); - ASSERT_EQ(buffer_one_account->balance(), 0); - - auto buffer_two_account = std::make_shared(); - Buffer::OwnedImpl buffer_two(buffer_two_account); - ASSERT_EQ(buffer_two_account->balance(), 0); - - buffer_one.add("Will Coalesce"); - buffer_two.add("To be Coalesce into:"); - EXPECT_EQ(buffer_one_account->balance(), 4096); - EXPECT_EQ(buffer_two_account->balance(), 4096); - - // Transfer over buffer, slices coalesce, crediting account one. - buffer_two.move(buffer_one); - EXPECT_EQ(buffer_one_account->balance(), 0); - EXPECT_EQ(buffer_two_account->balance(), 4096); - - buffer_two.drain(std::string("To be Coalesce into:Will Coalesce").length()); - EXPECT_EQ(buffer_one_account->balance(), 0); - EXPECT_EQ(buffer_two_account->balance(), 0); -} - -TEST(BufferMemoryAccountTest, SliceCanRemainAttachedToOriginalAccountWhenMovedAndCoalescedInto) { - auto buffer_one_account = std::make_shared(); - Buffer::OwnedImpl buffer_one(buffer_one_account); - ASSERT_EQ(buffer_one_account->balance(), 0); - - auto buffer_two_account = std::make_shared(); - Buffer::OwnedImpl buffer_two(buffer_two_account); - ASSERT_EQ(buffer_two_account->balance(), 0); - - auto buffer_three_account = std::make_shared(); - Buffer::OwnedImpl buffer_three(buffer_three_account); - ASSERT_EQ(buffer_three_account->balance(), 0); - - buffer_one.add("Will Coalesce"); - buffer_two.add("To be Coalesce into:"); - EXPECT_EQ(buffer_one_account->balance(), 4096); - EXPECT_EQ(buffer_two_account->balance(), 4096); - - // Transfer buffers, leading to slice coalescing in third buffer. - buffer_three.move(buffer_two); - buffer_three.move(buffer_one); - EXPECT_EQ(buffer_one_account->balance(), 0); - EXPECT_EQ(buffer_two_account->balance(), 4096); - EXPECT_EQ(buffer_three_account->balance(), 0); - - buffer_three.drain(std::string("To be Coalesce into:Will Coalesce").length()); - EXPECT_EQ(buffer_two_account->balance(), 0); -} - -TEST(BufferMemoryAccountTest, LinearizedBufferShouldChargeItsAssociatedAccount) { - auto buffer_one_account = std::make_shared(); - Buffer::OwnedImpl buffer_one(buffer_one_account); - ASSERT_EQ(buffer_one_account->balance(), 0); - - auto buffer_two_account = std::make_shared(); - Buffer::OwnedImpl buffer_two(buffer_two_account); - ASSERT_EQ(buffer_two_account->balance(), 0); - - auto buffer_three_account = std::make_shared(); - Buffer::OwnedImpl buffer_three(buffer_three_account); - ASSERT_EQ(buffer_three_account->balance(), 0); - - const std::string long_string(4096, 'a'); - buffer_one.add(long_string); - buffer_two.add(long_string); - EXPECT_EQ(buffer_one_account->balance(), 4096); - EXPECT_EQ(buffer_two_account->balance(), 4096); - - // Move into the third buffer. - buffer_three.move(buffer_one); - buffer_three.move(buffer_two); - EXPECT_EQ(buffer_one_account->balance(), 4096); - EXPECT_EQ(buffer_two_account->balance(), 4096); - EXPECT_EQ(buffer_three_account->balance(), 0); - - // Linearize, which does a copy out of the slices. - buffer_three.linearize(8192); - EXPECT_EQ(buffer_one_account->balance(), 0); - EXPECT_EQ(buffer_two_account->balance(), 0); - EXPECT_EQ(buffer_three_account->balance(), 8192); -} - -TEST(BufferMemoryAccountTest, ManagesAccountBalanceWhenPrepending) { - auto prepend_to_account = std::make_shared(); - Buffer::OwnedImpl buffer_to_prepend_to(prepend_to_account); - ASSERT_EQ(prepend_to_account->balance(), 0); - - auto prepend_account = std::make_shared(); - Buffer::OwnedImpl buffer_to_prepend(prepend_account); - ASSERT_EQ(prepend_account->balance(), 0); - - Buffer::OwnedImpl unowned_buffer_to_prepend; - - unowned_buffer_to_prepend.add("World"); - buffer_to_prepend.add("Goodbye World"); - EXPECT_EQ(prepend_account->balance(), 4096); - - // Prepend the buffers. - buffer_to_prepend_to.prepend(buffer_to_prepend); - EXPECT_EQ(prepend_account->balance(), 4096); - EXPECT_EQ(prepend_to_account->balance(), 0); - - buffer_to_prepend_to.prepend(unowned_buffer_to_prepend); - EXPECT_EQ(prepend_to_account->balance(), 4096); - - // Prepend a string view. - buffer_to_prepend_to.prepend("Hello "); - EXPECT_EQ(prepend_to_account->balance(), 8192); -} - -TEST(BufferMemoryAccountTest, ExtractingSliceWithExistingStorageCreditsAccountOnce) { - auto buffer_account = std::make_shared(); - Buffer::OwnedImpl buffer(buffer_account); - ASSERT_EQ(buffer_account->balance(), 0); - - buffer.appendSliceForTest("Slice 1"); - buffer.appendSliceForTest("Slice 2"); - EXPECT_EQ(buffer_account->balance(), 8192); - - // Account should only be credited when slice is extracted. - // Not on slice dtor. - { - auto slice = buffer.extractMutableFrontSlice(); - EXPECT_EQ(buffer_account->balance(), 4096); - } - - EXPECT_EQ(buffer_account->balance(), 4096); -} - -TEST(BufferMemoryAccountTest, NewReservationSlicesOnlyChargedAfterCommit) { - auto buffer_account = std::make_shared(); - Buffer::OwnedImpl buffer(buffer_account); - ASSERT_EQ(buffer_account->balance(), 0); - - auto reservation = buffer.reserveForRead(); - EXPECT_EQ(buffer_account->balance(), 0); - - // We should only be charged for the slices committed. - reservation.commit(16384); - EXPECT_EQ(buffer_account->balance(), 16384); -} - -TEST(BufferMemoryAccountTest, ReservationShouldNotChargeForExistingSlice) { - auto buffer_account = std::make_shared(); - Buffer::OwnedImpl buffer(buffer_account); - ASSERT_EQ(buffer_account->balance(), 0); - - buffer.add("Many bytes remaining in this slice to use for reservation."); - EXPECT_EQ(buffer_account->balance(), 4096); - - // The account shouldn't be charged again at commit since the commit - // uses memory from the slice already charged for. - auto reservation = buffer.reserveForRead(); - reservation.commit(2000); - EXPECT_EQ(buffer_account->balance(), 4096); -} - } // namespace } // namespace Buffer } // namespace Envoy diff --git a/test/integration/BUILD b/test/integration/BUILD index f1fdda3435d77..aed12c5b25dda 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -384,6 +384,7 @@ envoy_cc_test( ], deps = [ ":http_integration_lib", + ":http_protocol_integration_lib", ":socket_interface_swap_lib", ":tracked_watermark_buffer_lib", "//test/mocks/http:http_mocks", @@ -1348,6 +1349,7 @@ envoy_cc_test( srcs = ["tracked_watermark_buffer_test.cc"], deps = [ ":tracked_watermark_buffer_lib", + "//test/mocks/http:stream_reset_handler_mock", "//test/test_common:test_runtime_lib", ], ) diff --git a/test/integration/buffer_accounting_integration_test.cc b/test/integration/buffer_accounting_integration_test.cc index b11c916e568be..4a345011d08c8 100644 --- a/test/integration/buffer_accounting_integration_test.cc +++ b/test/integration/buffer_accounting_integration_test.cc @@ -8,6 +8,7 @@ #include "source/common/buffer/buffer_impl.h" #include "test/integration/autonomous_upstream.h" +#include "test/integration/http_protocol_integration.h" #include "test/integration/tracked_watermark_buffer.h" #include "test/integration/utility.h" #include "test/mocks/http/mocks.h" @@ -20,20 +21,48 @@ namespace Envoy { namespace { -std::string ipVersionAndBufferAccountingTestParamsToString( - const ::testing::TestParamInfo>& params) { - return fmt::format( - "{}_{}", - TestUtility::ipTestParamsToString(::testing::TestParamInfo( - std::get<0>(params.param), params.index)), - std::get<1>(params.param) ? "with_per_stream_buffer_accounting" - : "without_per_stream_buffer_accounting"); +std::string protocolTestParamsAndBoolToString( + const ::testing::TestParamInfo>& params) { + return fmt::format("{}_{}", + HttpProtocolIntegrationTest::protocolTestParamsToString( + ::testing::TestParamInfo(std::get<0>(params.param), + /*an_index=*/0)), + std::get<1>(params.param) ? "with_per_stream_buffer_accounting" + : "without_per_stream_buffer_accounting"); +} + +void runOnWorkerThreadsAndWaitforCompletion(Server::Instance& server, std::function func) { + absl::Notification done_notification; + ThreadLocal::TypedSlotPtr<> slot; + Envoy::Thread::ThreadId main_tid; + server.dispatcher().post([&] { + slot = ThreadLocal::TypedSlot<>::makeUnique(server.threadLocal()); + slot->set( + [](Envoy::Event::Dispatcher&) -> std::shared_ptr { + return nullptr; + }); + + main_tid = server.api().threadFactory().currentThreadId(); + + slot->runOnAllThreads( + [main_tid, &server, &func](OptRef) { + // Run on the worker thread. + if (server.api().threadFactory().currentThreadId() != main_tid) { + func(); + } + }, + [&slot, &done_notification] { + slot.reset(nullptr); + done_notification.Notify(); + }); + }); + done_notification.WaitForNotification(); } } // namespace -class HttpBufferWatermarksTest +class Http2BufferWatermarksTest : public SocketInterfaceSwap, - public testing::TestWithParam>, + public testing::TestWithParam>, public HttpIntegrationTest { public: std::vector @@ -54,15 +83,16 @@ class HttpBufferWatermarksTest return responses; } - // TODO(kbaichoo): Parameterize on the client codec type when other protocols - // (H1, H3) support buffer accounting. - HttpBufferWatermarksTest() - : HttpIntegrationTest(Http::CodecClient::Type::HTTP2, std::get<0>(GetParam())) { + Http2BufferWatermarksTest() + : HttpIntegrationTest( + std::get<0>(GetParam()).downstream_protocol, std::get<0>(GetParam()).version, + ConfigHelper::httpProxyConfig( + /*downstream_is_quic=*/std::get<0>(GetParam()).downstream_protocol == + Http::CodecType::HTTP3)) { config_helper_.addRuntimeOverride("envoy.test_only.per_stream_buffer_accounting", streamBufferAccounting() ? "true" : "false"); setServerBufferFactory(buffer_factory_); - setDownstreamProtocol(Http::CodecClient::Type::HTTP2); - setUpstreamProtocol(FakeHttpConnection::Type::HTTP2); + setUpstreamProtocol(std::get<0>(GetParam()).upstream_protocol); } protected: @@ -91,14 +121,20 @@ class HttpBufferWatermarksTest } }; +// Run the tests using HTTP2 only since its the only protocol that's fully +// supported. +// TODO(kbaichoo): Instantiate with H3 and H1 as well when their buffers are +// bounded to accounts. INSTANTIATE_TEST_SUITE_P( - IpVersions, HttpBufferWatermarksTest, - testing::Combine(testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), testing::Bool()), - ipVersionAndBufferAccountingTestParamsToString); + IpVersions, Http2BufferWatermarksTest, + testing::Combine(testing::ValuesIn(HttpProtocolIntegrationTest::getProtocolTestParams( + {Http::CodecType::HTTP2}, {FakeHttpConnection::Type::HTTP2})), + testing::Bool()), + protocolTestParamsAndBoolToString); // We should create four buffers each billing the same downstream request's // account which originated the chain. -TEST_P(HttpBufferWatermarksTest, ShouldCreateFourBuffersPerAccount) { +TEST_P(Http2BufferWatermarksTest, ShouldCreateFourBuffersPerAccount) { FakeStreamPtr upstream_request1; FakeStreamPtr upstream_request2; default_request_headers_.setContentLength(1000); @@ -153,7 +189,7 @@ TEST_P(HttpBufferWatermarksTest, ShouldCreateFourBuffersPerAccount) { EXPECT_TRUE(buffer_factory_->waitUntilExpectedNumberOfAccountsAndBoundBuffers(0, 0)); } -TEST_P(HttpBufferWatermarksTest, ShouldTrackAllocatedBytesToUpstream) { +TEST_P(Http2BufferWatermarksTest, ShouldTrackAllocatedBytesToUpstream) { const int num_requests = 5; const uint32_t request_body_size = 4096; const uint32_t response_body_size = 4096; @@ -188,7 +224,7 @@ TEST_P(HttpBufferWatermarksTest, ShouldTrackAllocatedBytesToUpstream) { } } -TEST_P(HttpBufferWatermarksTest, ShouldTrackAllocatedBytesToDownstream) { +TEST_P(Http2BufferWatermarksTest, ShouldTrackAllocatedBytesToDownstream) { const int num_requests = 5; const uint32_t request_body_size = 4096; const uint32_t response_body_size = 16384; @@ -224,4 +260,135 @@ TEST_P(HttpBufferWatermarksTest, ShouldTrackAllocatedBytesToDownstream) { } } +// Focuses on tests using the various codec. Currently, the accounting is only +// fully wired through with H2, but it's important to test that H1 and H3 end +// up notifying the BufferMemoryAccount when the dtor of the downstream stream +// occurs. +class ProtocolsBufferWatermarksTest + : public testing::TestWithParam>, + public HttpIntegrationTest { +public: + ProtocolsBufferWatermarksTest() + : HttpIntegrationTest( + std::get<0>(GetParam()).downstream_protocol, std::get<0>(GetParam()).version, + ConfigHelper::httpProxyConfig( + /*downstream_is_quic=*/std::get<0>(GetParam()).downstream_protocol == + Http::CodecType::HTTP3)) { + config_helper_.addRuntimeOverride("envoy.test_only.per_stream_buffer_accounting", + streamBufferAccounting() ? "true" : "false"); + setServerBufferFactory(buffer_factory_); + setUpstreamProtocol(std::get<0>(GetParam()).upstream_protocol); + } + +protected: + std::shared_ptr buffer_factory_ = + std::make_shared(); + + bool streamBufferAccounting() { return std::get<1>(GetParam()); } +}; + +INSTANTIATE_TEST_SUITE_P( + IpVersions, ProtocolsBufferWatermarksTest, + testing::Combine(testing::ValuesIn(HttpProtocolIntegrationTest::getProtocolTestParams( + {Http::CodecType::HTTP1, Http::CodecType::HTTP2, Http::CodecType::HTTP3}, + {FakeHttpConnection::Type::HTTP2})), + testing::Bool()), + protocolTestParamsAndBoolToString); + +TEST_P(ProtocolsBufferWatermarksTest, AccountShouldBeRegisteredAndUnregisteredOnce) { + FakeStreamPtr upstream_request1; + default_request_headers_.setContentLength(1000); + initialize(); + + codec_client_ = makeHttpConnection(lookupPort("http")); + + // Sends the first request. + auto response1 = codec_client_->makeRequestWithBody(default_request_headers_, 1000); + waitForNextUpstreamRequest(); + upstream_request1 = std::move(upstream_request_); + + if (streamBufferAccounting()) { + EXPECT_EQ(buffer_factory_->numAccountsCreated(), 1); + } else { + EXPECT_EQ(buffer_factory_->numAccountsCreated(), 0); + } + + upstream_request1->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); + upstream_request1->encodeData(1000, true); + ASSERT_TRUE(response1->waitForEndStream()); + ASSERT_TRUE(upstream_request1->complete()); + + // Check single call to unregister if stream account, 0 otherwise + if (streamBufferAccounting()) { + EXPECT_TRUE(buffer_factory_->waitForExpectedAccountUnregistered(1)); + } else { + EXPECT_TRUE(buffer_factory_->waitForExpectedAccountUnregistered(0)); + } +} + +TEST_P(ProtocolsBufferWatermarksTest, ResettingStreamUnregistersAccount) { + FakeStreamPtr upstream_request1; + default_request_headers_.setContentLength(1000); + // H1 on RST ends up leveraging idle timeout if no active stream on the + // connection. + config_helper_.setDownstreamHttpIdleTimeout(std::chrono::milliseconds(100)); + initialize(); + + codec_client_ = makeHttpConnection(lookupPort("http")); + + // Sends the first request. + auto response1 = codec_client_->makeRequestWithBody(default_request_headers_, 1000); + waitForNextUpstreamRequest(); + upstream_request1 = std::move(upstream_request_); + + if (streamBufferAccounting()) { + EXPECT_EQ(buffer_factory_->numAccountsCreated(), 1); + } else { + EXPECT_EQ(buffer_factory_->numAccountsCreated(), 0); + } + + if (streamBufferAccounting()) { + // Reset the downstream via the account interface on the worker thread. + EXPECT_EQ(buffer_factory_->numAccountsCreated(), 1); + Buffer::BufferMemoryAccountSharedPtr account; + auto& server = test_server_->server(); + + // Get access to the account. + buffer_factory_->inspectAccounts( + [&account](Buffer::TrackedWatermarkBufferFactory::AccountToBoundBuffersMap& map) { + for (auto& [acct, _] : map) { + account = acct; + } + }, + server); + + // Reset the stream from the worker. + runOnWorkerThreadsAndWaitforCompletion(server, [&account]() { account->resetDownstream(); }); + + if (std::get<0>(GetParam()).downstream_protocol == Http::CodecType::HTTP1) { + // For H1, we use idleTimeouts to cancel streams unless there was an + // explicit protocol error prior to sending a response to the downstream. + // Since that's not the case, the reset will fire twice, once due to + // overload manager, and once due to timeout which will close the + // connection. + ASSERT_TRUE(codec_client_->waitForDisconnect(std::chrono::milliseconds(10000))); + } else { + ASSERT_TRUE(response1->waitForReset()); + EXPECT_EQ(response1->resetReason(), Http::StreamResetReason::RemoteReset); + } + } else { + upstream_request1->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); + upstream_request1->encodeData(1000, true); + ASSERT_TRUE(response1->waitForEndStream()); + ASSERT_TRUE(upstream_request1->complete()); + } + + // Check single call to unregister if stream account, 0 otherwise + if (streamBufferAccounting()) { + EXPECT_TRUE(buffer_factory_->waitForExpectedAccountUnregistered(1)); + } else { + EXPECT_TRUE(buffer_factory_->waitForExpectedAccountUnregistered(0)); + } +} + } // namespace Envoy diff --git a/test/integration/tracked_watermark_buffer.cc b/test/integration/tracked_watermark_buffer.cc index 1a14d888a1d52..79a5f99145f6c 100644 --- a/test/integration/tracked_watermark_buffer.cc +++ b/test/integration/tracked_watermark_buffer.cc @@ -74,6 +74,21 @@ TrackedWatermarkBufferFactory::createBuffer(std::function below_low_wate below_low_watermark, above_high_watermark, above_overflow_watermark); } +BufferMemoryAccountSharedPtr +TrackedWatermarkBufferFactory::createAccount(Http::StreamResetHandler& reset_handler) { + auto account = WatermarkBufferFactory::createAccount(reset_handler); + absl::MutexLock lock(&mutex_); + ++total_accounts_created_; + return account; +} + +void TrackedWatermarkBufferFactory::unregisterAccount(const BufferMemoryAccountSharedPtr& account, + int current_class) { + WatermarkBufferFactory::unregisterAccount(account, current_class); + absl::MutexLock lock(&mutex_); + ++total_accounts_unregistered_; +} + uint64_t TrackedWatermarkBufferFactory::numBuffersCreated() const { absl::MutexLock lock(&mutex_); return buffer_infos_.size(); @@ -139,6 +154,21 @@ std::pair TrackedWatermarkBufferFactory::highWatermarkRange( return std::make_pair(min_watermark, max_watermark); } +uint64_t TrackedWatermarkBufferFactory::numAccountsCreated() const { + absl::MutexLock lock(&mutex_); + return total_accounts_created_; +} + +bool TrackedWatermarkBufferFactory::waitForExpectedAccountUnregistered( + uint64_t expected_accounts_unregistered, std::chrono::milliseconds timeout) { + absl::MutexLock lock(&mutex_); + auto predicate = [this, expected_accounts_unregistered]() ABSL_SHARED_LOCKS_REQUIRED(mutex_) { + mutex_.AssertHeld(); + return expected_accounts_unregistered == total_accounts_unregistered_; + }; + return mutex_.AwaitWithTimeout(absl::Condition(&predicate), absl::Milliseconds(timeout.count())); +} + bool TrackedWatermarkBufferFactory::waitUntilTotalBufferedExceeds( uint64_t byte_size, std::chrono::milliseconds timeout) { absl::MutexLock lock(&mutex_); @@ -196,6 +226,11 @@ void TrackedWatermarkBufferFactory::inspectAccounts( done_notification.WaitForNotification(); } +void TrackedWatermarkBufferFactory::inspectMemoryClasses( + std::function func) { + func(size_class_account_sets_); +} + void TrackedWatermarkBufferFactory::setExpectedAccountBalance(uint64_t byte_size_per_account, uint32_t num_accounts) { absl::MutexLock lock(&mutex_); @@ -231,7 +266,7 @@ void TrackedWatermarkBufferFactory::checkIfExpectedBalancesMet() { // This is thread safe since this function should run on the only Envoy worker // thread. for (auto& acc : account_infos_) { - if (static_cast(acc.first.get())->balance() < + if (static_cast(acc.first.get())->balance() < expected_balances_->balance_per_account_) { return; } diff --git a/test/integration/tracked_watermark_buffer.h b/test/integration/tracked_watermark_buffer.h index 999e955037fc6..99bc091956bb7 100644 --- a/test/integration/tracked_watermark_buffer.h +++ b/test/integration/tracked_watermark_buffer.h @@ -59,7 +59,7 @@ class TrackedWatermarkBuffer : public Buffer::WatermarkBuffer { }; // Factory that tracks how the created buffers are used. -class TrackedWatermarkBufferFactory : public Buffer::WatermarkFactory { +class TrackedWatermarkBufferFactory : public WatermarkBufferFactory { public: TrackedWatermarkBufferFactory() = default; ~TrackedWatermarkBufferFactory() override; @@ -67,6 +67,8 @@ class TrackedWatermarkBufferFactory : public Buffer::WatermarkFactory { Buffer::InstancePtr createBuffer(std::function below_low_watermark, std::function above_high_watermark, std::function above_overflow_watermark) override; + BufferMemoryAccountSharedPtr createAccount(Http::StreamResetHandler& reset_handler) override; + void unregisterAccount(const BufferMemoryAccountSharedPtr& account, int current_class) override; // Number of buffers created. uint64_t numBuffersCreated() const; @@ -82,6 +84,17 @@ class TrackedWatermarkBufferFactory : public Buffer::WatermarkFactory { // functionality is disabled. std::pair highWatermarkRange() const; + // Number of accounts created. + uint64_t numAccountsCreated() const; + + // Waits for the expected number of accounts unregistered. Unlike + // numAccountsCreated, there are no pre-existing hooks into Envoy when an + // account unregistered call occurs as it depends upon deferred delete. + // This creates the synchronization needed. + bool waitForExpectedAccountUnregistered( + uint64_t expected_accounts_unregistered, + std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); + // Total bytes currently buffered across all known buffers. uint64_t totalBytesBuffered() const { absl::MutexLock lock(&mutex_); @@ -116,9 +129,17 @@ class TrackedWatermarkBufferFactory : public Buffer::WatermarkFactory { using AccountToBoundBuffersMap = absl::flat_hash_map>; + // Used to inspect all accounts tied to any buffer created from this factory. void inspectAccounts(std::function func, Server::Instance& server); + // Used to inspect the memory class to accounts within that class structure. + // This differs from inspectAccounts as that has all accounts bounded to an + // active buffer, while this might not track certain accounts (e.g. below + // thresholds.) As implemented this is NOT thread-safe! + void inspectMemoryClasses( + std::function func); + private: // Remove "dangling" accounts; accounts where the account_info map is the only // entity still pointing to the account. @@ -148,6 +169,10 @@ class TrackedWatermarkBufferFactory : public Buffer::WatermarkFactory { uint64_t active_buffer_count_ ABSL_GUARDED_BY(mutex_) = 0; // total bytes buffered across all buffers. uint64_t total_buffer_size_ ABSL_GUARDED_BY(mutex_) = 0; + // total number of accounts created + uint64_t total_accounts_created_ ABSL_GUARDED_BY(mutex_) = 0; + // total number of accounts unregistered + uint64_t total_accounts_unregistered_ ABSL_GUARDED_BY(mutex_) = 0; // Info about the buffer, by buffer idx. absl::node_hash_map buffer_infos_ ABSL_GUARDED_BY(mutex_); // The expected balances for the accounts. If set, when a buffer updates its diff --git a/test/integration/tracked_watermark_buffer_test.cc b/test/integration/tracked_watermark_buffer_test.cc index 734fa5cd31aee..782fcc71a04d7 100644 --- a/test/integration/tracked_watermark_buffer_test.cc +++ b/test/integration/tracked_watermark_buffer_test.cc @@ -7,6 +7,7 @@ #include "test/integration/tracked_watermark_buffer.h" #include "test/mocks/common.h" +#include "test/mocks/http/stream_reset_handler.h" #include "test/test_common/test_runtime.h" #include "test/test_common/thread_factory_for_test.h" @@ -22,6 +23,7 @@ namespace { class TrackedWatermarkBufferTest : public testing::Test { public: TrackedWatermarkBufferFactory factory_; + Http::MockStreamResetHandler mock_reset_handler_; }; TEST_F(TrackedWatermarkBufferTest, WatermarkFunctions) { @@ -131,7 +133,7 @@ TEST_F(TrackedWatermarkBufferTest, TracksNumberOfBuffersActivelyBound) { auto buffer1 = factory_.createBuffer([]() {}, []() {}, []() {}); auto buffer2 = factory_.createBuffer([]() {}, []() {}, []() {}); auto buffer3 = factory_.createBuffer([]() {}, []() {}, []() {}); - BufferMemoryAccountSharedPtr account = std::make_shared(); + auto account = factory_.createAccount(mock_reset_handler_); ASSERT_TRUE(factory_.waitUntilExpectedNumberOfAccountsAndBoundBuffers(0, 0)); buffer1->bindAccount(account); @@ -141,7 +143,8 @@ TEST_F(TrackedWatermarkBufferTest, TracksNumberOfBuffersActivelyBound) { buffer3->bindAccount(account); EXPECT_TRUE(factory_.waitUntilExpectedNumberOfAccountsAndBoundBuffers(1, 3)); - // Release test access to the account. + // Release test and account access to shared_this. + account->clearDownstream(); account.reset(); buffer3.reset(); @@ -156,7 +159,7 @@ TEST_F(TrackedWatermarkBufferTest, TracksNumberOfAccountsActive) { auto buffer1 = factory_.createBuffer([]() {}, []() {}, []() {}); auto buffer2 = factory_.createBuffer([]() {}, []() {}, []() {}); auto buffer3 = factory_.createBuffer([]() {}, []() {}, []() {}); - BufferMemoryAccountSharedPtr account1 = std::make_shared(); + auto account1 = factory_.createAccount(mock_reset_handler_); ASSERT_TRUE(factory_.waitUntilExpectedNumberOfAccountsAndBoundBuffers(0, 0)); buffer1->bindAccount(account1); @@ -164,10 +167,12 @@ TEST_F(TrackedWatermarkBufferTest, TracksNumberOfAccountsActive) { buffer2->bindAccount(account1); EXPECT_TRUE(factory_.waitUntilExpectedNumberOfAccountsAndBoundBuffers(1, 2)); - // Release test access to the account. + // Release test and account access to shared_this. + account1->clearDownstream(); account1.reset(); - buffer3->bindAccount(std::make_shared()); + auto account2 = factory_.createAccount(mock_reset_handler_); + buffer3->bindAccount(account2); EXPECT_TRUE(factory_.waitUntilExpectedNumberOfAccountsAndBoundBuffers(2, 3)); buffer2.reset(); @@ -175,6 +180,10 @@ TEST_F(TrackedWatermarkBufferTest, TracksNumberOfAccountsActive) { buffer1.reset(); EXPECT_TRUE(factory_.waitUntilExpectedNumberOfAccountsAndBoundBuffers(1, 1)); + // Release test and account access to shared_this. + account2->clearDownstream(); + account2.reset(); + buffer3.reset(); EXPECT_TRUE(factory_.waitUntilExpectedNumberOfAccountsAndBoundBuffers(0, 0)); } @@ -182,8 +191,8 @@ TEST_F(TrackedWatermarkBufferTest, TracksNumberOfAccountsActive) { TEST_F(TrackedWatermarkBufferTest, WaitForExpectedAccountBalanceShouldReturnTrueWhenConditionsMet) { auto buffer1 = factory_.createBuffer([]() {}, []() {}, []() {}); auto buffer2 = factory_.createBuffer([]() {}, []() {}, []() {}); - BufferMemoryAccountSharedPtr account1 = std::make_shared(); - BufferMemoryAccountSharedPtr account2 = std::make_shared(); + auto account1 = factory_.createAccount(mock_reset_handler_); + auto account2 = factory_.createAccount(mock_reset_handler_); buffer1->bindAccount(account1); buffer2->bindAccount(account2); @@ -194,6 +203,9 @@ TEST_F(TrackedWatermarkBufferTest, WaitForExpectedAccountBalanceShouldReturnTrue buffer2->add("Now we have expected balances!"); EXPECT_TRUE(factory_.waitForExpectedAccountBalanceWithTimeout(std::chrono::seconds(0))); + + account1->clearDownstream(); + account2->clearDownstream(); } } // namespace diff --git a/test/mocks/buffer/mocks.h b/test/mocks/buffer/mocks.h index 22a215206939b..c3579a3b8d7d4 100644 --- a/test/mocks/buffer/mocks.h +++ b/test/mocks/buffer/mocks.h @@ -87,6 +87,8 @@ class MockBufferFactory : public Buffer::WatermarkFactory { MOCK_METHOD(Buffer::Instance*, createBuffer_, (std::function below_low, std::function above_high, std::function above_overflow)); + + MOCK_METHOD(Buffer::BufferMemoryAccountSharedPtr, createAccount, (Http::StreamResetHandler&)); }; MATCHER_P(BufferEqual, rhs, testing::PrintToString(*rhs)) { diff --git a/test/mocks/http/BUILD b/test/mocks/http/BUILD index 51c6c7f2fae1d..063aa41e96586 100644 --- a/test/mocks/http/BUILD +++ b/test/mocks/http/BUILD @@ -108,3 +108,11 @@ envoy_cc_test( "//test/test_common:utility_lib", ], ) + +envoy_cc_mock( + name = "stream_reset_handler_mock", + hdrs = ["stream_reset_handler.h"], + deps = [ + "//envoy/http:stream_reset_handler_interface", + ], +) diff --git a/test/mocks/http/stream.cc b/test/mocks/http/stream.cc index 1a3d4e8bcae67..19181d8c26ed6 100644 --- a/test/mocks/http/stream.cc +++ b/test/mocks/http/stream.cc @@ -29,7 +29,11 @@ MockStream::MockStream() { [this](Buffer::BufferMemoryAccountSharedPtr account) -> void { account_ = account; })); } -MockStream::~MockStream() = default; +MockStream::~MockStream() { + if (account_) { + account_->clearDownstream(); + } +} } // namespace Http } // namespace Envoy diff --git a/test/mocks/http/stream_reset_handler.h b/test/mocks/http/stream_reset_handler.h new file mode 100644 index 0000000000000..6469a7b124cc0 --- /dev/null +++ b/test/mocks/http/stream_reset_handler.h @@ -0,0 +1,19 @@ +#pragma once + +#include "envoy/http/stream_reset_handler.h" + +#include "gmock/gmock.h" + +namespace Envoy { +namespace Http { + +class MockStreamResetHandler : public StreamResetHandler { +public: + MockStreamResetHandler() = default; + + // Http::StreamResetHandler + MOCK_METHOD(void, resetStream, (StreamResetReason reason)); +}; + +} // namespace Http +} // namespace Envoy From 0d2418e9f19d50197ed237dfe5497c715d3c99f0 Mon Sep 17 00:00:00 2001 From: Xie Zhihao Date: Mon, 2 Aug 2021 04:27:30 +0800 Subject: [PATCH 6/7] rbac: add matching range of destination ports (#17356) use one destination port range rule to cover thousands of port rules in RBAC permissions, adding convenience for writing configuration and making matching quicker Risk Level: Low Testing: Unit Docs Changes: N/A Release Notes: Added Platform Specific Features: N/A Fixes #16039 Signed-off-by: Xie Zhihao --- api/envoy/config/rbac/v3/BUILD | 1 + api/envoy/config/rbac/v3/rbac.proto | 6 +- api/envoy/config/rbac/v4alpha/BUILD | 1 + api/envoy/config/rbac/v4alpha/rbac.proto | 6 +- docs/root/version_history/current.rst | 2 +- .../envoy/config/rbac/v3/BUILD | 1 + .../envoy/config/rbac/v3/rbac.proto | 6 +- .../envoy/config/rbac/v4alpha/BUILD | 1 + .../envoy/config/rbac/v4alpha/rbac.proto | 6 +- .../filters/common/rbac/matchers.cc | 30 +++++++++ .../extensions/filters/common/rbac/matchers.h | 12 ++++ .../filters/common/rbac/matchers_test.cc | 63 +++++++++++++++++++ 12 files changed, 130 insertions(+), 5 deletions(-) diff --git a/api/envoy/config/rbac/v3/BUILD b/api/envoy/config/rbac/v3/BUILD index c5246439c7b55..c289def1f11d2 100644 --- a/api/envoy/config/rbac/v3/BUILD +++ b/api/envoy/config/rbac/v3/BUILD @@ -10,6 +10,7 @@ api_proto_package( "//envoy/config/core/v3:pkg", "//envoy/config/route/v3:pkg", "//envoy/type/matcher/v3:pkg", + "//envoy/type/v3:pkg", "@com_github_cncf_udpa//udpa/annotations:pkg", "@com_google_googleapis//google/api/expr/v1alpha1:checked_proto", "@com_google_googleapis//google/api/expr/v1alpha1:syntax_proto", diff --git a/api/envoy/config/rbac/v3/rbac.proto b/api/envoy/config/rbac/v3/rbac.proto index 44b3cf7cee6ec..d66f9be2b4981 100644 --- a/api/envoy/config/rbac/v3/rbac.proto +++ b/api/envoy/config/rbac/v3/rbac.proto @@ -7,6 +7,7 @@ import "envoy/config/route/v3/route_components.proto"; import "envoy/type/matcher/v3/metadata.proto"; import "envoy/type/matcher/v3/path.proto"; import "envoy/type/matcher/v3/string.proto"; +import "envoy/type/v3/range.proto"; import "google/api/expr/v1alpha1/checked.proto"; import "google/api/expr/v1alpha1/syntax.proto"; @@ -145,7 +146,7 @@ message Policy { } // Permission defines an action (or actions) that a principal can take. -// [#next-free-field: 11] +// [#next-free-field: 12] message Permission { option (udpa.annotations.versioning).previous_message_type = "envoy.config.rbac.v2.Permission"; @@ -185,6 +186,9 @@ message Permission { // A port number that describes the destination port connecting to. uint32 destination_port = 6 [(validate.rules).uint32 = {lte: 65535}]; + // A port number range that describes a range of destination ports connecting to. + type.v3.Int32Range destination_port_range = 11; + // Metadata that describes additional information about the action. type.matcher.v3.MetadataMatcher metadata = 7; diff --git a/api/envoy/config/rbac/v4alpha/BUILD b/api/envoy/config/rbac/v4alpha/BUILD index f5683a61a2867..090d01f3cd17c 100644 --- a/api/envoy/config/rbac/v4alpha/BUILD +++ b/api/envoy/config/rbac/v4alpha/BUILD @@ -10,6 +10,7 @@ api_proto_package( "//envoy/config/rbac/v3:pkg", "//envoy/config/route/v4alpha:pkg", "//envoy/type/matcher/v4alpha:pkg", + "//envoy/type/v3:pkg", "@com_github_cncf_udpa//udpa/annotations:pkg", "@com_google_googleapis//google/api/expr/v1alpha1:checked_proto", "@com_google_googleapis//google/api/expr/v1alpha1:syntax_proto", diff --git a/api/envoy/config/rbac/v4alpha/rbac.proto b/api/envoy/config/rbac/v4alpha/rbac.proto index bd56c0c3dc326..6fbd5a90f37db 100644 --- a/api/envoy/config/rbac/v4alpha/rbac.proto +++ b/api/envoy/config/rbac/v4alpha/rbac.proto @@ -7,6 +7,7 @@ import "envoy/config/route/v4alpha/route_components.proto"; import "envoy/type/matcher/v4alpha/metadata.proto"; import "envoy/type/matcher/v4alpha/path.proto"; import "envoy/type/matcher/v4alpha/string.proto"; +import "envoy/type/v3/range.proto"; import "google/api/expr/v1alpha1/checked.proto"; import "google/api/expr/v1alpha1/syntax.proto"; @@ -143,7 +144,7 @@ message Policy { } // Permission defines an action (or actions) that a principal can take. -// [#next-free-field: 11] +// [#next-free-field: 12] message Permission { option (udpa.annotations.versioning).previous_message_type = "envoy.config.rbac.v3.Permission"; @@ -183,6 +184,9 @@ message Permission { // A port number that describes the destination port connecting to. uint32 destination_port = 6 [(validate.rules).uint32 = {lte: 65535}]; + // A port number range that describes a range of destination ports connecting to. + type.v3.Int32Range destination_port_range = 11; + // Metadata that describes additional information about the action. type.matcher.v4alpha.MetadataMatcher metadata = 7; diff --git a/docs/root/version_history/current.rst b/docs/root/version_history/current.rst index 243facceaed48..357698e616675 100644 --- a/docs/root/version_history/current.rst +++ b/docs/root/version_history/current.rst @@ -56,8 +56,8 @@ New Features * bootstrap: added :ref:`inline_headers ` in the bootstrap to make custom inline headers bootstrap configurable. * http: added :ref:`string_match ` in the header matcher. * http: added support for :ref:`max_requests_per_connection ` for both upstream and downstream connections. - * jwt_authn: added support for :ref:`Jwt Cache ` and its size can be specified by :ref:`jwt_cache_size `. +* rbac: added :ref:`destination_port_range ` for matching range of destination ports. Deprecated ---------- diff --git a/generated_api_shadow/envoy/config/rbac/v3/BUILD b/generated_api_shadow/envoy/config/rbac/v3/BUILD index c5246439c7b55..c289def1f11d2 100644 --- a/generated_api_shadow/envoy/config/rbac/v3/BUILD +++ b/generated_api_shadow/envoy/config/rbac/v3/BUILD @@ -10,6 +10,7 @@ api_proto_package( "//envoy/config/core/v3:pkg", "//envoy/config/route/v3:pkg", "//envoy/type/matcher/v3:pkg", + "//envoy/type/v3:pkg", "@com_github_cncf_udpa//udpa/annotations:pkg", "@com_google_googleapis//google/api/expr/v1alpha1:checked_proto", "@com_google_googleapis//google/api/expr/v1alpha1:syntax_proto", diff --git a/generated_api_shadow/envoy/config/rbac/v3/rbac.proto b/generated_api_shadow/envoy/config/rbac/v3/rbac.proto index 44b3cf7cee6ec..d66f9be2b4981 100644 --- a/generated_api_shadow/envoy/config/rbac/v3/rbac.proto +++ b/generated_api_shadow/envoy/config/rbac/v3/rbac.proto @@ -7,6 +7,7 @@ import "envoy/config/route/v3/route_components.proto"; import "envoy/type/matcher/v3/metadata.proto"; import "envoy/type/matcher/v3/path.proto"; import "envoy/type/matcher/v3/string.proto"; +import "envoy/type/v3/range.proto"; import "google/api/expr/v1alpha1/checked.proto"; import "google/api/expr/v1alpha1/syntax.proto"; @@ -145,7 +146,7 @@ message Policy { } // Permission defines an action (or actions) that a principal can take. -// [#next-free-field: 11] +// [#next-free-field: 12] message Permission { option (udpa.annotations.versioning).previous_message_type = "envoy.config.rbac.v2.Permission"; @@ -185,6 +186,9 @@ message Permission { // A port number that describes the destination port connecting to. uint32 destination_port = 6 [(validate.rules).uint32 = {lte: 65535}]; + // A port number range that describes a range of destination ports connecting to. + type.v3.Int32Range destination_port_range = 11; + // Metadata that describes additional information about the action. type.matcher.v3.MetadataMatcher metadata = 7; diff --git a/generated_api_shadow/envoy/config/rbac/v4alpha/BUILD b/generated_api_shadow/envoy/config/rbac/v4alpha/BUILD index ddf34cc1032bc..2b205e7373632 100644 --- a/generated_api_shadow/envoy/config/rbac/v4alpha/BUILD +++ b/generated_api_shadow/envoy/config/rbac/v4alpha/BUILD @@ -11,6 +11,7 @@ api_proto_package( "//envoy/config/rbac/v3:pkg", "//envoy/config/route/v4alpha:pkg", "//envoy/type/matcher/v4alpha:pkg", + "//envoy/type/v3:pkg", "@com_github_cncf_udpa//udpa/annotations:pkg", "@com_google_googleapis//google/api/expr/v1alpha1:checked_proto", "@com_google_googleapis//google/api/expr/v1alpha1:syntax_proto", diff --git a/generated_api_shadow/envoy/config/rbac/v4alpha/rbac.proto b/generated_api_shadow/envoy/config/rbac/v4alpha/rbac.proto index 3b27e68bba1dc..bff8576a27c84 100644 --- a/generated_api_shadow/envoy/config/rbac/v4alpha/rbac.proto +++ b/generated_api_shadow/envoy/config/rbac/v4alpha/rbac.proto @@ -7,6 +7,7 @@ import "envoy/config/route/v4alpha/route_components.proto"; import "envoy/type/matcher/v4alpha/metadata.proto"; import "envoy/type/matcher/v4alpha/path.proto"; import "envoy/type/matcher/v4alpha/string.proto"; +import "envoy/type/v3/range.proto"; import "google/api/expr/v1alpha1/checked.proto"; import "google/api/expr/v1alpha1/syntax.proto"; @@ -144,7 +145,7 @@ message Policy { } // Permission defines an action (or actions) that a principal can take. -// [#next-free-field: 11] +// [#next-free-field: 12] message Permission { option (udpa.annotations.versioning).previous_message_type = "envoy.config.rbac.v3.Permission"; @@ -184,6 +185,9 @@ message Permission { // A port number that describes the destination port connecting to. uint32 destination_port = 6 [(validate.rules).uint32 = {lte: 65535}]; + // A port number range that describes a range of destination ports connecting to. + type.v3.Int32Range destination_port_range = 11; + // Metadata that describes additional information about the action. type.matcher.v4alpha.MetadataMatcher metadata = 7; diff --git a/source/extensions/filters/common/rbac/matchers.cc b/source/extensions/filters/common/rbac/matchers.cc index 25d8e2de63b54..fc010bc46c68e 100644 --- a/source/extensions/filters/common/rbac/matchers.cc +++ b/source/extensions/filters/common/rbac/matchers.cc @@ -23,6 +23,8 @@ MatcherConstSharedPtr Matcher::create(const envoy::config::rbac::v3::Permission& IPMatcher::Type::DownstreamLocal); case envoy::config::rbac::v3::Permission::RuleCase::kDestinationPort: return std::make_shared(permission.destination_port()); + case envoy::config::rbac::v3::Permission::RuleCase::kDestinationPortRange: + return std::make_shared(permission.destination_port_range()); case envoy::config::rbac::v3::Permission::RuleCase::kAny: return std::make_shared(); case envoy::config::rbac::v3::Permission::RuleCase::kMetadata: @@ -159,6 +161,34 @@ bool PortMatcher::matches(const Network::Connection&, const Envoy::Http::Request return ip && ip->port() == port_; } +PortRangeMatcher::PortRangeMatcher(const ::envoy::type::v3::Int32Range& range) + : start_(range.start()), end_(range.end()) { + auto start = range.start(); + auto end = range.end(); + if (start < 0 || start > 65536) { + throw EnvoyException(fmt::format("range start {} is out of bounds", start)); + } + if (end < 0 || end > 65536) { + throw EnvoyException(fmt::format("range end {} is out of bounds", end)); + } + if (start >= end) { + throw EnvoyException( + fmt::format("range start {} cannot be greater or equal than range end {}", start, end)); + } +} + +bool PortRangeMatcher::matches(const Network::Connection&, const Envoy::Http::RequestHeaderMap&, + const StreamInfo::StreamInfo& info) const { + const Envoy::Network::Address::Ip* ip = + info.downstreamAddressProvider().localAddress().get()->ip(); + if (ip) { + const auto port = ip->port(); + return start_ <= port && port < end_; + } else { + return false; + } +} + bool AuthenticatedMatcher::matches(const Network::Connection& connection, const Envoy::Http::RequestHeaderMap&, const StreamInfo::StreamInfo&) const { diff --git a/source/extensions/filters/common/rbac/matchers.h b/source/extensions/filters/common/rbac/matchers.h index 472b4a2c9c17e..79d43fb59f0c2 100644 --- a/source/extensions/filters/common/rbac/matchers.h +++ b/source/extensions/filters/common/rbac/matchers.h @@ -163,6 +163,18 @@ class PortMatcher : public Matcher { const uint32_t port_; }; +class PortRangeMatcher : public Matcher { +public: + PortRangeMatcher(const ::envoy::type::v3::Int32Range& range); + + bool matches(const Network::Connection&, const Envoy::Http::RequestHeaderMap&, + const StreamInfo::StreamInfo& info) const override; + +private: + const uint32_t start_; + const uint32_t end_; +}; + /** * Matches the principal name as described in the peer certificate. Uses the URI SAN first. If that * field is not present, uses the subject instead. diff --git a/test/extensions/filters/common/rbac/matchers_test.cc b/test/extensions/filters/common/rbac/matchers_test.cc index 87d22517dd428..0979445343df8 100644 --- a/test/extensions/filters/common/rbac/matchers_test.cc +++ b/test/extensions/filters/common/rbac/matchers_test.cc @@ -4,6 +4,7 @@ #include "envoy/config/route/v3/route_components.pb.h" #include "envoy/type/matcher/v3/metadata.pb.h" +#include "source/common/network/address_impl.h" #include "source/common/network/utility.h" #include "source/extensions/filters/common/expr/evaluator.h" #include "source/extensions/filters/common/rbac/matchers.h" @@ -33,6 +34,10 @@ void checkMatcher( EXPECT_EQ(expected, matcher.matches(connection, headers, info)); } +PortRangeMatcher createPortRangeMatcher(envoy::type::v3::Int32Range range) { + return PortRangeMatcher(range); +} + TEST(AlwaysMatcher, AlwaysMatches) { checkMatcher(RBAC::AlwaysMatcher(), true); } TEST(AndMatcher, Permission_Set) { @@ -101,6 +106,12 @@ TEST(OrMatcher, Permission_Set) { checkMatcher(RBAC::OrMatcher(set), false, conn, headers, info); + perm = set.add_rules(); + perm->mutable_destination_port_range()->set_start(123); + perm->mutable_destination_port_range()->set_end(456); + + checkMatcher(RBAC::OrMatcher(set), false, conn, headers, info); + perm = set.add_rules(); perm->set_any(true); @@ -233,6 +244,58 @@ TEST(PortMatcher, PortMatcher) { checkMatcher(PortMatcher(456), false, conn, headers, info); } +// Test valid and invalid destination_port_range permission rule in RBAC. +TEST(PortRangeMatcher, PortRangeMatcher) { + Envoy::Network::MockConnection conn; + Envoy::Http::TestRequestHeaderMapImpl headers; + NiceMock info; + Envoy::Network::Address::InstanceConstSharedPtr addr = + Envoy::Network::Utility::parseInternetAddress("1.2.3.4", 456, false); + info.downstream_address_provider_->setLocalAddress(addr); + + // IP address with port 456 is in range [123, 789) and [456, 789), but not in range [123, 456) or + // [12, 34). + envoy::type::v3::Int32Range range; + range.set_start(123); + range.set_end(789); + checkMatcher(PortRangeMatcher(range), true, conn, headers, info); + + range.set_start(456); + range.set_end(789); + checkMatcher(PortRangeMatcher(range), true, conn, headers, info); + + range.set_start(123); + range.set_end(456); + checkMatcher(PortRangeMatcher(range), false, conn, headers, info); + + range.set_start(12); + range.set_end(34); + checkMatcher(PortRangeMatcher(range), false, conn, headers, info); + + // Only IP address is valid for the permission rule. + NiceMock info2; + Envoy::Network::Address::InstanceConstSharedPtr addr2 = + std::make_shared("test"); + info2.downstream_address_provider_->setLocalAddress(addr2); + checkMatcher(PortRangeMatcher(range), false, conn, headers, info2); + + // Invalid rule will cause an exception. + range.set_start(-1); + range.set_end(80); + EXPECT_THROW_WITH_REGEX(createPortRangeMatcher(range), EnvoyException, + "range start .* is out of bounds"); + + range.set_start(80); + range.set_end(65537); + EXPECT_THROW_WITH_REGEX(createPortRangeMatcher(range), EnvoyException, + "range end .* is out of bounds"); + + range.set_start(80); + range.set_end(80); + EXPECT_THROW_WITH_REGEX(createPortRangeMatcher(range), EnvoyException, + "range start .* cannot be greater or equal than range end .*"); +} + TEST(AuthenticatedMatcher, uriSanPeerCertificate) { Envoy::Network::MockConnection conn; auto ssl = std::make_shared(); From ec8b808979e5a74e8b95a3d29886315c52550118 Mon Sep 17 00:00:00 2001 From: Kevin Baichoo Date: Tue, 13 Jul 2021 15:57:56 +0000 Subject: [PATCH 7/7] Add configurability of BufferFactory. We can now configure the minimum account size to track. Signed-off-by: Kevin Baichoo --- api/envoy/config/bootstrap/v3/bootstrap.proto | 17 +++++++++++++- .../config/bootstrap/v4alpha/bootstrap.proto | 20 ++++++++++++++++- envoy/api/BUILD | 1 + envoy/api/api.h | 6 +++++ .../envoy/config/bootstrap/v3/bootstrap.proto | 17 +++++++++++++- .../config/bootstrap/v4alpha/bootstrap.proto | 20 ++++++++++++++++- source/common/api/BUILD | 1 + source/common/api/api_impl.cc | 8 +++++-- source/common/api/api_impl.h | 4 ++++ source/common/buffer/BUILD | 1 + source/common/buffer/watermark_buffer.cc | 19 ++++++++++++++-- source/common/buffer/watermark_buffer.h | 6 +++++ source/common/event/dispatcher_impl.cc | 3 ++- source/server/config_validation/BUILD | 1 + source/server/config_validation/api.cc | 5 +++-- source/server/config_validation/api.h | 4 +++- source/server/config_validation/server.cc | 22 +++++++++---------- source/server/config_validation/server.h | 2 ++ source/server/server.cc | 8 +++---- source/server/server.h | 2 +- test/common/buffer/BUILD | 1 + .../buffer/buffer_memory_account_test.cc | 15 ++++++++++++- test/integration/BUILD | 1 + .../buffer_accounting_integration_test.cc | 2 +- test/integration/tracked_watermark_buffer.cc | 9 ++++++++ test/integration/tracked_watermark_buffer.h | 4 +++- test/integration/utility.cc | 4 +++- test/mocks/api/mocks.cc | 2 ++ test/mocks/api/mocks.h | 3 +++ test/server/config_validation/BUILD | 1 + .../config_validation/dispatcher_test.cc | 5 ++++- test/test_common/BUILD | 1 + test/test_common/utility.cc | 4 +++- 33 files changed, 185 insertions(+), 34 deletions(-) diff --git a/api/envoy/config/bootstrap/v3/bootstrap.proto b/api/envoy/config/bootstrap/v3/bootstrap.proto index 0e8de36633354..16808ab3c19e9 100644 --- a/api/envoy/config/bootstrap/v3/bootstrap.proto +++ b/api/envoy/config/bootstrap/v3/bootstrap.proto @@ -40,7 +40,7 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE; // ` for more detail. // Bootstrap :ref:`configuration overview `. -// [#next-free-field: 33] +// [#next-free-field: 34] message Bootstrap { option (udpa.annotations.versioning).previous_message_type = "envoy.config.bootstrap.v2.Bootstrap"; @@ -219,6 +219,8 @@ message Bootstrap { (udpa.annotations.security).configure_for_untrusted_upstream = true ]; + BufferFactoryConfig buffer_factory_config = 33; + // Enable :ref:`stats for event dispatcher `, defaults to false. // Note that this records a value for each iteration of the event loop on every thread. This // should normally be minimal overhead, but when using @@ -642,3 +644,16 @@ message CustomInlineHeader { // The type of the header that is expected to be set as the inline header. InlineHeaderType inline_header_type = 2 [(validate.rules).enum = {defined_only: true}]; } + +// Configuration for the Buffer Factories that create Buffers and Accounts. +message BufferFactoryConfig { + // The minimum account size at which Envoy starts tracking a stream. + // This *MUST* be a power of two. + // + // Envoy has 8 power of two buckets starting from this value. + // Concretely the 1st bucket contains accounts for streams that use + // [account_tracking_threshold_bytes, 2 * account_tracking_threshold_bytes). + // With the 8th bucket tracking accounts + // >= 128 * account_tracking_threshold_bytes. + uint32 account_tracking_threshold_bytes = 1 [(validate.rules).uint32 = {gt: 0}]; +} diff --git a/api/envoy/config/bootstrap/v4alpha/bootstrap.proto b/api/envoy/config/bootstrap/v4alpha/bootstrap.proto index 5c45b8f7dbce9..f007d298fd8e0 100644 --- a/api/envoy/config/bootstrap/v4alpha/bootstrap.proto +++ b/api/envoy/config/bootstrap/v4alpha/bootstrap.proto @@ -37,7 +37,7 @@ option (udpa.annotations.file_status).package_version_status = NEXT_MAJOR_VERSIO // ` for more detail. // Bootstrap :ref:`configuration overview `. -// [#next-free-field: 33] +// [#next-free-field: 34] message Bootstrap { option (udpa.annotations.versioning).previous_message_type = "envoy.config.bootstrap.v3.Bootstrap"; @@ -199,6 +199,8 @@ message Bootstrap { (udpa.annotations.security).configure_for_untrusted_upstream = true ]; + BufferFactoryConfig buffer_factory_config = 33; + // Enable :ref:`stats for event dispatcher `, defaults to false. // Note that this records a value for each iteration of the event loop on every thread. This // should normally be minimal overhead, but when using @@ -618,3 +620,19 @@ message CustomInlineHeader { // The type of the header that is expected to be set as the inline header. InlineHeaderType inline_header_type = 2 [(validate.rules).enum = {defined_only: true}]; } + +// Configuration for the Buffer Factories that create Buffers and Accounts. +message BufferFactoryConfig { + option (udpa.annotations.versioning).previous_message_type = + "envoy.config.bootstrap.v3.BufferFactoryConfig"; + + // The minimum account size at which Envoy starts tracking a stream. + // This *MUST* be a power of two. + // + // Envoy has 8 power of two buckets starting from this value. + // Concretely the 1st bucket contains accounts for streams that use + // [account_tracking_threshold_bytes, 2 * account_tracking_threshold_bytes). + // With the 8th bucket tracking accounts + // >= 128 * account_tracking_threshold_bytes. + uint32 account_tracking_threshold_bytes = 1 [(validate.rules).uint32 = {gt: 0}]; +} diff --git a/envoy/api/BUILD b/envoy/api/BUILD index 55e267505ee54..904e5fff75f8a 100644 --- a/envoy/api/BUILD +++ b/envoy/api/BUILD @@ -18,6 +18,7 @@ envoy_cc_library( "//envoy/filesystem:filesystem_interface", "//envoy/server:process_context_interface", "//envoy/thread:thread_interface", + "@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto", ], ) diff --git a/envoy/api/api.h b/envoy/api/api.h index 89336aaf57347..c83198beed8be 100644 --- a/envoy/api/api.h +++ b/envoy/api/api.h @@ -5,6 +5,7 @@ #include "envoy/common/random_generator.h" #include "envoy/common/time.h" +#include "envoy/config/bootstrap/v3/bootstrap.pb.h" #include "envoy/event/dispatcher.h" #include "envoy/event/scaled_range_timer_manager.h" #include "envoy/filesystem/filesystem.h" @@ -83,6 +84,11 @@ class Api { * @return an optional reference to the ProcessContext */ virtual ProcessContextOptRef processContext() PURE; + + /** + * @return the bootstrap Envoy started with. + */ + virtual const envoy::config::bootstrap::v3::Bootstrap& bootstrap() const PURE; }; using ApiPtr = std::unique_ptr; diff --git a/generated_api_shadow/envoy/config/bootstrap/v3/bootstrap.proto b/generated_api_shadow/envoy/config/bootstrap/v3/bootstrap.proto index 9171d066a4302..0ec15a7665e0a 100644 --- a/generated_api_shadow/envoy/config/bootstrap/v3/bootstrap.proto +++ b/generated_api_shadow/envoy/config/bootstrap/v3/bootstrap.proto @@ -40,7 +40,7 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE; // ` for more detail. // Bootstrap :ref:`configuration overview `. -// [#next-free-field: 33] +// [#next-free-field: 34] message Bootstrap { option (udpa.annotations.versioning).previous_message_type = "envoy.config.bootstrap.v2.Bootstrap"; @@ -217,6 +217,8 @@ message Bootstrap { (udpa.annotations.security).configure_for_untrusted_upstream = true ]; + BufferFactoryConfig buffer_factory_config = 33; + // Enable :ref:`stats for event dispatcher `, defaults to false. // Note that this records a value for each iteration of the event loop on every thread. This // should normally be minimal overhead, but when using @@ -646,3 +648,16 @@ message CustomInlineHeader { // The type of the header that is expected to be set as the inline header. InlineHeaderType inline_header_type = 2 [(validate.rules).enum = {defined_only: true}]; } + +// Configuration for the Buffer Factories that create Buffers and Accounts. +message BufferFactoryConfig { + // The minimum account size at which Envoy starts tracking a stream. + // This *MUST* be a power of two. + // + // Envoy has 8 power of two buckets starting from this value. + // Concretely the 1st bucket contains accounts for streams that use + // [account_tracking_threshold_bytes, 2 * account_tracking_threshold_bytes). + // With the 8th bucket tracking accounts + // >= 128 * account_tracking_threshold_bytes. + uint32 account_tracking_threshold_bytes = 1 [(validate.rules).uint32 = {gt: 0}]; +} diff --git a/generated_api_shadow/envoy/config/bootstrap/v4alpha/bootstrap.proto b/generated_api_shadow/envoy/config/bootstrap/v4alpha/bootstrap.proto index b21acabe686fc..5908379743b15 100644 --- a/generated_api_shadow/envoy/config/bootstrap/v4alpha/bootstrap.proto +++ b/generated_api_shadow/envoy/config/bootstrap/v4alpha/bootstrap.proto @@ -39,7 +39,7 @@ option (udpa.annotations.file_status).package_version_status = NEXT_MAJOR_VERSIO // ` for more detail. // Bootstrap :ref:`configuration overview `. -// [#next-free-field: 33] +// [#next-free-field: 34] message Bootstrap { option (udpa.annotations.versioning).previous_message_type = "envoy.config.bootstrap.v3.Bootstrap"; @@ -215,6 +215,8 @@ message Bootstrap { (udpa.annotations.security).configure_for_untrusted_upstream = true ]; + BufferFactoryConfig buffer_factory_config = 33; + // Enable :ref:`stats for event dispatcher `, defaults to false. // Note that this records a value for each iteration of the event loop on every thread. This // should normally be minimal overhead, but when using @@ -650,3 +652,19 @@ message CustomInlineHeader { // The type of the header that is expected to be set as the inline header. InlineHeaderType inline_header_type = 2 [(validate.rules).enum = {defined_only: true}]; } + +// Configuration for the Buffer Factories that create Buffers and Accounts. +message BufferFactoryConfig { + option (udpa.annotations.versioning).previous_message_type = + "envoy.config.bootstrap.v3.BufferFactoryConfig"; + + // The minimum account size at which Envoy starts tracking a stream. + // This *MUST* be a power of two. + // + // Envoy has 8 power of two buckets starting from this value. + // Concretely the 1st bucket contains accounts for streams that use + // [account_tracking_threshold_bytes, 2 * account_tracking_threshold_bytes). + // With the 8th bucket tracking accounts + // >= 128 * account_tracking_threshold_bytes. + uint32 account_tracking_threshold_bytes = 1 [(validate.rules).uint32 = {gt: 0}]; +} diff --git a/source/common/api/BUILD b/source/common/api/BUILD index 60412c4513432..950ea63a165cc 100644 --- a/source/common/api/BUILD +++ b/source/common/api/BUILD @@ -18,6 +18,7 @@ envoy_cc_library( "//source/common/common:thread_lib", "//source/common/event:dispatcher_lib", "//source/common/network:socket_lib", + "@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto", ], ) diff --git a/source/common/api/api_impl.cc b/source/common/api/api_impl.cc index 73de8f4a320c2..485fd5f50d8d2 100644 --- a/source/common/api/api_impl.cc +++ b/source/common/api/api_impl.cc @@ -3,6 +3,8 @@ #include #include +#include "envoy/config/bootstrap/v3/bootstrap.pb.h" + #include "source/common/common/thread.h" #include "source/common/event/dispatcher_impl.h" @@ -11,10 +13,12 @@ namespace Api { Impl::Impl(Thread::ThreadFactory& thread_factory, Stats::Store& store, Event::TimeSystem& time_system, Filesystem::Instance& file_system, - Random::RandomGenerator& random_generator, const ProcessContextOptRef& process_context, + Random::RandomGenerator& random_generator, + const envoy::config::bootstrap::v3::Bootstrap& bootstrap, + const ProcessContextOptRef& process_context, Buffer::WatermarkFactorySharedPtr watermark_factory) : thread_factory_(thread_factory), store_(store), time_system_(time_system), - file_system_(file_system), random_generator_(random_generator), + file_system_(file_system), random_generator_(random_generator), bootstrap_(bootstrap), process_context_(process_context), watermark_factory_(std::move(watermark_factory)) {} Event::DispatcherPtr Impl::allocateDispatcher(const std::string& name) { diff --git a/source/common/api/api_impl.h b/source/common/api/api_impl.h index 0bec3b866562d..9a9e1e3fad096 100644 --- a/source/common/api/api_impl.h +++ b/source/common/api/api_impl.h @@ -4,6 +4,7 @@ #include #include "envoy/api/api.h" +#include "envoy/config/bootstrap/v3/bootstrap.pb.h" #include "envoy/event/timer.h" #include "envoy/filesystem/filesystem.h" #include "envoy/network/socket.h" @@ -19,6 +20,7 @@ class Impl : public Api { public: Impl(Thread::ThreadFactory& thread_factory, Stats::Store& store, Event::TimeSystem& time_system, Filesystem::Instance& file_system, Random::RandomGenerator& random_generator, + const envoy::config::bootstrap::v3::Bootstrap& bootstrap, const ProcessContextOptRef& process_context = absl::nullopt, Buffer::WatermarkFactorySharedPtr watermark_factory = nullptr); @@ -34,6 +36,7 @@ class Impl : public Api { TimeSource& timeSource() override { return time_system_; } Stats::Scope& rootScope() override { return store_; } Random::RandomGenerator& randomGenerator() override { return random_generator_; } + const envoy::config::bootstrap::v3::Bootstrap& bootstrap() const override { return bootstrap_; } ProcessContextOptRef processContext() override { return process_context_; } private: @@ -42,6 +45,7 @@ class Impl : public Api { Event::TimeSystem& time_system_; Filesystem::Instance& file_system_; Random::RandomGenerator& random_generator_; + const envoy::config::bootstrap::v3::Bootstrap& bootstrap_; ProcessContextOptRef process_context_; const Buffer::WatermarkFactorySharedPtr watermark_factory_; }; diff --git a/source/common/buffer/BUILD b/source/common/buffer/BUILD index 9d531f683cf24..5d93729ddb3cb 100644 --- a/source/common/buffer/BUILD +++ b/source/common/buffer/BUILD @@ -17,6 +17,7 @@ envoy_cc_library( "//source/common/buffer:buffer_lib", "//source/common/common:assert_lib", "//source/common/runtime:runtime_features_lib", + "@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto", ], ) diff --git a/source/common/buffer/watermark_buffer.cc b/source/common/buffer/watermark_buffer.cc index 1616afb542e0b..6de9a62ee80b3 100644 --- a/source/common/buffer/watermark_buffer.cc +++ b/source/common/buffer/watermark_buffer.cc @@ -9,6 +9,9 @@ namespace Envoy { namespace Buffer { +namespace { +constexpr uint32_t kDefaultMinimumTrackingBytes = absl::bit_width(uint32_t(1024 * 256)) - 1; +} // end namespace void WatermarkBuffer::add(const void* data, uint64_t size) { OwnedImpl::add(data, size); @@ -174,6 +177,17 @@ void WatermarkBufferFactory::unregisterAccount(const BufferMemoryAccountSharedPt } } +WatermarkBufferFactory::WatermarkBufferFactory( + const envoy::config::bootstrap::v3::BufferFactoryConfig& config) + : bitshift_(config.account_tracking_threshold_bytes() + ? absl::bit_width(config.account_tracking_threshold_bytes() - 1) + : kDefaultMinimumTrackingBytes) { + RELEASE_ASSERT(config.account_tracking_threshold_bytes() == 0 || + (config.account_tracking_threshold_bytes() & + config.account_tracking_threshold_bytes() - 1) == 0, + "Expected account_tracking_threshold_bytes to be a power of two."); +} + WatermarkBufferFactory::~WatermarkBufferFactory() { for (auto& account_set : size_class_account_sets_) { ASSERT(account_set.empty(), @@ -195,10 +209,11 @@ BufferMemoryAccountImpl::createAccount(WatermarkBufferFactory* factory, } int BufferMemoryAccountImpl::balanceToClassIndex() { - const uint64_t shifted_balance = buffer_memory_allocated_ >> 20; // shift by 1MB. + static uint32_t bitshift = factory_->bitshift(); + uint64_t shifted_balance = buffer_memory_allocated_ >> bitshift; if (shifted_balance == 0) { - return -1; // Not worth tracking anything < 1MB. + return -1; // Not worth tracking anything < configured minimum threshold } const int class_idx = absl::bit_width(shifted_balance) - 1; diff --git a/source/common/buffer/watermark_buffer.h b/source/common/buffer/watermark_buffer.h index 4c874c202f637..d3cbb1f20f3c3 100644 --- a/source/common/buffer/watermark_buffer.h +++ b/source/common/buffer/watermark_buffer.h @@ -5,6 +5,7 @@ #include "envoy/buffer/buffer.h" #include "envoy/common/optref.h" +#include "envoy/config/bootstrap/v3/bootstrap.pb.h" #include "source/common/buffer/buffer_impl.h" @@ -182,6 +183,8 @@ class BufferMemoryAccountImpl : public BufferMemoryAccount { */ class WatermarkBufferFactory : public WatermarkFactory { public: + WatermarkBufferFactory(const envoy::config::bootstrap::v3::BufferFactoryConfig& config); + // Buffer::WatermarkFactory ~WatermarkBufferFactory() override; InstancePtr createBuffer(std::function below_low_watermark, @@ -198,6 +201,8 @@ class WatermarkBufferFactory : public WatermarkFactory { void updateAccountClass(const BufferMemoryAccountSharedPtr& account, int current_class, int new_class); + uint32_t bitshift() const { return bitshift_; } + // Unregister a buffer memory account. virtual void unregisterAccount(const BufferMemoryAccountSharedPtr& account, int current_class); @@ -206,6 +211,7 @@ class WatermarkBufferFactory : public WatermarkFactory { using MemoryClassesToAccountsSet = std::array, BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_>; MemoryClassesToAccountsSet size_class_account_sets_; + uint32_t bitshift_; }; } // namespace Buffer diff --git a/source/common/event/dispatcher_impl.cc b/source/common/event/dispatcher_impl.cc index c9c48cb31a657..1bdd31c613946 100644 --- a/source/common/event/dispatcher_impl.cc +++ b/source/common/event/dispatcher_impl.cc @@ -61,7 +61,8 @@ DispatcherImpl::DispatcherImpl(const std::string& name, Api::Api& api, : name_(name), api_(api), buffer_factory_(watermark_factory != nullptr ? watermark_factory - : std::make_shared()), + : std::make_shared( + api.bootstrap().buffer_factory_config())), scheduler_(time_system.createScheduler(base_scheduler_, base_scheduler_)), thread_local_delete_cb_( base_scheduler_.createSchedulableCallback([this]() -> void { runThreadLocalDelete(); })), diff --git a/source/server/config_validation/BUILD b/source/server/config_validation/BUILD index f33cd523ce856..bc6fd2391fcd5 100644 --- a/source/server/config_validation/BUILD +++ b/source/server/config_validation/BUILD @@ -26,6 +26,7 @@ envoy_cc_library( "//envoy/api:api_interface", "//envoy/filesystem:filesystem_interface", "//source/common/api:api_lib", + "@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto", ], ) diff --git a/source/server/config_validation/api.cc b/source/server/config_validation/api.cc index 75f8e755797c2..c9d4a2546d625 100644 --- a/source/server/config_validation/api.cc +++ b/source/server/config_validation/api.cc @@ -8,8 +8,9 @@ namespace Api { ValidationImpl::ValidationImpl(Thread::ThreadFactory& thread_factory, Stats::Store& stats_store, Event::TimeSystem& time_system, Filesystem::Instance& file_system, - Random::RandomGenerator& random_generator) - : Impl(thread_factory, stats_store, time_system, file_system, random_generator), + Random::RandomGenerator& random_generator, + const envoy::config::bootstrap::v3::Bootstrap& bootstrap) + : Impl(thread_factory, stats_store, time_system, file_system, random_generator, bootstrap), time_system_(time_system) {} Event::DispatcherPtr ValidationImpl::allocateDispatcher(const std::string& name) { diff --git a/source/server/config_validation/api.h b/source/server/config_validation/api.h index 6725362caa733..c1a64a65df54a 100644 --- a/source/server/config_validation/api.h +++ b/source/server/config_validation/api.h @@ -1,6 +1,7 @@ #pragma once #include "envoy/api/api.h" +#include "envoy/config/bootstrap/v3/bootstrap.pb.h" #include "envoy/event/timer.h" #include "envoy/filesystem/filesystem.h" @@ -17,7 +18,8 @@ class ValidationImpl : public Impl { public: ValidationImpl(Thread::ThreadFactory& thread_factory, Stats::Store& stats_store, Event::TimeSystem& time_system, Filesystem::Instance& file_system, - Random::RandomGenerator& random_generator); + Random::RandomGenerator& random_generator, + const envoy::config::bootstrap::v3::Bootstrap& bootstrap); Event::DispatcherPtr allocateDispatcher(const std::string& name) override; Event::DispatcherPtr allocateDispatcher(const std::string& name, diff --git a/source/server/config_validation/server.cc b/source/server/config_validation/server.cc index 3d37c7ac56c49..1eaac379016ab 100644 --- a/source/server/config_validation/server.cc +++ b/source/server/config_validation/server.cc @@ -45,8 +45,9 @@ ValidationInstance::ValidationInstance( : options_(options), validation_context_(options_.allowUnknownStaticFields(), !options.rejectUnknownDynamicFields(), !options.ignoreUnknownDynamicFields()), - stats_store_(store), api_(new Api::ValidationImpl(thread_factory, store, time_system, - file_system, random_generator_)), + stats_store_(store), + api_(new Api::ValidationImpl(thread_factory, store, time_system, file_system, + random_generator_, bootstrap_)), dispatcher_(api_->allocateDispatcher("main_thread")), singleton_manager_(new Singleton::ManagerImpl(api_->threadFactory())), access_log_manager_(options.fileFlushIntervalMsec(), *api_, *dispatcher_, access_log_lock, @@ -78,22 +79,21 @@ void ValidationInstance::initialize(const Options& options, // If we get all the way through that stripped-down initialization flow, to the point where we'd // be ready to serve, then the config has passed validation. // Handle configuration that needs to take place prior to the main configuration load. - envoy::config::bootstrap::v3::Bootstrap bootstrap; - InstanceUtil::loadBootstrapConfig(bootstrap, options, + InstanceUtil::loadBootstrapConfig(bootstrap_, options, messageValidationContext().staticValidationVisitor(), *api_); - Config::Utility::createTagProducer(bootstrap); - bootstrap.mutable_node()->set_hidden_envoy_deprecated_build_version(VersionInfo::version()); + Config::Utility::createTagProducer(bootstrap_); + bootstrap_.mutable_node()->set_hidden_envoy_deprecated_build_version(VersionInfo::version()); local_info_ = std::make_unique( - stats().symbolTable(), bootstrap.node(), bootstrap.node_context_params(), local_address, + stats().symbolTable(), bootstrap_.node(), bootstrap_.node_context_params(), local_address, options.serviceZone(), options.serviceClusterName(), options.serviceNodeName()); overload_manager_ = std::make_unique( - dispatcher(), stats(), threadLocal(), bootstrap.overload_manager(), + dispatcher(), stats(), threadLocal(), bootstrap_.overload_manager(), messageValidationContext().staticValidationVisitor(), *api_, options_); - Configuration::InitialImpl initial_config(bootstrap, options); - initial_config.initAdminAccessLog(bootstrap, *this); + Configuration::InitialImpl initial_config(bootstrap_, options); + initial_config.initAdminAccessLog(bootstrap_, *this); admin_ = std::make_unique(initial_config.admin().address()); listener_manager_ = std::make_unique(*this, *this, *this, false, quic_stat_names_); @@ -107,7 +107,7 @@ void ValidationInstance::initialize(const Options& options, localInfo(), *secret_manager_, messageValidationContext(), *api_, http_context_, grpc_context_, router_context_, accessLogManager(), singletonManager(), options, quic_stat_names_); - config_.initialize(bootstrap, *this, *cluster_manager_factory_); + config_.initialize(bootstrap_, *this, *cluster_manager_factory_); runtime().initialize(clusterManager()); clusterManager().setInitializedCb([this]() -> void { init_manager_.initialize(init_watcher_); }); } diff --git a/source/server/config_validation/server.h b/source/server/config_validation/server.h index 356769f2b5962..472f88d45fb7c 100644 --- a/source/server/config_validation/server.h +++ b/source/server/config_validation/server.h @@ -2,6 +2,7 @@ #include +#include "envoy/config/bootstrap/v3/bootstrap.pb.h" #include "envoy/config/core/v3/config_source.pb.h" #include "envoy/config/listener/v3/listener.pb.h" #include "envoy/config/listener/v3/listener_components.pb.h" @@ -195,6 +196,7 @@ class ValidationInstance final : Logger::Loggable, ProtobufMessage::ProdValidationContextImpl validation_context_; Stats::IsolatedStoreImpl& stats_store_; ThreadLocal::InstanceImpl thread_local_; + envoy::config::bootstrap::v3::Bootstrap bootstrap_; Api::ApiPtr api_; Event::DispatcherPtr dispatcher_; std::unique_ptr admin_; diff --git a/source/server/server.cc b/source/server/server.cc index 8ca82f4909315..6ab74b6007f7a 100644 --- a/source/server/server.cc +++ b/source/server/server.cc @@ -74,10 +74,10 @@ InstanceImpl::InstanceImpl( time_source_(time_system), restarter_(restarter), start_time_(time(nullptr)), original_start_time_(start_time_), stats_store_(store), thread_local_(tls), random_generator_(std::move(random_generator)), - api_(new Api::Impl(thread_factory, store, time_system, file_system, *random_generator_, - process_context ? ProcessContextOptRef(std::ref(*process_context)) - : absl::nullopt, - watermark_factory)), + api_(new Api::Impl( + thread_factory, store, time_system, file_system, *random_generator_, bootstrap_, + process_context ? ProcessContextOptRef(std::ref(*process_context)) : absl::nullopt, + watermark_factory)), dispatcher_(api_->allocateDispatcher("main_thread")), singleton_manager_(new Singleton::ManagerImpl(api_->threadFactory())), handler_(new ConnectionHandlerImpl(*dispatcher_, absl::nullopt)), diff --git a/source/server/server.h b/source/server/server.h index 36a017416317c..9234f6c2ae7b7 100644 --- a/source/server/server.h +++ b/source/server/server.h @@ -345,6 +345,7 @@ class InstanceImpl final : Logger::Loggable, Assert::ActionRegistrationPtr envoy_bug_action_registration_; ThreadLocal::Instance& thread_local_; Random::RandomGeneratorPtr random_generator_; + envoy::config::bootstrap::v3::Bootstrap bootstrap_; Api::ApiPtr api_; Event::DispatcherPtr dispatcher_; std::unique_ptr admin_; @@ -367,7 +368,6 @@ class InstanceImpl final : Logger::Loggable, std::unique_ptr worker_guard_dog_; bool terminated_; std::unique_ptr file_logger_; - envoy::config::bootstrap::v3::Bootstrap bootstrap_; ConfigTracker::EntryOwnerPtr config_tracker_entry_; SystemTime bootstrap_config_update_time_; Grpc::AsyncClientManagerPtr async_client_manager_; diff --git a/test/common/buffer/BUILD b/test/common/buffer/BUILD index 80bad25f619a7..85d7f1dbd516a 100644 --- a/test/common/buffer/BUILD +++ b/test/common/buffer/BUILD @@ -91,6 +91,7 @@ envoy_cc_test( "//test/integration:tracked_watermark_buffer_lib", "//test/mocks/buffer:buffer_mocks", "//test/mocks/http:stream_reset_handler_mock", + "@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto", ], ) diff --git a/test/common/buffer/buffer_memory_account_test.cc b/test/common/buffer/buffer_memory_account_test.cc index 62f16f14d1b0c..a1c1cce2b3080 100644 --- a/test/common/buffer/buffer_memory_account_test.cc +++ b/test/common/buffer/buffer_memory_account_test.cc @@ -1,3 +1,4 @@ +#include "envoy/config/bootstrap/v3/bootstrap.pb.h" #include "envoy/http/codec.h" #include "source/common/buffer/buffer_impl.h" @@ -34,7 +35,7 @@ static void noAccountsTracked(MemoryClassesToAccountsSet& memory_classes_to_acco class BufferMemoryAccountTest : public testing::Test { protected: - TrackedWatermarkBufferFactory factory_; + TrackedWatermarkBufferFactory factory_{kMinimumBalanceToTrack}; Http::MockStreamResetHandler mock_reset_handler_; }; @@ -476,6 +477,18 @@ TEST_F(BufferMemoryAccountTest, RemainsInSameBucketIfChangesWithinThreshold) { account->clearDownstream(); } +TEST(WatermarkBufferFactoryTest, CanConfigureMinimumTrackingAmount) { + TrackedWatermarkBufferFactory factory(4); + EXPECT_EQ(factory.bitshift(), 2); +} + +TEST(WatermarkBufferFactoryTest, ReleaseAssertIfAccountTrackingThresholdBytesIsNotPowerOfTwo) { + envoy::config::bootstrap::v3::BufferFactoryConfig config; + config.set_account_tracking_threshold_bytes(3); + EXPECT_DEATH(WatermarkBufferFactory{config}, + "Expected account_tracking_threshold_bytes to be a power of two."); +} + } // namespace } // namespace Buffer } // namespace Envoy diff --git a/test/integration/BUILD b/test/integration/BUILD index aed12c5b25dda..d7da6529dc28c 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -843,6 +843,7 @@ envoy_cc_test_library( "//test/test_common:test_time_system_interface", "//test/test_common:utility_lib", "@com_google_absl//absl/synchronization", + "@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto", "@envoy_api//envoy/config/listener/v3:pkg_cc_proto", "@envoy_api//envoy/extensions/transport_sockets/quic/v3:pkg_cc_proto", "@envoy_api//envoy/extensions/transport_sockets/tls/v3:pkg_cc_proto", diff --git a/test/integration/buffer_accounting_integration_test.cc b/test/integration/buffer_accounting_integration_test.cc index 4a345011d08c8..7eb6ec7ab6249 100644 --- a/test/integration/buffer_accounting_integration_test.cc +++ b/test/integration/buffer_accounting_integration_test.cc @@ -97,7 +97,7 @@ class Http2BufferWatermarksTest protected: std::shared_ptr buffer_factory_ = - std::make_shared(); + std::make_shared(1024 * 1024); // Track >= 1MB bool streamBufferAccounting() { return std::get<1>(GetParam()); } diff --git a/test/integration/tracked_watermark_buffer.cc b/test/integration/tracked_watermark_buffer.cc index 79a5f99145f6c..8d5dabce1ad0c 100644 --- a/test/integration/tracked_watermark_buffer.cc +++ b/test/integration/tracked_watermark_buffer.cc @@ -9,6 +9,15 @@ namespace Envoy { namespace Buffer { +TrackedWatermarkBufferFactory::TrackedWatermarkBufferFactory() : TrackedWatermarkBufferFactory(0) {} + +TrackedWatermarkBufferFactory::TrackedWatermarkBufferFactory(uint32_t min_tracking_bytes) + : WatermarkBufferFactory([min_tracking_bytes]() { + auto config = envoy::config::bootstrap::v3::BufferFactoryConfig(); + config.set_account_tracking_threshold_bytes(min_tracking_bytes); + return config; + }()) {} + TrackedWatermarkBufferFactory::~TrackedWatermarkBufferFactory() { ASSERT(active_buffer_count_ == 0); } diff --git a/test/integration/tracked_watermark_buffer.h b/test/integration/tracked_watermark_buffer.h index 99bc091956bb7..d308f8c53f8ad 100644 --- a/test/integration/tracked_watermark_buffer.h +++ b/test/integration/tracked_watermark_buffer.h @@ -61,7 +61,9 @@ class TrackedWatermarkBuffer : public Buffer::WatermarkBuffer { // Factory that tracks how the created buffers are used. class TrackedWatermarkBufferFactory : public WatermarkBufferFactory { public: - TrackedWatermarkBufferFactory() = default; + // Use the default minimum tracking threshold. + TrackedWatermarkBufferFactory(); + TrackedWatermarkBufferFactory(uint32_t min_tracking_bytes); ~TrackedWatermarkBufferFactory() override; // Buffer::WatermarkFactory Buffer::InstancePtr createBuffer(std::function below_low_watermark, diff --git a/test/integration/utility.cc b/test/integration/utility.cc index c6746cae7869d..a6eb50a495aab 100644 --- a/test/integration/utility.cc +++ b/test/integration/utility.cc @@ -5,6 +5,7 @@ #include #include +#include "envoy/config/bootstrap/v3/bootstrap.pb.h" #include "envoy/event/dispatcher.h" #include "envoy/extensions/transport_sockets/quic/v3/quic_transport.pb.h" #include "envoy/network/connection.h" @@ -187,8 +188,9 @@ IntegrationUtil::makeSingleRequest(const Network::Address::InstanceConstSharedPt NiceMock random; Event::GlobalTimeSystem time_system; NiceMock random_generator; + envoy::config::bootstrap::v3::Bootstrap bootstrap; Api::Impl api(Thread::threadFactoryForTest(), mock_stats_store, time_system, - Filesystem::fileSystemForTest(), random_generator); + Filesystem::fileSystemForTest(), random_generator, bootstrap); Event::DispatcherPtr dispatcher(api.allocateDispatcher("test_thread")); TestConnectionCallbacks connection_callbacks(*dispatcher); diff --git a/test/mocks/api/mocks.cc b/test/mocks/api/mocks.cc index dfd9345ae8063..c3eeed40d0c5c 100644 --- a/test/mocks/api/mocks.cc +++ b/test/mocks/api/mocks.cc @@ -8,6 +8,7 @@ using testing::_; using testing::Invoke; +using testing::ReturnRef; namespace Envoy { namespace Api { @@ -16,6 +17,7 @@ MockApi::MockApi() { ON_CALL(*this, fileSystem()).WillByDefault(ReturnRef(file_system_)); ON_CALL(*this, rootScope()).WillByDefault(ReturnRef(stats_store_)); ON_CALL(*this, randomGenerator()).WillByDefault(ReturnRef(random_)); + ON_CALL(*this, bootstrap()).WillByDefault(ReturnRef(empty_bootstrap_)); } MockApi::~MockApi() = default; diff --git a/test/mocks/api/mocks.h b/test/mocks/api/mocks.h index 94b5db59f99c2..ef6f02c999cbd 100644 --- a/test/mocks/api/mocks.h +++ b/test/mocks/api/mocks.h @@ -5,6 +5,7 @@ #include "envoy/api/api.h" #include "envoy/api/os_sys_calls.h" +#include "envoy/config/bootstrap/v3/bootstrap.pb.h" #include "envoy/event/dispatcher.h" #include "envoy/event/timer.h" @@ -47,12 +48,14 @@ class MockApi : public Api { MOCK_METHOD(Thread::ThreadFactory&, threadFactory, ()); MOCK_METHOD(Stats::Scope&, rootScope, ()); MOCK_METHOD(Random::RandomGenerator&, randomGenerator, ()); + MOCK_METHOD(const envoy::config::bootstrap::v3::Bootstrap&, bootstrap, (), (const)); MOCK_METHOD(ProcessContextOptRef, processContext, ()); testing::NiceMock file_system_; Event::GlobalTimeSystem time_system_; testing::NiceMock stats_store_; testing::NiceMock random_; + envoy::config::bootstrap::v3::Bootstrap empty_bootstrap_; }; class MockOsSysCalls : public OsSysCallsImpl { diff --git a/test/server/config_validation/BUILD b/test/server/config_validation/BUILD index 51ebda4ae37c4..03cddb20150e0 100644 --- a/test/server/config_validation/BUILD +++ b/test/server/config_validation/BUILD @@ -74,6 +74,7 @@ envoy_cc_test( "//test/test_common:environment_lib", "//test/test_common:network_utility_lib", "//test/test_common:test_time_lib", + "@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto", ], ) diff --git a/test/server/config_validation/dispatcher_test.cc b/test/server/config_validation/dispatcher_test.cc index 608135be22ae0..e72afe4cdc223 100644 --- a/test/server/config_validation/dispatcher_test.cc +++ b/test/server/config_validation/dispatcher_test.cc @@ -1,5 +1,7 @@ #include +#include "envoy/config/bootstrap/v3/bootstrap.pb.h" + #include "source/common/common/thread.h" #include "source/common/event/dispatcher_impl.h" #include "source/common/event/libevent.h" @@ -24,7 +26,7 @@ class ConfigValidation : public testing::TestWithParam( Thread::threadFactoryForTest(), stats_store_, test_time_.timeSystem(), - Filesystem::fileSystemForTest(), random_generator_); + Filesystem::fileSystemForTest(), random_generator_, bootstrap_); dispatcher_ = validation_->allocateDispatcher("test_thread"); } @@ -32,6 +34,7 @@ class ConfigValidation : public testing::TestWithParam random_generator_; + envoy::config::bootstrap::v3::Bootstrap bootstrap_; private: // Using config validation API. diff --git a/test/test_common/BUILD b/test/test_common/BUILD index 75a5445637854..97475f1e989d6 100644 --- a/test/test_common/BUILD +++ b/test/test_common/BUILD @@ -135,6 +135,7 @@ envoy_cc_test_library( "//source/common/protobuf:utility_lib", "//source/common/stats:stats_lib", "//test/mocks/stats:stats_mocks", + "@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto", "@envoy_api//envoy/config/cluster/v3:pkg_cc_proto", "@envoy_api//envoy/config/endpoint/v3:pkg_cc_proto", "@envoy_api//envoy/config/listener/v3:pkg_cc_proto", diff --git a/test/test_common/utility.cc b/test/test_common/utility.cc index 35b5305cd0a9b..1461cd0a9923c 100644 --- a/test/test_common/utility.cc +++ b/test/test_common/utility.cc @@ -12,6 +12,7 @@ #include "envoy/buffer/buffer.h" #include "envoy/common/platform.h" +#include "envoy/config/bootstrap/v3/bootstrap.pb.h" #include "envoy/config/cluster/v3/cluster.pb.h" #include "envoy/config/endpoint/v3/endpoint.pb.h" #include "envoy/config/listener/v3/listener.pb.h" @@ -399,6 +400,7 @@ class TestImplProvider { Event::GlobalTimeSystem global_time_system_; testing::NiceMock default_stats_store_; testing::NiceMock mock_random_generator_; + envoy::config::bootstrap::v3::Bootstrap empty_bootstrap_; }; class TestImpl : public TestImplProvider, public Impl { @@ -408,7 +410,7 @@ class TestImpl : public TestImplProvider, public Impl { Random::RandomGenerator* random = nullptr) : Impl(thread_factory, stats_store ? *stats_store : default_stats_store_, time_system ? *time_system : global_time_system_, file_system, - random ? *random : mock_random_generator_) {} + random ? *random : mock_random_generator_, empty_bootstrap_) {} }; ApiPtr createApiForTest() {