diff --git a/Cargo.lock b/Cargo.lock index c6c1f4df2b6a..7efc4a0b0a93 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5158,7 +5158,7 @@ dependencies = [ [[package]] name = "tokio-epoll-uring" version = "0.1.0" -source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#53c832d322244f704c7612125496df5ac117ac39" +source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#82d74064c5019b0e9a8ae1bcdc75b0345d41bba9" dependencies = [ "futures", "once_cell", @@ -5714,7 +5714,7 @@ dependencies = [ [[package]] name = "uring-common" version = "0.1.0" -source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#53c832d322244f704c7612125496df5ac117ac39" +source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#82d74064c5019b0e9a8ae1bcdc75b0345d41bba9" dependencies = [ "io-uring", "libc", diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 76dc6a576458..a83bf833a11c 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -83,7 +83,6 @@ enum-map.workspace = true enumset.workspace = true strum.workspace = true strum_macros.workspace = true -# WIP PR: https://github.com/neondatabase/tokio-epoll-uring/pull/25 #tokio-epoll-uring = { path = "../../tokio-epoll-uring/tokio-epoll-uring" } tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" } diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs index 38305e9f4e8c..8c9294bb9981 100644 --- a/pageserver/src/tenant/block_io.rs +++ b/pageserver/src/tenant/block_io.rs @@ -39,6 +39,8 @@ pub enum BlockLease<'a> { EphemeralFileMutableTail(&'a [u8; PAGE_SZ]), #[cfg(test)] Arc(std::sync::Arc<[u8; PAGE_SZ]>), + #[cfg(test)] + Vec(Vec), } impl From> for BlockLease<'static> { @@ -63,6 +65,13 @@ impl<'a> Deref for BlockLease<'a> { BlockLease::EphemeralFileMutableTail(v) => v, #[cfg(test)] BlockLease::Arc(v) => v.deref(), + #[cfg(test)] + BlockLease::Vec(v) => { + let v: &Vec = v; + assert_eq!(v.len(), PAGE_SZ, "caller must ensure that v has PAGE_SZ"); + // Safety: see above assertion. + unsafe { &*(v.as_ptr() as *const [u8; PAGE_SZ]) } + } } } } @@ -176,7 +185,7 @@ impl FileBlockReader { ) -> Result, std::io::Error> { assert!(buf.len() == PAGE_SZ); self.file - .read_exact_at(buf, blkno as u64 * PAGE_SZ as u64) + .read_exact_at_page(buf, blkno as u64 * PAGE_SZ as u64) .await } /// Read a block. diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index 79741fe54960..7bea49639873 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -91,7 +91,7 @@ impl EphemeralFile { page_cache::ReadBufResult::NotFound(write_guard) => { let write_guard = self .file - .read_exact_at(write_guard, blknum as u64 * PAGE_SZ as u64) + .read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64) .await?; let read_guard = write_guard.mark_valid(); return Ok(BlockLease::PageReadGuard(read_guard)); diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 5042aa50612e..21ee1806eb8a 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -16,11 +16,12 @@ use camino::{Utf8Path, Utf8PathBuf}; use once_cell::sync::OnceCell; use std::fs::{self, File}; use std::io::{Error, ErrorKind, Seek, SeekFrom}; -use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd}; +use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}; use std::os::unix::fs::FileExt; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use tokio::time::Instant; +use tokio_epoll_uring::IoBufMut; use utils::fs_ext; /// @@ -105,7 +106,38 @@ struct SlotInner { tag: u64, /// the underlying file - file: Option, + file: Option, +} + +/// Impl of [`tokio_epoll_uring::IoBuf`] and [`tokio_epoll_uring::IoBufMut`] for [`PageWriteGuard`]. +struct PageWriteGuardBuf { + page: PageWriteGuard<'static>, + init_up_to: usize, +} +// Safety: the [`PageWriteGuard`] gives us exclusive ownership of the page cache slot, +// and the location remains stable even if [`Self`] or the [`PageWriteGuard`] is moved. +unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf { + fn stable_ptr(&self) -> *const u8 { + self.page.as_ptr() + } + fn bytes_init(&self) -> usize { + self.init_up_to + } + fn bytes_total(&self) -> usize { + self.page.len() + } +} +// Safety: see above, plus: the ownership of [`PageWriteGuard`] means exclusive access, +// hence it's safe to hand out the `stable_mut_ptr()`. +unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf { + fn stable_mut_ptr(&mut self) -> *mut u8 { + self.page.as_mut_ptr() + } + + unsafe fn set_init(&mut self, pos: usize) { + assert!(pos <= self.page.len()); + self.init_up_to = pos; + } } impl OpenFiles { @@ -273,6 +305,10 @@ macro_rules! with_file { let $ident = $this.lock_file().await?; observe_duration!($op, $($body)*) }}; + ($this:expr, $op:expr, | mut $ident:ident | $($body:tt)*) => {{ + let mut $ident = $this.lock_file().await?; + observe_duration!($op, $($body)*) + }}; } impl VirtualFile { @@ -291,61 +327,6 @@ impl VirtualFile { Self::open_with_options_async(path, options).await } - /// Open a file with given options. - /// - /// Note: If any custom flags were set in 'open_options' through OpenOptionsExt, - /// they will be applied also when the file is subsequently re-opened, not only - /// on the first time. Make sure that's sane! - #[cfg(test)] - pub async fn open_with_options( - path: &Utf8Path, - open_options: &OpenOptions, - ) -> Result { - let path_str = path.to_string(); - let parts = path_str.split('/').collect::>(); - let tenant_id; - let timeline_id; - if parts.len() > 5 && parts[parts.len() - 5] == TENANTS_SEGMENT_NAME { - tenant_id = parts[parts.len() - 4].to_string(); - timeline_id = parts[parts.len() - 2].to_string(); - } else { - tenant_id = "*".to_string(); - timeline_id = "*".to_string(); - } - let (handle, mut slot_guard) = get_open_files().find_victim_slot().await; - - // NB: there is also StorageIoOperation::OpenAfterReplace which is for the case - // where our caller doesn't get to use the returned VirtualFile before its - // slot gets re-used by someone else. - let file = observe_duration!(StorageIoOperation::Open, open_options.open(path))?; - - // Strip all options other than read and write. - // - // It would perhaps be nicer to check just for the read and write flags - // explicitly, but OpenOptions doesn't contain any functions to read flags, - // only to set them. - let mut reopen_options = open_options.clone(); - reopen_options.create(false); - reopen_options.create_new(false); - reopen_options.truncate(false); - - let vfile = VirtualFile { - handle: RwLock::new(handle), - pos: 0, - path: path.to_path_buf(), - open_options: reopen_options, - tenant_id, - timeline_id, - }; - - // TODO: Under pressure, it's likely the slot will get re-used and - // the underlying file closed before they get around to using it. - // => https://github.com/neondatabase/neon/issues/6065 - slot_guard.file.replace(file); - - Ok(vfile) - } - /// Writes a file to the specified `final_path` in a crash safe fasion /// /// The file is first written to the specified tmp_path, and in a second @@ -424,7 +405,6 @@ impl VirtualFile { std::io::Error::new(std::io::ErrorKind::Other, system) } })?; - let file = File::from(file); file })); @@ -452,15 +432,13 @@ impl VirtualFile { /// Call File::sync_all() on the underlying File. pub async fn sync_all(&self) -> Result<(), Error> { - with_file!(self, StorageIoOperation::Fsync, |file| file - .as_ref() - .sync_all()) + with_file!(self, StorageIoOperation::Fsync, |file_guard| file_guard + .with_std_file(|std_file| std_file.sync_all())) } pub async fn metadata(&self) -> Result { - with_file!(self, StorageIoOperation::Metadata, |file| file - .as_ref() - .metadata()) + with_file!(self, StorageIoOperation::Metadata, |file_guard| file_guard + .with_std_file(|std_file| std_file.metadata())) } /// Helper function internal to `VirtualFile` that looks up the underlying File, @@ -469,7 +447,7 @@ impl VirtualFile { /// /// We are doing it via a macro as Rust doesn't support async closures that /// take on parameters with lifetimes. - async fn lock_file(&self) -> Result, Error> { + async fn lock_file(&self) -> Result { let open_files = get_open_files(); let mut handle_guard = { @@ -526,7 +504,6 @@ impl VirtualFile { std::io::Error::new(std::io::ErrorKind::Other, system) } })?; - let file = File::from(file); file }); @@ -553,9 +530,8 @@ impl VirtualFile { self.pos = offset; } SeekFrom::End(offset) => { - self.pos = with_file!(self, StorageIoOperation::Seek, |file| file - .as_ref() - .seek(SeekFrom::End(offset)))? + self.pos = with_file!(self, StorageIoOperation::Seek, |mut file_guard| file_guard + .with_std_file_mut(|std_file| std_file.seek(SeekFrom::End(offset))))? } SeekFrom::Current(offset) => { let pos = self.pos as i128 + offset as i128; @@ -575,66 +551,47 @@ impl VirtualFile { } // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135 - pub async fn read_exact_at( - &self, - page: PageWriteGuard<'static>, - offset: u64, - ) -> Result, Error> { - with_file!(self, StorageIoOperation::Read, |file_guard| { - self.read_exact_at0(file_guard, page, offset).await - }) + pub async fn read_exact_at(&self, buf: B, mut offset: u64) -> Result + where + B: IoBufMut + Send, + { + use tokio_epoll_uring::BoundedBuf; + let mut buf: tokio_epoll_uring::Slice = buf.slice_full(); + while buf.bytes_total() != 0 { + let res; + (buf, res) = self.read_at(buf, offset).await; + match res { + Ok(0) => break, + Ok(n) => { + buf = buf.slice(n..); + offset += n as u64; + } + Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} + Err(e) => return Err(e), + } + } + if !buf.is_empty() { + Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "failed to fill whole buffer", + )) + } else { + Ok(buf.into_inner()) + } } - async fn read_exact_at0( + + /// Like [`Self::read_exact_at`] but for [`PageWriteGuard`]. + pub async fn read_exact_at_page( &self, - file_guard: FileGuard<'static>, - write_guard: PageWriteGuard<'static>, + page: PageWriteGuard<'static>, offset: u64, ) -> Result, Error> { - let system = tokio_epoll_uring::thread_local_system().await; - struct PageWriteGuardBuf { - buf: PageWriteGuard<'static>, - init_up_to: usize, - } - unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf { - fn stable_ptr(&self) -> *const u8 { - self.buf.as_ptr() - } - fn bytes_init(&self) -> usize { - self.init_up_to - } - fn bytes_total(&self) -> usize { - self.buf.len() - } - } - unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf { - fn stable_mut_ptr(&mut self) -> *mut u8 { - self.buf.as_mut_ptr() - } - - unsafe fn set_init(&mut self, pos: usize) { - assert!(pos <= self.buf.len()); - self.init_up_to = pos; - } - } let buf = PageWriteGuardBuf { - buf: write_guard, + page, init_up_to: 0, }; - let owned_fd = unsafe { OwnedFd::from_raw_fd(file_guard.as_ref().as_raw_fd()) }; - let guard = scopeguard::guard(file_guard, |_| { - panic!("must not drop future while operation is ongoing (todo: pass file_guard to tokio_epoll_uring to aovid this)") - }); - let ((owned_fd, buf), res) = system.read(owned_fd, offset, buf).await; - let _ = OwnedFd::into_raw_fd(owned_fd); - let _ = scopeguard::ScopeGuard::into_inner(guard); - let PageWriteGuardBuf { - buf: write_guard, - init_up_to, - } = buf; - if let Ok(num_read) = res { - assert!(init_up_to == num_read); // TODO need to deal with short reads here - } - res.map(|_| write_guard) + let res = self.read_exact_at(buf, offset).await; + res.map(|PageWriteGuardBuf { page, .. }| page) .map_err(|e| Error::new(ErrorKind::Other, e)) } @@ -685,11 +642,47 @@ impl VirtualFile { Ok(n) } + pub(crate) async fn read_at(&self, buf: B, offset: u64) -> (B, Result) + where + B: tokio_epoll_uring::BoundedBufMut + Send, + { + let file_guard = match self.lock_file().await { + Ok(file_guard) => file_guard, + Err(e) => return (buf, Err(e)), + }; + let (buf, result) = observe_duration!(StorageIoOperation::Read, { + self.read_at0(file_guard, buf, offset).await + }); + (buf, result) + } + async fn read_at0( + &self, + file_guard: FileGuard, + buf: B, + offset: u64, + ) -> (B, Result) + where + B: tokio_epoll_uring::BoundedBufMut + Send, + { + let system = tokio_epoll_uring::thread_local_system().await; + let ((_file_guard, buf), res) = system.read(file_guard, offset, buf).await; + if let Ok(size) = res { + // TODO: don't use with_label_values on hot path + // https://github.com/neondatabase/neon/issues/6107 + STORAGE_IO_SIZE + .with_label_values(&["read", &self.tenant_id, &self.timeline_id]) + .add(size as i64); + } + (buf, res.map_err(|e| Error::new(ErrorKind::Other, e))) + } + async fn write_at(&self, buf: &[u8], offset: u64) -> Result { - let result = with_file!(self, StorageIoOperation::Write, |file| file - .as_ref() - .write_at(buf, offset)); + let result = with_file!(self, StorageIoOperation::Write, |file_guard| { + file_guard.with_std_file(|std_file| std_file.write_at(buf, offset)) + }); if let Ok(size) = result { + // TODO: don't use with_label_values on hot path + // https://github.com/neondatabase/neon/issues/6107 STORAGE_IO_SIZE .with_label_values(&["write", &self.tenant_id, &self.timeline_id]) .add(size as i64); @@ -698,18 +691,54 @@ impl VirtualFile { } } -struct FileGuard<'a> { - slot_guard: RwLockReadGuard<'a, SlotInner>, +struct FileGuard { + slot_guard: RwLockReadGuard<'static, SlotInner>, } -impl<'a> AsRef for FileGuard<'a> { - fn as_ref(&self) -> &File { +impl AsRef for FileGuard { + fn as_ref(&self) -> &OwnedFd { // This unwrap is safe because we only create `FileGuard`s // if we know that the file is Some. self.slot_guard.file.as_ref().unwrap() } } +impl FileGuard { + // TODO: switch to tokio-epoll-uring native operations. + fn with_std_file(&self, with: F) -> R + where + F: FnOnce(&File) -> R, + { + // SAFETY: + // - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`. + // - `&` usage below: `self` is `&`, hence Rust typesystem guarantees there are is no `&mut` + let file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) }; + let res = with(&file); + let _ = file.into_raw_fd(); + res + } + // TODO: switch to tokio-epoll-uring native operations. + fn with_std_file_mut(&mut self, with: F) -> R + where + F: FnOnce(&mut File) -> R, + { + // SAFETY: + // - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`. + // - &mut usage below: `self` is `&mut`, hence this call is the only task/thread that has control over the underlying fd + let mut file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) }; + let res = with(&mut file); + let _ = file.into_raw_fd(); + res + } +} + +impl tokio_epoll_uring::IoFd for FileGuard { + unsafe fn as_fd(&self) -> RawFd { + let owned_fd: &OwnedFd = self.as_ref(); + owned_fd.as_raw_fd() + } +} + #[cfg(test)] impl VirtualFile { pub(crate) async fn read_blk( @@ -717,16 +746,19 @@ impl VirtualFile { blknum: u32, ) -> Result, std::io::Error> { use crate::page_cache::PAGE_SZ; - let mut buf = [0; PAGE_SZ]; - self.read_exact_at(&mut buf, blknum as u64 * (PAGE_SZ as u64)) + let buf = vec![0; PAGE_SZ]; + let buf = self + .read_exact_at(buf, blknum as u64 * (PAGE_SZ as u64)) .await?; - Ok(std::sync::Arc::new(buf).into()) + Ok(crate::tenant::block_io::BlockLease::Vec(buf)) } async fn read_to_end(&mut self, buf: &mut Vec) -> Result<(), Error> { + let mut tmp = vec![0; 128]; loop { - let mut tmp = [0; 128]; - match self.read_at(&mut tmp, self.pos).await { + let res; + (tmp, res) = self.read_at(tmp, self.pos).await; + match res { Ok(0) => return Ok(()), Ok(n) => { self.pos += n as u64; @@ -850,10 +882,10 @@ mod tests { } impl MaybeVirtualFile { - async fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> Result<(), Error> { + async fn read_exact_at(&self, mut buf: Vec, offset: u64) -> Result, Error> { match self { MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset).await, - MaybeVirtualFile::File(file) => file.read_exact_at(buf, offset), + MaybeVirtualFile::File(file) => file.read_exact_at(&mut buf, offset).map(|()| buf), } } async fn write_all_at(&self, buf: &[u8], offset: u64) -> Result<(), Error> { @@ -895,14 +927,15 @@ mod tests { // Helper function to slurp a portion of a file into a string async fn read_string_at(&mut self, pos: u64, len: usize) -> Result { - let mut buf = vec![0; len]; - self.read_exact_at(&mut buf, pos).await?; + let buf = vec![0; len]; + let buf = self.read_exact_at(buf, pos).await?; Ok(String::from_utf8(buf).unwrap()) } } #[tokio::test] - async fn test_virtual_files() -> Result<(), Error> { + async fn test_virtual_files() -> anyhow::Result<()> { + crate::tenant::harness::setup_logging(); // The real work is done in the test_files() helper function. This // allows us to run the same set of tests against a native File, and // VirtualFile. We trust the native Files and wouldn't need to test them, @@ -911,23 +944,35 @@ mod tests { // native files, you will run out of file descriptors if the ulimit // is low enough.) test_files("virtual_files", |path, open_options| async move { - let vf = VirtualFile::open_with_options(&path, &open_options).await?; + let vf = VirtualFile::open_with_options_async(&path, open_options).await?; Ok(MaybeVirtualFile::VirtualFile(vf)) }) .await } #[tokio::test] - async fn test_physical_files() -> Result<(), Error> { + async fn test_physical_files() -> anyhow::Result<()> { test_files("physical_files", |path, open_options| async move { - Ok(MaybeVirtualFile::File(open_options.open(path)?)) + Ok(MaybeVirtualFile::File({ + let system = tokio_epoll_uring::thread_local_system().await; + let owned_fd = system + .open(path, &open_options) + .await + .map_err(|e| match e { + tokio_epoll_uring::Error::Op(e) => e, + tokio_epoll_uring::Error::System(system) => { + std::io::Error::new(std::io::ErrorKind::Other, system) + } + })?; + File::from(owned_fd) + })) }) .await } - async fn test_files(testname: &str, openfunc: OF) -> Result<(), Error> + async fn test_files(testname: &str, openfunc: OF) -> anyhow::Result<()> where - OF: Fn(Utf8PathBuf, OpenOptions) -> FT, + OF: Fn(Utf8PathBuf, tokio_epoll_uring::ops::open_at::OpenOptions) -> FT, FT: Future>, { let testdir = crate::config::PageServerConf::test_repo_dir(testname); @@ -936,7 +981,7 @@ mod tests { let path_a = testdir.join("file_a"); let mut file_a = openfunc( path_a.clone(), - OpenOptions::new() + tokio_epoll_uring::ops::open_at::OpenOptions::new() .write(true) .create(true) .truncate(true) @@ -949,7 +994,13 @@ mod tests { let _ = file_a.read_string().await.unwrap_err(); // Close the file and re-open for reading - let mut file_a = openfunc(path_a, OpenOptions::new().read(true).to_owned()).await?; + let mut file_a = openfunc( + path_a, + tokio_epoll_uring::ops::open_at::OpenOptions::new() + .read(true) + .to_owned(), + ) + .await?; // cannot write to a file opened in read-only mode let _ = file_a.write_all(b"bar").await.unwrap_err(); @@ -986,7 +1037,7 @@ mod tests { let path_b = testdir.join("file_b"); let mut file_b = openfunc( path_b.clone(), - OpenOptions::new() + tokio_epoll_uring::ops::open_at::OpenOptions::new() .read(true) .write(true) .create(true) @@ -1007,8 +1058,13 @@ mod tests { let mut vfiles = Vec::new(); for _ in 0..100 { - let mut vfile = - openfunc(path_b.clone(), OpenOptions::new().read(true).to_owned()).await?; + let mut vfile = openfunc( + path_b.clone(), + tokio_epoll_uring::ops::open_at::OpenOptions::new() + .read(true) + .to_owned(), + ) + .await?; assert_eq!("FOOBAR", vfile.read_string().await?); vfiles.push(vfile); } @@ -1053,8 +1109,12 @@ mod tests { // Open the file many times. let mut files = Vec::new(); for _ in 0..VIRTUAL_FILES { - let f = VirtualFile::open_with_options(&test_file_path, OpenOptions::new().read(true)) - .await?; + let f = VirtualFile::open_with_options_async(&test_file_path, { + let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new(); + options.read(true); + options + }) + .await?; files.push(f); } let files = Arc::new(files); @@ -1069,11 +1129,11 @@ mod tests { for _threadno in 0..THREADS { let files = files.clone(); let hdl = rt.spawn(async move { - let mut buf = [0u8; SIZE]; + let mut buf = vec![0u8; SIZE]; let mut rng = rand::rngs::OsRng; for _ in 1..1000 { let f = &files[rng.gen_range(0..files.len())]; - f.read_exact_at(&mut buf, 0).await.unwrap(); + buf = f.read_exact_at(buf, 0).await.unwrap(); assert!(buf == SAMPLE); } });