Skip to content

Commit c8b23a0

Browse files
committed
Use crossbeam_channel instead of std::sync::mpsc.
I really didn't expect this to be a bottleneck. The input itreator is a shared Arc<Mutex<Iterator>>, and I expected fighting over the lock for that was the bottleneck. Boy was I wrong! With mpsc: test sending_and_receiving ... bench: 122,697,790 ns/iter (+/- 22,716,060) With crossbeam_channel: test sending_and_receiving ... bench: 21,863,243 ns/iter (+/- 1,364,702) A big thanks to the crossbeam_channel author(s) for bringing back the ability to know when all senders/receivers have been closed. That was previously a blocker for me using crossbeam_channel v0.2.
1 parent 6e65f61 commit c8b23a0

File tree

3 files changed

+13
-3
lines changed

3 files changed

+13
-3
lines changed

Cargo.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,5 @@ repository = "https://github.com/NfNitLoop/pipeliner"
1010
documentation = "https://docs.rs/pipeliner/"
1111

1212
[dependencies]
13-
# None! :)
13+
# 0.3 reintroduces the ability to know when all receivers are closed:
14+
crossbeam-channel = "0.3"

src/lib.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,12 @@
3939
//!
4040
//! [Pipeline]: trait.Pipeline.html
4141
42+
extern crate crossbeam_channel;
43+
4244
mod tests;
4345
mod panic_guard;
4446

4547
use std::sync::{Arc, Mutex};
46-
use std::sync::mpsc::{sync_channel};
4748
use std::thread::spawn;
4849

4950
use panic_guard::*;
@@ -114,7 +115,7 @@ where It: Iterator<Item=In> + Send + 'static, In: Send + 'static
114115

115116
let input = SharedIterator::wrap(input);
116117

117-
let (output_tx, output_rx) = sync_channel(out_buffer);
118+
let (output_tx, output_rx) = crossbeam_channel::bounded(out_buffer);
118119
let callable = Arc::new(callable);
119120

120121
let mut iter = PipelineIter {

src/panic_guard.rs

+8
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,14 @@ impl<T> Sender for mpsc::SyncSender<T> {
4747
}
4848
}
4949

50+
impl <T> Sender for ::crossbeam_channel::Sender<T> {
51+
type Item = T;
52+
type Error = ::crossbeam_channel::SendError<Self::Item>;
53+
fn send(&self, t: Self::Item) -> Result<(), Self::Error> {
54+
::crossbeam_channel::Sender::send(&self, t)
55+
}
56+
}
57+
5058
impl<T, S: Sender<Item=Result<T,()>>> Drop for PanicGuard<T,S>
5159
{
5260
fn drop(&mut self) {

0 commit comments

Comments
 (0)