-
Notifications
You must be signed in to change notification settings - Fork 5.3k
connection: adding watermarks to the read buffer. #11170
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
1bdbb5d
network: do not do infinite reads
alyssawilk a78eff4
connection: adding limits to the read buffer.
alyssawilk 8b87dfa
tidy
alyssawilk ca61e93
avd gets reviewer gold star of the day
alyssawilk 59a0bf5
tidy
alyssawilk 6aafcb2
reviewer comments
alyssawilk File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -93,6 +93,12 @@ TEST_P(ConnectionImplDeathTest, BadFd) { | |
| ".*assert failure: SOCKET_VALID\\(ConnectionImpl::ioHandle\\(\\)\\.fd\\(\\)\\).*"); | ||
| } | ||
|
|
||
| class TestClientConnectionImpl : public Network::ClientConnectionImpl { | ||
| public: | ||
| using ClientConnectionImpl::ClientConnectionImpl; | ||
| Buffer::WatermarkBuffer& readBuffer() { return read_buffer_; } | ||
| }; | ||
|
|
||
| class ConnectionImplTest : public testing::TestWithParam<Address::IpVersion> { | ||
| protected: | ||
| ConnectionImplTest() : api_(Api::createApiForTest(time_system_)), stream_info_(time_system_) {} | ||
|
|
@@ -104,9 +110,9 @@ class ConnectionImplTest : public testing::TestWithParam<Address::IpVersion> { | |
| socket_ = std::make_shared<Network::TcpListenSocket>(Network::Test::getAnyAddress(GetParam()), | ||
| nullptr, true); | ||
| listener_ = dispatcher_->createListener(socket_, listener_callbacks_, true); | ||
| client_connection_ = dispatcher_->createClientConnection( | ||
| socket_->localAddress(), source_address_, Network::Test::createRawBufferSocket(), | ||
| socket_options_); | ||
| client_connection_ = std::make_unique<Network::TestClientConnectionImpl>( | ||
| *dispatcher_, socket_->localAddress(), source_address_, | ||
| Network::Test::createRawBufferSocket(), socket_options_); | ||
| client_connection_->addConnectionCallbacks(client_callbacks_); | ||
| EXPECT_EQ(nullptr, client_connection_->ssl()); | ||
| const Network::ClientConnection& const_connection = *client_connection_; | ||
|
|
@@ -215,6 +221,9 @@ class ConnectionImplTest : public testing::TestWithParam<Address::IpVersion> { | |
| return ConnectionMocks{std::move(dispatcher), timer, std::move(transport_socket), file_event, | ||
| &file_ready_cb_}; | ||
| } | ||
| Network::TestClientConnectionImpl* testClientConnection() { | ||
| return dynamic_cast<Network::TestClientConnectionImpl*>(client_connection_.get()); | ||
| } | ||
|
|
||
| Event::FileReadyCb file_ready_cb_; | ||
| Event::SimulatedTimeSystem time_system_; | ||
|
|
@@ -742,7 +751,7 @@ TEST_P(ConnectionImplTest, HalfCloseNoEarlyCloseDetection) { | |
| } | ||
|
|
||
| // Test that as watermark levels are changed, the appropriate callbacks are triggered. | ||
| TEST_P(ConnectionImplTest, Watermarks) { | ||
| TEST_P(ConnectionImplTest, WriteWatermarks) { | ||
| useMockBuffer(); | ||
|
|
||
| setUpBasicConnection(); | ||
|
|
@@ -791,6 +800,123 @@ TEST_P(ConnectionImplTest, Watermarks) { | |
| disconnect(false); | ||
| } | ||
|
|
||
| // Test that as watermark levels are changed, the appropriate callbacks are triggered. | ||
| TEST_P(ConnectionImplTest, ReadWatermarks) { | ||
|
|
||
| setUpBasicConnection(); | ||
| client_connection_->setBufferLimits(2); | ||
| std::shared_ptr<MockReadFilter> client_read_filter(new NiceMock<MockReadFilter>()); | ||
| client_connection_->addReadFilter(client_read_filter); | ||
| connect(); | ||
|
|
||
| EXPECT_FALSE(testClientConnection()->readBuffer().highWatermarkTriggered()); | ||
| EXPECT_TRUE(client_connection_->readEnabled()); | ||
| // Add 4 bytes to the buffer and verify the connection becomes read disabled. | ||
| { | ||
| Buffer::OwnedImpl buffer("data"); | ||
| server_connection_->write(buffer, false); | ||
| EXPECT_CALL(*client_read_filter, onData(_, false)) | ||
| .WillOnce(Invoke([&](Buffer::Instance&, bool) -> FilterStatus { | ||
| dispatcher_->exit(); | ||
| return FilterStatus::StopIteration; | ||
| })); | ||
| dispatcher_->run(Event::Dispatcher::RunType::Block); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Worth doing a second server_connection write before doing drains, and verify that there is no read from the client connection on dispatcher_->run()? |
||
|
|
||
| EXPECT_TRUE(testClientConnection()->readBuffer().highWatermarkTriggered()); | ||
| EXPECT_FALSE(client_connection_->readEnabled()); | ||
| } | ||
|
|
||
| // Drain 3 bytes from the buffer. This bring sit below the low watermark, and | ||
| // read enables, as well as triggering a kick for the remaining byte. | ||
| { | ||
| testClientConnection()->readBuffer().drain(3); | ||
| EXPECT_FALSE(testClientConnection()->readBuffer().highWatermarkTriggered()); | ||
| EXPECT_TRUE(client_connection_->readEnabled()); | ||
|
|
||
| EXPECT_CALL(*client_read_filter, onData(_, false)); | ||
| dispatcher_->run(Event::Dispatcher::RunType::NonBlock); | ||
| } | ||
|
|
||
| // Add 3 bytes to the buffer and verify the connection becomes read disabled | ||
| // again. | ||
| { | ||
| Buffer::OwnedImpl buffer("bye"); | ||
| server_connection_->write(buffer, false); | ||
| EXPECT_CALL(*client_read_filter, onData(_, false)) | ||
| .WillOnce(Invoke([&](Buffer::Instance&, bool) -> FilterStatus { | ||
| dispatcher_->exit(); | ||
| return FilterStatus::StopIteration; | ||
| })); | ||
| dispatcher_->run(Event::Dispatcher::RunType::Block); | ||
|
|
||
| EXPECT_TRUE(testClientConnection()->readBuffer().highWatermarkTriggered()); | ||
| EXPECT_FALSE(client_connection_->readEnabled()); | ||
| } | ||
|
|
||
| // Now have the consumer read disable. | ||
| // This time when the buffer is drained, there will be no kick as the consumer | ||
| // does not want to read. | ||
| { | ||
| client_connection_->readDisable(true); | ||
| testClientConnection()->readBuffer().drain(3); | ||
| EXPECT_FALSE(testClientConnection()->readBuffer().highWatermarkTriggered()); | ||
| EXPECT_FALSE(client_connection_->readEnabled()); | ||
|
|
||
| EXPECT_CALL(*client_read_filter, onData(_, false)).Times(0); | ||
| dispatcher_->run(Event::Dispatcher::RunType::NonBlock); | ||
| } | ||
|
|
||
| // Now read enable again. | ||
| // Inside the onData call, readDisable and readEnable. This should trigger | ||
| // another kick on the next dispatcher loop, so onData gets called twice. | ||
| { | ||
| client_connection_->readDisable(false); | ||
| EXPECT_CALL(*client_read_filter, onData(_, false)) | ||
| .Times(2) | ||
| .WillOnce(Invoke([&](Buffer::Instance&, bool) -> FilterStatus { | ||
| client_connection_->readDisable(true); | ||
| client_connection_->readDisable(false); | ||
| return FilterStatus::StopIteration; | ||
| })) | ||
| .WillRepeatedly(Return(FilterStatus::StopIteration)); | ||
| dispatcher_->run(Event::Dispatcher::RunType::NonBlock); | ||
| } | ||
|
|
||
| // Test the same logic for dispatched_buffered_data from the | ||
| // onReadReady() (read_disable_count_ != 0) path. | ||
| { | ||
| // Fill the buffer and verify the socket is read disabled. | ||
| Buffer::OwnedImpl buffer("bye"); | ||
| server_connection_->write(buffer, false); | ||
| EXPECT_CALL(*client_read_filter, onData(_, false)) | ||
| .WillOnce(Invoke([&](Buffer::Instance&, bool) -> FilterStatus { | ||
| dispatcher_->exit(); | ||
| return FilterStatus::StopIteration; | ||
| })); | ||
| dispatcher_->run(Event::Dispatcher::RunType::Block); | ||
| EXPECT_TRUE(testClientConnection()->readBuffer().highWatermarkTriggered()); | ||
| EXPECT_FALSE(client_connection_->readEnabled()); | ||
|
|
||
| // Read disable and read enable, to set dispatch_buffered_data_ true. | ||
| client_connection_->readDisable(true); | ||
| client_connection_->readDisable(false); | ||
| // Now event loop. This hits the early on-Read path. As above, read | ||
| // disable and read enable from inside the stack of onData, to ensure that | ||
| // dispatch_buffered_data_ works correctly. | ||
| EXPECT_CALL(*client_read_filter, onData(_, false)) | ||
| .Times(2) | ||
| .WillOnce(Invoke([&](Buffer::Instance&, bool) -> FilterStatus { | ||
| client_connection_->readDisable(true); | ||
| client_connection_->readDisable(false); | ||
| return FilterStatus::StopIteration; | ||
| })) | ||
| .WillRepeatedly(Return(FilterStatus::StopIteration)); | ||
| dispatcher_->run(Event::Dispatcher::RunType::NonBlock); | ||
| } | ||
|
|
||
| disconnect(true); | ||
| } | ||
|
|
||
| // Write some data to the connection. It will automatically attempt to flush | ||
| // it to the upstream file descriptor via a write() call to buffer_, which is | ||
| // configured to succeed and accept all bytes read. | ||
|
|
||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While you are here can you ASSERT this is greater than 0 before decrementing? I'm surprised this was not already asserted.