diff --git a/api/envoy/extensions/filters/http/bandwidth_limit/v3alpha/bandwidth_limit.proto b/api/envoy/extensions/filters/http/bandwidth_limit/v3alpha/bandwidth_limit.proto index f3b5fae378aa7..e7a19dd64d14c 100644 --- a/api/envoy/extensions/filters/http/bandwidth_limit/v3alpha/bandwidth_limit.proto +++ b/api/envoy/extensions/filters/http/bandwidth_limit/v3alpha/bandwidth_limit.proto @@ -33,7 +33,7 @@ message BandwidthLimit { Egress = 2; // Filter enabled for both Ingress and Egress traffic. - IngressAndEgress = 4; + IngressAndEgress = 3; } // The human readable prefix to use when emitting stats. @@ -55,6 +55,8 @@ message BandwidthLimit { // google.protobuf.UInt64Value limit_kbps = 3 [(validate.rules).uint64 = {gte: 1}]; - // Optional fill_rate for the token buckets (per second). Defaults to 64. + // Optional fill_rate for the token buckets (per second). Defaults to 16. + // .. note:: + // In the current implementation, the fill_rate must be <= 32 to avoid too aggressive refills. google.protobuf.UInt32Value fill_rate = 4 [(validate.rules).uint32 = {gte: 1}]; } diff --git a/generated_api_shadow/envoy/extensions/filters/http/bandwidth_limit/v3alpha/bandwidth_limit.proto b/generated_api_shadow/envoy/extensions/filters/http/bandwidth_limit/v3alpha/bandwidth_limit.proto index f3b5fae378aa7..e7a19dd64d14c 100644 --- a/generated_api_shadow/envoy/extensions/filters/http/bandwidth_limit/v3alpha/bandwidth_limit.proto +++ b/generated_api_shadow/envoy/extensions/filters/http/bandwidth_limit/v3alpha/bandwidth_limit.proto @@ -33,7 +33,7 @@ message BandwidthLimit { Egress = 2; // Filter enabled for both Ingress and Egress traffic. - IngressAndEgress = 4; + IngressAndEgress = 3; } // The human readable prefix to use when emitting stats. @@ -55,6 +55,8 @@ message BandwidthLimit { // google.protobuf.UInt64Value limit_kbps = 3 [(validate.rules).uint64 = {gte: 1}]; - // Optional fill_rate for the token buckets (per second). Defaults to 64. 
+ // Optional fill_rate for the token buckets (per second). Defaults to 16. + // .. note:: + // In the current implementation, the fill_rate must be <= 32 to avoid too aggressive refills. google.protobuf.UInt32Value fill_rate = 4 [(validate.rules).uint32 = {gte: 1}]; } diff --git a/source/common/common/token_bucket_impl.cc b/source/common/common/token_bucket_impl.cc index 5e7de9e6bb1a7..0f96cc1568896 100644 --- a/source/common/common/token_bucket_impl.cc +++ b/source/common/common/token_bucket_impl.cc @@ -4,11 +4,17 @@ namespace Envoy { -TokenBucketImpl::TokenBucketImpl(uint64_t max_tokens, TimeSource& time_source, double fill_rate) +TokenBucketImpl::TokenBucketImpl(uint64_t max_tokens, TimeSource& time_source, double fill_rate, + bool allow_multiple_resets) : max_tokens_(max_tokens), fill_rate_(std::abs(fill_rate)), tokens_(max_tokens), - last_fill_(time_source.monotonicTime()), time_source_(time_source) {} + last_fill_(time_source.monotonicTime()), time_source_(time_source) { + if (!allow_multiple_resets) + // initialize only when multiple resets are not allowed. + reset_once_ = absl::optional(false); +} uint64_t TokenBucketImpl::consume(uint64_t tokens, bool allow_partial) { + absl::WriterMutexLock lock(&mutex_); if (tokens_ < max_tokens_) { const auto time_now = time_source_.monotonicTime(); tokens_ = std::min((std::chrono::duration(time_now - last_fill_).count() * fill_rate_) + @@ -31,6 +37,7 @@ uint64_t TokenBucketImpl::consume(uint64_t tokens, bool allow_partial) { std::chrono::milliseconds TokenBucketImpl::nextTokenAvailable() { // If there are tokens available, return immediately. + absl::ReaderMutexLock lock(&mutex_); if (tokens_ >= 1) { return std::chrono::milliseconds(0); } @@ -40,8 +47,14 @@ std::chrono::milliseconds TokenBucketImpl::nextTokenAvailable() { void TokenBucketImpl::reset(uint64_t num_tokens) { ASSERT(num_tokens <= max_tokens_); + absl::WriterMutexLock lock(&mutex_); + // Don't reset if reset before and multiple resets aren't allowed. 
+ if (reset_once_.has_value() && reset_once_.value()) { + return; + } tokens_ = num_tokens; last_fill_ = time_source_.monotonicTime(); + reset_once_ = absl::optional(true); } } // namespace Envoy diff --git a/source/common/common/token_bucket_impl.h b/source/common/common/token_bucket_impl.h index 644a4185dd5ab..8a8b8d32d9c9e 100644 --- a/source/common/common/token_bucket_impl.h +++ b/source/common/common/token_bucket_impl.h @@ -5,10 +5,12 @@ #include "common/common/utility.h" +#include "absl/synchronization/mutex.h" + namespace Envoy { /** - * A class that implements token bucket interface (not thread-safe). + * A class that implements token bucket interface. */ class TokenBucketImpl : public TokenBucket { public: @@ -18,7 +20,8 @@ class TokenBucketImpl : public TokenBucket { * @param fill_rate supplies the number of tokens that will return to the bucket on each second. * The default is 1. */ - explicit TokenBucketImpl(uint64_t max_tokens, TimeSource& time_source, double fill_rate = 1); + explicit TokenBucketImpl(uint64_t max_tokens, TimeSource& time_source, double fill_rate = 1, + bool allow_multiple_resets = false); // TokenBucket uint64_t consume(uint64_t tokens, bool allow_partial) override; @@ -29,8 +32,10 @@ class TokenBucketImpl : public TokenBucket { const double max_tokens_; const double fill_rate_; double tokens_; + absl::optional reset_once_; MonotonicTime last_fill_; TimeSource& time_source_; + absl::Mutex mutex_; }; } // namespace Envoy diff --git a/source/extensions/extensions_build_config.bzl b/source/extensions/extensions_build_config.bzl index 8a48fb6782644..fc846c73621c0 100644 --- a/source/extensions/extensions_build_config.bzl +++ b/source/extensions/extensions_build_config.bzl @@ -50,6 +50,7 @@ EXTENSIONS = { "envoy.filters.http.admission_control": "//source/extensions/filters/http/admission_control:config", "envoy.filters.http.aws_lambda": "//source/extensions/filters/http/aws_lambda:config", "envoy.filters.http.aws_request_signing": 
"//source/extensions/filters/http/aws_request_signing:config", + "envoy.filters.http.bandwidth_limit": "//source/extensions/filters/http/bandwidth_limit:config", "envoy.filters.http.buffer": "//source/extensions/filters/http/buffer:config", "envoy.filters.http.cache": "//source/extensions/filters/http/cache:config", "envoy.filters.http.cdn_loop": "//source/extensions/filters/http/cdn_loop:config", diff --git a/source/extensions/filters/http/bandwidth_limit/BUILD b/source/extensions/filters/http/bandwidth_limit/BUILD new file mode 100644 index 0000000000000..8cad9ac8d72a4 --- /dev/null +++ b/source/extensions/filters/http/bandwidth_limit/BUILD @@ -0,0 +1,46 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_extension", + "envoy_cc_library", + "envoy_extension_package", +) + +licenses(["notice"]) # Apache 2 + +# Local Bandwidthlimit HTTP L7 filter +# Public docs: docs/root/configuration/http_filters/bandwidth_limit_filter.rst + +envoy_extension_package() + +envoy_cc_library( + name = "bandwidth_limit_lib", + srcs = ["bandwidth_limit.cc"], + hdrs = ["bandwidth_limit.h"], + deps = [ + "//include/envoy/http:codes_interface", + "//include/envoy/server:filter_config_interface", + "//include/envoy/stats:stats_macros", + "//source/common/common:utility_lib", + "//source/common/http:header_utility_lib", + "//source/common/http:headers_lib", + "//source/common/router:header_parser_lib", + "//source/common/runtime:runtime_lib", + "//source/extensions/filters/http/common:stream_rate_limiter_lib", + "@envoy_api//envoy/extensions/filters/http/bandwidth_limit/v3alpha:pkg_cc_proto", + "@envoy_api//envoy/type/v3:pkg_cc_proto", + ], +) + +envoy_cc_extension( + name = "config", + srcs = ["config.cc"], + hdrs = ["config.h"], + security_posture = "unknown", + deps = [ + ":bandwidth_limit_lib", + "//include/envoy/http:filter_interface", + "//source/common/protobuf:utility_lib", + "//source/extensions/filters/http/common:factory_base_lib", + 
"@envoy_api//envoy/extensions/filters/http/bandwidth_limit/v3alpha:pkg_cc_proto", + ], +) diff --git a/source/extensions/filters/http/bandwidth_limit/bandwidth_limit.cc b/source/extensions/filters/http/bandwidth_limit/bandwidth_limit.cc new file mode 100644 index 0000000000000..323ea8572ed32 --- /dev/null +++ b/source/extensions/filters/http/bandwidth_limit/bandwidth_limit.cc @@ -0,0 +1,153 @@ +#include "extensions/filters/http/bandwidth_limit/bandwidth_limit.h" + +#include +#include + +#include "envoy/http/codes.h" + +#include "common/http/utility.h" + +using envoy::extensions::filters::http::bandwidth_limit::v3alpha::BandwidthLimit; +using Envoy::Extensions::HttpFilters::Common::StreamRateLimiter; + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace BandwidthLimitFilter { + +FilterConfig::FilterConfig(const BandwidthLimit& config, Stats::Scope& scope, + Runtime::Loader& runtime, TimeSource& time_source, bool per_route) + : stats_(generateStats(config.stat_prefix(), scope)), runtime_(runtime), scope_(scope), + time_source_(time_source), + limit_kbps_(config.has_limit_kbps() ? config.limit_kbps().value() : 0), + enable_mode_(config.enable_mode()), + fill_rate_(config.has_fill_rate() ? config.fill_rate().value() + : StreamRateLimiter::DefaultFillRate) { + if (per_route && !config.has_limit_kbps()) { + throw EnvoyException("bandwidthlimitfilter: limit must be set for per route filter config"); + } + + if (fill_rate_ > MaxFillRate) { + throw EnvoyException("bandwidthlimitfilter: fill rate must be <= 32"); + } + // The token bucket is configured with a max token count of the number of ticks per second, + // and refills at the same rate, so that we have a per second limit which refills gradually in + // 1/fill_rate intervals. 
+ token_bucket_ = std::make_shared(fill_rate_, time_source, fill_rate_); +} + +BandwidthLimitStats FilterConfig::generateStats(const std::string& prefix, Stats::Scope& scope) { + const std::string final_prefix = prefix + ".http_bandwidth_limit"; + return {ALL_BANDWIDTH_LIMIT_STATS(POOL_COUNTER_PREFIX(scope, final_prefix))}; +} + +// BandwidthLimiter members + +Http::FilterHeadersStatus BandwidthLimiter::decodeHeaders(Http::RequestHeaderMap&, bool) { + const auto* config = getConfig(); + + auto mode = config->enable_mode(); + ENVOY_LOG(trace, "BandwidthLimiter: decode headers: mode={}", static_cast(mode)); + + if (mode & BandwidthLimit::Ingress) { + config->stats().enabled_.inc(); + ingress_limiter_ = std::make_unique( + config_->limit(), decoder_callbacks_->decoderBufferLimit(), + [this] { decoder_callbacks_->onDecoderFilterAboveWriteBufferHighWatermark(); }, + [this] { decoder_callbacks_->onDecoderFilterBelowWriteBufferLowWatermark(); }, + [this](Buffer::Instance& data, bool end_stream) { + decoder_callbacks_->injectDecodedDataToFilterChain(data, end_stream); + }, + [this] { decoder_callbacks_->continueDecoding(); }, config_->timeSource(), + decoder_callbacks_->dispatcher(), decoder_callbacks_->scope(), config_->tokenBucket(), + config_->fill_rate()); + } + ENVOY_LOG(trace, "BandwidthLimiter: decode headers: ingress_limiter_={}", + ingress_limiter_ ? true : false); + + return Http::FilterHeadersStatus::Continue; +} + +Http::FilterDataStatus BandwidthLimiter::decodeData(Buffer::Instance& data, bool end_stream) { + if (ingress_limiter_ != nullptr) { + ingress_limiter_->writeData(data, end_stream); + return Http::FilterDataStatus::StopIterationNoBuffer; + } + ENVOY_LOG(trace, "BandwidthLimiter: decode data: ingress_limiter_ not set"); + return Http::FilterDataStatus::Continue; +} + +Http::FilterTrailersStatus BandwidthLimiter::decodeTrailers(Http::RequestTrailerMap&) { + if (ingress_limiter_ != nullptr) { + return ingress_limiter_->onTrailers() ? 
Http::FilterTrailersStatus::StopIteration + : Http::FilterTrailersStatus::Continue; + } + return Http::FilterTrailersStatus::Continue; +} + +Http::FilterHeadersStatus BandwidthLimiter::encodeHeaders(Http::ResponseHeaderMap&, bool) { + const auto* config = getConfig(); + + auto mode = config->enable_mode(); + ENVOY_LOG(trace, "BandwidthLimiter: encode headers: mode={}", static_cast(mode)); + + if (mode & BandwidthLimit::Egress) { + config->stats().enabled_.inc(); + + egress_limiter_ = std::make_unique( + config_->limit(), encoder_callbacks_->encoderBufferLimit(), + [this] { encoder_callbacks_->onEncoderFilterAboveWriteBufferHighWatermark(); }, + [this] { encoder_callbacks_->onEncoderFilterBelowWriteBufferLowWatermark(); }, + [this](Buffer::Instance& data, bool end_stream) { + encoder_callbacks_->injectEncodedDataToFilterChain(data, end_stream); + }, + [this] { encoder_callbacks_->continueEncoding(); }, config_->timeSource(), + encoder_callbacks_->dispatcher(), encoder_callbacks_->scope(), config_->tokenBucket(), + config_->fill_rate()); + } + ENVOY_LOG(trace, "BandwidthLimiter: encode headers: egress_limiter_={}", + egress_limiter_ ? true : false); + + return Http::FilterHeadersStatus::Continue; +} + +Http::FilterDataStatus BandwidthLimiter::encodeData(Buffer::Instance& data, bool end_stream) { + if (egress_limiter_ != nullptr) { + egress_limiter_->writeData(data, end_stream); + return Http::FilterDataStatus::StopIterationNoBuffer; + } + ENVOY_LOG(trace, "BandwidthLimiter: encode data: egress_limiter_ not set"); + return Http::FilterDataStatus::Continue; +} + +Http::FilterTrailersStatus BandwidthLimiter::encodeTrailers(Http::ResponseTrailerMap&) { + if (egress_limiter_ != nullptr) { + return egress_limiter_->onTrailers() ? 
Http::FilterTrailersStatus::StopIteration + : Http::FilterTrailersStatus::Continue; + } + return Http::FilterTrailersStatus::Continue; +} + +const FilterConfig* BandwidthLimiter::getConfig() const { + const auto* config = Http::Utility::resolveMostSpecificPerFilterConfig( + "envoy.filters.http.bandwidth_limit", decoder_callbacks_->route()); + if (config) { + return config; + } + + return config_.get(); +} + +void BandwidthLimiter::onDestroy() { + if (ingress_limiter_ != nullptr) { + ingress_limiter_->destroy(); + } + if (egress_limiter_ != nullptr) { + egress_limiter_->destroy(); + } +} + +} // namespace BandwidthLimitFilter +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/http/bandwidth_limit/bandwidth_limit.h b/source/extensions/filters/http/bandwidth_limit/bandwidth_limit.h new file mode 100644 index 0000000000000..8d6007ee4708f --- /dev/null +++ b/source/extensions/filters/http/bandwidth_limit/bandwidth_limit.h @@ -0,0 +1,139 @@ +#pragma once + +#include +#include +#include +#include + +#include "envoy/extensions/filters/http/bandwidth_limit/v3alpha/bandwidth_limit.pb.h" +#include "envoy/http/filter.h" +#include "envoy/runtime/runtime.h" +#include "envoy/stats/scope.h" +#include "envoy/stats/stats_macros.h" + +#include "common/common/assert.h" +#include "common/common/token_bucket_impl.h" +#include "common/http/header_map_impl.h" +#include "common/router/header_parser.h" +#include "common/runtime/runtime_protos.h" + +#include "extensions/filters/http/common/stream_rate_limiter.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace BandwidthLimitFilter { + +/** + * All bandwidth limit stats. @see stats_macros.h + */ +#define ALL_BANDWIDTH_LIMIT_STATS(COUNTER) \ + COUNTER(enabled) \ + COUNTER(enforced) \ + COUNTER(bandwidth_limited) \ + COUNTER(bandwidth_usage) \ + COUNTER(ok) + +/** + * Struct definition for all bandwidth limit stats. 
@see stats_macros.h + */ +struct BandwidthLimitStats { + ALL_BANDWIDTH_LIMIT_STATS(GENERATE_COUNTER_STRUCT) +}; + +/** + * Configuration for the HTTP bandwidth limit filter. + */ +class FilterConfig : public ::Envoy::Router::RouteSpecificFilterConfig { +public: + using EnableMode = + envoy::extensions::filters::http::bandwidth_limit::v3alpha::BandwidthLimit_EnableMode; + + static constexpr uint64_t MaxFillRate = 32; + + FilterConfig( + const envoy::extensions::filters::http::bandwidth_limit::v3alpha::BandwidthLimit& config, + Stats::Scope& scope, Runtime::Loader& runtime, TimeSource& time_source, + bool per_route = false); + ~FilterConfig() override = default; + Runtime::Loader& runtime() { return runtime_; } + BandwidthLimitStats& stats() const { return stats_; } + Stats::Scope& scope() { return scope_; } + TimeSource& timeSource() { return time_source_; } + // Must call enabled() before calling limit(). + uint64_t limit() const { return limit_kbps_; } + EnableMode enable_mode() const { return enable_mode_; }; + std::shared_ptr tokenBucket() { return token_bucket_; } + const std::shared_ptr tokenBucket() const { return token_bucket_; } + uint64_t fill_rate() const { return fill_rate_; } + +private: + friend class FilterTest; + + static BandwidthLimitStats generateStats(const std::string& prefix, Stats::Scope& scope); + + mutable BandwidthLimitStats stats_; + Runtime::Loader& runtime_; + Stats::Scope& scope_; + TimeSource& time_source_; + const uint64_t limit_kbps_; + const EnableMode enable_mode_; + const uint64_t fill_rate_; + // Filter chain's shared token bucket + std::shared_ptr token_bucket_; +}; + +using FilterConfigSharedPtr = std::shared_ptr; + +/** + * HTTP bandwidth limit filter. Depending on the route configuration, this filter calls consults + * with local token bucket before allowing further filter iteration. 
+ */ +class BandwidthLimiter : public Http::StreamFilter, Logger::Loggable { +public: + BandwidthLimiter(FilterConfigSharedPtr config) : config_(config) {} + + // Http::StreamDecoderFilter + Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap&, bool) override; + Http::FilterDataStatus decodeData(Buffer::Instance& data, bool end_stream) override; + Http::FilterTrailersStatus decodeTrailers(Http::RequestTrailerMap& trailers) override; + + void setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) override { + decoder_callbacks_ = &callbacks; + } + + // Http::StreamEncoderFilter + Http::FilterHeadersStatus encode100ContinueHeaders(Http::ResponseHeaderMap&) override { + return Http::FilterHeadersStatus::Continue; + } + + Http::FilterHeadersStatus encodeHeaders(Http::ResponseHeaderMap&, bool) override; + Http::FilterDataStatus encodeData(Buffer::Instance& data, bool end_stream) override; + Http::FilterTrailersStatus encodeTrailers(Http::ResponseTrailerMap&) override; + + Http::FilterMetadataStatus encodeMetadata(Http::MetadataMap&) override { + return Http::FilterMetadataStatus::Continue; + } + + void setEncoderFilterCallbacks(Http::StreamEncoderFilterCallbacks& callbacks) override { + encoder_callbacks_ = &callbacks; + } + + // Http::StreamFilterBase + void onDestroy() override; + +private: + friend class FilterTest; + const FilterConfig* getConfig() const; + + Http::StreamDecoderFilterCallbacks* decoder_callbacks_{}; + Http::StreamEncoderFilterCallbacks* encoder_callbacks_{}; + FilterConfigSharedPtr config_; + std::unique_ptr ingress_limiter_; + std::unique_ptr egress_limiter_; +}; + +} // namespace BandwidthLimitFilter +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/http/bandwidth_limit/config.cc b/source/extensions/filters/http/bandwidth_limit/config.cc new file mode 100644 index 0000000000000..2ed2118fea451 --- /dev/null +++ 
b/source/extensions/filters/http/bandwidth_limit/config.cc @@ -0,0 +1,43 @@ +#include "extensions/filters/http/bandwidth_limit/config.h" + +#include + +#include "envoy/registry/registry.h" + +#include "common/protobuf/utility.h" + +#include "extensions/filters/http/bandwidth_limit/bandwidth_limit.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace BandwidthLimitFilter { + +Http::FilterFactoryCb BandwidthLimitFilterConfig::createFilterFactoryFromProtoTyped( + const envoy::extensions::filters::http::bandwidth_limit::v3alpha::BandwidthLimit& proto_config, + const std::string&, Server::Configuration::FactoryContext& context) { + FilterConfigSharedPtr filter_config = std::make_shared( + proto_config, context.scope(), context.runtime(), context.timeSource()); + return [filter_config](Http::FilterChainFactoryCallbacks& callbacks) -> void { + callbacks.addStreamFilter(std::make_shared(filter_config)); + }; +} + +Router::RouteSpecificFilterConfigConstSharedPtr +BandwidthLimitFilterConfig::createRouteSpecificFilterConfigTyped( + const envoy::extensions::filters::http::bandwidth_limit::v3alpha::BandwidthLimit& proto_config, + Server::Configuration::ServerFactoryContext& context, ProtobufMessage::ValidationVisitor&) { + return std::make_shared(proto_config, context.scope(), context.runtime(), + context.timeSource(), true); +} + +/** + * Static registration for the bandwidth limit filter. @see RegisterFactory. 
+ */ +REGISTER_FACTORY(BandwidthLimitFilterConfig, + Server::Configuration::NamedHttpFilterConfigFactory){"envoy.bandwidth_limit"}; + +} // namespace BandwidthLimitFilter +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/http/bandwidth_limit/config.h b/source/extensions/filters/http/bandwidth_limit/config.h new file mode 100644 index 0000000000000..e9945bbf745d8 --- /dev/null +++ b/source/extensions/filters/http/bandwidth_limit/config.h @@ -0,0 +1,37 @@ +#pragma once + +#include "envoy/extensions/filters/http/bandwidth_limit/v3alpha/bandwidth_limit.pb.h" +#include "envoy/extensions/filters/http/bandwidth_limit/v3alpha/bandwidth_limit.pb.validate.h" + +#include "extensions/filters/http/common/factory_base.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace BandwidthLimitFilter { + +/** + * Config registration for the bandwidth limit filter. @see NamedHttpFilterConfigFactory. + */ +class BandwidthLimitFilterConfig + : public Common::FactoryBase< + envoy::extensions::filters::http::bandwidth_limit::v3alpha::BandwidthLimit> { +public: + BandwidthLimitFilterConfig() : FactoryBase("envoy.filters.http.bandwidth_limit") {} + +private: + Http::FilterFactoryCb createFilterFactoryFromProtoTyped( + const envoy::extensions::filters::http::bandwidth_limit::v3alpha::BandwidthLimit& + proto_config, + const std::string& stats_prefix, Server::Configuration::FactoryContext& context) override; + + Router::RouteSpecificFilterConfigConstSharedPtr createRouteSpecificFilterConfigTyped( + const envoy::extensions::filters::http::bandwidth_limit::v3alpha::BandwidthLimit& + proto_config, + Server::Configuration::ServerFactoryContext&, ProtobufMessage::ValidationVisitor&) override; +}; + +} // namespace BandwidthLimitFilter +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/http/common/BUILD 
b/source/extensions/filters/http/common/BUILD index a0c427cf97838..14bb3b2cf52a7 100644 --- a/source/extensions/filters/http/common/BUILD +++ b/source/extensions/filters/http/common/BUILD @@ -56,3 +56,18 @@ envoy_cc_library( "//source/extensions/filters/http:well_known_names", ], ) + +envoy_cc_library( + name = "stream_rate_limiter_lib", + srcs = ["stream_rate_limiter.cc"], + hdrs = ["stream_rate_limiter.h"], + deps = [ + "//include/envoy/event:dispatcher_interface", + "//include/envoy/event:timer_interface", + "//source/common/buffer:watermark_buffer_lib", + "//source/common/common:assert_lib", + "//source/common/common:empty_string", + "//source/common/common:token_bucket_impl_lib", + "@envoy_api//envoy/config/core/v3:pkg_cc_proto", + ], +) diff --git a/source/extensions/filters/http/common/stream_rate_limiter.cc b/source/extensions/filters/http/common/stream_rate_limiter.cc new file mode 100644 index 0000000000000..f1ddd62def3c9 --- /dev/null +++ b/source/extensions/filters/http/common/stream_rate_limiter.cc @@ -0,0 +1,120 @@ +#include "extensions/filters/http/common/stream_rate_limiter.h" + +#include + +#include "envoy/event/dispatcher.h" +#include "envoy/event/timer.h" + +#include "common/common/assert.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace Common { + +StreamRateLimiter::StreamRateLimiter(uint64_t max_kbps, uint64_t max_buffered_data, + std::function pause_data_cb, + std::function resume_data_cb, + std::function write_data_cb, + std::function continue_cb, TimeSource& time_source, + Event::Dispatcher& dispatcher, const ScopeTrackedObject& scope, + std::shared_ptr token_bucket, + uint64_t fill_rate) + : // bytes_per_time_slice is KiB converted to bytes divided by the number of ticks per second. 
+ bytes_per_time_slice_((max_kbps * 1024) / fill_rate), write_data_cb_(write_data_cb), + continue_cb_(continue_cb), scope_(scope), token_bucket_(std::move(token_bucket)), + token_timer_(dispatcher.createTimer([this] { onTokenTimer(); })), + buffer_(resume_data_cb, pause_data_cb, + []() -> void { /* TODO(adisuissa): Handle overflow watermark */ }) { + ASSERT(bytes_per_time_slice_ > 0); + ASSERT(max_buffered_data > 0); + if (!token_bucket_) { + // Initialize a new token bucket if caller didn't provide one. + // The token bucket is configured with a max token count of the number of ticks per second, + // and refills at the same rate, so that we have a per second limit which refills gradually in + // 1/fill_rate intervals. + token_bucket_ = std::make_shared(fill_rate, time_source, fill_rate); + } + buffer_.setWatermarks(max_buffered_data); +} + +void StreamRateLimiter::onTokenTimer() { + // TODO(nitgoy): remove all debug trace logs before final merge + ENVOY_LOG(trace, "stream limiter: timer wakeup: buffered={}", buffer_.length()); + Buffer::OwnedImpl data_to_write; + + if (!saw_data_) { + // The first time we see any data on this stream (via writeData()), reset the number of tokens + // to 1. This will ensure that we start pacing the data at the desired rate (and don't send a + // full 1 sec of data right away which might not introduce enough delay for a stream that + // doesn't have enough data to span more than 1s of rate allowance). Once we reset, we will + // subsequently allow for bursting within the second to account for our data provider being + // bursty. The shared token bucket will reset only first time even when called reset from + // multiple streams. + token_bucket_->reset(1); + saw_data_ = true; + } + + // Compute the number of tokens needed (rounded up), try to obtain that many tickets, and then + // figure out how many bytes to write given the number of tokens we actually got. 
+ const uint64_t tokens_needed = + (buffer_.length() + bytes_per_time_slice_ - 1) / bytes_per_time_slice_; + const uint64_t tokens_obtained = token_bucket_->consume(tokens_needed, true); + const uint64_t bytes_to_write = + std::min(tokens_obtained * bytes_per_time_slice_, buffer_.length()); + ENVOY_LOG(trace, "stream limiter: tokens_needed={} tokens_obtained={} to_write={}", tokens_needed, + tokens_obtained, bytes_to_write); + + // Move the data to write into the output buffer with as little copying as possible. + // NOTE: This might be moving zero bytes, but that should work fine. + data_to_write.move(buffer_, bytes_to_write); + + // If the buffer still contains data in it, we couldn't get enough tokens, so schedule the next + // token available time. + // In case of a shared token bucket, this algorithm will prioritize one stream at a time. + // TODO(nitgoy): add round-robin and other policies for rationing bandwidth. + if (buffer_.length() > 0) { + const std::chrono::milliseconds ms = token_bucket_->nextTokenAvailable(); + if (ms.count() > 0) { + ENVOY_LOG(trace, "stream limiter: scheduling wakeup for {}ms", ms.count()); + token_timer_->enableTimer(ms, &scope_); + } + } + + // Write the data out, indicating end stream if we saw end stream, there is no further data to + // send, and there are no trailers. + write_data_cb_(data_to_write, saw_end_stream_ && buffer_.length() == 0 && !saw_trailers_); + + // If there is no more data to send and we saw trailers, we need to continue iteration to release + // the trailers to further filters. 
+ if (buffer_.length() == 0 && saw_trailers_) { + continue_cb_(); + } +} + +void StreamRateLimiter::writeData(Buffer::Instance& incoming_buffer, bool end_stream) { + ENVOY_LOG(trace, "stream limiter: incoming data length={} buffered={}", incoming_buffer.length(), + buffer_.length()); + buffer_.move(incoming_buffer); + saw_end_stream_ = end_stream; + if (!token_timer_->enabled()) { + // TODO(mattklein123): In an optimal world we would be able to continue iteration with the data + // we want in the buffer, but have a way to clear end_stream in case we can't send it all. + // The filter API does not currently support that and it will not be a trivial change to add. + // Instead we cheat here by scheduling the token timer to run immediately after the stack is + // unwound, at which point we can directly called encode/decodeData. + token_timer_->enableTimer(std::chrono::milliseconds(0), &scope_); + ENVOY_LOG(trace, "stream limiter: token timer is{}enabled for first time.", + token_timer_->enabled() ? " " : " not "); + } +} + +bool StreamRateLimiter::onTrailers() { + saw_end_stream_ = true; + saw_trailers_ = true; + return buffer_.length() > 0; +} +} // namespace Common +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/http/common/stream_rate_limiter.h b/source/extensions/filters/http/common/stream_rate_limiter.h new file mode 100644 index 0000000000000..94b4c0f53a185 --- /dev/null +++ b/source/extensions/filters/http/common/stream_rate_limiter.h @@ -0,0 +1,93 @@ +#pragma once +#include +#include +#include +#include + +#include "envoy/event/timer.h" +#include "envoy/runtime/runtime.h" + +#include "common/buffer/watermark_buffer.h" +#include "common/common/token_bucket_impl.h" + +namespace Envoy { + +class ScopeTrackedObject; + +namespace Event { +class Timer; +} // namespace Event + +namespace Extensions { +namespace HttpFilters { +namespace Common { + +/** + * An HTTP stream rate limiter. 
Used in the fault filter and bandwidth filter. + */ +class StreamRateLimiter : Logger::Loggable { +public: + // We currently divide each second into 16 segments for the token bucket. Thus, the rate limit is + // KiB per second, divided into 16 segments, ~63ms apart. 16 is used because it divides into 1024 + // evenly. + static constexpr uint64_t DefaultFillRate = 16; + + /** + * @param max_kbps maximum rate in KiB/s. + * @param max_buffered_data maximum data to buffer before invoking the pause callback. + * @param pause_data_cb callback invoked when the limiter has buffered too much data. + * @param resume_data_cb callback invoked when the limiter has gone under the buffer limit. + * @param write_data_cb callback invoked to write data to the stream. + * @param continue_cb callback invoked to continue the stream. This is only used to continue + * trailers that have been paused during body flush. + * @param time_source the time source to run the token bucket with. + * @param dispatcher the stream's dispatcher to use for creating timers. + * @param scope the stream's scope + */ + StreamRateLimiter(uint64_t max_kbps, uint64_t max_buffered_data, + std::function pause_data_cb, std::function resume_data_cb, + std::function write_data_cb, + std::function continue_cb, TimeSource& time_source, + Event::Dispatcher& dispatcher, const ScopeTrackedObject& scope, + std::shared_ptr token_bucket = nullptr, + uint64_t fill_rate = DefaultFillRate); + + /** + * Called by the stream to write data. All data writes happen asynchronously, the stream should + * be stopped after this call (all data will be drained from incoming_buffer). + */ + void writeData(Buffer::Instance& incoming_buffer, bool end_stream); + + /** + * Called if the stream receives trailers. + * Returns true if the read buffer is not completely drained yet. + */ + bool onTrailers(); + + /** + * Like the owning filter, we must handle inline destruction, so we have a destroy() method which + * kills any callbacks. 
+ */ + void destroy() { token_timer_.reset(); } + bool destroyed() { return token_timer_ == nullptr; } + +private: + using TimerPtr = std::unique_ptr; + + void onTokenTimer(); + + const uint64_t bytes_per_time_slice_; + const std::function write_data_cb_; + const std::function continue_cb_; + const ScopeTrackedObject& scope_; + std::shared_ptr token_bucket_; + Event::TimerPtr token_timer_; + bool saw_data_{}; + bool saw_end_stream_{}; + bool saw_trailers_{}; + Buffer::WatermarkBuffer buffer_; +}; +} // namespace Common +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/http/fault/BUILD b/source/extensions/filters/http/fault/BUILD index a518d60f37e13..32d6ef31d42bf 100644 --- a/source/extensions/filters/http/fault/BUILD +++ b/source/extensions/filters/http/fault/BUILD @@ -36,6 +36,7 @@ envoy_cc_library( "//source/common/stats:utility_lib", "//source/extensions/filters/common/fault:fault_config_lib", "//source/extensions/filters/http:well_known_names", + "//source/extensions/filters/http/common:stream_rate_limiter_lib", "@envoy_api//envoy/extensions/filters/http/fault/v3:pkg_cc_proto", "@envoy_api//envoy/type/v3:pkg_cc_proto", ], diff --git a/source/extensions/filters/http/fault/fault_filter.cc b/source/extensions/filters/http/fault/fault_filter.cc index 4b98e5583a89c..55e1111794bb5 100644 --- a/source/extensions/filters/http/fault/fault_filter.cc +++ b/source/extensions/filters/http/fault/fault_filter.cc @@ -208,7 +208,7 @@ void FaultFilter::maybeSetupResponseRateLimit(const Http::RequestHeaderMap& requ config_->stats().response_rl_injected_.inc(); - response_limiter_ = std::make_unique( + response_limiter_ = std::make_unique( rate_kbps.value(), encoder_callbacks_->encoderBufferLimit(), [this] { encoder_callbacks_->onEncoderFilterAboveWriteBufferHighWatermark(); }, [this] { encoder_callbacks_->onEncoderFilterBelowWriteBufferLowWatermark(); }, @@ -502,104 +502,13 @@ Http::FilterDataStatus 
FaultFilter::encodeData(Buffer::Instance& data, bool end_ Http::FilterTrailersStatus FaultFilter::encodeTrailers(Http::ResponseTrailerMap&) { if (response_limiter_ != nullptr) { - return response_limiter_->onTrailers(); + return response_limiter_->onTrailers() ? Http::FilterTrailersStatus::StopIteration + : Http::FilterTrailersStatus::Continue; } return Http::FilterTrailersStatus::Continue; } -StreamRateLimiter::StreamRateLimiter(uint64_t max_kbps, uint64_t max_buffered_data, - std::function pause_data_cb, - std::function resume_data_cb, - std::function write_data_cb, - std::function continue_cb, TimeSource& time_source, - Event::Dispatcher& dispatcher, const ScopeTrackedObject& scope) - : // bytes_per_time_slice is KiB converted to bytes divided by the number of ticks per second. - bytes_per_time_slice_((max_kbps * 1024) / SecondDivisor), write_data_cb_(write_data_cb), - continue_cb_(continue_cb), scope_(scope), - // The token bucket is configured with a max token count of the number of ticks per second, - // and refills at the same rate, so that we have a per second limit which refills gradually in - // ~63ms intervals. - token_bucket_(SecondDivisor, time_source, SecondDivisor), - token_timer_(dispatcher.createTimer([this] { onTokenTimer(); })), - buffer_(resume_data_cb, pause_data_cb, - []() -> void { /* TODO(adisuissa): Handle overflow watermark */ }) { - ASSERT(bytes_per_time_slice_ > 0); - ASSERT(max_buffered_data > 0); - buffer_.setWatermarks(max_buffered_data); -} - -void StreamRateLimiter::onTokenTimer() { - ENVOY_LOG(trace, "limiter: timer wakeup: buffered={}", buffer_.length()); - Buffer::OwnedImpl data_to_write; - - if (!saw_data_) { - // The first time we see any data on this stream (via writeData()), reset the number of tokens - // to 1. 
This will ensure that we start pacing the data at the desired rate (and don't send a - // full 1s of data right away which might not introduce enough delay for a stream that doesn't - // have enough data to span more than 1s of rate allowance). Once we reset, we will subsequently - // allow for bursting within the second to account for our data provider being bursty. - token_bucket_.reset(1); - saw_data_ = true; - } - - // Compute the number of tokens needed (rounded up), try to obtain that many tickets, and then - // figure out how many bytes to write given the number of tokens we actually got. - const uint64_t tokens_needed = - (buffer_.length() + bytes_per_time_slice_ - 1) / bytes_per_time_slice_; - const uint64_t tokens_obtained = token_bucket_.consume(tokens_needed, true); - const uint64_t bytes_to_write = - std::min(tokens_obtained * bytes_per_time_slice_, buffer_.length()); - ENVOY_LOG(trace, "limiter: tokens_needed={} tokens_obtained={} to_write={}", tokens_needed, - tokens_obtained, bytes_to_write); - - // Move the data to write into the output buffer with as little copying as possible. - // NOTE: This might be moving zero bytes, but that should work fine. - data_to_write.move(buffer_, bytes_to_write); - - // If the buffer still contains data in it, we couldn't get enough tokens, so schedule the next - // token available time. - if (buffer_.length() > 0) { - const std::chrono::milliseconds ms = token_bucket_.nextTokenAvailable(); - if (ms.count() > 0) { - ENVOY_LOG(trace, "limiter: scheduling wakeup for {}ms", ms.count()); - token_timer_->enableTimer(ms, &scope_); - } - } - - // Write the data out, indicating end stream if we saw end stream, there is no further data to - // send, and there are no trailers. - write_data_cb_(data_to_write, saw_end_stream_ && buffer_.length() == 0 && !saw_trailers_); - - // If there is no more data to send and we saw trailers, we need to continue iteration to release - // the trailers to further filters. 
- if (buffer_.length() == 0 && saw_trailers_) { - continue_cb_(); - } -} - -void StreamRateLimiter::writeData(Buffer::Instance& incoming_buffer, bool end_stream) { - ENVOY_LOG(trace, "limiter: incoming data length={} buffered={}", incoming_buffer.length(), - buffer_.length()); - buffer_.move(incoming_buffer); - saw_end_stream_ = end_stream; - if (!token_timer_->enabled()) { - // TODO(mattklein123): In an optimal world we would be able to continue iteration with the data - // we want in the buffer, but have a way to clear end_stream in case we can't send it all. - // The filter API does not currently support that and it will not be a trivial change to add. - // Instead we cheat here by scheduling the token timer to run immediately after the stack is - // unwound, at which point we can directly called encode/decodeData. - token_timer_->enableTimer(std::chrono::milliseconds(0), &scope_); - } -} - -Http::FilterTrailersStatus StreamRateLimiter::onTrailers() { - saw_end_stream_ = true; - saw_trailers_ = true; - return buffer_.length() > 0 ? Http::FilterTrailersStatus::StopIteration - : Http::FilterTrailersStatus::Continue; -} - } // namespace Fault } // namespace HttpFilters } // namespace Extensions diff --git a/source/extensions/filters/http/fault/fault_filter.h b/source/extensions/filters/http/fault/fault_filter.h index e1ff4d275937d..a52b91cbba102 100644 --- a/source/extensions/filters/http/fault/fault_filter.h +++ b/source/extensions/filters/http/fault/fault_filter.h @@ -19,6 +19,7 @@ #include "common/stats/symbol_table_impl.h" #include "extensions/filters/common/fault/fault_config.h" +#include "extensions/filters/http/common/stream_rate_limiter.h" namespace Envoy { namespace Extensions { @@ -145,67 +146,6 @@ class FaultFilterConfig { using FaultFilterConfigSharedPtr = std::shared_ptr; -/** - * An HTTP stream rate limiter. Split out for ease of testing and potential code reuse elsewhere. 
- */ -class StreamRateLimiter : Logger::Loggable { -public: - /** - * @param max_kbps maximum rate in KiB/s. - * @param max_buffered_data maximum data to buffer before invoking the pause callback. - * @param pause_data_cb callback invoked when the limiter has buffered too much data. - * @param resume_data_cb callback invoked when the limiter has gone under the buffer limit. - * @param write_data_cb callback invoked to write data to the stream. - * @param continue_cb callback invoked to continue the stream. This is only used to continue - * trailers that have been paused during body flush. - * @param time_source the time source to run the token bucket with. - * @param dispatcher the stream's dispatcher to use for creating timers. - * @param scope the stream's scope - */ - StreamRateLimiter(uint64_t max_kbps, uint64_t max_buffered_data, - std::function pause_data_cb, std::function resume_data_cb, - std::function write_data_cb, - std::function continue_cb, TimeSource& time_source, - Event::Dispatcher& dispatcher, const ScopeTrackedObject& scope); - - /** - * Called by the stream to write data. All data writes happen asynchronously, the stream should - * be stopped after this call (all data will be drained from incoming_buffer). - */ - void writeData(Buffer::Instance& incoming_buffer, bool end_stream); - - /** - * Called if the stream receives trailers. - */ - Http::FilterTrailersStatus onTrailers(); - - /** - * Like the owning filter, we must handle inline destruction, so we have a destroy() method which - * kills any callbacks. - */ - void destroy() { token_timer_.reset(); } - bool destroyed() { return token_timer_ == nullptr; } - -private: - void onTokenTimer(); - - // We currently divide each second into 16 segments for the token bucket. Thus, the rate limit is - // KiB per second, divided into 16 segments, ~63ms apart. 16 is used because it divides into 1024 - // evenly. 
- static constexpr uint64_t SecondDivisor = 16; - - const uint64_t bytes_per_time_slice_; - const std::function write_data_cb_; - const std::function continue_cb_; - const ScopeTrackedObject& scope_; - TokenBucketImpl token_bucket_; - Event::TimerPtr token_timer_; - bool saw_data_{}; - bool saw_end_stream_{}; - bool saw_trailers_{}; - Buffer::WatermarkBuffer buffer_; -}; - using AbortHttpAndGrpcStatus = std::pair, absl::optional>; /** @@ -278,7 +218,7 @@ class FaultFilter : public Http::StreamFilter, Logger::Loggable downstream_cluster_storage_; const FaultSettings* fault_settings_; bool fault_active_{}; - std::unique_ptr response_limiter_; + std::unique_ptr response_limiter_; std::string downstream_cluster_delay_percent_key_{}; std::string downstream_cluster_abort_percent_key_{}; std::string downstream_cluster_delay_duration_key_{}; diff --git a/source/extensions/filters/http/well_known_names.h b/source/extensions/filters/http/well_known_names.h index e869e3fc9bbd3..27702214936db 100644 --- a/source/extensions/filters/http/well_known_names.h +++ b/source/extensions/filters/http/well_known_names.h @@ -14,6 +14,8 @@ class HttpFilterNameValues { public: // Buffer filter const std::string Buffer = "envoy.filters.http.buffer"; + // Bandwidth limit filter + const std::string BandwidthLimit = "envoy.filters.http.bandwidth_limit"; // Cache filter const std::string Cache = "envoy.filters.http.cache"; // CDN Loop filter diff --git a/test/extensions/filters/http/bandwidth_limit/BUILD b/test/extensions/filters/http/bandwidth_limit/BUILD new file mode 100644 index 0000000000000..b5e052cae0706 --- /dev/null +++ b/test/extensions/filters/http/bandwidth_limit/BUILD @@ -0,0 +1,37 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_package", +) +load( + "//test/extensions:extensions_build_system.bzl", + "envoy_extension_cc_test", +) + +licenses(["notice"]) # Apache 2 + +envoy_package() + +envoy_extension_cc_test( + name = "filter_test", + srcs = ["filter_test.cc"], + 
extension_name = "envoy.filters.http.bandwidth_limit", + deps = [ + "//source/common/common:utility_lib", + "//source/common/http:header_utility_lib", + "//source/common/http:headers_lib", + "//source/common/router:header_parser_lib", + "//source/common/runtime:runtime_lib", + "//source/extensions/filters/http/bandwidth_limit:bandwidth_limit_lib", + "//test/mocks/server:server_mocks", + ], +) + +envoy_extension_cc_test( + name = "config_test", + srcs = ["config_test.cc"], + extension_name = "envoy.filters.http.bandwidth_limit", + deps = [ + "//source/extensions/filters/http/bandwidth_limit:config", + "//test/mocks/server:server_mocks", + ], +) diff --git a/test/extensions/filters/http/bandwidth_limit/config_test.cc b/test/extensions/filters/http/bandwidth_limit/config_test.cc new file mode 100644 index 0000000000000..ab7e8769a4d49 --- /dev/null +++ b/test/extensions/filters/http/bandwidth_limit/config_test.cc @@ -0,0 +1,140 @@ +#include "extensions/filters/http/bandwidth_limit/bandwidth_limit.h" +#include "extensions/filters/http/bandwidth_limit/config.h" + +#include "test/mocks/server/mocks.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace BandwidthLimitFilter { + +using EnableMode = + envoy::extensions::filters::http::bandwidth_limit::v3alpha::BandwidthLimit_EnableMode; + +TEST(Factory, GlobalEmptyConfig) { + const std::string yaml = R"( + stat_prefix: test + )"; + + BandwidthLimitFilterConfig factory; + ProtobufTypes::MessagePtr proto_config = factory.createEmptyRouteConfigProto(); + TestUtility::loadFromYaml(yaml, *proto_config); + + NiceMock context; + + EXPECT_CALL(context.dispatcher_, createTimer_(_)).Times(0); + auto callback = factory.createFilterFactoryFromProto(*proto_config, "stats", context); + Http::MockFilterChainFactoryCallbacks filter_callback; + EXPECT_CALL(filter_callback, addStreamFilter(_)); + callback(filter_callback); +} + +TEST(Factory, 
RouteSpecificFilterConfig) { + const std::string config_yaml = R"( + stat_prefix: test + enable_mode: IngressAndEgress + limit_kbps: 10 + fill_rate: 32 + )"; + + BandwidthLimitFilterConfig factory; + ProtobufTypes::MessagePtr proto_config = factory.createEmptyRouteConfigProto(); + TestUtility::loadFromYaml(config_yaml, *proto_config); + + NiceMock context; + + EXPECT_CALL(context.dispatcher_, createTimer_(_)).Times(0); + const auto route_config = factory.createRouteSpecificFilterConfig( + *proto_config, context, ProtobufMessage::getNullValidationVisitor()); + const auto* config = dynamic_cast(route_config.get()); + EXPECT_EQ(config->limit(), 10); + EXPECT_EQ(config->fill_rate(), 32); + EXPECT_EQ(config->enable_mode(), EnableMode::BandwidthLimit_EnableMode_IngressAndEgress); + EXPECT_FALSE(config->tokenBucket() == nullptr); +} + +TEST(Factory, RouteSpecificFilterConfigDisabledByDefault) { + const std::string config_yaml = R"( + stat_prefix: test + limit_kbps: 10 + fill_rate: 32 + )"; + + BandwidthLimitFilterConfig factory; + ProtobufTypes::MessagePtr proto_config = factory.createEmptyRouteConfigProto(); + TestUtility::loadFromYaml(config_yaml, *proto_config); + + NiceMock context; + + EXPECT_CALL(context.dispatcher_, createTimer_(_)).Times(0); + const auto route_config = factory.createRouteSpecificFilterConfig( + *proto_config, context, ProtobufMessage::getNullValidationVisitor()); + const auto* config = dynamic_cast(route_config.get()); + EXPECT_EQ(config->enable_mode(), EnableMode::BandwidthLimit_EnableMode_Disabled); + EXPECT_EQ(config->limit(), 10); + EXPECT_EQ(config->fill_rate(), 32); +} + +TEST(Factory, RouteSpecificFilterConfigDefaultFillRate) { + const std::string config_yaml = R"( + stat_prefix: test + enable_mode: IngressAndEgress + limit_kbps: 10 + )"; + + BandwidthLimitFilterConfig factory; + ProtobufTypes::MessagePtr proto_config = factory.createEmptyRouteConfigProto(); + TestUtility::loadFromYaml(config_yaml, *proto_config); + + NiceMock context; + + 
EXPECT_CALL(context.dispatcher_, createTimer_(_)).Times(0); + const auto route_config = factory.createRouteSpecificFilterConfig( + *proto_config, context, ProtobufMessage::getNullValidationVisitor()); + const auto* config = dynamic_cast(route_config.get()); + EXPECT_EQ(config->limit(), 10); + EXPECT_EQ(config->fill_rate(), 16); +} + +TEST(Factory, PerRouteConfigNoLimits) { + const std::string config_yaml = R"( + stat_prefix: test + )"; + + BandwidthLimitFilterConfig factory; + ProtobufTypes::MessagePtr proto_config = factory.createEmptyRouteConfigProto(); + TestUtility::loadFromYaml(config_yaml, *proto_config); + + NiceMock context; + EXPECT_THROW(factory.createRouteSpecificFilterConfig(*proto_config, context, + ProtobufMessage::getNullValidationVisitor()), + EnvoyException); +} + +TEST(Factory, FillRateTooHigh) { + const std::string config_yaml = R"( + stat_prefix: test + enable_mode: IngressAndEgress + limit_kbps: 10 + fill_rate: 33 + )"; + + BandwidthLimitFilterConfig factory; + ProtobufTypes::MessagePtr proto_config = factory.createEmptyRouteConfigProto(); + TestUtility::loadFromYaml(config_yaml, *proto_config); + + NiceMock context; + + EXPECT_CALL(context.dispatcher_, createTimer_(_)).Times(0); + EXPECT_THROW(factory.createRouteSpecificFilterConfig(*proto_config, context, + ProtobufMessage::getNullValidationVisitor()), + EnvoyException); +} + +} // namespace BandwidthLimitFilter +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/test/extensions/filters/http/bandwidth_limit/filter_test.cc b/test/extensions/filters/http/bandwidth_limit/filter_test.cc new file mode 100644 index 0000000000000..cfe2c6cb26b48 --- /dev/null +++ b/test/extensions/filters/http/bandwidth_limit/filter_test.cc @@ -0,0 +1,370 @@ +#include "envoy/extensions/filters/http/bandwidth_limit/v3alpha/bandwidth_limit.pb.h" + +#include "extensions/filters/http/bandwidth_limit/bandwidth_limit.h" + +#include "test/mocks/http/mocks.h" + +#include "gmock/gmock.h" 
+#include "gtest/gtest.h" + +using testing::_; +using testing::AnyNumber; +using testing::Matcher; +using testing::NiceMock; +using testing::Return; +using testing::ReturnRef; + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace BandwidthLimitFilter { + +class FilterTest : public testing::Test { +public: + FilterTest() = default; + + void setup(const std::string& yaml) { + envoy::extensions::filters::http::bandwidth_limit::v3alpha::BandwidthLimit config; + TestUtility::loadFromYaml(yaml, config); + config_ = std::make_shared(config, stats_, runtime_, time_system_, true); + filter_ = std::make_shared(config_); + filter_->setDecoderFilterCallbacks(decoder_filter_callbacks_); + filter_->setEncoderFilterCallbacks(encoder_filter_callbacks_); + } + + uint64_t findCounter(const std::string& name) { + const auto counter = TestUtility::findCounter(stats_, name); + return counter != nullptr ? counter->value() : 0; + } + + NiceMock stats_; + NiceMock decoder_filter_callbacks_; + NiceMock encoder_filter_callbacks_; + NiceMock runtime_; + std::shared_ptr config_; + std::shared_ptr filter_; + Http::TestRequestHeaderMapImpl request_headers_; + Http::TestRequestTrailerMapImpl request_trailers_; + Http::TestResponseHeaderMapImpl response_headers_; + Http::TestResponseTrailerMapImpl response_trailers_; + Buffer::OwnedImpl data_; + Event::SimulatedTimeSystem time_system_; +}; + +TEST_F(FilterTest, Disabled) { + const std::string config_yaml = R"( + stat_prefix: test + enable_mode: Disabled + limit_kbps: 10 + fill_rate: 32 + )"; + setup(fmt::format(config_yaml, "1")); + + EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->decodeHeaders(request_headers_, false)); + EXPECT_EQ(Http::FilterDataStatus::Continue, filter_->decodeData(data_, false)); + EXPECT_EQ(Http::FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers_)); + EXPECT_EQ(0U, findCounter("test.http_bandwidth_limit.enabled")); + + EXPECT_EQ(Http::FilterHeadersStatus::Continue, 
filter_->encodeHeaders(response_headers_, false)); + EXPECT_EQ(Http::FilterDataStatus::Continue, filter_->encodeData(data_, false)); + EXPECT_EQ(Http::FilterTrailersStatus::Continue, filter_->encodeTrailers(response_trailers_)); + EXPECT_EQ(0U, findCounter("test.http_bandwidth_limit.enabled")); +} + +TEST_F(FilterTest, BandwidthLimitOnDecode) { + const std::string config_yaml = R"( + stat_prefix: test + enable_mode: Ingress + limit_kbps: 1 + )"; + setup(fmt::format(config_yaml, "1")); + EXPECT_CALL(decoder_filter_callbacks_.dispatcher_, setTrackedObject(_)).Times(AnyNumber()); + + ON_CALL(decoder_filter_callbacks_, decoderBufferLimit()).WillByDefault(Return(1100)); + Event::MockTimer* token_timer = + new NiceMock(&decoder_filter_callbacks_.dispatcher_); + EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->decodeHeaders(request_headers_, true)); + + EXPECT_EQ(1UL, config_->limit()); + EXPECT_EQ(16UL, config_->fill_rate()); + + // Send a small amount of data which should be within limit. + Buffer::OwnedImpl data1("hello"); + EXPECT_CALL(*token_timer, enableTimer(std::chrono::milliseconds(0), _)); + EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, filter_->decodeData(data1, false)); + EXPECT_CALL(decoder_filter_callbacks_, + injectDecodedDataToFilterChain(BufferStringEqual("hello"), false)); + token_timer->invokeCallback(); + + // Advance time by 1s which should refill all tokens. + time_system_.advanceTimeWait(std::chrono::seconds(1)); + + // Send 1152 bytes of data which is 1s + 2 refill cycles of data. 
+ EXPECT_CALL(decoder_filter_callbacks_, onDecoderFilterAboveWriteBufferHighWatermark()); + EXPECT_CALL(*token_timer, enableTimer(std::chrono::milliseconds(0), _)); + Buffer::OwnedImpl data2(std::string(1152, 'a')); + EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, filter_->decodeData(data2, false)); + + EXPECT_CALL(*token_timer, enableTimer(std::chrono::milliseconds(63), _)); + EXPECT_CALL(decoder_filter_callbacks_, onDecoderFilterBelowWriteBufferLowWatermark()); + EXPECT_CALL(decoder_filter_callbacks_, + injectDecodedDataToFilterChain(BufferStringEqual(std::string(1024, 'a')), false)); + token_timer->invokeCallback(); + + // Fire timer, also advance time. + time_system_.advanceTimeWait(std::chrono::milliseconds(63)); + EXPECT_CALL(*token_timer, enableTimer(std::chrono::milliseconds(63), _)); + EXPECT_CALL(decoder_filter_callbacks_, + injectDecodedDataToFilterChain(BufferStringEqual(std::string(64, 'a')), false)); + token_timer->invokeCallback(); + + // Get new data with current data buffered, not end_stream. + Buffer::OwnedImpl data3(std::string(64, 'b')); + EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, filter_->decodeData(data3, false)); + + // Fire timer, also advance time. + time_system_.advanceTimeWait(std::chrono::milliseconds(63)); + EXPECT_CALL(*token_timer, enableTimer(std::chrono::milliseconds(63), _)); + EXPECT_CALL(decoder_filter_callbacks_, + injectDecodedDataToFilterChain(BufferStringEqual(std::string(64, 'a')), false)); + token_timer->invokeCallback(); + + // Fire timer, also advance time. No timer enable because there is nothing buffered. + time_system_.advanceTimeWait(std::chrono::milliseconds(63)); + EXPECT_CALL(decoder_filter_callbacks_, + injectDecodedDataToFilterChain(BufferStringEqual(std::string(64, 'b')), false)); + token_timer->invokeCallback(); + + // Advance time by 1s for a full refill. 
+ time_system_.advanceTimeWait(std::chrono::seconds(1)); + + // Now send 1024 in one shot with end_stream true which should go through and end the stream. + EXPECT_CALL(*token_timer, enableTimer(std::chrono::milliseconds(0), _)); + Buffer::OwnedImpl data4(std::string(1024, 'c')); + EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, filter_->decodeData(data4, true)); + EXPECT_CALL(decoder_filter_callbacks_, + injectDecodedDataToFilterChain(BufferStringEqual(std::string(1024, 'c')), true)); + token_timer->invokeCallback(); + + filter_->onDestroy(); +} + +TEST_F(FilterTest, BandwidthLimitOnEncode) { + const std::string config_yaml = R"( + stat_prefix: test + enable_mode: Egress + limit_kbps: 1 + )"; + setup(fmt::format(config_yaml, "1")); + EXPECT_CALL(encoder_filter_callbacks_.dispatcher_, setTrackedObject(_)).Times(AnyNumber()); + + ON_CALL(encoder_filter_callbacks_, encoderBufferLimit()).WillByDefault(Return(1100)); + Event::MockTimer* token_timer = + new NiceMock(&encoder_filter_callbacks_.dispatcher_); + + EXPECT_EQ(1UL, config_->limit()); + EXPECT_EQ(16UL, config_->fill_rate()); + + EXPECT_EQ(Http::FilterHeadersStatus::Continue, + filter_->encode100ContinueHeaders(response_headers_)); + Http::MetadataMap metadata_map; + EXPECT_EQ(Http::FilterMetadataStatus::Continue, filter_->encodeMetadata(metadata_map)); + EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->encodeHeaders(response_headers_, false)); + + // Send a small amount of data which should be within limit. + Buffer::OwnedImpl data1("hello"); + EXPECT_CALL(*token_timer, enableTimer(std::chrono::milliseconds(0), _)); + EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(data1, false)); + EXPECT_CALL(encoder_filter_callbacks_, + injectEncodedDataToFilterChain(BufferStringEqual("hello"), false)); + token_timer->invokeCallback(); + + // Advance time by 1s which should refill all tokens. 
+ time_system_.advanceTimeWait(std::chrono::seconds(1)); + + // Send 1152 bytes of data which is 1s + 2 refill cycles of data. + EXPECT_CALL(encoder_filter_callbacks_, onEncoderFilterAboveWriteBufferHighWatermark()); + EXPECT_CALL(*token_timer, enableTimer(std::chrono::milliseconds(0), _)); + Buffer::OwnedImpl data2(std::string(1152, 'a')); + EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(data2, false)); + + EXPECT_CALL(*token_timer, enableTimer(std::chrono::milliseconds(63), _)); + EXPECT_CALL(encoder_filter_callbacks_, onEncoderFilterBelowWriteBufferLowWatermark()); + EXPECT_CALL(encoder_filter_callbacks_, + injectEncodedDataToFilterChain(BufferStringEqual(std::string(1024, 'a')), false)); + token_timer->invokeCallback(); + + // Fire timer, also advance time. + time_system_.advanceTimeWait(std::chrono::milliseconds(63)); + EXPECT_CALL(*token_timer, enableTimer(std::chrono::milliseconds(63), _)); + EXPECT_CALL(encoder_filter_callbacks_, + injectEncodedDataToFilterChain(BufferStringEqual(std::string(64, 'a')), false)); + token_timer->invokeCallback(); + + // Get new data with current data buffered, not end_stream. + Buffer::OwnedImpl data3(std::string(64, 'b')); + EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(data3, false)); + + // Fire timer, also advance time. + time_system_.advanceTimeWait(std::chrono::milliseconds(63)); + EXPECT_CALL(*token_timer, enableTimer(std::chrono::milliseconds(63), _)); + EXPECT_CALL(encoder_filter_callbacks_, + injectEncodedDataToFilterChain(BufferStringEqual(std::string(64, 'a')), false)); + token_timer->invokeCallback(); + + // Fire timer, also advance time. No timer enable because there is nothing buffered. + time_system_.advanceTimeWait(std::chrono::milliseconds(63)); + EXPECT_CALL(encoder_filter_callbacks_, + injectEncodedDataToFilterChain(BufferStringEqual(std::string(64, 'b')), false)); + token_timer->invokeCallback(); + + // Advance time by 1s for a full refill. 
+ time_system_.advanceTimeWait(std::chrono::seconds(1)); + + // Now send 1024 in one shot with end_stream true which should go through and end the stream. + EXPECT_CALL(*token_timer, enableTimer(std::chrono::milliseconds(0), _)); + Buffer::OwnedImpl data4(std::string(1024, 'c')); + EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(data4, true)); + EXPECT_CALL(encoder_filter_callbacks_, + injectEncodedDataToFilterChain(BufferStringEqual(std::string(1024, 'c')), true)); + token_timer->invokeCallback(); + + filter_->onDestroy(); +} + +TEST_F(FilterTest, BandwidthLimitOnDecodeAndEncode) { + const std::string config_yaml = R"( + stat_prefix: test + enable_mode: IngressAndEgress + limit_kbps: 1 + )"; + setup(fmt::format(config_yaml, "1")); + EXPECT_CALL(decoder_filter_callbacks_.dispatcher_, setTrackedObject(_)).Times(AnyNumber()); + EXPECT_CALL(encoder_filter_callbacks_.dispatcher_, setTrackedObject(_)).Times(AnyNumber()); + + ON_CALL(decoder_filter_callbacks_, decoderBufferLimit()).WillByDefault(Return(1050)); + ON_CALL(encoder_filter_callbacks_, encoderBufferLimit()).WillByDefault(Return(1100)); + Event::MockTimer* decode_timer = + new NiceMock(&decoder_filter_callbacks_.dispatcher_); + Event::MockTimer* encode_timer = + new NiceMock(&encoder_filter_callbacks_.dispatcher_); + + EXPECT_EQ(1UL, config_->limit()); + EXPECT_EQ(16UL, config_->fill_rate()); + + EXPECT_EQ(Http::FilterHeadersStatus::Continue, + filter_->encode100ContinueHeaders(response_headers_)); + Http::MetadataMap metadata_map; + EXPECT_EQ(Http::FilterMetadataStatus::Continue, filter_->decodeMetadata(metadata_map)); + EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->decodeHeaders(request_headers_, false)); + EXPECT_EQ(Http::FilterMetadataStatus::Continue, filter_->encodeMetadata(metadata_map)); + EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->encodeHeaders(response_headers_, false)); + + // Send small amount of data from both sides which should be within initial 
bucket limit. + Buffer::OwnedImpl dec_data1("hello"); + EXPECT_CALL(*decode_timer, enableTimer(std::chrono::milliseconds(0), _)); + EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, filter_->decodeData(dec_data1, false)); + EXPECT_CALL(decoder_filter_callbacks_, + injectDecodedDataToFilterChain(BufferStringEqual("hello"), false)); + decode_timer->invokeCallback(); + + Buffer::OwnedImpl enc_data1("world!"); + EXPECT_CALL(*encode_timer, enableTimer(std::chrono::milliseconds(0), _)); + EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(enc_data1, false)); + + // Encoder will not be able to write any bytes due to insufficient tokens. + EXPECT_CALL(*encode_timer, enableTimer(std::chrono::milliseconds(63), _)); + EXPECT_CALL(encoder_filter_callbacks_, + injectEncodedDataToFilterChain(BufferStringEqual(std::string("")), false)); + encode_timer->invokeCallback(); + + // Fire timer, also advance time by 1 unit. + time_system_.advanceTimeWait(std::chrono::milliseconds(63)); + EXPECT_CALL(encoder_filter_callbacks_, + injectEncodedDataToFilterChain(BufferStringEqual("world!"), false)); + encode_timer->invokeCallback(); + + // Advance time by 1s which should refill all tokens. + time_system_.advanceTimeWait(std::chrono::seconds(1)); + // Send 1088 bytes of data on request path which is 1s + 1 refill cycle of data. + // Send 128 bytes of data on response path which is 2 refill cycles of data. 
+ Buffer::OwnedImpl dec_data2(std::string(1088, 'd')); + Buffer::OwnedImpl enc_data2(std::string(128, 'e')); + + EXPECT_CALL(decoder_filter_callbacks_, onDecoderFilterAboveWriteBufferHighWatermark()); + EXPECT_CALL(*decode_timer, enableTimer(std::chrono::milliseconds(0), _)); + EXPECT_CALL(*encode_timer, enableTimer(std::chrono::milliseconds(0), _)); + EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, filter_->decodeData(dec_data2, false)); + EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(enc_data2, false)); + + EXPECT_CALL(*decode_timer, enableTimer(std::chrono::milliseconds(63), _)); + // EXPECT_CALL(decoder_filter_callbacks_, onDecoderFilterBelowWriteBufferLowWatermark()); + EXPECT_CALL(decoder_filter_callbacks_, + injectDecodedDataToFilterChain(BufferStringEqual(std::string(1024, 'd')), false)); + decode_timer->invokeCallback(); + + // Encoder will not be able to write any bytes due to insufficient tokens. + EXPECT_CALL(*encode_timer, enableTimer(std::chrono::milliseconds(63), _)); + EXPECT_CALL(encoder_filter_callbacks_, + injectEncodedDataToFilterChain(BufferStringEqual(std::string("")), false)); + encode_timer->invokeCallback(); + + // Fire timer, also advance time by 1 unit. + time_system_.advanceTimeWait(std::chrono::milliseconds(63)); + EXPECT_CALL(decoder_filter_callbacks_, + injectDecodedDataToFilterChain(BufferStringEqual(std::string(64, 'd')), false)); + decode_timer->invokeCallback(); + // Encoder will not be able to write any bytes due to insufficient tokens. + EXPECT_CALL(*encode_timer, enableTimer(std::chrono::milliseconds(63), _)); + EXPECT_CALL(encoder_filter_callbacks_, + injectEncodedDataToFilterChain(BufferStringEqual(std::string("")), false)); + encode_timer->invokeCallback(); + + // Fire timer, also advance time by 1 unit. 
+ time_system_.advanceTimeWait(std::chrono::milliseconds(63)); + EXPECT_CALL(*encode_timer, enableTimer(std::chrono::milliseconds(63), _)); + EXPECT_CALL(encoder_filter_callbacks_, + injectEncodedDataToFilterChain(BufferStringEqual(std::string(64, 'e')), false)); + encode_timer->invokeCallback(); + + // Get new data with current data buffered, not end_stream. + Buffer::OwnedImpl data3(std::string(64, 'b')); + EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(data3, false)); + + // Fire timer, also advance time. + time_system_.advanceTimeWait(std::chrono::milliseconds(63)); + EXPECT_CALL(*encode_timer, enableTimer(std::chrono::milliseconds(63), _)); + EXPECT_CALL(encoder_filter_callbacks_, + injectEncodedDataToFilterChain(BufferStringEqual(std::string(64, 'e')), false)); + encode_timer->invokeCallback(); + + // Fire timer, also advance time. No timer enable because there is nothing buffered. + time_system_.advanceTimeWait(std::chrono::milliseconds(63)); + EXPECT_CALL(encoder_filter_callbacks_, + injectEncodedDataToFilterChain(BufferStringEqual(std::string(64, 'b')), false)); + encode_timer->invokeCallback(); + + // Advance time by 1s for a full refill. + time_system_.advanceTimeWait(std::chrono::seconds(1)); + + // Now send 1024 in total with end_stream true which should go through and end the streams. 
+ Buffer::OwnedImpl enc_data4(std::string(960, 'e')); + Buffer::OwnedImpl dec_data4(std::string(64, 'd')); + EXPECT_CALL(*encode_timer, enableTimer(std::chrono::milliseconds(0), _)); + EXPECT_CALL(*decode_timer, enableTimer(std::chrono::milliseconds(0), _)); + EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, filter_->decodeData(dec_data4, true)); + EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(enc_data4, true)); + EXPECT_CALL(decoder_filter_callbacks_, + injectDecodedDataToFilterChain(BufferStringEqual(std::string(64, 'd')), true)); + EXPECT_CALL(encoder_filter_callbacks_, + injectEncodedDataToFilterChain(BufferStringEqual(std::string(960, 'e')), true)); + encode_timer->invokeCallback(); + decode_timer->invokeCallback(); + + filter_->onDestroy(); +} + +} // namespace BandwidthLimitFilter +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy