diff --git a/source/extensions/filters/network/thrift_proxy/conn_manager.cc b/source/extensions/filters/network/thrift_proxy/conn_manager.cc index 1b5d51e43b5e9..0edb3d6cbb04c 100644 --- a/source/extensions/filters/network/thrift_proxy/conn_manager.cc +++ b/source/extensions/filters/network/thrift_proxy/conn_manager.cc @@ -87,8 +87,11 @@ void ConnectionManager::dispatch() { void ConnectionManager::sendLocalReply(MessageMetadata& metadata, const DirectResponse& response, bool end_stream) { - Buffer::OwnedImpl buffer; + if (read_callbacks_->connection().state() == Network::Connection::State::Closed) { + return; + } + Buffer::OwnedImpl buffer; const DirectResponse::ResponseType result = response.encode(metadata, *protocol_, buffer); Buffer::OwnedImpl response_buffer; @@ -204,6 +207,11 @@ FilterStatus ConnectionManager::ResponseDecoder::transportEnd() { ConnectionManager& cm = parent_.parent_; + if (cm.read_callbacks_->connection().state() == Network::Connection::State::Closed) { + complete_ = true; + throw EnvoyException("downstream connection is closed"); + } + Buffer::OwnedImpl buffer; // Use the factory to get the concrete transport from the decoder transport (as opposed to diff --git a/test/extensions/filters/network/thrift_proxy/conn_manager_test.cc b/test/extensions/filters/network/thrift_proxy/conn_manager_test.cc index 4336ea3262667..0af14a1d4a29d 100644 --- a/test/extensions/filters/network/thrift_proxy/conn_manager_test.cc +++ b/test/extensions/filters/network/thrift_proxy/conn_manager_test.cc @@ -1204,6 +1204,44 @@ TEST_F(ThriftConnectionManagerTest, OnDataWithFilterSendsLocalErrorReply) { EXPECT_EQ(1U, store_.counter("test.response_error").value()); } +// sendLocalReply does nothing, when the remote closed the connection. +TEST_F(ThriftConnectionManagerTest, OnDataWithFilterSendLocalReplyRemoteClosedConnection) { + auto* filter = new NiceMock(); + custom_filter_.reset(filter); + + initializeFilter(); + writeFramedBinaryMessage(buffer_, MessageType::Call, 0x0F); + + ThriftFilters::DecoderFilterCallbacks* callbacks{}; + EXPECT_CALL(*filter, setDecoderFilterCallbacks(_)) + .WillOnce( + Invoke([&](ThriftFilters::DecoderFilterCallbacks& cb) -> void { callbacks = &cb; })); + EXPECT_CALL(*decoder_filter_, setDecoderFilterCallbacks(_)); + + NiceMock direct_response; + EXPECT_CALL(direct_response, encode(_, _, _)).Times(0); + + // First filter sends local reply. + EXPECT_CALL(*filter, messageBegin(_)) + .WillOnce(Invoke([&](MessageMetadataSharedPtr) -> FilterStatus { + callbacks->sendLocalReply(direct_response, false); + return FilterStatus::StopIteration; + })); + EXPECT_CALL(filter_callbacks_.connection_, write(_, false)).Times(0); + EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, deferredDelete_(_)).Times(1); + + // Remote closes the connection. + filter_callbacks_.connection_.state_ = Network::Connection::State::Closed; + EXPECT_EQ(filter_->onData(buffer_, true), Network::FilterStatus::StopIteration); + + filter_callbacks_.connection_.dispatcher_.clearDeferredDeleteList(); + EXPECT_EQ(1U, store_.counter("test.request").value()); + EXPECT_EQ(1U, store_.counter("test.request_call").value()); + EXPECT_EQ(0U, store_.gauge("test.request_active").value()); + EXPECT_EQ(0U, store_.counter("test.response").value()); + EXPECT_EQ(0U, store_.counter("test.response_error").value()); +} + // Tests a decoder filter that modifies data. TEST_F(ThriftConnectionManagerTest, DecoderFiltersModifyRequests) { auto* filter = new NiceMock(); @@ -1252,6 +1290,33 @@ TEST_F(ThriftConnectionManagerTest, DecoderFiltersModifyRequests) { EXPECT_EQ(1U, store_.gauge("test.request_active").value()); } +TEST_F(ThriftConnectionManagerTest, transportEndWhenRemoteClose) { + initializeFilter(); + writeComplexFramedBinaryMessage(buffer_, MessageType::Call, 0x0F); + + ThriftFilters::DecoderFilterCallbacks* callbacks{}; + EXPECT_CALL(*decoder_filter_, setDecoderFilterCallbacks(_)) + .WillOnce( + Invoke([&](ThriftFilters::DecoderFilterCallbacks& cb) -> void { callbacks = &cb; })); + + EXPECT_EQ(filter_->onData(buffer_, false), Network::FilterStatus::StopIteration); + EXPECT_EQ(1U, store_.counter("test.request_call").value()); + + writeComplexFramedBinaryMessage(write_buffer_, MessageType::Reply, 0x0F); + + FramedTransportImpl transport; + BinaryProtocolImpl proto; + callbacks->startUpstreamResponse(transport, proto); + + // Remote closes the connection. + filter_callbacks_.connection_.state_ = Network::Connection::State::Closed; + EXPECT_EQ(ThriftFilters::ResponseStatus::Reset, callbacks->upstreamData(write_buffer_)); + EXPECT_EQ(0U, store_.counter("test.response").value()); + EXPECT_EQ(1U, store_.counter("test.response_decoding_error").value()); + + filter_callbacks_.connection_.dispatcher_.clearDeferredDeleteList(); +} + } // namespace ThriftProxy } // namespace NetworkFilters } // namespace Extensions