From 6ec9070f33debec7ed24e34292f2d90581cab470 Mon Sep 17 00:00:00 2001 From: Raul Gutierrez Segales Date: Wed, 10 Apr 2019 14:33:34 -0700 Subject: [PATCH 1/6] Fix crash when remote closes the connection There's a few paths within the Thrift Proxy where we should ensure the connection is not closed, before trying to write. This change ensures that sendLocalReply() will return early if the connection is gone. It also adds a check for transformEnd(), which gets called from upstreamData(). Signed-off-by: Raul Gutierrez Segales --- .../filters/network/thrift_proxy/conn_manager.cc | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/source/extensions/filters/network/thrift_proxy/conn_manager.cc b/source/extensions/filters/network/thrift_proxy/conn_manager.cc index 1b5d51e43b5e9..57426f4798793 100644 --- a/source/extensions/filters/network/thrift_proxy/conn_manager.cc +++ b/source/extensions/filters/network/thrift_proxy/conn_manager.cc @@ -87,8 +87,11 @@ void ConnectionManager::dispatch() { void ConnectionManager::sendLocalReply(MessageMetadata& metadata, const DirectResponse& response, bool end_stream) { - Buffer::OwnedImpl buffer; + if (read_callbacks_->connection().state() == Network::Connection::State::Closed) { + return; + } + Buffer::OwnedImpl buffer; const DirectResponse::ResponseType result = response.encode(metadata, *protocol_, buffer); Buffer::OwnedImpl response_buffer; @@ -204,6 +207,11 @@ FilterStatus ConnectionManager::ResponseDecoder::transportEnd() { ConnectionManager& cm = parent_.parent_; + if (cm.read_callbacks_->connection().state() == Network::Connection::State::Closed) { + complete_ = true; + throw EnvoyException("thrift downstream gone"); + } + Buffer::OwnedImpl buffer; // Use the factory to get the concrete transport from the decoder transport (as opposed to From c7abdcfce71d57487e4e08f23ad97e0935874cb1 Mon Sep 17 00:00:00 2001 From: Raul Gutierrez Segales Date: Wed, 10 Apr 2019 16:43:05 -0700 Subject: [PATCH 2/6] Test sendLocalReply() when remote closed Signed-off-by: Raul Gutierrez Segales --- .../network/thrift_proxy/conn_manager_test.cc | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/test/extensions/filters/network/thrift_proxy/conn_manager_test.cc b/test/extensions/filters/network/thrift_proxy/conn_manager_test.cc index 4336ea3262667..1d2559972697f 100644 --- a/test/extensions/filters/network/thrift_proxy/conn_manager_test.cc +++ b/test/extensions/filters/network/thrift_proxy/conn_manager_test.cc @@ -1204,6 +1204,46 @@ TEST_F(ThriftConnectionManagerTest, OnDataWithFilterSendsLocalErrorReply) { EXPECT_EQ(1U, store_.counter("test.response_error").value()); } +// sendLocalReply does nothing, when the remote closed the connection. +TEST_F(ThriftConnectionManagerTest, OnDataWithFilterSendLocalReplyRemoteClosedConnection) { + auto* filter = new NiceMock(); + custom_filter_.reset(filter); + + initializeFilter(); + writeFramedBinaryMessage(buffer_, MessageType::Call, 0x0F); + + ThriftFilters::DecoderFilterCallbacks* callbacks{}; + EXPECT_CALL(*filter, setDecoderFilterCallbacks(_)) + .WillOnce( + Invoke([&](ThriftFilters::DecoderFilterCallbacks& cb) -> void { callbacks = &cb; })); + EXPECT_CALL(*decoder_filter_, setDecoderFilterCallbacks(_)); + + NiceMock direct_response; + EXPECT_CALL(direct_response, encode(_, _, _)).Times(0); + + // First filter sends local reply. + EXPECT_CALL(*filter, messageBegin(_)) + .WillOnce(Invoke([&](MessageMetadataSharedPtr) -> FilterStatus { + callbacks->sendLocalReply(direct_response, false); + return FilterStatus::StopIteration; + })); + EXPECT_CALL(filter_callbacks_.connection_, write(_, false)).Times(0); + EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, deferredDelete_(_)).Times(1); + + // Remote closes the connection. + filter_callbacks_.connection_.state_ = Network::Connection::State::Closed; + EXPECT_EQ(filter_->onData(buffer_, true), Network::FilterStatus::StopIteration); + + filter_callbacks_.connection_.dispatcher_.clearDeferredDeleteList(); + EXPECT_EQ(1U, store_.counter("test.request").value()); + EXPECT_EQ(1U, store_.counter("test.request_call").value()); + EXPECT_EQ(0U, store_.gauge("test.request_active").value()); + EXPECT_EQ(0U, store_.counter("test.response_error").value()); + + // This might not be true, if we haven't decoded a full request. + // EXPECT_EQ(1U, store_.counter("test.cx_destroy_remote_with_active_rq").value()); +} + // Tests a decoder filter that modifies data. TEST_F(ThriftConnectionManagerTest, DecoderFiltersModifyRequests) { auto* filter = new NiceMock(); From 871bc6c5815db8b3491a3f00aa1bc098c727bb6f Mon Sep 17 00:00:00 2001 From: Raul Gutierrez Segales Date: Thu, 11 Apr 2019 14:43:08 -0700 Subject: [PATCH 3/6] Add unit test for ResponseDecoder::transportEnd() This ensures we correctly handle the case when the remote downstream already closed the connection. Signed-off-by: Raul Gutierrez Segales --- .../network/thrift_proxy/conn_manager_test.cc | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/test/extensions/filters/network/thrift_proxy/conn_manager_test.cc b/test/extensions/filters/network/thrift_proxy/conn_manager_test.cc index 1d2559972697f..5a8a391d0a67c 100644 --- a/test/extensions/filters/network/thrift_proxy/conn_manager_test.cc +++ b/test/extensions/filters/network/thrift_proxy/conn_manager_test.cc @@ -1292,6 +1292,29 @@ TEST_F(ThriftConnectionManagerTest, DecoderFiltersModifyRequests) { EXPECT_EQ(1U, store_.gauge("test.request_active").value()); } +TEST_F(ThriftConnectionManagerTest, transportEndWhenRemoteClose) { + initializeFilter(); + writeComplexFramedBinaryMessage(buffer_, MessageType::Call, 0x0F); + + ThriftFilters::DecoderFilterCallbacks* callbacks{}; + EXPECT_CALL(*decoder_filter_, setDecoderFilterCallbacks(_)) + .WillOnce( + Invoke([&](ThriftFilters::DecoderFilterCallbacks& cb) -> void { callbacks = &cb; })); + + EXPECT_EQ(filter_->onData(buffer_, false), Network::FilterStatus::StopIteration); + EXPECT_EQ(1U, store_.counter("test.request_call").value()); + + writeComplexFramedBinaryMessage(write_buffer_, MessageType::Reply, 0x0F); + + FramedTransportImpl transport; + BinaryProtocolImpl proto; + callbacks->startUpstreamResponse(transport, proto); + + // Remote closes the connection. + filter_callbacks_.connection_.state_ = Network::Connection::State::Closed; + EXPECT_EQ(ThriftFilters::ResponseStatus::Reset, callbacks->upstreamData(write_buffer_)); +} + } // namespace ThriftProxy } // namespace NetworkFilters } // namespace Extensions From b4d9d7c187797d9128e0fead655500ce21aeae90 Mon Sep 17 00:00:00 2001 From: Raul Gutierrez Segales Date: Thu, 11 Apr 2019 14:52:23 -0700 Subject: [PATCH 4/6] Drop comment about cx_destroy_remote_with_active_rq Signed-off-by: Raul Gutierrez Segales --- .../filters/network/thrift_proxy/conn_manager_test.cc | 3 --- 1 file changed, 3 deletions(-) diff --git a/test/extensions/filters/network/thrift_proxy/conn_manager_test.cc b/test/extensions/filters/network/thrift_proxy/conn_manager_test.cc index 5a8a391d0a67c..84bab4a52af60 100644 --- a/test/extensions/filters/network/thrift_proxy/conn_manager_test.cc +++ b/test/extensions/filters/network/thrift_proxy/conn_manager_test.cc @@ -1239,9 +1239,6 @@ TEST_F(ThriftConnectionManagerTest, OnDataWithFilterSendLocalReplyRemoteClosedCo EXPECT_EQ(1U, store_.counter("test.request_call").value()); EXPECT_EQ(0U, store_.gauge("test.request_active").value()); EXPECT_EQ(0U, store_.counter("test.response_error").value()); - - // This might not be true, if we haven't decoded a full request. - // EXPECT_EQ(1U, store_.counter("test.cx_destroy_remote_with_active_rq").value()); } // Tests a decoder filter that modifies data. From 24d710251af07a77217081acfb21e7433d2ce46f Mon Sep 17 00:00:00 2001 From: Raul Gutierrez Segales Date: Thu, 11 Apr 2019 15:01:18 -0700 Subject: [PATCH 5/6] Call clearDeferredDeleteList Signed-off-by: Raul Gutierrez Segales --- .../filters/network/thrift_proxy/conn_manager_test.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/extensions/filters/network/thrift_proxy/conn_manager_test.cc b/test/extensions/filters/network/thrift_proxy/conn_manager_test.cc index 84bab4a52af60..302f61aa86df2 100644 --- a/test/extensions/filters/network/thrift_proxy/conn_manager_test.cc +++ b/test/extensions/filters/network/thrift_proxy/conn_manager_test.cc @@ -1310,6 +1310,8 @@ TEST_F(ThriftConnectionManagerTest, transportEndWhenRemoteClose) { // Remote closes the connection. filter_callbacks_.connection_.state_ = Network::Connection::State::Closed; EXPECT_EQ(ThriftFilters::ResponseStatus::Reset, callbacks->upstreamData(write_buffer_)); + + filter_callbacks_.connection_.dispatcher_.clearDeferredDeleteList(); } } // namespace ThriftProxy From 0ca40e5ec97b1afc50a63ec20765209095166306 Mon Sep 17 00:00:00 2001 From: Raul Gutierrez Segales Date: Thu, 11 Apr 2019 21:31:12 -0700 Subject: [PATCH 6/6] @fishcakez's review Signed-off-by: Raul Gutierrez Segales --- source/extensions/filters/network/thrift_proxy/conn_manager.cc | 2 +- .../filters/network/thrift_proxy/conn_manager_test.cc | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/source/extensions/filters/network/thrift_proxy/conn_manager.cc b/source/extensions/filters/network/thrift_proxy/conn_manager.cc index 57426f4798793..0edb3d6cbb04c 100644 --- a/source/extensions/filters/network/thrift_proxy/conn_manager.cc +++ b/source/extensions/filters/network/thrift_proxy/conn_manager.cc @@ -209,7 +209,7 @@ FilterStatus ConnectionManager::ResponseDecoder::transportEnd() { if (cm.read_callbacks_->connection().state() == Network::Connection::State::Closed) { complete_ = true; - throw EnvoyException("thrift downstream gone"); + throw EnvoyException("downstream connection is closed"); } Buffer::OwnedImpl buffer; diff --git a/test/extensions/filters/network/thrift_proxy/conn_manager_test.cc b/test/extensions/filters/network/thrift_proxy/conn_manager_test.cc index 302f61aa86df2..0af14a1d4a29d 100644 --- a/test/extensions/filters/network/thrift_proxy/conn_manager_test.cc +++ b/test/extensions/filters/network/thrift_proxy/conn_manager_test.cc @@ -1238,6 +1238,7 @@ TEST_F(ThriftConnectionManagerTest, OnDataWithFilterSendLocalReplyRemoteClosedCo EXPECT_EQ(1U, store_.counter("test.request").value()); EXPECT_EQ(1U, store_.counter("test.request_call").value()); EXPECT_EQ(0U, store_.gauge("test.request_active").value()); + EXPECT_EQ(0U, store_.counter("test.response").value()); EXPECT_EQ(0U, store_.counter("test.response_error").value()); } @@ -1310,6 +1311,8 @@ TEST_F(ThriftConnectionManagerTest, transportEndWhenRemoteClose) { // Remote closes the connection. filter_callbacks_.connection_.state_ = Network::Connection::State::Closed; EXPECT_EQ(ThriftFilters::ResponseStatus::Reset, callbacks->upstreamData(write_buffer_)); + EXPECT_EQ(0U, store_.counter("test.response").value()); + EXPECT_EQ(1U, store_.counter("test.response_decoding_error").value()); filter_callbacks_.connection_.dispatcher_.clearDeferredDeleteList(); }