diff --git a/source/common/network/connection_impl.cc b/source/common/network/connection_impl.cc index f9f62f12e6375..2309112e71a43 100644 --- a/source/common/network/connection_impl.cc +++ b/source/common/network/connection_impl.cc @@ -148,10 +148,6 @@ void ConnectionImpl::close(ConnectionCloseType type) { return; } - // All close types that follow do not actually close() the socket immediately so that buffered - // data can be written. However, we do want to stop reading to apply TCP backpressure. - read_enabled_ = false; - // NOTE: At this point, it's already been validated that the connection is not already in // delayed close processing and therefore the timer has not yet been created. if (delayed_close_timeout_set) { @@ -244,9 +240,10 @@ void ConnectionImpl::noDelay(bool enable) { uint64_t ConnectionImpl::id() const { return id_; } void ConnectionImpl::onRead(uint64_t read_buffer_size) { - if (!read_enabled_) { + if (!read_enabled_ || inDelayedClose()) { return; } + ASSERT(ioHandle().isOpen()); if (read_buffer_size == 0 && !read_end_stream_) { return; diff --git a/test/common/network/connection_impl_test.cc b/test/common/network/connection_impl_test.cc index 66b3b2bcc79c4..8ddfb9f315354 100644 --- a/test/common/network/connection_impl_test.cc +++ b/test/common/network/connection_impl_test.cc @@ -1355,6 +1355,7 @@ class FakeReadFilter : public Network::ReadFilter { private: ReadFilterCallbacks* callbacks_{nullptr}; }; + class MockTransportConnectionImplTest : public testing::Test { public: MockTransportConnectionImplTest() { @@ -1696,6 +1697,169 @@ TEST_F(MockTransportConnectionImplTest, WriteReadyOnConnected) { .WillOnce(Return(IoResult{PostIoAction::KeepOpen, 0, true})); } +// Fixture for validating behavior after a connection is closed. +class PostCloseConnectionImplTest : public MockTransportConnectionImplTest { +protected: + // Setup connection, single read event. + void initialize() { + connection_->addReadFilter(read_filter_); + connection_->setDelayedCloseTimeout(std::chrono::milliseconds(100)); + + EXPECT_CALL(*transport_socket_, doRead(_)) + .WillOnce(Invoke([this](Buffer::Instance& buffer) -> IoResult { + buffer.add(val_.c_str(), val_.size()); + return {PostIoAction::KeepOpen, val_.size(), false}; + })); + EXPECT_CALL(*read_filter_, onNewConnection()); + EXPECT_CALL(*read_filter_, onData(_, _)); + file_ready_cb_(Event::FileReadyType::Read); + } + + void writeSomeData() { + Buffer::OwnedImpl buffer("data"); + EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Write)); + connection_->write(buffer, false); + } + + const std::string val_{"a"}; + std::shared_ptr read_filter_{new StrictMock()}; +}; + +// Test that if a read event occurs after +// close(ConnectionCloseType::FlushWriteAndDelay), the read is not propagated to +// a read filter. +TEST_F(PostCloseConnectionImplTest, ReadAfterCloseFlushWriteDelayIgnored) { + InSequence s; + initialize(); + + // Delayed connection close. + EXPECT_CALL(dispatcher_, createTimer_(_)); + connection_->close(ConnectionCloseType::FlushWriteAndDelay); + + // Read event, doRead() happens on connection but no filter onData(). + EXPECT_CALL(*read_filter_, onData(_, _)).Times(0); + EXPECT_CALL(*transport_socket_, doRead(_)) + .WillOnce(Invoke([this](Buffer::Instance& buffer) -> IoResult { + buffer.add(val_.c_str(), val_.size()); + return {PostIoAction::KeepOpen, val_.size(), false}; + })); + file_ready_cb_(Event::FileReadyType::Read); + // Deferred close. + EXPECT_CALL(*transport_socket_, closeSocket(_)); +} + +// Test that if a read event occurs after +// close(ConnectionCloseType::FlushWriteAndDelay) with pending write data, the +// read is not propagated to a read filter. +TEST_F(PostCloseConnectionImplTest, ReadAfterCloseFlushWriteDelayIgnoredWithWriteData) { + InSequence s; + initialize(); + writeSomeData(); + + // Delayed connection close. + EXPECT_CALL(dispatcher_, createTimer_(_)); + connection_->close(ConnectionCloseType::FlushWriteAndDelay); + + // Read event, doRead() happens on connection but no filter onData(). + EXPECT_CALL(*read_filter_, onData(_, _)).Times(0); + EXPECT_CALL(*transport_socket_, doRead(_)) + .WillOnce(Invoke([this](Buffer::Instance& buffer) -> IoResult { + buffer.add(val_.c_str(), val_.size()); + return {PostIoAction::KeepOpen, val_.size(), false}; + })); + file_ready_cb_(Event::FileReadyType::Read); + // We have data written above in writeSomeData(), it will be flushed here. + EXPECT_CALL(*transport_socket_, doWrite(_, true)) + .WillOnce(Return(IoResult{PostIoAction::KeepOpen, 0, false})); + // Deferred close. + EXPECT_CALL(*transport_socket_, closeSocket(_)); +} + +// Test that if a read event occurs after +// close(ConnectionCloseType::FlushWriteAndDelay) with pending write data and a +// transport socket than canFlushClose(), the read is not propagated to a read +// filter. +TEST_F(PostCloseConnectionImplTest, ReadAfterCloseFlushWriteDelayIgnoredCanFlushClose) { + InSequence s; + initialize(); + writeSomeData(); + + // The path of interest is when the transport socket canFlushClose(). + ON_CALL(*transport_socket_, canFlushClose()).WillByDefault(Return(true)); + + // Delayed connection close. + EXPECT_CALL(dispatcher_, createTimer_(_)); + EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Write | Event::FileReadyType::Closed)); + connection_->close(ConnectionCloseType::FlushWriteAndDelay); + + // Read event, doRead() happens on connection but no filter onData(). + EXPECT_CALL(*read_filter_, onData(_, _)).Times(0); + EXPECT_CALL(*transport_socket_, doRead(_)) + .WillOnce(Invoke([this](Buffer::Instance& buffer) -> IoResult { + buffer.add(val_.c_str(), val_.size()); + return {PostIoAction::KeepOpen, val_.size(), false}; + })); + file_ready_cb_(Event::FileReadyType::Read); + + // Deferred close. + EXPECT_CALL(*transport_socket_, closeSocket(_)); +} + +// Test that if a read event occurs after close(ConnectionCloseType::NoFlush), +// then no read is attempted from the transport socket and hence the read is not +// propagated to a read filter. +TEST_F(PostCloseConnectionImplTest, NoReadAfterCloseNoFlush) { + InSequence s; + initialize(); + + // Immediate connection close. + EXPECT_CALL(*transport_socket_, closeSocket(_)); + connection_->close(ConnectionCloseType::NoFlush); + + // We don't even see a doRead(), let alone an onData() callback. + EXPECT_CALL(*read_filter_, onData(_, _)).Times(0); + EXPECT_CALL(*transport_socket_, doRead(_)).Times(0); + file_ready_cb_(Event::FileReadyType::Read); +} + +// Test that if a read event occurs after close(ConnectionCloseType::FlushWrite), +// then no read is attempted from the transport socket and hence the read is not +// propagated to a read filter. +TEST_F(PostCloseConnectionImplTest, NoReadAfterCloseFlushWrite) { + InSequence s; + initialize(); + + // Connection flush and close. + EXPECT_CALL(*transport_socket_, closeSocket(_)); + connection_->close(ConnectionCloseType::FlushWrite); + + // We don't even see a doRead(), let alone an onData() callback. + EXPECT_CALL(*read_filter_, onData(_, _)).Times(0); + EXPECT_CALL(*transport_socket_, doRead(_)).Times(0); + file_ready_cb_(Event::FileReadyType::Read); +} + +// Test that if a read event occurs after close(ConnectionCloseType::FlushWrite) +// with pending write data, then no read is attempted from the transport socket +// and hence the read is not propagated to a read filter. +TEST_F(PostCloseConnectionImplTest, NoReadAfterCloseFlushWriteWriteData) { + InSequence s; + initialize(); + writeSomeData(); + + // Connection flush and close. We have data written above in writeSomeData(), + // it will be flushed here. + EXPECT_CALL(*transport_socket_, doWrite(_, true)) + .WillOnce(Return(IoResult{PostIoAction::KeepOpen, 0, false})); + EXPECT_CALL(*transport_socket_, closeSocket(_)); + connection_->close(ConnectionCloseType::FlushWrite); + + // We don't even see a doRead(), let alone an onData() callback. + EXPECT_CALL(*read_filter_, onData(_, _)).Times(0); + EXPECT_CALL(*transport_socket_, doRead(_)).Times(0); + file_ready_cb_(Event::FileReadyType::Read); +} + class ReadBufferLimitTest : public ConnectionImplTest { public: void readBufferLimitTest(uint32_t read_buffer_limit, uint32_t expected_chunk_size) { diff --git a/test/integration/h1_corpus/clusterfuzz-testcase-minimized-h1_capture_fuzz_test-5738507290542080 b/test/integration/h1_corpus/clusterfuzz-testcase-minimized-h1_capture_fuzz_test-5738507290542080 new file mode 100644 index 0000000000000..9466a85274013 --- /dev/null +++ b/test/integration/h1_corpus/clusterfuzz-testcase-minimized-h1_capture_fuzz_test-5738507290542080 @@ -0,0 +1,797 @@ +events { +} +events { } +events { downstream_send_bytes: "\n\n\n\n\n\n\n\n\n\n\n\n\n\n" } +events { + downstream_recv_bytes { + } } +events { + downstream_send_bytes: "POST /test/long/ur HTTP/1.1\r\nhost: �s �r � t �s �� �� �� - �r �r ��� r: . � ��� r�� �� �c�d� : v nt er 3 - r�ed-for: 10.0.0.1\r\ntransfer-encoding:events {\n downstrc" +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { + downstream_send_bytes: "\005\000" +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { + downstream_send_bytes: "POST /t/long/url HTTP/1.1\r\n\r\n\r\n" +} +events { +} +events { + downstream_recv_bytes { + } +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { + downstream_send_bytes: "POST /t/long/url HTTP/1.1\r\n\r\n\r\n" +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { + downstream_send_bytes: "POST /t/long/url HTTP/1.1\r\n\r\n\r\n" +} +events { +} +events { +} +events { + downstream_send_bytes: "POST /t/long/url HTTP/1.1\r\n\r\n\r\n" +} +events { + downstream_send_bytes: "\003\000" +} +events { + downstream_send_bytes: "" +} +events { +} +events { +} +events { +} +events { + upstream_recv_bytes { + } +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { + downstream_send_bytes: "POST /t/long/url HTTP/1.1\r\n\r\n\r\n" +} +events { + upstream_send_bytes: "" +} +events { + downstream_send_bytes: "POST /t/long)url HTTP/1.1\r\n\r\n\r\n" +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { + upstream_send_bytes: "" +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { + downstream_send_bytes: "POST /t/long/url HTTP/1.1\r\n\r\n\r\n" +} +events { +} +events { + downstream_send_bytes: "POST /t/long/url HTTP/1.1\r\n\r\n\r\n" +} +events { + downstream_send_bytes: ">" +} +events { +} +events { +} +events { +} +events { + downstream_send_bytes: "?" +} +events { +} +events { +} +events { + downstream_send_bytes: "POST /t/long/url HTTP/1.1\r\n\r\n\r\n" +} +events { +} +events { +} +events { +} +events { + downstream_send_bytes: "POST /t/long/url HTTP/1.1\r\n\r\n\r\n" +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { + upstream_send_bytes: "z" +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { + downstream_send_bytes: "POST /t/long/url HTTP/1.1\r\n\r\n\r\n" +} +events { + downstream_send_bytes: "POST /t/long/url HTTP/1.1\r\n\r\n\r\n" +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { + downstream_send_bytes: "POST /t/long/url HTTP/1.1\r\n\r\n\r\n" +} +events { + upstream_send_bytes: ">" +} +events { +} +events { +} +events { +} +events { +} +events { + downstream_send_bytes: "POST /t/long/url HTTP/1.1\r\n\r\n\r\n" +} +events { +} +events { +} +events { + downstream_send_bytes: "POST /t/long/url HTTP/1.1\r\n\r\n\r\n" +} +events { +} +events { + downstream_send_bytes: "POST /t/long/url HTTP/1.1\r\n\r\n\r\n" +} +events { +} +events { +} +events { + upstream_send_bytes: "" +} +events { +} +events { + downstream_send_bytes: "POST /t/long/url HTTP/1.1\r\n\r\n\r\n" +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { + downstream_send_bytes: "POST /t/long/url HTTP/1.1\r\n\r\n\r\n" +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { + upstream_recv_bytes { + } +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { + downstream_send_bytes: "POST /tes HTTP/1.1\r\ncontent-type: application/grpc\n\r\n" +} +events { +} +events { + downstream_send_bytes: "POST /t/long/url HTTP/1.1\r\n\r\n\r\n" +} +events { +} +events { +} +events { +} +events { + downstream_send_bytes: "POST /t/long/url HTTP/1.1\r\n\r\n\r\n" +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { + downstream_send_bytes: "POST /t/long/url HTTP/1.1\r\n\r\n\r\n" +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { + downstream_send_bytes: "POST /t/long/url HTTP/1.1\r\n\r\n\r\n" +} +events { +} +events { +} +events { + downstream_send_bytes: "POST //test/long/url HTTP/1.1\r\nhost: host\r\nx-lyft-u: chunked\r\n\r\n" +} +events { + downstream_send_bytes: "\005\000" +} +events { + downstream_send_bytes: "POST /t/long/url HTTP/1.1\r\n\r\n\r\n" +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { + downstream_send_bytes: "POST /t/long/url HTTP/1.1\r\n\r\n\r\n" +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { + downstream_send_bytes: "POST /t/long/url HTTP/1.1\r\n\r\n\r\n" +} +events { + downstream_send_bytes: "POST /t/long/url HTTP/1.1\r\n\r\n\r\n" +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { + downstream_recv_bytes { + } +} +events { +} +events { +} +events { +} +events { + downstream_send_bytes: "POST /t/long/url HTTP/1.1\r\n\r\n\r\n" +} +events { +} +events { +} +events { +} +events { +} +events { + downstream_send_bytes: "POST /t/long/url HTTP/1.1\r\n\r\n\r\n" +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { + upstream_send_bytes: "" +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { + downstream_send_bytes: "POST /t/long/url HTTP/1.1\r\n\r\n\r\n" +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { + downstream_send_bytes: "POST /t/long/url HTTP/1.1\r\n\r\n\r\n" +} +events { +} +events { +} +events { + downstream_send_bytes: "POST /t/long/url HTTP/1.1\r\n\r\n\r\n" +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { + downstream_send_bytes: "POST /t/long/url HTTP/1.1\r\n\r\n\r\n" +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { + upstream_send_bytes: "\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206\206" +} +events { +} +events { +} +events { +} +events { + downstream_recv_bytes { + } +} +events { +} +events { +} +events { +} +events { + upstream_send_bytes: ">" +} +events { +} +events { + upstream_recv_bytes { + } +} +events { +} +events { +} +events { + downstream_send_bytes: "?" +} +events { +} +events { +} +events { +} +events { + downstream_send_bytes: "POST /t/long/url HTTP/1.1\r\n\r\n\r\n" +} +events { +} +events { +} +events { + downstream_send_bytes: "POST /t/long/url HTTP/1.1\r\n\r\n\r\n" +} +events { +} +events { +} +events { + upstream_send_bytes: "?" +} +events { +} +events { +} +events { +} +events { +} +events { + downstream_send_bytes: "POST /t/long/url HTTP/1.1\r\n\r\n\r\n" +} +events { +} +events { +} +events { +} +events { +} +events { + downstream_send_bytes: "POST /t/long/url HTTP/1.1\r\n\r\n\r\n" +} +events { + upstream_recv_bytes { + } +} +events { + downstream_send_bytes: "POST /t/long)url HTTP/1.1\r\n\r\n\r\n" +} +events { +} +events { +} +events { +} +events { +} +events { + upstream_send_bytes: "\003\000" +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { +} +events { + downstream_send_bytes: "POST /t/long/url HTTP/1.1\r\n\r\n\r\n" +}