diff --git a/include/envoy/http/filter.h b/include/envoy/http/filter.h index 1e1bacaab3fc4..e34aeec39a944 100644 --- a/include/envoy/http/filter.h +++ b/include/envoy/http/filter.h @@ -302,6 +302,15 @@ class StreamDecoderFilterCallbacks : public virtual StreamFilterCallbacks { const absl::optional grpc_status, absl::string_view details) PURE; + /** + * Adds decoded metadata. This function can only be called in + * StreamDecoderFilter::decodeHeaders/Data/Trailers(). Do not call in + * StreamDecoderFilter::decodeMetadata(). + * + * @return a reference to metadata map vector, where new metadata map can be added. + */ + virtual MetadataMapVector& addDecodedMetadata() PURE; + /** * Called with 100-Continue headers to be encoded. * @@ -468,6 +477,22 @@ class StreamDecoderFilter : public StreamFilterBase { */ virtual FilterTrailersStatus decodeTrailers(HeaderMap& trailers) PURE; + /** + * Called with decoded metadata. Add new metadata to metadata_map directly. Do not call + * StreamDecoderFilterCallbacks::addDecodedMetadata() to add new metadata. + * + * Note: decodeMetadata() currently cannot stop the filter iteration, and always returns Continue. + * That means metadata will go through the complete filter chain at once, even if the other frame + * types return StopIteration. If metadata should not pass through all filters at once, users + * should consider using StopAllIterationAndBuffer or StopAllIterationAndWatermark in + * decodeHeaders() to prevent metadata passing to the following filters. + * + * @param metadata supplies the decoded metadata. + */ + virtual FilterMetadataStatus decodeMetadata(MetadataMap& /* metadata_map */) { + return Http::FilterMetadataStatus::Continue; + } + /** * Called by the filter manager once to initialize the filter decoder callbacks that the * filter should use. Callbacks will not be invoked by the filter after onDestroy() is called. diff --git a/source/common/http/async_client_impl.h b/source/common/http/async_client_impl.h index 9830b98627ccc..e0a079dddf73e 100644 --- a/source/common/http/async_client_impl.h +++ b/source/common/http/async_client_impl.h @@ -302,6 +302,7 @@ class AsyncStreamImpl : public AsyncClient::Stream, // filter which uses this function for buffering. ASSERT(buffered_body_ != nullptr); } + MetadataMapVector& addDecodedMetadata() override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; } void injectDecodedDataToFilterChain(Buffer::Instance&, bool) override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; } diff --git a/source/common/http/conn_manager_impl.cc b/source/common/http/conn_manager_impl.cc index 8e905ac5aba0f..cf3e4f90ded66 100644 --- a/source/common/http/conn_manager_impl.cc +++ b/source/common/http/conn_manager_impl.cc @@ -856,6 +856,21 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(ActiveStreamDecoderFilte ENVOY_STREAM_LOG(trace, "decode headers called: filter={} status={}", *this, static_cast((*entry).get()), static_cast(status)); + const bool new_metadata_added = processNewlyAddedMetadata(); + + // If end_stream is set in headers, and a filter adds new metadata, we need to delay end_stream + // in headers by inserting an empty data frame with end_stream set. The empty data frame is sent + // after the new metadata. + if ((*entry)->end_stream_ && new_metadata_added && !buffered_request_data_) { + Buffer::OwnedImpl empty_data(""); + ENVOY_STREAM_LOG( + trace, "inserting an empty data frame for end_stream due metadata being added.", *this); + // Metadata frame doesn't carry end of stream bit. We need an empty data frame to end the + // stream. + addDecodedData(*((*entry).get()), empty_data, true); + } + + (*entry)->decode_headers_called_ = true; if (!(*entry)->commonHandleAfterHeadersCallback(status, decoding_headers_only_) && std::next(entry) != decoder_filters_.end()) { // Stop iteration IFF this is not the last filter. If it is the last filter, continue with @@ -981,6 +996,8 @@ void ConnectionManagerImpl::ActiveStream::decodeData( ENVOY_STREAM_LOG(trace, "decode data called: filter={} status={}", *this, static_cast((*entry).get()), static_cast(status)); + processNewlyAddedMetadata(); + if (!trailers_exists_at_start && request_trailers_ && trailers_added_entry == decoder_filters_.end()) { trailers_added_entry = entry; @@ -1038,6 +1055,10 @@ void ConnectionManagerImpl::ActiveStream::addDecodedData(ActiveStreamDecoderFilt } } +MetadataMapVector& ConnectionManagerImpl::ActiveStream::addDecodedMetadata() { + return *getRequestMetadataMapVector(); +} + void ConnectionManagerImpl::ActiveStream::decodeTrailers(HeaderMapPtr&& trailers) { ScopeTrackerScopeState scope(this, connection_manager_.read_callbacks_->connection().dispatcher()); @@ -1077,6 +1098,9 @@ void ConnectionManagerImpl::ActiveStream::decodeTrailers(ActiveStreamDecoderFilt state_.filter_call_state_ &= ~FilterCallState::DecodeTrailers; ENVOY_STREAM_LOG(trace, "decode trailers called: filter={} status={}", *this, static_cast((*entry).get()), static_cast(status)); + + processNewlyAddedMetadata(); + if (!(*entry)->commonHandleAfterTrailersCallback(status)) { return; } @@ -1084,6 +1108,38 @@ void ConnectionManagerImpl::ActiveStream::decodeTrailers(ActiveStreamDecoderFilt disarmRequestTimeout(); } +void ConnectionManagerImpl::ActiveStream::decodeMetadata(MetadataMapPtr&& metadata_map) { + resetIdleTimer(); + // After going through filters, the ownership of metadata_map will be passed to terminal filter. + // The terminal filter may encode metadata_map to the next hop immediately or store metadata_map + // and encode later when connection pool is ready. + decodeMetadata(nullptr, *metadata_map); +} + +void ConnectionManagerImpl::ActiveStream::decodeMetadata(ActiveStreamDecoderFilter* filter, + MetadataMap& metadata_map) { + // Filter iteration may start at the current filter. + std::list::iterator entry = + commonDecodePrefix(filter, FilterIterationStartState::CanStartFromCurrent); + + for (; entry != decoder_filters_.end(); entry++) { + // If the filter pointed by entry has stopped for all frame type, stores metadata and returns. + // If the filter pointed by entry hasn't returned from decodeHeaders, stores newly added + // metadata in case decodeHeaders returns StopAllIteration. The latter can happen when headers + // callbacks generate new metadata. + if (!(*entry)->decode_headers_called_ || (*entry)->stoppedAll()) { + Http::MetadataMapPtr metadata_map_ptr = std::make_unique(metadata_map); + (*entry)->getSavedRequestMetadata()->emplace_back(std::move(metadata_map_ptr)); + return; + } + + FilterMetadataStatus status = (*entry)->handle_->decodeMetadata(metadata_map); + ENVOY_STREAM_LOG(trace, "decode metadata called: filter={} status={}, metadata: {}", *this, + static_cast((*entry).get()), static_cast(status), + metadata_map); + } +} + void ConnectionManagerImpl::ActiveStream::maybeEndDecode(bool end_stream) { ASSERT(!state_.remote_complete_); state_.remote_complete_ = end_stream; @@ -1555,6 +1611,17 @@ void ConnectionManagerImpl::ActiveStream::maybeEndEncode(bool end_stream) { } } +bool ConnectionManagerImpl::ActiveStream::processNewlyAddedMetadata() { + if (request_metadata_map_vector_ == nullptr) { + return false; + } + for (const auto& metadata_map : *getRequestMetadataMapVector()) { + decodeMetadata(nullptr, *metadata_map); + } + getRequestMetadataMapVector()->clear(); + return true; +} + bool ConnectionManagerImpl::ActiveStream::handleDataIfStopAll(ActiveStreamFilterBase& filter, Buffer::Instance& data, bool& filter_streaming) { @@ -1698,6 +1765,8 @@ void ConnectionManagerImpl::ActiveStreamFilterBase::commonContinue() { doHeaders(complete() && !bufferedData() && !trailers()); } + doMetadata(); + // Make sure we handle filters returning StopIterationNoBuffer and then commonContinue by flushing // the terminal fin. const bool end_stream_with_data = complete() && !trailers(); @@ -1740,22 +1809,25 @@ bool ConnectionManagerImpl::ActiveStreamFilterBase::commonHandleAfterHeadersCall if (status == FilterHeadersStatus::StopIteration) { iteration_state_ = IterationState::StopSingleIteration; - return false; } else if (status == FilterHeadersStatus::StopAllIterationAndBuffer) { iteration_state_ = IterationState::StopAllBuffer; - return false; } else if (status == FilterHeadersStatus::StopAllIterationAndWatermark) { iteration_state_ = IterationState::StopAllWatermark; - return false; } else if (status == FilterHeadersStatus::ContinueAndEndStream) { // Set headers_only to true so we know to end early if necessary, // but continue filter iteration so we actually write the headers/run the cleanup code. headers_only = true; ENVOY_STREAM_LOG(debug, "converting to headers only", parent_); - return true; } else { ASSERT(status == FilterHeadersStatus::Continue); headers_continued_ = true; + } + + handleMetadataAfterHeadersCallback(); + + if (stoppedAll() || status == FilterHeadersStatus::StopIteration) { + return false; + } else { return true; } } @@ -1871,6 +1943,19 @@ Buffer::WatermarkBufferPtr ConnectionManagerImpl::ActiveStreamDecoderFilter::cre return buffer; } +void ConnectionManagerImpl::ActiveStreamDecoderFilter::handleMetadataAfterHeadersCallback() { + // If we drain accumulated metadata, the iteration must start with the current filter. + const bool saved_state = iterate_from_current_filter_; + iterate_from_current_filter_ = true; + // If decodeHeaders() returns StopAllIteration, we should skip draining metadata, and wait + // for doMetadata() to drain the metadata after iteration continues. + if (!stoppedAll() && saved_request_metadata_ != nullptr && !getSavedRequestMetadata()->empty()) { + drainSavedRequestMetadata(); + } + // Restores the original value of iterate_from_current_filter_. + iterate_from_current_filter_ = saved_state; +} + HeaderMap& ConnectionManagerImpl::ActiveStreamDecoderFilter::addDecodedTrailers() { return parent_.addDecodedTrailers(); } @@ -1880,6 +1965,10 @@ void ConnectionManagerImpl::ActiveStreamDecoderFilter::addDecodedData(Buffer::In parent_.addDecodedData(*this, data, streaming); } +MetadataMapVector& ConnectionManagerImpl::ActiveStreamDecoderFilter::addDecodedMetadata() { + return parent_.addDecodedMetadata(); +} + void ConnectionManagerImpl::ActiveStreamDecoderFilter::injectDecodedDataToFilterChain( Buffer::Instance& data, bool end_stream) { parent_.decodeData(this, data, end_stream, diff --git a/source/common/http/conn_manager_impl.h b/source/common/http/conn_manager_impl.h index a7ef3ef9eae8e..0a7139a40f123 100644 --- a/source/common/http/conn_manager_impl.h +++ b/source/common/http/conn_manager_impl.h @@ -100,7 +100,8 @@ class ConnectionManagerImpl : Logger::Loggable, ActiveStreamFilterBase(ActiveStream& parent, bool dual_filter) : parent_(parent), iteration_state_(IterationState::Continue), iterate_from_current_filter_(false), headers_continued_(false), - continue_headers_continued_(false), end_stream_(false), dual_filter_(dual_filter) {} + continue_headers_continued_(false), end_stream_(false), dual_filter_(dual_filter), + decode_headers_called_(false) {} // Functions in the following block are called after the filter finishes processing // corresponding data. Those functions handle state updates and data storage (if needed) @@ -128,6 +129,9 @@ class ConnectionManagerImpl : Logger::Loggable, virtual void doData(bool end_stream) PURE; virtual void doTrailers() PURE; virtual const HeaderMapPtr& trailers() PURE; + virtual void doMetadata() PURE; + // TODO(soya3129): make this pure when adding impl to encodefilter. + virtual void handleMetadataAfterHeadersCallback() {} // Http::StreamFilterCallbacks const Network::Connection* connection() override; @@ -151,7 +155,25 @@ class ConnectionManagerImpl : Logger::Loggable, ASSERT(iteration_state_ != IterationState::Continue); iteration_state_ = IterationState::Continue; } + MetadataMapVector* getSavedRequestMetadata() { + if (saved_request_metadata_ == nullptr) { + saved_request_metadata_ = std::make_unique(); + } + return saved_request_metadata_.get(); + } + MetadataMapVector* getSavedResponseMetadata() { + if (saved_response_metadata_ == nullptr) { + saved_response_metadata_ = std::make_unique(); + } + return saved_response_metadata_.get(); + } + // A vector to save metadata when the current filter's [de|en]codeMetadata() can not be called, + // either because [de|en]codeHeaders() of the current filter returns StopAllIteration or because + // [de|en]codeHeaders() adds new metadata to [de|en]code, but we don't know + // [de|en]codeHeaders()'s return value yet. The storage is created on demand. + std::unique_ptr saved_request_metadata_; + std::unique_ptr saved_response_metadata_; // The state of iteration. enum class IterationState { Continue, // Iteration has not stopped for any frame type. @@ -173,6 +195,7 @@ class ConnectionManagerImpl : Logger::Loggable, // If true, end_stream is called for this filter. bool end_stream_ : 1; const bool dual_filter_ : 1; + bool decode_headers_called_ : 1; }; /** @@ -205,13 +228,29 @@ class ConnectionManagerImpl : Logger::Loggable, parent_.decodeData(this, *parent_.buffered_request_data_, end_stream, ActiveStream::FilterIterationStartState::CanStartFromCurrent); } + void doMetadata() override { + if (saved_request_metadata_ != nullptr) { + drainSavedRequestMetadata(); + } + } void doTrailers() override { parent_.decodeTrailers(this, *parent_.request_trailers_); } const HeaderMapPtr& trailers() override { return parent_.request_trailers_; } + void drainSavedRequestMetadata() { + ASSERT(saved_request_metadata_ != nullptr); + for (auto& metadata_map : *getSavedRequestMetadata()) { + parent_.decodeMetadata(this, *metadata_map); + } + getSavedRequestMetadata()->clear(); + } + // This function is called after the filter calls decodeHeaders() to drain accumulated metadata. + void handleMetadataAfterHeadersCallback() override; + // Http::StreamDecoderFilterCallbacks void addDecodedData(Buffer::Instance& data, bool streaming) override; void injectDecodedDataToFilterChain(Buffer::Instance& data, bool end_stream) override; HeaderMap& addDecodedTrailers() override; + MetadataMapVector& addDecodedMetadata() override; void continueDecoding() override; const Buffer::Instance* decodingBuffer() override { return parent_.buffered_request_data_.get(); @@ -299,6 +338,19 @@ class ConnectionManagerImpl : Logger::Loggable, parent_.encodeData(this, *parent_.buffered_response_data_, end_stream, ActiveStream::FilterIterationStartState::CanStartFromCurrent); } + void drainSavedResponseMetadata() { + ASSERT(saved_response_metadata_ != nullptr); + for (auto& metadata_map : *getSavedResponseMetadata()) { + parent_.encodeMetadata(this, std::move(metadata_map)); + } + getSavedResponseMetadata()->clear(); + } + + void doMetadata() override { + if (saved_response_metadata_ != nullptr) { + drainSavedResponseMetadata(); + } + } void doTrailers() override { parent_.encodeTrailers(this, *parent_.response_trailers_); } const HeaderMapPtr& trailers() override { return parent_.response_trailers_; } @@ -358,12 +410,14 @@ class ConnectionManagerImpl : Logger::Loggable, const Network::Connection* connection(); void addDecodedData(ActiveStreamDecoderFilter& filter, Buffer::Instance& data, bool streaming); HeaderMap& addDecodedTrailers(); + MetadataMapVector& addDecodedMetadata(); void decodeHeaders(ActiveStreamDecoderFilter* filter, HeaderMap& headers, bool end_stream); // Sends data through decoding filter chains. filter_iteration_start_state indicates which // filter to start the iteration with. void decodeData(ActiveStreamDecoderFilter* filter, Buffer::Instance& data, bool end_stream, FilterIterationStartState filter_iteration_start_state); void decodeTrailers(ActiveStreamDecoderFilter* filter, HeaderMap& trailers); + void decodeMetadata(ActiveStreamDecoderFilter* filter, MetadataMap& metadata_map); void disarmRequestTimeout(); void maybeEndDecode(bool end_stream); void addEncodedData(ActiveStreamEncoderFilter& filter, Buffer::Instance& data, bool streaming); @@ -382,6 +436,8 @@ class ConnectionManagerImpl : Logger::Loggable, void encodeTrailers(ActiveStreamEncoderFilter* filter, HeaderMap& trailers); void encodeMetadata(ActiveStreamEncoderFilter* filter, MetadataMapPtr&& metadata_map_ptr); void maybeEndEncode(bool end_stream); + // Returns true if new metadata is decoded. Otherwise, returns false. + bool processNewlyAddedMetadata(); uint64_t streamId() { return stream_id_; } // Returns true if filter has stopped iteration for all frame types. Otherwise, returns false. // filter_streaming is the variable to indicate if stream is streaming, and its value may be @@ -400,7 +456,7 @@ class ConnectionManagerImpl : Logger::Loggable, void decodeHeaders(HeaderMapPtr&& headers, bool end_stream) override; void decodeData(Buffer::Instance& data, bool end_stream) override; void decodeTrailers(HeaderMapPtr&& trailers) override; - void decodeMetadata(MetadataMapPtr&&) override { NOT_REACHED_GCOVR_EXCL_LINE; } + void decodeMetadata(MetadataMapPtr&&) override; // Http::FilterChainFactoryCallbacks void addStreamDecoderFilter(StreamDecoderFilterSharedPtr filter) override { @@ -516,6 +572,13 @@ class ConnectionManagerImpl : Logger::Loggable, return os; } + MetadataMapVector* getRequestMetadataMapVector() { + if (request_metadata_map_vector_ == nullptr) { + request_metadata_map_vector_ = std::make_unique(); + } + return request_metadata_map_vector_.get(); + } + ConnectionManagerImpl& connection_manager_; Router::ConfigConstSharedPtr snapped_route_config_; Router::ScopedConfigConstSharedPtr snapped_scoped_route_config_; @@ -543,6 +606,10 @@ class ConnectionManagerImpl : Logger::Loggable, absl::optional cached_route_; absl::optional cached_cluster_info_; std::list watermark_callbacks_{}; + // Stores metadata added in the decoding filter that is being processed. Will be cleared before + // processing the next filter. The storage is created on demand. We need to store metadata + // temporarily in the filter in case the filter has stopped all while processing headers. + std::unique_ptr request_metadata_map_vector_{nullptr}; uint32_t buffer_limit_{0}; uint32_t high_watermark_count_{0}; const std::string* decorated_operation_{nullptr}; diff --git a/source/common/router/router.cc b/source/common/router/router.cc index ca379197b838f..95e686b7db77f 100644 --- a/source/common/router/router.cc +++ b/source/common/router/router.cc @@ -606,6 +606,13 @@ Http::FilterTrailersStatus Filter::decodeTrailers(Http::HeaderMap& trailers) { return Http::FilterTrailersStatus::StopIteration; } +Http::FilterMetadataStatus Filter::decodeMetadata(Http::MetadataMap& metadata_map) { + Http::MetadataMapPtr metadata_map_ptr = std::make_unique(metadata_map); + ASSERT(upstream_requests_.size() == 1); + upstream_requests_.front()->encodeMetadata(std::move(metadata_map_ptr)); + return Http::FilterMetadataStatus::Continue; +} + void Filter::setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) { callbacks_ = &callbacks; // As the decoder filter only pushes back via watermarks once data has reached @@ -1400,6 +1407,8 @@ void Filter::UpstreamRequest::encodeData(Buffer::Instance& data, bool end_stream buffered_request_body_->move(data); } else { + ASSERT(downstream_metadata_map_vector_.empty()); + ENVOY_STREAM_LOG(trace, "proxying {} bytes", *parent_.callbacks_, data.length()); stream_info_.addBytesSent(data.length()); request_encoder_->encodeData(data, end_stream); @@ -1417,12 +1426,27 @@ void Filter::UpstreamRequest::encodeTrailers(const Http::HeaderMap& trailers) { if (!request_encoder_) { ENVOY_STREAM_LOG(trace, "buffering trailers", *parent_.callbacks_); } else { + ASSERT(downstream_metadata_map_vector_.empty()); + ENVOY_STREAM_LOG(trace, "proxying trailers", *parent_.callbacks_); request_encoder_->encodeTrailers(trailers); upstream_timing_.onLastUpstreamTxByteSent(parent_.callbacks_->dispatcher().timeSource()); } } +void Filter::UpstreamRequest::encodeMetadata(Http::MetadataMapPtr&& metadata_map_ptr) { + if (!request_encoder_) { + ENVOY_STREAM_LOG(trace, "request_encoder_ not ready. Store metadata_map to encode later: {}", + *parent_.callbacks_, *metadata_map_ptr); + downstream_metadata_map_vector_.emplace_back(std::move(metadata_map_ptr)); + } else { + ENVOY_STREAM_LOG(trace, "Encode metadata: {}", *parent_.callbacks_, *metadata_map_ptr); + Http::MetadataMapVector metadata_map_vector; + metadata_map_vector.emplace_back(std::move(metadata_map_ptr)); + request_encoder_->encodeMetadata(metadata_map_vector); + } +} + void Filter::UpstreamRequest::onResetStream(Http::StreamResetReason reason, absl::string_view transport_failure_reason) { clearRequestEncoder(); @@ -1526,8 +1550,13 @@ void Filter::UpstreamRequest::onPoolReady(Http::StreamEncoder& request_encoder, } upstream_timing_.onFirstUpstreamTxByteSent(parent_.callbacks_->dispatcher().timeSource()); + + const bool end_stream = !buffered_request_body_ && encode_complete_ && !encode_trailers_; + // If end_stream is set in headers, and there are metadata to send, delays end_stream. The case + // only happens when decoding headers filters return ContinueAndEndStream. + const bool delay_headers_end_stream = end_stream && !downstream_metadata_map_vector_.empty(); request_encoder.encodeHeaders(*parent_.downstream_headers_, - !buffered_request_body_ && encode_complete_ && !encode_trailers_); + end_stream && !delay_headers_end_stream); calling_encode_headers_ = false; // It is possible to get reset in the middle of an encodeHeaders() call. This happens for example @@ -1538,6 +1567,18 @@ void Filter::UpstreamRequest::onPoolReady(Http::StreamEncoder& request_encoder, if (deferred_reset_reason_) { onResetStream(deferred_reset_reason_.value(), absl::string_view()); } else { + // Encode metadata after headers and before any other frame type. + if (!downstream_metadata_map_vector_.empty()) { + ENVOY_STREAM_LOG(debug, "Send metadata onPoolReady. {}", *parent_.callbacks_, + downstream_metadata_map_vector_); + request_encoder.encodeMetadata(downstream_metadata_map_vector_); + downstream_metadata_map_vector_.clear(); + if (delay_headers_end_stream) { + Buffer::OwnedImpl empty_data(""); + request_encoder.encodeData(empty_data, true); + } + } + if (buffered_request_body_) { stream_info_.addBytesSent(buffered_request_body_->length()); request_encoder.encodeData(*buffered_request_body_, encode_complete_ && !encode_trailers_); diff --git a/source/common/router/router.h b/source/common/router/router.h index 282d41997b376..2767db0a29216 100644 --- a/source/common/router/router.h +++ b/source/common/router/router.h @@ -243,6 +243,7 @@ class Filter : Logger::Loggable, Http::FilterHeadersStatus decodeHeaders(Http::HeaderMap& headers, bool end_stream) override; Http::FilterDataStatus decodeData(Buffer::Instance& data, bool end_stream) override; Http::FilterTrailersStatus decodeTrailers(Http::HeaderMap& trailers) override; + Http::FilterMetadataStatus decodeMetadata(Http::MetadataMap& metadata_map) override; void setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) override; // Upstream::LoadBalancerContext @@ -361,6 +362,7 @@ class Filter : Logger::Loggable, void encodeHeaders(bool end_stream); void encodeData(Buffer::Instance& data, bool end_stream); void encodeTrailers(const Http::HeaderMap& trailers); + void encodeMetadata(Http::MetadataMapPtr&& metadata_map_ptr); void resetStream(); void setupPerTryTimeout(); @@ -458,6 +460,7 @@ class Filter : Logger::Loggable, // access logging is configured. Http::HeaderMapPtr upstream_headers_; Http::HeaderMapPtr upstream_trailers_; + Http::MetadataMapVector downstream_metadata_map_vector_; bool calling_encode_headers_ : 1; bool upstream_canary_ : 1; @@ -523,6 +526,7 @@ class Filter : Logger::Loggable, // for the remaining upstream requests to return. void resetOtherUpstreams(UpstreamRequest& upstream_request); void sendNoHealthyUpstreamResponse(); + // TODO(soya3129): Save metadata for retry, redirect and shadowing case. bool setupRetry(); bool setupRedirect(const Http::HeaderMap& headers, UpstreamRequest& upstream_request); void updateOutlierDetection(Upstream::Outlier::Result result, UpstreamRequest& upstream_request, diff --git a/source/docs/h2_metadata.md b/source/docs/h2_metadata.md index e986ff27a2111..81fce3daac7aa 100644 --- a/source/docs/h2_metadata.md +++ b/source/docs/h2_metadata.md @@ -22,10 +22,9 @@ stream flag. Because metadata frames must be associated with an existing frame, ensure metadata frames to be received before the end of stream is received by the peer. -Metadata associated with a response can be sent before response headers, after response headers, -between response data or after response data. If metadata frames have to be sent last, +Metadata associated with a stream can be sent before headers, after headers, +between data or after data. If metadata frames have to be sent last, users must put the end of stream in an empty data frame and send the empty data frame after metadata frames. -TODO(soya3129): add more conditions for metadata in requests. Envoy only allows up to 1M metadata to be sent per stream. If the accumulated metadata size exceeds the limit, the stream will be reset. @@ -36,8 +35,6 @@ Envoy provides the functionality to proxy, process and add metadata. ## Proxying metadata -(To be implemented) - If not specified, all the metadata received by Envoy is proxied to the next hop unmodified. Note that, we do not guarantee the same frame order will be preserved from hop by hop. That is, metadata from upstream at the beginning of a stream can be @@ -45,8 +42,6 @@ received by the downstream at the end of the stream. ## Consuming metadata -(To be implemented) - If Envoy needs to take actions when a metadata frame is received, users should create a new filter. @@ -74,17 +69,20 @@ If the metadata is left in the map, it will be passed to the next hop. ## Inserting metadata -(To be implemented) - Envoy filters can be used to add new metadata to a stream. If users need to add new metadata for a request from downstream to upstream, a StreamDecoderFilter should be created. The StreamDecoderFilterCallbacks object that Envoy passes to the StreamDecoderFilter has an interface MetadataMapVector& StreamDecoderFilterCallbacks::addDecodedMetadata(). By calling the interface, -users get a reference to the metadata map associated with the request stream. Users can -insert new metadata to the metadata map, and Envoy will proxy the new metadata -map to the upstream. +users get a reference to a vector of metadata map associated with the request stream. Users can +insert new metadata map to the metadata map vector, and Envoy will proxy the new metadata +map to the upstream. StreamDecoderFilterCallbacks::addDecodedMetadata() can be called in +StreamDecoderFilter::decodeHeaders(), StreamDecoderFilter::decodeData() and +StreamDecoderFilter::decodeTrailers(). Do not call +StreamDecoderFilterCallbacks::addDecodedMetadata() in +StreamDecoderFilter::decodeMetadata(MetadataMap metadata\_map). New metadata can +be added directly to metadata\_map. If users need to add new metadata for a response to downstream, a StreamFilter should be created. Users pass the metadata to be added to @@ -171,3 +169,37 @@ ConnectionManagerImpl::ActiveStream::encodeMetadata(ActiveStreamEncoderFilter\* to go through all the encoding filters. Or new metadata can be added to metadata\_map in StreamFilter::encodeMetadata(MetadataMap& metadata\_map) directly. + +## Request metadata handling + +We first explain how request metadata get consumed or proxied. +In function EnvoyConnectionManagerImpl::ActiveStream::decodeMetadata(ActiveStreamDecoderFilter\* filter, +MetadataMap& metadata\_map), Envoy passes request metadata received from downstream to filters by +calling the following filter interface: + +FilterMetadatasStatus StreamDecoderFilter::decodeMetadata(MetadataMap& metadata\_map). + +Filters, by implementing the interface, can consume or modify request metadata. If no filter +touches the metadata, it is proxied to upstream unchanged. + +The last filter in the filter chain is router filter. The router filter calls +Filter::request\_encoder\_-\>encodeMetadata(const MetadataMapVector& metadata\_map\_vector) to pass +the metadata to codec, and codec encodes and forwards the metadata to the upstream. If the connection +to the upstream has not been established when metadata is received, the metadata is temporarily stored in +Filter::downstream\_metadata\_map\_vector\_. When the connection is ready +(Filter::UpstreamRequest::onPoolReady()), the metadata is then passed to codec, and forwarded to +the upstream. + +Envoy can also add new request metadata through filters's decoding interfaces (See section +[Inserting metadata](#inserting-metadata) for detailed interfaces). Filters can add new +metadata to ActiveStream::request\_metadata\_map\_vector\_ by calling +StreamDecoderFilterCallbacks::addDecodedMetadata(). After calling each filter's decoding function, +Envoy checks if new metadata is added to ActiveStream::request\_metadata\_map\_vector\_. If so, +then Envoy calls ConnectionManagerImpl::ActiveStream::decodeMetadata(ActiveStreamEncoderFilter\* filter, +MetadataMapPtr&& metadata\_map) to go through all the filters. + +Note that, because metadata frames do not carry end\_stream, if new metadata is added to a headers +only request, Envoy moves end\_stream from headers to an empty data frame which is sent after the new +metadata. In addition, Envoy drains metadata in router filter before any other types of +frames except headers to make sure end\_stream is handled correctly. + diff --git a/source/extensions/filters/http/common/pass_through_filter.h b/source/extensions/filters/http/common/pass_through_filter.h index 2527e05bde352..30d3e4e44d82d 100644 --- a/source/extensions/filters/http/common/pass_through_filter.h +++ b/source/extensions/filters/http/common/pass_through_filter.h @@ -18,7 +18,6 @@ class PassThroughDecoderFilter : public virtual StreamDecoderFilter { Http::FilterDataStatus decodeData(Buffer::Instance&, bool) override { return Http::FilterDataStatus::Continue; } - Http::FilterTrailersStatus decodeTrailers(Http::HeaderMap&) override { return Http::FilterTrailersStatus::Continue; } diff --git a/source/extensions/filters/http/csrf/csrf_filter.h b/source/extensions/filters/http/csrf/csrf_filter.h index da62a78008734..57def213db6ef 100644 --- a/source/extensions/filters/http/csrf/csrf_filter.h +++ b/source/extensions/filters/http/csrf/csrf_filter.h @@ -96,13 +96,13 @@ class CsrfFilter : public Http::StreamDecoderFilter { Http::FilterHeadersStatus decodeHeaders(Http::HeaderMap& headers, bool end_stream) override; Http::FilterDataStatus decodeData(Buffer::Instance&, bool) override { return Http::FilterDataStatus::Continue; - }; + } Http::FilterTrailersStatus decodeTrailers(Http::HeaderMap&) override { return Http::FilterTrailersStatus::Continue; - }; + } void setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) override { callbacks_ = &callbacks; - }; + } private: void determinePolicy(); diff --git a/test/extensions/filters/http/buffer/buffer_filter_test.cc b/test/extensions/filters/http/buffer/buffer_filter_test.cc index 34c1bd05298b6..4af97078d5c71 100644 --- a/test/extensions/filters/http/buffer/buffer_filter_test.cc +++ b/test/extensions/filters/http/buffer/buffer_filter_test.cc @@ -59,6 +59,11 @@ TEST_F(BufferFilterTest, HeaderOnlyRequest) { EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_.decodeHeaders(headers, true)); } +TEST_F(BufferFilterTest, TestMetadata) { + Http::MetadataMap metadata_map{{"metadata", "metadata"}}; + EXPECT_EQ(Http::FilterMetadataStatus::Continue, filter_.decodeMetadata(metadata_map)); +} + TEST_F(BufferFilterTest, RequestWithData) { InSequence s; diff --git a/test/extensions/filters/http/cors/cors_filter_test.cc b/test/extensions/filters/http/cors/cors_filter_test.cc index 1558eb3462abd..d56cb8ac6a4aa 100644 --- a/test/extensions/filters/http/cors/cors_filter_test.cc +++ b/test/extensions/filters/http/cors/cors_filter_test.cc @@ -70,6 +70,8 @@ TEST_F(CorsFilterTest, RequestWithoutOrigin) { EXPECT_EQ(0, stats_.counter("test.cors.origin_valid").value()); EXPECT_EQ(Http::FilterDataStatus::Continue, filter_.decodeData(data_, false)); EXPECT_EQ(Http::FilterTrailersStatus::Continue, filter_.decodeTrailers(request_headers_)); + Http::MetadataMap metadata_map{{"metadata", "metadata"}}; + EXPECT_EQ(Http::FilterMetadataStatus::Continue, filter_.decodeMetadata(metadata_map)); EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_.encodeHeaders(request_headers_, false)); EXPECT_EQ(Http::FilterDataStatus::Continue, filter_.encodeData(data_, false)); diff --git a/test/extensions/filters/http/csrf/csrf_filter_test.cc b/test/extensions/filters/http/csrf/csrf_filter_test.cc index 55def661cb194..99835d935c1a9 100644 --- a/test/extensions/filters/http/csrf/csrf_filter_test.cc +++ b/test/extensions/filters/http/csrf/csrf_filter_test.cc @@ -102,6 +102,8 @@ TEST_F(CsrfFilterTest, RequestWithNonMutableMethod) { EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_.decodeHeaders(request_headers, false)); EXPECT_EQ(Http::FilterDataStatus::Continue, filter_.decodeData(data_, false)); + Http::MetadataMap metadata_map{{"metadata", "metadata"}}; + EXPECT_EQ(Http::FilterMetadataStatus::Continue, filter_.decodeMetadata(metadata_map)); EXPECT_EQ(Http::FilterTrailersStatus::Continue, filter_.decodeTrailers(request_headers_)); EXPECT_EQ(0U, config_->stats().missing_source_origin_.value()); diff --git a/test/extensions/filters/http/dynamo/dynamo_filter_test.cc b/test/extensions/filters/http/dynamo/dynamo_filter_test.cc index df5c19b318cb6..f7021a9b05e95 100644 --- a/test/extensions/filters/http/dynamo/dynamo_filter_test.cc +++ b/test/extensions/filters/http/dynamo/dynamo_filter_test.cc @@ -58,14 +58,14 @@ TEST_F(DynamoFilterTest, operatorPresent) { EXPECT_EQ(Http::FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers, true)); + Http::MetadataMap metadata_map{{"metadata", "metadata"}}; + EXPECT_EQ(Http::FilterMetadataStatus::Continue, filter_->decodeMetadata(metadata_map)); + EXPECT_EQ(Http::FilterMetadataStatus::Continue, filter_->encodeMetadata(metadata_map)); Http::TestHeaderMapImpl continue_headers{{":status", "100"}}; EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->encode100ContinueHeaders(continue_headers)); - Http::MetadataMap metadata_map{{"metadata", "metadata"}}; - EXPECT_EQ(Http::FilterMetadataStatus::Continue, filter_->encodeMetadata(metadata_map)); - Http::TestHeaderMapImpl response_headers{{":status", "200"}}; EXPECT_CALL(stats_, counter("prefix.dynamodb.operation_missing")).Times(0); EXPECT_CALL(stats_, counter("prefix.dynamodb.table_missing")); diff --git a/test/extensions/filters/http/ext_authz/ext_authz_test.cc b/test/extensions/filters/http/ext_authz/ext_authz_test.cc index f2f07af5f8fff..76c3ca3759a95 100644 --- a/test/extensions/filters/http/ext_authz/ext_authz_test.cc +++ b/test/extensions/filters/http/ext_authz/ext_authz_test.cc @@ -806,6 +806,8 @@ TEST_F(HttpFilterTestParam, ContextExtensions) { // Engage the filter so that check is called. filter_->decodeHeaders(request_headers_, false); + Http::MetadataMap metadata_map{{"metadata", "metadata"}}; + EXPECT_EQ(Http::FilterMetadataStatus::Continue, filter_->decodeMetadata(metadata_map)); // Make sure that the extensions appear in the check request issued by the filter. EXPECT_EQ("value_vhost", check_request.attributes().context_extensions().at("key_vhost")); diff --git a/test/extensions/filters/http/fault/fault_filter_test.cc b/test/extensions/filters/http/fault/fault_filter_test.cc index a22a9819a7f74..f6181e4676752 100644 --- a/test/extensions/filters/http/fault/fault_filter_test.cc +++ b/test/extensions/filters/http/fault/fault_filter_test.cc @@ -299,6 +299,8 @@ TEST_F(FaultFilterTest, AbortWithHttpStatus) { EXPECT_EQ(Http::FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); + Http::MetadataMap metadata_map{{"metadata", "metadata"}}; + EXPECT_EQ(Http::FilterMetadataStatus::Continue, filter_->decodeMetadata(metadata_map)); EXPECT_EQ(1UL, config_->stats().active_faults_.value()); EXPECT_EQ(Http::FilterDataStatus::Continue, filter_->decodeData(data_, false)); EXPECT_EQ(Http::FilterTrailersStatus::Continue, filter_->decodeTrailers(request_headers_)); diff --git a/test/extensions/filters/http/grpc_http1_bridge/http1_bridge_filter_test.cc b/test/extensions/filters/http/grpc_http1_bridge/http1_bridge_filter_test.cc index d7c40ab869917..aedebdfa6f0be 100644 --- a/test/extensions/filters/http/grpc_http1_bridge/http1_bridge_filter_test.cc +++ b/test/extensions/filters/http/grpc_http1_bridge/http1_bridge_filter_test.cc @@ -52,6 +52,8 @@ TEST_F(GrpcHttp1BridgeFilterTest, NoRoute) { {":path", "/lyft.users.BadCompanions/GetBadCompanions"}}; EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_.decodeHeaders(request_headers, true)); + Http::MetadataMap metadata_map{{"metadata", "metadata"}}; + EXPECT_EQ(Http::FilterMetadataStatus::Continue, filter_.decodeMetadata(metadata_map)); Http::TestHeaderMapImpl response_headers{{":status", "404"}}; } diff --git a/test/extensions/filters/http/grpc_json_transcoder/json_transcoder_filter_test.cc b/test/extensions/filters/http/grpc_json_transcoder/json_transcoder_filter_test.cc index 3fbb89ac8bd21..437bc9d373b4b 100644 --- a/test/extensions/filters/http/grpc_json_transcoder/json_transcoder_filter_test.cc +++ b/test/extensions/filters/http/grpc_json_transcoder/json_transcoder_filter_test.cc @@ -335,6 +335,8 @@ TEST_F(GrpcJsonTranscoderFilterTest, NoTranscoding) { EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_.decodeHeaders(request_headers, false)); EXPECT_EQ(expected_request_headers, request_headers); + Http::MetadataMap metadata_map{{"metadata", "metadata"}}; + EXPECT_EQ(Http::FilterMetadataStatus::Continue, filter_.decodeMetadata(metadata_map)); Buffer::OwnedImpl request_data{"{}"}; EXPECT_EQ(Http::FilterDataStatus::Continue, filter_.decodeData(request_data, false)); diff --git a/test/extensions/filters/http/grpc_web/grpc_web_filter_test.cc b/test/extensions/filters/http/grpc_web/grpc_web_filter_test.cc index 1500708fe3c6c..e0a501f48d697 100644 --- a/test/extensions/filters/http/grpc_web/grpc_web_filter_test.cc +++ b/test/extensions/filters/http/grpc_web/grpc_web_filter_test.cc @@ -123,6 +123,8 @@ TEST_F(GrpcWebFilterTest, SupportedContentTypes) { Http::TestHeaderMapImpl request_headers; request_headers.addCopy(Http::Headers::get().ContentType, content_type); EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_.decodeHeaders(request_headers, false)); + Http::MetadataMap metadata_map{{"metadata", "metadata"}}; + EXPECT_EQ(Http::FilterMetadataStatus::Continue, filter_.decodeMetadata(metadata_map)); EXPECT_EQ(Http::Headers::get().ContentTypeValues.Grpc, request_headers.ContentType()->value().getStringView()); } diff --git a/test/extensions/filters/http/gzip/gzip_filter_test.cc b/test/extensions/filters/http/gzip/gzip_filter_test.cc index 6eb74308e67d3..a45e7760a2229 100644 --- a/test/extensions/filters/http/gzip/gzip_filter_test.cc +++ b/test/extensions/filters/http/gzip/gzip_filter_test.cc @@ -226,6 +226,8 @@ TEST_F(GzipFilterTest, AvailableCombinationCompressionStrategyAndLevelConfig) { // Acceptance Testing with default configuration. TEST_F(GzipFilterTest, AcceptanceGzipEncoding) { doRequest({{":method", "get"}, {"accept-encoding", "deflate, gzip"}}, false); + Http::MetadataMap metadata_map{{"metadata", "metadata"}}; + EXPECT_EQ(Http::FilterMetadataStatus::Continue, filter_->decodeMetadata(metadata_map)); Buffer::OwnedImpl data("hello"); EXPECT_EQ(Http::FilterDataStatus::Continue, filter_->decodeData(data, false)); Http::TestHeaderMapImpl trailers; diff --git a/test/extensions/filters/http/header_to_metadata/header_to_metadata_filter_test.cc b/test/extensions/filters/http/header_to_metadata/header_to_metadata_filter_test.cc index 5908af6e63610..1cfa6acfe73f4 100644 --- a/test/extensions/filters/http/header_to_metadata/header_to_metadata_filter_test.cc +++ b/test/extensions/filters/http/header_to_metadata/header_to_metadata_filter_test.cc @@ -79,6 +79,8 @@ TEST_F(HeaderToMetadataTest, BasicRequestTest) { EXPECT_CALL(decoder_callbacks_, streamInfo()).WillRepeatedly(ReturnRef(req_info_)); EXPECT_CALL(req_info_, setDynamicMetadata("envoy.lb", MapEq(expected))); EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->decodeHeaders(incoming_headers, false)); + Http::MetadataMap metadata_map{{"metadata", "metadata"}}; + EXPECT_EQ(Http::FilterMetadataStatus::Continue, filter_->decodeMetadata(metadata_map)); Buffer::OwnedImpl data("data"); EXPECT_EQ(Http::FilterDataStatus::Continue, filter_->decodeData(data, false)); EXPECT_EQ(Http::FilterTrailersStatus::Continue, filter_->decodeTrailers(incoming_headers)); diff --git a/test/extensions/filters/http/health_check/health_check_test.cc b/test/extensions/filters/http/health_check/health_check_test.cc index 03ead8af99cd8..6a2d4b5de94bc 100644 --- a/test/extensions/filters/http/health_check/health_check_test.cc +++ b/test/extensions/filters/http/health_check/health_check_test.cc @@ -99,6 +99,8 @@ TEST_F(HealthCheckFilterNoPassThroughTest, OkOrFailed) { EXPECT_CALL(callbacks_.active_span_, setSampled(false)); EXPECT_EQ(Http::FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); + Http::MetadataMap metadata_map{{"metadata", "metadata"}}; + EXPECT_EQ(Http::FilterMetadataStatus::Continue, filter_->decodeMetadata(metadata_map)); } TEST_F(HealthCheckFilterNoPassThroughTest, NotHcRequest) { diff --git a/test/extensions/filters/http/jwt_authn/filter_test.cc b/test/extensions/filters/http/jwt_authn/filter_test.cc index d408599540eed..3a65d74c3bcc8 100644 --- a/test/extensions/filters/http/jwt_authn/filter_test.cc +++ b/test/extensions/filters/http/jwt_authn/filter_test.cc @@ -70,6 +70,8 @@ TEST_F(FilterTest, InlineOK) { auto headers = Http::TestHeaderMapImpl{}; EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->decodeHeaders(headers, false)); + Http::MetadataMap metadata_map{{"metadata", "metadata"}}; + EXPECT_EQ(Http::FilterMetadataStatus::Continue, filter_->decodeMetadata(metadata_map)); EXPECT_EQ(1U, mock_config_->stats().allowed_.value()); Buffer::OwnedImpl data(""); diff --git a/test/extensions/filters/http/lua/lua_filter_test.cc b/test/extensions/filters/http/lua/lua_filter_test.cc index e24b267b7823a..0fd1186bed00b 100644 --- a/test/extensions/filters/http/lua/lua_filter_test.cc +++ b/test/extensions/filters/http/lua/lua_filter_test.cc @@ -250,6 +250,8 @@ TEST_F(LuaHttpFilterTest, ScriptBodyChunksRequestBody) { Http::TestHeaderMapImpl request_headers{{":path", "/"}}; EXPECT_CALL(*filter_, scriptLog(spdlog::level::trace, StrEq("/"))); EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->decodeHeaders(request_headers, false)); + Http::MetadataMap metadata_map{{"metadata", "metadata"}}; + EXPECT_EQ(Http::FilterMetadataStatus::Continue, filter_->decodeMetadata(metadata_map)); Buffer::OwnedImpl data("hello"); EXPECT_CALL(*filter_, scriptLog(spdlog::level::trace, StrEq("5"))); diff --git a/test/extensions/filters/http/ratelimit/ratelimit_test.cc b/test/extensions/filters/http/ratelimit/ratelimit_test.cc index ed1b4afc135de..c956709a2a547 100644 --- a/test/extensions/filters/http/ratelimit/ratelimit_test.cc +++ b/test/extensions/filters/http/ratelimit/ratelimit_test.cc @@ -198,6 +198,8 @@ TEST_F(HttpRateLimitFilterTest, OkResponse) { request_headers_.addCopy(Http::Headers::get().RequestId, "requestid"); EXPECT_EQ(Http::FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); + Http::MetadataMap metadata_map{{"metadata", "metadata"}}; + EXPECT_EQ(Http::FilterMetadataStatus::Continue, filter_->decodeMetadata(metadata_map)); EXPECT_EQ(Http::FilterDataStatus::StopIterationAndWatermark, filter_->decodeData(data_, false)); EXPECT_EQ(Http::FilterTrailersStatus::StopIteration, filter_->decodeTrailers(request_headers_)); EXPECT_EQ(Http::FilterHeadersStatus::Continue, diff --git a/test/extensions/filters/http/rbac/rbac_filter_test.cc b/test/extensions/filters/http/rbac/rbac_filter_test.cc index c7ab01f931099..62967602dfc6b 100644 --- a/test/extensions/filters/http/rbac/rbac_filter_test.cc +++ b/test/extensions/filters/http/rbac/rbac_filter_test.cc @@ -88,6 +88,8 @@ TEST_F(RoleBasedAccessControlFilterTest, Allowed) { setDestinationPort(123); EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_.decodeHeaders(headers_, false)); + Http::MetadataMap metadata_map{{"metadata", "metadata"}}; + EXPECT_EQ(Http::FilterMetadataStatus::Continue, filter_.decodeMetadata(metadata_map)); EXPECT_EQ(1U, config_->stats().allowed_.value()); EXPECT_EQ(1U, config_->stats().shadow_denied_.value()); diff --git a/test/extensions/filters/http/squash/squash_filter_test.cc b/test/extensions/filters/http/squash/squash_filter_test.cc index 03c19f42d95ba..534b59b87a740 100644 --- a/test/extensions/filters/http/squash/squash_filter_test.cc +++ b/test/extensions/filters/http/squash/squash_filter_test.cc @@ -209,6 +209,8 @@ class SquashFilterTest : public testing::Test { void doDownstreamRequest() { startDownstreamRequest(); + Http::MetadataMap metadata_map{{"metadata", "metadata"}}; + EXPECT_EQ(Http::FilterMetadataStatus::Continue, filter_->decodeMetadata(metadata_map)); Envoy::Http::TestHeaderMapImpl trailers{}; // Complete a full request cycle Envoy::Buffer::OwnedImpl buffer("nothing here"); diff --git a/test/integration/BUILD b/test/integration/BUILD index ebd2696a39ac0..fd2ae31225f69 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -211,6 +211,9 @@ envoy_cc_test( "//source/extensions/filters/http/buffer:config", "//source/extensions/filters/http/dynamo:config", "//source/extensions/filters/http/health_check:config", + "//test/integration/filters:metadata_stop_all_filter_config_lib", + "//test/integration/filters:request_metadata_filter_config_lib", + "//test/integration/filters:response_metadata_filter_config_lib", "//test/integration/filters:stop_iteration_and_continue", "//test/mocks/http:http_mocks", "//test/mocks/upstream:upstream_mocks", @@ -317,7 +320,6 @@ envoy_cc_test_library( "//test/integration/filters:modify_buffer_filter_config_lib", "//test/integration/filters:passthrough_filter_config_lib", "//test/integration/filters:pause_filter_lib", - "//test/integration/filters:response_metadata_filter_config_lib", "//test/test_common:registry_lib", ], ) diff --git a/test/integration/fake_upstream.cc b/test/integration/fake_upstream.cc index 546f159fa8951..8cca9265525e3 100644 --- a/test/integration/fake_upstream.cc +++ b/test/integration/fake_upstream.cc @@ -51,6 +51,7 @@ void FakeStream::decodeHeaders(Http::HeaderMapPtr&& headers, bool end_stream) { } void FakeStream::decodeData(Buffer::Instance& data, bool end_stream) { + received_data_ = true; Thread::LockGuard lock(lock_); body_.add(data); setEndStream(end_stream); @@ -64,6 +65,13 @@ void FakeStream::decodeTrailers(Http::HeaderMapPtr&& trailers) { decoder_event_.notifyOne(); } +void FakeStream::decodeMetadata(Http::MetadataMapPtr&& metadata_map_ptr) { + for (const auto& metadata : *metadata_map_ptr) { + duplicated_metadata_key_count_[metadata.first]++; + metadata_map_.insert(metadata); + } +} + void FakeStream::encode100ContinueHeaders(const Http::HeaderMapImpl& headers) { std::shared_ptr headers_copy( new Http::HeaderMapImpl(static_cast(headers))); diff --git a/test/integration/fake_upstream.h b/test/integration/fake_upstream.h index 38d66690e41e5..7ea255edf4f93 100644 --- a/test/integration/fake_upstream.h +++ b/test/integration/fake_upstream.h @@ -62,6 +62,7 @@ class FakeStream : public Http::StreamDecoder, const Http::HeaderMap& headers() { return *headers_; } void setAddServedByHeader(bool add_header) { add_served_by_header_ = add_header; } const Http::HeaderMapPtr& trailers() { return trailers_; } + bool receivedData() { return received_data_; } ABSL_MUST_USE_RESULT testing::AssertionResult @@ -150,7 +151,7 @@ class FakeStream : public Http::StreamDecoder, void decodeHeaders(Http::HeaderMapPtr&& headers, bool end_stream) override; void decodeData(Buffer::Instance& data, bool end_stream) override; void decodeTrailers(Http::HeaderMapPtr&& trailers) override; - void decodeMetadata(Http::MetadataMapPtr&&) override {} + void decodeMetadata(Http::MetadataMapPtr&& metadata_map_ptr) override; // Http::StreamCallbacks void onResetStream(Http::StreamResetReason reason, @@ -162,6 +163,11 @@ class FakeStream : public Http::StreamDecoder, Event::TestTimeSystem& timeSystem() { return time_system_; } + Http::MetadataMap& metadata_map() { return metadata_map_; } + std::unordered_map& duplicated_metadata_key_count() { + return duplicated_metadata_key_count_; + } + protected: Http::HeaderMapPtr headers_; @@ -178,6 +184,9 @@ class FakeStream : public Http::StreamDecoder, std::vector decoded_grpc_frames_; bool add_served_by_header_{}; Event::TestTimeSystem& time_system_; + Http::MetadataMap metadata_map_; + std::unordered_map duplicated_metadata_key_count_; + bool received_data_{false}; }; using FakeStreamPtr = std::unique_ptr; diff --git a/test/integration/filters/BUILD b/test/integration/filters/BUILD index 7e2647880eb7b..b06b673294d6e 100644 --- a/test/integration/filters/BUILD +++ b/test/integration/filters/BUILD @@ -155,6 +155,20 @@ envoy_cc_test_library( ], ) +envoy_cc_test_library( + name = "request_metadata_filter_config_lib", + srcs = [ + "request_metadata_filter.cc", + ], + deps = [ + "//include/envoy/http:filter_interface", + "//include/envoy/registry", + "//include/envoy/server:filter_config_interface", + "//source/extensions/filters/http/common:empty_http_filter_config_lib", + "//source/extensions/filters/http/common:pass_through_filter_lib", + ], +) + envoy_cc_test_library( name = "random_pause_filter_lib", srcs = [ @@ -247,3 +261,19 @@ envoy_cc_test_library( "//source/common/network:connection_lib", ], ) + +envoy_cc_test_library( + name = "metadata_stop_all_filter_config_lib", + srcs = [ + "metadata_stop_all_filter.cc", + ], + deps = [ + ":common_lib", + "//include/envoy/event:timer_interface", + "//include/envoy/http:filter_interface", + "//include/envoy/registry", + "//include/envoy/server:filter_config_interface", + "//source/extensions/filters/http/common:empty_http_filter_config_lib", + "//source/extensions/filters/http/common:pass_through_filter_lib", + ], +) diff --git a/test/integration/filters/metadata_stop_all_filter.cc b/test/integration/filters/metadata_stop_all_filter.cc new file mode 100644 index 0000000000000..3ff6b7983d010 --- /dev/null +++ b/test/integration/filters/metadata_stop_all_filter.cc @@ -0,0 +1,74 @@ +#include +#include + +#include "envoy/event/timer.h" +#include "envoy/http/filter.h" +#include "envoy/registry/registry.h" +#include "envoy/server/filter_config.h" + +#include "common/buffer/buffer_impl.h" + +#include "extensions/filters/http/common/empty_http_filter_config.h" +#include "extensions/filters/http/common/pass_through_filter.h" + +#include "test/integration/filters/common.h" + +#include "gtest/gtest.h" + +namespace Envoy { + +class MetadataStopAllFilter : public Http::PassThroughFilter { +public: + constexpr static char name[] = "metadata-stop-all-filter"; + + Http::FilterHeadersStatus decodeHeaders(Http::HeaderMap& header_map, bool) override { + Http::HeaderEntry* entry_content = header_map.get(Envoy::Http::LowerCaseString("content_size")); + ASSERT(entry_content != nullptr); + content_size_ = std::stoul(std::string(entry_content->value().getStringView())); + + createTimerForContinue(); + + return Http::FilterHeadersStatus::StopAllIterationAndBuffer; + } + + Http::FilterDataStatus decodeData(Buffer::Instance&, bool) override { + ASSERT(timer_triggered_); + return Http::FilterDataStatus::Continue; + } + + Http::FilterTrailersStatus decodeTrailers(Http::HeaderMap&) override { + ASSERT(timer_triggered_); + return Http::FilterTrailersStatus::Continue; + } + + Http::FilterMetadataStatus decodeMetadata(Http::MetadataMap&) override { + ASSERT(timer_triggered_); + return Http::FilterMetadataStatus::Continue; + } + +private: + // Creates a timer to continue iteration after conditions meet. + void createTimerForContinue() { + delay_timer_ = decoder_callbacks_->dispatcher().createTimer([this]() -> void { + if (content_size_ > 0 && decoder_callbacks_->streamInfo().bytesReceived() >= content_size_) { + timer_triggered_ = true; + decoder_callbacks_->continueDecoding(); + } else { + // Creates a new timer to try again later. + createTimerForContinue(); + } + }); + delay_timer_->enableTimer(std::chrono::milliseconds(50)); + } + + Event::TimerPtr delay_timer_; + bool timer_triggered_ = false; + size_t content_size_ = 0; +}; + +constexpr char MetadataStopAllFilter::name[]; +static Registry::RegisterFactory, + Server::Configuration::NamedHttpFilterConfigFactory> + register_; + +} // namespace Envoy diff --git a/test/integration/filters/request_metadata_filter.cc b/test/integration/filters/request_metadata_filter.cc new file mode 100644 index 0000000000000..19c6425bdc3a0 --- /dev/null +++ b/test/integration/filters/request_metadata_filter.cc @@ -0,0 +1,65 @@ +#include + +#include "envoy/http/filter.h" +#include "envoy/registry/registry.h" +#include "envoy/server/filter_config.h" + +#include "extensions/filters/http/common/empty_http_filter_config.h" +#include "extensions/filters/http/common/pass_through_filter.h" + +namespace Envoy { +// A filter tests request metadata consuming and inserting. The filter inserts new +// metadata when decodeHeaders/Data/Trailers() are called. If the received metadata with key +// "consume", the metadata will be consumed and not forwarded to the next hop. +class RequestMetadataStreamFilter : public Http::PassThroughFilter { +public: + Http::FilterHeadersStatus decodeHeaders(Http::HeaderMap&, bool) override { + Http::MetadataMap metadata_map = {{"headers", "headers"}}; + Http::MetadataMapPtr metadata_map_ptr = std::make_unique(metadata_map); + decoder_callbacks_->addDecodedMetadata().emplace_back(std::move(metadata_map_ptr)); + return Http::FilterHeadersStatus::Continue; + } + + Http::FilterDataStatus decodeData(Buffer::Instance&, bool) override { + Http::MetadataMap metadata_map = {{"data", "data"}}; + Http::MetadataMapPtr metadata_map_ptr = std::make_unique(metadata_map); + decoder_callbacks_->addDecodedMetadata().emplace_back(std::move(metadata_map_ptr)); + return Http::FilterDataStatus::Continue; + } + + Http::FilterTrailersStatus decodeTrailers(Http::HeaderMap&) override { + Http::MetadataMap metadata_map = {{"trailers", "trailers"}}; + Http::MetadataMapPtr metadata_map_ptr = std::make_unique(metadata_map); + decoder_callbacks_->addDecodedMetadata().emplace_back(std::move(metadata_map_ptr)); + return Http::FilterTrailersStatus::Continue; + } + + // If metadata_map contains key "consume", consumes the metadata, and replace it with a new one. + // The function also adds a new metadata using addDecodedMetadata(). + Http::FilterMetadataStatus decodeMetadata(Http::MetadataMap& metadata_map) override { + auto it = metadata_map.find("consume"); + if (it != metadata_map.end()) { + metadata_map.erase("consume"); + metadata_map.emplace("replace", "replace"); + } + metadata_map["metadata"] = "metadata"; + return Http::FilterMetadataStatus::Continue; + } +}; + +class AddRequestMetadataStreamFilterConfig + : public Extensions::HttpFilters::Common::EmptyHttpFilterConfig { +public: + AddRequestMetadataStreamFilterConfig() : EmptyHttpFilterConfig("request-metadata-filter") {} + Http::FilterFactoryCb createFilter(const std::string&, Server::Configuration::FactoryContext&) { + return [](Http::FilterChainFactoryCallbacks& callbacks) -> void { + callbacks.addStreamFilter(std::make_shared<::Envoy::RequestMetadataStreamFilter>()); + }; + } +}; + +// perform static registration +static Registry::RegisterFactory + register_; +} // namespace Envoy diff --git a/test/integration/http2_integration_test.cc b/test/integration/http2_integration_test.cc index cddc0a62530c0..22929816f97f3 100644 --- a/test/integration/http2_integration_test.cc +++ b/test/integration/http2_integration_test.cc @@ -265,7 +265,6 @@ void verifyExpectedMetadata(Http::MetadataMap metadata_map, std::setcomplete()); } +// Verifies small metadata can be sent at different locations of a request. +TEST_P(Http2MetadataIntegrationTest, ProxySmallMetadataInRequest) { + initialize(); + codec_client_ = makeHttpConnection(lookupPort("http")); + + auto encoder_decoder = codec_client_->startRequest(default_request_headers_); + request_encoder_ = &encoder_decoder.first; + auto response = std::move(encoder_decoder.second); + Http::MetadataMap metadata_map = {{"key", "value"}}; + codec_client_->sendMetadata(*request_encoder_, metadata_map); + codec_client_->sendData(*request_encoder_, 1, false); + codec_client_->sendMetadata(*request_encoder_, metadata_map); + codec_client_->sendData(*request_encoder_, 1, false); + codec_client_->sendMetadata(*request_encoder_, metadata_map); + Http::TestHeaderMapImpl request_trailers{{"request", "trailer"}}; + codec_client_->sendTrailers(*request_encoder_, request_trailers); + + waitForNextUpstreamRequest(); + + // Verifies metadata is received by upstream. + upstream_request_->encodeHeaders(default_response_headers_, true); + EXPECT_EQ(upstream_request_->metadata_map().find("key")->second, "value"); + EXPECT_EQ(upstream_request_->metadata_map().size(), 1); + EXPECT_EQ(upstream_request_->duplicated_metadata_key_count().find("key")->second, 3); + + response->waitForEndStream(); + ASSERT_TRUE(response->complete()); +} + +// Verifies large metadata can be sent at different locations of a request. +TEST_P(Http2MetadataIntegrationTest, ProxyLargeMetadataInRequest) { + initialize(); + codec_client_ = makeHttpConnection(lookupPort("http")); + + auto encoder_decoder = codec_client_->startRequest(default_request_headers_); + request_encoder_ = &encoder_decoder.first; + auto response = std::move(encoder_decoder.second); + std::string value = std::string(80 * 1024, '1'); + Http::MetadataMap metadata_map = {{"key", value}}; + codec_client_->sendMetadata(*request_encoder_, metadata_map); + codec_client_->sendData(*request_encoder_, 1, false); + codec_client_->sendMetadata(*request_encoder_, metadata_map); + codec_client_->sendData(*request_encoder_, 1, false); + codec_client_->sendMetadata(*request_encoder_, metadata_map); + Http::TestHeaderMapImpl request_trailers{{"request", "trailer"}}; + codec_client_->sendTrailers(*request_encoder_, request_trailers); + + waitForNextUpstreamRequest(); + + // Verifies metadata is received upstream. + upstream_request_->encodeHeaders(default_response_headers_, true); + EXPECT_EQ(upstream_request_->metadata_map().find("key")->second, value); + EXPECT_EQ(upstream_request_->metadata_map().size(), 1); + EXPECT_EQ(upstream_request_->duplicated_metadata_key_count().find("key")->second, 3); + + response->waitForEndStream(); + ASSERT_TRUE(response->complete()); +} + +static std::string request_metadata_filter = R"EOF( +name: request-metadata-filter +config: {} +)EOF"; + +TEST_P(Http2MetadataIntegrationTest, ConsumeAndInsertRequestMetadata) { + addFilters({request_metadata_filter}); + config_helper_.addConfigModifier( + [&](envoy::config::filter::network::http_connection_manager::v2::HttpConnectionManager& hcm) + -> void { hcm.set_proxy_100_continue(true); }); + + initialize(); + codec_client_ = makeHttpConnection(lookupPort("http")); + + // Sends a headers only request. + auto response = codec_client_->makeHeaderOnlyRequest(default_request_headers_); + waitForNextUpstreamRequest(); + + upstream_request_->encodeHeaders(default_response_headers_, true); + response->waitForEndStream(); + ASSERT_TRUE(response->complete()); + // Verifies a headers metadata added. + std::set expected_metadata_keys = {"headers"}; + expected_metadata_keys.insert("metadata"); + verifyExpectedMetadata(upstream_request_->metadata_map(), expected_metadata_keys); + + // Sends a headers only request with metadata. An empty data frame carries end_stream. + auto encoder_decoder = codec_client_->startRequest(default_request_headers_); + request_encoder_ = &encoder_decoder.first; + response = std::move(encoder_decoder.second); + Http::MetadataMap metadata_map = {{"consume", "consume"}}; + codec_client_->sendMetadata(*request_encoder_, metadata_map); + codec_client_->sendData(*request_encoder_, 0, true); + waitForNextUpstreamRequest(); + + upstream_request_->encodeHeaders(default_response_headers_, true); + response->waitForEndStream(); + ASSERT_TRUE(response->complete()); + expected_metadata_keys.insert("data"); + expected_metadata_keys.insert("metadata"); + expected_metadata_keys.insert("replace"); + verifyExpectedMetadata(upstream_request_->metadata_map(), expected_metadata_keys); + EXPECT_EQ(upstream_request_->duplicated_metadata_key_count().find("metadata")->second, 3); + // Verifies zero length data received, and end_stream is true. + EXPECT_EQ(true, upstream_request_->receivedData()); + EXPECT_EQ(0, upstream_request_->bodyLength()); + EXPECT_EQ(true, upstream_request_->complete()); + + // Sends headers, data, metadata and trailer. + auto encoder_decoder_2 = codec_client_->startRequest(default_request_headers_); + request_encoder_ = &encoder_decoder_2.first; + response = std::move(encoder_decoder_2.second); + codec_client_->sendData(*request_encoder_, 10, false); + metadata_map = {{"consume", "consume"}}; + codec_client_->sendMetadata(*request_encoder_, metadata_map); + Http::TestHeaderMapImpl request_trailers{{"trailer", "trailer"}}; + codec_client_->sendTrailers(*request_encoder_, request_trailers); + waitForNextUpstreamRequest(); + + upstream_request_->encodeHeaders(default_response_headers_, true); + response->waitForEndStream(); + ASSERT_TRUE(response->complete()); + expected_metadata_keys.insert("trailers"); + verifyExpectedMetadata(upstream_request_->metadata_map(), expected_metadata_keys); + EXPECT_EQ(upstream_request_->duplicated_metadata_key_count().find("metadata")->second, 4); + + // Sends headers, large data, metadata. Large data triggers decodeData() multiple times, and each + // time, a "data" metadata is added. + auto encoder_decoder_3 = codec_client_->startRequest(default_request_headers_); + request_encoder_ = &encoder_decoder_3.first; + response = std::move(encoder_decoder_3.second); + codec_client_->sendData(*request_encoder_, 100000, false); + codec_client_->sendMetadata(*request_encoder_, metadata_map); + codec_client_->sendData(*request_encoder_, 100000, true); + waitForNextUpstreamRequest(); + + upstream_request_->encodeHeaders(default_response_headers_, true); + response->waitForEndStream(); + ASSERT_TRUE(response->complete()); + + expected_metadata_keys.erase("trailers"); + verifyExpectedMetadata(upstream_request_->metadata_map(), expected_metadata_keys); + EXPECT_GE(upstream_request_->duplicated_metadata_key_count().find("data")->second, 2); + EXPECT_GE(upstream_request_->duplicated_metadata_key_count().find("metadata")->second, 3); + + // Sends multiple metadata. + auto encoder_decoder_4 = codec_client_->startRequest(default_request_headers_); + request_encoder_ = &encoder_decoder_4.first; + response = std::move(encoder_decoder_4.second); + metadata_map = {{"metadata1", "metadata1"}}; + codec_client_->sendMetadata(*request_encoder_, metadata_map); + codec_client_->sendData(*request_encoder_, 10, false); + metadata_map = {{"metadata2", "metadata2"}}; + codec_client_->sendMetadata(*request_encoder_, metadata_map); + metadata_map = {{"consume", "consume"}}; + codec_client_->sendMetadata(*request_encoder_, metadata_map); + codec_client_->sendTrailers(*request_encoder_, request_trailers); + waitForNextUpstreamRequest(); + + upstream_request_->encodeHeaders(default_response_headers_, true); + response->waitForEndStream(); + ASSERT_TRUE(response->complete()); + expected_metadata_keys.insert("metadata1"); + expected_metadata_keys.insert("metadata2"); + expected_metadata_keys.insert("trailers"); + verifyExpectedMetadata(upstream_request_->metadata_map(), expected_metadata_keys); + EXPECT_EQ(upstream_request_->duplicated_metadata_key_count().find("metadata")->second, 6); +} + +static std::string decode_headers_only = R"EOF( +name: decode-headers-only +config: {} +)EOF"; + +void Http2MetadataIntegrationTest::runHeaderOnlyTest(bool send_request_body, size_t body_size) { + config_helper_.addConfigModifier( + [&](envoy::config::filter::network::http_connection_manager::v2::HttpConnectionManager& hcm) + -> void { hcm.set_proxy_100_continue(true); }); + + initialize(); + codec_client_ = makeHttpConnection(lookupPort("http")); + + // Sends a request with body. Only headers will pass through filters. + IntegrationStreamDecoderPtr response; + if (send_request_body) { + response = + codec_client_->makeRequestWithBody(Http::TestHeaderMapImpl{{":method", "POST"}, + {":path", "/test/long/url"}, + {":scheme", "http"}, + {":authority", "host"}}, + body_size); + } else { + response = + codec_client_->makeHeaderOnlyRequest(Http::TestHeaderMapImpl{{":method", "POST"}, + {":path", "/test/long/url"}, + {":scheme", "http"}, + {":authority", "host"}}); + } + waitForNextUpstreamRequest(); + + upstream_request_->encodeHeaders(default_response_headers_, true); + response->waitForEndStream(); + ASSERT_TRUE(response->complete()); +} + +void Http2MetadataIntegrationTest::verifyHeadersOnlyTest() { + // Verifies a headers metadata added. + std::set expected_metadata_keys = {"headers"}; + expected_metadata_keys.insert("metadata"); + verifyExpectedMetadata(upstream_request_->metadata_map(), expected_metadata_keys); + + // Verifies zero length data received, and end_stream is true. + EXPECT_EQ(true, upstream_request_->receivedData()); + EXPECT_EQ(0, upstream_request_->bodyLength()); + EXPECT_EQ(true, upstream_request_->complete()); +} + +TEST_P(Http2MetadataIntegrationTest, DecodingHeadersOnlyRequestWithRequestMetadataEmptyData) { + addFilters({request_metadata_filter, decode_headers_only}); + + // Send a request with body, and body size is 0. + runHeaderOnlyTest(true, 0); + verifyHeadersOnlyTest(); +} + +TEST_P(Http2MetadataIntegrationTest, DecodingHeadersOnlyRequestWithRequestMetadataNoneEmptyData) { + addFilters({request_metadata_filter, decode_headers_only}); + // Send a request with body, and body size is 128. + runHeaderOnlyTest(true, 128); + verifyHeadersOnlyTest(); +} + +TEST_P(Http2MetadataIntegrationTest, DecodingHeadersOnlyRequestWithRequestMetadataDiffFilterOrder) { + addFilters({decode_headers_only, request_metadata_filter}); + // Send a request with body, and body size is 128. + runHeaderOnlyTest(true, 128); + verifyHeadersOnlyTest(); +} + +TEST_P(Http2MetadataIntegrationTest, HeadersOnlyRequestWithRequestMetadata) { + addFilters({request_metadata_filter}); + // Send a headers only request. + runHeaderOnlyTest(false, 0); + verifyHeadersOnlyTest(); +} + +void Http2MetadataIntegrationTest::testRequestMetadataWithStopAllFilter() { + initialize(); + codec_client_ = makeHttpConnection(lookupPort("http")); + + // Sends multiple metadata. + const size_t size = 10; + default_request_headers_.addCopy("content_size", std::to_string(size)); + auto encoder_decoder = codec_client_->startRequest(default_request_headers_); + request_encoder_ = &encoder_decoder.first; + auto response = std::move(encoder_decoder.second); + Http::MetadataMap metadata_map = {{"metadata1", "metadata1"}}; + codec_client_->sendMetadata(*request_encoder_, metadata_map); + codec_client_->sendData(*request_encoder_, size, false); + metadata_map = {{"metadata2", "metadata2"}}; + codec_client_->sendMetadata(*request_encoder_, metadata_map); + metadata_map = {{"consume", "consume"}}; + codec_client_->sendMetadata(*request_encoder_, metadata_map); + Http::TestHeaderMapImpl request_trailers{{"trailer", "trailer"}}; + codec_client_->sendTrailers(*request_encoder_, request_trailers); + waitForNextUpstreamRequest(); + + upstream_request_->encodeHeaders(default_response_headers_, true); + response->waitForEndStream(); + ASSERT_TRUE(response->complete()); + std::set expected_metadata_keys = {"headers", "data", "metadata", "metadata1", + "metadata2", "replace", "trailers"}; + verifyExpectedMetadata(upstream_request_->metadata_map(), expected_metadata_keys); + EXPECT_EQ(upstream_request_->duplicated_metadata_key_count().find("metadata")->second, 6); +} + +static std::string metadata_stop_all_filter = R"EOF( +name: metadata-stop-all-filter +config: {} +)EOF"; + +TEST_P(Http2MetadataIntegrationTest, RequestMetadataWithStopAllFilterBeforeMetadataFilter) { + addFilters({request_metadata_filter, metadata_stop_all_filter}); + testRequestMetadataWithStopAllFilter(); +} + +TEST_P(Http2MetadataIntegrationTest, RequestMetadataWithStopAllFilterAfterMetadataFilter) { + addFilters({metadata_stop_all_filter, request_metadata_filter}); + testRequestMetadataWithStopAllFilter(); +} + TEST_P(Http2IntegrationTest, GrpcRouterNotFound) { config_helper_.setDefaultHostAndRoute("foo.com", "/found"); initialize(); diff --git a/test/integration/http2_integration_test.h b/test/integration/http2_integration_test.h index c910d6e78cbd3..efa53788f2bdd 100644 --- a/test/integration/http2_integration_test.h +++ b/test/integration/http2_integration_test.h @@ -13,6 +13,14 @@ class Http2IntegrationTest : public testing::TestWithParam filters) { + for (const auto& filter : filters) { + config_helper_.addFilter(filter); + } + } }; class Http2RingHashIntegrationTest : public Http2IntegrationTest { @@ -45,5 +53,11 @@ class Http2MetadataIntegrationTest : public Http2IntegrationTest { setDownstreamProtocol(Http::CodecClient::Type::HTTP2); setUpstreamProtocol(FakeHttpConnection::Type::HTTP2); } + + void testRequestMetadataWithStopAllFilter(); + + void verifyHeadersOnlyTest(); + + void runHeaderOnlyTest(bool send_request_body, size_t body_size); }; } // namespace Envoy diff --git a/test/integration/http_integration.cc b/test/integration/http_integration.cc index e4f16603652e9..fbd7308fd4a64 100644 --- a/test/integration/http_integration.cc +++ b/test/integration/http_integration.cc @@ -132,6 +132,15 @@ void IntegrationCodecClient::sendReset(Http::StreamEncoder& encoder) { flushWrite(); } +void IntegrationCodecClient::sendMetadata(Http::StreamEncoder& encoder, + Http::MetadataMap metadata_map) { + Http::MetadataMapPtr metadata_map_ptr = std::make_unique(metadata_map); + Http::MetadataMapVector metadata_map_vector; + metadata_map_vector.push_back(std::move(metadata_map_ptr)); + encoder.encodeMetadata(metadata_map_vector); + flushWrite(); +} + std::pair IntegrationCodecClient::startRequest(const Http::HeaderMap& headers) { auto response = std::make_unique(dispatcher_); @@ -300,6 +309,7 @@ HttpIntegrationTest::waitForNextUpstreamRequest(const std::vector& ups uint64_t upstream_with_request; // If there is no upstream connection, wait for it to be established. if (!fake_upstream_connection_) { + AssertionResult result = AssertionFailure(); for (auto upstream_index : upstream_indices) { result = fake_upstreams_[upstream_index]->waitForHttpConnection( @@ -327,12 +337,6 @@ void HttpIntegrationTest::waitForNextUpstreamRequest(uint64_t upstream_index) { waitForNextUpstreamRequest(std::vector({upstream_index})); } -void HttpIntegrationTest::addFilters(std::vector filters) { - for (const auto& filter : filters) { - config_helper_.addFilter(filter); - } -} - void HttpIntegrationTest::checkSimpleRequestSuccess(uint64_t expected_request_size, uint64_t expected_response_size, IntegrationStreamDecoder* response) { diff --git a/test/integration/http_integration.h b/test/integration/http_integration.h index 5db6086cf5723..2df23c0462d0d 100644 --- a/test/integration/http_integration.h +++ b/test/integration/http_integration.h @@ -34,12 +34,15 @@ class IntegrationCodecClient : public Http::CodecClientProd { void sendData(Http::StreamEncoder& encoder, uint64_t size, bool end_stream); void sendTrailers(Http::StreamEncoder& encoder, const Http::HeaderMap& trailers); void sendReset(Http::StreamEncoder& encoder); + // Intentionally makes a copy of metadata_map. + void sendMetadata(Http::StreamEncoder& encoder, Http::MetadataMap metadata_map); std::pair startRequest(const Http::HeaderMap& headers); bool waitForDisconnect(std::chrono::milliseconds time_to_wait = std::chrono::milliseconds(0)); Network::ClientConnection* connection() const { return connection_.get(); } Network::ConnectionEvent last_connection_event() const { return last_connection_event_; } Network::Connection& rawConnection() { return *connection_; } + bool disconnected() { return disconnected_; } private: struct ConnectionCallbacks : public Network::ConnectionCallbacks { @@ -135,9 +138,6 @@ class HttpIntegrationTest : public BaseIntegrationTest { // Close |codec_client_| and |fake_upstream_connection_| cleanly. void cleanupUpstreamAndDownstream(); - // Utility function to add filters. - void addFilters(std::vector filters); - // Check for completion of upstream_request_, and a simple "200" response. void checkSimpleRequestSuccess(uint64_t expected_request_size, uint64_t expected_response_size, IntegrationStreamDecoder* response); diff --git a/test/mocks/http/mocks.h b/test/mocks/http/mocks.h index 64ebef385c5f9..a3a3f53768c93 100644 --- a/test/mocks/http/mocks.h +++ b/test/mocks/http/mocks.h @@ -172,6 +172,7 @@ class MockStreamDecoderFilterCallbacks : public StreamDecoderFilterCallbacks, MOCK_METHOD2(addDecodedData, void(Buffer::Instance& data, bool streaming)); MOCK_METHOD2(injectDecodedDataToFilterChain, void(Buffer::Instance& data, bool end_stream)); MOCK_METHOD0(addDecodedTrailers, HeaderMap&()); + MOCK_METHOD0(addDecodedMetadata, MetadataMapVector&()); MOCK_METHOD0(decodingBuffer, const Buffer::Instance*()); MOCK_METHOD1(modifyDecodingBuffer, void(std::function)); MOCK_METHOD1(encode100ContinueHeaders_, void(HeaderMap& headers)); @@ -241,6 +242,7 @@ class MockStreamDecoderFilter : public StreamDecoderFilter { MOCK_METHOD2(decodeHeaders, FilterHeadersStatus(HeaderMap& headers, bool end_stream)); MOCK_METHOD2(decodeData, FilterDataStatus(Buffer::Instance& data, bool end_stream)); MOCK_METHOD1(decodeTrailers, FilterTrailersStatus(HeaderMap& trailers)); + MOCK_METHOD1(decodeMetadata, FilterMetadataStatus(Http::MetadataMap& metadata_map)); MOCK_METHOD1(setDecoderFilterCallbacks, void(StreamDecoderFilterCallbacks& callbacks)); MOCK_METHOD0(decodeComplete, void()); @@ -279,6 +281,7 @@ class MockStreamFilter : public StreamFilter { MOCK_METHOD2(decodeHeaders, FilterHeadersStatus(HeaderMap& headers, bool end_stream)); MOCK_METHOD2(decodeData, FilterDataStatus(Buffer::Instance& data, bool end_stream)); MOCK_METHOD1(decodeTrailers, FilterTrailersStatus(HeaderMap& trailers)); + MOCK_METHOD1(decodeMetadata, FilterMetadataStatus(Http::MetadataMap& metadata_map)); MOCK_METHOD1(setDecoderFilterCallbacks, void(StreamDecoderFilterCallbacks& callbacks)); // Http::MockStreamEncoderFilter diff --git a/test/server/http/admin_test.cc b/test/server/http/admin_test.cc index c72496089e7e9..274f4dfcd524a 100644 --- a/test/server/http/admin_test.cc +++ b/test/server/http/admin_test.cc @@ -557,6 +557,8 @@ TEST_P(AdminFilterTest, Body) { EXPECT_EQ(Http::FilterHeadersStatus::StopIteration, filter_.decodeHeaders(request_headers_, false)); Buffer::OwnedImpl data("hello"); + Http::MetadataMap metadata_map{{"metadata", "metadata"}}; + EXPECT_EQ(Http::FilterMetadataStatus::Continue, filter_.decodeMetadata(metadata_map)); EXPECT_CALL(callbacks_, addDecodedData(_, false)); EXPECT_CALL(callbacks_, encodeHeaders_(_, false)); EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, filter_.decodeData(data, true));