Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1387,8 +1387,13 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapSharedPt
}

filter_manager_.streamInfo().setRequestHeaders(*request_headers_);

const bool upgrade_rejected = filter_manager_.createFilterChain() == false;
const FilterManager::CreateChainResult create_chain_result =
filter_manager_.createDownstreamFilterChain();
if (create_chain_result.upgradeAccepted()) {
connection_manager_.stats_.named_.downstream_cx_upgrades_total_.inc();
connection_manager_.stats_.named_.downstream_cx_upgrades_active_.inc();
state_.successful_upgrade_ = true;
}

if (connection_manager_.config_->flushAccessLogOnNewRequest()) {
log(AccessLog::AccessLogType::DownstreamStart);
Expand All @@ -1398,7 +1403,7 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapSharedPt
// should return 404. The current returns no response if there is no router filter.
if (hasCachedRoute()) {
// Do not allow upgrades if the route does not support it.
if (upgrade_rejected) {
if (create_chain_result.upgradeRejected()) {
// While downstream servers should not send upgrade payload without the upgrade being
// accepted, err on the side of caution and refuse to process any further requests on this
// connection, to avoid a class of HTTP/1.1 smuggling bugs where Upgrade or CONNECT payload
Expand Down
44 changes: 18 additions & 26 deletions source/common/http/conn_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -274,11 +274,6 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
}
void onDecoderFilterBelowWriteBufferLowWatermark() override;
void onDecoderFilterAboveWriteBufferHighWatermark() override;
void upgradeFilterChainCreated() override {
connection_manager_.stats_.named_.downstream_cx_upgrades_total_.inc();
connection_manager_.stats_.named_.downstream_cx_upgrades_active_.inc();
state_.successful_upgrade_ = true;
}
void disarmRequestTimeout() override;
void resetIdleTimer() override;
void recreateStream(StreamInfo::FilterStateSharedPtr filter_state) override;
Expand Down Expand Up @@ -334,44 +329,41 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,

// All state for the stream. Put here for readability.
struct State {
State()
: codec_saw_local_complete_(false), codec_encode_complete_(false),
on_reset_stream_called_(false), is_zombie_stream_(false), successful_upgrade_(false),
is_internally_destroyed_(false), is_internally_created_(false), is_tunneling_(false),
decorated_propagate_(true), deferred_to_next_io_iteration_(false),
deferred_end_stream_(false) {}

// It's possibly for the codec to see the completed response but not fully
// encode it.
bool codec_saw_local_complete_ : 1; // This indicates that local is complete as the completed
// response has made its way to the codec.
bool codec_encode_complete_ : 1; // This indicates that the codec has
// completed encoding the response.
bool on_reset_stream_called_ : 1; // Whether the stream has been reset.
bool is_zombie_stream_ : 1; // Whether stream is waiting for signal
// the underlying codec to be destroyed.
bool successful_upgrade_ : 1;
Comment on lines -337 to -365

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These changes mainly used to fix clang.tidy.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm. I think that removing the : 1 means that each of these fields will get 8x larger. What's the clang tidy error?

@wbpcode wbpcode Nov 28, 2024

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is an example:

Use default member initializer for 'codec_saw_local_complete_' (fix available)clang-tidy(modernize-use-default-member-init)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm. I think we'd prefer to avoid making this struct larger if we don't need to. Can you do a sizeof(State) before and after this change to see what it's doing? I wonder if we could use both : 1 and = false to get the best of both worlds?

@wbpcode wbpcode Dec 2, 2024

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, you are right, it make the state memory footprint from 2 bytes to 11 bytes.

Although I am not sure if that make sense or not for software like envoy. we never pursue extreme low memory overhead.

But I still revert all these changes. I think @phlax is working on clang-tidy related things. we can do all these at there or change our clang-tidy rule. :)

bool codec_saw_local_complete_{}; // This indicates that local is complete as the completed
Comment thread
wbpcode marked this conversation as resolved.
Outdated

// response has made its way to the codec.
bool codec_encode_complete_{}; // This indicates that the codec has
// completed encoding the response.
bool on_reset_stream_called_{}; // Whether the stream has been reset.
bool is_zombie_stream_{}; // Whether stream is waiting for signal
// the underlying codec to be destroyed.

// Whether the upgrade request has be accepted by the filter chain.
// False if the request is not an upgrade request or upgrade is rejected by the filter chain.
bool successful_upgrade_{};

// True if this stream was the original externally created stream, but was
// destroyed as part of internal redirect.
bool is_internally_destroyed_ : 1;
bool is_internally_destroyed_{};
// True if this stream is internally created. Currently only used for
// internal redirects or other streams created via recreateStream().
bool is_internally_created_ : 1;
bool is_internally_created_{};

// True if the response headers indicate a successful upgrade or connect
// response.
bool is_tunneling_ : 1;
bool is_tunneling_{};

bool decorated_propagate_ : 1;
bool decorated_propagate_{true};

// Indicates that sending headers to the filter manager is deferred to the
// next I/O cycle. If data or trailers are received when this flag is set
// they are deferred too.
// TODO(yanavlasov): encapsulate the entire state of deferred streams into a separate
// structure, so it can be atomically created and cleared.
bool deferred_to_next_io_iteration_ : 1;
bool deferred_end_stream_ : 1;
bool deferred_to_next_io_iteration_{};
bool deferred_end_stream_{};
};

bool canDestroyStream() const {
Expand Down
75 changes: 48 additions & 27 deletions source/common/http/filter_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1036,7 +1036,7 @@ void DownstreamFilterManager::prepareLocalReplyViaFilterChain(
// For early error handling, do a best-effort attempt to create a filter chain
// to ensure access logging. If the filter chain already exists this will be
// a no-op.
createFilterChain();
createDownstreamFilterChain();

if (prepared_local_reply_) {
return;
Expand Down Expand Up @@ -1077,6 +1077,10 @@ void DownstreamFilterManager::executeLocalReplyIfPrepared() {
Utility::encodeLocalReply(state_.destroyed_, std::move(prepared_local_reply_));
}

FilterManager::CreateChainResult DownstreamFilterManager::createDownstreamFilterChain() {
return createFilterChain(filter_chain_factory_, false);
}

void DownstreamFilterManager::sendLocalReplyViaFilterChain(
bool is_grpc_request, Code code, absl::string_view body,
const std::function<void(ResponseHeaderMap& headers)>& modify_headers, bool is_head_request,
Expand All @@ -1086,7 +1090,7 @@ void DownstreamFilterManager::sendLocalReplyViaFilterChain(
// For early error handling, do a best-effort attempt to create a filter chain
// to ensure access logging. If the filter chain already exists this will be
// a no-op.
createFilterChain();
createDownstreamFilterChain();

Utility::sendLocalReply(
state_.destroyed_,
Expand Down Expand Up @@ -1623,11 +1627,9 @@ void FilterManager::contextOnContinue(ScopeTrackedObjectStack& tracked_object_st
tracked_object_stack.add(filter_manager_callbacks_.scope());
}

bool FilterManager::createFilterChain() {
if (state_.created_filter_chain_) {
return false;
}
bool upgrade_rejected = false;
absl::optional<bool>
FilterManager::createUpgradeFilterChain(const FilterChainFactory& filter_chain_factory,
const FilterChainOptionsImpl& options) {
const HeaderEntry* upgrade = nullptr;
if (filter_manager_callbacks_.requestHeaders()) {
upgrade = filter_manager_callbacks_.requestHeaders()->Upgrade();
Expand All @@ -1638,28 +1640,47 @@ bool FilterManager::createFilterChain() {
}
}

if (upgrade == nullptr) {
// No upgrade header, no upgrade filter chain.
return absl::nullopt;
}

const Router::RouteEntry::UpgradeMap* upgrade_map = filter_manager_callbacks_.upgradeMap();
return filter_chain_factory.createUpgradeFilterChain(upgrade->value().getStringView(),
upgrade_map, *this, options);
}

FilterManager::CreateChainResult
FilterManager::createFilterChain(const FilterChainFactory& filter_chain_factory,
bool only_create_if_configured) {
if (state_.create_chain_result_.created()) {
return state_.create_chain_result_;
}

OptRef<DownstreamStreamFilterCallbacks> downstream_callbacks =
filter_manager_callbacks_.downstreamCallbacks();

// This filter chain options is only used for the downstream HTTP filter chains for now. So, try
// to set valid initial route only when the downstream callbacks is available.
FilterChainOptionsImpl options(
filter_manager_callbacks_.downstreamCallbacks().has_value() ? streamInfo().route() : nullptr);
FilterChainOptionsImpl options(downstream_callbacks.has_value() ? streamInfo().route() : nullptr);

state_.created_filter_chain_ = true;
if (upgrade != nullptr) {
const Router::RouteEntry::UpgradeMap* upgrade_map = filter_manager_callbacks_.upgradeMap();
absl::optional<bool> upgrade = absl::nullopt;

if (filter_chain_factory_.createUpgradeFilterChain(upgrade->value().getStringView(),
upgrade_map, *this, options)) {
filter_manager_callbacks_.upgradeFilterChainCreated();
return true;
} else {
upgrade_rejected = true;
// Fall through to the default filter chain. The function calling this
// will send a local reply indicating that the upgrade failed.
// Only try the upgrade filter chain for downstream filter chains.
if (downstream_callbacks.has_value()) {
upgrade = createUpgradeFilterChain(filter_chain_factory, options);
if (upgrade.value_or(false)) {
// Upgrade filter chain is created. Return the result directly.
state_.create_chain_result_ = CreateChainResult(true, true);
return state_.create_chain_result_;
}
// If the upgrade is unnecessary or the upgrade filter chain is rejected, fall through to
// create the default filter chain.
}

filter_chain_factory_.createFilterChain(*this, false, options);
return !upgrade_rejected;
state_.create_chain_result_ = CreateChainResult(
filter_chain_factory.createFilterChain(*this, only_create_if_configured, options), upgrade);
return state_.create_chain_result_;
}

void ActiveStreamDecoderFilter::requestDataDrained() {
Expand Down Expand Up @@ -1714,11 +1735,11 @@ bool ActiveStreamDecoderFilter::recreateStream(const ResponseHeaderMap* headers)

if (headers != nullptr) {
// The call to setResponseHeaders is needed to ensure that the headers are properly logged in
// access logs before the stream is destroyed. Since the function expects a ResponseHeaderPtr&&,
// ownership of the headers must be passed. This cannot happen earlier in the flow (such as in
// the call to setupRedirect) because at that point it is still possible for the headers to be
// used in a different logical branch. We work around this by creating a copy and passing
// ownership of the copy instead.
// access logs before the stream is destroyed. Since the function expects a
// ResponseHeaderPtr&&, ownership of the headers must be passed. This cannot happen earlier in
Comment thread
wbpcode marked this conversation as resolved.
// the flow (such as in the call to setupRedirect) because at that point it is still possible
// for the headers to be used in a different logical branch. We work around this by creating a
// copy and passing ownership of the copy instead.
ResponseHeaderMapPtr headers_copy = createHeaderMap<ResponseHeaderMapImpl>(*headers);
parent_.filter_manager_callbacks_.setResponseHeaders(std::move(headers_copy));
parent_.filter_manager_callbacks_.chargeStats(*headers);
Expand Down
73 changes: 57 additions & 16 deletions source/common/http/filter_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -459,11 +459,6 @@ class FilterManagerCallbacks {
*/
virtual void onDecoderFilterAboveWriteBufferHighWatermark() PURE;

/**
* Called when the FilterManager creates an Upgrade filter chain.
*/
virtual void upgradeFilterChainCreated() PURE;

/**
* Called when request activity indicates that the request timeout should be disarmed.
*/
Expand Down Expand Up @@ -639,11 +634,10 @@ class FilterManager : public ScopeTrackedObject,
FilterManager(FilterManagerCallbacks& filter_manager_callbacks, Event::Dispatcher& dispatcher,
OptRef<const Network::Connection> connection, uint64_t stream_id,
Buffer::BufferMemoryAccountSharedPtr account, bool proxy_100_continue,
uint32_t buffer_limit, const FilterChainFactory& filter_chain_factory)
uint32_t buffer_limit)
: filter_manager_callbacks_(filter_manager_callbacks), dispatcher_(dispatcher),
connection_(connection), stream_id_(stream_id), account_(std::move(account)),
proxy_100_continue_(proxy_100_continue), buffer_limit_(buffer_limit),
filter_chain_factory_(filter_chain_factory) {}
proxy_100_continue_(proxy_100_continue), buffer_limit_(buffer_limit) {}

~FilterManager() override {
ASSERT(state_.destroyed_);
Expand Down Expand Up @@ -830,15 +824,55 @@ class FilterManager : public ScopeTrackedObject,
* a local reply without the overhead of creating and traversing the filters.
*/
void skipFilterChainCreation() {
ASSERT(!state_.created_filter_chain_);
state_.created_filter_chain_ = true;
ASSERT(!state_.create_chain_result_.created());
state_.create_chain_result_ = CreateChainResult(true, absl::nullopt);
}

virtual StreamInfo::StreamInfo& streamInfo() PURE;
virtual const StreamInfo::StreamInfo& streamInfo() const PURE;

// Set up the Encoder/Decoder filter chain.
bool createFilterChain();
/**
* Filter chain creation result.
*/
class CreateChainResult {
public:
CreateChainResult() = default;

/**
* @param created whether the filter chain was created.
* @param upgrade whether the upgrade was accepted or rejected. absl::nullopt if no upgrade
* was requested. True if the upgrade was accepted, false if it was rejected.
*/
CreateChainResult(bool created, absl::optional<bool> upgrade)
: created_(created), upgrade_(upgrade) {}

/**
* @return whether the filter chain was created.
*/
bool created() const { return created_; }
/**
* @return whether the upgrade was accepted.
*/
bool upgradeAccepted() const { return upgrade_.has_value() ? upgrade_.value() : false; }
/**
* @return whether the upgrade was rejected.
*/
bool upgradeRejected() const { return upgrade_.has_value() ? !upgrade_.value() : false; }

private:
bool created_{};
absl::optional<bool> upgrade_{};
};

/**
* Set up the Encoder/Decoder filter chain.
* @param filter_chain_factory the factory to create the filter chain.
* @param only_create_if_configured whether to only create the filter chain if it is configured
* explicitly. This only makes sense for upstream HTTP filter chain.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ooc is it ever not true for upstream filter chain? I'm wondering if we can infer this as we do the upgrade allowed option if they're just the inverse of each other

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ooc is it ever not true for upstream filter chain?

Yeah...it will be false when we try to create filters from router and cluster at second time. As I said before, i think we should remove this by providing a default filter chain factory for upstream filter chain.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be worth doing that now? default filter chain is just codec filter right?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be worth doing that now? default filter chain is just codec filter right?

I think it deserves. It not just make code simple also reduce memory overhead of cluster because we now need keep a default filter chain for every cluster.

*
*/
CreateChainResult createFilterChain(const FilterChainFactory& filter_chain_factory,
bool only_create_if_configured);

OptRef<const Network::Connection> connection() const { return connection_; }

Expand Down Expand Up @@ -881,7 +915,6 @@ class FilterManager : public ScopeTrackedObject,
// By default, we will assume there are no 1xx. If encode1xxHeaders
// is ever called, this is set to true so commonContinue resumes processing the 1xx.
bool has_1xx_headers_{};
bool created_filter_chain_{};
// These two are latched on initial header read, to determine if the original headers
// constituted a HEAD or gRPC request, respectively.
bool is_head_request_{};
Expand All @@ -904,6 +937,9 @@ class FilterManager : public ScopeTrackedObject,
bool decoder_filters_streaming_{true};
bool destroyed_{false};

// Result of filter chain creation.
CreateChainResult create_chain_result_{};

// Used to track which filter is the latest filter that has received data.
ActiveStreamEncoderFilter* latest_data_encoding_filter_{};
ActiveStreamDecoderFilter* latest_data_decoding_filter_{};
Expand Down Expand Up @@ -966,6 +1002,9 @@ class FilterManager : public ScopeTrackedObject,
// Indicates which filter to start the iteration with.
enum class FilterIterationStartState { AlwaysStartFromNext, CanStartFromCurrent };

absl::optional<bool> createUpgradeFilterChain(const FilterChainFactory& filter_chain_factory,
Comment thread
wbpcode marked this conversation as resolved.
Outdated
const FilterChainOptionsImpl& options);

// Returns the encoder filter to start iteration with.
std::list<ActiveStreamEncoderFilterPtr>::iterator
commonEncodePrefix(ActiveStreamEncoderFilter* filter, bool end_stream,
Expand Down Expand Up @@ -1053,7 +1092,6 @@ class FilterManager : public ScopeTrackedObject,
std::make_shared<Network::Socket::Options>();
absl::optional<Upstream::LoadBalancerContext::OverrideHost> upstream_override_host_;

const FilterChainFactory& filter_chain_factory_;
// TODO(snowp): Once FM has been moved to its own file we'll make these private classes of FM,
// at which point they no longer need to be friends.
friend ActiveStreamFilterBase;
Expand Down Expand Up @@ -1108,11 +1146,11 @@ class DownstreamFilterManager : public FilterManager {
StreamInfo::FilterStateSharedPtr parent_filter_state,
Server::OverloadManager& overload_manager)
: FilterManager(filter_manager_callbacks, dispatcher, connection, stream_id, account,
proxy_100_continue, buffer_limit, filter_chain_factory),
proxy_100_continue, buffer_limit),
stream_info_(protocol, time_source, connection.connectionInfoProviderSharedPtr(),
StreamInfo::FilterState::LifeSpan::FilterChain,
std::move(parent_filter_state)),
local_reply_(local_reply),
local_reply_(local_reply), filter_chain_factory_(filter_chain_factory),
downstream_filter_load_shed_point_(overload_manager.getLoadShedPoint(
Server::LoadShedPointName::get().HttpDownstreamFilterCheck)),
use_filter_manager_state_for_downstream_end_stream_(Runtime::runtimeFeatureEnabled(
Expand All @@ -1136,6 +1174,8 @@ class DownstreamFilterManager : public FilterManager {
stream_info_.setDownstreamRemoteAddress(downstream_remote_address);
}

CreateChainResult createDownstreamFilterChain();

/**
* Called before local reply is made by the filter manager.
* @param data the data associated with the local reply.
Expand Down Expand Up @@ -1212,6 +1252,7 @@ class DownstreamFilterManager : public FilterManager {
private:
OverridableRemoteConnectionInfoSetterStreamInfo stream_info_;
const LocalReply::LocalReply& local_reply_;
const FilterChainFactory& filter_chain_factory_;
Utility::PreparedLocalReplyPtr prepared_local_reply_{nullptr};
Server::LoadShedPoint* downstream_filter_load_shed_point_{nullptr};
// Set by the envoy.reloadable_features.use_filter_manager_state_for_downstream_end_stream runtime
Expand Down
2 changes: 1 addition & 1 deletion source/common/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ class FilterUtility {
/**
* Configuration for the router filter.
*/
class FilterConfig : Http::FilterChainFactory {
class FilterConfig : public Http::FilterChainFactory {
public:
FilterConfig(Server::Configuration::CommonFactoryContext& factory_context,
Stats::StatName stat_prefix, const LocalInfo::LocalInfo& local_info,
Expand Down
Loading