-
Notifications
You must be signed in to change notification settings - Fork 21
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor: parallel write buffer writing for large object disk cache #619
Conversation
Signed-off-by: MrCroxx <[email protected]>
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #619 +/- ##
==========================================
+ Coverage 81.70% 81.77% +0.06%
==========================================
Files 66 67 +1
Lines 8425 8522 +97
==========================================
+ Hits 6884 6969 +85
- Misses 1541 1553 +12 ☔ View full report in Codecov by Sentry. |
foyer-storage/src/large/batch.rs
Outdated
.map(|group| { | ||
// TODO(MrCroxx): Refine to logic. | ||
// Do not filter empty group here. | ||
// An empty group can be used to trigger makring evictable region in flusher. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo makring
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sharper sight than typos
🥵.
Signed-off-by: MrCroxx <[email protected]>
|
||
rx: mpsc::UnboundedReceiver<Submission<K, V, S>>, | ||
notify: Arc<Notify>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replace Notify
with CondationVariable
later, for the implementation of Notify
also includes a mutex.
let mut indices = group.indices; | ||
for haddr in indices.iter_mut() { | ||
haddr.address.region = region.id(); | ||
} | ||
}; | ||
try_join(try_join_all(futures), future).await?; | ||
if let Some(init_time) = batch.init_time.as_ref() { | ||
metrics.storage_queue_rotate.increment(1); | ||
metrics.storage_queue_rotate_duration.record(init_time.elapsed()); | ||
indexer.insert_batch(indices); | ||
for tx in group.txs { | ||
let _ = tx.send(Ok(true)); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move notification to a dedicated thread to increase the throughput.
Signed-off-by: MrCroxx <[email protected]>
What's changed and what's your intention?
This PR refined the write model of the large object disk cache.
With the previous design, entries submitted to the same flusher will be serialized in the flusher runner thread (tokio task), whose throughput is limited by the 1 CPU core. With the new design, every entry is serialized in its submit thread, and the serialization between entries can be paralleled.
The parallelism is achieved by allocating with a mutex and waiting with
WaitGroup
#615.Checklist
make all
(ormake fast
instead if the old tests are not modified) in my local environment.Related issues or PRs (optional)
#596 #615
close #581