From ac138f478672eae622c1f8e1d0c805d4ec95c716 Mon Sep 17 00:00:00 2001 From: Stephan Zuercher Date: Tue, 31 Jul 2018 13:25:33 -0700 Subject: [PATCH 1/3] thrift_proxy: fix oneway bugs The thrift_proxy had some bugs related to oneway messages, both when there is an upstream connection pool failure and when the downstream connection closes before the pool connection completes. Both are fixed with theses changes. In addition, modifies the test harness server to accept multiple simulataneous connections. *Risk Level*: low *Testing*: integration test added *Docs Changes*: n/a *Release Notes*: n/a Signed-off-by: Stephan Zuercher --- .../network/thrift_proxy/conn_manager.cc | 14 ++- .../thrift_proxy/router/router_impl.cc | 30 ++++- .../network/thrift_proxy/router/router_impl.h | 2 +- .../network/thrift_proxy/driver/server.py | 2 +- .../network/thrift_proxy/integration_test.cc | 16 +++ .../network/thrift_proxy/router_test.cc | 117 ++++++++++++++++++ 6 files changed, 170 insertions(+), 11 deletions(-) diff --git a/source/extensions/filters/network/thrift_proxy/conn_manager.cc b/source/extensions/filters/network/thrift_proxy/conn_manager.cc index c94bbeefcec36..757e0c38b6643 100644 --- a/source/extensions/filters/network/thrift_proxy/conn_manager.cc +++ b/source/extensions/filters/network/thrift_proxy/conn_manager.cc @@ -30,7 +30,7 @@ Network::FilterStatus ConnectionManager::onData(Buffer::Instance& data, bool end void ConnectionManager::dispatch() { if (stopped_) { - ENVOY_LOG(error, "thrift filter stopped"); + ENVOY_CONN_LOG(debug, "thrift filter stopped", read_callbacks_->connection()); return; } @@ -44,7 +44,7 @@ void ConnectionManager::dispatch() { } } } catch (const EnvoyException& ex) { - ENVOY_LOG(error, "thrift error: {}", ex.what()); + ENVOY_CONN_LOG(error, "thrift error: {}", read_callbacks_->connection(), ex.what()); stats_.request_decoding_error_.inc(); // Use the current rpc to send an error downstream, if possible. @@ -56,6 +56,7 @@ void ConnectionManager::dispatch() { } void ConnectionManager::continueDecoding() { + ENVOY_CONN_LOG(debug, "thrift filter continued", read_callbacks_->connection()); stopped_ = false; dispatch(); } @@ -72,6 +73,9 @@ void ConnectionManager::resetAllRpcs() { void ConnectionManager::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) { read_callbacks_ = &callbacks; + + read_callbacks_->connection().addConnectionCallbacks(*this); + read_callbacks_->connection().enableHalfClose(true); } void ConnectionManager::onEvent(Network::ConnectionEvent event) { @@ -87,7 +91,7 @@ void ConnectionManager::onEvent(Network::ConnectionEvent event) { } ThriftFilters::DecoderFilter& ConnectionManager::newDecoderFilter() { - ENVOY_LOG(debug, "new decoder filter"); + ENVOY_LOG(trace, "new decoder filter"); ActiveRpcPtr new_rpc(new ActiveRpc(*this)); new_rpc->createFilterChain(); @@ -270,7 +274,8 @@ bool ConnectionManager::ActiveRpc::upstreamData(Buffer::Instance& buffer) { } return complete; } catch (const EnvoyException& ex) { - ENVOY_LOG(error, "thrift response error: {}", ex.what()); + ENVOY_CONN_LOG(error, "thrift response error: {}", parent_.read_callbacks_->connection(), + ex.what()); parent_.stats_.response_decoding_error_.inc(); onError(ex.what()); @@ -281,7 +286,6 @@ bool ConnectionManager::ActiveRpc::upstreamData(Buffer::Instance& buffer) { void ConnectionManager::ActiveRpc::resetDownstreamConnection() { parent_.read_callbacks_->connection().close(Network::ConnectionCloseType::NoFlush); - parent_.doDeferredRpcDestroy(*this); } } // namespace ThriftProxy diff --git a/source/extensions/filters/network/thrift_proxy/router/router_impl.cc b/source/extensions/filters/network/thrift_proxy/router/router_impl.cc index 09d1ac142c2d5..f5d137dca0dff 100644 --- a/source/extensions/filters/network/thrift_proxy/router/router_impl.cc +++ b/source/extensions/filters/network/thrift_proxy/router/router_impl.cc @@ -133,8 +133,7 @@ ThriftFilters::FilterStatus Router::messageBegin(absl::string_view name, Message ENVOY_STREAM_LOG(debug, "router decoding request", *callbacks_); upstream_request_.reset(new UpstreamRequest(*this, *conn_pool, name, msg_type, seq_id)); - upstream_request_->start(); - return ThriftFilters::FilterStatus::StopIteration; + return upstream_request_->start(); } ThriftFilters::FilterStatus Router::messageEnd() { @@ -214,14 +213,22 @@ Router::UpstreamRequest::UpstreamRequest(Router& parent, Tcp::ConnectionPool::In Router::UpstreamRequest::~UpstreamRequest() {} -void Router::UpstreamRequest::start() { +ThriftFilters::FilterStatus Router::UpstreamRequest::start() { Tcp::ConnectionPool::Cancellable* handle = conn_pool_.newConnection(*this); if (handle) { + // Pause while we wait for a connection. conn_pool_handle_ = handle; + return ThriftFilters::FilterStatus::StopIteration; } + + return ThriftFilters::FilterStatus::Continue; } void Router::UpstreamRequest::resetStream() { + if (conn_pool_handle_) { + conn_pool_handle_->cancel(); + } + if (conn_data_ != nullptr) { conn_data_->connection().close(Network::ConnectionCloseType::NoFlush); conn_data_.reset(); @@ -230,6 +237,8 @@ void Router::UpstreamRequest::resetStream() { void Router::UpstreamRequest::onPoolFailure(Tcp::ConnectionPool::PoolFailureReason reason, Upstream::HostDescriptionConstSharedPtr host) { + conn_pool_handle_ = nullptr; + // Mimic an upstream reset. onUpstreamHostSelected(host); onResetStream(reason); @@ -237,6 +246,9 @@ void Router::UpstreamRequest::onPoolFailure(Tcp::ConnectionPool::PoolFailureReas void Router::UpstreamRequest::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data, Upstream::HostDescriptionConstSharedPtr host) { + // Only invoke continueDecoding if we'd previously stopped the filter chain. + bool continue_decoding = conn_pool_handle_ != nullptr; + onUpstreamHostSelected(host); conn_data_ = std::move(conn_data); conn_data_->addUpstreamCallbacks(parent_); @@ -256,7 +268,9 @@ void Router::UpstreamRequest::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr // TODO(zuercher): need to use an upstream-connection-specific sequence id parent_.convertMessageBegin(method_name_, msg_type_, seq_id_); - parent_.callbacks_->continueDecoding(); + if (continue_decoding) { + parent_.callbacks_->continueDecoding(); + } } void Router::UpstreamRequest::onRequestComplete() { request_complete_ = true; } @@ -271,6 +285,13 @@ void Router::UpstreamRequest::onUpstreamHostSelected(Upstream::HostDescriptionCo } void Router::UpstreamRequest::onResetStream(Tcp::ConnectionPool::PoolFailureReason reason) { + if (msg_type_ == MessageType::Oneway) { + // For oneway requests, we should not attempt a response. Reset the downstream to signal + // an error. + parent_.callbacks_->resetDownstreamConnection(); + return; + } + switch (reason) { case Tcp::ConnectionPool::PoolFailureReason::Overflow: parent_.callbacks_->sendLocalReply(ThriftFilters::DirectResponsePtr{new AppException( @@ -288,6 +309,7 @@ void Router::UpstreamRequest::onResetStream(Tcp::ConnectionPool::PoolFailureReas return; } + // Error occurred after a partial response, propagate the reset to the downstream. parent_.callbacks_->resetDownstreamConnection(); break; default: diff --git a/source/extensions/filters/network/thrift_proxy/router/router_impl.h b/source/extensions/filters/network/thrift_proxy/router/router_impl.h index d298d38b354cd..d6dac074afc7e 100644 --- a/source/extensions/filters/network/thrift_proxy/router/router_impl.h +++ b/source/extensions/filters/network/thrift_proxy/router/router_impl.h @@ -106,7 +106,7 @@ class Router : public Tcp::ConnectionPool::UpstreamCallbacks, absl::string_view method_name, MessageType msg_type, int32_t seq_id); ~UpstreamRequest(); - void start(); + ThriftFilters::FilterStatus start(); void resetStream(); // Tcp::ConnectionPool::Callbacks diff --git a/test/extensions/filters/network/thrift_proxy/driver/server.py b/test/extensions/filters/network/thrift_proxy/driver/server.py index 094a8d2338bfd..7ab9e862b4e97 100755 --- a/test/extensions/filters/network/thrift_proxy/driver/server.py +++ b/test/extensions/filters/network/thrift_proxy/driver/server.py @@ -157,7 +157,7 @@ def main(cfg): elif cfg.response == "exception": print("Thrift Server will throw Thrift exceptions for all messages") - server = TServer.TSimpleServer(processor, transport, transport_factory, protocol_factory) + server = TServer.TThreadedServer(processor, transport, transport_factory, protocol_factory) try: server.serve() except KeyboardInterrupt: diff --git a/test/extensions/filters/network/thrift_proxy/integration_test.cc b/test/extensions/filters/network/thrift_proxy/integration_test.cc index fe2b5cafb054a..efff2f6d7dfe1 100644 --- a/test/extensions/filters/network/thrift_proxy/integration_test.cc +++ b/test/extensions/filters/network/thrift_proxy/integration_test.cc @@ -263,6 +263,22 @@ TEST_P(ThriftConnManagerIntegrationTest, Oneway) { EXPECT_EQ(1U, counter->value()); } +TEST_P(ThriftConnManagerIntegrationTest, OnewayEarlyClose) { + initializeOneway(); + + IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("listener_0")); + tcp_client->write(request_bytes_.toString()); + tcp_client->close(); + + FakeRawConnectionPtr fake_upstream_connection = fake_upstreams_[0]->waitForRawConnection(); + Buffer::OwnedImpl upstream_request( + fake_upstream_connection->waitForData(request_bytes_.length())); + EXPECT_EQ(request_bytes_.toString(), upstream_request.toString()); + + Stats::CounterSharedPtr counter = test_server_->counter("thrift.thrift_stats.request_oneway"); + EXPECT_EQ(1U, counter->value()); +} + } // namespace ThriftProxy } // namespace NetworkFilters } // namespace Extensions diff --git a/test/extensions/filters/network/thrift_proxy/router_test.cc b/test/extensions/filters/network/thrift_proxy/router_test.cc index 0fe593ed2cddd..6fe5eea92aac1 100644 --- a/test/extensions/filters/network/thrift_proxy/router_test.cc +++ b/test/extensions/filters/network/thrift_proxy/router_test.cc @@ -129,6 +129,52 @@ class ThriftRouterTestBase { EXPECT_NE(nullptr, upstream_callbacks_); } + void startRequestWithExistingConnection(MessageType msg_type) { + msg_type_ = msg_type; + + EXPECT_EQ(ThriftFilters::FilterStatus::Continue, router_->transportBegin({})); + + EXPECT_CALL(callbacks_, route()).WillOnce(Return(route_ptr_)); + EXPECT_CALL(*route_, routeEntry()).WillOnce(Return(&route_entry_)); + EXPECT_CALL(route_entry_, clusterName()).WillRepeatedly(ReturnRef(cluster_name_)); + + EXPECT_CALL(*context_.cluster_manager_.tcp_conn_pool_.connection_data_, addUpstreamCallbacks(_)) + .WillOnce(Invoke([&](Tcp::ConnectionPool::UpstreamCallbacks& cb) -> void { + upstream_callbacks_ = &cb; + })); + + NiceMock connection; + EXPECT_CALL(callbacks_, connection()).WillRepeatedly(Return(&connection)); + EXPECT_EQ(&connection, router_->downstreamConnection()); + + // Not yet implemented: + EXPECT_EQ(absl::optional(), router_->computeHashKey()); + EXPECT_EQ(nullptr, router_->metadataMatchCriteria()); + EXPECT_EQ(nullptr, router_->downstreamHeaders()); + + EXPECT_CALL(callbacks_, downstreamTransportType()).WillOnce(Return(TransportType::Framed)); + transport_ = new NiceMock(); + ON_CALL(*transport_, type()).WillByDefault(Return(TransportType::Framed)); + + EXPECT_CALL(callbacks_, downstreamProtocolType()).WillOnce(Return(ProtocolType::Binary)); + protocol_ = new NiceMock(); + ON_CALL(*protocol_, type()).WillByDefault(Return(ProtocolType::Binary)); + EXPECT_CALL(*protocol_, writeMessageBegin(_, method_name_, msg_type_, seq_id_)); + + EXPECT_CALL(callbacks_, continueDecoding()).Times(0); + EXPECT_CALL(context_.cluster_manager_.tcp_conn_pool_, newConnection(_)) + .WillOnce( + Invoke([&](Tcp::ConnectionPool::Callbacks& cb) -> Tcp::ConnectionPool::Cancellable* { + context_.cluster_manager_.tcp_conn_pool_.newConnectionImpl(cb); + context_.cluster_manager_.tcp_conn_pool_.poolReady(upstream_connection_); + return nullptr; + })); + + EXPECT_EQ(ThriftFilters::FilterStatus::Continue, + router_->messageBegin(method_name_, msg_type_, seq_id_)); + EXPECT_NE(nullptr, upstream_callbacks_); + } + void sendTrivialStruct(FieldType field_type) { EXPECT_CALL(*protocol_, writeStructBegin(_, "")); EXPECT_EQ(ThriftFilters::FilterStatus::Continue, router_->structBegin({})); @@ -334,6 +380,18 @@ TEST_F(ThriftRouterTest, PoolOverflowFailure) { Tcp::ConnectionPool::PoolFailureReason::Overflow); } +TEST_F(ThriftRouterTest, PoolConnectionFailureWithOnewayMessage) { + initializeRouter(); + startRequest(MessageType::Oneway); + + EXPECT_CALL(callbacks_, sendLocalReply_(_)).Times(0); + EXPECT_CALL(callbacks_, resetDownstreamConnection()); + context_.cluster_manager_.tcp_conn_pool_.poolFailure( + Tcp::ConnectionPool::PoolFailureReason::RemoteConnectionFailure); + + destroyRouter(); +} + TEST_F(ThriftRouterTest, NoRoute) { initializeRouter(); @@ -435,6 +493,53 @@ TEST_F(ThriftRouterTest, TruncatedResponse) { destroyRouter(); } +TEST_F(ThriftRouterTest, UpstreamRemoteCloseMidResponse) { + initializeRouter(); + startRequest(MessageType::Call); + connectUpstream(); + + EXPECT_CALL(callbacks_, sendLocalReply_(_)) + .WillOnce(Invoke([&](ThriftFilters::DirectResponsePtr& response) -> void { + auto* app_ex = dynamic_cast(response.get()); + EXPECT_NE(nullptr, app_ex); + EXPECT_EQ(method_name_, app_ex->method_name_); + EXPECT_EQ(seq_id_, app_ex->seq_id_); + EXPECT_EQ(AppExceptionType::InternalError, app_ex->type_); + EXPECT_THAT(app_ex->error_message_, ContainsRegex(".*connection failure.*")); + })); + upstream_callbacks_->onEvent(Network::ConnectionEvent::RemoteClose); + destroyRouter(); +} + +TEST_F(ThriftRouterTest, UpstreamLocalCloseMidResponse) { + initializeRouter(); + startRequest(MessageType::Call); + connectUpstream(); + + EXPECT_CALL(callbacks_, sendLocalReply_(_)) + .WillOnce(Invoke([&](ThriftFilters::DirectResponsePtr& response) -> void { + auto* app_ex = dynamic_cast(response.get()); + EXPECT_NE(nullptr, app_ex); + EXPECT_EQ(method_name_, app_ex->method_name_); + EXPECT_EQ(seq_id_, app_ex->seq_id_); + EXPECT_EQ(AppExceptionType::InternalError, app_ex->type_); + EXPECT_THAT(app_ex->error_message_, ContainsRegex(".*connection failure.*")); + })); + upstream_callbacks_->onEvent(Network::ConnectionEvent::LocalClose); + destroyRouter(); +} + +TEST_F(ThriftRouterTest, UpstreamCloseAfterResponse) { + initializeRouter(); + startRequest(MessageType::Call); + connectUpstream(); + sendTrivialStruct(FieldType::String); + completeRequest(); + + upstream_callbacks_->onEvent(Network::ConnectionEvent::LocalClose); + destroyRouter(); +} + TEST_F(ThriftRouterTest, UpstreamDataTriggersReset) { initializeRouter(); startRequest(MessageType::Call); @@ -459,6 +564,9 @@ TEST_F(ThriftRouterTest, UpstreamDataTriggersReset) { TEST_F(ThriftRouterTest, UnexpectedRouterDestroyBeforeUpstreamConnect) { initializeRouter(); startRequest(MessageType::Call); + + EXPECT_EQ(1, context_.cluster_manager_.tcp_conn_pool_.handles_.size()); + EXPECT_CALL(context_.cluster_manager_.tcp_conn_pool_.handles_.front(), cancel()); destroyRouter(); } @@ -493,6 +601,15 @@ TEST_P(ThriftRouterFieldTypeTest, Call) { destroyRouter(); } +TEST_F(ThriftRouterTest, CallWithExistingConnection) { + initializeRouter(); + startRequestWithExistingConnection(MessageType::Call); + sendTrivialStruct(FieldType::I32); + completeRequest(); + returnResponse(); + destroyRouter(); +} + TEST_P(ThriftRouterContainerTest, DecoderFilterCallbacks) { FieldType field_type = GetParam(); From 659fdfdeaef45a00a5ff00fdd84e6dd82ce8916b Mon Sep 17 00:00:00 2001 From: Stephan Zuercher Date: Wed, 1 Aug 2018 19:47:24 -0700 Subject: [PATCH 2/3] bump ci Signed-off-by: Stephan Zuercher From e992ae113f48a18dc7461cd56fe8c4920906b27f Mon Sep 17 00:00:00 2001 From: Stephan Zuercher Date: Thu, 9 Aug 2018 11:24:43 -0700 Subject: [PATCH 3/3] fix merge Signed-off-by: Stephan Zuercher --- .../thrift_proxy/router/router_impl.cc | 2 +- .../network/thrift_proxy/integration_test.cc | 8 ++-- .../network/thrift_proxy/router_test.cc | 42 +++++++++---------- 3 files changed, 26 insertions(+), 26 deletions(-) diff --git a/source/extensions/filters/network/thrift_proxy/router/router_impl.cc b/source/extensions/filters/network/thrift_proxy/router/router_impl.cc index 97039463249e7..2340ece251d41 100644 --- a/source/extensions/filters/network/thrift_proxy/router/router_impl.cc +++ b/source/extensions/filters/network/thrift_proxy/router/router_impl.cc @@ -286,7 +286,7 @@ void Router::UpstreamRequest::onUpstreamHostSelected(Upstream::HostDescriptionCo } void Router::UpstreamRequest::onResetStream(Tcp::ConnectionPool::PoolFailureReason reason) { - if (msg_type_ == MessageType::Oneway) { + if (metadata_->messageType() == MessageType::Oneway) { // For oneway requests, we should not attempt a response. Reset the downstream to signal // an error. parent_.callbacks_->resetDownstreamConnection(); diff --git a/test/extensions/filters/network/thrift_proxy/integration_test.cc b/test/extensions/filters/network/thrift_proxy/integration_test.cc index 03887de5a3060..6d84beee5e1b0 100644 --- a/test/extensions/filters/network/thrift_proxy/integration_test.cc +++ b/test/extensions/filters/network/thrift_proxy/integration_test.cc @@ -279,9 +279,11 @@ TEST_P(ThriftConnManagerIntegrationTest, OnewayEarlyClose) { tcp_client->write(request_bytes_.toString()); tcp_client->close(); - FakeRawConnectionPtr fake_upstream_connection = fake_upstreams_[0]->waitForRawConnection(); - Buffer::OwnedImpl upstream_request( - fake_upstream_connection->waitForData(request_bytes_.length())); + FakeRawConnectionPtr fake_upstream_connection; + ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection)); + std::string data; + ASSERT_TRUE(fake_upstream_connection->waitForData(request_bytes_.length(), &data)); + Buffer::OwnedImpl upstream_request(data); EXPECT_EQ(request_bytes_.toString(), upstream_request.toString()); Stats::CounterSharedPtr counter = test_server_->counter("thrift.thrift_stats.request_oneway"); diff --git a/test/extensions/filters/network/thrift_proxy/router_test.cc b/test/extensions/filters/network/thrift_proxy/router_test.cc index cd5be964f4d53..93fb798460ff9 100644 --- a/test/extensions/filters/network/thrift_proxy/router_test.cc +++ b/test/extensions/filters/network/thrift_proxy/router_test.cc @@ -143,14 +143,14 @@ class ThriftRouterTestBase { } void startRequestWithExistingConnection(MessageType msg_type) { - msg_type_ = msg_type; - EXPECT_EQ(ThriftFilters::FilterStatus::Continue, router_->transportBegin({})); EXPECT_CALL(callbacks_, route()).WillOnce(Return(route_ptr_)); EXPECT_CALL(*route_, routeEntry()).WillOnce(Return(&route_entry_)); EXPECT_CALL(route_entry_, clusterName()).WillRepeatedly(ReturnRef(cluster_name_)); + initializeMetadata(msg_type); + EXPECT_CALL(*context_.cluster_manager_.tcp_conn_pool_.connection_data_, addUpstreamCallbacks(_)) .WillOnce(Invoke([&](Tcp::ConnectionPool::UpstreamCallbacks& cb) -> void { upstream_callbacks_ = &cb; @@ -172,7 +172,12 @@ class ThriftRouterTestBase { EXPECT_CALL(callbacks_, downstreamProtocolType()).WillOnce(Return(ProtocolType::Binary)); protocol_ = new NiceMock(); ON_CALL(*protocol_, type()).WillByDefault(Return(ProtocolType::Binary)); - EXPECT_CALL(*protocol_, writeMessageBegin(_, method_name_, msg_type_, seq_id_)); + EXPECT_CALL(*protocol_, writeMessageBegin(_, _)) + .WillOnce(Invoke([&](Buffer::Instance&, const MessageMetadata& metadata) -> void { + EXPECT_EQ(metadata_->methodName(), metadata.methodName()); + EXPECT_EQ(metadata_->messageType(), metadata.messageType()); + EXPECT_EQ(metadata_->sequenceId(), metadata.sequenceId()); + })); EXPECT_CALL(callbacks_, continueDecoding()).Times(0); EXPECT_CALL(context_.cluster_manager_.tcp_conn_pool_, newConnection(_)) @@ -183,8 +188,7 @@ class ThriftRouterTestBase { return nullptr; })); - EXPECT_EQ(ThriftFilters::FilterStatus::Continue, - router_->messageBegin(method_name_, msg_type_, seq_id_)); + EXPECT_EQ(ThriftFilters::FilterStatus::Continue, router_->messageBegin(metadata_)); EXPECT_NE(nullptr, upstream_callbacks_); } @@ -384,7 +388,7 @@ TEST_F(ThriftRouterTest, PoolConnectionFailureWithOnewayMessage) { initializeRouter(); startRequest(MessageType::Oneway); - EXPECT_CALL(callbacks_, sendLocalReply_(_)).Times(0); + EXPECT_CALL(callbacks_, sendLocalReply(_)).Times(0); EXPECT_CALL(callbacks_, resetDownstreamConnection()); context_.cluster_manager_.tcp_conn_pool_.poolFailure( Tcp::ConnectionPool::PoolFailureReason::RemoteConnectionFailure); @@ -485,14 +489,11 @@ TEST_F(ThriftRouterTest, UpstreamRemoteCloseMidResponse) { startRequest(MessageType::Call); connectUpstream(); - EXPECT_CALL(callbacks_, sendLocalReply_(_)) - .WillOnce(Invoke([&](ThriftFilters::DirectResponsePtr& response) -> void { - auto* app_ex = dynamic_cast(response.get()); - EXPECT_NE(nullptr, app_ex); - EXPECT_EQ(method_name_, app_ex->method_name_); - EXPECT_EQ(seq_id_, app_ex->seq_id_); - EXPECT_EQ(AppExceptionType::InternalError, app_ex->type_); - EXPECT_THAT(app_ex->error_message_, ContainsRegex(".*connection failure.*")); + EXPECT_CALL(callbacks_, sendLocalReply(_)) + .WillOnce(Invoke([&](const DirectResponse& response) -> void { + auto& app_ex = dynamic_cast(response); + EXPECT_EQ(AppExceptionType::InternalError, app_ex.type_); + EXPECT_THAT(app_ex.what(), ContainsRegex(".*connection failure.*")); })); upstream_callbacks_->onEvent(Network::ConnectionEvent::RemoteClose); destroyRouter(); @@ -503,14 +504,11 @@ TEST_F(ThriftRouterTest, UpstreamLocalCloseMidResponse) { startRequest(MessageType::Call); connectUpstream(); - EXPECT_CALL(callbacks_, sendLocalReply_(_)) - .WillOnce(Invoke([&](ThriftFilters::DirectResponsePtr& response) -> void { - auto* app_ex = dynamic_cast(response.get()); - EXPECT_NE(nullptr, app_ex); - EXPECT_EQ(method_name_, app_ex->method_name_); - EXPECT_EQ(seq_id_, app_ex->seq_id_); - EXPECT_EQ(AppExceptionType::InternalError, app_ex->type_); - EXPECT_THAT(app_ex->error_message_, ContainsRegex(".*connection failure.*")); + EXPECT_CALL(callbacks_, sendLocalReply(_)) + .WillOnce(Invoke([&](const DirectResponse& response) -> void { + auto& app_ex = dynamic_cast(response); + EXPECT_EQ(AppExceptionType::InternalError, app_ex.type_); + EXPECT_THAT(app_ex.what(), ContainsRegex(".*connection failure.*")); })); upstream_callbacks_->onEvent(Network::ConnectionEvent::LocalClose); destroyRouter();