Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@ RouteConstSharedPtr RouteMatcher::route(const MessageMetadata& metadata,
void Router::onDestroy() {
if (upstream_request_ != nullptr) {
upstream_request_->resetStream();
cleanup();
}
cleanup();
}

void Router::setDecoderFilterCallbacks(ThriftFilters::DecoderFilterCallbacks& callbacks) {
Expand Down Expand Up @@ -354,17 +354,21 @@ void Router::onEvent(Network::ConnectionEvent event) {

switch (event) {
case Network::ConnectionEvent::RemoteClose:
ENVOY_STREAM_LOG(debug, "upstream remote close", *callbacks_);
upstream_request_->onResetStream(
Tcp::ConnectionPool::PoolFailureReason::RemoteConnectionFailure);
break;
case Network::ConnectionEvent::LocalClose:
ENVOY_STREAM_LOG(debug, "upstream local close", *callbacks_);
upstream_request_->onResetStream(
Tcp::ConnectionPool::PoolFailureReason::LocalConnectionFailure);
break;
default:
// Connected is consumed by the connection pool.
NOT_REACHED_GCOVR_EXCL_LINE;
}

upstream_request_->releaseConnection(false);
}

const Network::Connection* Router::downstreamConnection() const {
Expand All @@ -389,7 +393,11 @@ Router::UpstreamRequest::UpstreamRequest(Router& parent, Tcp::ConnectionPool::In
protocol_(NamedProtocolConfigFactory::getFactory(protocol_type).createProtocol()),
request_complete_(false), response_started_(false), response_complete_(false) {}

Router::UpstreamRequest::~UpstreamRequest() = default;
Router::UpstreamRequest::~UpstreamRequest() {
if (conn_pool_handle_) {
conn_pool_handle_->cancel(Tcp::ConnectionPool::CancelPolicy::Default);
}
}

FilterStatus Router::UpstreamRequest::start() {
Tcp::ConnectionPool::Cancellable* handle = conn_pool_.newConnection(*this);
Expand All @@ -407,18 +415,24 @@ FilterStatus Router::UpstreamRequest::start() {
return FilterStatus::Continue;
}

void Router::UpstreamRequest::resetStream() {
void Router::UpstreamRequest::releaseConnection(const bool close) {
if (conn_pool_handle_) {
conn_pool_handle_->cancel(Tcp::ConnectionPool::CancelPolicy::Default);
conn_pool_handle_ = nullptr;
}

if (conn_data_ != nullptr) {
conn_state_ = nullptr;
conn_data_->connection().close(Network::ConnectionCloseType::NoFlush);
conn_data_.reset();
conn_state_ = nullptr;

// The event triggered by close will also release this connection so clear conn_data_ before
// closing.
auto conn_data = std::move(conn_data_);
if (close && conn_data != nullptr) {
conn_data->connection().close(Network::ConnectionCloseType::NoFlush);
}
}

void Router::UpstreamRequest::resetStream() { releaseConnection(true); }

void Router::UpstreamRequest::onPoolFailure(Tcp::ConnectionPool::PoolFailureReason reason,
Upstream::HostDescriptionConstSharedPtr host) {
conn_pool_handle_ = nullptr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ class Router : public Tcp::ConnectionPool::UpstreamCallbacks,

FilterStatus start();
void resetStream();
void releaseConnection(bool close);

// Tcp::ConnectionPool::Callbacks
void onPoolFailure(Tcp::ConnectionPool::PoolFailureReason reason,
Expand Down
32 changes: 32 additions & 0 deletions test/extensions/filters/network/thrift_proxy/integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "gtest/gtest.h"

using testing::Combine;
using testing::HasSubstr;
using ::testing::TestParamInfo;
using testing::Values;

Expand Down Expand Up @@ -301,6 +302,37 @@ TEST_P(ThriftConnManagerIntegrationTest, EarlyCloseWithUpstream) {
EXPECT_EQ(1U, counter->value());
}

// Regression test for https://github.com/envoyproxy/envoy/issues/9037.
TEST_P(ThriftConnManagerIntegrationTest, EarlyUpstreamClose) {
initializeCall(DriverMode::Success);

const std::string partial_request =
request_bytes_.toString().substr(0, request_bytes_.length() - 5);

IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("listener_0"));
tcp_client->write(request_bytes_.toString());

FakeUpstream* expected_upstream = getExpectedUpstream(false);
FakeRawConnectionPtr fake_upstream_connection;
ASSERT_TRUE(expected_upstream->waitForRawConnection(fake_upstream_connection));

std::string data;
ASSERT_TRUE(fake_upstream_connection->waitForData(request_bytes_.length(), &data));
Buffer::OwnedImpl upstream_request(data);
EXPECT_EQ(request_bytes_.toString(), upstream_request.toString());

ASSERT_TRUE(fake_upstream_connection->close());

tcp_client->waitForDisconnect();

EXPECT_THAT(tcp_client->data(), HasSubstr("connection failure"));

Stats::CounterSharedPtr counter = test_server_->counter("thrift.thrift_stats.request_call");
EXPECT_EQ(1U, counter->value());
counter = test_server_->counter("thrift.thrift_stats.response_exception");
EXPECT_EQ(1U, counter->value());
}

TEST_P(ThriftConnManagerIntegrationTest, Oneway) {
initializeOneway();

Expand Down
21 changes: 21 additions & 0 deletions test/extensions/filters/network/thrift_proxy/router_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,27 @@ TEST_F(ThriftRouterTest, UnexpectedUpstreamLocalClose) {
router_->onEvent(Network::ConnectionEvent::RemoteClose);
}

// Regression test for https://github.com/envoyproxy/envoy/issues/9037.
TEST_F(ThriftRouterTest, DontCloseConnectionTwice) {
initializeRouter();
startRequest(MessageType::Call);
connectUpstream();
sendTrivialStruct(FieldType::String);

EXPECT_CALL(callbacks_, sendLocalReply(_, _))
.WillOnce(Invoke([&](const DirectResponse& response, bool end_stream) -> void {
auto& app_ex = dynamic_cast<const AppException&>(response);
EXPECT_EQ(AppExceptionType::InternalError, app_ex.type_);
EXPECT_THAT(app_ex.what(), ContainsRegex(".*connection failure.*"));
EXPECT_TRUE(end_stream);
}));
router_->onEvent(Network::ConnectionEvent::RemoteClose);

// Connection close shouldn't happen in onDestroy(), since it's been handled.
EXPECT_CALL(upstream_connection_, close(Network::ConnectionCloseType::NoFlush)).Times(0);
destroyRouter();
}

TEST_F(ThriftRouterTest, UnexpectedRouterDestroyBeforeUpstreamConnect) {
initializeRouter();
startRequest(MessageType::Call);
Expand Down