From d28e7cf507399e9cc8fbcd3325426df2ad78088f Mon Sep 17 00:00:00 2001 From: "wangbaiping(wbpcode)" Date: Tue, 12 Nov 2024 15:23:53 +0000 Subject: [PATCH 01/11] http: unified the filter chain creation in the FilterManager Signed-off-by: wangbaiping(wbpcode) --- source/common/http/conn_manager_impl.cc | 2 +- source/common/http/filter_manager.cc | 83 +++++++++++++++++------- source/common/http/filter_manager.h | 35 +++++++--- source/common/router/router.h | 2 +- source/common/router/upstream_request.cc | 18 ++--- test/common/http/filter_manager_test.cc | 30 ++++----- test/extensions/common/wasm/wasm_test.cc | 4 +- 7 files changed, 113 insertions(+), 61 deletions(-) diff --git a/source/common/http/conn_manager_impl.cc b/source/common/http/conn_manager_impl.cc index a580f5aedab5b..1eb1ef2288785 100644 --- a/source/common/http/conn_manager_impl.cc +++ b/source/common/http/conn_manager_impl.cc @@ -1388,7 +1388,7 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapSharedPt filter_manager_.streamInfo().setRequestHeaders(*request_headers_); - const bool upgrade_rejected = filter_manager_.createFilterChain() == false; + const bool upgrade_rejected = filter_manager_.createDownstreamFilterChain(); if (connection_manager_.config_->flushAccessLogOnNewRequest()) { log(AccessLog::AccessLogType::DownstreamStart); diff --git a/source/common/http/filter_manager.cc b/source/common/http/filter_manager.cc index d158ea8088047..e56c0c31d8f53 100644 --- a/source/common/http/filter_manager.cc +++ b/source/common/http/filter_manager.cc @@ -1036,7 +1036,7 @@ void DownstreamFilterManager::prepareLocalReplyViaFilterChain( // For early error handling, do a best-effort attempt to create a filter chain // to ensure access logging. If the filter chain already exists this will be // a no-op. - createFilterChain(); + createDownstreamFilterChain(); if (prepared_local_reply_) { return; @@ -1077,6 +1077,14 @@ void DownstreamFilterManager::executeLocalReplyIfPrepared() { Utility::encodeLocalReply(state_.destroyed_, std::move(prepared_local_reply_)); } +bool DownstreamFilterManager::createDownstreamFilterChain() { + const auto result = + createFilterChain(filter_chain_factory_, /*allow_upgrade_filter_chain=*/true, false); + ASSERT(result.created); + + return result.upgrade_rejected; +} + void DownstreamFilterManager::sendLocalReplyViaFilterChain( bool is_grpc_request, Code code, absl::string_view body, const std::function& modify_headers, bool is_head_request, @@ -1086,7 +1094,7 @@ void DownstreamFilterManager::sendLocalReplyViaFilterChain( // For early error handling, do a best-effort attempt to create a filter chain // to ensure access logging. If the filter chain already exists this will be // a no-op. - createFilterChain(); + createDownstreamFilterChain(); Utility::sendLocalReply( state_.destroyed_, @@ -1623,11 +1631,9 @@ void FilterManager::contextOnContinue(ScopeTrackedObjectStack& tracked_object_st tracked_object_stack.add(filter_manager_callbacks_.scope()); } -bool FilterManager::createFilterChain() { - if (state_.created_filter_chain_) { - return false; - } - bool upgrade_rejected = false; +FilterManager::CreateFilterChainResult +FilterManager::createUpgradeFilterChain(const FilterChainFactory& filter_chain_factory, + const FilterChainOptionsImpl& options) { const HeaderEntry* upgrade = nullptr; if (filter_manager_callbacks_.requestHeaders()) { upgrade = filter_manager_callbacks_.requestHeaders()->Upgrade(); @@ -1638,28 +1644,55 @@ bool FilterManager::createFilterChain() { } } + if (upgrade == nullptr) { + // No upgrade header, no upgrade filter chain. + return {false, false}; + } + + const Router::RouteEntry::UpgradeMap* upgrade_map = filter_manager_callbacks_.upgradeMap(); + if (filter_chain_factory.createUpgradeFilterChain(upgrade->value().getStringView(), upgrade_map, + *this, options)) { + filter_manager_callbacks_.upgradeFilterChainCreated(); + // The upgrade filter chain is created successfully. + return {true, false}; + } + + // The upgrade filter chain is rejected. Fall through to the default filter chain. + // The default filter chain will be used to handle the upgrade failure local reply. + return {false, true}; +} + +FilterManager::CreateFilterChainResult +FilterManager::createFilterChain(const FilterChainFactory& filter_chain_factory, + bool allow_upgrade_filter_chain, bool only_create_if_configured) { + if (state_.created_filter_chain_) { + return {true, false}; + } + // This filter chain options is only used for the downstream HTTP filter chains for now. So, try // to set valid initial route only when the downstream callbacks is available. FilterChainOptionsImpl options( filter_manager_callbacks_.downstreamCallbacks().has_value() ? streamInfo().route() : nullptr); - state_.created_filter_chain_ = true; - if (upgrade != nullptr) { - const Router::RouteEntry::UpgradeMap* upgrade_map = filter_manager_callbacks_.upgradeMap(); + bool upgrade_rejected = false; - if (filter_chain_factory_.createUpgradeFilterChain(upgrade->value().getStringView(), - upgrade_map, *this, options)) { - filter_manager_callbacks_.upgradeFilterChainCreated(); - return true; - } else { - upgrade_rejected = true; - // Fall through to the default filter chain. The function calling this - // will send a local reply indicating that the upgrade failed. + if (allow_upgrade_filter_chain) { + const auto upgrade_result = createUpgradeFilterChain(filter_chain_factory, options); + if (upgrade_result.created) { + ASSERT(!upgrade_result.upgrade_rejected); + state_.created_filter_chain_ = true; + return upgrade_result; } + // The upgrade filter chain may be unnecessary or be rejected. Fall through to the default + // filter chain anyway. + upgrade_rejected = upgrade_result.upgrade_rejected; } - filter_chain_factory_.createFilterChain(*this, false, options); - return !upgrade_rejected; + const bool created = + filter_chain_factory.createFilterChain(*this, only_create_if_configured, options); + state_.created_filter_chain_ = created; + + return {created, upgrade_rejected}; } void ActiveStreamDecoderFilter::requestDataDrained() { @@ -1714,11 +1747,11 @@ bool ActiveStreamDecoderFilter::recreateStream(const ResponseHeaderMap* headers) if (headers != nullptr) { // The call to setResponseHeaders is needed to ensure that the headers are properly logged in - // access logs before the stream is destroyed. Since the function expects a ResponseHeaderPtr&&, - // ownership of the headers must be passed. This cannot happen earlier in the flow (such as in - // the call to setupRedirect) because at that point it is still possible for the headers to be - // used in a different logical branch. We work around this by creating a copy and passing - // ownership of the copy instead. + // access logs before the stream is destroyed. Since the function expects a + // ResponseHeaderPtr&&, ownership of the headers must be passed. This cannot happen earlier in + // the flow (such as in the call to setupRedirect) because at that point it is still possible + // for the headers to be used in a different logical branch. We work around this by creating a + // copy and passing ownership of the copy instead. ResponseHeaderMapPtr headers_copy = createHeaderMap(*headers); parent_.filter_manager_callbacks_.setResponseHeaders(std::move(headers_copy)); parent_.filter_manager_callbacks_.chargeStats(*headers); diff --git a/source/common/http/filter_manager.h b/source/common/http/filter_manager.h index 94500f13d1c5e..04aac2f34b5b8 100644 --- a/source/common/http/filter_manager.h +++ b/source/common/http/filter_manager.h @@ -639,11 +639,10 @@ class FilterManager : public ScopeTrackedObject, FilterManager(FilterManagerCallbacks& filter_manager_callbacks, Event::Dispatcher& dispatcher, OptRef connection, uint64_t stream_id, Buffer::BufferMemoryAccountSharedPtr account, bool proxy_100_continue, - uint32_t buffer_limit, const FilterChainFactory& filter_chain_factory) + uint32_t buffer_limit) : filter_manager_callbacks_(filter_manager_callbacks), dispatcher_(dispatcher), connection_(connection), stream_id_(stream_id), account_(std::move(account)), - proxy_100_continue_(proxy_100_continue), buffer_limit_(buffer_limit), - filter_chain_factory_(filter_chain_factory) {} + proxy_100_continue_(proxy_100_continue), buffer_limit_(buffer_limit) {} ~FilterManager() override { ASSERT(state_.destroyed_); @@ -837,8 +836,23 @@ class FilterManager : public ScopeTrackedObject, virtual StreamInfo::StreamInfo& streamInfo() PURE; virtual const StreamInfo::StreamInfo& streamInfo() const PURE; - // Set up the Encoder/Decoder filter chain. - bool createFilterChain(); + struct CreateFilterChainResult { + bool created{}; + bool upgrade_rejected{}; + }; + + /** + * Set up the Encoder/Decoder filter chain. + * @param filter_chain_factory the factory to create the filter chain. + * @param allow_upgrade_filter_chain whether to allow the creation of an upgrade filter chain. + * This only should be set to true for downstream HTTP filter chain. + * @param only_create_if_configured whether to only create the filter chain if it is configured + * explicitly. This only makes sense for upstream HTTP filter chain. + * + */ + CreateFilterChainResult createFilterChain(const FilterChainFactory& filter_chain_factory, + bool allow_upgrade_filter_chain, + bool only_create_if_configured); OptRef connection() const { return connection_; } @@ -966,6 +980,9 @@ class FilterManager : public ScopeTrackedObject, // Indicates which filter to start the iteration with. enum class FilterIterationStartState { AlwaysStartFromNext, CanStartFromCurrent }; + CreateFilterChainResult createUpgradeFilterChain(const FilterChainFactory& filter_chain_factory, + const FilterChainOptionsImpl& options); + // Returns the encoder filter to start iteration with. std::list::iterator commonEncodePrefix(ActiveStreamEncoderFilter* filter, bool end_stream, @@ -1053,7 +1070,6 @@ class FilterManager : public ScopeTrackedObject, std::make_shared(); absl::optional upstream_override_host_; - const FilterChainFactory& filter_chain_factory_; // TODO(snowp): Once FM has been moved to its own file we'll make these private classes of FM, // at which point they no longer need to be friends. friend ActiveStreamFilterBase; @@ -1108,11 +1124,11 @@ class DownstreamFilterManager : public FilterManager { StreamInfo::FilterStateSharedPtr parent_filter_state, Server::OverloadManager& overload_manager) : FilterManager(filter_manager_callbacks, dispatcher, connection, stream_id, account, - proxy_100_continue, buffer_limit, filter_chain_factory), + proxy_100_continue, buffer_limit), stream_info_(protocol, time_source, connection.connectionInfoProviderSharedPtr(), StreamInfo::FilterState::LifeSpan::FilterChain, std::move(parent_filter_state)), - local_reply_(local_reply), + local_reply_(local_reply), filter_chain_factory_(filter_chain_factory), downstream_filter_load_shed_point_(overload_manager.getLoadShedPoint( Server::LoadShedPointName::get().HttpDownstreamFilterCheck)), use_filter_manager_state_for_downstream_end_stream_(Runtime::runtimeFeatureEnabled( @@ -1136,6 +1152,8 @@ class DownstreamFilterManager : public FilterManager { stream_info_.setDownstreamRemoteAddress(downstream_remote_address); } + bool createDownstreamFilterChain(); + /** * Called before local reply is made by the filter manager. * @param data the data associated with the local reply. @@ -1212,6 +1230,7 @@ class DownstreamFilterManager : public FilterManager { private: OverridableRemoteConnectionInfoSetterStreamInfo stream_info_; const LocalReply::LocalReply& local_reply_; + const FilterChainFactory& filter_chain_factory_; Utility::PreparedLocalReplyPtr prepared_local_reply_{nullptr}; Server::LoadShedPoint* downstream_filter_load_shed_point_{nullptr}; // Set by the envoy.reloadable_features.use_filter_manager_state_for_downstream_end_stream runtime diff --git a/source/common/router/router.h b/source/common/router/router.h index 730c3a4068c1a..598c47fd8296e 100644 --- a/source/common/router/router.h +++ b/source/common/router/router.h @@ -199,7 +199,7 @@ class FilterUtility { /** * Configuration for the router filter. */ -class FilterConfig : Http::FilterChainFactory { +class FilterConfig : public Http::FilterChainFactory { public: FilterConfig(Server::Configuration::CommonFactoryContext& factory_context, Stats::StatName stat_prefix, const LocalInfo::LocalInfo& local_info, diff --git a/source/common/router/upstream_request.cc b/source/common/router/upstream_request.cc index 143b4b3033db4..40d375c67338b 100644 --- a/source/common/router/upstream_request.cc +++ b/source/common/router/upstream_request.cc @@ -47,11 +47,9 @@ class UpstreamFilterManager : public Http::FilterManager { UpstreamFilterManager(Http::FilterManagerCallbacks& filter_manager_callbacks, Event::Dispatcher& dispatcher, OptRef connection, uint64_t stream_id, Buffer::BufferMemoryAccountSharedPtr account, - bool proxy_100_continue, uint32_t buffer_limit, - const Http::FilterChainFactory& filter_chain_factory, - UpstreamRequest& request) + bool proxy_100_continue, uint32_t buffer_limit, UpstreamRequest& request) : FilterManager(filter_manager_callbacks, dispatcher, connection, stream_id, account, - proxy_100_continue, buffer_limit, filter_chain_factory), + proxy_100_continue, buffer_limit), upstream_request_(request) {} StreamInfo::StreamInfo& streamInfo() override { @@ -142,18 +140,20 @@ UpstreamRequest::UpstreamRequest(RouterFilterInterface& parent, filter_manager_ = std::make_unique( *filter_manager_callbacks_, parent_.callbacks()->dispatcher(), UpstreamRequest::connection(), parent_.callbacks()->streamId(), parent_.callbacks()->account(), true, - parent_.callbacks()->decoderBufferLimit(), *parent_.cluster(), *this); + parent_.callbacks()->decoderBufferLimit(), *this); // Attempt to create custom cluster-specified filter chain - bool created = parent_.cluster()->createFilterChain(*filter_manager_, - /*only_create_if_configured=*/true); + bool created = filter_manager_ + ->createFilterChain(*parent_.cluster(), false, + /*only_create_if_configured=*/true) + .created; if (!created) { // Attempt to create custom router-specified filter chain. - created = parent_.config().createFilterChain(*filter_manager_); + created = filter_manager_->createFilterChain(parent_.config(), false, false).created; } if (!created) { // Neither cluster nor router have a custom filter chain; add the default // cluster filter chain, which only consists of the codec filter. - created = parent_.cluster()->createFilterChain(*filter_manager_, false); + created = filter_manager_->createFilterChain(*parent_.cluster(), false, false).created; } // There will always be a codec filter present, which sets the upstream // interface. Fast-fail any tests that don't set up mocks correctly. diff --git a/test/common/http/filter_manager_test.cc b/test/common/http/filter_manager_test.cc index 417de64780df0..b2af2b000d010 100644 --- a/test/common/http/filter_manager_test.cc +++ b/test/common/http/filter_manager_test.cc @@ -71,7 +71,7 @@ class FilterManagerTest : public testing::Test { EXPECT_TRUE(MessageDifferencer::Equals(*(fs_value->serializeAsProto()), *expected)); } - std::unique_ptr filter_manager_; + std::unique_ptr filter_manager_; NiceMock filter_manager_callbacks_; NiceMock dispatcher_; NiceMock connection_; @@ -98,7 +98,7 @@ TEST_F(FilterManagerTest, RequestHeadersOrResponseHeadersAccess) { manager.applyFilterFactoryCb({}, encoder_factory); return true; })); - filter_manager_->createFilterChain(); + filter_manager_->createDownstreamFilterChain(); RequestHeaderMapPtr request_headers{ new TestRequestHeaderMapImpl{{":authority", "host"}, {":path", "/"}, {":method", "GET"}}}; @@ -172,7 +172,7 @@ TEST_F(FilterManagerTest, SendLocalReplyDuringDecodingGrpcClassiciation) { return true; })); - filter_manager_->createFilterChain(); + filter_manager_->createDownstreamFilterChain(); filter_manager_->requestHeadersInitialized(); @@ -237,7 +237,7 @@ TEST_F(FilterManagerTest, SendLocalReplyDuringEncodingGrpcClassiciation) { ON_CALL(filter_manager_callbacks_, requestHeaders()) .WillByDefault(Return(makeOptRef(*grpc_headers))); - filter_manager_->createFilterChain(); + filter_manager_->createDownstreamFilterChain(); filter_manager_->requestHeadersInitialized(); EXPECT_CALL(local_reply_, rewrite(_, _, _, _, _, _)); @@ -280,7 +280,7 @@ TEST_F(FilterManagerTest, OnLocalReply) { return true; })); - filter_manager_->createFilterChain(); + filter_manager_->createDownstreamFilterChain(); filter_manager_->requestHeadersInitialized(); filter_manager_->decodeHeaders(*headers, true); @@ -343,7 +343,7 @@ TEST_F(FilterManagerTest, MultipleOnLocalReply) { return true; })); - filter_manager_->createFilterChain(); + filter_manager_->createDownstreamFilterChain(); filter_manager_->requestHeadersInitialized(); filter_manager_->decodeHeaders(*headers, true); @@ -399,7 +399,7 @@ TEST_F(FilterManagerTest, ResetIdleTimer) { manager.applyFilterFactoryCb({}, decoder_factory); return true; })); - filter_manager_->createFilterChain(); + filter_manager_->createDownstreamFilterChain(); EXPECT_CALL(filter_manager_callbacks_, resetIdleTimer()); decoder_filter->callbacks_->resetIdleTimer(); @@ -418,7 +418,7 @@ TEST_F(FilterManagerTest, SetAndGetUpstreamOverrideHost) { manager.applyFilterFactoryCb({}, decoder_factory); return true; })); - filter_manager_->createFilterChain(); + filter_manager_->createDownstreamFilterChain(); decoder_filter->callbacks_->setUpstreamOverrideHost(std::make_pair("1.2.3.4", true)); @@ -440,7 +440,7 @@ TEST_F(FilterManagerTest, GetRouteLevelFilterConfig) { manager.applyFilterFactoryCb({"custom-name"}, decoder_factory); return true; })); - filter_manager_->createFilterChain(); + filter_manager_->createDownstreamFilterChain(); std::shared_ptr route(new NiceMock()); auto route_config = std::make_shared(); @@ -488,7 +488,7 @@ TEST_F(FilterManagerTest, GetRouteLevelFilterConfigForNullRoute) { manager.applyFilterFactoryCb({"custom-name"}, decoder_factory); return true; })); - filter_manager_->createFilterChain(); + filter_manager_->createDownstreamFilterChain(); std::shared_ptr route(new NiceMock()); auto route_config = std::make_shared(); @@ -521,7 +521,7 @@ TEST_F(FilterManagerTest, MetadataContinueAll) { manager.applyFilterFactoryCb({"configName2"}, decoder_factory); return true; })); - filter_manager_->createFilterChain(); + filter_manager_->createDownstreamFilterChain(); // Decode path: EXPECT_CALL(*filter_1, decodeHeaders(_, _)).WillOnce(Return(FilterHeadersStatus::StopIteration)); @@ -593,7 +593,7 @@ TEST_F(FilterManagerTest, DecodeMetadataSendsLocalReply) { manager.applyFilterFactoryCb({"configName2"}, factory); return true; })); - filter_manager_->createFilterChain(); + filter_manager_->createDownstreamFilterChain(); RequestHeaderMapPtr basic_headers{ new TestRequestHeaderMapImpl{{":authority", "host"}, {":path", "/"}, {":method", "GET"}}}; @@ -640,7 +640,7 @@ TEST_F(FilterManagerTest, MetadataContinueAllFollowedByHeadersLocalReply) { manager.applyFilterFactoryCb({"configName2"}, decoder_factory); return true; })); - filter_manager_->createFilterChain(); + filter_manager_->createDownstreamFilterChain(); // Decode path: EXPECT_CALL(*filter_1, decodeHeaders(_, _)).WillOnce(Return(FilterHeadersStatus::StopIteration)); @@ -680,7 +680,7 @@ TEST_F(FilterManagerTest, EncodeMetadataSendsLocalReply) { manager.applyFilterFactoryCb({"configName2"}, factory); return true; })); - filter_manager_->createFilterChain(); + filter_manager_->createDownstreamFilterChain(); // Encode headers first, as metadata can't get ahead of headers. ResponseHeaderMapPtr response_headers{new TestResponseHeaderMapImpl{{":status", "200"}}}; @@ -717,7 +717,7 @@ TEST_F(FilterManagerTest, IdleTimerResets) { manager.applyFilterFactoryCb({"configName1"}, factory); return true; })); - filter_manager_->createFilterChain(); + filter_manager_->createDownstreamFilterChain(); RequestHeaderMapPtr basic_headers{ new TestRequestHeaderMapImpl{{":authority", "host"}, {":path", "/"}, {":method", "GET"}}}; diff --git a/test/extensions/common/wasm/wasm_test.cc b/test/extensions/common/wasm/wasm_test.cc index 67917351605a4..e23db333f89d2 100644 --- a/test/extensions/common/wasm/wasm_test.cc +++ b/test/extensions/common/wasm/wasm_test.cc @@ -1476,11 +1476,11 @@ class WasmLocalReplyTest : public WasmCommonContextTest { })); ON_CALL(filter_manager_callbacks_, requestHeaders()) .WillByDefault(Return(makeOptRef(*request_headers_))); - filter_manager_->createFilterChain(); + filter_manager_->createDownstreamFilterChain(); filter_manager_->requestHeadersInitialized(); } - std::unique_ptr filter_manager_; + std::unique_ptr filter_manager_; NiceMock filter_manager_callbacks_; NiceMock dispatcher_; NiceMock connection_; From b37f5b7a512561cb55c369279e2644bcd5f8167f Mon Sep 17 00:00:00 2001 From: "wangbaiping(wbpcode)" Date: Wed, 13 Nov 2024 06:14:01 +0000 Subject: [PATCH 02/11] fix the test case where no filters Signed-off-by: wangbaiping(wbpcode) --- source/common/http/filter_manager.cc | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/source/common/http/filter_manager.cc b/source/common/http/filter_manager.cc index e56c0c31d8f53..fc1b00b485ff1 100644 --- a/source/common/http/filter_manager.cc +++ b/source/common/http/filter_manager.cc @@ -1078,11 +1078,8 @@ void DownstreamFilterManager::executeLocalReplyIfPrepared() { } bool DownstreamFilterManager::createDownstreamFilterChain() { - const auto result = - createFilterChain(filter_chain_factory_, /*allow_upgrade_filter_chain=*/true, false); - ASSERT(result.created); - - return result.upgrade_rejected; + return createFilterChain(filter_chain_factory_, /*allow_upgrade_filter_chain=*/true, false) + .upgrade_rejected; } void DownstreamFilterManager::sendLocalReplyViaFilterChain( From 4a73cb7bcb0f05c13a20d638e3f5e76a68349737 Mon Sep 17 00:00:00 2001 From: "wangbaiping(wbpcode)" Date: Thu, 14 Nov 2024 02:16:02 +0000 Subject: [PATCH 03/11] address comments Signed-off-by: wangbaiping(wbpcode) --- source/common/http/filter_manager.cc | 15 +++++++++------ source/common/http/filter_manager.h | 3 --- source/common/router/upstream_request.cc | 6 +++--- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/source/common/http/filter_manager.cc b/source/common/http/filter_manager.cc index fc1b00b485ff1..34ad65979db25 100644 --- a/source/common/http/filter_manager.cc +++ b/source/common/http/filter_manager.cc @@ -1078,8 +1078,7 @@ void DownstreamFilterManager::executeLocalReplyIfPrepared() { } bool DownstreamFilterManager::createDownstreamFilterChain() { - return createFilterChain(filter_chain_factory_, /*allow_upgrade_filter_chain=*/true, false) - .upgrade_rejected; + return createFilterChain(filter_chain_factory_, false).upgrade_rejected; } void DownstreamFilterManager::sendLocalReplyViaFilterChain( @@ -1661,19 +1660,23 @@ FilterManager::createUpgradeFilterChain(const FilterChainFactory& filter_chain_f FilterManager::CreateFilterChainResult FilterManager::createFilterChain(const FilterChainFactory& filter_chain_factory, - bool allow_upgrade_filter_chain, bool only_create_if_configured) { + bool only_create_if_configured) { if (state_.created_filter_chain_) { return {true, false}; } + OptRef downstream_callbacks = + filter_manager_callbacks_.downstreamCallbacks(); + // This filter chain options is only used for the downstream HTTP filter chains for now. So, try // to set valid initial route only when the downstream callbacks is available. - FilterChainOptionsImpl options( - filter_manager_callbacks_.downstreamCallbacks().has_value() ? streamInfo().route() : nullptr); + FilterChainOptionsImpl options(downstream_callbacks.has_value() ? streamInfo().route() : nullptr); bool upgrade_rejected = false; - if (allow_upgrade_filter_chain) { + if (downstream_callbacks.has_value()) { + // Only try the upgrade filter chain for downstream filter chains. + const auto upgrade_result = createUpgradeFilterChain(filter_chain_factory, options); if (upgrade_result.created) { ASSERT(!upgrade_result.upgrade_rejected); diff --git a/source/common/http/filter_manager.h b/source/common/http/filter_manager.h index 04aac2f34b5b8..f758433ed038d 100644 --- a/source/common/http/filter_manager.h +++ b/source/common/http/filter_manager.h @@ -844,14 +844,11 @@ class FilterManager : public ScopeTrackedObject, /** * Set up the Encoder/Decoder filter chain. * @param filter_chain_factory the factory to create the filter chain. - * @param allow_upgrade_filter_chain whether to allow the creation of an upgrade filter chain. - * This only should be set to true for downstream HTTP filter chain. * @param only_create_if_configured whether to only create the filter chain if it is configured * explicitly. This only makes sense for upstream HTTP filter chain. * */ CreateFilterChainResult createFilterChain(const FilterChainFactory& filter_chain_factory, - bool allow_upgrade_filter_chain, bool only_create_if_configured); OptRef connection() const { return connection_; } diff --git a/source/common/router/upstream_request.cc b/source/common/router/upstream_request.cc index 40d375c67338b..d960e2a91bc3c 100644 --- a/source/common/router/upstream_request.cc +++ b/source/common/router/upstream_request.cc @@ -143,17 +143,17 @@ UpstreamRequest::UpstreamRequest(RouterFilterInterface& parent, parent_.callbacks()->decoderBufferLimit(), *this); // Attempt to create custom cluster-specified filter chain bool created = filter_manager_ - ->createFilterChain(*parent_.cluster(), false, + ->createFilterChain(*parent_.cluster(), /*only_create_if_configured=*/true) .created; if (!created) { // Attempt to create custom router-specified filter chain. - created = filter_manager_->createFilterChain(parent_.config(), false, false).created; + created = filter_manager_->createFilterChain(parent_.config(), false).created; } if (!created) { // Neither cluster nor router have a custom filter chain; add the default // cluster filter chain, which only consists of the codec filter. - created = filter_manager_->createFilterChain(*parent_.cluster(), false, false).created; + created = filter_manager_->createFilterChain(*parent_.cluster(), false).created; } // There will always be a codec filter present, which sets the upstream // interface. Fast-fail any tests that don't set up mocks correctly. From 64eb0f97ed9b1b420ccb78389aef3c528e576913 Mon Sep 17 00:00:00 2001 From: "wangbaiping(wbpcode)" Date: Fri, 15 Nov 2024 07:24:26 +0000 Subject: [PATCH 04/11] make the code more simpler Signed-off-by: wangbaiping(wbpcode) --- source/common/http/conn_manager_impl.cc | 7 ++-- source/common/http/conn_manager_impl.h | 52 ++++++++++++------------ source/common/http/filter_manager.cc | 50 ++++++++--------------- source/common/http/filter_manager.h | 17 ++++---- source/common/router/upstream_request.cc | 11 +++-- source/common/router/upstream_request.h | 2 +- test/mocks/http/mocks.h | 2 +- 7 files changed, 62 insertions(+), 79 deletions(-) diff --git a/source/common/http/conn_manager_impl.cc b/source/common/http/conn_manager_impl.cc index 1eb1ef2288785..ba4033a45887b 100644 --- a/source/common/http/conn_manager_impl.cc +++ b/source/common/http/conn_manager_impl.cc @@ -952,7 +952,7 @@ void ConnectionManagerImpl::ActiveStream::completeRequest() { *active_span_, request_headers_.get(), response_headers_.get(), response_trailers_.get(), filter_manager_.streamInfo(), *this); } - if (state_.successful_upgrade_) { + if (state_.upgrade_accepted_) { connection_manager_.stats_.named_.downstream_cx_upgrades_active_.dec(); } } @@ -1387,8 +1387,7 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapSharedPt } filter_manager_.streamInfo().setRequestHeaders(*request_headers_); - - const bool upgrade_rejected = filter_manager_.createDownstreamFilterChain(); + filter_manager_.createDownstreamFilterChain(); if (connection_manager_.config_->flushAccessLogOnNewRequest()) { log(AccessLog::AccessLogType::DownstreamStart); @@ -1398,7 +1397,7 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapSharedPt // should return 404. The current returns no response if there is no router filter. if (hasCachedRoute()) { // Do not allow upgrades if the route does not support it. - if (upgrade_rejected) { + if (state_.upgrade_rejected_) { // While downstream servers should not send upgrade payload without the upgrade being // accepted, err on the side of caution and refuse to process any further requests on this // connection, to avoid a class of HTTP/1.1 smuggling bugs where Upgrade or CONNECT payload diff --git a/source/common/http/conn_manager_impl.h b/source/common/http/conn_manager_impl.h index ffa0aab7f6529..48f455a0aa038 100644 --- a/source/common/http/conn_manager_impl.h +++ b/source/common/http/conn_manager_impl.h @@ -274,10 +274,14 @@ class ConnectionManagerImpl : Logger::Loggable, } void onDecoderFilterBelowWriteBufferLowWatermark() override; void onDecoderFilterAboveWriteBufferHighWatermark() override; - void upgradeFilterChainCreated() override { - connection_manager_.stats_.named_.downstream_cx_upgrades_total_.inc(); - connection_manager_.stats_.named_.downstream_cx_upgrades_active_.inc(); - state_.successful_upgrade_ = true; + void upgradeFilterChainCreated(bool created) override { + if (created) { + connection_manager_.stats_.named_.downstream_cx_upgrades_total_.inc(); + connection_manager_.stats_.named_.downstream_cx_upgrades_active_.inc(); + state_.upgrade_accepted_ = true; + } else { + state_.upgrade_rejected_ = true; + } } void disarmRequestTimeout() override; void resetIdleTimer() override; @@ -334,44 +338,42 @@ class ConnectionManagerImpl : Logger::Loggable, // All state for the stream. Put here for readability. struct State { - State() - : codec_saw_local_complete_(false), codec_encode_complete_(false), - on_reset_stream_called_(false), is_zombie_stream_(false), successful_upgrade_(false), - is_internally_destroyed_(false), is_internally_created_(false), is_tunneling_(false), - decorated_propagate_(true), deferred_to_next_io_iteration_(false), - deferred_end_stream_(false) {} - // It's possibly for the codec to see the completed response but not fully // encode it. - bool codec_saw_local_complete_ : 1; // This indicates that local is complete as the completed - // response has made its way to the codec. - bool codec_encode_complete_ : 1; // This indicates that the codec has - // completed encoding the response. - bool on_reset_stream_called_ : 1; // Whether the stream has been reset. - bool is_zombie_stream_ : 1; // Whether stream is waiting for signal - // the underlying codec to be destroyed. - bool successful_upgrade_ : 1; + bool codec_saw_local_complete_{}; // This indicates that local is complete as the completed + + // response has made its way to the codec. + bool codec_encode_complete_{}; // This indicates that the codec has + // completed encoding the response. + bool on_reset_stream_called_{}; // Whether the stream has been reset. + bool is_zombie_stream_{}; // Whether stream is waiting for signal + // the underlying codec to be destroyed. + + // Whether the upgrade request has be accepted or rejected by the filter chain. + // Both are false if the request is not an upgrade request. + bool upgrade_accepted_{}; + bool upgrade_rejected_{}; // True if this stream was the original externally created stream, but was // destroyed as part of internal redirect. - bool is_internally_destroyed_ : 1; + bool is_internally_destroyed_{}; // True if this stream is internally created. Currently only used for // internal redirects or other streams created via recreateStream(). - bool is_internally_created_ : 1; + bool is_internally_created_{}; // True if the response headers indicate a successful upgrade or connect // response. - bool is_tunneling_ : 1; + bool is_tunneling_{}; - bool decorated_propagate_ : 1; + bool decorated_propagate_{true}; // Indicates that sending headers to the filter manager is deferred to the // next I/O cycle. If data or trailers are received when this flag is set // they are deferred too. // TODO(yanavlasov): encapsulate the entire state of deferred streams into a separate // structure, so it can be atomically created and cleared. - bool deferred_to_next_io_iteration_ : 1; - bool deferred_end_stream_ : 1; + bool deferred_to_next_io_iteration_{}; + bool deferred_end_stream_{}; }; bool canDestroyStream() const { diff --git a/source/common/http/filter_manager.cc b/source/common/http/filter_manager.cc index 34ad65979db25..6f2cd67739123 100644 --- a/source/common/http/filter_manager.cc +++ b/source/common/http/filter_manager.cc @@ -1078,7 +1078,7 @@ void DownstreamFilterManager::executeLocalReplyIfPrepared() { } bool DownstreamFilterManager::createDownstreamFilterChain() { - return createFilterChain(filter_chain_factory_, false).upgrade_rejected; + return createFilterChain(filter_chain_factory_, false); } void DownstreamFilterManager::sendLocalReplyViaFilterChain( @@ -1627,9 +1627,8 @@ void FilterManager::contextOnContinue(ScopeTrackedObjectStack& tracked_object_st tracked_object_stack.add(filter_manager_callbacks_.scope()); } -FilterManager::CreateFilterChainResult -FilterManager::createUpgradeFilterChain(const FilterChainFactory& filter_chain_factory, - const FilterChainOptionsImpl& options) { +bool FilterManager::createUpgradeFilterChain(const FilterChainFactory& filter_chain_factory, + const FilterChainOptionsImpl& options) { const HeaderEntry* upgrade = nullptr; if (filter_manager_callbacks_.requestHeaders()) { upgrade = filter_manager_callbacks_.requestHeaders()->Upgrade(); @@ -1642,27 +1641,26 @@ FilterManager::createUpgradeFilterChain(const FilterChainFactory& filter_chain_f if (upgrade == nullptr) { // No upgrade header, no upgrade filter chain. - return {false, false}; + return false; } const Router::RouteEntry::UpgradeMap* upgrade_map = filter_manager_callbacks_.upgradeMap(); if (filter_chain_factory.createUpgradeFilterChain(upgrade->value().getStringView(), upgrade_map, *this, options)) { - filter_manager_callbacks_.upgradeFilterChainCreated(); - // The upgrade filter chain is created successfully. - return {true, false}; + filter_manager_callbacks_.upgradeFilterChainCreated(true); + return true; + } else { + filter_manager_callbacks_.upgradeFilterChainCreated(false); + // The upgrade filter chain is rejected. Fall through to the default filter chain. + // The default filter chain will be used to handle the upgrade failure local reply. + return false; } - - // The upgrade filter chain is rejected. Fall through to the default filter chain. - // The default filter chain will be used to handle the upgrade failure local reply. - return {false, true}; } -FilterManager::CreateFilterChainResult -FilterManager::createFilterChain(const FilterChainFactory& filter_chain_factory, - bool only_create_if_configured) { +bool FilterManager::createFilterChain(const FilterChainFactory& filter_chain_factory, + bool only_create_if_configured) { if (state_.created_filter_chain_) { - return {true, false}; + return true; } OptRef downstream_callbacks = @@ -1672,27 +1670,15 @@ FilterManager::createFilterChain(const FilterChainFactory& filter_chain_factory, // to set valid initial route only when the downstream callbacks is available. FilterChainOptionsImpl options(downstream_callbacks.has_value() ? streamInfo().route() : nullptr); - bool upgrade_rejected = false; - if (downstream_callbacks.has_value()) { // Only try the upgrade filter chain for downstream filter chains. - - const auto upgrade_result = createUpgradeFilterChain(filter_chain_factory, options); - if (upgrade_result.created) { - ASSERT(!upgrade_result.upgrade_rejected); - state_.created_filter_chain_ = true; - return upgrade_result; + const bool created = createUpgradeFilterChain(filter_chain_factory, options); + if (created) { + return true; } - // The upgrade filter chain may be unnecessary or be rejected. Fall through to the default - // filter chain anyway. - upgrade_rejected = upgrade_result.upgrade_rejected; } - const bool created = - filter_chain_factory.createFilterChain(*this, only_create_if_configured, options); - state_.created_filter_chain_ = created; - - return {created, upgrade_rejected}; + return filter_chain_factory.createFilterChain(*this, only_create_if_configured, options); } void ActiveStreamDecoderFilter::requestDataDrained() { diff --git a/source/common/http/filter_manager.h b/source/common/http/filter_manager.h index f758433ed038d..d12c2c2e86b43 100644 --- a/source/common/http/filter_manager.h +++ b/source/common/http/filter_manager.h @@ -461,8 +461,10 @@ class FilterManagerCallbacks { /** * Called when the FilterManager creates an Upgrade filter chain. + * @param created whether the upgrade filter chain was created. If false, the upgrade + * is rejected. */ - virtual void upgradeFilterChainCreated() PURE; + virtual void upgradeFilterChainCreated(bool created) PURE; /** * Called when request activity indicates that the request timeout should be disarmed. @@ -836,11 +838,6 @@ class FilterManager : public ScopeTrackedObject, virtual StreamInfo::StreamInfo& streamInfo() PURE; virtual const StreamInfo::StreamInfo& streamInfo() const PURE; - struct CreateFilterChainResult { - bool created{}; - bool upgrade_rejected{}; - }; - /** * Set up the Encoder/Decoder filter chain. * @param filter_chain_factory the factory to create the filter chain. @@ -848,8 +845,8 @@ class FilterManager : public ScopeTrackedObject, * explicitly. This only makes sense for upstream HTTP filter chain. * */ - CreateFilterChainResult createFilterChain(const FilterChainFactory& filter_chain_factory, - bool only_create_if_configured); + bool createFilterChain(const FilterChainFactory& filter_chain_factory, + bool only_create_if_configured); OptRef connection() const { return connection_; } @@ -977,8 +974,8 @@ class FilterManager : public ScopeTrackedObject, // Indicates which filter to start the iteration with. enum class FilterIterationStartState { AlwaysStartFromNext, CanStartFromCurrent }; - CreateFilterChainResult createUpgradeFilterChain(const FilterChainFactory& filter_chain_factory, - const FilterChainOptionsImpl& options); + bool createUpgradeFilterChain(const FilterChainFactory& filter_chain_factory, + const FilterChainOptionsImpl& options); // Returns the encoder filter to start iteration with. std::list::iterator diff --git a/source/common/router/upstream_request.cc b/source/common/router/upstream_request.cc index d960e2a91bc3c..71784d87f8157 100644 --- a/source/common/router/upstream_request.cc +++ b/source/common/router/upstream_request.cc @@ -142,18 +142,17 @@ UpstreamRequest::UpstreamRequest(RouterFilterInterface& parent, parent_.callbacks()->streamId(), parent_.callbacks()->account(), true, parent_.callbacks()->decoderBufferLimit(), *this); // Attempt to create custom cluster-specified filter chain - bool created = filter_manager_ - ->createFilterChain(*parent_.cluster(), - /*only_create_if_configured=*/true) - .created; + bool created = filter_manager_->createFilterChain(*parent_.cluster(), + /*only_create_if_configured=*/true); + if (!created) { // Attempt to create custom router-specified filter chain. - created = filter_manager_->createFilterChain(parent_.config(), false).created; + created = filter_manager_->createFilterChain(parent_.config(), false); } if (!created) { // Neither cluster nor router have a custom filter chain; add the default // cluster filter chain, which only consists of the codec filter. - created = filter_manager_->createFilterChain(*parent_.cluster(), false).created; + created = filter_manager_->createFilterChain(*parent_.cluster(), false); } // There will always be a codec filter present, which sets the upstream // interface. Fast-fail any tests that don't set up mocks correctly. diff --git a/source/common/router/upstream_request.h b/source/common/router/upstream_request.h index f69e1f2678fec..1605264eec56a 100644 --- a/source/common/router/upstream_request.h +++ b/source/common/router/upstream_request.h @@ -345,7 +345,7 @@ class UpstreamRequestFilterManagerCallbacks : public Http::FilterManagerCallback void recreateStream(StreamInfo::FilterStateSharedPtr) override { IS_ENVOY_BUG("recreateStream called from upstream HTTP filter"); } - void upgradeFilterChainCreated() override { + void upgradeFilterChainCreated(bool) override { IS_ENVOY_BUG("upgradeFilterChainCreated called from upstream HTTP filter"); } OptRef upstreamCallbacks() override { return {*this}; } diff --git a/test/mocks/http/mocks.h b/test/mocks/http/mocks.h index 28f83b7cdf73f..548150e02be35 100644 --- a/test/mocks/http/mocks.h +++ b/test/mocks/http/mocks.h @@ -86,7 +86,7 @@ class MockFilterManagerCallbacks : public FilterManagerCallbacks { MOCK_METHOD(void, endStream, ()); MOCK_METHOD(void, onDecoderFilterBelowWriteBufferLowWatermark, ()); MOCK_METHOD(void, onDecoderFilterAboveWriteBufferHighWatermark, ()); - MOCK_METHOD(void, upgradeFilterChainCreated, ()); + MOCK_METHOD(void, upgradeFilterChainCreated, (bool)); MOCK_METHOD(void, disarmRequestTimeout, ()); MOCK_METHOD(void, resetIdleTimer, ()); MOCK_METHOD(void, recreateStream, (StreamInfo::FilterStateSharedPtr filter_state)); From 0308ca1b5e7a99d083922d02dc3182ff1d430aa2 Mon Sep 17 00:00:00 2001 From: "wangbaiping(wbpcode)" Date: Fri, 15 Nov 2024 09:58:08 +0000 Subject: [PATCH 05/11] fix test Signed-off-by: wangbaiping(wbpcode) --- source/common/http/filter_manager.cc | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/source/common/http/filter_manager.cc b/source/common/http/filter_manager.cc index 6f2cd67739123..a38d2eff5ee8d 100644 --- a/source/common/http/filter_manager.cc +++ b/source/common/http/filter_manager.cc @@ -1672,13 +1672,16 @@ bool FilterManager::createFilterChain(const FilterChainFactory& filter_chain_fac if (downstream_callbacks.has_value()) { // Only try the upgrade filter chain for downstream filter chains. - const bool created = createUpgradeFilterChain(filter_chain_factory, options); - if (created) { + if (createUpgradeFilterChain(filter_chain_factory, options)) { + state_.created_filter_chain_ = true; return true; } } - return filter_chain_factory.createFilterChain(*this, only_create_if_configured, options); + const bool created = + filter_chain_factory.createFilterChain(*this, only_create_if_configured, options); + state_.created_filter_chain_ = created; + return created; } void ActiveStreamDecoderFilter::requestDataDrained() { From e13c9a90f12abd311a4e3af8766879d594f8ac23 Mon Sep 17 00:00:00 2001 From: "wangbaiping(wbpcode)" Date: Wed, 20 Nov 2024 15:52:29 +0000 Subject: [PATCH 06/11] address comments Signed-off-by: wangbaiping(wbpcode) --- source/common/http/conn_manager_impl.cc | 12 ++++-- source/common/http/conn_manager_impl.h | 11 +---- source/common/http/filter_manager.cc | 43 +++++++++---------- source/common/http/filter_manager.h | 54 ++++++++++++++++++------ source/common/router/upstream_request.cc | 10 +++-- source/common/router/upstream_request.h | 3 -- test/mocks/http/mocks.h | 1 - 7 files changed, 78 insertions(+), 56 deletions(-) diff --git a/source/common/http/conn_manager_impl.cc b/source/common/http/conn_manager_impl.cc index ba4033a45887b..d4439f7887af2 100644 --- a/source/common/http/conn_manager_impl.cc +++ b/source/common/http/conn_manager_impl.cc @@ -952,7 +952,7 @@ void ConnectionManagerImpl::ActiveStream::completeRequest() { *active_span_, request_headers_.get(), response_headers_.get(), response_trailers_.get(), filter_manager_.streamInfo(), *this); } - if (state_.upgrade_accepted_) { + if (state_.successful_upgrade_) { connection_manager_.stats_.named_.downstream_cx_upgrades_active_.dec(); } } @@ -1387,7 +1387,13 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapSharedPt } filter_manager_.streamInfo().setRequestHeaders(*request_headers_); - filter_manager_.createDownstreamFilterChain(); + const FilterManager::CreateChainResult create_chain_result = + filter_manager_.createDownstreamFilterChain(); + if (create_chain_result.upgradeAccepted()) { + connection_manager_.stats_.named_.downstream_cx_upgrades_total_.inc(); + connection_manager_.stats_.named_.downstream_cx_upgrades_active_.inc(); + state_.successful_upgrade_ = true; + } if (connection_manager_.config_->flushAccessLogOnNewRequest()) { log(AccessLog::AccessLogType::DownstreamStart); @@ -1397,7 +1403,7 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapSharedPt // should return 404. The current returns no response if there is no router filter. if (hasCachedRoute()) { // Do not allow upgrades if the route does not support it. - if (state_.upgrade_rejected_) { + if (create_chain_result.upgradeRejected()) { // While downstream servers should not send upgrade payload without the upgrade being // accepted, err on the side of caution and refuse to process any further requests on this // connection, to avoid a class of HTTP/1.1 smuggling bugs where Upgrade or CONNECT payload diff --git a/source/common/http/conn_manager_impl.h b/source/common/http/conn_manager_impl.h index 48f455a0aa038..907ddb1c4d3ff 100644 --- a/source/common/http/conn_manager_impl.h +++ b/source/common/http/conn_manager_impl.h @@ -274,15 +274,6 @@ class ConnectionManagerImpl : Logger::Loggable, } void onDecoderFilterBelowWriteBufferLowWatermark() override; void onDecoderFilterAboveWriteBufferHighWatermark() override; - void upgradeFilterChainCreated(bool created) override { - if (created) { - connection_manager_.stats_.named_.downstream_cx_upgrades_total_.inc(); - connection_manager_.stats_.named_.downstream_cx_upgrades_active_.inc(); - state_.upgrade_accepted_ = true; - } else { - state_.upgrade_rejected_ = true; - } - } void disarmRequestTimeout() override; void resetIdleTimer() override; void recreateStream(StreamInfo::FilterStateSharedPtr filter_state) override; @@ -351,7 +342,7 @@ class ConnectionManagerImpl : Logger::Loggable, // Whether the upgrade request has be accepted or rejected by the filter chain. // Both are false if the request is not an upgrade request. - bool upgrade_accepted_{}; + bool successful_upgrade_{}; bool upgrade_rejected_{}; // True if this stream was the original externally created stream, but was diff --git a/source/common/http/filter_manager.cc b/source/common/http/filter_manager.cc index a38d2eff5ee8d..71c30849e6cee 100644 --- a/source/common/http/filter_manager.cc +++ b/source/common/http/filter_manager.cc @@ -1077,7 +1077,7 @@ void DownstreamFilterManager::executeLocalReplyIfPrepared() { Utility::encodeLocalReply(state_.destroyed_, std::move(prepared_local_reply_)); } -bool DownstreamFilterManager::createDownstreamFilterChain() { +FilterManager::CreateChainResult DownstreamFilterManager::createDownstreamFilterChain() { return createFilterChain(filter_chain_factory_, false); } @@ -1645,22 +1645,15 @@ bool FilterManager::createUpgradeFilterChain(const FilterChainFactory& filter_ch } const Router::RouteEntry::UpgradeMap* upgrade_map = filter_manager_callbacks_.upgradeMap(); - if (filter_chain_factory.createUpgradeFilterChain(upgrade->value().getStringView(), upgrade_map, - *this, options)) { - filter_manager_callbacks_.upgradeFilterChainCreated(true); - return true; - } else { - filter_manager_callbacks_.upgradeFilterChainCreated(false); - // The upgrade filter chain is rejected. Fall through to the default filter chain. - // The default filter chain will be used to handle the upgrade failure local reply. - return false; - } + return filter_chain_factory.createUpgradeFilterChain(upgrade->value().getStringView(), + upgrade_map, *this, options); } -bool FilterManager::createFilterChain(const FilterChainFactory& filter_chain_factory, - bool only_create_if_configured) { - if (state_.created_filter_chain_) { - return true; +FilterManager::CreateChainResult +FilterManager::createFilterChain(const FilterChainFactory& filter_chain_factory, + bool only_create_if_configured) { + if (state_.create_chain_result_.created()) { + return state_.create_chain_result_; } OptRef downstream_callbacks = @@ -1670,18 +1663,24 @@ bool FilterManager::createFilterChain(const FilterChainFactory& filter_chain_fac // to set valid initial route only when the downstream callbacks is available. FilterChainOptionsImpl options(downstream_callbacks.has_value() ? streamInfo().route() : nullptr); + absl::optional upgrade = absl::nullopt; + + // Only try the upgrade filter chain for downstream filter chains. if (downstream_callbacks.has_value()) { - // Only try the upgrade filter chain for downstream filter chains. if (createUpgradeFilterChain(filter_chain_factory, options)) { - state_.created_filter_chain_ = true; - return true; + // Upgrade filter chain is created. Return the result directly. + state_.create_chain_result_ = CreateChainResult(true, true); + return state_.create_chain_result_; + } else { + // Set the upgrade flag to false if the upgrade filter chain is rejected and fall through to + // the default filter chain. + upgrade = false; } } - const bool created = - filter_chain_factory.createFilterChain(*this, only_create_if_configured, options); - state_.created_filter_chain_ = created; - return created; + state_.create_chain_result_ = CreateChainResult( + filter_chain_factory.createFilterChain(*this, only_create_if_configured, options), upgrade); + return state_.create_chain_result_; } void ActiveStreamDecoderFilter::requestDataDrained() { diff --git a/source/common/http/filter_manager.h b/source/common/http/filter_manager.h index d12c2c2e86b43..bdb981c25987e 100644 --- a/source/common/http/filter_manager.h +++ b/source/common/http/filter_manager.h @@ -459,13 +459,6 @@ class FilterManagerCallbacks { */ virtual void onDecoderFilterAboveWriteBufferHighWatermark() PURE; - /** - * Called when the FilterManager creates an Upgrade filter chain. - * @param created whether the upgrade filter chain was created. If false, the upgrade - * is rejected. - */ - virtual void upgradeFilterChainCreated(bool created) PURE; - /** * Called when request activity indicates that the request timeout should be disarmed. */ @@ -831,13 +824,46 @@ class FilterManager : public ScopeTrackedObject, * a local reply without the overhead of creating and traversing the filters. */ void skipFilterChainCreation() { - ASSERT(!state_.created_filter_chain_); - state_.created_filter_chain_ = true; + ASSERT(!state_.create_chain_result_.created()); + state_.create_chain_result_ = CreateChainResult(true, absl::nullopt); } virtual StreamInfo::StreamInfo& streamInfo() PURE; virtual const StreamInfo::StreamInfo& streamInfo() const PURE; + /** + * Filter chain creation result. + */ + class CreateChainResult { + public: + CreateChainResult() = default; + + /** + * @param created whether the filter chain was created. + * @param upgrade whether the upgrade was accepted or rejected. absl::nullopt if no upgrade + * was requested. True if the upgrade was accepted, false if it was rejected. + */ + CreateChainResult(bool created, absl::optional upgrade) + : created_(created), upgrade_(upgrade) {} + + /** + * @return whether the filter chain was created. + */ + bool created() const { return created_; } + /** + * @return whether the upgrade was accepted. + */ + bool upgradeAccepted() const { return upgrade_.has_value() ? upgrade_.value() : false; } + /** + * @return whether the upgrade was rejected. + */ + bool upgradeRejected() const { return upgrade_.has_value() ? !upgrade_.value() : false; } + + private: + bool created_{}; + absl::optional upgrade_{}; + }; + /** * Set up the Encoder/Decoder filter chain. * @param filter_chain_factory the factory to create the filter chain. @@ -845,8 +871,8 @@ class FilterManager : public ScopeTrackedObject, * explicitly. This only makes sense for upstream HTTP filter chain. * */ - bool createFilterChain(const FilterChainFactory& filter_chain_factory, - bool only_create_if_configured); + CreateChainResult createFilterChain(const FilterChainFactory& filter_chain_factory, + bool only_create_if_configured); OptRef connection() const { return connection_; } @@ -889,7 +915,6 @@ class FilterManager : public ScopeTrackedObject, // By default, we will assume there are no 1xx. If encode1xxHeaders // is ever called, this is set to true so commonContinue resumes processing the 1xx. bool has_1xx_headers_{}; - bool created_filter_chain_{}; // These two are latched on initial header read, to determine if the original headers // constituted a HEAD or gRPC request, respectively. bool is_head_request_{}; @@ -912,6 +937,9 @@ class FilterManager : public ScopeTrackedObject, bool decoder_filters_streaming_{true}; bool destroyed_{false}; + // Result of filter chain creation. + CreateChainResult create_chain_result_{}; + // Used to track which filter is the latest filter that has received data. ActiveStreamEncoderFilter* latest_data_encoding_filter_{}; ActiveStreamDecoderFilter* latest_data_decoding_filter_{}; @@ -1146,7 +1174,7 @@ class DownstreamFilterManager : public FilterManager { stream_info_.setDownstreamRemoteAddress(downstream_remote_address); } - bool createDownstreamFilterChain(); + CreateChainResult createDownstreamFilterChain(); /** * Called before local reply is made by the filter manager. diff --git a/source/common/router/upstream_request.cc b/source/common/router/upstream_request.cc index 71784d87f8157..603ea0e35eb26 100644 --- a/source/common/router/upstream_request.cc +++ b/source/common/router/upstream_request.cc @@ -142,17 +142,19 @@ UpstreamRequest::UpstreamRequest(RouterFilterInterface& parent, parent_.callbacks()->streamId(), parent_.callbacks()->account(), true, parent_.callbacks()->decoderBufferLimit(), *this); // Attempt to create custom cluster-specified filter chain - bool created = filter_manager_->createFilterChain(*parent_.cluster(), - /*only_create_if_configured=*/true); + bool created = filter_manager_ + ->createFilterChain(*parent_.cluster(), + /*only_create_if_configured=*/true) + .created(); if (!created) { // Attempt to create custom router-specified filter chain. - created = filter_manager_->createFilterChain(parent_.config(), false); + created = filter_manager_->createFilterChain(parent_.config(), false).created(); } if (!created) { // Neither cluster nor router have a custom filter chain; add the default // cluster filter chain, which only consists of the codec filter. - created = filter_manager_->createFilterChain(*parent_.cluster(), false); + created = filter_manager_->createFilterChain(*parent_.cluster(), false).created(); } // There will always be a codec filter present, which sets the upstream // interface. Fast-fail any tests that don't set up mocks correctly. diff --git a/source/common/router/upstream_request.h b/source/common/router/upstream_request.h index 1605264eec56a..0f4d93a287476 100644 --- a/source/common/router/upstream_request.h +++ b/source/common/router/upstream_request.h @@ -345,9 +345,6 @@ class UpstreamRequestFilterManagerCallbacks : public Http::FilterManagerCallback void recreateStream(StreamInfo::FilterStateSharedPtr) override { IS_ENVOY_BUG("recreateStream called from upstream HTTP filter"); } - void upgradeFilterChainCreated(bool) override { - IS_ENVOY_BUG("upgradeFilterChainCreated called from upstream HTTP filter"); - } OptRef upstreamCallbacks() override { return {*this}; } // Http::UpstreamStreamFilterCallbacks diff --git a/test/mocks/http/mocks.h b/test/mocks/http/mocks.h index 548150e02be35..606c16f27dd7e 100644 --- a/test/mocks/http/mocks.h +++ b/test/mocks/http/mocks.h @@ -86,7 +86,6 @@ class MockFilterManagerCallbacks : public FilterManagerCallbacks { MOCK_METHOD(void, endStream, ()); MOCK_METHOD(void, onDecoderFilterBelowWriteBufferLowWatermark, ()); MOCK_METHOD(void, onDecoderFilterAboveWriteBufferHighWatermark, ()); - MOCK_METHOD(void, upgradeFilterChainCreated, (bool)); MOCK_METHOD(void, disarmRequestTimeout, ()); MOCK_METHOD(void, resetIdleTimer, ()); MOCK_METHOD(void, recreateStream, (StreamInfo::FilterStateSharedPtr filter_state)); From dcd4bdfa81562d1196f549b972e3aa5e3ba477d5 Mon Sep 17 00:00:00 2001 From: "wangbaiping(wbpcode)" Date: Thu, 21 Nov 2024 00:22:21 +0000 Subject: [PATCH 07/11] fix logic Signed-off-by: wangbaiping(wbpcode) --- source/common/http/filter_manager.cc | 16 ++++++++-------- source/common/http/filter_manager.h | 4 ++-- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/source/common/http/filter_manager.cc b/source/common/http/filter_manager.cc index 71c30849e6cee..92169814b4bcb 100644 --- a/source/common/http/filter_manager.cc +++ b/source/common/http/filter_manager.cc @@ -1627,8 +1627,9 @@ void FilterManager::contextOnContinue(ScopeTrackedObjectStack& tracked_object_st tracked_object_stack.add(filter_manager_callbacks_.scope()); } -bool FilterManager::createUpgradeFilterChain(const FilterChainFactory& filter_chain_factory, - const FilterChainOptionsImpl& options) { +absl::optional +FilterManager::createUpgradeFilterChain(const FilterChainFactory& filter_chain_factory, + const FilterChainOptionsImpl& options) { const HeaderEntry* upgrade = nullptr; if (filter_manager_callbacks_.requestHeaders()) { upgrade = filter_manager_callbacks_.requestHeaders()->Upgrade(); @@ -1641,7 +1642,7 @@ bool FilterManager::createUpgradeFilterChain(const FilterChainFactory& filter_ch if (upgrade == nullptr) { // No upgrade header, no upgrade filter chain. - return false; + return absl::nullopt; } const Router::RouteEntry::UpgradeMap* upgrade_map = filter_manager_callbacks_.upgradeMap(); @@ -1667,15 +1668,14 @@ FilterManager::createFilterChain(const FilterChainFactory& filter_chain_factory, // Only try the upgrade filter chain for downstream filter chains. if (downstream_callbacks.has_value()) { - if (createUpgradeFilterChain(filter_chain_factory, options)) { + upgrade = createUpgradeFilterChain(filter_chain_factory, options); + if (upgrade.value_or(false)) { // Upgrade filter chain is created. Return the result directly. state_.create_chain_result_ = CreateChainResult(true, true); return state_.create_chain_result_; - } else { - // Set the upgrade flag to false if the upgrade filter chain is rejected and fall through to - // the default filter chain. - upgrade = false; } + // If the upgrade is unnecessary or the upgrade filter chain is rejected, fall through to + // create the default filter chain. } state_.create_chain_result_ = CreateChainResult( diff --git a/source/common/http/filter_manager.h b/source/common/http/filter_manager.h index bdb981c25987e..e8b7a2a1ad0cf 100644 --- a/source/common/http/filter_manager.h +++ b/source/common/http/filter_manager.h @@ -1002,8 +1002,8 @@ class FilterManager : public ScopeTrackedObject, // Indicates which filter to start the iteration with. enum class FilterIterationStartState { AlwaysStartFromNext, CanStartFromCurrent }; - bool createUpgradeFilterChain(const FilterChainFactory& filter_chain_factory, - const FilterChainOptionsImpl& options); + absl::optional createUpgradeFilterChain(const FilterChainFactory& filter_chain_factory, + const FilterChainOptionsImpl& options); // Returns the encoder filter to start iteration with. std::list::iterator From 2bd25920762b3af9fcc77ae76e16e0884c7d8319 Mon Sep 17 00:00:00 2001 From: "wangbaiping(wbpcode)" Date: Fri, 22 Nov 2024 02:28:47 +0000 Subject: [PATCH 08/11] remove unnecessary field Signed-off-by: wangbaiping(wbpcode) --- source/common/http/conn_manager_impl.h | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/source/common/http/conn_manager_impl.h b/source/common/http/conn_manager_impl.h index 907ddb1c4d3ff..b2b19a20faabe 100644 --- a/source/common/http/conn_manager_impl.h +++ b/source/common/http/conn_manager_impl.h @@ -340,10 +340,9 @@ class ConnectionManagerImpl : Logger::Loggable, bool is_zombie_stream_{}; // Whether stream is waiting for signal // the underlying codec to be destroyed. - // Whether the upgrade request has be accepted or rejected by the filter chain. - // Both are false if the request is not an upgrade request. + // Whether the upgrade request has be accepted by the filter chain. + // False if the request is not an upgrade request or upgrade is rejected by the filter chain. bool successful_upgrade_{}; - bool upgrade_rejected_{}; // True if this stream was the original externally created stream, but was // destroyed as part of internal redirect. From f2958c518fad50feb1d35e37e743aa2583a1a23a Mon Sep 17 00:00:00 2001 From: "wangbaiping(wbpcode)" Date: Thu, 28 Nov 2024 02:24:28 +0000 Subject: [PATCH 09/11] address comments Signed-off-by: wangbaiping(wbpcode) --- source/common/http/conn_manager_impl.h | 27 +++++++++++++------------- source/common/http/filter_manager.cc | 14 +++++++------ source/common/http/filter_manager.h | 19 +++++++++--------- 3 files changed, 32 insertions(+), 28 deletions(-) diff --git a/source/common/http/conn_manager_impl.h b/source/common/http/conn_manager_impl.h index b2b19a20faabe..61b81f7e9fb7a 100644 --- a/source/common/http/conn_manager_impl.h +++ b/source/common/http/conn_manager_impl.h @@ -331,39 +331,40 @@ class ConnectionManagerImpl : Logger::Loggable, struct State { // It's possibly for the codec to see the completed response but not fully // encode it. - bool codec_saw_local_complete_{}; // This indicates that local is complete as the completed + bool codec_saw_local_complete_ = false; // This indicates that local is complete + // as the completed // response has made its way to the codec. - bool codec_encode_complete_{}; // This indicates that the codec has - // completed encoding the response. - bool on_reset_stream_called_{}; // Whether the stream has been reset. - bool is_zombie_stream_{}; // Whether stream is waiting for signal - // the underlying codec to be destroyed. + bool codec_encode_complete_ = false; // This indicates that the codec has + // completed encoding the response. + bool on_reset_stream_called_ = false; // Whether the stream has been reset. + bool is_zombie_stream_ = false; // Whether stream is waiting for signal + // the underlying codec to be destroyed. // Whether the upgrade request has be accepted by the filter chain. // False if the request is not an upgrade request or upgrade is rejected by the filter chain. - bool successful_upgrade_{}; + bool successful_upgrade_ = false; // True if this stream was the original externally created stream, but was // destroyed as part of internal redirect. - bool is_internally_destroyed_{}; + bool is_internally_destroyed_ = false; // True if this stream is internally created. Currently only used for // internal redirects or other streams created via recreateStream(). - bool is_internally_created_{}; + bool is_internally_created_ = false; // True if the response headers indicate a successful upgrade or connect // response. - bool is_tunneling_{}; + bool is_tunneling_ = false; - bool decorated_propagate_{true}; + bool decorated_propagate_ = true; // Indicates that sending headers to the filter manager is deferred to the // next I/O cycle. If data or trailers are received when this flag is set // they are deferred too. // TODO(yanavlasov): encapsulate the entire state of deferred streams into a separate // structure, so it can be atomically created and cleared. - bool deferred_to_next_io_iteration_{}; - bool deferred_end_stream_{}; + bool deferred_to_next_io_iteration_ = false; + bool deferred_end_stream_ = false; }; bool canDestroyStream() const { diff --git a/source/common/http/filter_manager.cc b/source/common/http/filter_manager.cc index 92169814b4bcb..8e167357738e9 100644 --- a/source/common/http/filter_manager.cc +++ b/source/common/http/filter_manager.cc @@ -1627,7 +1627,7 @@ void FilterManager::contextOnContinue(ScopeTrackedObjectStack& tracked_object_st tracked_object_stack.add(filter_manager_callbacks_.scope()); } -absl::optional +FilterManager::UpgradeResult FilterManager::createUpgradeFilterChain(const FilterChainFactory& filter_chain_factory, const FilterChainOptionsImpl& options) { const HeaderEntry* upgrade = nullptr; @@ -1642,12 +1642,14 @@ FilterManager::createUpgradeFilterChain(const FilterChainFactory& filter_chain_f if (upgrade == nullptr) { // No upgrade header, no upgrade filter chain. - return absl::nullopt; + return UpgradeResult::UpgradeUnneeded; } const Router::RouteEntry::UpgradeMap* upgrade_map = filter_manager_callbacks_.upgradeMap(); return filter_chain_factory.createUpgradeFilterChain(upgrade->value().getStringView(), - upgrade_map, *this, options); + upgrade_map, *this, options) + ? UpgradeResult::UpgradeAccepted + : UpgradeResult::UpgradeRejected; } FilterManager::CreateChainResult @@ -1664,14 +1666,14 @@ FilterManager::createFilterChain(const FilterChainFactory& filter_chain_factory, // to set valid initial route only when the downstream callbacks is available. FilterChainOptionsImpl options(downstream_callbacks.has_value() ? streamInfo().route() : nullptr); - absl::optional upgrade = absl::nullopt; + UpgradeResult upgrade = UpgradeResult::UpgradeUnneeded; // Only try the upgrade filter chain for downstream filter chains. if (downstream_callbacks.has_value()) { upgrade = createUpgradeFilterChain(filter_chain_factory, options); - if (upgrade.value_or(false)) { + if (upgrade == UpgradeResult::UpgradeAccepted) { // Upgrade filter chain is created. Return the result directly. - state_.create_chain_result_ = CreateChainResult(true, true); + state_.create_chain_result_ = CreateChainResult(true, upgrade); return state_.create_chain_result_; } // If the upgrade is unnecessary or the upgrade filter chain is rejected, fall through to diff --git a/source/common/http/filter_manager.h b/source/common/http/filter_manager.h index e8b7a2a1ad0cf..af3fca59a350a 100644 --- a/source/common/http/filter_manager.h +++ b/source/common/http/filter_manager.h @@ -825,12 +825,14 @@ class FilterManager : public ScopeTrackedObject, */ void skipFilterChainCreation() { ASSERT(!state_.create_chain_result_.created()); - state_.create_chain_result_ = CreateChainResult(true, absl::nullopt); + state_.create_chain_result_ = CreateChainResult(true); } virtual StreamInfo::StreamInfo& streamInfo() PURE; virtual const StreamInfo::StreamInfo& streamInfo() const PURE; + enum class UpgradeResult : uint8_t { UpgradeUnneeded, UpgradeAccepted, UpgradeRejected }; + /** * Filter chain creation result. */ @@ -840,10 +842,9 @@ class FilterManager : public ScopeTrackedObject, /** * @param created whether the filter chain was created. - * @param upgrade whether the upgrade was accepted or rejected. absl::nullopt if no upgrade - * was requested. True if the upgrade was accepted, false if it was rejected. + * @param upgrade the upgrade result. */ - CreateChainResult(bool created, absl::optional upgrade) + CreateChainResult(bool created, UpgradeResult upgrade = UpgradeResult::UpgradeUnneeded) : created_(created), upgrade_(upgrade) {} /** @@ -853,15 +854,15 @@ class FilterManager : public ScopeTrackedObject, /** * @return whether the upgrade was accepted. */ - bool upgradeAccepted() const { return upgrade_.has_value() ? upgrade_.value() : false; } + bool upgradeAccepted() const { return upgrade_ == UpgradeResult::UpgradeAccepted; } /** * @return whether the upgrade was rejected. */ - bool upgradeRejected() const { return upgrade_.has_value() ? !upgrade_.value() : false; } + bool upgradeRejected() const { return upgrade_ == UpgradeResult::UpgradeRejected; } private: bool created_{}; - absl::optional upgrade_{}; + UpgradeResult upgrade_{}; }; /** @@ -1002,8 +1003,8 @@ class FilterManager : public ScopeTrackedObject, // Indicates which filter to start the iteration with. enum class FilterIterationStartState { AlwaysStartFromNext, CanStartFromCurrent }; - absl::optional createUpgradeFilterChain(const FilterChainFactory& filter_chain_factory, - const FilterChainOptionsImpl& options); + UpgradeResult createUpgradeFilterChain(const FilterChainFactory& filter_chain_factory, + const FilterChainOptionsImpl& options); // Returns the encoder filter to start iteration with. std::list::iterator From 8e4b1156792294b4ed659db0294d5ddbe101fc86 Mon Sep 17 00:00:00 2001 From: "wangbaiping(wbpcode)" Date: Thu, 28 Nov 2024 02:28:30 +0000 Subject: [PATCH 10/11] style Signed-off-by: wangbaiping(wbpcode) --- source/common/http/filter_manager.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/common/http/filter_manager.h b/source/common/http/filter_manager.h index 641f1bec543b1..bbf5a0336a4a3 100644 --- a/source/common/http/filter_manager.h +++ b/source/common/http/filter_manager.h @@ -870,8 +870,8 @@ class FilterManager : public ScopeTrackedObject, bool upgradeRejected() const { return upgrade_ == UpgradeResult::UpgradeRejected; } private: - bool created_{}; - UpgradeResult upgrade_{}; + bool created_ = false; + UpgradeResult upgrade_ = UpgradeResult::UpgradeUnneeded; }; /** From cd2e199e86da4031751ea6cfcc6466cf85e7723f Mon Sep 17 00:00:00 2001 From: "wangbaiping(wbpcode)" Date: Mon, 2 Dec 2024 03:13:11 +0000 Subject: [PATCH 11/11] revert state change Signed-off-by: wangbaiping(wbpcode) --- source/common/http/conn_manager_impl.h | 40 ++++++++++++++------------ 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/source/common/http/conn_manager_impl.h b/source/common/http/conn_manager_impl.h index d51276b8a93c0..01062379d01cf 100644 --- a/source/common/http/conn_manager_impl.h +++ b/source/common/http/conn_manager_impl.h @@ -341,42 +341,44 @@ class ConnectionManagerImpl : Logger::Loggable, // All state for the stream. Put here for readability. struct State { + State() + : codec_saw_local_complete_(false), codec_encode_complete_(false), + on_reset_stream_called_(false), is_zombie_stream_(false), successful_upgrade_(false), + is_internally_destroyed_(false), is_internally_created_(false), is_tunneling_(false), + decorated_propagate_(true), deferred_to_next_io_iteration_(false), + deferred_end_stream_(false) {} + // It's possibly for the codec to see the completed response but not fully // encode it. - bool codec_saw_local_complete_ = false; // This indicates that local is complete - // as the completed - - // response has made its way to the codec. - bool codec_encode_complete_ = false; // This indicates that the codec has - // completed encoding the response. - bool on_reset_stream_called_ = false; // Whether the stream has been reset. - bool is_zombie_stream_ = false; // Whether stream is waiting for signal - // the underlying codec to be destroyed. - - // Whether the upgrade request has be accepted by the filter chain. - // False if the request is not an upgrade request or upgrade is rejected by the filter chain. - bool successful_upgrade_ = false; + bool codec_saw_local_complete_ : 1; // This indicates that local is complete as the completed + // response has made its way to the codec. + bool codec_encode_complete_ : 1; // This indicates that the codec has + // completed encoding the response. + bool on_reset_stream_called_ : 1; // Whether the stream has been reset. + bool is_zombie_stream_ : 1; // Whether stream is waiting for signal + // the underlying codec to be destroyed. + bool successful_upgrade_ : 1; // True if this stream was the original externally created stream, but was // destroyed as part of internal redirect. - bool is_internally_destroyed_ = false; + bool is_internally_destroyed_ : 1; // True if this stream is internally created. Currently only used for // internal redirects or other streams created via recreateStream(). - bool is_internally_created_ = false; + bool is_internally_created_ : 1; // True if the response headers indicate a successful upgrade or connect // response. - bool is_tunneling_ = false; + bool is_tunneling_ : 1; - bool decorated_propagate_ = true; + bool decorated_propagate_ : 1; // Indicates that sending headers to the filter manager is deferred to the // next I/O cycle. If data or trailers are received when this flag is set // they are deferred too. // TODO(yanavlasov): encapsulate the entire state of deferred streams into a separate // structure, so it can be atomically created and cleared. - bool deferred_to_next_io_iteration_ = false; - bool deferred_end_stream_ = false; + bool deferred_to_next_io_iteration_ : 1; + bool deferred_end_stream_ : 1; }; bool canDestroyStream() const {