Skip to content

Commit

Permalink
Avoid temporary allocations by improved APIs (#580)
Browse files Browse the repository at this point in the history
* Avoid temporary allocations by improved APIs

Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Sep 6, 2024
1 parent 0da150a commit adbaf2d
Show file tree
Hide file tree
Showing 21 changed files with 61 additions and 49 deletions.
2 changes: 1 addition & 1 deletion mdbook/src/chapter_2/chapter_2_4.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ fn main() {

// Acquire a re-activator for this operator.
use timely::scheduling::Scheduler;
let activator = scope.activator_for(&info.address[..]);
let activator = scope.activator_for(info.address);

let mut cap = Some(capability);
move |output| {
Expand Down
6 changes: 3 additions & 3 deletions timely/src/dataflow/channels/pact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub trait ParallelizationContract<T, C> {
/// Type implementing `Pull` produced by this pact.
type Puller: Pull<Bundle<T, C>>+'static;
/// Allocates a matched pair of push and pull endpoints implementing the pact.
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option<Logger>) -> (Self::Pusher, Self::Puller);
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Vec<usize>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller);
}

/// A direct connection
Expand All @@ -36,7 +36,7 @@ pub struct Pipeline;
impl<T: 'static, C: Container> ParallelizationContract<T, C> for Pipeline {
type Pusher = LogPusher<T, C, ThreadPusher<Bundle<T, C>>>;
type Puller = LogPuller<T, C, ThreadPuller<Bundle<T, C>>>;
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Vec<usize>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
let (pusher, puller) = allocator.pipeline::<Message<T, C>>(identifier, address);
(LogPusher::new(pusher, allocator.index(), allocator.index(), identifier, logging.clone()),
LogPuller::new(puller, allocator.index(), identifier, logging))
Expand Down Expand Up @@ -72,7 +72,7 @@ where
type Pusher = ExchangePusher<T, C, LogPusher<T, C, Box<dyn Push<Bundle<T, C>>>>, H>;
type Puller = LogPuller<T, C, Box<dyn Pull<Bundle<T, C>>>>;

fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Vec<usize>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
let (senders, receiver) = allocator.allocate::<Message<T, C>>(identifier, address);
let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::<Vec<_>>();
(ExchangePusher::new(senders, self.hash_func), LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))
Expand Down
4 changes: 2 additions & 2 deletions timely/src/dataflow/operators/capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,10 +347,10 @@ impl<T: Timestamp> CapabilityTrait<T> for ActivateCapability<T> {

impl<T: Timestamp> ActivateCapability<T> {
/// Creates a new activating capability.
pub fn new(capability: Capability<T>, address: &[usize], activations: Rc<RefCell<Activations>>) -> Self {
pub fn new(capability: Capability<T>, address: Vec<usize>, activations: Rc<RefCell<Activations>>) -> Self {
Self {
capability,
address: Rc::new(address.to_vec()),
address: Rc::new(address),
activations,
}
}
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/core/capture/replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ where
let mut builder = OperatorBuilder::new("Replay".to_owned(), scope.clone());

let address = builder.operator_info().address;
let activator = scope.activator_for(&address[..]);
let activator = scope.activator_for(address);

let (targets, stream) = builder.new_output();

Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/core/enterleave.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl<G: Scope, T: Timestamp+Refines<G::Timestamp>, C: Data+Container> Enter<G, T
let ingress = IngressNub {
targets: Counter::new(targets),
phantom: PhantomData,
activator: scope.activator_for(&scope.addr()),
activator: scope.activator_for(scope.addr()),
active: false,
};
let produced = ingress.targets.produced().clone();
Expand Down
5 changes: 2 additions & 3 deletions timely/src/dataflow/operators/core/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,9 @@ impl<G: Scope> Input for G where <G as ScopeParent>::Timestamp: TotalOrder {
let produced = counter.produced().clone();

let index = self.allocate_operator_index();
let mut address = self.addr();
address.push(index);
let address = self.addr_for_child(index);

handle.activate.push(self.activator_for(&address[..]));
handle.activate.push(self.activator_for(address.clone()));

let progress = Rc::new(RefCell::new(ChangeBatch::new()));

Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/core/to_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl<CB: ContainerBuilder, I: IntoIterator+'static> ToStreamBuilder<CB> for I wh
source::<_, CB, _, _>(scope, "ToStreamBuilder", |capability, info| {

// Acquire an activator, so that the operator can rescheduled itself.
let activator = scope.activator_for(&info.address[..]);
let activator = scope.activator_for(info.address);

let mut iterator = self.into_iter().fuse();
let mut capability = Some(capability);
Expand Down
5 changes: 2 additions & 3 deletions timely/src/dataflow/operators/core/unordered_input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,9 @@ impl<G: Scope> UnorderedInput<G> for G {
let peers = self.peers();

let index = self.allocate_operator_index();
let mut address = self.addr();
address.push(index);
let address = self.addr_for_child(index);

let cap = ActivateCapability::new(cap, &address, self.activations());
let cap = ActivateCapability::new(cap, address.clone(), self.activations());

let helper = UnorderedHandle::new(counter);

Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/flow_controlled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ pub fn iterator_source<
let mut target = G::Timestamp::minimum();
source(scope, name, |cap, info| {
let mut cap = Some(cap);
let activator = scope.activator_for(&info.address[..]);
let activator = scope.activator_for(info.address);
move |output| {
cap = cap.take().and_then(|mut cap| {
loop {
Expand Down
7 changes: 3 additions & 4 deletions timely/src/dataflow/operators/generic/builder_raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ impl<G: Scope> OperatorBuilder<G> {

let global = scope.new_identifier();
let index = scope.allocate_operator_index();
let mut address = scope.addr();
address.push(index);
let address = scope.addr_for_child(index);
let peers = scope.peers();

OperatorBuilder {
Expand Down Expand Up @@ -119,7 +118,7 @@ impl<G: Scope> OperatorBuilder<G> {

let channel_id = self.scope.new_identifier();
let logging = self.scope.logging();
let (sender, receiver) = pact.connect(&mut self.scope, channel_id, &self.address[..], logging);
let (sender, receiver) = pact.connect(&mut self.scope, channel_id, self.address.clone(), logging);
let target = Target::new(self.index, self.shape.inputs);
stream.connect_to(target, sender, channel_id);

Expand Down Expand Up @@ -175,7 +174,7 @@ impl<G: Scope> OperatorBuilder<G> {

/// Information describing the operator.
pub fn operator_info(&self) -> OperatorInfo {
OperatorInfo::new(self.index, self.global, &self.address[..])
OperatorInfo::new(self.index, self.global, self.address.clone())
}
}

Expand Down
4 changes: 2 additions & 2 deletions timely/src/dataflow/operators/generic/builder_rc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl<G: Scope> OperatorBuilder<G> {
where
P: ParallelizationContract<G::Timestamp, C> {

let connection = vec![Antichain::from_elem(Default::default()); self.builder.shape().outputs()];
let connection = (0..self.builder.shape().outputs()).map(|_| Antichain::from_elem(Default::default())).collect();
self.new_input_connection(stream, pact, connection)
}

Expand Down Expand Up @@ -94,7 +94,7 @@ impl<G: Scope> OperatorBuilder<G> {

/// Adds a new output to a generic operator builder, returning the `Push` implementor to use.
pub fn new_output<CB: ContainerBuilder>(&mut self) -> (OutputWrapper<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>, StreamCore<G, CB::Container>) {
let connection = vec![Antichain::from_elem(Default::default()); self.builder.shape().inputs()];
let connection = (0..self.builder.shape().inputs()).map(|_| Antichain::from_elem(Default::default())).collect();
self.new_output_connection(connection)
}

Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/generic/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ impl<G: Scope, C1: Container> Operator<G, C1> for StreamCore<G, C1> {
///
/// source(scope, "Source", |capability, info| {
///
/// let activator = scope.activator_for(&info.address[..]);
/// let activator = scope.activator_for(info.address);
///
/// let mut cap = Some(capability);
/// move |output| {
Expand Down
4 changes: 2 additions & 2 deletions timely/src/dataflow/operators/generic/operator_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ pub struct OperatorInfo {

impl OperatorInfo {
/// Construct a new `OperatorInfo`.
pub fn new(local_id: usize, global_id: usize, address: &[usize]) -> OperatorInfo {
pub fn new(local_id: usize, global_id: usize, address: Vec<usize>) -> OperatorInfo {
OperatorInfo {
local_id,
global_id,
address: address.to_vec(),
address,
}
}
}
13 changes: 11 additions & 2 deletions timely/src/dataflow/scopes/child.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ where
fn config(&self) -> &Config { self.parent.config() }
fn index(&self) -> usize { self.parent.index() }
fn peers(&self) -> usize { self.parent.peers() }
fn allocate<D: Data>(&mut self, identifier: usize, address: &[usize]) -> (Vec<Box<dyn Push<Message<D>>>>, Box<dyn Pull<Message<D>>>) {
fn allocate<D: Data>(&mut self, identifier: usize, address: Vec<usize>) -> (Vec<Box<dyn Push<Message<D>>>>, Box<dyn Pull<Message<D>>>) {
self.parent.allocate(identifier, address)
}
fn pipeline<D: 'static>(&mut self, identifier: usize, address: &[usize]) -> (ThreadPusher<Message<D>>, ThreadPuller<Message<D>>) {
fn pipeline<D: 'static>(&mut self, identifier: usize, address: Vec<usize>) -> (ThreadPusher<Message<D>>, ThreadPuller<Message<D>>) {
self.parent.pipeline(identifier, address)
}
fn new_identifier(&mut self) -> usize {
Expand Down Expand Up @@ -97,6 +97,15 @@ where
{
fn name(&self) -> String { self.subgraph.borrow().name.clone() }
fn addr(&self) -> Vec<usize> { self.subgraph.borrow().path.clone() }

fn addr_for_child(&self, index: usize) -> Vec<usize> {
let path = &self.subgraph.borrow().path[..];
let mut addr = Vec::with_capacity(path.len() + 1);
addr.extend_from_slice(path);
addr.push(index);
addr
}

fn add_edge(&self, source: Source, target: Target) {
self.subgraph.borrow_mut().connect(source, target);
}
Expand Down
4 changes: 4 additions & 0 deletions timely/src/dataflow/scopes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ pub trait Scope: ScopeParent {
/// A sequence of scope identifiers describing the path from the worker root to this scope.
fn addr(&self) -> Vec<usize>;

/// A sequence of scope identifiers describing the path from the worker root to the child
/// indicated by `index`.
fn addr_for_child(&self, index: usize) -> Vec<usize>;

/// Connects a source of data with a target of the data. This only links the two for
/// the purposes of tracking progress, rather than effect any data movement itself.
fn add_edge(&self, source: Source, target: Target);
Expand Down
5 changes: 2 additions & 3 deletions timely/src/progress/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,15 @@ pub struct Progcaster<T:Timestamp> {

impl<T:Timestamp+Send> Progcaster<T> {
/// Creates a new `Progcaster` using a channel from the supplied worker.
pub fn new<A: crate::worker::AsWorker>(worker: &mut A, path: &Vec<usize>, mut logging: Option<Logger>, progress_logging: Option<ProgressLogger>) -> Progcaster<T> {
pub fn new<A: crate::worker::AsWorker>(worker: &mut A, addr: Vec<usize>, mut logging: Option<Logger>, progress_logging: Option<ProgressLogger>) -> Progcaster<T> {

let channel_identifier = worker.new_identifier();
let (pushers, puller) = worker.allocate(channel_identifier, &path[..]);
let (pushers, puller) = worker.allocate(channel_identifier, addr.clone());
logging.as_mut().map(|l| l.log(crate::logging::CommChannelsEvent {
identifier: channel_identifier,
kind: crate::logging::CommChannelKind::Progress,
}));
let worker_index = worker.index();
let addr = path.clone();
Progcaster {
to_push: None,
pushers,
Expand Down
15 changes: 9 additions & 6 deletions timely/src/progress/subgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,16 @@ where

/// Adds a new child to the subgraph.
pub fn add_child(&mut self, child: Box<dyn Operate<TInner>>, index: usize, identifier: usize) {
{
let mut child_path = self.path.clone();
if let Some(l) = &mut self.logging {
let mut child_path = Vec::with_capacity(self.path.len() + 1);
child_path.extend_from_slice(&self.path[..]);
child_path.push(index);
self.logging.as_mut().map(|l| l.log(crate::logging::OperatesEvent {

l.log(crate::logging::OperatesEvent {
id: identifier,
addr: child_path,
name: child.name().to_owned(),
}));
});
}
self.children.push(PerOperatorState::new(child, index, self.path.clone(), identifier, self.logging.clone()))
}
Expand All @@ -163,7 +165,8 @@ where
let mut builder = reachability::Builder::new();

// Child 0 has `inputs` outputs and `outputs` inputs, not yet connected.
builder.add_node(0, outputs, inputs, vec![vec![Antichain::new(); inputs]; outputs]);
let summary = (0..outputs).map(|_| (0..inputs).map(|_| Antichain::new()).collect()).collect();
builder.add_node(0, outputs, inputs, summary);
for (index, child) in self.children.iter().enumerate().skip(1) {
builder.add_node(index, child.inputs, child.outputs, child.internal_summary.clone());
}
Expand All @@ -181,7 +184,7 @@ where
.map(|logger| reachability::logging::TrackerLogger::new(path, logger));
let (tracker, scope_summary) = builder.build(reachability_logging);

let progcaster = Progcaster::new(worker, &self.path, self.logging.clone(), self.progress_logging.clone());
let progcaster = Progcaster::new(worker, self.path.clone(), self.logging.clone(), self.progress_logging.clone());

let mut incomplete = vec![true; self.children.len()];
incomplete[0] = false;
Expand Down
8 changes: 4 additions & 4 deletions timely/src/scheduling/activate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,9 @@ pub struct Activator {

impl Activator {
/// Creates a new activation handle
pub fn new(path: &[usize], queue: Rc<RefCell<Activations>>) -> Self {
pub fn new(path: Vec<usize>, queue: Rc<RefCell<Activations>>) -> Self {
Self {
path: path.to_vec(),
path,
queue,
}
}
Expand Down Expand Up @@ -259,9 +259,9 @@ pub struct SyncActivator {

impl SyncActivator {
/// Creates a new thread-safe activation handle.
pub fn new(path: &[usize], queue: SyncActivations) -> Self {
pub fn new(path: Vec<usize>, queue: SyncActivations) -> Self {
Self {
path: path.to_vec(),
path,
queue,
}
}
Expand Down
4 changes: 2 additions & 2 deletions timely/src/scheduling/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ pub trait Scheduler {
fn activations(&self) -> Rc<RefCell<Activations>>;

/// Constructs an `Activator` tied to the specified operator address.
fn activator_for(&self, path: &[usize]) -> Activator {
fn activator_for(&self, path: Vec<usize>) -> Activator {
Activator::new(path, self.activations())
}

/// Constructs a `SyncActivator` tied to the specified operator address.
fn sync_activator_for(&self, path: &[usize]) -> SyncActivator {
fn sync_activator_for(&self, path: Vec<usize>) -> SyncActivator {
let sync_activations = self.activations().borrow().sync();
SyncActivator::new(path, sync_activations)
}
Expand Down
2 changes: 1 addition & 1 deletion timely/src/synchronization/sequence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl<T: ExchangeData> Sequencer<T> {
activator_source
.borrow_mut()
.replace(CatchupActivator {
activator: scope.activator_for(&info.address[..]),
activator: scope.activator_for(info.address),
catchup_until: None,
});

Expand Down
12 changes: 6 additions & 6 deletions timely/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,12 @@ pub trait AsWorker : Scheduler {
/// scheduled in response to the receipt of records on the channel.
/// Most commonly, this would be the address of the *target* of the
/// channel.
fn allocate<T: Data>(&mut self, identifier: usize, address: &[usize]) -> (Vec<Box<dyn Push<Message<T>>>>, Box<dyn Pull<Message<T>>>);
fn allocate<T: Data>(&mut self, identifier: usize, address: Vec<usize>) -> (Vec<Box<dyn Push<Message<T>>>>, Box<dyn Pull<Message<T>>>);
/// Constructs a pipeline channel from the worker to itself.
///
/// By default this method uses the native channel allocation mechanism, but the expectation is
/// that this behavior will be overriden to be more efficient.
fn pipeline<T: 'static>(&mut self, identifier: usize, address: &[usize]) -> (ThreadPusher<Message<T>>, ThreadPuller<Message<T>>);
fn pipeline<T: 'static>(&mut self, identifier: usize, address: Vec<usize>) -> (ThreadPusher<Message<T>>, ThreadPuller<Message<T>>);

/// Allocates a new worker-unique identifier.
fn new_identifier(&mut self) -> usize;
Expand Down Expand Up @@ -231,17 +231,17 @@ impl<A: Allocate> AsWorker for Worker<A> {
fn config(&self) -> &Config { &self.config }
fn index(&self) -> usize { self.allocator.borrow().index() }
fn peers(&self) -> usize { self.allocator.borrow().peers() }
fn allocate<D: Data>(&mut self, identifier: usize, address: &[usize]) -> (Vec<Box<dyn Push<Message<D>>>>, Box<dyn Pull<Message<D>>>) {
fn allocate<D: Data>(&mut self, identifier: usize, address: Vec<usize>) -> (Vec<Box<dyn Push<Message<D>>>>, Box<dyn Pull<Message<D>>>) {
if address.is_empty() { panic!("Unacceptable address: Length zero"); }
let mut paths = self.paths.borrow_mut();
paths.insert(identifier, address.to_vec());
paths.insert(identifier, address);
self.temp_channel_ids.borrow_mut().push(identifier);
self.allocator.borrow_mut().allocate(identifier)
}
fn pipeline<T: 'static>(&mut self, identifier: usize, address: &[usize]) -> (ThreadPusher<Message<T>>, ThreadPuller<Message<T>>) {
fn pipeline<T: 'static>(&mut self, identifier: usize, address: Vec<usize>) -> (ThreadPusher<Message<T>>, ThreadPuller<Message<T>>) {
if address.is_empty() { panic!("Unacceptable address: Length zero"); }
let mut paths = self.paths.borrow_mut();
paths.insert(identifier, address.to_vec());
paths.insert(identifier, address);
self.temp_channel_ids.borrow_mut().push(identifier);
self.allocator.borrow_mut().pipeline(identifier)
}
Expand Down

0 comments on commit adbaf2d

Please sign in to comment.