From fb39a0dd78179de24d2a9787919bbac117d858fa Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 6 Sep 2024 20:20:09 +0200 Subject: [PATCH 1/3] Shared reference-counted operator path Encapsulate the path for an operator as a `Rc<[usize]>` that can be shared among all interested parties, instead of passing owned vectors around. This avoids some memory allocations for rendering dataflow graphs. Signed-off-by: Moritz Hoffmann --- timely/src/dataflow/channels/pact.rs | 7 ++++--- timely/src/dataflow/operators/capability.rs | 6 +++--- timely/src/dataflow/operators/core/input.rs | 2 +- .../src/dataflow/operators/core/unordered_input.rs | 2 +- timely/src/dataflow/operators/generic/builder_raw.rs | 4 ++-- .../src/dataflow/operators/generic/operator_info.rs | 5 +++-- timely/src/dataflow/scopes/child.rs | 12 ++++++------ timely/src/dataflow/scopes/mod.rs | 5 +++-- timely/src/dataflow/stream.rs | 2 +- timely/src/progress/broadcast.rs | 9 +++++---- timely/src/progress/reachability.rs | 10 +++++----- timely/src/progress/subgraph.rs | 11 ++++------- timely/src/scheduling/activate.rs | 8 ++++---- timely/src/scheduling/mod.rs | 2 +- timely/src/worker.rs | 12 ++++++------ 15 files changed, 49 insertions(+), 48 deletions(-) diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs index d32d0a32c..f2b2959f3 100644 --- a/timely/src/dataflow/channels/pact.rs +++ b/timely/src/dataflow/channels/pact.rs @@ -8,6 +8,7 @@ //! The progress tracking logic assumes that this number is independent of the pact used. use std::{fmt::{self, Debug}, marker::PhantomData}; +use std::rc::Rc; use crate::Container; use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller}; @@ -26,7 +27,7 @@ pub trait ParallelizationContract { /// Type implementing `Pull` produced by this pact. type Puller: Pull>+'static; /// Allocates a matched pair of push and pull endpoints implementing the pact. - fn connect(self, allocator: &mut A, identifier: usize, address: Vec, logging: Option) -> (Self::Pusher, Self::Puller); + fn connect(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option) -> (Self::Pusher, Self::Puller); } /// A direct connection @@ -36,7 +37,7 @@ pub struct Pipeline; impl ParallelizationContract for Pipeline { type Pusher = LogPusher>>; type Puller = LogPuller>>; - fn connect(self, allocator: &mut A, identifier: usize, address: Vec, logging: Option) -> (Self::Pusher, Self::Puller) { + fn connect(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option) -> (Self::Pusher, Self::Puller) { let (pusher, puller) = allocator.pipeline::>(identifier, address); (LogPusher::new(pusher, allocator.index(), allocator.index(), identifier, logging.clone()), LogPuller::new(puller, allocator.index(), identifier, logging)) @@ -72,7 +73,7 @@ where type Pusher = ExchangePusher>>>, H>; type Puller = LogPuller>>>; - fn connect(self, allocator: &mut A, identifier: usize, address: Vec, logging: Option) -> (Self::Pusher, Self::Puller) { + fn connect(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option) -> (Self::Pusher, Self::Puller) { let (senders, receiver) = allocator.allocate::>(identifier, address); let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::>(); (ExchangePusher::new(senders, self.hash_func), LogPuller::new(receiver, allocator.index(), identifier, logging.clone())) diff --git a/timely/src/dataflow/operators/capability.rs b/timely/src/dataflow/operators/capability.rs index 550b10fb1..59bdaded7 100644 --- a/timely/src/dataflow/operators/capability.rs +++ b/timely/src/dataflow/operators/capability.rs @@ -334,7 +334,7 @@ impl Debug for InputCapability { #[derive(Clone, Debug)] pub struct ActivateCapability { pub(crate) capability: Capability, - pub(crate) address: Rc>, + pub(crate) address: Rc<[usize]>, pub(crate) activations: Rc>, } @@ -347,10 +347,10 @@ impl CapabilityTrait for ActivateCapability { impl ActivateCapability { /// Creates a new activating capability. - pub fn new(capability: Capability, address: Vec, activations: Rc>) -> Self { + pub fn new(capability: Capability, address: Rc<[usize]>, activations: Rc>) -> Self { Self { capability, - address: Rc::new(address), + address, activations, } } diff --git a/timely/src/dataflow/operators/core/input.rs b/timely/src/dataflow/operators/core/input.rs index 7438919e3..78ee65764 100644 --- a/timely/src/dataflow/operators/core/input.rs +++ b/timely/src/dataflow/operators/core/input.rs @@ -179,7 +179,7 @@ impl Input for G where ::Timestamp: TotalOrder { #[derive(Debug)] struct Operator { name: String, - address: Vec, + address: Rc<[usize]>, shared_progress: Rc>>, progress: Rc>>, // times closed since last asked messages: Rc>>, // messages sent since last asked diff --git a/timely/src/dataflow/operators/core/unordered_input.rs b/timely/src/dataflow/operators/core/unordered_input.rs index bcad2d8d8..48eb32677 100644 --- a/timely/src/dataflow/operators/core/unordered_input.rs +++ b/timely/src/dataflow/operators/core/unordered_input.rs @@ -112,7 +112,7 @@ impl UnorderedInput for G { struct UnorderedOperator { name: String, - address: Vec, + address: Rc<[usize]>, shared_progress: Rc>>, internal: Rc>>, produced: Rc>>, diff --git a/timely/src/dataflow/operators/generic/builder_raw.rs b/timely/src/dataflow/operators/generic/builder_raw.rs index bf0a379eb..8ba131612 100644 --- a/timely/src/dataflow/operators/generic/builder_raw.rs +++ b/timely/src/dataflow/operators/generic/builder_raw.rs @@ -58,7 +58,7 @@ pub struct OperatorBuilder { scope: G, index: usize, global: usize, - address: Vec, // path to the operator (ending with index). + address: Rc<[usize]>, // path to the operator (ending with index). shape: OperatorShape, summary: Vec::Summary>>>, } @@ -184,7 +184,7 @@ where L: FnMut(&mut SharedProgress)->bool+'static, { shape: OperatorShape, - address: Vec, + address: Rc<[usize]>, logic: L, shared_progress: Rc>>, activations: Rc>, diff --git a/timely/src/dataflow/operators/generic/operator_info.rs b/timely/src/dataflow/operators/generic/operator_info.rs index f2b5313f6..a8f18c1cf 100644 --- a/timely/src/dataflow/operators/generic/operator_info.rs +++ b/timely/src/dataflow/operators/generic/operator_info.rs @@ -1,3 +1,4 @@ +use std::rc::Rc; /// Information about the operator being constructed #[derive(Clone)] @@ -7,12 +8,12 @@ pub struct OperatorInfo { /// Worker-unique identifier. pub global_id: usize, /// Operator address. - pub address: Vec, + pub address: Rc<[usize]>, } impl OperatorInfo { /// Construct a new `OperatorInfo`. - pub fn new(local_id: usize, global_id: usize, address: Vec) -> OperatorInfo { + pub fn new(local_id: usize, global_id: usize, address: Rc<[usize]>) -> OperatorInfo { OperatorInfo { local_id, global_id, diff --git a/timely/src/dataflow/scopes/child.rs b/timely/src/dataflow/scopes/child.rs index dae65550e..770ab2b62 100644 --- a/timely/src/dataflow/scopes/child.rs +++ b/timely/src/dataflow/scopes/child.rs @@ -58,10 +58,10 @@ where fn config(&self) -> &Config { self.parent.config() } fn index(&self) -> usize { self.parent.index() } fn peers(&self) -> usize { self.parent.peers() } - fn allocate(&mut self, identifier: usize, address: Vec) -> (Vec>>>, Box>>) { + fn allocate(&mut self, identifier: usize, address: Rc<[usize]>) -> (Vec>>>, Box>>) { self.parent.allocate(identifier, address) } - fn pipeline(&mut self, identifier: usize, address: Vec) -> (ThreadPusher>, ThreadPuller>) { + fn pipeline(&mut self, identifier: usize, address: Rc<[usize]>) -> (ThreadPusher>, ThreadPuller>) { self.parent.pipeline(identifier, address) } fn new_identifier(&mut self) -> usize { @@ -96,14 +96,14 @@ where T: Timestamp+Refines, { fn name(&self) -> String { self.subgraph.borrow().name.clone() } - fn addr(&self) -> Vec { self.subgraph.borrow().path.clone() } + fn addr(&self) -> Rc<[usize]> { Rc::clone(&self.subgraph.borrow().path) } - fn addr_for_child(&self, index: usize) -> Vec { + fn addr_for_child(&self, index: usize) -> Rc<[usize]> { let path = &self.subgraph.borrow().path[..]; let mut addr = Vec::with_capacity(path.len() + 1); addr.extend_from_slice(path); addr.push(index); - addr + addr.into() } fn add_edge(&self, source: Source, target: Target) { @@ -125,7 +125,7 @@ where F: FnOnce(&mut Child) -> R, { let index = self.subgraph.borrow_mut().allocate_child_id(); - let path = self.subgraph.borrow().path.clone(); + let path = self.addr_for_child(index); let subscope = RefCell::new(SubgraphBuilder::new_from(index, path, self.logging(), self.progress_logging.clone(), name)); let result = { diff --git a/timely/src/dataflow/scopes/mod.rs b/timely/src/dataflow/scopes/mod.rs index ba6566684..28cc141f7 100644 --- a/timely/src/dataflow/scopes/mod.rs +++ b/timely/src/dataflow/scopes/mod.rs @@ -1,5 +1,6 @@ //! Hierarchical organization of timely dataflow graphs. +use std::rc::Rc; use crate::progress::{Timestamp, Operate, Source, Target}; use crate::order::Product; use crate::progress::timestamp::Refines; @@ -31,11 +32,11 @@ pub trait Scope: ScopeParent { fn name(&self) -> String; /// A sequence of scope identifiers describing the path from the worker root to this scope. - fn addr(&self) -> Vec; + fn addr(&self) -> Rc<[usize]>; /// A sequence of scope identifiers describing the path from the worker root to the child /// indicated by `index`. - fn addr_for_child(&self, index: usize) -> Vec; + fn addr_for_child(&self, index: usize) -> Rc<[usize]>; /// Connects a source of data with a target of the data. This only links the two for /// the purposes of tracking progress, rather than effect any data movement itself. diff --git a/timely/src/dataflow/stream.rs b/timely/src/dataflow/stream.rs index 173e777af..653976377 100644 --- a/timely/src/dataflow/stream.rs +++ b/timely/src/dataflow/stream.rs @@ -42,7 +42,7 @@ impl StreamCore { let mut logging = self.scope().logging(); logging.as_mut().map(|l| l.log(crate::logging::ChannelsEvent { id: identifier, - scope_addr: self.scope.addr(), + scope_addr: self.scope.addr().to_vec(), source: (self.name.node, self.name.port), target: (target.node, target.port), })); diff --git a/timely/src/progress/broadcast.rs b/timely/src/progress/broadcast.rs index 27a484dfc..d83d190e1 100644 --- a/timely/src/progress/broadcast.rs +++ b/timely/src/progress/broadcast.rs @@ -1,5 +1,6 @@ //! Broadcasts progress information among workers. +use std::rc::Rc; use crate::progress::{ChangeBatch, Timestamp}; use crate::progress::{Location, Port}; use crate::communication::{Message, Push, Pull}; @@ -22,7 +23,7 @@ pub struct Progcaster { /// Sequence number counter counter: usize, /// Sequence of nested scope identifiers indicating the path from the root to this subgraph - addr: Vec, + addr: Rc<[usize]>, /// Communication channel identifier channel_identifier: usize, @@ -31,7 +32,7 @@ pub struct Progcaster { impl Progcaster { /// Creates a new `Progcaster` using a channel from the supplied worker. - pub fn new(worker: &mut A, addr: Vec, mut logging: Option, progress_logging: Option) -> Progcaster { + pub fn new(worker: &mut A, addr: Rc<[usize]>, mut logging: Option, progress_logging: Option) -> Progcaster { let channel_identifier = worker.new_identifier(); let (pushers, puller) = worker.allocate(channel_identifier, addr.clone()); @@ -82,7 +83,7 @@ impl Progcaster { source: self.source, channel: self.channel_identifier, seq_no: self.counter, - addr: self.addr.clone(), + addr: self.addr.to_vec(), messages, internal, }); @@ -152,7 +153,7 @@ impl Progcaster { source: source, seq_no: counter, channel, - addr: addr.clone(), + addr: addr.to_vec(), messages, internal, }); diff --git a/timely/src/progress/reachability.rs b/timely/src/progress/reachability.rs index 5a1f1f6f6..0380216c8 100644 --- a/timely/src/progress/reachability.rs +++ b/timely/src/progress/reachability.rs @@ -831,18 +831,18 @@ fn summarize_outputs( /// Logging types for reachability tracking events. pub mod logging { - + use std::rc::Rc; use crate::logging::{Logger, ProgressEventTimestampVec}; /// A logger with additional identifying information about the tracker. pub struct TrackerLogger { - path: Vec, + path: Rc<[usize]>, logger: Logger, } impl TrackerLogger { /// Create a new tracker logger from its fields. - pub fn new(path: Vec, logger: Logger) -> Self { + pub fn new(path: Rc<[usize]>, logger: Logger) -> Self { Self { path, logger } } @@ -850,7 +850,7 @@ pub mod logging { pub fn log_source_updates(&mut self, updates: Box) { self.logger.log({ SourceUpdate { - tracker_id: self.path.clone(), + tracker_id: self.path.to_vec(), updates, } }) @@ -859,7 +859,7 @@ pub mod logging { pub fn log_target_updates(&mut self, updates: Box) { self.logger.log({ TargetUpdate { - tracker_id: self.path.clone(), + tracker_id: self.path.to_vec(), updates, } }) diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index 17a66e3c0..283a20cc0 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -45,7 +45,7 @@ where pub name: String, /// A sequence of integers uniquely identifying the subgraph. - pub path: Vec, + pub path: Rc<[usize]>, /// The index assigned to the subgraph by its parent. index: usize, @@ -97,15 +97,13 @@ where /// Creates a new Subgraph from a channel allocator and "descriptive" indices. pub fn new_from( index: usize, - mut path: Vec, + path: Rc<[usize]>, logging: Option, progress_logging: Option, name: &str, ) -> SubgraphBuilder { - path.push(index); - // Put an empty placeholder for "outer scope" representative. let children = vec![PerOperatorState::empty(0, 0)]; @@ -142,7 +140,7 @@ where name: child.name().to_owned(), }); } - self.children.push(PerOperatorState::new(child, index, self.path.clone(), identifier, self.logging.clone())) + self.children.push(PerOperatorState::new(child, index, identifier, self.logging.clone())) } /// Now that initialization is complete, actually build a subgraph. @@ -233,7 +231,7 @@ where { name: String, // an informative name. /// Path of identifiers from the root. - pub path: Vec, + pub path: Rc<[usize]>, inputs: usize, // number of inputs. outputs: usize, // number of outputs. @@ -640,7 +638,6 @@ impl PerOperatorState { pub fn new( mut scope: Box>, index: usize, - mut _path: Vec, identifier: usize, logging: Option ) -> PerOperatorState diff --git a/timely/src/scheduling/activate.rs b/timely/src/scheduling/activate.rs index 04c7091a1..3046f62af 100644 --- a/timely/src/scheduling/activate.rs +++ b/timely/src/scheduling/activate.rs @@ -218,13 +218,13 @@ impl SyncActivations { /// A capability to activate a specific path. #[derive(Clone, Debug)] pub struct Activator { - path: Vec, + path: Rc<[usize]>, queue: Rc>, } impl Activator { /// Creates a new activation handle - pub fn new(path: Vec, queue: Rc>) -> Self { + pub fn new(path: Rc<[usize]>, queue: Rc>) -> Self { Self { path, queue, @@ -289,7 +289,7 @@ impl std::error::Error for SyncActivationError {} #[derive(Clone, Debug)] pub struct ActivateOnDrop { wrapped: T, - address: Rc>, + address: Rc<[usize]>, activator: Rc>, } @@ -297,7 +297,7 @@ use std::ops::{Deref, DerefMut}; impl ActivateOnDrop { /// Wraps an element so that it is unparked on drop. - pub fn new(wrapped: T, address: Rc>, activator: Rc>) -> Self { + pub fn new(wrapped: T, address: Rc<[usize]>, activator: Rc>) -> Self { Self { wrapped, address, activator } } } diff --git a/timely/src/scheduling/mod.rs b/timely/src/scheduling/mod.rs index 6e36d34f7..fef24d1f9 100644 --- a/timely/src/scheduling/mod.rs +++ b/timely/src/scheduling/mod.rs @@ -26,7 +26,7 @@ pub trait Scheduler { fn activations(&self) -> Rc>; /// Constructs an `Activator` tied to the specified operator address. - fn activator_for(&self, path: Vec) -> Activator { + fn activator_for(&self, path: Rc<[usize]>) -> Activator { Activator::new(path, self.activations()) } diff --git a/timely/src/worker.rs b/timely/src/worker.rs index 6809e080e..75369e028 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -191,12 +191,12 @@ pub trait AsWorker : Scheduler { /// scheduled in response to the receipt of records on the channel. /// Most commonly, this would be the address of the *target* of the /// channel. - fn allocate(&mut self, identifier: usize, address: Vec) -> (Vec>>>, Box>>); + fn allocate(&mut self, identifier: usize, address: Rc<[usize]>) -> (Vec>>>, Box>>); /// Constructs a pipeline channel from the worker to itself. /// /// By default this method uses the native channel allocation mechanism, but the expectation is /// that this behavior will be overriden to be more efficient. - fn pipeline(&mut self, identifier: usize, address: Vec) -> (ThreadPusher>, ThreadPuller>); + fn pipeline(&mut self, identifier: usize, address: Rc<[usize]>) -> (ThreadPusher>, ThreadPuller>); /// Allocates a new worker-unique identifier. fn new_identifier(&mut self) -> usize; @@ -211,7 +211,7 @@ pub trait AsWorker : Scheduler { pub struct Worker { config: Config, timer: Instant, - paths: Rc>>>, + paths: Rc>>>, allocator: Rc>, identifiers: Rc>, // dataflows: Rc>>, @@ -231,14 +231,14 @@ impl AsWorker for Worker { fn config(&self) -> &Config { &self.config } fn index(&self) -> usize { self.allocator.borrow().index() } fn peers(&self) -> usize { self.allocator.borrow().peers() } - fn allocate(&mut self, identifier: usize, address: Vec) -> (Vec>>>, Box>>) { + fn allocate(&mut self, identifier: usize, address: Rc<[usize]>) -> (Vec>>>, Box>>) { if address.is_empty() { panic!("Unacceptable address: Length zero"); } let mut paths = self.paths.borrow_mut(); paths.insert(identifier, address); self.temp_channel_ids.borrow_mut().push(identifier); self.allocator.borrow_mut().allocate(identifier) } - fn pipeline(&mut self, identifier: usize, address: Vec) -> (ThreadPusher>, ThreadPuller>) { + fn pipeline(&mut self, identifier: usize, address: Rc<[usize]>) -> (ThreadPusher>, ThreadPuller>) { if address.is_empty() { panic!("Unacceptable address: Length zero"); } let mut paths = self.paths.borrow_mut(); paths.insert(identifier, address); @@ -622,8 +622,8 @@ impl Worker { F: FnOnce(&mut V, &mut Child)->R, V: Any+'static, { - let addr = vec![]; let dataflow_index = self.allocate_dataflow_index(); + let addr = vec![dataflow_index].into(); let identifier = self.new_identifier(); let progress_logging = self.logging.borrow_mut().get("timely/progress"); From 3e445591a49a8093a9c93fb448ed32e7e1db0bd3 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 6 Sep 2024 20:37:07 +0200 Subject: [PATCH 2/3] SubgraphBuilder::new_from only takes path Signed-off-by: Moritz Hoffmann --- timely/src/dataflow/scopes/child.rs | 2 +- timely/src/progress/subgraph.rs | 5 +++-- timely/src/worker.rs | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/timely/src/dataflow/scopes/child.rs b/timely/src/dataflow/scopes/child.rs index 770ab2b62..2aed37887 100644 --- a/timely/src/dataflow/scopes/child.rs +++ b/timely/src/dataflow/scopes/child.rs @@ -127,7 +127,7 @@ where let index = self.subgraph.borrow_mut().allocate_child_id(); let path = self.addr_for_child(index); - let subscope = RefCell::new(SubgraphBuilder::new_from(index, path, self.logging(), self.progress_logging.clone(), name)); + let subscope = RefCell::new(SubgraphBuilder::new_from(path, self.logging(), self.progress_logging.clone(), name)); let result = { let mut builder = Child { subgraph: &subscope, diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index 283a20cc0..0f3657044 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -94,9 +94,9 @@ where self.edge_stash.push((source, target)); } - /// Creates a new Subgraph from a channel allocator and "descriptive" indices. + /// Creates a new Subgraph from a channel allocator and "descriptive" indices. The Subgraph's + /// index is the last element of `path`. pub fn new_from( - index: usize, path: Rc<[usize]>, logging: Option, progress_logging: Option, @@ -106,6 +106,7 @@ where { // Put an empty placeholder for "outer scope" representative. let children = vec![PerOperatorState::empty(0, 0)]; + let index = path[path.len() - 1]; SubgraphBuilder { name: name.to_owned(), diff --git a/timely/src/worker.rs b/timely/src/worker.rs index 75369e028..de335d734 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -627,7 +627,7 @@ impl Worker { let identifier = self.new_identifier(); let progress_logging = self.logging.borrow_mut().get("timely/progress"); - let subscope = SubgraphBuilder::new_from(dataflow_index, addr, logging.clone(), progress_logging.clone(), name); + let subscope = SubgraphBuilder::new_from(addr, logging.clone(), progress_logging.clone(), name); let subscope = RefCell::new(subscope); let result = { From 55a9889eee9810647343b2718bbcb0113ae96539 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 6 Sep 2024 22:42:23 +0200 Subject: [PATCH 3/3] Review comments Signed-off-by: Moritz Hoffmann --- timely/src/progress/subgraph.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index 0f3657044..68c3bf93d 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -94,8 +94,8 @@ where self.edge_stash.push((source, target)); } - /// Creates a new Subgraph from a channel allocator and "descriptive" indices. The Subgraph's - /// index is the last element of `path`. + /// Creates a `SubgraphBuilder` from a path of indexes from the dataflow root to the subgraph, + /// terminating with the local index of the new subgraph itself. pub fn new_from( path: Rc<[usize]>, logging: Option,