From f3d04a6bcc831b7e8190d0541d7dcd72b0d07e78 Mon Sep 17 00:00:00 2001 From: Alyssa Wilk Date: Wed, 9 Sep 2020 12:08:30 -0400 Subject: [PATCH 1/2] tcp_proxy: API cleanup Signed-off-by: Alyssa Wilk --- source/common/tcp/conn_pool.cc | 14 +++--- source/common/tcp_proxy/tcp_proxy.cc | 59 ++++++++++++++----------- source/common/tcp_proxy/tcp_proxy.h | 1 + test/common/tcp_proxy/tcp_proxy_test.cc | 9 ++++ 4 files changed, 48 insertions(+), 35 deletions(-) diff --git a/source/common/tcp/conn_pool.cc b/source/common/tcp/conn_pool.cc index ed3332d8afef2..0daa034597ebd 100644 --- a/source/common/tcp/conn_pool.cc +++ b/source/common/tcp/conn_pool.cc @@ -53,15 +53,13 @@ void ActiveTcpClient::clearCallbacks() { void ActiveTcpClient::onEvent(Network::ConnectionEvent event) { Envoy::ConnectionPool::ActiveClient::onEvent(event); - // Do not pass the Connected event to TCP proxy sessions. - // The tcp proxy filter synthesizes its own Connected event in onPoolReadyBase - // and receiving it twice causes problems. - // TODO(alyssawilk) clean this up in a follow-up. It's confusing. - if (callbacks_ && event != Network::ConnectionEvent::Connected) { + if (callbacks_) { callbacks_->onEvent(event); - // After receiving a disconnect event, the owner of callbacks_ will likely self-destruct. - // Clear the pointer to avoid using it again. - callbacks_ = nullptr; + if (event != Network::ConnectionEvent::Connected) { + // After receiving a disconnect event, the owner of callbacks_ will likely self-destruct. + // Clear the pointer to avoid using it again. + callbacks_ = nullptr; + } } } diff --git a/source/common/tcp_proxy/tcp_proxy.cc b/source/common/tcp_proxy/tcp_proxy.cc index 5aa4b9d64cc49..91abf2f82e5a1 100644 --- a/source/common/tcp_proxy/tcp_proxy.cc +++ b/source/common/tcp_proxy/tcp_proxy.cc @@ -309,6 +309,9 @@ void Filter::DownstreamCallbacks::onBelowWriteBufferLowWatermark() { } void Filter::UpstreamCallbacks::onEvent(Network::ConnectionEvent event) { + if (event == Network::ConnectionEvent::Connected) { + return; + } if (drainer_ == nullptr) { parent_->onUpstreamEvent(event); } else { @@ -505,8 +508,7 @@ void Filter::onPoolReadyBase(Upstream::HostDescriptionConstSharedPtr& host, getStreamInfo().onUpstreamHostSelected(host); getStreamInfo().setUpstreamLocalAddress(local_address); getStreamInfo().setUpstreamSslConnection(ssl_info); - // Simulate the event that onPoolReady represents. - upstream_callbacks_->onEvent(Network::ConnectionEvent::Connected); + onUpstreamConnection(); read_callbacks_->continueReading(); } @@ -637,31 +639,34 @@ void Filter::onUpstreamEvent(Network::ConnectionEvent event) { read_callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite); } } - } else if (event == Network::ConnectionEvent::Connected) { - // Re-enable downstream reads now that the upstream connection is established - // so we have a place to send downstream data to. - read_callbacks_->connection().readDisable(false); - - read_callbacks_->upstreamHost()->outlierDetector().putResult( - Upstream::Outlier::Result::LocalOriginConnectSuccessFinal); - - getStreamInfo().setRequestedServerName(read_callbacks_->connection().requestedServerName()); - ENVOY_LOG(debug, "TCP:onUpstreamEvent(), requestedServerName: {}", - getStreamInfo().requestedServerName()); - - if (config_->idleTimeout()) { - // The idle_timer_ can be moved to a Drainer, so related callbacks call into - // the UpstreamCallbacks, which has the same lifetime as the timer, and can dispatch - // the call to either TcpProxy or to Drainer, depending on the current state. - idle_timer_ = read_callbacks_->connection().dispatcher().createTimer( - [upstream_callbacks = upstream_callbacks_]() { upstream_callbacks->onIdleTimeout(); }); - resetIdleTimer(); - read_callbacks_->connection().addBytesSentCallback([this](uint64_t) { resetIdleTimer(); }); - if (upstream_) { - upstream_->addBytesSentCallback([upstream_callbacks = upstream_callbacks_](uint64_t) { - upstream_callbacks->onBytesSent(); - }); - } + } +} + +void Filter::onUpstreamConnection() { + connecting_ = false; + // Re-enable downstream reads now that the upstream connection is established + // so we have a place to send downstream data to. + read_callbacks_->connection().readDisable(false); + + read_callbacks_->upstreamHost()->outlierDetector().putResult( + Upstream::Outlier::Result::LocalOriginConnectSuccessFinal); + + getStreamInfo().setRequestedServerName(read_callbacks_->connection().requestedServerName()); + ENVOY_LOG(debug, "TCP:onUpstreamEvent(), requestedServerName: {}", + getStreamInfo().requestedServerName()); + + if (config_->idleTimeout()) { + // The idle_timer_ can be moved to a Drainer, so related callbacks call into + // the UpstreamCallbacks, which has the same lifetime as the timer, and can dispatch + // the call to either TcpProxy or to Drainer, depending on the current state. + idle_timer_ = read_callbacks_->connection().dispatcher().createTimer( + [upstream_callbacks = upstream_callbacks_]() { upstream_callbacks->onIdleTimeout(); }); + resetIdleTimer(); + read_callbacks_->connection().addBytesSentCallback([this](uint64_t) { resetIdleTimer(); }); + if (upstream_) { + upstream_->addBytesSentCallback([upstream_callbacks = upstream_callbacks_](uint64_t) { + upstream_callbacks->onBytesSent(); + }); } } } diff --git a/source/common/tcp_proxy/tcp_proxy.h b/source/common/tcp_proxy/tcp_proxy.h index 1bb73ddd377f8..b1abf95cc2473 100644 --- a/source/common/tcp_proxy/tcp_proxy.h +++ b/source/common/tcp_proxy/tcp_proxy.h @@ -353,6 +353,7 @@ class Filter : public Network::ReadFilter, void onDownstreamEvent(Network::ConnectionEvent event); void onUpstreamData(Buffer::Instance& data, bool end_stream); void onUpstreamEvent(Network::ConnectionEvent event); + void onUpstreamConnection(); void onIdleTimeout(); void resetIdleTimer(); void disableIdleTimer(); diff --git a/test/common/tcp_proxy/tcp_proxy_test.cc b/test/common/tcp_proxy/tcp_proxy_test.cc index 817b87a6c01ae..20961c086b3d4 100644 --- a/test/common/tcp_proxy/tcp_proxy_test.cc +++ b/test/common/tcp_proxy/tcp_proxy_test.cc @@ -1146,6 +1146,15 @@ TEST_F(TcpProxyTest, DEPRECATED_FEATURE_TEST(ConnectAttemptsLimit)) { EXPECT_EQ(access_log_data_, "UF,URX"); } +TEST_F(TcpProxyTest, ConnectedNoOp) { + setup(1); + raiseEventUpstreamConnected(0); + + upstream_callbacks_->onEvent(Network::ConnectionEvent::Connected); + + filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::RemoteClose); +} + // Test that the tcp proxy sends the correct notifications to the outlier detector TEST_F(TcpProxyTest, OutlierDetection) { envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy config = defaultConfig(); From fc9b5347c4ec55d32efeb96b4c177d5378b66d23 Mon Sep 17 00:00:00 2001 From: Alyssa Wilk Date: Wed, 9 Sep 2020 15:47:17 -0400 Subject: [PATCH 2/2] new course Signed-off-by: Alyssa Wilk --- source/common/tcp/conn_pool.cc | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/source/common/tcp/conn_pool.cc b/source/common/tcp/conn_pool.cc index 0daa034597ebd..ee18b337a59e3 100644 --- a/source/common/tcp/conn_pool.cc +++ b/source/common/tcp/conn_pool.cc @@ -53,13 +53,14 @@ void ActiveTcpClient::clearCallbacks() { void ActiveTcpClient::onEvent(Network::ConnectionEvent event) { Envoy::ConnectionPool::ActiveClient::onEvent(event); - if (callbacks_) { + // Do not pass the Connected event to any session which registered during onEvent above. + // Consumers of connection pool connections assume they are receiving already connected + // connections. + if (callbacks_ && event != Network::ConnectionEvent::Connected) { callbacks_->onEvent(event); - if (event != Network::ConnectionEvent::Connected) { - // After receiving a disconnect event, the owner of callbacks_ will likely self-destruct. - // Clear the pointer to avoid using it again. - callbacks_ = nullptr; - } + // After receiving a disconnect event, the owner of callbacks_ will likely self-destruct. + // Clear the pointer to avoid using it again. + callbacks_ = nullptr; } }