From cd21ddaeece31aa5c2ca061dc0c9a6fa055d03f7 Mon Sep 17 00:00:00 2001 From: mooshi Date: Sun, 12 Oct 2025 15:47:35 -0400 Subject: [PATCH 1/3] fix ownership --- Cargo.lock | 2 +- Cargo.toml | 7 +------ src/comms/mt/broadcast.rs | 4 ++-- src/comms/mt/buses.rs | 2 +- src/logging/journal.rs | 2 +- 5 files changed, 6 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 67569ff..6eda686 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -51,7 +51,7 @@ checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" [[package]] name = "mesocarp" -version = "0.12.0" +version = "0.12.1" dependencies = [ "array-init", "bincode", diff --git a/Cargo.toml b/Cargo.toml index 2a70141..d440144 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,16 +9,11 @@ license = "LGPL-2.1" name = "mesocarp" readme = "README.md" repository = "https://github.com/TheMesocarp/mesocarp" -version = "0.12.0" +version = "0.12.1" [dependencies] array-init = "2.1.0" bytemuck = "1.23.1" crossbeam-queue = "0.3.2" thiserror = "2.0.12" - -[dev-dependencies] bincode = { version = "2.0.1", features = ["derive"] } - -[features] -loom = [] diff --git a/src/comms/mt/broadcast.rs b/src/comms/mt/broadcast.rs index 4e565da..f110aa2 100644 --- a/src/comms/mt/broadcast.rs +++ b/src/comms/mt/broadcast.rs @@ -21,9 +21,9 @@ impl Broadcaster { Subscriber(Arc::clone(&self.0[subscriber_id])) } - pub fn push(&mut self, value: T) -> Result<(), MesoError> { + pub fn push(&self, value: T) -> Result<(), MesoError> { self.0 - .iter_mut() + .iter() .try_for_each(|x| x.push(value.clone()).map_err(|_| MesoError::BuffersFull))?; Ok(()) } diff --git a/src/comms/mt/buses.rs b/src/comms/mt/buses.rs index 696e8a7..ca9195b 100644 --- a/src/comms/mt/buses.rs +++ b/src/comms/mt/buses.rs @@ -175,11 +175,11 @@ unsafe impl Sync for ThreadedMessenger {} unsafe impl Send for ThreadedMessengerUser {} unsafe impl Sync for ThreadedMessengerUser {} -#[cfg(all(test, not(feature = "loom")))] mod tests { use super::*; #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] + #[allow(dead_code)] struct TestMessage { timestamp: u64, commit_time: u64, diff --git a/src/logging/journal.rs b/src/logging/journal.rs index 6638ecb..b440cf9 100644 --- a/src/logging/journal.rs +++ b/src/logging/journal.rs @@ -867,7 +867,7 @@ impl Drop for Journal { unsafe impl Send for Journal {} unsafe impl Sync for Journal {} -#[cfg(all(test, not(feature = "loom")))] +#[allow(dead_code, unused_imports)] mod tests { use super::*; use bincode::{Decode, Encode}; From 1b3d777a76973d84a93471d8abac9c5acce34508 Mon Sep 17 00:00:00 2001 From: mooshi Date: Sun, 12 Oct 2025 15:48:39 -0400 Subject: [PATCH 2/3] taplo --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index d440144..0121a4a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ version = "0.12.1" [dependencies] array-init = "2.1.0" +bincode = { version = "2.0.1", features = ["derive"] } bytemuck = "1.23.1" crossbeam-queue = "0.3.2" thiserror = "2.0.12" -bincode = { version = "2.0.1", features = ["derive"] } From cd97494516d70045e205d849cd812b9724d1b0f7 Mon Sep 17 00:00:00 2001 From: mooshi Date: Mon, 13 Oct 2025 13:55:07 -0400 Subject: [PATCH 3/3] loom tests for crossbeam primitives --- .github/workflows/rust.yaml | 2 +- Cargo.lock | 353 ++++++++++++++++++++++++++++++++++++ Cargo.toml | 1 + src/comms/mt/broadcast.rs | 116 ++++++++++++ src/comms/mt/buses.rs | 179 +++++++++++++++++- src/scheduling/htw.rs | 2 +- 6 files changed, 650 insertions(+), 3 deletions(-) diff --git a/.github/workflows/rust.yaml b/.github/workflows/rust.yaml index 43a98b4..6a74e97 100644 --- a/.github/workflows/rust.yaml +++ b/.github/workflows/rust.yaml @@ -69,7 +69,7 @@ jobs: run: cargo build --workspace - name: Clippy - run: cargo clippy --all-targets --all-features -- --deny warnings -A clippy::manual_is_multiple_of + run: cargo clippy --all-targets --all-features -- --deny warnings toml-fmt: name: taplo diff --git a/Cargo.lock b/Cargo.lock index 6eda686..75b4677 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,15 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "aho-corasick" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916" +dependencies = [ + "memchr", +] + [[package]] name = "array-init" version = "2.1.0" @@ -34,6 +43,22 @@ version = "1.23.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c76a5792e44e4abe34d3abf15636779261d45a7450612059293d1d2cfc63422" +[[package]] +name = "cc" +version = "1.2.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac9fe6cdbb24b6ade63616c0a0688e45bb56732262c158df3c0c4bea4ca47cb7" +dependencies = [ + "find-msvc-tools", + "shlex", +] + +[[package]] +name = "cfg-if" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fd1289c04a9ea8cb22300a459a72a385d7c73d3259e2ed7dcb2af674838cfa9" + [[package]] name = "crossbeam-queue" version = "0.3.12" @@ -49,6 +74,72 @@ version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" +[[package]] +name = "find-msvc-tools" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52051878f80a721bb68ebfbc930e07b65ba72f2da88968ea5c06fd6ca3d3a127" + +[[package]] +name = "generator" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "605183a538e3e2a9c1038635cc5c2d194e2ee8fd0d1b66b8349fad7dbacce5a2" +dependencies = [ + "cc", + "cfg-if", + "libc", + "log", + "rustversion", + "windows", +] + +[[package]] +name = "lazy_static" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" + +[[package]] +name = "libc" +version = "0.2.177" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2874a2af47a2325c2001a6e6fad9b16a53b802102b528163885171cf92b15976" + +[[package]] +name = "log" +version = "0.4.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432" + +[[package]] +name = "loom" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "419e0dc8046cb947daa77eb95ae174acfbddb7673b4151f56d1eed8e93fbfaca" +dependencies = [ + "cfg-if", + "generator", + "scoped-tls", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "matchers" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9" +dependencies = [ + "regex-automata", +] + +[[package]] +name = "memchr" +version = "2.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273" + [[package]] name = "mesocarp" version = "0.12.1" @@ -57,9 +148,31 @@ dependencies = [ "bincode", "bytemuck", "crossbeam-queue", + "loom", "thiserror", ] +[[package]] +name = "nu-ansi-term" +version = "0.50.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" +dependencies = [ + "windows-sys", +] + +[[package]] +name = "once_cell" +version = "1.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" + +[[package]] +name = "pin-project-lite" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" + [[package]] name = "proc-macro2" version = "1.0.95" @@ -78,6 +191,35 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "regex-automata" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5276caf25ac86c8d810222b3dbb938e512c55c6831a10f3e6ed1c93b84041f1c" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.8.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58" + +[[package]] +name = "rustversion" +version = "1.0.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" + +[[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" + [[package]] name = "serde" version = "1.0.228" @@ -107,6 +249,27 @@ dependencies = [ "syn", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + +[[package]] +name = "smallvec" +version = "1.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" + [[package]] name = "syn" version = "2.0.104" @@ -138,6 +301,64 @@ dependencies = [ "syn", ] +[[package]] +name = "thread_local" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "tracing" +version = "0.1.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" +dependencies = [ + "pin-project-lite", + "tracing-core", +] + +[[package]] +name = "tracing-core" +version = "0.1.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9d12581f227e93f094d3af2ae690a574abb8a2b9b7a96e7cfe9647b2b617678" +dependencies = [ + "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2054a14f5307d601f88daf0553e1cbf472acc4f2c51afab632431cdcd72124d5" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex-automata", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", +] + [[package]] name = "unicode-ident" version = "1.0.18" @@ -150,8 +371,140 @@ version = "0.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6d49784317cd0d1ee7ec5c716dd598ec5b4483ea832a2dced265471cc0f690ae" +[[package]] +name = "valuable" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" + [[package]] name = "virtue" version = "0.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "051eb1abcf10076295e815102942cc58f9d5e3b4560e46e53c21e8ff6f3af7b1" + +[[package]] +name = "windows" +version = "0.61.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9babd3a767a4c1aef6900409f85f5d53ce2544ccdfaa86dad48c91782c6d6893" +dependencies = [ + "windows-collections", + "windows-core", + "windows-future", + "windows-link 0.1.3", + "windows-numerics", +] + +[[package]] +name = "windows-collections" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3beeceb5e5cfd9eb1d76b381630e82c4241ccd0d27f1a39ed41b2760b255c5e8" +dependencies = [ + "windows-core", +] + +[[package]] +name = "windows-core" +version = "0.61.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-link 0.1.3", + "windows-result", + "windows-strings", +] + +[[package]] +name = "windows-future" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc6a41e98427b19fe4b73c550f060b59fa592d7d686537eebf9385621bfbad8e" +dependencies = [ + "windows-core", + "windows-link 0.1.3", + "windows-threading", +] + +[[package]] +name = "windows-implement" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "windows-interface" +version = "0.59.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "windows-link" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a" + +[[package]] +name = "windows-link" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" + +[[package]] +name = "windows-numerics" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9150af68066c4c5c07ddc0ce30421554771e528bde427614c61038bc2c92c2b1" +dependencies = [ + "windows-core", + "windows-link 0.1.3", +] + +[[package]] +name = "windows-result" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56f42bd332cc6c8eac5af113fc0c1fd6a8fd2aa08a0119358686e5160d0586c6" +dependencies = [ + "windows-link 0.1.3", +] + +[[package]] +name = "windows-strings" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56e6c93f3a0c3b36176cb1327a4958a0353d5d166c2a35cb268ace15e91d3b57" +dependencies = [ + "windows-link 0.1.3", +] + +[[package]] +name = "windows-sys" +version = "0.61.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae137229bcbd6cdf0f7b80a31df61766145077ddf49416a728b02cb3921ff3fc" +dependencies = [ + "windows-link 0.2.1", +] + +[[package]] +name = "windows-threading" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b66463ad2e0ea3bbf808b7f1d371311c80e115c0b71d60efc142cafbcfb057a6" +dependencies = [ + "windows-link 0.1.3", +] diff --git a/Cargo.toml b/Cargo.toml index 0121a4a..0425de3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,4 +16,5 @@ array-init = "2.1.0" bincode = { version = "2.0.1", features = ["derive"] } bytemuck = "1.23.1" crossbeam-queue = "0.3.2" +loom = "0.7.2" thiserror = "2.0.12" diff --git a/src/comms/mt/broadcast.rs b/src/comms/mt/broadcast.rs index f110aa2..1eea5e5 100644 --- a/src/comms/mt/broadcast.rs +++ b/src/comms/mt/broadcast.rs @@ -4,6 +4,9 @@ use crossbeam_queue::ArrayQueue; use crate::MesoError; +/// SPMC broadcast channel, wrapping `crossbeam_queue::ArrayQueue` for `T: Clone` types. +/// +/// Requires up-front specification of the number of feeders listening, and the number of slots each has available #[derive(Debug)] pub struct Broadcaster(Box<[Arc>]>); @@ -29,6 +32,7 @@ impl Broadcaster { } } +/// Subscriber or feeder to the broadcast. #[derive(Debug)] pub struct Subscriber(Arc>); @@ -37,3 +41,115 @@ impl Subscriber { self.0.pop() } } + +#[cfg(test)] +mod tests { + use super::*; + use loom::sync::Arc as LoomArc; + use loom::thread; + + #[test] + fn loom_test_broadcaster_all_subscribers_receive() { + loom::model(|| { + const NUM_SUBSCRIBERS: usize = 3; + const NUM_MESSAGES: usize = 5; + + let broadcaster = LoomArc::new(Broadcaster::new(NUM_SUBSCRIBERS, 16)); + let mut subscribers = Vec::new(); + + for i in 0..NUM_SUBSCRIBERS { + subscribers.push(broadcaster.subscribe(i)); + } + + let producer = thread::spawn({ + let broadcaster = broadcaster.clone(); + move || { + for i in 0..NUM_MESSAGES { + broadcaster.push(i).unwrap(); + } + } + }); + + let mut consumer_handles = Vec::new(); + for mut subscriber in subscribers { + let handle = thread::spawn(move || { + let mut received = Vec::new(); + while received.len() < NUM_MESSAGES { + if let Some(msg) = subscriber.pop() { + received.push(msg); + } + } + + // Check for order and no duplicates + assert_eq!(received.len(), NUM_MESSAGES); + for (i, msg) in received.iter().enumerate().take(NUM_MESSAGES) { + assert_eq!(*msg, i); + } + }); + consumer_handles.push(handle); + } + + producer.join().unwrap(); + for handle in consumer_handles { + handle.join().unwrap(); + } + }); + } + + #[test] + fn loom_test_concurrent_pushes_and_pops() { + loom::model(|| { + const NUM_THREADS: usize = 1; + const MESSAGES_PER_THREAD: usize = 3; + + let broadcaster = LoomArc::new(Broadcaster::::new(NUM_THREADS, 32)); + let mut subscribers = Vec::new(); + for i in 0..NUM_THREADS { + subscribers.push(broadcaster.subscribe(i)); + } + + let mut producer_handles = Vec::new(); + for i in 0..NUM_THREADS { + let broadcaster = broadcaster.clone(); + let handle = thread::spawn(move || { + for j in 0..MESSAGES_PER_THREAD { + // Unique message from each thread + let msg = i * MESSAGES_PER_THREAD + j; + broadcaster.push(msg).unwrap(); + } + }); + producer_handles.push(handle); + } + + let mut consumer_handles = Vec::new(); + for mut subscriber in subscribers { + let handle = thread::spawn(move || { + let mut received_count = 0; + let mut received_sum = 0; + let total_messages = NUM_THREADS * MESSAGES_PER_THREAD; + + while received_count < total_messages { + if let Some(msg) = subscriber.pop() { + received_sum += msg; + received_count += 1; + } + } + + assert_eq!(received_count, total_messages); + + let expected_sum: usize = (0..total_messages).sum(); + assert_eq!(received_sum, expected_sum); + }); + consumer_handles.push(handle); + } + + for handle in producer_handles { + handle.join().unwrap(); + } + + for handle in consumer_handles { + handle.join().unwrap(); + } + }); + } +} diff --git a/src/comms/mt/buses.rs b/src/comms/mt/buses.rs index ca9195b..5cecaad 100644 --- a/src/comms/mt/buses.rs +++ b/src/comms/mt/buses.rs @@ -175,10 +175,12 @@ unsafe impl Sync for ThreadedMessenger {} unsafe impl Send for ThreadedMessengerUser {} unsafe impl Sync for ThreadedMessengerUser {} +#[cfg(test)] mod tests { use super::*; + use loom::thread; - #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] + #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] #[allow(dead_code)] struct TestMessage { timestamp: u64, @@ -273,4 +275,179 @@ mod tests { assert!(received1.contains(&broadcast_msg)); assert!(received2.contains(&broadcast_msg)); } + + /// Tests high-volume, one-way message passing with a buffer smaller than the message count + /// to ensure ordering is preserved and no messages are lost under pressure. + #[test] + fn loom_test_high_volume_unidirectional() { + let mut builder = loom::model::Builder::new(); + builder.max_branches = 100000; + + builder.check(|| { + use loom::sync::{Arc, Mutex}; + const NUM_MESSAGES: u64 = 50000; + const BUFFER_SIZE: usize = 8; + + let messenger = Arc::new(Mutex::new( + ThreadedMessenger::::new(2, BUFFER_SIZE).unwrap(), + )); + + let (user0, mut user1) = { + let mut guard = messenger.lock().unwrap(); + (guard.get_user().unwrap(), guard.get_user().unwrap()) + }; + + let producer = thread::spawn(move || { + for i in 0..NUM_MESSAGES { + let msg = TestMessage { + timestamp: i, + commit_time: i, + from_id: 0, + to_id: 1, + is_broadcast: false, + data: format!("msg-{}", i), + }; + // Loop until send is successful, as the buffer may be full. + while user0.send(msg.clone()).is_err() { + thread::yield_now(); + } + } + }); + + let consumer = thread::spawn(move || { + let mut received = Vec::new(); + while received.len() < NUM_MESSAGES as usize { + if let Some(msgs) = user1.poll() { + received.extend(msgs); + } + thread::yield_now(); + } + received.sort_by_key(|m| m.timestamp); + assert_eq!(received.len(), NUM_MESSAGES as usize); + for i in 0..NUM_MESSAGES { + assert_eq!(received[i as usize].timestamp, i); + } + }); + + let router_messenger = messenger.clone(); + let router = thread::spawn(move || { + let mut routed_count = 0; + while routed_count < NUM_MESSAGES as usize { + let mut guard = router_messenger.lock().unwrap(); + if let Ok(to_deliver) = guard.poll() { + routed_count += to_deliver.len(); + while guard.deliver(to_deliver.clone()).is_err() { + thread::yield_now(); + } + } + drop(guard); // Release lock + thread::yield_now(); + } + }); + + producer.join().unwrap(); + consumer.join().unwrap(); + router.join().unwrap(); + }); + } + + /// Tests bidirectional message passing to check for deadlocks and race conditions + /// when multiple users are sending and receiving simultaneously. + #[test] + fn loom_test_bidirectional_ping_pong() { + let mut builder = loom::model::Builder::new(); + builder.max_branches = 100000; + + builder.check(|| { + use loom::sync::{Arc, Mutex}; + const NUM_PINGS: u64 = 5000; + const BUFFER_SIZE: usize = 4; + + let messenger = Arc::new(Mutex::new( + ThreadedMessenger::::new(2, BUFFER_SIZE).unwrap(), + )); + + let (mut user0, mut user1) = { + let mut guard = messenger.lock().unwrap(); + (guard.get_user().unwrap(), guard.get_user().unwrap()) + }; + + // User 0 sends pings and expects pongs + let pinger = thread::spawn(move || { + for i in 0..NUM_PINGS { + let ping = TestMessage { + timestamp: i, + from_id: 0, + to_id: 1, + data: "ping".to_string(), + commit_time: 0, + is_broadcast: false, + }; + while user0.send(ping.clone()).is_err() { + thread::yield_now(); + } + + let mut received_pong = false; + while !received_pong { + if let Some(msgs) = user0.poll() { + for msg in msgs { + if msg.from() == 1 && msg.data == "pong" && msg.timestamp == i { + received_pong = true; + break; + } + } + } + thread::yield_now(); + } + } + }); + + // User 1 receives pings and sends pongs + let ponger = thread::spawn(move || { + for i in 0..NUM_PINGS { + let mut received_ping = false; + while !received_ping { + if let Some(msgs) = user1.poll() { + for msg in msgs { + if msg.from() == 0 && msg.data == "ping" && msg.timestamp == i { + received_ping = true; + break; + } + } + } + thread::yield_now(); + } + + let pong = TestMessage { + timestamp: i, + from_id: 1, + to_id: 0, + data: "pong".to_string(), + commit_time: 0, + is_broadcast: false, + }; + while user1.send(pong.clone()).is_err() { + thread::yield_now(); + } + } + }); + + let router_messenger = messenger.clone(); + let router = thread::spawn(move || { + // Run router for a fixed number of iterations, enough for loom to explore states. + for _ in 0..(NUM_PINGS * 4) { + let mut guard = router_messenger.lock().unwrap(); + if let Ok(to_deliver) = guard.poll() { + guard.deliver(to_deliver).unwrap(); + } + drop(guard); + thread::yield_now(); + } + }); + + pinger.join().unwrap(); + ponger.join().unwrap(); + router.join().unwrap(); + }); + } } diff --git a/src/scheduling/htw.rs b/src/scheduling/htw.rs index eccca29..de291e7 100644 --- a/src/scheduling/htw.rs +++ b/src/scheduling/htw.rs @@ -84,7 +84,7 @@ impl Clock>) { for k in 1..HEIGHT { let wheel_period = SLOTS.pow(k as u32); - if self.time % (wheel_period as u64) == 0 { + if self.time.is_multiple_of(wheel_period as u64) { if HEIGHT == k { for _ in 0..SLOTS.pow(HEIGHT as u32 - 1) { overflow.pop().map(|event| self.insert(event.0));