From 219c40f19ea71334b2f35d090b3a9c521f08108f Mon Sep 17 00:00:00 2001 From: ohadvano <49730675+ohadvano@users.noreply.github.com> Date: Thu, 1 Aug 2024 21:50:27 +0300 Subject: [PATCH] async_tcp_client: remove callbacks if connection was not closed (#35410) When an ``AsyncTcpClient`` is destroyed while it still has an active client connection, Envoy crashes: destroying the instance also destroys its ``ClientConnection`` object, which causes ``raiseEvent`` to call back into the ``AsyncTcpClient`` while it is being destroyed. Caught with the following stack trace: ``` Caught Segmentation fault, suspect faulting address 0x0 Backtrace (use tools/stack_decode.py to get line numbers): Envoy version: ee8c765a07037033766ea556c032120b497152b3/1.27.0/Clean/RELEASE/BoringSSL #0: __restore_rt [0x7d80ab903420] #1: Envoy::Extensions::AccessLoggers::Fluentd::FluentdAccessLoggerImpl::onEvent() [0x58313528746b] #2: Envoy::Tcp::AsyncTcpClientImpl::onEvent() [0x5831359da00a] #3: Envoy::Network::ConnectionImplBase::raiseConnectionEvent() [0x583135f0521d] #4: Envoy::Network::ConnectionImpl::raiseEvent() [0x583135e9fed9] #5: Envoy::Network::ConnectionImpl::closeSocket() [0x583135e9f90c] #6: Envoy::Network::ConnectionImpl::close() [0x583135e9e54c] #7: Envoy::Network::ConnectionImpl::~ConnectionImpl() [0x583135e9de5c] #8: Envoy::Network::ClientConnectionImpl::~ClientConnectionImpl() [0x5831355fd25e] #9: Envoy::Tcp::AsyncTcpClientImpl::~AsyncTcpClientImpl() [0x5831359da247] #10: Envoy::Extensions::AccessLoggers::Fluentd::FluentdAccessLoggerImpl::~FluentdAccessLoggerImpl() [0x583135289350] #11: Envoy::Extensions::AccessLoggers::Fluentd::FluentdAccessLog::ThreadLocalLogger::~ThreadLocalLogger() [0x58313528adbf] #12: Envoy::ThreadLocal::InstanceImpl::shutdownThread() [0x58313560373a] #13: Envoy::Server::WorkerImpl::threadRoutine() [0x583135630c0a] #14: Envoy::Thread::ThreadImplPosix::ThreadImplPosix()::{lambda()#1}::__invoke() [0x5831364e88d5] #15: start_thread 
[0x7d80ab8f7609] ``` Risk Level: low Testing: unit tests Docs Changes: none Release Notes: none Platform Specific Features: none --------- Signed-off-by: Ohad Vano Signed-off-by: asingh-g --- source/common/tcp/async_tcp_client_impl.cc | 12 +++- source/common/tcp/async_tcp_client_impl.h | 2 + test/common/tcp/async_tcp_client_impl_test.cc | 13 +++- .../filters/test_network_async_tcp_filter.cc | 9 ++- .../test_network_async_tcp_filter.proto | 1 + .../tcp_async_client_integration_test.cc | 61 ++++++++++++++----- 6 files changed, 79 insertions(+), 19 deletions(-) diff --git a/source/common/tcp/async_tcp_client_impl.cc b/source/common/tcp/async_tcp_client_impl.cc index 4d6482cb5dc6..0d6ce288c8ed 100644 --- a/source/common/tcp/async_tcp_client_impl.cc +++ b/source/common/tcp/async_tcp_client_impl.cc @@ -24,6 +24,14 @@ AsyncTcpClientImpl::AsyncTcpClientImpl(Event::Dispatcher& dispatcher, connect_timer_(dispatcher.createTimer([this]() { onConnectTimeout(); })), enable_half_close_(enable_half_close) {} +AsyncTcpClientImpl::~AsyncTcpClientImpl() { + if (connection_) { + connection_->removeConnectionCallbacks(*this); + } + + close(Network::ConnectionCloseType::NoFlush); +} + bool AsyncTcpClientImpl::connect() { if (connection_) { return false; @@ -69,7 +77,8 @@ void AsyncTcpClientImpl::onConnectTimeout() { } void AsyncTcpClientImpl::close(Network::ConnectionCloseType type) { - if (connection_) { + if (connection_ && !closing_) { + closing_ = true; connection_->close(type); } } @@ -127,6 +136,7 @@ void AsyncTcpClientImpl::onEvent(Network::ConnectionEvent event) { detected_close_ = connection_->detectedCloseType(); } + closing_ = false; dispatcher_.deferredDelete(std::move(connection_)); if (callbacks_) { callbacks_->onEvent(event); diff --git a/source/common/tcp/async_tcp_client_impl.h b/source/common/tcp/async_tcp_client_impl.h index ef965ca68cc5..2f239b757028 100644 --- a/source/common/tcp/async_tcp_client_impl.h +++ b/source/common/tcp/async_tcp_client_impl.h @@ -28,6 +28,7 
@@ class AsyncTcpClientImpl : public AsyncTcpClient, AsyncTcpClientImpl(Event::Dispatcher& dispatcher, Upstream::ThreadLocalCluster& thread_local_cluster, Upstream::LoadBalancerContext* context, bool enable_half_close); + ~AsyncTcpClientImpl(); void close(Network::ConnectionCloseType type) override; @@ -106,6 +107,7 @@ class AsyncTcpClientImpl : public AsyncTcpClient, Event::TimerPtr connect_timer_; AsyncTcpClientCallbacks* callbacks_{}; Network::DetectedCloseType detected_close_{Network::DetectedCloseType::Normal}; + bool closing_{false}; bool connected_{false}; bool enable_half_close_{false}; }; diff --git a/test/common/tcp/async_tcp_client_impl_test.cc b/test/common/tcp/async_tcp_client_impl_test.cc index f185545a1223..409808bdfde7 100644 --- a/test/common/tcp/async_tcp_client_impl_test.cc +++ b/test/common/tcp/async_tcp_client_impl_test.cc @@ -18,6 +18,15 @@ using testing::Return; namespace Envoy { namespace Tcp { +class CustomMockClientConnection : public Network::MockClientConnection { +public: + ~CustomMockClientConnection() { + if (state_ != Connection::State::Closed) { + raiseEvent(Network::ConnectionEvent::LocalClose); + } + }; +}; + class AsyncTcpClientImplTest : public Event::TestUsingSimulatedTime, public testing::Test { public: AsyncTcpClientImplTest() = default; @@ -32,7 +41,7 @@ class AsyncTcpClientImplTest : public Event::TestUsingSimulatedTime, public test } void expectCreateConnection(bool trigger_connected = true) { - connection_ = new NiceMock(); + connection_ = new NiceMock(); Upstream::MockHost::MockCreateConnectionData conn_info; connection_->streamInfo().setAttemptCount(1); conn_info.connection_ = connection_; @@ -59,7 +68,7 @@ class AsyncTcpClientImplTest : public Event::TestUsingSimulatedTime, public test NiceMock* connect_timer_; NiceMock dispatcher_; NiceMock cluster_manager_; - Network::MockClientConnection* connection_{}; + CustomMockClientConnection* connection_{}; NiceMock callbacks_; }; diff --git 
a/test/integration/filters/test_network_async_tcp_filter.cc b/test/integration/filters/test_network_async_tcp_filter.cc index 6116b2d01dc1..58c80d793153 100644 --- a/test/integration/filters/test_network_async_tcp_filter.cc +++ b/test/integration/filters/test_network_async_tcp_filter.cc @@ -41,7 +41,8 @@ class TestNetworkAsyncTcpFilter : public Network::ReadFilter { const test::integration::filters::TestNetworkAsyncTcpFilterConfig& config, Stats::Scope& scope, Upstream::ClusterManager& cluster_manager) : stats_(generateStats("test_network_async_tcp_filter", scope)), - cluster_name_(config.cluster_name()), cluster_manager_(cluster_manager) { + cluster_name_(config.cluster_name()), kill_after_on_data_(config.kill_after_on_data()), + cluster_manager_(cluster_manager) { const auto thread_local_cluster = cluster_manager_.getThreadLocalCluster(cluster_name_); options_ = std::make_shared(true); if (thread_local_cluster != nullptr) { @@ -60,6 +61,11 @@ class TestNetworkAsyncTcpFilter : public Network::ReadFilter { data.length()); client_->write(data, end_stream); + if (kill_after_on_data_) { + Tcp::AsyncTcpClient* c1 = client_.release(); + delete c1; + } + return Network::FilterStatus::StopIteration; } @@ -166,6 +172,7 @@ class TestNetworkAsyncTcpFilter : public Network::ReadFilter { TestNetworkAsyncTcpFilterStats stats_; Tcp::AsyncTcpClientPtr client_; absl::string_view cluster_name_; + bool kill_after_on_data_; std::unique_ptr request_callbacks_; std::unique_ptr downstream_callbacks_; Upstream::ClusterManager& cluster_manager_; diff --git a/test/integration/filters/test_network_async_tcp_filter.proto b/test/integration/filters/test_network_async_tcp_filter.proto index bcb4d9beee34..fc84979375bb 100644 --- a/test/integration/filters/test_network_async_tcp_filter.proto +++ b/test/integration/filters/test_network_async_tcp_filter.proto @@ -4,4 +4,5 @@ package test.integration.filters; message TestNetworkAsyncTcpFilterConfig { string cluster_name = 1; + bool 
kill_after_on_data = 2; } diff --git a/test/integration/tcp_async_client_integration_test.cc b/test/integration/tcp_async_client_integration_test.cc index 89c4e29c1771..f0a9932bbc0a 100644 --- a/test/integration/tcp_async_client_integration_test.cc +++ b/test/integration/tcp_async_client_integration_test.cc @@ -1,3 +1,4 @@ +#include "test/integration/filters/test_network_async_tcp_filter.pb.h" #include "test/integration/integration.h" #include "gtest/gtest.h" @@ -16,15 +17,37 @@ class TcpAsyncClientIntegrationTest : public testing::TestWithParam void { + test::integration::filters::TestNetworkAsyncTcpFilterConfig proto_config; + TestUtility::loadFromYaml(yaml, proto_config); + + auto* listener = bootstrap.mutable_static_resources()->mutable_listeners(0); + auto* filter_chain = listener->mutable_filter_chains(0); + auto* filter = filter_chain->mutable_filters(0); + filter->mutable_typed_config()->PackFrom(proto_config); + }); + + BaseIntegrationTest::initialize(); + } }; INSTANTIATE_TEST_SUITE_P(IpVersions, TcpAsyncClientIntegrationTest, testing::ValuesIn(TestEnvironment::getIpVersionsForTest())); TEST_P(TcpAsyncClientIntegrationTest, SingleRequest) { - enableHalfClose(true); - initialize(); + init(); std::string request("request"); std::string response("response"); @@ -51,8 +74,7 @@ TEST_P(TcpAsyncClientIntegrationTest, SingleRequest) { } TEST_P(TcpAsyncClientIntegrationTest, MultipleRequestFrames) { - enableHalfClose(true); - initialize(); + init(); std::string data_frame_1("data_frame_1"); std::string data_frame_2("data_frame_2"); @@ -85,8 +107,7 @@ TEST_P(TcpAsyncClientIntegrationTest, MultipleRequestFrames) { } TEST_P(TcpAsyncClientIntegrationTest, MultipleResponseFrames) { - enableHalfClose(true); - initialize(); + init(); std::string data_frame_1("data_frame_1"); std::string response_1("response_1"); @@ -116,8 +137,7 @@ TEST_P(TcpAsyncClientIntegrationTest, Reconnect) { return; } - enableHalfClose(true); - initialize(); + init(); IntegrationTcpClientPtr 
tcp_client = makeTcpConnection(lookupPort("listener_0")); ASSERT_TRUE(tcp_client->write("hello1", false)); @@ -143,11 +163,24 @@ TEST_P(TcpAsyncClientIntegrationTest, Reconnect) { test_server_->waitForGaugeEq("cluster.cluster_0.upstream_cx_active", 0); } +TEST_P(TcpAsyncClientIntegrationTest, ClientTearDown) { + init(true); + + std::string request("request"); + + IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("listener_0")); + ASSERT_TRUE(tcp_client->write(request, true)); + FakeRawConnectionPtr fake_upstream_connection; + ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection)); + ASSERT_TRUE(fake_upstream_connection->waitForData(request.size())); + + tcp_client->close(); +} + #if ENVOY_PLATFORM_ENABLE_SEND_RST // Test if RST close can be detected from downstream and upstream is closed by RST. TEST_P(TcpAsyncClientIntegrationTest, TestClientCloseRST) { - enableHalfClose(true); - initialize(); + init(); std::string request("request"); std::string response("response"); @@ -178,8 +211,7 @@ TEST_P(TcpAsyncClientIntegrationTest, TestClientCloseRST) { // Test if RST close can be detected from upstream. TEST_P(TcpAsyncClientIntegrationTest, TestUpstreamCloseRST) { - enableHalfClose(true); - initialize(); + init(); std::string request("request"); std::string response("response"); @@ -212,8 +244,7 @@ TEST_P(TcpAsyncClientIntegrationTest, TestUpstreamCloseRST) { // the client. The behavior is different for windows, since RST support is literally supported for // unix like system, disabled the test for windows. TEST_P(TcpAsyncClientIntegrationTest, TestDownstremHalfClosedThenRST) { - enableHalfClose(true); - initialize(); + init(); std::string request("request"); std::string response("response");