Support reading sequence of multiple files with a single read-ahead io_uring file reader #6878
kskalski wants to merge 1 commit into
Conversation
Codecov Report
❌ Patch coverage is
Additional details and impacted files:

```diff
@@           Coverage Diff            @@
##           master    #6878    +/-   ##
========================================
  Coverage    83.1%    83.1%
========================================
  Files         810      810
  Lines      357414   357960    +546
========================================
+ Hits       297133   297651    +518
- Misses      60281    60309     +28
```
```rust
// the lifetime of the operation
self.inner.push(op)?;
// ...
/// It is required that the previous file is fully read before calling this method.
pub fn move_to_next_file(&mut self) -> io::Result<()> {
```
Perhaps we could implement `Iterator` and turn this method into its `next` implementation? Then we could iterate over `SequentialFileReader`, which I think would be a nice API. If such an iterator was yielding some wrapper type which implements `Read`, we could do something like:

```rust
let mut reader = SequentialFileReader::with_buffer(vec![0; 1024], 512).unwrap();
reader.add_file(temp1.as_file(), 2).unwrap();
reader.add_file(temp2.as_file(), 3).unwrap();
reader.add_file(temp1.as_file(), 4).unwrap();
reader.add_file(temp2.as_file(), 5).unwrap();
for reader in reader {
    let mut reader = reader.unwrap();
    let mut buf = Vec::new();
    reader.read_to_end(&mut buf).unwrap();
    [...]
}
```

The current API requires calling `move_to_next_file` manually and kinda forces developers to be explicit about how many files there are, which could be annoying.
If implementing `Iterator` is too hard or impossible for some reasons I'm overlooking, perhaps we could add some method like `len()` or `remaining()`, so we can still write a loop:

```rust
let remaining_files = reader.remaining();
for _ in 0..remaining_files {
    let mut buf = Vec::new();
    reader.read_to_end(&mut buf).unwrap();
    [...]
    reader.move_to_next_file();
}
```
Hm, I think I would find an iterator here to be a bit confusing, since on first sight it's not clear whether it should iterate over bytes or files.
As of now I don't foresee using `move_to_next_file` directly (maybe it should be made private), as accounts_db has the files stored in a layered data structure and scan methods iterate over "storages", not files. Because of that the planned use is mostly through `set_file`, which "ensures that specified file is active == at the front of the queue".
So the plan is to get storages in chunks, add them to the read-ahead queue (through `add_file`) and then let the scan control when it wants to move to a specific file (as long as it moves in the same order as the read-ahead order, the simple approach implemented here will work).
Btw, it is in fact possible to `read_to_end`, which is provided by the `Read` trait, because we still stop and return 0 from `read` (or `&[]` from `fill_buf`) when we reach the current file's end. Reading from new files starts only after an explicit `move_to_next_file` (or `set_file`). Finally, in practice the reader might not actually read until the end of the file, and we need to support moving to the next file in this scenario too.
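To make the per-file EOF behavior concrete, here is a toy in-memory sketch (not the PR's io_uring code; all names are illustrative): `read` returns `Ok(0)` at the current file's end, so `read_to_end` stops there, and bytes of the next file are only served after an explicit `move_to_next_file`.

```rust
use std::collections::VecDeque;
use std::io::{self, Read};

// Toy model: each entry in `files` stands in for one file's contents.
struct ToySequentialReader {
    files: VecDeque<Vec<u8>>,
    pos: usize, // read position within the front file
}

impl ToySequentialReader {
    fn new(files: Vec<Vec<u8>>) -> Self {
        Self { files: files.into(), pos: 0 }
    }

    // Drop the current file, even if it was not fully read.
    fn move_to_next_file(&mut self) {
        self.files.pop_front();
        self.pos = 0;
    }
}

impl Read for ToySequentialReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let Some(cur) = self.files.front() else { return Ok(0) };
        let remaining = &cur[self.pos..];
        let n = remaining.len().min(buf.len());
        buf[..n].copy_from_slice(&remaining[..n]);
        self.pos += n;
        Ok(n) // Ok(0) once the current file is exhausted
    }
}

fn main() {
    let mut r = ToySequentialReader::new(vec![b"abc".to_vec(), b"de".to_vec()]);
    let mut buf = Vec::new();
    r.read_to_end(&mut buf).unwrap();
    assert_eq!(buf, b"abc"); // stops at the first file's EOF
    r.move_to_next_file();
    buf.clear();
    r.read_to_end(&mut buf).unwrap();
    assert_eq!(buf, b"de");
}
```

This mirrors why an `Iterator` over files is not strictly needed: `read_to_end` already terminates at each file boundary.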
I rebased on top of the actual definition for `FileBufRead` that is now used in `agave/accounts-db/src/append_vec.rs` (line 1051 in 5e4c0cb).
```rust
inner: Ring<SequentialFileReaderState, ReadOp>,
owned_files: VecDeque<File>,
```
I'm struggling with understanding the purpose of this queue. `SequentialFileReaderState` uses the raw file descriptors, added through `add_file_by_fd`. Then the only usage of `owned_files` I see is in `move_to_next_file`, where we first get a file descriptor from the state:

```rust
let Some(mut file_state) = state.files.pop_front() else {
    return Ok(());
};
```

to then compare that file descriptor with the owned file from the queue:

```rust
if self
    .owned_files
    .front()
    .is_some_and(|f| file_state.is_same_file(f))
{
    self.owned_files.pop_front();
}
```

What's the point of this comparison? I was wondering whether it's some kind of integrity check, but if this statement isn't true, nothing happens.
The implementation supports adding owned files (`add_path`, `add_file_owned` - this is the only way used for now) and/or file references (`add_file` / `set_file`). Owning the file ensures it is not closed while it is being read in the background, while supporting an encapsulating API (e.g. you can have `fn new_reader(path) -> SequentialFileReader`).

> What's the point of this comparison? I was wondering whether it's some kind of integrity check, but if this statement isn't true, nothing happens.

That code basically allows mixing the two ways of adding files: the assumption is that when an owned file is being read (front of `state.files`), it is also at the front of `self.owned_files`, so when moving on to the next file, we should advance both queues.
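A minimal sketch of that two-queue invariant, with toy types (illustrative names, not the PR's actual structs): every queued file lives in `state_files` by raw fd/id, while files the reader owns are additionally held in `owned_files` so their `File` stays open while being read; borrowed files appear only in `state_files`.

```rust
use std::collections::VecDeque;

#[derive(Default)]
struct ToyReader {
    state_files: VecDeque<u64>, // all files in read order, by fd/id
    owned_files: VecDeque<u64>, // owned subset, same relative order
}

impl ToyReader {
    fn add_borrowed(&mut self, fd: u64) {
        self.state_files.push_back(fd);
    }

    fn add_owned(&mut self, fd: u64) {
        self.state_files.push_back(fd);
        self.owned_files.push_back(fd);
    }

    fn move_to_next_file(&mut self) {
        let Some(done) = self.state_files.pop_front() else { return };
        // If the finished file was owned, it must also sit at the front of
        // `owned_files`; pop it too so the queues stay in sync (in the real
        // code, dropping the owned File is what closes the fd).
        if self.owned_files.front() == Some(&done) {
            self.owned_files.pop_front();
        }
    }
}

fn main() {
    let mut r = ToyReader::default();
    r.add_borrowed(1);
    r.add_owned(2);
    r.add_borrowed(3);
    r.move_to_next_file(); // finishes borrowed fd 1: owned queue untouched
    assert_eq!(r.owned_files.front(), Some(&2));
    r.move_to_next_file(); // finishes owned fd 2: both queues advance
    assert!(r.owned_files.is_empty());
    assert_eq!(r.state_files.front(), Some(&3));
}
```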
I think this code is now close to being usable in accounts-db scans - I already have a branch that plugs it in; the final way to do that might involve a small generalization of the API defined here. @brooksprumo let me know if you can do a high level look at this or include others to review - unfortunately this PR rewrites a big part of the existing impl, but I still don't feel like it deserves a separate mod / struct, since it would duplicate a lot of code.
I need to do another deeper pass. My initial thought was that the interaction of adding files and moving to the next file felt strange. Like you'd add a file, but it had to already be the next one in the list to work on. Or you can add the same file again too, as long as it was next. I think I need to spend more time looking at the actual uses since I believe I have some details wrong.
```rust
/// Add `file` reference to read. Starts reading the file as soon as a buffer is available.
///
/// The read finishes when EOF is reached or `read_limit` bytes are read.
/// Multiple files can be added to the reader and they will be read-ahead in FIFO order.
///
/// Lifetime of reference is tied to the reader's lifetime.
#[allow(unused)]
pub fn add_file(&mut self, file: &'a File, read_limit: usize) -> io::Result<()> {
    self.add_file_by_fd(file.as_raw_fd(), read_limit)
}
```
For example it seems unsafe to add a file here without the actual File getting added to self.owned_files. I see it is used, so the #[allow(unused)] is strange. Maybe this fn is fine, just not as a public method.
In this PR this function is only used in tests.
The intended usage is:
- do a chunk of `add_file`(s)
- then repeat the same sequence (or shorter, it works as FIFO) doing `set_file`, `read`(s)
- repeat

Clearly there are many possible weird uses of those functions, though I think no sequence will result in any error (i.e. `set_file` takes absolute priority, it will discard any added files if the sequence is not right); only the above mentioned one is really useful.
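The "no sequence results in an error" behavior can be sketched with a toy queue (illustrative names and types, not the PR's io_uring implementation): activation discards queued read-ahead files until the requested one is at the front, and falls back to queueing the file directly if it was never added.

```rust
use std::collections::VecDeque;

#[derive(Default)]
struct ToyPrefetchQueue {
    queue: VecDeque<u64>, // files by fd/id, in read-ahead order
}

impl ToyPrefetchQueue {
    fn add_file(&mut self, fd: u64) {
        self.queue.push_back(fd);
    }

    fn activate_file(&mut self, fd: u64) {
        // Discard read-ahead files the caller skipped over.
        while let Some(front) = self.queue.front() {
            if *front == fd {
                return;
            }
            self.queue.pop_front();
        }
        // Never prefetched at all: start reading it from scratch.
        self.queue.push_front(fd);
    }
}

fn main() {
    let mut q = ToyPrefetchQueue::default();
    // Add a chunk of files, then activate them in the same FIFO order.
    for fd in [1, 2, 3] {
        q.add_file(fd);
    }
    q.activate_file(1);
    assert_eq!(q.queue.front(), Some(&1));
    q.activate_file(3); // skipping fd 2 just discards it, no error
    assert_eq!(q.queue.front(), Some(&3));
    q.activate_file(9); // never added: falls back to a direct read
    assert_eq!(q.queue.front(), Some(&9));
}
```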
Ok, I think the names of those functions could be changed to:
- add_readahead_file
- activate_file
The safety of the `add_file` operation relies on the `'a` lifetime, i.e. there is a guarantee that we can use the passed FD while our own object is alive; in theory someone could still close the file / FD, but that is forbidden in the doc comment.
Adding file references or owned files is conceptually very similar, but in our codebase they are / will be used in disjoint ways:
- we create a single reader with an owned snapshot archive file, then we just read that single file until the end
- we create a reader for accounts files, add read-ahead file references, then activate and read them one by one
Changed names to `add_files_to_prefetch` / `activate_file`.
brooksprumo left a comment:
I've done a few passes over the code, but I think I need another person who can do a proper review of sequential_file_reader.rs.
```rust
// the lifetime of the operation
self.inner.push(op)?;
// ...
/// Lifetime of reference is tied to the reader's lifetime.
#[allow(unused)]
```
nit: Since this method is currently only used by tests, let's update the annotation.

```diff
-#[allow(unused)]
+#[cfg(test)]
```
Or if this is only intended to ever be called by tests, let's move the method into the tests submodule.
I take back the previous comment - it is actually used in a trait function `activate_file` as a fallback when the file specified for activation wasn't ever added to prefetch, which is a valid situation (this is how the regular `BufferedReader` is used).
Quick update on this project / PR - I'm iterating on top of this code to tune performance and beat current master's approach. The changes I have still don't affect implementation here too much:
So I plan to make the above changes independently for merge into master, or after this PR is merged; but if anyone has a preference to review a state closer to the final version, I can do a series of commits in this PR.
I'd prefer to review the final state here
```rust
let new_indices = storages.take_up_to_capacity(&mut chunk);
let new_files =
    chunk.range(new_indices).filter_map(|s| s.accounts.file());
reader.add_files_to_prefetch(new_files).unwrap();
```
Please no bare unwraps without a SAFETY comment indicating why this can never fail.
Changed to `expect`, similarly to how the code below this function panics on scan error instead of returning the error up the stack.
```rust
/// Return the `File` and size of the underlying `AppendVec` account file.
pub fn file(&self) -> Option<(&File, usize)> {
    match self {
        Self::AppendVec(av) => Some((av.file(), av.len())),
```
Note that `.len()` is not the size of the file. If you want the file size, use `.capacity()`.

```diff
-Self::AppendVec(av) => Some((av.file(), av.len())),
+Self::AppendVec(av) => Some((av.file(), av.capacity())),
```
```rust
match self.backing {
    AppendVecFileBacking::File(ref file) => file,
    AppendVecFileBacking::Mmap(_) => {
        panic!("Memory-backed AppendVec does not have a file")
```
The append vec here is not memory backed; there is an underlying file, and we could get it if needed. Do we want to do that? I dunno. Note that RPC providers are still using Mmap file backing in v2.3, so we need to ensure they don't panic.

```diff
-panic!("Memory-backed AppendVec does not have a file")
+panic!("Memory-mapped AppendVec does not have a File")
```
I was considering an alternative API to avoid such pitfalls: add a function `add_file_prefetch_to_reader(&self, reader: impl FileBufRead)`, but that will require a different approach for submitting IO ops to the kernel (probably a separate `fn trigger_prefetch()` in `FileBufRead`).
I think getting a file is cleaner, but possibly I could always make it return an `Option`.
```rust
const READ_SIZE: usize = 512 * 1024;
// scan accounts implementations will submit operations to kernel using
// FileBufRead::add_files_to_prefetch - just make sure queue size can hold all buffers.
const RING_QSIZE: u32 = (SCAN_ACCOUNTS_BUFFER_SIZE / READ_SIZE) as u32;
```
Looks like this truncates. I think we should either (1) assert the remainder is zero, or (2) round up.

```diff
-const RING_QSIZE: u32 = (SCAN_ACCOUNTS_BUFFER_SIZE / READ_SIZE) as u32;
+const RING_QSIZE: u32 = SCAN_ACCOUNTS_BUFFER_SIZE.div_ceil(READ_SIZE) as u32;
```
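A quick demonstration of the truncation concern, using an illustrative buffer size (the PR's actual `SCAN_ACCOUNTS_BUFFER_SIZE` may well be an exact multiple): when the buffer size is not a multiple of `READ_SIZE`, plain `/` under-sizes the queue while `div_ceil` rounds up so every buffer fits.

```rust
const READ_SIZE: usize = 512 * 1024;
const BUFFER_SIZE: usize = 3 * READ_SIZE + 1; // deliberately not a multiple

fn main() {
    let truncated = (BUFFER_SIZE / READ_SIZE) as u32;
    let rounded_up = BUFFER_SIZE.div_ceil(READ_SIZE) as u32;
    assert_eq!(truncated, 3); // one queue slot short of the 4 buffers needed
    assert_eq!(rounded_up, 4); // room for all buffers
}
```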
```rust
pub fn file(&self) -> Option<(&File, usize)> {
    match self {
        Self::AppendVec(av) => Some((av.file(), av.len())),
        Self::TieredStorage(_) => None,
```
`TieredStorage` does have an underlying file, so I don't love the `None` here. Maybe we say `unimplemented!()` and remove the `Option` from the return type? (If going that route, we need doc comments indicating as much.)
Yeah, hard to say. As mentioned in another comment, we could go with all `Option` / all `unimplemented!()` / a completely different API to prefetch. Seems like the easiest will be `unimplemented!()` and no `Option`s, but I want to be sure this would fail early when somebody uses these APIs in a wrong way, as opposed to failing late at runtime.
I reverted back to using `Option` and renamed this accessor to indicate we want to fetch file-io information. The returned information is used for prefetch, which is only applicable to file-io, so we need to filter out storages that are not based on file-io; we may encounter those here when storage_access is set to mmap (with the current logic of AppendVec's "reopen as readonly" we may actually be running with a mix of mmap and file-io AppendVecs when storage access is set to mmap, since reopening always returns file-io).
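The filtering idea can be sketched with toy types (not accounts-db's real `Storage` / `AccountsFile`): mmap-backed storages return `None` from the file-io accessor, so building the prefetch list is a simple `filter_map`.

```rust
// Stand-in for the two backing modes; the &str replaces (&File, usize).
enum Backing {
    FileIo(&'static str),
    Mmap,
}

struct Storage {
    backing: Backing,
}

impl Storage {
    // Only file-io backed storages expose prefetch information.
    fn file_io(&self) -> Option<&'static str> {
        match &self.backing {
            Backing::FileIo(name) => Some(name),
            Backing::Mmap => None,
        }
    }
}

fn main() {
    // A mix of backings, as can happen when storage_access is set to mmap
    // but reopened storages come back as file-io.
    let storages = [
        Storage { backing: Backing::FileIo("a") },
        Storage { backing: Backing::Mmap },
        Storage { backing: Backing::FileIo("b") },
    ];
    let prefetchable: Vec<_> = storages.iter().filter_map(|s| s.file_io()).collect();
    assert_eq!(prefetchable, ["a", "b"]); // mmap storage filtered out
}
```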
I'm done with optimizations and updating the APIs to let the scan do prefetching. The typical percentage of CPU used for hashing (as opposed to buffer ops and syscalls) is now 97%; I will post some numbers tomorrow comparing wall time gains relative to master. Seems like the majority of the review will fall on @alessandrod, who prefers to look at the whole solution in one PR, so I have now included all the code that uses the new prefetch APIs and how I changed the lt hash verification scan. @brooksprumo this actually brings a bunch of changes here that are more specific to accounts-db code; this way at least there are no more "unused" blocks. Thanks for the quick pass.
I discovered a small regression in snapshot unpacking after the removal of sqpoll from the reader, so I tuned ring options for the tar archive reader and file creator - they come with an explanation in code comments. This seems to be fixed now, though I'm a bit puzzled, since all my tests indicate that mixing reads and writes on the same ring's kernel workers slows down the writes... (a separate surprise being that without sqpoll, both rings created in the same user thread (solTarUnpack) share the same kernel worker pool).
@brooksprumo - rebased to resolve conflict and tweaked back accounts-db scan code to better support all storage access configs (since we still need to allow running with mmap mode) |
uh surely we can still aim for 3.1? |
Could be! I got confused - master is marked as 3.1, but there isn't a 3.1 tag yet, so we are now composing the solution that will go out then. In the meantime Brooks consolidated all start-up scans into a single pass; I will rebase and update the numbers once his change is in.
Problem

`accounts_db::io_uring::SequentialFileReader` supports async read-ahead reads from a specified file, but there are use-cases (accounts storage scan) where we need to read a sequence of many (often small) files. Creating a reader for each file separately would involve:

This complexity should be hidden behind a separate wrapper or embedded in `SequentialFileReader`.

Summary of Changes

- `move_to_next_file` function that allows transitioning `BufRead` to the next file
- `set_file` to ensure head file == given file

Performance change

Measuring startup lt hash verification (`calculate_accounts_lt_hash_at_startup_from_storages`) - there is a 14-15% speedup in hashing rate per thread (i.e. wall time of the whole lt hash calculation / number of user threads enabled), e.g.