Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 75 additions & 22 deletions fs/src/io_uring/sequential_file_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,28 +290,33 @@ impl<'a> SequentialFileReader<'a> {
state.current_buf_len = 0;
state.left_to_consume = 0;

// Reclaim current and all subsequent unread buffers of removed file as uninitialized.
let sentinel_buf_index = state
.files
.front()
.and_then(|f| f.start_buf_index)
.unwrap_or(state.current_buf_index);
let num_bufs = self.ring.context().len();
loop {
self.ring.process_completions()?;
let current_buf = self.ring.context_mut().get_mut(state.current_buf_index);
if current_buf.is_reading() {
// Still no data, wait for more completions, but submit in case there are queued
// entries in the submission queue.
self.ring.submit()?;
continue;
}
current_buf.transition_to_uninit();
if removed_file.had_scheduled_reads() {
// Reclaim current and all subsequent unread buffers of removed file as uninitialized.
// This includes all buffers until sentinel index, which is:
// * an index used for next scheduled read (if any file has some scheduled)
// * otherwise `state.next_read_buf_index` (default buffer index to start read from)
let sentinel_buf_index = state
.files
.iter()
.find_map(|f| f.start_buf_index)
.unwrap_or(state.next_read_buf_index);
let num_bufs = self.ring.context().len();
loop {
self.ring.process_completions()?;
let current_buf = self.ring.context_mut().get_mut(state.current_buf_index);
if current_buf.is_reading() {
// Still no data, wait for more completions, but submit in case there are queued
// entries in the submission queue.
self.ring.submit()?;
continue;
}
current_buf.transition_to_uninit();

let next_buf_index = (state.current_buf_index + 1) % num_bufs;
state.current_buf_index = next_buf_index;
if sentinel_buf_index == next_buf_index {
break;
let next_buf_index = (state.current_buf_index + 1) % num_bufs;
state.current_buf_index = next_buf_index;
if sentinel_buf_index == next_buf_index {
break;
}
}
}

Expand Down Expand Up @@ -351,7 +356,12 @@ impl<'a> SequentialFileReader<'a> {
}

fn wait_current_buf_full(&mut self) -> io::Result<bool> {
if self.state.files.is_empty() {
if self
.state
.files
.front()
.is_none_or(|file| !file.had_scheduled_reads())
{
return Ok(false);
}
let num_bufs = self.ring.context().len();
Expand Down Expand Up @@ -614,6 +624,10 @@ impl FileState {
self.raw_fd == file.as_raw_fd()
}

fn had_scheduled_reads(&self) -> bool {
self.start_buf_index.is_some()
}

/// Create a new read operation into the `bufs[index]` buffer and update file state.
///
/// This is called whenever new reads can be scheduled (on added file or freed buffer).
Expand Down Expand Up @@ -884,6 +898,11 @@ mod tests {
}
}

#[test]
fn test_reading_empty_file() {
check_reading_file(0, 4096, 1024, false);
}

/// Test with buffer larger than the whole file
#[test]
fn test_reading_small_file() {
Expand Down Expand Up @@ -951,6 +970,7 @@ mod tests {

#[test]
fn test_direct_io_read() {
check_reading_file(0, 4096, 4096, true);
check_reading_file(2_500, 4096, 4096, true);
check_reading_file(2_500, 16384, 4096, true);
check_reading_file(25_000, 4096, 4096, true);
Expand Down Expand Up @@ -1094,4 +1114,37 @@ mod tests {
reader.set_file(temp2.as_file(), 4).unwrap();
assert_eq!(read_as_vec(&mut reader), vec![0xd, 0xe, 0xf, 0x10]);
}

#[test]
fn test_multiple_files_including_zero_limit() {
let mut temp1 = NamedTempFile::new().unwrap();
io::Write::write_all(&mut temp1, &[0xa, 0xb, 0xc]).unwrap();
let mut temp2 = NamedTempFile::new().unwrap();
io::Write::write_all(&mut temp2, &[0xd, 0xe, 0xf, 0x10]).unwrap();

let mut reader = SequentialFileReaderBuilder::new()
.read_capacity(512)
.build(1024)
.unwrap();

reader.add_file_to_prefetch(temp1.as_file(), 3).unwrap();
reader.add_file_to_prefetch(temp2.as_file(), 0).unwrap();
reader.add_file_to_prefetch(temp1.as_file(), 10).unwrap();

assert_eq!(read_as_vec(&mut reader), vec![0xa, 0xb, 0xc]);

reader.move_to_next_file().unwrap();
assert_eq!(read_as_vec(&mut reader), vec![]);

reader.move_to_next_file().unwrap();
assert_eq!(read_as_vec(&mut reader), vec![0xa, 0xb, 0xc]);

reader.add_file_to_prefetch(temp1.as_file(), 0).unwrap();
reader.move_to_next_file().unwrap();
assert_eq!(read_as_vec(&mut reader), vec![]);

reader.add_file_to_prefetch(temp2.as_file(), 4).unwrap();
reader.move_to_next_file().unwrap();
assert_eq!(read_as_vec(&mut reader), vec![0xd, 0xe, 0xf, 0x10]);
}
}
Loading