Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -952,7 +952,7 @@ void ConnectionManagerImpl::ActiveStream::completeRequest() {
*active_span_, request_headers_.get(), response_headers_.get(), response_trailers_.get(),
filter_manager_.streamInfo(), *this);
}
if (state_.successful_upgrade_) {
if (state_.upgrade_accepted_) {
Comment thread
wbpcode marked this conversation as resolved.
Outdated
connection_manager_.stats_.named_.downstream_cx_upgrades_active_.dec();
}
}
Expand Down Expand Up @@ -1387,8 +1387,7 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapSharedPt
}

filter_manager_.streamInfo().setRequestHeaders(*request_headers_);

const bool upgrade_rejected = filter_manager_.createFilterChain() == false;
filter_manager_.createDownstreamFilterChain();

if (connection_manager_.config_->flushAccessLogOnNewRequest()) {
log(AccessLog::AccessLogType::DownstreamStart);
Expand All @@ -1398,7 +1397,7 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapSharedPt
// should return 404. The current returns no response if there is no router filter.
if (hasCachedRoute()) {
// Do not allow upgrades if the route does not support it.
if (upgrade_rejected) {
if (state_.upgrade_rejected_) {
Comment thread
wbpcode marked this conversation as resolved.
Outdated
// While downstream servers should not send upgrade payload without the upgrade being
// accepted, err on the side of caution and refuse to process any further requests on this
// connection, to avoid a class of HTTP/1.1 smuggling bugs where Upgrade or CONNECT payload
Expand Down
52 changes: 27 additions & 25 deletions source/common/http/conn_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -274,10 +274,14 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
}
void onDecoderFilterBelowWriteBufferLowWatermark() override;
void onDecoderFilterAboveWriteBufferHighWatermark() override;
void upgradeFilterChainCreated() override {
connection_manager_.stats_.named_.downstream_cx_upgrades_total_.inc();
connection_manager_.stats_.named_.downstream_cx_upgrades_active_.inc();
state_.successful_upgrade_ = true;
void upgradeFilterChainCreated(bool created) override {
Comment thread
wbpcode marked this conversation as resolved.
Outdated
if (created) {
connection_manager_.stats_.named_.downstream_cx_upgrades_total_.inc();
connection_manager_.stats_.named_.downstream_cx_upgrades_active_.inc();
state_.upgrade_accepted_ = true;
} else {
state_.upgrade_rejected_ = true;
}
}
void disarmRequestTimeout() override;
void resetIdleTimer() override;
Expand Down Expand Up @@ -334,44 +338,42 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,

// All state for the stream. Put here for readability.
struct State {
State()
: codec_saw_local_complete_(false), codec_encode_complete_(false),
on_reset_stream_called_(false), is_zombie_stream_(false), successful_upgrade_(false),
is_internally_destroyed_(false), is_internally_created_(false), is_tunneling_(false),
decorated_propagate_(true), deferred_to_next_io_iteration_(false),
deferred_end_stream_(false) {}

// It's possibly for the codec to see the completed response but not fully
// encode it.
bool codec_saw_local_complete_ : 1; // This indicates that local is complete as the completed
// response has made its way to the codec.
bool codec_encode_complete_ : 1; // This indicates that the codec has
// completed encoding the response.
bool on_reset_stream_called_ : 1; // Whether the stream has been reset.
bool is_zombie_stream_ : 1; // Whether stream is waiting for signal
// the underlying codec to be destroyed.
bool successful_upgrade_ : 1;
Comment on lines -337 to -365

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These changes mainly used to fix clang.tidy.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm. I think that removing the : 1 means that each of these fields will get 8x larger. What's the clang tidy error?

@wbpcode wbpcode Nov 28, 2024

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is an example:

Use default member initializer for 'codec_saw_local_complete_' (fix available)clang-tidy(modernize-use-default-member-init)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm. I think we'd prefer to avoid making this struct larger if we don't need to. Can you do a sizeof(State) before and after this change to see what it's doing? I wonder if we could use both : 1 and = false to get the best of both worlds?

@wbpcode wbpcode Dec 2, 2024

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, you are right, it make the state memory footprint from 2 bytes to 11 bytes.

Although I am not sure if that make sense or not for software like envoy. we never pursue extreme low memory overhead.

But I still revert all these changes. I think @phlax is working on clang-tidy related things. we can do all these at there or change our clang-tidy rule. :)

bool codec_saw_local_complete_{}; // This indicates that local is complete as the completed
Comment thread
wbpcode marked this conversation as resolved.
Outdated

// response has made its way to the codec.
bool codec_encode_complete_{}; // This indicates that the codec has
// completed encoding the response.
bool on_reset_stream_called_{}; // Whether the stream has been reset.
bool is_zombie_stream_{}; // Whether stream is waiting for signal
// the underlying codec to be destroyed.

// Whether the upgrade request has be accepted or rejected by the filter chain.
// Both are false if the request is not an upgrade request.
bool upgrade_accepted_{};
bool upgrade_rejected_{};
Comment thread
wbpcode marked this conversation as resolved.
Outdated

// True if this stream was the original externally created stream, but was
// destroyed as part of internal redirect.
bool is_internally_destroyed_ : 1;
bool is_internally_destroyed_{};
// True if this stream is internally created. Currently only used for
// internal redirects or other streams created via recreateStream().
bool is_internally_created_ : 1;
bool is_internally_created_{};

// True if the response headers indicate a successful upgrade or connect
// response.
bool is_tunneling_ : 1;
bool is_tunneling_{};

bool decorated_propagate_ : 1;
bool decorated_propagate_{true};

// Indicates that sending headers to the filter manager is deferred to the
// next I/O cycle. If data or trailers are received when this flag is set
// they are deferred too.
// TODO(yanavlasov): encapsulate the entire state of deferred streams into a separate
// structure, so it can be atomically created and cleared.
bool deferred_to_next_io_iteration_ : 1;
bool deferred_end_stream_ : 1;
bool deferred_to_next_io_iteration_{};
bool deferred_end_stream_{};
};

bool canDestroyStream() const {
Expand Down
76 changes: 49 additions & 27 deletions source/common/http/filter_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1036,7 +1036,7 @@ void DownstreamFilterManager::prepareLocalReplyViaFilterChain(
// For early error handling, do a best-effort attempt to create a filter chain
// to ensure access logging. If the filter chain already exists this will be
// a no-op.
createFilterChain();
createDownstreamFilterChain();

if (prepared_local_reply_) {
return;
Expand Down Expand Up @@ -1077,6 +1077,10 @@ void DownstreamFilterManager::executeLocalReplyIfPrepared() {
Utility::encodeLocalReply(state_.destroyed_, std::move(prepared_local_reply_));
}

bool DownstreamFilterManager::createDownstreamFilterChain() {
return createFilterChain(filter_chain_factory_, false);
}

void DownstreamFilterManager::sendLocalReplyViaFilterChain(
bool is_grpc_request, Code code, absl::string_view body,
const std::function<void(ResponseHeaderMap& headers)>& modify_headers, bool is_head_request,
Expand All @@ -1086,7 +1090,7 @@ void DownstreamFilterManager::sendLocalReplyViaFilterChain(
// For early error handling, do a best-effort attempt to create a filter chain
// to ensure access logging. If the filter chain already exists this will be
// a no-op.
createFilterChain();
createDownstreamFilterChain();

Utility::sendLocalReply(
state_.destroyed_,
Expand Down Expand Up @@ -1623,11 +1627,8 @@ void FilterManager::contextOnContinue(ScopeTrackedObjectStack& tracked_object_st
tracked_object_stack.add(filter_manager_callbacks_.scope());
}

bool FilterManager::createFilterChain() {
if (state_.created_filter_chain_) {
return false;
}
bool upgrade_rejected = false;
bool FilterManager::createUpgradeFilterChain(const FilterChainFactory& filter_chain_factory,
const FilterChainOptionsImpl& options) {
const HeaderEntry* upgrade = nullptr;
if (filter_manager_callbacks_.requestHeaders()) {
upgrade = filter_manager_callbacks_.requestHeaders()->Upgrade();
Expand All @@ -1638,28 +1639,49 @@ bool FilterManager::createFilterChain() {
}
}

if (upgrade == nullptr) {
// No upgrade header, no upgrade filter chain.
return false;
}

const Router::RouteEntry::UpgradeMap* upgrade_map = filter_manager_callbacks_.upgradeMap();
if (filter_chain_factory.createUpgradeFilterChain(upgrade->value().getStringView(), upgrade_map,
*this, options)) {
filter_manager_callbacks_.upgradeFilterChainCreated(true);
return true;
} else {
filter_manager_callbacks_.upgradeFilterChainCreated(false);
// The upgrade filter chain is rejected. Fall through to the default filter chain.
// The default filter chain will be used to handle the upgrade failure local reply.
return false;
}
}

bool FilterManager::createFilterChain(const FilterChainFactory& filter_chain_factory,
bool only_create_if_configured) {
if (state_.created_filter_chain_) {
return true;
}

OptRef<DownstreamStreamFilterCallbacks> downstream_callbacks =
filter_manager_callbacks_.downstreamCallbacks();

// This filter chain options is only used for the downstream HTTP filter chains for now. So, try
// to set valid initial route only when the downstream callbacks is available.
FilterChainOptionsImpl options(
filter_manager_callbacks_.downstreamCallbacks().has_value() ? streamInfo().route() : nullptr);
FilterChainOptionsImpl options(downstream_callbacks.has_value() ? streamInfo().route() : nullptr);

state_.created_filter_chain_ = true;
if (upgrade != nullptr) {
const Router::RouteEntry::UpgradeMap* upgrade_map = filter_manager_callbacks_.upgradeMap();

if (filter_chain_factory_.createUpgradeFilterChain(upgrade->value().getStringView(),
upgrade_map, *this, options)) {
filter_manager_callbacks_.upgradeFilterChainCreated();
if (downstream_callbacks.has_value()) {
// Only try the upgrade filter chain for downstream filter chains.
if (createUpgradeFilterChain(filter_chain_factory, options)) {
state_.created_filter_chain_ = true;
return true;
} else {
upgrade_rejected = true;
// Fall through to the default filter chain. The function calling this
// will send a local reply indicating that the upgrade failed.
}
}

filter_chain_factory_.createFilterChain(*this, false, options);
return !upgrade_rejected;
const bool created =
filter_chain_factory.createFilterChain(*this, only_create_if_configured, options);
state_.created_filter_chain_ = created;
return created;
}

void ActiveStreamDecoderFilter::requestDataDrained() {
Expand Down Expand Up @@ -1714,11 +1736,11 @@ bool ActiveStreamDecoderFilter::recreateStream(const ResponseHeaderMap* headers)

if (headers != nullptr) {
// The call to setResponseHeaders is needed to ensure that the headers are properly logged in
// access logs before the stream is destroyed. Since the function expects a ResponseHeaderPtr&&,
// ownership of the headers must be passed. This cannot happen earlier in the flow (such as in
// the call to setupRedirect) because at that point it is still possible for the headers to be
// used in a different logical branch. We work around this by creating a copy and passing
// ownership of the copy instead.
// access logs before the stream is destroyed. Since the function expects a
// ResponseHeaderPtr&&, ownership of the headers must be passed. This cannot happen earlier in
Comment thread
wbpcode marked this conversation as resolved.
// the flow (such as in the call to setupRedirect) because at that point it is still possible
// for the headers to be used in a different logical branch. We work around this by creating a
// copy and passing ownership of the copy instead.
ResponseHeaderMapPtr headers_copy = createHeaderMap<ResponseHeaderMapImpl>(*headers);
parent_.filter_manager_callbacks_.setResponseHeaders(std::move(headers_copy));
parent_.filter_manager_callbacks_.chargeStats(*headers);
Expand Down
31 changes: 22 additions & 9 deletions source/common/http/filter_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -461,8 +461,10 @@ class FilterManagerCallbacks {

/**
* Called when the FilterManager creates an Upgrade filter chain.
* @param created whether the upgrade filter chain was created. If false, the upgrade
* is rejected.
*/
virtual void upgradeFilterChainCreated() PURE;
virtual void upgradeFilterChainCreated(bool created) PURE;

/**
* Called when request activity indicates that the request timeout should be disarmed.
Expand Down Expand Up @@ -639,11 +641,10 @@ class FilterManager : public ScopeTrackedObject,
FilterManager(FilterManagerCallbacks& filter_manager_callbacks, Event::Dispatcher& dispatcher,
OptRef<const Network::Connection> connection, uint64_t stream_id,
Buffer::BufferMemoryAccountSharedPtr account, bool proxy_100_continue,
uint32_t buffer_limit, const FilterChainFactory& filter_chain_factory)
uint32_t buffer_limit)
: filter_manager_callbacks_(filter_manager_callbacks), dispatcher_(dispatcher),
connection_(connection), stream_id_(stream_id), account_(std::move(account)),
proxy_100_continue_(proxy_100_continue), buffer_limit_(buffer_limit),
filter_chain_factory_(filter_chain_factory) {}
proxy_100_continue_(proxy_100_continue), buffer_limit_(buffer_limit) {}

~FilterManager() override {
ASSERT(state_.destroyed_);
Expand Down Expand Up @@ -837,8 +838,15 @@ class FilterManager : public ScopeTrackedObject,
virtual StreamInfo::StreamInfo& streamInfo() PURE;
virtual const StreamInfo::StreamInfo& streamInfo() const PURE;

// Set up the Encoder/Decoder filter chain.
bool createFilterChain();
/**
* Set up the Encoder/Decoder filter chain.
* @param filter_chain_factory the factory to create the filter chain.
* @param only_create_if_configured whether to only create the filter chain if it is configured
* explicitly. This only makes sense for upstream HTTP filter chain.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ooc is it ever not true for upstream filter chain? I'm wondering if we can infer this as we do the upgrade allowed option if they're just the inverse of each other

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ooc is it ever not true for upstream filter chain?

Yeah...it will be false when we try to create filters from router and cluster at second time. As I said before, i think we should remove this by providing a default filter chain factory for upstream filter chain.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be worth doing that now? default filter chain is just codec filter right?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be worth doing that now? default filter chain is just codec filter right?

I think it deserves. It not just make code simple also reduce memory overhead of cluster because we now need keep a default filter chain for every cluster.

*
*/
bool createFilterChain(const FilterChainFactory& filter_chain_factory,
bool only_create_if_configured);

OptRef<const Network::Connection> connection() const { return connection_; }

Expand Down Expand Up @@ -966,6 +974,9 @@ class FilterManager : public ScopeTrackedObject,
// Indicates which filter to start the iteration with.
enum class FilterIterationStartState { AlwaysStartFromNext, CanStartFromCurrent };

bool createUpgradeFilterChain(const FilterChainFactory& filter_chain_factory,
const FilterChainOptionsImpl& options);

// Returns the encoder filter to start iteration with.
std::list<ActiveStreamEncoderFilterPtr>::iterator
commonEncodePrefix(ActiveStreamEncoderFilter* filter, bool end_stream,
Expand Down Expand Up @@ -1053,7 +1064,6 @@ class FilterManager : public ScopeTrackedObject,
std::make_shared<Network::Socket::Options>();
absl::optional<Upstream::LoadBalancerContext::OverrideHost> upstream_override_host_;

const FilterChainFactory& filter_chain_factory_;
// TODO(snowp): Once FM has been moved to its own file we'll make these private classes of FM,
// at which point they no longer need to be friends.
friend ActiveStreamFilterBase;
Expand Down Expand Up @@ -1108,11 +1118,11 @@ class DownstreamFilterManager : public FilterManager {
StreamInfo::FilterStateSharedPtr parent_filter_state,
Server::OverloadManager& overload_manager)
: FilterManager(filter_manager_callbacks, dispatcher, connection, stream_id, account,
proxy_100_continue, buffer_limit, filter_chain_factory),
proxy_100_continue, buffer_limit),
stream_info_(protocol, time_source, connection.connectionInfoProviderSharedPtr(),
StreamInfo::FilterState::LifeSpan::FilterChain,
std::move(parent_filter_state)),
local_reply_(local_reply),
local_reply_(local_reply), filter_chain_factory_(filter_chain_factory),
downstream_filter_load_shed_point_(overload_manager.getLoadShedPoint(
Server::LoadShedPointName::get().HttpDownstreamFilterCheck)),
use_filter_manager_state_for_downstream_end_stream_(Runtime::runtimeFeatureEnabled(
Expand All @@ -1136,6 +1146,8 @@ class DownstreamFilterManager : public FilterManager {
stream_info_.setDownstreamRemoteAddress(downstream_remote_address);
}

bool createDownstreamFilterChain();

/**
* Called before local reply is made by the filter manager.
* @param data the data associated with the local reply.
Expand Down Expand Up @@ -1212,6 +1224,7 @@ class DownstreamFilterManager : public FilterManager {
private:
OverridableRemoteConnectionInfoSetterStreamInfo stream_info_;
const LocalReply::LocalReply& local_reply_;
const FilterChainFactory& filter_chain_factory_;
Utility::PreparedLocalReplyPtr prepared_local_reply_{nullptr};
Server::LoadShedPoint* downstream_filter_load_shed_point_{nullptr};
// Set by the envoy.reloadable_features.use_filter_manager_state_for_downstream_end_stream runtime
Expand Down
2 changes: 1 addition & 1 deletion source/common/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ class FilterUtility {
/**
* Configuration for the router filter.
*/
class FilterConfig : Http::FilterChainFactory {
class FilterConfig : public Http::FilterChainFactory {
public:
FilterConfig(Server::Configuration::CommonFactoryContext& factory_context,
Stats::StatName stat_prefix, const LocalInfo::LocalInfo& local_info,
Expand Down
17 changes: 8 additions & 9 deletions source/common/router/upstream_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,9 @@ class UpstreamFilterManager : public Http::FilterManager {
UpstreamFilterManager(Http::FilterManagerCallbacks& filter_manager_callbacks,
Event::Dispatcher& dispatcher, OptRef<const Network::Connection> connection,
uint64_t stream_id, Buffer::BufferMemoryAccountSharedPtr account,
bool proxy_100_continue, uint32_t buffer_limit,
const Http::FilterChainFactory& filter_chain_factory,
UpstreamRequest& request)
bool proxy_100_continue, uint32_t buffer_limit, UpstreamRequest& request)
: FilterManager(filter_manager_callbacks, dispatcher, connection, stream_id, account,
proxy_100_continue, buffer_limit, filter_chain_factory),
proxy_100_continue, buffer_limit),
upstream_request_(request) {}

StreamInfo::StreamInfo& streamInfo() override {
Expand Down Expand Up @@ -142,18 +140,19 @@ UpstreamRequest::UpstreamRequest(RouterFilterInterface& parent,
filter_manager_ = std::make_unique<UpstreamFilterManager>(
*filter_manager_callbacks_, parent_.callbacks()->dispatcher(), UpstreamRequest::connection(),
parent_.callbacks()->streamId(), parent_.callbacks()->account(), true,
parent_.callbacks()->decoderBufferLimit(), *parent_.cluster(), *this);
parent_.callbacks()->decoderBufferLimit(), *this);
// Attempt to create custom cluster-specified filter chain
bool created = parent_.cluster()->createFilterChain(*filter_manager_,
/*only_create_if_configured=*/true);
bool created = filter_manager_->createFilterChain(*parent_.cluster(),
/*only_create_if_configured=*/true);

if (!created) {
// Attempt to create custom router-specified filter chain.
created = parent_.config().createFilterChain(*filter_manager_);
created = filter_manager_->createFilterChain(parent_.config(), false);
}
if (!created) {
// Neither cluster nor router have a custom filter chain; add the default
// cluster filter chain, which only consists of the codec filter.
created = parent_.cluster()->createFilterChain(*filter_manager_, false);
created = filter_manager_->createFilterChain(*parent_.cluster(), false);
}
// There will always be a codec filter present, which sets the upstream
// interface. Fast-fail any tests that don't set up mocks correctly.
Expand Down
2 changes: 1 addition & 1 deletion source/common/router/upstream_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ class UpstreamRequestFilterManagerCallbacks : public Http::FilterManagerCallback
void recreateStream(StreamInfo::FilterStateSharedPtr) override {
IS_ENVOY_BUG("recreateStream called from upstream HTTP filter");
}
void upgradeFilterChainCreated() override {
void upgradeFilterChainCreated(bool) override {
IS_ENVOY_BUG("upgradeFilterChainCreated called from upstream HTTP filter");
}
OptRef<UpstreamStreamFilterCallbacks> upstreamCallbacks() override { return {*this}; }
Expand Down
Loading