Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shared reference-counted operator path #582

Merged
merged 3 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions timely/src/dataflow/channels/pact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
//! The progress tracking logic assumes that this number is independent of the pact used.

use std::{fmt::{self, Debug}, marker::PhantomData};
use std::rc::Rc;

use crate::Container;
use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
Expand All @@ -26,7 +27,7 @@ pub trait ParallelizationContract<T, C> {
/// Type implementing `Pull` produced by this pact.
type Puller: Pull<Bundle<T, C>>+'static;
/// Allocates a matched pair of push and pull endpoints implementing the pact.
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Vec<usize>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller);
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller);
}

/// A direct connection
Expand All @@ -36,7 +37,7 @@ pub struct Pipeline;
impl<T: 'static, C: Container> ParallelizationContract<T, C> for Pipeline {
type Pusher = LogPusher<T, C, ThreadPusher<Bundle<T, C>>>;
type Puller = LogPuller<T, C, ThreadPuller<Bundle<T, C>>>;
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Vec<usize>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
let (pusher, puller) = allocator.pipeline::<Message<T, C>>(identifier, address);
(LogPusher::new(pusher, allocator.index(), allocator.index(), identifier, logging.clone()),
LogPuller::new(puller, allocator.index(), identifier, logging))
Expand Down Expand Up @@ -72,7 +73,7 @@ where
type Pusher = ExchangePusher<T, C, LogPusher<T, C, Box<dyn Push<Bundle<T, C>>>>, H>;
type Puller = LogPuller<T, C, Box<dyn Pull<Bundle<T, C>>>>;

fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Vec<usize>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
let (senders, receiver) = allocator.allocate::<Message<T, C>>(identifier, address);
let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::<Vec<_>>();
(ExchangePusher::new(senders, self.hash_func), LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))
Expand Down
6 changes: 3 additions & 3 deletions timely/src/dataflow/operators/capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ impl<T: Timestamp> Debug for InputCapability<T> {
#[derive(Clone, Debug)]
pub struct ActivateCapability<T: Timestamp> {
pub(crate) capability: Capability<T>,
pub(crate) address: Rc<Vec<usize>>,
pub(crate) address: Rc<[usize]>,
pub(crate) activations: Rc<RefCell<Activations>>,
}

Expand All @@ -347,10 +347,10 @@ impl<T: Timestamp> CapabilityTrait<T> for ActivateCapability<T> {

impl<T: Timestamp> ActivateCapability<T> {
/// Creates a new activating capability.
pub fn new(capability: Capability<T>, address: Vec<usize>, activations: Rc<RefCell<Activations>>) -> Self {
pub fn new(capability: Capability<T>, address: Rc<[usize]>, activations: Rc<RefCell<Activations>>) -> Self {
Self {
capability,
address: Rc::new(address),
address,
activations,
}
}
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/core/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ impl<G: Scope> Input for G where <G as ScopeParent>::Timestamp: TotalOrder {
#[derive(Debug)]
struct Operator<T:Timestamp> {
name: String,
address: Vec<usize>,
address: Rc<[usize]>,
shared_progress: Rc<RefCell<SharedProgress<T>>>,
progress: Rc<RefCell<ChangeBatch<T>>>, // times closed since last asked
messages: Rc<RefCell<ChangeBatch<T>>>, // messages sent since last asked
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/core/unordered_input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl<G: Scope> UnorderedInput<G> for G {

struct UnorderedOperator<T:Timestamp> {
name: String,
address: Vec<usize>,
address: Rc<[usize]>,
shared_progress: Rc<RefCell<SharedProgress<T>>>,
internal: Rc<RefCell<ChangeBatch<T>>>,
produced: Rc<RefCell<ChangeBatch<T>>>,
Expand Down
4 changes: 2 additions & 2 deletions timely/src/dataflow/operators/generic/builder_raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub struct OperatorBuilder<G: Scope> {
scope: G,
index: usize,
global: usize,
address: Vec<usize>, // path to the operator (ending with index).
address: Rc<[usize]>, // path to the operator (ending with index).
shape: OperatorShape,
summary: Vec<Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>>,
}
Expand Down Expand Up @@ -184,7 +184,7 @@ where
L: FnMut(&mut SharedProgress<T>)->bool+'static,
{
shape: OperatorShape,
address: Vec<usize>,
address: Rc<[usize]>,
logic: L,
shared_progress: Rc<RefCell<SharedProgress<T>>>,
activations: Rc<RefCell<Activations>>,
Expand Down
5 changes: 3 additions & 2 deletions timely/src/dataflow/operators/generic/operator_info.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::rc::Rc;

/// Information about the operator being constructed
#[derive(Clone)]
Expand All @@ -7,12 +8,12 @@ pub struct OperatorInfo {
/// Worker-unique identifier.
pub global_id: usize,
/// Operator address.
pub address: Vec<usize>,
pub address: Rc<[usize]>,
}

impl OperatorInfo {
/// Construct a new `OperatorInfo`.
pub fn new(local_id: usize, global_id: usize, address: Vec<usize>) -> OperatorInfo {
pub fn new(local_id: usize, global_id: usize, address: Rc<[usize]>) -> OperatorInfo {
OperatorInfo {
local_id,
global_id,
Expand Down
12 changes: 6 additions & 6 deletions timely/src/dataflow/scopes/child.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ where
fn config(&self) -> &Config { self.parent.config() }
fn index(&self) -> usize { self.parent.index() }
fn peers(&self) -> usize { self.parent.peers() }
fn allocate<D: Data>(&mut self, identifier: usize, address: Vec<usize>) -> (Vec<Box<dyn Push<Message<D>>>>, Box<dyn Pull<Message<D>>>) {
fn allocate<D: Data>(&mut self, identifier: usize, address: Rc<[usize]>) -> (Vec<Box<dyn Push<Message<D>>>>, Box<dyn Pull<Message<D>>>) {
self.parent.allocate(identifier, address)
}
fn pipeline<D: 'static>(&mut self, identifier: usize, address: Vec<usize>) -> (ThreadPusher<Message<D>>, ThreadPuller<Message<D>>) {
fn pipeline<D: 'static>(&mut self, identifier: usize, address: Rc<[usize]>) -> (ThreadPusher<Message<D>>, ThreadPuller<Message<D>>) {
self.parent.pipeline(identifier, address)
}
fn new_identifier(&mut self) -> usize {
Expand Down Expand Up @@ -96,14 +96,14 @@ where
T: Timestamp+Refines<G::Timestamp>,
{
fn name(&self) -> String { self.subgraph.borrow().name.clone() }
fn addr(&self) -> Vec<usize> { self.subgraph.borrow().path.clone() }
fn addr(&self) -> Rc<[usize]> { Rc::clone(&self.subgraph.borrow().path) }

fn addr_for_child(&self, index: usize) -> Vec<usize> {
fn addr_for_child(&self, index: usize) -> Rc<[usize]> {
let path = &self.subgraph.borrow().path[..];
let mut addr = Vec::with_capacity(path.len() + 1);
addr.extend_from_slice(path);
addr.push(index);
addr
addr.into()
}

fn add_edge(&self, source: Source, target: Target) {
Expand All @@ -125,7 +125,7 @@ where
F: FnOnce(&mut Child<Self, T2>) -> R,
{
let index = self.subgraph.borrow_mut().allocate_child_id();
let path = self.subgraph.borrow().path.clone();
let path = self.addr_for_child(index);
frankmcsherry marked this conversation as resolved.
Show resolved Hide resolved

let subscope = RefCell::new(SubgraphBuilder::new_from(index, path, self.logging(), self.progress_logging.clone(), name));
let result = {
Expand Down
5 changes: 3 additions & 2 deletions timely/src/dataflow/scopes/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Hierarchical organization of timely dataflow graphs.

use std::rc::Rc;
use crate::progress::{Timestamp, Operate, Source, Target};
use crate::order::Product;
use crate::progress::timestamp::Refines;
Expand Down Expand Up @@ -31,11 +32,11 @@ pub trait Scope: ScopeParent {
fn name(&self) -> String;

/// A sequence of scope identifiers describing the path from the worker root to this scope.
fn addr(&self) -> Vec<usize>;
fn addr(&self) -> Rc<[usize]>;

/// A sequence of scope identifiers describing the path from the worker root to the child
/// indicated by `index`.
fn addr_for_child(&self, index: usize) -> Vec<usize>;
fn addr_for_child(&self, index: usize) -> Rc<[usize]>;

/// Connects a source of data with a target of the data. This only links the two for
/// the purposes of tracking progress, rather than effect any data movement itself.
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl<S: Scope, C: Container> StreamCore<S, C> {
let mut logging = self.scope().logging();
logging.as_mut().map(|l| l.log(crate::logging::ChannelsEvent {
id: identifier,
scope_addr: self.scope.addr(),
scope_addr: self.scope.addr().to_vec(),
source: (self.name.node, self.name.port),
target: (target.node, target.port),
}));
Expand Down
9 changes: 5 additions & 4 deletions timely/src/progress/broadcast.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Broadcasts progress information among workers.

use std::rc::Rc;
use crate::progress::{ChangeBatch, Timestamp};
use crate::progress::{Location, Port};
use crate::communication::{Message, Push, Pull};
Expand All @@ -22,7 +23,7 @@ pub struct Progcaster<T:Timestamp> {
/// Sequence number counter
counter: usize,
/// Sequence of nested scope identifiers indicating the path from the root to this subgraph
addr: Vec<usize>,
addr: Rc<[usize]>,
/// Communication channel identifier
channel_identifier: usize,

Expand All @@ -31,7 +32,7 @@ pub struct Progcaster<T:Timestamp> {

impl<T:Timestamp+Send> Progcaster<T> {
/// Creates a new `Progcaster` using a channel from the supplied worker.
pub fn new<A: crate::worker::AsWorker>(worker: &mut A, addr: Vec<usize>, mut logging: Option<Logger>, progress_logging: Option<ProgressLogger>) -> Progcaster<T> {
pub fn new<A: crate::worker::AsWorker>(worker: &mut A, addr: Rc<[usize]>, mut logging: Option<Logger>, progress_logging: Option<ProgressLogger>) -> Progcaster<T> {

let channel_identifier = worker.new_identifier();
let (pushers, puller) = worker.allocate(channel_identifier, addr.clone());
Expand Down Expand Up @@ -82,7 +83,7 @@ impl<T:Timestamp+Send> Progcaster<T> {
source: self.source,
channel: self.channel_identifier,
seq_no: self.counter,
addr: self.addr.clone(),
addr: self.addr.to_vec(),
messages,
internal,
});
Expand Down Expand Up @@ -152,7 +153,7 @@ impl<T:Timestamp+Send> Progcaster<T> {
source: source,
seq_no: counter,
channel,
addr: addr.clone(),
addr: addr.to_vec(),
messages,
internal,
});
Expand Down
10 changes: 5 additions & 5 deletions timely/src/progress/reachability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -831,26 +831,26 @@ fn summarize_outputs<T: Timestamp>(

/// Logging types for reachability tracking events.
pub mod logging {

use std::rc::Rc;
use crate::logging::{Logger, ProgressEventTimestampVec};

/// A logger with additional identifying information about the tracker.
pub struct TrackerLogger {
path: Vec<usize>,
path: Rc<[usize]>,
logger: Logger<TrackerEvent>,
}

impl TrackerLogger {
/// Create a new tracker logger from its fields.
pub fn new(path: Vec<usize>, logger: Logger<TrackerEvent>) -> Self {
pub fn new(path: Rc<[usize]>, logger: Logger<TrackerEvent>) -> Self {
Self { path, logger }
}

/// Log source update events with additional identifying information.
pub fn log_source_updates(&mut self, updates: Box<dyn ProgressEventTimestampVec>) {
self.logger.log({
SourceUpdate {
tracker_id: self.path.clone(),
tracker_id: self.path.to_vec(),
updates,
}
})
Expand All @@ -859,7 +859,7 @@ pub mod logging {
pub fn log_target_updates(&mut self, updates: Box<dyn ProgressEventTimestampVec>) {
self.logger.log({
TargetUpdate {
tracker_id: self.path.clone(),
tracker_id: self.path.to_vec(),
updates,
}
})
Expand Down
11 changes: 4 additions & 7 deletions timely/src/progress/subgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ where
pub name: String,

/// A sequence of integers uniquely identifying the subgraph.
pub path: Vec<usize>,
pub path: Rc<[usize]>,

/// The index assigned to the subgraph by its parent.
index: usize,
Expand Down Expand Up @@ -97,15 +97,13 @@ where
/// Creates a new Subgraph from a channel allocator and "descriptive" indices.
pub fn new_from(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is better now! At least, I can't really tell why it made sense to provide the index and the path separately, and there is no self involved for which the path would make sense. I'm tempted to remove the line in the comment that you added, because "The Subgraph's index" is no longer a concept that anyone needs to know about. I would be tempted to "say more" in the comments, but the previous comments were so vague that it's hard to require it. In fact, .. I think the comments are just wrong? There is no channel allocator here yet.

What do you think about

/// Creates a `SubgraphBuilder` from a path of indexes from the dataflow root to the subgraph, terminating with the local index of the new subgraph itself.

index: usize,
mut path: Vec<usize>,
path: Rc<[usize]>,
logging: Option<Logger>,
progress_logging: Option<ProgressLogger>,
name: &str,
)
-> SubgraphBuilder<TOuter, TInner>
{
path.push(index);

// Put an empty placeholder for "outer scope" representative.
let children = vec![PerOperatorState::empty(0, 0)];

Expand Down Expand Up @@ -142,7 +140,7 @@ where
name: child.name().to_owned(),
});
}
self.children.push(PerOperatorState::new(child, index, self.path.clone(), identifier, self.logging.clone()))
self.children.push(PerOperatorState::new(child, index, identifier, self.logging.clone()))
}

/// Now that initialization is complete, actually build a subgraph.
Expand Down Expand Up @@ -233,7 +231,7 @@ where
{
name: String, // an informative name.
/// Path of identifiers from the root.
pub path: Vec<usize>,
pub path: Rc<[usize]>,
inputs: usize, // number of inputs.
outputs: usize, // number of outputs.

Expand Down Expand Up @@ -640,7 +638,6 @@ impl<T: Timestamp> PerOperatorState<T> {
pub fn new(
mut scope: Box<dyn Operate<T>>,
index: usize,
mut _path: Vec<usize>,
identifier: usize,
logging: Option<Logger>
) -> PerOperatorState<T>
Expand Down
8 changes: 4 additions & 4 deletions timely/src/scheduling/activate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,13 @@ impl SyncActivations {
/// A capability to activate a specific path.
#[derive(Clone, Debug)]
pub struct Activator {
path: Vec<usize>,
path: Rc<[usize]>,
queue: Rc<RefCell<Activations>>,
}

impl Activator {
/// Creates a new activation handle
pub fn new(path: Vec<usize>, queue: Rc<RefCell<Activations>>) -> Self {
pub fn new(path: Rc<[usize]>, queue: Rc<RefCell<Activations>>) -> Self {
Self {
path,
queue,
Expand Down Expand Up @@ -289,15 +289,15 @@ impl std::error::Error for SyncActivationError {}
#[derive(Clone, Debug)]
pub struct ActivateOnDrop<T> {
wrapped: T,
address: Rc<Vec<usize>>,
address: Rc<[usize]>,
activator: Rc<RefCell<Activations>>,
}

use std::ops::{Deref, DerefMut};

impl<T> ActivateOnDrop<T> {
/// Wraps an element so that it is unparked on drop.
pub fn new(wrapped: T, address: Rc<Vec<usize>>, activator: Rc<RefCell<Activations>>) -> Self {
pub fn new(wrapped: T, address: Rc<[usize]>, activator: Rc<RefCell<Activations>>) -> Self {
Self { wrapped, address, activator }
}
}
Expand Down
2 changes: 1 addition & 1 deletion timely/src/scheduling/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub trait Scheduler {
fn activations(&self) -> Rc<RefCell<Activations>>;

/// Constructs an `Activator` tied to the specified operator address.
fn activator_for(&self, path: Vec<usize>) -> Activator {
fn activator_for(&self, path: Rc<[usize]>) -> Activator {
Activator::new(path, self.activations())
}

Expand Down
Loading
Loading