Skip to content

Commit

Permalink
Shared reference-counted operator path (#582)
Browse files Browse the repository at this point in the history
* Shared reference-counted operator path

Encapsulate the path for an operator as a `Rc<[usize]>` that can be shared
among all interested parties, instead of passing owned vectors around. This
avoids some memory allocations for rendering dataflow graphs.

Signed-off-by: Moritz Hoffmann <[email protected]>

---------

Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru authored Sep 6, 2024
1 parent adbaf2d commit ca17c86
Show file tree
Hide file tree
Showing 15 changed files with 54 additions and 52 deletions.
7 changes: 4 additions & 3 deletions timely/src/dataflow/channels/pact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
//! The progress tracking logic assumes that this number is independent of the pact used.

use std::{fmt::{self, Debug}, marker::PhantomData};
use std::rc::Rc;

use crate::Container;
use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
Expand All @@ -26,7 +27,7 @@ pub trait ParallelizationContract<T, C> {
/// Type implementing `Pull` produced by this pact.
type Puller: Pull<Bundle<T, C>>+'static;
/// Allocates a matched pair of push and pull endpoints implementing the pact.
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Vec<usize>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller);
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller);
}

/// A direct connection
Expand All @@ -36,7 +37,7 @@ pub struct Pipeline;
impl<T: 'static, C: Container> ParallelizationContract<T, C> for Pipeline {
type Pusher = LogPusher<T, C, ThreadPusher<Bundle<T, C>>>;
type Puller = LogPuller<T, C, ThreadPuller<Bundle<T, C>>>;
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Vec<usize>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
let (pusher, puller) = allocator.pipeline::<Message<T, C>>(identifier, address);
(LogPusher::new(pusher, allocator.index(), allocator.index(), identifier, logging.clone()),
LogPuller::new(puller, allocator.index(), identifier, logging))
Expand Down Expand Up @@ -72,7 +73,7 @@ where
type Pusher = ExchangePusher<T, C, LogPusher<T, C, Box<dyn Push<Bundle<T, C>>>>, H>;
type Puller = LogPuller<T, C, Box<dyn Pull<Bundle<T, C>>>>;

fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Vec<usize>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
let (senders, receiver) = allocator.allocate::<Message<T, C>>(identifier, address);
let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::<Vec<_>>();
(ExchangePusher::new(senders, self.hash_func), LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))
Expand Down
6 changes: 3 additions & 3 deletions timely/src/dataflow/operators/capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ impl<T: Timestamp> Debug for InputCapability<T> {
#[derive(Clone, Debug)]
pub struct ActivateCapability<T: Timestamp> {
pub(crate) capability: Capability<T>,
pub(crate) address: Rc<Vec<usize>>,
pub(crate) address: Rc<[usize]>,
pub(crate) activations: Rc<RefCell<Activations>>,
}

Expand All @@ -347,10 +347,10 @@ impl<T: Timestamp> CapabilityTrait<T> for ActivateCapability<T> {

impl<T: Timestamp> ActivateCapability<T> {
/// Creates a new activating capability.
pub fn new(capability: Capability<T>, address: Vec<usize>, activations: Rc<RefCell<Activations>>) -> Self {
pub fn new(capability: Capability<T>, address: Rc<[usize]>, activations: Rc<RefCell<Activations>>) -> Self {
Self {
capability,
address: Rc::new(address),
address,
activations,
}
}
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/core/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ impl<G: Scope> Input for G where <G as ScopeParent>::Timestamp: TotalOrder {
#[derive(Debug)]
struct Operator<T:Timestamp> {
name: String,
address: Vec<usize>,
address: Rc<[usize]>,
shared_progress: Rc<RefCell<SharedProgress<T>>>,
progress: Rc<RefCell<ChangeBatch<T>>>, // times closed since last asked
messages: Rc<RefCell<ChangeBatch<T>>>, // messages sent since last asked
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/core/unordered_input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl<G: Scope> UnorderedInput<G> for G {

struct UnorderedOperator<T:Timestamp> {
name: String,
address: Vec<usize>,
address: Rc<[usize]>,
shared_progress: Rc<RefCell<SharedProgress<T>>>,
internal: Rc<RefCell<ChangeBatch<T>>>,
produced: Rc<RefCell<ChangeBatch<T>>>,
Expand Down
4 changes: 2 additions & 2 deletions timely/src/dataflow/operators/generic/builder_raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub struct OperatorBuilder<G: Scope> {
scope: G,
index: usize,
global: usize,
address: Vec<usize>, // path to the operator (ending with index).
address: Rc<[usize]>, // path to the operator (ending with index).
shape: OperatorShape,
summary: Vec<Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>>,
}
Expand Down Expand Up @@ -184,7 +184,7 @@ where
L: FnMut(&mut SharedProgress<T>)->bool+'static,
{
shape: OperatorShape,
address: Vec<usize>,
address: Rc<[usize]>,
logic: L,
shared_progress: Rc<RefCell<SharedProgress<T>>>,
activations: Rc<RefCell<Activations>>,
Expand Down
5 changes: 3 additions & 2 deletions timely/src/dataflow/operators/generic/operator_info.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::rc::Rc;

/// Information about the operator being constructed
#[derive(Clone)]
Expand All @@ -7,12 +8,12 @@ pub struct OperatorInfo {
/// Worker-unique identifier.
pub global_id: usize,
/// Operator address.
pub address: Vec<usize>,
pub address: Rc<[usize]>,
}

impl OperatorInfo {
/// Construct a new `OperatorInfo`.
pub fn new(local_id: usize, global_id: usize, address: Vec<usize>) -> OperatorInfo {
pub fn new(local_id: usize, global_id: usize, address: Rc<[usize]>) -> OperatorInfo {
OperatorInfo {
local_id,
global_id,
Expand Down
14 changes: 7 additions & 7 deletions timely/src/dataflow/scopes/child.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ where
fn config(&self) -> &Config { self.parent.config() }
fn index(&self) -> usize { self.parent.index() }
fn peers(&self) -> usize { self.parent.peers() }
fn allocate<D: Data>(&mut self, identifier: usize, address: Vec<usize>) -> (Vec<Box<dyn Push<Message<D>>>>, Box<dyn Pull<Message<D>>>) {
fn allocate<D: Data>(&mut self, identifier: usize, address: Rc<[usize]>) -> (Vec<Box<dyn Push<Message<D>>>>, Box<dyn Pull<Message<D>>>) {
self.parent.allocate(identifier, address)
}
fn pipeline<D: 'static>(&mut self, identifier: usize, address: Vec<usize>) -> (ThreadPusher<Message<D>>, ThreadPuller<Message<D>>) {
fn pipeline<D: 'static>(&mut self, identifier: usize, address: Rc<[usize]>) -> (ThreadPusher<Message<D>>, ThreadPuller<Message<D>>) {
self.parent.pipeline(identifier, address)
}
fn new_identifier(&mut self) -> usize {
Expand Down Expand Up @@ -96,14 +96,14 @@ where
T: Timestamp+Refines<G::Timestamp>,
{
fn name(&self) -> String { self.subgraph.borrow().name.clone() }
fn addr(&self) -> Vec<usize> { self.subgraph.borrow().path.clone() }
fn addr(&self) -> Rc<[usize]> { Rc::clone(&self.subgraph.borrow().path) }

fn addr_for_child(&self, index: usize) -> Vec<usize> {
fn addr_for_child(&self, index: usize) -> Rc<[usize]> {
let path = &self.subgraph.borrow().path[..];
let mut addr = Vec::with_capacity(path.len() + 1);
addr.extend_from_slice(path);
addr.push(index);
addr
addr.into()
}

fn add_edge(&self, source: Source, target: Target) {
Expand All @@ -125,9 +125,9 @@ where
F: FnOnce(&mut Child<Self, T2>) -> R,
{
let index = self.subgraph.borrow_mut().allocate_child_id();
let path = self.subgraph.borrow().path.clone();
let path = self.addr_for_child(index);

let subscope = RefCell::new(SubgraphBuilder::new_from(index, path, self.logging(), self.progress_logging.clone(), name));
let subscope = RefCell::new(SubgraphBuilder::new_from(path, self.logging(), self.progress_logging.clone(), name));
let result = {
let mut builder = Child {
subgraph: &subscope,
Expand Down
5 changes: 3 additions & 2 deletions timely/src/dataflow/scopes/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Hierarchical organization of timely dataflow graphs.

use std::rc::Rc;
use crate::progress::{Timestamp, Operate, Source, Target};
use crate::order::Product;
use crate::progress::timestamp::Refines;
Expand Down Expand Up @@ -31,11 +32,11 @@ pub trait Scope: ScopeParent {
fn name(&self) -> String;

/// A sequence of scope identifiers describing the path from the worker root to this scope.
fn addr(&self) -> Vec<usize>;
fn addr(&self) -> Rc<[usize]>;

/// A sequence of scope identifiers describing the path from the worker root to the child
/// indicated by `index`.
fn addr_for_child(&self, index: usize) -> Vec<usize>;
fn addr_for_child(&self, index: usize) -> Rc<[usize]>;

/// Connects a source of data with a target of the data. This only links the two for
/// the purposes of tracking progress, rather than effect any data movement itself.
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl<S: Scope, C: Container> StreamCore<S, C> {
let mut logging = self.scope().logging();
logging.as_mut().map(|l| l.log(crate::logging::ChannelsEvent {
id: identifier,
scope_addr: self.scope.addr(),
scope_addr: self.scope.addr().to_vec(),
source: (self.name.node, self.name.port),
target: (target.node, target.port),
}));
Expand Down
9 changes: 5 additions & 4 deletions timely/src/progress/broadcast.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Broadcasts progress information among workers.

use std::rc::Rc;
use crate::progress::{ChangeBatch, Timestamp};
use crate::progress::{Location, Port};
use crate::communication::{Message, Push, Pull};
Expand All @@ -22,7 +23,7 @@ pub struct Progcaster<T:Timestamp> {
/// Sequence number counter
counter: usize,
/// Sequence of nested scope identifiers indicating the path from the root to this subgraph
addr: Vec<usize>,
addr: Rc<[usize]>,
/// Communication channel identifier
channel_identifier: usize,

Expand All @@ -31,7 +32,7 @@ pub struct Progcaster<T:Timestamp> {

impl<T:Timestamp+Send> Progcaster<T> {
/// Creates a new `Progcaster` using a channel from the supplied worker.
pub fn new<A: crate::worker::AsWorker>(worker: &mut A, addr: Vec<usize>, mut logging: Option<Logger>, progress_logging: Option<ProgressLogger>) -> Progcaster<T> {
pub fn new<A: crate::worker::AsWorker>(worker: &mut A, addr: Rc<[usize]>, mut logging: Option<Logger>, progress_logging: Option<ProgressLogger>) -> Progcaster<T> {

let channel_identifier = worker.new_identifier();
let (pushers, puller) = worker.allocate(channel_identifier, addr.clone());
Expand Down Expand Up @@ -82,7 +83,7 @@ impl<T:Timestamp+Send> Progcaster<T> {
source: self.source,
channel: self.channel_identifier,
seq_no: self.counter,
addr: self.addr.clone(),
addr: self.addr.to_vec(),
messages,
internal,
});
Expand Down Expand Up @@ -152,7 +153,7 @@ impl<T:Timestamp+Send> Progcaster<T> {
source: source,
seq_no: counter,
channel,
addr: addr.clone(),
addr: addr.to_vec(),
messages,
internal,
});
Expand Down
10 changes: 5 additions & 5 deletions timely/src/progress/reachability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -831,26 +831,26 @@ fn summarize_outputs<T: Timestamp>(

/// Logging types for reachability tracking events.
pub mod logging {

use std::rc::Rc;
use crate::logging::{Logger, ProgressEventTimestampVec};

/// A logger with additional identifying information about the tracker.
pub struct TrackerLogger {
path: Vec<usize>,
path: Rc<[usize]>,
logger: Logger<TrackerEvent>,
}

impl TrackerLogger {
/// Create a new tracker logger from its fields.
pub fn new(path: Vec<usize>, logger: Logger<TrackerEvent>) -> Self {
pub fn new(path: Rc<[usize]>, logger: Logger<TrackerEvent>) -> Self {
Self { path, logger }
}

/// Log source update events with additional identifying information.
pub fn log_source_updates(&mut self, updates: Box<dyn ProgressEventTimestampVec>) {
self.logger.log({
SourceUpdate {
tracker_id: self.path.clone(),
tracker_id: self.path.to_vec(),
updates,
}
})
Expand All @@ -859,7 +859,7 @@ pub mod logging {
pub fn log_target_updates(&mut self, updates: Box<dyn ProgressEventTimestampVec>) {
self.logger.log({
TargetUpdate {
tracker_id: self.path.clone(),
tracker_id: self.path.to_vec(),
updates,
}
})
Expand Down
16 changes: 7 additions & 9 deletions timely/src/progress/subgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ where
pub name: String,

/// A sequence of integers uniquely identifying the subgraph.
pub path: Vec<usize>,
pub path: Rc<[usize]>,

/// The index assigned to the subgraph by its parent.
index: usize,
Expand Down Expand Up @@ -94,20 +94,19 @@ where
self.edge_stash.push((source, target));
}

/// Creates a new Subgraph from a channel allocator and "descriptive" indices.
/// Creates a `SubgraphBuilder` from a path of indexes from the dataflow root to the subgraph,
/// terminating with the local index of the new subgraph itself.
pub fn new_from(
index: usize,
mut path: Vec<usize>,
path: Rc<[usize]>,
logging: Option<Logger>,
progress_logging: Option<ProgressLogger>,
name: &str,
)
-> SubgraphBuilder<TOuter, TInner>
{
path.push(index);

// Put an empty placeholder for "outer scope" representative.
let children = vec![PerOperatorState::empty(0, 0)];
let index = path[path.len() - 1];

SubgraphBuilder {
name: name.to_owned(),
Expand Down Expand Up @@ -142,7 +141,7 @@ where
name: child.name().to_owned(),
});
}
self.children.push(PerOperatorState::new(child, index, self.path.clone(), identifier, self.logging.clone()))
self.children.push(PerOperatorState::new(child, index, identifier, self.logging.clone()))
}

/// Now that initialization is complete, actually build a subgraph.
Expand Down Expand Up @@ -233,7 +232,7 @@ where
{
name: String, // an informative name.
/// Path of identifiers from the root.
pub path: Vec<usize>,
pub path: Rc<[usize]>,
inputs: usize, // number of inputs.
outputs: usize, // number of outputs.

Expand Down Expand Up @@ -640,7 +639,6 @@ impl<T: Timestamp> PerOperatorState<T> {
pub fn new(
mut scope: Box<dyn Operate<T>>,
index: usize,
mut _path: Vec<usize>,
identifier: usize,
logging: Option<Logger>
) -> PerOperatorState<T>
Expand Down
8 changes: 4 additions & 4 deletions timely/src/scheduling/activate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,13 @@ impl SyncActivations {
/// A capability to activate a specific path.
#[derive(Clone, Debug)]
pub struct Activator {
path: Vec<usize>,
path: Rc<[usize]>,
queue: Rc<RefCell<Activations>>,
}

impl Activator {
/// Creates a new activation handle
pub fn new(path: Vec<usize>, queue: Rc<RefCell<Activations>>) -> Self {
pub fn new(path: Rc<[usize]>, queue: Rc<RefCell<Activations>>) -> Self {
Self {
path,
queue,
Expand Down Expand Up @@ -289,15 +289,15 @@ impl std::error::Error for SyncActivationError {}
#[derive(Clone, Debug)]
pub struct ActivateOnDrop<T> {
wrapped: T,
address: Rc<Vec<usize>>,
address: Rc<[usize]>,
activator: Rc<RefCell<Activations>>,
}

use std::ops::{Deref, DerefMut};

impl<T> ActivateOnDrop<T> {
/// Wraps an element so that it is unparked on drop.
pub fn new(wrapped: T, address: Rc<Vec<usize>>, activator: Rc<RefCell<Activations>>) -> Self {
pub fn new(wrapped: T, address: Rc<[usize]>, activator: Rc<RefCell<Activations>>) -> Self {
Self { wrapped, address, activator }
}
}
Expand Down
2 changes: 1 addition & 1 deletion timely/src/scheduling/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub trait Scheduler {
fn activations(&self) -> Rc<RefCell<Activations>>;

/// Constructs an `Activator` tied to the specified operator address.
fn activator_for(&self, path: Vec<usize>) -> Activator {
fn activator_for(&self, path: Rc<[usize]>) -> Activator {
Activator::new(path, self.activations())
}

Expand Down
Loading

0 comments on commit ca17c86

Please sign in to comment.