Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
[workspace]
members = [
"examples",
"glommio",
]
members = ["examples", "glommio"]
resolver = "2"
53 changes: 38 additions & 15 deletions glommio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,33 @@ version = "0.9.0"
authors = [
"Glauber Costa <glommer@gmail.com>",
"Hippolyte Barraud <hippolyte.barraud@datadoghq.com>",
"DataDog <info@datadoghq.com>"
"DataDog <info@datadoghq.com>",
]
edition = "2021"
description = "Glommio is a thread-per-core crate that makes writing highly parallel asynchronous applications in a thread-per-core architecture easier for rustaceans."
license = "Apache-2.0 OR MIT"
repository = "https://github.com/DataDog/glommio"
homepage = "https://github.com/DataDog/glommio"
keywords = ["linux", "rust", "async", "iouring", "thread-per-core"]
categories = ["asynchronous", "concurrency", "os", "filesystem", "network-programming"]
categories = [
"asynchronous",
"concurrency",
"os",
"filesystem",
"network-programming",
]
readme = "../README.md"
# This is also documented in the README.md under "Supported Rust Versions"
rust-version = "1.70"

[features]
bench = []
debugging = []

# Unstable features based on nightly
native-tls = []
nightly = ["native-tls"]

[dependencies]
ahash = "0.7"
backtrace = { version = "0.3" }
Expand All @@ -33,7 +47,16 @@ lazy_static = "1.4"
libc = "0.2"
lockfree = "0.5"
log = "0.4"
nix = { version = "0.27", features = ["event", "fs", "ioctl", "mman", "net", "poll", "sched", "time"] }
nix = { version = "0.27", features = [
"event",
"fs",
"ioctl",
"mman",
"net",
"poll",
"sched",
"time",
] }
pin-project-lite = "0.2"
rlimit = "0.6"
scoped-tls = "1.0"
Expand All @@ -45,26 +68,26 @@ socket2 = { version = "0.4", features = ["all"] }
tracing = "0.1"
typenum = "1.15"

[build-dependencies]
cc = "1.0"

[dev-dependencies]
fastrand = "2"
futures = "0"
hdrhistogram = "7"
pretty_env_logger = "0"
rand = "0"
tokio = { version = "1", default-features = false, features = ["rt", "macros", "rt-multi-thread", "net", "io-util", "time", "sync"] }
tokio = { version = "1", default-features = false, features = [
"rt",
"macros",
"rt-multi-thread",
"net",
"io-util",
"time",
"sync",
] }
tracing-subscriber = { version = "0", features = ["env-filter"] }

[build-dependencies]
cc = "1.0"

[features]
bench = []
debugging = []

# Unstable features based on nightly
native-tls = []
nightly = ["native-tls"]

[[bench]]
name = "executor"
harness = false
Expand Down
36 changes: 28 additions & 8 deletions glommio/src/channels/shared_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type Result<T, V> = crate::Result<T, V>;
/// [`ConnectedReceiver`]: struct.ConnectedReceiver.html
/// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html
pub struct SharedReceiver<T: Send + Sized> {
state: Option<Rc<ReceiverState<T>>>,
state: Option<ReceiverState<T>>,
}

/// The `SharedSender` is the sending end of the Shared Channel.
Expand All @@ -52,7 +52,7 @@ pub struct SharedReceiver<T: Send + Sized> {
/// [`ConnectedSender`]: struct.ConnectedSender.html
/// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html
pub struct SharedSender<T: Send + Sized> {
state: Option<Rc<SenderState<T>>>,
state: Option<SenderState<T>>,
}

impl<T: Send + Sized> fmt::Debug for SharedSender<T> {
Expand All @@ -79,15 +79,15 @@ unsafe impl<T: Send + Sized> Send for SharedSender<T> {}
/// The `ConnectedReceiver` is the receiving end of the Shared Channel.
pub struct ConnectedReceiver<T: Send + Sized> {
id: u64,
state: Rc<ReceiverState<T>>,
state: ReceiverState<T>,
reactor: Weak<Reactor>,
notifier: Arc<SleepNotifier>,
}

/// The `ConnectedSender` is the sending end of the Shared Channel.
pub struct ConnectedSender<T: Send + Sized> {
id: u64,
state: Rc<SenderState<T>>,
state: SenderState<T>,
reactor: Weak<Reactor>,
notifier: Arc<SleepNotifier>,
}
Expand All @@ -106,12 +106,28 @@ impl<T: Send + Sized> fmt::Debug for ConnectedSender<T> {

#[derive(Debug)]
struct SenderState<V: Send + Sized> {
buffer: Producer<V>,
buffer: Rc<Producer<V>>,
}

impl<V: Send + Sized> Clone for SenderState<V> {
fn clone(&self) -> Self {
Self {
buffer: self.buffer.clone(),
}
}
}

#[derive(Debug)]
struct ReceiverState<V: Send + Sized> {
buffer: Consumer<V>,
buffer: Rc<Consumer<V>>,
}

impl<V: Send + Sized> Clone for ReceiverState<V> {
fn clone(&self) -> Self {
Self {
buffer: self.buffer.clone(),
}
}
}

struct Connector<T: BufferHalf + Clone> {
Expand Down Expand Up @@ -150,10 +166,14 @@ pub fn new_bounded<T: Send + Sized>(size: usize) -> (SharedSender<T>, SharedRece
let (producer, consumer) = make(size);
(
SharedSender {
state: Some(Rc::new(SenderState { buffer: producer })),
state: Some(SenderState {
buffer: Rc::new(producer),
}),
},
SharedReceiver {
state: Some(Rc::new(ReceiverState { buffer: consumer })),
state: Some(ReceiverState {
buffer: Rc::new(consumer),
}),
},
)
}
Expand Down
51 changes: 35 additions & 16 deletions glommio/src/channels/spsc_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{
fmt,
marker::PhantomData,
mem::{self, MaybeUninit},
rc::Rc,
slice::from_raw_parts_mut,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Expand Down Expand Up @@ -87,27 +88,11 @@ pub struct Consumer<T> {
pub(crate) buffer: Arc<Buffer<T>>,
}

impl<T> Clone for Consumer<T> {
fn clone(&self) -> Self {
Consumer {
buffer: self.buffer.clone(),
}
}
}

/// A handle to the queue which allows adding values onto the buffer
pub struct Producer<T> {
pub(crate) buffer: Arc<Buffer<T>>,
}

impl<T> Clone for Producer<T> {
fn clone(&self) -> Self {
Producer {
buffer: self.buffer.clone(),
}
}
}

impl<T> fmt::Debug for Consumer<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Consumer {:?}", self.buffer)
Expand Down Expand Up @@ -321,6 +306,23 @@ impl<T> BufferHalf for Producer<T> {
}
}

impl<T> BufferHalf for Rc<Producer<T>> {
type Item = T;
fn buffer(&self) -> &Buffer<T> {
&self.buffer
}

fn connect(&self, id: usize) {
assert_ne!(id, 0);
assert_ne!(id, usize::MAX);
self.buffer.ccache.producer_id.store(id, Ordering::Release);
}

fn peer_id(&self) -> usize {
self.buffer.pcache.consumer_id.load(Ordering::Acquire)
}
}

impl<T> Producer<T> {
/// Attempt to push a value onto the buffer.
///
Expand Down Expand Up @@ -376,6 +378,23 @@ impl<T> BufferHalf for Consumer<T> {
}
}

impl<T> BufferHalf for Rc<Consumer<T>> {
type Item = T;
fn buffer(&self) -> &Buffer<T> {
&self.buffer
}

fn connect(&self, id: usize) {
assert_ne!(id, usize::MAX);
assert_ne!(id, 0);
self.buffer.pcache.consumer_id.store(id, Ordering::Release);
}

fn peer_id(&self) -> usize {
self.buffer.ccache.producer_id.load(Ordering::Acquire)
}
}

impl<T> Consumer<T> {
/// Disconnects the consumer, signaling to the producer that no new values
/// are going to be consumed. After this is done, any attempt on the
Expand Down
2 changes: 1 addition & 1 deletion glommio/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ impl Ord for TaskQueue {

impl PartialOrd for TaskQueue {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(other.vruntime.cmp(&self.vruntime))
Some(self.cmp(other))
}
}

Expand Down
2 changes: 1 addition & 1 deletion glommio/src/free_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl<T> FreeList<T> {
}
pub(crate) fn dealloc(&mut self, idx: Idx<T>) -> T {
let slot = Slot::Free {
next_free: mem::replace(&mut self.first_free, Some(idx)),
next_free: self.first_free.replace(idx),
};
match mem::replace(&mut self.slots[idx.to_raw()], slot) {
Slot::Full { item } => item,
Expand Down
18 changes: 4 additions & 14 deletions glommio/src/io/bulk_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@ use std::{
};

/// Set a limit to the size of merged IO requests.
#[derive(Debug)]
#[derive(Debug, Default)]
pub enum MergedBufferLimit {
/// Disables request coalescing
NoMerging,

/// Sets the limit to the maximum the kernel allows for the underlying
/// device without breaking down the request into smaller ones
/// (/sys/block/.../queue/max_sectors_kb)
#[default]
DeviceMaxSingleRequest,

/// Sets a custom limit.
Expand All @@ -29,15 +30,9 @@ pub enum MergedBufferLimit {
Custom(usize),
}

impl Default for MergedBufferLimit {
fn default() -> Self {
Self::DeviceMaxSingleRequest
}
}

/// Set a limit to the amount of read amplification in-between two mergeable IO
/// requests.
#[derive(Debug)]
#[derive(Debug, Default)]
pub enum ReadAmplificationLimit {
/// Deny read amplification.
///
Expand All @@ -46,6 +41,7 @@ pub enum ReadAmplificationLimit {
/// size. For instance, if the minimum IO size is 4KiB and the user reads
/// [0..256] and [2048..2560] then the two will be merged into [0..4096] to
/// accommodate the 4KiB minimum IO size.
#[default]
NoAmplification,

/// Merge two consecutive IO requests if the read amplification is below a
Expand All @@ -59,12 +55,6 @@ pub enum ReadAmplificationLimit {
NoLimit,
}

impl Default for ReadAmplificationLimit {
fn default() -> Self {
Self::NoAmplification
}
}

/// An interface to an IO vector.
pub trait IoVec {
/// The read position (the offset) in the file
Expand Down
7 changes: 5 additions & 2 deletions glommio/src/io/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,11 @@ impl Directory {
/// Returns an iterator to the contents of this directory
pub fn sync_read_dir(&self) -> Result<std::fs::ReadDir> {
let path = self.file.path_required("read directory")?;
enhanced_try!(std::fs::read_dir(&*path), "Reading a directory", self.file)
.map_err(Into::into)
Ok(enhanced_try!(
std::fs::read_dir(&*path),
"Reading a directory",
self.file
)?)
}

/// Issues fdatasync into the underlying file.
Expand Down
13 changes: 11 additions & 2 deletions glommio/src/io/dma_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,11 @@ impl DmaFile {
pos,
self.pollable,
);
enhanced_try!(source.collect_rw().await, "Writing", self.file).map_err(Into::into)
Ok(enhanced_try!(
source.collect_rw().await,
"Writing",
self.file
)?)
}

/// Equivalent to [`DmaFile::write_at`] except that the caller retains
Expand Down Expand Up @@ -442,7 +446,11 @@ impl DmaFile {
pos,
self.pollable,
);
enhanced_try!(source.collect_rw().await, "Writing", self.file).map_err(Into::into)
Ok(enhanced_try!(
source.collect_rw().await,
"Writing",
self.file
)?)
}

/// Reads from a specific position in the file and returns the buffer.
Expand Down Expand Up @@ -771,6 +779,7 @@ impl DmaFile {
/// NOTE: Clones are allowed to exist on any thread and all share the same underlying
/// fd safely. try_take_last_clone is also safe to invoke from any thread and will
/// behave correctly with respect to clones on other threads.
#[allow(clippy::result_large_err)]
pub fn try_take_last_clone(mut self) -> std::result::Result<Self, Self> {
match self.file.try_take_last_clone() {
Ok(took) => {
Expand Down
Loading
Loading