-
Notifications
You must be signed in to change notification settings - Fork 5.5k
Flow control for Http::Filters #1417
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
a604e38
4804be6
1040f67
647845a
3a8cf94
a445e52
4f80dec
9267859
8ac29e1
95de98b
ae42c02
f705fed
f99ca46
d0a823a
7e5b7c1
471fe64
668932f
37d940e
19aaf38
1292a58
558fb59
9ac14b1
6b5f615
220fcf3
9685ee9
a970125
1409730
eaed1ae
9bf5df7
0bead86
612eacb
3bfbc51
a94b527
be29d74
0d534df
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,7 +27,8 @@ class DynamoFilter : public Http::StreamFilter { | |
| } | ||
|
|
||
| // Http::StreamFilterBase | ||
| void onDestroy() override {} | ||
| void setBufferLimit(uint32_t limit) override { buffer_limit_ = limit; } | ||
| void onDestroy() override { stream_reset_ = true; } | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is required for sendLocalReply on the event the stream is destroyed mid-call. |
||
|
|
||
| // Http::StreamDecoderFilter | ||
| Http::FilterHeadersStatus decodeHeaders(Http::HeaderMap& headers, bool end_stream) override; | ||
|
|
@@ -60,14 +61,16 @@ class DynamoFilter : public Http::StreamFilter { | |
| std::string stat_prefix_; | ||
| Stats::Scope& scope_; | ||
|
|
||
| bool enabled_{}; | ||
| std::string operation_{}; | ||
| RequestParser::TableDescriptor table_descriptor_{"", true}; | ||
| std::string error_type_{}; | ||
| MonotonicTime start_decode_; | ||
| Http::HeaderMap* response_headers_; | ||
| Http::StreamDecoderFilterCallbacks* decoder_callbacks_{}; | ||
| Http::StreamEncoderFilterCallbacks* encoder_callbacks_{}; | ||
| uint32_t buffer_limit_{0}; | ||
| bool enabled_{false}; | ||
| bool stream_reset_{false}; | ||
| }; | ||
|
|
||
| } // namespace Dynamo | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,7 +21,9 @@ class GrpcWebFilter : public Http::StreamFilter, NonCopyable { | |
| virtual ~GrpcWebFilter(){}; | ||
|
|
||
| // Http::StreamFilterBase | ||
| void onDestroy() override{}; | ||
| // Ignore buffer limtis: see ASSERT in decodeData: decoding_buffer_ buffers less than 4 bytes. | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. typo "limtis" |
||
| void setBufferLimit(uint32_t) override {} | ||
| void onDestroy() override {} | ||
|
|
||
| // Implements StreamDecoderFilter. | ||
| Http::FilterHeadersStatus decodeHeaders(Http::HeaderMap&, bool) override; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,9 +9,11 @@ | |
| #include "common/common/enum_to_int.h" | ||
| #include "common/common/utility.h" | ||
| #include "common/grpc/common.h" | ||
| #include "common/http/codes.h" | ||
| #include "common/http/filter_utility.h" | ||
| #include "common/http/headers.h" | ||
| #include "common/http/http1/codec_impl.h" | ||
| #include "common/http/utility.h" | ||
|
|
||
| namespace Envoy { | ||
| namespace Grpc { | ||
|
|
@@ -48,10 +50,17 @@ Http::FilterHeadersStatus Http1BridgeFilter::encodeHeaders(Http::HeaderMap& head | |
| } | ||
| } | ||
|
|
||
| Http::FilterDataStatus Http1BridgeFilter::encodeData(Buffer::Instance&, bool end_stream) { | ||
| Http::FilterDataStatus Http1BridgeFilter::encodeData(Buffer::Instance& data, bool end_stream) { | ||
| if (!do_bridging_ || end_stream) { | ||
| return Http::FilterDataStatus::Continue; | ||
| } else { | ||
| // If the response is too large to fit in the buffer, send an error to the user. | ||
| if (buffer_limit_ > 0 && data.length() > buffer_limit_) { | ||
| Http::Utility::sendLocalReply(*decoder_callbacks_, stream_reset_, | ||
| Http::Code::InternalServerError, | ||
| Http::CodeUtility::toString(Http::Code::InternalServerError)); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. InternalServerError vs. PayloadTooLarge? Also same comment about centralized 413 logic? |
||
| return Http::FilterDataStatus::StopIterationNoBuffer; | ||
| } | ||
| return Http::FilterDataStatus::StopIterationAndBuffer; | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,8 +9,10 @@ | |
|
|
||
| #include "common/common/assert.h" | ||
| #include "common/common/enum_to_int.h" | ||
| #include "common/http/codes.h" | ||
| #include "common/http/header_map_impl.h" | ||
| #include "common/http/headers.h" | ||
| #include "common/http/utility.h" | ||
|
|
||
| namespace Envoy { | ||
| namespace Http { | ||
|
|
@@ -38,10 +40,8 @@ FilterDataStatus BufferFilter::decodeData(Buffer::Instance&, bool end_stream) { | |
| return FilterDataStatus::Continue; | ||
| } else if (callbacks_->decodingBuffer() && | ||
| callbacks_->decodingBuffer()->length() > config_->max_request_bytes_) { | ||
| // TODO(htuch): Switch this to Utility::sendLocalReply(). | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can buffer filter now call setDecoderBufferLimit() and have the 413 be by connection manager? On this topic, IMO we should move the rq_too_large_ stat into connection manager (rename if you want). We should have stats in connection manager for sending 413 due to buffering as well as 500 due to buffering, etc.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We totally could - I left it as-is to preserve the existing stats. Will move in my next update if you don't mind the stats going away
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we can lose the existing stats as long as we get new stats in connection manager. :)
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, one more stats question. downstream_rq_too_large totally makes sense in the connection manager. upstream_rs_too_large not so much. I'm not convinced it's worth the plumbing to make the router know the reason it was torn down was due to buffer limits. I could just put rs_too_large in the connection manager stats. WDYT?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that's fine. |
||
| Http::HeaderMapPtr response_headers{new HeaderMapImpl{ | ||
| {Headers::get().Status, std::to_string(enumToInt(Http::Code::PayloadTooLarge))}}}; | ||
| callbacks_->encodeHeaders(std::move(response_headers), true); | ||
| Http::Utility::sendLocalReply(*callbacks_, stream_reset_, Http::Code::PayloadTooLarge, | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Vaguely wondering if we should have a
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was thinking of a base class with onDestroy() which I found more annoying to have all over. But I figured the odds of forgetting the up-call in an override out-weighted the code savings |
||
| CodeUtility::toString(Http::Code::PayloadTooLarge)); | ||
| config_->stats_.rq_too_large_.inc(); | ||
| } | ||
|
|
||
|
|
@@ -58,7 +58,10 @@ BufferFilterStats BufferFilter::generateStats(const std::string& prefix, Stats:: | |
| return {ALL_BUFFER_FILTER_STATS(POOL_COUNTER_PREFIX(scope, final_prefix))}; | ||
| } | ||
|
|
||
| void BufferFilter::onDestroy() { resetInternalState(); } | ||
| void BufferFilter::onDestroy() { | ||
| stream_reset_ = true; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we can kill this var now. |
||
| resetInternalState(); | ||
| } | ||
|
|
||
| void BufferFilter::onRequestTimeout() { | ||
| // TODO(htuch): Switch this to Utility::sendLocalReply(). | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -51,6 +51,7 @@ class BufferFilter : public StreamDecoderFilter { | |
| static BufferFilterStats generateStats(const std::string& prefix, Stats::Scope& scope); | ||
|
|
||
| // Http::StreamFilterBase | ||
| void setBufferLimit(uint32_t) override {} // Buffer filter uses its own configured limits. | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Mostly thought exercise: Could we actually allow filter to set buffer limit and push 413 to connection manager? This would make this filter trivial in that all it does is provide config to set buffer limit to min(configured_limit, stream_limit). |
||
| void onDestroy() override; | ||
|
|
||
| // Http::StreamDecoderFilter | ||
|
|
@@ -66,6 +67,7 @@ class BufferFilter : public StreamDecoderFilter { | |
| BufferFilterConfigConstSharedPtr config_; | ||
| StreamDecoderFilterCallbacks* callbacks_{}; | ||
| Event::TimerPtr request_timeout_; | ||
| bool stream_reset_{false}; | ||
| }; | ||
|
|
||
| } // Http | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -73,7 +73,10 @@ FaultFilterConfig::FaultFilterConfig(const Json::Object& json_config, Runtime::L | |
|
|
||
| FaultFilter::FaultFilter(FaultFilterConfigSharedPtr config) : config_(config) {} | ||
|
|
||
| FaultFilter::~FaultFilter() { ASSERT(!delay_timer_); } | ||
| FaultFilter::~FaultFilter() { | ||
| ASSERT(!delay_timer_); | ||
| ASSERT(!high_watermark_called_); | ||
| } | ||
|
|
||
| // Delays and aborts are independent events. One can inject a delay | ||
| // followed by an abort or inject just a delay or abort. In this callback, | ||
|
|
@@ -208,9 +211,15 @@ void FaultFilter::recordAbortsInjectedStats() { | |
| config_->stats().aborts_injected_.inc(); | ||
| } | ||
|
|
||
| FilterDataStatus FaultFilter::decodeData(Buffer::Instance&, bool) { | ||
| return delay_timer_ == nullptr ? FilterDataStatus::Continue | ||
| : FilterDataStatus::StopIterationAndBuffer; | ||
| FilterDataStatus FaultFilter::decodeData(Buffer::Instance& data, bool) { | ||
| if (delay_timer_ == nullptr) { | ||
| return FilterDataStatus::Continue; | ||
| } | ||
| if (buffer_limit_ > 0 && data.length() > buffer_limit_ && !high_watermark_called_) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In this case, perhaps filter can somehow tell connection manager whether to reset and return 413 vs. fire callbacks? |
||
| high_watermark_called_ = true; | ||
| callbacks_->onDecoderFilterAboveWriteBufferHighWatermark(); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems... quite aggressive. Why can't we rely on ActiveStream just taking care of this when we hit the real limit?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a DoS filter, no? I figured this is just the sort of place we want to be aggressive because if DoS is kicking in you want to reserve your memory for uses who aren't flagged as bad.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, this filter is for injecting faults for testing (latency and failures). |
||
| } | ||
| return FilterDataStatus::StopIterationAndBuffer; | ||
| } | ||
|
|
||
| FilterTrailersStatus FaultFilter::decodeTrailers(HeaderMap&) { | ||
|
|
@@ -276,6 +285,13 @@ void FaultFilter::resetTimerState() { | |
| delay_timer_->disableTimer(); | ||
| delay_timer_.reset(); | ||
| } | ||
| // It's not necessarily true that the buffer has drained below the low watermark, but this filter | ||
| // is no longer causing data to be buffered and this is the single safest place for watermark | ||
| // resumption. | ||
| if (high_watermark_called_) { | ||
| high_watermark_called_ = false; | ||
| callbacks_->onDecoderFilterBelowWriteBufferLowWatermark(); | ||
| } | ||
| } | ||
|
|
||
| void FaultFilter::setDecoderFilterCallbacks(StreamDecoderFilterCallbacks& callbacks) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -80,6 +80,7 @@ class Filter : public StreamDecoderFilter, public Envoy::RateLimit::RequestCallb | |
| : config_(config), client_(std::move(client)) {} | ||
|
|
||
| // Http::StreamFilterBase | ||
| void setBufferLimit(uint32_t limit) override { limiting_buffers_ = limit > 0; } | ||
| void onDestroy() override; | ||
|
|
||
| // Http::StreamDecoderFilter | ||
|
|
@@ -103,9 +104,11 @@ class Filter : public StreamDecoderFilter, public Envoy::RateLimit::RequestCallb | |
| FilterConfigSharedPtr config_; | ||
| Envoy::RateLimit::ClientPtr client_; | ||
| StreamDecoderFilterCallbacks* callbacks_{}; | ||
| bool initiating_call_{}; | ||
| State state_{State::NotStarted}; | ||
| Upstream::ClusterInfoConstSharedPtr cluster_; | ||
| bool initiating_call_{}; | ||
| bool limiting_buffers_{false}; | ||
| bool high_watermark_called_{false}; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is vestigial? |
||
| }; | ||
|
|
||
| } // namespace RateLimit | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This filter pushes buffering down to the connection manager. In this case, should we rely on the connection manager to actually send the 413 and reset? Very few filters will end up doing internal buffering that is outside of what the connection manager will provide IMO. Just something to think about.