Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -318,22 +318,21 @@ bool UpstreamRequest::onResetStream(ConnectionPool::PoolFailureReason reason) {
case ConnectionPool::PoolFailureReason::Timeout:
upstream_host_->outlierDetector().putResult(poolFailureReasonToResult(reason));

// Error occurred after a partial or underflow response, propagate the reset to the
// downstream.
if (response_underflow_ || response_state_ == ResponseState::Started) {
ENVOY_LOG(debug, "reset downstream connection for a partial or underflow response");
if (response_underflow_) {
stats_.incCloseUnderflowResponse(parent_.cluster());
} else {
stats_.incClosePartialResponse(parent_.cluster());
}
// Error occurred after an underflow response, propagate the reset to the downstream.
if (response_underflow_) {
ENVOY_LOG(debug, "reset downstream connection for an underflow response");
stats_.incCloseUnderflowResponse(parent_.cluster());
parent_.resetDownstreamConnection();
} else if (response_state_ == ResponseState::None) {
} else if (response_state_ <= ResponseState::Started) {
close_downstream = close_downstream_on_error_;
if (response_state_ == ResponseState::Started) {
stats_.incClosePartialResponse(parent_.cluster());
}
stats_.incResponseLocalException(parent_.cluster());
parent_.sendLocalReply(
AppException(AppExceptionType::InternalError,
fmt::format("connection failure: {} '{}'",
fmt::format("connection failure before response {}: {} '{}'",
response_state_ == ResponseState::None ? "start" : "complete",
PoolFailureReasonNames::get().fromReason(reason),
(upstream_host_ && upstream_host_->address())
? upstream_host_->address()->asString()
Expand Down
26 changes: 20 additions & 6 deletions test/extensions/filters/network/thrift_proxy/router_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,8 @@ TEST_P(ThriftRouterRainidayTest, PoolRemoteConnectionFailure) {
auto& app_ex = dynamic_cast<const AppException&>(response);
EXPECT_EQ(AppExceptionType::InternalError, app_ex.type_);
EXPECT_THAT(app_ex.what(),
ContainsRegex(".*connection failure: remote connection failure.*"));
ContainsRegex(
".*connection failure before response start: remote connection failure.*"));
EXPECT_EQ(GetParam(), end_stream);
}));
EXPECT_CALL(callbacks_, continueDecoding()).Times(GetParam() ? 0 : 1);
Expand Down Expand Up @@ -780,7 +781,8 @@ TEST_P(ThriftRouterRainidayTest, PoolTimeout) {
.WillOnce(Invoke([&](const DirectResponse& response, bool end_stream) -> void {
auto& app_ex = dynamic_cast<const AppException&>(response);
EXPECT_EQ(AppExceptionType::InternalError, app_ex.type_);
EXPECT_THAT(app_ex.what(), ContainsRegex(".*connection failure: timeout.*"));
EXPECT_THAT(app_ex.what(),
ContainsRegex(".*connection failure before response start: timeout.*"));
EXPECT_EQ(GetParam(), end_stream);
}));
EXPECT_CALL(
Expand Down Expand Up @@ -1043,7 +1045,8 @@ TEST_P(ThriftRouterRainidayTest, UnexpectedUpstreamRemoteClose) {
auto& app_ex = dynamic_cast<const AppException&>(response);
EXPECT_EQ(AppExceptionType::InternalError, app_ex.type_);
EXPECT_THAT(app_ex.what(),
ContainsRegex(".*connection failure: remote connection failure.*"));
ContainsRegex(
".*connection failure before response start: remote connection failure.*"));
EXPECT_EQ(GetParam(), end_stream);
}));
EXPECT_CALL(callbacks_, onReset()).Times(0);
Expand Down Expand Up @@ -1076,7 +1079,8 @@ TEST_F(ThriftRouterTest, DontCloseConnectionTwice) {
auto& app_ex = dynamic_cast<const AppException&>(response);
EXPECT_EQ(AppExceptionType::InternalError, app_ex.type_);
EXPECT_THAT(app_ex.what(),
ContainsRegex(".*connection failure: remote connection failure.*"));
ContainsRegex(
".*connection failure before response start: remote connection failure.*"));
EXPECT_TRUE(end_stream);
}));
router_->onEvent(Network::ConnectionEvent::RemoteClose);
Expand Down Expand Up @@ -1343,7 +1347,8 @@ TEST_F(ThriftRouterTest, PoolTimeoutUpstreamTimeMeasurement) {
.WillOnce(Invoke([&](const DirectResponse& response, bool end_stream) -> void {
auto& app_ex = dynamic_cast<const AppException&>(response);
EXPECT_EQ(AppExceptionType::InternalError, app_ex.type_);
EXPECT_THAT(app_ex.what(), ContainsRegex(".*connection failure: timeout.*"));
EXPECT_THAT(app_ex.what(),
ContainsRegex(".*connection failure before response start: timeout.*"));
EXPECT_TRUE(end_stream);
}));
context_.cluster_manager_.thread_local_cluster_.tcp_conn_pool_.poolFailure(
Expand Down Expand Up @@ -1751,7 +1756,16 @@ TEST_F(ThriftRouterTest, UpstreamDraining) {
TEST_F(ThriftRouterTest, UpstreamPartialResponse) {
initializeRouter();

EXPECT_CALL(callbacks_, resetDownstreamConnection());
EXPECT_CALL(callbacks_, sendLocalReply(_, _))
.WillOnce(Invoke([&](const DirectResponse& response, bool end_stream) -> void {
auto& app_ex = dynamic_cast<const AppException&>(response);
EXPECT_EQ(AppExceptionType::InternalError, app_ex.type_);
EXPECT_THAT(
app_ex.what(),
ContainsRegex(
".*connection failure before response complete: local connection failure.*"));
EXPECT_TRUE(end_stream);
}));

startRequestWithExistingConnection(MessageType::Call);
sendTrivialStruct(FieldType::I32);
Expand Down