diff --git a/docs/configuration/cluster_manager/cluster_stats.rst b/docs/configuration/cluster_manager/cluster_stats.rst index 599c9cc469379..3b10bba64433a 100644 --- a/docs/configuration/cluster_manager/cluster_stats.rst +++ b/docs/configuration/cluster_manager/cluster_stats.rst @@ -53,6 +53,8 @@ Every cluster has a statistics tree rooted at *cluster..* with the followi upstream_rq_retry, Counter, Total request retries upstream_rq_retry_success, Counter, Total request retry successes upstream_rq_retry_overflow, Counter, Total requests not retried due to circuit breaking + upstream_flow_control_paused_reading_total, Counter, Total number of times flow control paused reading from upstream. + upstream_flow_control_resumed_reading_total, Counter, Total number of times flow control resumed reading from upstream. membership_change, Counter, Total cluster membership changes membership_healthy, Gauge, Current cluster healthy total (inclusive of both health checking and outlier detection) membership_total, Gauge, Current cluster membership total diff --git a/docs/configuration/network_filters/tcp_proxy_filter.rst b/docs/configuration/network_filters/tcp_proxy_filter.rst index faa9edc2148b9..752deb4964e35 100644 --- a/docs/configuration/network_filters/tcp_proxy_filter.rst +++ b/docs/configuration/network_filters/tcp_proxy_filter.rst @@ -138,4 +138,6 @@ statistics are rooted at *tcp..* with the following statistics: downstream_cx_no_route, Counter, Number of connections for which no matching route was found. downstream_cx_tx_bytes_total, Counter, Total bytes written to the downstream connection. downstream_cx_tx_bytes_buffered, Gauge, Total bytes currently buffered to the downstream connection. + downstream_flow_control_paused_reading_total, Counter, Total number of times flow control paused reading from downstream. + downstream_flow_control_resumed_reading_total, Counter, Total number of times flow control resumed reading from downstream. 
diff --git a/include/envoy/network/connection.h b/include/envoy/network/connection.h index 355cb7e395f3c..274efb622b1ed 100644 --- a/include/envoy/network/connection.h +++ b/include/envoy/network/connection.h @@ -45,6 +45,17 @@ class ConnectionCallbacks { * @param events supplies the ConnectionEvent events that occurred as a bitmask. */ virtual void onEvent(uint32_t events) PURE; + + /** + * Called when the write buffer for a connection goes over its high watermark. + */ + virtual void onAboveWriteBufferHighWatermark() PURE; + + /** + * Called when the write buffer for a connection goes from over its high + * watermark to under its low watermark. + */ + virtual void onBelowWriteBufferLowWatermark() PURE; }; /** @@ -153,15 +164,21 @@ class Connection : public Event::DeferredDeletable, public FilterManager { virtual void write(Buffer::Instance& data) PURE; /** - * Set a soft limit on the size of the read buffer prior to flushing to further stages in the + * Set a soft limit on the size of buffers for the connection. + * For the read buffer, this limits the bytes read prior to flushing to further stages in the * processing pipeline. + * For the write buffer, it sets watermarks. When enough data is buffered it triggers a call to + * onAboveWriteBufferHighWatermark, which allows subscribers to enforce flow control by disabling + * reads on the socket funneling data to the write buffer. When enough data is drained from the + * write buffer, onBelowWriteBufferLowWatermark is called which similarly allows subscribers + * to resume reading. */ - virtual void setReadBufferLimit(uint32_t limit) PURE; + virtual void setBufferLimits(uint32_t limit) PURE; /** - * Get the value set with setReadBufferLimit. + * Get the value set with setBufferLimits. 
*/ - virtual uint32_t readBufferLimit() const PURE; + virtual uint32_t bufferLimit() const PURE; }; typedef std::unique_ptr ConnectionPtr; diff --git a/include/envoy/upstream/upstream.h b/include/envoy/upstream/upstream.h index 41f489711e64f..3df1c4f319270 100644 --- a/include/envoy/upstream/upstream.h +++ b/include/envoy/upstream/upstream.h @@ -201,6 +201,8 @@ class HostSet { COUNTER(upstream_rq_retry) \ COUNTER(upstream_rq_retry_success) \ COUNTER(upstream_rq_retry_overflow) \ + COUNTER(upstream_flow_control_paused_reading_total) \ + COUNTER(upstream_flow_control_resumed_reading_total) \ GAUGE (max_host_weight) \ COUNTER(membership_change) \ GAUGE (membership_healthy) \ diff --git a/source/common/buffer/BUILD b/source/common/buffer/BUILD index 2907225057df8..53b0868e82968 100644 --- a/source/common/buffer/BUILD +++ b/source/common/buffer/BUILD @@ -8,6 +8,16 @@ load( envoy_package() +envoy_cc_library( + name = "watermark_buffer_lib", + srcs = ["watermark_buffer.cc"], + hdrs = ["watermark_buffer.h"], + deps = [ + "//source/common/buffer:buffer_lib", + "//source/common/common:assert_lib", + ], +) + envoy_cc_library( name = "buffer_lib", srcs = ["buffer_impl.cc"], diff --git a/source/common/buffer/buffer_impl.cc b/source/common/buffer/buffer_impl.cc index db4c8f83a867b..a9f6fd0148a87 100644 --- a/source/common/buffer/buffer_impl.cc +++ b/source/common/buffer/buffer_impl.cc @@ -67,15 +67,15 @@ void OwnedImpl::move(Instance& rhs) { // now and this is safe. Using the evbuffer move routines require having access to both evbuffers. // This is a reasonable compromise in a high performance path where we want to maintain an // abstraction in case we get rid of evbuffer later. 
- int rc = evbuffer_add_buffer(buffer_.get(), static_cast(rhs).buffer_.get()); + int rc = evbuffer_add_buffer(buffer_.get(), static_cast(rhs).buffer().get()); ASSERT(rc == 0); UNREFERENCED_PARAMETER(rc); } void OwnedImpl::move(Instance& rhs, uint64_t length) { // See move() above for why we do the static cast. - int rc = - evbuffer_remove_buffer(static_cast(rhs).buffer_.get(), buffer_.get(), length); + int rc = evbuffer_remove_buffer(static_cast(rhs).buffer().get(), buffer_.get(), + length); ASSERT(static_cast(rc) == length); UNREFERENCED_PARAMETER(rc); } diff --git a/source/common/buffer/buffer_impl.h b/source/common/buffer/buffer_impl.h index 8214eac9f7254..40414bc101102 100644 --- a/source/common/buffer/buffer_impl.h +++ b/source/common/buffer/buffer_impl.h @@ -16,17 +16,25 @@ class OwnedImplFactory : public Factory { InstancePtr create() override; }; +class LibEventInstance : public Instance { +public: + virtual Event::Libevent::BufferPtr& buffer() PURE; +}; + /** * Wraps an allocated and owned evbuffer. + * + * Note that due to the internals of move() accessing buffer(), OwnedImpl is not + * compatible with non-LibEventInstance buffers. 
*/ -class OwnedImpl : public Instance { +class OwnedImpl : public LibEventInstance { public: OwnedImpl(); OwnedImpl(const std::string& data); OwnedImpl(const Instance& data); OwnedImpl(const void* data, uint64_t size); - // Instance + // LibEventInstance void add(const void* data, uint64_t size) override; void add(const std::string& data) override; void add(const Instance& data) override; @@ -42,6 +50,8 @@ class OwnedImpl : public Instance { ssize_t search(const void* data, uint64_t size, size_t start) const override; int write(int fd) override; + Event::Libevent::BufferPtr& buffer() override { return buffer_; } + private: Event::Libevent::BufferPtr buffer_; }; diff --git a/source/common/buffer/watermark_buffer.cc b/source/common/buffer/watermark_buffer.cc new file mode 100644 index 0000000000000..f1a01342b39a6 --- /dev/null +++ b/source/common/buffer/watermark_buffer.cc @@ -0,0 +1,89 @@ +#include "common/buffer/watermark_buffer.h" + +#include "common/common/assert.h" + +namespace Envoy { +namespace Buffer { + +void WatermarkBuffer::add(const void* data, uint64_t size) { + wrapped_buffer_->add(data, size); + checkHighWatermark(); +} + +void WatermarkBuffer::add(const std::string& data) { + wrapped_buffer_->add(data); + checkHighWatermark(); +} + +void WatermarkBuffer::add(const Instance& data) { + wrapped_buffer_->add(data); + checkHighWatermark(); +} + +void WatermarkBuffer::commit(RawSlice* iovecs, uint64_t num_iovecs) { + wrapped_buffer_->commit(iovecs, num_iovecs); + checkHighWatermark(); +} + +void WatermarkBuffer::drain(uint64_t size) { + wrapped_buffer_->drain(size); + checkLowWatermark(); +} + +void WatermarkBuffer::move(Instance& rhs) { + wrapped_buffer_->move(rhs); + checkHighWatermark(); +} + +void WatermarkBuffer::move(Instance& rhs, uint64_t length) { + wrapped_buffer_->move(rhs, length); + checkHighWatermark(); +} + +int WatermarkBuffer::read(int fd, uint64_t max_length) { + int bytes_read = wrapped_buffer_->read(fd, max_length); + 
checkHighWatermark(); + return bytes_read; +} + +uint64_t WatermarkBuffer::reserve(uint64_t length, RawSlice* iovecs, uint64_t num_iovecs) { + uint64_t bytes_reserved = wrapped_buffer_->reserve(length, iovecs, num_iovecs); + checkHighWatermark(); + return bytes_reserved; +} + +int WatermarkBuffer::write(int fd) { + int bytes_written = wrapped_buffer_->write(fd); + checkLowWatermark(); + return bytes_written; +} + +void WatermarkBuffer::setWatermarks(uint32_t low_watermark, uint32_t high_watermark) { + ASSERT(low_watermark < high_watermark); + low_watermark_ = low_watermark; + high_watermark_ = high_watermark; + checkHighWatermark(); + checkLowWatermark(); +} + +void WatermarkBuffer::checkLowWatermark() { + if (!above_high_watermark_called_ || wrapped_buffer_->length() >= low_watermark_) { + return; + } + + above_high_watermark_called_ = false; + below_low_watermark_(); +} + +void WatermarkBuffer::checkHighWatermark() { + if (above_high_watermark_called_ || high_watermark_ == 0 || + wrapped_buffer_->length() <= high_watermark_) { + return; + } + + above_high_watermark_called_ = true; + above_high_watermark_(); +} + +} // namespace Buffer +} // namespace Envoy diff --git a/source/common/buffer/watermark_buffer.h b/source/common/buffer/watermark_buffer.h new file mode 100644 index 0000000000000..fd7547ff44eec --- /dev/null +++ b/source/common/buffer/watermark_buffer.h @@ -0,0 +1,75 @@ +#pragma once + +#include + +#include "common/buffer/buffer_impl.h" + +namespace Envoy { +namespace Buffer { + +// A wrapper for an underlying buffer which does watermark validation. +// The underlying buffer's ownership is transferred to the Watermark buffer. Each time the inner +// buffer is resized (written to or drained), the watermarks are checked. As the buffer size +// transitions from under the low watermark to above the high watermark, the above_high_watermark +// function is called one time. 
It will not be called again until the buffer is drained below the +// low watermark, at which point the below_low_watermark function is called. +// +// Because the internals of OwnedImpl::move() require accessing the underlying data, OwnedImpl is +// not compatible with generic Buffer::Impls. To allow compatibility between WatermarkBuffer and +// OwnedImpl::move, WatermarkBuffer must implement LibEventInstance and is also not compatible +// with generic Buffer::Impls. +// +// WatermarkBuffer takes a pointer to a generic InstancePtr in the constructor to allow test mocks +// which override move() in any case. +class WatermarkBuffer : public LibEventInstance { +public: + WatermarkBuffer(InstancePtr&& buffer, std::function below_low_watermark, + std::function above_high_watermark) + : wrapped_buffer_(std::move(buffer)), below_low_watermark_(below_low_watermark), + above_high_watermark_(above_high_watermark) {} + + // Instance + void add(const void* data, uint64_t size) override; + void add(const std::string& data) override; + void add(const Instance& data) override; + void commit(RawSlice* iovecs, uint64_t num_iovecs) override; + void drain(uint64_t size) override; + uint64_t getRawSlices(RawSlice* out, uint64_t out_size) const override { + return wrapped_buffer_->getRawSlices(out, out_size); + } + uint64_t length() const override { return wrapped_buffer_->length(); } + void* linearize(uint32_t size) override { return wrapped_buffer_->linearize(size); } + void move(Instance& rhs) override; + void move(Instance& rhs, uint64_t length) override; + int read(int fd, uint64_t max_length) override; + uint64_t reserve(uint64_t length, RawSlice* iovecs, uint64_t num_iovecs) override; + ssize_t search(const void* data, uint64_t size, size_t start) const override { + return wrapped_buffer_->search(data, size, start); + } + int write(int fd) override; + Event::Libevent::BufferPtr& buffer() override { + return static_cast(*wrapped_buffer_).buffer(); + } + + void 
setWatermarks(uint32_t low_watermark, uint32_t high_watermark); + +private: + void checkHighWatermark(); + void checkLowWatermark(); + + InstancePtr wrapped_buffer_; + std::function below_low_watermark_; + std::function above_high_watermark_; + + // Used for enforcing buffer limits (off by default). If these are set to non-zero by a call to + // setWatermarks() the watermark callbacks will be called as described above. + uint32_t high_watermark_{0}; + uint32_t low_watermark_{0}; + // Tracks the latest state of watermark callbacks. + // True between the time above_high_watermark_ has been called until below_low_watermark_ has + // been called. + bool above_high_watermark_called_{false}; +}; + +} // namespace Buffer +} // namespace Envoy diff --git a/source/common/event/dispatcher_impl.cc b/source/common/event/dispatcher_impl.cc index 84204d3258b3e..f4aa3bd298e2b 100644 --- a/source/common/event/dispatcher_impl.cc +++ b/source/common/event/dispatcher_impl.cc @@ -10,6 +10,7 @@ #include "envoy/network/listen_socket.h" #include "envoy/network/listener.h" +#include "common/buffer/buffer_impl.h" #include "common/event/file_event_impl.h" #include "common/event/signal_impl.h" #include "common/event/timer_impl.h" diff --git a/source/common/filter/auth/client_ssl.h b/source/common/filter/auth/client_ssl.h index 23373715c882d..7a2c625ac8aaa 100644 --- a/source/common/filter/auth/client_ssl.h +++ b/source/common/filter/auth/client_ssl.h @@ -119,6 +119,8 @@ class Instance : public Network::ReadFilter, public Network::ConnectionCallbacks // Network::ConnectionCallbacks void onEvent(uint32_t events) override; + void onAboveWriteBufferHighWatermark() override {} + void onBelowWriteBufferLowWatermark() override {} private: ConfigSharedPtr config_; diff --git a/source/common/filter/ratelimit.h b/source/common/filter/ratelimit.h index ece8d05850e09..751b53b87f253 100644 --- a/source/common/filter/ratelimit.h +++ b/source/common/filter/ratelimit.h @@ -82,6 +82,8 @@ class Instance : 
public Network::ReadFilter, // Network::ConnectionCallbacks void onEvent(uint32_t events) override; + void onAboveWriteBufferHighWatermark() override {} + void onBelowWriteBufferLowWatermark() override {} // RateLimit::RequestCallbacks void complete(LimitStatus status) override; diff --git a/source/common/filter/tcp_proxy.cc b/source/common/filter/tcp_proxy.cc index da4ec3eb5c30a..9a563b9b299e7 100644 --- a/source/common/filter/tcp_proxy.cc +++ b/source/common/filter/tcp_proxy.cc @@ -123,6 +123,58 @@ void TcpProxy::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callb config_->stats().downstream_cx_tx_bytes_buffered_}); } +void TcpProxy::readDisableUpstream(bool disable) { + upstream_connection_->readDisable(disable); + if (disable) { + read_callbacks_->upstreamHost() + ->cluster() + .stats() + .upstream_flow_control_paused_reading_total_.inc(); + } else { + read_callbacks_->upstreamHost() + ->cluster() + .stats() + .upstream_flow_control_resumed_reading_total_.inc(); + } +} + +void TcpProxy::readDisableDownstream(bool disable) { + read_callbacks_->connection().readDisable(disable); + if (disable) { + config_->stats().downstream_flow_control_paused_reading_total_.inc(); + } else { + config_->stats().downstream_flow_control_resumed_reading_total_.inc(); + } +} + +void TcpProxy::DownstreamCallbacks::onAboveWriteBufferHighWatermark() { + ASSERT(!on_high_watermark_called_); + on_high_watermark_called_ = true; + // If downstream has too much data buffered, stop reading on the upstream connection. + parent_.readDisableUpstream(true); +} + +void TcpProxy::DownstreamCallbacks::onBelowWriteBufferLowWatermark() { + ASSERT(on_high_watermark_called_); + on_high_watermark_called_ = false; + // The downstream buffer has been drained. Resume reading from upstream. 
+ parent_.readDisableUpstream(false); +} + +void TcpProxy::UpstreamCallbacks::onAboveWriteBufferHighWatermark() { + ASSERT(!on_high_watermark_called_); + on_high_watermark_called_ = true; + // There's too much data buffered in the upstream write buffer, so stop reading. + parent_.readDisableDownstream(true); +} + +void TcpProxy::UpstreamCallbacks::onBelowWriteBufferLowWatermark() { + ASSERT(on_high_watermark_called_); + on_high_watermark_called_ = false; + // The upstream write buffer is drained. Resume reading. + parent_.readDisableDownstream(false); +} + Network::FilterStatus TcpProxy::initializeUpstreamConnection() { const std::string& cluster_name = config_->getRouteFromEntries(read_callbacks_->connection()); diff --git a/source/common/filter/tcp_proxy.h b/source/common/filter/tcp_proxy.h index d981472ea97e4..55b0c6e9347a7 100644 --- a/source/common/filter/tcp_proxy.h +++ b/source/common/filter/tcp_proxy.h @@ -30,7 +30,9 @@ namespace Filter { COUNTER(downstream_cx_tx_bytes_total) \ GAUGE (downstream_cx_tx_bytes_buffered) \ COUNTER(downstream_cx_total) \ - COUNTER(downstream_cx_no_route) + COUNTER(downstream_cx_no_route) \ + COUNTER(downstream_flow_control_paused_reading_total) \ + COUNTER(downstream_flow_control_resumed_reading_total) // clang-format on /** @@ -94,14 +96,22 @@ class TcpProxy : public Network::ReadFilter, Logger::Loggable, // Network::ConnectionCallbacks void onEvent(uint32_t events) override; + void onAboveWriteBufferHighWatermark() override {} + void onBelowWriteBufferLowWatermark() override {} std::list active_requests_; Http::ConnectionCallbacks* codec_callbacks_{}; diff --git a/source/common/http/conn_manager_impl.h b/source/common/http/conn_manager_impl.h index f12ef0dee4e74..a8ab3a0a7d048 100644 --- a/source/common/http/conn_manager_impl.h +++ b/source/common/http/conn_manager_impl.h @@ -279,6 +279,8 @@ class ConnectionManagerImpl : Logger::Loggable, // Network::ConnectionCallbacks void onEvent(uint32_t events) override; + void 
onAboveWriteBufferHighWatermark() override {} + void onBelowWriteBufferLowWatermark() override {} private: struct ActiveStream; diff --git a/source/common/http/http1/conn_pool.h b/source/common/http/http1/conn_pool.h index 615dc6ef5ab53..5fdabe5fbcfff 100644 --- a/source/common/http/http1/conn_pool.h +++ b/source/common/http/http1/conn_pool.h @@ -76,6 +76,8 @@ class ConnPoolImpl : Logger::Loggable, public ConnectionPool:: // Network::ConnectionCallbacks void onEvent(uint32_t events) override { parent_.onConnectionEvent(*this, events); } + void onAboveWriteBufferHighWatermark() override {} + void onBelowWriteBufferLowWatermark() override {} ConnPoolImpl& parent_; CodecClientPtr codec_client_; diff --git a/source/common/http/http2/conn_pool.h b/source/common/http/http2/conn_pool.h index 89cd9be9c85b8..a0f17e087d79b 100644 --- a/source/common/http/http2/conn_pool.h +++ b/source/common/http/http2/conn_pool.h @@ -43,6 +43,8 @@ class ConnPoolImpl : Logger::Loggable, public ConnectionPool:: // Network::ConnectionCallbacks void onEvent(uint32_t events) override { parent_.onConnectionEvent(*this, events); } + void onAboveWriteBufferHighWatermark() override {} + void onBelowWriteBufferLowWatermark() override {} // CodecClientCallbacks void onStreamDestroy() override { parent_.onStreamDestroy(*this); } diff --git a/source/common/mongo/proxy.h b/source/common/mongo/proxy.h index 8be56ff6e87eb..7e0d7281c88de 100644 --- a/source/common/mongo/proxy.h +++ b/source/common/mongo/proxy.h @@ -107,6 +107,8 @@ class ProxyFilter : public Network::Filter, // Network::ConnectionCallbacks void onEvent(uint32_t event) override; + void onAboveWriteBufferHighWatermark() override {} + void onBelowWriteBufferLowWatermark() override {} private: struct ActiveQuery { diff --git a/source/common/network/BUILD b/source/common/network/BUILD index 8699a2a249b8e..cde65f48235a1 100644 --- a/source/common/network/BUILD +++ b/source/common/network/BUILD @@ -45,6 +45,7 @@ envoy_cc_library( 
"//include/envoy/network:connection_interface", "//include/envoy/network:filter_interface", "//source/common/buffer:buffer_lib", + "//source/common/buffer:watermark_buffer_lib", "//source/common/common:assert_lib", "//source/common/common:empty_string", "//source/common/common:enum_to_int", diff --git a/source/common/network/connection_impl.cc b/source/common/network/connection_impl.cc index 746f84707ab9a..5b79c92abde9f 100644 --- a/source/common/network/connection_impl.cc +++ b/source/common/network/connection_impl.cc @@ -59,8 +59,10 @@ ConnectionImpl::ConnectionImpl(Event::DispatcherImpl& dispatcher, int fd, Address::InstanceConstSharedPtr local_address) : filter_manager_(*this, *this), remote_address_(remote_address), local_address_(local_address), read_buffer_(dispatcher.getBufferFactory().create()), - write_buffer_(dispatcher.getBufferFactory().create()), dispatcher_(dispatcher), fd_(fd), - id_(++next_global_id_) { + write_buffer_(Buffer::InstancePtr{dispatcher.getBufferFactory().create()}, + [this]() -> void { this->onLowWatermark(); }, + [this]() -> void { this->onHighWatermark(); }), + dispatcher_(dispatcher), fd_(fd), id_(++next_global_id_) { // Treat the lack of a valid fd (which in practice only happens if we run out of FDs) as an OOM // condition and just crash. @@ -100,7 +102,7 @@ void ConnectionImpl::close(ConnectionCloseType type) { return; } - uint64_t data_to_write = write_buffer_->length(); + uint64_t data_to_write = write_buffer_.length(); ENVOY_CONN_LOG(debug, "closing data_to_write={} type={}", *this, data_to_write, enumToInt(type)); if (data_to_write == 0 || type == ConnectionCloseType::NoFlush) { if (data_to_write > 0) { @@ -209,15 +211,26 @@ void ConnectionImpl::readDisable(bool disable) { // TODO(mattklein123): Potentially support half-closed TCP connections. It's unclear if this is // required for any scenarios in which Envoy will be used (I don't know of any). 
if (disable) { + if (!read_enabled) { + ++read_disable_count_; + return; + } ASSERT(read_enabled); state_ &= ~InternalState::ReadEnabled; file_event_->setEnabled(Event::FileReadyType::Write | Event::FileReadyType::Closed); } else { + if (read_disable_count_ > 0) { + --read_disable_count_; + return; + } ASSERT(!read_enabled); state_ |= InternalState::ReadEnabled; // We never ask for both early close and read at the same time. If we are reading, we want to // consume all available data. file_event_->setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write); + // If the connection has data buffered there's no guarantee there's also data in the kernel + // which will kick off the filter chain. Instead fake an event to make sure the buffered data + // gets processed regardless. if (read_buffer_->length() > 0) { file_event_->activate(Event::FileReadyType::Read); } @@ -256,7 +269,7 @@ void ConnectionImpl::write(Buffer::Instance& data) { // ever changed, read the comment in Ssl::ConnectionImpl::doWriteToSocket() VERY carefully. // That code assumes that we never change existing write_buffer_ chain elements between calls // to SSL_write(). That code will have to change if we ever copy here. - write_buffer_->move(data); + write_buffer_.move(data); if (!(state_ & InternalState::Connecting)) { file_event_->activate(Event::FileReadyType::Write); @@ -264,6 +277,46 @@ void ConnectionImpl::write(Buffer::Instance& data) { } } +void ConnectionImpl::setBufferLimits(uint32_t limit) { + read_buffer_limit_ = limit; + + // Due to the fact that writes to the connection and flushing data from the connection are done + // asynchronously, we have the option of either setting the watermarks aggressively, and regularly + // enabling/disabling reads from the socket, or allowing more data, but then not triggering + // based on watermarks until 2x the data is buffered in the common case. 
Given these are all soft + // limits we err on the side of buffering more and triggering watermark callbacks less often. + // + // Given the current implementation for straight up TCP proxying, the common case is reading + // |limit| bytes through the socket, passing |limit| bytes to the connection (triggering the high + // watermarks) and then immediately draining |limit| bytes to the socket (triggering the low + // watermarks). We avoid this by setting the high watermark to limit + 1 so a single read will + // not trigger watermarks if the socket is not blocked. + // + // If the connection class is changed to write to the buffer and flush to the socket in the same + // stack then instead of checking watermarks after the write and again after the flush it can + // check once after both operations complete. At that point it would be better to change the high + // watermark from |limit + 1| to |limit| as the common case (move |limit| bytes, flush |limit| + // bytes) would not trigger watermarks but a blocked socket (move |limit| bytes, flush 0 bytes) + // would result in respecting the exact buffer limit. 
+ if (limit > 0) { + write_buffer_.setWatermarks(limit / 2, limit + 1); + } +} + +void ConnectionImpl::onLowWatermark() { + ENVOY_CONN_LOG(debug, "onBelowWriteBufferLowWatermark", *this); + for (ConnectionCallbacks* callback : callbacks_) { + callback->onBelowWriteBufferLowWatermark(); + } +} + +void ConnectionImpl::onHighWatermark() { + ENVOY_CONN_LOG(debug, "onAboveWriteBufferHighWatermark", *this); + for (ConnectionCallbacks* callback : callbacks_) { + callback->onAboveWriteBufferHighWatermark(); + } +} + void ConnectionImpl::onFileEvent(uint32_t events) { ENVOY_CONN_LOG(trace, "socket event: {}", *this, events); @@ -350,11 +403,11 @@ ConnectionImpl::IoResult ConnectionImpl::doWriteToSocket() { PostIoAction action; uint64_t bytes_written = 0; do { - if (write_buffer_->length() == 0) { + if (write_buffer_.length() == 0) { action = PostIoAction::KeepOpen; break; } - int rc = write_buffer_->write(fd_); + int rc = write_buffer_.write(fd_); ENVOY_CONN_LOG(trace, "write returns: {}", *this, rc); if (rc == -1) { ENVOY_CONN_LOG(trace, "write error: {}", *this, errno); @@ -402,7 +455,7 @@ void ConnectionImpl::onWriteReady() { } IoResult result = doWriteToSocket(); - uint64_t new_buffer_size = write_buffer_->length(); + uint64_t new_buffer_size = write_buffer_.length(); updateWriteBufferStats(result.bytes_processed_, new_buffer_size); if (result.action_ == PostIoAction::Close) { diff --git a/source/common/network/connection_impl.h b/source/common/network/connection_impl.h index 64688fb913bee..225e47fdcb24b 100644 --- a/source/common/network/connection_impl.h +++ b/source/common/network/connection_impl.h @@ -8,7 +8,7 @@ #include "envoy/network/connection.h" -#include "common/buffer/buffer_impl.h" +#include "common/buffer/watermark_buffer.h" #include "common/common/logger.h" #include "common/event/dispatcher_impl.h" #include "common/event/libevent.h" @@ -70,8 +70,8 @@ class ConnectionImpl : public virtual Connection, Ssl::Connection* ssl() override { return nullptr; } State 
state() override; void write(Buffer::Instance& data) override; - void setReadBufferLimit(uint32_t limit) override { read_buffer_limit_ = limit; } - uint32_t readBufferLimit() const override { return read_buffer_limit_; } + void setBufferLimits(uint32_t limit) override; + uint32_t bufferLimit() const override { return read_buffer_limit_; } // Network::BufferSource Buffer::Instance& getReadBuffer() override { return *read_buffer_; } @@ -99,11 +99,14 @@ class ConnectionImpl : public virtual Connection, // Reconsider how to make fairness happen. void setReadBufferReady() { file_event_->activate(Event::FileReadyType::Read); } + void onLowWatermark(); + void onHighWatermark(); + FilterManagerImpl filter_manager_; Address::InstanceConstSharedPtr remote_address_; Address::InstanceConstSharedPtr local_address_; Buffer::InstancePtr read_buffer_; - Buffer::InstancePtr write_buffer_; + Buffer::WatermarkBuffer write_buffer_; uint32_t read_buffer_limit_ = 0; private: @@ -138,6 +141,10 @@ class ConnectionImpl : public virtual Connection, uint64_t last_read_buffer_size_{}; uint64_t last_write_buffer_size_{}; std::unique_ptr buffer_stats_; + // Tracks the number of times reads have been disabled. If N different components call + // readDisable(true) this allows the connection to only resume reads when readDisable(false) + // has been called N times. 
+ uint32_t read_disable_count_{0}; }; /** diff --git a/source/common/network/listener_impl.cc b/source/common/network/listener_impl.cc index b17b4d4e19081..ed90c23d6749c 100644 --- a/source/common/network/listener_impl.cc +++ b/source/common/network/listener_impl.cc @@ -111,7 +111,7 @@ void ListenerImpl::errorCallback(evconnlistener*, void*) { void ListenerImpl::newConnection(int fd, Address::InstanceConstSharedPtr remote_address, Address::InstanceConstSharedPtr local_address) { ConnectionPtr new_connection(new ConnectionImpl(dispatcher_, fd, remote_address, local_address)); - new_connection->setReadBufferLimit(options_.per_connection_buffer_limit_bytes_); + new_connection->setBufferLimits(options_.per_connection_buffer_limit_bytes_); cb_.onNewConnection(std::move(new_connection)); } @@ -120,7 +120,7 @@ void SslListenerImpl::newConnection(int fd, Address::InstanceConstSharedPtr remo ConnectionPtr new_connection(new Ssl::ConnectionImpl(dispatcher_, fd, remote_address, local_address, ssl_ctx_, Ssl::ConnectionImpl::InitialState::Server)); - new_connection->setReadBufferLimit(options_.per_connection_buffer_limit_bytes_); + new_connection->setBufferLimits(options_.per_connection_buffer_limit_bytes_); cb_.onNewConnection(std::move(new_connection)); } diff --git a/source/common/redis/conn_pool_impl.h b/source/common/redis/conn_pool_impl.h index 5937ebd00e874..af2022a6c7c83 100644 --- a/source/common/redis/conn_pool_impl.h +++ b/source/common/redis/conn_pool_impl.h @@ -83,6 +83,8 @@ class ClientImpl : public Client, public DecoderCallbacks, public Network::Conne // Network::ConnectionCallbacks void onEvent(uint32_t events) override; + void onAboveWriteBufferHighWatermark() override {} + void onBelowWriteBufferLowWatermark() override {} Upstream::HostConstSharedPtr host_; Network::ClientConnectionPtr connection_; @@ -125,6 +127,8 @@ class InstanceImpl : public Instance { // Network::ConnectionCallbacks void onEvent(uint32_t events) override; + void 
onAboveWriteBufferHighWatermark() override {} + void onBelowWriteBufferLowWatermark() override {} ThreadLocalPool& parent_; Upstream::HostConstSharedPtr host_; diff --git a/source/common/redis/proxy_filter.h b/source/common/redis/proxy_filter.h index c8437891d5d7e..210a40f5e5e0b 100644 --- a/source/common/redis/proxy_filter.h +++ b/source/common/redis/proxy_filter.h @@ -80,6 +80,8 @@ class ProxyFilter : public Network::ReadFilter, // Network::ConnectionCallbacks void onEvent(uint32_t events) override; + void onAboveWriteBufferHighWatermark() override {} + void onBelowWriteBufferLowWatermark() override {} // Redis::DecoderCallbacks void onRespValue(RespValuePtr&& value) override; diff --git a/source/common/ssl/connection_impl.cc b/source/common/ssl/connection_impl.cc index 220550561d732..f263688b7d278 100644 --- a/source/common/ssl/connection_impl.cc +++ b/source/common/ssl/connection_impl.cc @@ -164,7 +164,7 @@ Network::ConnectionImpl::IoResult ConnectionImpl::doWriteToSocket() { } } - uint64_t original_buffer_length = write_buffer_->length(); + uint64_t original_buffer_length = write_buffer_.length(); uint64_t total_bytes_written = 0; bool keep_writing = true; while ((original_buffer_length != total_bytes_written) && keep_writing) { @@ -175,7 +175,7 @@ Network::ConnectionImpl::IoResult ConnectionImpl::doWriteToSocket() { // of iterations of this loop, either by pure iterations, bytes written, etc. const uint64_t MAX_SLICES = 32; Buffer::RawSlice slices[MAX_SLICES]; - uint64_t num_slices = write_buffer_->getRawSlices(slices, MAX_SLICES); + uint64_t num_slices = write_buffer_.getRawSlices(slices, MAX_SLICES); uint64_t inner_bytes_written = 0; for (uint64_t i = 0; (i < num_slices) && (original_buffer_length != total_bytes_written); i++) { @@ -210,7 +210,7 @@ Network::ConnectionImpl::IoResult ConnectionImpl::doWriteToSocket() { // Draining must be done within the inner loop, otherwise we will keep getting the same slices // at the beginning of the buffer. 
if (inner_bytes_written > 0) { - write_buffer_->drain(inner_bytes_written); + write_buffer_.drain(inner_bytes_written); } } diff --git a/source/common/stats/statsd.h b/source/common/stats/statsd.h index 9a35fe4388638..ddafd502f8f8f 100644 --- a/source/common/stats/statsd.h +++ b/source/common/stats/statsd.h @@ -102,6 +102,8 @@ class TcpStatsdSink : public Sink { // Network::ConnectionCallbacks void onEvent(uint32_t events) override; + void onAboveWriteBufferHighWatermark() override {} + void onBelowWriteBufferLowWatermark() override {} TcpStatsdSink& parent_; Event::Dispatcher& dispatcher_; diff --git a/source/common/upstream/health_checker_impl.h b/source/common/upstream/health_checker_impl.h index ab700c6cc411e..5a8cc0f1f50fc 100644 --- a/source/common/upstream/health_checker_impl.h +++ b/source/common/upstream/health_checker_impl.h @@ -176,6 +176,8 @@ class HttpHealthCheckerImpl : public HealthCheckerImplBase { // Network::ConnectionCallbacks void onEvent(uint32_t events) override; + void onAboveWriteBufferHighWatermark() override {} + void onBelowWriteBufferLowWatermark() override {} HttpHealthCheckerImpl& parent_; Http::CodecClientPtr client_; @@ -277,6 +279,8 @@ class TcpHealthCheckerImpl : public HealthCheckerImplBase { // Network::ConnectionCallbacks void onEvent(uint32_t events) override { parent_.onEvent(events); } + void onAboveWriteBufferHighWatermark() override {} + void onBelowWriteBufferLowWatermark() override {} // Network::ReadFilter Network::FilterStatus onData(Buffer::Instance& data) override { @@ -354,6 +358,8 @@ class RedisHealthCheckerImpl : public HealthCheckerImplBase { // Network::ConnectionCallbacks void onEvent(uint32_t events) override; + void onAboveWriteBufferHighWatermark() override {} + void onBelowWriteBufferLowWatermark() override {} RedisHealthCheckerImpl& parent_; Redis::ConnPool::ClientPtr client_; diff --git a/source/common/upstream/upstream_impl.cc b/source/common/upstream/upstream_impl.cc index 065147c022969..5cfd0f236ae15 
100644 --- a/source/common/upstream/upstream_impl.cc +++ b/source/common/upstream/upstream_impl.cc @@ -44,7 +44,7 @@ HostImpl::createConnection(Event::Dispatcher& dispatcher, const ClusterInfo& clu Network::ClientConnectionPtr connection = cluster.sslContext() ? dispatcher.createSslClientConnection(*cluster.sslContext(), address) : dispatcher.createClientConnection(address); - connection->setReadBufferLimit(cluster.perConnectionBufferLimitBytes()); + connection->setBufferLimits(cluster.perConnectionBufferLimitBytes()); return connection; } diff --git a/source/server/connection_handler_impl.h b/source/server/connection_handler_impl.h index 09cd27d5b2568..076184187eb9f 100644 --- a/source/server/connection_handler_impl.h +++ b/source/server/connection_handler_impl.h @@ -121,6 +121,8 @@ class ConnectionHandlerImpl : public Network::ConnectionHandler, NonCopyable { listener_.removeConnection(*this); } } + void onAboveWriteBufferHighWatermark() override {} + void onBelowWriteBufferLowWatermark() override {} ActiveListener& listener_; Network::ConnectionPtr connection_; diff --git a/test/common/buffer/BUILD b/test/common/buffer/BUILD index aaedc4f077f72..1c5b68a363452 100644 --- a/test/common/buffer/BUILD +++ b/test/common/buffer/BUILD @@ -8,6 +8,15 @@ load( envoy_package() +envoy_cc_test( + name = "watermark_buffer_test", + srcs = ["watermark_buffer_test.cc"], + deps = [ + "//source/common/buffer:buffer_lib", + "//source/common/buffer:watermark_buffer_lib", + ], +) + envoy_cc_test( name = "zero_copy_input_stream_test", srcs = ["zero_copy_input_stream_test.cc"], diff --git a/test/common/buffer/watermark_buffer_test.cc b/test/common/buffer/watermark_buffer_test.cc new file mode 100644 index 0000000000000..937c077d849af --- /dev/null +++ b/test/common/buffer/watermark_buffer_test.cc @@ -0,0 +1,185 @@ +#include "common/buffer/buffer_impl.h" +#include "common/buffer/watermark_buffer.h" + +#include "gtest/gtest.h" + +namespace Envoy { +namespace Buffer { +namespace { + +const 
char TEN_BYTES[] = "0123456789"; + +class WatermarkBufferTest : public testing::Test { +public: + WatermarkBufferTest() { buffer_.setWatermarks(5, 10); } + + Buffer::WatermarkBuffer buffer_{InstancePtr{new OwnedImpl()}, + [&]() -> void { ++times_low_watermark_called_; }, + [&]() -> void { ++times_high_watermark_called_; }}; + uint32_t times_low_watermark_called_{0}; + uint32_t times_high_watermark_called_{0}; +}; + +TEST_F(WatermarkBufferTest, AddChar) { + buffer_.add(TEN_BYTES, 10); + EXPECT_EQ(0, times_high_watermark_called_); + buffer_.add("a", 1); + EXPECT_EQ(1, times_high_watermark_called_); + EXPECT_EQ(11, buffer_.length()); +} + +TEST_F(WatermarkBufferTest, AddString) { + buffer_.add(std::string(TEN_BYTES)); + EXPECT_EQ(0, times_high_watermark_called_); + buffer_.add(std::string("a")); + EXPECT_EQ(1, times_high_watermark_called_); + EXPECT_EQ(11, buffer_.length()); +} + +TEST_F(WatermarkBufferTest, AddBuffer) { + OwnedImpl first(TEN_BYTES); + buffer_.add(first); + EXPECT_EQ(0, times_high_watermark_called_); + OwnedImpl second("a"); + buffer_.add(second); + EXPECT_EQ(1, times_high_watermark_called_); + EXPECT_EQ(11, buffer_.length()); +} + +TEST_F(WatermarkBufferTest, Commit) { + buffer_.add(TEN_BYTES, 10); + EXPECT_EQ(0, times_high_watermark_called_); + RawSlice out; + buffer_.reserve(10, &out, 1); + memcpy(out.mem_, &TEN_BYTES[0], 10); + out.len_ = 10; + buffer_.commit(&out, 1); + EXPECT_EQ(1, times_high_watermark_called_); + EXPECT_EQ(20, buffer_.length()); +} + +TEST_F(WatermarkBufferTest, Drain) { + // Draining from above to below the low watermark does nothing if the high + // watermark never got hit. + buffer_.add(TEN_BYTES, 10); + buffer_.drain(10); + EXPECT_EQ(0, times_high_watermark_called_); + EXPECT_EQ(0, times_low_watermark_called_); + + // Go above the high watermark then drain down to just at the low watermark. 
+ buffer_.add(TEN_BYTES, 11); + buffer_.drain(6); + EXPECT_EQ(5, buffer_.length()); + EXPECT_EQ(0, times_low_watermark_called_); + + // Now drain below. + buffer_.drain(1); + EXPECT_EQ(1, times_low_watermark_called_); + + // Going back above should trigger the high again + buffer_.add(TEN_BYTES, 10); + EXPECT_EQ(2, times_high_watermark_called_); +} + +TEST_F(WatermarkBufferTest, MoveFullBuffer) { + buffer_.add(TEN_BYTES, 10); + OwnedImpl data("a"); + + EXPECT_EQ(0, times_high_watermark_called_); + buffer_.move(data); + EXPECT_EQ(1, times_high_watermark_called_); + EXPECT_EQ(11, buffer_.length()); +} + +TEST_F(WatermarkBufferTest, MoveOneByte) { + buffer_.add(TEN_BYTES, 9); + OwnedImpl data("ab"); + + buffer_.move(data, 1); + EXPECT_EQ(0, times_high_watermark_called_); + EXPECT_EQ(10, buffer_.length()); + + buffer_.move(data, 1); + EXPECT_EQ(1, times_high_watermark_called_); + EXPECT_EQ(11, buffer_.length()); +} + +TEST_F(WatermarkBufferTest, WatermarkFdFunctions) { + int pipe_fds[2] = {0, 0}; + ASSERT_EQ(0, pipe(pipe_fds)); + + buffer_.add(TEN_BYTES, 10); + buffer_.add(TEN_BYTES, 10); + EXPECT_EQ(1, times_high_watermark_called_); + EXPECT_EQ(0, times_low_watermark_called_); + + int bytes_written_total = 0; + while (bytes_written_total < 20) { + int bytes_written = buffer_.write(pipe_fds[1]); + if (bytes_written < 0) { + ASSERT_EQ(EAGAIN, errno); + } else { + bytes_written_total += bytes_written; + } + } + EXPECT_EQ(1, times_high_watermark_called_); + EXPECT_EQ(1, times_low_watermark_called_); + EXPECT_EQ(0, buffer_.length()); + + int bytes_read_total = 0; + while (bytes_read_total < 20) { + bytes_read_total += buffer_.read(pipe_fds[0], 20); + } + EXPECT_EQ(2, times_high_watermark_called_); + EXPECT_EQ(20, buffer_.length()); +} + +TEST_F(WatermarkBufferTest, MoveWatermarks) { + buffer_.add(TEN_BYTES, 9); + EXPECT_EQ(0, times_high_watermark_called_); + buffer_.setWatermarks(1, 9); + EXPECT_EQ(0, times_high_watermark_called_); + buffer_.setWatermarks(1, 8); + 
EXPECT_EQ(1, times_high_watermark_called_); + + buffer_.setWatermarks(9, 20); + EXPECT_EQ(0, times_low_watermark_called_); + buffer_.setWatermarks(10, 20); + EXPECT_EQ(1, times_low_watermark_called_); + buffer_.setWatermarks(8, 20); + buffer_.setWatermarks(10, 20); + EXPECT_EQ(1, times_low_watermark_called_); +} + +TEST_F(WatermarkBufferTest, GetRawSlices) { + buffer_.add(TEN_BYTES, 10); + + RawSlice slices[2]; + ASSERT_EQ(1, buffer_.getRawSlices(&slices[0], 2)); + EXPECT_EQ(10, slices[0].len_); + EXPECT_EQ(0, memcmp(slices[0].mem_, &TEN_BYTES[0], 10)); + + void* data_pointer = buffer_.linearize(10); + EXPECT_EQ(data_pointer, slices[0].mem_); +} + +TEST_F(WatermarkBufferTest, Search) { + buffer_.add(TEN_BYTES, 10); + + EXPECT_EQ(1, buffer_.search(&TEN_BYTES[1], 2, 0)); + + EXPECT_EQ(-1, buffer_.search(&TEN_BYTES[1], 2, 5)); +} + +TEST_F(WatermarkBufferTest, MoveBack) { + buffer_.add(TEN_BYTES, 10); + OwnedImpl data("a"); + + EXPECT_EQ(0, times_high_watermark_called_); + buffer_.move(data); + data.move(buffer_); +} + +} // namespace +} // namespace Buffer +} // namespace Envoy diff --git a/test/common/http/http1/conn_pool_test.cc b/test/common/http/http1/conn_pool_test.cc index fbabd622c4be8..c212bfde443cd 100644 --- a/test/common/http/http1/conn_pool_test.cc +++ b/test/common/http/http1/conn_pool_test.cc @@ -208,7 +208,7 @@ TEST_F(Http1ConnPoolImplTest, VerifyBufferLimits) { ConnPoolCallbacks callbacks; conn_pool_.expectClientCreate(); EXPECT_CALL(*cluster_, perConnectionBufferLimitBytes()).WillOnce(Return(8192)); - EXPECT_CALL(*conn_pool_.test_clients_.back().connection_, setReadBufferLimit(8192)); + EXPECT_CALL(*conn_pool_.test_clients_.back().connection_, setBufferLimits(8192)); Http::ConnectionPool::Cancellable* handle = conn_pool_.newStream(outer_decoder, callbacks); EXPECT_NE(nullptr, handle); diff --git a/test/common/http/http2/conn_pool_test.cc b/test/common/http/http2/conn_pool_test.cc index ea4e8ab40952e..dc84fbea70cdf 100644 --- 
a/test/common/http/http2/conn_pool_test.cc +++ b/test/common/http/http2/conn_pool_test.cc @@ -140,7 +140,7 @@ TEST_F(Http2ConnPoolImplTest, VerifyConnectionTimingStats) { TEST_F(Http2ConnPoolImplTest, VerifyBufferLimits) { expectClientCreate(); EXPECT_CALL(*cluster_, perConnectionBufferLimitBytes()).WillOnce(Return(8192)); - EXPECT_CALL(*test_clients_.back().connection_, setReadBufferLimit(8192)); + EXPECT_CALL(*test_clients_.back().connection_, setBufferLimits(8192)); ActiveTestRequest r1(*this, 0); EXPECT_CALL(r1.inner_encoder_, encodeHeaders(_, true)); diff --git a/test/common/network/connection_impl_test.cc b/test/common/network/connection_impl_test.cc index 9e826adbcaa86..2a49fa62cc65b 100644 --- a/test/common/network/connection_impl_test.cc +++ b/test/common/network/connection_impl_test.cc @@ -19,6 +19,7 @@ #include "test/test_common/environment.h" #include "test/test_common/network_utility.h" #include "test/test_common/printers.h" +#include "test/test_common/utility.h" #include "gmock/gmock.h" #include "gtest/gtest.h" @@ -111,14 +112,13 @@ class ConnectionImplTest : public testing::TestWithParam { MockBufferFactory* factory = new MockBufferFactory; dispatcher_.reset(new Event::DispatcherImpl(Buffer::FactoryPtr{factory})); // The first call to create a client session will get a MockBuffer. - // Expect one other call to create a server session which will by default - // get a normal OwnedImpl. + // Other calls for server sessions will by default get a normal OwnedImpl. ON_CALL(*factory, create_()).WillByDefault(Invoke([&]() -> Buffer::Instance* { return new Buffer::OwnedImpl; })); - // By default, expect 4 buffers to be created - the client and server read and write buffers. + // Create a mock client write buffer for the first client connection created. EXPECT_CALL(*factory, create_()) - .Times(4) + .Times(AnyNumber()) .WillOnce(Invoke([&]() -> Buffer::Instance* { return new MockBuffer; // client read buffer. 
})) @@ -238,6 +238,73 @@ TEST_P(ConnectionImplTest, BufferStats) { dispatcher_->run(Event::Dispatcher::RunType::Block); } +// Ensure the new counter logic in ReadDisable avoids tripping asserts in ReadDisable guarding +// against actual enabling twice in a row. +TEST_P(ConnectionImplTest, ReadDisable) { + setUpBasicConnection(); + + client_connection_->readDisable(true); + client_connection_->readDisable(false); + + client_connection_->readDisable(true); + client_connection_->readDisable(true); + client_connection_->readDisable(false); + client_connection_->readDisable(false); + + client_connection_->readDisable(true); + client_connection_->readDisable(true); + client_connection_->readDisable(false); + client_connection_->readDisable(true); + client_connection_->readDisable(false); + client_connection_->readDisable(false); + + disconnect(); +} + +// Test that as watermark levels are changed, the appropriate callbacks are triggered. +TEST_P(ConnectionImplTest, Watermarks) { + useMockBuffer(); + + setUpBasicConnection(); + + // Stick 5 bytes in the connection buffer. + std::unique_ptr buffer(new Buffer::OwnedImpl("hello")); + int buffer_len = buffer->length(); + EXPECT_CALL(*client_write_buffer_, write(_)) + .WillOnce(Invoke(client_write_buffer_, &MockBuffer::failWrite)); + client_write_buffer_->move(*buffer); + + { + // Go from watermarks being off to being above the high watermark. + EXPECT_CALL(client_callbacks_, onAboveWriteBufferHighWatermark()); + EXPECT_CALL(client_callbacks_, onBelowWriteBufferLowWatermark()).Times(0); + client_connection_->setBufferLimits(buffer_len - 3); + } + + { + // Go from above the high watermark to in between both. + EXPECT_CALL(client_callbacks_, onAboveWriteBufferHighWatermark()).Times(0); + EXPECT_CALL(client_callbacks_, onBelowWriteBufferLowWatermark()).Times(0); + client_connection_->setBufferLimits(buffer_len + 1); + } + + { + // Go from above the high watermark to below the low watermark. 
+ EXPECT_CALL(client_callbacks_, onAboveWriteBufferHighWatermark()).Times(0); + EXPECT_CALL(client_callbacks_, onBelowWriteBufferLowWatermark()); + client_connection_->setBufferLimits(buffer_len * 3); + } + + { + // Go back in between and verify neither callback is called. + EXPECT_CALL(client_callbacks_, onAboveWriteBufferHighWatermark()).Times(0); + EXPECT_CALL(client_callbacks_, onBelowWriteBufferLowWatermark()).Times(0); + client_connection_->setBufferLimits(buffer_len * 2); + } + + disconnect(); +} + // Write some data to the connection. It will automatically attempt to flush // it to the upstream file descriptor via a write() call to buffer_, which is // configured to succeed and accept all bytes read. @@ -264,6 +331,133 @@ TEST_P(ConnectionImplTest, BasicWrite) { disconnect(); } +// Similar to BasicWrite, only with watermarks set. +TEST_P(ConnectionImplTest, WriteWithWatermarks) { + useMockBuffer(); + + setUpBasicConnection(); + + connect(); + + client_connection_->setBufferLimits(2); + + std::string data_to_write = "hello world"; + Buffer::OwnedImpl first_buffer_to_write(data_to_write); + std::string data_written; + EXPECT_CALL(*client_write_buffer_, move(_)) + .WillRepeatedly(DoAll(AddBufferToStringWithoutDraining(&data_written), + Invoke(client_write_buffer_, &MockBuffer::baseMove))); + EXPECT_CALL(*client_write_buffer_, write(_)) + .WillOnce(Invoke(client_write_buffer_, &MockBuffer::trackWrites)); + // The write() call on the connection will buffer enough data to bring the connection above the + // high watermark but the subsequent drain immediately brings it back below. + // A nice future performance optimization would be to latch if the socket is writable in the + // connection_impl, and try an immediate drain inside of write() to avoid thrashing here. 
+ EXPECT_CALL(client_callbacks_, onAboveWriteBufferHighWatermark()); + EXPECT_CALL(client_callbacks_, onBelowWriteBufferLowWatermark()); + client_connection_->write(first_buffer_to_write); + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); + EXPECT_EQ(data_to_write, data_written); + + // Now do the write again, but this time configure buffer_ to reject the write + // with errno set to EAGAIN via failWrite(). This should result in going above the high + // watermark and not returning. + Buffer::OwnedImpl second_buffer_to_write(data_to_write); + EXPECT_CALL(*client_write_buffer_, move(_)) + .WillRepeatedly(DoAll(AddBufferToStringWithoutDraining(&data_written), + Invoke(client_write_buffer_, &MockBuffer::baseMove))); + EXPECT_CALL(*client_write_buffer_, write(_)) + .WillOnce(Invoke(client_write_buffer_, &MockBuffer::failWrite)); + // The write() call on the connection will buffer enough data to bring the connection above the + // high watermark and as the data will not flush it should not return below the watermark. + EXPECT_CALL(client_callbacks_, onAboveWriteBufferHighWatermark()); + EXPECT_CALL(client_callbacks_, onBelowWriteBufferLowWatermark()).Times(0); + client_connection_->write(second_buffer_to_write); + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); + + // Clean up the connection. The close() will attempt to flush. The call to + // write() will succeed, bringing the connection back under the low watermark. + EXPECT_CALL(client_callbacks_, onEvent(ConnectionEvent::LocalClose)); + EXPECT_CALL(*client_write_buffer_, write(_)) + .WillOnce(Invoke(client_write_buffer_, &MockBuffer::trackWrites)); + EXPECT_CALL(client_callbacks_, onBelowWriteBufferLowWatermark()).Times(1); + client_connection_->close(ConnectionCloseType::NoFlush); + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); +} + +// Read and write random bytes and ensure we don't encounter issues. 
+TEST_P(ConnectionImplTest, WatermarkFuzzing) { + useMockBuffer(); + setUpBasicConnection(); + + connect(); + client_connection_->setBufferLimits(10); + + TestRandomGenerator rand; + int bytes_buffered = 0; + int new_bytes_buffered = 0; + + bool is_below = true; + bool is_above = false; + + ON_CALL(*client_write_buffer_, write(_)) + .WillByDefault(testing::Invoke(client_write_buffer_, &MockBuffer::failWrite)); + ON_CALL(*client_write_buffer_, drain(_)) + .WillByDefault(testing::Invoke(client_write_buffer_, &MockBuffer::baseDrain)); + EXPECT_CALL(*client_write_buffer_, drain(_)).Times(AnyNumber()); + + // Randomly write 1-20 bytes and flush up to 30 bytes per loop. + for (int i = 0; i < 50; ++i) { + // The bytes to write this loop. + int bytes_to_write = rand.random() % 20 + 1; + // The bytes buffered at the beginning of this loop. + bytes_buffered = new_bytes_buffered; + // Bytes to flush upstream. + int bytes_to_flush = std::min(rand.random() % 30 + 1, bytes_to_write + bytes_buffered); + // The number of bytes buffered at the end of this loop. + new_bytes_buffered = bytes_buffered + bytes_to_write - bytes_to_flush; + ENVOY_LOG_MISC(trace, + "Loop iteration {} bytes_to_write {} bytes_to_flush {} bytes_buffered is {} and " + "will be be {}", + i, bytes_to_write, bytes_to_flush, bytes_buffered, new_bytes_buffered); + + std::string data(bytes_to_write, 'a'); + Buffer::OwnedImpl buffer_to_write(data); + + // If the current bytes buffered plus the bytes we write this loop go over + // the watermark and we're not currently above, we will get a callback for + // going above. 
+ if (bytes_to_write + bytes_buffered > 11 && is_below) { + ENVOY_LOG_MISC(trace, "Expect onAboveWriteBufferHighWatermark"); + EXPECT_CALL(client_callbacks_, onAboveWriteBufferHighWatermark()); + is_below = false; + is_above = true; + } + // If after the bytes are flushed upstream the number of bytes remaining is + // below the low watermark and the bytes were not previously below the low + // watermark, expect the callback for going below. + if (new_bytes_buffered < 5 && is_above) { + ENVOY_LOG_MISC(trace, "Expect onBelowWriteBufferLowWatermark"); + EXPECT_CALL(client_callbacks_, onBelowWriteBufferLowWatermark()); + is_below = true; + is_above = false; + } + + // Do the actual work. Write |buffer_to_write| bytes to the connection and + // drain |bytes_to_flush| before having the buffer failWrite() + EXPECT_CALL(*client_write_buffer_, move(_)) + .WillOnce(Invoke(client_write_buffer_, &MockBuffer::baseMove)); + EXPECT_CALL(*client_write_buffer_, write(_)) + .WillOnce(DoAll(Invoke([&](int) -> void { client_write_buffer_->drain(bytes_to_flush); }), + Return(bytes_to_flush))) + .WillRepeatedly(testing::Invoke(client_write_buffer_, &MockBuffer::failWrite)); + client_connection_->write(buffer_to_write); + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); + } + + disconnect(); +} + class ReadBufferLimitTest : public ConnectionImplTest { public: void readBufferLimitTest(uint32_t read_buffer_limit, uint32_t expected_chunk_size) { @@ -285,7 +479,7 @@ class ReadBufferLimitTest : public ConnectionImplTest { server_connection_ = std::move(conn); server_connection_->addReadFilter(read_filter_); EXPECT_EQ("", server_connection_->nextProtocol()); - EXPECT_EQ(read_buffer_limit, server_connection_->readBufferLimit()); + EXPECT_EQ(read_buffer_limit, server_connection_->bufferLimit()); })); uint32_t filter_seen = 0; diff --git a/test/common/ssl/connection_impl_test.cc b/test/common/ssl/connection_impl_test.cc index 8ebd823320aaf..e8b3af97ba553 100644 --- 
a/test/common/ssl/connection_impl_test.cc +++ b/test/common/ssl/connection_impl_test.cc @@ -465,7 +465,7 @@ class SslReadBufferLimitTest : public SslCertsTest, server_connection_ = std::move(conn); server_connection_->addReadFilter(read_filter_); EXPECT_EQ("", server_connection_->nextProtocol()); - EXPECT_EQ(read_buffer_limit, server_connection_->readBufferLimit()); + EXPECT_EQ(read_buffer_limit, server_connection_->bufferLimit()); })); uint32_t filter_seen = 0; @@ -534,7 +534,7 @@ class SslReadBufferLimitTest : public SslCertsTest, server_connection_ = std::move(conn); server_connection_->addReadFilter(read_filter_); EXPECT_EQ("", server_connection_->nextProtocol()); - EXPECT_EQ(read_buffer_limit, server_connection_->readBufferLimit()); + EXPECT_EQ(read_buffer_limit, server_connection_->bufferLimit()); })); EXPECT_CALL(*read_filter_, onNewConnection()); diff --git a/test/common/upstream/cluster_manager_impl_test.cc b/test/common/upstream/cluster_manager_impl_test.cc index f7a1b39a010b5..3b5f885f8ee55 100644 --- a/test/common/upstream/cluster_manager_impl_test.cc +++ b/test/common/upstream/cluster_manager_impl_test.cc @@ -402,7 +402,7 @@ TEST_F(ClusterManagerImplTest, VerifyBufferLimits) { Json::ObjectSharedPtr loader = Json::Factory::loadFromString(json); create(*loader); Network::MockClientConnection* connection = new NiceMock(); - EXPECT_CALL(*connection, setReadBufferLimit(8192)); + EXPECT_CALL(*connection, setBufferLimits(8192)); EXPECT_CALL(factory_.tls_.dispatcher_, createClientConnection_(_)).WillOnce(Return(connection)); auto conn_data = cluster_manager_->tcpConnForCluster("cluster_1"); EXPECT_EQ(connection, conn_data.connection_.get()); diff --git a/test/config/integration/tcp_proxy.json b/test/config/integration/tcp_proxy.json index d09b8c925af1b..555a4a191873c 100644 --- a/test/config/integration/tcp_proxy.json +++ b/test/config/integration/tcp_proxy.json @@ -3,8 +3,7 @@ { "address": "tcp://{{ ip_loopback_address }}:0", "filters": [ - { "type": "read", 
"name": - "tcp_proxy", + { "type": "read", "name": "tcp_proxy", "config": { "stat_prefix": "test_tcp", "route_config": { @@ -20,6 +19,25 @@ }, { "address": "tcp://{{ ip_loopback_address }}:0", + "per_connection_buffer_limit_bytes": 1024, + "filters": [ + { "type": "read", "name": "tcp_proxy", + "config": { + "stat_prefix": "tcp_with_write_limits", + "route_config": { + "routes": [ + { + "cluster": "cluster_with_buffer_limits" + } + ] + } + } + } + ] + }, + { + "address": "tcp://{{ ip_loopback_address }}:0", + "per_connection_buffer_limit_bytes": 1024, "ssl_context": { "ca_cert_file": "{{ test_rundir }}/test/config/integration/certs/cacert.pem", "cert_chain_file": "{{ test_rundir }}/test/config/integration/certs/servercert.pem", @@ -34,7 +52,7 @@ "route_config": { "routes": [ { - "cluster": "cluster_1" + "cluster": "cluster_with_buffer_limits" } ] } @@ -69,6 +87,14 @@ "lb_type": "round_robin", "dns_lookup_family": "{{ dns_lookup_family }}", "hosts": [{"url": "tcp://localhost:4"}] + }, + { + "name": "cluster_with_buffer_limits", + "connect_timeout_ms": 5000, + "type": "static", + "lb_type": "round_robin", + "per_connection_buffer_limit_bytes": 1024, + "hosts": [{"url": "tcp://{{ ip_loopback_address }}:{{ cluster_with_buffer_limits }}"}] }] } } diff --git a/test/integration/BUILD b/test/integration/BUILD index 3f7b7e0f8d05c..e33a7962e8b49 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -162,6 +162,7 @@ envoy_cc_test_library( "//source/server/config/network:redis_proxy_lib", "//source/server/config/network:tcp_proxy_lib", "//source/server/http:health_check_lib", + "//test/mocks/buffer:buffer_mocks", "//test/mocks/upstream:upstream_mocks", "//test/test_common:environment_lib", "//test/test_common:network_utility_lib", diff --git a/test/integration/fake_upstream.h b/test/integration/fake_upstream.h index dd170a9bc0246..0d1b22f1074a2 100644 --- a/test/integration/fake_upstream.h +++ b/test/integration/fake_upstream.h @@ -89,10 +89,12 @@ class 
QueuedConnectionWrapper : public Network::ConnectionCallbacks { Network::Connection& connection() const { return connection_; } // Network::ConnectionCallbacks - void onEvent(uint32_t events) { + void onEvent(uint32_t events) override { RELEASE_ASSERT(parented_ || (!(events & Network::ConnectionEvent::RemoteClose) && !(events & Network::ConnectionEvent::LocalClose))); } + void onAboveWriteBufferHighWatermark() override {} + void onBelowWriteBufferLowWatermark() override {} private: Network::Connection& connection_; @@ -115,6 +117,8 @@ class FakeConnectionBase : public Network::ConnectionCallbacks { // Network::ConnectionCallbacks void onEvent(uint32_t events) override; + void onAboveWriteBufferHighWatermark() override {} + void onBelowWriteBufferLowWatermark() override {} protected: FakeConnectionBase(QueuedConnectionWrapperPtr connection_wrapper) diff --git a/test/integration/integration.cc b/test/integration/integration.cc index be18c8dc2a2e8..786d7a0a873a1 100644 --- a/test/integration/integration.cc +++ b/test/integration/integration.cc @@ -15,6 +15,7 @@ #include "common/api/api_impl.h" #include "common/buffer/buffer_impl.h" #include "common/common/assert.h" +#include "common/network/connection_impl.h" #include "common/network/utility.h" #include "common/upstream/upstream_impl.h" @@ -28,6 +29,10 @@ #include "gtest/gtest.h" #include "spdlog/spdlog.h" +using testing::AnyNumber; +using testing::Invoke; +using testing::_; + namespace Envoy { IntegrationStreamDecoder::IntegrationStreamDecoder(Event::Dispatcher& dispatcher) : dispatcher_(dispatcher) {} @@ -176,12 +181,28 @@ void IntegrationCodecClient::ConnectionCallbacks::onEvent(uint32_t events) { } } -IntegrationTcpClient::IntegrationTcpClient(Event::Dispatcher& dispatcher, uint32_t port, +IntegrationTcpClient::IntegrationTcpClient(Event::Dispatcher& dispatcher, + MockBufferFactory& factory, uint32_t port, Network::Address::IpVersion version) : payload_reader_(new WaitForPayloadReader(dispatcher)), callbacks_(new 
ConnectionCallbacks(*this)) { + EXPECT_CALL(factory, create_()) + .Times(2) + .WillOnce(Invoke([&]() -> Buffer::Instance* { + return new Buffer::OwnedImpl; // client read buffer. + })) + .WillOnce(Invoke([&]() -> Buffer::Instance* { + client_write_buffer_ = new MockBuffer; + return client_write_buffer_; + })); + connection_ = dispatcher.createClientConnection(Network::Utility::resolveUrl( fmt::format("tcp://{}:{}", Network::Test::getLoopbackAddressUrlString(version), port))); + + ON_CALL(*client_write_buffer_, drain(_)) + .WillByDefault(testing::Invoke(client_write_buffer_, &MockBuffer::baseDrain)); + EXPECT_CALL(*client_write_buffer_, drain(_)).Times(AnyNumber()); + connection_->addConnectionCallbacks(*callbacks_); connection_->addReadFilter(payload_reader_); connection_->connect(); @@ -205,9 +226,15 @@ void IntegrationTcpClient::waitForDisconnect() { void IntegrationTcpClient::write(const std::string& data) { Buffer::OwnedImpl buffer(data); + EXPECT_CALL(*client_write_buffer_, move(_)).Times(1); + EXPECT_CALL(*client_write_buffer_, write(_)).Times(1); + + int bytes_expected = client_write_buffer_->bytes_written() + data.size(); + connection_->write(buffer); - connection_->dispatcher().run(Event::Dispatcher::RunType::NonBlock); - // NOTE: We should run blocking until all the body data is flushed. 
+ while (client_write_buffer_->bytes_written() != bytes_expected) { + connection_->dispatcher().run(Event::Dispatcher::RunType::NonBlock); + } } void IntegrationTcpClient::ConnectionCallbacks::onEvent(uint32_t events) { @@ -219,7 +246,8 @@ void IntegrationTcpClient::ConnectionCallbacks::onEvent(uint32_t events) { BaseIntegrationTest::BaseIntegrationTest(Network::Address::IpVersion version) : api_(new Api::Impl(std::chrono::milliseconds(10000))), - dispatcher_(api_->allocateDispatcher()), + mock_buffer_factory_(new MockBufferFactory), + dispatcher_(new Event::DispatcherImpl(Buffer::FactoryPtr{mock_buffer_factory_})), default_log_level_(TestEnvironment::getOptions().logLevel()), version_(version) { // This is a hack, but there are situations where we disconnect fake upstream connections and // then we expect the server connection pool to get the disconnect before the next test starts. @@ -228,6 +256,9 @@ BaseIntegrationTest::BaseIntegrationTest(Network::Address::IpVersion version) // complex test hooks to the server and/or spin waiting on stats, neither of which I think are // necessary right now. 
std::this_thread::sleep_for(std::chrono::milliseconds(10)); + ON_CALL(*mock_buffer_factory_, create_()).WillByDefault(Invoke([&]() -> Buffer::Instance* { + return new Buffer::OwnedImpl; + })); } BaseIntegrationTest::~BaseIntegrationTest() {} @@ -256,7 +287,8 @@ BaseIntegrationTest::makeHttpConnection(Network::ClientConnectionPtr&& conn, } IntegrationTcpClientPtr BaseIntegrationTest::makeTcpConnection(uint32_t port) { - return IntegrationTcpClientPtr{new IntegrationTcpClient(*dispatcher_, port, version_)}; + return IntegrationTcpClientPtr{ + new IntegrationTcpClient(*dispatcher_, *mock_buffer_factory_, port, version_)}; } void BaseIntegrationTest::registerPort(const std::string& key, uint32_t port) { diff --git a/test/integration/integration.h b/test/integration/integration.h index 3a0719063a6f4..fd4d7def9c8ef 100644 --- a/test/integration/integration.h +++ b/test/integration/integration.h @@ -14,6 +14,7 @@ #include "test/integration/fake_upstream.h" #include "test/integration/server.h" #include "test/integration/utility.h" +#include "test/mocks/buffer/mocks.h" #include "test/test_common/environment.h" #include "test/test_common/printers.h" @@ -87,6 +88,8 @@ class IntegrationCodecClient : public Http::CodecClientProd { // Network::ConnectionCallbacks void onEvent(uint32_t events) override; + void onAboveWriteBufferHighWatermark() override {} + void onBelowWriteBufferLowWatermark() override {} IntegrationCodecClient& parent_; }; @@ -116,7 +119,7 @@ typedef std::unique_ptr IntegrationCodecClientPtr; */ class IntegrationTcpClient { public: - IntegrationTcpClient(Event::Dispatcher& dispatcher, uint32_t port, + IntegrationTcpClient(Event::Dispatcher& dispatcher, MockBufferFactory& factory, uint32_t port, Network::Address::IpVersion version); void close(); @@ -131,6 +134,8 @@ class IntegrationTcpClient { // Network::ConnectionCallbacks void onEvent(uint32_t events) override; + void onAboveWriteBufferHighWatermark() override {} + void onBelowWriteBufferLowWatermark() 
override {} IntegrationTcpClient& parent_; }; @@ -139,6 +144,7 @@ class IntegrationTcpClient { std::shared_ptr callbacks_; Network::ClientConnectionPtr connection_; bool disconnected_{}; + MockBuffer* client_write_buffer_; }; typedef std::unique_ptr IntegrationTcpClientPtr; @@ -175,6 +181,7 @@ class BaseIntegrationTest : Logger::Loggable { void createTestServer(const std::string& json_path, const std::vector& port_names); Api::ApiPtr api_; + MockBufferFactory* mock_buffer_factory_; // Will point to the dispatcher's factory. Event::DispatcherPtr dispatcher_; protected: diff --git a/test/integration/tcp_proxy_integration_test.cc b/test/integration/tcp_proxy_integration_test.cc index c35a1c478e750..25f116efdb643 100644 --- a/test/integration/tcp_proxy_integration_test.cc +++ b/test/integration/tcp_proxy_integration_test.cc @@ -9,6 +9,10 @@ #include "gtest/gtest.h" +using testing::AnyNumber; +using testing::Invoke; +using testing::_; + namespace Envoy { namespace { @@ -55,18 +59,72 @@ TEST_P(TcpProxyIntegrationTest, TcpProxyDownstreamDisconnect) { }); } +TEST_P(TcpProxyIntegrationTest, TcpProxyLargeWrite) { + IntegrationTcpClientPtr tcp_client; + FakeRawConnectionPtr fake_upstream_connection; + FakeRawConnectionPtr fake_rest_connection; + std::string data(1024 * 16, 'a'); + executeActions({ + [&]() -> void { tcp_client = makeTcpConnection(lookupPort("tcp_proxy_with_write_limits")); }, + [&]() -> void { tcp_client->write(data); }, + [&]() -> void { fake_upstream_connection = fake_upstreams_[1]->waitForRawConnection(); }, + [&]() -> void { fake_upstream_connection->waitForData(data.size()); }, + [&]() -> void { fake_upstream_connection->write(data); }, + [&]() -> void { tcp_client->waitForData(data); }, + [&]() -> void { tcp_client->close(); }, + [&]() -> void { fake_upstream_connection->waitForDisconnect(); }, + }); + + uint32_t upstream_pauses = + test_server_->store() + .counter("cluster.cluster_with_buffer_limits.upstream_flow_control_paused_reading_total") + 
.value(); + uint32_t upstream_resumes = + test_server_->store() + .counter("cluster.cluster_with_buffer_limits.upstream_flow_control_resumed_reading_total") + .value(); + EXPECT_EQ(upstream_pauses, upstream_resumes); + + uint32_t downstream_pauses = + test_server_->store() + .counter("tcp.tcp_with_write_limits.downstream_flow_control_paused_reading_total") + .value(); + uint32_t downstream_resumes = + test_server_->store() + .counter("tcp.tcp_with_write_limits.downstream_flow_control_resumed_reading_total") + .value(); + EXPECT_EQ(downstream_pauses, downstream_resumes); +} + // Test proxying data in both directions with envoy doing TCP and TLS // termination. -TEST_P(TcpProxyIntegrationTest, SendTlsToTlsListener) { +void TcpProxyIntegrationTest::sendAndReceiveTlsData(const std::string& data_to_send_upstream, + const std::string& data_to_send_downstream) { Network::ClientConnectionPtr ssl_client; FakeRawConnectionPtr fake_upstream_connection; testing::NiceMock runtime; std::unique_ptr context_manager(new Ssl::ContextManagerImpl(runtime)); Ssl::ClientContextPtr context; ConnectionStatusCallbacks connect_callbacks; + MockBuffer* client_write_buffer; executeActions({ - // Set up the SSl client. + // Set up the mock buffer factory so the newly created SSL client will have a mock write + // buffer. This allows us to track the bytes actually written to the socket. [&]() -> void { + EXPECT_CALL(*mock_buffer_factory_, create_()) + .Times(2) + .WillOnce(Invoke([&]() -> Buffer::Instance* { + return new Buffer::OwnedImpl; // client read buffer. + })) + .WillOnce(Invoke([&]() -> Buffer::Instance* { + client_write_buffer = new MockBuffer; + ON_CALL(*client_write_buffer, move(_)) + .WillByDefault(Invoke(client_write_buffer, &MockBuffer::baseMove)); + ON_CALL(*client_write_buffer, drain(_)) + .WillByDefault(Invoke(client_write_buffer, &MockBuffer::trackDrains)); + return client_write_buffer; + })); + // Set up the SSl client. 
Network::Address::InstanceConstSharedPtr address = Ssl::getSslAddress(version_, lookupPort("tcp_proxy_with_tls_termination")); context = Ssl::createClientSslContext(false, false, *context_manager); @@ -75,7 +133,6 @@ TEST_P(TcpProxyIntegrationTest, SendTlsToTlsListener) { // Perform the SSL handshake. Loopback is whitelisted in tcp_proxy.json for the ssl_auth // filter so there will be no pause waiting on auth data. [&]() -> void { - ssl_client->connect(); ssl_client->addConnectionCallbacks(connect_callbacks); ssl_client->connect(); while (!connect_callbacks.connected()) { @@ -84,20 +141,22 @@ TEST_P(TcpProxyIntegrationTest, SendTlsToTlsListener) { }, // Ship some data upstream. [&]() -> void { - Buffer::OwnedImpl buffer("hello"); + Buffer::OwnedImpl buffer(data_to_send_upstream); ssl_client->write(buffer); - dispatcher_->run(Event::Dispatcher::RunType::NonBlock); + while (client_write_buffer->bytes_drained() != data_to_send_upstream.size()) { + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); + } }, // Make sure the data makes it upstream. - [&]() -> void { fake_upstream_connection = fake_upstreams_[0]->waitForRawConnection(); }, - [&]() -> void { fake_upstream_connection->waitForData(5); }, + [&]() -> void { fake_upstream_connection = fake_upstreams_[1]->waitForRawConnection(); }, + [&]() -> void { fake_upstream_connection->waitForData(data_to_send_upstream.size()); }, // Now send data downstream and make sure it arrives. [&]() -> void { std::shared_ptr payload_reader( new WaitForPayloadReader(*dispatcher_)); ssl_client->addReadFilter(payload_reader); - fake_upstream_connection->write("world"); - payload_reader->set_data_to_wait_for("world"); + fake_upstream_connection->write(data_to_send_downstream); + payload_reader->set_data_to_wait_for(data_to_send_downstream); ssl_client->dispatcher().run(Event::Dispatcher::RunType::Block); }, // Clean up. 
@@ -107,5 +166,12 @@ TEST_P(TcpProxyIntegrationTest, SendTlsToTlsListener) { }); } +TEST_P(TcpProxyIntegrationTest, SendTlsToTlsListener) { sendAndReceiveTlsData("hello", "world"); } + +TEST_P(TcpProxyIntegrationTest, LargeBidirectionalTlsWrites) { + std::string large_data(1024 * 8, 'a'); + sendAndReceiveTlsData(large_data, large_data); +} + } // namespace } // namespace Envoy diff --git a/test/integration/tcp_proxy_integration_test.h b/test/integration/tcp_proxy_integration_test.h index fc0b6b7d0e678..a53615e741d2d 100644 --- a/test/integration/tcp_proxy_integration_test.h +++ b/test/integration/tcp_proxy_integration_test.h @@ -8,6 +8,7 @@ #include "gtest/gtest.h" namespace Envoy { +namespace { class TcpProxyIntegrationTest : public BaseIntegrationTest, public testing::TestWithParam { public: @@ -16,13 +17,21 @@ class TcpProxyIntegrationTest : public BaseIntegrationTest, void SetUp() override { fake_upstreams_.emplace_back(new FakeUpstream(0, FakeHttpConnection::Type::HTTP1, version_)); registerPort("upstream_0", fake_upstreams_.back()->localAddress()->ip()->port()); - createTestServer("test/config/integration/tcp_proxy.json", - {"tcp_proxy", "tcp_proxy_with_tls_termination"}); + fake_upstreams_.emplace_back(new FakeUpstream(0, FakeHttpConnection::Type::HTTP1, version_)); + registerPort("cluster_with_buffer_limits", + fake_upstreams_.back()->localAddress()->ip()->port()); + createTestServer( + "test/config/integration/tcp_proxy.json", + {"tcp_proxy", "tcp_proxy_with_write_limits", "tcp_proxy_with_tls_termination"}); } void TearDown() override { test_server_.reset(); fake_upstreams_.clear(); } + + void sendAndReceiveTlsData(const std::string& data_to_send_upstream, + const std::string& data_to_send_downstream); }; +} // namespace } // namespace Envoy diff --git a/test/integration/utility.h b/test/integration/utility.h index 2b76e11a102d1..d379ded8f73bf 100644 --- a/test/integration/utility.h +++ b/test/integration/utility.h @@ -110,11 +110,13 @@ class 
ConnectionStatusCallbacks : public Network::ConnectionCallbacks { bool closed() const { return closed_; } // Network::ConnectionCallbacks - void onEvent(uint32_t events) { + void onEvent(uint32_t events) override { closed_ |= (events & Network::ConnectionEvent::RemoteClose || events & Network::ConnectionEvent::LocalClose); connected_ |= events & Network::ConnectionEvent::Connected; } + void onAboveWriteBufferHighWatermark() override {} + void onBelowWriteBufferLowWatermark() override {} private: bool connected_{false}; diff --git a/test/mocks/buffer/mocks.h b/test/mocks/buffer/mocks.h index ec3aba20b43af..6b303b7824592 100644 --- a/test/mocks/buffer/mocks.h +++ b/test/mocks/buffer/mocks.h @@ -33,10 +33,23 @@ class MockBuffer : public Buffer::OwnedImpl { return bytes_written; } + void trackDrains(uint64_t size) { + bytes_drained_ += size; + Buffer::OwnedImpl::drain(size); + } + + // A convenience function to invoke on write() which fails the write with EAGAIN. + int failWrite(int) { + errno = EAGAIN; + return -1; + } + int bytes_written() const { return bytes_written_; } + uint64_t bytes_drained() const { return bytes_drained_; } private: int bytes_written_{0}; + uint64_t bytes_drained_{0}; }; class MockBufferFactory : public Buffer::Factory { diff --git a/test/mocks/network/mocks.h b/test/mocks/network/mocks.h index 9fe0e119f4077..4dbfce8a17cdc 100644 --- a/test/mocks/network/mocks.h +++ b/test/mocks/network/mocks.h @@ -23,6 +23,8 @@ class MockConnectionCallbacks : public ConnectionCallbacks { // Network::ConnectionCallbacks MOCK_METHOD1(onEvent, void(uint32_t events)); + MOCK_METHOD0(onAboveWriteBufferHighWatermark, void()); + MOCK_METHOD0(onBelowWriteBufferLowWatermark, void()); }; class MockConnectionBase { @@ -63,8 +65,8 @@ class MockConnection : public Connection, public MockConnectionBase { MOCK_METHOD0(ssl, Ssl::Connection*()); MOCK_METHOD0(state, State()); MOCK_METHOD1(write, void(Buffer::Instance& data)); - MOCK_METHOD1(setReadBufferLimit, void(uint32_t 
limit)); - MOCK_CONST_METHOD0(readBufferLimit, uint32_t()); + MOCK_METHOD1(setBufferLimits, void(uint32_t limit)); + MOCK_CONST_METHOD0(bufferLimit, uint32_t()); }; /** @@ -95,8 +97,8 @@ class MockClientConnection : public ClientConnection, public MockConnectionBase MOCK_METHOD0(ssl, Ssl::Connection*()); MOCK_METHOD0(state, State()); MOCK_METHOD1(write, void(Buffer::Instance& data)); - MOCK_METHOD1(setReadBufferLimit, void(uint32_t limit)); - MOCK_CONST_METHOD0(readBufferLimit, uint32_t()); + MOCK_METHOD1(setBufferLimits, void(uint32_t limit)); + MOCK_CONST_METHOD0(bufferLimit, uint32_t()); // Network::ClientConnection MOCK_METHOD0(connect, void()); diff --git a/test/test_common/utility.cc b/test/test_common/utility.cc index f9a5f15cfe8c9..bbe5b667b64aa 100644 --- a/test/test_common/utility.cc +++ b/test/test_common/utility.cc @@ -4,6 +4,7 @@ #include #include +#include #include #include #include @@ -22,8 +23,21 @@ #include "gtest/gtest.h" #include "spdlog/spdlog.h" +using testing::GTEST_FLAG(random_seed); + namespace Envoy { +static const int32_t SEED = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + +TestRandomGenerator::TestRandomGenerator() + : seed_(GTEST_FLAG(random_seed) == 0 ? 
SEED : GTEST_FLAG(random_seed)), generator_(seed_) { + std::cerr << "TestRandomGenerator running with seed " << seed_; +} + +uint64_t TestRandomGenerator::random() { return generator_(); } + bool TestUtility::buffersEqual(const Buffer::Instance& lhs, const Buffer::Instance& rhs) { if (lhs.length() != rhs.length()) { return false; diff --git a/test/test_common/utility.h b/test/test_common/utility.h index 383c17d5eb827..0954865825cd2 100644 --- a/test/test_common/utility.h +++ b/test/test_common/utility.h @@ -1,8 +1,11 @@ #pragma once +#include + #include #include #include +#include #include #include @@ -26,6 +29,19 @@ namespace Envoy { EXPECT_EQ(message, std::string(e.what())); \ } +// Random number generator which logs its seed to stderr. To repeat a test run with a non-zero seed +// one can run the test with --test_arg=--gtest_random_seed=[seed] +class TestRandomGenerator { +public: + TestRandomGenerator(); + + uint64_t random(); + +private: + const int32_t seed_; + std::ranlux48 generator_; +}; + class TestUtility { public: /**