diff --git a/docs/root/intro/arch_overview/http/http.rst b/docs/root/intro/arch_overview/http/http.rst index f5729560e0f6c..33b7ebffa6fa2 100644 --- a/docs/root/intro/arch_overview/http/http.rst +++ b/docs/root/intro/arch_overview/http/http.rst @@ -7,5 +7,5 @@ HTTP http_connection_management http_filters http_routing - websocket + upgrades http_proxy diff --git a/docs/root/intro/arch_overview/http/websocket.rst b/docs/root/intro/arch_overview/http/upgrades.rst similarity index 63% rename from docs/root/intro/arch_overview/http/websocket.rst rename to docs/root/intro/arch_overview/http/upgrades.rst index fa4e0b1f055d9..6f3b88728273f 100644 --- a/docs/root/intro/arch_overview/http/websocket.rst +++ b/docs/root/intro/arch_overview/http/upgrades.rst @@ -1,19 +1,19 @@ .. _arch_overview_websocket: -WebSocket and HTTP upgrades +HTTP upgrades =========================== -Envoy Upgrade support is intended mainly for WebSocket but may be used for non-WebSocket -upgrades as well. Upgrades pass both the HTTP headers and the upgrade payload +Envoy Upgrade support is intended mainly for WebSocket and CONNECT support, but may be used for +arbitrary upgrades as well. Upgrades pass both the HTTP headers and the upgrade payload through an HTTP filter chain. One may configure the :ref:`upgrade_configs ` with or without custom filter chains. If only the :ref:`upgrade_type ` -is specified, both the upgrade headers, any request and response body, and WebSocket payload will +is specified, both the upgrade headers, any request and response body, and HTTP data payload will pass through the default HTTP filter chain. To avoid the use of HTTP-only filters for upgrade payload, one can set up custom :ref:`filters ` -for the given upgrade type, up to and including only using the router filter to send the WebSocket +for the given upgrade type, up to and including only using the router filter to send the HTTP data upstream. Upgrades can be enabled or disabled on a :ref:`per-route ` basis. 
@@ -32,12 +32,12 @@ laid out below, but custom filter chains can only be configured on a per-HttpCon | F | F | F | +-----------------------+-------------------------+-------------------+ -Note that the statistics for upgrades are all bundled together so WebSocket +Note that the statistics for upgrades are all bundled together so WebSocket and other upgrades :ref:`statistics ` are tracked by stats such as downstream_cx_upgrades_total and downstream_cx_upgrades_active -Handling HTTP/2 hops -^^^^^^^^^^^^^^^^^^^^ +Websocket over HTTP/2 hops +^^^^^^^^^^^^^^^^^^^^^^^^^^ While HTTP/2 support for WebSockets is off by default, Envoy does support tunneling WebSockets over HTTP/2 streams for deployments that prefer a uniform HTTP/2 mesh throughout; this enables, for example, @@ -61,3 +61,31 @@ a GET method on the final Envoy-Upstream hop. Note that the HTTP/2 upgrade path has very strict HTTP/1.1 compliance, so will not proxy WebSocket upgrade requests or responses with bodies. + +.. TODO(alyssawilk) unhide this when unhiding config +.. CONNECT support +.. ^^^^^^^^^^^^^^^ + +.. Envoy CONNECT support is off by default (Envoy will send an internally generated 403 in response to +.. CONNECT requests). CONNECT support can be enabled via the upgrade options described above, setting +.. the upgrade value to the special keyword "CONNECT". + +.. While for HTTP/2, CONNECT request may have a path, in general and for HTTP/1.1 CONNECT requests do +.. not have a path, and can only be matched using a +.. :ref:`connect_matcher ` +.. +.. Envoy can handle CONNECT in one of two ways, either proxying the CONNECT headers through as if they +.. were any other request, and letting the upstream terminate the CONNECT request, or by terminating the +.. CONNECT request, and forwarding the payload as raw TCP data. When CONNECT upgrade configuration is +.. set up, the default behavior is to proxy the CONNECT request, treating it like any other request using +.. the upgrade path. +.. 
If termination is desired, this can be accomplished by setting +.. :ref:`connect_config ` +.. If that message is present for CONNECT requests, the router filter will strip the request headers, +.. and forward the HTTP payload upstream. On receipt of initial TCP data from upstream, the router +.. will synthesize 200 response headers, and then forward the TCP data as the HTTP response body. + +.. .. warning:: +.. This mode of CONNECT support can create major security holes if configured incorrectly, as the upstream +.. will be forwarded *unsanitized* headers if they are in the body payload. Please use with caution. + diff --git a/source/common/router/router.cc b/source/common/router/router.cc index e507e06a4a5c0..140e44f5c0a7a 100644 --- a/source/common/router/router.cc +++ b/source/common/router/router.cc @@ -115,6 +115,16 @@ bool convertRequestHeadersForInternalRedirect(Http::RequestHeaderMap& downstream constexpr uint64_t TimeoutPrecisionFactor = 100; +Http::ConnectionPool::Instance* +httpPool(absl::variant pool) { + return absl::get(pool); +} + +Tcp::ConnectionPool::Instance* +tcpPool(absl::variant pool) { + return absl::get(pool); +} + const absl::string_view getPath(const Http::RequestHeaderMap& headers) { return headers.Path() ? headers.Path()->value().getStringView() : ""; } @@ -549,12 +559,10 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers, } } - Http::ConnectionPool::Instance* http_pool = getHttpConnPool(); Upstream::HostDescriptionConstSharedPtr host; + Filter::HttpOrTcpPool conn_pool = createConnPool(host); - if (http_pool) { - host = http_pool->host(); - } else { + if (!host) { sendNoHealthyUpstreamResponse(); return Http::FilterHeadersStatus::StopIteration; } @@ -644,8 +652,7 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers, // Hang onto the modify_headers function for later use in handling upstream responses.
modify_headers_ = modify_headers; - UpstreamRequestPtr upstream_request = - std::make_unique(*this, std::make_unique(*http_pool)); + UpstreamRequestPtr upstream_request = createUpstreamRequest(conn_pool); upstream_request->moveIntoList(std::move(upstream_request), upstream_requests_); upstream_requests_.front()->encodeHeaders(end_stream); if (end_stream) { @@ -655,6 +662,38 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers, return Http::FilterHeadersStatus::StopIteration; } +Filter::HttpOrTcpPool Filter::createConnPool(Upstream::HostDescriptionConstSharedPtr& host) { + Filter::HttpOrTcpPool conn_pool; + bool should_tcp_proxy = route_entry_->connectConfig().has_value() && + downstream_headers_->Method()->value().getStringView() == + Http::Headers::get().MethodValues.Connect; + + if (!should_tcp_proxy) { + conn_pool = getHttpConnPool(); + if (httpPool(conn_pool)) { + host = httpPool(conn_pool)->host(); + } + } else { + transport_socket_options_ = Network::TransportSocketOptionsUtility::fromFilterState( + *callbacks_->streamInfo().filterState()); + conn_pool = config_.cm_.tcpConnPoolForCluster(route_entry_->clusterName(), + Upstream::ResourcePriority::Default, this); + if (tcpPool(conn_pool)) { + host = tcpPool(conn_pool)->host(); + } + } + return conn_pool; +} + +UpstreamRequestPtr Filter::createUpstreamRequest(Filter::HttpOrTcpPool conn_pool) { + if (absl::holds_alternative(conn_pool)) { + return std::make_unique(*this, + std::make_unique(*httpPool(conn_pool))); + } + return std::make_unique(*this, + std::make_unique(tcpPool(conn_pool))); +} + Http::ConnectionPool::Instance* Filter::getHttpConnPool() { // Choose protocol based on cluster configuration and downstream connection // Note: Cluster may downgrade HTTP2 to HTTP1 based on runtime configuration. 
@@ -1454,19 +1493,15 @@ void Filter::doRetry() { attempt_count_++; ASSERT(pending_retries_ > 0); pending_retries_--; - UpstreamRequestPtr upstream_request; - - Http::ConnectionPool::Instance* conn_pool = getHttpConnPool(); - if (conn_pool) { - upstream_request = - std::make_unique(*this, std::make_unique(*conn_pool)); - } - if (!upstream_request) { + Upstream::HostDescriptionConstSharedPtr host; + Filter::HttpOrTcpPool conn_pool = createConnPool(host); + if (!host) { sendNoHealthyUpstreamResponse(); cleanup(); return; } + UpstreamRequestPtr upstream_request = createUpstreamRequest(conn_pool); if (include_attempt_count_in_request_) { downstream_headers_->setEnvoyAttemptCount(attempt_count_); diff --git a/source/common/router/router.h b/source/common/router/router.h index 9f532d08fb5f6..c74a333233773 100644 --- a/source/common/router/router.h +++ b/source/common/router/router.h @@ -467,6 +467,12 @@ class Filter : Logger::Loggable, const Upstream::ClusterInfo& cluster, const VirtualCluster* vcluster, Runtime::Loader& runtime, Runtime::RandomGenerator& random, Event::Dispatcher& dispatcher, Upstream::ResourcePriority priority) PURE; + + using HttpOrTcpPool = + absl::variant; + HttpOrTcpPool createConnPool(Upstream::HostDescriptionConstSharedPtr& host); + UpstreamRequestPtr createUpstreamRequest(Filter::HttpOrTcpPool conn_pool); + Http::ConnectionPool::Instance* getHttpConnPool(); void maybeDoShadowing(); bool maybeRetryReset(Http::StreamResetReason reset_reason, UpstreamRequest& upstream_request); diff --git a/source/common/router/upstream_request.cc b/source/common/router/upstream_request.cc index 085e6c4a71429..ed6a0c7e6905f 100644 --- a/source/common/router/upstream_request.cc +++ b/source/common/router/upstream_request.cc @@ -490,6 +490,16 @@ void HttpConnPool::newStream(GenericConnectionPoolCallbacks* callbacks) { } } +void TcpConnPool::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data, + Upstream::HostDescriptionConstSharedPtr host) { + 
upstream_handle_ = nullptr; + Network::Connection& latched_conn = conn_data->connection(); + auto upstream = + std::make_unique(callbacks_->upstreamRequest(), std::move(conn_data)); + callbacks_->onPoolReady(std::move(upstream), host, latched_conn.localAddress(), + latched_conn.streamInfo()); +} + bool HttpConnPool::cancelAnyPendingRequest() { if (conn_pool_stream_handle_) { conn_pool_stream_handle_->cancel(); @@ -516,5 +526,68 @@ void HttpConnPool::onPoolReady(Http::RequestEncoder& request_encoder, request_encoder.getStream().connectionLocalAddress(), info); } +TcpUpstream::TcpUpstream(UpstreamRequest* upstream_request, + Tcp::ConnectionPool::ConnectionDataPtr&& upstream) + : upstream_request_(upstream_request), upstream_conn_data_(std::move(upstream)) { + upstream_conn_data_->connection().enableHalfClose(true); + upstream_conn_data_->addUpstreamCallbacks(*this); +} + +void TcpUpstream::encodeData(Buffer::Instance& data, bool end_stream) { + upstream_conn_data_->connection().write(data, end_stream); +} + +void TcpUpstream::encodeHeaders(const Http::RequestHeaderMap&, bool end_stream) { + if (end_stream) { + Buffer::OwnedImpl data; + upstream_conn_data_->connection().write(data, true); + } +} + +void TcpUpstream::encodeTrailers(const Http::RequestTrailerMap&) { + Buffer::OwnedImpl data; + upstream_conn_data_->connection().write(data, true); +} + +void TcpUpstream::readDisable(bool disable) { + if (upstream_conn_data_->connection().state() != Network::Connection::State::Open) { + return; + } + upstream_conn_data_->connection().readDisable(disable); +} + +void TcpUpstream::resetStream() { + upstream_request_ = nullptr; + upstream_conn_data_->connection().close(Network::ConnectionCloseType::NoFlush); +} + +void TcpUpstream::onUpstreamData(Buffer::Instance& data, bool end_stream) { + if (!sent_headers_) { + Http::ResponseHeaderMapPtr headers{ + Http::createHeaderMap({{Http::Headers::get().Status, "200"}})}; + upstream_request_->decodeHeaders(std::move(headers), false); 
+ sent_headers_ = true; + } + upstream_request_->decodeData(data, end_stream); +} + +void TcpUpstream::onEvent(Network::ConnectionEvent event) { + if (event != Network::ConnectionEvent::Connected && upstream_request_) { + upstream_request_->onResetStream(Http::StreamResetReason::ConnectionTermination, ""); + } +} + +void TcpUpstream::onAboveWriteBufferHighWatermark() { + if (upstream_request_) { + upstream_request_->disableDataFromDownstreamForFlowControl(); + } +} + +void TcpUpstream::onBelowWriteBufferLowWatermark() { + if (upstream_request_) { + upstream_request_->enableDataFromDownstreamForFlowControl(); + } +} + } // namespace Router } // namespace Envoy diff --git a/source/common/router/upstream_request.h b/source/common/router/upstream_request.h index a10f42163abf9..660d6e17b8b16 100644 --- a/source/common/router/upstream_request.h +++ b/source/common/router/upstream_request.h @@ -202,6 +202,41 @@ class HttpConnPool : public GenericConnPool, public Http::ConnectionPool::Callba GenericConnectionPoolCallbacks* callbacks_{}; }; +class TcpConnPool : public GenericConnPool, public Tcp::ConnectionPool::Callbacks { +public: + TcpConnPool(Tcp::ConnectionPool::Instance* conn_pool) : conn_pool_(conn_pool) {} + + void newStream(GenericConnectionPoolCallbacks* callbacks) override { + callbacks_ = callbacks; + upstream_handle_ = conn_pool_->newConnection(*this); + } + + bool cancelAnyPendingRequest() override { + if (upstream_handle_) { + upstream_handle_->cancel(Tcp::ConnectionPool::CancelPolicy::Default); + upstream_handle_ = nullptr; + return true; + } + return false; + } + absl::optional protocol() const override { return absl::nullopt; } + + // Tcp::ConnectionPool::Callbacks + void onPoolFailure(ConnectionPool::PoolFailureReason reason, + Upstream::HostDescriptionConstSharedPtr host) override { + upstream_handle_ = nullptr; + callbacks_->onPoolFailure(reason, "", host); + } + + void onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data, + 
Upstream::HostDescriptionConstSharedPtr host) override; + +private: + Tcp::ConnectionPool::Instance* conn_pool_; + Tcp::ConnectionPool::Cancellable* upstream_handle_{}; + GenericConnectionPoolCallbacks* callbacks_{}; +}; + // A generic API which covers common functionality between HTTP and TCP upstreams. class GenericUpstream { public: @@ -261,5 +296,29 @@ class HttpUpstream : public GenericUpstream, public Http::StreamCallbacks { Http::RequestEncoder* request_encoder_{}; }; +class TcpUpstream : public GenericUpstream, public Tcp::ConnectionPool::UpstreamCallbacks { +public: + TcpUpstream(UpstreamRequest* upstream_request, Tcp::ConnectionPool::ConnectionDataPtr&& upstream); + + // GenericUpstream + void encodeData(Buffer::Instance& data, bool end_stream) override; + void encodeMetadata(const Http::MetadataMapVector&) override {} + void encodeHeaders(const Http::RequestHeaderMap&, bool end_stream) override; + void encodeTrailers(const Http::RequestTrailerMap&) override; + void readDisable(bool disable) override; + void resetStream() override; + + // Tcp::ConnectionPool::UpstreamCallbacks + void onUpstreamData(Buffer::Instance& data, bool end_stream) override; + void onEvent(Network::ConnectionEvent event) override; + void onAboveWriteBufferHighWatermark() override; + void onBelowWriteBufferLowWatermark() override; + +private: + UpstreamRequest* upstream_request_; + Tcp::ConnectionPool::ConnectionDataPtr upstream_conn_data_; + bool sent_headers_{}; +}; + } // namespace Router } // namespace Envoy diff --git a/test/common/router/BUILD b/test/common/router/BUILD index c3cab9845d2c0..ab0ab8c7ecba4 100644 --- a/test/common/router/BUILD +++ b/test/common/router/BUILD @@ -319,6 +319,26 @@ envoy_cc_test( ], ) +envoy_cc_test( + name = "upstream_request_test", + srcs = ["upstream_request_test.cc"], + deps = [ + "//source/common/buffer:buffer_lib", + "//source/common/router:router_lib", + "//source/common/upstream:upstream_includes", + "//source/common/upstream:upstream_lib", 
+ "//test/common/http:common_lib", + "//test/mocks/http:http_mocks", + "//test/mocks/network:network_mocks", + "//test/mocks/router:router_mocks", + "//test/mocks/server:server_mocks", + "//test/mocks/upstream:upstream_mocks", + "//test/test_common:environment_lib", + "//test/test_common:simulated_time_system_lib", + "//test/test_common:utility_lib", + ], +) + envoy_cc_test( name = "header_formatter_test", srcs = ["header_formatter_test.cc"], diff --git a/test/common/router/upstream_request_test.cc b/test/common/router/upstream_request_test.cc new file mode 100644 index 0000000000000..d9900dc2a588e --- /dev/null +++ b/test/common/router/upstream_request_test.cc @@ -0,0 +1,245 @@ +#include "common/buffer/buffer_impl.h" +#include "common/router/config_impl.h" +#include "common/router/router.h" +#include "common/router/upstream_request.h" + +#include "test/common/http/common.h" +#include "test/mocks/http/mocks.h" +#include "test/mocks/router/mocks.h" +#include "test/mocks/server/mocks.h" +#include "test/mocks/tcp/mocks.h" +#include "test/test_common/utility.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +using testing::_; +using testing::AnyNumber; +using testing::NiceMock; +using testing::Return; +using testing::ReturnRef; + +namespace Envoy { +namespace Router { +namespace { + +class MockGenericConnPool : public GenericConnPool { + MOCK_METHOD(void, newStream, (GenericConnectionPoolCallbacks * request)); + MOCK_METHOD(bool, cancelAnyPendingRequest, ()); + MOCK_METHOD(absl::optional, protocol, (), (const)); +}; + +class MockGenericConnectionPoolCallbacks : public GenericConnectionPoolCallbacks { +public: + MOCK_METHOD(void, onPoolFailure, + (Http::ConnectionPool::PoolFailureReason reason, + absl::string_view transport_failure_reason, + Upstream::HostDescriptionConstSharedPtr host)); + MOCK_METHOD(void, onPoolReady, + (std::unique_ptr && upstream, + Upstream::HostDescriptionConstSharedPtr host, + const Network::Address::InstanceConstSharedPtr& 
upstream_local_address, + const StreamInfo::StreamInfo& info)); + MOCK_METHOD(UpstreamRequest*, upstreamRequest, ()); +}; + +class MockRouterFilterInterface : public RouterFilterInterface { +public: + MockRouterFilterInterface() + : config_("prefix.", context_, ShadowWriterPtr(new MockShadowWriter()), router_proto) { + auto cluster_info = new NiceMock(); + cluster_info->timeout_budget_stats_ = absl::nullopt; + cluster_info_.reset(cluster_info); + ON_CALL(*this, callbacks()).WillByDefault(Return(&callbacks_)); + ON_CALL(*this, config()).WillByDefault(ReturnRef(config_)); + ON_CALL(*this, cluster()).WillByDefault(Return(cluster_info_)); + ON_CALL(*this, upstreamRequests()).WillByDefault(ReturnRef(requests_)); + EXPECT_CALL(callbacks_.dispatcher_, setTrackedObject(_)).Times(AnyNumber()); + } + + MOCK_METHOD(void, onUpstream100ContinueHeaders, + (Http::ResponseHeaderMapPtr && headers, UpstreamRequest& upstream_request)); + MOCK_METHOD(void, onUpstreamHeaders, + (uint64_t response_code, Http::ResponseHeaderMapPtr&& headers, + UpstreamRequest& upstream_request, bool end_stream)); + MOCK_METHOD(void, onUpstreamData, + (Buffer::Instance & data, UpstreamRequest& upstream_request, bool end_stream)); + MOCK_METHOD(void, onUpstreamTrailers, + (Http::ResponseTrailerMapPtr && trailers, UpstreamRequest& upstream_request)); + MOCK_METHOD(void, onUpstreamMetadata, (Http::MetadataMapPtr && metadata_map)); + MOCK_METHOD(void, onUpstreamReset, + (Http::StreamResetReason reset_reason, absl::string_view transport_failure, + UpstreamRequest& upstream_request)); + MOCK_METHOD(void, onUpstreamHostSelected, (Upstream::HostDescriptionConstSharedPtr host)); + MOCK_METHOD(void, onPerTryTimeout, (UpstreamRequest & upstream_request)); + + MOCK_METHOD(Http::StreamDecoderFilterCallbacks*, callbacks, ()); + MOCK_METHOD(Upstream::ClusterInfoConstSharedPtr, cluster, ()); + MOCK_METHOD(FilterConfig&, config, ()); + MOCK_METHOD(FilterUtility::TimeoutData, timeout, ()); + 
MOCK_METHOD(Http::RequestHeaderMap*, downstreamHeaders, ()); + MOCK_METHOD(Http::RequestTrailerMap*, downstreamTrailers, ()); + MOCK_METHOD(bool, downstreamResponseStarted, (), (const)); + MOCK_METHOD(bool, downstreamEndStream, (), (const)); + MOCK_METHOD(uint32_t, attemptCount, (), (const)); + MOCK_METHOD(const VirtualCluster*, requestVcluster, (), (const)); + MOCK_METHOD(const RouteEntry*, routeEntry, (), (const)); + MOCK_METHOD(const std::list&, upstreamRequests, (), (const)); + MOCK_METHOD(const UpstreamRequest*, finalUpstreamRequest, (), (const)); + MOCK_METHOD(TimeSource&, timeSource, ()); + + NiceMock callbacks_; + + envoy::extensions::filters::http::router::v3::Router router_proto; + NiceMock context_; + FilterConfig config_; + Upstream::ClusterInfoConstSharedPtr cluster_info_; + std::list requests_; +}; + +class TcpConnPoolTest : public ::testing::Test { +public: + TcpConnPoolTest() + : conn_pool_(&mock_pool_), host_(std::make_shared>()) {} + + TcpConnPool conn_pool_; + Tcp::ConnectionPool::MockInstance mock_pool_; + MockGenericConnectionPoolCallbacks mock_generic_callbacks_; + std::shared_ptr> host_; + NiceMock cancellable_; +}; + +TEST_F(TcpConnPoolTest, Basic) { + NiceMock connection; + + EXPECT_CALL(mock_pool_, newConnection(_)).WillOnce(Return(&cancellable_)); + conn_pool_.newStream(&mock_generic_callbacks_); + + EXPECT_CALL(mock_generic_callbacks_, upstreamRequest()); + EXPECT_CALL(mock_generic_callbacks_, onPoolReady(_, _, _, _)); + auto data = std::make_unique>(); + EXPECT_CALL(*data, connection()).Times(AnyNumber()).WillRepeatedly(ReturnRef(connection)); + conn_pool_.onPoolReady(std::move(data), host_); +} + +TEST_F(TcpConnPoolTest, OnPoolFailure) { + EXPECT_CALL(mock_pool_, newConnection(_)).WillOnce(Return(&cancellable_)); + conn_pool_.newStream(&mock_generic_callbacks_); + + EXPECT_CALL(mock_generic_callbacks_, onPoolFailure(_, _, _)); + conn_pool_.onPoolFailure(Tcp::ConnectionPool::PoolFailureReason::LocalConnectionFailure, host_); + + // Make 
sure that the pool failure nulled out the pending request. + EXPECT_FALSE(conn_pool_.cancelAnyPendingRequest()); +} + +TEST_F(TcpConnPoolTest, Cancel) { + // Initially cancel should fail as there is no pending request. + EXPECT_FALSE(conn_pool_.cancelAnyPendingRequest()); + + EXPECT_CALL(mock_pool_, newConnection(_)).WillOnce(Return(&cancellable_)); + conn_pool_.newStream(&mock_generic_callbacks_); + + // Canceling should now return true as there was an active request. + EXPECT_TRUE(conn_pool_.cancelAnyPendingRequest()); + + // A second cancel should return false as there is not a pending request. + EXPECT_FALSE(conn_pool_.cancelAnyPendingRequest()); +} + +class TcpUpstreamTest : public ::testing::Test { +public: + TcpUpstreamTest() { + mock_router_filter_.requests_.push_back(std::make_unique( + mock_router_filter_, std::make_unique>())); + auto data = std::make_unique>(); + EXPECT_CALL(*data, connection()).Times(AnyNumber()).WillRepeatedly(ReturnRef(connection_)); + tcp_upstream_ = + std::make_unique(mock_router_filter_.requests_.front().get(), std::move(data)); + } + ~TcpUpstreamTest() override { EXPECT_CALL(mock_router_filter_, config()).Times(AnyNumber()); } + +protected: + NiceMock connection_; + NiceMock mock_router_filter_; + Tcp::ConnectionPool::MockConnectionData* mock_connection_data_; + std::unique_ptr tcp_upstream_; + Http::TestRequestHeaderMapImpl request_{{":method", "CONNECT"}, + {":path", "/"}, + {":protocol", "bytestream"}, + {":scheme", "https"}, + {":authority", "host"}}; +}; + +TEST_F(TcpUpstreamTest, Basic) { + // Swallow the headers. + tcp_upstream_->encodeHeaders(request_, false); + + // Proxy the data. + EXPECT_CALL(connection_, write(BufferStringEqual("foo"), false)); + Buffer::OwnedImpl buffer("foo"); + tcp_upstream_->encodeData(buffer, false); + + // Metadata is swallowed. 
+ Http::MetadataMapVector metadata_map_vector; + tcp_upstream_->encodeMetadata(metadata_map_vector); + + // On initial data payload, fake response headers, and forward data. + Buffer::OwnedImpl response1("bar"); + EXPECT_CALL(mock_router_filter_, onUpstreamHeaders(200, _, _, false)); + EXPECT_CALL(mock_router_filter_, onUpstreamData(BufferStringEqual("bar"), _, false)); + tcp_upstream_->onUpstreamData(response1, false); + + // On the next batch of payload there won't be additional headers. + Buffer::OwnedImpl response2("eep"); + EXPECT_CALL(mock_router_filter_, onUpstreamHeaders(_, _, _, _)).Times(0); + EXPECT_CALL(mock_router_filter_, onUpstreamData(BufferStringEqual("eep"), _, false)); + tcp_upstream_->onUpstreamData(response2, false); +} + +TEST_F(TcpUpstreamTest, TrailersEndStream) { + // Swallow the headers. + tcp_upstream_->encodeHeaders(request_, false); + + EXPECT_CALL(connection_, write(BufferStringEqual(""), true)); + Http::TestRequestTrailerMapImpl trailers{{"foo", "bar"}}; + tcp_upstream_->encodeTrailers(trailers); +} + +TEST_F(TcpUpstreamTest, HeaderEndStreamHalfClose) { + EXPECT_CALL(connection_, write(BufferStringEqual(""), true)); + tcp_upstream_->encodeHeaders(request_, true); +} + +TEST_F(TcpUpstreamTest, ReadDisable) { + EXPECT_CALL(connection_, readDisable(true)); + tcp_upstream_->readDisable(true); + + EXPECT_CALL(connection_, readDisable(false)); + tcp_upstream_->readDisable(false); + + // Once the connection is closed, don't touch it. + connection_.state_ = Network::Connection::State::Closed; + EXPECT_CALL(connection_, readDisable(_)).Times(0); + tcp_upstream_->readDisable(true); +} + +TEST_F(TcpUpstreamTest, UpstreamEvent) { + // Make sure upstream disconnects result in stream reset. 
+ EXPECT_CALL(mock_router_filter_, + onUpstreamReset(Http::StreamResetReason::ConnectionTermination, "", _)); + tcp_upstream_->onEvent(Network::ConnectionEvent::RemoteClose); +} + +TEST_F(TcpUpstreamTest, Watermarks) { + EXPECT_CALL(mock_router_filter_, callbacks()).Times(AnyNumber()); + EXPECT_CALL(mock_router_filter_.callbacks_, onDecoderFilterAboveWriteBufferHighWatermark()); + tcp_upstream_->onAboveWriteBufferHighWatermark(); + + EXPECT_CALL(mock_router_filter_.callbacks_, onDecoderFilterBelowWriteBufferLowWatermark()); + tcp_upstream_->onBelowWriteBufferLowWatermark(); +} + +} // namespace +} // namespace Router +} // namespace Envoy diff --git a/test/integration/tcp_tunneling_integration_test.cc b/test/integration/tcp_tunneling_integration_test.cc index 2851fd79a4946..3ce0f1cec06a8 100644 --- a/test/integration/tcp_tunneling_integration_test.cc +++ b/test/integration/tcp_tunneling_integration_test.cc @@ -22,12 +22,20 @@ class ConnectTerminationIntegrationTest } void initialize() override { - auto host = config_helper_.createVirtualHost("host", "/"); - // host.mutable_proxying_config(); - config_helper_.addVirtualHost(host); config_helper_.addConfigModifier( [&](envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager& - hcm) -> void { + hcm) { + auto* route_config = hcm.mutable_route_config(); + ASSERT_EQ(1, route_config->virtual_hosts_size()); + auto* route = route_config->mutable_virtual_hosts(0)->mutable_routes(0); + auto* match = route->mutable_match(); + match->Clear(); + match->mutable_connect_matcher(); + + auto* upgrade = route->mutable_route()->add_upgrade_configs(); + upgrade->set_upgrade_type("CONNECT"); + upgrade->mutable_connect_config(); + hcm.add_upgrade_configs()->set_upgrade_type("CONNECT"); hcm.mutable_http2_protocol_options()->set_allow_connect(true); @@ -66,7 +74,7 @@ class ConnectTerminationIntegrationTest {":path", "/"}, {":protocol", "bytestream"}, {":scheme", "https"}, - {":authority", "host"}}; + 
{":authority", "host:80"}}; FakeRawConnectionPtr fake_raw_upstream_connection_; IntegrationStreamDecoderPtr response_; bool enable_timeout_{}; @@ -74,7 +82,7 @@ class ConnectTerminationIntegrationTest // TODO(alyssawilk) make sure that if data is sent with the connect it does not go upstream // until the 200 headers are sent before unhiding ANY config. -TEST_P(ConnectTerminationIntegrationTest, DISABLED_Basic) { +TEST_P(ConnectTerminationIntegrationTest, Basic) { initialize(); setUpConnection(); @@ -92,7 +100,7 @@ TEST_P(ConnectTerminationIntegrationTest, DISABLED_Basic) { ASSERT_FALSE(response_->reset()); } -TEST_P(ConnectTerminationIntegrationTest, DISABLED_DownstreamClose) { +TEST_P(ConnectTerminationIntegrationTest, DownstreamClose) { initialize(); setUpConnection(); @@ -103,7 +111,7 @@ TEST_P(ConnectTerminationIntegrationTest, DISABLED_DownstreamClose) { ASSERT_TRUE(fake_raw_upstream_connection_->waitForHalfClose()); } -TEST_P(ConnectTerminationIntegrationTest, DISABLED_DownstreamReset) { +TEST_P(ConnectTerminationIntegrationTest, DownstreamReset) { initialize(); setUpConnection(); @@ -114,7 +122,7 @@ TEST_P(ConnectTerminationIntegrationTest, DISABLED_DownstreamReset) { ASSERT_TRUE(fake_raw_upstream_connection_->waitForHalfClose()); } -TEST_P(ConnectTerminationIntegrationTest, DISABLED_UpstreamClose) { +TEST_P(ConnectTerminationIntegrationTest, UpstreamClose) { initialize(); setUpConnection(); @@ -125,7 +133,7 @@ TEST_P(ConnectTerminationIntegrationTest, DISABLED_UpstreamClose) { response_->waitForReset(); } -TEST_P(ConnectTerminationIntegrationTest, DISABLED_TestTimeout) { +TEST_P(ConnectTerminationIntegrationTest, TestTimeout) { enable_timeout_ = true; initialize(); @@ -136,7 +144,7 @@ TEST_P(ConnectTerminationIntegrationTest, DISABLED_TestTimeout) { ASSERT_TRUE(fake_raw_upstream_connection_->waitForHalfClose()); } -TEST_P(ConnectTerminationIntegrationTest, DISABLED_BuggyHeaders) { +TEST_P(ConnectTerminationIntegrationTest, BuggyHeaders) { initialize(); // 
It's possible that the FIN is received before we set half close on the // upstream connection, so allow unexpected disconnects. @@ -150,7 +158,7 @@ TEST_P(ConnectTerminationIntegrationTest, DISABLED_BuggyHeaders) { {":path", "/"}, {":protocol", "bytestream"}, {":scheme", "https"}, - {":authority", "host"}}); + {":authority", "host:80"}}); // If the connection is established (created, set to half close, and then the // FIN arrives), make sure the FIN arrives, and send a FIN from upstream. if (fake_upstreams_[0]->waitForRawConnection(fake_raw_upstream_connection_) &&