Tcp flow control #1217
Changes from 12 commits
cbe959f
f2ffecb
0a9116d
24b56df
2cb1ebf
f07608b
36ecdc3
ccea6d4
b846116
8fd62ca
1036f4b
ea9aebb
e1a91fa
9bbc0f4
f5ce5f4
73f80d3
62098a4
e15f458
5b514b8
f9603f7
c0bbd2d
8f18115
c604950
31b46da
872e1b3
f373704
71792b3
c6e50dc
c925fee
3bf7617
b8216e9
d55f9eb
b79fc5e
531b61e
69a8e33
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -45,6 +45,17 @@ class ConnectionCallbacks { | |
| * @param events supplies the ConnectionEvent events that occurred as a bitmask. | ||
| */ | ||
| virtual void onEvent(uint32_t events) PURE; | ||
|
|
||
| /** | ||
| * Called when the write buffer for a connection goes over its high watermark. | ||
| */ | ||
| virtual void onAboveWriteBufferHighWatermark() PURE; | ||
|
|
||
| /** | ||
| * Called when the write buffer for a connection goes from over its high | ||
| * watermark to under its low watermark. | ||
| */ | ||
| virtual void onBelowWriteBufferLowWatermark() PURE; | ||
| }; | ||
|
|
||
| /** | ||
|
|
@@ -162,6 +173,20 @@ class Connection : public Event::DeferredDeletable, public FilterManager { | |
| * Get the value set with setReadBufferLimit. | ||
| */ | ||
| virtual uint32_t readBufferLimit() const PURE; | ||
|
|
||
| /** | ||
| * Sets the high and low watermarks which trigger onAboveWriteBufferHighWatermark | ||
| * and onBelowWriteBufferHighWatermark callbacks. | ||
| * The connection is assumed to start out with less than high_watermark | ||
| * worth of data buffered, so onAboveWriteBufferHighWatermark will always be | ||
| * called before onAboveWriteBufferHighWatermark | ||
|
Member
s/onAboveWriteBufferHighWatermark/onBelowWriteBufferLowWatermark ?
||
| * @param low_watermark if the connection was above the high watermark and the | ||
| * connection buffer is drained below this many bytes, onBelowWriteBufferHighWatermark will be | ||
|
Member
s/onBelowWriteBufferHighWatermark/onBelowWriteBufferLowWatermark ?
||
| * called. | ||
| * @param high_watermark if the connection has more bytes than this buffered, | ||
| * onAboveWriteBufferHighWatermark will be called. | ||
| */ | ||
| virtual void setWriteBufferWatermarks(uint32_t low_watermark, uint32_t high_watermark) PURE; | ||
| }; | ||
|
|
||
| typedef std::unique_ptr<Connection> ConnectionPtr; | ||
|
|
||
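The callback contract described in the doc comments above can be sketched as a small hysteresis tracker. This is only an illustration under stated assumptions: `WatermarkTracker`, `add`, `drain`, and the string event log are hypothetical names that do not appear in the patch, and the comparisons (strictly above high, strictly below low) mirror `checkForHighWatermark` and `checkForLowWatermark` later in this diff.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical stand-in for a connection's write buffer. It tracks only a byte
// count and records which watermark callbacks would have fired.
class WatermarkTracker {
public:
  WatermarkTracker(uint32_t low, uint32_t high) : low_(low), high_(high) {}

  // Data was added to the buffer (analogous to ConnectionImpl::write in the diff).
  void add(uint64_t bytes) {
    length_ += bytes;
    // Fire once when the buffer is strictly above the high watermark.
    // A high watermark of 0 means watermarks are disabled.
    if (!above_high_ && high_ != 0 && length_ > high_) {
      above_high_ = true;
      events_.push_back("above_high");
    }
  }

  // Data was flushed from the buffer (analogous to a successful socket write).
  void drain(uint64_t bytes) {
    length_ -= (bytes > length_ ? length_ : bytes);
    // Fire only if a high-watermark event is outstanding and the buffer is
    // strictly below the low watermark.
    if (above_high_ && length_ < low_) {
      above_high_ = false;
      events_.push_back("below_low");
    }
  }

  const std::vector<std::string>& events() const { return events_; }

private:
  uint32_t low_;
  uint32_t high_;
  uint64_t length_ = 0;
  bool above_high_ = false;
  std::vector<std::string> events_;
};
```

With `WatermarkTracker t(4, 8)`, adding 9 bytes records one `above_high` event, further adds record nothing, and a `below_low` event is recorded only once the buffered length drops under 4. The events therefore always alternate, starting with `above_high`.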
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -123,6 +123,40 @@ void TcpProxy::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callb | |
| config_->stats().downstream_cx_tx_bytes_buffered_}); | ||
| } | ||
|
|
||
| void TcpProxy::readDisableUpstream(bool disable) { upstream_connection_->readDisable(disable); } | ||
|
|
||
| void TcpProxy::readDisableDownstream(bool disable) { | ||
| read_callbacks_->connection().readDisable(disable); | ||
| } | ||
|
|
||
| void TcpProxy::DownstreamCallbacks::onAboveWriteBufferHighWatermark() { | ||
|
Member
Is it worth it here to add ASSERTs in all of these functions that basically assert that we aren't getting multiple watermark callbacks? Or can that happen? I'm thinking something along the lines of
Contributor
Author
Connections can have reads disabled by other means, so if we check the actual state of the connection we can only verify that we enable sockets that are actually disabled (not that we disable enabled sockets). We could track local state by adding 2 booleans to the tcp proxy filter which are only used for debug, which I'd be happy to do. Flow control is tricky enough to get right that I'm happy to have extra asserts too!
Member
I would be in favor of adding the booleans (at least for now, with a TODO to clean them up once we have production experience) since this code is super tricky to get right and more checking is better IMO. I will leave it up to you to decide either way.
Contributor
Author
Works for me. Flow control changes are inherently dangerous as you point out.
||
| ASSERT(!on_high_watermark_called_); | ||
| on_high_watermark_called_ = true; | ||
| // If downstream has too much data buffered, stop reading on the upstream connection. | ||
| parent_.readDisableUpstream(true); | ||
| } | ||
|
|
||
| void TcpProxy::DownstreamCallbacks::onBelowWriteBufferLowWatermark() { | ||
| ASSERT(on_high_watermark_called_); | ||
| on_high_watermark_called_ = false; | ||
| // The downstream buffer has been drained. Resume reading from upstream. | ||
| parent_.readDisableUpstream(false); | ||
| } | ||
|
|
||
| void TcpProxy::UpstreamCallbacks::onAboveWriteBufferHighWatermark() { | ||
| ASSERT(!on_high_watermark_called_); | ||
| on_high_watermark_called_ = true; | ||
| // There's too much data buffered in the upstream write buffer, so stop reading. | ||
| parent_.readDisableDownstream(true); | ||
| } | ||
|
|
||
| void TcpProxy::UpstreamCallbacks::onBelowWriteBufferLowWatermark() { | ||
| ASSERT(on_high_watermark_called_); | ||
| on_high_watermark_called_ = false; | ||
| // The upstream write buffer is drained. Resume reading. | ||
| parent_.readDisableDownstream(false); | ||
| } | ||
|
|
||
| Network::FilterStatus TcpProxy::initializeUpstreamConnection() { | ||
| const std::string& cluster_name = config_->getRouteFromEntries(read_callbacks_->connection()); | ||
|
|
||
|
|
||
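The cross-wiring in the four callbacks above (a full write buffer on one side pauses reads on the other) can be sketched in isolation. This is a minimal illustration, not the filter's real structure: `FakeConnection` and `WatermarkCallbacks` are hypothetical types, and standard `assert` stands in for the `ASSERT` macro used in the patch.

```cpp
#include <cassert>

// Hypothetical minimal connection: it only records whether reads are disabled.
struct FakeConnection {
  bool read_disabled = false;
  void readDisable(bool disable) { read_disabled = disable; }
};

// Callbacks attached to one side's write buffer. They throttle reads on the
// *peer* connection, since the peer is the source of the data being buffered.
class WatermarkCallbacks {
public:
  explicit WatermarkCallbacks(FakeConnection& peer) : peer_(peer) {}

  void onAboveWriteBufferHighWatermark() {
    // Debug-only bookkeeping: two "above high" calls in a row would be a bug.
    assert(!on_high_watermark_called_);
    on_high_watermark_called_ = true;
    peer_.readDisable(true);
  }

  void onBelowWriteBufferLowWatermark() {
    // A "below low" call is only valid after an "above high" call.
    assert(on_high_watermark_called_);
    on_high_watermark_called_ = false;
    peer_.readDisable(false);
  }

private:
  FakeConnection& peer_;
  bool on_high_watermark_called_ = false;
};
```

Each direction keeps its own boolean, so the downstream and upstream callbacks can be above their high watermarks at the same time without tripping each other's assertions.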
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -98,7 +98,7 @@ void ConnectionImpl::close(ConnectionCloseType type) { | |
| return; | ||
| } | ||
|
|
||
| uint64_t data_to_write = write_buffer_.length(); | ||
| uint64_t data_to_write = write_buffer_->length(); | ||
| ENVOY_CONN_LOG(debug, "closing data_to_write={} type={}", *this, data_to_write, enumToInt(type)); | ||
| if (data_to_write == 0 || type == ConnectionCloseType::NoFlush) { | ||
| if (data_to_write > 0) { | ||
|
|
@@ -207,15 +207,26 @@ void ConnectionImpl::readDisable(bool disable) { | |
| // TODO(mattklein123): Potentially support half-closed TCP connections. It's unclear if this is | ||
| // required for any scenarios in which Envoy will be used (I don't know of any). | ||
| if (disable) { | ||
| if (!read_enabled) { | ||
| ++read_disable_count_; | ||
| return; | ||
| } | ||
| ASSERT(read_enabled); | ||
| state_ &= ~InternalState::ReadEnabled; | ||
| file_event_->setEnabled(Event::FileReadyType::Write | Event::FileReadyType::Closed); | ||
| } else { | ||
| if (read_disable_count_ > 0) { | ||
| --read_disable_count_; | ||
| return; | ||
| } | ||
| ASSERT(!read_enabled); | ||
| state_ |= InternalState::ReadEnabled; | ||
| // We never ask for both early close and read at the same time. If we are reading, we want to | ||
| // consume all available data. | ||
| file_event_->setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write); | ||
| // If the connection has data buffered there's no guarantee there's also data in the kernel | ||
| // which will kick off the filter chain. Instead fake an event to make sure the buffered data | ||
| // gets processed regardless. | ||
| if (read_buffer_.length() > 0) { | ||
| file_event_->activate(Event::FileReadyType::Read); | ||
| } | ||
|
|
@@ -254,13 +265,52 @@ void ConnectionImpl::write(Buffer::Instance& data) { | |
| // ever changed, read the comment in Ssl::ConnectionImpl::doWriteToSocket() VERY carefully. | ||
| // That code assumes that we never change existing write_buffer_ chain elements between calls | ||
| // to SSL_write(). That code will have to change if we ever copy here. | ||
| write_buffer_.move(data); | ||
| write_buffer_->move(data); | ||
| checkForHighWatermark(); | ||
|
|
||
| if (!(state_ & InternalState::Connecting)) { | ||
| file_event_->activate(Event::FileReadyType::Write); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| void ConnectionImpl::setWriteBufferWatermarks(uint32_t new_low_watermark, | ||
| uint32_t new_high_watermark) { | ||
| ENVOY_CONN_LOG(debug, "Setting watermarks: {} {}", *this, new_low_watermark, new_high_watermark); | ||
| ASSERT(new_low_watermark < new_high_watermark); | ||
|
|
||
| high_watermark_ = new_high_watermark; | ||
| low_watermark_ = new_low_watermark; | ||
|
|
||
| checkForLowWatermark(); | ||
| checkForHighWatermark(); | ||
| } | ||
|
|
||
| void ConnectionImpl::checkForLowWatermark() { | ||
| if (!above_high_watermark_called_ || write_buffer_->length() >= low_watermark_) { | ||
| return; | ||
| } | ||
| ENVOY_CONN_LOG(debug, "onBelowWriteBufferLowWatermark", *this); | ||
|
|
||
| above_high_watermark_called_ = false; | ||
| for (ConnectionCallbacks* callback : callbacks_) { | ||
| callback->onBelowWriteBufferLowWatermark(); | ||
| } | ||
| } | ||
|
|
||
| void ConnectionImpl::checkForHighWatermark() { | ||
| if (above_high_watermark_called_ || high_watermark_ == 0 || | ||
|
Member
There's some asymmetry here that is probably fine, but might be worth calling out somewhere. You'll never receive an
Member
Yeah, per my other comment, I would add as many ASSERTs as make sense to code like this.
Contributor
Author
I've called it out more explicitly in include/envoy/network/connection.h; let me know if you think it should go elsewhere as well!
||
| write_buffer_->length() <= high_watermark_) { | ||
| return; | ||
| } | ||
| ENVOY_CONN_LOG(debug, "onAboveWriteBufferHighWatermark", *this); | ||
|
|
||
| above_high_watermark_called_ = true; | ||
| for (ConnectionCallbacks* callback : callbacks_) { | ||
| callback->onAboveWriteBufferHighWatermark(); | ||
| } | ||
| } | ||
|
|
||
| void ConnectionImpl::onFileEvent(uint32_t events) { | ||
| ENVOY_CONN_LOG(trace, "socket event: {}", *this, events); | ||
|
|
||
|
|
@@ -347,12 +397,11 @@ ConnectionImpl::IoResult ConnectionImpl::doWriteToSocket() { | |
| PostIoAction action; | ||
| uint64_t bytes_written = 0; | ||
| do { | ||
| if (write_buffer_.length() == 0) { | ||
| if (write_buffer_->length() == 0) { | ||
| action = PostIoAction::KeepOpen; | ||
| break; | ||
| } | ||
|
|
||
| int rc = write_buffer_.write(fd_); | ||
| int rc = write_buffer_->write(fd_); | ||
| ENVOY_CONN_LOG(trace, "write returns: {}", *this, rc); | ||
| if (rc == -1) { | ||
| ENVOY_CONN_LOG(trace, "write error: {}", *this, errno); | ||
|
|
@@ -364,6 +413,7 @@ ConnectionImpl::IoResult ConnectionImpl::doWriteToSocket() { | |
|
|
||
| break; | ||
| } else { | ||
| checkForLowWatermark(); | ||
| bytes_written += rc; | ||
| } | ||
| } while (true); | ||
|
|
@@ -400,7 +450,7 @@ void ConnectionImpl::onWriteReady() { | |
| } | ||
|
|
||
| IoResult result = doWriteToSocket(); | ||
| uint64_t new_buffer_size = write_buffer_.length(); | ||
| uint64_t new_buffer_size = write_buffer_->length(); | ||
| updateWriteBufferStats(result.bytes_processed_, new_buffer_size); | ||
|
|
||
| if (result.action_ == PostIoAction::Close) { | ||
|
|
||
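The counting added to `readDisable` in this file can be sketched on its own. This is a simplified illustration under assumptions: `ReadGate`, `readEnabled`, and `extra_disables_` are hypothetical names, and the real method also updates file event registration, which is omitted here.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical reduction of the readDisable() bookkeeping: the first disable
// flips the state, and each further disable only bumps a counter.
class ReadGate {
public:
  void readDisable(bool disable) {
    if (disable) {
      if (!read_enabled_) {
        // Already disabled by another caller; remember the extra request.
        ++extra_disables_;
        return;
      }
      read_enabled_ = false;
    } else {
      if (extra_disables_ > 0) {
        // Some other caller still wants reads disabled; just consume one request.
        --extra_disables_;
        return;
      }
      assert(!read_enabled_);
      read_enabled_ = true;
    }
  }

  bool readEnabled() const { return read_enabled_; }

private:
  bool read_enabled_ = true;
  uint32_t extra_disables_ = 0;
};
```

If two independent components each call `readDisable(true)`, reads resume only after both have called `readDisable(false)`.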
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -72,11 +72,16 @@ class ConnectionImpl : public virtual Connection, | |
| void write(Buffer::Instance& data) override; | ||
| void setReadBufferLimit(uint32_t limit) override { read_buffer_limit_ = limit; } | ||
| uint32_t readBufferLimit() const override { return read_buffer_limit_; } | ||
| void setWriteBufferWatermarks(uint32_t low_watermark, uint32_t high_watermark) override; | ||
|
|
||
| // Network::BufferSource | ||
| Buffer::Instance& getReadBuffer() override { return read_buffer_; } | ||
| Buffer::Instance& getWriteBuffer() override { return *current_write_buffer_; } | ||
|
|
||
| void replaceWriteBufferForTest(std::unique_ptr<Buffer::OwnedImpl> new_buffer) { | ||
| write_buffer_ = std::move(new_buffer); | ||
|
Member
It feels a bit scary reaching in and mutating internal state from a test (as opposed to helper methods to just inspect state). I wonder if there is a way to factory-ify this to allow mocking via dependency injection...
Member
Agreed. The general pattern we have been doing is to make something like
Contributor
Author
SGTM. This turns out to involve touching a handful of new files (and the test refactors were already pretty large), so if no one objects I'm going to move the test changes and the buffer factory out into their own separate patch, since this one is already pretty large.
Member
That's fine with me if that is easier.
Contributor
Author
Done, over in #1234. Feel free to ignore this one until that is merged in, or I can attempt to rebase if you'd prefer to review things in parallel.
||
| } | ||
|
|
||
| protected: | ||
| enum class PostIoAction { Close, KeepOpen }; | ||
|
|
||
|
|
@@ -99,12 +104,29 @@ class ConnectionImpl : public virtual Connection, | |
| // Reconsider how to make fairness happen. | ||
| void setReadBufferReady() { file_event_->activate(Event::FileReadyType::Read); } | ||
|
|
||
| // Called when data is drained from the write buffer, to see if onBelowWriteBufferLowWatermark | ||
| // should be called. | ||
| void checkForLowWatermark(); | ||
| // Called when data is added to the write buffer, to see if onAboveWriteBufferHighWatermark should | ||
| // be called. | ||
| void checkForHighWatermark(); | ||
|
|
||
| FilterManagerImpl filter_manager_; | ||
| Address::InstanceConstSharedPtr remote_address_; | ||
| Address::InstanceConstSharedPtr local_address_; | ||
| Buffer::OwnedImpl read_buffer_; | ||
| Buffer::OwnedImpl write_buffer_; | ||
| Buffer::InstancePtr write_buffer_{new Buffer::OwnedImpl}; | ||
| uint32_t read_buffer_limit_ = 0; | ||
| // Used for network level buffer limits (off by default). If these are non-zero, when the write | ||
| // buffer passes |high_watermark_|, onAboveWriteBufferHighWatermark will be called to disable | ||
| // reading further data. When the buffer drains below |low_watermark_|, | ||
| // onBelowWriteBufferLowWatermark will be called to resume reads. | ||
| uint32_t high_watermark_{0}; | ||
| uint32_t low_watermark_{0}; | ||
| // Tracks the latest state of watermark callbacks. | ||
| // True between the time onAboveWriteBufferHighWatermark is called until the next call to | ||
| // onBelowWriteBufferLowWatermark. | ||
| bool above_high_watermark_called_{false}; | ||
|
|
||
| private: | ||
| // clang-format off | ||
|
|
@@ -138,6 +160,10 @@ class ConnectionImpl : public virtual Connection, | |
| uint64_t last_read_buffer_size_{}; | ||
| uint64_t last_write_buffer_size_{}; | ||
| std::unique_ptr<BufferStats> buffer_stats_; | ||
| // Tracks the number of times reads have been disabled. If N different components call | ||
| // readDisable(true) this allows the connection to only resume reads when readDisable(false) | ||
| // has been called N times. | ||
| uint32_t read_disable_count_{0}; | ||
| }; | ||
|
|
||
| /** | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -102,6 +102,17 @@ void ListenerImpl::newConnection(int fd, Address::InstanceConstSharedPtr remote_ | |
| Address::InstanceConstSharedPtr local_address) { | ||
| ConnectionPtr new_connection(new ConnectionImpl(dispatcher_, fd, remote_address, local_address)); | ||
| new_connection->setReadBufferLimit(options_.per_connection_buffer_limit_bytes_); | ||
| // Due to the fact that writes to the connection and flushing data from the connection are done | ||
| // asynchronously, we have the option of either setting the watermarks aggressively, and regularly | ||
| // enabling/disabling reads from the socket, or allowing more data, but then not triggering | ||
| // based on watermarks until 2x the data is buffered in the common case. Given these are all soft | ||
| // limits we err on the side of buffering more and having better performance. | ||
| // If the connection class is changed to write-and-flush the high watermark should be changed to | ||
| // the buffer limit without the + 1 | ||
| if (options_.per_connection_buffer_limit_bytes_ > 0) { | ||
| new_connection->setWriteBufferWatermarks(options_.per_connection_buffer_limit_bytes_ / 2, | ||
| options_.per_connection_buffer_limit_bytes_ + 1); | ||
| } | ||
|
Member
Can we refactor this logic to be in one place?
Contributor
Author
Oops - missed this one. I'd be inclined to merge setReadBufferLimit and setWriteBufferWatermarks into setConnectionBufferLimits, which just does both under the hood. I'll see if that works cleanly in code and docs tomorrow.
||
| cb_.onNewConnection(std::move(new_connection)); | ||
| } | ||
|
|
||
|
|
@@ -111,6 +122,10 @@ void SslListenerImpl::newConnection(int fd, Address::InstanceConstSharedPtr remo | |
| local_address, ssl_ctx_, | ||
| Ssl::ConnectionImpl::InitialState::Server)); | ||
| new_connection->setReadBufferLimit(options_.per_connection_buffer_limit_bytes_); | ||
| if (options_.per_connection_buffer_limit_bytes_ > 0) { | ||
| new_connection->setWriteBufferWatermarks(options_.per_connection_buffer_limit_bytes_ / 2, | ||
| options_.per_connection_buffer_limit_bytes_ + 1); | ||
| } | ||
| cb_.onNewConnection(std::move(new_connection)); | ||
| } | ||
|
|
||
|
|
||
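Both listener paths above derive the watermarks from the per-connection buffer limit in the same way (low = limit / 2, high = limit + 1). One way to keep that arithmetic in one place is a small helper, sketched here under assumptions: `Watermarks` and `watermarksForLimit` are hypothetical names and are not part of this patch or of the follow-up the author mentions.

```cpp
#include <cstdint>

// Hypothetical pair of watermark values, both in bytes.
struct Watermarks {
  uint32_t low;
  uint32_t high;
};

// Derives write-buffer watermarks from a per-connection buffer limit using the
// same arithmetic as the listener code in this diff. A limit of 0 is treated as
// "watermarks disabled", matching the `> 0` guard in the listeners.
inline Watermarks watermarksForLimit(uint32_t limit) {
  if (limit == 0) {
    return {0, 0};
  }
  // Note: limit + 1 wraps to 0 if limit is UINT32_MAX; callers would need to
  // rule that value out.
  return {limit / 2, limit + 1};
}
```

For a limit of 1024 bytes this gives a low watermark of 512 and a high watermark of 1025, so the high-watermark callback fires only once more than 1025 bytes are buffered.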