Skip to content
Merged
20 changes: 20 additions & 0 deletions include/envoy/buffer/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,26 @@ class Instance {
template <typename T, size_t Size = sizeof(T)> void writeBEInt(T value) {
writeInt<ByteOrder::BigEndian, T, Size>(value);
}

/**
* Set the buffer's high watermark. The buffer's low watermark is implicitly set to half the high
* watermark. Setting the high watermark to 0 disables watermark functionality.
* @param watermark supplies the buffer high watermark size threshold, in bytes.
*/
virtual void setWatermarks(uint32_t watermark) PURE;
/**
* Returns the configured high watermark. A return value of 0 indicates that watermark
* functionality is disabled.
*/
virtual uint32_t highWatermark() const PURE;
/**
* Determine if the buffer watermark trigger condition is currently set. The watermark trigger is
* set when the buffer size exceeds the configured high watermark and is cleared once the buffer
* size drops to the low watermark.
* @return true if the buffer size once exceeded the high watermark and hasn't since dropped to
* the low watermark.
*/
virtual bool highWatermarkTriggered() const PURE;
};

using InstancePtr = std::unique_ptr<Instance>;
Expand Down
7 changes: 7 additions & 0 deletions source/common/buffer/buffer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,13 @@ class OwnedImpl : public LibEventInstance {
*/
virtual void appendSliceForTest(absl::string_view data);

// Does not implement watermarking.
// TODO(antoniovicente) Implement watermarks by merging the OwnedImpl and WatermarkBuffer
// implementations. Also, make high-watermark config a constructor argument.
void setWatermarks(uint32_t) override { ASSERT(false, "watermarks not implemented."); }
uint32_t highWatermark() const override { return 0; }
bool highWatermarkTriggered() const override { return false; }

/**
* Describe the in-memory representation of the slices in the buffer. For use
* in tests that want to make assertions about the specific arrangement of
Expand Down
6 changes: 3 additions & 3 deletions source/common/buffer/watermark_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ class WatermarkBuffer : public OwnedImpl {
void appendSliceForTest(const void* data, uint64_t size) override;
void appendSliceForTest(absl::string_view data) override;

void setWatermarks(uint32_t watermark) { setWatermarks(watermark / 2, watermark); }
void setWatermarks(uint32_t watermark) override { setWatermarks(watermark / 2, watermark); }
void setWatermarks(uint32_t low_watermark, uint32_t high_watermark);
uint32_t highWatermark() const { return high_watermark_; }
uint32_t highWatermark() const override { return high_watermark_; }
// Returns true if the high watermark callbacks have been called more recently
// than the low watermark callbacks.
bool highWatermarkTriggered() const { return above_high_watermark_called_; }
bool highWatermarkTriggered() const override { return above_high_watermark_called_; }

private:
void checkHighAndOverflowWatermarks();
Expand Down
14 changes: 7 additions & 7 deletions source/common/http/filter_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -250,16 +250,16 @@ bool ActiveStreamDecoderFilter::canContinue() {
return !parent_.state_.local_complete_;
}

Buffer::WatermarkBufferPtr ActiveStreamDecoderFilter::createBuffer() {
auto buffer = std::make_unique<Buffer::WatermarkBuffer>(
Buffer::InstancePtr ActiveStreamDecoderFilter::createBuffer() {
auto buffer = dispatcher().getWatermarkFactory().create(
[this]() -> void { this->requestDataDrained(); },
[this]() -> void { this->requestDataTooLarge(); },
[]() -> void { /* TODO(adisuissa): Handle overflow watermark */ });
buffer->setWatermarks(parent_.buffer_limit_);
return buffer;
}

Buffer::WatermarkBufferPtr& ActiveStreamDecoderFilter::bufferedData() {
Buffer::InstancePtr& ActiveStreamDecoderFilter::bufferedData() {
return parent_.buffered_request_data_;
}

Expand Down Expand Up @@ -1302,15 +1302,15 @@ absl::optional<Router::ConfigConstSharedPtr> ActiveStreamDecoderFilter::routeCon
return parent_.filter_manager_callbacks_.routeConfig();
}

Buffer::WatermarkBufferPtr ActiveStreamEncoderFilter::createBuffer() {
auto buffer = new Buffer::WatermarkBuffer(
Buffer::InstancePtr ActiveStreamEncoderFilter::createBuffer() {
auto buffer = dispatcher().getWatermarkFactory().create(
[this]() -> void { this->responseDataDrained(); },
[this]() -> void { this->responseDataTooLarge(); },
[]() -> void { /* TODO(adisuissa): Handle overflow watermark */ });
buffer->setWatermarks(parent_.buffer_limit_);
return Buffer::WatermarkBufferPtr{buffer};
return buffer;
}
Buffer::WatermarkBufferPtr& ActiveStreamEncoderFilter::bufferedData() {
Buffer::InstancePtr& ActiveStreamEncoderFilter::bufferedData() {
return parent_.buffered_response_data_;
}
bool ActiveStreamEncoderFilter::complete() { return parent_.state_.local_complete_; }
Expand Down
16 changes: 8 additions & 8 deletions source/common/http/filter_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ struct ActiveStreamFilterBase : public virtual StreamFilterCallbacks,

void commonContinue();
virtual bool canContinue() PURE;
virtual Buffer::WatermarkBufferPtr createBuffer() PURE;
virtual Buffer::WatermarkBufferPtr& bufferedData() PURE;
virtual Buffer::InstancePtr createBuffer() PURE;
virtual Buffer::InstancePtr& bufferedData() PURE;
virtual bool complete() PURE;
virtual bool has100Continueheaders() PURE;
virtual void do100ContinueHeaders() PURE;
Expand Down Expand Up @@ -138,8 +138,8 @@ struct ActiveStreamDecoderFilter : public ActiveStreamFilterBase,

// ActiveStreamFilterBase
bool canContinue() override;
Buffer::WatermarkBufferPtr createBuffer() override;
Buffer::WatermarkBufferPtr& bufferedData() override;
Buffer::InstancePtr createBuffer() override;
Buffer::InstancePtr& bufferedData() override;
bool complete() override;
bool has100Continueheaders() override { return false; }
void do100ContinueHeaders() override { NOT_REACHED_GCOVR_EXCL_LINE; }
Expand Down Expand Up @@ -224,8 +224,8 @@ struct ActiveStreamEncoderFilter : public ActiveStreamFilterBase,

// ActiveStreamFilterBase
bool canContinue() override { return true; }
Buffer::WatermarkBufferPtr createBuffer() override;
Buffer::WatermarkBufferPtr& bufferedData() override;
Buffer::InstancePtr createBuffer() override;
Buffer::InstancePtr& bufferedData() override;
bool complete() override;
bool has100Continueheaders() override;
void do100ContinueHeaders() override;
Expand Down Expand Up @@ -776,8 +776,8 @@ class FilterManager : public ScopeTrackedObject,
// processing the next filter. The storage is created on demand. We need to store metadata
// temporarily in the filter in case the filter has stopped all while processing headers.
std::unique_ptr<MetadataMapVector> request_metadata_map_vector_;
Buffer::WatermarkBufferPtr buffered_response_data_;
Buffer::WatermarkBufferPtr buffered_request_data_;
Buffer::InstancePtr buffered_response_data_;
Buffer::InstancePtr buffered_request_data_;
uint32_t buffer_limit_{0};
uint32_t high_watermark_count_{0};
std::list<DownstreamWatermarkCallbacks*> watermark_callbacks_;
Expand Down
26 changes: 13 additions & 13 deletions source/common/http/http1/codec_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ void StreamEncoderImpl::endEncode() {
}
}

void ServerConnectionImpl::maybeAddSentinelBufferFragment(Buffer::WatermarkBuffer& output_buffer) {
void ServerConnectionImpl::maybeAddSentinelBufferFragment(Buffer::Instance& output_buffer) {
// It's messy and complicated to try to tag the final write of an HTTP response for response
// tracking for flood protection. Instead, write an empty buffer fragment after the response,
// to allow for tracking.
Expand Down Expand Up @@ -297,20 +297,20 @@ void ConnectionImpl::flushOutput(bool end_encode) {
if (end_encode) {
// If this is an HTTP response in ServerConnectionImpl, track outbound responses for flood
// protection
maybeAddSentinelBufferFragment(output_buffer_);
maybeAddSentinelBufferFragment(*output_buffer_);
}
connection().write(output_buffer_, false);
ASSERT(0UL == output_buffer_.length());
connection().write(*output_buffer_, false);
ASSERT(0UL == output_buffer_->length());
}

void ConnectionImpl::addToBuffer(absl::string_view data) { output_buffer_.add(data); }
void ConnectionImpl::addToBuffer(absl::string_view data) { output_buffer_->add(data); }

void ConnectionImpl::addCharToBuffer(char c) { output_buffer_.add(&c, 1); }
void ConnectionImpl::addCharToBuffer(char c) { output_buffer_->add(&c, 1); }

void ConnectionImpl::addIntToBuffer(uint64_t i) { output_buffer_.add(absl::StrCat(i)); }
void ConnectionImpl::addIntToBuffer(uint64_t i) { output_buffer_->add(absl::StrCat(i)); }

void ConnectionImpl::copyToBuffer(const char* data, uint64_t length) {
output_buffer_.add(data, length);
output_buffer_->add(data, length);
}

void StreamEncoderImpl::resetStream(StreamResetReason reason) {
Expand Down Expand Up @@ -477,12 +477,12 @@ ConnectionImpl::ConnectionImpl(Network::Connection& connection, CodecStats& stat
handling_upgrade_(false), reset_stream_called_(false), deferred_end_stream_headers_(false),
strict_1xx_and_204_headers_(Runtime::runtimeFeatureEnabled(
"envoy.reloadable_features.strict_1xx_and_204_response_headers")),
dispatching_(false),
output_buffer_([&]() -> void { this->onBelowLowWatermark(); },
[&]() -> void { this->onAboveHighWatermark(); },
[]() -> void { /* TODO(adisuissa): Handle overflow watermark */ }),
dispatching_(false), output_buffer_(connection.dispatcher().getWatermarkFactory().create(
[&]() -> void { this->onBelowLowWatermark(); },
[&]() -> void { this->onAboveHighWatermark(); },
[]() -> void { /* TODO(adisuissa): Handle overflow watermark */ })),
max_headers_kb_(max_headers_kb), max_headers_count_(max_headers_count) {
output_buffer_.setWatermarks(connection.bufferLimit());
output_buffer_->setWatermarks(connection.bufferLimit());
http_parser_init(&parser_, type);
parser_.allow_chunked_length = 1;
parser_.data = this;
Expand Down
10 changes: 6 additions & 4 deletions source/common/http/http1/codec_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ class ConnectionImpl : public virtual Connection, protected Logger::Loggable<Log
void addToBuffer(absl::string_view data);
void addCharToBuffer(char c);
void addIntToBuffer(uint64_t i);
Buffer::WatermarkBuffer& buffer() { return output_buffer_; }
Buffer::Instance& buffer() { return *output_buffer_; }
uint64_t bufferRemainingSize();
void copyToBuffer(const char* data, uint64_t length);
void reserveBuffer(uint64_t size);
Expand All @@ -209,7 +209,7 @@ class ConnectionImpl : public virtual Connection, protected Logger::Loggable<Log
uint32_t bufferLimit() { return connection_.bufferLimit(); }
virtual bool supportsHttp10() { return false; }
bool maybeDirectDispatch(Buffer::Instance& data);
virtual void maybeAddSentinelBufferFragment(Buffer::WatermarkBuffer&) {}
virtual void maybeAddSentinelBufferFragment(Buffer::Instance&) {}
CodecStats& stats() { return stats_; }
bool enableTrailers() const { return codec_settings_.enable_trailers_; }

Expand Down Expand Up @@ -448,7 +448,9 @@ class ConnectionImpl : public virtual Connection, protected Logger::Loggable<Log
// is pushed through the filter pipeline either at the end of the current dispatch call, or when
// the last byte of the body is processed (whichever happens first).
Buffer::OwnedImpl buffered_body_;
Buffer::WatermarkBuffer output_buffer_;
// Buffer used to encode the HTTP message before moving it to the network connection's output
// buffer. This buffer is always allocated, never nullptr.
Buffer::InstancePtr output_buffer_;
Protocol protocol_{Protocol::Http11};
const uint32_t max_headers_kb_;
const uint32_t max_headers_count_;
Expand Down Expand Up @@ -535,7 +537,7 @@ class ServerConnectionImpl : public ServerConnection, public ConnectionImpl {
void sendProtocolErrorOld(absl::string_view details);

void releaseOutboundResponse(const Buffer::OwnedBufferFragmentImpl* fragment);
void maybeAddSentinelBufferFragment(Buffer::WatermarkBuffer& output_buffer) override;
void maybeAddSentinelBufferFragment(Buffer::Instance& output_buffer) override;
Status doFloodProtectionChecks() const;
Status checkHeaderNameForUnderscores() override;

Expand Down
25 changes: 13 additions & 12 deletions source/common/http/http1/codec_impl_legacy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ void StreamEncoderImpl::endEncode() {
}
}

void ServerConnectionImpl::maybeAddSentinelBufferFragment(Buffer::WatermarkBuffer& output_buffer) {
void ServerConnectionImpl::maybeAddSentinelBufferFragment(Buffer::Instance& output_buffer) {
// It's messy and complicated to try to tag the final write of an HTTP response for response
// tracking for flood protection. Instead, write an empty buffer fragment after the response,
// to allow for tracking.
Expand Down Expand Up @@ -296,20 +296,20 @@ void ConnectionImpl::flushOutput(bool end_encode) {
if (end_encode) {
// If this is an HTTP response in ServerConnectionImpl, track outbound responses for flood
// protection
maybeAddSentinelBufferFragment(output_buffer_);
maybeAddSentinelBufferFragment(*output_buffer_);
}
connection().write(output_buffer_, false);
ASSERT(0UL == output_buffer_.length());
connection().write(*output_buffer_, false);
ASSERT(0UL == output_buffer_->length());
}

void ConnectionImpl::addToBuffer(absl::string_view data) { output_buffer_.add(data); }
void ConnectionImpl::addToBuffer(absl::string_view data) { output_buffer_->add(data); }

void ConnectionImpl::addCharToBuffer(char c) { output_buffer_.add(&c, 1); }
void ConnectionImpl::addCharToBuffer(char c) { output_buffer_->add(&c, 1); }

void ConnectionImpl::addIntToBuffer(uint64_t i) { output_buffer_.add(absl::StrCat(i)); }
void ConnectionImpl::addIntToBuffer(uint64_t i) { output_buffer_->add(absl::StrCat(i)); }

void ConnectionImpl::copyToBuffer(const char* data, uint64_t length) {
output_buffer_.add(data, length);
output_buffer_->add(data, length);
}

void StreamEncoderImpl::resetStream(StreamResetReason reason) {
Expand Down Expand Up @@ -455,11 +455,12 @@ ConnectionImpl::ConnectionImpl(Network::Connection& connection, CodecStats& stat
handling_upgrade_(false), reset_stream_called_(false), deferred_end_stream_headers_(false),
strict_1xx_and_204_headers_(Runtime::runtimeFeatureEnabled(
"envoy.reloadable_features.strict_1xx_and_204_response_headers")),
output_buffer_([&]() -> void { this->onBelowLowWatermark(); },
[&]() -> void { this->onAboveHighWatermark(); },
[]() -> void { /* TODO(adisuissa): Handle overflow watermark */ }),
output_buffer_(connection.dispatcher().getWatermarkFactory().create(
[&]() -> void { this->onBelowLowWatermark(); },
[&]() -> void { this->onAboveHighWatermark(); },
[]() -> void { /* TODO(adisuissa): Handle overflow watermark */ })),
max_headers_kb_(max_headers_kb), max_headers_count_(max_headers_count) {
output_buffer_.setWatermarks(connection.bufferLimit());
output_buffer_->setWatermarks(connection.bufferLimit());
http_parser_init(&parser_, type);
parser_.allow_chunked_length = 1;
parser_.data = this;
Expand Down
8 changes: 4 additions & 4 deletions source/common/http/http1/codec_impl_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ class ConnectionImpl : public virtual Connection, protected Logger::Loggable<Log
void addToBuffer(absl::string_view data);
void addCharToBuffer(char c);
void addIntToBuffer(uint64_t i);
Buffer::WatermarkBuffer& buffer() { return output_buffer_; }
Buffer::Instance& buffer() { return *output_buffer_; }
uint64_t bufferRemainingSize();
void copyToBuffer(const char* data, uint64_t length);
void reserveBuffer(uint64_t size);
Expand All @@ -213,7 +213,7 @@ class ConnectionImpl : public virtual Connection, protected Logger::Loggable<Log
uint32_t bufferLimit() { return connection_.bufferLimit(); }
virtual bool supportsHttp10() { return false; }
bool maybeDirectDispatch(Buffer::Instance& data);
virtual void maybeAddSentinelBufferFragment(Buffer::WatermarkBuffer&) {}
virtual void maybeAddSentinelBufferFragment(Buffer::Instance&) {}
Http::Http1::CodecStats& stats() { return stats_; }
bool enableTrailers() const { return codec_settings_.enable_trailers_; }

Expand Down Expand Up @@ -422,7 +422,7 @@ class ConnectionImpl : public virtual Connection, protected Logger::Loggable<Log
// is pushed through the filter pipeline either at the end of the current dispatch call, or when
// the last byte of the body is processed (whichever happens first).
Buffer::OwnedImpl buffered_body_;
Buffer::WatermarkBuffer output_buffer_;
Buffer::InstancePtr output_buffer_;
Protocol protocol_{Protocol::Http11};
const uint32_t max_headers_kb_;
const uint32_t max_headers_count_;
Expand Down Expand Up @@ -509,7 +509,7 @@ class ServerConnectionImpl : public ServerConnection, public ConnectionImpl {
void sendProtocolErrorOld(absl::string_view details);

void releaseOutboundResponse(const Buffer::OwnedBufferFragmentImpl* fragment);
void maybeAddSentinelBufferFragment(Buffer::WatermarkBuffer& output_buffer) override;
void maybeAddSentinelBufferFragment(Buffer::Instance& output_buffer) override;
void doFloodProtectionChecks() const;
void checkHeaderNameForUnderscores() override;

Expand Down
Loading