diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4858ffb1e..9bc8cc29f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,12 @@
 All notable changes to this project will be documented in this file.
 
+## Unreleased
+
+### Added
+
+A `Worker` now has a `step_or_park(Option<Duration>)` method, which instructs the worker to take a step and gives it permission to park the worker thread for at most the supplied timeout if there is no work to perform. A value of `None` implies no timeout (unboundedly parked), whereas a value of `Some(0)` should return immediately. The communication layers are implemented to awaken workers if they receive new communications, and workers should hand out copies of their `Thread` if they want other threads to wake them for other reasons (e.g. queues from threads external to timely).
+
 ## 0.9.0
 
 ### Added
diff --git a/communication/src/allocator/counters.rs b/communication/src/allocator/counters.rs
index e73a298db..003de69d4 100644
--- a/communication/src/allocator/counters.rs
+++ b/communication/src/allocator/counters.rs
@@ -91,10 +91,11 @@ impl<T, P: Push<T>> Push<T> for ArcPusher<T, P> {
         // else {
         //     self.count += 1;
         // }
+        let _ =
         self.events
-            .send((self.index, Event::Pushed(1)))
+            .send((self.index, Event::Pushed(1)));
             // TODO : Perhaps this shouldn't be a fatal error (e.g. in shutdown).
-            .expect("Failed to send message count");
+            // .expect("Failed to send message count");
 
         self.pusher.push(element)
     }
diff --git a/communication/src/allocator/generic.rs b/communication/src/allocator/generic.rs
index c1261e417..bea977c80 100644
--- a/communication/src/allocator/generic.rs
+++ b/communication/src/allocator/generic.rs
@@ -94,6 +94,14 @@ impl Allocate for Generic {
     fn receive(&mut self) { self.receive(); }
     fn release(&mut self) { self.release(); }
     fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> { self.events() }
+    fn await_events(&self, _duration: Option<Duration>) {
+        match self {
+            &Generic::Thread(ref t) => t.await_events(_duration),
+            &Generic::Process(ref p) => p.await_events(_duration),
+            &Generic::ProcessBinary(ref pb) => pb.await_events(_duration),
+            &Generic::ZeroCopy(ref z) => z.await_events(_duration),
+        }
+    }
 }
diff --git a/communication/src/allocator/process.rs b/communication/src/allocator/process.rs
index 0cd36b88a..343350c38 100644
--- a/communication/src/allocator/process.rs
+++ b/communication/src/allocator/process.rs
@@ -5,11 +5,13 @@ use std::cell::RefCell;
 use std::sync::{Arc, Mutex};
 use std::any::Any;
 use std::sync::mpsc::{Sender, Receiver, channel};
+use std::time::Duration;
 use std::collections::{HashMap, VecDeque};
 
 use crate::allocator::thread::{ThreadBuilder};
 use crate::allocator::{Allocate, AllocateBuilder, Event, Thread};
 use crate::{Push, Pull, Message};
+use crate::buzzer::Buzzer;
 
 /// An allocator for inter-thread, intra-process communication
 pub struct ProcessBuilder {
@@ -19,6 +21,10 @@ pub struct ProcessBuilder {
     // below: `Box<Any>` is a `Box<Vec<Option<(Vec<Sender<T>>, Receiver<T>)>>>`
     channels: Arc<Mutex<HashMap<usize, Box<Any+Send>>>>,
 
+    // Buzzers for waking other local workers.
+    buzzers_send: Vec<Sender<Buzzer>>,
+    buzzers_recv: Vec<Receiver<Buzzer>>,
+
     counters_send: Vec<Sender<(usize, Event)>>,
     counters_recv: Receiver<(usize, Event)>,
 }
@@ -26,11 +32,23 @@ impl AllocateBuilder for ProcessBuilder {
     type Allocator = Process;
     fn build(self) -> Self::Allocator {
+
+        // Initialize buzzers; send first, then recv.
+ for worker in self.buzzers_send.iter() { + let buzzer = Buzzer::new(); + worker.send(buzzer).expect("Failed to send buzzer"); + } + let mut buzzers = Vec::new(); + for worker in self.buzzers_recv.iter() { + buzzers.push(worker.recv().expect("Failed to recv buzzer")); + } + Process { inner: self.inner.build(), index: self.index, peers: self.peers, channels: self.channels, + buzzers, counters_send: self.counters_send, counters_recv: self.counters_recv, } @@ -44,6 +62,7 @@ pub struct Process { peers: usize, // below: `Box` is a `Box>, Receiver)>>>` channels: Arc>>>, + buzzers: Vec, counters_send: Vec>, counters_recv: Receiver<(usize, Event)>, } @@ -64,14 +83,21 @@ impl Process { let channels = Arc::new(Mutex::new(HashMap::new())); + // Allocate matrix of buzzer send and recv endpoints. + let (buzzers_send, buzzers_recv) = crate::promise_futures(peers, peers); + counters_recv .into_iter() + .zip(buzzers_send.into_iter()) + .zip(buzzers_recv.into_iter()) .enumerate() - .map(|(index, recv)| { + .map(|(index, ((recv, bsend), brecv))| { ProcessBuilder { inner: ThreadBuilder, index, peers, + buzzers_send: bsend, + buzzers_recv: brecv, channels: channels.clone(), counters_send: counters_send.clone(), counters_recv: recv, @@ -99,10 +125,10 @@ impl Allocate for Process { let mut pushers = Vec::new(); let mut pullers = Vec::new(); - for _ in 0..self.peers { + for index in 0 .. self.peers { let (s, r): (Sender>, Receiver>) = channel(); - pushers.push(Pusher { target: s }); + pushers.push(Pusher { target: s, buzzer: self.buzzers[index].clone() }); pullers.push(Puller { source: r, current: None }); } @@ -152,6 +178,10 @@ impl Allocate for Process { self.inner.events() } + fn await_events(&self, duration: Option) { + self.inner.await_events(duration); + } + fn receive(&mut self) { let mut events = self.inner.events().borrow_mut(); while let Ok((index, event)) = self.counters_recv.try_recv() { @@ -163,11 +193,15 @@ impl Allocate for Process { /// The push half of an intra-process channel. struct Pusher { target: Sender, + buzzer: Buzzer, } impl Clone for Pusher { fn clone(&self) -> Self { - Pusher { target: self.target.clone() } + Self { + target: self.target.clone(), + buzzer: self.buzzer.clone() + } } } @@ -175,6 +209,7 @@ impl Push for Pusher { #[inline] fn push(&mut self, element: &mut Option) { if let Some(element) = element.take() { self.target.send(element).unwrap(); + self.buzzer.buzz(); } } } diff --git a/communication/src/allocator/zero_copy/allocator.rs b/communication/src/allocator/zero_copy/allocator.rs index 8ec6a5284..c706b3795 100644 --- a/communication/src/allocator/zero_copy/allocator.rs +++ b/communication/src/allocator/zero_copy/allocator.rs @@ -2,7 +2,7 @@ use std::rc::Rc; use std::cell::RefCell; use std::collections::{VecDeque, HashMap}; -// use std::sync::mpsc::{channel, Sender, Receiver}; +use std::sync::mpsc::{Sender, Receiver}; use bytes::arc::Bytes; @@ -10,11 +10,10 @@ use crate::networking::MessageHeader; use crate::{Allocate, Message, Data, Push, Pull}; use crate::allocator::AllocateBuilder; -use crate::allocator::{Event, Process}; -use crate::allocator::process::ProcessBuilder; +use crate::allocator::Event; use crate::allocator::canary::Canary; -use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue, Signal}; +use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue}; use super::push_pull::{Pusher, PullerInner}; /// Builds an instance of a TcpAllocator. 
@@ -24,12 +23,11 @@ use super::push_pull::{Pusher, PullerInner}; /// shared between threads here, and then provide a method that will instantiate the non-movable /// members once in the destination thread. pub struct TcpBuilder { - inner: A, - index: usize, // number out of peers - peers: usize, // number of peer allocators. - sends: Vec, // for pushing bytes at remote processes. - recvs: Vec, // for pulling bytes from remote processes. - signal: Signal, + inner: A, + index: usize, // number out of peers + peers: usize, // number of peer allocators. + futures: Vec>, // to receive queues to each network thread. + promises: Vec>, // to send queues from each network thread. } /// Creates a vector of builders, sharing appropriate state. @@ -44,58 +42,37 @@ pub struct TcpBuilder { /// info to spawn ingress comm thresds, /// ) /// ``` -pub fn new_vector( +pub fn new_vector( + allocators: Vec, my_process: usize, - threads: usize, processes: usize) --> (Vec>, - Vec<(Vec, Signal)>, - Vec>) { +-> (Vec>, + Vec>>, + Vec>>) +{ + let threads = allocators.len(); - // The results are a vector of builders, as well as the necessary shared state to build each - // of the send and receive communication threads, respectively. - - // One signal per local destination worker thread - let worker_signals: Vec = (0 .. threads).map(|_| Signal::new()).collect(); - - // One signal per destination egress communication thread - let network_signals: Vec = (0 .. processes-1).map(|_| Signal::new()).collect(); - - let worker_to_network: Vec> = (0 .. threads).map(|_| (0 .. processes-1).map(|p| MergeQueue::new(network_signals[p].clone())).collect()).collect(); - let network_to_worker: Vec> = (0 .. processes-1).map(|_| (0 .. threads).map(|t| MergeQueue::new(worker_signals[t].clone())).collect()).collect(); - - let worker_from_network: Vec> = (0 .. threads).map(|t| (0 .. processes-1).map(|p| network_to_worker[p][t].clone()).collect()).collect(); - let network_from_worker: Vec> = (0 .. processes-1).map(|p| (0 .. threads).map(|t| worker_to_network[t][p].clone()).collect()).collect(); + // For queues from worker threads to network threads, and vice versa. + let (network_promises, worker_futures) = crate::promise_futures(processes-1, threads); + let (worker_promises, network_futures) = crate::promise_futures(threads, processes-1); let builders = - Process::new_vector(threads) // Vec (Process is Allocate) + allocators .into_iter() - .zip(worker_signals) - .zip(worker_to_network) - .zip(worker_from_network) + .zip(worker_promises) + .zip(worker_futures) .enumerate() - .map(|(index, (((inner, signal), sends), recvs))| { - // sends are handles to MergeQueues to remote processes - // (one per remote process) - // recvs are handles to MergeQueues from remote processes - // (one per remote process) + .map(|(index, ((inner, promises), futures))| { TcpBuilder { inner, index: my_process * threads + index, peers: threads * processes, - sends, - recvs, - signal, + promises, + futures, }}) .collect(); - // for each egress communicaton thread, construct the tuple (MergeQueues from local - // threads, corresponding signal) - let sends = network_from_worker.into_iter().zip(network_signals).collect(); - - (/* AllocateBuilder for local threads */ builders, - /* info to spawn egress comm threads */ sends, - /* info to spawn ingress comm thresds */ network_to_worker) + (builders, network_promises, network_futures) } impl TcpBuilder { @@ -103,18 +80,34 @@ impl TcpBuilder { /// Builds a `TcpAllocator`, instantiating `Rc>` elements. 
pub fn build(self) -> TcpAllocator { - let sends: Vec<_> = self.sends.into_iter().map( - |send| Rc::new(RefCell::new(SendEndpoint::new(send)))).collect(); + // Fulfill puller obligations. + let mut recvs = Vec::with_capacity(self.peers); + for promise in self.promises.into_iter() { + let buzzer = crate::buzzer::Buzzer::new(); + let queue = MergeQueue::new(buzzer); + promise.send(queue.clone()).expect("Failed to send MergeQueue"); + recvs.push(queue.clone()); + } + + // Extract pusher commitments. + let mut sends = Vec::with_capacity(self.peers); + for pusher in self.futures.into_iter() { + let queue = pusher.recv().expect("Failed to receive push queue"); + let sendpoint = SendEndpoint::new(queue); + sends.push(Rc::new(RefCell::new(sendpoint))); + } + + // let sends: Vec<_> = self.sends.into_iter().map( + // |send| Rc::new(RefCell::new(SendEndpoint::new(send)))).collect(); TcpAllocator { inner: self.inner.build(), index: self.index, peers: self.peers, - _signal: self.signal, canaries: Rc::new(RefCell::new(Vec::new())), staged: Vec::new(), sends, - recvs: self.recvs, + recvs, to_local: HashMap::new(), } } @@ -128,8 +121,6 @@ pub struct TcpAllocator { index: usize, // number out of peers peers: usize, // number of peer allocators (for typed channel allocation). - _signal: Signal, - staged: Vec, // staging area for incoming Bytes canaries: Rc>>, @@ -260,4 +251,7 @@ impl Allocate for TcpAllocator { fn events(&self) -> &Rc>> { self.inner.events() } + fn await_events(&self, duration: Option) { + self.inner.await_events(duration); + } } \ No newline at end of file diff --git a/communication/src/allocator/zero_copy/allocator_process.rs b/communication/src/allocator/zero_copy/allocator_process.rs index 21e8330b8..76a44d84a 100644 --- a/communication/src/allocator/zero_copy/allocator_process.rs +++ b/communication/src/allocator/zero_copy/allocator_process.rs @@ -3,6 +3,7 @@ use std::rc::Rc; use std::cell::RefCell; use std::collections::{VecDeque, HashMap}; +use std::sync::mpsc::{Sender, Receiver}; use bytes::arc::Bytes; @@ -12,7 +13,7 @@ use crate::{Allocate, Message, Data, Push, Pull}; use crate::allocator::{AllocateBuilder, Event}; use crate::allocator::canary::Canary; -use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue, Signal}; +use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue}; use super::push_pull::{Pusher, Puller}; @@ -23,11 +24,11 @@ use super::push_pull::{Pusher, Puller}; /// shared between threads here, and then provide a method that will instantiate the non-movable /// members once in the destination thread. pub struct ProcessBuilder { - index: usize, // number out of peers - peers: usize, // number of peer allocators. - sends: Vec, // for pushing bytes at remote processes. - recvs: Vec, // for pulling bytes from remote processes. - signal: Signal, + index: usize, // number out of peers + peers: usize, // number of peer allocators. + pushers: Vec>, // for pushing bytes at other workers. + pullers: Vec>, // for pulling bytes from other workers. + // signal: Signal, } impl ProcessBuilder { @@ -36,44 +37,41 @@ impl ProcessBuilder { /// This method requires access to a byte exchanger, from which it mints channels. pub fn new_vector(count: usize) -> Vec { - let signals: Vec = (0 .. count).map(|_| Signal::new()).collect(); + // Channels for the exchange of `MergeQueue` endpoints. + let (pullers_vec, pushers_vec) = crate::promise_futures(count, count); - let mut sends = Vec::new(); - let mut recvs = Vec::new(); - for _ in 0 .. 
count { sends.push(Vec::new()); } - for _ in 0 .. count { recvs.push(Vec::new()); } - - for source in 0 .. count { - for target in 0 .. count { - let send = MergeQueue::new(signals[target].clone()); - let recv = send.clone(); - sends[source].push(send); - recvs[target].push(recv); - } - } - - sends.into_iter() - .zip(recvs) - .zip(signals) - .enumerate() - .map(|(index, ((sends, recvs), signal))| + pushers_vec + .into_iter() + .zip(pullers_vec) + .enumerate() + .map(|(index, (pushers, pullers))| ProcessBuilder { index, peers: count, - sends, - recvs, - signal, + pushers, + pullers, } - ) - .collect() + ) + .collect() } /// Builds a `ProcessAllocator`, instantiating `Rc>` elements. pub fn build(self) -> ProcessAllocator { - let mut sends = Vec::new(); - for send in self.sends.into_iter() { - let sendpoint = SendEndpoint::new(send); + // Fulfill puller obligations. + let mut recvs = Vec::with_capacity(self.peers); + for puller in self.pullers.into_iter() { + let buzzer = crate::buzzer::Buzzer::new(); + let queue = MergeQueue::new(buzzer); + puller.send(queue.clone()).expect("Failed to send MergeQueue"); + recvs.push(queue.clone()); + } + + // Extract pusher commitments. + let mut sends = Vec::with_capacity(self.peers); + for pusher in self.pushers.into_iter() { + let queue = pusher.recv().expect("Failed to receive MergeQueue"); + let sendpoint = SendEndpoint::new(queue); sends.push(Rc::new(RefCell::new(sendpoint))); } @@ -84,9 +82,9 @@ impl ProcessBuilder { canaries: Rc::new(RefCell::new(Vec::new())), staged: Vec::new(), sends, - recvs: self.recvs, + recvs, to_local: HashMap::new(), - _signal: self.signal, + // _signal: self.signal, } } } @@ -110,7 +108,7 @@ pub struct ProcessAllocator { canaries: Rc>>, - _signal: Signal, + // _signal: Signal, // sending, receiving, and responding to binary buffers. staged: Vec, sends: Vec>>>, // sends[x] -> goes to thread x. @@ -224,4 +222,14 @@ impl Allocate for ProcessAllocator { fn events(&self) -> &Rc>> { &self.events } + fn await_events(&self, duration: Option) { + if self.events.borrow().is_empty() { + if let Some(duration) = duration { + std::thread::park_timeout(duration); + } + else { + std::thread::park(); + } + } + } } \ No newline at end of file diff --git a/communication/src/allocator/zero_copy/bytes_exchange.rs b/communication/src/allocator/zero_copy/bytes_exchange.rs index 945eb7b6e..ddedf806a 100644 --- a/communication/src/allocator/zero_copy/bytes_exchange.rs +++ b/communication/src/allocator/zero_copy/bytes_exchange.rs @@ -1,7 +1,6 @@ //! Types and traits for sharing `Bytes`. -use std::thread::Thread; -use std::sync::{Arc, Mutex, RwLock}; +use std::sync::{Arc, Mutex}; use std::collections::VecDeque; use bytes::arc::Bytes; @@ -22,41 +21,6 @@ pub trait BytesPull { fn drain_into(&mut self, vec: &mut Vec); } -/// A signal appropriate to wake a single thread. -/// -/// Internally this type uses thread parking and unparking, where the first thread to call -/// `wait` is registered as the thread to wake. Other threads that call `wait` will just be -/// parked without registering themselves, which would probably be a bug (of theirs). -#[derive(Clone)] -pub struct Signal { - thread: Arc>>, -} - -impl Signal { - /// Creates a new signal. - pub fn new() -> Self { - Signal { thread: Arc::new(RwLock::new(None)) } - } - /// Blocks unless or until ping is called. - pub fn wait(&self) { - // It is important not to block on the first call; doing so would fail to unblock - // from pings before the first call to wait. 
This may appear as a spurious wake-up, - // and ideally the caller is prepared for that. - if self.thread.read().expect("failed to read thread").is_none() { - *self.thread.write().expect("failed to set thread") = Some(::std::thread::current()) - } - else { - ::std::thread::park(); - } - } - /// Unblocks the current or next call to wait. - pub fn ping(&self) { - if let Some(thread) = self.thread.read().expect("failed to read thread").as_ref() { - thread.unpark(); - } - } -} - use std::sync::atomic::{AtomicBool, Ordering}; /// An unbounded queue of bytes intended for point-to-point communication /// between threads. Cloning returns another handle to the same queue. @@ -65,16 +29,16 @@ use std::sync::atomic::{AtomicBool, Ordering}; #[derive(Clone)] pub struct MergeQueue { queue: Arc>>, // queue of bytes. - dirty: Signal, // indicates whether there may be data present. + buzzer: crate::buzzer::Buzzer, // awakens receiver thread. panic: Arc, } impl MergeQueue { /// Allocates a new queue with an associated signal. - pub fn new(signal: Signal) -> Self { + pub fn new(buzzer: crate::buzzer::Buzzer) -> Self { MergeQueue { queue: Arc::new(Mutex::new(VecDeque::new())), - dirty: signal, + buzzer, panic: Arc::new(AtomicBool::new(false)), } } @@ -122,7 +86,7 @@ impl BytesPush for MergeQueue { // Wakeup corresponding thread *after* releasing the lock ::std::mem::drop(queue); if should_ping { - self.dirty.ping(); // only signal from empty to non-empty. + self.buzzer.buzz(); // only signal from empty to non-empty. } } } @@ -156,7 +120,7 @@ impl Drop for MergeQueue { } // Drop the queue before pinging. self.queue = Arc::new(Mutex::new(VecDeque::new())); - self.dirty.ping(); + self.buzzer.buzz(); } } diff --git a/communication/src/allocator/zero_copy/initialize.rs b/communication/src/allocator/zero_copy/initialize.rs index 4489117d5..6764189ba 100644 --- a/communication/src/allocator/zero_copy/initialize.rs +++ b/communication/src/allocator/zero_copy/initialize.rs @@ -48,11 +48,11 @@ pub fn initialize_networking( let mut results: Vec> = create_sockets(addresses, my_index, noisy)?; - let (builders, remote_recvs, remote_sends) = new_vector(my_index, threads, processes); - // egress queues - let mut remote_recv_iter = remote_recvs.into_iter(); - // ingress queues - let mut remote_send_iter = remote_sends.into_iter(); + let process_allocators = crate::allocator::process::Process::new_vector(threads); + let (builders, promises, futures) = new_vector(process_allocators, my_index, processes); + + let mut promises_iter = promises.into_iter(); + let mut futures_iter = futures.into_iter(); let mut send_guards = Vec::new(); let mut recv_guards = Vec::new(); @@ -63,7 +63,7 @@ pub fn initialize_networking( if let Some(stream) = results[index].take() { // remote process - let (remote_recv, signal) = remote_recv_iter.next().unwrap(); + let remote_recv = promises_iter.next().unwrap(); { let log_sender = log_sender.clone(); @@ -79,13 +79,13 @@ pub fn initialize_networking( remote: Some(index), }); - send_loop(stream, remote_recv, signal, my_index, index, logger); + send_loop(stream, remote_recv, my_index, index, logger); })?; send_guards.push(join_guard); } - let remote_send = remote_send_iter.next().unwrap(); + let remote_send = futures_iter.next().unwrap(); { // let remote_sends = remote_sends.clone(); diff --git a/communication/src/allocator/zero_copy/tcp.rs b/communication/src/allocator/zero_copy/tcp.rs index 6e44dc425..68499bbde 100644 --- a/communication/src/allocator/zero_copy/tcp.rs +++ 
b/communication/src/allocator/zero_copy/tcp.rs @@ -2,11 +2,12 @@ use std::io::{Read, Write}; use std::net::TcpStream; +use std::sync::mpsc::{Sender, Receiver}; use crate::networking::MessageHeader; use super::bytes_slab::BytesSlab; -use super::bytes_exchange::{MergeQueue, Signal}; +use super::bytes_exchange::MergeQueue; use logging_core::Logger; @@ -20,7 +21,7 @@ use crate::logging::{CommunicationEvent, CommunicationSetup, MessageEvent, State /// take down the computation and cause the failures to cascade. pub fn recv_loop( mut reader: TcpStream, - mut targets: Vec, + targets: Vec>, worker_offset: usize, process: usize, remote: usize, @@ -29,6 +30,8 @@ pub fn recv_loop( // Log the receive thread's start. logger.as_mut().map(|l| l.log(StateEvent { send: false, process, remote, start: true })); + let mut targets: Vec = targets.into_iter().map(|x| x.recv().expect("Failed to receive MergeQueue")).collect(); + let mut buffer = BytesSlab::new(20); // Where we stash Bytes before handing them off. @@ -111,8 +114,7 @@ pub fn recv_loop( pub fn send_loop( // TODO: Maybe we don't need BufWriter with consolidation in writes. writer: TcpStream, - mut sources: Vec, - signal: Signal, + sources: Vec>, process: usize, remote: usize, mut logger: Option>) @@ -121,6 +123,13 @@ pub fn send_loop( // Log the receive thread's start. logger.as_mut().map(|l| l.log(StateEvent { send: true, process, remote, start: true, })); + let mut sources: Vec = sources.into_iter().map(|x| { + let buzzer = crate::buzzer::Buzzer::new(); + let queue = MergeQueue::new(buzzer); + x.send(queue.clone()).expect("failed to send MergeQueue"); + queue + }).collect(); + let mut writer = ::std::io::BufWriter::with_capacity(1 << 16, writer); let mut stash = Vec::new(); @@ -142,7 +151,7 @@ pub fn send_loop( writer.flush().expect("Failed to flush writer."); sources.retain(|source| !source.is_complete()); if !sources.is_empty() { - signal.wait(); + std::thread::park(); } } else { diff --git a/communication/src/buzzer.rs b/communication/src/buzzer.rs new file mode 100644 index 000000000..09048dc19 --- /dev/null +++ b/communication/src/buzzer.rs @@ -0,0 +1,22 @@ +//! A type that can unpark specific threads. + +use std::thread::Thread; + +/// Can unpark a specific thread. +#[derive(Clone)] +pub struct Buzzer { + thread: Thread, +} + +impl Buzzer { + /// Creates a new buzzer for the current thread. + pub fn new() -> Self { + Self { + thread: std::thread::current() + } + } + /// Unparks the target thread. + pub fn buzz(&self) { + self.thread.unpark() + } +} \ No newline at end of file diff --git a/communication/src/lib.rs b/communication/src/lib.rs index 528e20805..7af0714b3 100644 --- a/communication/src/lib.rs +++ b/communication/src/lib.rs @@ -93,6 +93,7 @@ pub mod networking; pub mod initialize; pub mod logging; pub mod message; +pub mod buzzer; use std::any::Any; @@ -164,3 +165,27 @@ impl> Pull for Box

{ #[inline] fn pull(&mut self) -> &mut Option { (**self).pull() } } + + +use std::sync::mpsc::{Sender, Receiver, channel}; + +/// Allocate a matrix of send and receive changes to exchange items. +/// +/// This method constructs channels for `sends` threads to create and send +/// items of type `T` to `recvs` receiver threads. +fn promise_futures(sends: usize, recvs: usize) -> (Vec>>, Vec>>) { + + // each pair of workers has a sender and a receiver. + let mut senders: Vec<_> = (0 .. sends).map(|_| Vec::with_capacity(recvs)).collect(); + let mut recvers: Vec<_> = (0 .. recvs).map(|_| Vec::with_capacity(sends)).collect(); + + for sender in 0 .. sends { + for recver in 0 .. recvs { + let (send, recv) = channel(); + senders[sender].push(send); + recvers[recver].push(recv); + } + } + + (senders, recvers) +} \ No newline at end of file diff --git a/timely/src/logging.rs b/timely/src/logging.rs index 4672c3b47..adb8ed620 100644 --- a/timely/src/logging.rs +++ b/timely/src/logging.rs @@ -198,6 +198,29 @@ pub struct InputEvent { pub start_stop: StartStop, } +#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)] +/// Input logic start/stop +pub struct ParkEvent { + /// True when activity begins, false when it stops + pub event: ParkUnpark +} + +impl ParkEvent { + /// Creates a new park event from the supplied duration. + pub fn park(duration: Option) -> Self { ParkEvent { event: ParkUnpark::Park(duration) } } + /// Creates a new unpark event. + pub fn unpark() -> Self { ParkEvent { event: ParkUnpark::Unpark } } +} + +/// Records the starting and stopping of an operator. +#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, PartialEq, Eq, Ord, PartialOrd)] +pub enum ParkUnpark { + /// Worker parks. + Park(Option), + /// Worker unparks. + Unpark, +} + #[derive(Serialize, Deserialize, Debug, Clone, Abomonation, Hash, Eq, PartialEq, Ord, PartialOrd)] /// An event in a timely worker pub enum TimelyEvent { @@ -225,6 +248,8 @@ pub enum TimelyEvent { CommChannels(CommChannelsEvent), /// Input event. Input(InputEvent), + /// Park event. + Park(ParkEvent), /// Unstructured event. Text(String), } @@ -276,3 +301,7 @@ impl From for TimelyEvent { impl From for TimelyEvent { fn from(v: InputEvent) -> TimelyEvent { TimelyEvent::Input(v) } } + +impl From for TimelyEvent { + fn from(v: ParkEvent) -> TimelyEvent { TimelyEvent::Park(v) } +} diff --git a/timely/src/worker.rs b/timely/src/worker.rs index 73ec3d00c..e40771167 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -147,9 +147,12 @@ impl Worker { /// Performs one step of the computation. /// /// A step gives each dataflow operator a chance to run, and is the - /// main way to ensure that a computation proceeds. This method may - /// park the thread until there is work to perform, with an optional - /// timeout. + /// main way to ensure that a computation proceeds. + /// + /// This method takes an optional timeout and may park the thread until + /// there is work to perform or until this timeout expires. A value of + /// `None` allows the worker to park indefinitely, whereas a value of + /// `Some(Duration::new(0, 0))` will return without parking the thread. /// /// # Examples /// @@ -196,10 +199,19 @@ impl Worker { .borrow_mut() .advance(); - if self.activations.borrow().is_empty() { + // Consider parking only if we have no pending events, some dataflows, and a non-zero duration. 
+        if self.activations.borrow().is_empty() && !self.dataflows.borrow().is_empty() && duration != Some(Duration::new(0,0)) {
+
+            // Log parking and flush log.
+            self.logging().as_mut().map(|l| l.log(crate::logging::ParkEvent::park(duration)));
+            self.logging.borrow_mut().flush();
+
             self.allocator
                 .borrow()
                 .await_events(duration);
+
+            // Log return from unpark.
+            self.logging().as_mut().map(|l| l.log(crate::logging::ParkEvent::unpark()));
         }
         else {
             // Schedule active dataflows.
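
For readers of this patch, the following is a hedged sketch (not part of the diff) of how a driver loop might use the new `step_or_park` method, and how an external producer could wake a parked worker by unparking its `Thread` handle, as the changelog entry describes. The toy dataflow, the 50ms timeout, and the assumption that `step_or_park` returns `bool` like `Worker::step` are illustrative only.

```rust
use std::time::Duration;

fn main() {
    timely::execute_from_args(std::env::args(), |worker| {
        use timely::dataflow::operators::{ToStream, Inspect};

        // A copy of this worker thread's handle; an external producer could hold
        // one of these and call `unpark()` on it when it enqueues new work.
        let _handle = std::thread::current();

        // A small dataflow so the worker has something to schedule.
        worker.dataflow::<u64, _, _>(|scope| {
            (0 .. 10)
                .to_stream(scope)
                .inspect(|x| println!("seen: {:?}", x));
        });

        // Take steps until all dataflows complete; when idle, park for at most
        // 50ms instead of spinning. `None` would park with no timeout, and
        // `Some(Duration::new(0, 0))` returns without parking at all.
        while worker.step_or_park(Some(Duration::from_millis(50))) { }
    }).unwrap();
}
```

Because `unpark` on a thread that is not currently parked makes that thread's next `park` (or `park_timeout`) return immediately, the buzz-then-park handshake used by `Buzzer` and `await_events` does not lose wakeups that race with the emptiness check before parking.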