fix: reduce lock contention in distributor channels #10026
Conversation
Reduce lock contention in distributor channels via:

- use atomic counters instead of "counter behind mutex" where appropriate
- use less state
- only lock when needed
- move the "wake" operation out of lock scopes (wakes are eventual operations anyway, and many wake operations result in a "futex wake" operation -- i.e. a syscall -- which you should avoid while holding the lock)
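The last two points can be sketched roughly as follows. This is a minimal illustration of the pattern, not the PR's actual code; all names (`Channel`, `ChannelState`, `notify`) are hypothetical:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::Waker;

// Hypothetical channel state: only the wakers need the mutex.
struct ChannelState {
    wakers: Vec<Waker>,
}

struct Channel {
    // lock-free counter instead of a "counter behind mutex"
    empty_channels: AtomicUsize,
    state: Mutex<ChannelState>,
}

fn notify(channel: &Arc<Channel>) {
    // cheap lock-free read, no critical section
    let _empty = channel.empty_channels.load(Ordering::SeqCst);

    // take the pending wakers while holding the lock...
    let to_wake = {
        let mut guard = channel.state.lock().unwrap();
        std::mem::take(&mut guard.wakers)
    }; // ...guard is dropped here

    // ...and wake outside the lock scope, so a potential
    // `futex wake` syscall never happens inside the critical section
    for waker in to_wake {
        waker.wake();
    }
}
```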
/benchmark

Benchmark results: benchmarks comparing 5820507 (main) and b0f2504 (PR)

/benchmark

Benchmark results: benchmarks comparing 03d8ba1 (main) and b0f2504 (PR)
Benchmarks are mostly noise, but locally in a server-like setting with multiple consumers and a limited number of tokio threads, I can totally see both reduced lock contention (based on futex waits) and higher performance.
LGTM. I wonder if benchmarks show a difference at higher CPU counts (instead of GH runners)?
I will run them on my 8 core benchmark machine to see if I see a difference
Looks good to me, thank you @crepererum and @Dandandan
I read the code carefully, and I think it looks good, though there is one area related to a potential race condition that I think would be worth reviewing
Also, given the trickiness of this code (with several inter-related mutexes), I think encapsulating the functionality into methods on `Channel` / `ChannelState` would make the flow easier to follow and verify. However, this is a personal preference and I don't think it is required.
I'll post my performance results here when I have them
```rust
// does ANY receiver need data?
// if so, allow sender to create another
if this.gate.empty_channels.load(Ordering::SeqCst) == 0 {
```
Is accessing this field before taking the send_wakers a race condition?
I think the answer is no, as `self.channel.state.lock()` is preventing `empty_channels` from changing between this line and the guard.
However, if this is the case wouldn't it be clearer to not use an atomic usize here and just hang the count on channel state?
> However, if this is the case wouldn't it be clearer to not use an atomic usize here and just hang the count on channel state?
Reading this value should be cheap (= lockless). If you place it behind a mutex, all threads will contend on a single hot critical section.
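The trade-off being described can be made concrete with a small sketch (both struct names and the field layout are hypothetical): the atomic version answers the question with a single lock-free load, while the mutex version forces every reader through the same critical section.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

// Counter as an atomic: readers never block each other.
struct LockFree {
    empty_channels: AtomicUsize,
}

// Counter behind a mutex: every reader serializes on the same lock,
// creating a hot spot under contention.
struct Locked {
    empty_channels: Mutex<usize>,
}

impl LockFree {
    fn any_empty(&self) -> bool {
        // single lock-free load
        self.empty_channels.load(Ordering::SeqCst) > 0
    }
}

impl Locked {
    fn any_empty(&self) -> bool {
        // acquires and releases the mutex just to read one integer
        *self.empty_channels.lock().unwrap() > 0
    }
}
```

Both return the same answer; the difference is purely in how they scale across threads.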
```rust
Arc::new(Channel {
    n_senders: 1,
    recv_alive: true,
    recv_wakers: Vec::default(),
```
It would help me with readability / encapsulation to make this `Channel::new()` or `Channel::default()` so it is easier to follow the locking logic.
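A minimal sketch of that suggestion, assuming the fields shown in the snippet above (the exact struct layout is a guess, not the PR's actual code):

```rust
use std::sync::{Arc, Mutex};
use std::task::Waker;

struct ChannelState {
    n_senders: usize,
    recv_alive: bool,
    recv_wakers: Vec<Waker>,
}

struct Channel {
    state: Mutex<ChannelState>,
}

impl Channel {
    /// Create a channel with a single sender and a live receiver.
    /// Keeping the initialization here gives the locking-related
    /// invariants one documented home.
    fn new() -> Arc<Self> {
        Arc::new(Self {
            state: Mutex::new(ChannelState {
                n_senders: 1,
                recv_alive: true,
                recv_wakers: Vec::default(),
            }),
        })
    }
}
```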
```rust
let to_wake = guard_channel_state
    .recv_wakers
    .as_mut()
    .expect("not closed");
let mut tmp = Vec::with_capacity(to_wake.capacity());
std::mem::swap(to_wake, &mut tmp);
tmp
```
This might be more readable if it was in a function like `guard_channel_state.take_wakers()` or something -- that would also give you a place to document what is happening.
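One possible shape for the suggested helper (the method name follows the review comment; the `Option<Vec<Waker>>` layout is inferred from the `as_mut().expect("not closed")` call in the snippet above):

```rust
use std::task::Waker;

struct ChannelState {
    /// Pending receiver wakers; `None` once the channel is closed.
    recv_wakers: Option<Vec<Waker>>,
}

impl ChannelState {
    /// Take all pending receiver wakers, leaving an empty vector with
    /// the same capacity behind so subsequent registrations don't
    /// reallocate. Panics if the channel is already closed.
    fn take_recv_wakers(&mut self) -> Vec<Waker> {
        let to_wake = self.recv_wakers.as_mut().expect("not closed");
        let mut tmp = Vec::with_capacity(to_wake.capacity());
        std::mem::swap(to_wake, &mut tmp);
        tmp
    }
}
```

The caller can then drop the lock guard before waking the returned wakers.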
```rust
if state
    .data
    .as_ref()
    .map(|data| data.is_empty())
    .unwrap_or_default()
```
Could we put this into `ChannelState` as a function with a more readable name? Like:

```rust
if state.channel_gone() {
    ...
```
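A sketch of what such a predicate could look like. Note that the chained expression is `true` for an *open but empty* channel (it is `false` when `data` is `None`), so I've used a hypothetical name reflecting that; the semantics of `data == None` are an assumption based on the snippet above:

```rust
struct ChannelState<T> {
    /// Buffered payloads; `None` once the channel has been closed.
    data: Option<Vec<T>>,
}

impl<T> ChannelState<T> {
    /// `true` iff the channel is still open but currently holds no data.
    fn open_but_empty(&self) -> bool {
        self.data
            .as_ref()
            .map(|data| data.is_empty())
            .unwrap_or_default()
    }
}
```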
Benchmark results. The only one that looks potentially problematic is clickbench q28 -- I'll rerun and see if I can reproduce the results.
🤔 it seems like q28 is really quite a bit slower
We need to investigate the performance regression
I've pushed another change that makes the code prettier and also fixes a potential time-of-check-to-time-of-use (TOCTOU) race condition. With that, I disagree with the benchmark results:
Or in other words: I think it's noise. You may wonder why my perf results are so bad: I'm testing on a laptop but locked the frequency down to 1.5 GHz to avoid thermal throttling, otherwise the numbers would be all over the place (and would also depend too much on previous runs, uptime, and other factors).
Thanks @crepererum -- I am rerunning on my test machine (a VM on GCP)
In case anyone is interested, here is q28:

```sql
SELECT
    REGEXP_REPLACE("Referer", '^https?://(?:www\\.)?([^/]+)/.*$', '\\1') AS k,
    AVG(length("Referer")) AS l,
    COUNT(*) AS c,
    MIN("Referer")
FROM 'hits.parquet'
WHERE "Referer" <> ''
GROUP BY k
HAVING COUNT(*) > 100000
ORDER BY l DESC
LIMIT 25;
```

My unscientific run on my laptop (without controls) also showed no difference with the current code. However, when I ran on my 8 core (relatively old) GCP VM I still see the difference. I am now trying to reproduce standalone and will profile.
Sorry for the delay, I spent time trying to reproduce the performance difference in this PR.

Reproducer:

```shell
$ cat q28.sql
SELECT REGEXP_REPLACE("Referer", '^https?://(?:www\\.)?([^/]+)/.*$', '\\1') AS k, AVG(length("Referer")) AS l, COUNT(*) AS c, MIN("Referer")
FROM 'hits.parquet' WHERE "Referer" <> '' GROUP BY k HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25;
```

Tests: build datafusion-cli with

```shell
cd datafusion-cli
cargo build --release
cp target/release/datafusion-cli ~/Downloads/datafusion-cli-distributor
```

Also build it with the merge-base.

```shell
$ for i in `seq 1 5`; do ./datafusion-cli-distributor -f q28.sql | grep Elapsed ; done
Elapsed 6.576 seconds.
Elapsed 6.591 seconds.
Elapsed 6.499 seconds.
Elapsed 6.599 seconds.
Elapsed 6.515 seconds.

$ for i in `seq 1 5`; do ./datafusion-cli-merge-base -f q28.sql | grep Elapsed ; done
Elapsed 6.756 seconds.
Elapsed 6.761 seconds.
Elapsed 6.633 seconds.
Elapsed 6.565 seconds.
Elapsed 6.565 seconds.
```

If anything this looks better for the distributor branch. I'll see if I can reproduce the results on the GCP machine.
If it helps, I can rebase this PR as well.
I re-ran the tests manually and can't see any significant difference:

```shell
alamb@aal-dev:~$ for i in `seq 1 5`; do ./datafusion-cli-merge-base -f q28.sql | grep Elapsed ; done
Elapsed 28.206 seconds.
Elapsed 27.259 seconds.
Elapsed 27.456 seconds.
Elapsed 27.371 seconds.
Elapsed 27.130 seconds.
alamb@aal-dev:~$ ln -s arrow-datafusion11/datafusion-cli/target/release/datafusion-cli ./datafusion-cli-distributor
ln: failed to create symbolic link './datafusion-cli-distributor': File exists
alamb@aal-dev:~$ for i in `seq 1 5`; do ./datafusion-cli-distributor -f q28.sql | grep Elapsed ; done
Elapsed 27.382 seconds.
Elapsed 27.145 seconds.
Elapsed 27.302 seconds.
Elapsed 27.031 seconds.
Elapsed 27.338 seconds.
```
Thus I think we should proceed with merging this PR and we can adjust it later if needed.

I merged this branch up from main, and plan to merge it once the CI checks pass.
* fix: lock contention in distributor channels

  Reduce lock contention in distributor channels via:
  - use atomic counters instead of "counter behind mutex" where appropriate
  - use less state
  - only lock when needed
  - move "wake" operation out of lock scopes (they are eventual operations anyway and many wake operations result in "futex wake" operations -- i.e. a syscall -- which you should avoid while holding the lock)

* refactor: add more docs and tests for distributor channels

---------

Co-authored-by: Andrew Lamb <[email protected]>
Which issue does this PR close?
-
Rationale for this change
Reduce lock contention in distributor channels. Found while investigating runtime performance using tokio2chrome (eBPF version).

What changes are included in this PR?
Are these changes tested?
Existing tests pass. Benchmark didn't detect a regression.
Are there any user-facing changes?
Faster DataFusion.