Skip to content
Merged
34 changes: 33 additions & 1 deletion include/envoy/buffer/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -393,10 +393,42 @@ class Instance {
template <typename T, size_t Size = sizeof(T)> void writeBEInt(T value) {
writeInt<ByteOrder::BigEndian, T, Size>(value);
}

/**
* Set the buffer's high watermark. The buffer's low watermark is implicitly set to half the high
* watermark. Setting the high watermark to 0 disables watermark functionality.
* @param watermark supplies the buffer high watermark size threshold, in bytes.
*/
virtual void setWatermarks(uint32_t watermark) PURE;
Comment thread
alyssawilk marked this conversation as resolved.
/**
* Returns the configured high watermark.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

optional, call out the 0 for disabled here as well

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

*/
virtual uint32_t highWatermark() const PURE;
/**
* Determine if the buffer watermark trigger condition is currently set. The watermark trigger is
* set when the buffer size exceeds the configured high watermark and is cleared once the buffer
* size drops to the low watermark.
* @return true if the buffer size once exceeded the high watermark and hasn't since dropped to
* the low watermark.
*/
virtual bool highWatermarkTriggered() const PURE;
};

using InstancePtr = std::unique_ptr<Instance>;

// Informational enum that hints at the buffer's intended use.
enum class BufferType {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you comment on what the intended use of this is?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment. If you're curious, here is the first followup change that depends on this. The end goal is to use this in e2e tests for the fix to #11370

master...antoniovicente:e2e_h2_high_buffering__pre_shared_ptr_factory

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, so the intended use is to avoid a bunch of nasty test changes and potentially nasty test maintenance issues by switching more buffers to mocks?

I'd like to avoid this if we can.
I wonder if we can reduce the use of the mock buffer factory, and then back this part out. It looks like it's only really "used" in the tcp proxy integration test so I wonder if we could use a normal buffer factory in the base integration test, fix up the tcp proxy test with some comments (it's not likely to change as frequently as the others) and then drop buffertype from this PR. WDYT?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm really not a fan of mock buffers, so no - I am not planning to increase mock buffer use. I removed the BufferType argument and made the necessary changes to fix broken tests, see 5b5a99a

The change to the default behavior of the mock watermark factory created by the mock dispatcher fixes a bunch of small tests that use MockConnection

// Per-connection input buffer used to read directly from an IoHandle.
Read,
Comment thread
antoniovicente marked this conversation as resolved.
Outdated
// Per-connection output buffer used to write directly to an IoHandle.
Output,
// Per-stream HTTP output buffer. In the case of HTTP2 and HTTP3/QUIC, this buffer participates in
// per-stream flow control.
HttpStreamOutput,
// Internal buffers used for internally by filters or other components of the IO pipeline.
Internal,
};

/**
* A factory for creating buffers which call callbacks when reaching high and low watermarks.
*/
Expand All @@ -412,7 +444,7 @@ class WatermarkFactory {
* high watermark.
* @return a newly created InstancePtr.
Comment thread
antoniovicente marked this conversation as resolved.
*/
virtual InstancePtr create(std::function<void()> below_low_watermark,
virtual InstancePtr create(BufferType buffer_type, std::function<void()> below_low_watermark,
std::function<void()> above_high_watermark,
std::function<void()> above_overflow_watermark) PURE;
};
Expand Down
5 changes: 5 additions & 0 deletions source/common/buffer/buffer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,11 @@ class OwnedImpl : public LibEventInstance {
*/
virtual void appendSliceForTest(absl::string_view data);

// Does not implement watermarking.
void setWatermarks(uint32_t) override { ASSERT(false, "watermarks not implemented."); }
Comment thread
alyssawilk marked this conversation as resolved.
uint32_t highWatermark() const override { return 0; }
bool highWatermarkTriggered() const override { return false; }

/**
* Describe the in-memory representation of the slices in the buffer. For use
* in tests that want to make assertions about the specific arrangement of
Expand Down
8 changes: 4 additions & 4 deletions source/common/buffer/watermark_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ class WatermarkBuffer : public OwnedImpl {
void appendSliceForTest(const void* data, uint64_t size) override;
void appendSliceForTest(absl::string_view data) override;

void setWatermarks(uint32_t watermark) { setWatermarks(watermark / 2, watermark); }
void setWatermarks(uint32_t watermark) override { setWatermarks(watermark / 2, watermark); }
void setWatermarks(uint32_t low_watermark, uint32_t high_watermark);
uint32_t highWatermark() const { return high_watermark_; }
uint32_t highWatermark() const override { return high_watermark_; }
// Returns true if the high watermark callbacks have been called more recently
// than the low watermark callbacks.
bool highWatermarkTriggered() const { return above_high_watermark_called_; }
bool highWatermarkTriggered() const override { return above_high_watermark_called_; }

private:
void checkHighAndOverflowWatermarks();
Expand Down Expand Up @@ -73,7 +73,7 @@ using WatermarkBufferPtr = std::unique_ptr<WatermarkBuffer>;
class WatermarkBufferFactory : public WatermarkFactory {
public:
// Buffer::WatermarkFactory
InstancePtr create(std::function<void()> below_low_watermark,
InstancePtr create(BufferType /*buffer_type*/, std::function<void()> below_low_watermark,
Comment thread
antoniovicente marked this conversation as resolved.
Outdated
std::function<void()> above_high_watermark,
std::function<void()> above_overflow_watermark) override {
return std::make_unique<WatermarkBuffer>(below_low_watermark, above_high_watermark,
Expand Down
18 changes: 9 additions & 9 deletions source/common/http/filter_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -250,16 +250,16 @@ bool ActiveStreamDecoderFilter::canContinue() {
return !parent_.state_.local_complete_;
}

Buffer::WatermarkBufferPtr ActiveStreamDecoderFilter::createBuffer() {
auto buffer = std::make_unique<Buffer::WatermarkBuffer>(
[this]() -> void { this->requestDataDrained(); },
Buffer::InstancePtr ActiveStreamDecoderFilter::createBuffer() {
auto buffer = dispatcher().getWatermarkFactory().create(
Buffer::BufferType::Internal, [this]() -> void { this->requestDataDrained(); },
[this]() -> void { this->requestDataTooLarge(); },
[]() -> void { /* TODO(adisuissa): Handle overflow watermark */ });
buffer->setWatermarks(parent_.buffer_limit_);
return buffer;
}

Buffer::WatermarkBufferPtr& ActiveStreamDecoderFilter::bufferedData() {
Buffer::InstancePtr& ActiveStreamDecoderFilter::bufferedData() {
return parent_.buffered_request_data_;
}

Expand Down Expand Up @@ -1302,15 +1302,15 @@ absl::optional<Router::ConfigConstSharedPtr> ActiveStreamDecoderFilter::routeCon
return parent_.filter_manager_callbacks_.routeConfig();
}

Buffer::WatermarkBufferPtr ActiveStreamEncoderFilter::createBuffer() {
auto buffer = new Buffer::WatermarkBuffer(
[this]() -> void { this->responseDataDrained(); },
Buffer::InstancePtr ActiveStreamEncoderFilter::createBuffer() {
auto buffer = dispatcher().getWatermarkFactory().create(
Buffer::BufferType::Internal, [this]() -> void { this->responseDataDrained(); },
[this]() -> void { this->responseDataTooLarge(); },
[]() -> void { /* TODO(adisuissa): Handle overflow watermark */ });
buffer->setWatermarks(parent_.buffer_limit_);
return Buffer::WatermarkBufferPtr{buffer};
return buffer;
}
Buffer::WatermarkBufferPtr& ActiveStreamEncoderFilter::bufferedData() {
Buffer::InstancePtr& ActiveStreamEncoderFilter::bufferedData() {
return parent_.buffered_response_data_;
}
bool ActiveStreamEncoderFilter::complete() { return parent_.state_.local_complete_; }
Expand Down
16 changes: 8 additions & 8 deletions source/common/http/filter_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ struct ActiveStreamFilterBase : public virtual StreamFilterCallbacks,

void commonContinue();
virtual bool canContinue() PURE;
virtual Buffer::WatermarkBufferPtr createBuffer() PURE;
virtual Buffer::WatermarkBufferPtr& bufferedData() PURE;
virtual Buffer::InstancePtr createBuffer() PURE;
virtual Buffer::InstancePtr& bufferedData() PURE;
virtual bool complete() PURE;
virtual bool has100Continueheaders() PURE;
virtual void do100ContinueHeaders() PURE;
Expand Down Expand Up @@ -138,8 +138,8 @@ struct ActiveStreamDecoderFilter : public ActiveStreamFilterBase,

// ActiveStreamFilterBase
bool canContinue() override;
Buffer::WatermarkBufferPtr createBuffer() override;
Buffer::WatermarkBufferPtr& bufferedData() override;
Buffer::InstancePtr createBuffer() override;
Buffer::InstancePtr& bufferedData() override;
bool complete() override;
bool has100Continueheaders() override { return false; }
void do100ContinueHeaders() override { NOT_REACHED_GCOVR_EXCL_LINE; }
Expand Down Expand Up @@ -224,8 +224,8 @@ struct ActiveStreamEncoderFilter : public ActiveStreamFilterBase,

// ActiveStreamFilterBase
bool canContinue() override { return true; }
Buffer::WatermarkBufferPtr createBuffer() override;
Buffer::WatermarkBufferPtr& bufferedData() override;
Buffer::InstancePtr createBuffer() override;
Buffer::InstancePtr& bufferedData() override;
bool complete() override;
bool has100Continueheaders() override;
void do100ContinueHeaders() override;
Expand Down Expand Up @@ -776,8 +776,8 @@ class FilterManager : public ScopeTrackedObject,
// processing the next filter. The storage is created on demand. We need to store metadata
// temporarily in the filter in case the filter has stopped all while processing headers.
std::unique_ptr<MetadataMapVector> request_metadata_map_vector_;
Buffer::WatermarkBufferPtr buffered_response_data_;
Buffer::WatermarkBufferPtr buffered_request_data_;
Buffer::InstancePtr buffered_response_data_;
Buffer::InstancePtr buffered_request_data_;
uint32_t buffer_limit_{0};
uint32_t high_watermark_count_{0};
std::list<DownstreamWatermarkCallbacks*> watermark_callbacks_;
Expand Down
25 changes: 13 additions & 12 deletions source/common/http/http1/codec_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ void StreamEncoderImpl::endEncode() {
}
}

void ServerConnectionImpl::maybeAddSentinelBufferFragment(Buffer::WatermarkBuffer& output_buffer) {
void ServerConnectionImpl::maybeAddSentinelBufferFragment(Buffer::Instance& output_buffer) {
// It's messy and complicated to try to tag the final write of an HTTP response for response
// tracking for flood protection. Instead, write an empty buffer fragment after the response,
// to allow for tracking.
Expand Down Expand Up @@ -297,20 +297,20 @@ void ConnectionImpl::flushOutput(bool end_encode) {
if (end_encode) {
// If this is an HTTP response in ServerConnectionImpl, track outbound responses for flood
// protection
maybeAddSentinelBufferFragment(output_buffer_);
maybeAddSentinelBufferFragment(*output_buffer_);
}
connection().write(output_buffer_, false);
ASSERT(0UL == output_buffer_.length());
connection().write(*output_buffer_, false);
ASSERT(0UL == output_buffer_->length());
}

void ConnectionImpl::addToBuffer(absl::string_view data) { output_buffer_.add(data); }
void ConnectionImpl::addToBuffer(absl::string_view data) { output_buffer_->add(data); }

void ConnectionImpl::addCharToBuffer(char c) { output_buffer_.add(&c, 1); }
void ConnectionImpl::addCharToBuffer(char c) { output_buffer_->add(&c, 1); }

void ConnectionImpl::addIntToBuffer(uint64_t i) { output_buffer_.add(absl::StrCat(i)); }
void ConnectionImpl::addIntToBuffer(uint64_t i) { output_buffer_->add(absl::StrCat(i)); }

void ConnectionImpl::copyToBuffer(const char* data, uint64_t length) {
output_buffer_.add(data, length);
output_buffer_->add(data, length);
}

void StreamEncoderImpl::resetStream(StreamResetReason reason) {
Expand Down Expand Up @@ -478,11 +478,12 @@ ConnectionImpl::ConnectionImpl(Network::Connection& connection, CodecStats& stat
strict_1xx_and_204_headers_(Runtime::runtimeFeatureEnabled(
"envoy.reloadable_features.strict_1xx_and_204_response_headers")),
dispatching_(false),
output_buffer_([&]() -> void { this->onBelowLowWatermark(); },
[&]() -> void { this->onAboveHighWatermark(); },
[]() -> void { /* TODO(adisuissa): Handle overflow watermark */ }),
output_buffer_(connection.dispatcher().getWatermarkFactory().create(
Buffer::BufferType::HttpStreamOutput, [&]() -> void { this->onBelowLowWatermark(); },
[&]() -> void { this->onAboveHighWatermark(); },
[]() -> void { /* TODO(adisuissa): Handle overflow watermark */ })),
max_headers_kb_(max_headers_kb), max_headers_count_(max_headers_count) {
output_buffer_.setWatermarks(connection.bufferLimit());
output_buffer_->setWatermarks(connection.bufferLimit());
http_parser_init(&parser_, type);
parser_.allow_chunked_length = 1;
parser_.data = this;
Expand Down
8 changes: 4 additions & 4 deletions source/common/http/http1/codec_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ class ConnectionImpl : public virtual Connection, protected Logger::Loggable<Log
void addToBuffer(absl::string_view data);
void addCharToBuffer(char c);
void addIntToBuffer(uint64_t i);
Buffer::WatermarkBuffer& buffer() { return output_buffer_; }
Buffer::Instance& buffer() { return *output_buffer_; }
uint64_t bufferRemainingSize();
void copyToBuffer(const char* data, uint64_t length);
void reserveBuffer(uint64_t size);
Expand All @@ -209,7 +209,7 @@ class ConnectionImpl : public virtual Connection, protected Logger::Loggable<Log
uint32_t bufferLimit() { return connection_.bufferLimit(); }
virtual bool supportsHttp10() { return false; }
bool maybeDirectDispatch(Buffer::Instance& data);
virtual void maybeAddSentinelBufferFragment(Buffer::WatermarkBuffer&) {}
virtual void maybeAddSentinelBufferFragment(Buffer::Instance&) {}
CodecStats& stats() { return stats_; }
bool enableTrailers() const { return codec_settings_.enable_trailers_; }

Expand Down Expand Up @@ -446,7 +446,7 @@ class ConnectionImpl : public virtual Connection, protected Logger::Loggable<Log
// is pushed through the filter pipeline either at the end of the current dispatch call, or when
// the last byte of the body is processed (whichever happens first).
Buffer::OwnedImpl buffered_body_;
Buffer::WatermarkBuffer output_buffer_;
Buffer::InstancePtr output_buffer_;
Protocol protocol_{Protocol::Http11};
const uint32_t max_headers_kb_;
const uint32_t max_headers_count_;
Expand Down Expand Up @@ -533,7 +533,7 @@ class ServerConnectionImpl : public ServerConnection, public ConnectionImpl {
void sendProtocolErrorOld(absl::string_view details);

void releaseOutboundResponse(const Buffer::OwnedBufferFragmentImpl* fragment);
void maybeAddSentinelBufferFragment(Buffer::WatermarkBuffer& output_buffer) override;
void maybeAddSentinelBufferFragment(Buffer::Instance& output_buffer) override;
Status doFloodProtectionChecks() const;
Status checkHeaderNameForUnderscores() override;

Expand Down
25 changes: 13 additions & 12 deletions source/common/http/http1/codec_impl_legacy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ void StreamEncoderImpl::endEncode() {
}
}

void ServerConnectionImpl::maybeAddSentinelBufferFragment(Buffer::WatermarkBuffer& output_buffer) {
void ServerConnectionImpl::maybeAddSentinelBufferFragment(Buffer::Instance& output_buffer) {
// It's messy and complicated to try to tag the final write of an HTTP response for response
// tracking for flood protection. Instead, write an empty buffer fragment after the response,
// to allow for tracking.
Expand Down Expand Up @@ -296,20 +296,20 @@ void ConnectionImpl::flushOutput(bool end_encode) {
if (end_encode) {
// If this is an HTTP response in ServerConnectionImpl, track outbound responses for flood
// protection
maybeAddSentinelBufferFragment(output_buffer_);
maybeAddSentinelBufferFragment(*output_buffer_);
}
connection().write(output_buffer_, false);
ASSERT(0UL == output_buffer_.length());
connection().write(*output_buffer_, false);
ASSERT(0UL == output_buffer_->length());
}

void ConnectionImpl::addToBuffer(absl::string_view data) { output_buffer_.add(data); }
void ConnectionImpl::addToBuffer(absl::string_view data) { output_buffer_->add(data); }

void ConnectionImpl::addCharToBuffer(char c) { output_buffer_.add(&c, 1); }
void ConnectionImpl::addCharToBuffer(char c) { output_buffer_->add(&c, 1); }

void ConnectionImpl::addIntToBuffer(uint64_t i) { output_buffer_.add(absl::StrCat(i)); }
void ConnectionImpl::addIntToBuffer(uint64_t i) { output_buffer_->add(absl::StrCat(i)); }

void ConnectionImpl::copyToBuffer(const char* data, uint64_t length) {
output_buffer_.add(data, length);
output_buffer_->add(data, length);
}

void StreamEncoderImpl::resetStream(StreamResetReason reason) {
Expand Down Expand Up @@ -455,11 +455,12 @@ ConnectionImpl::ConnectionImpl(Network::Connection& connection, CodecStats& stat
handling_upgrade_(false), reset_stream_called_(false), deferred_end_stream_headers_(false),
strict_1xx_and_204_headers_(Runtime::runtimeFeatureEnabled(
"envoy.reloadable_features.strict_1xx_and_204_response_headers")),
output_buffer_([&]() -> void { this->onBelowLowWatermark(); },
[&]() -> void { this->onAboveHighWatermark(); },
[]() -> void { /* TODO(adisuissa): Handle overflow watermark */ }),
output_buffer_(connection.dispatcher().getWatermarkFactory().create(
Buffer::BufferType::HttpStreamOutput, [&]() -> void { this->onBelowLowWatermark(); },
[&]() -> void { this->onAboveHighWatermark(); },
[]() -> void { /* TODO(adisuissa): Handle overflow watermark */ })),
max_headers_kb_(max_headers_kb), max_headers_count_(max_headers_count) {
output_buffer_.setWatermarks(connection.bufferLimit());
output_buffer_->setWatermarks(connection.bufferLimit());
http_parser_init(&parser_, type);
parser_.allow_chunked_length = 1;
parser_.data = this;
Expand Down
8 changes: 4 additions & 4 deletions source/common/http/http1/codec_impl_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ class ConnectionImpl : public virtual Connection, protected Logger::Loggable<Log
void addToBuffer(absl::string_view data);
void addCharToBuffer(char c);
void addIntToBuffer(uint64_t i);
Buffer::WatermarkBuffer& buffer() { return output_buffer_; }
Buffer::Instance& buffer() { return *output_buffer_; }
uint64_t bufferRemainingSize();
void copyToBuffer(const char* data, uint64_t length);
void reserveBuffer(uint64_t size);
Expand All @@ -213,7 +213,7 @@ class ConnectionImpl : public virtual Connection, protected Logger::Loggable<Log
uint32_t bufferLimit() { return connection_.bufferLimit(); }
virtual bool supportsHttp10() { return false; }
bool maybeDirectDispatch(Buffer::Instance& data);
virtual void maybeAddSentinelBufferFragment(Buffer::WatermarkBuffer&) {}
virtual void maybeAddSentinelBufferFragment(Buffer::Instance&) {}
Http::Http1::CodecStats& stats() { return stats_; }
bool enableTrailers() const { return codec_settings_.enable_trailers_; }

Expand Down Expand Up @@ -420,7 +420,7 @@ class ConnectionImpl : public virtual Connection, protected Logger::Loggable<Log
// is pushed through the filter pipeline either at the end of the current dispatch call, or when
// the last byte of the body is processed (whichever happens first).
Buffer::OwnedImpl buffered_body_;
Buffer::WatermarkBuffer output_buffer_;
Buffer::InstancePtr output_buffer_;
Protocol protocol_{Protocol::Http11};
const uint32_t max_headers_kb_;
const uint32_t max_headers_count_;
Expand Down Expand Up @@ -507,7 +507,7 @@ class ServerConnectionImpl : public ServerConnection, public ConnectionImpl {
void sendProtocolErrorOld(absl::string_view details);

void releaseOutboundResponse(const Buffer::OwnedBufferFragmentImpl* fragment);
void maybeAddSentinelBufferFragment(Buffer::WatermarkBuffer& output_buffer) override;
void maybeAddSentinelBufferFragment(Buffer::Instance& output_buffer) override;
void doFloodProtectionChecks() const;
void checkHeaderNameForUnderscores() override;

Expand Down
Loading