new token bucket impl#6893
Conversation
Codecov Report — coverage diff and impacted files:

@@            Coverage Diff            @@
##           master    #6893    +/-   ##
=========================================
- Coverage    83.1%    83.1%    -0.1%
=========================================
  Files         815      816       +1
  Lines      358629   358922     +293
=========================================
+ Hits       298071   298309     +238
- Misses      60558    60613      +55
#[allow(clippy::arithmetic_side_effects)]
fn maybe_shrink(&self) {
    let mut actual_len = 0;
    let target_shard_size = self.target_capacity / self.data.shards().len();
What if self.target_capacity < shards?
We should document it and assert probably.
This is a good catch, thank you! target_shard_size becomes zero and the data structure wipes all records every time :( Will patch.
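To make the failure mode concrete, here is a minimal sketch of the shard-sizing pitfall discussed above (the function name and the clamping fix are illustrative, not necessarily what the patch does): with integer division, a target capacity smaller than the shard count yields a per-shard target of zero, so every shrink pass would evict everything.

```rust
// Hypothetical sketch of the bug: target_capacity / num_shards truncates to
// zero when capacity < shard count, so a shrink pass would wipe every shard.
fn shard_target(target_capacity: usize, num_shards: usize) -> usize {
    // Clamping to at least 1 keeps each shard from being emptied entirely.
    (target_capacity / num_shards).max(1)
}

fn main() {
    // 8 records spread over 16 shards: naive integer division gives 0.
    assert_eq!(8 / 16, 0);
    // The clamped version keeps at least one record per shard.
    assert_eq!(shard_target(8, 16), 1);
    assert_eq!(shard_target(1024, 16), 64);
    println!("ok");
}
```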
@@ -0,0 +1,106 @@
#![allow(clippy::arithmetic_side_effects)]
Why haven't you used some benchmarking framework? And what are the results of the current benchmarking?
Benchmarking frameworks are not well suited here, since the bench is multithreaded and requires a peculiar setup to run. Running it in a loop 10000 times does not really show meaningful perf, since you need thread contention.
Here are results:
Running bench_token_bucket...
Run complete over 5 seconds
Accepted 16667, Rejected: 39887821
processed 39904488 requests, 7980897.5 per second
==========
Running bench_token_bucket_eviction...
Run complete over 5 seconds
Max observed size was 406
processed 17113044 requests, 3422608.8 per second
Rejected: 95951
==========
Running bench_keyed_rate_limiter...
Run complete over 5 seconds
Accepted: 1024000 (target 1024000)
Rejected: 37008846
processed 38032846 requests, 7606569.5 per second
TL;DR: we can process about 7M requests per second per bucket; the KeyedRateLimiter may slow things down to about 3M requests per second if there is a lot of churn.
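A minimal sketch of the kind of contended harness described above (this is not the PR's bench code; the thread count, duration, and the stand-in workload are assumptions): several threads hammer shared state until a stop flag flips, then aggregate throughput is reported.

```rust
use std::{
    sync::{
        atomic::{AtomicBool, AtomicU64, Ordering},
        Arc,
    },
    thread,
    time::Duration,
};

// Sketch: spawn contending threads, run for a fixed wall-clock duration,
// and sum per-thread operation counts at the end.
fn run_contended_bench(threads: usize, dur: Duration) -> u64 {
    let stop = Arc::new(AtomicBool::new(false));
    let total = Arc::new(AtomicU64::new(0));
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let stop = Arc::clone(&stop);
            let total = Arc::clone(&total);
            thread::spawn(move || {
                let mut local = 0u64;
                while !stop.load(Ordering::Relaxed) {
                    // Stand-in for a try_acquire() call on the bucket.
                    local += 1;
                }
                total.fetch_add(local, Ordering::Relaxed);
            })
        })
        .collect();
    thread::sleep(dur);
    stop.store(true, Ordering::Relaxed);
    for h in handles {
        h.join().unwrap();
    }
    total.load(Ordering::Relaxed)
}

fn main() {
    let ops = run_contended_bench(4, Duration::from_millis(100));
    assert!(ops > 0);
    println!("processed {ops} requests");
}
```

The key point is that the interesting numbers only appear under real contention, which single-threaded loop benchmarks cannot produce.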
Sounds like much more than we will ever need.
Well, we'd want real code to do things other than token buckets, but I do not know how to make this substantially faster; I'm quite certain we are close to hitting HW limits here.
}

impl Clone for TokenBucket {
    fn clone(&self) -> Self {
This clone looks a bit suspicious to me because it is a deep copy, while I typically expect atomics to be shared Arc-style. Do we really need it somewhere?
Yes, this is used in the KeyedRateLimiter to clone buckets. It is generally nice to have around, so you can mass-produce buckets from a prototype of some kind. There is no state in them that would become invalidated if we access atomics one at a time, so there is no reason not to implement Clone.
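A sketch of what such a deep-copy Clone over atomics looks like (the field names here are illustrative, not the PR's exact struct): each atomic is loaded individually, which is acceptable here precisely because, as noted above, no field combination needs a consistent snapshot.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Illustrative struct: a bucket holding its state in plain atomics.
struct TokenBucket {
    tokens: AtomicU64,
    last_update: AtomicU64,
}

impl Clone for TokenBucket {
    fn clone(&self) -> Self {
        // Load each atomic on its own; the clone is independent state
        // seeded from the prototype's current values.
        Self {
            tokens: AtomicU64::new(self.tokens.load(Ordering::Acquire)),
            last_update: AtomicU64::new(self.last_update.load(Ordering::Acquire)),
        }
    }
}

fn main() {
    let proto = TokenBucket {
        tokens: AtomicU64::new(100),
        last_update: AtomicU64::new(0),
    };
    let copy = proto.clone();
    // Mutating the clone does not touch the prototype.
    copy.tokens.fetch_sub(1, Ordering::AcqRel);
    assert_eq!(proto.tokens.load(Ordering::Acquire), 100);
    assert_eq!(copy.tokens.load(Ordering::Acquire), 99);
    println!("ok");
}
```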
        self.update_state(now);
        match self
            .tokens
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |tokens| {
SeqCst: although it is the safest option, it might be less performant than the Release/Acquire combination. Have you seen any difference?
Yeah, I was hunting a nasty concurrency bug here for a while; now I have found it and dropped to AcqRel and Acquire where applicable.
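A minimal sketch of what the consume path looks like after that ordering change (the function name and signature are assumptions, not the PR's exact code): `fetch_update` runs a CAS loop, with `AcqRel` on the successful exchange and `Acquire` on the failed reload.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Sketch: atomically subtract `requested` tokens if enough are available.
// checked_sub returns None when the balance is insufficient, which makes
// fetch_update leave the value untouched and return Err.
fn try_consume(tokens: &AtomicU64, requested: u64) -> bool {
    tokens
        .fetch_update(Ordering::AcqRel, Ordering::Acquire, |t| {
            t.checked_sub(requested)
        })
        .is_ok()
}

fn main() {
    let tokens = AtomicU64::new(3);
    assert!(try_consume(&tokens, 2)); // 3 -> 1
    assert!(!try_consume(&tokens, 2)); // insufficient, state unchanged
    assert_eq!(tokens.load(Ordering::Acquire), 1);
    println!("ok");
}
```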
@vadorovsky I believe I have finally tracked down the concurrency bug that was eating my brain here, so this should be good to go. Now the system that credits tokens is super braindead and relies on simple CAS logic to gate which thread credits tokens for which time interval.
KirillLykov left a comment:
My main concern here was atomics ordering; if @vadorovsky double-checks that it is ok, this looks good to me now.
Adds TokenBucket and KeyedRateLimiter to replace governor crate and in general allow for better control over rate-limiting options
Added a shuttle test based on @vadorovsky's design, the coast is clear =)
    /// depositing new tokens (if appropriate)
    fn update_state(&self, now: u64) {
        // fetch last update time
        let last = self.last_update.load(Ordering::SeqCst);
I think Ordering::Acquire would be sufficient here. But SeqCst is not incorrect. In case you don't see any perf difference, I guess it's fine to leave it as it is.
            .fetch_add(time_to_return, Ordering::Relaxed);
        }
        Err(_) => {
            // Another thread advanced last_update first → nothing we can do now.
I'm wondering if we should actually do something in that case. There is a slight chance that the current thread's now is still higher than the new value set by another thread. To handle that case, we could retry the update:
fn update_state(&self, now: u64) {
    // WE MAKE IT `mut`
    let mut last = self.last_update.load(Ordering::SeqCst);
    // If time has not advanced, nothing to do.
    while now > last {
        match self.last_update.compare_exchange(
            last,
            now,
            Ordering::AcqRel,  // winner publishes new timestamp
            Ordering::Acquire, // loser observes updates
        ) {
            Ok(_) => {
                // success case...
                break;
            }
            // THE DIFFERENCE: If CAS failed, we retry against the value
            // another thread just published.
            Err(last_update) => {
                last = last_update;
            }
        }
    }
}
Not having a loop here was a deliberate choice: it reduces the time spent per request, which we want far more than accuracy (since this code gets called per packet, and we have on the order of millions of packets).
In the current version we will just mint tokens on the next call, which is good enough for the intended use, since the probability that the interval lost to the race is long enough to mint many tokens is low.
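Put together, the single-attempt design sounds like the following sketch (field names and the nanosecond-based rate are assumptions for illustration, not the PR's exact code): exactly one thread wins the CAS and credits tokens for the elapsed interval; losers simply return and let a later call mint.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Illustrative bucket: timestamps in nanoseconds, a fixed cost per token.
struct Bucket {
    last_update: AtomicU64,
    tokens: AtomicU64,
    nanos_per_token: u64,
}

impl Bucket {
    fn update_state(&self, now: u64) {
        let last = self.last_update.load(Ordering::Acquire);
        if now <= last {
            return; // time has not advanced, nothing to do
        }
        // One CAS attempt, no loop: the winner publishes the new timestamp
        // and mints tokens for (now - last); a loser does nothing now and
        // simply lets the next call mint instead.
        if self
            .last_update
            .compare_exchange(last, now, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
        {
            let minted = (now - last) / self.nanos_per_token;
            self.tokens.fetch_add(minted, Ordering::AcqRel);
        }
    }
}

fn main() {
    let b = Bucket {
        last_update: AtomicU64::new(0),
        tokens: AtomicU64::new(0),
        nanos_per_token: 1_000,
    };
    b.update_state(5_000); // 5000 ns elapsed at 1000 ns/token: mints 5
    assert_eq!(b.tokens.load(Ordering::Acquire), 5);
    b.update_state(5_000); // no time elapsed, nothing minted
    assert_eq!(b.tokens.load(Ordering::Acquire), 5);
    println!("ok");
}
```

The trade-off is deliberate: a lost race under-credits briefly rather than spending extra CAS iterations on the hot path.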
gregcusack left a comment:
Can you explain more about why we need this new token bucket, please? As we've previously discussed, it sounds like we use the governor crate, but it is bloated and has some bugs; I think one of those bugs was letting in too much traffic. How much of a problem is this? I am just wary of reimplementing something from scratch, especially in a core part of the validator. It seems like we need a lot of testing for this. shuttle_test_token_bucket_race is great!
// much of the testing is impossible outside of real multithreading in release mode.
impl TokenBucket {
    /// Allocate a new TokenBucket
    pub fn new(initial_tokens: u64, max_tokens: u64, new_tokens_per_second: f64) -> Self {
Can we switch the floating point math to fixed point? Over time it looks like these small rounding errors could add up and create some inconsistent behavior.
Over a billion requests these would add up to a few milliseconds, so it is not really a concern. Switching to fixed point would not eliminate them, just reduce them a bit (since we would still have finite precision), and the code would get really ugly (I have tried it already; it becomes very hard to follow). The perf difference is non-existent.
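A quick illustration of the error-magnitude argument above (the per-call amount and iteration count are arbitrary, chosen only to show scale): repeatedly accumulating a fractional token amount in f64 drifts from the exact value by a vanishing relative fraction.

```rust
// Accumulate a fractional token credit many times and measure the drift
// of naive f64 summation against the exact product.
fn main() {
    let per_call = 0.1_f64; // hypothetical tokens credited per update
    let calls = 1_000_000u64;
    let mut acc = 0.0_f64;
    for _ in 0..calls {
        acc += per_call;
    }
    let exact = per_call * calls as f64;
    let rel_err = ((acc - exact) / exact).abs();
    // After a million additions the relative drift is still far below 1e-6,
    // i.e. negligible for rate-limiting purposes.
    assert!(rel_err < 1e-6);
    println!("relative error: {rel_err:e}");
}
```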
I would like to see some data on the difference this makes, like the variation of input requests and the requests passed through under some limiting configuration over a period of time, between this and governor.
I have done benchmarks before, and governor compares as follows: the atomic token bucket has better perf and is a fair bit more accurate. The benches for TokenBucket also ensure that it is limiting consistently over each 100ms interval, not just over a long time interval.
Problem
Summary of Changes