Skip to content

Commit

Permalink
replace closure based event handlers with EventHandler trait (#4)
Browse files Browse the repository at this point in the history
Co-authored-by: Louis Ponet <[email protected]>
  • Loading branch information
sklose and louisponet authored Jun 5, 2023
1 parent 5b280ab commit cff4199
Show file tree
Hide file tree
Showing 9 changed files with 139 additions and 94 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "disrustor"
version = "0.3.0"
version = "0.4.0"
edition = "2018"
authors = ["Sebastian Klose <[email protected]>"]
description = "This project is a port of the LMAX Disruptor to Rust"
Expand Down
12 changes: 9 additions & 3 deletions benches/u64_channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@ use std::sync::mpsc::channel;
use std::sync::Arc;
use std::time::Duration;

struct Checker;

impl EventHandler<i64> for Checker {
fn handle_event(&mut self, event: &i64, seq: Sequence, _: bool) {
assert_eq!(*event, seq);
}
}

fn mpsc_channel(n: u64) {
let (tx, rx) = channel();

Expand All @@ -30,9 +38,7 @@ fn disrustor_channel<S: Sequencer, F: FnOnce(&RingBuffer<i64>) -> S>(n: u64, b:

let gating_sequence = vec![sequencer.get_cursor()];
let barrier = sequencer.create_barrier(&gating_sequence);
let processor = BatchEventProcessor::create(move |data, sequence, _| {
assert_eq!(*data, sequence);
});
let processor = BatchEventProcessor::create(Checker {});

sequencer.add_gating_sequence(&processor.get_cursor());

Expand Down
54 changes: 33 additions & 21 deletions examples/multi_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,46 @@ use log::*;

const MAX: i64 = 200i64;

struct Doubler;

impl EventHandlerMut<u32> for Doubler {
fn handle_event(&mut self, data: &mut u32, sequence: Sequence, _: bool) {
let val = *data;
if i64::from(val) != sequence {
panic!(
"concurrency problem detected (p1), expected {}, but got {}",
sequence, val
);
}
debug!("updating sequence {} from {} to {}", sequence, val, val * 2);
*data = val * 2;
}
}

struct Checker;

impl EventHandler<u32> for Checker {
fn handle_event(&mut self, data: &u32, sequence: Sequence, _: bool) {
let val = *data;
if i64::from(val) != sequence * 2 {
panic!(
"concurrency problem detected (p2), expected {}, but got {}",
sequence * 2,
val
);
}
}
}

fn follow_sequence<W: WaitStrategy + 'static>() {
let (executor, producer) = DisrustorBuilder::with_ring_buffer(128)
.with_wait_strategy::<W>()
.with_multi_producer()
.with_barrier(|b| {
b.handle_events_mut(|data, sequence, _| {
let val = *data;
if i64::from(val) != sequence {
panic!(
"concurrency problem detected (p1), expected {}, but got {}",
sequence, val
);
}
debug!("updating sequence {} from {} to {}", sequence, val, val * 2);
*data = val * 2;
});
b.handle_events_mut(Doubler {});
})
.with_barrier(|b| {
b.handle_events(|data, sequence, _| {
let val = *data;
if i64::from(val) != sequence * 2 {
panic!(
"concurrency problem detected (p2), expected {}, but got {}",
sequence * 2,
val
);
}
});
b.handle_events(Checker {});
})
.build();

Expand Down
54 changes: 33 additions & 21 deletions examples/single_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,46 @@ use log::*;

const MAX: i64 = 200i64;

struct Doubler;

impl EventHandlerMut<u32> for Doubler {
fn handle_event(&mut self, data: &mut u32, sequence: Sequence, _: bool) {
let val = *data;
if i64::from(val) != sequence {
panic!(
"concurrency problem detected (p1), expected {}, but got {}",
sequence, val
);
}
debug!("updating sequence {} from {} to {}", sequence, val, val * 2);
*data = val * 2;
}
}

struct Checker;

impl EventHandler<u32> for Checker {
fn handle_event(&mut self, data: &u32, sequence: Sequence, _: bool) {
let val = *data;
if i64::from(val) != sequence * 2 {
panic!(
"concurrency problem detected (p2), expected {}, but got {}",
sequence * 2,
val
);
}
}
}

fn follow_sequence<W: WaitStrategy + 'static>() {
let (executor, producer) = DisrustorBuilder::with_ring_buffer(128)
.with_wait_strategy::<W>()
.with_single_producer()
.with_barrier(|b| {
b.handle_events_mut(|data, sequence, _| {
let val = *data;
if i64::from(val) != sequence {
panic!(
"concurrency problem detected (p1), expected {}, but got {}",
sequence, val
);
}
debug!("updating sequence {} from {} to {}", sequence, val, val * 2);
*data = val * 2;
});
b.handle_events_mut(Doubler {});
})
.with_barrier(|b| {
b.handle_events(|data, sequence, _| {
let val = *data;
if i64::from(val) != sequence * 2 {
panic!(
"concurrency problem detected (p2), expected {}, but got {}",
sequence * 2,
val
);
}
});
b.handle_events(Checker {});
})
.build();

Expand Down
56 changes: 28 additions & 28 deletions src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ use std::sync::Arc;
pub struct BatchEventProcessor;

impl BatchEventProcessor {
pub fn create<'a, F, T>(handler: F) -> impl EventProcessor<'a, T>
pub fn create<'a, E, T>(handler: E) -> impl EventProcessor<'a, T>
where
T: Send + 'a,
F: Fn(&T, Sequence, bool) + Send + 'static,
E: EventHandler<T> + Send + 'a,
{
Processor {
handler,
Expand All @@ -17,10 +17,10 @@ impl BatchEventProcessor {
}
}

pub fn create_mut<'a, F, T>(handler: F) -> impl EventProcessorMut<'a, T>
pub fn create_mut<'a, E, T>(handler: E) -> impl EventProcessorMut<'a, T>
where
T: Send + 'a,
F: Fn(&mut T, Sequence, bool) + Send + 'static,
E: EventHandlerMut<T> + Send + 'a,
{
ProcessorMut {
handler,
Expand All @@ -30,33 +30,33 @@ impl BatchEventProcessor {
}
}

struct Processor<F, T> {
handler: F,
struct Processor<E, T> {
handler: E,
cursor: Arc<AtomicSequence>,
_marker: PhantomData<T>,
}

struct ProcessorMut<F, T> {
handler: F,
struct ProcessorMut<E, T> {
handler: E,
cursor: Arc<AtomicSequence>,
_marker: PhantomData<T>,
}

struct RunnableProcessor<F, T, D: DataProvider<T>, B: SequenceBarrier> {
processor: Processor<F, T>,
struct RunnableProcessor<E, T, D: DataProvider<T>, B: SequenceBarrier> {
processor: Processor<E, T>,
data_provider: Arc<D>,
barrier: B,
}

struct RunnableProcessorMut<F, T, D: DataProvider<T>, B: SequenceBarrier> {
processor: ProcessorMut<F, T>,
struct RunnableProcessorMut<E, T, D: DataProvider<T>, B: SequenceBarrier> {
processor: ProcessorMut<E, T>,
data_provider: Arc<D>,
barrier: B,
}

impl<'a, F, T> EventProcessorMut<'a, T> for Processor<F, T>
impl<'a, E, T> EventProcessorMut<'a, T> for Processor<E, T>
where
F: Fn(&T, Sequence, bool) + Send + 'static,
E: EventHandler<T> + Send + 'a,
T: Send + 'a,
{
fn prepare<B: SequenceBarrier + 'a, D: DataProvider<T> + 'a>(
Expand All @@ -76,9 +76,9 @@ where
}
}

impl<'a, F, T> EventProcessorMut<'a, T> for ProcessorMut<F, T>
impl<'a, E, T> EventProcessorMut<'a, T> for ProcessorMut<E, T>
where
F: Fn(&mut T, Sequence, bool) + Send + 'static,
E: EventHandlerMut<T> + Send + 'a,
T: Send + 'a,
{
fn prepare<B: SequenceBarrier + 'a, D: DataProvider<T> + 'a>(
Expand All @@ -98,22 +98,22 @@ where
}
}

impl<'a, F, T> EventProcessor<'a, T> for Processor<F, T>
impl<'a, E, T> EventProcessor<'a, T> for Processor<E, T>
where
F: Fn(&T, Sequence, bool) + Send + 'static,
E: EventHandler<T> + Send + 'a,
T: Send + 'a,
{
}

impl<F, T, D, B> Runnable for RunnableProcessor<F, T, D, B>
impl<E, T, D, B> Runnable for RunnableProcessor<E, T, D, B>
where
F: Fn(&T, Sequence, bool) + Send + 'static,
E: EventHandler<T> + Send,
D: DataProvider<T>,
B: SequenceBarrier,
T: Send,
{
fn run(self: Box<Self>) {
let f = &self.processor.handler;
fn run(mut self: Box<Self>) {
let f = &mut self.processor.handler;
let cursor = &self.processor.cursor;
let data_provider = &self.data_provider;
let barrier = &self.barrier;
Expand All @@ -127,7 +127,7 @@ where

for i in next..=available {
let value = unsafe { data_provider.get(i) };
f(value, i, i == available);
f.handle_event(value, i, i == available);
}

cursor.set(available);
Expand All @@ -136,15 +136,15 @@ where
}
}

impl<F, T, D, B> Runnable for RunnableProcessorMut<F, T, D, B>
impl<E, T, D, B> Runnable for RunnableProcessorMut<E, T, D, B>
where
F: Fn(&mut T, Sequence, bool) + Send + 'static,
E: EventHandlerMut<T> + Send,
D: DataProvider<T>,
B: SequenceBarrier,
T: Send,
{
fn run(self: Box<Self>) {
let f = &self.processor.handler;
fn run(mut self: Box<Self>) {
let f = &mut self.processor.handler;
let cursor = &self.processor.cursor;
let data_provider = &self.data_provider;
let barrier = &self.barrier;
Expand All @@ -158,7 +158,7 @@ where

for i in next..=available {
let value = unsafe { data_provider.get_mut(i) };
f(value, i, i == available);
f.handle_event(value, i, i == available);
}

cursor.set(available);
Expand Down
8 changes: 4 additions & 4 deletions src/dsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,16 +141,16 @@ impl<'a, S: Sequencer + 'a, W: WaitStrategy, D: DataProvider<T> + 'a, T: Send +
}

impl<'a, S: Sequencer + 'a, D: DataProvider<T> + 'a, T: Send + 'a> BarrierScope<'a, S, D, T> {
pub fn handle_events<F>(&mut self, handler: F)
pub fn handle_events<E>(&mut self, handler: E)
where
F: Fn(&T, Sequence, bool) + Send + 'static,
E: EventHandler<T> + Send + 'a,
{
self.handle_events_with(BatchEventProcessor::create(handler))
}

pub fn handle_events_mut<F>(&mut self, handler: F)
pub fn handle_events_mut<E>(&mut self, handler: E)
where
F: Fn(&mut T, Sequence, bool) + Send + 'static,
E: EventHandlerMut<T> + Send + 'a,
{
self.handle_events_with(BatchEventProcessor::create_mut(handler))
}
Expand Down
Loading

0 comments on commit cff4199

Please sign in to comment.