Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import "gogoproto/gogo.proto";
// [#protodoc-title: HTTP connection manager]
// HTTP connection manager :ref:`configuration overview <config_http_conn_man>`.

// [#comment:next free field: 23]
// [#comment:next free field: 24]
message HttpConnectionManager {
enum CodecType {
option (gogoproto.goproto_enum_prefix) = false;
Expand Down Expand Up @@ -270,6 +270,29 @@ message HttpConnectionManager {
// <config_http_conn_man_runtime_represent_ipv4_remote_address_as_ipv4_mapped_ipv6>` for runtime
// control.
bool represent_ipv4_remote_address_as_ipv4_mapped_ipv6 = 20;

// [#not-implemented-hide:]
// The configuration for HTTP upgrades.
// For each upgrade type desired, an UpgradeConfig must be added.
//
// .. warning::
//
// The current implementation of upgrade headers does not handle
// multi-valued upgrade headers. Support for multi-valued headers may be
// added in future if needed.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: s/in/in the

message UpgradeConfig {
// The case-insensitive name of this upgrade, e.g. "websocket".
// For each upgrade type present in upgrade_configs, requests with
// Upgrade: [upgrade_type]
// will be proxied upstream.
string upgrade_type = 1;
// If present, this represents the filter chain which will be created for
// this type of upgrade. If no filters are present, the filter chain for
// HTTP connections will be used for this upgrade type.
repeated HttpFilter filters = 5;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be more clear from a config perspective to make this a oneof which can either be a bool (use primary filter chain) or a repeated filter list with in size 1? Just throwing it out there as I think it might be a bit more clear from a config perspective.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, initially I thought that'd add clarity but on second thought I think it's sort of weird, because the one-of boolean could really only have one value of true. Also it's a bit more complicated since I believe oneof can't be repeated so we'd have to have a sub-message for "repeated filter chain". I'm up for doing the change if you still think it's worth it - let me know!

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't feel strongly about it if you think the current way is better.

};
// [#not-implemented-hide:]
repeated UpgradeConfig upgrade_configs = 23;
}

message Rds {
Expand Down
1 change: 1 addition & 0 deletions include/envoy/http/codes.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ namespace Http {
enum class Code {
// clang-format off
Continue = 100,
SwitchingProtocols = 101,

OK = 200,
Created = 201,
Expand Down
18 changes: 17 additions & 1 deletion include/envoy/http/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -532,11 +532,27 @@ class FilterChainFactory {
virtual ~FilterChainFactory() {}

/**
* Called when a new stream is created on the connection.
* Called when a new HTTP stream is created on the connection.
* @param callbacks supplies the "sink" that is used for actually creating the filter chain. @see
* FilterChainFactoryCallbacks.
*/
virtual void createFilterChain(FilterChainFactoryCallbacks& callbacks) PURE;

/**
* Called when a new upgrade stream is created on the connection.
* @param upgrade supplies the upgrade header from downstream
* @param callbacks supplies the "sink" that is used for actually creating the filter chain. @see
* FilterChainFactoryCallbacks.
*/
virtual bool createUpgradeFilterChain(absl::string_view upgrade,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry to be dense (obviously this is going to be wired up in a forthcoming PR) but what will the behavior be if no upgrade filter chain matches? Do we create the normal one? Would it be worth it to doc that here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if there's no upgrade filter chain, upgrades are not supported and the HCM will auto-reply with a failure.

Per #3599 we no longer need to create a filter chain for early local-only replies, so I opted to not create the filter chain in this case. We could create the default and return false if we think that's better

FilterChainFactoryCallbacks& callbacks) PURE;

/**
* Called to determine if this type of upgrade is supported.
* @param the upgrade type requested.
* @return true if upgrades of this type are allowed, false otherwise.
*/
virtual bool upgradeSupported(absl::string_view upgrade) const PURE;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is the called from in prod code? Is the intention that a filter say at runtime that it doesn't support an upgrade? That scares me a little bit just from a config stability perspective, but mainly wondering how this is intended to work. I assume this is going to be used in a follow up? Maybe more docs here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's currently used in the HCM where we decide if we're accepting or rejecting upgrade headers. That said we call createFilterChain before we make that go no-go call, so I could latch the return value of createUpgradeFilterChain, return that from createFilterChain and use that for the reject instead of calling isUpgradeSupported

master...alyssawilk:websocket_delay#diff-d70ea2d9706e0246700148b2e2bb63ceR569 (doesn't have error handling yet)

};

} // namespace Http
Expand Down
5 changes: 4 additions & 1 deletion include/envoy/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -493,8 +493,11 @@ class RouteEntry : public ResponseEntry {

/**
* @return bool true if this route should use WebSockets.
* Per https://github.com/envoyproxy/envoy/issues/3301 this is the "old style"
* websocket" where headers are proxied upstream unchanged, and the websocket
* is handed off to a tcp proxy session.
*/
virtual bool useWebSocket() const PURE;
virtual bool useOldStyleWebSocket() const PURE;

/**
* Create an instance of a WebSocketProxy, using the configuration in this route.
Expand Down
2 changes: 1 addition & 1 deletion source/common/http/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ class AsyncStreamImpl : public AsyncClient::Stream,
}
const Router::VirtualHost& virtualHost() const override { return virtual_host_; }
bool autoHostRewrite() const override { return false; }
bool useWebSocket() const override { return false; }
bool useOldStyleWebSocket() const override { return false; }
Http::WebSocketProxyPtr createWebSocketProxy(Http::HeaderMap&, RequestInfo::RequestInfo&,
Http::WebSocketProxyCallbacks&,
Upstream::ClusterManager&,
Expand Down
1 change: 1 addition & 0 deletions source/common/http/codes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ const char* CodeUtility::toString(Code code) {
switch (code) {
// 1xx
case Code::Continue: return "Continue";
case Code::SwitchingProtocols: return "Switching Protocols";

// 2xx
case Code::OK: return "OK";
Expand Down
13 changes: 7 additions & 6 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ ConnectionManagerImpl::~ConnectionManagerImpl() {
if (codec_->protocol() == Protocol::Http2) {
stats_.named_.downstream_cx_http2_active_.dec();
} else {
if (isWebSocketConnection()) {
if (isOldStyleWebSocketConnection()) {
stats_.named_.downstream_cx_websocket_active_.dec();
} else {
stats_.named_.downstream_cx_http1_active_.dec();
Expand Down Expand Up @@ -204,7 +204,7 @@ Network::FilterStatus ConnectionManagerImpl::onData(Buffer::Instance& data, bool
// will still be processed as a normal HTTP/1.1 request, where Envoy will
// detect the WebSocket upgrade and establish a connection to the
// upstream.
if (isWebSocketConnection()) {
if (isOldStyleWebSocketConnection()) {
return ws_connection_->onData(data, end_stream);
}

Expand Down Expand Up @@ -254,7 +254,7 @@ Network::FilterStatus ConnectionManagerImpl::onData(Buffer::Instance& data, bool
}

if (!streams_.empty() && streams_.front()->state_.remote_complete_ &&
!isWebSocketConnection()) {
!isOldStyleWebSocketConnection()) {
read_callbacks_->connection().readDisable(true);
}
}
Expand Down Expand Up @@ -553,10 +553,11 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(HeaderMapPtr&& headers,
// should return 404. The current returns no response if there is no router filter.
if (protocol == Protocol::Http11 && cached_route_.value()) {
const Router::RouteEntry* route_entry = cached_route_.value()->routeEntry();
const bool websocket_allowed = (route_entry != nullptr) && route_entry->useWebSocket();
const bool old_style_websocket =
(route_entry != nullptr) && route_entry->useOldStyleWebSocket();
const bool websocket_requested = Utility::isWebSocketUpgradeRequest(*request_headers_);

if (websocket_requested && websocket_allowed) {
if (websocket_requested && old_style_websocket) {
ENVOY_STREAM_LOG(debug, "found websocket connection. (end_stream={}):", *this, end_stream);

connection_manager_.ws_connection_ = route_entry->createWebSocketProxy(
Expand Down Expand Up @@ -688,7 +689,7 @@ void ConnectionManagerImpl::ActiveStream::decodeData(Buffer::Instance& data, boo

// If the initial websocket upgrade request had an HTTP body
// let's send this up
if (connection_manager_.isWebSocketConnection()) {
if (connection_manager_.isOldStyleWebSocketConnection()) {
if (data.length() > 0) {
connection_manager_.ws_connection_->onData(data, false);
}
Expand Down
2 changes: 1 addition & 1 deletion source/common/http/conn_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
void onDrainTimeout();
void startDrainSequence();

bool isWebSocketConnection() const { return ws_connection_ != nullptr; }
bool isOldStyleWebSocketConnection() const { return ws_connection_ != nullptr; }

enum class DrainState { NotDraining, Draining, Closing };

Expand Down
4 changes: 2 additions & 2 deletions source/common/router/config_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ class RouteEntryImplBase : public RouteEntry,
}
const VirtualHost& virtualHost() const override { return vhost_; }
bool autoHostRewrite() const override { return auto_host_rewrite_; }
bool useWebSocket() const override { return websocket_config_ != nullptr; }
bool useOldStyleWebSocket() const override { return websocket_config_ != nullptr; }
Http::WebSocketProxyPtr
createWebSocketProxy(Http::HeaderMap& request_headers, RequestInfo::RequestInfo& request_info,
Http::WebSocketProxyCallbacks& callbacks,
Expand Down Expand Up @@ -410,7 +410,7 @@ class RouteEntryImplBase : public RouteEntry,

const VirtualHost& virtualHost() const override { return parent_->virtualHost(); }
bool autoHostRewrite() const override { return parent_->autoHostRewrite(); }
bool useWebSocket() const override { return parent_->useWebSocket(); }
bool useOldStyleWebSocket() const override { return parent_->useOldStyleWebSocket(); }
Http::WebSocketProxyPtr
createWebSocketProxy(Http::HeaderMap& request_headers, RequestInfo::RequestInfo& request_info,
Http::WebSocketProxyCallbacks& callbacks,
Expand Down
107 changes: 85 additions & 22 deletions source/extensions/filters/network/http_connection_manager/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,22 @@ namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace HttpConnectionManager {
namespace {

typedef std::list<Http::FilterFactoryCb> FilterFactoriesList;
typedef std::map<std::string, std::unique_ptr<FilterFactoriesList>> FilterFactoryMap;

FilterFactoryMap::const_iterator findUpgradeCaseInsensitive(const FilterFactoryMap& upgrade_map,
absl::string_view upgrade_type) {
for (auto it = upgrade_map.begin(); it != upgrade_map.end(); ++it) {
if (StringUtil::CaseInsensitiveCompare()(it->first, upgrade_type)) {
return it;
}
}
return upgrade_map.end();
}

} // namespace

// Singleton registration via macro defined in envoy/singleton/manager.h
SINGLETON_MANAGER_REGISTRATION(date_provider);
Expand Down Expand Up @@ -226,33 +242,57 @@ HttpConnectionManagerConfig::HttpConnectionManagerConfig(

const auto& filters = config.http_filters();
for (int32_t i = 0; i < filters.size(); i++) {
const ProtobufTypes::String& string_name = filters[i].name();
const auto& proto_config = filters[i];

ENVOY_LOG(debug, " filter #{}", i);
ENVOY_LOG(debug, " name: {}", string_name);

const Json::ObjectSharedPtr filter_config =
MessageUtil::getJsonObjectFromMessage(proto_config.config());
ENVOY_LOG(debug, " config: {}", filter_config->asJsonString());

// Now see if there is a factory that will accept the config.
auto& factory =
Config::Utility::getAndCheckFactory<Server::Configuration::NamedHttpFilterConfigFactory>(
string_name);
Http::FilterFactoryCb callback;
if (filter_config->getBoolean("deprecated_v1", false)) {
callback = factory.createFilterFactory(*filter_config->getObject("value", true),
stats_prefix_, context);
processFilter(filters[i], i, "http", filter_factories_);
}

for (auto upgrade_config : config.upgrade_configs()) {
const std::string& name = upgrade_config.upgrade_type();
if (findUpgradeCaseInsensitive(upgrade_filter_factories_, name) !=
upgrade_filter_factories_.end()) {
throw EnvoyException(
fmt::format("Error: multiple upgrade configs with the same name: '{}'", name));
}
if (upgrade_config.filters().size() > 0) {
std::unique_ptr<FilterFactoriesList> factories = std::make_unique<FilterFactoriesList>();
for (int32_t i = 0; i < upgrade_config.filters().size(); i++) {
processFilter(upgrade_config.filters(i), i, name, *factories);
}
upgrade_filter_factories_.emplace(std::make_pair(name, std::move(factories)));
} else {
ProtobufTypes::MessagePtr message =
Config::Utility::translateToFactoryConfig(proto_config, factory);
callback = factory.createFilterFactoryFromProto(*message, stats_prefix_, context);
std::unique_ptr<FilterFactoriesList> factories(nullptr);
upgrade_filter_factories_.emplace(std::make_pair(name, std::move(factories)));
}
filter_factories_.push_back(callback);
}
}

void HttpConnectionManagerConfig::processFilter(
const envoy::config::filter::network::http_connection_manager::v2::HttpFilter& proto_config,
int i, absl::string_view prefix, std::list<Http::FilterFactoryCb>& filter_factories) {
const ProtobufTypes::String& string_name = proto_config.name();

ENVOY_LOG(debug, " {} filter #{}", prefix, i);
ENVOY_LOG(debug, " name: {}", string_name);

const Json::ObjectSharedPtr filter_config =
MessageUtil::getJsonObjectFromMessage(proto_config.config());
ENVOY_LOG(debug, " config: {}", filter_config->asJsonString());

// Now see if there is a factory that will accept the config.
auto& factory =
Config::Utility::getAndCheckFactory<Server::Configuration::NamedHttpFilterConfigFactory>(
string_name);
Http::FilterFactoryCb callback;
if (filter_config->getBoolean("deprecated_v1", false)) {
callback = factory.createFilterFactory(*filter_config->getObject("value", true), stats_prefix_,
context_);
} else {
ProtobufTypes::MessagePtr message =
Config::Utility::translateToFactoryConfig(proto_config, factory);
callback = factory.createFilterFactoryFromProto(*message, stats_prefix_, context_);
}
filter_factories.push_back(callback);
}

Http::ServerConnectionPtr
HttpConnectionManagerConfig::createCodec(Network::Connection& connection,
const Buffer::Instance& data,
Expand Down Expand Up @@ -284,6 +324,29 @@ void HttpConnectionManagerConfig::createFilterChain(Http::FilterChainFactoryCall
}
}

bool HttpConnectionManagerConfig::createUpgradeFilterChain(
absl::string_view upgrade_type, Http::FilterChainFactoryCallbacks& callbacks) {
auto it = findUpgradeCaseInsensitive(upgrade_filter_factories_, upgrade_type);
if (it != upgrade_filter_factories_.end()) {
FilterFactoriesList* filters_to_use = nullptr;
if (it->second != nullptr) {
filters_to_use = it->second.get();
} else {
filters_to_use = &filter_factories_;
}
for (const Http::FilterFactoryCb& factory : *filters_to_use) {
factory(callbacks);
}
return true;
}
return false;
}

bool HttpConnectionManagerConfig::upgradeSupported(absl::string_view upgrade_type) const {
return findUpgradeCaseInsensitive(upgrade_filter_factories_, upgrade_type) !=
upgrade_filter_factories_.end();
}

const Network::Address::Instance& HttpConnectionManagerConfig::localAddress() {
return *context_.localInfo().address();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <cstdint>
#include <functional>
#include <list>
#include <map>
#include <string>

#include "envoy/config/filter/network/http_connection_manager/v2/http_connection_manager.pb.validate.h"
Expand Down Expand Up @@ -74,6 +75,9 @@ class HttpConnectionManagerConfig : Logger::Loggable<Logger::Id::config>,

// Http::FilterChainFactory
void createFilterChain(Http::FilterChainFactoryCallbacks& callbacks) override;
bool createUpgradeFilterChain(absl::string_view upgrade_type,
Http::FilterChainFactoryCallbacks& callbacks) override;
bool upgradeSupported(absl::string_view upgrade_type) const override;

// Http::ConnectionManagerConfig
const std::list<AccessLog::InstanceSharedPtr>& accessLogs() override { return access_logs_; }
Expand Down Expand Up @@ -107,10 +111,15 @@ class HttpConnectionManagerConfig : Logger::Loggable<Logger::Id::config>,
const Http::Http1Settings& http1Settings() const override { return http1_settings_; }

private:
typedef std::list<Http::FilterFactoryCb> FilterFactoriesList;
enum class CodecType { HTTP1, HTTP2, AUTO };
void processFilter(
const envoy::config::filter::network::http_connection_manager::v2::HttpFilter& proto_config,
int i, absl::string_view prefix, FilterFactoriesList& filter_factories);

Server::Configuration::FactoryContext& context_;
std::list<Http::FilterFactoryCb> filter_factories_;
FilterFactoriesList filter_factories_;
std::map<std::string, std::unique_ptr<FilterFactoriesList>> upgrade_filter_factories_;
std::list<AccessLog::InstanceSharedPtr> access_logs_;
const std::string stats_prefix_;
Http::ConnectionManagerStats stats_;
Expand Down
4 changes: 4 additions & 0 deletions source/server/http/admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ class AdminImpl : public Admin,

// Http::FilterChainFactory
void createFilterChain(Http::FilterChainFactoryCallbacks& callbacks) override;
bool createUpgradeFilterChain(absl::string_view, Http::FilterChainFactoryCallbacks&) override {
return false;
}
bool upgradeSupported(absl::string_view) const override { return false; }

// Http::ConnectionManagerConfig
const std::list<AccessLog::InstanceSharedPtr>& accessLogs() override { return access_logs_; }
Expand Down
1 change: 1 addition & 0 deletions test/common/http/codes_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ TEST_F(CodeUtilityTest, Canary) {
TEST_F(CodeUtilityTest, All) {
const std::vector<std::pair<Code, std::string>> test_set = {
std::make_pair(Code::Continue, "Continue"),
std::make_pair(Code::SwitchingProtocols, "Switching Protocols"),
std::make_pair(Code::OK, "OK"),
std::make_pair(Code::Created, "Created"),
std::make_pair(Code::Accepted, "Accepted"),
Expand Down
2 changes: 1 addition & 1 deletion test/common/http/conn_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ class HttpConnectionManagerImplTest : public Test, public ConnectionManagerConfi
}

void configureRouteForWebsocket(Router::MockRouteEntry& route_entry) {
ON_CALL(route_entry, useWebSocket()).WillByDefault(Return(true));
ON_CALL(route_entry, useOldStyleWebSocket()).WillByDefault(Return(true));
ON_CALL(route_entry, createWebSocketProxy(_, _, _, _, _))
.WillByDefault(Invoke([this, &route_entry](Http::HeaderMap& request_headers,
RequestInfo::RequestInfo& request_info,
Expand Down
2 changes: 1 addition & 1 deletion test/common/router/config_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2673,7 +2673,7 @@ TEST(RouteMatcherTest, WeightedClusters) {
EXPECT_EQ(nullptr, route_entry->hashPolicy());
EXPECT_TRUE(route_entry->opaqueConfig().empty());
EXPECT_FALSE(route_entry->autoHostRewrite());
EXPECT_FALSE(route_entry->useWebSocket());
EXPECT_FALSE(route_entry->useOldStyleWebSocket());
EXPECT_TRUE(route_entry->includeVirtualHostRateLimits());
EXPECT_EQ(Http::Code::ServiceUnavailable, route_entry->clusterNotFoundResponseCode());
EXPECT_EQ(nullptr, route_entry->corsPolicy());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ envoy_extension_cc_test(
"//source/common/config:filter_json_lib",
"//source/common/event:dispatcher_lib",
"//source/extensions/filters/http/dynamo:config",
"//source/extensions/filters/http/router:config",
"//source/extensions/filters/network/http_connection_manager:config",
"//test/mocks/network:network_mocks",
"//test/mocks/server:server_mocks",
Expand Down
Loading