diff --git a/source/common/tcp_proxy/tcp_proxy.cc b/source/common/tcp_proxy/tcp_proxy.cc index eb7d4196d9107..e4517f158111e 100644 --- a/source/common/tcp_proxy/tcp_proxy.cc +++ b/source/common/tcp_proxy/tcp_proxy.cc @@ -135,13 +135,8 @@ Filter::~Filter() { access_log->log(nullptr, nullptr, nullptr, getRequestInfo()); } - if (upstream_handle_) { - upstream_handle_->cancel(); - } - - if (upstream_conn_data_) { - upstream_conn_data_->connection().close(Network::ConnectionCloseType::NoFlush); - } + ASSERT(upstream_handle_ == nullptr); + ASSERT(upstream_conn_data_ == nullptr); } TcpProxyStats Config::SharedConfig::generateStats(Stats::Scope& scope) { @@ -412,17 +407,29 @@ void Filter::onDownstreamEvent(Network::ConnectionEvent event) { if (event == Network::ConnectionEvent::RemoteClose) { upstream_conn_data_->connection().close(Network::ConnectionCloseType::FlushWrite); - if (upstream_conn_data_ != nullptr && - upstream_conn_data_->connection().state() != Network::Connection::State::Closed) { - config_->drainManager().add(config_->sharedConfig(), std::move(upstream_conn_data_), - std::move(upstream_callbacks_), std::move(idle_timer_), - read_callbacks_->upstreamHost()); + // Events raised from the previous line may cause upstream_conn_data_ to be NULL if + // it was able to immediately flush all data. + + if (upstream_conn_data_ != nullptr) { + if (upstream_conn_data_->connection().state() != Network::Connection::State::Closed) { + config_->drainManager().add(config_->sharedConfig(), std::move(upstream_conn_data_), + std::move(upstream_callbacks_), std::move(idle_timer_), + read_callbacks_->upstreamHost()); + } else { + upstream_conn_data_.reset(); + } } } else if (event == Network::ConnectionEvent::LocalClose) { upstream_conn_data_->connection().close(Network::ConnectionCloseType::NoFlush); upstream_conn_data_.reset(); disableIdleTimer(); } + } else if (upstream_handle_) { + if (event == Network::ConnectionEvent::LocalClose || + event == Network::ConnectionEvent::RemoteClose) { + upstream_handle_->cancel(); + upstream_handle_ = nullptr; + } } } diff --git a/test/common/http/conn_manager_impl_test.cc b/test/common/http/conn_manager_impl_test.cc index 90e6e21f45a30..266a705a5fb97 100644 --- a/test/common/http/conn_manager_impl_test.cc +++ b/test/common/http/conn_manager_impl_test.cc @@ -1705,6 +1705,10 @@ TEST_F(HttpConnectionManagerImplTest, WebSocketPrefixAndAutoHostRewrite) { Buffer::OwnedImpl fake_input("1234"); conn_manager_->onData(fake_input, false); + Tcp::ConnectionPool::UpstreamCallbacks* upstream_callbacks = nullptr; + EXPECT_CALL(*conn_pool_.connection_data_, addUpstreamCallbacks(_)) + .WillOnce( + Invoke([&](Tcp::ConnectionPool::UpstreamCallbacks& cb) { upstream_callbacks = &cb; })); conn_pool_.host_->hostname_ = "newhost"; conn_pool_.poolReady(upstream_conn_); @@ -1714,6 +1718,7 @@ TEST_F(HttpConnectionManagerImplTest, WebSocketPrefixAndAutoHostRewrite) { EXPECT_EQ(1U, stats_.named_.downstream_cx_websocket_total_.value()); EXPECT_EQ(0U, stats_.named_.downstream_cx_http1_active_.value()); + upstream_callbacks->onEvent(Network::ConnectionEvent::RemoteClose); filter_callbacks_.connection_.dispatcher_.clearDeferredDeleteList(); conn_manager_.reset(); EXPECT_EQ(0U, stats_.named_.downstream_cx_websocket_active_.value()); @@ -1753,8 +1758,13 @@ TEST_F(HttpConnectionManagerImplTest, WebSocketEarlyData) { EXPECT_CALL(upstream_conn_, write(_, false)); EXPECT_CALL(upstream_conn_, write(BufferEqual(&early_data), false)); EXPECT_CALL(filter_callbacks_.connection_, readDisable(false)); + Tcp::ConnectionPool::UpstreamCallbacks* upstream_callbacks = nullptr; + EXPECT_CALL(*conn_pool_.connection_data_, addUpstreamCallbacks(_)) + .WillOnce( + Invoke([&](Tcp::ConnectionPool::UpstreamCallbacks& cb) { upstream_callbacks = &cb; })); conn_pool_.poolReady(upstream_conn_); + upstream_callbacks->onEvent(Network::ConnectionEvent::RemoteClose); filter_callbacks_.connection_.dispatcher_.clearDeferredDeleteList(); conn_manager_.reset(); } @@ -1828,7 +1838,12 @@ TEST_F(HttpConnectionManagerImplTest, WebSocketEarlyEndStream) { EXPECT_CALL(upstream_conn_, write(_, false)); EXPECT_CALL(upstream_conn_, write(_, true)).Times(0); + Tcp::ConnectionPool::UpstreamCallbacks* upstream_callbacks = nullptr; + EXPECT_CALL(*conn_pool_.connection_data_, addUpstreamCallbacks(_)) + .WillOnce( + Invoke([&](Tcp::ConnectionPool::UpstreamCallbacks& cb) { upstream_callbacks = &cb; })); conn_pool_.poolReady(upstream_conn_); + upstream_callbacks->onEvent(Network::ConnectionEvent::RemoteClose); filter_callbacks_.connection_.dispatcher_.clearDeferredDeleteList(); conn_manager_.reset(); } diff --git a/test/common/network/filter_manager_impl_test.cc b/test/common/network/filter_manager_impl_test.cc index 06837dfbd450c..8aeaf11a978b9 100644 --- a/test/common/network/filter_manager_impl_test.cc +++ b/test/common/network/filter_manager_impl_test.cc @@ -214,6 +214,8 @@ TEST_F(NetworkFilterManagerTest, RateLimitAndTcpProxy) { EXPECT_CALL(upstream_connection, write(BufferEqual(&buffer), _)); read_buffer_.add("hello"); manager.onRead(); + + connection.raiseEvent(ConnectionEvent::RemoteClose); } } // namespace Network diff --git a/test/common/tcp_proxy/tcp_proxy_test.cc b/test/common/tcp_proxy/tcp_proxy_test.cc index f447f62b08e3e..1df01629ffa81 100644 --- a/test/common/tcp_proxy/tcp_proxy_test.cc +++ b/test/common/tcp_proxy/tcp_proxy_test.cc @@ -348,6 +348,12 @@ class TcpProxyTest : public testing::Test { .WillByDefault(SaveArg<0>(&access_log_data_)); } + ~TcpProxyTest() { + if (filter_ != nullptr) { + filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::RemoteClose); + } + } + void configure(const envoy::config::filter::network::tcp_proxy::v2::TcpProxy& config) { config_.reset(new Config(config, factory_context_)); } @@ -734,6 +740,22 @@ TEST_F(TcpProxyTest, DisconnectBeforeData) { filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::RemoteClose); } +// Test that if the downstream connection is closed before the upstream connection +// is established, the upstream connection is cancelled. +TEST_F(TcpProxyTest, RemoteClosetBeforeUpstreamConnected) { + setup(1); + EXPECT_CALL(*conn_pool_handles_.at(0), cancel()); + filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::RemoteClose); +} + +// Test that if the downstream connection is closed before the upstream connection +// is established, the upstream connection is cancelled. +TEST_F(TcpProxyTest, LocalClosetBeforeUpstreamConnected) { + setup(1); + EXPECT_CALL(*conn_pool_handles_.at(0), cancel()); + filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::LocalClose); +} + TEST_F(TcpProxyTest, UpstreamConnectFailure) { setup(1, accessLogConfig("%RESPONSE_FLAGS%")); @@ -873,6 +895,7 @@ TEST_F(TcpProxyTest, IdleTimeoutWithOutstandingDataFlushed) { TEST_F(TcpProxyTest, AccessLogUpstreamHost) { setup(1, accessLogConfig("%UPSTREAM_HOST% %UPSTREAM_CLUSTER%")); raiseEventUpstreamConnected(0); + filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::RemoteClose); filter_.reset(); EXPECT_EQ(access_log_data_, "127.0.0.1:80 fake_cluster"); } @@ -881,6 +904,7 @@ TEST_F(TcpProxyTest, AccessLogUpstreamHost) { TEST_F(TcpProxyTest, AccessLogUpstreamLocalAddress) { setup(1, accessLogConfig("%UPSTREAM_LOCAL_ADDRESS%")); raiseEventUpstreamConnected(0); + filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::RemoteClose); filter_.reset(); EXPECT_EQ(access_log_data_, "2.2.2.2:50000"); } @@ -893,6 +917,7 @@ TEST_F(TcpProxyTest, AccessLogDownstreamAddress) { filter_callbacks_.connection_.remote_address_ = Network::Utility::resolveUrl("tcp://1.1.1.1:40000"); setup(1, accessLogConfig("%DOWNSTREAM_REMOTE_ADDRESS_WITHOUT_PORT% %DOWNSTREAM_LOCAL_ADDRESS%")); + filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::RemoteClose); filter_.reset(); EXPECT_EQ(access_log_data_, "1.1.1.1 1.1.1.2:20000"); } @@ -1075,6 +1100,9 @@ TEST_F(TcpProxyRoutingTest, NonRoutableConnection) { EXPECT_EQ(total_cx + 1, config_->stats().downstream_cx_total_.value()); EXPECT_EQ(non_routable_cx + 1, config_->stats().downstream_cx_no_route_.value()); + + // Cleanup + filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::RemoteClose); } TEST_F(TcpProxyRoutingTest, RoutableConnection) { @@ -1087,7 +1115,8 @@ TEST_F(TcpProxyRoutingTest, RoutableConnection) { connection_.local_address_ = std::make_shared("1.2.3.4", 9999); // Expect filter to try to open a connection to specified cluster. - EXPECT_CALL(factory_context_.cluster_manager_, tcpConnPoolForCluster("fake_cluster", _, _)); + EXPECT_CALL(factory_context_.cluster_manager_, tcpConnPoolForCluster("fake_cluster", _, _)) + .WillOnce(Return(nullptr)); filter_->onNewConnection(); diff --git a/test/integration/fake_upstream.cc b/test/integration/fake_upstream.cc index ef3ee25532fdc..bd940c9f39c93 100644 --- a/test/integration/fake_upstream.cc +++ b/test/integration/fake_upstream.cc @@ -536,7 +536,7 @@ AssertionResult FakeRawConnection::write(const std::string& data, bool end_strea Network::FilterStatus FakeRawConnection::ReadFilter::onData(Buffer::Instance& data, bool end_stream) { Thread::LockGuard lock(parent_.lock_); - ENVOY_LOG(debug, "got {} bytes", data.length()); + ENVOY_LOG(debug, "got {} bytes, end_stream {}", data.length(), end_stream); parent_.data_.append(data.toString()); parent_.half_closed_ = end_stream; data.drain(data.length());