Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion source/extensions/filters/network/thrift_proxy/conn_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,11 @@ void ConnectionManager::dispatch() {

void ConnectionManager::sendLocalReply(MessageMetadata& metadata, const DirectResponse& response,
bool end_stream) {
Buffer::OwnedImpl buffer;
if (read_callbacks_->connection().state() == Network::Connection::State::Closed) {
return;
}

Buffer::OwnedImpl buffer;
const DirectResponse::ResponseType result = response.encode(metadata, *protocol_, buffer);

Buffer::OwnedImpl response_buffer;
Expand Down Expand Up @@ -204,6 +207,11 @@ FilterStatus ConnectionManager::ResponseDecoder::transportEnd() {

ConnectionManager& cm = parent_.parent_;

if (cm.read_callbacks_->connection().state() == Network::Connection::State::Closed) {
complete_ = true;
throw EnvoyException("downstream connection is closed");
}

Buffer::OwnedImpl buffer;

// Use the factory to get the concrete transport from the decoder transport (as opposed to
Expand Down
65 changes: 65 additions & 0 deletions test/extensions/filters/network/thrift_proxy/conn_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1204,6 +1204,44 @@ TEST_F(ThriftConnectionManagerTest, OnDataWithFilterSendsLocalErrorReply) {
EXPECT_EQ(1U, store_.counter("test.response_error").value());
}

// sendLocalReply does nothing, when the remote closed the connection.
TEST_F(ThriftConnectionManagerTest, OnDataWithFilterSendLocalReplyRemoteClosedConnection) {
auto* filter = new NiceMock<ThriftFilters::MockDecoderFilter>();
custom_filter_.reset(filter);

initializeFilter();
writeFramedBinaryMessage(buffer_, MessageType::Call, 0x0F);

ThriftFilters::DecoderFilterCallbacks* callbacks{};
EXPECT_CALL(*filter, setDecoderFilterCallbacks(_))
.WillOnce(
Invoke([&](ThriftFilters::DecoderFilterCallbacks& cb) -> void { callbacks = &cb; }));
EXPECT_CALL(*decoder_filter_, setDecoderFilterCallbacks(_));

NiceMock<MockDirectResponse> direct_response;
EXPECT_CALL(direct_response, encode(_, _, _)).Times(0);

// First filter sends local reply.
EXPECT_CALL(*filter, messageBegin(_))
.WillOnce(Invoke([&](MessageMetadataSharedPtr) -> FilterStatus {
callbacks->sendLocalReply(direct_response, false);
return FilterStatus::StopIteration;
}));
EXPECT_CALL(filter_callbacks_.connection_, write(_, false)).Times(0);
EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, deferredDelete_(_)).Times(1);

// Remote closes the connection.
filter_callbacks_.connection_.state_ = Network::Connection::State::Closed;
EXPECT_EQ(filter_->onData(buffer_, true), Network::FilterStatus::StopIteration);

filter_callbacks_.connection_.dispatcher_.clearDeferredDeleteList();
EXPECT_EQ(1U, store_.counter("test.request").value());
EXPECT_EQ(1U, store_.counter("test.request_call").value());
EXPECT_EQ(0U, store_.gauge("test.request_active").value());
EXPECT_EQ(0U, store_.counter("test.response").value());
EXPECT_EQ(0U, store_.counter("test.response_error").value());
}

// Tests a decoder filter that modifies data.
TEST_F(ThriftConnectionManagerTest, DecoderFiltersModifyRequests) {
auto* filter = new NiceMock<ThriftFilters::MockDecoderFilter>();
Expand Down Expand Up @@ -1252,6 +1290,33 @@ TEST_F(ThriftConnectionManagerTest, DecoderFiltersModifyRequests) {
EXPECT_EQ(1U, store_.gauge("test.request_active").value());
}

TEST_F(ThriftConnectionManagerTest, transportEndWhenRemoteClose) {
initializeFilter();
writeComplexFramedBinaryMessage(buffer_, MessageType::Call, 0x0F);

ThriftFilters::DecoderFilterCallbacks* callbacks{};
EXPECT_CALL(*decoder_filter_, setDecoderFilterCallbacks(_))
.WillOnce(
Invoke([&](ThriftFilters::DecoderFilterCallbacks& cb) -> void { callbacks = &cb; }));

EXPECT_EQ(filter_->onData(buffer_, false), Network::FilterStatus::StopIteration);
EXPECT_EQ(1U, store_.counter("test.request_call").value());

writeComplexFramedBinaryMessage(write_buffer_, MessageType::Reply, 0x0F);

FramedTransportImpl transport;
BinaryProtocolImpl proto;
callbacks->startUpstreamResponse(transport, proto);

// Remote closes the connection.
filter_callbacks_.connection_.state_ = Network::Connection::State::Closed;
EXPECT_EQ(ThriftFilters::ResponseStatus::Reset, callbacks->upstreamData(write_buffer_));
EXPECT_EQ(0U, store_.counter("test.response").value());
EXPECT_EQ(1U, store_.counter("test.response_decoding_error").value());

filter_callbacks_.connection_.dispatcher_.clearDeferredDeleteList();
}

} // namespace ThriftProxy
} // namespace NetworkFilters
} // namespace Extensions
Expand Down