Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Further step_or_park implementations [WIP] #268

Merged
merged 8 commits into from
May 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

All notable changes to this project will be documented in this file.

## Unreleased

### Added

A `Worker` now has a `step_or_park(Option<Duration>)` method, which instructs the worker to take a step and gives it permission to part the worker thread for at most the supplied timeout if there is no work to perform. A value of `None` implies no timeout (unboundedly parked) whereas a value of `Some(0)` should return immediately. The communication layers are implemented to awaken workers if they receive new communications, and workers should hand out copies of their `Thread` if they want other threads to wake them for other reasons (e.g. queues from threads external to timely).

## 0.9.0

### Added
Expand Down
5 changes: 3 additions & 2 deletions communication/src/allocator/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,11 @@ impl<T, P: Push<T>> Push<T> for ArcPusher<T, P> {
// else {
// self.count += 1;
// }
let _ =
self.events
.send((self.index, Event::Pushed(1)))
.send((self.index, Event::Pushed(1)));
// TODO : Perhaps this shouldn't be a fatal error (e.g. in shutdown).
.expect("Failed to send message count");
// .expect("Failed to send message count");

self.pusher.push(element)
}
Expand Down
8 changes: 8 additions & 0 deletions communication/src/allocator/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,14 @@ impl Allocate for Generic {
fn receive(&mut self) { self.receive(); }
fn release(&mut self) { self.release(); }
fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> { self.events() }
fn await_events(&self, _duration: Option<std::time::Duration>) {
match self {
&Generic::Thread(ref t) => t.await_events(_duration),
&Generic::Process(ref p) => p.await_events(_duration),
&Generic::ProcessBinary(ref pb) => pb.await_events(_duration),
&Generic::ZeroCopy(ref z) => z.await_events(_duration),
}
}
}


Expand Down
43 changes: 39 additions & 4 deletions communication/src/allocator/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ use std::cell::RefCell;
use std::sync::{Arc, Mutex};
use std::any::Any;
use std::sync::mpsc::{Sender, Receiver, channel};
use std::time::Duration;
use std::collections::{HashMap, VecDeque};

use crate::allocator::thread::{ThreadBuilder};
use crate::allocator::{Allocate, AllocateBuilder, Event, Thread};
use crate::{Push, Pull, Message};
use crate::buzzer::Buzzer;

/// An allocator for inter-thread, intra-process communication
pub struct ProcessBuilder {
Expand All @@ -19,18 +21,34 @@ pub struct ProcessBuilder {
// below: `Box<Any+Send>` is a `Box<Vec<Option<(Vec<Sender<T>>, Receiver<T>)>>>`
channels: Arc<Mutex<HashMap<usize, Box<Any+Send>>>>,

// Buzzers for waking other local workers.
buzzers_send: Vec<Sender<Buzzer>>,
buzzers_recv: Vec<Receiver<Buzzer>>,

counters_send: Vec<Sender<(usize, Event)>>,
counters_recv: Receiver<(usize, Event)>,
}

impl AllocateBuilder for ProcessBuilder {
type Allocator = Process;
fn build(self) -> Self::Allocator {

// Initialize buzzers; send first, then recv.
for worker in self.buzzers_send.iter() {
let buzzer = Buzzer::new();
worker.send(buzzer).expect("Failed to send buzzer");
}
let mut buzzers = Vec::new();
for worker in self.buzzers_recv.iter() {
buzzers.push(worker.recv().expect("Failed to recv buzzer"));
}

Process {
inner: self.inner.build(),
index: self.index,
peers: self.peers,
channels: self.channels,
buzzers,
counters_send: self.counters_send,
counters_recv: self.counters_recv,
}
Expand All @@ -44,6 +62,7 @@ pub struct Process {
peers: usize,
// below: `Box<Any+Send>` is a `Box<Vec<Option<(Vec<Sender<T>>, Receiver<T>)>>>`
channels: Arc<Mutex<HashMap</* channel id */ usize, Box<Any+Send>>>>,
buzzers: Vec<Buzzer>,
counters_send: Vec<Sender<(usize, Event)>>,
counters_recv: Receiver<(usize, Event)>,
}
Expand All @@ -64,14 +83,21 @@ impl Process {

let channels = Arc::new(Mutex::new(HashMap::new()));

// Allocate matrix of buzzer send and recv endpoints.
let (buzzers_send, buzzers_recv) = crate::promise_futures(peers, peers);

counters_recv
.into_iter()
.zip(buzzers_send.into_iter())
.zip(buzzers_recv.into_iter())
.enumerate()
.map(|(index, recv)| {
.map(|(index, ((recv, bsend), brecv))| {
ProcessBuilder {
inner: ThreadBuilder,
index,
peers,
buzzers_send: bsend,
buzzers_recv: brecv,
channels: channels.clone(),
counters_send: counters_send.clone(),
counters_recv: recv,
Expand Down Expand Up @@ -99,10 +125,10 @@ impl Allocate for Process {

let mut pushers = Vec::new();
let mut pullers = Vec::new();
for _ in 0..self.peers {
for index in 0 .. self.peers {

let (s, r): (Sender<Message<T>>, Receiver<Message<T>>) = channel();
pushers.push(Pusher { target: s });
pushers.push(Pusher { target: s, buzzer: self.buzzers[index].clone() });
pullers.push(Puller { source: r, current: None });
}

Expand Down Expand Up @@ -152,6 +178,10 @@ impl Allocate for Process {
self.inner.events()
}

fn await_events(&self, duration: Option<Duration>) {
self.inner.await_events(duration);
}

fn receive(&mut self) {
let mut events = self.inner.events().borrow_mut();
while let Ok((index, event)) = self.counters_recv.try_recv() {
Expand All @@ -163,18 +193,23 @@ impl Allocate for Process {
/// The push half of an intra-process channel.
struct Pusher<T> {
target: Sender<T>,
buzzer: Buzzer,
}

impl<T> Clone for Pusher<T> {
fn clone(&self) -> Self {
Pusher { target: self.target.clone() }
Self {
target: self.target.clone(),
buzzer: self.buzzer.clone()
}
}
}

impl<T> Push<T> for Pusher<T> {
#[inline] fn push(&mut self, element: &mut Option<T>) {
if let Some(element) = element.take() {
self.target.send(element).unwrap();
self.buzzer.buzz();
}
}
}
Expand Down
102 changes: 48 additions & 54 deletions communication/src/allocator/zero_copy/allocator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,18 @@
use std::rc::Rc;
use std::cell::RefCell;
use std::collections::{VecDeque, HashMap};
// use std::sync::mpsc::{channel, Sender, Receiver};
use std::sync::mpsc::{Sender, Receiver};

use bytes::arc::Bytes;

use crate::networking::MessageHeader;

use crate::{Allocate, Message, Data, Push, Pull};
use crate::allocator::AllocateBuilder;
use crate::allocator::{Event, Process};
use crate::allocator::process::ProcessBuilder;
use crate::allocator::Event;
use crate::allocator::canary::Canary;

use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue, Signal};
use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue};
use super::push_pull::{Pusher, PullerInner};

/// Builds an instance of a TcpAllocator.
Expand All @@ -24,12 +23,11 @@ use super::push_pull::{Pusher, PullerInner};
/// shared between threads here, and then provide a method that will instantiate the non-movable
/// members once in the destination thread.
pub struct TcpBuilder<A: AllocateBuilder> {
inner: A,
index: usize, // number out of peers
peers: usize, // number of peer allocators.
sends: Vec<MergeQueue>, // for pushing bytes at remote processes.
recvs: Vec<MergeQueue>, // for pulling bytes from remote processes.
signal: Signal,
inner: A,
index: usize, // number out of peers
peers: usize, // number of peer allocators.
futures: Vec<Receiver<MergeQueue>>, // to receive queues to each network thread.
promises: Vec<Sender<MergeQueue>>, // to send queues from each network thread.
}

/// Creates a vector of builders, sharing appropriate state.
Expand All @@ -44,77 +42,72 @@ pub struct TcpBuilder<A: AllocateBuilder> {
/// info to spawn ingress comm thresds,
/// )
/// ```
pub fn new_vector(
pub fn new_vector<A: AllocateBuilder>(
allocators: Vec<A>,
my_process: usize,
threads: usize,
processes: usize)
-> (Vec<TcpBuilder<ProcessBuilder>>,
Vec<(Vec<MergeQueue>, Signal)>,
Vec<Vec<MergeQueue>>) {
-> (Vec<TcpBuilder<A>>,
Vec<Vec<Sender<MergeQueue>>>,
Vec<Vec<Receiver<MergeQueue>>>)
{
let threads = allocators.len();

// The results are a vector of builders, as well as the necessary shared state to build each
// of the send and receive communication threads, respectively.

// One signal per local destination worker thread
let worker_signals: Vec<Signal> = (0 .. threads).map(|_| Signal::new()).collect();

// One signal per destination egress communication thread
let network_signals: Vec<Signal> = (0 .. processes-1).map(|_| Signal::new()).collect();

let worker_to_network: Vec<Vec<_>> = (0 .. threads).map(|_| (0 .. processes-1).map(|p| MergeQueue::new(network_signals[p].clone())).collect()).collect();
let network_to_worker: Vec<Vec<_>> = (0 .. processes-1).map(|_| (0 .. threads).map(|t| MergeQueue::new(worker_signals[t].clone())).collect()).collect();

let worker_from_network: Vec<Vec<_>> = (0 .. threads).map(|t| (0 .. processes-1).map(|p| network_to_worker[p][t].clone()).collect()).collect();
let network_from_worker: Vec<Vec<_>> = (0 .. processes-1).map(|p| (0 .. threads).map(|t| worker_to_network[t][p].clone()).collect()).collect();
// For queues from worker threads to network threads, and vice versa.
let (network_promises, worker_futures) = crate::promise_futures(processes-1, threads);
let (worker_promises, network_futures) = crate::promise_futures(threads, processes-1);

let builders =
Process::new_vector(threads) // Vec<Process> (Process is Allocate)
allocators
.into_iter()
.zip(worker_signals)
.zip(worker_to_network)
.zip(worker_from_network)
.zip(worker_promises)
.zip(worker_futures)
.enumerate()
.map(|(index, (((inner, signal), sends), recvs))| {
// sends are handles to MergeQueues to remote processes
// (one per remote process)
// recvs are handles to MergeQueues from remote processes
// (one per remote process)
.map(|(index, ((inner, promises), futures))| {
TcpBuilder {
inner,
index: my_process * threads + index,
peers: threads * processes,
sends,
recvs,
signal,
promises,
futures,
}})
.collect();

// for each egress communicaton thread, construct the tuple (MergeQueues from local
// threads, corresponding signal)
let sends = network_from_worker.into_iter().zip(network_signals).collect();

(/* AllocateBuilder for local threads */ builders,
/* info to spawn egress comm threads */ sends,
/* info to spawn ingress comm thresds */ network_to_worker)
(builders, network_promises, network_futures)
}

impl<A: AllocateBuilder> TcpBuilder<A> {

/// Builds a `TcpAllocator`, instantiating `Rc<RefCell<_>>` elements.
pub fn build(self) -> TcpAllocator<A::Allocator> {

let sends: Vec<_> = self.sends.into_iter().map(
|send| Rc::new(RefCell::new(SendEndpoint::new(send)))).collect();
// Fulfill puller obligations.
let mut recvs = Vec::with_capacity(self.peers);
for promise in self.promises.into_iter() {
let buzzer = crate::buzzer::Buzzer::new();
let queue = MergeQueue::new(buzzer);
promise.send(queue.clone()).expect("Failed to send MergeQueue");
recvs.push(queue.clone());
}

// Extract pusher commitments.
let mut sends = Vec::with_capacity(self.peers);
for pusher in self.futures.into_iter() {
let queue = pusher.recv().expect("Failed to receive push queue");
let sendpoint = SendEndpoint::new(queue);
sends.push(Rc::new(RefCell::new(sendpoint)));
}

// let sends: Vec<_> = self.sends.into_iter().map(
// |send| Rc::new(RefCell::new(SendEndpoint::new(send)))).collect();

TcpAllocator {
inner: self.inner.build(),
index: self.index,
peers: self.peers,
_signal: self.signal,
canaries: Rc::new(RefCell::new(Vec::new())),
staged: Vec::new(),
sends,
recvs: self.recvs,
recvs,
to_local: HashMap::new(),
}
}
Expand All @@ -128,8 +121,6 @@ pub struct TcpAllocator<A: Allocate> {
index: usize, // number out of peers
peers: usize, // number of peer allocators (for typed channel allocation).

_signal: Signal,

staged: Vec<Bytes>, // staging area for incoming Bytes
canaries: Rc<RefCell<Vec<usize>>>,

Expand Down Expand Up @@ -260,4 +251,7 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {
fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> {
self.inner.events()
}
fn await_events(&self, duration: Option<std::time::Duration>) {
self.inner.await_events(duration);
}
}
Loading