diff --git a/timely/examples/logging-send.rs b/timely/examples/logging-send.rs index 23228f4e8..518fce32f 100644 --- a/timely/examples/logging-send.rs +++ b/timely/examples/logging-send.rs @@ -29,13 +29,13 @@ fn main() { println!("PROGRESS: {:?}", x); let (_, _, ev) = x; print!("PROGRESS: TYPED MESSAGES: "); - for (n, p, t, d) in ev.messages.iter() { - print!("{:?}, ", (n, p, t.as_any().downcast_ref::(), d)); + for (l, t, d) in ev.messages.iter() { + print!("{:?}, ", (l, t.as_any().downcast_ref::(), d)); } println!(); print!("PROGRESS: TYPED INTERNAL: "); - for (n, p, t, d) in ev.internal.iter() { - print!("{:?}, ", (n, p, t.as_any().downcast_ref::(), d)); + for (l, t, d) in ev.internal.iter() { + print!("{:?}, ", (l, t.as_any().downcast_ref::(), d)); } println!(); }) diff --git a/timely/src/logging.rs b/timely/src/logging.rs index ca9868fbe..54cdaa6de 100644 --- a/timely/src/logging.rs +++ b/timely/src/logging.rs @@ -11,6 +11,7 @@ pub type TimelyProgressLogger = Logger; use std::time::Duration; use crate::dataflow::operators::capture::{Event, EventPusher}; +use crate::progress::Location; /// Logs events as a timely stream, with progress statements. pub struct BatchLogger where P: EventPusher { @@ -80,10 +81,15 @@ pub trait ProgressEventTimestamp: std::fmt::Debug + std::any::Any { /// /// # Example /// ```rust - /// let ts = vec![(0usize, 0usize, (23u64, 10u64), -4i64), (0usize, 0usize, (23u64, 11u64), 1i64)]; + /// use timely::progress::Location; + /// + /// let ts = vec![ + /// (Location::new_target(0, 0), (23u64, 10u64), -4i64), + /// (Location::new_target(0, 0), (23u64, 11u64), 1i64), + /// ]; /// let ts: &timely::logging::ProgressEventTimestampVec = &ts; - /// for (n, p, t, d) in ts.iter() { - /// print!("{:?}, ", (n, p, t.as_any().downcast_ref::<(u64, u64)>(), d)); + /// for (l, t, d) in ts.iter() { + /// print!("{:?}, ", (l, t.as_any().downcast_ref::<(u64, u64)>(), d)); /// } /// println!(); /// ``` @@ -114,14 +120,14 @@ impl ProgressEventTimestamp fo /// for each dynamically typed element). pub trait ProgressEventTimestampVec: std::fmt::Debug + std::any::Any { /// Iterate over the contents of the vector - fn iter<'a>(&'a self) -> Box+'a>; + fn iter<'a>(&'a self) -> Box+'a>; } -impl ProgressEventTimestampVec for Vec<(usize, usize, T, i64)> { - fn iter<'a>(&'a self) -> Box+'a> { - Box::new(<[(usize, usize, T, i64)]>::iter(&self[..]).map(|(n, p, t, d)| { +impl ProgressEventTimestampVec for Vec<(Location, T, i64)> { + fn iter<'a>(&'a self) -> Box+'a> { + Box::new(<[_]>::iter(&self[..]).map(|(l, t, d)| { let t: &dyn ProgressEventTimestamp = t; - (n, p, t, d) + (l, t, d) })) } } diff --git a/timely/src/progress/broadcast.rs b/timely/src/progress/broadcast.rs index 7c0b95dfe..6b9c8fda0 100644 --- a/timely/src/progress/broadcast.rs +++ b/timely/src/progress/broadcast.rs @@ -1,7 +1,7 @@ //! Broadcasts progress information among workers. use crate::progress::{ChangeBatch, Timestamp}; -use crate::progress::{Location, Port}; +use crate::progress::Location; use crate::communication::{Message, Push, Pull}; use crate::logging::TimelyLogger as Logger; use crate::logging::TimelyProgressLogger as ProgressLogger; @@ -68,13 +68,10 @@ impl Progcaster { let mut internal = Box::new(Vec::with_capacity(changes.len())); for ((location, time), diff) in changes.iter() { - match location.port { - Port::Target(port) => { - messages.push((location.node, port, time.clone(), *diff)) - }, - Port::Source(port) => { - internal.push((location.node, port, time.clone(), *diff)) - } + if location.is_target() { + messages.push((*location, time.clone(), *diff)) + } else { + internal.push((*location, time.clone(), *diff)) } } @@ -137,14 +134,10 @@ impl Progcaster { let mut internal = Box::new(Vec::with_capacity(changes.len())); for ((location, time), diff) in recv_changes.iter() { - - match location.port { - Port::Target(port) => { - messages.push((location.node, port, time.clone(), *diff)) - }, - Port::Source(port) => { - internal.push((location.node, port, time.clone(), *diff)) - } + if location.is_target() { + messages.push((*location, time.clone(), *diff)) + } else { + internal.push((*location, time.clone(), *diff)) } } diff --git a/timely/src/progress/reachability.rs b/timely/src/progress/reachability.rs index 5a1f1f6f6..fb4038dd0 100644 --- a/timely/src/progress/reachability.rs +++ b/timely/src/progress/reachability.rs @@ -33,12 +33,9 @@ //! tracker.update_source(Source::new(0, 0), 17, 1); //! //! // Propagate changes; until this call updates are simply buffered. -//! tracker.propagate_all(); +//! let updates = tracker.propagate_all(); //! -//! let mut results = -//! tracker -//! .pushed() -//! .drain() +//! let mut results = updates //! .filter(|((location, time), delta)| location.is_target()) //! .collect::>(); //! @@ -55,12 +52,9 @@ //! tracker.update_source(Source::new(0, 0), 17, -1); //! //! // Propagate changes; until this call updates are simply buffered. -//! tracker.propagate_all(); +//! let updates = tracker.propagate_all(); //! -//! let mut results = -//! tracker -//! .pushed() -//! .drain() +//! let mut results = updates //! .filter(|((location, time), delta)| location.is_target()) //! .collect::>(); //! @@ -564,30 +558,25 @@ impl Tracker { /// Propagates all pending updates. /// /// The method drains `self.input_changes` and circulates their implications - /// until we cease deriving new implications. - pub fn propagate_all(&mut self) { + /// until we cease deriving new implications. It returns an iterator over updates + /// to implications. + pub fn propagate_all(&mut self) -> impl Iterator + '_ { + self.pushed_changes.clear(); // Step 0: If logging is enabled, construct and log inbound changes. if let Some(logger) = &mut self.logger { + let changes_count = self.target_changes.len() + self.source_changes.len(); + let mut changes = Vec::with_capacity(changes_count); - let target_changes = - self.target_changes - .iter() - .map(|((target, time), diff)| (target.node, target.port, time.clone(), *diff)) - .collect::>(); - - if !target_changes.is_empty() { - logger.log_target_updates(Box::new(target_changes)); + for ((target, time), diff) in self.target_changes.iter() { + changes.push((Location::from(*target), time.clone(), *diff)); + } + for ((source, time), diff) in self.source_changes.iter() { + changes.push((Location::from(*source), time.clone(), *diff)); } - let source_changes = - self.source_changes - .iter() - .map(|((source, time), diff)| (source.node, source.port, time.clone(), *diff)) - .collect::>(); - - if !source_changes.is_empty() { - logger.log_source_updates(Box::new(source_changes)); + if !changes.is_empty() { + logger.log_pointstamp_updates(Box::new(changes)); } } @@ -674,6 +663,7 @@ impl Tracker { } self.pushed_changes.update((location, time), diff); } + } // Update to an operator output. // Propagate any changes forward along outgoing edges. @@ -699,6 +689,21 @@ impl Tracker { }; } } + + // Step 3: If logging is enabled, construct and log outbound changes. + if let Some(logger) = &mut self.logger { + let changes: Vec<_> = self + .pushed_changes + .iter() + .map(|((location, time), diff)| (*location, time.clone(), *diff)) + .collect(); + + if !changes.is_empty() { + logger.log_frontier_updates(Box::new(changes)); + } + } + + self.pushed_changes.drain() } /// Implications of maintained capabilities projected to each output. @@ -706,11 +711,6 @@ impl Tracker { &mut self.output_changes[..] } - /// A mutable reference to the pushed results of changes. - pub fn pushed(&mut self) -> &mut ChangeBatch<(Location, T)> { - &mut self.pushed_changes - } - /// Reveals per-operator frontier state. pub fn node_state(&self, index: usize) -> &PerOperator { &self.per_operator[index] @@ -846,56 +846,61 @@ pub mod logging { Self { path, logger } } - /// Log source update events with additional identifying information. - pub fn log_source_updates(&mut self, updates: Box) { - self.logger.log({ - SourceUpdate { + /// Log pointstamp update events with additional identifying information. + pub fn log_pointstamp_updates(&mut self, updates: Box) { + self.logger.log( + PointstampUpdates { tracker_id: self.path.clone(), updates, } - }) + ) } - /// Log target update events with additional identifying information. - pub fn log_target_updates(&mut self, updates: Box) { - self.logger.log({ - TargetUpdate { + + /// Log frontier update events with additional identifying information. + pub fn log_frontier_updates(&mut self, updates: Box) { + self.logger.log( + FrontierUpdates { tracker_id: self.path.clone(), updates, } - }) + ) } } /// Events that the tracker may record. pub enum TrackerEvent { - /// Updates made at a source of data. - SourceUpdate(SourceUpdate), - /// Updates made at a target of data. - TargetUpdate(TargetUpdate), + /// Pointstamp updates made. + PointstampUpdates(PointstampUpdates), + /// Frontier updates made. + FrontierUpdates(FrontierUpdates), } - /// An update made at a source of data. - pub struct SourceUpdate { + /// Pointstamp updates reported by a tracker. + pub struct PointstampUpdates { /// An identifier for the tracker. pub tracker_id: Vec, - /// Updates themselves, as `(node, port, time, diff)`. + /// Updates themselves, as `(location, time, diff)`. pub updates: Box, } - /// An update made at a target of data. - pub struct TargetUpdate { + /// Frontier updates reported by a tracker. + pub struct FrontierUpdates { /// An identifier for the tracker. pub tracker_id: Vec, - /// Updates themselves, as `(node, port, time, diff)`. + /// Updates themselves, as `(location, time, diff)`. pub updates: Box, } - impl From for TrackerEvent { - fn from(v: SourceUpdate) -> TrackerEvent { TrackerEvent::SourceUpdate(v) } + impl From for TrackerEvent { + fn from(v: PointstampUpdates) -> Self { + Self::PointstampUpdates(v) + } } - impl From for TrackerEvent { - fn from(v: TargetUpdate) -> TrackerEvent { TrackerEvent::TargetUpdate(v) } + impl From for TrackerEvent { + fn from(v: FrontierUpdates) -> Self { + Self::FrontierUpdates(v) + } } } @@ -913,32 +918,46 @@ impl Drop for Tracker { }; // Retract pending data that `propagate_all` would normally log. + let mut pointstamp_changes = Vec::new(); + let mut frontier_changes = Vec::new(); + for (index, per_operator) in self.per_operator.iter_mut().enumerate() { - let target_changes = per_operator.targets - .iter_mut() - .enumerate() - .flat_map(|(port, target)| { - target.pointstamps - .updates() - .map(move |(time, diff)| (index, port, time.clone(), -diff)) - }) - .collect::>(); - if !target_changes.is_empty() { - logger.log_target_updates(Box::new(target_changes)); + for (port, target) in per_operator.targets.iter_mut().enumerate() { + let location = Location::new_target(index, port); + let pointstamp_retractions = target.pointstamps + .updates() + .map(|(time, diff)| (location, time.clone(), -diff)); + pointstamp_changes.extend(pointstamp_retractions); + + let frontier = target.implications.frontier().to_owned(); + let frontier_retractions = frontier + .into_iter() + .map(|time| (location, time, -1)); + frontier_changes.extend(frontier_retractions); } + } - let source_changes = per_operator.sources - .iter_mut() - .enumerate() - .flat_map(|(port, source)| { - source.pointstamps - .updates() - .map(move |(time, diff)| (index, port, time.clone(), -diff)) - }) - .collect::>(); - if !source_changes.is_empty() { - logger.log_source_updates(Box::new(source_changes)); + for (index, per_operator) in self.per_operator.iter_mut().enumerate() { + for (port, source) in per_operator.sources.iter_mut().enumerate() { + let location = Location::new_source(index, port); + let pointstamp_retractions = source.pointstamps + .updates() + .map(|(time, diff)| (location, time.clone(), -diff)); + pointstamp_changes.extend(pointstamp_retractions); + + let frontier = source.implications.frontier().to_owned(); + let frontier_retractions = frontier + .into_iter() + .map(|time| (location, time, -1)); + frontier_changes.extend(frontier_retractions); } } + + if !pointstamp_changes.is_empty() { + logger.log_pointstamp_updates(Box::new(pointstamp_changes)); + } + if !frontier_changes.is_empty() { + logger.log_frontier_updates(Box::new(frontier_changes)); + } } } diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index f41fee190..20a17cdf1 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -459,10 +459,10 @@ where } // Propagate implications of progress changes. - self.pointstamp_tracker.propagate_all(); + let updates = self.pointstamp_tracker.propagate_all(); // Drain propagated information into shared progress structure. - for ((location, time), diff) in self.pointstamp_tracker.pushed().drain() { + for ((location, time), diff) in updates { self.maybe_shutdown.push(location.node); // Targets are actionable, sources are not. if let crate::progress::Port::Target(port) = location.port {