Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/root/intro/version_history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ Version history
* outlier_detection: added support for :ref:`outlier detection event protobuf-based logging <arch_overview_outlier_detection_logging>`.
* mysql: added a MySQL proxy filter that is capable of parsing SQL queries over MySQL wire protocol. Refer to ::ref:`MySQL proxy<config_network_filters_mysql_proxy>` for more details.
* http: added :ref:`max request headers size <envoy_api_field_config.filter.network.http_connection_manager.v2.HttpConnectionManager.max_request_headers_kb>`. The default behaviour is unchanged.
* http: added modifyDecodingBuffer/modifyEncodingBuffer to allow modifying the buffered request/response data.
* redis: added :ref:`hashtagging <envoy_api_field_config.filter.network.redis_proxy.v2.RedisProxy.ConnPoolSettings.enable_hashtagging>` to guarantee a given key's upstream.
* redis: added :ref:`latency stats <config_network_filters_redis_proxy_per_command_stats>` for commands.
* redis: added :ref:`success and error stats <config_network_filters_redis_proxy_per_command_stats>` for commands.
Expand Down
12 changes: 12 additions & 0 deletions include/envoy/http/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,12 @@ class StreamDecoderFilterCallbacks : public virtual StreamFilterCallbacks {
*/
virtual const Buffer::Instance* decodingBuffer() PURE;

/**
* Allows modifying the decoding buffer. May only be called before any data has been continued
* past the calling filter.
*/
virtual void modifyDecodingBuffer(std::function<void(Buffer::Instance&)> callback) PURE;
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Used a callback style here to make it clear to the caller that they're not supposed to retain a pointer to the buffer

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, chiming in late.

If we're going to need a custom filter in order to buffer the whole body and then call modifyDecodingBuffer, would it be possible to refactor the buffering filter to have a stream complete callback and folks can subclass and implement the onBufferingFilterStreamComplete callback? Or better yet we could implement #5834, subclass the buffering filter and override the base class onEncode/DecodeComplete.

My concern is both adding extra complexity to the HCM for something I think we can push into an exiting filter, and I think by design it's subject to the data-with-end-stream problem called out below.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good point. @snowp if this makes sense to you I would be in favor of closing this and doing what @alyssawilk says?


/**
* Add buffered body data. This method is used in advanced cases where returning
* StopIterationAndBuffer from decodeData() is not sufficient.
Expand Down Expand Up @@ -430,6 +436,12 @@ class StreamEncoderFilterCallbacks : public virtual StreamFilterCallbacks {
*/
virtual const Buffer::Instance* encodingBuffer() PURE;

/**
* Allows modifying the encoding buffer. May only be called before any data has been continued
* past the calling filter.
*/
virtual void modifyEncodingBuffer(std::function<void(Buffer::Instance&)> callback) PURE;

/**
* Add buffered body data. This method is used in advanced cases where returning
* StopIterationAndBuffer from encodeData() is not sufficient.
Expand Down
6 changes: 6 additions & 0 deletions source/common/http/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,9 @@ class AsyncStreamImpl : public AsyncClient::Stream,
ASSERT(buffered_body_ != nullptr);
}
const Buffer::Instance* decodingBuffer() override { return buffered_body_.get(); }
void modifyDecodingBuffer(std::function<void(Buffer::Instance&)>) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}
void sendLocalReply(Code code, absl::string_view body,
std::function<void(HeaderMap& headers)> modify_headers,
const absl::optional<Grpc::Status::GrpcStatus> grpc_status) override {
Expand Down Expand Up @@ -383,6 +386,9 @@ class AsyncRequestImpl final : public AsyncClient::Request,
// internal use of the router filter which uses this function for buffering.
}
const Buffer::Instance* decodingBuffer() override { return request_->body().get(); }
void modifyDecodingBuffer(std::function<void(Buffer::Instance&)>) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}

MessagePtr request_;
AsyncClient::Callbacks& callbacks_;
Expand Down
37 changes: 37 additions & 0 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,37 @@
namespace Envoy {
namespace Http {

namespace {

template <class T> using FilterList = std::list<std::unique_ptr<T>>;

// Shared helper for recording the latest filter used.
template <class T>
void recordLatestDataFilter(const typename FilterList<T>::iterator current_filter,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @soya3129 this is similar to what you had been doing in one of your metadata PRs in case you end up needing this again.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very nice! Thanks! Can be very useful when we allow metadata to go through downstream filters only.

T*& latest_filter, const FilterList<T>& filters) {
// If this is the first time we're calling onData, just record the current filter.
if (latest_filter == nullptr) {
latest_filter = current_filter->get();
return;
}

// We want to keep this pointing at the latest filter in the filter list that has received the
// onData callback. To do so, we compare the current latest with the *previous* filter. If they
// match, then we must be processing a new filter for the first time. We omit this check if we're
// the first filter, since the above check handles that case.
//
// We compare against the previous filter to avoid multiple filter iterations from reseting the
// pointer: If we just set latest to current, then the first onData filter iteration would
// correctly iterate over the filters and set latest, but on subsequent onData iterations
// we'd start from the beginning again, potentially allowing filter N to modify the buffer even
// though filter M > N was the filter that inserted data into the buffer.
if (current_filter != filters.begin() && latest_filter == std::prev(current_filter)->get()) {
latest_filter = current_filter->get();
}
}

} // namespace

ConnectionManagerStats ConnectionManagerImpl::generateStats(const std::string& prefix,
Stats::Scope& scope) {
return {
Expand Down Expand Up @@ -871,6 +902,9 @@ void ConnectionManagerImpl::ActiveStream::decodeData(ActiveStreamDecoderFilter*
if (end_stream) {
state_.filter_call_state_ |= FilterCallState::LastDataFrame;
}

recordLatestDataFilter(entry, state_.latest_data_decoding_filter_, decoder_filters_);

state_.filter_call_state_ |= FilterCallState::DecodeData;
(*entry)->end_stream_ = end_stream && !request_trailers_;
FilterDataStatus status = (*entry)->handle_->decodeData(data, (*entry)->end_stream_);
Expand Down Expand Up @@ -1322,6 +1356,9 @@ void ConnectionManagerImpl::ActiveStream::encodeData(ActiveStreamEncoderFilter*
if (end_stream) {
state_.filter_call_state_ |= FilterCallState::LastDataFrame;
}

recordLatestDataFilter(entry, state_.latest_data_encoding_filter_, encoder_filters_);

(*entry)->end_stream_ = end_stream && !response_trailers_;
FilterDataStatus status = (*entry)->handle_->encodeData(data, (*entry)->end_stream_);
state_.filter_call_state_ &= ~FilterCallState::EncodeData;
Expand Down
14 changes: 14 additions & 0 deletions source/common/http/conn_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,12 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
const Buffer::Instance* decodingBuffer() override {
return parent_.buffered_request_data_.get();
}

void modifyDecodingBuffer(std::function<void(Buffer::Instance&)> callback) override {
ASSERT(parent_.state_.latest_data_decoding_filter_ == this);
callback(*parent_.buffered_request_data_.get());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for chiming in late, but this seems like a lot of complexity for an ASSERT. To me, this has similar functionality to just allowing raw buffer access - the filter can do arbitrary transforms on any data, and it'd be far simpler conceptually to just allow connections to access the buffer directly than add on the std::function complexity to get an ASSERT check.
I suspect Matt may disagree as he hasn't called it out so I'm fine letting it stand :-)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assuming we stick with this approach, I do personally think this assert is worth it, as well as the differentiation between const and non-const access, mainly because I think it would be very easy to get hard to understand behavior between filters. With that said, per your other comment, maybe we don't need this change at all?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we probably need something, as the current buffering filter pushes the buffering into the HCM. We could refactor the buffering filter to do the buffering itself (which seems reasonable) and then subclass, but the extra work was why I am fine going either way.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, sorry, I wasn't thinking very clearly at the end of the day yesterday. As you point out the buffer filter as-is won't do it. I'm not in favor of changing how the buffer filter works mainly because we avoid double buffering in many cases. I.e., the buffer filter buffers, then some other filter buffers but it's a NOP because the HCM has already buffered the data.

I guess in thinking about it more, I'm back to being fine with this solution. Alyssa's concern about not handling end_stream is a good one, though I can't think of any elegant quick fix for that if we keep this general API flow. Any ideas?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, sorry, I'd edited my comment in some window or other and it got eaten by GitHub.

I don't know if we can fix the end_stream thing here, but we can at least update our sample code to do the addDecodedData dance, and then do away with it if #5834 gets fixed. WDYT?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if we can fix the end_stream thing here, but we can at least update our sample code to do the addDecodedData dance, and then do away with it if #5834 gets fixed. WDYT?

+1

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This all sounds good to me. I'll update the tests

}

void sendLocalReply(Code code, absl::string_view body,
std::function<void(HeaderMap& headers)> modify_headers,
const absl::optional<Grpc::Status::GrpcStatus> grpc_status) override {
Expand Down Expand Up @@ -252,6 +258,10 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
const Buffer::Instance* encodingBuffer() override {
return parent_.buffered_response_data_.get();
}
void modifyEncodingBuffer(std::function<void(Buffer::Instance&)> callback) override {
ASSERT(parent_.state_.latest_data_encoding_filter_ == this);
callback(*parent_.buffered_response_data_.get());
}

void responseDataTooLarge();
void responseDataDrained();
Expand Down Expand Up @@ -384,6 +394,10 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
// True if this stream is internally created. Currently only used for
// internal redirects or other streams created via recreateStream().
bool is_internally_created_ : 1;

// Used to track which filter is the latest filter that has received data.
ActiveStreamEncoderFilter* latest_data_encoding_filter_{};
ActiveStreamDecoderFilter* latest_data_decoding_filter_{};
};

// Possibly increases buffer_limit_ to the value of limit.
Expand Down
1 change: 1 addition & 0 deletions test/integration/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ envoy_cc_test_library(
"//test/common/upstream:utility_lib",
"//test/integration/filters:add_trailers_filter_config_lib",
"//test/integration/filters:headers_only_filter_config_lib",
"//test/integration/filters:modify_buffer_filter_config_lib",
"//test/integration/filters:passthrough_filter_config_lib",
"//test/integration/filters:pause_filter_lib",
"//test/integration/filters:response_metadata_filter_config_lib",
Expand Down
14 changes: 14 additions & 0 deletions test/integration/filters/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,20 @@ envoy_cc_test_library(
],
)

envoy_cc_test_library(
name = "modify_buffer_filter_config_lib",
srcs = [
"modify_buffer_filter.cc",
],
deps = [
"//include/envoy/http:filter_interface",
"//include/envoy/registry",
"//include/envoy/server:filter_config_interface",
"//source/extensions/filters/http/common:empty_http_filter_config_lib",
"//source/extensions/filters/http/common:pass_through_filter_lib",
],
)

envoy_cc_test_library(
name = "passthrough_filter_config_lib",
srcs = [
Expand Down
61 changes: 61 additions & 0 deletions test/integration/filters/modify_buffer_filter.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#include <string>

#include "envoy/http/filter.h"
#include "envoy/registry/registry.h"
#include "envoy/server/filter_config.h"

#include "extensions/filters/http/common/empty_http_filter_config.h"
#include "extensions/filters/http/common/pass_through_filter.h"

namespace Envoy {

// A filter that buffers the entire request/response, then doubles
// the content of the filter buffer.
class ModifyBufferStreamFilter : public Http::PassThroughFilter {
public:
Http::FilterDataStatus decodeData(Buffer::Instance& data, bool end_stream) {
decoder_callbacks_->addDecodedData(data, true);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this addition, I think you want to return no buffer in the response code. Before @alyssawilk yells at me about this, I agree this is too complicated, and I will continue to think about how to make this better generically. :)

Same for the encode case.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah that makes sense, updated


if (end_stream) {
decoder_callbacks_->modifyDecodingBuffer([](auto& buffer) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As you call out in the integration test, this doesn't modify the entire request body when the data arrives with end stream. I suspect this generally won't be what the user wants - we've seen plenty of these bugs (e.g. #5674) where we need to
callbacks_->addDecodedData(data, true);
to capture data sent inline with end_stream

This means that the reference implementation of this function doesn't do what it says (and folks may copy it without realizing) and also that the design is subject to the data-with-final-end-stream pattern which I'd really love to avoid. Can we try to fix this?

// Append the buffer with itself.
buffer.add(buffer);
});
return Http::FilterDataStatus::Continue;
}

return Http::FilterDataStatus::StopIterationNoBuffer;
}

Http::FilterDataStatus encodeData(Buffer::Instance& data, bool end_stream) {
encoder_callbacks_->addEncodedData(data, true);

if (end_stream) {
encoder_callbacks_->modifyEncodingBuffer([](auto& buffer) {
// Append the buffer with itself.
buffer.add(buffer);
});
return Http::FilterDataStatus::Continue;
}

return Http::FilterDataStatus::StopIterationNoBuffer;
}
};

class ModifyBuffferFilterConfig : public Extensions::HttpFilters::Common::EmptyHttpFilterConfig {
public:
ModifyBuffferFilterConfig() : EmptyHttpFilterConfig("modify-buffer-filter") {}

Http::FilterFactoryCb createFilter(const std::string&, Server::Configuration::FactoryContext&) {
return [](Http::FilterChainFactoryCallbacks& callbacks) -> void {
callbacks.addStreamFilter(std::make_shared<::Envoy::ModifyBufferStreamFilter>());
};
}
};

// perform static registration
static Registry::RegisterFactory<ModifyBuffferFilterConfig,
Server::Configuration::NamedHttpFilterConfigFactory>
register_;

} // namespace Envoy
20 changes: 20 additions & 0 deletions test/integration/protocol_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,26 @@ name: envoy.health_check
EXPECT_STREQ("503", response->headers().Status()->value().c_str());
}

// Add a health check filter and verify correct computation of health based on upstream status.
TEST_P(ProtocolIntegrationTest, ModifyBuffer) {
config_helper_.addFilter(R"EOF(
name: envoy.health_check
config:
pass_through_mode: false
cluster_min_healthy_percentages:
example_cluster_name: { value: 75 }
)EOF");
initialize();

codec_client_ = makeHttpConnection(lookupPort("http"));
auto response = codec_client_->makeHeaderOnlyRequest(Http::TestHeaderMapImpl{
{":method", "GET"}, {":path", "/healthcheck"}, {":scheme", "http"}, {":authority", "host"}});
response->waitForEndStream();

EXPECT_TRUE(response->complete());
EXPECT_STREQ("503", response->headers().Status()->value().c_str());
}

TEST_P(ProtocolIntegrationTest, AddEncodedTrailers) {
config_helper_.addFilter(R"EOF(
name: add-trailers-filter
Expand Down
2 changes: 2 additions & 0 deletions test/mocks/http/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ class MockStreamDecoderFilterCallbacks : public StreamDecoderFilterCallbacks,
MOCK_METHOD2(addDecodedData, void(Buffer::Instance& data, bool streaming));
MOCK_METHOD0(addDecodedTrailers, HeaderMap&());
MOCK_METHOD0(decodingBuffer, const Buffer::Instance*());
MOCK_METHOD1(modifyDecodingBuffer, void(std::function<void(Buffer::Instance&)>));
MOCK_METHOD1(encode100ContinueHeaders_, void(HeaderMap& headers));
MOCK_METHOD2(encodeHeaders_, void(HeaderMap& headers, bool end_stream));
MOCK_METHOD2(encodeData, void(Buffer::Instance& data, bool end_stream));
Expand Down Expand Up @@ -221,6 +222,7 @@ class MockStreamEncoderFilterCallbacks : public StreamEncoderFilterCallbacks,
MOCK_METHOD0(addEncodedTrailers, HeaderMap&());
MOCK_METHOD0(continueEncoding, void());
MOCK_METHOD0(encodingBuffer, const Buffer::Instance*());
MOCK_METHOD1(modifyEncodingBuffer, void(std::function<void(Buffer::Instance&)>));

Buffer::InstancePtr buffer_;
testing::NiceMock<Tracing::MockSpan> active_span_;
Expand Down