Skip to content

Commit

Permalink
Merge pull request #268 from TimelyDataflow/chill
Browse files Browse the repository at this point in the history
Further `step_or_park` implementations [WIP]
  • Loading branch information
frankmcsherry committed May 5, 2019
2 parents 6d75370 + 41b4d6f commit ef2f213
Show file tree
Hide file tree
Showing 13 changed files with 269 additions and 156 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

All notable changes to this project will be documented in this file.

## Unreleased

### Added

A `Worker` now has a `step_or_park(Option<Duration>)` method, which instructs the worker to take a step and gives it permission to part the worker thread for at most the supplied timeout if there is no work to perform. A value of `None` implies no timeout (unboundedly parked) whereas a value of `Some(0)` should return immediately. The communication layers are implemented to awaken workers if they receive new communications, and workers should hand out copies of their `Thread` if they want other threads to wake them for other reasons (e.g. queues from threads external to timely).

## 0.9.0

### Added
Expand Down
5 changes: 3 additions & 2 deletions communication/src/allocator/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,11 @@ impl<T, P: Push<T>> Push<T> for ArcPusher<T, P> {
// else {
// self.count += 1;
// }
let _ =
self.events
.send((self.index, Event::Pushed(1)))
.send((self.index, Event::Pushed(1)));
// TODO : Perhaps this shouldn't be a fatal error (e.g. in shutdown).
.expect("Failed to send message count");
// .expect("Failed to send message count");

self.pusher.push(element)
}
Expand Down
8 changes: 8 additions & 0 deletions communication/src/allocator/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,14 @@ impl Allocate for Generic {
fn receive(&mut self) { self.receive(); }
fn release(&mut self) { self.release(); }
fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> { self.events() }
fn await_events(&self, _duration: Option<std::time::Duration>) {
match self {
&Generic::Thread(ref t) => t.await_events(_duration),
&Generic::Process(ref p) => p.await_events(_duration),
&Generic::ProcessBinary(ref pb) => pb.await_events(_duration),
&Generic::ZeroCopy(ref z) => z.await_events(_duration),
}
}
}


Expand Down
43 changes: 39 additions & 4 deletions communication/src/allocator/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ use std::cell::RefCell;
use std::sync::{Arc, Mutex};
use std::any::Any;
use std::sync::mpsc::{Sender, Receiver, channel};
use std::time::Duration;
use std::collections::{HashMap, VecDeque};

use crate::allocator::thread::{ThreadBuilder};
use crate::allocator::{Allocate, AllocateBuilder, Event, Thread};
use crate::{Push, Pull, Message};
use crate::buzzer::Buzzer;

/// An allocator for inter-thread, intra-process communication
pub struct ProcessBuilder {
Expand All @@ -19,18 +21,34 @@ pub struct ProcessBuilder {
// below: `Box<Any+Send>` is a `Box<Vec<Option<(Vec<Sender<T>>, Receiver<T>)>>>`
channels: Arc<Mutex<HashMap<usize, Box<Any+Send>>>>,

// Buzzers for waking other local workers.
buzzers_send: Vec<Sender<Buzzer>>,
buzzers_recv: Vec<Receiver<Buzzer>>,

counters_send: Vec<Sender<(usize, Event)>>,
counters_recv: Receiver<(usize, Event)>,
}

impl AllocateBuilder for ProcessBuilder {
type Allocator = Process;
fn build(self) -> Self::Allocator {

// Initialize buzzers; send first, then recv.
for worker in self.buzzers_send.iter() {
let buzzer = Buzzer::new();
worker.send(buzzer).expect("Failed to send buzzer");
}
let mut buzzers = Vec::new();
for worker in self.buzzers_recv.iter() {
buzzers.push(worker.recv().expect("Failed to recv buzzer"));
}

Process {
inner: self.inner.build(),
index: self.index,
peers: self.peers,
channels: self.channels,
buzzers,
counters_send: self.counters_send,
counters_recv: self.counters_recv,
}
Expand All @@ -44,6 +62,7 @@ pub struct Process {
peers: usize,
// below: `Box<Any+Send>` is a `Box<Vec<Option<(Vec<Sender<T>>, Receiver<T>)>>>`
channels: Arc<Mutex<HashMap</* channel id */ usize, Box<Any+Send>>>>,
buzzers: Vec<Buzzer>,
counters_send: Vec<Sender<(usize, Event)>>,
counters_recv: Receiver<(usize, Event)>,
}
Expand All @@ -64,14 +83,21 @@ impl Process {

let channels = Arc::new(Mutex::new(HashMap::new()));

// Allocate matrix of buzzer send and recv endpoints.
let (buzzers_send, buzzers_recv) = crate::promise_futures(peers, peers);

counters_recv
.into_iter()
.zip(buzzers_send.into_iter())
.zip(buzzers_recv.into_iter())
.enumerate()
.map(|(index, recv)| {
.map(|(index, ((recv, bsend), brecv))| {
ProcessBuilder {
inner: ThreadBuilder,
index,
peers,
buzzers_send: bsend,
buzzers_recv: brecv,
channels: channels.clone(),
counters_send: counters_send.clone(),
counters_recv: recv,
Expand Down Expand Up @@ -99,10 +125,10 @@ impl Allocate for Process {

let mut pushers = Vec::new();
let mut pullers = Vec::new();
for _ in 0..self.peers {
for index in 0 .. self.peers {

let (s, r): (Sender<Message<T>>, Receiver<Message<T>>) = channel();
pushers.push(Pusher { target: s });
pushers.push(Pusher { target: s, buzzer: self.buzzers[index].clone() });
pullers.push(Puller { source: r, current: None });
}

Expand Down Expand Up @@ -152,6 +178,10 @@ impl Allocate for Process {
self.inner.events()
}

fn await_events(&self, duration: Option<Duration>) {
self.inner.await_events(duration);
}

fn receive(&mut self) {
let mut events = self.inner.events().borrow_mut();
while let Ok((index, event)) = self.counters_recv.try_recv() {
Expand All @@ -163,18 +193,23 @@ impl Allocate for Process {
/// The push half of an intra-process channel.
struct Pusher<T> {
target: Sender<T>,
buzzer: Buzzer,
}

impl<T> Clone for Pusher<T> {
fn clone(&self) -> Self {
Pusher { target: self.target.clone() }
Self {
target: self.target.clone(),
buzzer: self.buzzer.clone()
}
}
}

impl<T> Push<T> for Pusher<T> {
#[inline] fn push(&mut self, element: &mut Option<T>) {
if let Some(element) = element.take() {
self.target.send(element).unwrap();
self.buzzer.buzz();
}
}
}
Expand Down
102 changes: 48 additions & 54 deletions communication/src/allocator/zero_copy/allocator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,18 @@
use std::rc::Rc;
use std::cell::RefCell;
use std::collections::{VecDeque, HashMap};
// use std::sync::mpsc::{channel, Sender, Receiver};
use std::sync::mpsc::{Sender, Receiver};

use bytes::arc::Bytes;

use crate::networking::MessageHeader;

use crate::{Allocate, Message, Data, Push, Pull};
use crate::allocator::AllocateBuilder;
use crate::allocator::{Event, Process};
use crate::allocator::process::ProcessBuilder;
use crate::allocator::Event;
use crate::allocator::canary::Canary;

use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue, Signal};
use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue};
use super::push_pull::{Pusher, PullerInner};

/// Builds an instance of a TcpAllocator.
Expand All @@ -24,12 +23,11 @@ use super::push_pull::{Pusher, PullerInner};
/// shared between threads here, and then provide a method that will instantiate the non-movable
/// members once in the destination thread.
pub struct TcpBuilder<A: AllocateBuilder> {
inner: A,
index: usize, // number out of peers
peers: usize, // number of peer allocators.
sends: Vec<MergeQueue>, // for pushing bytes at remote processes.
recvs: Vec<MergeQueue>, // for pulling bytes from remote processes.
signal: Signal,
inner: A,
index: usize, // number out of peers
peers: usize, // number of peer allocators.
futures: Vec<Receiver<MergeQueue>>, // to receive queues to each network thread.
promises: Vec<Sender<MergeQueue>>, // to send queues from each network thread.
}

/// Creates a vector of builders, sharing appropriate state.
Expand All @@ -44,77 +42,72 @@ pub struct TcpBuilder<A: AllocateBuilder> {
/// info to spawn ingress comm thresds,
/// )
/// ```
pub fn new_vector(
pub fn new_vector<A: AllocateBuilder>(
allocators: Vec<A>,
my_process: usize,
threads: usize,
processes: usize)
-> (Vec<TcpBuilder<ProcessBuilder>>,
Vec<(Vec<MergeQueue>, Signal)>,
Vec<Vec<MergeQueue>>) {
-> (Vec<TcpBuilder<A>>,
Vec<Vec<Sender<MergeQueue>>>,
Vec<Vec<Receiver<MergeQueue>>>)
{
let threads = allocators.len();

// The results are a vector of builders, as well as the necessary shared state to build each
// of the send and receive communication threads, respectively.

// One signal per local destination worker thread
let worker_signals: Vec<Signal> = (0 .. threads).map(|_| Signal::new()).collect();

// One signal per destination egress communication thread
let network_signals: Vec<Signal> = (0 .. processes-1).map(|_| Signal::new()).collect();

let worker_to_network: Vec<Vec<_>> = (0 .. threads).map(|_| (0 .. processes-1).map(|p| MergeQueue::new(network_signals[p].clone())).collect()).collect();
let network_to_worker: Vec<Vec<_>> = (0 .. processes-1).map(|_| (0 .. threads).map(|t| MergeQueue::new(worker_signals[t].clone())).collect()).collect();

let worker_from_network: Vec<Vec<_>> = (0 .. threads).map(|t| (0 .. processes-1).map(|p| network_to_worker[p][t].clone()).collect()).collect();
let network_from_worker: Vec<Vec<_>> = (0 .. processes-1).map(|p| (0 .. threads).map(|t| worker_to_network[t][p].clone()).collect()).collect();
// For queues from worker threads to network threads, and vice versa.
let (network_promises, worker_futures) = crate::promise_futures(processes-1, threads);
let (worker_promises, network_futures) = crate::promise_futures(threads, processes-1);

let builders =
Process::new_vector(threads) // Vec<Process> (Process is Allocate)
allocators
.into_iter()
.zip(worker_signals)
.zip(worker_to_network)
.zip(worker_from_network)
.zip(worker_promises)
.zip(worker_futures)
.enumerate()
.map(|(index, (((inner, signal), sends), recvs))| {
// sends are handles to MergeQueues to remote processes
// (one per remote process)
// recvs are handles to MergeQueues from remote processes
// (one per remote process)
.map(|(index, ((inner, promises), futures))| {
TcpBuilder {
inner,
index: my_process * threads + index,
peers: threads * processes,
sends,
recvs,
signal,
promises,
futures,
}})
.collect();

// for each egress communicaton thread, construct the tuple (MergeQueues from local
// threads, corresponding signal)
let sends = network_from_worker.into_iter().zip(network_signals).collect();

(/* AllocateBuilder for local threads */ builders,
/* info to spawn egress comm threads */ sends,
/* info to spawn ingress comm thresds */ network_to_worker)
(builders, network_promises, network_futures)
}

impl<A: AllocateBuilder> TcpBuilder<A> {

/// Builds a `TcpAllocator`, instantiating `Rc<RefCell<_>>` elements.
pub fn build(self) -> TcpAllocator<A::Allocator> {

let sends: Vec<_> = self.sends.into_iter().map(
|send| Rc::new(RefCell::new(SendEndpoint::new(send)))).collect();
// Fulfill puller obligations.
let mut recvs = Vec::with_capacity(self.peers);
for promise in self.promises.into_iter() {
let buzzer = crate::buzzer::Buzzer::new();
let queue = MergeQueue::new(buzzer);
promise.send(queue.clone()).expect("Failed to send MergeQueue");
recvs.push(queue.clone());
}

// Extract pusher commitments.
let mut sends = Vec::with_capacity(self.peers);
for pusher in self.futures.into_iter() {
let queue = pusher.recv().expect("Failed to receive push queue");
let sendpoint = SendEndpoint::new(queue);
sends.push(Rc::new(RefCell::new(sendpoint)));
}

// let sends: Vec<_> = self.sends.into_iter().map(
// |send| Rc::new(RefCell::new(SendEndpoint::new(send)))).collect();

TcpAllocator {
inner: self.inner.build(),
index: self.index,
peers: self.peers,
_signal: self.signal,
canaries: Rc::new(RefCell::new(Vec::new())),
staged: Vec::new(),
sends,
recvs: self.recvs,
recvs,
to_local: HashMap::new(),
}
}
Expand All @@ -128,8 +121,6 @@ pub struct TcpAllocator<A: Allocate> {
index: usize, // number out of peers
peers: usize, // number of peer allocators (for typed channel allocation).

_signal: Signal,

staged: Vec<Bytes>, // staging area for incoming Bytes
canaries: Rc<RefCell<Vec<usize>>>,

Expand Down Expand Up @@ -260,4 +251,7 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {
fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> {
self.inner.events()
}
fn await_events(&self, duration: Option<std::time::Duration>) {
self.inner.await_events(duration);
}
}
Loading

0 comments on commit ef2f213

Please sign in to comment.