Skip to content
7 changes: 7 additions & 0 deletions include/envoy/http/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,13 @@ class StreamEncoderFilterCallbacks : public virtual StreamFilterCallbacks {
*/
virtual HeaderMap& addEncodedTrailers() PURE;

/**
* Adds new metadata to be encoded.
*
* @param metadata_map supplies the unique_ptr of the metadata to be encoded.
*/
virtual void addEncodedMetadata(MetadataMapPtr&& metadata_map) PURE;

/**
* Called when an encoder filter goes over its high watermark.
*/
Expand Down
34 changes: 31 additions & 3 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1309,6 +1309,7 @@ void ConnectionManagerImpl::ActiveStream::encodeHeaders(ActiveStreamEncoderFilte
ENVOY_STREAM_LOG(trace, "encode headers called: filter={} status={}", *this,
static_cast<const void*>((*entry).get()), static_cast<uint64_t>(status));

(*entry)->encode_headers_called_ = true;
const auto continue_iteration =
(*entry)->commonHandleAfterHeadersCallback(status, encoding_headers_only_);

Expand Down Expand Up @@ -1438,10 +1439,19 @@ void ConnectionManagerImpl::ActiveStream::encodeMetadata(ActiveStreamEncoderFilt
MetadataMapPtr&& metadata_map_ptr) {
resetIdleTimer();

// Metadata currently go through all filters.
ASSERT(filter == nullptr);
std::list<ActiveStreamEncoderFilterPtr>::iterator entry = encoder_filters_.begin();
std::list<ActiveStreamEncoderFilterPtr>::iterator entry =
commonEncodePrefix(filter, false, FilterIterationStartState::CanStartFromCurrent);

for (; entry != encoder_filters_.end(); entry++) {
// If the filter pointed by entry has stopped for all frame type, stores metadata and returns.
// If the filter pointed by entry hasn't returned from encodeHeaders, stores newly added
// metadata in case encodeHeaders returns StopAllIteration. The latter can happen when headers
// callbacks generate new metadata.
if (!(*entry)->encode_headers_called_ || (*entry)->stoppedAll()) {
(*entry)->getSavedResponseMetadata()->emplace_back(std::move(metadata_map_ptr));
return;
}

FilterMetadataStatus status = (*entry)->handle_->encodeMetadata(*metadata_map_ptr);
ENVOY_STREAM_LOG(trace, "encode metadata called: filter={} status={}", *this,
static_cast<const void*>((*entry).get()), static_cast<uint64_t>(status));
Expand Down Expand Up @@ -2086,6 +2096,19 @@ Buffer::WatermarkBufferPtr ConnectionManagerImpl::ActiveStreamEncoderFilter::cre
return Buffer::WatermarkBufferPtr{buffer};
}

void ConnectionManagerImpl::ActiveStreamEncoderFilter::handleMetadataAfterHeadersCallback() {
// If we drain accumulated metadata, the iteration must start with the current filter.
const bool saved_state = iterate_from_current_filter_;
iterate_from_current_filter_ = true;
// If encodeHeaders() returns StopAllIteration, we should skip draining metadata, and wait
// for doMetadata() to drain the metadata after iteration continues.
if (!stoppedAll() && saved_response_metadata_ != nullptr &&
!getSavedResponseMetadata()->empty()) {
drainSavedResponseMetadata();
}
// Restores the original value of iterate_from_current_filter_.
iterate_from_current_filter_ = saved_state;
}
void ConnectionManagerImpl::ActiveStreamEncoderFilter::addEncodedData(Buffer::Instance& data,
bool streaming) {
return parent_.addEncodedData(*this, data, streaming);
Expand All @@ -2101,6 +2124,11 @@ HeaderMap& ConnectionManagerImpl::ActiveStreamEncoderFilter::addEncodedTrailers(
return parent_.addEncodedTrailers();
}

void ConnectionManagerImpl::ActiveStreamEncoderFilter::addEncodedMetadata(
MetadataMapPtr&& metadata_map_ptr) {
return parent_.encodeMetadata(this, std::move(metadata_map_ptr));
}

void ConnectionManagerImpl::ActiveStreamEncoderFilter::
onEncoderFilterAboveWriteBufferHighWatermark() {
ENVOY_STREAM_LOG(debug, "Disabling upstream stream due to filter callbacks.", parent_);
Expand Down
11 changes: 7 additions & 4 deletions source/common/http/conn_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
: parent_(parent), iteration_state_(IterationState::Continue),
iterate_from_current_filter_(false), headers_continued_(false),
continue_headers_continued_(false), end_stream_(false), dual_filter_(dual_filter),
decode_headers_called_(false) {}
decode_headers_called_(false), encode_headers_called_(false) {}

// Functions in the following block are called after the filter finishes processing
// corresponding data. Those functions handle state updates and data storage (if needed)
Expand Down Expand Up @@ -131,7 +131,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
virtual const HeaderMapPtr& trailers() PURE;
virtual void doMetadata() PURE;
// TODO(soya3129): make this pure when adding impl to encodefilter.
virtual void handleMetadataAfterHeadersCallback() {}
virtual void handleMetadataAfterHeadersCallback() PURE;

// Http::StreamFilterCallbacks
const Network::Connection* connection() override;
Expand Down Expand Up @@ -173,8 +173,8 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
// either because [de|en]codeHeaders() of the current filter returns StopAllIteration or because
// [de|en]codeHeaders() adds new metadata to [de|en]code, but we don't know
// [de|en]codeHeaders()'s return value yet. The storage is created on demand.
std::unique_ptr<MetadataMapVector> saved_request_metadata_;
std::unique_ptr<MetadataMapVector> saved_response_metadata_;
std::unique_ptr<MetadataMapVector> saved_request_metadata_{nullptr};
std::unique_ptr<MetadataMapVector> saved_response_metadata_{nullptr};
// The state of iteration.
enum class IterationState {
Continue, // Iteration has not stopped for any frame type.
Expand All @@ -197,6 +197,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
bool end_stream_ : 1;
const bool dual_filter_ : 1;
bool decode_headers_called_ : 1;
bool encode_headers_called_ : 1;
};

/**
Expand Down Expand Up @@ -346,6 +347,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
}
getSavedResponseMetadata()->clear();
}
void handleMetadataAfterHeadersCallback() override;

void doMetadata() override {
if (saved_response_metadata_ != nullptr) {
Expand All @@ -359,6 +361,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
void addEncodedData(Buffer::Instance& data, bool streaming) override;
void injectEncodedDataToFilterChain(Buffer::Instance& data, bool end_stream) override;
HeaderMap& addEncodedTrailers() override;
void addEncodedMetadata(MetadataMapPtr&& metadata_map) override;
void onEncoderFilterAboveWriteBufferHighWatermark() override;
void onEncoderFilterBelowWriteBufferLowWatermark() override;
void setEncoderBufferLimit(uint32_t limit) override { parent_.setBufferLimit(limit); }
Expand Down
17 changes: 7 additions & 10 deletions source/docs/h2_metadata.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,19 +85,16 @@ StreamDecoderFilter::decodeMetadata(MetadataMap metadata\_map). New metadata can
be added directly to metadata\_map.

If users need to add new metadata for a response to downstream, a
StreamFilter should be created. Users pass the metadata to be added to
StreamDecoderFilterCallbacks::encodeMetadata(MetadataMapPtr&&
StreamEncoderFilter should be created. Users pass the metadata to be added to
StreamEncoderFilterCallbacks::addEncodedMetadata(MetadataMapPtr&&
metadata\_map\_ptr). This function can be called in
StreamFilter::encode100ContinueHeaders(HeaderMap& headers), StreamFilter::encodeHeaders(HeaderMap& headers, bool end\_stream),
StreamFilter::encodeData(Buffer::Instance& data, bool end\_stream), StreamFilter::encodeTrailers(HeaderMap& trailers).
Consequently, the new metadata will be passed through all the encoding filters. Note that, the added
metadata should not share the same key as the metadata to be consumed. Otherwise, the added metadata
will be consumed.
StreamEncoderFilter::encode100ContinueHeaders(HeaderMap& headers), StreamEncoderFilter::encodeHeaders(HeaderMap& headers, bool end\_stream),
StreamEncoderFilter::encodeData(Buffer::Instance& data, bool end\_stream), StreamEncoderFilter::encodeTrailers(HeaderMap& trailers).
Consequently, the new metadata will be passed through all the encoding filters that follow the filter
where the new metadata are added.

If users receive metadata from upstream, new metadata can be added directly to
the input argument metadata\_map in StreamFilter::encodeMetadata(MetadataMap& metadata\_map). Note that,
users should never call StreamDecoderFilterCallbacks::encodeMetadata(MetadataMapPtr&&
metadata\_map\_ptr) to add new metadata in StreamFilter::encodeMetadata(MetadataMap& metadata\_map).
the input argument metadata\_map in StreamFilter::encodeMetadata(MetadataMap& metadata\_map).

### Metadata implementation

Expand Down
1 change: 1 addition & 0 deletions test/common/http/http1/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ envoy_cc_test(
"//test/mocks/protobuf:protobuf_mocks",
"//test/mocks/thread_local:thread_local_mocks",
"//test/mocks/upstream:upstream_mocks",
"//test/test_common:logging_lib",
],
)

Expand Down
1 change: 1 addition & 0 deletions test/common/http/http1/codec_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "test/mocks/protobuf/mocks.h"
#include "test/mocks/runtime/mocks.h"
#include "test/mocks/thread_local/mocks.h"
#include "test/test_common/logging.h"
#include "test/test_common/printers.h"
#include "test/test_common/utility.h"

Expand Down
12 changes: 12 additions & 0 deletions test/integration/filters/encode_headers_return_stop_all_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ class EncodeHeadersReturnStopAllFilter : public Http::PassThroughFilter {

createTimerForContinue();

Http::MetadataMap metadata_map = {{"headers", "headers"}};
Http::MetadataMapPtr metadata_map_ptr = std::make_unique<Http::MetadataMap>(metadata_map);
encoder_callbacks_->addEncodedMetadata(std::move(metadata_map_ptr));

Http::HeaderEntry* entry_buffer = header_map.get(Envoy::Http::LowerCaseString("buffer_limit"));
if (entry_buffer == nullptr) {
return Http::FilterHeadersStatus::StopAllIterationAndBuffer;
Expand All @@ -56,13 +60,21 @@ class EncodeHeadersReturnStopAllFilter : public Http::PassThroughFilter {
// encodeData will only be called once after iteration resumes.
EXPECT_EQ(data.length(), content_size_);
}
Http::MetadataMap metadata_map = {{"data", "data"}};
Http::MetadataMapPtr metadata_map_ptr = std::make_unique<Http::MetadataMap>(metadata_map);
encoder_callbacks_->addEncodedMetadata(std::move(metadata_map_ptr));

Buffer::OwnedImpl added_data(std::string(added_size_, 'a'));
encoder_callbacks_->addEncodedData(added_data, false);
return Http::FilterDataStatus::Continue;
}

Http::FilterTrailersStatus encodeTrailers(Http::HeaderMap&) override {
ASSERT(timer_triggered_);
Http::MetadataMap metadata_map = {{"trailers", "trailers"}};
Http::MetadataMapPtr metadata_map_ptr = std::make_unique<Http::MetadataMap>(metadata_map);
encoder_callbacks_->addEncodedMetadata(std::move(metadata_map_ptr));

Buffer::OwnedImpl data(std::string(added_size_, 'a'));
encoder_callbacks_->addEncodedData(data, false);
return Http::FilterTrailersStatus::Continue;
Expand Down
27 changes: 10 additions & 17 deletions test/integration/filters/response_metadata_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,42 +16,39 @@ class ResponseMetadataStreamFilter : public Http::PassThroughFilter {
public:
// Inserts one new metadata_map.
Http::FilterHeadersStatus encodeHeaders(Http::HeaderMap&, bool) override {
Http::MetadataMap metadata_map = {
{"headers", "headers"}, {"duplicate", "duplicate"}, {"remove", "remove"}};
Http::MetadataMap metadata_map = {{"headers", "headers"}, {"duplicate", "duplicate"}};
Http::MetadataMapPtr metadata_map_ptr = std::make_unique<Http::MetadataMap>(metadata_map);
decoder_callbacks_->encodeMetadata(std::move(metadata_map_ptr));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there HCM unit tests we should update as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think adding new metadata was only covered in integration test.

encoder_callbacks_->addEncodedMetadata(std::move(metadata_map_ptr));
return Http::FilterHeadersStatus::Continue;
}

// Inserts one new metadata_map.
Http::FilterDataStatus encodeData(Buffer::Instance&, bool) override {
Http::MetadataMap metadata_map = {
{"data", "data"}, {"duplicate", "duplicate"}, {"remove", "remove"}};
Http::MetadataMap metadata_map = {{"data", "data"}, {"duplicate", "duplicate"}};
Http::MetadataMapPtr metadata_map_ptr = std::make_unique<Http::MetadataMap>(metadata_map);
decoder_callbacks_->encodeMetadata(std::move(metadata_map_ptr));
encoder_callbacks_->addEncodedMetadata(std::move(metadata_map_ptr));
return Http::FilterDataStatus::Continue;
}

// Inserts two metadata_maps by calling decoder_callbacks_->encodeMetadata() twice.
Http::FilterTrailersStatus encodeTrailers(Http::HeaderMap&) override {
Http::MetadataMap metadata_map = {{"trailers", "trailers"}, {"remove", "remove"}};
Http::MetadataMap metadata_map = {{"trailers", "trailers"}};
Http::MetadataMapPtr metadata_map_ptr = std::make_unique<Http::MetadataMap>(metadata_map);
decoder_callbacks_->encodeMetadata(std::move(metadata_map_ptr));
encoder_callbacks_->addEncodedMetadata(std::move(metadata_map_ptr));
metadata_map = {{"duplicate", "duplicate"}};
metadata_map_ptr = std::make_unique<Http::MetadataMap>(metadata_map);
decoder_callbacks_->encodeMetadata(std::move(metadata_map_ptr));
encoder_callbacks_->addEncodedMetadata(std::move(metadata_map_ptr));
return Http::FilterTrailersStatus::Continue;
}

// Inserts two metadata_maps by calling decoder_callbacks_->encodeMetadata() twice.
Http::FilterHeadersStatus encode100ContinueHeaders(Http::HeaderMap&) override {
Http::MetadataMap metadata_map = {
{"100-continue", "100-continue"}, {"duplicate", "duplicate"}, {"remove", "remove"}};
Http::MetadataMap metadata_map = {{"100-continue", "100-continue"}, {"duplicate", "duplicate"}};
Http::MetadataMapPtr metadata_map_ptr = std::make_unique<Http::MetadataMap>(metadata_map);
decoder_callbacks_->encodeMetadata(std::move(metadata_map_ptr));
encoder_callbacks_->addEncodedMetadata(std::move(metadata_map_ptr));
metadata_map = {{"duplicate", "duplicate"}};
metadata_map_ptr = std::make_unique<Http::MetadataMap>(metadata_map);
decoder_callbacks_->encodeMetadata(std::move(metadata_map_ptr));
encoder_callbacks_->addEncodedMetadata(std::move(metadata_map_ptr));
return Http::FilterHeadersStatus::Continue;
}

Expand All @@ -66,10 +63,6 @@ class ResponseMetadataStreamFilter : public Http::PassThroughFilter {
metadata_map.erase("consume");
metadata_map.emplace("replace", "replace");
}
it = metadata_map.find("remove");
if (it != metadata_map.end()) {
metadata_map.erase("remove");
}
it = metadata_map.find("metadata");
if (it != metadata_map.end()) {
metadata_map.erase("metadata");
Expand Down
Loading