From 1bdbb5d3765d99dff854ef1021809c7d589b68a8 Mon Sep 17 00:00:00 2001 From: Alyssa Wilk Date: Mon, 11 May 2020 12:35:16 -0400 Subject: [PATCH 1/6] network: do not do infinite reads Signed-off-by: Alyssa Wilk --- source/common/buffer/watermark_buffer.h | 1 + source/common/network/connection_impl.cc | 58 ++++++++++++++++++------ source/common/network/connection_impl.h | 12 ++++- 3 files changed, 56 insertions(+), 15 deletions(-) diff --git a/source/common/buffer/watermark_buffer.h b/source/common/buffer/watermark_buffer.h index 827d1a51bccfd..4eff9b37d5504 100644 --- a/source/common/buffer/watermark_buffer.h +++ b/source/common/buffer/watermark_buffer.h @@ -38,6 +38,7 @@ class WatermarkBuffer : public OwnedImpl { void setWatermarks(uint32_t watermark) { setWatermarks(watermark / 2, watermark); } void setWatermarks(uint32_t low_watermark, uint32_t high_watermark); uint32_t highWatermark() const { return high_watermark_; } + bool aboveHighWatermark() const { return above_high_watermark_called_; } private: void checkHighWatermark(); diff --git a/source/common/network/connection_impl.cc b/source/common/network/connection_impl.cc index f44aee154ead1..0baa4bc2fe33d 100644 --- a/source/common/network/connection_impl.cc +++ b/source/common/network/connection_impl.cc @@ -48,6 +48,8 @@ ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPt : ConnectionImplBase(dispatcher, next_global_id_++), transport_socket_(std::move(transport_socket)), socket_(std::move(socket)), stream_info_(stream_info), filter_manager_(*this), + read_buffer_([this]() -> void { this->onReadBufferLowWatermark(); }, + [this]() -> void { this->onReadBufferHighWatermark(); }), write_buffer_(dispatcher.getWatermarkFactory().create( [this]() -> void { this->onWriteBufferLowWatermark(); }, [this]() -> void { this->onWriteBufferHighWatermark(); })), @@ -186,6 +188,11 @@ Connection::State ConnectionImpl::state() const { void ConnectionImpl::closeConnectionImmediately() { closeSocket(ConnectionEvent::LocalClose); } +bool ConnectionImpl::consumerWantsToRead() { + return read_disable_count_ == 0 || + (read_disable_count_ == 1 && read_buffer_.aboveHighWatermark()); +} + void ConnectionImpl::closeSocket(ConnectionEvent close_type) { if (!ioHandle().isOpen()) { return; @@ -268,7 +275,7 @@ void ConnectionImpl::noDelay(bool enable) { } void ConnectionImpl::onRead(uint64_t read_buffer_size) { - if (read_disable_count_ != 0 || inDelayedClose()) { + if (inDelayedClose() || !consumerWantsToRead()) { return; } ASSERT(ioHandle().isOpen()); @@ -342,24 +349,25 @@ void ConnectionImpl::readDisable(bool disable) { } } else { --read_disable_count_; - if (read_disable_count_ != 0) { - // The socket should stay disabled. - return; - } if (state() != State::Open || file_event_ == nullptr) { // If readDisable is called on a closed connection, do not crash. return; } - // We never ask for both early close and read at the same time. If we are reading, we want to - // consume all available data. - file_event_->setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write); - // If the connection has data buffered there's no guarantee there's also data in the kernel - // which will kick off the filter chain. Instead fake an event to make sure the buffered data - // gets processed regardless and ensure that we dispatch it via onRead. - if (read_buffer_.length() > 0) { - dispatch_buffered_data_ = true; - file_event_->activate(Event::FileReadyType::Read); + if (read_disable_count_ == 0) { + // We never ask for both early close and read at the same time. If we are reading, we want to + // consume all available data. + file_event_->setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write); + } + + if (consumerWantsToRead()) { + // If the connection has data buffered there's no guarantee there's also data in the kernel + // which will kick off the filter chain. Instead fake an event to make sure the buffered data + // gets processed regardless and ensure that we dispatch it via onRead. + if (read_buffer_.length() > 0) { + dispatch_buffered_data_ = true; + file_event_->activate(Event::FileReadyType::Read); + } } } } @@ -465,9 +473,20 @@ void ConnectionImpl::setBufferLimits(uint32_t limit) { // would result in respecting the exact buffer limit. if (limit > 0) { static_cast(write_buffer_.get())->setWatermarks(limit + 1); + read_buffer_.setWatermarks(limit + 1); } } +void ConnectionImpl::onReadBufferLowWatermark() { + ENVOY_CONN_LOG(debug, "onBelowReadBufferLowWatermark", *this); + readDisable(false); +} + +void ConnectionImpl::onReadBufferHighWatermark() { + ENVOY_CONN_LOG(debug, "onAboveReadBufferHighWatermark", *this); + readDisable(true); +} + void ConnectionImpl::onWriteBufferLowWatermark() { ENVOY_CONN_LOG(debug, "onBelowWriteBufferLowWatermark", *this); ASSERT(write_buffer_above_high_watermark_); @@ -529,6 +548,17 @@ void ConnectionImpl::onReadReady() { ASSERT(!connecting_); + // We get here while read disabled iff the consumer of connection data kicked off a read, and + // instead of reading from the socket we simply need to dispatch already read data. + if (read_disable_count_ != 0) { + ASSERT(dispatch_buffered_data_); + ASSERT(consumerWantsToRead()); + dispatch_buffered_data_ = false; + onRead(read_buffer_.length()); + dispatch_buffered_data_ = false; + return; + } + IoResult result = transport_socket_->doRead(read_buffer_); uint64_t new_buffer_size = read_buffer_.length(); updateReadBufferStats(result.bytes_processed_, new_buffer_size); diff --git a/source/common/network/connection_impl.h b/source/common/network/connection_impl.h index 6e8c1eb655189..cbacaff90c34a 100644 --- a/source/common/network/connection_impl.h +++ b/source/common/network/connection_impl.h @@ -122,11 +122,19 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback static uint64_t nextGlobalIdForTest() { return next_global_id_; } protected: + // A convenience function which returns true if + // 1) The read disable count is zero or + // 2) The read disable count is one, due to the read buffer being overrun. + // In either case the consumer of the data would like to read from the buffer. + bool consumerWantsToRead(); + // Network::ConnectionImplBase void closeConnectionImmediately() override; void closeSocket(ConnectionEvent close_type); + void onReadBufferLowWatermark(); + void onReadBufferHighWatermark(); void onWriteBufferLowWatermark(); void onWriteBufferHighWatermark(); @@ -135,7 +143,9 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback StreamInfo::StreamInfo& stream_info_; FilterManagerImpl filter_manager_; - Buffer::OwnedImpl read_buffer_; + // Ensure that if the consumer of the data from this connection isn't + // consuming, that the connection eventually stops reading from the wire. + Buffer::WatermarkBuffer read_buffer_; // This must be a WatermarkBuffer, but as it is created by a factory the ConnectionImpl only has // a generic pointer. // It MUST be defined after the filter_manager_ as some filters may have callbacks that From a78eff4d5fa3c288ef1513bf85041055cd4a62ad Mon Sep 17 00:00:00 2001 From: Alyssa Wilk Date: Tue, 12 May 2020 12:29:17 -0400 Subject: [PATCH 2/6] connection: adding limits to the read buffer. Signed-off-by: Alyssa Wilk --- source/common/network/connection_impl.cc | 28 +++---- test/common/network/connection_impl_test.cc | 86 ++++++++++++++++++++- 2 files changed, 97 insertions(+), 17 deletions(-) diff --git a/source/common/network/connection_impl.cc b/source/common/network/connection_impl.cc index 0baa4bc2fe33d..f2c78a9fa6e76 100644 --- a/source/common/network/connection_impl.cc +++ b/source/common/network/connection_impl.cc @@ -360,14 +360,12 @@ void ConnectionImpl::readDisable(bool disable) { file_event_->setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write); } - if (consumerWantsToRead()) { + if (consumerWantsToRead() && read_buffer_.length() > 0) { // If the connection has data buffered there's no guarantee there's also data in the kernel // which will kick off the filter chain. Instead fake an event to make sure the buffered data // gets processed regardless and ensure that we dispatch it via onRead. - if (read_buffer_.length() > 0) { - dispatch_buffered_data_ = true; - file_event_->activate(Event::FileReadyType::Read); - } + dispatch_buffered_data_ = true; + setReadBufferReady(); } } } @@ -479,7 +477,9 @@ void ConnectionImpl::setBufferLimits(uint32_t limit) { void ConnectionImpl::onReadBufferLowWatermark() { ENVOY_CONN_LOG(debug, "onBelowReadBufferLowWatermark", *this); - readDisable(false); + if (state() == State::Open) { + readDisable(false); + } } void ConnectionImpl::onReadBufferHighWatermark() { @@ -548,14 +548,16 @@ void ConnectionImpl::onReadReady() { ASSERT(!connecting_); - // We get here while read disabled iff the consumer of connection data kicked off a read, and - // instead of reading from the socket we simply need to dispatch already read data. + // We get here while read disabled in two ways. + // 1) There was a call to setReadBufferReady(), for example if a raw buffer socket ceded due to + // shouldDrainReadBuffer(). In this case we defer the event until the socket is read enabled. + // 2) The consumer of connection data called readDisable(true), and instead of reading from the + // socket we simply need to dispatch already read data. if (read_disable_count_ != 0) { - ASSERT(dispatch_buffered_data_); - ASSERT(consumerWantsToRead()); - dispatch_buffered_data_ = false; - onRead(read_buffer_.length()); - dispatch_buffered_data_ = false; + if (dispatch_buffered_data_ && consumerWantsToRead()) { + onRead(read_buffer_.length()); + dispatch_buffered_data_ = false; + } return; } diff --git a/test/common/network/connection_impl_test.cc b/test/common/network/connection_impl_test.cc index fbeb06519b095..4377547c7a27e 100644 --- a/test/common/network/connection_impl_test.cc +++ b/test/common/network/connection_impl_test.cc @@ -93,6 +93,12 @@ TEST_P(ConnectionImplDeathTest, BadFd) { ".*assert failure: SOCKET_VALID\\(ConnectionImpl::ioHandle\\(\\)\\.fd\\(\\)\\).*"); } +class TestClientConnectionImpl : public Network::ClientConnectionImpl { +public: + using ClientConnectionImpl::ClientConnectionImpl; + Buffer::WatermarkBuffer& readBuffer() { return read_buffer_; } +}; + class ConnectionImplTest : public testing::TestWithParam { protected: ConnectionImplTest() : api_(Api::createApiForTest(time_system_)), stream_info_(time_system_) {} @@ -104,9 +110,9 @@ class ConnectionImplTest : public testing::TestWithParam { socket_ = std::make_shared(Network::Test::getAnyAddress(GetParam()), nullptr, true); listener_ = dispatcher_->createListener(socket_, listener_callbacks_, true); - client_connection_ = dispatcher_->createClientConnection( - socket_->localAddress(), source_address_, Network::Test::createRawBufferSocket(), - socket_options_); + client_connection_ = std::make_unique( + *dispatcher_, socket_->localAddress(), source_address_, + Network::Test::createRawBufferSocket(), socket_options_); client_connection_->addConnectionCallbacks(client_callbacks_); EXPECT_EQ(nullptr, client_connection_->ssl()); const Network::ClientConnection& const_connection = *client_connection_; @@ -215,6 +221,9 @@ class ConnectionImplTest : public testing::TestWithParam { return ConnectionMocks{std::move(dispatcher), timer, std::move(transport_socket), file_event, &file_ready_cb_}; } + Network::TestClientConnectionImpl* testClientConnection() { + return dynamic_cast(client_connection_.get()); + } Event::FileReadyCb file_ready_cb_; Event::SimulatedTimeSystem time_system_; @@ -742,7 +751,7 @@ TEST_P(ConnectionImplTest, HalfCloseNoEarlyCloseDetection) { } // Test that as watermark levels are changed, the appropriate callbacks are triggered. -TEST_P(ConnectionImplTest, Watermarks) { +TEST_P(ConnectionImplTest, WriteWatermarks) { useMockBuffer(); setUpBasicConnection(); @@ -791,6 +800,75 @@ TEST_P(ConnectionImplTest, Watermarks) { disconnect(false); } +// Test that as watermark levels are changed, the appropriate callbacks are triggered. +TEST_P(ConnectionImplTest, ReadWatermarks) { + + setUpBasicConnection(); + client_connection_->setBufferLimits(2); + std::shared_ptr client_read_filter(new NiceMock()); + client_connection_->addReadFilter(client_read_filter); + connect(); + + EXPECT_FALSE(testClientConnection()->readBuffer().aboveHighWatermark()); + EXPECT_TRUE(client_connection_->readEnabled()); + // Add 4 bytes to the buffer and verify the connection becomes read disabled. + { + Buffer::OwnedImpl buffer("data"); + server_connection_->write(buffer, false); + EXPECT_CALL(*client_read_filter, onData(_, false)) + .WillRepeatedly(Invoke([&](Buffer::Instance&, bool) -> FilterStatus { + dispatcher_->exit(); + return FilterStatus::StopIteration; + })); + dispatcher_->run(Event::Dispatcher::RunType::Block); + + EXPECT_TRUE(testClientConnection()->readBuffer().aboveHighWatermark()); + EXPECT_FALSE(client_connection_->readEnabled()); + } + + // Drain 3 bytes from the buffer. This bring sit below the low watermark, and + // read enables, as well as triggering a kick for the remaining byte. + { + testClientConnection()->readBuffer().drain(3); + EXPECT_FALSE(testClientConnection()->readBuffer().aboveHighWatermark()); + EXPECT_TRUE(client_connection_->readEnabled()); + + EXPECT_CALL(*client_read_filter, onData(_, false)); + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); + } + + // Add 3 bytes to the buffer and verify the connection becomes read disabled + // again. + { + Buffer::OwnedImpl buffer("bye"); + server_connection_->write(buffer, false); + EXPECT_CALL(*client_read_filter, onData(_, false)) + .WillRepeatedly(Invoke([&](Buffer::Instance&, bool) -> FilterStatus { + dispatcher_->exit(); + return FilterStatus::StopIteration; + })); + dispatcher_->run(Event::Dispatcher::RunType::Block); + + EXPECT_TRUE(testClientConnection()->readBuffer().aboveHighWatermark()); + EXPECT_FALSE(client_connection_->readEnabled()); + } + + // Now have the consumer read disable. + // This time when the buffer is drained, there will be no kick as the consumer + // does not want to read. + { + client_connection_->readDisable(true); + testClientConnection()->readBuffer().drain(3); + EXPECT_FALSE(testClientConnection()->readBuffer().aboveHighWatermark()); + EXPECT_FALSE(client_connection_->readEnabled()); + + EXPECT_CALL(*client_read_filter, onData(_, false)).Times(0); + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); + } + + disconnect(true); +} + // Write some data to the connection. It will automatically attempt to flush // it to the upstream file descriptor via a write() call to buffer_, which is // configured to succeed and accept all bytes read. From 8b87dfa96e4a15f96f5b3d7c06df5c2bed152d93 Mon Sep 17 00:00:00 2001 From: Alyssa Wilk Date: Wed, 13 May 2020 09:49:15 -0400 Subject: [PATCH 3/6] tidy Signed-off-by: Alyssa Wilk --- source/common/network/connection_impl.cc | 3 ++- source/common/network/connection_impl.h | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/source/common/network/connection_impl.cc b/source/common/network/connection_impl.cc index f2c78a9fa6e76..5b96cd3ee9183 100644 --- a/source/common/network/connection_impl.cc +++ b/source/common/network/connection_impl.cc @@ -223,7 +223,8 @@ void ConnectionImpl::closeSocket(ConnectionEvent close_type) { socket_->close(); - raiseEvent(close_type); + // Call the base class directly as close() is called in the destructor. + ConnectionImpl::raiseEvent(close_type); } void ConnectionImpl::noDelay(bool enable) { diff --git a/source/common/network/connection_impl.h b/source/common/network/connection_impl.h index cbacaff90c34a..c08b2f7424fd5 100644 --- a/source/common/network/connection_impl.h +++ b/source/common/network/connection_impl.h @@ -105,7 +105,7 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback IoHandle& ioHandle() override { return socket_->ioHandle(); } const IoHandle& ioHandle() const override { return socket_->ioHandle(); } Connection& connection() override { return *this; } - void raiseEvent(ConnectionEvent event) override; + void raiseEvent(ConnectionEvent event) final override; // Should the read buffer be drained? bool shouldDrainReadBuffer() override { return read_buffer_limit_ > 0 && read_buffer_.length() >= read_buffer_limit_; From ca61e93c576bdbde5c959682024befe724f059e2 Mon Sep 17 00:00:00 2001 From: Alyssa Wilk Date: Thu, 14 May 2020 11:38:04 -0400 Subject: [PATCH 4/6] avd gets reviewer gold star of the day Signed-off-by: Alyssa Wilk --- source/common/buffer/watermark_buffer.h | 4 +- source/common/network/connection_impl.cc | 25 +++++---- test/common/network/connection_impl_test.cc | 62 ++++++++++++++++++--- 3 files changed, 72 insertions(+), 19 deletions(-) diff --git a/source/common/buffer/watermark_buffer.h b/source/common/buffer/watermark_buffer.h index 4eff9b37d5504..5bc111a4e1e3c 100644 --- a/source/common/buffer/watermark_buffer.h +++ b/source/common/buffer/watermark_buffer.h @@ -38,7 +38,9 @@ class WatermarkBuffer : public OwnedImpl { void setWatermarks(uint32_t watermark) { setWatermarks(watermark / 2, watermark); } void setWatermarks(uint32_t low_watermark, uint32_t high_watermark); uint32_t highWatermark() const { return high_watermark_; } - bool aboveHighWatermark() const { return above_high_watermark_called_; } + // Returns true if the high watermark callbacks have been called more recently + // than the low watermark callbacks. + bool highWatermarkTriggered() const { return above_high_watermark_called_; } private: void checkHighWatermark(); diff --git a/source/common/network/connection_impl.cc b/source/common/network/connection_impl.cc index 5b96cd3ee9183..219da21fb0a6f 100644 --- a/source/common/network/connection_impl.cc +++ b/source/common/network/connection_impl.cc @@ -190,7 +190,7 @@ void ConnectionImpl::closeConnectionImmediately() { closeSocket(ConnectionEvent: bool ConnectionImpl::consumerWantsToRead() { return read_disable_count_ == 0 || - (read_disable_count_ == 1 && read_buffer_.aboveHighWatermark()); + (read_disable_count_ == 1 && read_buffer_.highWatermarkTriggered()); } void ConnectionImpl::closeSocket(ConnectionEvent close_type) { @@ -319,8 +319,8 @@ void ConnectionImpl::readDisable(bool disable) { ASSERT(state() == State::Open); ASSERT(file_event_ != nullptr); - ENVOY_CONN_LOG(trace, "readDisable: enabled={} disable_count={} state={}", *this, - read_disable_count_, disable, static_cast(state())); + ENVOY_CONN_LOG(trace, "readDisable: disable={} disable_count={} state={} buffer_length={}", *this, + disable, read_disable_count_, static_cast(state()), read_buffer_.length()); // When we disable reads, we still allow for early close notifications (the equivalent of // EPOLLRDHUP for an epoll backend). For backends that support it, this allows us to apply @@ -363,8 +363,9 @@ void ConnectionImpl::readDisable(bool disable) { if (consumerWantsToRead() && read_buffer_.length() > 0) { // If the connection has data buffered there's no guarantee there's also data in the kernel - // which will kick off the filter chain. Instead fake an event to make sure the buffered data - // gets processed regardless and ensure that we dispatch it via onRead. + // which will kick off the filter chain. Alternately if the read buffer has data the fd could + // be read disabled. To handle these cases, fake an event to make sure the buffered data gets + // processed regardless and ensure that we dispatch it via onRead. dispatch_buffered_data_ = true; setReadBufferReady(); } @@ -485,7 +486,9 @@ void ConnectionImpl::onReadBufferLowWatermark() { void ConnectionImpl::onReadBufferHighWatermark() { ENVOY_CONN_LOG(debug, "onAboveReadBufferHighWatermark", *this); - readDisable(true); + if (state() == State::Open) { + readDisable(true); + } } void ConnectionImpl::onWriteBufferLowWatermark() { @@ -545,7 +548,9 @@ void ConnectionImpl::onFileEvent(uint32_t events) { } void ConnectionImpl::onReadReady() { - ENVOY_CONN_LOG(trace, "read ready", *this); + ENVOY_CONN_LOG(trace, "read ready. dispatch_buffered_data={}", *this, dispatch_buffered_data_); + const bool latched_dispatch_buffered_data = dispatch_buffered_data_; + dispatch_buffered_data_ = false; ASSERT(!connecting_); @@ -555,9 +560,8 @@ void ConnectionImpl::onReadReady() { // 2) The consumer of connection data called readDisable(true), and instead of reading from the // socket we simply need to dispatch already read data. if (read_disable_count_ != 0) { - if (dispatch_buffered_data_ && consumerWantsToRead()) { + if (latched_dispatch_buffered_data && consumerWantsToRead()) { onRead(read_buffer_.length()); - dispatch_buffered_data_ = false; } return; } @@ -575,13 +579,12 @@ void ConnectionImpl::onReadReady() { read_end_stream_ |= result.end_stream_read_; if (result.bytes_processed_ != 0 || result.end_stream_read_ || - (dispatch_buffered_data_ && read_buffer_.length() > 0)) { + (latched_dispatch_buffered_data && read_buffer_.length() > 0)) { // Skip onRead if no bytes were processed unless we explicitly want to force onRead for // buffered data. For instance, skip onRead if the connection was closed without producing // more data. onRead(new_buffer_size); } - dispatch_buffered_data_ = false; // The read callback may have already closed the connection. if (result.action_ == PostIoAction::Close || bothSidesHalfClosed()) { diff --git a/test/common/network/connection_impl_test.cc b/test/common/network/connection_impl_test.cc index 4377547c7a27e..6ef055b9fab38 100644 --- a/test/common/network/connection_impl_test.cc +++ b/test/common/network/connection_impl_test.cc @@ -809,20 +809,20 @@ TEST_P(ConnectionImplTest, ReadWatermarks) { client_connection_->addReadFilter(client_read_filter); connect(); - EXPECT_FALSE(testClientConnection()->readBuffer().aboveHighWatermark()); + EXPECT_FALSE(testClientConnection()->readBuffer().highWatermarkTriggered()); EXPECT_TRUE(client_connection_->readEnabled()); // Add 4 bytes to the buffer and verify the connection becomes read disabled. { Buffer::OwnedImpl buffer("data"); server_connection_->write(buffer, false); EXPECT_CALL(*client_read_filter, onData(_, false)) - .WillRepeatedly(Invoke([&](Buffer::Instance&, bool) -> FilterStatus { + .WillOnce(Invoke([&](Buffer::Instance&, bool) -> FilterStatus { dispatcher_->exit(); return FilterStatus::StopIteration; })); dispatcher_->run(Event::Dispatcher::RunType::Block); - EXPECT_TRUE(testClientConnection()->readBuffer().aboveHighWatermark()); + EXPECT_TRUE(testClientConnection()->readBuffer().highWatermarkTriggered()); EXPECT_FALSE(client_connection_->readEnabled()); } @@ -830,7 +830,7 @@ TEST_P(ConnectionImplTest, ReadWatermarks) { // read enables, as well as triggering a kick for the remaining byte. { testClientConnection()->readBuffer().drain(3); - EXPECT_FALSE(testClientConnection()->readBuffer().aboveHighWatermark()); + EXPECT_FALSE(testClientConnection()->readBuffer().highWatermarkTriggered()); EXPECT_TRUE(client_connection_->readEnabled()); EXPECT_CALL(*client_read_filter, onData(_, false)); @@ -843,13 +843,13 @@ TEST_P(ConnectionImplTest, ReadWatermarks) { Buffer::OwnedImpl buffer("bye"); server_connection_->write(buffer, false); EXPECT_CALL(*client_read_filter, onData(_, false)) - .WillRepeatedly(Invoke([&](Buffer::Instance&, bool) -> FilterStatus { + .WillOnce(Invoke([&](Buffer::Instance&, bool) -> FilterStatus { dispatcher_->exit(); return FilterStatus::StopIteration; })); dispatcher_->run(Event::Dispatcher::RunType::Block); - EXPECT_TRUE(testClientConnection()->readBuffer().aboveHighWatermark()); + EXPECT_TRUE(testClientConnection()->readBuffer().highWatermarkTriggered()); EXPECT_FALSE(client_connection_->readEnabled()); } @@ -859,13 +859,61 @@ TEST_P(ConnectionImplTest, ReadWatermarks) { { client_connection_->readDisable(true); testClientConnection()->readBuffer().drain(3); - EXPECT_FALSE(testClientConnection()->readBuffer().aboveHighWatermark()); + EXPECT_FALSE(testClientConnection()->readBuffer().highWatermarkTriggered()); EXPECT_FALSE(client_connection_->readEnabled()); EXPECT_CALL(*client_read_filter, onData(_, false)).Times(0); dispatcher_->run(Event::Dispatcher::RunType::NonBlock); } + // Now read enable again. + // Inside the onData call, readDisable and readEnable. This should trigger + // another kick on the next dispatcher loop, so onData gets called twice. + { + client_connection_->readDisable(false); + EXPECT_CALL(*client_read_filter, onData(_, false)) + .Times(2) + .WillOnce(Invoke([&](Buffer::Instance&, bool) -> FilterStatus { + client_connection_->readDisable(true); + client_connection_->readDisable(false); + return FilterStatus::StopIteration; + })) + .WillRepeatedly(Return(FilterStatus::StopIteration)); + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); + } + + // Test the same logic for dispatched_buffered_data from the + // onReadReady() (read_disable_count_ != 0) path. + { + // Fill the buffer and verify the socket is read disabled. + Buffer::OwnedImpl buffer("bye"); + server_connection_->write(buffer, false); + EXPECT_CALL(*client_read_filter, onData(_, false)) + .WillOnce(Invoke([&](Buffer::Instance&, bool) -> FilterStatus { + dispatcher_->exit(); + return FilterStatus::StopIteration; + })); + dispatcher_->run(Event::Dispatcher::RunType::Block); + EXPECT_TRUE(testClientConnection()->readBuffer().highWatermarkTriggered()); + EXPECT_FALSE(client_connection_->readEnabled()); + + // Read disable and read enable, to set dispatch_buffered_data_ true. + client_connection_->readDisable(true); + client_connection_->readDisable(false); + // Now event loop. This hits the early on-Read path. As above, read + // disable and read enable from inside the stack of onData, to ensure that + // dispatch_buffered_data_ works correctly. + EXPECT_CALL(*client_read_filter, onData(_, false)) + .Times(2) + .WillOnce(Invoke([&](Buffer::Instance&, bool) -> FilterStatus { + client_connection_->readDisable(true); + client_connection_->readDisable(false); + return FilterStatus::StopIteration; + })) + .WillRepeatedly(Return(FilterStatus::StopIteration)); + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); + } + disconnect(true); } From 59a0bf5572c36f09a61bb8dd5427c6814b3fed42 Mon Sep 17 00:00:00 2001 From: Alyssa Wilk Date: Mon, 18 May 2020 11:12:24 -0400 Subject: [PATCH 5/6] tidy Signed-off-by: Alyssa Wilk --- source/common/network/connection_impl.cc | 2 +- source/common/network/connection_impl.h | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/source/common/network/connection_impl.cc b/source/common/network/connection_impl.cc index 219da21fb0a6f..f43f1c3755491 100644 --- a/source/common/network/connection_impl.cc +++ b/source/common/network/connection_impl.cc @@ -194,7 +194,7 @@ bool ConnectionImpl::consumerWantsToRead() { } void ConnectionImpl::closeSocket(ConnectionEvent close_type) { - if (!ioHandle().isOpen()) { + if (!ConnectionImpl::ioHandle().isOpen()) { return; } diff --git a/source/common/network/connection_impl.h b/source/common/network/connection_impl.h index c08b2f7424fd5..4bb33cf737dc4 100644 --- a/source/common/network/connection_impl.h +++ b/source/common/network/connection_impl.h @@ -102,10 +102,10 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback } // Network::TransportSocketCallbacks - IoHandle& ioHandle() override { return socket_->ioHandle(); } + IoHandle& ioHandle() final { return socket_->ioHandle(); } const IoHandle& ioHandle() const override { return socket_->ioHandle(); } Connection& connection() override { return *this; } - void raiseEvent(ConnectionEvent event) final override; + void raiseEvent(ConnectionEvent event) final; // Should the read buffer be drained? bool shouldDrainReadBuffer() override { return read_buffer_limit_ > 0 && read_buffer_.length() >= read_buffer_limit_; From 6aafcb20eb6ff85799acefede2b45d116e67c111 Mon Sep 17 00:00:00 2001 From: Alyssa Wilk Date: Wed, 20 May 2020 13:48:28 -0400 Subject: [PATCH 6/6] reviewer comments Signed-off-by: Alyssa Wilk --- source/common/network/connection_impl.cc | 1 + source/common/network/connection_impl.h | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/source/common/network/connection_impl.cc b/source/common/network/connection_impl.cc index f43f1c3755491..12961773a7ee3 100644 --- a/source/common/network/connection_impl.cc +++ b/source/common/network/connection_impl.cc @@ -349,6 +349,7 @@ void ConnectionImpl::readDisable(bool disable) { file_event_->setEnabled(Event::FileReadyType::Write); } } else { + ASSERT(read_disable_count_ != 0); --read_disable_count_; if (state() != State::Open || file_event_ == nullptr) { // If readDisable is called on a closed connection, do not crash. diff --git a/source/common/network/connection_impl.h b/source/common/network/connection_impl.h index 4bb33cf737dc4..b464e2af96d10 100644 --- a/source/common/network/connection_impl.h +++ b/source/common/network/connection_impl.h @@ -124,8 +124,11 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback protected: // A convenience function which returns true if // 1) The read disable count is zero or - // 2) The read disable count is one, due to the read buffer being overrun. + // 2) The read disable count is one due to the read buffer being overrun. // In either case the consumer of the data would like to read from the buffer. + // If the read count is greater than one, or equal to one when the buffer is + // not overrun, then the consumer of the data has called readDisable, and does + // not want to read. bool consumerWantsToRead(); // Network::ConnectionImplBase