-
Notifications
You must be signed in to change notification settings - Fork 1k
Use io_uring for creating files when unpacking snapshot #6671
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
c5d9d25
23b8dff
9ebb4ea
1d394ca
be68b2c
62f5f58
612df06
34766a2
f92a79f
f070e54
d02184a
1ac790c
2ca1479
4521781
72791e5
438e2cf
4754f94
ace219b
3d81986
2d26add
b4b9178
9130fc3
be62804
959cc7e
4e876bc
31cfa40
16ef1cc
118bed8
bc60d7c
07c0045
1aa77de
52e7e73
26f342a
8aa107e
ac6618c
cff6383
c957160
ece5288
c46bf43
1007d2d
8f70a55
5eb267f
b1b3355
b981293
a8c705f
a6de647
b4cad55
ae2b062
17ae987
d34d19b
196bd8f
dec2263
9cb5aa4
9445866
dc14171
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,11 @@ | ||
| //! File i/o helper functions. | ||
| use std::{fs::File, ops::Range}; | ||
| use std::{ | ||
| fs::{File, OpenOptions}, | ||
| io::{self, BufWriter, Write}, | ||
| ops::Range, | ||
| path::PathBuf, | ||
| sync::Arc, | ||
| }; | ||
|
|
||
| /// `buffer` contains `valid_bytes` of data at its end. | ||
| /// Move those valid bytes to the beginning of `buffer`, then read from `offset` to fill the rest of `buffer`. | ||
|
|
@@ -83,10 +89,105 @@ pub fn read_into_buffer( | |
| Ok(total_bytes_read) | ||
| } | ||
|
|
||
| /// An asynchronous queue for file creation. | ||
| pub trait FileCreator { | ||
| /// Schedule creating a file at `path` with `mode` permissions and bytes read from `contents`. | ||
| /// | ||
| /// `parent_dir_handle` is assumed to be a parent directory of `path` such that file may be | ||
| /// created using optimized kernel API to create `path.file_name()` inside `parent_dir_handle`. | ||
| fn schedule_create_at_dir( | ||
| &mut self, | ||
| path: PathBuf, | ||
| mode: u32, | ||
| parent_dir_handle: Arc<File>, | ||
| contents: &mut dyn io::Read, | ||
| ) -> io::Result<()>; | ||
|
|
||
| /// Invoke implementation specific logic to handle file creation completion. | ||
| fn file_complete(&mut self, path: PathBuf); | ||
|
|
||
| /// Waits for all operations to be completed | ||
| fn drain(&mut self) -> io::Result<()>; | ||
| } | ||
|
|
||
| pub fn file_creator<'a>( | ||
| buf_size: usize, | ||
| file_complete: impl FnMut(PathBuf) + 'a, | ||
| ) -> io::Result<Box<dyn FileCreator + 'a>> { | ||
| #[cfg(target_os = "linux")] | ||
| if agave_io_uring::io_uring_supported() { | ||
|
alessandrod marked this conversation as resolved.
Outdated
|
||
| use crate::io_uring::file_creator::IoUringFileCreator; | ||
|
|
||
| let io_uring_creator = IoUringFileCreator::with_buffer_capacity(buf_size, file_complete)?; | ||
| return Ok(Box::new(io_uring_creator)); | ||
|
Comment on lines
121
to
122
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Much of the other io_uring impl fall back to the slow/backup impl if the io_uring construction fails. I see we're not doing that here. Not saying this is wrong, just calling it out as different. Is this the correct/desired behavior?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah, this is basically the direction that alessandro insisted on, and I agree this the right call at this point:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the reply. I'm on board with this approach. |
||
| } | ||
| Ok(Box::new(SyncIoFileCreator::new(buf_size, file_complete))) | ||
| } | ||
|
|
||
| pub struct SyncIoFileCreator<'a> { | ||
| file_complete: Box<dyn FnMut(PathBuf) + 'a>, | ||
| } | ||
|
|
||
| impl<'a> SyncIoFileCreator<'a> { | ||
| fn new(_buf_size: usize, file_complete: impl FnMut(PathBuf) + 'a) -> Self { | ||
| Self { | ||
| file_complete: Box::new(file_complete), | ||
| } | ||
| } | ||
| } | ||
|
|
||
| #[cfg(not(unix))] | ||
| pub(super) fn set_file_readonly(path: &std::path::Path, readonly: bool) -> io::Result<()> { | ||
| let mut perm = std::fs::metadata(path)?.permissions(); | ||
| perm.set_readonly(readonly); | ||
| std::fs::set_permissions(path, perm) | ||
| } | ||
|
|
||
| impl FileCreator for SyncIoFileCreator<'_> { | ||
| fn schedule_create_at_dir( | ||
| &mut self, | ||
| path: PathBuf, | ||
| mode: u32, | ||
| _parent_dir_handle: Arc<File>, | ||
| contents: &mut dyn io::Read, | ||
| ) -> io::Result<()> { | ||
| // Open for writing (also allows overwrite) and apply `mode` | ||
| let mut options = OpenOptions::new(); | ||
| options.create(true).truncate(true).write(true); | ||
|
|
||
| #[cfg(unix)] | ||
| std::os::unix::fs::OpenOptionsExt::mode(&mut options, mode); | ||
|
|
||
| let mut file_buf = BufWriter::new(options.open(&path)?); | ||
| io::copy(contents, &mut file_buf)?; | ||
| file_buf.flush()?; | ||
|
|
||
| #[cfg(not(unix))] | ||
| set_file_readonly(&path, mode & 0o200 == 0)?; | ||
|
|
||
| self.file_complete(path); | ||
| Ok(()) | ||
| } | ||
|
|
||
| fn file_complete(&mut self, path: PathBuf) { | ||
| (self.file_complete)(path) | ||
| } | ||
|
|
||
| fn drain(&mut self) -> io::Result<()> { | ||
| Ok(()) | ||
| } | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
|
|
||
| use {super::*, std::io::Write, tempfile::tempfile}; | ||
| use { | ||
| super::*, | ||
| std::{ | ||
| fs, | ||
| io::{Cursor, Write}, | ||
| }, | ||
| tempfile::tempfile, | ||
| }; | ||
|
|
||
| #[test] | ||
| fn test_read_into_buffer() { | ||
|
|
@@ -193,4 +294,68 @@ mod tests { | |
| bytes[start_offset..file_size] | ||
| ); | ||
| } | ||
|
|
||
| fn read_file_to_string(path: &PathBuf) -> String { | ||
| String::from_utf8(fs::read(path).expect("Failed to read file")) | ||
| .expect("Failed to decode file contents") | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_create_writes_contents() -> io::Result<()> { | ||
| let temp_dir = tempfile::tempdir()?; | ||
| let file_path = temp_dir.path().join("test.txt"); | ||
| let contents = "Hello, world!"; | ||
|
|
||
| // Shared state to capture callback invocations | ||
| let mut callback_invoked_path = None; | ||
|
|
||
| // Instantiate FileCreator | ||
| let mut creator = file_creator(2 << 20, |path| { | ||
| callback_invoked_path.replace(path); | ||
| })?; | ||
|
|
||
| let dir = Arc::new(File::open(temp_dir.path())?); | ||
| creator.schedule_create_at_dir( | ||
| file_path.clone(), | ||
| 0o644, | ||
| dir, | ||
| &mut Cursor::new(contents), | ||
| )?; | ||
| creator.drain()?; | ||
| drop(creator); | ||
|
|
||
| assert_eq!(read_file_to_string(&file_path), contents); | ||
| assert_eq!(callback_invoked_path, Some(file_path)); | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_multiple_file_creations() -> io::Result<()> { | ||
| let temp_dir = tempfile::tempdir()?; | ||
| let mut callback_counter = 0; | ||
|
|
||
| let mut creator = file_creator(2 << 20, |path: PathBuf| { | ||
| let contents = read_file_to_string(&path); | ||
| assert!(contents.starts_with("File ")); | ||
| callback_counter += 1; | ||
| })?; | ||
|
|
||
| let dir = Arc::new(File::open(temp_dir.path())?); | ||
| for i in 0..5 { | ||
| let file_path = temp_dir.path().join(format!("file_{i}.txt")); | ||
| let data = format!("File {i}"); | ||
| creator.schedule_create_at_dir( | ||
| file_path, | ||
| 0o600, | ||
| dir.clone(), | ||
| &mut Cursor::new(data), | ||
| )?; | ||
| } | ||
| creator.drain()?; | ||
| drop(creator); | ||
|
|
||
| assert_eq!(callback_counter, 5); | ||
| Ok(()) | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @willhickey