Skip to content
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
cbe959f
TCP + TLS flow control
alyssawilk Jul 6, 2017
f2ffecb
reinstating a check
alyssawilk Jul 6, 2017
0a9116d
Adding SSL unit tests
alyssawilk Jul 6, 2017
24b56df
seeing if pointers are ever OK
alyssawilk Jul 6, 2017
2cb1ebf
fixing test mocks
alyssawilk Jul 6, 2017
f07608b
correcting docs
alyssawilk Jul 6, 2017
36ecdc3
fix_format
alyssawilk Jul 6, 2017
ccea6d4
Merge branch 'master' into tcp-flow-control
alyssawilk Jul 10, 2017
b846116
Addressing some review comments
alyssawilk Jul 10, 2017
8fd62ca
Addressing the rest of the comments not handled by #1234
alyssawilk Jul 10, 2017
1036f4b
Merge branch 'refs/heads/master' into tcp-flow-control
alyssawilk Jul 10, 2017
ea9aebb
Fixing the new proxy test given #1232
alyssawilk Jul 10, 2017
e1a91fa
setting all buffer limits with one call
alyssawilk Jul 11, 2017
9bbc0f4
Merge branch 'refs/heads/master' into tcp-flow-control (plus a bunch …
alyssawilk Jul 12, 2017
f5ce5f4
adding stats
alyssawilk Jul 12, 2017
73f80d3
now with proper stats, and tests!
alyssawilk Jul 12, 2017
62098a4
Merge branch 'refs/heads/master' into tcp-flow-control
alyssawilk Jul 12, 2017
e15f458
Fixing a test flake - on tsan we don't always get write backups
alyssawilk Jul 12, 2017
5b514b8
Merge branch 'master' into tcp-flow-control
alyssawilk Jul 13, 2017
f9603f7
Guarding against EAGAIN in ssl writes as we now do with TCP writes, u…
alyssawilk Jul 13, 2017
c0bbd2d
and removing the right spurious connect
alyssawilk Jul 13, 2017
8f18115
Removing debug log
alyssawilk Jul 13, 2017
c604950
clarifying comment
alyssawilk Jul 13, 2017
31b46da
I can't believe it's not butter!
alyssawilk Jul 13, 2017
872e1b3
Moving watermark checks to a utility class wrapping the buffer
alyssawilk Jul 14, 2017
f373704
Merge branch 'refs/heads/master' into tcp-flow-control
alyssawilk Jul 17, 2017
71792b3
now with better coverage
alyssawilk Jul 17, 2017
c6e50dc
Making move work between watermark buffers and ownedimpl
alyssawilk Jul 18, 2017
c925fee
Adding more comments for watermark buffers, misc review comments
alyssawilk Jul 18, 2017
3bf7617
one more shot!
alyssawilk Jul 18, 2017
b8216e9
more clarity on watermark
alyssawilk Jul 18, 2017
d55f9eb
Test RNG
alyssawilk Jul 18, 2017
b79fc5e
possibly abusing the random seed flag
alyssawilk Jul 18, 2017
531b61e
latching rng seed
alyssawilk Jul 18, 2017
69a8e33
Merge branch 'refs/heads/master' into tcp-flow-control
alyssawilk Jul 18, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/configuration/cluster_manager/cluster_stats.rst
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ Every cluster has a statistics tree rooted at *cluster.<name>.* with the followi
upstream_rq_retry, Counter, Total request retries
upstream_rq_retry_success, Counter, Total request retry successes
upstream_rq_retry_overflow, Counter, Total requests not retried due to circuit breaking
upstream_flow_control_paused_reading_total, Counter, Total number of times flow control paused reading from upstream.
upstream_flow_control_resumed_reading_total, Counter, Total number of times flow control resumed reading from upstream.
membership_change, Counter, Total cluster membership changes
membership_healthy, Gauge, Current cluster healthy total (inclusive of both health checking and outlier detection)
membership_total, Gauge, Current cluster membership total
Expand Down
2 changes: 2 additions & 0 deletions docs/configuration/network_filters/tcp_proxy_filter.rst
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,6 @@ statistics are rooted at *tcp.<stat_prefix>.* with the following statistics:
downstream_cx_no_route, Counter, Number of connections for which no matching route was found.
downstream_cx_tx_bytes_total, Counter, Total bytes written to the downstream connection.
downstream_cx_tx_bytes_buffered, Gauge, Total bytes currently buffered to the downstream connection.
downstream_flow_control_paused_reading_total, Counter, Total number of times flow control paused reading from downstream.
downstream_flow_control_resumed_reading_total, Counter, Total number of times flow control resumed reading from downstream.

25 changes: 21 additions & 4 deletions include/envoy/network/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,17 @@ class ConnectionCallbacks {
* @param events supplies the ConnectionEvent events that occurred as a bitmask.
*/
virtual void onEvent(uint32_t events) PURE;

/**
* Called when the write buffer for a connection goes over its high watermark.
*/
virtual void onAboveWriteBufferHighWatermark() PURE;

/**
* Called when the write buffer for a connection goes from over its high
* watermark to under its low watermark.
*/
virtual void onBelowWriteBufferLowWatermark() PURE;
};

/**
Expand Down Expand Up @@ -153,15 +164,21 @@ class Connection : public Event::DeferredDeletable, public FilterManager {
virtual void write(Buffer::Instance& data) PURE;

/**
* Set a soft limit on the size of the read buffer prior to flushing to further stages in the
* Set a soft limit on the size of buffers for the connection.
* For the read buffer, this limits the bytes read prior to flushing to further stages in the
* processing pipeline.
* For the write buffer, it sets watermarks. When enough data is buffered it triggers a call to
* onAboveWriteBufferHighWatermark, which allows subscribers to enforce flow control by disabling
* reads on the socket funneling data to the write buffer. When enough data is drained from the
* write buffer, onBelowWriteBufferHighWatermark is called which similarly allows subscribers
* resuming reading.
*/
virtual void setReadBufferLimit(uint32_t limit) PURE;
virtual void setBufferLimits(uint32_t limit) PURE;

/**
* Get the value set with setReadBufferLimit.
* Get the value set with setBufferLimits.
*/
virtual uint32_t readBufferLimit() const PURE;
virtual uint32_t bufferLimit() const PURE;
};

typedef std::unique_ptr<Connection> ConnectionPtr;
Expand Down
2 changes: 2 additions & 0 deletions include/envoy/upstream/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@ class HostSet {
COUNTER(upstream_rq_retry) \
COUNTER(upstream_rq_retry_success) \
COUNTER(upstream_rq_retry_overflow) \
COUNTER(upstream_flow_control_paused_reading_total) \
COUNTER(upstream_flow_control_resumed_reading_total) \
GAUGE (max_host_weight) \
COUNTER(membership_change) \
GAUGE (membership_healthy) \
Expand Down
10 changes: 10 additions & 0 deletions source/common/buffer/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,16 @@ load(

envoy_package()

envoy_cc_library(
name = "watermark_buffer_lib",
srcs = ["watermark_buffer.cc"],
hdrs = ["watermark_buffer.h"],
deps = [
"//source/common/buffer:buffer_lib",
"//source/common/common:assert_lib",
],
)

envoy_cc_library(
name = "buffer_lib",
srcs = ["buffer_impl.cc"],
Expand Down
6 changes: 3 additions & 3 deletions source/common/buffer/buffer_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,15 @@ void OwnedImpl::move(Instance& rhs) {
// now and this is safe. Using the evbuffer move routines require having access to both evbuffers.
// This is a reasonable compromise in a high performance path where we want to maintain an
// abstraction in case we get rid of evbuffer later.
int rc = evbuffer_add_buffer(buffer_.get(), static_cast<OwnedImpl&>(rhs).buffer_.get());
int rc = evbuffer_add_buffer(buffer_.get(), static_cast<LibEventInstance&>(rhs).buffer().get());
ASSERT(rc == 0);
UNREFERENCED_PARAMETER(rc);
}

void OwnedImpl::move(Instance& rhs, uint64_t length) {
// See move() above for why we do the static cast.
int rc =
evbuffer_remove_buffer(static_cast<OwnedImpl&>(rhs).buffer_.get(), buffer_.get(), length);
int rc = evbuffer_remove_buffer(static_cast<LibEventInstance&>(rhs).buffer().get(), buffer_.get(),
length);
ASSERT(static_cast<uint64_t>(rc) == length);
UNREFERENCED_PARAMETER(rc);
}
Expand Down
14 changes: 12 additions & 2 deletions source/common/buffer/buffer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,25 @@ class OwnedImplFactory : public Factory {
InstancePtr create() override;
};

class LibEventInstance : public Instance {
public:
virtual Event::Libevent::BufferPtr& buffer() PURE;
};

/**
* Wraps an allocated and owned evbuffer.
*
* Note that due to the internals of move() accessing buffer(), OwnedImpl is not
* compatible with non-LibEventInstance buffers.
*/
class OwnedImpl : public Instance {
class OwnedImpl : public LibEventInstance {
public:
OwnedImpl();
OwnedImpl(const std::string& data);
OwnedImpl(const Instance& data);
OwnedImpl(const void* data, uint64_t size);

// Instance
// LibEventInstance
void add(const void* data, uint64_t size) override;
void add(const std::string& data) override;
void add(const Instance& data) override;
Expand All @@ -42,6 +50,8 @@ class OwnedImpl : public Instance {
ssize_t search(const void* data, uint64_t size, size_t start) const override;
int write(int fd) override;

Event::Libevent::BufferPtr& buffer() override { return buffer_; }

private:
Event::Libevent::BufferPtr buffer_;
};
Expand Down
89 changes: 89 additions & 0 deletions source/common/buffer/watermark_buffer.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
#include "common/buffer/watermark_buffer.h"

#include "common/common/assert.h"

namespace Envoy {
namespace Buffer {

void WatermarkBuffer::add(const void* data, uint64_t size) {
wrapped_buffer_->add(data, size);
checkHighWatermark();
}

void WatermarkBuffer::add(const std::string& data) {
wrapped_buffer_->add(data);
checkHighWatermark();
}

void WatermarkBuffer::add(const Instance& data) {
wrapped_buffer_->add(data);
checkHighWatermark();
}

void WatermarkBuffer::commit(RawSlice* iovecs, uint64_t num_iovecs) {
wrapped_buffer_->commit(iovecs, num_iovecs);
checkHighWatermark();
}

void WatermarkBuffer::drain(uint64_t size) {
wrapped_buffer_->drain(size);
checkLowWatermark();
}

void WatermarkBuffer::move(Instance& rhs) {
wrapped_buffer_->move(rhs);
checkHighWatermark();
}

void WatermarkBuffer::move(Instance& rhs, uint64_t length) {
wrapped_buffer_->move(rhs, length);
checkHighWatermark();
}

int WatermarkBuffer::read(int fd, uint64_t max_length) {
int bytes_read = wrapped_buffer_->read(fd, max_length);
checkHighWatermark();
return bytes_read;
}

uint64_t WatermarkBuffer::reserve(uint64_t length, RawSlice* iovecs, uint64_t num_iovecs) {
uint64_t bytes_reserved = wrapped_buffer_->reserve(length, iovecs, num_iovecs);
checkHighWatermark();
return bytes_reserved;
}

int WatermarkBuffer::write(int fd) {
int bytes_written = wrapped_buffer_->write(fd);
checkLowWatermark();
return bytes_written;
}

void WatermarkBuffer::setWatermarks(uint32_t low_watermark, uint32_t high_watermark) {
ASSERT(low_watermark < high_watermark);
low_watermark_ = low_watermark;
high_watermark_ = high_watermark;
checkHighWatermark();
checkLowWatermark();
}

void WatermarkBuffer::checkLowWatermark() {
if (!above_high_watermark_called_ || wrapped_buffer_->length() >= low_watermark_) {
return;
}

above_high_watermark_called_ = false;
below_low_watermark_();
}

void WatermarkBuffer::checkHighWatermark() {
if (above_high_watermark_called_ || high_watermark_ == 0 ||
wrapped_buffer_->length() <= high_watermark_) {
return;
}

above_high_watermark_called_ = true;
above_high_watermark_();
}

} // namespace Buffer
} // namespace Envoy
75 changes: 75 additions & 0 deletions source/common/buffer/watermark_buffer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#pragma once

#include <string>

#include "common/buffer/buffer_impl.h"

namespace Envoy {
namespace Buffer {

// A wrapper for an underlying buffer which does watermark validation.
// The underlying buffer's ownership is transfered to the Watermark buffer. Each time the inner
// buffer is resized (written to or drained), the watermarks are checked. As the buffer size
// transitions from under the low watermark to above the high watermark, the above_high_watermark
// function is called one time. It will not be called again until the buffer is drained below the
// low watermark, at which point the below_low_watermark function is called.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Matt's earlier comment re:utilities was spot on - once I started on h2 flow control I found myself copy-pasting watermark checks so pulled this out. I believe it will be reusable for at least some of the codec buffers. Either way it guards us from screwing up and missing buffer size changes changes (I missed one in ssl/connection_impl.cc before my own tests caught me) because all edits are done through the wrapper.

@htuch let me know what you think before I go too crazy with the unit tests :-)

//
// Because the internals of OwnedImpl::move() require accessing the underlying data, OwnedImpl is
// not compatible with generic Buffer::Impls. To allow compatability between WatermarkBuffer and
// OwnedImpl::move, WatermarkBuffer must implement LibEventInstance and is also not compatible
// with generic Buffer::Impls.
//
// WatermarkBuffer takes a pointer to a generic InstancePtr in the constructor to allow test mocks
// which overrides move() in any case.
class WatermarkBuffer : public LibEventInstance {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is a wrapper around a buffer, does it have to derive from LibEventInstance ? Are the changes to introduce LibEventInstance necessary? (If they are the below static_cast should probably be a dynamic_cast, but it's not clear to me that it's necessary).

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe you just answered my comment with your other comment. If so, maybe add more code comments?

public:
WatermarkBuffer(InstancePtr&& buffer, std::function<void()> below_low_watermark,
std::function<void()> above_high_watermark)
: wrapped_buffer_(std::move(buffer)), below_low_watermark_(below_low_watermark),
above_high_watermark_(above_high_watermark) {}

// Instance
void add(const void* data, uint64_t size) override;
void add(const std::string& data) override;
void add(const Instance& data) override;
void commit(RawSlice* iovecs, uint64_t num_iovecs) override;
void drain(uint64_t size) override;
uint64_t getRawSlices(RawSlice* out, uint64_t out_size) const override {
return wrapped_buffer_->getRawSlices(out, out_size);
}
uint64_t length() const override { return wrapped_buffer_->length(); }
void* linearize(uint32_t size) override { return wrapped_buffer_->linearize(size); }
void move(Instance& rhs) override;
void move(Instance& rhs, uint64_t length) override;
int read(int fd, uint64_t max_length) override;
uint64_t reserve(uint64_t length, RawSlice* iovecs, uint64_t num_iovecs) override;
ssize_t search(const void* data, uint64_t size, size_t start) const override {
return wrapped_buffer_->search(data, size, start);
}
int write(int fd) override;
Event::Libevent::BufferPtr& buffer() override {
return static_cast<LibEventInstance&>(*wrapped_buffer_).buffer();

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@htuch as discussed this solves the problem where move didn't work in both directions (now tested) because OwnedImpl assumed it was moving another OwnedImpl. The down-side is that WatermarkBuffer now only can do move()s if it wraps an OwnedImpl. That said the Envoy code base already assumes all buffers are OwnedImpls so I don't think we're losing any flexibility, it's just ugly from a purist perspective.

}

void setWatermarks(uint32_t low_watermark, uint32_t high_watermark);

private:
void checkHighWatermark();
void checkLowWatermark();

InstancePtr wrapped_buffer_;
std::function<void()> below_low_watermark_;
std::function<void()> above_high_watermark_;

// Used for enforcing buffer limits (off by default). If these are set to non-zero by a call to
// setWatermarks() the watermark callbacks will be called as described above.
uint32_t high_watermark_{0};
uint32_t low_watermark_{0};
// Tracks the latest state of watermark callbacks.
// True between the time above_high_watermark_ has been called until above_high_watermark_ has
// been called.
bool above_high_watermark_called_{false};
};

} // namespace Buffer
} // namespace Envoy
1 change: 1 addition & 0 deletions source/common/event/dispatcher_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "envoy/network/listen_socket.h"
#include "envoy/network/listener.h"

#include "common/buffer/buffer_impl.h"

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this needed?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For OwnedImplFactory.

#include "common/event/file_event_impl.h"
#include "common/event/signal_impl.h"
#include "common/event/timer_impl.h"
Expand Down
2 changes: 2 additions & 0 deletions source/common/filter/auth/client_ssl.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ class Instance : public Network::ReadFilter, public Network::ConnectionCallbacks

// Network::ConnectionCallbacks
void onEvent(uint32_t events) override;
void onAboveWriteBufferHighWatermark() override {}
void onBelowWriteBufferLowWatermark() override {}

private:
ConfigSharedPtr config_;
Expand Down
2 changes: 2 additions & 0 deletions source/common/filter/ratelimit.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ class Instance : public Network::ReadFilter,

// Network::ConnectionCallbacks
void onEvent(uint32_t events) override;
void onAboveWriteBufferHighWatermark() override {}
void onBelowWriteBufferLowWatermark() override {}

// RateLimit::RequestCallbacks
void complete(LimitStatus status) override;
Expand Down
52 changes: 52 additions & 0 deletions source/common/filter/tcp_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,58 @@ void TcpProxy::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callb
config_->stats().downstream_cx_tx_bytes_buffered_});
}

void TcpProxy::readDisableUpstream(bool disable) {
upstream_connection_->readDisable(disable);
if (disable) {
read_callbacks_->upstreamHost()
->cluster()
.stats()
.upstream_flow_control_paused_reading_total_.inc();
} else {
read_callbacks_->upstreamHost()
->cluster()
.stats()
.upstream_flow_control_resumed_reading_total_.inc();
}
}

void TcpProxy::readDisableDownstream(bool disable) {
read_callbacks_->connection().readDisable(disable);
if (disable) {
config_->stats().downstream_flow_control_paused_reading_total_.inc();
} else {
config_->stats().downstream_flow_control_resumed_reading_total_.inc();
}
}

void TcpProxy::DownstreamCallbacks::onAboveWriteBufferHighWatermark() {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth it here to add ASSERTS in all of these functions that basically assert that we aren't getting multiple watermark callbacks? Or can that happen? I'm thinking something along the lines of ASSERT(!parent_.upstreamReadDisabled()). (Similar in other places).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

connections can have the read disabled by other means so if we check the actual state of the connection we can only verify that we enable actually disabled sockets (not that we disable enabled sockets) We could track local state by adding 2 booleans to the tcp proxy filter which are only used for debug, which I'd be happy to do - flow control is tricky enough to get right I'm happy to have extra asserts too!

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would be in favor of adding the booleans (at least for now, with a TODO to clean them up once we have production experience) since this code is super tricky to get right and more checking is better IMO. I will leave it up to you to decide either way.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Works for me. Flow control changes are inherently dangerous as you point out.
Added asserts and the the tests pass 5k runs without flakes once I got the waitForDisconnect fix merged in.

ASSERT(!on_high_watermark_called_);
on_high_watermark_called_ = true;
// If downstream has too much data buffered, stop reading on the upstream connection.
parent_.readDisableUpstream(true);
}

void TcpProxy::DownstreamCallbacks::onBelowWriteBufferLowWatermark() {
ASSERT(on_high_watermark_called_);
on_high_watermark_called_ = false;
// The downstream buffer has been drained. Resume reading from upstream.
parent_.readDisableUpstream(false);
}

void TcpProxy::UpstreamCallbacks::onAboveWriteBufferHighWatermark() {
ASSERT(!on_high_watermark_called_);
on_high_watermark_called_ = true;
// There's too much data buffered in the upstream write buffer, so stop reading.
parent_.readDisableDownstream(true);
}

void TcpProxy::UpstreamCallbacks::onBelowWriteBufferLowWatermark() {
ASSERT(on_high_watermark_called_);
on_high_watermark_called_ = false;
// The upstream write buffer is drained. Resume reading.
parent_.readDisableDownstream(false);
}

Network::FilterStatus TcpProxy::initializeUpstreamConnection() {
const std::string& cluster_name = config_->getRouteFromEntries(read_callbacks_->connection());

Expand Down
Loading