Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
a604e38
filter flow control
alyssawilk Aug 9, 2017
4804be6
nobuffer
alyssawilk Aug 9, 2017
1040f67
Addressing review comments re fault filter, initializer, NoBuffer
alyssawilk Aug 9, 2017
647845a
camelCaseForAll
alyssawilk Aug 9, 2017
3a8cf94
Merge branch 'master' into h2-filter-flow-control
alyssawilk Aug 10, 2017
a445e52
Work in progress rewrite
alyssawilk Aug 10, 2017
4f80dec
Merge branch 'master' into h2-filter-flow-control
alyssawilk Aug 16, 2017
9267859
Merge branch 'master' into h2-filter-flow-control
alyssawilk Aug 17, 2017
8ac29e1
filter return status
alyssawilk Aug 17, 2017
95de98b
Merge branch 'refs/heads/master' into h2-filter-flow-control
alyssawilk Aug 21, 2017
ae42c02
opt-in filter callbacks. Still insufficient tests
alyssawilk Aug 21, 2017
f705fed
minor cleanup + TODO
alyssawilk Aug 21, 2017
f99ca46
router test, one of several necessary integration tests
alyssawilk Aug 21, 2017
d0a823a
Merge branch 'master' into h2-filter-flow-control
alyssawilk Aug 23, 2017
7e5b7c1
connection manager unit tests
alyssawilk Aug 23, 2017
471fe64
integration tests
alyssawilk Aug 23, 2017
668932f
handling add[De|En|codedData
alyssawilk Aug 23, 2017
37d940e
renaming dynamo urls just to make config blocks clear
alyssawilk Aug 24, 2017
19aaf38
Merge branch 'refs/heads/master' into h2-filter-flow-control
alyssawilk Aug 24, 2017
1292a58
doxygen fix
alyssawilk Aug 24, 2017
558fb59
Merge branch 'master' into h2-filter-flow-control
alyssawilk Aug 28, 2017
9ac14b1
fixing merge format fail. bah! =P
alyssawilk Aug 28, 2017
6b5f615
Merge branch 'master' into h2-filter-flow-control
alyssawilk Aug 28, 2017
220fcf3
addressing reviewer comments
alyssawilk Aug 28, 2017
9685ee9
hopefully all outstanding comments
alyssawilk Aug 30, 2017
a970125
Merge branch 'master' into h2-filter-flow-control
alyssawilk Aug 30, 2017
1409730
addressing reviewer comments
alyssawilk Aug 31, 2017
eaed1ae
Merge branch 'refs/heads/master' into h2-filter-flow-control
alyssawilk Aug 31, 2017
9bf5df7
Merge branch 'refs/heads/master' into h2-filter-flow-control
alyssawilk Sep 7, 2017
0bead86
Merge branch 'refs/heads/master' into h2-filter-flow-control
alyssawilk Sep 8, 2017
612eacb
Merge branch 'refs/heads/master' into h2-filter-flow-control
alyssawilk Sep 11, 2017
3bfbc51
removing a deprecated line, adding tests that sendLocalReply works *s…
alyssawilk Sep 11, 2017
a94b527
fix format
alyssawilk Sep 11, 2017
be29d74
adding a hopefully helpful comment
alyssawilk Sep 11, 2017
0d534df
Removing a check because no, it is not needed
alyssawilk Sep 11, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions include/envoy/http/codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,13 @@ class Stream {
* @param disable informs if reads should be disabled (true) or re-enabled (false).
*/
virtual void readDisable(bool disable) PURE;

/*
* Return the number of bytes this stream is allowed to buffer, or 0 if there is no limit
* configured.
* @return the stream's configured buffer limits.
*/
virtual uint32_t bufferLimit() PURE;
};

/**
Expand Down
12 changes: 12 additions & 0 deletions include/envoy/http/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,18 @@ class StreamFilterBase {
public:
virtual ~StreamFilterBase() {}

/**
* This routine is called on filter creation, setting the buffer limit for the
* filter. Filters should abide by these limits unless custom configuration
* overrides the limit. A buffer limit of 0 bytes indicates no limits are applied.
*
* If filters buffer enough bytes to hit the high watermark, it should either
* call on[Encoder|Decoder]FilterAboveWriteBufferHighWatermark to halt the
* flow of data or send an error response such as 413 (Payload Too Large).
* @param byte_limit supplies number of bytes this filter may buffer by default.
*/
virtual void setBufferLimit(uint32_t byte_limit) PURE;

/**
* This routine is called prior to a filter being destroyed. This may happen after normal stream
* finish (both downstream and upstream) or due to reset. Every filter is responsible for making
Expand Down
1 change: 1 addition & 0 deletions source/common/buffer/watermark_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class WatermarkBuffer : public LibEventInstance {
void postProcess() override { checkLowWatermark(); }

void setWatermarks(uint32_t low_watermark, uint32_t high_watermark);
uint32_t high_watermark() const { return high_watermark_; }

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should it be highWatermark() ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Huh, looks like!

source/common/router/config_impl.h: uint32_t retryOn() const override { return retry_on_; }
source/common/router/config_impl.h: uint32_t numRetries() const override { return num_retries_; }
source/common/http/access_log/request_info_impl.h: const Optional<uint32_t>& responseCode() const override { return response_code_; }

I'll send a separate PR to clarify the style guide.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW I have no great opinion on this one way or the other but @dnoe is correct that is what most of the code is doing.


private:
void checkHighWatermark();
Expand Down
10 changes: 10 additions & 0 deletions source/common/dynamo/dynamo_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ Http::FilterDataStatus DynamoFilter::decodeData(Buffer::Instance& data, bool end
onDecodeComplete(data);
}

// If the request is too large, send a 413 to the user. This could be applied before the check
// above, but as Envoy buffer limits are soft limits, if we already have the data buffered might
// as well pass it through.
if (buffer_limit_ > 0 && data.length() > buffer_limit_) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This filter pushes buffering down to the connection manager. In this case, should we rely on the connection manager to actually send the 413 and reset? Very few filters will end up doing internal buffering that is outside of what the connection manager will provide IMO. Just something to think about.

scope_.counter(fmt::format("{}rq_too_large", stat_prefix_)).inc();
Http::Utility::sendLocalReply(*decoder_callbacks_, stream_reset_, Http::Code::PayloadTooLarge,
Http::CodeUtility::toString(Http::Code::PayloadTooLarge));
return Http::FilterDataStatus::StopIterationAndBuffer;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not StopIterationNoBuffer?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was modeling what buffer_filter.cc does. Granted it just falls through so if you think NoBuffer makes more sense I'm happy to switch.

}

if (end_stream) {
return Http::FilterDataStatus::Continue;
} else {
Expand Down
7 changes: 5 additions & 2 deletions source/common/dynamo/dynamo_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ class DynamoFilter : public Http::StreamFilter {
}

// Http::StreamFilterBase
void onDestroy() override {}
void setBufferLimit(uint32_t limit) override { buffer_limit_ = limit; }
void onDestroy() override { stream_reset_ = true; }

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can decodeData ever be called after onDestroy? If not, then stream_reset_ isn't doing anything.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is required for sendLocalReply on the event the stream is destroyed mid-call.
see source/common/http/utility.h and #1283


// Http::StreamDecoderFilter
Http::FilterHeadersStatus decodeHeaders(Http::HeaderMap& headers, bool end_stream) override;
Expand Down Expand Up @@ -60,14 +61,16 @@ class DynamoFilter : public Http::StreamFilter {
std::string stat_prefix_;
Stats::Scope& scope_;

bool enabled_{};
std::string operation_{};
RequestParser::TableDescriptor table_descriptor_{"", true};
std::string error_type_{};
MonotonicTime start_decode_;
Http::HeaderMap* response_headers_;
Http::StreamDecoderFilterCallbacks* decoder_callbacks_{};
Http::StreamEncoderFilterCallbacks* encoder_callbacks_{};
uint32_t buffer_limit_{0};
bool enabled_{};

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: consistency in primitive type initialization with below.

bool stream_reset_{false};
};

} // namespace Dynamo
Expand Down
3 changes: 3 additions & 0 deletions source/common/grpc/grpc_web_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <arpa/inet.h>

#include "common/common/assert.h"
#include "common/common/base64.h"
#include "common/common/utility.h"
#include "common/grpc/common.h"
Expand Down Expand Up @@ -90,6 +91,8 @@ Http::FilterDataStatus GrpcWebFilter::decodeData(Buffer::Instance& data, bool) {
decoding_buffer_.drain(decoding_buffer_.length());
decoding_buffer_.move(data);
data.add(decoded);
// Any block of 4 bytes or more should have been decoded and passed through.
ASSERT(decoding_buffer_.length() < 4);
return Http::FilterDataStatus::Continue;
}

Expand Down
2 changes: 2 additions & 0 deletions source/common/grpc/grpc_web_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ class GrpcWebFilter : public Http::StreamFilter, NonCopyable {
virtual ~GrpcWebFilter(){};

// Http::StreamFilterBase
// Ignore buffer limtis: see ASSERT in decodeData: decoding_buffer_ buffers less than 4 bytes.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo "limtis"

void setBufferLimit(uint32_t) override {}
void onDestroy() override{};

// Implements StreamDecoderFilter.
Expand Down
11 changes: 10 additions & 1 deletion source/common/grpc/http1_bridge_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
#include "common/common/enum_to_int.h"
#include "common/common/utility.h"
#include "common/grpc/common.h"
#include "common/http/codes.h"
#include "common/http/filter_utility.h"
#include "common/http/headers.h"
#include "common/http/http1/codec_impl.h"
#include "common/http/utility.h"

namespace Envoy {
namespace Grpc {
Expand Down Expand Up @@ -48,10 +50,17 @@ Http::FilterHeadersStatus Http1BridgeFilter::encodeHeaders(Http::HeaderMap& head
}
}

Http::FilterDataStatus Http1BridgeFilter::encodeData(Buffer::Instance&, bool end_stream) {
Http::FilterDataStatus Http1BridgeFilter::encodeData(Buffer::Instance& data, bool end_stream) {
if (!do_bridging_ || end_stream) {
return Http::FilterDataStatus::Continue;
} else {
// If the response is too large to fit in the buffer, send an error to the user.
if (buffer_limit_ > 0 && data.length() > buffer_limit_) {
Http::Utility::sendLocalReply(*decoder_callbacks_, stream_reset_,
Http::Code::InternalServerError,
Http::CodeUtility::toString(Http::Code::InternalServerError));

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

InternalServerError vs. PayloadTooLarge? Also same comment about centralized 413 logic?

return Http::FilterDataStatus::StopIterationAndBuffer;
}
return Http::FilterDataStatus::StopIterationAndBuffer;
}
}
Expand Down
5 changes: 4 additions & 1 deletion source/common/grpc/http1_bridge_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ class Http1BridgeFilter : public Http::StreamFilter {
Http1BridgeFilter(Upstream::ClusterManager& cm) : cm_(cm) {}

// Http::StreamFilterBase
void onDestroy() override {}
void setBufferLimit(uint32_t limit) override { buffer_limit_ = limit; }
void onDestroy() override { stream_reset_ = true; }

// Http::StreamDecoderFilter
Http::FilterHeadersStatus decodeHeaders(Http::HeaderMap& headers, bool end_stream) override;
Expand Down Expand Up @@ -48,9 +49,11 @@ class Http1BridgeFilter : public Http::StreamFilter {
Http::HeaderMap* response_headers_{};
bool do_bridging_{};
bool do_stat_tracking_{};
bool stream_reset_{false};
Upstream::ClusterInfoConstSharedPtr cluster_;
std::string grpc_service_;
std::string grpc_method_;
uint32_t buffer_limit_{0};
};

} // namespace Grpc
Expand Down
1 change: 1 addition & 0 deletions source/common/grpc/json_transcoder_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ class JsonTranscoderFilter : public Http::StreamFilter, public Logger::Loggable<
void setEncoderFilterCallbacks(Http::StreamEncoderFilterCallbacks& callbacks) override;

// Http::StreamFilterBase
void setBufferLimit(uint32_t) override {} // Unimplemented: see TODO in .cc file.
void onDestroy() override { stream_reset_ = true; }

private:
Expand Down
2 changes: 2 additions & 0 deletions source/common/http/filter/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ envoy_cc_library(
"//source/common/buffer:buffer_lib",
"//source/common/common:assert_lib",
"//source/common/common:enum_to_int",
"//source/common/http:codes_lib",
"//source/common/http:header_map_lib",
"//source/common/http:headers_lib",
"//source/common/http:utility_lib",
],
)

Expand Down
13 changes: 8 additions & 5 deletions source/common/http/filter/buffer_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@

#include "common/common/assert.h"
#include "common/common/enum_to_int.h"
#include "common/http/codes.h"
#include "common/http/header_map_impl.h"
#include "common/http/headers.h"
#include "common/http/utility.h"

namespace Envoy {
namespace Http {
Expand Down Expand Up @@ -38,10 +40,8 @@ FilterDataStatus BufferFilter::decodeData(Buffer::Instance&, bool end_stream) {
return FilterDataStatus::Continue;
} else if (callbacks_->decodingBuffer() &&
callbacks_->decodingBuffer()->length() > config_->max_request_bytes_) {
// TODO(htuch): Switch this to Utility::sendLocalReply().

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can buffer filter now call setDecoderBufferLimit() and have the 413 be by connection manager? On this topic, IMO we should move the rq_too_large_ stat into connection manager (rename if you want). We should have stats in connection manager for sending 413 due to buffering as well as 500 due to buffering, etc.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We totally could - I left it as-is to preserve the existing stats. Will move in my next update if you don't mind the stats going away

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can lose the existing stats as long as we get new stats in connection manager. :)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, one more stats question. downstream_rq_too_large totally makes sense in the connection manager. upstream_rs_too_large not so much. I'm not convinced it's worth the plumbing to make the router know the reason it was torn down was due to buffer limits. I could just put rs_too_large in the connection manager stats. WDYT?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's fine.

Http::HeaderMapPtr response_headers{new HeaderMapImpl{
{Headers::get().Status, std::to_string(enumToInt(Http::Code::PayloadTooLarge))}}};
callbacks_->encodeHeaders(std::move(response_headers), true);
Http::Utility::sendLocalReply(*callbacks_, stream_reset_, Http::Code::PayloadTooLarge,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Vaguely wondering if we should have a StreamDecoderFilterWithBoundedBuffer class to inherit from that handles some of this automagically, but don't feel that strongly given how simple this watermark check is.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking of a base class with onDestroy() which I found more annoying to have all over. But I figured the odds of forgetting the up-call in an override out-weighted the code savings

CodeUtility::toString(Http::Code::PayloadTooLarge));
config_->stats_.rq_too_large_.inc();
}

Expand All @@ -58,7 +58,10 @@ BufferFilterStats BufferFilter::generateStats(const std::string& prefix, Stats::
return {ALL_BUFFER_FILTER_STATS(POOL_COUNTER_PREFIX(scope, final_prefix))};
}

void BufferFilter::onDestroy() { resetInternalState(); }
void BufferFilter::onDestroy() {
stream_reset_ = true;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can kill this var now.

resetInternalState();
}

void BufferFilter::onRequestTimeout() {
// TODO(htuch): Switch this to Utility::sendLocalReply().
Expand Down
2 changes: 2 additions & 0 deletions source/common/http/filter/buffer_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class BufferFilter : public StreamDecoderFilter {
static BufferFilterStats generateStats(const std::string& prefix, Stats::Scope& scope);

// Http::StreamFilterBase
void setBufferLimit(uint32_t) override {} // Buffer filter uses its own configured limits.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly thought exercise: Could we actually allow filter to set buffer limit and push 413 to connection manager? This would make this filter trivial in that all it does is provide config to set buffer limit to min(configured_limit, stream_limit).

void onDestroy() override;

// Http::StreamDecoderFilter
Expand All @@ -66,6 +67,7 @@ class BufferFilter : public StreamDecoderFilter {
BufferFilterConfigConstSharedPtr config_;
StreamDecoderFilterCallbacks* callbacks_{};
Event::TimerPtr request_timeout_;
bool stream_reset_{false};
};

} // Http
Expand Down
24 changes: 21 additions & 3 deletions source/common/http/filter/fault_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,10 @@ FaultFilterConfig::FaultFilterConfig(const Json::Object& json_config, Runtime::L

FaultFilter::FaultFilter(FaultFilterConfigSharedPtr config) : config_(config) {}

FaultFilter::~FaultFilter() { ASSERT(!delay_timer_); }
FaultFilter::~FaultFilter() {
ASSERT(!delay_timer_);
ASSERT(!high_watermark_called_);
}

// Delays and aborts are independent events. One can inject a delay
// followed by an abort or inject just a delay or abort. In this callback,
Expand Down Expand Up @@ -209,8 +212,16 @@ void FaultFilter::recordAbortsInjectedStats() {
}

FilterDataStatus FaultFilter::decodeData(Buffer::Instance&, bool) {
return delay_timer_ == nullptr ? FilterDataStatus::Continue
: FilterDataStatus::StopIterationAndBuffer;
if (delay_timer_ == nullptr) {
return FilterDataStatus::Continue;
}
// The fault filter mimizes buffering even more aggressively than configured, to avoid
// accumulating data during a delay.
if (limiting_buffers_ && !high_watermark_called_) {
high_watermark_called_ = true;
callbacks_->onDecoderFilterAboveWriteBufferHighWatermark();

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems... quite aggressive. Why can't we rely on ActiveStream just taking care of this when we hit the real limit?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a DoS filter, no? I figured this is just the sort of place we want to be aggressive because if DoS is kicking in you want to reserve your memory for uses who aren't flagged as bad.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this filter is for injecting faults for testing (latency and failures).

}
return FilterDataStatus::StopIterationAndBuffer;
}

FilterTrailersStatus FaultFilter::decodeTrailers(HeaderMap&) {
Expand Down Expand Up @@ -276,6 +287,13 @@ void FaultFilter::resetTimerState() {
delay_timer_->disableTimer();
delay_timer_.reset();
}
// It's not necessarily true that the buffer has drained below the low watermark, but this filter
// is no longer causing data to be buffered and this is the single safest place for watermark
// resumption.
if (high_watermark_called_) {
high_watermark_called_ = false;
callbacks_->onDecoderFilterBelowWriteBufferLowWatermark();
}
}

void FaultFilter::setDecoderFilterCallbacks(StreamDecoderFilterCallbacks& callbacks) {
Expand Down
5 changes: 5 additions & 0 deletions source/common/http/filter/fault_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ class FaultFilter : public StreamDecoderFilter {
~FaultFilter();

// Http::StreamFilterBase
// Fault filter is more aggressive and limits buffered data even before
// hitting the buffer limit.
void setBufferLimit(uint32_t limit) override { limiting_buffers_ = limit > 0; }
void onDestroy() override;

// Http::StreamDecoderFilter
Expand All @@ -107,6 +110,8 @@ class FaultFilter : public StreamDecoderFilter {
StreamDecoderFilterCallbacks* callbacks_{};
Event::TimerPtr delay_timer_;
std::string downstream_cluster_{};
bool limiting_buffers_{false};
bool high_watermark_called_{false};

std::string downstream_cluster_delay_percent_key_{};
std::string downstream_cluster_abort_percent_key_{};
Expand Down
1 change: 1 addition & 0 deletions source/common/http/filter/ip_tagging_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class IpTaggingFilter : public StreamDecoderFilter {
~IpTaggingFilter();

// Http::StreamFilterBase
void setBufferLimit(uint32_t) override {} // Non-buffering filter.
void onDestroy() override;

// Http::StreamDecoderFilter
Expand Down
20 changes: 18 additions & 2 deletions source/common/http/filter/ratelimit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,16 @@ FilterHeadersStatus Filter::decodeHeaders(HeaderMap& headers, bool) {

FilterDataStatus Filter::decodeData(Buffer::Instance&, bool) {
ASSERT(state_ != State::Responded);
return state_ == State::Calling ? FilterDataStatus::StopIterationAndBuffer
: FilterDataStatus::Continue;
if (state_ != State::Calling) {
return FilterDataStatus::Continue;
}
// The fault filter mimizes buffering even more aggressively than configured, to avoid

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: minimizes

// accumulating data for rate limited requests.
if (limiting_buffers_ && !high_watermark_called_) {
high_watermark_called_ = true;
callbacks_->onDecoderFilterAboveWriteBufferHighWatermark();
}
return FilterDataStatus::StopIterationAndBuffer;
}

FilterTrailersStatus Filter::decodeTrailers(HeaderMap&) {
Expand All @@ -99,6 +107,10 @@ void Filter::setDecoderFilterCallbacks(StreamDecoderFilterCallbacks& callbacks)
}

void Filter::onDestroy() {
if (high_watermark_called_) {
high_watermark_called_ = false;
callbacks_->onDecoderFilterBelowWriteBufferLowWatermark();
}
if (state_ == State::Calling) {
state_ = State::Complete;
client_->cancel();
Expand All @@ -108,6 +120,10 @@ void Filter::onDestroy() {
void Filter::complete(Envoy::RateLimit::LimitStatus status) {
state_ = State::Complete;

if (high_watermark_called_) {
high_watermark_called_ = false;
callbacks_->onDecoderFilterBelowWriteBufferLowWatermark();
}
switch (status) {
case Envoy::RateLimit::LimitStatus::OK:
cluster_->statsScope().counter("ratelimit.ok").inc();
Expand Down
5 changes: 4 additions & 1 deletion source/common/http/filter/ratelimit.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class Filter : public StreamDecoderFilter, public Envoy::RateLimit::RequestCallb
: config_(config), client_(std::move(client)) {}

// Http::StreamFilterBase
void setBufferLimit(uint32_t limit) override { limiting_buffers_ = limit > 0; }
void onDestroy() override;

// Http::StreamDecoderFilter
Expand All @@ -103,9 +104,11 @@ class Filter : public StreamDecoderFilter, public Envoy::RateLimit::RequestCallb
FilterConfigSharedPtr config_;
Envoy::RateLimit::ClientPtr client_;
StreamDecoderFilterCallbacks* callbacks_{};
bool initiating_call_{};
State state_{State::NotStarted};
Upstream::ClusterInfoConstSharedPtr cluster_;
bool initiating_call_{};
bool limiting_buffers_{false};
bool high_watermark_called_{false};

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is vestigial?

};

} // namespace RateLimit
Expand Down
2 changes: 2 additions & 0 deletions source/common/http/http1/codec_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ class StreamEncoderImpl : public StreamEncoder,
void addCallbacks(StreamCallbacks& callbacks) override { addCallbacks_(callbacks); }
void removeCallbacks(StreamCallbacks& callbacks) override { removeCallbacks_(callbacks); }
void resetStream(StreamResetReason reason) override;
// TODO(alyssawilk) HTTP/1.1 flow control.
void readDisable(bool) override {}
uint32_t bufferLimit() override { return 0; }

protected:
StreamEncoderImpl(ConnectionImpl& connection) : connection_(connection) {}
Expand Down
1 change: 1 addition & 0 deletions source/common/http/http2/codec_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ class ConnectionImpl : public virtual Connection, Logger::Loggable<Logger::Id::h
void removeCallbacks(StreamCallbacks& callbacks) override { removeCallbacks_(callbacks); }
void resetStream(StreamResetReason reason) override;
virtual void readDisable(bool disable) override;
virtual uint32_t bufferLimit() override { return pending_recv_data_.high_watermark(); }

void setWriteBufferWatermarks(uint32_t low_watermark, uint32_t high_watermark) {
pending_recv_data_.setWatermarks(low_watermark, high_watermark);
Expand Down
1 change: 1 addition & 0 deletions source/common/router/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ envoy_cc_library(
"//include/envoy/stats:stats_macros",
"//include/envoy/upstream:cluster_manager_interface",
"//include/envoy/upstream:upstream_interface",
"//source/common/buffer:watermark_buffer_lib",
"//source/common/common:assert_lib",
"//source/common/common:empty_string",
"//source/common/common:enum_to_int",
Expand Down
Loading