Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ message BandwidthLimit {
Egress = 2;

// Filter enabled for both Ingress and Egress traffic.
IngressAndEgress = 4;
IngressAndEgress = 3;
}

// The human readable prefix to use when emitting stats.
Expand All @@ -55,6 +55,8 @@ message BandwidthLimit {
//
google.protobuf.UInt64Value limit_kbps = 3 [(validate.rules).uint64 = {gte: 1}];

// Optional fill_rate for the token buckets (per second). Defaults to 64.
// Optional fill_rate for the token buckets (per second). Defaults to 16.
// .. note::
// In the current implementation, the fill_rate must be <= 32 to avoid too aggressive refills.
google.protobuf.UInt32Value fill_rate = 4 [(validate.rules).uint32 = {gte: 1}];
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 15 additions & 2 deletions source/common/common/token_bucket_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,17 @@

namespace Envoy {

TokenBucketImpl::TokenBucketImpl(uint64_t max_tokens, TimeSource& time_source, double fill_rate)
TokenBucketImpl::TokenBucketImpl(uint64_t max_tokens, TimeSource& time_source, double fill_rate,
bool allow_multiple_resets)
: max_tokens_(max_tokens), fill_rate_(std::abs(fill_rate)), tokens_(max_tokens),
last_fill_(time_source.monotonicTime()), time_source_(time_source) {}
last_fill_(time_source.monotonicTime()), time_source_(time_source) {
if (!allow_multiple_resets)
// initialize only when multiple resets are not allowed.
reset_once_ = absl::optional<bool>(false);
}

uint64_t TokenBucketImpl::consume(uint64_t tokens, bool allow_partial) {
absl::WriterMutexLock lock(&mutex_);
if (tokens_ < max_tokens_) {
const auto time_now = time_source_.monotonicTime();
tokens_ = std::min((std::chrono::duration<double>(time_now - last_fill_).count() * fill_rate_) +
Expand All @@ -31,6 +37,7 @@ uint64_t TokenBucketImpl::consume(uint64_t tokens, bool allow_partial) {

std::chrono::milliseconds TokenBucketImpl::nextTokenAvailable() {
// If there are tokens available, return immediately.
absl::ReaderMutexLock lock(&mutex_);
if (tokens_ >= 1) {
return std::chrono::milliseconds(0);
}
Expand All @@ -40,8 +47,14 @@ std::chrono::milliseconds TokenBucketImpl::nextTokenAvailable() {

void TokenBucketImpl::reset(uint64_t num_tokens) {
ASSERT(num_tokens <= max_tokens_);
absl::WriterMutexLock lock(&mutex_);
// Don't reset if reset before and multiple resets aren't allowed.
if (reset_once_.has_value() && reset_once_.value()) {
return;
}
tokens_ = num_tokens;
last_fill_ = time_source_.monotonicTime();
reset_once_ = absl::optional<bool>(true);
}

} // namespace Envoy
9 changes: 7 additions & 2 deletions source/common/common/token_bucket_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@

#include "common/common/utility.h"

#include "absl/synchronization/mutex.h"

namespace Envoy {

/**
* A class that implements token bucket interface (not thread-safe).
* A class that implements token bucket interface.
*/
class TokenBucketImpl : public TokenBucket {
public:
Expand All @@ -18,7 +20,8 @@ class TokenBucketImpl : public TokenBucket {
* @param fill_rate supplies the number of tokens that will return to the bucket on each second.
* The default is 1.
*/
explicit TokenBucketImpl(uint64_t max_tokens, TimeSource& time_source, double fill_rate = 1);
explicit TokenBucketImpl(uint64_t max_tokens, TimeSource& time_source, double fill_rate = 1,
bool allow_multiple_resets = false);

// TokenBucket
uint64_t consume(uint64_t tokens, bool allow_partial) override;
Expand All @@ -29,8 +32,10 @@ class TokenBucketImpl : public TokenBucket {
const double max_tokens_;
const double fill_rate_;
double tokens_;
absl::optional<bool> reset_once_;
MonotonicTime last_fill_;
TimeSource& time_source_;
absl::Mutex mutex_;
};

} // namespace Envoy
1 change: 1 addition & 0 deletions source/extensions/extensions_build_config.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ EXTENSIONS = {
"envoy.filters.http.admission_control": "//source/extensions/filters/http/admission_control:config",
"envoy.filters.http.aws_lambda": "//source/extensions/filters/http/aws_lambda:config",
"envoy.filters.http.aws_request_signing": "//source/extensions/filters/http/aws_request_signing:config",
"envoy.filters.http.bandwidth_limit": "//source/extensions/filters/http/bandwidth_limit:config",
"envoy.filters.http.buffer": "//source/extensions/filters/http/buffer:config",
"envoy.filters.http.cache": "//source/extensions/filters/http/cache:config",
"envoy.filters.http.cdn_loop": "//source/extensions/filters/http/cdn_loop:config",
Expand Down
46 changes: 46 additions & 0 deletions source/extensions/filters/http/bandwidth_limit/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
load(
"//bazel:envoy_build_system.bzl",
"envoy_cc_extension",
"envoy_cc_library",
"envoy_extension_package",
)

licenses(["notice"]) # Apache 2

# Local Bandwidthlimit HTTP L7 filter
# Public docs: docs/root/configuration/http_filters/bandwidth_limit_filter.rst

envoy_extension_package()

envoy_cc_library(
name = "bandwidth_limit_lib",
srcs = ["bandwidth_limit.cc"],
hdrs = ["bandwidth_limit.h"],
deps = [
"//include/envoy/http:codes_interface",
"//include/envoy/server:filter_config_interface",
"//include/envoy/stats:stats_macros",
"//source/common/common:utility_lib",
"//source/common/http:header_utility_lib",
"//source/common/http:headers_lib",
"//source/common/router:header_parser_lib",
"//source/common/runtime:runtime_lib",
"//source/extensions/filters/http/common:stream_rate_limiter_lib",
"@envoy_api//envoy/extensions/filters/http/bandwidth_limit/v3alpha:pkg_cc_proto",
"@envoy_api//envoy/type/v3:pkg_cc_proto",
],
)

envoy_cc_extension(
name = "config",
srcs = ["config.cc"],
hdrs = ["config.h"],
security_posture = "unknown",
deps = [
":bandwidth_limit_lib",
"//include/envoy/http:filter_interface",
"//source/common/protobuf:utility_lib",
"//source/extensions/filters/http/common:factory_base_lib",
"@envoy_api//envoy/extensions/filters/http/bandwidth_limit/v3alpha:pkg_cc_proto",
],
)
153 changes: 153 additions & 0 deletions source/extensions/filters/http/bandwidth_limit/bandwidth_limit.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
#include "extensions/filters/http/bandwidth_limit/bandwidth_limit.h"

#include <string>
#include <vector>

#include "envoy/http/codes.h"

#include "common/http/utility.h"

using envoy::extensions::filters::http::bandwidth_limit::v3alpha::BandwidthLimit;
using Envoy::Extensions::HttpFilters::Common::StreamRateLimiter;

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace BandwidthLimitFilter {

FilterConfig::FilterConfig(const BandwidthLimit& config, Stats::Scope& scope,
Runtime::Loader& runtime, TimeSource& time_source, bool per_route)
: stats_(generateStats(config.stat_prefix(), scope)), runtime_(runtime), scope_(scope),
time_source_(time_source),
limit_kbps_(config.has_limit_kbps() ? config.limit_kbps().value() : 0),
enable_mode_(config.enable_mode()),
fill_rate_(config.has_fill_rate() ? config.fill_rate().value()
: StreamRateLimiter::DefaultFillRate) {
if (per_route && !config.has_limit_kbps()) {
throw EnvoyException("bandwidthlimitfilter: limit must be set for per route filter config");
}

if (fill_rate_ > MaxFillRate) {
throw EnvoyException("bandwidthlimitfilter: fill rate must be <= 32");
}
// The token bucket is configured with a max token count of the number of ticks per second,
// and refills at the same rate, so that we have a per second limit which refills gradually in
// 1/fill_rate intervals.
token_bucket_ = std::make_shared<TokenBucketImpl>(fill_rate_, time_source, fill_rate_);
}

BandwidthLimitStats FilterConfig::generateStats(const std::string& prefix, Stats::Scope& scope) {
const std::string final_prefix = prefix + ".http_bandwidth_limit";
return {ALL_BANDWIDTH_LIMIT_STATS(POOL_COUNTER_PREFIX(scope, final_prefix))};
}

// BandwidthLimiter members

Http::FilterHeadersStatus BandwidthLimiter::decodeHeaders(Http::RequestHeaderMap&, bool) {
const auto* config = getConfig();

auto mode = config->enable_mode();
ENVOY_LOG(trace, "BandwidthLimiter: decode headers: mode={}", static_cast<uint32_t>(mode));

if (mode & BandwidthLimit::Ingress) {
config->stats().enabled_.inc();
ingress_limiter_ = std::make_unique<Envoy::Extensions::HttpFilters::Common::StreamRateLimiter>(
config_->limit(), decoder_callbacks_->decoderBufferLimit(),
[this] { decoder_callbacks_->onDecoderFilterAboveWriteBufferHighWatermark(); },
[this] { decoder_callbacks_->onDecoderFilterBelowWriteBufferLowWatermark(); },
[this](Buffer::Instance& data, bool end_stream) {
decoder_callbacks_->injectDecodedDataToFilterChain(data, end_stream);
},
[this] { decoder_callbacks_->continueDecoding(); }, config_->timeSource(),
decoder_callbacks_->dispatcher(), decoder_callbacks_->scope(), config_->tokenBucket(),
config_->fill_rate());
}
ENVOY_LOG(trace, "BandwidthLimiter: decode headers: ingress_limiter_={}",
ingress_limiter_ ? true : false);

return Http::FilterHeadersStatus::Continue;
}

Http::FilterDataStatus BandwidthLimiter::decodeData(Buffer::Instance& data, bool end_stream) {
if (ingress_limiter_ != nullptr) {
ingress_limiter_->writeData(data, end_stream);
return Http::FilterDataStatus::StopIterationNoBuffer;
}
ENVOY_LOG(trace, "BandwidthLimiter: decode data: ingress_limiter_ not set");
return Http::FilterDataStatus::Continue;
}

Http::FilterTrailersStatus BandwidthLimiter::decodeTrailers(Http::RequestTrailerMap&) {
if (ingress_limiter_ != nullptr) {
return ingress_limiter_->onTrailers() ? Http::FilterTrailersStatus::StopIteration
: Http::FilterTrailersStatus::Continue;
}
return Http::FilterTrailersStatus::Continue;
}

Http::FilterHeadersStatus BandwidthLimiter::encodeHeaders(Http::ResponseHeaderMap&, bool) {
const auto* config = getConfig();

auto mode = config->enable_mode();
ENVOY_LOG(trace, "BandwidthLimiter: encode headers: mode={}", static_cast<uint32_t>(mode));

if (mode & BandwidthLimit::Egress) {
config->stats().enabled_.inc();

egress_limiter_ = std::make_unique<Envoy::Extensions::HttpFilters::Common::StreamRateLimiter>(
config_->limit(), encoder_callbacks_->encoderBufferLimit(),
[this] { encoder_callbacks_->onEncoderFilterAboveWriteBufferHighWatermark(); },
[this] { encoder_callbacks_->onEncoderFilterBelowWriteBufferLowWatermark(); },
[this](Buffer::Instance& data, bool end_stream) {
encoder_callbacks_->injectEncodedDataToFilterChain(data, end_stream);
},
[this] { encoder_callbacks_->continueEncoding(); }, config_->timeSource(),
encoder_callbacks_->dispatcher(), encoder_callbacks_->scope(), config_->tokenBucket(),
config_->fill_rate());
}
ENVOY_LOG(trace, "BandwidthLimiter: encode headers: egress_limiter_={}",
egress_limiter_ ? true : false);

return Http::FilterHeadersStatus::Continue;
}

Http::FilterDataStatus BandwidthLimiter::encodeData(Buffer::Instance& data, bool end_stream) {
if (egress_limiter_ != nullptr) {
egress_limiter_->writeData(data, end_stream);
return Http::FilterDataStatus::StopIterationNoBuffer;
}
ENVOY_LOG(trace, "BandwidthLimiter: encode data: egress_limiter_ not set");
return Http::FilterDataStatus::Continue;
}

Http::FilterTrailersStatus BandwidthLimiter::encodeTrailers(Http::ResponseTrailerMap&) {
if (egress_limiter_ != nullptr) {
return egress_limiter_->onTrailers() ? Http::FilterTrailersStatus::StopIteration
: Http::FilterTrailersStatus::Continue;
}
return Http::FilterTrailersStatus::Continue;
}

const FilterConfig* BandwidthLimiter::getConfig() const {
const auto* config = Http::Utility::resolveMostSpecificPerFilterConfig<FilterConfig>(
"envoy.filters.http.bandwidth_limit", decoder_callbacks_->route());
if (config) {
return config;
}

return config_.get();
}

void BandwidthLimiter::onDestroy() {
if (ingress_limiter_ != nullptr) {
ingress_limiter_->destroy();
}
if (egress_limiter_ != nullptr) {
egress_limiter_->destroy();
}
}

} // namespace BandwidthLimitFilter
} // namespace HttpFilters
} // namespace Extensions
} // namespace Envoy
Loading