Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions envoy/http/codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,11 @@ class Stream : public StreamResetHandler {
*/
virtual void setFlushTimeout(std::chrono::milliseconds timeout) PURE;

/**
* @return the account, if any, used by this stream.
*/
virtual Buffer::BufferMemoryAccountSharedPtr account() const PURE;

/**
* Sets the account for this stream, propagating it to all of its buffers.
* @param the account to assign this stream.
Expand Down
15 changes: 10 additions & 5 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -305,12 +305,17 @@ RequestDecoder& ConnectionManagerImpl::newStream(ResponseEncoder& response_encod

ENVOY_CONN_LOG(debug, "new stream", read_callbacks_->connection());

// Create account, wiring the stream to use it for tracking bytes.
// If tracking is disabled, the wiring becomes a NOP.
auto& buffer_factory = read_callbacks_->connection().dispatcher().getWatermarkFactory();
Buffer::BufferMemoryAccountSharedPtr downstream_stream_account =
buffer_factory.createAccount(response_encoder.getStream());
response_encoder.getStream().setAccount(downstream_stream_account);
response_encoder.getStream().account();

if (downstream_stream_account == nullptr) {
// Create account, wiring the stream to use it for tracking bytes.
// If tracking is disabled, the wiring becomes a NOP.
auto& buffer_factory = read_callbacks_->connection().dispatcher().getWatermarkFactory();
downstream_stream_account = buffer_factory.createAccount(response_encoder.getStream());
response_encoder.getStream().setAccount(downstream_stream_account);
}

ActiveStreamPtr new_stream(new ActiveStream(*this, response_encoder.getStream().bufferLimit(),
std::move(downstream_stream_account)));

Expand Down
2 changes: 2 additions & 0 deletions source/common/http/http1/codec_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ class StreamEncoderImpl : public virtual StreamEncoder,
// require a flush timeout not already covered by other timeouts.
}

Buffer::BufferMemoryAccountSharedPtr account() const override { return buffer_memory_account_; }

void setAccount(Buffer::BufferMemoryAccountSharedPtr account) override {
// TODO(kbaichoo): implement account tracking for H1. Particularly, binding
// the account to the buffers used. The current wiring is minimal, and used
Expand Down
1 change: 1 addition & 0 deletions source/common/http/http2/codec_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ class ConnectionImpl : public virtual Connection,
return parent_.connection_.connectionInfoProvider();
}
absl::string_view responseDetails() override { return details_; }
Buffer::BufferMemoryAccountSharedPtr account() const override { return buffer_memory_account_; }
void setAccount(Buffer::BufferMemoryAccountSharedPtr account) override;

// ScopeTrackedObject
Expand Down
2 changes: 2 additions & 0 deletions source/common/quic/envoy_quic_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ class EnvoyQuicStream : public virtual Http::StreamEncoder,
return connection()->connectionInfoProvider();
}

Buffer::BufferMemoryAccountSharedPtr account() const override { return buffer_memory_account_; }

void setAccount(Buffer::BufferMemoryAccountSharedPtr account) override {
buffer_memory_account_ = account;
}
Expand Down
85 changes: 85 additions & 0 deletions test/integration/buffer_accounting_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,91 @@ TEST_P(Http2BufferWatermarksTest, ShouldCreateFourBuffersPerAccount) {
EXPECT_TRUE(buffer_factory_->waitUntilExpectedNumberOfAccountsAndBoundBuffers(0, 0));
}

TEST_P(Http2BufferWatermarksTest, AccountsAndInternalRedirect) {
Http::TestResponseHeaderMapImpl redirect_response_{
{":status", "302"}, {"content-length", "0"}, {"location", "http://authority2/new/url"}};

auto handle = config_helper_.createVirtualHost("handle.internal.redirect");
handle.mutable_routes(0)->set_name("redirect");
handle.mutable_routes(0)->mutable_route()->mutable_internal_redirect_policy();
config_helper_.addVirtualHost(handle);
initialize();

codec_client_ = makeHttpConnection(lookupPort("http"));

default_request_headers_.setHost("handle.internal.redirect");
IntegrationStreamDecoderPtr response =
codec_client_->makeHeaderOnlyRequest(default_request_headers_);

waitForNextUpstreamRequest();

if (streamBufferAccounting()) {
EXPECT_EQ(buffer_factory_->numAccountsCreated(), 1);
} else {
EXPECT_EQ(buffer_factory_->numAccountsCreated(), 0);
}

upstream_request_->encodeHeaders(redirect_response_, true);
waitForNextUpstreamRequest();

upstream_request_->encodeHeaders(default_response_headers_, true);

ASSERT_TRUE(response->waitForEndStream());
ASSERT_TRUE(response->complete());

if (streamBufferAccounting()) {
EXPECT_EQ(buffer_factory_->numAccountsCreated(), 1) << printAccounts();
EXPECT_TRUE(buffer_factory_->waitForExpectedAccountUnregistered(1)) << printAccounts();
} else {
EXPECT_EQ(buffer_factory_->numAccountsCreated(), 0);
EXPECT_TRUE(buffer_factory_->waitForExpectedAccountUnregistered(0));
}
}

TEST_P(Http2BufferWatermarksTest, AccountsAndInternalRedirectWithRequestBody) {
Http::TestResponseHeaderMapImpl redirect_response_{

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: naming, should be redirect_response the _ suffix is reserved for object fields. Consider making const since it's not modified and shifting it down right where it's used.

Same with above test.

{":status", "302"}, {"content-length", "0"}, {"location", "http://authority2/new/url"}};

auto handle = config_helper_.createVirtualHost("handle.internal.redirect");
handle.mutable_routes(0)->set_name("redirect");
handle.mutable_routes(0)->mutable_route()->mutable_internal_redirect_policy();
config_helper_.addVirtualHost(handle);
initialize();

codec_client_ = makeHttpConnection(lookupPort("http"));

default_request_headers_.setHost("handle.internal.redirect");
default_request_headers_.setMethod("POST");

const std::string& request_body = "foobarbizbaz";

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this shouldn't be a reference

buffer_factory_->setExpectedAccountBalance(request_body.size(), 1);

IntegrationStreamDecoderPtr response =
codec_client_->makeRequestWithBody(default_request_headers_, request_body);

waitForNextUpstreamRequest();
upstream_request_->encodeHeaders(redirect_response_, true);

waitForNextUpstreamRequest();

upstream_request_->encodeHeaders(default_response_headers_, true);

ASSERT_TRUE(response->waitForEndStream());
ASSERT_TRUE(response->complete());

if (streamBufferAccounting()) {
EXPECT_EQ(buffer_factory_->numAccountsCreated(), 1) << printAccounts();
EXPECT_TRUE(buffer_factory_->waitForExpectedAccountUnregistered(1)) << printAccounts();
EXPECT_TRUE(
buffer_factory_->waitForExpectedAccountBalanceWithTimeout(TestUtility::DefaultTimeout))
<< "buffer total: " << buffer_factory_->totalBufferSize()
<< " buffer max: " << buffer_factory_->maxBufferSize() << printAccounts();
} else {
EXPECT_EQ(buffer_factory_->numAccountsCreated(), 0);
EXPECT_TRUE(buffer_factory_->waitForExpectedAccountUnregistered(0));
}
}

TEST_P(Http2BufferWatermarksTest, ShouldTrackAllocatedBytesToUpstream) {
const int num_requests = 5;
const uint32_t request_body_size = 4096;
Expand Down
1 change: 1 addition & 0 deletions test/mocks/http/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class MockStream : public Stream {
MOCK_METHOD(uint32_t, bufferLimit, (), (const));
MOCK_METHOD(const Network::ConnectionInfoProvider&, connectionInfoProvider, ());
MOCK_METHOD(void, setFlushTimeout, (std::chrono::milliseconds timeout));
MOCK_METHOD(Buffer::BufferMemoryAccountSharedPtr, account, (), (const));
MOCK_METHOD(void, setAccount, (Buffer::BufferMemoryAccountSharedPtr));

// Use the same underlying structure as StreamCallbackHelper to insure iteration stability
Expand Down