diff --git a/docs/root/intro/arch_overview/http/upgrades.rst b/docs/root/intro/arch_overview/http/upgrades.rst index 08e5254a8ca74..6a4338d39ee57 100644 --- a/docs/root/intro/arch_overview/http/upgrades.rst +++ b/docs/root/intro/arch_overview/http/upgrades.rst @@ -123,3 +123,6 @@ and `bazel-bin/source/exe/envoy-static --config-path configs/terminate_http2_con In both cases you will be running a first Envoy listening for TCP traffic on port 10000 and encapsulating it in an HTTP CONNECT request, and a second one listening on 10001, stripping the CONNECT headers, and forwarding the original TCP upstream, in this case to google.com. + +Envoy waits for the HTTP tunnel to be established (i.e. a successful response to the CONNECT request is received), +before start streaming the downstream TCP data to the upstream. diff --git a/docs/root/version_history/current.rst b/docs/root/version_history/current.rst index 19fb421f6bc9e..21e313b41c3ba 100644 --- a/docs/root/version_history/current.rst +++ b/docs/root/version_history/current.rst @@ -28,6 +28,7 @@ Minor Behavior Changes * mongo proxy metrics: swapped network connection remote and local closed counters previously set reversed (`cx_destroy_local_with_active_rq` and `cx_destroy_remote_with_active_rq`). * outlier detection: added :ref:`max_ejection_time ` to limit ejection time growth when a node stays unhealthy for extended period of time. By default :ref:`max_ejection_time ` limits ejection time to 5 minutes. Additionally, when the node stays healthy, ejection time decreases. See :ref:`ejection algorithm` for more info. Previously, ejection time could grow without limit and never decreased. * performance: improve performance when handling large HTTP/1 bodies. +* tcp_proxy: now waits for HTTP tunnel to be established before start streaming the downstream data, the runtime guard `envoy.reloadable_features.http_upstream_wait_connect_response` can be set to "false" to disable this behavior. * tls: removed RSA key transport and SHA-1 cipher suites from the client-side defaults. * watchdog: the watchdog action :ref:`abort_action ` is now the default action to terminate the process if watchdog kill / multikill is enabled. * xds: to support TTLs, heartbeating has been added to xDS. As a result, responses that contain empty resources without updating the version will no longer be propagated to the diff --git a/source/common/runtime/runtime_features.cc b/source/common/runtime/runtime_features.cc index 4600455f3e468..3181699080f5a 100644 --- a/source/common/runtime/runtime_features.cc +++ b/source/common/runtime/runtime_features.cc @@ -76,6 +76,7 @@ constexpr const char* runtime_features[] = { "envoy.reloadable_features.http_match_on_all_headers", "envoy.reloadable_features.http_set_copy_replace_all_headers", "envoy.reloadable_features.http_transport_failure_reason_in_body", + "envoy.reloadable_features.http_upstream_wait_connect_response", "envoy.reloadable_features.http2_skip_encoding_empty_trailers", "envoy.reloadable_features.listener_in_place_filterchain_update", "envoy.reloadable_features.overload_manager_disable_keepalive_drain_http2", diff --git a/source/common/tcp_proxy/upstream.cc b/source/common/tcp_proxy/upstream.cc index 5b286f64dbc79..5ad176e77c5b0 100644 --- a/source/common/tcp_proxy/upstream.cc +++ b/source/common/tcp_proxy/upstream.cc @@ -7,6 +7,7 @@ #include "common/http/header_map_impl.h" #include "common/http/headers.h" #include "common/http/utility.h" +#include "common/runtime/runtime_features.h" namespace Envoy { namespace TcpProxy { @@ -116,6 +117,12 @@ void HttpUpstream::resetEncoder(Network::ConnectionEvent event, bool inform_down request_encoder_->getStream().resetStream(Http::StreamResetReason::LocalReset); } request_encoder_ = nullptr; + // If we did not receive a valid CONNECT response yet we treat this as a pool + // failure, otherwise we forward the event downstream. + if (conn_pool_callbacks_ != nullptr) { + conn_pool_callbacks_->onFailure(); + return; + } if (inform_downstream) { upstream_callbacks_.onEvent(event); } @@ -223,9 +230,22 @@ void HttpConnPool::onPoolReady(Http::RequestEncoder& request_encoder, Http::RequestEncoder* latched_encoder = &request_encoder; upstream_->setRequestEncoder(request_encoder, host->transportSocketFactory().implementsSecureTransport()); - callbacks_->onGenericPoolReady(nullptr, std::move(upstream_), host, - latched_encoder->getStream().connectionLocalAddress(), - info.downstreamSslConnection()); + + if (Runtime::runtimeFeatureEnabled( + "envoy.reloadable_features.http_upstream_wait_connect_response")) { + upstream_->setConnPoolCallbacks( + std::make_unique(*this, host, info.downstreamSslConnection())); + } else { + callbacks_->onGenericPoolReady(nullptr, std::move(upstream_), host, + latched_encoder->getStream().connectionLocalAddress(), + info.downstreamSslConnection()); + } +} + +void HttpConnPool::onGenericPoolReady(Upstream::HostDescriptionConstSharedPtr& host, + const Network::Address::InstanceConstSharedPtr& local_address, + Ssl::ConnectionInfoConstSharedPtr ssl_info) { + callbacks_->onGenericPoolReady(nullptr, std::move(upstream_), host, local_address, ssl_info); } Http2Upstream::Http2Upstream(Tcp::ConnectionPool::UpstreamCallbacks& callbacks, diff --git a/source/common/tcp_proxy/upstream.h b/source/common/tcp_proxy/upstream.h index 1aed24c8093ec..80f8babfef482 100644 --- a/source/common/tcp_proxy/upstream.h +++ b/source/common/tcp_proxy/upstream.h @@ -64,7 +64,36 @@ class HttpConnPool : public GenericConnPool, public Http::ConnectionPool::Callba Upstream::HostDescriptionConstSharedPtr host, const StreamInfo::StreamInfo& info, absl::optional) override; + class Callbacks { + public: + Callbacks(HttpConnPool& conn_pool, Upstream::HostDescriptionConstSharedPtr host, + Ssl::ConnectionInfoConstSharedPtr ssl_info) + : conn_pool_(&conn_pool), host_(host), ssl_info_(ssl_info) {} + virtual ~Callbacks() = default; + virtual void onSuccess(Http::RequestEncoder& request_encoder) { + ASSERT(conn_pool_ != nullptr); + conn_pool_->onGenericPoolReady(host_, request_encoder.getStream().connectionLocalAddress(), + ssl_info_); + } + virtual void onFailure() { + ASSERT(conn_pool_ != nullptr); + conn_pool_->callbacks_->onGenericPoolFailure( + ConnectionPool::PoolFailureReason::RemoteConnectionFailure, host_); + } + + protected: + Callbacks() = default; + + private: + HttpConnPool* conn_pool_{}; + Upstream::HostDescriptionConstSharedPtr host_; + Ssl::ConnectionInfoConstSharedPtr ssl_info_; + }; + private: + void onGenericPoolReady(Upstream::HostDescriptionConstSharedPtr& host, + const Network::Address::InstanceConstSharedPtr& local_address, + Ssl::ConnectionInfoConstSharedPtr ssl_info); const std::string hostname_; Http::CodecClient::Type type_; Http::ConnectionPool::Instance* conn_pool_{}; @@ -92,7 +121,6 @@ class TcpUpstream : public GenericUpstream { class HttpUpstream : public GenericUpstream, protected Http::StreamCallbacks { public: ~HttpUpstream() override; - virtual bool isValidResponse(const Http::ResponseHeaderMap&) PURE; void doneReading(); @@ -112,6 +140,9 @@ class HttpUpstream : public GenericUpstream, protected Http::StreamCallbacks { void onBelowWriteBufferLowWatermark() override; virtual void setRequestEncoder(Http::RequestEncoder& request_encoder, bool is_ssl) PURE; + void setConnPoolCallbacks(std::unique_ptr&& callbacks) { + conn_pool_callbacks_ = std::move(callbacks); + } protected: HttpUpstream(Tcp::ConnectionPool::UpstreamCallbacks& callbacks, const std::string& hostname); @@ -129,6 +160,9 @@ class HttpUpstream : public GenericUpstream, protected Http::StreamCallbacks { void decodeHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) override { if (!parent_.isValidResponse(*headers) || end_stream) { parent_.resetEncoder(Network::ConnectionEvent::LocalClose); + } else if (parent_.conn_pool_callbacks_ != nullptr) { + parent_.conn_pool_callbacks_->onSuccess(*parent_.request_encoder_); + parent_.conn_pool_callbacks_.reset(); } } void decodeData(Buffer::Instance& data, bool end_stream) override { @@ -147,6 +181,10 @@ class HttpUpstream : public GenericUpstream, protected Http::StreamCallbacks { Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks_; bool read_half_closed_{}; bool write_half_closed_{}; + + // Used to defer onGenericPoolReady and onGenericPoolFailure to the reception + // of the CONNECT response or the resetEncoder. + std::unique_ptr conn_pool_callbacks_; }; class Http1Upstream : public HttpUpstream { diff --git a/test/common/tcp_proxy/upstream_test.cc b/test/common/tcp_proxy/upstream_test.cc index 2a066e46f2664..1d632552a0f70 100644 --- a/test/common/tcp_proxy/upstream_test.cc +++ b/test/common/tcp_proxy/upstream_test.cc @@ -117,6 +117,41 @@ TYPED_TEST(HttpUpstreamTest, UpstreamWatermarks) { this->upstream_->onBelowWriteBufferLowWatermark(); } +class MockHttpConnPoolCallbacks : public HttpConnPool::Callbacks { +public: + MOCK_METHOD(void, onSuccess, (Http::RequestEncoder & request_encoder)); + MOCK_METHOD(void, onFailure, ()); +}; + +TYPED_TEST(HttpUpstreamTest, DownstreamDisconnectBeforeConnectResponse) { + auto conn_pool_callbacks = std::make_unique(); + auto conn_pool_callbacks_raw = conn_pool_callbacks.get(); + this->upstream_->setConnPoolCallbacks(std::move(conn_pool_callbacks)); + EXPECT_CALL(*conn_pool_callbacks_raw, onFailure()); + EXPECT_CALL(*conn_pool_callbacks_raw, onSuccess(_)).Times(0); + EXPECT_TRUE(this->upstream_->onDownstreamEvent(Network::ConnectionEvent::LocalClose) == nullptr); +} + +TYPED_TEST(HttpUpstreamTest, OnSuccessCalledOnValidResponse) { + auto conn_pool_callbacks = std::make_unique(); + auto conn_pool_callbacks_raw = conn_pool_callbacks.get(); + this->upstream_->setConnPoolCallbacks(std::move(conn_pool_callbacks)); + EXPECT_CALL(*conn_pool_callbacks_raw, onFailure()).Times(0); + EXPECT_CALL(*conn_pool_callbacks_raw, onSuccess(_)); + Http::ResponseHeaderMapPtr headers{new Http::TestResponseHeaderMapImpl{{":status", "200"}}}; + this->upstream_->responseDecoder().decodeHeaders(std::move(headers), false); +} + +TYPED_TEST(HttpUpstreamTest, OnFailureCalledOnInvalidResponse) { + auto conn_pool_callbacks = std::make_unique(); + auto conn_pool_callbacks_raw = conn_pool_callbacks.get(); + this->upstream_->setConnPoolCallbacks(std::move(conn_pool_callbacks)); + EXPECT_CALL(*conn_pool_callbacks_raw, onFailure()); + EXPECT_CALL(*conn_pool_callbacks_raw, onSuccess(_)).Times(0); + Http::ResponseHeaderMapPtr headers{new Http::TestResponseHeaderMapImpl{{":status", "404"}}}; + this->upstream_->responseDecoder().decodeHeaders(std::move(headers), false); +} + } // namespace } // namespace TcpProxy } // namespace Envoy diff --git a/test/integration/tcp_tunneling_integration_test.cc b/test/integration/tcp_tunneling_integration_test.cc index f8b5dd8273b2b..aef3d5f66fef8 100644 --- a/test/integration/tcp_tunneling_integration_test.cc +++ b/test/integration/tcp_tunneling_integration_test.cc @@ -217,6 +217,7 @@ class ProxyingConnectIntegrationTest : public HttpProtocolIntegrationTest { config_helper_.addConfigModifier( [&](envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager& hcm) -> void { ConfigHelper::setConnectConfig(hcm, false); }); + HttpProtocolIntegrationTest::initialize(); } @@ -313,7 +314,7 @@ INSTANTIATE_TEST_SUITE_P(IpVersions, ConnectTerminationIntegrationTest, testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), TestUtility::ipTestParamsToString); -using Params = std::tuple; +using Params = std::tuple; // Tunneling downstream TCP over an upstream HTTP CONNECT tunnel. class TcpTunnelingIntegrationTest : public testing::TestWithParam, @@ -323,17 +324,25 @@ class TcpTunnelingIntegrationTest : public testing::TestWithParam, : HttpIntegrationTest(Http::CodecClient::Type::HTTP2, std::get<0>(GetParam())) {} static std::string paramsToString(const testing::TestParamInfo& p) { - return fmt::format("{}_{}", - std::get<0>(p.param) == Network::Address::IpVersion::v4 ? "IPv4" : "IPv6", - std::get<1>(p.param) == FakeHttpConnection::Type::HTTP1 ? "HTTP1Upstream" - : "HTTP2Upstream"); + return fmt::format( + "{}_{}_{}", std::get<0>(p.param) == Network::Address::IpVersion::v4 ? "IPv4" : "IPv6", + std::get<1>(p.param) == FakeHttpConnection::Type::HTTP1 ? "HTTP1Upstream" : "HTTP2Upstream", + std::get<2>(p.param) ? "WaitConnectResponse" : "DoNotWaitConnectResponse"); } void SetUp() override { + wait_for_connect_response_ = std::get<2>(GetParam()); enableHalfClose(true); setDownstreamProtocol(Http::CodecClient::Type::HTTP2); setUpstreamProtocol(std::get<1>(GetParam())); + if (wait_for_connect_response_) { + config_helper_.addRuntimeOverride( + "envoy.reloadable_features.http_upstream_wait_connect_response", "true"); + } else { + config_helper_.addRuntimeOverride( + "envoy.reloadable_features.http_upstream_wait_connect_response", "false"); + } config_helper_.addConfigModifier( [&](envoy::config::bootstrap::v3::Bootstrap& bootstrap) -> void { envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy proxy_config; @@ -354,6 +363,7 @@ class TcpTunnelingIntegrationTest : public testing::TestWithParam, filter->set_name("envoy.filters.network.tcp_proxy"); }); } + bool wait_for_connect_response_{}; }; TEST_P(TcpTunnelingIntegrationTest, Basic) { @@ -811,11 +821,90 @@ TEST_P(TcpTunnelingIntegrationTest, DISABLED_TransferEncodingHeaderIgnoredHttp1) ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect()); } +TEST_P(TcpTunnelingIntegrationTest, DeferTransmitDataUntilSuccessConnectResponseIsReceived) { + if (!wait_for_connect_response_) { + return; + } + initialize(); + + // Start a connection, and verify the upgrade headers are received upstream. + IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy")); + + // Send some data straight away. + ASSERT_TRUE(tcp_client->write("hello", false)); + + ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + ASSERT_TRUE(upstream_request_->waitForHeadersComplete()); + + // Wait a bit, no data should go through. + ASSERT_FALSE(upstream_request_->waitForData(*dispatcher_, 1, std::chrono::milliseconds(100))); + + upstream_request_->encodeHeaders(default_response_headers_, false); + + ASSERT_TRUE(upstream_request_->waitForData(*dispatcher_, 5)); + + tcp_client->close(); + if (upstreamProtocol() == FakeHttpConnection::Type::HTTP1) { + ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect()); + } else { + ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); + // If the upstream now sends 'end stream' the connection is fully closed. + upstream_request_->encodeData(0, true); + } +} + +TEST_P(TcpTunnelingIntegrationTest, NoDataTransmittedIfConnectFailureResponseIsReceived) { + if (!wait_for_connect_response_) { + return; + } + initialize(); + + // Start a connection, and verify the upgrade headers are received upstream. + IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy")); + + // Send some data straight away. + ASSERT_TRUE(tcp_client->write("hello", false)); + + ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + ASSERT_TRUE(upstream_request_->waitForHeadersComplete()); + + default_response_headers_.setStatus(enumToInt(Http::Code::ServiceUnavailable)); + upstream_request_->encodeHeaders(default_response_headers_, false); + + // Wait a bit, no data should go through. + ASSERT_FALSE(upstream_request_->waitForData(*dispatcher_, 1, std::chrono::milliseconds(100))); + + tcp_client->close(); + if (upstreamProtocol() == FakeHttpConnection::Type::HTTP1) { + ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect()); + } else { + ASSERT_TRUE(upstream_request_->waitForReset()); + } +} + +TEST_P(TcpTunnelingIntegrationTest, UpstreamDisconnectBeforeResponseReceived) { + initialize(); + + // Start a connection, and verify the upgrade headers are received upstream. + IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy")); + + ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + ASSERT_TRUE(upstream_request_->waitForHeadersComplete()); + + ASSERT_TRUE(fake_upstream_connection_->close()); + tcp_client->waitForHalfClose(); + tcp_client->close(); +} + INSTANTIATE_TEST_SUITE_P( IpAndHttpVersions, TcpTunnelingIntegrationTest, ::testing::Combine(testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), testing::Values(FakeHttpConnection::Type::HTTP1, - FakeHttpConnection::Type::HTTP2)), + FakeHttpConnection::Type::HTTP2), + testing::Values(false, true)), TcpTunnelingIntegrationTest::paramsToString); } // namespace