diff --git a/source/extensions/filters/network/thrift_proxy/router/router_impl.cc b/source/extensions/filters/network/thrift_proxy/router/router_impl.cc index ef501d7ba0a2f..449fddb85f63a 100644 --- a/source/extensions/filters/network/thrift_proxy/router/router_impl.cc +++ b/source/extensions/filters/network/thrift_proxy/router/router_impl.cc @@ -184,8 +184,8 @@ RouteConstSharedPtr RouteMatcher::route(const MessageMetadata& metadata, void Router::onDestroy() { if (upstream_request_ != nullptr) { upstream_request_->resetStream(); + cleanup(); } - cleanup(); } void Router::setDecoderFilterCallbacks(ThriftFilters::DecoderFilterCallbacks& callbacks) { @@ -354,10 +354,12 @@ void Router::onEvent(Network::ConnectionEvent event) { switch (event) { case Network::ConnectionEvent::RemoteClose: + ENVOY_STREAM_LOG(debug, "upstream remote close", *callbacks_); upstream_request_->onResetStream( Tcp::ConnectionPool::PoolFailureReason::RemoteConnectionFailure); break; case Network::ConnectionEvent::LocalClose: + ENVOY_STREAM_LOG(debug, "upstream local close", *callbacks_); upstream_request_->onResetStream( Tcp::ConnectionPool::PoolFailureReason::LocalConnectionFailure); break; @@ -365,6 +367,8 @@ void Router::onEvent(Network::ConnectionEvent event) { // Connected is consumed by the connection pool. NOT_REACHED_GCOVR_EXCL_LINE; } + + upstream_request_->releaseConnection(false); } const Network::Connection* Router::downstreamConnection() const { @@ -389,7 +393,11 @@ Router::UpstreamRequest::UpstreamRequest(Router& parent, Tcp::ConnectionPool::In protocol_(NamedProtocolConfigFactory::getFactory(protocol_type).createProtocol()), request_complete_(false), response_started_(false), response_complete_(false) {} -Router::UpstreamRequest::~UpstreamRequest() = default; +Router::UpstreamRequest::~UpstreamRequest() { + if (conn_pool_handle_) { + conn_pool_handle_->cancel(Tcp::ConnectionPool::CancelPolicy::Default); + } +} FilterStatus Router::UpstreamRequest::start() { Tcp::ConnectionPool::Cancellable* handle = conn_pool_.newConnection(*this); @@ -407,18 +415,24 @@ FilterStatus Router::UpstreamRequest::start() { return FilterStatus::Continue; } -void Router::UpstreamRequest::resetStream() { +void Router::UpstreamRequest::releaseConnection(const bool close) { if (conn_pool_handle_) { conn_pool_handle_->cancel(Tcp::ConnectionPool::CancelPolicy::Default); + conn_pool_handle_ = nullptr; } - if (conn_data_ != nullptr) { - conn_state_ = nullptr; - conn_data_->connection().close(Network::ConnectionCloseType::NoFlush); - conn_data_.reset(); + conn_state_ = nullptr; + + // The event triggered by close will also release this connection so clear conn_data_ before + // closing. + auto conn_data = std::move(conn_data_); + if (close && conn_data != nullptr) { + conn_data->connection().close(Network::ConnectionCloseType::NoFlush); } } +void Router::UpstreamRequest::resetStream() { releaseConnection(true); } + void Router::UpstreamRequest::onPoolFailure(Tcp::ConnectionPool::PoolFailureReason reason, Upstream::HostDescriptionConstSharedPtr host) { conn_pool_handle_ = nullptr; diff --git a/source/extensions/filters/network/thrift_proxy/router/router_impl.h b/source/extensions/filters/network/thrift_proxy/router/router_impl.h index 47cd9e929ecf8..d61c7fe7b59ac 100644 --- a/source/extensions/filters/network/thrift_proxy/router/router_impl.h +++ b/source/extensions/filters/network/thrift_proxy/router/router_impl.h @@ -206,6 +206,7 @@ class Router : public Tcp::ConnectionPool::UpstreamCallbacks, FilterStatus start(); void resetStream(); + void releaseConnection(bool close); // Tcp::ConnectionPool::Callbacks void onPoolFailure(Tcp::ConnectionPool::PoolFailureReason reason, diff --git a/test/extensions/filters/network/thrift_proxy/integration_test.cc b/test/extensions/filters/network/thrift_proxy/integration_test.cc index 2e3f4dfe96b89..23f23c663d9ea 100644 --- a/test/extensions/filters/network/thrift_proxy/integration_test.cc +++ b/test/extensions/filters/network/thrift_proxy/integration_test.cc @@ -7,6 +7,7 @@ #include "gtest/gtest.h" using testing::Combine; +using testing::HasSubstr; using ::testing::TestParamInfo; using testing::Values; @@ -301,6 +302,37 @@ TEST_P(ThriftConnManagerIntegrationTest, EarlyCloseWithUpstream) { EXPECT_EQ(1U, counter->value()); } +// Regression test for https://github.com/envoyproxy/envoy/issues/9037. +TEST_P(ThriftConnManagerIntegrationTest, EarlyUpstreamClose) { + initializeCall(DriverMode::Success); + + const std::string partial_request = + request_bytes_.toString().substr(0, request_bytes_.length() - 5); + + IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("listener_0")); + tcp_client->write(request_bytes_.toString()); + + FakeUpstream* expected_upstream = getExpectedUpstream(false); + FakeRawConnectionPtr fake_upstream_connection; + ASSERT_TRUE(expected_upstream->waitForRawConnection(fake_upstream_connection)); + + std::string data; + ASSERT_TRUE(fake_upstream_connection->waitForData(request_bytes_.length(), &data)); + Buffer::OwnedImpl upstream_request(data); + EXPECT_EQ(request_bytes_.toString(), upstream_request.toString()); + + ASSERT_TRUE(fake_upstream_connection->close()); + + tcp_client->waitForDisconnect(); + + EXPECT_THAT(tcp_client->data(), HasSubstr("connection failure")); + + Stats::CounterSharedPtr counter = test_server_->counter("thrift.thrift_stats.request_call"); + EXPECT_EQ(1U, counter->value()); + counter = test_server_->counter("thrift.thrift_stats.response_exception"); + EXPECT_EQ(1U, counter->value()); +} + TEST_P(ThriftConnManagerIntegrationTest, Oneway) { initializeOneway(); diff --git a/test/extensions/filters/network/thrift_proxy/router_test.cc b/test/extensions/filters/network/thrift_proxy/router_test.cc index 457bbafdc4e14..5b29318b4997f 100644 --- a/test/extensions/filters/network/thrift_proxy/router_test.cc +++ b/test/extensions/filters/network/thrift_proxy/router_test.cc @@ -603,6 +603,27 @@ TEST_F(ThriftRouterTest, UnexpectedUpstreamLocalClose) { router_->onEvent(Network::ConnectionEvent::RemoteClose); } +// Regression test for https://github.com/envoyproxy/envoy/issues/9037. +TEST_F(ThriftRouterTest, DontCloseConnectionTwice) { + initializeRouter(); + startRequest(MessageType::Call); + connectUpstream(); + sendTrivialStruct(FieldType::String); + + EXPECT_CALL(callbacks_, sendLocalReply(_, _)) + .WillOnce(Invoke([&](const DirectResponse& response, bool end_stream) -> void { + auto& app_ex = dynamic_cast(response); + EXPECT_EQ(AppExceptionType::InternalError, app_ex.type_); + EXPECT_THAT(app_ex.what(), ContainsRegex(".*connection failure.*")); + EXPECT_TRUE(end_stream); + })); + router_->onEvent(Network::ConnectionEvent::RemoteClose); + + // Connection close shouldn't happen in onDestroy(), since it's been handled. + EXPECT_CALL(upstream_connection_, close(Network::ConnectionCloseType::NoFlush)).Times(0); + destroyRouter(); +} + TEST_F(ThriftRouterTest, UnexpectedRouterDestroyBeforeUpstreamConnect) { initializeRouter(); startRequest(MessageType::Call);