diff --git a/.bazelrc b/.bazelrc index 2b2b0bad1f812..52382f91ec00b 100644 --- a/.bazelrc +++ b/.bazelrc @@ -220,7 +220,7 @@ build:remote --strategy=Javac=remote,sandboxed,local build:remote --strategy=Closure=remote,sandboxed,local build:remote --strategy=Genrule=remote,sandboxed,local build:remote --remote_timeout=7200 -build:remote --auth_enabled=true +build:remote --google_default_credentials=true build:remote --remote_download_toplevel # Windows bazel does not allow sandboxed as a spawn strategy @@ -229,7 +229,7 @@ build:remote-windows --strategy=Javac=remote,local build:remote-windows --strategy=Closure=remote,local build:remote-windows --strategy=Genrule=remote,local build:remote-windows --remote_timeout=7200 -build:remote-windows --auth_enabled=true +build:remote-windows --google_default_credentials=true build:remote-windows --remote_download_toplevel build:remote-clang --config=remote diff --git a/api/BUILD b/api/BUILD index 3e990fa9783a5..a70eae799d797 100644 --- a/api/BUILD +++ b/api/BUILD @@ -1,3 +1,5 @@ +# DO NOT EDIT. This file is generated by tools/proto_format/proto_sync.py. + load("@rules_proto//proto:defs.bzl", "proto_library") licenses(["notice"]) # Apache 2 diff --git a/api/envoy/config/bootstrap/v3/bootstrap.proto b/api/envoy/config/bootstrap/v3/bootstrap.proto index 0e8de36633354..16808ab3c19e9 100644 --- a/api/envoy/config/bootstrap/v3/bootstrap.proto +++ b/api/envoy/config/bootstrap/v3/bootstrap.proto @@ -40,7 +40,7 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE; // ` for more detail. // Bootstrap :ref:`configuration overview `. -// [#next-free-field: 33] +// [#next-free-field: 34] message Bootstrap { option (udpa.annotations.versioning).previous_message_type = "envoy.config.bootstrap.v2.Bootstrap"; @@ -219,6 +219,8 @@ message Bootstrap { (udpa.annotations.security).configure_for_untrusted_upstream = true ]; + BufferFactoryConfig buffer_factory_config = 33; + // Enable :ref:`stats for event dispatcher `, defaults to false. // Note that this records a value for each iteration of the event loop on every thread. This // should normally be minimal overhead, but when using @@ -642,3 +644,16 @@ message CustomInlineHeader { // The type of the header that is expected to be set as the inline header. InlineHeaderType inline_header_type = 2 [(validate.rules).enum = {defined_only: true}]; } + +// Configuration for the Buffer Factories that create Buffers and Accounts. +message BufferFactoryConfig { + // The minimum account size at which Envoy starts tracking a stream. + // This *MUST* be a power of two. + // + // Envoy has 8 power of two buckets starting from this value. + // Concretely the 1st bucket contains accounts for streams that use + // [account_tracking_threshold_bytes, 2 * account_tracking_threshold_bytes). + // With the 8th bucket tracking accounts + // >= 128 * account_tracking_threshold_bytes. + uint32 account_tracking_threshold_bytes = 1 [(validate.rules).uint32 = {gt: 0}]; +} diff --git a/api/envoy/config/bootstrap/v4alpha/bootstrap.proto b/api/envoy/config/bootstrap/v4alpha/bootstrap.proto index 5c45b8f7dbce9..f007d298fd8e0 100644 --- a/api/envoy/config/bootstrap/v4alpha/bootstrap.proto +++ b/api/envoy/config/bootstrap/v4alpha/bootstrap.proto @@ -37,7 +37,7 @@ option (udpa.annotations.file_status).package_version_status = NEXT_MAJOR_VERSIO // ` for more detail. // Bootstrap :ref:`configuration overview `. -// [#next-free-field: 33] +// [#next-free-field: 34] message Bootstrap { option (udpa.annotations.versioning).previous_message_type = "envoy.config.bootstrap.v3.Bootstrap"; @@ -199,6 +199,8 @@ message Bootstrap { (udpa.annotations.security).configure_for_untrusted_upstream = true ]; + BufferFactoryConfig buffer_factory_config = 33; + // Enable :ref:`stats for event dispatcher `, defaults to false. // Note that this records a value for each iteration of the event loop on every thread. This // should normally be minimal overhead, but when using @@ -618,3 +620,19 @@ message CustomInlineHeader { // The type of the header that is expected to be set as the inline header. InlineHeaderType inline_header_type = 2 [(validate.rules).enum = {defined_only: true}]; } + +// Configuration for the Buffer Factories that create Buffers and Accounts. +message BufferFactoryConfig { + option (udpa.annotations.versioning).previous_message_type = + "envoy.config.bootstrap.v3.BufferFactoryConfig"; + + // The minimum account size at which Envoy starts tracking a stream. + // This *MUST* be a power of two. + // + // Envoy has 8 power of two buckets starting from this value. + // Concretely the 1st bucket contains accounts for streams that use + // [account_tracking_threshold_bytes, 2 * account_tracking_threshold_bytes). + // With the 8th bucket tracking accounts + // >= 128 * account_tracking_threshold_bytes. + uint32 account_tracking_threshold_bytes = 1 [(validate.rules).uint32 = {gt: 0}]; +} diff --git a/api/envoy/config/rbac/v3/BUILD b/api/envoy/config/rbac/v3/BUILD index c5246439c7b55..c289def1f11d2 100644 --- a/api/envoy/config/rbac/v3/BUILD +++ b/api/envoy/config/rbac/v3/BUILD @@ -10,6 +10,7 @@ api_proto_package( "//envoy/config/core/v3:pkg", "//envoy/config/route/v3:pkg", "//envoy/type/matcher/v3:pkg", + "//envoy/type/v3:pkg", "@com_github_cncf_udpa//udpa/annotations:pkg", "@com_google_googleapis//google/api/expr/v1alpha1:checked_proto", "@com_google_googleapis//google/api/expr/v1alpha1:syntax_proto", diff --git a/api/envoy/config/rbac/v3/rbac.proto b/api/envoy/config/rbac/v3/rbac.proto index 44b3cf7cee6ec..d66f9be2b4981 100644 --- a/api/envoy/config/rbac/v3/rbac.proto +++ b/api/envoy/config/rbac/v3/rbac.proto @@ -7,6 +7,7 @@ import "envoy/config/route/v3/route_components.proto"; import "envoy/type/matcher/v3/metadata.proto"; import "envoy/type/matcher/v3/path.proto"; import "envoy/type/matcher/v3/string.proto"; +import "envoy/type/v3/range.proto"; import "google/api/expr/v1alpha1/checked.proto"; import "google/api/expr/v1alpha1/syntax.proto"; @@ -145,7 +146,7 @@ message Policy { } // Permission defines an action (or actions) that a principal can take. -// [#next-free-field: 11] +// [#next-free-field: 12] message Permission { option (udpa.annotations.versioning).previous_message_type = "envoy.config.rbac.v2.Permission"; @@ -185,6 +186,9 @@ message Permission { // A port number that describes the destination port connecting to. uint32 destination_port = 6 [(validate.rules).uint32 = {lte: 65535}]; + // A port number range that describes a range of destination ports connecting to. + type.v3.Int32Range destination_port_range = 11; + // Metadata that describes additional information about the action. type.matcher.v3.MetadataMatcher metadata = 7; diff --git a/api/envoy/config/rbac/v4alpha/BUILD b/api/envoy/config/rbac/v4alpha/BUILD index f5683a61a2867..090d01f3cd17c 100644 --- a/api/envoy/config/rbac/v4alpha/BUILD +++ b/api/envoy/config/rbac/v4alpha/BUILD @@ -10,6 +10,7 @@ api_proto_package( "//envoy/config/rbac/v3:pkg", "//envoy/config/route/v4alpha:pkg", "//envoy/type/matcher/v4alpha:pkg", + "//envoy/type/v3:pkg", "@com_github_cncf_udpa//udpa/annotations:pkg", "@com_google_googleapis//google/api/expr/v1alpha1:checked_proto", "@com_google_googleapis//google/api/expr/v1alpha1:syntax_proto", diff --git a/api/envoy/config/rbac/v4alpha/rbac.proto b/api/envoy/config/rbac/v4alpha/rbac.proto index bd56c0c3dc326..6fbd5a90f37db 100644 --- a/api/envoy/config/rbac/v4alpha/rbac.proto +++ b/api/envoy/config/rbac/v4alpha/rbac.proto @@ -7,6 +7,7 @@ import "envoy/config/route/v4alpha/route_components.proto"; import "envoy/type/matcher/v4alpha/metadata.proto"; import "envoy/type/matcher/v4alpha/path.proto"; import "envoy/type/matcher/v4alpha/string.proto"; +import "envoy/type/v3/range.proto"; import "google/api/expr/v1alpha1/checked.proto"; import "google/api/expr/v1alpha1/syntax.proto"; @@ -143,7 +144,7 @@ message Policy { } // Permission defines an action (or actions) that a principal can take. -// [#next-free-field: 11] +// [#next-free-field: 12] message Permission { option (udpa.annotations.versioning).previous_message_type = "envoy.config.rbac.v3.Permission"; @@ -183,6 +184,9 @@ message Permission { // A port number that describes the destination port connecting to. uint32 destination_port = 6 [(validate.rules).uint32 = {lte: 65535}]; + // A port number range that describes a range of destination ports connecting to. + type.v3.Int32Range destination_port_range = 11; + // Metadata that describes additional information about the action. type.matcher.v4alpha.MetadataMatcher metadata = 7; diff --git a/docs/root/version_history/current.rst b/docs/root/version_history/current.rst index 243facceaed48..357698e616675 100644 --- a/docs/root/version_history/current.rst +++ b/docs/root/version_history/current.rst @@ -56,8 +56,8 @@ New Features * bootstrap: added :ref:`inline_headers ` in the bootstrap to make custom inline headers bootstrap configurable. * http: added :ref:`string_match ` in the header matcher. * http: added support for :ref:`max_requests_per_connection ` for both upstream and downstream connections. - * jwt_authn: added support for :ref:`Jwt Cache ` and its size can be specified by :ref:`jwt_cache_size `. +* rbac: added :ref:`destination_port_range ` for matching range of destination ports. Deprecated ---------- diff --git a/envoy/api/BUILD b/envoy/api/BUILD index 55e267505ee54..904e5fff75f8a 100644 --- a/envoy/api/BUILD +++ b/envoy/api/BUILD @@ -18,6 +18,7 @@ envoy_cc_library( "//envoy/filesystem:filesystem_interface", "//envoy/server:process_context_interface", "//envoy/thread:thread_interface", + "@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto", ], ) diff --git a/envoy/api/api.h b/envoy/api/api.h index 89336aaf57347..c83198beed8be 100644 --- a/envoy/api/api.h +++ b/envoy/api/api.h @@ -5,6 +5,7 @@ #include "envoy/common/random_generator.h" #include "envoy/common/time.h" +#include "envoy/config/bootstrap/v3/bootstrap.pb.h" #include "envoy/event/dispatcher.h" #include "envoy/event/scaled_range_timer_manager.h" #include "envoy/filesystem/filesystem.h" @@ -83,6 +84,11 @@ class Api { * @return an optional reference to the ProcessContext */ virtual ProcessContextOptRef processContext() PURE; + + /** + * @return the bootstrap Envoy started with. + */ + virtual const envoy::config::bootstrap::v3::Bootstrap& bootstrap() const PURE; }; using ApiPtr = std::unique_ptr; diff --git a/envoy/buffer/BUILD b/envoy/buffer/BUILD index 3b9157d06f5fd..24a2e516527cd 100644 --- a/envoy/buffer/BUILD +++ b/envoy/buffer/BUILD @@ -16,6 +16,7 @@ envoy_cc_library( ], deps = [ "//envoy/api:os_sys_calls_interface", + "//envoy/http:stream_reset_handler_interface", "//source/common/common:assert_lib", "//source/common/common:byte_order_lib", "//source/common/common:utility_lib", diff --git a/envoy/buffer/buffer.h b/envoy/buffer/buffer.h index c30cbd84f2ca4..3c22e7c78a3e2 100644 --- a/envoy/buffer/buffer.h +++ b/envoy/buffer/buffer.h @@ -9,6 +9,7 @@ #include "envoy/common/exception.h" #include "envoy/common/platform.h" #include "envoy/common/pure.h" +#include "envoy/http/stream_reset_handler.h" #include "source/common/common/assert.h" #include "source/common/common/byte_order.h" @@ -109,6 +110,19 @@ class BufferMemoryAccount { * @param amount the amount to credit. */ virtual void credit(uint64_t amount) PURE; + + /** + * Clears the associated downstream with this account. + * After this has been called, calls to reset the downstream become no-ops. + * Must be called before downstream is deleted. + */ + virtual void clearDownstream() PURE; + + /** + * Reset the downstream stream associated with this account. Resetting the downstream stream + * should trigger a reset of the corresponding upstream stream if it exists. + */ + virtual void resetDownstream() PURE; }; using BufferMemoryAccountSharedPtr = std::shared_ptr; @@ -480,7 +494,8 @@ class Instance { using InstancePtr = std::unique_ptr; /** - * A factory for creating buffers which call callbacks when reaching high and low watermarks. + * An abstract factory for creating watermarked buffers and buffer memory + * accounts. The factory also supports tracking active memory accounts. */ class WatermarkFactory { public: @@ -497,6 +512,15 @@ class WatermarkFactory { virtual InstancePtr createBuffer(std::function below_low_watermark, std::function above_high_watermark, std::function above_overflow_watermark) PURE; + + /** + * Create and returns a buffer memory account. + * + * @param reset_handler supplies the stream_reset_handler the account will + * invoke to reset the stream. + * @return a BufferMemoryAccountSharedPtr of the newly created account. + */ + virtual BufferMemoryAccountSharedPtr createAccount(Http::StreamResetHandler& reset_handler) PURE; }; using WatermarkFactoryPtr = std::unique_ptr; diff --git a/envoy/http/BUILD b/envoy/http/BUILD index 09d26b373967a..ee9e63a01d6c2 100644 --- a/envoy/http/BUILD +++ b/envoy/http/BUILD @@ -43,6 +43,7 @@ envoy_cc_library( ":header_map_interface", ":metadata_interface", ":protocol_interface", + ":stream_reset_handler_interface", "//envoy/buffer:buffer_interface", "//envoy/grpc:status", "//envoy/network:address_interface", @@ -51,6 +52,11 @@ envoy_cc_library( ], ) +envoy_cc_library( + name = "stream_reset_handler_interface", + hdrs = ["stream_reset_handler.h"], +) + envoy_cc_library( name = "codes_interface", hdrs = ["codes.h"], diff --git a/envoy/http/codec.h b/envoy/http/codec.h index 3674fd88c3123..023b6a129bed3 100644 --- a/envoy/http/codec.h +++ b/envoy/http/codec.h @@ -11,6 +11,7 @@ #include "envoy/http/header_map.h" #include "envoy/http/metadata_interface.h" #include "envoy/http/protocol.h" +#include "envoy/http/stream_reset_handler.h" #include "envoy/network/address.h" #include "envoy/stream_info/stream_info.h" @@ -263,32 +264,6 @@ class ResponseDecoder : public virtual StreamDecoder { virtual void dumpState(std::ostream& os, int indent_level = 0) const PURE; }; -/** - * Stream reset reasons. - */ -enum class StreamResetReason { - // If a local codec level reset was sent on the stream. - LocalReset, - // If a local codec level refused stream reset was sent on the stream (allowing for retry). - LocalRefusedStreamReset, - // If a remote codec level reset was received on the stream. - RemoteReset, - // If a remote codec level refused stream reset was received on the stream (allowing for retry). - RemoteRefusedStreamReset, - // If the stream was locally reset by a connection pool due to an initial connection failure. - ConnectionFailure, - // If the stream was locally reset due to connection termination. - ConnectionTermination, - // The stream was reset because of a resource overflow. - Overflow, - // Either there was an early TCP error for a CONNECT request or the peer reset with CONNECT_ERROR - ConnectError, - // Received payload did not conform to HTTP protocol. - ProtocolError, - // If the stream was locally reset by the Overload Manager. - OverloadManager -}; - /** * Callbacks that fire against a stream. */ @@ -319,10 +294,8 @@ class StreamCallbacks { /** * An HTTP stream (request, response, and push). */ -class Stream { +class Stream : public StreamResetHandler { public: - virtual ~Stream() = default; - /** * Add stream callbacks. * @param callbacks supplies the callbacks to fire on stream events. @@ -335,12 +308,6 @@ class Stream { */ virtual void removeCallbacks(StreamCallbacks& callbacks) PURE; - /** - * Reset the stream. No events will fire beyond this point. - * @param reason supplies the reset reason. - */ - virtual void resetStream(StreamResetReason reason) PURE; - /** * Enable/disable further data from this stream. * Cessation of data may not be immediate. For example, for HTTP/2 this may stop further flow diff --git a/envoy/http/stream_reset_handler.h b/envoy/http/stream_reset_handler.h new file mode 100644 index 0000000000000..7a6d23c5dac09 --- /dev/null +++ b/envoy/http/stream_reset_handler.h @@ -0,0 +1,50 @@ +#pragma once + +#include "envoy/common/pure.h" + +// Stream Reset is refactored from the codec to avoid cyclical dependencies with +// the BufferMemoryAccount interface. +namespace Envoy { +namespace Http { + +/** + * Stream reset reasons. + */ +enum class StreamResetReason { + // If a local codec level reset was sent on the stream. + LocalReset, + // If a local codec level refused stream reset was sent on the stream (allowing for retry). + LocalRefusedStreamReset, + // If a remote codec level reset was received on the stream. + RemoteReset, + // If a remote codec level refused stream reset was received on the stream (allowing for retry). + RemoteRefusedStreamReset, + // If the stream was locally reset by a connection pool due to an initial connection failure. + ConnectionFailure, + // If the stream was locally reset due to connection termination. + ConnectionTermination, + // The stream was reset because of a resource overflow. + Overflow, + // Either there was an early TCP error for a CONNECT request or the peer reset with CONNECT_ERROR + ConnectError, + // Received payload did not conform to HTTP protocol. + ProtocolError, + // If the stream was locally reset by the Overload Manager. + OverloadManager +}; + +/** + * Handler to reset an underlying HTTP stream. + */ +class StreamResetHandler { +public: + virtual ~StreamResetHandler() = default; + /** + * Reset the stream. No events will fire beyond this point. + * @param reason supplies the reset reason. + */ + virtual void resetStream(StreamResetReason reason) PURE; +}; + +} // namespace Http +} // namespace Envoy diff --git a/examples/dynamic-config-cp/Dockerfile-control-plane b/examples/dynamic-config-cp/Dockerfile-control-plane index 39c7f2ca4223b..3d475c7344219 100644 --- a/examples/dynamic-config-cp/Dockerfile-control-plane +++ b/examples/dynamic-config-cp/Dockerfile-control-plane @@ -8,5 +8,5 @@ RUN apt-get update \ RUN git clone https://github.com/envoyproxy/go-control-plane ADD ./resource.go /go/go-control-plane/internal/example/resource.go -RUN cd go-control-plane && make bin/example +RUN cd go-control-plane && git checkout b4adc3bb5fe5288bff01cd452dad418ef98c676e && make bin/example WORKDIR /go/go-control-plane diff --git a/generated_api_shadow/envoy/config/bootstrap/v3/bootstrap.proto b/generated_api_shadow/envoy/config/bootstrap/v3/bootstrap.proto index 9171d066a4302..0ec15a7665e0a 100644 --- a/generated_api_shadow/envoy/config/bootstrap/v3/bootstrap.proto +++ b/generated_api_shadow/envoy/config/bootstrap/v3/bootstrap.proto @@ -40,7 +40,7 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE; // ` for more detail. // Bootstrap :ref:`configuration overview `. -// [#next-free-field: 33] +// [#next-free-field: 34] message Bootstrap { option (udpa.annotations.versioning).previous_message_type = "envoy.config.bootstrap.v2.Bootstrap"; @@ -217,6 +217,8 @@ message Bootstrap { (udpa.annotations.security).configure_for_untrusted_upstream = true ]; + BufferFactoryConfig buffer_factory_config = 33; + // Enable :ref:`stats for event dispatcher `, defaults to false. // Note that this records a value for each iteration of the event loop on every thread. This // should normally be minimal overhead, but when using @@ -646,3 +648,16 @@ message CustomInlineHeader { // The type of the header that is expected to be set as the inline header. InlineHeaderType inline_header_type = 2 [(validate.rules).enum = {defined_only: true}]; } + +// Configuration for the Buffer Factories that create Buffers and Accounts. +message BufferFactoryConfig { + // The minimum account size at which Envoy starts tracking a stream. + // This *MUST* be a power of two. + // + // Envoy has 8 power of two buckets starting from this value. + // Concretely the 1st bucket contains accounts for streams that use + // [account_tracking_threshold_bytes, 2 * account_tracking_threshold_bytes). + // With the 8th bucket tracking accounts + // >= 128 * account_tracking_threshold_bytes. + uint32 account_tracking_threshold_bytes = 1 [(validate.rules).uint32 = {gt: 0}]; +} diff --git a/generated_api_shadow/envoy/config/bootstrap/v4alpha/bootstrap.proto b/generated_api_shadow/envoy/config/bootstrap/v4alpha/bootstrap.proto index b21acabe686fc..5908379743b15 100644 --- a/generated_api_shadow/envoy/config/bootstrap/v4alpha/bootstrap.proto +++ b/generated_api_shadow/envoy/config/bootstrap/v4alpha/bootstrap.proto @@ -39,7 +39,7 @@ option (udpa.annotations.file_status).package_version_status = NEXT_MAJOR_VERSIO // ` for more detail. // Bootstrap :ref:`configuration overview `. -// [#next-free-field: 33] +// [#next-free-field: 34] message Bootstrap { option (udpa.annotations.versioning).previous_message_type = "envoy.config.bootstrap.v3.Bootstrap"; @@ -215,6 +215,8 @@ message Bootstrap { (udpa.annotations.security).configure_for_untrusted_upstream = true ]; + BufferFactoryConfig buffer_factory_config = 33; + // Enable :ref:`stats for event dispatcher `, defaults to false. // Note that this records a value for each iteration of the event loop on every thread. This // should normally be minimal overhead, but when using @@ -650,3 +652,19 @@ message CustomInlineHeader { // The type of the header that is expected to be set as the inline header. InlineHeaderType inline_header_type = 2 [(validate.rules).enum = {defined_only: true}]; } + +// Configuration for the Buffer Factories that create Buffers and Accounts. +message BufferFactoryConfig { + option (udpa.annotations.versioning).previous_message_type = + "envoy.config.bootstrap.v3.BufferFactoryConfig"; + + // The minimum account size at which Envoy starts tracking a stream. + // This *MUST* be a power of two. + // + // Envoy has 8 power of two buckets starting from this value. + // Concretely the 1st bucket contains accounts for streams that use + // [account_tracking_threshold_bytes, 2 * account_tracking_threshold_bytes). + // With the 8th bucket tracking accounts + // >= 128 * account_tracking_threshold_bytes. + uint32 account_tracking_threshold_bytes = 1 [(validate.rules).uint32 = {gt: 0}]; +} diff --git a/generated_api_shadow/envoy/config/rbac/v3/BUILD b/generated_api_shadow/envoy/config/rbac/v3/BUILD index c5246439c7b55..c289def1f11d2 100644 --- a/generated_api_shadow/envoy/config/rbac/v3/BUILD +++ b/generated_api_shadow/envoy/config/rbac/v3/BUILD @@ -10,6 +10,7 @@ api_proto_package( "//envoy/config/core/v3:pkg", "//envoy/config/route/v3:pkg", "//envoy/type/matcher/v3:pkg", + "//envoy/type/v3:pkg", "@com_github_cncf_udpa//udpa/annotations:pkg", "@com_google_googleapis//google/api/expr/v1alpha1:checked_proto", "@com_google_googleapis//google/api/expr/v1alpha1:syntax_proto", diff --git a/generated_api_shadow/envoy/config/rbac/v3/rbac.proto b/generated_api_shadow/envoy/config/rbac/v3/rbac.proto index 44b3cf7cee6ec..d66f9be2b4981 100644 --- a/generated_api_shadow/envoy/config/rbac/v3/rbac.proto +++ b/generated_api_shadow/envoy/config/rbac/v3/rbac.proto @@ -7,6 +7,7 @@ import "envoy/config/route/v3/route_components.proto"; import "envoy/type/matcher/v3/metadata.proto"; import "envoy/type/matcher/v3/path.proto"; import "envoy/type/matcher/v3/string.proto"; +import "envoy/type/v3/range.proto"; import "google/api/expr/v1alpha1/checked.proto"; import "google/api/expr/v1alpha1/syntax.proto"; @@ -145,7 +146,7 @@ message Policy { } // Permission defines an action (or actions) that a principal can take. -// [#next-free-field: 11] +// [#next-free-field: 12] message Permission { option (udpa.annotations.versioning).previous_message_type = "envoy.config.rbac.v2.Permission"; @@ -185,6 +186,9 @@ message Permission { // A port number that describes the destination port connecting to. uint32 destination_port = 6 [(validate.rules).uint32 = {lte: 65535}]; + // A port number range that describes a range of destination ports connecting to. + type.v3.Int32Range destination_port_range = 11; + // Metadata that describes additional information about the action. type.matcher.v3.MetadataMatcher metadata = 7; diff --git a/generated_api_shadow/envoy/config/rbac/v4alpha/BUILD b/generated_api_shadow/envoy/config/rbac/v4alpha/BUILD index ddf34cc1032bc..2b205e7373632 100644 --- a/generated_api_shadow/envoy/config/rbac/v4alpha/BUILD +++ b/generated_api_shadow/envoy/config/rbac/v4alpha/BUILD @@ -11,6 +11,7 @@ api_proto_package( "//envoy/config/rbac/v3:pkg", "//envoy/config/route/v4alpha:pkg", "//envoy/type/matcher/v4alpha:pkg", + "//envoy/type/v3:pkg", "@com_github_cncf_udpa//udpa/annotations:pkg", "@com_google_googleapis//google/api/expr/v1alpha1:checked_proto", "@com_google_googleapis//google/api/expr/v1alpha1:syntax_proto", diff --git a/generated_api_shadow/envoy/config/rbac/v4alpha/rbac.proto b/generated_api_shadow/envoy/config/rbac/v4alpha/rbac.proto index 3b27e68bba1dc..bff8576a27c84 100644 --- a/generated_api_shadow/envoy/config/rbac/v4alpha/rbac.proto +++ b/generated_api_shadow/envoy/config/rbac/v4alpha/rbac.proto @@ -7,6 +7,7 @@ import "envoy/config/route/v4alpha/route_components.proto"; import "envoy/type/matcher/v4alpha/metadata.proto"; import "envoy/type/matcher/v4alpha/path.proto"; import "envoy/type/matcher/v4alpha/string.proto"; +import "envoy/type/v3/range.proto"; import "google/api/expr/v1alpha1/checked.proto"; import "google/api/expr/v1alpha1/syntax.proto"; @@ -144,7 +145,7 @@ message Policy { } // Permission defines an action (or actions) that a principal can take. -// [#next-free-field: 11] +// [#next-free-field: 12] message Permission { option (udpa.annotations.versioning).previous_message_type = "envoy.config.rbac.v3.Permission"; @@ -184,6 +185,9 @@ message Permission { // A port number that describes the destination port connecting to. uint32 destination_port = 6 [(validate.rules).uint32 = {lte: 65535}]; + // A port number range that describes a range of destination ports connecting to. + type.v3.Int32Range destination_port_range = 11; + // Metadata that describes additional information about the action. type.matcher.v4alpha.MetadataMatcher metadata = 7; diff --git a/source/common/api/BUILD b/source/common/api/BUILD index 60412c4513432..950ea63a165cc 100644 --- a/source/common/api/BUILD +++ b/source/common/api/BUILD @@ -18,6 +18,7 @@ envoy_cc_library( "//source/common/common:thread_lib", "//source/common/event:dispatcher_lib", "//source/common/network:socket_lib", + "@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto", ], ) diff --git a/source/common/api/api_impl.cc b/source/common/api/api_impl.cc index 73de8f4a320c2..485fd5f50d8d2 100644 --- a/source/common/api/api_impl.cc +++ b/source/common/api/api_impl.cc @@ -3,6 +3,8 @@ #include #include +#include "envoy/config/bootstrap/v3/bootstrap.pb.h" + #include "source/common/common/thread.h" #include "source/common/event/dispatcher_impl.h" @@ -11,10 +13,12 @@ namespace Api { Impl::Impl(Thread::ThreadFactory& thread_factory, Stats::Store& store, Event::TimeSystem& time_system, Filesystem::Instance& file_system, - Random::RandomGenerator& random_generator, const ProcessContextOptRef& process_context, + Random::RandomGenerator& random_generator, + const envoy::config::bootstrap::v3::Bootstrap& bootstrap, + const ProcessContextOptRef& process_context, Buffer::WatermarkFactorySharedPtr watermark_factory) : thread_factory_(thread_factory), store_(store), time_system_(time_system), - file_system_(file_system), random_generator_(random_generator), + file_system_(file_system), random_generator_(random_generator), bootstrap_(bootstrap), process_context_(process_context), watermark_factory_(std::move(watermark_factory)) {} Event::DispatcherPtr Impl::allocateDispatcher(const std::string& name) { diff --git a/source/common/api/api_impl.h b/source/common/api/api_impl.h index 0bec3b866562d..9a9e1e3fad096 100644 --- a/source/common/api/api_impl.h +++ b/source/common/api/api_impl.h @@ -4,6 +4,7 @@ #include #include "envoy/api/api.h" +#include "envoy/config/bootstrap/v3/bootstrap.pb.h" #include "envoy/event/timer.h" #include "envoy/filesystem/filesystem.h" #include "envoy/network/socket.h" @@ -19,6 +20,7 @@ class Impl : public Api { public: Impl(Thread::ThreadFactory& thread_factory, Stats::Store& store, Event::TimeSystem& time_system, Filesystem::Instance& file_system, Random::RandomGenerator& random_generator, + const envoy::config::bootstrap::v3::Bootstrap& bootstrap, const ProcessContextOptRef& process_context = absl::nullopt, Buffer::WatermarkFactorySharedPtr watermark_factory = nullptr); @@ -34,6 +36,7 @@ class Impl : public Api { TimeSource& timeSource() override { return time_system_; } Stats::Scope& rootScope() override { return store_; } Random::RandomGenerator& randomGenerator() override { return random_generator_; } + const envoy::config::bootstrap::v3::Bootstrap& bootstrap() const override { return bootstrap_; } ProcessContextOptRef processContext() override { return process_context_; } private: @@ -42,6 +45,7 @@ class Impl : public Api { Event::TimeSystem& time_system_; Filesystem::Instance& file_system_; Random::RandomGenerator& random_generator_; + const envoy::config::bootstrap::v3::Bootstrap& bootstrap_; ProcessContextOptRef process_context_; const Buffer::WatermarkFactorySharedPtr watermark_factory_; }; diff --git a/source/common/buffer/BUILD b/source/common/buffer/BUILD index 832d72c913128..5d93729ddb3cb 100644 --- a/source/common/buffer/BUILD +++ b/source/common/buffer/BUILD @@ -13,9 +13,11 @@ envoy_cc_library( srcs = ["watermark_buffer.cc"], hdrs = ["watermark_buffer.h"], deps = [ + "//envoy/http:stream_reset_handler_interface", "//source/common/buffer:buffer_lib", "//source/common/common:assert_lib", "//source/common/runtime:runtime_features_lib", + "@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto", ], ) diff --git a/source/common/buffer/buffer_impl.h b/source/common/buffer/buffer_impl.h index 8f7977f9c269c..12152a9273021 100644 --- a/source/common/buffer/buffer_impl.h +++ b/source/common/buffer/buffer_impl.h @@ -7,6 +7,7 @@ #include #include "envoy/buffer/buffer.h" +#include "envoy/http/stream_reset_handler.h" #include "source/common/common/assert.h" #include "source/common/common/non_copyable.h" @@ -842,39 +843,5 @@ class OwnedBufferFragmentImpl final : public BufferFragment, public InlineStorag using OwnedBufferFragmentImplPtr = std::unique_ptr; -/** - * A BufferMemoryAccountImpl tracks allocated bytes across associated buffers and - * slices that originate from those buffers, or are untagged and pass through an - * associated buffer. - */ -class BufferMemoryAccountImpl : public BufferMemoryAccount { -public: - BufferMemoryAccountImpl() = default; - ~BufferMemoryAccountImpl() override { ASSERT(buffer_memory_allocated_ == 0); } - - // Make not copyable - BufferMemoryAccountImpl(const BufferMemoryAccountImpl&) = delete; - BufferMemoryAccountImpl& operator=(const BufferMemoryAccountImpl&) = delete; - - // Make not movable. - BufferMemoryAccountImpl(BufferMemoryAccountImpl&&) = delete; - BufferMemoryAccountImpl& operator=(BufferMemoryAccountImpl&&) = delete; - - uint64_t balance() const { return buffer_memory_allocated_; } - void charge(uint64_t amount) override { - // Check overflow - ASSERT(std::numeric_limits::max() - buffer_memory_allocated_ >= amount); - buffer_memory_allocated_ += amount; - } - - void credit(uint64_t amount) override { - ASSERT(buffer_memory_allocated_ >= amount); - buffer_memory_allocated_ -= amount; - } - -private: - uint64_t buffer_memory_allocated_ = 0; -}; - } // namespace Buffer } // namespace Envoy diff --git a/source/common/buffer/watermark_buffer.cc b/source/common/buffer/watermark_buffer.cc index 781321f99dc44..6de9a62ee80b3 100644 --- a/source/common/buffer/watermark_buffer.cc +++ b/source/common/buffer/watermark_buffer.cc @@ -1,10 +1,17 @@ #include "source/common/buffer/watermark_buffer.h" +#include + +#include "envoy/buffer/buffer.h" + #include "source/common/common/assert.h" #include "source/common/runtime/runtime_features.h" namespace Envoy { namespace Buffer { +namespace { +constexpr uint32_t kDefaultMinimumTrackingBytes = absl::bit_width(uint32_t(1024 * 256)) - 1; +} // end namespace void WatermarkBuffer::add(const void* data, uint64_t size) { OwnedImpl::add(data, size); @@ -136,5 +143,112 @@ void WatermarkBuffer::checkHighAndOverflowWatermarks() { } } +BufferMemoryAccountSharedPtr +WatermarkBufferFactory::createAccount(Http::StreamResetHandler& reset_handler) { + return BufferMemoryAccountImpl::createAccount(this, reset_handler); +} + +void WatermarkBufferFactory::updateAccountClass(const BufferMemoryAccountSharedPtr& account, + int current_class, int new_class) { + ASSERT(current_class != new_class, "Expected the current_class and new_class to be different"); + + if (current_class == -1 && new_class >= 0) { + // Start tracking + ASSERT(!size_class_account_sets_[new_class].contains(account)); + size_class_account_sets_[new_class].insert(account); + } else if (current_class >= 0 && new_class == -1) { + // No longer track + ASSERT(size_class_account_sets_[current_class].contains(account)); + size_class_account_sets_[current_class].erase(account); + } else { + // Moving between buckets + ASSERT(size_class_account_sets_[current_class].contains(account)); + ASSERT(!size_class_account_sets_[new_class].contains(account)); + size_class_account_sets_[new_class].insert( + std::move(size_class_account_sets_[current_class].extract(account).value())); + } +} + +void WatermarkBufferFactory::unregisterAccount(const BufferMemoryAccountSharedPtr& account, + int current_class) { + if (current_class >= 0) { + ASSERT(size_class_account_sets_[current_class].contains(account)); + size_class_account_sets_[current_class].erase(account); + } +} + +WatermarkBufferFactory::WatermarkBufferFactory( + const envoy::config::bootstrap::v3::BufferFactoryConfig& config) + : bitshift_(config.account_tracking_threshold_bytes() + ? absl::bit_width(config.account_tracking_threshold_bytes() - 1) + : kDefaultMinimumTrackingBytes) { + RELEASE_ASSERT(config.account_tracking_threshold_bytes() == 0 || + (config.account_tracking_threshold_bytes() & + config.account_tracking_threshold_bytes() - 1) == 0, + "Expected account_tracking_threshold_bytes to be a power of two."); +} + +WatermarkBufferFactory::~WatermarkBufferFactory() { + for (auto& account_set : size_class_account_sets_) { + ASSERT(account_set.empty(), + "Expected all Accounts to have unregistered from the Watermark Factory."); + } +} + +BufferMemoryAccountSharedPtr +BufferMemoryAccountImpl::createAccount(WatermarkBufferFactory* factory, + Http::StreamResetHandler& reset_handler) { + // We use shared_ptr ctor directly rather than make shared since the + // constructor being invoked is private as we want users to use this static + // method to createAccounts. + auto account = + std::shared_ptr(new BufferMemoryAccountImpl(factory, reset_handler)); + // Set shared_this_ in the account. + static_cast(account.get())->shared_this_ = account; + return account; +} + +int BufferMemoryAccountImpl::balanceToClassIndex() { + static uint32_t bitshift = factory_->bitshift(); + uint64_t shifted_balance = buffer_memory_allocated_ >> bitshift; + + if (shifted_balance == 0) { + return -1; // Not worth tracking anything < configured minimum threshold + } + + const int class_idx = absl::bit_width(shifted_balance) - 1; + return std::min(class_idx, NUM_MEMORY_CLASSES_ - 1); +} + +void BufferMemoryAccountImpl::updateAccountClass() { + const int new_class = balanceToClassIndex(); + if (shared_this_ && new_class != current_bucket_idx_) { + factory_->updateAccountClass(shared_this_, current_bucket_idx_, new_class); + current_bucket_idx_ = new_class; + } +} + +void BufferMemoryAccountImpl::credit(uint64_t amount) { + ASSERT(buffer_memory_allocated_ >= amount); + buffer_memory_allocated_ -= amount; + updateAccountClass(); +} + +void BufferMemoryAccountImpl::charge(uint64_t amount) { + // Check overflow + ASSERT(std::numeric_limits::max() - buffer_memory_allocated_ >= amount); + buffer_memory_allocated_ += amount; + updateAccountClass(); +} + +void BufferMemoryAccountImpl::clearDownstream() { + if (reset_handler_.has_value()) { + reset_handler_.reset(); + factory_->unregisterAccount(shared_this_, current_bucket_idx_); + current_bucket_idx_ = -1; + shared_this_ = nullptr; + } +} + } // namespace Buffer } // namespace Envoy diff --git a/source/common/buffer/watermark_buffer.h b/source/common/buffer/watermark_buffer.h index 6afc4d8602323..d3cbb1f20f3c3 100644 --- a/source/common/buffer/watermark_buffer.h +++ b/source/common/buffer/watermark_buffer.h @@ -3,6 +3,10 @@ #include #include +#include "envoy/buffer/buffer.h" +#include "envoy/common/optref.h" +#include "envoy/config/bootstrap/v3/bootstrap.pb.h" + #include "source/common/buffer/buffer_impl.h" namespace Envoy { @@ -72,15 +76,142 @@ class WatermarkBuffer : public OwnedImpl { using WatermarkBufferPtr = std::unique_ptr; +class WatermarkBufferFactory; + +/** + * A BufferMemoryAccountImpl tracks allocated bytes across associated buffers and + * slices that originate from those buffers, or are untagged and pass through an + * associated buffer. + * + * This BufferMemoryAccount is produced by the *WatermarkBufferFactory*. + */ +class BufferMemoryAccountImpl : public BufferMemoryAccount { +public: + // Used to create the account, and complete wiring with the factory + // and shared_this_. + static BufferMemoryAccountSharedPtr createAccount(WatermarkBufferFactory* factory, + Http::StreamResetHandler& reset_handler); + ~BufferMemoryAccountImpl() override { + ASSERT(buffer_memory_allocated_ == 0); + ASSERT(!reset_handler_.has_value()); + } + + // Make not copyable + BufferMemoryAccountImpl(const BufferMemoryAccountImpl&) = delete; + BufferMemoryAccountImpl& operator=(const BufferMemoryAccountImpl&) = delete; + + // Make not movable. + BufferMemoryAccountImpl(BufferMemoryAccountImpl&&) = delete; + BufferMemoryAccountImpl& operator=(BufferMemoryAccountImpl&&) = delete; + + uint64_t balance() const { return buffer_memory_allocated_; } + void charge(uint64_t amount) override; + void credit(uint64_t amount) override; + + // Clear the associated downstream, preparing the account to be destroyed. + // This is idempotent. + void clearDownstream() override; + + void resetDownstream() override { + if (reset_handler_.has_value()) { + reset_handler_->resetStream(Http::StreamResetReason::OverloadManager); + } + } + + // The number of memory classes the Account expects to exists. See + // *WatermarkBufferFactory* for details on the memory classes. + static constexpr uint32_t NUM_MEMORY_CLASSES_ = 8; + +private: + BufferMemoryAccountImpl(WatermarkBufferFactory* factory, Http::StreamResetHandler& reset_handler) + : factory_(factory), reset_handler_(reset_handler) {} + + // Returns the class index based off of the buffer_memory_allocated_ + // This can differ with current_bucket_idx_ if buffer_memory_allocated_ was + // just modified. + // The class indexes returned are based on buckets of powers of two, if the + // account is above a minimum threshold. Returned class index range is [-1, + // NUM_MEMORY_CLASSES_). + int balanceToClassIndex(); + void updateAccountClass(); + + uint64_t buffer_memory_allocated_ = 0; + // Current bucket index where the account is being tracked in. + int current_bucket_idx_ = -1; + + WatermarkBufferFactory* factory_ = nullptr; + + OptRef reset_handler_; + // Keep a copy of the shared_ptr pointing to this account. We opted to go this + // route rather than enable_shared_from_this to avoid wasteful atomic + // operations e.g. when updating the tracking of the account. + // This is set through the createAccount static method which is the only way to + // instantiate an instance of this class. This should is cleared when + // unregistering from the factory. + BufferMemoryAccountSharedPtr shared_this_ = nullptr; +}; + +/** + * The WatermarkBufferFactory creates *WatermarkBuffer*s and + * *BufferMemoryAccountImpl* that can be used to bind to the created buffers + * from a given downstream (and corresponding upstream, if one exists). The + * accounts can then be used to reset the underlying stream. + * + * Any account produced by this factory might be tracked by the factory using the + * following scheme: + * + * 1) Is the account balance >= 1MB? If not don't track. + * 2) For all accounts above the minimum threshold for tracking, put the account + * into one of the *BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_* buckets. + * + * We keep buckets containing accounts within a "memory class", which are + * power of two buckets. For example, with a minimum threshold of 1MB, our + * first bucket contains [1MB, 2MB) accounts, the second bucket contains + * [2MB, 4MB), and so forth for + * *BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_* buckets. These buckets + * allow us to coarsely track accounts, and if overloaded we can easily + * target more expensive streams. + * + * As the account balance changes, the account informs the Watermark Factory + * if the bucket for that account has changed. See + * *BufferMemoryAccountImpl::balanceToClassIndex()* for details on the memory + * class for a given account balance. + * + * TODO(kbaichoo): Update this documentation when we make the minimum account + * threshold configurable. + * + */ class WatermarkBufferFactory : public WatermarkFactory { public: + WatermarkBufferFactory(const envoy::config::bootstrap::v3::BufferFactoryConfig& config); + // Buffer::WatermarkFactory + ~WatermarkBufferFactory() override; InstancePtr createBuffer(std::function below_low_watermark, std::function above_high_watermark, std::function above_overflow_watermark) override { return std::make_unique(below_low_watermark, above_high_watermark, above_overflow_watermark); } + + BufferMemoryAccountSharedPtr createAccount(Http::StreamResetHandler& reset_handler) override; + + // Called by BufferMemoryAccountImpls created by the factory on account class + // updated. + void updateAccountClass(const BufferMemoryAccountSharedPtr& account, int current_class, + int new_class); + + uint32_t bitshift() const { return bitshift_; } + + // Unregister a buffer memory account. + virtual void unregisterAccount(const BufferMemoryAccountSharedPtr& account, int current_class); + +protected: + // Enable subclasses to inspect the mapping. + using MemoryClassesToAccountsSet = std::array, + BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_>; + MemoryClassesToAccountsSet size_class_account_sets_; + uint32_t bitshift_; }; } // namespace Buffer diff --git a/source/common/event/dispatcher_impl.cc b/source/common/event/dispatcher_impl.cc index c9c48cb31a657..1bdd31c613946 100644 --- a/source/common/event/dispatcher_impl.cc +++ b/source/common/event/dispatcher_impl.cc @@ -61,7 +61,8 @@ DispatcherImpl::DispatcherImpl(const std::string& name, Api::Api& api, : name_(name), api_(api), buffer_factory_(watermark_factory != nullptr ? watermark_factory - : std::make_shared()), + : std::make_shared( + api.bootstrap().buffer_factory_config())), scheduler_(time_system.createScheduler(base_scheduler_, base_scheduler_)), thread_local_delete_cb_( base_scheduler_.createSchedulableCallback([this]() -> void { runThreadLocalDelete(); })), diff --git a/source/common/http/conn_manager_impl.cc b/source/common/http/conn_manager_impl.cc index 5641c155dcead..3321e1126726b 100644 --- a/source/common/http/conn_manager_impl.cc +++ b/source/common/http/conn_manager_impl.cc @@ -276,13 +276,15 @@ RequestDecoder& ConnectionManagerImpl::newStream(ResponseEncoder& response_encod // Set the account to start accounting if enabled. This is still a // work-in-progress, and will be removed when other features using the // accounting are implemented. - Buffer::BufferMemoryAccountSharedPtr downstream_request_account; + Buffer::BufferMemoryAccountSharedPtr downstream_stream_account; if (Runtime::runtimeFeatureEnabled("envoy.test_only.per_stream_buffer_accounting")) { - downstream_request_account = std::make_shared(); - response_encoder.getStream().setAccount(downstream_request_account); + // Create account, wiring the stream to use it. + auto& buffer_factory = read_callbacks_->connection().dispatcher().getWatermarkFactory(); + downstream_stream_account = buffer_factory.createAccount(response_encoder.getStream()); + response_encoder.getStream().setAccount(downstream_stream_account); } ActiveStreamPtr new_stream(new ActiveStream(*this, response_encoder.getStream().bufferLimit(), - std::move(downstream_request_account))); + std::move(downstream_stream_account))); accumulated_requests_++; if (config_.maxRequestsPerConnection() > 0 && @@ -1517,6 +1519,7 @@ void ConnectionManagerImpl::ActiveStream::onResetStream(StreamResetReason reset_ // 1) We TX an app level reset // 2) The codec TX a codec level reset // 3) The codec RX a reset + // 4) The overload manager reset the stream // If we need to differentiate we need to do it inside the codec. Can start with this. ENVOY_STREAM_LOG(debug, "stream reset", *this); connection_manager_.stats_.named_.downstream_rq_rx_reset_.inc(); @@ -1531,6 +1534,14 @@ void ConnectionManagerImpl::ActiveStream::onResetStream(StreamResetReason reset_ filter_manager_.streamInfo().setResponseCodeDetails(encoder_details); } + // Check if we're in the overload manager reset case. + // encoder_details should be empty in this case as we don't have a codec error. + if (encoder_details.empty() && reset_reason == StreamResetReason::OverloadManager) { + filter_manager_.streamInfo().setResponseFlag(StreamInfo::ResponseFlag::OverloadManager); + filter_manager_.streamInfo().setResponseCodeDetails( + StreamInfo::ResponseCodeDetails::get().Overload); + } + connection_manager_.doDeferredStreamDestroy(*this); } diff --git a/source/common/http/http1/codec_impl.cc b/source/common/http/http1/codec_impl.cc index cb0af92e83bc9..afd530d9df75d 100644 --- a/source/common/http/http1/codec_impl.cc +++ b/source/common/http/http1/codec_impl.cc @@ -350,6 +350,21 @@ void StreamEncoderImpl::resetStream(StreamResetReason reason) { connection_.onResetStreamBase(reason); } +void ResponseEncoderImpl::resetStream(StreamResetReason reason) { + // Clear the downstream on the account since we're resetting the downstream. + if (buffer_memory_account_) { + buffer_memory_account_->clearDownstream(); + } + + // For H1, we use idleTimeouts to cancel streams unless there was an + // explicit protocol error prior to sending a response to the downstream + // in which case we send a local reply. + // TODO(kbaichoo): If we want snappier resets of H1 streams we can + // 1) Send local reply if no response data sent yet + // 2) Invoke the idle timeout sooner to close underlying connection + StreamEncoderImpl::resetStream(reason); +} + void StreamEncoderImpl::readDisable(bool disable) { if (disable) { ++read_disable_calls_; diff --git a/source/common/http/http1/codec_impl.h b/source/common/http/http1/codec_impl.h index d67412706a712..b6f32a16d5f6b 100644 --- a/source/common/http/http1/codec_impl.h +++ b/source/common/http/http1/codec_impl.h @@ -69,8 +69,12 @@ class StreamEncoderImpl : public virtual StreamEncoder, // require a flush timeout not already covered by other timeouts. } - void setAccount(Buffer::BufferMemoryAccountSharedPtr) override { - // TODO(kbaichoo): implement account tracking for H1. + void setAccount(Buffer::BufferMemoryAccountSharedPtr account) override { + // TODO(kbaichoo): implement account tracking for H1. Particularly, binding + // the account to the buffers used. The current wiring is minimal, and used + // to ensure the memory_account gets notified that the downstream request is + // closing. + buffer_memory_account_ = account; } void setIsResponseToHeadRequest(bool value) { is_response_to_head_request_ = value; } @@ -88,6 +92,7 @@ class StreamEncoderImpl : public virtual StreamEncoder, static const std::string CRLF; static const std::string LAST_CHUNK; + Buffer::BufferMemoryAccountSharedPtr buffer_memory_account_; ConnectionImpl& connection_; uint32_t read_disable_calls_{}; bool disable_chunk_encoding_ : 1; @@ -134,6 +139,18 @@ class ResponseEncoderImpl : public StreamEncoderImpl, public ResponseEncoder { : StreamEncoderImpl(connection), stream_error_on_invalid_http_message_(stream_error_on_invalid_http_message) {} + ~ResponseEncoderImpl() override { + // Only the downstream stream should clear the downstream of the + // memory account. + // + // There are cases where a corresponding upstream stream dtor might + // be called, but the downstream stream isn't going to terminate soon + // such as StreamDecoderFilterCallbacks::recreateStream(). + if (buffer_memory_account_) { + buffer_memory_account_->clearDownstream(); + } + } + bool startedResponse() { return started_response_; } // Http::ResponseEncoder @@ -145,6 +162,9 @@ class ResponseEncoderImpl : public StreamEncoderImpl, public ResponseEncoder { return stream_error_on_invalid_http_message_; } + // Http1::StreamEncoderImpl + void resetStream(StreamResetReason reason) override; + private: bool started_response_{}; const bool stream_error_on_invalid_http_message_; diff --git a/source/common/http/http2/codec_impl.cc b/source/common/http/http2/codec_impl.cc index 0ae50eb547d27..f782d64d5daf5 100644 --- a/source/common/http/http2/codec_impl.cc +++ b/source/common/http/http2/codec_impl.cc @@ -157,6 +157,22 @@ void ConnectionImpl::StreamImpl::destroy() { parent_.stats_.pending_send_bytes_.sub(pending_send_data_->length()); } +void ConnectionImpl::ServerStreamImpl::destroy() { + // Only the downstream stream should clear the downstream of the + // memory account. + // This occurs in destroy as we want to ensure the Stream does not get + // reset called on it from the account. + // + // There are cases where a corresponding upstream stream dtor might + // be called, but the downstream stream isn't going to terminate soon + // such as StreamDecoderFilterCallbacks::recreateStream(). + if (buffer_memory_account_) { + buffer_memory_account_->clearDownstream(); + } + + StreamImpl::destroy(); +} + static void insertHeader(std::vector& headers, const HeaderEntry& header) { uint8_t flags = 0; if (header.key().isReference()) { @@ -530,6 +546,15 @@ void ConnectionImpl::StreamImpl::encodeDataHelper(Buffer::Instance& data, bool e } } +void ConnectionImpl::ServerStreamImpl::resetStream(StreamResetReason reason) { + // Clear the downstream on the account since we're resetting the downstream. + if (buffer_memory_account_) { + buffer_memory_account_->clearDownstream(); + } + + StreamImpl::resetStream(reason); +} + void ConnectionImpl::StreamImpl::resetStream(StreamResetReason reason) { // Higher layers expect calling resetStream() to immediately raise reset callbacks. runResetCallbacks(reason); diff --git a/source/common/http/http2/codec_impl.h b/source/common/http/http2/codec_impl.h index 6b86ee41b9aa2..95758a1a91193 100644 --- a/source/common/http/http2/codec_impl.h +++ b/source/common/http/http2/codec_impl.h @@ -192,7 +192,7 @@ class ConnectionImpl : public virtual Connection, // TODO(mattklein123): Optimally this would be done in the destructor but there are currently // deferred delete lifetime issues that need sorting out if the destructor of the stream is // going to be able to refer to the parent connection. - void destroy(); + virtual void destroy(); void disarmStreamIdleTimer() { if (stream_idle_timer_ != nullptr) { // To ease testing and the destructor assertion. @@ -388,6 +388,7 @@ class ConnectionImpl : public virtual Connection, : StreamImpl(parent, buffer_limit), headers_or_trailers_(RequestHeaderMapImpl::create()) {} // StreamImpl + void destroy() override; void submitHeaders(const std::vector& final_headers, nghttp2_data_provider* provider) override; StreamDecoder& decoder() override { return *request_decoder_; } @@ -407,6 +408,7 @@ class ConnectionImpl : public virtual Connection, return createHeaderMap(trailers); } void createPendingFlushTimer() override; + void resetStream(StreamResetReason reason) override; // ResponseEncoder void encode100ContinueHeaders(const ResponseHeaderMap& headers) override; diff --git a/source/common/quic/envoy_quic_client_stream.h b/source/common/quic/envoy_quic_client_stream.h index 0e82d8622319e..fd051332e44fb 100644 --- a/source/common/quic/envoy_quic_client_stream.h +++ b/source/common/quic/envoy_quic_client_stream.h @@ -49,9 +49,6 @@ class EnvoyQuicClientStream : public quic::QuicSpdyClientStream, void resetStream(Http::StreamResetReason reason) override; void setFlushTimeout(std::chrono::milliseconds) override {} - void setAccount(Buffer::BufferMemoryAccountSharedPtr) override { - // TODO(kbaichoo): implement account tracking for QUIC. - } // quic::QuicSpdyStream void OnBodyAvailable() override; void OnStreamReset(const quic::QuicRstStreamFrame& frame) override; diff --git a/source/common/quic/envoy_quic_server_stream.cc b/source/common/quic/envoy_quic_server_stream.cc index 86ac524b5e660..5dbf8f47351e7 100644 --- a/source/common/quic/envoy_quic_server_stream.cc +++ b/source/common/quic/envoy_quic_server_stream.cc @@ -111,6 +111,10 @@ void EnvoyQuicServerStream::encodeMetadata(const Http::MetadataMapVector& /*meta } void EnvoyQuicServerStream::resetStream(Http::StreamResetReason reason) { + if (buffer_memory_account_) { + buffer_memory_account_->clearDownstream(); + } + if (local_end_stream_ && !reading_stopped()) { // This is after 200 early response. Reset with QUIC_STREAM_NO_ERROR instead // of propagating original reset reason. In QUICHE if a stream stops reading @@ -298,6 +302,21 @@ void EnvoyQuicServerStream::OnConnectionClosed(quic::QuicErrorCode error, quic::QuicSpdyServerStreamBase::OnConnectionClosed(error, source); } +void EnvoyQuicServerStream::CloseWriteSide() { + // Clear the downstream since the stream should not write additional data + // after this is called, e.g. cannot reset the stream. + // Only the downstream stream should clear the downstream of the + // memory account. + // + // There are cases where a corresponding upstream stream dtor might + // be called, but the downstream stream isn't going to terminate soon + // such as StreamDecoderFilterCallbacks::recreateStream(). + if (buffer_memory_account_) { + buffer_memory_account_->clearDownstream(); + } + quic::QuicSpdyServerStreamBase::CloseWriteSide(); +} + void EnvoyQuicServerStream::OnClose() { quic::QuicSpdyServerStreamBase::OnClose(); if (isDoingWatermarkAccounting()) { diff --git a/source/common/quic/envoy_quic_server_stream.h b/source/common/quic/envoy_quic_server_stream.h index e0b98e835fdad..3cb8d1f004145 100644 --- a/source/common/quic/envoy_quic_server_stream.h +++ b/source/common/quic/envoy_quic_server_stream.h @@ -55,9 +55,6 @@ class EnvoyQuicServerStream : public quic::QuicSpdyServerStreamBase, // TODO(mattklein123): Actually implement this for HTTP/3 similar to HTTP/2. } - void setAccount(Buffer::BufferMemoryAccountSharedPtr) override { - // TODO(kbaichoo): implement account tracking for QUIC. - } // quic::QuicSpdyStream void OnBodyAvailable() override; bool OnStopSending(quic::QuicRstStreamErrorCode error) override; @@ -67,6 +64,7 @@ class EnvoyQuicServerStream : public quic::QuicSpdyServerStreamBase, void OnCanWrite() override; // quic::QuicSpdyServerStreamBase void OnConnectionClosed(quic::QuicErrorCode error, quic::ConnectionCloseSource source) override; + void CloseWriteSide() override; void clearWatermarkBuffer(); diff --git a/source/common/quic/envoy_quic_stream.h b/source/common/quic/envoy_quic_stream.h index f28cae4cf0119..f5c4be6900dd0 100644 --- a/source/common/quic/envoy_quic_stream.h +++ b/source/common/quic/envoy_quic_stream.h @@ -1,5 +1,6 @@ #pragma once +#include "envoy/buffer/buffer.h" #include "envoy/config/core/v3/protocol.pb.h" #include "envoy/event/dispatcher.h" #include "envoy/http/codec.h" @@ -79,6 +80,10 @@ class EnvoyQuicStream : public virtual Http::StreamEncoder, return connection()->addressProvider().localAddress(); } + void setAccount(Buffer::BufferMemoryAccountSharedPtr account) override { + buffer_memory_account_ = account; + } + // SendBufferMonitor void updateBytesBuffered(size_t old_buffered_bytes, size_t new_buffered_bytes) override { if (new_buffered_bytes == old_buffered_bytes) { @@ -130,6 +135,9 @@ class EnvoyQuicStream : public virtual Http::StreamEncoder, const envoy::config::core::v3::Http3ProtocolOptions& http3_options_; bool close_connection_upon_invalid_header_{false}; absl::string_view details_; + // TODO(kbaichoo): bind the account to the QUIC buffers to enable tracking of + // memory allocated within QUIC buffers. + Buffer::BufferMemoryAccountSharedPtr buffer_memory_account_ = nullptr; private: // Keeps track of bytes buffered in the stream send buffer in QUICHE and reacts diff --git a/source/common/quic/envoy_quic_utils.cc b/source/common/quic/envoy_quic_utils.cc index 0a42e22cbbe73..76291b3a161db 100644 --- a/source/common/quic/envoy_quic_utils.cc +++ b/source/common/quic/envoy_quic_utils.cc @@ -73,6 +73,7 @@ quic::QuicRstStreamErrorCode envoyResetReasonToQuicRstError(Http::StreamResetRea case Http::StreamResetReason::ConnectionTermination: return quic::QUIC_STREAM_CONNECTION_ERROR; case Http::StreamResetReason::LocalReset: + case Http::StreamResetReason::OverloadManager: return quic::QUIC_STREAM_CANCELLED; default: return quic::QUIC_BAD_APPLICATION_PAYLOAD; diff --git a/source/extensions/filters/common/rbac/matchers.cc b/source/extensions/filters/common/rbac/matchers.cc index 25d8e2de63b54..fc010bc46c68e 100644 --- a/source/extensions/filters/common/rbac/matchers.cc +++ b/source/extensions/filters/common/rbac/matchers.cc @@ -23,6 +23,8 @@ MatcherConstSharedPtr Matcher::create(const envoy::config::rbac::v3::Permission& IPMatcher::Type::DownstreamLocal); case envoy::config::rbac::v3::Permission::RuleCase::kDestinationPort: return std::make_shared(permission.destination_port()); + case envoy::config::rbac::v3::Permission::RuleCase::kDestinationPortRange: + return std::make_shared(permission.destination_port_range()); case envoy::config::rbac::v3::Permission::RuleCase::kAny: return std::make_shared(); case envoy::config::rbac::v3::Permission::RuleCase::kMetadata: @@ -159,6 +161,34 @@ bool PortMatcher::matches(const Network::Connection&, const Envoy::Http::Request return ip && ip->port() == port_; } +PortRangeMatcher::PortRangeMatcher(const ::envoy::type::v3::Int32Range& range) + : start_(range.start()), end_(range.end()) { + auto start = range.start(); + auto end = range.end(); + if (start < 0 || start > 65536) { + throw EnvoyException(fmt::format("range start {} is out of bounds", start)); + } + if (end < 0 || end > 65536) { + throw EnvoyException(fmt::format("range end {} is out of bounds", end)); + } + if (start >= end) { + throw EnvoyException( + fmt::format("range start {} cannot be greater or equal than range end {}", start, end)); + } +} + +bool PortRangeMatcher::matches(const Network::Connection&, const Envoy::Http::RequestHeaderMap&, + const StreamInfo::StreamInfo& info) const { + const Envoy::Network::Address::Ip* ip = + info.downstreamAddressProvider().localAddress().get()->ip(); + if (ip) { + const auto port = ip->port(); + return start_ <= port && port < end_; + } else { + return false; + } +} + bool AuthenticatedMatcher::matches(const Network::Connection& connection, const Envoy::Http::RequestHeaderMap&, const StreamInfo::StreamInfo&) const { diff --git a/source/extensions/filters/common/rbac/matchers.h b/source/extensions/filters/common/rbac/matchers.h index 472b4a2c9c17e..79d43fb59f0c2 100644 --- a/source/extensions/filters/common/rbac/matchers.h +++ b/source/extensions/filters/common/rbac/matchers.h @@ -163,6 +163,18 @@ class PortMatcher : public Matcher { const uint32_t port_; }; +class PortRangeMatcher : public Matcher { +public: + PortRangeMatcher(const ::envoy::type::v3::Int32Range& range); + + bool matches(const Network::Connection&, const Envoy::Http::RequestHeaderMap&, + const StreamInfo::StreamInfo& info) const override; + +private: + const uint32_t start_; + const uint32_t end_; +}; + /** * Matches the principal name as described in the peer certificate. Uses the URI SAN first. If that * field is not present, uses the subject instead. diff --git a/source/server/BUILD b/source/server/BUILD index 26825d04a6de3..630e61710f15a 100644 --- a/source/server/BUILD +++ b/source/server/BUILD @@ -157,6 +157,7 @@ envoy_cc_library( name = "active_tcp_listener_headers", hdrs = [ "active_stream_listener_base.h", + "active_tcp_listener.h", "active_tcp_socket.h", ], deps = [ @@ -171,7 +172,10 @@ envoy_cc_library( "//envoy/network:listen_socket_interface", "//envoy/network:listener_interface", "//envoy/server:listener_manager_interface", + "//source/common/common:assert_lib", "//source/common/common:linked_object", + "//source/common/network:connection_lib", + "//source/common/stats:timespan_lib", ], ) diff --git a/source/server/active_stream_listener_base.cc b/source/server/active_stream_listener_base.cc index 39d336034c5fc..8bec6788040a9 100644 --- a/source/server/active_stream_listener_base.cc +++ b/source/server/active_stream_listener_base.cc @@ -2,6 +2,9 @@ #include "envoy/network/filter.h" +#include "source/common/stats/timespan_impl.h" +#include "source/server/active_tcp_listener.h" + namespace Envoy { namespace Server { @@ -58,5 +61,61 @@ void ActiveStreamListenerBase::newConnection(Network::ConnectionSocketPtr&& sock newActiveConnection(*filter_chain, std::move(server_conn_ptr), std::move(stream_info)); } +ActiveConnections::ActiveConnections(ActiveTcpListener& listener, + const Network::FilterChain& filter_chain) + : listener_(listener), filter_chain_(filter_chain) {} + +ActiveConnections::~ActiveConnections() { + // connections should be defer deleted already. + ASSERT(connections_.empty()); +} + +ActiveTcpConnection::ActiveTcpConnection(ActiveConnections& active_connections, + Network::ConnectionPtr&& new_connection, + TimeSource& time_source, + std::unique_ptr&& stream_info) + : stream_info_(std::move(stream_info)), active_connections_(active_connections), + connection_(std::move(new_connection)), + conn_length_(new Stats::HistogramCompletableTimespanImpl( + active_connections_.listener_.stats_.downstream_cx_length_ms_, time_source)) { + // We just universally set no delay on connections. Theoretically we might at some point want + // to make this configurable. + connection_->noDelay(true); + auto& listener = active_connections_.listener_; + listener.stats_.downstream_cx_total_.inc(); + listener.stats_.downstream_cx_active_.inc(); + listener.per_worker_stats_.downstream_cx_total_.inc(); + listener.per_worker_stats_.downstream_cx_active_.inc(); + + // Active connections on the handler (not listener). The per listener connections have already + // been incremented at this point either via the connection balancer or in the socket accept + // path if there is no configured balancer. + listener.parent_.incNumConnections(); +} + +ActiveTcpConnection::~ActiveTcpConnection() { + ActiveStreamListenerBase::emitLogs(*active_connections_.listener_.config_, *stream_info_); + auto& listener = active_connections_.listener_; + listener.stats_.downstream_cx_active_.dec(); + listener.stats_.downstream_cx_destroy_.inc(); + listener.per_worker_stats_.downstream_cx_active_.dec(); + conn_length_->complete(); + + // Active listener connections (not handler). + listener.decNumConnections(); + + // Active handler connections (not listener). + listener.parent_.decNumConnections(); +} + +void ActiveTcpConnection::onEvent(Network::ConnectionEvent event) { + ENVOY_LOG(trace, "[C{}] connection on event {}", connection_->id(), static_cast(event)); + // Any event leads to destruction of the connection. + if (event == Network::ConnectionEvent::LocalClose || + event == Network::ConnectionEvent::RemoteClose) { + active_connections_.listener_.removeConnection(*this); + } +} + } // namespace Server } // namespace Envoy diff --git a/source/server/active_stream_listener_base.h b/source/server/active_stream_listener_base.h index 2d99e3965bf69..f57a5a693c0eb 100644 --- a/source/server/active_stream_listener_base.h +++ b/source/server/active_stream_listener_base.h @@ -10,6 +10,7 @@ #include "envoy/network/connection.h" #include "envoy/network/connection_handler.h" #include "envoy/network/listener.h" +#include "envoy/stats/timespan.h" #include "envoy/stream_info/stream_info.h" #include "source/common/common/linked_object.h" @@ -135,5 +136,48 @@ class ActiveStreamListenerBase : public ActiveListenerImplBase, Event::Dispatcher& dispatcher_; }; +struct ActiveTcpConnection; +class ActiveTcpListener; + +/** + * Wrapper for a group of active connections which are attached to the same filter chain context. + */ +class ActiveConnections : public Event::DeferredDeletable { +public: + ActiveConnections(ActiveTcpListener& listener, const Network::FilterChain& filter_chain); + ~ActiveConnections() override; + + // listener filter chain pair is the owner of the connections + ActiveTcpListener& listener_; + const Network::FilterChain& filter_chain_; + // Owned connections + std::list> connections_; +}; + +/** + * Wrapper for an active TCP connection owned by this handler. + */ +struct ActiveTcpConnection : LinkedObject, + public Event::DeferredDeletable, + public Network::ConnectionCallbacks, + Logger::Loggable { + ActiveTcpConnection(ActiveConnections& active_connections, + Network::ConnectionPtr&& new_connection, TimeSource& time_system, + std::unique_ptr&& stream_info); + ~ActiveTcpConnection() override; + // Network::ConnectionCallbacks + void onEvent(Network::ConnectionEvent event) override; + void onAboveWriteBufferHighWatermark() override {} + void onBelowWriteBufferLowWatermark() override {} + + std::unique_ptr stream_info_; + ActiveConnections& active_connections_; + Network::ConnectionPtr connection_; + Stats::TimespanPtr conn_length_; +}; + +using ActiveConnectionPtr = std::unique_ptr; +using ActiveConnectionCollectionPtr = std::unique_ptr; + } // namespace Server } // namespace Envoy diff --git a/source/server/active_tcp_listener.cc b/source/server/active_tcp_listener.cc index cee5494394436..1d867d4a1033c 100644 --- a/source/server/active_tcp_listener.cc +++ b/source/server/active_tcp_listener.cc @@ -167,9 +167,9 @@ void ActiveTcpListener::newActiveConnection(const Network::FilterChain& filter_c Network::ServerConnectionPtr server_conn_ptr, std::unique_ptr stream_info) { auto& active_connections = getOrCreateActiveConnections(filter_chain); - ActiveTcpConnectionPtr active_connection( - new ActiveTcpConnection(active_connections, std::move(server_conn_ptr), - dispatcher().timeSource(), std::move(stream_info))); + auto active_connection = + std::make_unique(active_connections, std::move(server_conn_ptr), + dispatcher().timeSource(), std::move(stream_info)); // If the connection is already closed, we can just let this connection immediately die. if (active_connection->connection_->state() != Network::Connection::State::Closed) { ENVOY_CONN_LOG(debug, "new connection from {}", *active_connection->connection_, @@ -207,61 +207,5 @@ void ActiveTcpListener::post(Network::ConnectionSocketPtr&& socket) { }); } -ActiveConnections::ActiveConnections(ActiveTcpListener& listener, - const Network::FilterChain& filter_chain) - : listener_(listener), filter_chain_(filter_chain) {} - -ActiveConnections::~ActiveConnections() { - // connections should be defer deleted already. - ASSERT(connections_.empty()); -} - -ActiveTcpConnection::ActiveTcpConnection(ActiveConnections& active_connections, - Network::ConnectionPtr&& new_connection, - TimeSource& time_source, - std::unique_ptr&& stream_info) - : stream_info_(std::move(stream_info)), active_connections_(active_connections), - connection_(std::move(new_connection)), - conn_length_(new Stats::HistogramCompletableTimespanImpl( - active_connections_.listener_.stats_.downstream_cx_length_ms_, time_source)) { - // We just universally set no delay on connections. Theoretically we might at some point want - // to make this configurable. - connection_->noDelay(true); - auto& listener = active_connections_.listener_; - listener.stats_.downstream_cx_total_.inc(); - listener.stats_.downstream_cx_active_.inc(); - listener.per_worker_stats_.downstream_cx_total_.inc(); - listener.per_worker_stats_.downstream_cx_active_.inc(); - - // Active connections on the handler (not listener). The per listener connections have already - // been incremented at this point either via the connection balancer or in the socket accept - // path if there is no configured balancer. - listener.parent_.incNumConnections(); -} - -ActiveTcpConnection::~ActiveTcpConnection() { - ActiveStreamListenerBase::emitLogs(*active_connections_.listener_.config_, *stream_info_); - auto& listener = active_connections_.listener_; - listener.stats_.downstream_cx_active_.dec(); - listener.stats_.downstream_cx_destroy_.inc(); - listener.per_worker_stats_.downstream_cx_active_.dec(); - conn_length_->complete(); - - // Active listener connections (not handler). - listener.decNumConnections(); - - // Active handler connections (not listener). - listener.parent_.decNumConnections(); -} - -void ActiveTcpConnection::onEvent(Network::ConnectionEvent event) { - ENVOY_LOG(trace, "[C{}] connection on event {}", connection_->id(), static_cast(event)); - // Any event leads to destruction of the connection. - if (event == Network::ConnectionEvent::LocalClose || - event == Network::ConnectionEvent::RemoteClose) { - active_connections_.listener_.removeConnection(*this); - } -} - } // namespace Server } // namespace Envoy diff --git a/source/server/active_tcp_listener.h b/source/server/active_tcp_listener.h index 9ea378f445395..79344d5afd474 100644 --- a/source/server/active_tcp_listener.h +++ b/source/server/active_tcp_listener.h @@ -11,12 +11,6 @@ namespace Envoy { namespace Server { - -struct ActiveTcpConnection; -using ActiveTcpConnectionPtr = std::unique_ptr; -class ActiveConnections; -using ActiveConnectionCollectionPtr = std::unique_ptr; - namespace { // Structure used to allow a unique_ptr to be captured in a posted lambda. See below. struct RebalancedSocket { @@ -106,44 +100,5 @@ class ActiveTcpListener final : public Network::TcpListenerCallbacks, }; using ActiveTcpListenerOptRef = absl::optional>; - -/** - * Wrapper for a group of active connections which are attached to the same filter chain context. - */ -class ActiveConnections : public Event::DeferredDeletable { -public: - ActiveConnections(ActiveTcpListener& listener, const Network::FilterChain& filter_chain); - ~ActiveConnections() override; - - // listener filter chain pair is the owner of the connections - ActiveTcpListener& listener_; - const Network::FilterChain& filter_chain_; - // Owned connections - std::list connections_; -}; - -/** - * Wrapper for an active TCP connection owned by this handler. - */ -struct ActiveTcpConnection : LinkedObject, - public Event::DeferredDeletable, - public Network::ConnectionCallbacks, - Logger::Loggable { - ActiveTcpConnection(ActiveConnections& active_connections, - Network::ConnectionPtr&& new_connection, TimeSource& time_system, - std::unique_ptr&& stream_info); - ~ActiveTcpConnection() override; - - // Network::ConnectionCallbacks - void onEvent(Network::ConnectionEvent event) override; - void onAboveWriteBufferHighWatermark() override {} - void onBelowWriteBufferLowWatermark() override {} - - std::unique_ptr stream_info_; - ActiveConnections& active_connections_; - Network::ConnectionPtr connection_; - Stats::TimespanPtr conn_length_; -}; - } // namespace Server } // namespace Envoy diff --git a/source/server/config_validation/BUILD b/source/server/config_validation/BUILD index f33cd523ce856..bc6fd2391fcd5 100644 --- a/source/server/config_validation/BUILD +++ b/source/server/config_validation/BUILD @@ -26,6 +26,7 @@ envoy_cc_library( "//envoy/api:api_interface", "//envoy/filesystem:filesystem_interface", "//source/common/api:api_lib", + "@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto", ], ) diff --git a/source/server/config_validation/api.cc b/source/server/config_validation/api.cc index 75f8e755797c2..c9d4a2546d625 100644 --- a/source/server/config_validation/api.cc +++ b/source/server/config_validation/api.cc @@ -8,8 +8,9 @@ namespace Api { ValidationImpl::ValidationImpl(Thread::ThreadFactory& thread_factory, Stats::Store& stats_store, Event::TimeSystem& time_system, Filesystem::Instance& file_system, - Random::RandomGenerator& random_generator) - : Impl(thread_factory, stats_store, time_system, file_system, random_generator), + Random::RandomGenerator& random_generator, + const envoy::config::bootstrap::v3::Bootstrap& bootstrap) + : Impl(thread_factory, stats_store, time_system, file_system, random_generator, bootstrap), time_system_(time_system) {} Event::DispatcherPtr ValidationImpl::allocateDispatcher(const std::string& name) { diff --git a/source/server/config_validation/api.h b/source/server/config_validation/api.h index 6725362caa733..c1a64a65df54a 100644 --- a/source/server/config_validation/api.h +++ b/source/server/config_validation/api.h @@ -1,6 +1,7 @@ #pragma once #include "envoy/api/api.h" +#include "envoy/config/bootstrap/v3/bootstrap.pb.h" #include "envoy/event/timer.h" #include "envoy/filesystem/filesystem.h" @@ -17,7 +18,8 @@ class ValidationImpl : public Impl { public: ValidationImpl(Thread::ThreadFactory& thread_factory, Stats::Store& stats_store, Event::TimeSystem& time_system, Filesystem::Instance& file_system, - Random::RandomGenerator& random_generator); + Random::RandomGenerator& random_generator, + const envoy::config::bootstrap::v3::Bootstrap& bootstrap); Event::DispatcherPtr allocateDispatcher(const std::string& name) override; Event::DispatcherPtr allocateDispatcher(const std::string& name, diff --git a/source/server/config_validation/server.cc b/source/server/config_validation/server.cc index 3d37c7ac56c49..1eaac379016ab 100644 --- a/source/server/config_validation/server.cc +++ b/source/server/config_validation/server.cc @@ -45,8 +45,9 @@ ValidationInstance::ValidationInstance( : options_(options), validation_context_(options_.allowUnknownStaticFields(), !options.rejectUnknownDynamicFields(), !options.ignoreUnknownDynamicFields()), - stats_store_(store), api_(new Api::ValidationImpl(thread_factory, store, time_system, - file_system, random_generator_)), + stats_store_(store), + api_(new Api::ValidationImpl(thread_factory, store, time_system, file_system, + random_generator_, bootstrap_)), dispatcher_(api_->allocateDispatcher("main_thread")), singleton_manager_(new Singleton::ManagerImpl(api_->threadFactory())), access_log_manager_(options.fileFlushIntervalMsec(), *api_, *dispatcher_, access_log_lock, @@ -78,22 +79,21 @@ void ValidationInstance::initialize(const Options& options, // If we get all the way through that stripped-down initialization flow, to the point where we'd // be ready to serve, then the config has passed validation. // Handle configuration that needs to take place prior to the main configuration load. - envoy::config::bootstrap::v3::Bootstrap bootstrap; - InstanceUtil::loadBootstrapConfig(bootstrap, options, + InstanceUtil::loadBootstrapConfig(bootstrap_, options, messageValidationContext().staticValidationVisitor(), *api_); - Config::Utility::createTagProducer(bootstrap); - bootstrap.mutable_node()->set_hidden_envoy_deprecated_build_version(VersionInfo::version()); + Config::Utility::createTagProducer(bootstrap_); + bootstrap_.mutable_node()->set_hidden_envoy_deprecated_build_version(VersionInfo::version()); local_info_ = std::make_unique( - stats().symbolTable(), bootstrap.node(), bootstrap.node_context_params(), local_address, + stats().symbolTable(), bootstrap_.node(), bootstrap_.node_context_params(), local_address, options.serviceZone(), options.serviceClusterName(), options.serviceNodeName()); overload_manager_ = std::make_unique( - dispatcher(), stats(), threadLocal(), bootstrap.overload_manager(), + dispatcher(), stats(), threadLocal(), bootstrap_.overload_manager(), messageValidationContext().staticValidationVisitor(), *api_, options_); - Configuration::InitialImpl initial_config(bootstrap, options); - initial_config.initAdminAccessLog(bootstrap, *this); + Configuration::InitialImpl initial_config(bootstrap_, options); + initial_config.initAdminAccessLog(bootstrap_, *this); admin_ = std::make_unique(initial_config.admin().address()); listener_manager_ = std::make_unique(*this, *this, *this, false, quic_stat_names_); @@ -107,7 +107,7 @@ void ValidationInstance::initialize(const Options& options, localInfo(), *secret_manager_, messageValidationContext(), *api_, http_context_, grpc_context_, router_context_, accessLogManager(), singletonManager(), options, quic_stat_names_); - config_.initialize(bootstrap, *this, *cluster_manager_factory_); + config_.initialize(bootstrap_, *this, *cluster_manager_factory_); runtime().initialize(clusterManager()); clusterManager().setInitializedCb([this]() -> void { init_manager_.initialize(init_watcher_); }); } diff --git a/source/server/config_validation/server.h b/source/server/config_validation/server.h index 356769f2b5962..472f88d45fb7c 100644 --- a/source/server/config_validation/server.h +++ b/source/server/config_validation/server.h @@ -2,6 +2,7 @@ #include +#include "envoy/config/bootstrap/v3/bootstrap.pb.h" #include "envoy/config/core/v3/config_source.pb.h" #include "envoy/config/listener/v3/listener.pb.h" #include "envoy/config/listener/v3/listener_components.pb.h" @@ -195,6 +196,7 @@ class ValidationInstance final : Logger::Loggable, ProtobufMessage::ProdValidationContextImpl validation_context_; Stats::IsolatedStoreImpl& stats_store_; ThreadLocal::InstanceImpl thread_local_; + envoy::config::bootstrap::v3::Bootstrap bootstrap_; Api::ApiPtr api_; Event::DispatcherPtr dispatcher_; std::unique_ptr admin_; diff --git a/source/server/server.cc b/source/server/server.cc index 8ca82f4909315..6ab74b6007f7a 100644 --- a/source/server/server.cc +++ b/source/server/server.cc @@ -74,10 +74,10 @@ InstanceImpl::InstanceImpl( time_source_(time_system), restarter_(restarter), start_time_(time(nullptr)), original_start_time_(start_time_), stats_store_(store), thread_local_(tls), random_generator_(std::move(random_generator)), - api_(new Api::Impl(thread_factory, store, time_system, file_system, *random_generator_, - process_context ? ProcessContextOptRef(std::ref(*process_context)) - : absl::nullopt, - watermark_factory)), + api_(new Api::Impl( + thread_factory, store, time_system, file_system, *random_generator_, bootstrap_, + process_context ? ProcessContextOptRef(std::ref(*process_context)) : absl::nullopt, + watermark_factory)), dispatcher_(api_->allocateDispatcher("main_thread")), singleton_manager_(new Singleton::ManagerImpl(api_->threadFactory())), handler_(new ConnectionHandlerImpl(*dispatcher_, absl::nullopt)), diff --git a/source/server/server.h b/source/server/server.h index 36a017416317c..9234f6c2ae7b7 100644 --- a/source/server/server.h +++ b/source/server/server.h @@ -345,6 +345,7 @@ class InstanceImpl final : Logger::Loggable, Assert::ActionRegistrationPtr envoy_bug_action_registration_; ThreadLocal::Instance& thread_local_; Random::RandomGeneratorPtr random_generator_; + envoy::config::bootstrap::v3::Bootstrap bootstrap_; Api::ApiPtr api_; Event::DispatcherPtr dispatcher_; std::unique_ptr admin_; @@ -367,7 +368,6 @@ class InstanceImpl final : Logger::Loggable, std::unique_ptr worker_guard_dog_; bool terminated_; std::unique_ptr file_logger_; - envoy::config::bootstrap::v3::Bootstrap bootstrap_; ConfigTracker::EntryOwnerPtr config_tracker_entry_; SystemTime bootstrap_config_update_time_; Grpc::AsyncClientManagerPtr async_client_manager_; diff --git a/test/common/buffer/BUILD b/test/common/buffer/BUILD index bd01534ca6cae..85d7f1dbd516a 100644 --- a/test/common/buffer/BUILD +++ b/test/common/buffer/BUILD @@ -83,6 +83,18 @@ envoy_cc_test( ], ) +envoy_cc_test( + name = "buffer_memory_account_test", + srcs = ["buffer_memory_account_test.cc"], + deps = [ + "//source/common/buffer:buffer_lib", + "//test/integration:tracked_watermark_buffer_lib", + "//test/mocks/buffer:buffer_mocks", + "//test/mocks/http:stream_reset_handler_mock", + "@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto", + ], +) + envoy_cc_test( name = "zero_copy_input_stream_test", srcs = ["zero_copy_input_stream_test.cc"], diff --git a/test/common/buffer/buffer_memory_account_test.cc b/test/common/buffer/buffer_memory_account_test.cc new file mode 100644 index 0000000000000..a1c1cce2b3080 --- /dev/null +++ b/test/common/buffer/buffer_memory_account_test.cc @@ -0,0 +1,494 @@ +#include "envoy/config/bootstrap/v3/bootstrap.pb.h" +#include "envoy/http/codec.h" + +#include "source/common/buffer/buffer_impl.h" + +#include "test/integration/tracked_watermark_buffer.h" +#include "test/mocks/http/stream_reset_handler.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace Buffer { +namespace { + +using testing::_; + +using MemoryClassesToAccountsSet = std::array, + BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_>; + +constexpr uint64_t kMinimumBalanceToTrack = 1024 * 1024; +constexpr uint64_t kThresholdForFinalBucket = 128 * 1024 * 1024; + +// Gets the balance of an account assuming it's a BufferMemoryAccountImpl. +static int getBalance(const BufferMemoryAccountSharedPtr& account) { + return static_cast(account.get())->balance(); +} + +// Check the memory_classes_to_account is empty. +static void noAccountsTracked(MemoryClassesToAccountsSet& memory_classes_to_account) { + for (const auto& set : memory_classes_to_account) { + EXPECT_TRUE(set.empty()); + } +} + +class BufferMemoryAccountTest : public testing::Test { +protected: + TrackedWatermarkBufferFactory factory_{kMinimumBalanceToTrack}; + Http::MockStreamResetHandler mock_reset_handler_; +}; + +TEST_F(BufferMemoryAccountTest, ManagesAccountBalance) { + auto account = factory_.createAccount(mock_reset_handler_); + Buffer::OwnedImpl buffer(account); + ASSERT_EQ(getBalance(account), 0); + + // Check the balance increases as expected. + { + // New slice created + buffer.add("Hello"); + EXPECT_EQ(getBalance(account), 4096); + + // Should just be added to existing slice. + buffer.add(" World!"); + EXPECT_EQ(getBalance(account), 4096); + + // Trigger new slice creation with add. + const std::string long_string(4096, 'a'); + buffer.add(long_string); + EXPECT_EQ(getBalance(account), 8192); + + // AppendForTest also adds new slice. + buffer.appendSliceForTest("Extra Slice"); + EXPECT_EQ(getBalance(account), 12288); + } + + // Check the balance drains as slices are consumed. + { + // Shouldn't trigger slice free yet + buffer.drain(4095); + EXPECT_EQ(getBalance(account), 12288); + + // Trigger slice reclaim. + buffer.drain(1); + EXPECT_EQ(getBalance(account), 8192); + + // Reclaim next slice + buffer.drain(std::string("Hello World!").length()); + EXPECT_EQ(getBalance(account), 4096); + + // Reclaim remaining + buffer.drain(std::string("Extra Slice").length()); + EXPECT_EQ(getBalance(account), 0); + } + + account->clearDownstream(); +} + +TEST_F(BufferMemoryAccountTest, BufferAccountsForUnownedSliceMovedInto) { + auto account = factory_.createAccount(mock_reset_handler_); + Buffer::OwnedImpl accounted_buffer(account); + + Buffer::OwnedImpl unowned_buffer; + unowned_buffer.add("Unaccounted Slice"); + ASSERT_EQ(getBalance(account), 0); + + // Transfer over buffer + accounted_buffer.move(unowned_buffer); + EXPECT_EQ(getBalance(account), 4096); + + accounted_buffer.drain(accounted_buffer.length()); + EXPECT_EQ(getBalance(account), 0); + + account->clearDownstream(); +} + +TEST_F(BufferMemoryAccountTest, BufferFragmentsShouldNotHaveAnAssociatedAccount) { + auto buffer_one_account = factory_.createAccount(mock_reset_handler_); + Buffer::OwnedImpl buffer_one(buffer_one_account); + ASSERT_EQ(getBalance(buffer_one_account), 0); + + auto buffer_two_account = factory_.createAccount(mock_reset_handler_); + Buffer::OwnedImpl buffer_two(buffer_two_account); + ASSERT_EQ(getBalance(buffer_two_account), 0); + + const char data[] = "hello world"; + BufferFragmentImpl frag(data, 11, nullptr); + buffer_one.addBufferFragment(frag); + EXPECT_EQ(getBalance(buffer_one_account), 0); + EXPECT_EQ(buffer_one.length(), 11); + + // Transfer over buffer + buffer_two.move(buffer_one); + EXPECT_EQ(getBalance(buffer_two_account), 0); + EXPECT_EQ(buffer_two.length(), 11); + + buffer_two.drain(buffer_two.length()); + EXPECT_EQ(getBalance(buffer_two_account), 0); + EXPECT_EQ(buffer_two.length(), 0); + + buffer_one_account->clearDownstream(); + buffer_two_account->clearDownstream(); +} + +TEST_F(BufferMemoryAccountTest, SliceRemainsAttachToOriginalAccountWhenMoved) { + auto buffer_one_account = factory_.createAccount(mock_reset_handler_); + Buffer::OwnedImpl buffer_one(buffer_one_account); + ASSERT_EQ(getBalance(buffer_one_account), 0); + + auto buffer_two_account = factory_.createAccount(mock_reset_handler_); + Buffer::OwnedImpl buffer_two(buffer_two_account); + ASSERT_EQ(getBalance(buffer_two_account), 0); + + buffer_one.add("Charged to Account One"); + EXPECT_EQ(getBalance(buffer_one_account), 4096); + EXPECT_EQ(getBalance(buffer_two_account), 0); + + // Transfer over buffer, still tied to account one. + buffer_two.move(buffer_one); + EXPECT_EQ(getBalance(buffer_one_account), 4096); + EXPECT_EQ(getBalance(buffer_two_account), 0); + + buffer_two.drain(buffer_two.length()); + EXPECT_EQ(getBalance(buffer_one_account), 0); + EXPECT_EQ(getBalance(buffer_two_account), 0); + + buffer_one_account->clearDownstream(); + buffer_two_account->clearDownstream(); +} + +TEST_F(BufferMemoryAccountTest, + SliceRemainsAttachToOriginalAccountWhenMovedUnlessCoalescedIntoExistingSlice) { + auto buffer_one_account = factory_.createAccount(mock_reset_handler_); + Buffer::OwnedImpl buffer_one(buffer_one_account); + ASSERT_EQ(getBalance(buffer_one_account), 0); + + auto buffer_two_account = factory_.createAccount(mock_reset_handler_); + Buffer::OwnedImpl buffer_two(buffer_two_account); + ASSERT_EQ(getBalance(buffer_two_account), 0); + + buffer_one.add("Will Coalesce"); + buffer_two.add("To be Coalesce into:"); + EXPECT_EQ(getBalance(buffer_one_account), 4096); + EXPECT_EQ(getBalance(buffer_two_account), 4096); + + // Transfer over buffer, slices coalesce, crediting account one. + buffer_two.move(buffer_one); + EXPECT_EQ(getBalance(buffer_one_account), 0); + EXPECT_EQ(getBalance(buffer_two_account), 4096); + + buffer_two.drain(std::string("To be Coalesce into:Will Coalesce").length()); + EXPECT_EQ(getBalance(buffer_one_account), 0); + EXPECT_EQ(getBalance(buffer_two_account), 0); + + buffer_one_account->clearDownstream(); + buffer_two_account->clearDownstream(); +} + +TEST_F(BufferMemoryAccountTest, SliceCanRemainAttachedToOriginalAccountWhenMovedAndCoalescedInto) { + auto buffer_one_account = factory_.createAccount(mock_reset_handler_); + Buffer::OwnedImpl buffer_one(buffer_one_account); + ASSERT_EQ(getBalance(buffer_one_account), 0); + + auto buffer_two_account = factory_.createAccount(mock_reset_handler_); + Buffer::OwnedImpl buffer_two(buffer_two_account); + ASSERT_EQ(getBalance(buffer_two_account), 0); + + auto buffer_three_account = factory_.createAccount(mock_reset_handler_); + Buffer::OwnedImpl buffer_three(buffer_three_account); + ASSERT_EQ(getBalance(buffer_three_account), 0); + + buffer_one.add("Will Coalesce"); + buffer_two.add("To be Coalesce into:"); + EXPECT_EQ(getBalance(buffer_one_account), 4096); + EXPECT_EQ(getBalance(buffer_two_account), 4096); + + // Transfer buffers, leading to slice coalescing in third buffer. + buffer_three.move(buffer_two); + buffer_three.move(buffer_one); + EXPECT_EQ(getBalance(buffer_one_account), 0); + EXPECT_EQ(getBalance(buffer_two_account), 4096); + EXPECT_EQ(getBalance(buffer_three_account), 0); + + buffer_three.drain(std::string("To be Coalesce into:Will Coalesce").length()); + EXPECT_EQ(getBalance(buffer_two_account), 0); + + buffer_one_account->clearDownstream(); + buffer_two_account->clearDownstream(); + buffer_three_account->clearDownstream(); +} + +TEST_F(BufferMemoryAccountTest, LinearizedBufferShouldChargeItsAssociatedAccount) { + auto buffer_one_account = factory_.createAccount(mock_reset_handler_); + Buffer::OwnedImpl buffer_one(buffer_one_account); + ASSERT_EQ(getBalance(buffer_one_account), 0); + + auto buffer_two_account = factory_.createAccount(mock_reset_handler_); + Buffer::OwnedImpl buffer_two(buffer_two_account); + ASSERT_EQ(getBalance(buffer_two_account), 0); + + auto buffer_three_account = factory_.createAccount(mock_reset_handler_); + Buffer::OwnedImpl buffer_three(buffer_three_account); + ASSERT_EQ(getBalance(buffer_three_account), 0); + + const std::string long_string(4096, 'a'); + buffer_one.add(long_string); + buffer_two.add(long_string); + EXPECT_EQ(getBalance(buffer_one_account), 4096); + EXPECT_EQ(getBalance(buffer_two_account), 4096); + + // Move into the third buffer. + buffer_three.move(buffer_one); + buffer_three.move(buffer_two); + EXPECT_EQ(getBalance(buffer_one_account), 4096); + EXPECT_EQ(getBalance(buffer_two_account), 4096); + EXPECT_EQ(getBalance(buffer_three_account), 0); + + // Linearize, which does a copy out of the slices. + buffer_three.linearize(8192); + EXPECT_EQ(getBalance(buffer_one_account), 0); + EXPECT_EQ(getBalance(buffer_two_account), 0); + EXPECT_EQ(getBalance(buffer_three_account), 8192); + + buffer_one_account->clearDownstream(); + buffer_two_account->clearDownstream(); + buffer_three_account->clearDownstream(); +} + +TEST_F(BufferMemoryAccountTest, ManagesAccountBalanceWhenPrepending) { + auto prepend_to_account = factory_.createAccount(mock_reset_handler_); + Buffer::OwnedImpl buffer_to_prepend_to(prepend_to_account); + ASSERT_EQ(getBalance(prepend_to_account), 0); + + auto prepend_account = factory_.createAccount(mock_reset_handler_); + Buffer::OwnedImpl buffer_to_prepend(prepend_account); + ASSERT_EQ(getBalance(prepend_account), 0); + + Buffer::OwnedImpl unowned_buffer_to_prepend; + + unowned_buffer_to_prepend.add("World"); + buffer_to_prepend.add("Goodbye World"); + EXPECT_EQ(getBalance(prepend_account), 4096); + + // Prepend the buffers. + buffer_to_prepend_to.prepend(buffer_to_prepend); + EXPECT_EQ(getBalance(prepend_account), 4096); + EXPECT_EQ(getBalance(prepend_to_account), 0); + + buffer_to_prepend_to.prepend(unowned_buffer_to_prepend); + EXPECT_EQ(getBalance(prepend_to_account), 4096); + + // Prepend a string view. + buffer_to_prepend_to.prepend("Hello "); + EXPECT_EQ(getBalance(prepend_to_account), 8192); + + prepend_account->clearDownstream(); + prepend_to_account->clearDownstream(); +} + +TEST_F(BufferMemoryAccountTest, ExtractingSliceWithExistingStorageCreditsAccountOnce) { + auto buffer_account = factory_.createAccount(mock_reset_handler_); + Buffer::OwnedImpl buffer(buffer_account); + ASSERT_EQ(getBalance(buffer_account), 0); + + buffer.appendSliceForTest("Slice 1"); + buffer.appendSliceForTest("Slice 2"); + EXPECT_EQ(getBalance(buffer_account), 8192); + + // Account should only be credited when slice is extracted. + // Not on slice dtor. + { + auto slice = buffer.extractMutableFrontSlice(); + EXPECT_EQ(getBalance(buffer_account), 4096); + } + + EXPECT_EQ(getBalance(buffer_account), 4096); + + buffer_account->clearDownstream(); +} + +TEST_F(BufferMemoryAccountTest, NewReservationSlicesOnlyChargedAfterCommit) { + auto buffer_account = factory_.createAccount(mock_reset_handler_); + Buffer::OwnedImpl buffer(buffer_account); + ASSERT_EQ(getBalance(buffer_account), 0); + + auto reservation = buffer.reserveForRead(); + EXPECT_EQ(getBalance(buffer_account), 0); + + // We should only be charged for the slices committed. + reservation.commit(16384); + EXPECT_EQ(getBalance(buffer_account), 16384); + + buffer_account->clearDownstream(); +} + +TEST_F(BufferMemoryAccountTest, ReservationShouldNotChargeForExistingSlice) { + auto buffer_account = factory_.createAccount(mock_reset_handler_); + + Buffer::OwnedImpl buffer(buffer_account); + ASSERT_EQ(getBalance(buffer_account), 0); + + buffer.add("Many bytes remaining in this slice to use for reservation."); + EXPECT_EQ(getBalance(buffer_account), 4096); + + // The account shouldn't be charged again at commit since the commit + // uses memory from the slice already charged for. + auto reservation = buffer.reserveForRead(); + reservation.commit(2000); + EXPECT_EQ(getBalance(buffer_account), 4096); + + buffer_account->clearDownstream(); +} + +TEST_F(BufferMemoryAccountTest, AccountShouldNotBeTrackedByFactoryUnlessAboveMinimumBalance) { + auto account = factory_.createAccount(mock_reset_handler_); + + // Check not tracked + factory_.inspectMemoryClasses(noAccountsTracked); + + // Still below minimum + account->charge(2020); + factory_.inspectMemoryClasses(noAccountsTracked); + + account->charge(kMinimumBalanceToTrack); + + // Check now tracked + factory_.inspectMemoryClasses([](MemoryClassesToAccountsSet& memory_classes_to_account) { + EXPECT_EQ(memory_classes_to_account[0].size(), 1); + }); + + account->credit(getBalance(account)); + account->clearDownstream(); +} + +TEST_F(BufferMemoryAccountTest, ClearingDownstreamShouldUnregisterTrackedAccounts) { + auto account = factory_.createAccount(mock_reset_handler_); + account->charge(kMinimumBalanceToTrack); + + // Check tracked + factory_.inspectMemoryClasses([](MemoryClassesToAccountsSet& memory_classes_to_account) { + EXPECT_EQ(memory_classes_to_account[0].size(), 1); + }); + + account->clearDownstream(); + + // Check no longer tracked + factory_.inspectMemoryClasses(noAccountsTracked); + + account->credit(getBalance(account)); +} + +TEST_F(BufferMemoryAccountTest, AccountCanResetStream) { + auto account = factory_.createAccount(mock_reset_handler_); + + EXPECT_CALL(mock_reset_handler_, resetStream(_)); + account->resetDownstream(); + account->clearDownstream(); +} + +TEST_F(BufferMemoryAccountTest, FactoryTracksAccountCorrectlyAsBalanceIncreases) { + auto account = factory_.createAccount(mock_reset_handler_); + account->charge(kMinimumBalanceToTrack); + + factory_.inspectMemoryClasses([](MemoryClassesToAccountsSet& memory_classes_to_account) { + EXPECT_EQ(memory_classes_to_account[0].size(), 1); + }); + + for (size_t i = 0; i < BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_ - 1; ++i) { + // Double the balance to enter the higher buckets. + account->charge(getBalance(account)); + factory_.inspectMemoryClasses([i](MemoryClassesToAccountsSet& memory_classes_to_account) { + EXPECT_EQ(memory_classes_to_account[i].size(), 0); + EXPECT_EQ(memory_classes_to_account[i + 1].size(), 1); + }); + } + + account->credit(getBalance(account)); + account->clearDownstream(); +} + +TEST_F(BufferMemoryAccountTest, FactoryTracksAccountCorrectlyAsBalanceDecreases) { + auto account = factory_.createAccount(mock_reset_handler_); + account->charge(kThresholdForFinalBucket); + + factory_.inspectMemoryClasses([](MemoryClassesToAccountsSet& memory_classes_to_account) { + EXPECT_EQ(memory_classes_to_account[BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_ - 1].size(), + 1); + }); + + for (int i = BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_ - 2; i > 0; --i) { + // Halve the balance to enter the lower buckets. + account->credit(getBalance(account) / 2); + factory_.inspectMemoryClasses([i](MemoryClassesToAccountsSet& memory_classes_to_account) { + EXPECT_EQ(memory_classes_to_account[i + 1].size(), 0); + EXPECT_EQ(memory_classes_to_account[i].size(), 1); + }); + } + + account->credit(getBalance(account)); + account->clearDownstream(); +} + +TEST_F(BufferMemoryAccountTest, SizeSaturatesInLargestBucket) { + auto account = factory_.createAccount(mock_reset_handler_); + account->charge(kThresholdForFinalBucket); + + factory_.inspectMemoryClasses([](MemoryClassesToAccountsSet& memory_classes_to_account) { + EXPECT_EQ(memory_classes_to_account[BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_ - 1].size(), + 1); + }); + + account->charge(getBalance(account)); + + // Remains in final bucket. + factory_.inspectMemoryClasses([](MemoryClassesToAccountsSet& memory_classes_to_account) { + EXPECT_EQ(memory_classes_to_account[BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_ - 1].size(), + 1); + }); + + account->credit(getBalance(account)); + account->clearDownstream(); +} + +TEST_F(BufferMemoryAccountTest, RemainsInSameBucketIfChangesWithinThreshold) { + auto account = factory_.createAccount(mock_reset_handler_); + account->charge(kMinimumBalanceToTrack); + + factory_.inspectMemoryClasses([](MemoryClassesToAccountsSet& memory_classes_to_account) { + EXPECT_EQ(memory_classes_to_account[0].size(), 1); + }); + + // Charge to see in same bucket. + account->charge(kMinimumBalanceToTrack - 1); + + factory_.inspectMemoryClasses([](MemoryClassesToAccountsSet& memory_classes_to_account) { + EXPECT_EQ(memory_classes_to_account[0].size(), 1); + }); + + // Credit to see in same bucket. + account->credit(kMinimumBalanceToTrack - 1); + + factory_.inspectMemoryClasses([](MemoryClassesToAccountsSet& memory_classes_to_account) { + EXPECT_EQ(memory_classes_to_account[0].size(), 1); + }); + + account->credit(getBalance(account)); + account->clearDownstream(); +} + +TEST(WatermarkBufferFactoryTest, CanConfigureMinimumTrackingAmount) { + TrackedWatermarkBufferFactory factory(4); + EXPECT_EQ(factory.bitshift(), 2); +} + +TEST(WatermarkBufferFactoryTest, ReleaseAssertIfAccountTrackingThresholdBytesIsNotPowerOfTwo) { + envoy::config::bootstrap::v3::BufferFactoryConfig config; + config.set_account_tracking_threshold_bytes(3); + EXPECT_DEATH(WatermarkBufferFactory{config}, + "Expected account_tracking_threshold_bytes to be a power of two."); +} + +} // namespace +} // namespace Buffer +} // namespace Envoy diff --git a/test/common/buffer/owned_impl_test.cc b/test/common/buffer/owned_impl_test.cc index 55a877a01370e..854be0f4dd5e1 100644 --- a/test/common/buffer/owned_impl_test.cc +++ b/test/common/buffer/owned_impl_test.cc @@ -1265,277 +1265,6 @@ TEST_F(OwnedImplTest, FrontSlice) { EXPECT_EQ(1, buffer.frontSlice().len_); } -TEST(BufferMemoryAccountTest, ManagesAccountBalance) { - auto account = std::make_shared(); - Buffer::OwnedImpl buffer(account); - ASSERT_EQ(account->balance(), 0); - - // Check the balance increases as expected. - { - // New slice created - buffer.add("Hello"); - EXPECT_EQ(account->balance(), 4096); - - // Should just be added to existing slice. - buffer.add(" World!"); - EXPECT_EQ(account->balance(), 4096); - - // Trigger new slice creation with add. - const std::string long_string(4096, 'a'); - buffer.add(long_string); - EXPECT_EQ(account->balance(), 8192); - - // AppendForTest also adds new slice. - buffer.appendSliceForTest("Extra Slice"); - EXPECT_EQ(account->balance(), 12288); - } - - // Check the balance drains as slices are consumed. - { - // Shouldn't trigger slice free yet - buffer.drain(4095); - EXPECT_EQ(account->balance(), 12288); - - // Trigger slice reclaim. - buffer.drain(1); - EXPECT_EQ(account->balance(), 8192); - - // Reclaim next slice - buffer.drain(std::string("Hello World!").length()); - EXPECT_EQ(account->balance(), 4096); - - // Reclaim remaining - buffer.drain(std::string("Extra Slice").length()); - EXPECT_EQ(account->balance(), 0); - } -} - -TEST(BufferMemoryAccountTest, BufferAccountsForUnownedSliceMovedInto) { - auto account = std::make_shared(); - Buffer::OwnedImpl accounted_buffer(account); - - Buffer::OwnedImpl unowned_buffer; - unowned_buffer.add("Unaccounted Slice"); - ASSERT_EQ(account->balance(), 0); - - // Transfer over buffer - accounted_buffer.move(unowned_buffer); - EXPECT_EQ(account->balance(), 4096); - - accounted_buffer.drain(accounted_buffer.length()); - EXPECT_EQ(account->balance(), 0); -} - -TEST(BufferMemoryAccountTest, BufferFragmentsShouldNotHaveAnAssociatedAccount) { - auto buffer_one_account = std::make_shared(); - Buffer::OwnedImpl buffer_one(buffer_one_account); - ASSERT_EQ(buffer_one_account->balance(), 0); - - auto buffer_two_account = std::make_shared(); - Buffer::OwnedImpl buffer_two(buffer_two_account); - ASSERT_EQ(buffer_two_account->balance(), 0); - - const char data[] = "hello world"; - BufferFragmentImpl frag(data, 11, nullptr); - buffer_one.addBufferFragment(frag); - EXPECT_EQ(buffer_one_account->balance(), 0); - EXPECT_EQ(buffer_one.length(), 11); - - // Transfer over buffer - buffer_two.move(buffer_one); - EXPECT_EQ(buffer_two_account->balance(), 0); - EXPECT_EQ(buffer_two.length(), 11); - - buffer_two.drain(buffer_two.length()); - EXPECT_EQ(buffer_two_account->balance(), 0); - EXPECT_EQ(buffer_two.length(), 0); -} - -TEST(BufferMemoryAccountTest, SliceRemainsAttachToOriginalAccountWhenMoved) { - auto buffer_one_account = std::make_shared(); - Buffer::OwnedImpl buffer_one(buffer_one_account); - ASSERT_EQ(buffer_one_account->balance(), 0); - - auto buffer_two_account = std::make_shared(); - Buffer::OwnedImpl buffer_two(buffer_two_account); - ASSERT_EQ(buffer_two_account->balance(), 0); - - buffer_one.add("Charged to Account One"); - EXPECT_EQ(buffer_one_account->balance(), 4096); - EXPECT_EQ(buffer_two_account->balance(), 0); - - // Transfer over buffer, still tied to account one. - buffer_two.move(buffer_one); - EXPECT_EQ(buffer_one_account->balance(), 4096); - EXPECT_EQ(buffer_two_account->balance(), 0); - - buffer_two.drain(buffer_two.length()); - EXPECT_EQ(buffer_one_account->balance(), 0); - EXPECT_EQ(buffer_two_account->balance(), 0); -} - -TEST(BufferMemoryAccountTest, - SliceRemainsAttachToOriginalAccountWhenMovedUnlessCoalescedIntoExistingSlice) { - auto buffer_one_account = std::make_shared(); - Buffer::OwnedImpl buffer_one(buffer_one_account); - ASSERT_EQ(buffer_one_account->balance(), 0); - - auto buffer_two_account = std::make_shared(); - Buffer::OwnedImpl buffer_two(buffer_two_account); - ASSERT_EQ(buffer_two_account->balance(), 0); - - buffer_one.add("Will Coalesce"); - buffer_two.add("To be Coalesce into:"); - EXPECT_EQ(buffer_one_account->balance(), 4096); - EXPECT_EQ(buffer_two_account->balance(), 4096); - - // Transfer over buffer, slices coalesce, crediting account one. - buffer_two.move(buffer_one); - EXPECT_EQ(buffer_one_account->balance(), 0); - EXPECT_EQ(buffer_two_account->balance(), 4096); - - buffer_two.drain(std::string("To be Coalesce into:Will Coalesce").length()); - EXPECT_EQ(buffer_one_account->balance(), 0); - EXPECT_EQ(buffer_two_account->balance(), 0); -} - -TEST(BufferMemoryAccountTest, SliceCanRemainAttachedToOriginalAccountWhenMovedAndCoalescedInto) { - auto buffer_one_account = std::make_shared(); - Buffer::OwnedImpl buffer_one(buffer_one_account); - ASSERT_EQ(buffer_one_account->balance(), 0); - - auto buffer_two_account = std::make_shared(); - Buffer::OwnedImpl buffer_two(buffer_two_account); - ASSERT_EQ(buffer_two_account->balance(), 0); - - auto buffer_three_account = std::make_shared(); - Buffer::OwnedImpl buffer_three(buffer_three_account); - ASSERT_EQ(buffer_three_account->balance(), 0); - - buffer_one.add("Will Coalesce"); - buffer_two.add("To be Coalesce into:"); - EXPECT_EQ(buffer_one_account->balance(), 4096); - EXPECT_EQ(buffer_two_account->balance(), 4096); - - // Transfer buffers, leading to slice coalescing in third buffer. - buffer_three.move(buffer_two); - buffer_three.move(buffer_one); - EXPECT_EQ(buffer_one_account->balance(), 0); - EXPECT_EQ(buffer_two_account->balance(), 4096); - EXPECT_EQ(buffer_three_account->balance(), 0); - - buffer_three.drain(std::string("To be Coalesce into:Will Coalesce").length()); - EXPECT_EQ(buffer_two_account->balance(), 0); -} - -TEST(BufferMemoryAccountTest, LinearizedBufferShouldChargeItsAssociatedAccount) { - auto buffer_one_account = std::make_shared(); - Buffer::OwnedImpl buffer_one(buffer_one_account); - ASSERT_EQ(buffer_one_account->balance(), 0); - - auto buffer_two_account = std::make_shared(); - Buffer::OwnedImpl buffer_two(buffer_two_account); - ASSERT_EQ(buffer_two_account->balance(), 0); - - auto buffer_three_account = std::make_shared(); - Buffer::OwnedImpl buffer_three(buffer_three_account); - ASSERT_EQ(buffer_three_account->balance(), 0); - - const std::string long_string(4096, 'a'); - buffer_one.add(long_string); - buffer_two.add(long_string); - EXPECT_EQ(buffer_one_account->balance(), 4096); - EXPECT_EQ(buffer_two_account->balance(), 4096); - - // Move into the third buffer. - buffer_three.move(buffer_one); - buffer_three.move(buffer_two); - EXPECT_EQ(buffer_one_account->balance(), 4096); - EXPECT_EQ(buffer_two_account->balance(), 4096); - EXPECT_EQ(buffer_three_account->balance(), 0); - - // Linearize, which does a copy out of the slices. - buffer_three.linearize(8192); - EXPECT_EQ(buffer_one_account->balance(), 0); - EXPECT_EQ(buffer_two_account->balance(), 0); - EXPECT_EQ(buffer_three_account->balance(), 8192); -} - -TEST(BufferMemoryAccountTest, ManagesAccountBalanceWhenPrepending) { - auto prepend_to_account = std::make_shared(); - Buffer::OwnedImpl buffer_to_prepend_to(prepend_to_account); - ASSERT_EQ(prepend_to_account->balance(), 0); - - auto prepend_account = std::make_shared(); - Buffer::OwnedImpl buffer_to_prepend(prepend_account); - ASSERT_EQ(prepend_account->balance(), 0); - - Buffer::OwnedImpl unowned_buffer_to_prepend; - - unowned_buffer_to_prepend.add("World"); - buffer_to_prepend.add("Goodbye World"); - EXPECT_EQ(prepend_account->balance(), 4096); - - // Prepend the buffers. - buffer_to_prepend_to.prepend(buffer_to_prepend); - EXPECT_EQ(prepend_account->balance(), 4096); - EXPECT_EQ(prepend_to_account->balance(), 0); - - buffer_to_prepend_to.prepend(unowned_buffer_to_prepend); - EXPECT_EQ(prepend_to_account->balance(), 4096); - - // Prepend a string view. - buffer_to_prepend_to.prepend("Hello "); - EXPECT_EQ(prepend_to_account->balance(), 8192); -} - -TEST(BufferMemoryAccountTest, ExtractingSliceWithExistingStorageCreditsAccountOnce) { - auto buffer_account = std::make_shared(); - Buffer::OwnedImpl buffer(buffer_account); - ASSERT_EQ(buffer_account->balance(), 0); - - buffer.appendSliceForTest("Slice 1"); - buffer.appendSliceForTest("Slice 2"); - EXPECT_EQ(buffer_account->balance(), 8192); - - // Account should only be credited when slice is extracted. - // Not on slice dtor. - { - auto slice = buffer.extractMutableFrontSlice(); - EXPECT_EQ(buffer_account->balance(), 4096); - } - - EXPECT_EQ(buffer_account->balance(), 4096); -} - -TEST(BufferMemoryAccountTest, NewReservationSlicesOnlyChargedAfterCommit) { - auto buffer_account = std::make_shared(); - Buffer::OwnedImpl buffer(buffer_account); - ASSERT_EQ(buffer_account->balance(), 0); - - auto reservation = buffer.reserveForRead(); - EXPECT_EQ(buffer_account->balance(), 0); - - // We should only be charged for the slices committed. - reservation.commit(16384); - EXPECT_EQ(buffer_account->balance(), 16384); -} - -TEST(BufferMemoryAccountTest, ReservationShouldNotChargeForExistingSlice) { - auto buffer_account = std::make_shared(); - Buffer::OwnedImpl buffer(buffer_account); - ASSERT_EQ(buffer_account->balance(), 0); - - buffer.add("Many bytes remaining in this slice to use for reservation."); - EXPECT_EQ(buffer_account->balance(), 4096); - - // The account shouldn't be charged again at commit since the commit - // uses memory from the slice already charged for. - auto reservation = buffer.reserveForRead(); - reservation.commit(2000); - EXPECT_EQ(buffer_account->balance(), 4096); -} - } // namespace } // namespace Buffer } // namespace Envoy diff --git a/test/extensions/filters/common/rbac/matchers_test.cc b/test/extensions/filters/common/rbac/matchers_test.cc index 87d22517dd428..0979445343df8 100644 --- a/test/extensions/filters/common/rbac/matchers_test.cc +++ b/test/extensions/filters/common/rbac/matchers_test.cc @@ -4,6 +4,7 @@ #include "envoy/config/route/v3/route_components.pb.h" #include "envoy/type/matcher/v3/metadata.pb.h" +#include "source/common/network/address_impl.h" #include "source/common/network/utility.h" #include "source/extensions/filters/common/expr/evaluator.h" #include "source/extensions/filters/common/rbac/matchers.h" @@ -33,6 +34,10 @@ void checkMatcher( EXPECT_EQ(expected, matcher.matches(connection, headers, info)); } +PortRangeMatcher createPortRangeMatcher(envoy::type::v3::Int32Range range) { + return PortRangeMatcher(range); +} + TEST(AlwaysMatcher, AlwaysMatches) { checkMatcher(RBAC::AlwaysMatcher(), true); } TEST(AndMatcher, Permission_Set) { @@ -101,6 +106,12 @@ TEST(OrMatcher, Permission_Set) { checkMatcher(RBAC::OrMatcher(set), false, conn, headers, info); + perm = set.add_rules(); + perm->mutable_destination_port_range()->set_start(123); + perm->mutable_destination_port_range()->set_end(456); + + checkMatcher(RBAC::OrMatcher(set), false, conn, headers, info); + perm = set.add_rules(); perm->set_any(true); @@ -233,6 +244,58 @@ TEST(PortMatcher, PortMatcher) { checkMatcher(PortMatcher(456), false, conn, headers, info); } +// Test valid and invalid destination_port_range permission rule in RBAC. +TEST(PortRangeMatcher, PortRangeMatcher) { + Envoy::Network::MockConnection conn; + Envoy::Http::TestRequestHeaderMapImpl headers; + NiceMock info; + Envoy::Network::Address::InstanceConstSharedPtr addr = + Envoy::Network::Utility::parseInternetAddress("1.2.3.4", 456, false); + info.downstream_address_provider_->setLocalAddress(addr); + + // IP address with port 456 is in range [123, 789) and [456, 789), but not in range [123, 456) or + // [12, 34). + envoy::type::v3::Int32Range range; + range.set_start(123); + range.set_end(789); + checkMatcher(PortRangeMatcher(range), true, conn, headers, info); + + range.set_start(456); + range.set_end(789); + checkMatcher(PortRangeMatcher(range), true, conn, headers, info); + + range.set_start(123); + range.set_end(456); + checkMatcher(PortRangeMatcher(range), false, conn, headers, info); + + range.set_start(12); + range.set_end(34); + checkMatcher(PortRangeMatcher(range), false, conn, headers, info); + + // Only IP address is valid for the permission rule. + NiceMock info2; + Envoy::Network::Address::InstanceConstSharedPtr addr2 = + std::make_shared("test"); + info2.downstream_address_provider_->setLocalAddress(addr2); + checkMatcher(PortRangeMatcher(range), false, conn, headers, info2); + + // Invalid rule will cause an exception. + range.set_start(-1); + range.set_end(80); + EXPECT_THROW_WITH_REGEX(createPortRangeMatcher(range), EnvoyException, + "range start .* is out of bounds"); + + range.set_start(80); + range.set_end(65537); + EXPECT_THROW_WITH_REGEX(createPortRangeMatcher(range), EnvoyException, + "range end .* is out of bounds"); + + range.set_start(80); + range.set_end(80); + EXPECT_THROW_WITH_REGEX(createPortRangeMatcher(range), EnvoyException, + "range start .* cannot be greater or equal than range end .*"); +} + TEST(AuthenticatedMatcher, uriSanPeerCertificate) { Envoy::Network::MockConnection conn; auto ssl = std::make_shared(); diff --git a/test/integration/BUILD b/test/integration/BUILD index f1fdda3435d77..d7da6529dc28c 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -384,6 +384,7 @@ envoy_cc_test( ], deps = [ ":http_integration_lib", + ":http_protocol_integration_lib", ":socket_interface_swap_lib", ":tracked_watermark_buffer_lib", "//test/mocks/http:http_mocks", @@ -842,6 +843,7 @@ envoy_cc_test_library( "//test/test_common:test_time_system_interface", "//test/test_common:utility_lib", "@com_google_absl//absl/synchronization", + "@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto", "@envoy_api//envoy/config/listener/v3:pkg_cc_proto", "@envoy_api//envoy/extensions/transport_sockets/quic/v3:pkg_cc_proto", "@envoy_api//envoy/extensions/transport_sockets/tls/v3:pkg_cc_proto", @@ -1348,6 +1350,7 @@ envoy_cc_test( srcs = ["tracked_watermark_buffer_test.cc"], deps = [ ":tracked_watermark_buffer_lib", + "//test/mocks/http:stream_reset_handler_mock", "//test/test_common:test_runtime_lib", ], ) diff --git a/test/integration/buffer_accounting_integration_test.cc b/test/integration/buffer_accounting_integration_test.cc index b11c916e568be..7eb6ec7ab6249 100644 --- a/test/integration/buffer_accounting_integration_test.cc +++ b/test/integration/buffer_accounting_integration_test.cc @@ -8,6 +8,7 @@ #include "source/common/buffer/buffer_impl.h" #include "test/integration/autonomous_upstream.h" +#include "test/integration/http_protocol_integration.h" #include "test/integration/tracked_watermark_buffer.h" #include "test/integration/utility.h" #include "test/mocks/http/mocks.h" @@ -20,20 +21,48 @@ namespace Envoy { namespace { -std::string ipVersionAndBufferAccountingTestParamsToString( - const ::testing::TestParamInfo>& params) { - return fmt::format( - "{}_{}", - TestUtility::ipTestParamsToString(::testing::TestParamInfo( - std::get<0>(params.param), params.index)), - std::get<1>(params.param) ? "with_per_stream_buffer_accounting" - : "without_per_stream_buffer_accounting"); +std::string protocolTestParamsAndBoolToString( + const ::testing::TestParamInfo>& params) { + return fmt::format("{}_{}", + HttpProtocolIntegrationTest::protocolTestParamsToString( + ::testing::TestParamInfo(std::get<0>(params.param), + /*an_index=*/0)), + std::get<1>(params.param) ? "with_per_stream_buffer_accounting" + : "without_per_stream_buffer_accounting"); +} + +void runOnWorkerThreadsAndWaitforCompletion(Server::Instance& server, std::function func) { + absl::Notification done_notification; + ThreadLocal::TypedSlotPtr<> slot; + Envoy::Thread::ThreadId main_tid; + server.dispatcher().post([&] { + slot = ThreadLocal::TypedSlot<>::makeUnique(server.threadLocal()); + slot->set( + [](Envoy::Event::Dispatcher&) -> std::shared_ptr { + return nullptr; + }); + + main_tid = server.api().threadFactory().currentThreadId(); + + slot->runOnAllThreads( + [main_tid, &server, &func](OptRef) { + // Run on the worker thread. + if (server.api().threadFactory().currentThreadId() != main_tid) { + func(); + } + }, + [&slot, &done_notification] { + slot.reset(nullptr); + done_notification.Notify(); + }); + }); + done_notification.WaitForNotification(); } } // namespace -class HttpBufferWatermarksTest +class Http2BufferWatermarksTest : public SocketInterfaceSwap, - public testing::TestWithParam>, + public testing::TestWithParam>, public HttpIntegrationTest { public: std::vector @@ -54,20 +83,21 @@ class HttpBufferWatermarksTest return responses; } - // TODO(kbaichoo): Parameterize on the client codec type when other protocols - // (H1, H3) support buffer accounting. - HttpBufferWatermarksTest() - : HttpIntegrationTest(Http::CodecClient::Type::HTTP2, std::get<0>(GetParam())) { + Http2BufferWatermarksTest() + : HttpIntegrationTest( + std::get<0>(GetParam()).downstream_protocol, std::get<0>(GetParam()).version, + ConfigHelper::httpProxyConfig( + /*downstream_is_quic=*/std::get<0>(GetParam()).downstream_protocol == + Http::CodecType::HTTP3)) { config_helper_.addRuntimeOverride("envoy.test_only.per_stream_buffer_accounting", streamBufferAccounting() ? "true" : "false"); setServerBufferFactory(buffer_factory_); - setDownstreamProtocol(Http::CodecClient::Type::HTTP2); - setUpstreamProtocol(FakeHttpConnection::Type::HTTP2); + setUpstreamProtocol(std::get<0>(GetParam()).upstream_protocol); } protected: std::shared_ptr buffer_factory_ = - std::make_shared(); + std::make_shared(1024 * 1024); // Track >= 1MB bool streamBufferAccounting() { return std::get<1>(GetParam()); } @@ -91,14 +121,20 @@ class HttpBufferWatermarksTest } }; +// Run the tests using HTTP2 only since its the only protocol that's fully +// supported. +// TODO(kbaichoo): Instantiate with H3 and H1 as well when their buffers are +// bounded to accounts. INSTANTIATE_TEST_SUITE_P( - IpVersions, HttpBufferWatermarksTest, - testing::Combine(testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), testing::Bool()), - ipVersionAndBufferAccountingTestParamsToString); + IpVersions, Http2BufferWatermarksTest, + testing::Combine(testing::ValuesIn(HttpProtocolIntegrationTest::getProtocolTestParams( + {Http::CodecType::HTTP2}, {FakeHttpConnection::Type::HTTP2})), + testing::Bool()), + protocolTestParamsAndBoolToString); // We should create four buffers each billing the same downstream request's // account which originated the chain. -TEST_P(HttpBufferWatermarksTest, ShouldCreateFourBuffersPerAccount) { +TEST_P(Http2BufferWatermarksTest, ShouldCreateFourBuffersPerAccount) { FakeStreamPtr upstream_request1; FakeStreamPtr upstream_request2; default_request_headers_.setContentLength(1000); @@ -153,7 +189,7 @@ TEST_P(HttpBufferWatermarksTest, ShouldCreateFourBuffersPerAccount) { EXPECT_TRUE(buffer_factory_->waitUntilExpectedNumberOfAccountsAndBoundBuffers(0, 0)); } -TEST_P(HttpBufferWatermarksTest, ShouldTrackAllocatedBytesToUpstream) { +TEST_P(Http2BufferWatermarksTest, ShouldTrackAllocatedBytesToUpstream) { const int num_requests = 5; const uint32_t request_body_size = 4096; const uint32_t response_body_size = 4096; @@ -188,7 +224,7 @@ TEST_P(HttpBufferWatermarksTest, ShouldTrackAllocatedBytesToUpstream) { } } -TEST_P(HttpBufferWatermarksTest, ShouldTrackAllocatedBytesToDownstream) { +TEST_P(Http2BufferWatermarksTest, ShouldTrackAllocatedBytesToDownstream) { const int num_requests = 5; const uint32_t request_body_size = 4096; const uint32_t response_body_size = 16384; @@ -224,4 +260,135 @@ TEST_P(HttpBufferWatermarksTest, ShouldTrackAllocatedBytesToDownstream) { } } +// Focuses on tests using the various codec. Currently, the accounting is only +// fully wired through with H2, but it's important to test that H1 and H3 end +// up notifying the BufferMemoryAccount when the dtor of the downstream stream +// occurs. +class ProtocolsBufferWatermarksTest + : public testing::TestWithParam>, + public HttpIntegrationTest { +public: + ProtocolsBufferWatermarksTest() + : HttpIntegrationTest( + std::get<0>(GetParam()).downstream_protocol, std::get<0>(GetParam()).version, + ConfigHelper::httpProxyConfig( + /*downstream_is_quic=*/std::get<0>(GetParam()).downstream_protocol == + Http::CodecType::HTTP3)) { + config_helper_.addRuntimeOverride("envoy.test_only.per_stream_buffer_accounting", + streamBufferAccounting() ? "true" : "false"); + setServerBufferFactory(buffer_factory_); + setUpstreamProtocol(std::get<0>(GetParam()).upstream_protocol); + } + +protected: + std::shared_ptr buffer_factory_ = + std::make_shared(); + + bool streamBufferAccounting() { return std::get<1>(GetParam()); } +}; + +INSTANTIATE_TEST_SUITE_P( + IpVersions, ProtocolsBufferWatermarksTest, + testing::Combine(testing::ValuesIn(HttpProtocolIntegrationTest::getProtocolTestParams( + {Http::CodecType::HTTP1, Http::CodecType::HTTP2, Http::CodecType::HTTP3}, + {FakeHttpConnection::Type::HTTP2})), + testing::Bool()), + protocolTestParamsAndBoolToString); + +TEST_P(ProtocolsBufferWatermarksTest, AccountShouldBeRegisteredAndUnregisteredOnce) { + FakeStreamPtr upstream_request1; + default_request_headers_.setContentLength(1000); + initialize(); + + codec_client_ = makeHttpConnection(lookupPort("http")); + + // Sends the first request. + auto response1 = codec_client_->makeRequestWithBody(default_request_headers_, 1000); + waitForNextUpstreamRequest(); + upstream_request1 = std::move(upstream_request_); + + if (streamBufferAccounting()) { + EXPECT_EQ(buffer_factory_->numAccountsCreated(), 1); + } else { + EXPECT_EQ(buffer_factory_->numAccountsCreated(), 0); + } + + upstream_request1->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); + upstream_request1->encodeData(1000, true); + ASSERT_TRUE(response1->waitForEndStream()); + ASSERT_TRUE(upstream_request1->complete()); + + // Check single call to unregister if stream account, 0 otherwise + if (streamBufferAccounting()) { + EXPECT_TRUE(buffer_factory_->waitForExpectedAccountUnregistered(1)); + } else { + EXPECT_TRUE(buffer_factory_->waitForExpectedAccountUnregistered(0)); + } +} + +TEST_P(ProtocolsBufferWatermarksTest, ResettingStreamUnregistersAccount) { + FakeStreamPtr upstream_request1; + default_request_headers_.setContentLength(1000); + // H1 on RST ends up leveraging idle timeout if no active stream on the + // connection. + config_helper_.setDownstreamHttpIdleTimeout(std::chrono::milliseconds(100)); + initialize(); + + codec_client_ = makeHttpConnection(lookupPort("http")); + + // Sends the first request. + auto response1 = codec_client_->makeRequestWithBody(default_request_headers_, 1000); + waitForNextUpstreamRequest(); + upstream_request1 = std::move(upstream_request_); + + if (streamBufferAccounting()) { + EXPECT_EQ(buffer_factory_->numAccountsCreated(), 1); + } else { + EXPECT_EQ(buffer_factory_->numAccountsCreated(), 0); + } + + if (streamBufferAccounting()) { + // Reset the downstream via the account interface on the worker thread. + EXPECT_EQ(buffer_factory_->numAccountsCreated(), 1); + Buffer::BufferMemoryAccountSharedPtr account; + auto& server = test_server_->server(); + + // Get access to the account. + buffer_factory_->inspectAccounts( + [&account](Buffer::TrackedWatermarkBufferFactory::AccountToBoundBuffersMap& map) { + for (auto& [acct, _] : map) { + account = acct; + } + }, + server); + + // Reset the stream from the worker. + runOnWorkerThreadsAndWaitforCompletion(server, [&account]() { account->resetDownstream(); }); + + if (std::get<0>(GetParam()).downstream_protocol == Http::CodecType::HTTP1) { + // For H1, we use idleTimeouts to cancel streams unless there was an + // explicit protocol error prior to sending a response to the downstream. + // Since that's not the case, the reset will fire twice, once due to + // overload manager, and once due to timeout which will close the + // connection. + ASSERT_TRUE(codec_client_->waitForDisconnect(std::chrono::milliseconds(10000))); + } else { + ASSERT_TRUE(response1->waitForReset()); + EXPECT_EQ(response1->resetReason(), Http::StreamResetReason::RemoteReset); + } + } else { + upstream_request1->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); + upstream_request1->encodeData(1000, true); + ASSERT_TRUE(response1->waitForEndStream()); + ASSERT_TRUE(upstream_request1->complete()); + } + + // Check single call to unregister if stream account, 0 otherwise + if (streamBufferAccounting()) { + EXPECT_TRUE(buffer_factory_->waitForExpectedAccountUnregistered(1)); + } else { + EXPECT_TRUE(buffer_factory_->waitForExpectedAccountUnregistered(0)); + } +} + } // namespace Envoy diff --git a/test/integration/tracked_watermark_buffer.cc b/test/integration/tracked_watermark_buffer.cc index 1a14d888a1d52..8d5dabce1ad0c 100644 --- a/test/integration/tracked_watermark_buffer.cc +++ b/test/integration/tracked_watermark_buffer.cc @@ -9,6 +9,15 @@ namespace Envoy { namespace Buffer { +TrackedWatermarkBufferFactory::TrackedWatermarkBufferFactory() : TrackedWatermarkBufferFactory(0) {} + +TrackedWatermarkBufferFactory::TrackedWatermarkBufferFactory(uint32_t min_tracking_bytes) + : WatermarkBufferFactory([min_tracking_bytes]() { + auto config = envoy::config::bootstrap::v3::BufferFactoryConfig(); + config.set_account_tracking_threshold_bytes(min_tracking_bytes); + return config; + }()) {} + TrackedWatermarkBufferFactory::~TrackedWatermarkBufferFactory() { ASSERT(active_buffer_count_ == 0); } @@ -74,6 +83,21 @@ TrackedWatermarkBufferFactory::createBuffer(std::function below_low_wate below_low_watermark, above_high_watermark, above_overflow_watermark); } +BufferMemoryAccountSharedPtr +TrackedWatermarkBufferFactory::createAccount(Http::StreamResetHandler& reset_handler) { + auto account = WatermarkBufferFactory::createAccount(reset_handler); + absl::MutexLock lock(&mutex_); + ++total_accounts_created_; + return account; +} + +void TrackedWatermarkBufferFactory::unregisterAccount(const BufferMemoryAccountSharedPtr& account, + int current_class) { + WatermarkBufferFactory::unregisterAccount(account, current_class); + absl::MutexLock lock(&mutex_); + ++total_accounts_unregistered_; +} + uint64_t TrackedWatermarkBufferFactory::numBuffersCreated() const { absl::MutexLock lock(&mutex_); return buffer_infos_.size(); @@ -139,6 +163,21 @@ std::pair TrackedWatermarkBufferFactory::highWatermarkRange( return std::make_pair(min_watermark, max_watermark); } +uint64_t TrackedWatermarkBufferFactory::numAccountsCreated() const { + absl::MutexLock lock(&mutex_); + return total_accounts_created_; +} + +bool TrackedWatermarkBufferFactory::waitForExpectedAccountUnregistered( + uint64_t expected_accounts_unregistered, std::chrono::milliseconds timeout) { + absl::MutexLock lock(&mutex_); + auto predicate = [this, expected_accounts_unregistered]() ABSL_SHARED_LOCKS_REQUIRED(mutex_) { + mutex_.AssertHeld(); + return expected_accounts_unregistered == total_accounts_unregistered_; + }; + return mutex_.AwaitWithTimeout(absl::Condition(&predicate), absl::Milliseconds(timeout.count())); +} + bool TrackedWatermarkBufferFactory::waitUntilTotalBufferedExceeds( uint64_t byte_size, std::chrono::milliseconds timeout) { absl::MutexLock lock(&mutex_); @@ -196,6 +235,11 @@ void TrackedWatermarkBufferFactory::inspectAccounts( done_notification.WaitForNotification(); } +void TrackedWatermarkBufferFactory::inspectMemoryClasses( + std::function func) { + func(size_class_account_sets_); +} + void TrackedWatermarkBufferFactory::setExpectedAccountBalance(uint64_t byte_size_per_account, uint32_t num_accounts) { absl::MutexLock lock(&mutex_); @@ -231,7 +275,7 @@ void TrackedWatermarkBufferFactory::checkIfExpectedBalancesMet() { // This is thread safe since this function should run on the only Envoy worker // thread. for (auto& acc : account_infos_) { - if (static_cast(acc.first.get())->balance() < + if (static_cast(acc.first.get())->balance() < expected_balances_->balance_per_account_) { return; } diff --git a/test/integration/tracked_watermark_buffer.h b/test/integration/tracked_watermark_buffer.h index 999e955037fc6..d308f8c53f8ad 100644 --- a/test/integration/tracked_watermark_buffer.h +++ b/test/integration/tracked_watermark_buffer.h @@ -59,14 +59,18 @@ class TrackedWatermarkBuffer : public Buffer::WatermarkBuffer { }; // Factory that tracks how the created buffers are used. -class TrackedWatermarkBufferFactory : public Buffer::WatermarkFactory { +class TrackedWatermarkBufferFactory : public WatermarkBufferFactory { public: - TrackedWatermarkBufferFactory() = default; + // Use the default minimum tracking threshold. + TrackedWatermarkBufferFactory(); + TrackedWatermarkBufferFactory(uint32_t min_tracking_bytes); ~TrackedWatermarkBufferFactory() override; // Buffer::WatermarkFactory Buffer::InstancePtr createBuffer(std::function below_low_watermark, std::function above_high_watermark, std::function above_overflow_watermark) override; + BufferMemoryAccountSharedPtr createAccount(Http::StreamResetHandler& reset_handler) override; + void unregisterAccount(const BufferMemoryAccountSharedPtr& account, int current_class) override; // Number of buffers created. uint64_t numBuffersCreated() const; @@ -82,6 +86,17 @@ class TrackedWatermarkBufferFactory : public Buffer::WatermarkFactory { // functionality is disabled. std::pair highWatermarkRange() const; + // Number of accounts created. + uint64_t numAccountsCreated() const; + + // Waits for the expected number of accounts unregistered. Unlike + // numAccountsCreated, there are no pre-existing hooks into Envoy when an + // account unregistered call occurs as it depends upon deferred delete. + // This creates the synchronization needed. + bool waitForExpectedAccountUnregistered( + uint64_t expected_accounts_unregistered, + std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); + // Total bytes currently buffered across all known buffers. uint64_t totalBytesBuffered() const { absl::MutexLock lock(&mutex_); @@ -116,9 +131,17 @@ class TrackedWatermarkBufferFactory : public Buffer::WatermarkFactory { using AccountToBoundBuffersMap = absl::flat_hash_map>; + // Used to inspect all accounts tied to any buffer created from this factory. void inspectAccounts(std::function func, Server::Instance& server); + // Used to inspect the memory class to accounts within that class structure. + // This differs from inspectAccounts as that has all accounts bounded to an + // active buffer, while this might not track certain accounts (e.g. below + // thresholds.) As implemented this is NOT thread-safe! + void inspectMemoryClasses( + std::function func); + private: // Remove "dangling" accounts; accounts where the account_info map is the only // entity still pointing to the account. @@ -148,6 +171,10 @@ class TrackedWatermarkBufferFactory : public Buffer::WatermarkFactory { uint64_t active_buffer_count_ ABSL_GUARDED_BY(mutex_) = 0; // total bytes buffered across all buffers. uint64_t total_buffer_size_ ABSL_GUARDED_BY(mutex_) = 0; + // total number of accounts created + uint64_t total_accounts_created_ ABSL_GUARDED_BY(mutex_) = 0; + // total number of accounts unregistered + uint64_t total_accounts_unregistered_ ABSL_GUARDED_BY(mutex_) = 0; // Info about the buffer, by buffer idx. absl::node_hash_map buffer_infos_ ABSL_GUARDED_BY(mutex_); // The expected balances for the accounts. If set, when a buffer updates its diff --git a/test/integration/tracked_watermark_buffer_test.cc b/test/integration/tracked_watermark_buffer_test.cc index 734fa5cd31aee..782fcc71a04d7 100644 --- a/test/integration/tracked_watermark_buffer_test.cc +++ b/test/integration/tracked_watermark_buffer_test.cc @@ -7,6 +7,7 @@ #include "test/integration/tracked_watermark_buffer.h" #include "test/mocks/common.h" +#include "test/mocks/http/stream_reset_handler.h" #include "test/test_common/test_runtime.h" #include "test/test_common/thread_factory_for_test.h" @@ -22,6 +23,7 @@ namespace { class TrackedWatermarkBufferTest : public testing::Test { public: TrackedWatermarkBufferFactory factory_; + Http::MockStreamResetHandler mock_reset_handler_; }; TEST_F(TrackedWatermarkBufferTest, WatermarkFunctions) { @@ -131,7 +133,7 @@ TEST_F(TrackedWatermarkBufferTest, TracksNumberOfBuffersActivelyBound) { auto buffer1 = factory_.createBuffer([]() {}, []() {}, []() {}); auto buffer2 = factory_.createBuffer([]() {}, []() {}, []() {}); auto buffer3 = factory_.createBuffer([]() {}, []() {}, []() {}); - BufferMemoryAccountSharedPtr account = std::make_shared(); + auto account = factory_.createAccount(mock_reset_handler_); ASSERT_TRUE(factory_.waitUntilExpectedNumberOfAccountsAndBoundBuffers(0, 0)); buffer1->bindAccount(account); @@ -141,7 +143,8 @@ TEST_F(TrackedWatermarkBufferTest, TracksNumberOfBuffersActivelyBound) { buffer3->bindAccount(account); EXPECT_TRUE(factory_.waitUntilExpectedNumberOfAccountsAndBoundBuffers(1, 3)); - // Release test access to the account. + // Release test and account access to shared_this. + account->clearDownstream(); account.reset(); buffer3.reset(); @@ -156,7 +159,7 @@ TEST_F(TrackedWatermarkBufferTest, TracksNumberOfAccountsActive) { auto buffer1 = factory_.createBuffer([]() {}, []() {}, []() {}); auto buffer2 = factory_.createBuffer([]() {}, []() {}, []() {}); auto buffer3 = factory_.createBuffer([]() {}, []() {}, []() {}); - BufferMemoryAccountSharedPtr account1 = std::make_shared(); + auto account1 = factory_.createAccount(mock_reset_handler_); ASSERT_TRUE(factory_.waitUntilExpectedNumberOfAccountsAndBoundBuffers(0, 0)); buffer1->bindAccount(account1); @@ -164,10 +167,12 @@ TEST_F(TrackedWatermarkBufferTest, TracksNumberOfAccountsActive) { buffer2->bindAccount(account1); EXPECT_TRUE(factory_.waitUntilExpectedNumberOfAccountsAndBoundBuffers(1, 2)); - // Release test access to the account. + // Release test and account access to shared_this. + account1->clearDownstream(); account1.reset(); - buffer3->bindAccount(std::make_shared()); + auto account2 = factory_.createAccount(mock_reset_handler_); + buffer3->bindAccount(account2); EXPECT_TRUE(factory_.waitUntilExpectedNumberOfAccountsAndBoundBuffers(2, 3)); buffer2.reset(); @@ -175,6 +180,10 @@ TEST_F(TrackedWatermarkBufferTest, TracksNumberOfAccountsActive) { buffer1.reset(); EXPECT_TRUE(factory_.waitUntilExpectedNumberOfAccountsAndBoundBuffers(1, 1)); + // Release test and account access to shared_this. + account2->clearDownstream(); + account2.reset(); + buffer3.reset(); EXPECT_TRUE(factory_.waitUntilExpectedNumberOfAccountsAndBoundBuffers(0, 0)); } @@ -182,8 +191,8 @@ TEST_F(TrackedWatermarkBufferTest, TracksNumberOfAccountsActive) { TEST_F(TrackedWatermarkBufferTest, WaitForExpectedAccountBalanceShouldReturnTrueWhenConditionsMet) { auto buffer1 = factory_.createBuffer([]() {}, []() {}, []() {}); auto buffer2 = factory_.createBuffer([]() {}, []() {}, []() {}); - BufferMemoryAccountSharedPtr account1 = std::make_shared(); - BufferMemoryAccountSharedPtr account2 = std::make_shared(); + auto account1 = factory_.createAccount(mock_reset_handler_); + auto account2 = factory_.createAccount(mock_reset_handler_); buffer1->bindAccount(account1); buffer2->bindAccount(account2); @@ -194,6 +203,9 @@ TEST_F(TrackedWatermarkBufferTest, WaitForExpectedAccountBalanceShouldReturnTrue buffer2->add("Now we have expected balances!"); EXPECT_TRUE(factory_.waitForExpectedAccountBalanceWithTimeout(std::chrono::seconds(0))); + + account1->clearDownstream(); + account2->clearDownstream(); } } // namespace diff --git a/test/integration/utility.cc b/test/integration/utility.cc index c6746cae7869d..a6eb50a495aab 100644 --- a/test/integration/utility.cc +++ b/test/integration/utility.cc @@ -5,6 +5,7 @@ #include #include +#include "envoy/config/bootstrap/v3/bootstrap.pb.h" #include "envoy/event/dispatcher.h" #include "envoy/extensions/transport_sockets/quic/v3/quic_transport.pb.h" #include "envoy/network/connection.h" @@ -187,8 +188,9 @@ IntegrationUtil::makeSingleRequest(const Network::Address::InstanceConstSharedPt NiceMock random; Event::GlobalTimeSystem time_system; NiceMock random_generator; + envoy::config::bootstrap::v3::Bootstrap bootstrap; Api::Impl api(Thread::threadFactoryForTest(), mock_stats_store, time_system, - Filesystem::fileSystemForTest(), random_generator); + Filesystem::fileSystemForTest(), random_generator, bootstrap); Event::DispatcherPtr dispatcher(api.allocateDispatcher("test_thread")); TestConnectionCallbacks connection_callbacks(*dispatcher); diff --git a/test/mocks/api/mocks.cc b/test/mocks/api/mocks.cc index dfd9345ae8063..c3eeed40d0c5c 100644 --- a/test/mocks/api/mocks.cc +++ b/test/mocks/api/mocks.cc @@ -8,6 +8,7 @@ using testing::_; using testing::Invoke; +using testing::ReturnRef; namespace Envoy { namespace Api { @@ -16,6 +17,7 @@ MockApi::MockApi() { ON_CALL(*this, fileSystem()).WillByDefault(ReturnRef(file_system_)); ON_CALL(*this, rootScope()).WillByDefault(ReturnRef(stats_store_)); ON_CALL(*this, randomGenerator()).WillByDefault(ReturnRef(random_)); + ON_CALL(*this, bootstrap()).WillByDefault(ReturnRef(empty_bootstrap_)); } MockApi::~MockApi() = default; diff --git a/test/mocks/api/mocks.h b/test/mocks/api/mocks.h index 94b5db59f99c2..ef6f02c999cbd 100644 --- a/test/mocks/api/mocks.h +++ b/test/mocks/api/mocks.h @@ -5,6 +5,7 @@ #include "envoy/api/api.h" #include "envoy/api/os_sys_calls.h" +#include "envoy/config/bootstrap/v3/bootstrap.pb.h" #include "envoy/event/dispatcher.h" #include "envoy/event/timer.h" @@ -47,12 +48,14 @@ class MockApi : public Api { MOCK_METHOD(Thread::ThreadFactory&, threadFactory, ()); MOCK_METHOD(Stats::Scope&, rootScope, ()); MOCK_METHOD(Random::RandomGenerator&, randomGenerator, ()); + MOCK_METHOD(const envoy::config::bootstrap::v3::Bootstrap&, bootstrap, (), (const)); MOCK_METHOD(ProcessContextOptRef, processContext, ()); testing::NiceMock file_system_; Event::GlobalTimeSystem time_system_; testing::NiceMock stats_store_; testing::NiceMock random_; + envoy::config::bootstrap::v3::Bootstrap empty_bootstrap_; }; class MockOsSysCalls : public OsSysCallsImpl { diff --git a/test/mocks/buffer/mocks.h b/test/mocks/buffer/mocks.h index 22a215206939b..c3579a3b8d7d4 100644 --- a/test/mocks/buffer/mocks.h +++ b/test/mocks/buffer/mocks.h @@ -87,6 +87,8 @@ class MockBufferFactory : public Buffer::WatermarkFactory { MOCK_METHOD(Buffer::Instance*, createBuffer_, (std::function below_low, std::function above_high, std::function above_overflow)); + + MOCK_METHOD(Buffer::BufferMemoryAccountSharedPtr, createAccount, (Http::StreamResetHandler&)); }; MATCHER_P(BufferEqual, rhs, testing::PrintToString(*rhs)) { diff --git a/test/mocks/http/BUILD b/test/mocks/http/BUILD index 51c6c7f2fae1d..063aa41e96586 100644 --- a/test/mocks/http/BUILD +++ b/test/mocks/http/BUILD @@ -108,3 +108,11 @@ envoy_cc_test( "//test/test_common:utility_lib", ], ) + +envoy_cc_mock( + name = "stream_reset_handler_mock", + hdrs = ["stream_reset_handler.h"], + deps = [ + "//envoy/http:stream_reset_handler_interface", + ], +) diff --git a/test/mocks/http/stream.cc b/test/mocks/http/stream.cc index 1a3d4e8bcae67..19181d8c26ed6 100644 --- a/test/mocks/http/stream.cc +++ b/test/mocks/http/stream.cc @@ -29,7 +29,11 @@ MockStream::MockStream() { [this](Buffer::BufferMemoryAccountSharedPtr account) -> void { account_ = account; })); } -MockStream::~MockStream() = default; +MockStream::~MockStream() { + if (account_) { + account_->clearDownstream(); + } +} } // namespace Http } // namespace Envoy diff --git a/test/mocks/http/stream_reset_handler.h b/test/mocks/http/stream_reset_handler.h new file mode 100644 index 0000000000000..6469a7b124cc0 --- /dev/null +++ b/test/mocks/http/stream_reset_handler.h @@ -0,0 +1,19 @@ +#pragma once + +#include "envoy/http/stream_reset_handler.h" + +#include "gmock/gmock.h" + +namespace Envoy { +namespace Http { + +class MockStreamResetHandler : public StreamResetHandler { +public: + MockStreamResetHandler() = default; + + // Http::StreamResetHandler + MOCK_METHOD(void, resetStream, (StreamResetReason reason)); +}; + +} // namespace Http +} // namespace Envoy diff --git a/test/server/config_validation/BUILD b/test/server/config_validation/BUILD index 51ebda4ae37c4..03cddb20150e0 100644 --- a/test/server/config_validation/BUILD +++ b/test/server/config_validation/BUILD @@ -74,6 +74,7 @@ envoy_cc_test( "//test/test_common:environment_lib", "//test/test_common:network_utility_lib", "//test/test_common:test_time_lib", + "@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto", ], ) diff --git a/test/server/config_validation/dispatcher_test.cc b/test/server/config_validation/dispatcher_test.cc index 608135be22ae0..e72afe4cdc223 100644 --- a/test/server/config_validation/dispatcher_test.cc +++ b/test/server/config_validation/dispatcher_test.cc @@ -1,5 +1,7 @@ #include +#include "envoy/config/bootstrap/v3/bootstrap.pb.h" + #include "source/common/common/thread.h" #include "source/common/event/dispatcher_impl.h" #include "source/common/event/libevent.h" @@ -24,7 +26,7 @@ class ConfigValidation : public testing::TestWithParam( Thread::threadFactoryForTest(), stats_store_, test_time_.timeSystem(), - Filesystem::fileSystemForTest(), random_generator_); + Filesystem::fileSystemForTest(), random_generator_, bootstrap_); dispatcher_ = validation_->allocateDispatcher("test_thread"); } @@ -32,6 +34,7 @@ class ConfigValidation : public testing::TestWithParam random_generator_; + envoy::config::bootstrap::v3::Bootstrap bootstrap_; private: // Using config validation API. diff --git a/test/test_common/BUILD b/test/test_common/BUILD index 75a5445637854..97475f1e989d6 100644 --- a/test/test_common/BUILD +++ b/test/test_common/BUILD @@ -135,6 +135,7 @@ envoy_cc_test_library( "//source/common/protobuf:utility_lib", "//source/common/stats:stats_lib", "//test/mocks/stats:stats_mocks", + "@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto", "@envoy_api//envoy/config/cluster/v3:pkg_cc_proto", "@envoy_api//envoy/config/endpoint/v3:pkg_cc_proto", "@envoy_api//envoy/config/listener/v3:pkg_cc_proto", diff --git a/test/test_common/utility.cc b/test/test_common/utility.cc index 35b5305cd0a9b..1461cd0a9923c 100644 --- a/test/test_common/utility.cc +++ b/test/test_common/utility.cc @@ -12,6 +12,7 @@ #include "envoy/buffer/buffer.h" #include "envoy/common/platform.h" +#include "envoy/config/bootstrap/v3/bootstrap.pb.h" #include "envoy/config/cluster/v3/cluster.pb.h" #include "envoy/config/endpoint/v3/endpoint.pb.h" #include "envoy/config/listener/v3/listener.pb.h" @@ -399,6 +400,7 @@ class TestImplProvider { Event::GlobalTimeSystem global_time_system_; testing::NiceMock default_stats_store_; testing::NiceMock mock_random_generator_; + envoy::config::bootstrap::v3::Bootstrap empty_bootstrap_; }; class TestImpl : public TestImplProvider, public Impl { @@ -408,7 +410,7 @@ class TestImpl : public TestImplProvider, public Impl { Random::RandomGenerator* random = nullptr) : Impl(thread_factory, stats_store ? *stats_store : default_stats_store_, time_system ? *time_system : global_time_system_, file_system, - random ? *random : mock_random_generator_) {} + random ? *random : mock_random_generator_, empty_bootstrap_) {} }; ApiPtr createApiForTest() {