diff --git a/Cargo.toml b/Cargo.toml index 0f48b4536..677c64675 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,3 @@ [workspace] -members = [ - "examples", - "glommio", -] +members = ["examples", "glommio"] resolver = "2" diff --git a/glommio/Cargo.toml b/glommio/Cargo.toml index 8b6c381fc..c9332d054 100755 --- a/glommio/Cargo.toml +++ b/glommio/Cargo.toml @@ -4,7 +4,7 @@ version = "0.9.0" authors = [ "Glauber Costa ", "Hippolyte Barraud ", - "DataDog " + "DataDog ", ] edition = "2021" description = "Glommio is a thread-per-core crate that makes writing highly parallel asynchronous applications in a thread-per-core architecture easier for rustaceans." @@ -12,11 +12,25 @@ license = "Apache-2.0 OR MIT" repository = "https://github.com/DataDog/glommio" homepage = "https://github.com/DataDog/glommio" keywords = ["linux", "rust", "async", "iouring", "thread-per-core"] -categories = ["asynchronous", "concurrency", "os", "filesystem", "network-programming"] +categories = [ + "asynchronous", + "concurrency", + "os", + "filesystem", + "network-programming", +] readme = "../README.md" # This is also documented in the README.md under "Supported Rust Versions" rust-version = "1.70" +[features] +bench = [] +debugging = [] + +# Unstable features based on nightly +native-tls = [] +nightly = ["native-tls"] + [dependencies] ahash = "0.7" backtrace = { version = "0.3" } @@ -33,7 +47,16 @@ lazy_static = "1.4" libc = "0.2" lockfree = "0.5" log = "0.4" -nix = { version = "0.27", features = ["event", "fs", "ioctl", "mman", "net", "poll", "sched", "time"] } +nix = { version = "0.27", features = [ + "event", + "fs", + "ioctl", + "mman", + "net", + "poll", + "sched", + "time", +] } pin-project-lite = "0.2" rlimit = "0.6" scoped-tls = "1.0" @@ -45,26 +68,26 @@ socket2 = { version = "0.4", features = ["all"] } tracing = "0.1" typenum = "1.15" +[build-dependencies] +cc = "1.0" + [dev-dependencies] fastrand = "2" futures = "0" hdrhistogram = "7" pretty_env_logger = "0" rand = "0" -tokio = { version = "1", default-features = false, features = ["rt", "macros", "rt-multi-thread", "net", "io-util", "time", "sync"] } +tokio = { version = "1", default-features = false, features = [ + "rt", + "macros", + "rt-multi-thread", + "net", + "io-util", + "time", + "sync", +] } tracing-subscriber = { version = "0", features = ["env-filter"] } -[build-dependencies] -cc = "1.0" - -[features] -bench = [] -debugging = [] - -# Unstable features based on nightly -native-tls = [] -nightly = ["native-tls"] - [[bench]] name = "executor" harness = false diff --git a/glommio/src/channels/shared_channel.rs b/glommio/src/channels/shared_channel.rs index eae45d541..e1ba7cf32 100644 --- a/glommio/src/channels/shared_channel.rs +++ b/glommio/src/channels/shared_channel.rs @@ -36,7 +36,7 @@ type Result = crate::Result; /// [`ConnectedReceiver`]: struct.ConnectedReceiver.html /// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html pub struct SharedReceiver { - state: Option>>, + state: Option>, } /// The `SharedSender` is the sending end of the Shared Channel. @@ -52,7 +52,7 @@ pub struct SharedReceiver { /// [`ConnectedSender`]: struct.ConnectedSender.html /// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html pub struct SharedSender { - state: Option>>, + state: Option>, } impl fmt::Debug for SharedSender { @@ -79,7 +79,7 @@ unsafe impl Send for SharedSender {} /// The `ConnectedReceiver` is the receiving end of the Shared Channel. pub struct ConnectedReceiver { id: u64, - state: Rc>, + state: ReceiverState, reactor: Weak, notifier: Arc, } @@ -87,7 +87,7 @@ pub struct ConnectedReceiver { /// The `ConnectedSender` is the sending end of the Shared Channel. pub struct ConnectedSender { id: u64, - state: Rc>, + state: SenderState, reactor: Weak, notifier: Arc, } @@ -106,12 +106,28 @@ impl fmt::Debug for ConnectedSender { #[derive(Debug)] struct SenderState { - buffer: Producer, + buffer: Rc>, +} + +impl Clone for SenderState { + fn clone(&self) -> Self { + Self { + buffer: self.buffer.clone(), + } + } } #[derive(Debug)] struct ReceiverState { - buffer: Consumer, + buffer: Rc>, +} + +impl Clone for ReceiverState { + fn clone(&self) -> Self { + Self { + buffer: self.buffer.clone(), + } + } } struct Connector { @@ -150,10 +166,14 @@ pub fn new_bounded(size: usize) -> (SharedSender, SharedRece let (producer, consumer) = make(size); ( SharedSender { - state: Some(Rc::new(SenderState { buffer: producer })), + state: Some(SenderState { + buffer: Rc::new(producer), + }), }, SharedReceiver { - state: Some(Rc::new(ReceiverState { buffer: consumer })), + state: Some(ReceiverState { + buffer: Rc::new(consumer), + }), }, ) } diff --git a/glommio/src/channels/spsc_queue.rs b/glommio/src/channels/spsc_queue.rs index ca46df7c1..1f465ec22 100644 --- a/glommio/src/channels/spsc_queue.rs +++ b/glommio/src/channels/spsc_queue.rs @@ -3,6 +3,7 @@ use std::{ fmt, marker::PhantomData, mem::{self, MaybeUninit}, + rc::Rc, slice::from_raw_parts_mut, sync::{ atomic::{AtomicBool, AtomicUsize, Ordering}, @@ -87,27 +88,11 @@ pub struct Consumer { pub(crate) buffer: Arc>, } -impl Clone for Consumer { - fn clone(&self) -> Self { - Consumer { - buffer: self.buffer.clone(), - } - } -} - /// A handle to the queue which allows adding values onto the buffer pub struct Producer { pub(crate) buffer: Arc>, } -impl Clone for Producer { - fn clone(&self) -> Self { - Producer { - buffer: self.buffer.clone(), - } - } -} - impl fmt::Debug for Consumer { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "Consumer {:?}", self.buffer) @@ -321,6 +306,23 @@ impl BufferHalf for Producer { } } +impl BufferHalf for Rc> { + type Item = T; + fn buffer(&self) -> &Buffer { + &self.buffer + } + + fn connect(&self, id: usize) { + assert_ne!(id, 0); + assert_ne!(id, usize::MAX); + self.buffer.ccache.producer_id.store(id, Ordering::Release); + } + + fn peer_id(&self) -> usize { + self.buffer.pcache.consumer_id.load(Ordering::Acquire) + } +} + impl Producer { /// Attempt to push a value onto the buffer. /// @@ -376,6 +378,23 @@ impl BufferHalf for Consumer { } } +impl BufferHalf for Rc> { + type Item = T; + fn buffer(&self) -> &Buffer { + &self.buffer + } + + fn connect(&self, id: usize) { + assert_ne!(id, usize::MAX); + assert_ne!(id, 0); + self.buffer.pcache.consumer_id.store(id, Ordering::Release); + } + + fn peer_id(&self) -> usize { + self.buffer.ccache.producer_id.load(Ordering::Acquire) + } +} + impl Consumer { /// Disconnects the consumer, signaling to the producer that no new values /// are going to be consumed. After this is done, any attempt on the diff --git a/glommio/src/executor/mod.rs b/glommio/src/executor/mod.rs index 5cc2d67ee..4615795c2 100644 --- a/glommio/src/executor/mod.rs +++ b/glommio/src/executor/mod.rs @@ -177,7 +177,7 @@ impl Ord for TaskQueue { impl PartialOrd for TaskQueue { fn partial_cmp(&self, other: &Self) -> Option { - Some(other.vruntime.cmp(&self.vruntime)) + Some(self.cmp(other)) } } diff --git a/glommio/src/free_list.rs b/glommio/src/free_list.rs index 73bd8b11e..f296ad555 100644 --- a/glommio/src/free_list.rs +++ b/glommio/src/free_list.rs @@ -72,7 +72,7 @@ impl FreeList { } pub(crate) fn dealloc(&mut self, idx: Idx) -> T { let slot = Slot::Free { - next_free: mem::replace(&mut self.first_free, Some(idx)), + next_free: self.first_free.replace(idx), }; match mem::replace(&mut self.slots[idx.to_raw()], slot) { Slot::Full { item } => item, diff --git a/glommio/src/io/bulk_io.rs b/glommio/src/io/bulk_io.rs index e86018dde..61b1f8ee1 100644 --- a/glommio/src/io/bulk_io.rs +++ b/glommio/src/io/bulk_io.rs @@ -13,7 +13,7 @@ use std::{ }; /// Set a limit to the size of merged IO requests. -#[derive(Debug)] +#[derive(Debug, Default)] pub enum MergedBufferLimit { /// Disables request coalescing NoMerging, @@ -21,6 +21,7 @@ pub enum MergedBufferLimit { /// Sets the limit to the maximum the kernel allows for the underlying /// device without breaking down the request into smaller ones /// (/sys/block/.../queue/max_sectors_kb) + #[default] DeviceMaxSingleRequest, /// Sets a custom limit. @@ -29,15 +30,9 @@ pub enum MergedBufferLimit { Custom(usize), } -impl Default for MergedBufferLimit { - fn default() -> Self { - Self::DeviceMaxSingleRequest - } -} - /// Set a limit to the amount of read amplification in-between two mergeable IO /// requests. -#[derive(Debug)] +#[derive(Debug, Default)] pub enum ReadAmplificationLimit { /// Deny read amplification. /// @@ -46,6 +41,7 @@ pub enum ReadAmplificationLimit { /// size. For instance, if the minimum IO size is 4KiB and the user reads /// [0..256] and [2048..2560] then the two will be merged into [0..4096] to /// accommodate the 4KiB minimum IO size. + #[default] NoAmplification, /// Merge two consecutive IO requests if the read amplification is below a @@ -59,12 +55,6 @@ pub enum ReadAmplificationLimit { NoLimit, } -impl Default for ReadAmplificationLimit { - fn default() -> Self { - Self::NoAmplification - } -} - /// An interface to an IO vector. pub trait IoVec { /// The read position (the offset) in the file diff --git a/glommio/src/io/directory.rs b/glommio/src/io/directory.rs index 1cec4987c..46144634d 100644 --- a/glommio/src/io/directory.rs +++ b/glommio/src/io/directory.rs @@ -117,8 +117,11 @@ impl Directory { /// Returns an iterator to the contents of this directory pub fn sync_read_dir(&self) -> Result { let path = self.file.path_required("read directory")?; - enhanced_try!(std::fs::read_dir(&*path), "Reading a directory", self.file) - .map_err(Into::into) + Ok(enhanced_try!( + std::fs::read_dir(&*path), + "Reading a directory", + self.file + )?) } /// Issues fdatasync into the underlying file. diff --git a/glommio/src/io/dma_file.rs b/glommio/src/io/dma_file.rs index d0d48c0eb..69b563a93 100644 --- a/glommio/src/io/dma_file.rs +++ b/glommio/src/io/dma_file.rs @@ -382,7 +382,11 @@ impl DmaFile { pos, self.pollable, ); - enhanced_try!(source.collect_rw().await, "Writing", self.file).map_err(Into::into) + Ok(enhanced_try!( + source.collect_rw().await, + "Writing", + self.file + )?) } /// Equivalent to [`DmaFile::write_at`] except that the caller retains @@ -442,7 +446,11 @@ impl DmaFile { pos, self.pollable, ); - enhanced_try!(source.collect_rw().await, "Writing", self.file).map_err(Into::into) + Ok(enhanced_try!( + source.collect_rw().await, + "Writing", + self.file + )?) } /// Reads from a specific position in the file and returns the buffer. @@ -771,6 +779,7 @@ impl DmaFile { /// NOTE: Clones are allowed to exist on any thread and all share the same underlying /// fd safely. try_take_last_clone is also safe to invoke from any thread and will /// behave correctly with respect to clones on other threads. + #[allow(clippy::result_large_err)] pub fn try_take_last_clone(mut self) -> std::result::Result { match self.file.try_take_last_clone() { Ok(took) => { diff --git a/glommio/src/io/glommio_file.rs b/glommio/src/io/glommio_file.rs index 6368ce5a4..c17feef7a 100644 --- a/glommio/src/io/glommio_file.rs +++ b/glommio/src/io/glommio_file.rs @@ -750,10 +750,10 @@ pub(crate) mod test { files }; - assert!(file_list().iter().any(|x| *x == gf_fd)); // sanity check that file is open + assert!(file_list().contains(&gf_fd)); // sanity check that file is open let _ = { gf }; // moves scope and drops sleep(Duration::from_millis(10)).await; // forces the reactor to run, which will drop the file - assert!(!file_list().iter().any(|x| *x == gf_fd)); // file is gone + assert!(!file_list().contains(&gf_fd)); // file is gone }); } }