-
Notifications
You must be signed in to change notification settings - Fork 5.3k
http: unified the filter chain creation in the FilterManager #37112
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
d28e7cf
b37f5b7
4a73cb7
64eb0f9
0308ca1
e13c9a9
dcd4bdf
2bd2592
f2958c5
b7c67b4
8e4b115
cd2e199
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -465,11 +465,6 @@ class FilterManagerCallbacks { | |
| */ | ||
| virtual void onDecoderFilterAboveWriteBufferHighWatermark() PURE; | ||
|
|
||
| /** | ||
| * Called when the FilterManager creates an Upgrade filter chain. | ||
| */ | ||
| virtual void upgradeFilterChainCreated() PURE; | ||
|
|
||
| /** | ||
| * Called when request activity indicates that the request timeout should be disarmed. | ||
| */ | ||
|
|
@@ -648,11 +643,10 @@ class FilterManager : public ScopeTrackedObject, | |
| FilterManager(FilterManagerCallbacks& filter_manager_callbacks, Event::Dispatcher& dispatcher, | ||
| OptRef<const Network::Connection> connection, uint64_t stream_id, | ||
| Buffer::BufferMemoryAccountSharedPtr account, bool proxy_100_continue, | ||
| uint32_t buffer_limit, const FilterChainFactory& filter_chain_factory) | ||
| uint32_t buffer_limit) | ||
| : filter_manager_callbacks_(filter_manager_callbacks), dispatcher_(dispatcher), | ||
| connection_(connection), stream_id_(stream_id), account_(std::move(account)), | ||
| proxy_100_continue_(proxy_100_continue), buffer_limit_(buffer_limit), | ||
| filter_chain_factory_(filter_chain_factory) {} | ||
| proxy_100_continue_(proxy_100_continue), buffer_limit_(buffer_limit) {} | ||
|
|
||
| ~FilterManager() override { | ||
| ASSERT(state_.destroyed_); | ||
|
|
@@ -839,15 +833,56 @@ class FilterManager : public ScopeTrackedObject, | |
| * a local reply without the overhead of creating and traversing the filters. | ||
| */ | ||
| void skipFilterChainCreation() { | ||
| ASSERT(!state_.created_filter_chain_); | ||
| state_.created_filter_chain_ = true; | ||
| ASSERT(!state_.create_chain_result_.created()); | ||
| state_.create_chain_result_ = CreateChainResult(true); | ||
| } | ||
|
|
||
| virtual StreamInfo::StreamInfo& streamInfo() PURE; | ||
| virtual const StreamInfo::StreamInfo& streamInfo() const PURE; | ||
|
|
||
| // Set up the Encoder/Decoder filter chain. | ||
| bool createFilterChain(); | ||
| enum class UpgradeResult : uint8_t { UpgradeUnneeded, UpgradeAccepted, UpgradeRejected }; | ||
|
|
||
| /** | ||
| * Filter chain creation result. | ||
| */ | ||
| class CreateChainResult { | ||
| public: | ||
| CreateChainResult() = default; | ||
|
|
||
| /** | ||
| * @param created whether the filter chain was created. | ||
| * @param upgrade the upgrade result. | ||
| */ | ||
| CreateChainResult(bool created, UpgradeResult upgrade = UpgradeResult::UpgradeUnneeded) | ||
| : created_(created), upgrade_(upgrade) {} | ||
|
|
||
| /** | ||
| * @return whether the filter chain was created. | ||
| */ | ||
| bool created() const { return created_; } | ||
| /** | ||
| * @return whether the upgrade was accepted. | ||
| */ | ||
| bool upgradeAccepted() const { return upgrade_ == UpgradeResult::UpgradeAccepted; } | ||
| /** | ||
| * @return whether the upgrade was rejected. | ||
| */ | ||
| bool upgradeRejected() const { return upgrade_ == UpgradeResult::UpgradeRejected; } | ||
|
|
||
| private: | ||
| bool created_ = false; | ||
| UpgradeResult upgrade_ = UpgradeResult::UpgradeUnneeded; | ||
| }; | ||
|
|
||
| /** | ||
| * Set up the Encoder/Decoder filter chain. | ||
| * @param filter_chain_factory the factory to create the filter chain. | ||
| * @param only_create_if_configured whether to only create the filter chain if it is configured | ||
| * explicitly. This only makes sense for upstream HTTP filter chain. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ooc is it ever not true for upstream filter chain? I'm wondering if we can infer this as we do the upgrade allowed option if they're just the inverse of each other
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yeah...it will be false when we try to create filters from router and cluster at second time. As I said before, i think we should remove this by providing a default filter chain factory for upstream filter chain.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it be worth doing that now? default filter chain is just codec filter right?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I think it deserves. It not just make code simple also reduce memory overhead of cluster because we now need keep a default filter chain for every cluster. |
||
| * | ||
| */ | ||
| CreateChainResult createFilterChain(const FilterChainFactory& filter_chain_factory, | ||
| bool only_create_if_configured); | ||
|
|
||
| OptRef<const Network::Connection> connection() const { return connection_; } | ||
|
|
||
|
|
@@ -900,7 +935,6 @@ class FilterManager : public ScopeTrackedObject, | |
| // By default, we will assume there are no 1xx. If encode1xxHeaders | ||
| // is ever called, this is set to true so commonContinue resumes processing the 1xx. | ||
| bool has_1xx_headers_{}; | ||
| bool created_filter_chain_{}; | ||
| // These two are latched on initial header read, to determine if the original headers | ||
| // constituted a HEAD or gRPC request, respectively. | ||
| bool is_head_request_{}; | ||
|
|
@@ -923,6 +957,9 @@ class FilterManager : public ScopeTrackedObject, | |
| bool decoder_filters_streaming_{true}; | ||
| bool destroyed_{false}; | ||
|
|
||
| // Result of filter chain creation. | ||
| CreateChainResult create_chain_result_{}; | ||
|
|
||
| // Used to track which filter is the latest filter that has received data. | ||
| ActiveStreamEncoderFilter* latest_data_encoding_filter_{}; | ||
| ActiveStreamDecoderFilter* latest_data_decoding_filter_{}; | ||
|
|
@@ -985,6 +1022,9 @@ class FilterManager : public ScopeTrackedObject, | |
| // Indicates which filter to start the iteration with. | ||
| enum class FilterIterationStartState { AlwaysStartFromNext, CanStartFromCurrent }; | ||
|
|
||
| UpgradeResult createUpgradeFilterChain(const FilterChainFactory& filter_chain_factory, | ||
| const FilterChainOptionsImpl& options); | ||
|
|
||
| // Returns the encoder filter to start iteration with. | ||
| std::list<ActiveStreamEncoderFilterPtr>::iterator | ||
| commonEncodePrefix(ActiveStreamEncoderFilter* filter, bool end_stream, | ||
|
|
@@ -1072,7 +1112,6 @@ class FilterManager : public ScopeTrackedObject, | |
| std::make_shared<Network::Socket::Options>(); | ||
| absl::optional<Upstream::LoadBalancerContext::OverrideHost> upstream_override_host_; | ||
|
|
||
| const FilterChainFactory& filter_chain_factory_; | ||
| // TODO(snowp): Once FM has been moved to its own file we'll make these private classes of FM, | ||
| // at which point they no longer need to be friends. | ||
| friend ActiveStreamFilterBase; | ||
|
|
@@ -1127,11 +1166,11 @@ class DownstreamFilterManager : public FilterManager { | |
| StreamInfo::FilterStateSharedPtr parent_filter_state, | ||
| Server::OverloadManager& overload_manager) | ||
| : FilterManager(filter_manager_callbacks, dispatcher, connection, stream_id, account, | ||
| proxy_100_continue, buffer_limit, filter_chain_factory), | ||
| proxy_100_continue, buffer_limit), | ||
| stream_info_(protocol, time_source, connection.connectionInfoProviderSharedPtr(), | ||
| StreamInfo::FilterState::LifeSpan::FilterChain, | ||
| std::move(parent_filter_state)), | ||
| local_reply_(local_reply), | ||
| local_reply_(local_reply), filter_chain_factory_(filter_chain_factory), | ||
| downstream_filter_load_shed_point_(overload_manager.getLoadShedPoint( | ||
| Server::LoadShedPointName::get().HttpDownstreamFilterCheck)), | ||
| use_filter_manager_state_for_downstream_end_stream_(Runtime::runtimeFeatureEnabled( | ||
|
|
@@ -1155,6 +1194,8 @@ class DownstreamFilterManager : public FilterManager { | |
| stream_info_.setDownstreamRemoteAddress(downstream_remote_address); | ||
| } | ||
|
|
||
| CreateChainResult createDownstreamFilterChain(); | ||
|
|
||
| /** | ||
| * Called before local reply is made by the filter manager. | ||
| * @param data the data associated with the local reply. | ||
|
|
@@ -1231,6 +1272,7 @@ class DownstreamFilterManager : public FilterManager { | |
| private: | ||
| OverridableRemoteConnectionInfoSetterStreamInfo stream_info_; | ||
| const LocalReply::LocalReply& local_reply_; | ||
| const FilterChainFactory& filter_chain_factory_; | ||
| Utility::PreparedLocalReplyPtr prepared_local_reply_{nullptr}; | ||
| Server::LoadShedPoint* downstream_filter_load_shed_point_{nullptr}; | ||
| // Set by the envoy.reloadable_features.use_filter_manager_state_for_downstream_end_stream runtime | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.