http: add modifyBuffer filter callback #5899
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -189,6 +189,12 @@ class StreamDecoderFilterCallbacks : public virtual StreamFilterCallbacks { | |
| */ | ||
| virtual const Buffer::Instance* decodingBuffer() PURE; | ||
|
|
||
| /** | ||
| * Allows modifying the decoding buffer. May only be called before any data has been continued | ||
| * past the calling filter. | ||
| */ | ||
| virtual void modifyDecodingBuffer(std::function<void(Buffer::Instance&)> callback) PURE; | ||
|
Contributor
Author
Used a callback style here to make it clear to the caller that they're not supposed to retain a pointer to the buffer.
Contributor
Sorry, chiming in late. If we're going to need a custom filter in order to buffer the whole body and then call modifyDecodingBuffer, would it be possible to refactor the buffering filter to have a stream-complete callback, so folks can subclass it and implement the onBufferingFilterStreamComplete callback? Or better yet, we could implement #5834, subclass the buffering filter, and override the base class onEncode/DecodeComplete. My concern is both adding extra complexity to the HCM for something I think we can push into an existing filter, and that by design it's subject to the data-with-end-stream problem called out below.
Member
This is a good point. @snowp, if this makes sense to you, I would be in favor of closing this and doing what @alyssawilk says?
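To illustrate the callback-style access discussed in this thread, here is a minimal, self-contained sketch. `FakeBuffer` and `FakeDecoderCallbacks` are hypothetical stand-ins, not Envoy's `Buffer::Instance` or `StreamDecoderFilterCallbacks`; the point is only that mutable access exists for the duration of the callback, while the getter stays const.

```cpp
#include <cassert>
#include <functional>
#include <string>

// Hypothetical stand-in for Buffer::Instance (not Envoy's real class).
class FakeBuffer {
public:
  void add(const std::string& data) { data_ += data; }
  const std::string& toString() const { return data_; }

private:
  std::string data_;
};

// Hypothetical stand-in for the decoder filter callbacks.
class FakeDecoderCallbacks {
public:
  // Read-only access, mirroring decodingBuffer() in the diff.
  const FakeBuffer* decodingBuffer() const { return &buffer_; }

  // Mutable access is handed out only while the callback runs, which
  // signals to the caller that the reference should not be retained.
  void modifyDecodingBuffer(const std::function<void(FakeBuffer&)>& callback) {
    assert(callback != nullptr);
    callback(buffer_);
  }

private:
  FakeBuffer buffer_;
};

// Seeds the buffer with `body`, then doubles it through the callback API.
std::string doubleBody(FakeDecoderCallbacks& callbacks, const std::string& body) {
  callbacks.modifyDecodingBuffer([&body](FakeBuffer& buffer) { buffer.add(body); });
  callbacks.modifyDecodingBuffer([](FakeBuffer& buffer) {
    // Copy first so the append does not read from the string it is growing.
    const std::string copy = buffer.toString();
    buffer.add(copy);
  });
  return callbacks.decodingBuffer()->toString();
}
```

Nothing in this sketch prevents a caller from stashing the reference inside the lambda; the callback shape only communicates intent, which matches the rationale given by the author.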
||
|
|
||
| /** | ||
| * Add buffered body data. This method is used in advanced cases where returning | ||
| * StopIterationAndBuffer from decodeData() is not sufficient. | ||
|
|
@@ -430,6 +436,12 @@ class StreamEncoderFilterCallbacks : public virtual StreamFilterCallbacks { | |
| */ | ||
| virtual const Buffer::Instance* encodingBuffer() PURE; | ||
|
|
||
| /** | ||
| * Allows modifying the encoding buffer. May only be called before any data has been continued | ||
| * past the calling filter. | ||
| */ | ||
| virtual void modifyEncodingBuffer(std::function<void(Buffer::Instance&)> callback) PURE; | ||
|
|
||
| /** | ||
| * Add buffered body data. This method is used in advanced cases where returning | ||
| * StopIterationAndBuffer from encodeData() is not sufficient. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -299,6 +299,9 @@ class AsyncStreamImpl : public AsyncClient::Stream, | |
| ASSERT(buffered_body_ != nullptr); | ||
| } | ||
| const Buffer::Instance* decodingBuffer() override { return buffered_body_.get(); } | ||
| void modifyDecodingBuffer(std::function<void(Buffer::Instance&)> callback) override { | ||
| callback(*buffered_body_.get()); | ||
|
Member
Maybe just NOT_IMPLEMENTED this and below? I don't think the router filter should ever call this currently.
||
| } | ||
| void sendLocalReply(Code code, absl::string_view body, | ||
| std::function<void(HeaderMap& headers)> modify_headers, | ||
| const absl::optional<Grpc::Status::GrpcStatus> grpc_status) override { | ||
|
|
@@ -370,6 +373,9 @@ class AsyncRequestImpl final : public AsyncClient::Request, | |
| // internal use of the router filter which uses this function for buffering. | ||
| } | ||
| const Buffer::Instance* decodingBuffer() override { return request_->body().get(); } | ||
| void modifyDecodingBuffer(std::function<void(Buffer::Instance&)> callback) override { | ||
| callback(*request_->body().get()); | ||
| } | ||
|
|
||
| MessagePtr request_; | ||
| AsyncClient::Callbacks& callbacks_; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -37,6 +37,31 @@ | |
| namespace Envoy { | ||
| namespace Http { | ||
|
|
||
| namespace { | ||
|
|
||
| template <class T> using FilterList = std::list<std::unique_ptr<T>>; | ||
|
|
||
| // Shared helper for recording the latest filter used. | ||
| template <class T> | ||
| void recordLatestDataFilter(const typename FilterList<T>::iterator current_filter, | ||
|
Member
cc @soya3129: this is similar to what you had been doing in one of your metadata PRs, in case you end up needing this again.
Contributor
Very nice! Thanks! This can be very useful when we allow metadata to go through downstream filters only.
||
| T** latest_filter, const FilterList<T>& filters) { | ||
|
Member
nit: small preference for passing a reference to a pointer, since it can't be nullptr and there's less dereferencing below, but up to you.
||
| // If this is the first time we're calling onData, just record the current filter. | ||
| if (*latest_filter == nullptr) { | ||
| *latest_filter = current_filter->get(); | ||
| return; | ||
| } | ||
|
|
||
| // We want to keep this pointing at the latest filter in the filter list that has received the | ||
| // onData callback. To do so, we compare the current latest with the *previous* filter. If they | ||
| // match, then we must be processing a new filter for the first time. We omit this check if we're | ||
| // the first filter, since the above check handles that case. | ||
| if (current_filter != filters.begin() && *latest_filter == std::prev(current_filter)->get()) { | ||
|
Member
Why not just universally set latest to current? Seems simple to read/reason about?
Contributor
Author
I wanted to cover the case where there are multiple onData callbacks: if we just set latest to current, then the first onData filter iteration would correctly iterate over the filters and set
Hopefully this makes sense - open for suggestions if this seems unnecessary or if there's a better way.
Member
I see. OK, makes sense. Can you clarify that a bit in the above comment (just add your additional explanation there)?
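The tracking rule debated in this thread can be tried out in isolation. The sketch below is a simplified model under stated assumptions: `FakeFilter`, `recordLatest`, and `latestAfter` are hypothetical names, the pointer is passed by reference as suggested in the nit above, and the logic follows the helper's code comment (advance only when the previous filter in the list is the current latest).

```cpp
#include <cassert>
#include <iterator>
#include <list>
#include <memory>
#include <vector>

// Hypothetical stand-in for an active stream filter wrapper.
struct FakeFilter {
  int id;
};
using FakeFilterList = std::list<std::unique_ptr<FakeFilter>>;

// Advances `latest` only when `current` directly follows the recorded filter,
// so replaying data through earlier filters never moves it backwards.
void recordLatest(FakeFilterList::const_iterator current, FakeFilter*& latest,
                  const FakeFilterList& filters) {
  if (latest == nullptr) {
    latest = current->get();
    return;
  }
  if (current != filters.begin() && latest == std::prev(current)->get()) {
    latest = current->get();
  }
}

// Replays data callbacks on a three-filter chain in the given order of filter
// positions and returns the id recorded as latest (-1 if nothing was visited).
int latestAfter(const std::vector<int>& visit_order) {
  FakeFilterList filters;
  for (int i = 0; i < 3; ++i) {
    filters.push_back(std::make_unique<FakeFilter>(FakeFilter{i}));
  }
  FakeFilter* latest = nullptr;
  for (int index : visit_order) {
    assert(index >= 0 && index < 3);
    recordLatest(std::next(filters.begin(), index), latest, filters);
  }
  return latest == nullptr ? -1 : latest->id;
}
```

With a visit order of `{0, 1, 0, 1}` (filter 1 stops iteration, then a second data frame arrives), the recorded filter stays at 1, whereas unconditionally assigning latest to current would leave it pointing at whichever filter ran last in the most recent pass.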
||
| *latest_filter = current_filter->get(); | ||
| } | ||
| } | ||
|
|
||
| } // namespace | ||
|
|
||
| ConnectionManagerStats ConnectionManagerImpl::generateStats(const std::string& prefix, | ||
| Stats::Scope& scope) { | ||
| return { | ||
|
|
@@ -874,6 +899,9 @@ void ConnectionManagerImpl::ActiveStream::decodeData(ActiveStreamDecoderFilter* | |
| if (end_stream) { | ||
| state_.filter_call_state_ |= FilterCallState::LastDataFrame; | ||
| } | ||
|
|
||
| recordLatestDataFilter(entry, &state_.latest_data_decoding_filter_, decoder_filters_); | ||
|
|
||
| state_.filter_call_state_ |= FilterCallState::DecodeData; | ||
| (*entry)->end_stream_ = end_stream && !request_trailers_; | ||
| FilterDataStatus status = (*entry)->handle_->decodeData(data, (*entry)->end_stream_); | ||
|
|
@@ -1325,6 +1353,9 @@ void ConnectionManagerImpl::ActiveStream::encodeData(ActiveStreamEncoderFilter* | |
| if (end_stream) { | ||
| state_.filter_call_state_ |= FilterCallState::LastDataFrame; | ||
| } | ||
|
|
||
| recordLatestDataFilter(entry, &state_.latest_data_encoding_filter_, encoder_filters_); | ||
|
|
||
| (*entry)->end_stream_ = end_stream && !response_trailers_; | ||
| FilterDataStatus status = (*entry)->handle_->encodeData(data, (*entry)->end_stream_); | ||
| state_.filter_call_state_ &= ~FilterCallState::EncodeData; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -176,6 +176,12 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>, | |
| const Buffer::Instance* decodingBuffer() override { | ||
| return parent_.buffered_request_data_.get(); | ||
| } | ||
|
|
||
| void modifyDecodingBuffer(std::function<void(Buffer::Instance&)> callback) override { | ||
| ASSERT(parent_.state_.latest_data_decoding_filter_ == this); | ||
| callback(*parent_.buffered_request_data_.get()); | ||
|
Contributor
Sorry for chiming in late, but this seems like a lot of complexity for an ASSERT. To me, this has similar functionality to just allowing raw buffer access: the filter can do arbitrary transforms on any data, and it'd be far simpler conceptually to just allow connections to access the buffer directly than to add on the std::function complexity to get an ASSERT check.
Member
Assuming we stick with this approach, I do personally think this assert is worth it, as well as the differentiation between const and non-const access, mainly because I think it would be very easy to get hard-to-understand behavior between filters. With that said, per your other comment, maybe we don't need this change at all?
Contributor
I think we probably need something, as the current buffering filter pushes the buffering into the HCM. We could refactor the buffering filter to do the buffering itself (which seems reasonable) and then subclass, but the extra work was why I am fine going either way.
Member
True, sorry, I wasn't thinking very clearly at the end of the day yesterday. As you point out, the buffer filter as-is won't do it. I'm not in favor of changing how the buffer filter works, mainly because we avoid double buffering in many cases. I.e., the buffer filter buffers, then some other filter buffers, but it's a NOP because the HCM has already buffered the data. I guess in thinking about it more, I'm back to being fine with this solution. Alyssa's concern about not handling end_stream is a good one, though I can't think of any elegant quick fix for that if we keep this general API flow. Any ideas?
Contributor
Yeah, sorry, I'd edited my comment in some window or other and it got eaten by GitHub. I don't know if we can fix the end_stream thing here, but we can at least update our sample code to do the addDecodedData dance, and then do away with it if #5834 gets fixed. WDYT?
Member
+1
Contributor
Author
This all sounds good to me. I'll update the tests.
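The end_stream concern in this thread can be shown with a toy model. This is a sketch under assumptions, not Envoy code: `doubleBufferedOnly` and `doubleWholeBody` are hypothetical names, a `std::string` stands in for the connection manager's buffer, and the model assumes (as the review describes) that the chunk delivered with end_stream is not yet in the buffer when the filter runs. The second function appends that final chunk to the buffer first, which is roughly what the "addDecodedData dance" mentioned above would accomplish.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Models a filter that doubles the buffer when the last chunk arrives, without
// first adding that chunk to the buffer. The last chunk passes through as-is.
std::string doubleBufferedOnly(const std::vector<std::string>& chunks) {
  assert(!chunks.empty());
  std::string buffered;
  // Every chunk except the last was held back and buffered.
  for (std::size_t i = 0; i + 1 < chunks.size(); ++i) {
    buffered += chunks[i];
  }
  const std::string copy = buffered;
  buffered += copy;                // the "modify buffer" step
  return buffered + chunks.back(); // final chunk is appended afterwards
}

// Models the adjusted filter: the final chunk is moved into the buffer before
// the modification, so the whole body is doubled.
std::string doubleWholeBody(const std::vector<std::string>& chunks) {
  assert(!chunks.empty());
  std::string buffered;
  for (const std::string& chunk : chunks) {
    buffered += chunk;
  }
  const std::string copy = buffered;
  buffered += copy;
  return buffered;
}
```

For chunks `{"ab", "cd"}` the first function yields `"ababcd"` while the second yields `"abcdabcd"`; when the entire body arrives in a single frame with end_stream, the first function doubles nothing at all.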
||
| } | ||
|
|
||
| void sendLocalReply(Code code, absl::string_view body, | ||
| std::function<void(HeaderMap& headers)> modify_headers, | ||
| const absl::optional<Grpc::Status::GrpcStatus> grpc_status) override { | ||
|
|
@@ -252,6 +258,10 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>, | |
| const Buffer::Instance* encodingBuffer() override { | ||
| return parent_.buffered_response_data_.get(); | ||
| } | ||
| void modifyEncodingBuffer(std::function<void(Buffer::Instance&)> callback) override { | ||
| ASSERT(parent_.state_.latest_data_encoding_filter_ == this); | ||
| callback(*parent_.buffered_response_data_.get()); | ||
| } | ||
|
|
||
| void responseDataTooLarge(); | ||
| void responseDataDrained(); | ||
|
|
@@ -384,6 +394,10 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>, | |
| // True if this stream is internally created. Currently only used for | ||
| // internal redirects or other streams created via recreateStream(). | ||
| bool is_internally_created_ : 1; | ||
|
|
||
| // Used to track which filter is the latest filter that has received data. | ||
| ActiveStreamEncoderFilter* latest_data_encoding_filter_{}; | ||
| ActiveStreamDecoderFilter* latest_data_decoding_filter_{}; | ||
| }; | ||
|
|
||
| // Possibly increases buffer_limit_ to the value of limit. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,57 @@ | ||
| #include <string> | ||
|
|
||
| #include "envoy/http/filter.h" | ||
| #include "envoy/registry/registry.h" | ||
| #include "envoy/server/filter_config.h" | ||
|
|
||
| #include "extensions/filters/http/common/empty_http_filter_config.h" | ||
| #include "extensions/filters/http/common/pass_through_filter.h" | ||
|
|
||
| namespace Envoy { | ||
|
|
||
| // A filter that buffers the entire request/response, then doubles | ||
| // the content of the filter buffer. | ||
| class ModifyBufferStreamFilter : public Http::PassThroughFilter { | ||
| public: | ||
| Http::FilterDataStatus decodeData(Buffer::Instance&, bool end_stream) { | ||
| if (end_stream) { | ||
| decoder_callbacks_->modifyDecodingBuffer([](auto& buffer) { | ||
|
Contributor
As you call out in the integration test, this doesn't modify the entire request body when the data arrives with end stream. I suspect this generally won't be what the user wants - we've seen plenty of these bugs (e.g. #5674) where we need to
This means that the reference implementation of this function doesn't do what it says (and folks may copy it without realizing), and also that the design is subject to the data-with-final-end-stream pattern, which I'd really love to avoid. Can we try to fix this?
||
| // Append the buffer to itself. | ||
| buffer.add(buffer); | ||
| }); | ||
| return Http::FilterDataStatus::Continue; | ||
| } | ||
|
|
||
| return Http::FilterDataStatus::StopIterationAndBuffer; | ||
| } | ||
|
|
||
| Http::FilterDataStatus encodeData(Buffer::Instance&, bool end_stream) { | ||
| if (end_stream) { | ||
| encoder_callbacks_->modifyEncodingBuffer([](auto& buffer) { | ||
| // Append the buffer to itself. | ||
| buffer.add(buffer); | ||
| }); | ||
| return Http::FilterDataStatus::Continue; | ||
| } | ||
|
|
||
| return Http::FilterDataStatus::StopIterationAndBuffer; | ||
| } | ||
| }; | ||
|
|
||
| class ModifyBuffferFilterConfig : public Extensions::HttpFilters::Common::EmptyHttpFilterConfig { | ||
| public: | ||
| ModifyBuffferFilterConfig() : EmptyHttpFilterConfig("modify-buffer-filter") {} | ||
|
|
||
| Http::FilterFactoryCb createFilter(const std::string&, Server::Configuration::FactoryContext&) { | ||
| return [](Http::FilterChainFactoryCallbacks& callbacks) -> void { | ||
| callbacks.addStreamFilter(std::make_shared<::Envoy::ModifyBufferStreamFilter>()); | ||
| }; | ||
| } | ||
| }; | ||
|
|
||
| // perform static registration | ||
| static Registry::RegisterFactory<ModifyBuffferFilterConfig, | ||
| Server::Configuration::NamedHttpFilterConfigFactory> | ||
| register_; | ||
|
|
||
| } // namespace Envoy | ||
nit: this is duplicate