-
Notifications
You must be signed in to change notification settings - Fork 5.3k
Common: Consume, proxy and insert metadata from downstream to upstream #5656
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
6884899
c986662
676b677
510fbd4
69e47c0
f8150ae
33ee213
6be013a
d762053
5de2d09
78f1c8b
082e4ff
fbbf45e
d957782
eca73af
1a8f4d0
7828283
8b173b9
bbe67be
6d573a3
72fc846
ff0b6ef
c779408
eb1a88f
7d29acb
3f507e4
2effaac
cfb962a
feac3fd
55f36b7
6ca0d45
9f6e92d
51dd633
22d3fe0
2b7e86b
a11b330
ec2b5eb
4ce2c3e
91fe461
ea90bc5
c72ee7e
df82d46
eaa9c9d
e74e302
fbfb66b
bf92ee8
554d0c8
d2ddf13
f1a5650
66ebbc0
2abf956
02f18b7
d8b4185
7e9838c
327e5c3
1ac4073
393f31e
fd2e952
bd1adbc
444518f
7afbb2f
b0f8471
18ea24d
3e2d319
3d20b82
dd9c623
563b449
be630e3
f345fcc
ac1ee95
d281e12
e43c751
be94ae1
24f41ee
fe0e9a0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -856,6 +856,21 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(ActiveStreamDecoderFilte | |
| ENVOY_STREAM_LOG(trace, "decode headers called: filter={} status={}", *this, | ||
| static_cast<const void*>((*entry).get()), static_cast<uint64_t>(status)); | ||
|
|
||
| const bool new_metadata_added = processNewlyAddedMetadata(); | ||
|
|
||
| // If end_stream is set in headers, and a filter adds new metadata, we need to delay end_stream | ||
| // in headers by inserting an empty data frame with end_stream set. The empty data frame is sent | ||
| // after the new metadata. | ||
| if ((*entry)->end_stream_ && new_metadata_added && !buffered_request_data_) { | ||
| Buffer::OwnedImpl empty_data(""); | ||
| ENVOY_STREAM_LOG( | ||
| trace, "inserting an empty data frame for end_stream due metadata being added.", *this); | ||
| // Metadata frame doesn't carry end of stream bit. We need an empty data frame to end the | ||
| // stream. | ||
| addDecodedData(*((*entry).get()), empty_data, true); | ||
mattklein123 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| (*entry)->decode_headers_called_ = true; | ||
mattklein123 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if (!(*entry)->commonHandleAfterHeadersCallback(status, decoding_headers_only_) && | ||
| std::next(entry) != decoder_filters_.end()) { | ||
| // Stop iteration IFF this is not the last filter. If it is the last filter, continue with | ||
|
|
@@ -981,6 +996,8 @@ void ConnectionManagerImpl::ActiveStream::decodeData( | |
| ENVOY_STREAM_LOG(trace, "decode data called: filter={} status={}", *this, | ||
| static_cast<const void*>((*entry).get()), static_cast<uint64_t>(status)); | ||
|
|
||
| processNewlyAddedMetadata(); | ||
|
|
||
| if (!trailers_exists_at_start && request_trailers_ && | ||
| trailers_added_entry == decoder_filters_.end()) { | ||
| trailers_added_entry = entry; | ||
|
|
@@ -1038,6 +1055,10 @@ void ConnectionManagerImpl::ActiveStream::addDecodedData(ActiveStreamDecoderFilt | |
| } | ||
| } | ||
|
|
||
| MetadataMapVector& ConnectionManagerImpl::ActiveStream::addDecodedMetadata() { | ||
| return *getRequestMetadataMapVector(); | ||
| } | ||
|
|
||
| void ConnectionManagerImpl::ActiveStream::decodeTrailers(HeaderMapPtr&& trailers) { | ||
| ScopeTrackerScopeState scope(this, | ||
| connection_manager_.read_callbacks_->connection().dispatcher()); | ||
|
|
@@ -1077,13 +1098,48 @@ void ConnectionManagerImpl::ActiveStream::decodeTrailers(ActiveStreamDecoderFilt | |
| state_.filter_call_state_ &= ~FilterCallState::DecodeTrailers; | ||
| ENVOY_STREAM_LOG(trace, "decode trailers called: filter={} status={}", *this, | ||
| static_cast<const void*>((*entry).get()), static_cast<uint64_t>(status)); | ||
|
|
||
| processNewlyAddedMetadata(); | ||
|
|
||
| if (!(*entry)->commonHandleAfterTrailersCallback(status)) { | ||
| return; | ||
| } | ||
| } | ||
| disarmRequestTimeout(); | ||
| } | ||
|
|
||
| void ConnectionManagerImpl::ActiveStream::decodeMetadata(MetadataMapPtr&& metadata_map) { | ||
| resetIdleTimer(); | ||
| // After going through filters, the ownership of metadata_map will be passed to terminal filter. | ||
| // The terminal filter may encode metadata_map to the next hop immediately or store metadata_map | ||
| // and encode later when connection pool is ready. | ||
| decodeMetadata(nullptr, *metadata_map); | ||
| } | ||
|
|
||
| void ConnectionManagerImpl::ActiveStream::decodeMetadata(ActiveStreamDecoderFilter* filter, | ||
| MetadataMap& metadata_map) { | ||
| // Filter iteration may start at the current filter. | ||
| std::list<ActiveStreamDecoderFilterPtr>::iterator entry = | ||
| commonDecodePrefix(filter, FilterIterationStartState::CanStartFromCurrent); | ||
|
|
||
| for (; entry != decoder_filters_.end(); entry++) { | ||
| // If the filter pointed by entry has stopped for all frame type, stores metadata and returns. | ||
| // If the filter pointed by entry hasn't returned from decodeHeaders, stores newly added | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry when does this happen? Does this somehow happen if inside the headers callback the filter adds metadata? Can you clarify that if so in the comment?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's right. Added comments to clarify. |
||
| // metadata in case decodeHeaders returns StopAllIteration. The latter can happen when headers | ||
| // callbacks generate new metadata. | ||
| if (!(*entry)->decode_headers_called_ || (*entry)->stoppedAll()) { | ||
| Http::MetadataMapPtr metadata_map_ptr = std::make_unique<Http::MetadataMap>(metadata_map); | ||
| (*entry)->getSavedRequestMetadata()->emplace_back(std::move(metadata_map_ptr)); | ||
| return; | ||
| } | ||
|
|
||
| FilterMetadataStatus status = (*entry)->handle_->decodeMetadata(metadata_map); | ||
| ENVOY_STREAM_LOG(trace, "decode metadata called: filter={} status={}, metadata: {}", *this, | ||
| static_cast<const void*>((*entry).get()), static_cast<uint64_t>(status), | ||
| metadata_map); | ||
| } | ||
| } | ||
|
|
||
| void ConnectionManagerImpl::ActiveStream::maybeEndDecode(bool end_stream) { | ||
| ASSERT(!state_.remote_complete_); | ||
| state_.remote_complete_ = end_stream; | ||
|
|
@@ -1555,6 +1611,17 @@ void ConnectionManagerImpl::ActiveStream::maybeEndEncode(bool end_stream) { | |
| } | ||
| } | ||
|
|
||
| bool ConnectionManagerImpl::ActiveStream::processNewlyAddedMetadata() { | ||
| if (request_metadata_map_vector_ == nullptr) { | ||
| return false; | ||
| } | ||
| for (const auto& metadata_map : *getRequestMetadataMapVector()) { | ||
| decodeMetadata(nullptr, *metadata_map); | ||
| } | ||
| getRequestMetadataMapVector()->clear(); | ||
| return true; | ||
| } | ||
|
|
||
| bool ConnectionManagerImpl::ActiveStream::handleDataIfStopAll(ActiveStreamFilterBase& filter, | ||
| Buffer::Instance& data, | ||
| bool& filter_streaming) { | ||
|
|
@@ -1698,6 +1765,8 @@ void ConnectionManagerImpl::ActiveStreamFilterBase::commonContinue() { | |
| doHeaders(complete() && !bufferedData() && !trailers()); | ||
| } | ||
|
|
||
| doMetadata(); | ||
|
|
||
| // Make sure we handle filters returning StopIterationNoBuffer and then commonContinue by flushing | ||
| // the terminal fin. | ||
| const bool end_stream_with_data = complete() && !trailers(); | ||
|
|
@@ -1740,22 +1809,25 @@ bool ConnectionManagerImpl::ActiveStreamFilterBase::commonHandleAfterHeadersCall | |
|
|
||
| if (status == FilterHeadersStatus::StopIteration) { | ||
| iteration_state_ = IterationState::StopSingleIteration; | ||
| return false; | ||
| } else if (status == FilterHeadersStatus::StopAllIterationAndBuffer) { | ||
| iteration_state_ = IterationState::StopAllBuffer; | ||
| return false; | ||
| } else if (status == FilterHeadersStatus::StopAllIterationAndWatermark) { | ||
| iteration_state_ = IterationState::StopAllWatermark; | ||
| return false; | ||
| } else if (status == FilterHeadersStatus::ContinueAndEndStream) { | ||
| // Set headers_only to true so we know to end early if necessary, | ||
| // but continue filter iteration so we actually write the headers/run the cleanup code. | ||
| headers_only = true; | ||
| ENVOY_STREAM_LOG(debug, "converting to headers only", parent_); | ||
| return true; | ||
| } else { | ||
| ASSERT(status == FilterHeadersStatus::Continue); | ||
| headers_continued_ = true; | ||
| } | ||
|
|
||
| handleMetadataAfterHeadersCallback(); | ||
|
|
||
| if (stoppedAll() || status == FilterHeadersStatus::StopIteration) { | ||
| return false; | ||
| } else { | ||
| return true; | ||
| } | ||
| } | ||
|
|
@@ -1871,6 +1943,19 @@ Buffer::WatermarkBufferPtr ConnectionManagerImpl::ActiveStreamDecoderFilter::cre | |
| return buffer; | ||
| } | ||
|
|
||
| void ConnectionManagerImpl::ActiveStreamDecoderFilter::handleMetadataAfterHeadersCallback() { | ||
| // If we drain accumulated metadata, the iteration must start with the current filter. | ||
| const bool saved_state = iterate_from_current_filter_; | ||
| iterate_from_current_filter_ = true; | ||
| // If decodeHeaders() returns StopAllIteration, we should skip draining metadata, and wait | ||
| // for doMetadata() to drain the metadata after iteration continues. | ||
| if (!stoppedAll() && saved_request_metadata_ != nullptr && !getSavedRequestMetadata()->empty()) { | ||
| drainSavedRequestMetadata(); | ||
| } | ||
| // Restores the original value of iterate_from_current_filter_. | ||
| iterate_from_current_filter_ = saved_state; | ||
| } | ||
|
|
||
| HeaderMap& ConnectionManagerImpl::ActiveStreamDecoderFilter::addDecodedTrailers() { | ||
| return parent_.addDecodedTrailers(); | ||
| } | ||
|
|
@@ -1880,6 +1965,10 @@ void ConnectionManagerImpl::ActiveStreamDecoderFilter::addDecodedData(Buffer::In | |
| parent_.addDecodedData(*this, data, streaming); | ||
| } | ||
|
|
||
| MetadataMapVector& ConnectionManagerImpl::ActiveStreamDecoderFilter::addDecodedMetadata() { | ||
| return parent_.addDecodedMetadata(); | ||
| } | ||
|
|
||
| void ConnectionManagerImpl::ActiveStreamDecoderFilter::injectDecodedDataToFilterChain( | ||
| Buffer::Instance& data, bool end_stream) { | ||
| parent_.decodeData(this, data, end_stream, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -100,7 +100,8 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>, | |
| ActiveStreamFilterBase(ActiveStream& parent, bool dual_filter) | ||
| : parent_(parent), iteration_state_(IterationState::Continue), | ||
| iterate_from_current_filter_(false), headers_continued_(false), | ||
| continue_headers_continued_(false), end_stream_(false), dual_filter_(dual_filter) {} | ||
| continue_headers_continued_(false), end_stream_(false), dual_filter_(dual_filter), | ||
| decode_headers_called_(false) {} | ||
|
|
||
| // Functions in the following block are called after the filter finishes processing | ||
| // corresponding data. Those functions handle state updates and data storage (if needed) | ||
|
|
@@ -128,6 +129,9 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>, | |
| virtual void doData(bool end_stream) PURE; | ||
| virtual void doTrailers() PURE; | ||
| virtual const HeaderMapPtr& trailers() PURE; | ||
| virtual void doMetadata() PURE; | ||
| // TODO(soya3129): make this pure when adding impl to encodefilter. | ||
| virtual void handleMetadataAfterHeadersCallback() {} | ||
|
|
||
| // Http::StreamFilterCallbacks | ||
| const Network::Connection* connection() override; | ||
|
|
@@ -151,7 +155,25 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>, | |
| ASSERT(iteration_state_ != IterationState::Continue); | ||
| iteration_state_ = IterationState::Continue; | ||
| } | ||
| MetadataMapVector* getSavedRequestMetadata() { | ||
| if (saved_request_metadata_ == nullptr) { | ||
| saved_request_metadata_ = std::make_unique<MetadataMapVector>(); | ||
| } | ||
| return saved_request_metadata_.get(); | ||
| } | ||
| MetadataMapVector* getSavedResponseMetadata() { | ||
| if (saved_response_metadata_ == nullptr) { | ||
| saved_response_metadata_ = std::make_unique<MetadataMapVector>(); | ||
| } | ||
| return saved_response_metadata_.get(); | ||
| } | ||
|
|
||
| // A vector to save metadata when the current filter's [de|en]codeMetadata() can not be called, | ||
| // either because [de|en]codeHeaders() of the current filter returns StopAllIteration or because | ||
| // [de|en]codeHeaders() adds new metadata to [de|en]code, but we don't know | ||
| // [de|en]codeHeaders()'s return value yet. The storage is created on demand. | ||
| std::unique_ptr<MetadataMapVector> saved_request_metadata_; | ||
| std::unique_ptr<MetadataMapVector> saved_response_metadata_; | ||
| // The state of iteration. | ||
| enum class IterationState { | ||
| Continue, // Iteration has not stopped for any frame type. | ||
|
|
@@ -173,6 +195,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>, | |
| // If true, end_stream is called for this filter. | ||
| bool end_stream_ : 1; | ||
| const bool dual_filter_ : 1; | ||
| bool decode_headers_called_ : 1; | ||
| }; | ||
|
|
||
| /** | ||
|
|
@@ -205,13 +228,29 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>, | |
| parent_.decodeData(this, *parent_.buffered_request_data_, end_stream, | ||
| ActiveStream::FilterIterationStartState::CanStartFromCurrent); | ||
| } | ||
| void doMetadata() override { | ||
| if (saved_request_metadata_ != nullptr) { | ||
| drainSavedRequestMetadata(); | ||
| } | ||
| } | ||
| void doTrailers() override { parent_.decodeTrailers(this, *parent_.request_trailers_); } | ||
| const HeaderMapPtr& trailers() override { return parent_.request_trailers_; } | ||
|
|
||
| void drainSavedRequestMetadata() { | ||
| ASSERT(saved_request_metadata_ != nullptr); | ||
| for (auto& metadata_map : *getSavedRequestMetadata()) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps ASSERT that the storage vector pointer is not nullptr so we don't end up creating an empty vector anyway? Can you audit other calls to make sure we aren't accidentally creating a vector just to check that it's empty?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point! Fixed in both request and response direction.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we have a parallel assert in drainSavedResponseMetadata?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. Thanks! |
||
| parent_.decodeMetadata(this, *metadata_map); | ||
| } | ||
| getSavedRequestMetadata()->clear(); | ||
| } | ||
| // This function is called after the filter calls decodeHeaders() to drain accumulated metadata. | ||
| void handleMetadataAfterHeadersCallback() override; | ||
|
|
||
| // Http::StreamDecoderFilterCallbacks | ||
| void addDecodedData(Buffer::Instance& data, bool streaming) override; | ||
| void injectDecodedDataToFilterChain(Buffer::Instance& data, bool end_stream) override; | ||
| HeaderMap& addDecodedTrailers() override; | ||
| MetadataMapVector& addDecodedMetadata() override; | ||
| void continueDecoding() override; | ||
| const Buffer::Instance* decodingBuffer() override { | ||
| return parent_.buffered_request_data_.get(); | ||
|
|
@@ -299,6 +338,19 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>, | |
| parent_.encodeData(this, *parent_.buffered_response_data_, end_stream, | ||
| ActiveStream::FilterIterationStartState::CanStartFromCurrent); | ||
| } | ||
| void drainSavedResponseMetadata() { | ||
| ASSERT(saved_response_metadata_ != nullptr); | ||
| for (auto& metadata_map : *getSavedResponseMetadata()) { | ||
| parent_.encodeMetadata(this, std::move(metadata_map)); | ||
| } | ||
| getSavedResponseMetadata()->clear(); | ||
| } | ||
|
|
||
| void doMetadata() override { | ||
| if (saved_response_metadata_ != nullptr) { | ||
| drainSavedResponseMetadata(); | ||
| } | ||
| } | ||
| void doTrailers() override { parent_.encodeTrailers(this, *parent_.response_trailers_); } | ||
| const HeaderMapPtr& trailers() override { return parent_.response_trailers_; } | ||
|
|
||
|
|
@@ -358,12 +410,14 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>, | |
| const Network::Connection* connection(); | ||
| void addDecodedData(ActiveStreamDecoderFilter& filter, Buffer::Instance& data, bool streaming); | ||
| HeaderMap& addDecodedTrailers(); | ||
| MetadataMapVector& addDecodedMetadata(); | ||
| void decodeHeaders(ActiveStreamDecoderFilter* filter, HeaderMap& headers, bool end_stream); | ||
| // Sends data through decoding filter chains. filter_iteration_start_state indicates which | ||
| // filter to start the iteration with. | ||
| void decodeData(ActiveStreamDecoderFilter* filter, Buffer::Instance& data, bool end_stream, | ||
| FilterIterationStartState filter_iteration_start_state); | ||
| void decodeTrailers(ActiveStreamDecoderFilter* filter, HeaderMap& trailers); | ||
| void decodeMetadata(ActiveStreamDecoderFilter* filter, MetadataMap& metadata_map); | ||
| void disarmRequestTimeout(); | ||
| void maybeEndDecode(bool end_stream); | ||
| void addEncodedData(ActiveStreamEncoderFilter& filter, Buffer::Instance& data, bool streaming); | ||
|
|
@@ -382,6 +436,8 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>, | |
| void encodeTrailers(ActiveStreamEncoderFilter* filter, HeaderMap& trailers); | ||
| void encodeMetadata(ActiveStreamEncoderFilter* filter, MetadataMapPtr&& metadata_map_ptr); | ||
| void maybeEndEncode(bool end_stream); | ||
| // Returns true if new metadata is decoded. Otherwise, returns false. | ||
| bool processNewlyAddedMetadata(); | ||
| uint64_t streamId() { return stream_id_; } | ||
| // Returns true if filter has stopped iteration for all frame types. Otherwise, returns false. | ||
| // filter_streaming is the variable to indicate if stream is streaming, and its value may be | ||
|
|
@@ -400,7 +456,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>, | |
| void decodeHeaders(HeaderMapPtr&& headers, bool end_stream) override; | ||
| void decodeData(Buffer::Instance& data, bool end_stream) override; | ||
| void decodeTrailers(HeaderMapPtr&& trailers) override; | ||
| void decodeMetadata(MetadataMapPtr&&) override { NOT_REACHED_GCOVR_EXCL_LINE; } | ||
| void decodeMetadata(MetadataMapPtr&&) override; | ||
|
|
||
| // Http::FilterChainFactoryCallbacks | ||
| void addStreamDecoderFilter(StreamDecoderFilterSharedPtr filter) override { | ||
|
|
@@ -516,6 +572,13 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>, | |
| return os; | ||
| } | ||
|
|
||
| MetadataMapVector* getRequestMetadataMapVector() { | ||
| if (request_metadata_map_vector_ == nullptr) { | ||
| request_metadata_map_vector_ = std::make_unique<MetadataMapVector>(); | ||
| } | ||
| return request_metadata_map_vector_.get(); | ||
| } | ||
|
|
||
| ConnectionManagerImpl& connection_manager_; | ||
| Router::ConfigConstSharedPtr snapped_route_config_; | ||
| Router::ScopedConfigConstSharedPtr snapped_scoped_route_config_; | ||
|
|
@@ -543,6 +606,10 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>, | |
| absl::optional<Router::RouteConstSharedPtr> cached_route_; | ||
| absl::optional<Upstream::ClusterInfoConstSharedPtr> cached_cluster_info_; | ||
| std::list<DownstreamWatermarkCallbacks*> watermark_callbacks_{}; | ||
| // Stores metadata added in the decoding filter that is being processed. Will be cleared before | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I notice that we have similar storage on a per-filter basis as well as on a connection basis. Is it possible to collapse that somehow? If not can you add some clarifying comments on why we need both types of storage? It's not immediately clear.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added comments and removed not used variables. |
||
| // processing the next filter. The storage is created on demand. We need to store metadata | ||
| // temporarily in the filter in case the filter has stopped all while processing headers. | ||
| std::unique_ptr<MetadataMapVector> request_metadata_map_vector_{nullptr}; | ||
| uint32_t buffer_limit_{0}; | ||
| uint32_t high_watermark_count_{0}; | ||
| const std::string* decorated_operation_{nullptr}; | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.