Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experimental Loom support in crossbeam-deque #849

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ jobs:
- name: Install Rust
run: rustup update stable
- name: loom
run: ./ci/crossbeam-epoch-loom.sh
run: ./ci/loom.sh

# Check if the document can be generated without warning.
docs:
Expand Down
7 changes: 4 additions & 3 deletions ci/crossbeam-epoch-loom.sh → ci/loom.sh
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
#!/bin/bash
set -euxo pipefail
IFS=$'\n\t'
cd "$(dirname "$0")"/../crossbeam-epoch

export RUSTFLAGS="${RUSTFLAGS:-} --cfg crossbeam_loom --cfg crossbeam_sanitize"

# With MAX_PREEMPTIONS=2 the loom tests (currently) take around 11m.
# If we were to run with =3, they would take several times that,
# which is probably too costly for CI.
env LOOM_MAX_PREEMPTIONS=2 cargo test --test loom --release --features loom -- --nocapture
export LOOM_MAX_PREEMPTIONS=2

#cargo test --manifest-path="$(dirname "$0")"/../crossbeam-epoch/Cargo.toml --test loom --release --features loom -- --nocapture
cargo test --manifest-path="$(dirname "$0")"/../crossbeam-deque/Cargo.toml --release --features loom -- --nocapture
13 changes: 13 additions & 0 deletions crossbeam-deque/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ default = ["std"]
# NOTE: Disabling `std` feature is not supported yet.
std = ["crossbeam-epoch/std", "crossbeam-utils/std"]

# Enable the use of loom for concurrency testing.
#
# NOTE: This feature is outside of the normal semver guarantees and minor or
# patch versions of crossbeam may make breaking changes to them at any time.
loom = ["loom-crate", "crossbeam-utils/loom", "crossbeam-epoch/loom"]

[dependencies]
cfg-if = "1"

Expand All @@ -38,5 +44,12 @@ path = "../crossbeam-utils"
default-features = false
optional = true

# Enable the use of loom for concurrency testing.
#
# NOTE: This feature is outside of the normal semver guarantees and minor or
# patch versions of crossbeam may make breaking changes to them at any time.
[target.'cfg(crossbeam_loom)'.dependencies]
loom-crate = { package = "loom", version = "0.5", optional = true }

[dev-dependencies]
rand = "0.8"
29 changes: 17 additions & 12 deletions crossbeam-deque/src/deque.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
use std::cell::{Cell, UnsafeCell};
use std::cell::Cell;
use std::cmp;
use std::fmt;
use std::iter::FromIterator;
use std::marker::PhantomData;
use std::mem::{self, MaybeUninit};
use std::ptr;
use std::sync::atomic::{self, AtomicIsize, AtomicPtr, AtomicUsize, Ordering};
use std::sync::Arc;

use crate::epoch::{self, Atomic, Owned};
use crate::primitive::{
cell::UnsafeCell,
sync::{
atomic::{self, AtomicIsize, AtomicPtr, AtomicUsize, Ordering},
Arc,
},
};
use crate::utils::{Backoff, CachePadded};

// Minimum buffer capacity.
Expand Down Expand Up @@ -1317,7 +1322,8 @@ impl<T> Injector<T> {

// Write the task into the slot.
let slot = (*block).slots.get_unchecked(offset);
slot.task.get().write(MaybeUninit::new(task));
slot.task
.with_mut(|t| ptr::write(t, MaybeUninit::new(task)));
slot.state.fetch_or(WRITE, Ordering::Release);

return;
Expand Down Expand Up @@ -1410,7 +1416,7 @@ impl<T> Injector<T> {
// Read the task.
let slot = (*block).slots.get_unchecked(offset);
slot.wait_write();
let task = slot.task.get().read().assume_init();
let task = slot.task.with(|t| ptr::read(t).assume_init());

// Destroy the block if we've reached the end, or if another thread wanted to destroy
// but couldn't because we were busy reading from the slot.
Expand Down Expand Up @@ -1535,7 +1541,7 @@ impl<T> Injector<T> {
// Read the task.
let slot = (*block).slots.get_unchecked(offset + i);
slot.wait_write();
let task = slot.task.get().read().assume_init();
let task = slot.task.with(|t| ptr::read(t).assume_init());

// Write it into the destination queue.
dest_buffer.write(dest_b.wrapping_add(i as isize), task);
Expand All @@ -1547,7 +1553,7 @@ impl<T> Injector<T> {
// Read the task.
let slot = (*block).slots.get_unchecked(offset + i);
slot.wait_write();
let task = slot.task.get().read().assume_init();
let task = slot.task.with(|t| ptr::read(t).assume_init());

// Write it into the destination queue.
dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
Expand Down Expand Up @@ -1689,7 +1695,7 @@ impl<T> Injector<T> {
// Read the task.
let slot = (*block).slots.get_unchecked(offset);
slot.wait_write();
let task = slot.task.get().read().assume_init();
let task = slot.task.with(|t| ptr::read(t).assume_init());

match dest.flavor {
Flavor::Fifo => {
Expand All @@ -1698,7 +1704,7 @@ impl<T> Injector<T> {
// Read the task.
let slot = (*block).slots.get_unchecked(offset + i + 1);
slot.wait_write();
let task = slot.task.get().read().assume_init();
let task = slot.task.with(|t| ptr::read(t).assume_init());

// Write it into the destination queue.
dest_buffer.write(dest_b.wrapping_add(i as isize), task);
Expand All @@ -1711,7 +1717,7 @@ impl<T> Injector<T> {
// Read the task.
let slot = (*block).slots.get_unchecked(offset + i + 1);
slot.wait_write();
let task = slot.task.get().read().assume_init();
let task = slot.task.with(|t| ptr::read(t).assume_init());

// Write it into the destination queue.
dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
Expand Down Expand Up @@ -1836,8 +1842,7 @@ impl<T> Drop for Injector<T> {
if offset < BLOCK_CAP {
// Drop the task in the slot.
let slot = (*block).slots.get_unchecked(offset);
let p = &mut *slot.task.get();
p.as_mut_ptr().drop_in_place();
slot.task.with_mut(|t| (*t).as_mut_ptr().drop_in_place());
} else {
// Deallocate the block and move to the next one.
let next = (*block).next.load(Ordering::Relaxed);
Expand Down
71 changes: 71 additions & 0 deletions crossbeam-deque/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,79 @@
#![allow(clippy::question_mark)] // https://github.com/rust-lang/rust-clippy/issues/8281
#![cfg_attr(not(feature = "std"), no_std)]

#[cfg(crossbeam_loom)]
extern crate loom_crate as loom;

use cfg_if::cfg_if;

#[cfg(crossbeam_loom)]
#[allow(unused_imports, dead_code)]
mod primitive {
pub(crate) mod cell {
pub(crate) use loom::cell::UnsafeCell;
}
pub(crate) mod sync {
pub(crate) mod atomic {
pub(crate) use loom::sync::atomic::{
fence, AtomicIsize, AtomicPtr, AtomicUsize, Ordering,
};

// FIXME: loom does not support compiler_fence at the moment.
// https://github.com/tokio-rs/loom/issues/117
// we use fence as a stand-in for compiler_fence for the time being.
// this may miss some races since fence is stronger than compiler_fence,
// but it's the best we can do for the time being.
pub(crate) use self::fence as compiler_fence;
}
pub(crate) use loom::sync::Arc;
}
pub(crate) use loom::thread_local;
}
#[cfg(not(crossbeam_no_atomic_cas))]
#[cfg(not(crossbeam_loom))]
#[allow(unused_imports, dead_code)]
mod primitive {
pub(crate) mod cell {
#[derive(Debug)]
#[repr(transparent)]
pub(crate) struct UnsafeCell<T>(::core::cell::UnsafeCell<T>);

// loom's UnsafeCell has a slightly different API than the standard library UnsafeCell.
// Since we want the rest of the code to be agnostic to whether it's running under loom or
// not, we write this small wrapper that provides the loom-supported API for the standard
// library UnsafeCell. This is also what the loom documentation recommends:
// https://github.com/tokio-rs/loom#handling-loom-api-differences
impl<T> UnsafeCell<T> {
#[inline]
pub(crate) fn new(data: T) -> UnsafeCell<T> {
UnsafeCell(::core::cell::UnsafeCell::new(data))
}

#[inline]
pub(crate) fn with<R>(&self, f: impl FnOnce(*const T) -> R) -> R {
f(self.0.get())
}

#[inline]
pub(crate) fn with_mut<R>(&self, f: impl FnOnce(*mut T) -> R) -> R {
f(self.0.get())
}
}
}
pub(crate) mod sync {
pub(crate) mod atomic {
pub(crate) use core::sync::atomic::{
compiler_fence, fence, AtomicIsize, AtomicPtr, AtomicUsize, Ordering,
};
}
#[cfg(feature = "std")]
pub(crate) use std::sync::Arc;
}

#[cfg(feature = "std")]
pub(crate) use std::thread_local;
}

cfg_if! {
if #[cfg(feature = "std")] {
use crossbeam_epoch as epoch;
Expand Down
2 changes: 2 additions & 0 deletions crossbeam-deque/tests/fifo.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![cfg(not(crossbeam_loom))]

use std::sync::atomic::Ordering::SeqCst;
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::{Arc, Mutex};
Expand Down
Loading