Implemented H2 stream level buffer accounting. #16218
antoniovicente merged 22 commits into envoyproxy:main
Conversation
Signed-off-by: Kevin Baichoo <kbaichoo@google.com>

/assign @antoniovicente Can you take a look? Thanks
antoniovicente
left a comment
Great start. See comments below.
    return buffer_memory_account_;
  }
  void setAccount(Buffer::BufferMemoryAccountSharedPtr) override {
    RELEASE_ASSERT(

antoniovicente: I'm wondering if, for consistency, we should have something outside the codec create the account and call setAccount on the server-side connections. A possible place is ConnectionManagerImpl::createCodec, which creates the server connection.
Also consider moving the implementation of these methods to the StreamImpl base class if possible.

KBaichoo: I modified ConnectionManagerImpl::newStream to create the account and set it on the newly created downstream request object. This allowed me to push the implementations to StreamImpl as suggested, and to remove getAccount() from the stream API.
    ENVOY_CONN_LOG(debug, "new stream", read_callbacks_->connection());
-   ActiveStreamPtr new_stream(new ActiveStream(*this, response_encoder.getStream().bufferLimit()));
+   ActiveStreamPtr new_stream(new ActiveStream(*this, response_encoder.getStream().bufferLimit(),
+                                               response_encoder.getStream().getAccount()));

antoniovicente: We could also say that the connection manager is the owner of the account and feed the account directly from the connection manager instead of asking the stream. I think we can remove getAccount() from the stream API if we make this change.
See the related comment in http2/codec_impl.h setAccount.

KBaichoo: See the response to the comment in http2/codec_impl.h.
    Upstream::ClusterInfoConstSharedPtr clusterInfo() override { return parent_.cluster_; }
    void clearRouteCache() override {}
    uint64_t streamId() const override { return stream_id_; }
    // TODO(kbaichoo): Implement this?

antoniovicente: I think that some users of async client are filters like the RBAC filter. We should plumb accounts to those eventually. Other users may involve config plane operations which are not associated with clients. Still, it may be worth associating these with an account owned by the config system, as a way to track memory usage by config plane components.
Could you change the TODO to something like: Plumb account from owning request filter.

KBaichoo: Updated the comment.
    }

    for (auto& acc : account_infos_) {
      if (static_cast<Buffer::BufferMemoryAccountImpl*>(acc.first.get())->balance() < byte_size) {

antoniovicente: Is this call to balance() happening on the right thread? Remember that buffer_memory_allocated_ is not atomic.

KBaichoo: Hmm, I've changed how this is done, so it should be running in the worker thread.
    }
    return true;
  };
  return mutex_.AwaitWithTimeout(absl::Condition(&predicate), absl::Milliseconds(timeout.count()));

antoniovicente: Mutex AwaitWithTimeout only works correctly if changes to the predicate condition can only happen while the mutex is held. Accounts are not changed with the mutex above held, therefore this await won't work correctly.
  void appendSliceForTest(absl::string_view data) override;

  void setWatermarks(uint32_t watermark) override { setWatermarks(watermark / 2, watermark); }
  void setWatermarks(uint32_t low_watermark, uint32_t high_watermark);

antoniovicente: test/common/buffer/watermark_buffer_test.cc seems to be failing to build because the two-argument version of setWatermarks is missing; please look into it.
…was unnecessary. Fixed potential race condition in checking balances.
Signed-off-by: Kevin Baichoo <kbaichoo@google.com>
KBaichoo
left a comment
Thanks for the review @antoniovicente
include/envoy/buffer/buffer.h

   * @param low_watermark supplies the buffer low watermark size threshold, in bytes.
   * @param high_watermark supplies the buffer high watermark size threshold, in bytes.
   */
  virtual void setWatermarks(uint32_t low_watermark, uint32_t high_watermark) PURE;

antoniovicente: Fairly sure we don't need this as part of the public API. The two-argument setWatermarks method is on the way out; would it be helpful to get it removed everywhere? Probably as part of a separate PR.

KBaichoo: It was needed, as removing it broke some tests. This doesn't seem like the right PR to also remove that older interface; I'll create another PR removing it.

-   pending_recv_data_.setWatermarks(low_watermark, high_watermark);
-   pending_send_data_.setWatermarks(low_watermark, high_watermark);
+   pending_recv_data_->setWatermarks(low_watermark, high_watermark);
+   pending_send_data_->setWatermarks(low_watermark, high_watermark);

antoniovicente: This should really transition to the one-argument version of setWatermarks. There's only one call to setWriteBufferWatermarks, and it uses buffer_limit / 2, buffer_limit as arguments.

KBaichoo: Will do the removal of setWatermarks(low, high) in another PR, as mentioned above.
antoniovicente
left a comment
What is left before this PR can be considered no longer WIP?
KBaichoo: The only thing that was keeping it still WIP is the lack of a runtime guard, which is now included. This is disabled by default since only tests use the new features right now.
/retest
    response_encoder.getStream().setAccount(downstream_request_account);
  }
  ActiveStreamPtr new_stream(new ActiveStream(*this, response_encoder.getStream().bufferLimit(),
                                              std::move(downstream_request_account)));

antoniovicente: Thinking a bit about the future: it would be useful to have a way to go from Account to ActiveStream. Possible strawperson:

class BufferMemoryAccount {
  // Methods to set and clear the account owner.
  virtual void setOwner(AccountOwner& owner) PURE;
  virtual void clearOwner() PURE;
};

class AccountOwner {
public:
  // Reset the stream owned by this account.
  // Better method names may be possible. The reason argument seems optional since we
  // expect this to be called only when under memory pressure.
  virtual void resetStream(Reason reason) PURE;
  // Dump debug information about the stream.
  virtual void dumpState(std::ostream& os) PURE;
};

The account interface could wrap the reset and dump methods so they are no-ops if the account has no owner, or have a method to get an optional reference to the owner that the per-worker overload trackers can use to run those operations on the relevant owner objects.

KBaichoo: Yes, we'll need something like this later down the line, and will address it then.
    // Wait for all requests to buffer the response from upstream.
    if (streamBufferAccounting()) {
      EXPECT_TRUE(
          buffer_factory_->waitForExpectedAccountBalanceWithTimeout(TestUtility::DefaultTimeout))

antoniovicente: It would be nice to pass the expected body sizes and requests to waitForExpectedAccountBalanceWithTimeout instead of requiring a separate call to setExpectedAccountBalance.

KBaichoo: Not sure that sending requests should really be the job of the buffer_factory. That seems a bit strange.

antoniovicente: This comment is about the wait-with-timeout API provided by the buffer factory.

KBaichoo: Perhaps we're miscommunicating. I see the issue as follows:
1. The test thread has to send the requests.
2. The test thread also has to set these expectations and wait on them.
If the test thread runs (1) followed by (2), there's a possibility of a race causing the wait to time out even though accounts are correctly sized, because accounts notify when expectations are met in the worker while updating accounts. That is, if all accounts are updated and we then set the expectation, it won't fire since none of them have any additional events.
So instead this is split into:
1. Send request.
2. Set expectation.
3. Wait for expectation.
And we can do 2, 1, and 3. Alternatively, we can push those into a single function, as I think your suggestion would require, but it seems strange for the buffer factory to be involved with 2.

antoniovicente: Yeah, I wasn't understanding why you saw the need to mix expectations, sending requests, and waiting for expectations. Hopefully the info below will make things more clear.
We have ways to query the factory about the state of buffers created by it, for example TrackedWatermarkBufferFactory::maxBufferSize. That method is implemented by storing some information about the buffer in an alternate data structure kept by the factory, which is updated from the check-watermark overrides. It seems to me that your extensions to the tracked buffer factory to also gather information about accounts can make some basic assumptions about check-watermark functions being called after changes to buffers which result in updates to the number of bytes tracked by the accounts.
Therefore, it is possible to keep a map from account to account size, updated on each call to the check-watermark functions, which could be used to get the expected size of each account under a lock. Given that map with a mirrored version of the account sizes, you can implement methods to get info about the size of accounts using mechanisms similar to those used in the implementation of TrackedWatermarkBufferFactory::maxBufferSize and TrackedWatermarkBufferFactory::waitUntilTotalBufferedExceeds (via Mutex::AwaitWithTimeout, see https://github.com/envoyproxy/envoy/pull/14714/files#diff-9f19f8978f392143007cd837f851e2674b7b61227f71d6f907a0e60a72cd0b4b).
Using this technique would allow sending requests in the normal way and implementing a version of wait-for-expectation that takes the expectations as function arguments.

KBaichoo: Duplicating the internal information of the account doesn't seem right -- it seems like a leaky abstraction. I also prefer using the notification since, IIRC, AwaitWithTimeout doesn't have a signaling mechanism while the notification does, awaking blocked threads.

antoniovicente: AwaitWithTimeout checks every time the mutex is released. But there's a requirement that updates happen while the mutex is held.
Anyway, I think we can start with this test framework API and improve it in the future as needed.
  bool TrackedWatermarkBufferFactory::waitForExpectedAccountBalanceWithTimeout(
      std::chrono::milliseconds timeout) {
    return expected_balances_met_.WaitForNotificationWithTimeout(absl::FromChrono(timeout));

antoniovicente: A potential drawback of the Notification approach vs mutex_.AwaitWithTimeout is that the notification approach can only be used once per test. Should the mutex_.AwaitWithTimeout approach be used here instead? See also the comments about passing expectations to waitForExpectedAccountBalanceWithTimeout instead of using a separate method to set expectations.

KBaichoo: I remember I explicitly chose to use Notification over mutex_.AwaitWithTimeout. It's been a while, but I think my original reasoning was that only the worker thread should do accumulations and notify the test thread when the condition is met, rather than have the test thread access account balances that might change in the worker thread. For now, Notification has what we need.
  }

  removeDanglingAccounts();
  if (account_infos_.size() < expected_balances_->num_accounts_) {

antoniovicente: Should this be an exact comparison? Or should we allow for extra accounts as long as at least num_accounts_ have balance_per_account_ in them?
Also, should we extend these concepts to allow waiting for accounts to be drained? I guess waitUntilExpectedNumberOfAccountsAndBoundBuffers with 0 accounts could be used to tell when all requests are complete.

KBaichoo: Done. I think using the existing mechanism that most tests use (checking whether the responses completed at the endpoint) is sufficient.
  while (accounts_it != account_infos_.end()) {
    auto next = std::next(accounts_it);

  // Remove all "dangling" accounts.

antoniovicente: Should you consider accounts with no associated buffers as dangling, instead of looking at the ref count?

KBaichoo: Consider the case where slices point to accounts while all the associated buffers no longer exist. Checking the ref count tells us that no other entity should have access to the account.
/retest
    main_tid = server.api().threadFactory().currentThreadId();

    slot->runOnAllThreads(
        [&main_tid, &server, &func, this](OptRef<ThreadLocal::ThreadLocalObject>) {

antoniovicente: nit: drop the & for main_tid so the tid is captured by value.
    ThreadLocal::TypedSlotPtr<> slot;
    Envoy::Thread::ThreadId main_tid;

    server.dispatcher().post([&] {

antoniovicente: This seems more complicated than I expected. Wondering if there's a way to simplify by capturing the dispatchers associated with the threads where accounts are created.
test/per_file_coverage.sh

    "source/common/thread:0.0" # Death tests don't report LCOV
    "source/common/matcher:93.3"
-   "source/common/quic:87.8"
+   "source/common/quic:87.7"

antoniovicente: This change shouldn't be needed if you merge upstream/main again.

KBaichoo: Sorry, didn't see the change (75aecf2) that got accepted yesterday. I needed this prior to that.
antoniovicente
left a comment
Thanks for implementing accounting for H2. Off to senior maintainers for a final review.
/assign-from @envoyproxy/senior-maintainers
@envoyproxy/senior-maintainers assignee is @lizan

/retest

friendly ping @lizan PTAL
…ry. (#17093)
This PR tracks memory accounts using >1MB of allocated space, with feedback mechanisms based on credits and debits on accounts. It further creates the handle from which the BufferMemoryAccount can reset the stream, and has the WatermarkBufferFactory also produce the particular BufferMemoryAccountImpl used for tracking.
Risk Level: Medium
Testing: Unit and integration tests
Docs Changes: N/A
Release Notes: N/A -- not yet user facing
Platform Specific Features: N/A
Runtime guard: Yes, envoy.test_only.per_stream_buffer_accounting from #16218 is sufficient
Related Issue: #15791
Signed-off-by: Kevin Baichoo <kbaichoo@google.com>
Per-request buffer accounting is still a work in progress, so behavior changes are protected by 'envoy.test_only.per_stream_buffer_accounting', which is disabled by default.
Signed-off-by: Kevin Baichoo <kbaichoo@google.com>
Signed-off-by: Kevin Baichoo <kbaichoo@google.com>
Commit Message: Implement H2 stream level buffer accounting.
Additional Description: This is runtime guarded, disabled by default since only tests use the new features provided. As this gets developed it'll become enabled by default.
Risk Level: Medium
Testing: Included unit test / integration tests
Docs Changes: N/A
Release Notes: N/A (not end-user accessible yet)
Platform Specific Features: N/A
Runtime guard: Yes, envoy.test_only.per_stream_buffer_accounting
Related Issue #15791