Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions envoy/http/codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,11 @@ class Stream : public StreamResetHandler {
*/
virtual void setFlushTimeout(std::chrono::milliseconds timeout) PURE;

/**
* @return the account, if any, used by this stream.
*/
virtual Buffer::BufferMemoryAccountSharedPtr account() const PURE;

/**
* Sets the account for this stream, propagating it to all of its buffers.
* @param the account to assign this stream.
Expand Down
15 changes: 10 additions & 5 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -305,12 +305,17 @@ RequestDecoder& ConnectionManagerImpl::newStream(ResponseEncoder& response_encod

ENVOY_CONN_LOG(debug, "new stream", read_callbacks_->connection());

// Create account, wiring the stream to use it for tracking bytes.
// If tracking is disabled, the wiring becomes a NOP.
auto& buffer_factory = read_callbacks_->connection().dispatcher().getWatermarkFactory();
Buffer::BufferMemoryAccountSharedPtr downstream_stream_account =
buffer_factory.createAccount(response_encoder.getStream());
response_encoder.getStream().setAccount(downstream_stream_account);
response_encoder.getStream().account();

if (downstream_stream_account == nullptr) {
// Create account, wiring the stream to use it for tracking bytes.
// If tracking is disabled, the wiring becomes a NOP.
auto& buffer_factory = read_callbacks_->connection().dispatcher().getWatermarkFactory();
downstream_stream_account = buffer_factory.createAccount(response_encoder.getStream());
response_encoder.getStream().setAccount(downstream_stream_account);
}

ActiveStreamPtr new_stream(new ActiveStream(*this, response_encoder.getStream().bufferLimit(),
std::move(downstream_stream_account)));

Expand Down
2 changes: 2 additions & 0 deletions source/common/http/http1/codec_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ class StreamEncoderImpl : public virtual StreamEncoder,
// require a flush timeout not already covered by other timeouts.
}

Buffer::BufferMemoryAccountSharedPtr account() const override { return buffer_memory_account_; }

void setAccount(Buffer::BufferMemoryAccountSharedPtr account) override {
// TODO(kbaichoo): implement account tracking for H1. Particularly, binding
// the account to the buffers used. The current wiring is minimal, and used
Expand Down
1 change: 1 addition & 0 deletions source/common/http/http2/codec_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ class ConnectionImpl : public virtual Connection,
return parent_.connection_.connectionInfoProvider();
}
absl::string_view responseDetails() override { return details_; }
Buffer::BufferMemoryAccountSharedPtr account() const override { return buffer_memory_account_; }
void setAccount(Buffer::BufferMemoryAccountSharedPtr account) override;

// ScopeTrackedObject
Expand Down
2 changes: 2 additions & 0 deletions source/common/quic/envoy_quic_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ class EnvoyQuicStream : public virtual Http::StreamEncoder,
return connection()->connectionInfoProvider();
}

Buffer::BufferMemoryAccountSharedPtr account() const override { return buffer_memory_account_; }

void setAccount(Buffer::BufferMemoryAccountSharedPtr account) override {
buffer_memory_account_ = account;
}
Expand Down
85 changes: 85 additions & 0 deletions test/integration/buffer_accounting_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,91 @@ TEST_P(Http2BufferWatermarksTest, ShouldCreateFourBuffersPerAccount) {
EXPECT_TRUE(buffer_factory_->waitUntilExpectedNumberOfAccountsAndBoundBuffers(0, 0));
}

TEST_P(Http2BufferWatermarksTest, AccountsAndInternalRedirect) {
const Http::TestResponseHeaderMapImpl redirect_response{
{":status", "302"}, {"content-length", "0"}, {"location", "http://authority2/new/url"}};

auto handle = config_helper_.createVirtualHost("handle.internal.redirect");
handle.mutable_routes(0)->set_name("redirect");
handle.mutable_routes(0)->mutable_route()->mutable_internal_redirect_policy();
config_helper_.addVirtualHost(handle);
initialize();

codec_client_ = makeHttpConnection(lookupPort("http"));

default_request_headers_.setHost("handle.internal.redirect");
IntegrationStreamDecoderPtr response =
codec_client_->makeHeaderOnlyRequest(default_request_headers_);

waitForNextUpstreamRequest();

if (streamBufferAccounting()) {
EXPECT_EQ(buffer_factory_->numAccountsCreated(), 1);
} else {
EXPECT_EQ(buffer_factory_->numAccountsCreated(), 0);
}

upstream_request_->encodeHeaders(redirect_response, true);
waitForNextUpstreamRequest();

upstream_request_->encodeHeaders(default_response_headers_, true);

ASSERT_TRUE(response->waitForEndStream());
ASSERT_TRUE(response->complete());

if (streamBufferAccounting()) {
EXPECT_EQ(buffer_factory_->numAccountsCreated(), 1) << printAccounts();
EXPECT_TRUE(buffer_factory_->waitForExpectedAccountUnregistered(1)) << printAccounts();
} else {
EXPECT_EQ(buffer_factory_->numAccountsCreated(), 0);
EXPECT_TRUE(buffer_factory_->waitForExpectedAccountUnregistered(0));
}
}

TEST_P(Http2BufferWatermarksTest, AccountsAndInternalRedirectWithRequestBody) {
const Http::TestResponseHeaderMapImpl redirect_response{
{":status", "302"}, {"content-length", "0"}, {"location", "http://authority2/new/url"}};

auto handle = config_helper_.createVirtualHost("handle.internal.redirect");
handle.mutable_routes(0)->set_name("redirect");
handle.mutable_routes(0)->mutable_route()->mutable_internal_redirect_policy();
config_helper_.addVirtualHost(handle);
initialize();

codec_client_ = makeHttpConnection(lookupPort("http"));

default_request_headers_.setHost("handle.internal.redirect");
default_request_headers_.setMethod("POST");

const std::string request_body = "foobarbizbaz";
buffer_factory_->setExpectedAccountBalance(request_body.size(), 1);

IntegrationStreamDecoderPtr response =
codec_client_->makeRequestWithBody(default_request_headers_, request_body);

waitForNextUpstreamRequest();
upstream_request_->encodeHeaders(redirect_response, true);

waitForNextUpstreamRequest();

upstream_request_->encodeHeaders(default_response_headers_, true);

ASSERT_TRUE(response->waitForEndStream());
ASSERT_TRUE(response->complete());

if (streamBufferAccounting()) {
EXPECT_EQ(buffer_factory_->numAccountsCreated(), 1) << printAccounts();
EXPECT_TRUE(buffer_factory_->waitForExpectedAccountUnregistered(1)) << printAccounts();
EXPECT_TRUE(
buffer_factory_->waitForExpectedAccountBalanceWithTimeout(TestUtility::DefaultTimeout))
<< "buffer total: " << buffer_factory_->totalBufferSize()
<< " buffer max: " << buffer_factory_->maxBufferSize() << printAccounts();
} else {
EXPECT_EQ(buffer_factory_->numAccountsCreated(), 0);
EXPECT_TRUE(buffer_factory_->waitForExpectedAccountUnregistered(0));
}
}

TEST_P(Http2BufferWatermarksTest, ShouldTrackAllocatedBytesToUpstream) {
const int num_requests = 5;
const uint32_t request_body_size = 4096;
Expand Down
1 change: 1 addition & 0 deletions test/mocks/http/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class MockStream : public Stream {
MOCK_METHOD(uint32_t, bufferLimit, (), (const));
MOCK_METHOD(const Network::ConnectionInfoProvider&, connectionInfoProvider, ());
MOCK_METHOD(void, setFlushTimeout, (std::chrono::milliseconds timeout));
MOCK_METHOD(Buffer::BufferMemoryAccountSharedPtr, account, (), (const));
MOCK_METHOD(void, setAccount, (Buffer::BufferMemoryAccountSharedPtr));

// Use the same underlying structure as StreamCallbackHelper to insure iteration stability
Expand Down