-
Notifications
You must be signed in to change notification settings - Fork 5.3k
tcp_proxy: wait for CONNECT response before start streaming data #14317
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
01e9a78
e84b3fb
c0b859d
f14610d
d867f50
0d26656
267f633
954b057
a181d5f
83887e0
6caf912
342eafa
6b016d2
e3345ec
55e8de6
3a47ea4
ae297ff
ba06231
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -64,7 +64,36 @@ class HttpConnPool : public GenericConnPool, public Http::ConnectionPool::Callba | |
| Upstream::HostDescriptionConstSharedPtr host, const StreamInfo::StreamInfo& info, | ||
| absl::optional<Http::Protocol>) override; | ||
|
|
||
| class Callbacks { | ||
| public: | ||
| Callbacks(HttpConnPool& conn_pool, Upstream::HostDescriptionConstSharedPtr host, | ||
| Ssl::ConnectionInfoConstSharedPtr ssl_info) | ||
| : conn_pool_(&conn_pool), host_(host), ssl_info_(ssl_info) {} | ||
| virtual ~Callbacks() = default; | ||
| virtual void onSuccess(Http::RequestEncoder& request_encoder) { | ||
| ASSERT(conn_pool_ != nullptr); | ||
| conn_pool_->onGenericPoolReady(host_, request_encoder.getStream().connectionLocalAddress(), | ||
| ssl_info_); | ||
| } | ||
| virtual void onFailure() { | ||
| ASSERT(conn_pool_ != nullptr); | ||
| conn_pool_->callbacks_->onGenericPoolFailure( | ||
| ConnectionPool::PoolFailureReason::RemoteConnectionFailure, host_); | ||
| } | ||
|
|
||
| protected: | ||
| Callbacks() = default; | ||
|
|
||
| private: | ||
| HttpConnPool* conn_pool_{}; | ||
| Upstream::HostDescriptionConstSharedPtr host_; | ||
| Ssl::ConnectionInfoConstSharedPtr ssl_info_; | ||
| }; | ||
|
|
||
| private: | ||
| void onGenericPoolReady(Upstream::HostDescriptionConstSharedPtr& host, | ||
| const Network::Address::InstanceConstSharedPtr& local_address, | ||
| Ssl::ConnectionInfoConstSharedPtr ssl_info); | ||
| const std::string hostname_; | ||
| Http::CodecClient::Type type_; | ||
| Http::ConnectionPool::Instance* conn_pool_{}; | ||
|
|
@@ -92,7 +121,6 @@ class TcpUpstream : public GenericUpstream { | |
| class HttpUpstream : public GenericUpstream, protected Http::StreamCallbacks { | ||
| public: | ||
| ~HttpUpstream() override; | ||
|
|
||
| virtual bool isValidResponse(const Http::ResponseHeaderMap&) PURE; | ||
|
|
||
| void doneReading(); | ||
|
|
@@ -112,6 +140,9 @@ class HttpUpstream : public GenericUpstream, protected Http::StreamCallbacks { | |
| void onBelowWriteBufferLowWatermark() override; | ||
|
|
||
| virtual void setRequestEncoder(Http::RequestEncoder& request_encoder, bool is_ssl) PURE; | ||
| void setConnPoolCallbacks(std::unique_ptr<HttpConnPool::Callbacks>&& callbacks) { | ||
| conn_pool_callbacks_ = std::move(callbacks); | ||
| } | ||
|
|
||
| protected: | ||
| HttpUpstream(Tcp::ConnectionPool::UpstreamCallbacks& callbacks, const std::string& hostname); | ||
|
|
@@ -129,6 +160,9 @@ class HttpUpstream : public GenericUpstream, protected Http::StreamCallbacks { | |
| void decodeHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) override { | ||
| if (!parent_.isValidResponse(*headers) || end_stream) { | ||
| parent_.resetEncoder(Network::ConnectionEvent::LocalClose); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we still have a weird corner case where we resetEncoder where if "inform_downstream" is true, we send an upstream event downstream before the pool knows it has an upstream associated. I think we could simplify this by dropping onGenericPoolFailure below, and instead in resetEncoder,
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yep, I saw your previous comment and I was still figuring out how to do it. I pushed what I did so far, not tested yet. If you have the time to take a look let me know if I'm on the good path ;-) |
||
| } else if (parent_.conn_pool_callbacks_ != nullptr) { | ||
| parent_.conn_pool_callbacks_->onSuccess(*parent_.request_encoder_); | ||
| parent_.conn_pool_callbacks_.reset(); | ||
| } | ||
| } | ||
| void decodeData(Buffer::Instance& data, bool end_stream) override { | ||
|
|
@@ -147,6 +181,10 @@ class HttpUpstream : public GenericUpstream, protected Http::StreamCallbacks { | |
| Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks_; | ||
| bool read_half_closed_{}; | ||
| bool write_half_closed_{}; | ||
|
|
||
| // Used to defer onGenericPoolReady and onGenericPoolFailure to the reception | ||
| // of the CONNECT response or the resetEncoder. | ||
| std::unique_ptr<HttpConnPool::Callbacks> conn_pool_callbacks_; | ||
| }; | ||
|
|
||
| class Http1Upstream : public HttpUpstream { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -217,6 +217,7 @@ class ProxyingConnectIntegrationTest : public HttpProtocolIntegrationTest { | |
| config_helper_.addConfigModifier( | ||
| [&](envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager& | ||
| hcm) -> void { ConfigHelper::setConnectConfig(hcm, false); }); | ||
|
|
||
| HttpProtocolIntegrationTest::initialize(); | ||
| } | ||
|
|
||
|
|
@@ -313,7 +314,7 @@ INSTANTIATE_TEST_SUITE_P(IpVersions, ConnectTerminationIntegrationTest, | |
| testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), | ||
| TestUtility::ipTestParamsToString); | ||
|
|
||
| using Params = std::tuple<Network::Address::IpVersion, FakeHttpConnection::Type>; | ||
| using Params = std::tuple<Network::Address::IpVersion, FakeHttpConnection::Type, bool>; | ||
|
|
||
| // Tunneling downstream TCP over an upstream HTTP CONNECT tunnel. | ||
| class TcpTunnelingIntegrationTest : public testing::TestWithParam<Params>, | ||
|
|
@@ -323,17 +324,25 @@ class TcpTunnelingIntegrationTest : public testing::TestWithParam<Params>, | |
| : HttpIntegrationTest(Http::CodecClient::Type::HTTP2, std::get<0>(GetParam())) {} | ||
|
|
||
| static std::string paramsToString(const testing::TestParamInfo<Params>& p) { | ||
| return fmt::format("{}_{}", | ||
| std::get<0>(p.param) == Network::Address::IpVersion::v4 ? "IPv4" : "IPv6", | ||
| std::get<1>(p.param) == FakeHttpConnection::Type::HTTP1 ? "HTTP1Upstream" | ||
| : "HTTP2Upstream"); | ||
| return fmt::format( | ||
| "{}_{}_{}", std::get<0>(p.param) == Network::Address::IpVersion::v4 ? "IPv4" : "IPv6", | ||
| std::get<1>(p.param) == FakeHttpConnection::Type::HTTP1 ? "HTTP1Upstream" : "HTTP2Upstream", | ||
| std::get<2>(p.param) ? "WaitConnectResponse" : "DoNotWaitConnectResponse"); | ||
| } | ||
|
|
||
| void SetUp() override { | ||
| wait_for_connect_response_ = std::get<2>(GetParam()); | ||
| enableHalfClose(true); | ||
| setDownstreamProtocol(Http::CodecClient::Type::HTTP2); | ||
| setUpstreamProtocol(std::get<1>(GetParam())); | ||
|
|
||
| if (wait_for_connect_response_) { | ||
| config_helper_.addRuntimeOverride( | ||
| "envoy.reloadable_features.http_upstream_wait_connect_response", "true"); | ||
| } else { | ||
| config_helper_.addRuntimeOverride( | ||
| "envoy.reloadable_features.http_upstream_wait_connect_response", "false"); | ||
| } | ||
| config_helper_.addConfigModifier( | ||
| [&](envoy::config::bootstrap::v3::Bootstrap& bootstrap) -> void { | ||
| envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy proxy_config; | ||
|
|
@@ -354,6 +363,7 @@ class TcpTunnelingIntegrationTest : public testing::TestWithParam<Params>, | |
| filter->set_name("envoy.filters.network.tcp_proxy"); | ||
| }); | ||
| } | ||
| bool wait_for_connect_response_{}; | ||
| }; | ||
|
|
||
| TEST_P(TcpTunnelingIntegrationTest, Basic) { | ||
|
|
@@ -811,11 +821,90 @@ TEST_P(TcpTunnelingIntegrationTest, DISABLED_TransferEncodingHeaderIgnoredHttp1) | |
| ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect()); | ||
| } | ||
|
|
||
| TEST_P(TcpTunnelingIntegrationTest, DeferTransmitDataUntilSuccessConnectResponseIsReceived) { | ||
| if (!wait_for_connect_response_) { | ||
| return; | ||
| } | ||
| initialize(); | ||
|
|
||
| // Start a connection, and verify the upgrade headers are received upstream. | ||
| IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy")); | ||
|
|
||
| // Send some data straight away. | ||
| ASSERT_TRUE(tcp_client->write("hello", false)); | ||
|
|
||
| ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); | ||
| ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); | ||
| ASSERT_TRUE(upstream_request_->waitForHeadersComplete()); | ||
|
|
||
| // Wait a bit, no data should go through. | ||
| ASSERT_FALSE(upstream_request_->waitForData(*dispatcher_, 1, std::chrono::milliseconds(100))); | ||
|
|
||
| upstream_request_->encodeHeaders(default_response_headers_, false); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we could have a test for failure modes too - either a disconnect and/or not 200-ok headers.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added test for non 200 response |
||
|
|
||
| ASSERT_TRUE(upstream_request_->waitForData(*dispatcher_, 5)); | ||
|
|
||
| tcp_client->close(); | ||
| if (upstreamProtocol() == FakeHttpConnection::Type::HTTP1) { | ||
| ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect()); | ||
| } else { | ||
| ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); | ||
| // If the upstream now sends 'end stream' the connection is fully closed. | ||
| upstream_request_->encodeData(0, true); | ||
| } | ||
| } | ||
|
|
||
| TEST_P(TcpTunnelingIntegrationTest, NoDataTransmittedIfConnectFailureResponseIsReceived) { | ||
| if (!wait_for_connect_response_) { | ||
| return; | ||
| } | ||
| initialize(); | ||
|
|
||
| // Start a connection, and verify the upgrade headers are received upstream. | ||
| IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy")); | ||
|
|
||
| // Send some data straight away. | ||
| ASSERT_TRUE(tcp_client->write("hello", false)); | ||
|
|
||
| ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); | ||
| ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); | ||
| ASSERT_TRUE(upstream_request_->waitForHeadersComplete()); | ||
|
|
||
| default_response_headers_.setStatus(enumToInt(Http::Code::ServiceUnavailable)); | ||
| upstream_request_->encodeHeaders(default_response_headers_, false); | ||
|
|
||
| // Wait a bit, no data should go through. | ||
| ASSERT_FALSE(upstream_request_->waitForData(*dispatcher_, 1, std::chrono::milliseconds(100))); | ||
|
|
||
| tcp_client->close(); | ||
| if (upstreamProtocol() == FakeHttpConnection::Type::HTTP1) { | ||
| ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect()); | ||
| } else { | ||
| ASSERT_TRUE(upstream_request_->waitForReset()); | ||
| } | ||
| } | ||
|
|
||
| TEST_P(TcpTunnelingIntegrationTest, UpstreamDisconnectBeforeResponseReceived) { | ||
| initialize(); | ||
|
|
||
| // Start a connection, and verify the upgrade headers are received upstream. | ||
| IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy")); | ||
|
|
||
| ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); | ||
| ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); | ||
| ASSERT_TRUE(upstream_request_->waitForHeadersComplete()); | ||
|
|
||
| ASSERT_TRUE(fake_upstream_connection_->close()); | ||
| tcp_client->waitForHalfClose(); | ||
| tcp_client->close(); | ||
| } | ||
|
|
||
| INSTANTIATE_TEST_SUITE_P( | ||
| IpAndHttpVersions, TcpTunnelingIntegrationTest, | ||
| ::testing::Combine(testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), | ||
| testing::Values(FakeHttpConnection::Type::HTTP1, | ||
| FakeHttpConnection::Type::HTTP2)), | ||
| FakeHttpConnection::Type::HTTP2), | ||
| testing::Values(false, true)), | ||
| TcpTunnelingIntegrationTest::paramsToString); | ||
|
|
||
| } // namespace | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we end up having an event callback "from the upstream connection" before there is a connection established. That's a bit weird. I think it might be cleaner if inform_downstream is true, that if we have a deferer we do pool failure, and if not we do the onEvent. WDYT?