Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
8c99d4f
Implemented H2 stream level buffer accounting.
KBaichoo Apr 13, 2021
44a5e05
Simplified stream API, H2 codec impl. Reverted setWatermark as change
KBaichoo Apr 30, 2021
6956e95
Added test for Set and Wait for Expected Account Balance.
KBaichoo Apr 30, 2021
dbc66ad
Fixing broken tests.
KBaichoo May 3, 2021
65e8206
Merge remote-tracking branch 'upstream/main' into accounting-connect-…
KBaichoo May 3, 2021
8abaa72
Fixed broken extension test.
KBaichoo May 3, 2021
f7ca7dd
Moved location of method.
KBaichoo May 4, 2021
922a222
Merged upstream.
KBaichoo May 6, 2021
0295ac9
Use Release Assert in test code with side effects (i.e. erase).
KBaichoo May 6, 2021
1644e66
Minor Fixes.
KBaichoo May 7, 2021
091dc3e
Added disabled by default test only runtime guard.
KBaichoo May 10, 2021
2d5fba4
Merge remote-tracking branch 'upstream/main' into accounting-connect-…
KBaichoo May 10, 2021
3e5f947
Clangtidy.
KBaichoo May 11, 2021
324dbf9
Merged main.
KBaichoo May 13, 2021
ee69562
Merge remote-tracking branch 'upstream/main' into accounting-connect-…
KBaichoo May 14, 2021
d8f2db7
Merge remote-tracking branch 'upstream/main' into accounting-connect-…
KBaichoo May 17, 2021
2e65c5d
Cleanup tests.
KBaichoo May 19, 2021
f204c8a
Merge remote-tracking branch 'upstream/main' into accounting-connect-…
KBaichoo May 19, 2021
c06b5d5
Additional test infra changes.
KBaichoo May 20, 2021
7b32a8d
Nit fix.
KBaichoo May 21, 2021
60ca164
Merge remote-tracking branch 'upstream/main' into accounting-connect-…
KBaichoo May 21, 2021
03e7a13
Coverage revert.
KBaichoo May 21, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion include/envoy/buffer/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ class Instance {

/**
* Binds the account to be charged for resources used by the buffer. This
* should only be called once.
* should only be called when the buffer is empty as existing slices
* within the buffer won't retroactively get tagged.
*
* @param account a shared_ptr to the account to charge.
*/
Expand Down Expand Up @@ -449,6 +450,7 @@ class Instance {
* @param watermark supplies the buffer high watermark size threshold, in bytes.
*/
virtual void setWatermarks(uint32_t watermark) PURE;

/**
* Returns the configured high watermark. A return value of 0 indicates that watermark
* functionality is disabled.
Expand Down
6 changes: 6 additions & 0 deletions include/envoy/http/codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,12 @@ class Stream {
* small window updates as satisfying the idle timeout as this is a potential DoS vector.
*/
virtual void setFlushTimeout(std::chrono::milliseconds timeout) PURE;

/**
* Sets the account for this stream, propagating it to all of its buffers.
* @param the account to assign this stream.
*/
virtual void setAccount(Buffer::BufferMemoryAccountSharedPtr account) PURE;
};

/**
Expand Down
6 changes: 6 additions & 0 deletions include/envoy/http/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <string>

#include "envoy/access_log/access_log.h"
#include "envoy/buffer/buffer.h"
#include "envoy/common/scope_tracker.h"
#include "envoy/event/dispatcher.h"
#include "envoy/grpc/status.h"
Expand Down Expand Up @@ -556,6 +557,11 @@ class StreamDecoderFilterCallbacks : public virtual StreamFilterCallbacks {
*/
virtual uint32_t decoderBufferLimit() PURE;

/**
* @return the account, if any, used by this stream.
*/
virtual Buffer::BufferMemoryAccountSharedPtr account() const PURE;

/**
* Takes a stream, and acts as if the headers are newly arrived.
* On success, this will result in a creating a new filter chain and likely
Expand Down
6 changes: 6 additions & 0 deletions include/envoy/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -1327,6 +1327,12 @@ class GenericUpstream {
* @param reason supplies the reset reason.
*/
virtual void resetStream() PURE;

/**
* Sets the upstream to use the following account.
* @param the account to assign the generic upstream.
*/
virtual void setAccount(Buffer::BufferMemoryAccountSharedPtr account) PURE;
};

using GenericConnPoolPtr = std::unique_ptr<GenericConnPool>;
Expand Down
4 changes: 2 additions & 2 deletions source/common/buffer/buffer_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ void OwnedImpl::addDrainTracker(std::function<void()> drain_tracker) {

void OwnedImpl::bindAccount(BufferMemoryAccountSharedPtr account) {
ASSERT(slices_.empty());
// We don't yet have an account bound.
ASSERT(!account_);
account_ = std::move(account);
}

BufferMemoryAccountSharedPtr OwnedImpl::getAccountForTest() { return account_; }

void OwnedImpl::add(const void* data, uint64_t size) { addImpl(data, size); }

void OwnedImpl::addBufferFragment(BufferFragment& fragment) {
Expand Down
5 changes: 5 additions & 0 deletions source/common/buffer/buffer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,11 @@ class OwnedImpl : public LibEventInstance {
*/
virtual void appendSliceForTest(absl::string_view data);

/**
* @return the BufferMemoryAccount bound to this buffer, if any.
*/
BufferMemoryAccountSharedPtr getAccountForTest();

// Does not implement watermarking.
// TODO(antoniovicente) Implement watermarks by merging the OwnedImpl and WatermarkBuffer
// implementations. Also, make high-watermark config a constructor argument.
Expand Down
2 changes: 1 addition & 1 deletion source/common/buffer/watermark_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class WatermarkBuffer : public OwnedImpl {

protected:
virtual void checkHighAndOverflowWatermarks();
void checkLowWatermark();
virtual void checkLowWatermark();

private:
void commit(uint64_t length, absl::Span<RawSlice> slices,
Expand Down
2 changes: 2 additions & 0 deletions source/common/http/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,8 @@ class AsyncStreamImpl : public AsyncClient::Stream,
Upstream::ClusterInfoConstSharedPtr clusterInfo() override { return parent_.cluster_; }
void clearRouteCache() override {}
uint64_t streamId() const override { return stream_id_; }
// TODO(kbaichoo): Plumb account from owning request filter.
Buffer::BufferMemoryAccountSharedPtr account() const override { return nullptr; }
Tracing::Span& activeSpan() override { return active_span_; }
const Tracing::Config& tracingConfig() override { return tracing_config_; }
void continueDecoding() override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
Expand Down
20 changes: 16 additions & 4 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,18 @@ RequestDecoder& ConnectionManagerImpl::newStream(ResponseEncoder& response_encod
}

ENVOY_CONN_LOG(debug, "new stream", read_callbacks_->connection());
ActiveStreamPtr new_stream(new ActiveStream(*this, response_encoder.getStream().bufferLimit()));

// Set the account to start accounting if enabled. This is still a
// work-in-progress, and will be removed when other features using the
// accounting are implemented.
Buffer::BufferMemoryAccountSharedPtr downstream_request_account;
if (Runtime::runtimeFeatureEnabled("envoy.test_only.per_stream_buffer_accounting")) {
downstream_request_account = std::make_shared<Buffer::BufferMemoryAccountImpl>();
response_encoder.getStream().setAccount(downstream_request_account);
}
ActiveStreamPtr new_stream(new ActiveStream(*this, response_encoder.getStream().bufferLimit(),
std::move(downstream_request_account)));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking a bit about the future:

It would be useful to have a way to go from Account to ActiveStream. Possible strawperson:

class BufferMemoryAccount {
+  // Methods to set and clear the account owner.
+  virtual void setOwner(AccountOwner& owner) PURE;
+  virtual void clearOwner() PURE;
};

class AccountOwner {
public:
  // Reset the stream owned by this account.
  // Better method names may be possible.  Reason argument seems optional since we expect this 
  // to be called only when under memory pressure.
  void resetStream(Reason reason) PURE;
  // Dump debug information about the stream.
  void dumpState(std::ostream& os) PURE;
};

The account interface could wrap the reset and dump methods so they are no-ops if the account has no owner, or have method to get an optional reference to the owner that the per-worker overload trackers can use to run those operations on the relevant owner objects.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we'll need something like this later down the line, and will address this then.


new_stream->state_.is_internally_created_ = is_internally_created;
new_stream->response_encoder_ = &response_encoder;
new_stream->response_encoder_->getStream().addCallbacks(*new_stream);
Expand Down Expand Up @@ -571,13 +582,14 @@ void ConnectionManagerImpl::RdsRouteConfigUpdateRequester::requestSrdsUpdate(
}

ConnectionManagerImpl::ActiveStream::ActiveStream(ConnectionManagerImpl& connection_manager,
uint32_t buffer_limit)
uint32_t buffer_limit,
Buffer::BufferMemoryAccountSharedPtr account)
: connection_manager_(connection_manager),
stream_id_(connection_manager.random_generator_.random()),
filter_manager_(*this, connection_manager_.read_callbacks_->connection().dispatcher(),
connection_manager_.read_callbacks_->connection(), stream_id_,
connection_manager_.config_.proxy100Continue(), buffer_limit,
connection_manager_.config_.filterFactory(),
std::move(account), connection_manager_.config_.proxy100Continue(),
buffer_limit, connection_manager_.config_.filterFactory(),
connection_manager_.config_.localReply(),
connection_manager_.codec_->protocol(), connection_manager_.timeSource(),
connection_manager_.read_callbacks_->connection().streamInfo().filterState(),
Expand Down
3 changes: 2 additions & 1 deletion source/common/http/conn_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
public Tracing::Config,
public ScopeTrackedObject,
public FilterManagerCallbacks {
ActiveStream(ConnectionManagerImpl& connection_manager, uint32_t buffer_limit);
ActiveStream(ConnectionManagerImpl& connection_manager, uint32_t buffer_limit,
Buffer::BufferMemoryAccountSharedPtr account);
void completeRequest();

const Network::Connection* connection();
Expand Down
4 changes: 4 additions & 0 deletions source/common/http/filter_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1578,5 +1578,9 @@ void ActiveStreamFilterBase::resetStream() { parent_.filter_manager_callbacks_.r

uint64_t ActiveStreamFilterBase::streamId() const { return parent_.streamId(); }

Buffer::BufferMemoryAccountSharedPtr ActiveStreamDecoderFilter::account() const {
return parent_.account();
}

} // namespace Http
} // namespace Envoy
13 changes: 9 additions & 4 deletions source/common/http/filter_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <functional>
#include <memory>

#include "envoy/buffer/buffer.h"
#include "envoy/common/optref.h"
#include "envoy/extensions/filters/common/matcher/action/v3/skip_action.pb.h"
#include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.pb.h"
Expand Down Expand Up @@ -277,6 +278,7 @@ struct ActiveStreamDecoderFilter : public ActiveStreamFilterBase,
void addUpstreamSocketOptions(const Network::Socket::OptionsSharedPtr& options) override;

Network::Socket::OptionsSharedPtr getUpstreamSocketOptions() const override;
Buffer::BufferMemoryAccountSharedPtr account() const override;

// Each decoder filter instance checks if the request passed to the filter is gRPC
// so that we can issue gRPC local responses to gRPC requests. Filter's decodeHeaders()
Expand Down Expand Up @@ -640,15 +642,16 @@ class FilterManager : public ScopeTrackedObject,
Logger::Loggable<Logger::Id::http> {
public:
FilterManager(FilterManagerCallbacks& filter_manager_callbacks, Event::Dispatcher& dispatcher,
const Network::Connection& connection, uint64_t stream_id, bool proxy_100_continue,
const Network::Connection& connection, uint64_t stream_id,
Buffer::BufferMemoryAccountSharedPtr account, bool proxy_100_continue,
uint32_t buffer_limit, FilterChainFactory& filter_chain_factory,
const LocalReply::LocalReply& local_reply, Http::Protocol protocol,
TimeSource& time_source, StreamInfo::FilterStateSharedPtr parent_filter_state,
StreamInfo::FilterState::LifeSpan filter_state_life_span)
: filter_manager_callbacks_(filter_manager_callbacks), dispatcher_(dispatcher),
connection_(connection), stream_id_(stream_id), proxy_100_continue_(proxy_100_continue),
buffer_limit_(buffer_limit), filter_chain_factory_(filter_chain_factory),
local_reply_(local_reply),
connection_(connection), stream_id_(stream_id), account_(std::move(account)),
proxy_100_continue_(proxy_100_continue), buffer_limit_(buffer_limit),
filter_chain_factory_(filter_chain_factory), local_reply_(local_reply),
stream_info_(protocol, time_source, connection.addressProviderSharedPtr(),
parent_filter_state, filter_state_life_span) {}
~FilterManager() override {
Expand Down Expand Up @@ -913,6 +916,7 @@ class FilterManager : public ScopeTrackedObject,
const Network::Connection* connection() const { return &connection_; }

uint64_t streamId() const { return stream_id_; }
Buffer::BufferMemoryAccountSharedPtr account() const { return account_; }

Buffer::InstancePtr& bufferedRequestData() { return buffered_request_data_; }

Expand Down Expand Up @@ -986,6 +990,7 @@ class FilterManager : public ScopeTrackedObject,
Event::Dispatcher& dispatcher_;
const Network::Connection& connection_;
const uint64_t stream_id_;
Buffer::BufferMemoryAccountSharedPtr account_;
const bool proxy_100_continue_;

std::list<ActiveStreamDecoderFilterPtr> decoder_filters_;
Expand Down
4 changes: 4 additions & 0 deletions source/common/http/http1/codec_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ class StreamEncoderImpl : public virtual StreamEncoder,
// require a flush timeout not already covered by other timeouts.
}

void setAccount(Buffer::BufferMemoryAccountSharedPtr) override {
// TODO(kbaichoo): implement account tracking for H1.
}

void setIsResponseToHeadRequest(bool value) { is_response_to_head_request_ = value; }
void setIsResponseToConnectRequest(bool value) { is_response_to_connect_request_ = value; }
void setDetails(absl::string_view details) { details_ = details; }
Expand Down
45 changes: 31 additions & 14 deletions source/common/http/http2/codec_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,17 @@ template <typename T> static T* removeConst(const void* object) {
}

ConnectionImpl::StreamImpl::StreamImpl(ConnectionImpl& parent, uint32_t buffer_limit)
: parent_(parent), local_end_stream_sent_(false), remote_end_stream_(false),
data_deferred_(false), received_noninformational_headers_(false),
: parent_(parent),
pending_recv_data_(parent_.connection_.dispatcher().getWatermarkFactory().create(
[this]() -> void { this->pendingRecvBufferLowWatermark(); },
[this]() -> void { this->pendingRecvBufferHighWatermark(); },
[]() -> void { /* TODO(adisuissa): Handle overflow watermark */ })),
pending_send_data_(parent_.connection_.dispatcher().getWatermarkFactory().create(
[this]() -> void { this->pendingSendBufferLowWatermark(); },
[this]() -> void { this->pendingSendBufferHighWatermark(); },
[]() -> void { /* TODO(adisuissa): Handle overflow watermark */ })),
local_end_stream_sent_(false), remote_end_stream_(false), data_deferred_(false),
received_noninformational_headers_(false),
pending_receive_buffer_high_watermark_called_(false),
pending_send_buffer_high_watermark_called_(false), reset_due_to_messaging_error_(false) {
parent_.stats_.streams_active_.inc();
Expand All @@ -145,7 +154,7 @@ ConnectionImpl::StreamImpl::~StreamImpl() { ASSERT(stream_idle_timer_ == nullptr
void ConnectionImpl::StreamImpl::destroy() {
disarmStreamIdleTimer();
parent_.stats_.streams_active_.dec();
parent_.stats_.pending_send_bytes_.sub(pending_send_data_.length());
parent_.stats_.pending_send_bytes_.sub(pending_send_data_->length());
}

static void insertHeader(std::vector<nghttp2_nv>& headers, const HeaderEntry& header) {
Expand Down Expand Up @@ -253,7 +262,7 @@ void ConnectionImpl::ServerStreamImpl::encodeHeaders(const ResponseHeaderMap& he
void ConnectionImpl::StreamImpl::encodeTrailersBase(const HeaderMap& trailers) {
ASSERT(!local_end_stream_);
local_end_stream_ = true;
if (pending_send_data_.length() > 0) {
if (pending_send_data_->length() > 0) {
// In this case we want trailers to come after we release all pending body data that is
// waiting on window updates. We need to save the trailers so that we can emit them later.
// However, for empty trailers, we don't need to to save the trailers.
Expand Down Expand Up @@ -409,13 +418,13 @@ void ConnectionImpl::StreamImpl::submitMetadata(uint8_t flags) {
}

ssize_t ConnectionImpl::StreamImpl::onDataSourceRead(uint64_t length, uint32_t* data_flags) {
if (pending_send_data_.length() == 0 && !local_end_stream_) {
if (pending_send_data_->length() == 0 && !local_end_stream_) {
ASSERT(!data_deferred_);
data_deferred_ = true;
return NGHTTP2_ERR_DEFERRED;
} else {
*data_flags |= NGHTTP2_DATA_FLAG_NO_COPY;
if (local_end_stream_ && pending_send_data_.length() <= length) {
if (local_end_stream_ && pending_send_data_->length() <= length) {
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
if (pending_trailers_to_encode_) {
// We need to tell the library to not set end stream so that we can emit the trailers.
Expand All @@ -425,7 +434,7 @@ ssize_t ConnectionImpl::StreamImpl::onDataSourceRead(uint64_t length, uint32_t*
}
}

return std::min(length, pending_send_data_.length());
return std::min(length, pending_send_data_->length());
}
}

Expand All @@ -446,7 +455,7 @@ void ConnectionImpl::StreamImpl::onDataSourceSend(const uint8_t* framehd, size_t
}

parent_.stats_.pending_send_bytes_.sub(length);
output.move(pending_send_data_, length);
output.move(*pending_send_data_, length);
parent_.connection_.write(output, false);
}

Expand Down Expand Up @@ -491,7 +500,9 @@ void ConnectionImpl::StreamImpl::onPendingFlushTimer() {

void ConnectionImpl::StreamImpl::encodeData(Buffer::Instance& data, bool end_stream) {
ASSERT(!local_end_stream_);
encodeDataHelper(data, end_stream, /*skip_encoding_empty_trailers=*/false);
encodeDataHelper(data, end_stream,
/*skip_encoding_empty_trailers=*/
false);
}

void ConnectionImpl::StreamImpl::encodeDataHelper(Buffer::Instance& data, bool end_stream,
Expand All @@ -502,7 +513,7 @@ void ConnectionImpl::StreamImpl::encodeDataHelper(Buffer::Instance& data, bool e

local_end_stream_ = end_stream;
parent_.stats_.pending_send_bytes_.add(data.length());
pending_send_data_.move(data);
pending_send_data_->move(data);
if (data_deferred_) {
int rc = nghttp2_session_resume_data(parent_.session_, stream_id_);
ASSERT(rc == 0);
Expand All @@ -514,7 +525,7 @@ void ConnectionImpl::StreamImpl::encodeDataHelper(Buffer::Instance& data, bool e
// Intended to check through coverage that this error case is tested
return;
}
if (local_end_stream_ && pending_send_data_.length() > 0) {
if (local_end_stream_ && pending_send_data_->length() > 0) {
createPendingFlushTimer();
}
}
Expand Down Expand Up @@ -576,6 +587,12 @@ void ConnectionImpl::StreamImpl::onMetadataDecoded(MetadataMapPtr&& metadata_map
}
}

void ConnectionImpl::StreamImpl::setAccount(Buffer::BufferMemoryAccountSharedPtr account) {
buffer_memory_account_ = account;
pending_recv_data_->bindAccount(buffer_memory_account_);
pending_send_data_->bindAccount(buffer_memory_account_);
}

ConnectionImpl::ConnectionImpl(Network::Connection& connection, CodecStats& stats,
Random::RandomGenerator& random_generator,
const envoy::config::core::v3::Http2ProtocolOptions& http2_options,
Expand Down Expand Up @@ -707,7 +724,7 @@ int ConnectionImpl::onData(int32_t stream_id, const uint8_t* data, size_t len) {
StreamImpl* stream = getStream(stream_id);
// If this results in buffering too much data, the watermark buffer will call
// pendingRecvBufferHighWatermark, resulting in ++read_disable_count_
stream->pending_recv_data_.add(data, len);
stream->pending_recv_data_->add(data, len);
// Update the window to the peer unless some consumer of this stream's data has hit a flow control
// limit and disabled reads on this stream
if (!stream->buffersOverrun()) {
Expand Down Expand Up @@ -862,10 +879,10 @@ Status ConnectionImpl::onFrameReceived(const nghttp2_frame* frame) {
// It's possible that we are waiting to send a deferred reset, so only raise data if local
// is not complete.
if (!stream->deferred_reset_) {
stream->decoder().decodeData(stream->pending_recv_data_, stream->remote_end_stream_);
stream->decoder().decodeData(*stream->pending_recv_data_, stream->remote_end_stream_);
}

stream->pending_recv_data_.drain(stream->pending_recv_data_.length());
stream->pending_recv_data_->drain(stream->pending_recv_data_->length());
break;
}
case NGHTTP2_RST_STREAM: {
Expand Down
Loading