From 9705303c2f090a19f5475d23754264517ee45f25 Mon Sep 17 00:00:00 2001 From: Jan Teske Date: Tue, 28 Nov 2023 13:49:48 +0100 Subject: [PATCH 1/3] Return implication changes from `propagate_all` This commit makes the reachability tracker directly return changes to implications from `propagate_all`, rather than stashing them and expecting callers to access them through a separate `pushed` method. This API changes allows `propagate_all` to assume that the caller will consume the changes before the next call to `propagate_all`, which in turn guarantees that `pushed_changes` only contains changes from the last `propagate_all` call, rather than possibly changes from previous calls as well. --- timely/src/progress/reachability.rs | 27 ++++++++++----------------- timely/src/progress/subgraph.rs | 4 ++-- 2 files changed, 12 insertions(+), 19 deletions(-) diff --git a/timely/src/progress/reachability.rs b/timely/src/progress/reachability.rs index 5a1f1f6f6..3e2ba2e97 100644 --- a/timely/src/progress/reachability.rs +++ b/timely/src/progress/reachability.rs @@ -33,12 +33,9 @@ //! tracker.update_source(Source::new(0, 0), 17, 1); //! //! // Propagate changes; until this call updates are simply buffered. -//! tracker.propagate_all(); +//! let updates = tracker.propagate_all(); //! -//! let mut results = -//! tracker -//! .pushed() -//! .drain() +//! let mut results = updates //! .filter(|((location, time), delta)| location.is_target()) //! .collect::>(); //! @@ -55,12 +52,9 @@ //! tracker.update_source(Source::new(0, 0), 17, -1); //! //! // Propagate changes; until this call updates are simply buffered. -//! tracker.propagate_all(); +//! let updates = tracker.propagate_all(); //! -//! let mut results = -//! tracker -//! .pushed() -//! .drain() +//! let mut results = updates //! .filter(|((location, time), delta)| location.is_target()) //! .collect::>(); //! @@ -564,8 +558,10 @@ impl Tracker { /// Propagates all pending updates. /// /// The method drains `self.input_changes` and circulates their implications - /// until we cease deriving new implications. - pub fn propagate_all(&mut self) { + /// until we cease deriving new implications. It returns an iterator over updates + /// to implications. + pub fn propagate_all(&mut self) -> impl Iterator + '_ { + self.pushed_changes.clear(); // Step 0: If logging is enabled, construct and log inbound changes. if let Some(logger) = &mut self.logger { @@ -699,6 +695,8 @@ impl Tracker { }; } } + + self.pushed_changes.drain() } /// Implications of maintained capabilities projected to each output. @@ -706,11 +704,6 @@ impl Tracker { &mut self.output_changes[..] } - /// A mutable reference to the pushed results of changes. - pub fn pushed(&mut self) -> &mut ChangeBatch<(Location, T)> { - &mut self.pushed_changes - } - /// Reveals per-operator frontier state. pub fn node_state(&self, index: usize) -> &PerOperator { &self.per_operator[index] diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index f41fee190..20a17cdf1 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -459,10 +459,10 @@ where } // Propagate implications of progress changes. - self.pointstamp_tracker.propagate_all(); + let updates = self.pointstamp_tracker.propagate_all(); // Drain propagated information into shared progress structure. - for ((location, time), diff) in self.pointstamp_tracker.pushed().drain() { + for ((location, time), diff) in updates { self.maybe_shutdown.push(location.node); // Targets are actionable, sources are not. if let crate::progress::Port::Target(port) = location.port { From a92df780ac951c0ce4e4010440ce879895cfc93c Mon Sep 17 00:00:00 2001 From: Jan Teske Date: Mon, 27 Nov 2023 15:53:52 +0100 Subject: [PATCH 2/3] Log frontiers in progress tracking --- timely/src/progress/reachability.rs | 186 +++++++++++++++++++++------- 1 file changed, 144 insertions(+), 42 deletions(-) diff --git a/timely/src/progress/reachability.rs b/timely/src/progress/reachability.rs index 3e2ba2e97..67f3a20c8 100644 --- a/timely/src/progress/reachability.rs +++ b/timely/src/progress/reachability.rs @@ -573,7 +573,7 @@ impl Tracker { .collect::>(); if !target_changes.is_empty() { - logger.log_target_updates(Box::new(target_changes)); + logger.log_target_pointstamp_updates(Box::new(target_changes)); } let source_changes = @@ -583,7 +583,7 @@ impl Tracker { .collect::>(); if !source_changes.is_empty() { - logger.log_source_updates(Box::new(source_changes)); + logger.log_source_pointstamp_updates(Box::new(source_changes)); } } @@ -670,6 +670,7 @@ impl Tracker { } self.pushed_changes.update((location, time), diff); } + } // Update to an operator output. // Propagate any changes forward along outgoing edges. @@ -696,6 +697,30 @@ impl Tracker { } } + // Step 3: If logging is enabled, construct and log outbound changes. + if let Some(logger) = &mut self.logger { + let mut target_changes = Vec::new(); + let mut source_changes = Vec::new(); + + for ((location, time), diff) in self.pushed_changes.iter() { + match location.port { + Port::Target(port) => { + target_changes.push((location.node, port, time.clone(), *diff)) + } + Port::Source(port) => { + source_changes.push((location.node, port, time.clone(), *diff)) + } + } + } + + if !target_changes.is_empty() || !source_changes.is_empty() { + logger.log_frontier_updates( + Box::new(target_changes), + Box::new(source_changes), + ); + } + } + self.pushed_changes.drain() } @@ -839,56 +864,113 @@ pub mod logging { Self { path, logger } } - /// Log source update events with additional identifying information. - pub fn log_source_updates(&mut self, updates: Box) { + /// Log source pointstamp update events with additional identifying information. + pub fn log_source_pointstamp_updates(&mut self, updates: Box) { self.logger.log({ - SourceUpdate { + SourcePointstampUpdate { tracker_id: self.path.clone(), updates, } }) } - /// Log target update events with additional identifying information. - pub fn log_target_updates(&mut self, updates: Box) { + /// Log target pointstamp update events with additional identifying information. + pub fn log_target_pointstamp_updates(&mut self, updates: Box) { self.logger.log({ - TargetUpdate { + TargetPointstampUpdate { tracker_id: self.path.clone(), updates, } }) } + + /// Log frontier update events with additional identifying information. + /// + /// We want to log source and target updates at the same time to ensure callers observe + /// consistent frontiers at any point in time. + pub fn log_frontier_updates( + &mut self, + source_updates: Box, + target_updates: Box, + ) { + let source_event: TrackerEvent = SourceFrontierUpdate { + tracker_id: self.path.clone(), + updates: source_updates, + }.into(); + let target_event: TrackerEvent = TargetFrontierUpdate { + tracker_id: self.path.clone(), + updates: target_updates, + }.into(); + + self.logger.log_many([source_event, target_event]); + } } /// Events that the tracker may record. pub enum TrackerEvent { - /// Updates made at a source of data. - SourceUpdate(SourceUpdate), - /// Updates made at a target of data. - TargetUpdate(TargetUpdate), + /// Pointstamp updates made at a source of data. + SourcePointstampUpdate(SourcePointstampUpdate), + /// Pointstamp updates made at a target of data. + TargetPointstampUpdate(TargetPointstampUpdate), + /// Frontier updates made at a source of data. + SourceFrontierUpdate(SourceFrontierUpdate), + /// Frontier updates made at a target of data. + TargetFrontierUpdate(TargetFrontierUpdate), + } + + /// A pointstamp update made at a source of data. + pub struct SourcePointstampUpdate { + /// An identifier for the tracker. + pub tracker_id: Vec, + /// Updates themselves, as `(node, port, time, diff)`. + pub updates: Box, + } + + /// A pointstamp update made at a target of data. + pub struct TargetPointstampUpdate { + /// An identifier for the tracker. + pub tracker_id: Vec, + /// Updates themselves, as `(node, port, time, diff)`. + pub updates: Box, } - /// An update made at a source of data. - pub struct SourceUpdate { + /// A frontier update at a source of data. + pub struct SourceFrontierUpdate { /// An identifier for the tracker. pub tracker_id: Vec, /// Updates themselves, as `(node, port, time, diff)`. pub updates: Box, } - /// An update made at a target of data. - pub struct TargetUpdate { + /// A frontier update at a target of data. + pub struct TargetFrontierUpdate { /// An identifier for the tracker. pub tracker_id: Vec, /// Updates themselves, as `(node, port, time, diff)`. pub updates: Box, } - impl From for TrackerEvent { - fn from(v: SourceUpdate) -> TrackerEvent { TrackerEvent::SourceUpdate(v) } + impl From for TrackerEvent { + fn from(v: SourcePointstampUpdate) -> Self { + Self::SourcePointstampUpdate(v) + } + } + + impl From for TrackerEvent { + fn from(v: TargetPointstampUpdate) -> Self { + Self::TargetPointstampUpdate(v) + } + } + + impl From for TrackerEvent { + fn from(v: SourceFrontierUpdate) -> Self { + Self::SourceFrontierUpdate(v) + } } - impl From for TrackerEvent { - fn from(v: TargetUpdate) -> TrackerEvent { TrackerEvent::TargetUpdate(v) } + impl From for TrackerEvent { + fn from(v: TargetFrontierUpdate) -> Self { + Self::TargetFrontierUpdate(v) + } } } @@ -906,32 +988,52 @@ impl Drop for Tracker { }; // Retract pending data that `propagate_all` would normally log. + let mut target_pointstamp_changes = Vec::new(); + let mut source_pointstamp_changes = Vec::new(); + let mut target_frontier_changes = Vec::new(); + let mut source_frontier_changes = Vec::new(); + for (index, per_operator) in self.per_operator.iter_mut().enumerate() { - let target_changes = per_operator.targets - .iter_mut() - .enumerate() - .flat_map(|(port, target)| { - target.pointstamps - .updates() - .map(move |(time, diff)| (index, port, time.clone(), -diff)) - }) - .collect::>(); - if !target_changes.is_empty() { - logger.log_target_updates(Box::new(target_changes)); + for (port, target) in per_operator.targets.iter_mut().enumerate() { + let pointstamp_retractions = target.pointstamps + .updates() + .map(|(time, diff)| (index, port, time.clone(), -diff)); + target_pointstamp_changes.extend(pointstamp_retractions); + + let frontier = target.implications.frontier().to_owned(); + let frontier_retractions = frontier + .into_iter() + .map(|time| (index, port, time, -1)); + target_frontier_changes.extend(frontier_retractions); } + } - let source_changes = per_operator.sources - .iter_mut() - .enumerate() - .flat_map(|(port, source)| { - source.pointstamps - .updates() - .map(move |(time, diff)| (index, port, time.clone(), -diff)) - }) - .collect::>(); - if !source_changes.is_empty() { - logger.log_source_updates(Box::new(source_changes)); + for (index, per_operator) in self.per_operator.iter_mut().enumerate() { + for (port, source) in per_operator.sources.iter_mut().enumerate() { + let pointstamp_retractions = source.pointstamps + .updates() + .map(|(time, diff)| (index, port, time.clone(), -diff)); + source_pointstamp_changes.extend(pointstamp_retractions); + + let frontier = source.implications.frontier().to_owned(); + let frontier_retractions = frontier + .into_iter() + .map(|time| (index, port, time, -1)); + source_frontier_changes.extend(frontier_retractions); } } + + if !target_pointstamp_changes.is_empty() { + logger.log_target_pointstamp_updates(Box::new(target_pointstamp_changes)); + } + if !source_pointstamp_changes.is_empty() { + logger.log_source_pointstamp_updates(Box::new(source_pointstamp_changes)); + } + if !source_frontier_changes.is_empty() || !target_frontier_changes.is_empty() { + logger.log_frontier_updates( + Box::new(source_frontier_changes), + Box::new(target_frontier_changes), + ); + } } } From 7b5407e1ebaadc0829a69868197e237ca994c896 Mon Sep 17 00:00:00 2001 From: Jan Teske Date: Wed, 29 Nov 2023 11:33:40 +0100 Subject: [PATCH 3/3] Restructure `TrackerEvent` This commit changes the shape of `TrackerEvent` to include the source/target distinction in a `Location` rather than separate enum variants. Apart from less boilerplate, this speeds up the frontier logging in `propagate_all` as we can directly use the contents of `pushed_changes`, instead of having to split them into target and source changes. A side effect of this is that the definition of `ProgressEventTimestampVec` changes, which also affects the `TimelyProgressEvent` type. --- timely/examples/logging-send.rs | 8 +- timely/src/logging.rs | 22 ++-- timely/src/progress/broadcast.rs | 25 ++-- timely/src/progress/reachability.rs | 194 +++++++++------------------- 4 files changed, 86 insertions(+), 163 deletions(-) diff --git a/timely/examples/logging-send.rs b/timely/examples/logging-send.rs index 23228f4e8..518fce32f 100644 --- a/timely/examples/logging-send.rs +++ b/timely/examples/logging-send.rs @@ -29,13 +29,13 @@ fn main() { println!("PROGRESS: {:?}", x); let (_, _, ev) = x; print!("PROGRESS: TYPED MESSAGES: "); - for (n, p, t, d) in ev.messages.iter() { - print!("{:?}, ", (n, p, t.as_any().downcast_ref::(), d)); + for (l, t, d) in ev.messages.iter() { + print!("{:?}, ", (l, t.as_any().downcast_ref::(), d)); } println!(); print!("PROGRESS: TYPED INTERNAL: "); - for (n, p, t, d) in ev.internal.iter() { - print!("{:?}, ", (n, p, t.as_any().downcast_ref::(), d)); + for (l, t, d) in ev.internal.iter() { + print!("{:?}, ", (l, t.as_any().downcast_ref::(), d)); } println!(); }) diff --git a/timely/src/logging.rs b/timely/src/logging.rs index ca9868fbe..54cdaa6de 100644 --- a/timely/src/logging.rs +++ b/timely/src/logging.rs @@ -11,6 +11,7 @@ pub type TimelyProgressLogger = Logger; use std::time::Duration; use crate::dataflow::operators::capture::{Event, EventPusher}; +use crate::progress::Location; /// Logs events as a timely stream, with progress statements. pub struct BatchLogger where P: EventPusher { @@ -80,10 +81,15 @@ pub trait ProgressEventTimestamp: std::fmt::Debug + std::any::Any { /// /// # Example /// ```rust - /// let ts = vec![(0usize, 0usize, (23u64, 10u64), -4i64), (0usize, 0usize, (23u64, 11u64), 1i64)]; + /// use timely::progress::Location; + /// + /// let ts = vec![ + /// (Location::new_target(0, 0), (23u64, 10u64), -4i64), + /// (Location::new_target(0, 0), (23u64, 11u64), 1i64), + /// ]; /// let ts: &timely::logging::ProgressEventTimestampVec = &ts; - /// for (n, p, t, d) in ts.iter() { - /// print!("{:?}, ", (n, p, t.as_any().downcast_ref::<(u64, u64)>(), d)); + /// for (l, t, d) in ts.iter() { + /// print!("{:?}, ", (l, t.as_any().downcast_ref::<(u64, u64)>(), d)); /// } /// println!(); /// ``` @@ -114,14 +120,14 @@ impl ProgressEventTimestamp fo /// for each dynamically typed element). pub trait ProgressEventTimestampVec: std::fmt::Debug + std::any::Any { /// Iterate over the contents of the vector - fn iter<'a>(&'a self) -> Box+'a>; + fn iter<'a>(&'a self) -> Box+'a>; } -impl ProgressEventTimestampVec for Vec<(usize, usize, T, i64)> { - fn iter<'a>(&'a self) -> Box+'a> { - Box::new(<[(usize, usize, T, i64)]>::iter(&self[..]).map(|(n, p, t, d)| { +impl ProgressEventTimestampVec for Vec<(Location, T, i64)> { + fn iter<'a>(&'a self) -> Box+'a> { + Box::new(<[_]>::iter(&self[..]).map(|(l, t, d)| { let t: &dyn ProgressEventTimestamp = t; - (n, p, t, d) + (l, t, d) })) } } diff --git a/timely/src/progress/broadcast.rs b/timely/src/progress/broadcast.rs index 7c0b95dfe..6b9c8fda0 100644 --- a/timely/src/progress/broadcast.rs +++ b/timely/src/progress/broadcast.rs @@ -1,7 +1,7 @@ //! Broadcasts progress information among workers. use crate::progress::{ChangeBatch, Timestamp}; -use crate::progress::{Location, Port}; +use crate::progress::Location; use crate::communication::{Message, Push, Pull}; use crate::logging::TimelyLogger as Logger; use crate::logging::TimelyProgressLogger as ProgressLogger; @@ -68,13 +68,10 @@ impl Progcaster { let mut internal = Box::new(Vec::with_capacity(changes.len())); for ((location, time), diff) in changes.iter() { - match location.port { - Port::Target(port) => { - messages.push((location.node, port, time.clone(), *diff)) - }, - Port::Source(port) => { - internal.push((location.node, port, time.clone(), *diff)) - } + if location.is_target() { + messages.push((*location, time.clone(), *diff)) + } else { + internal.push((*location, time.clone(), *diff)) } } @@ -137,14 +134,10 @@ impl Progcaster { let mut internal = Box::new(Vec::with_capacity(changes.len())); for ((location, time), diff) in recv_changes.iter() { - - match location.port { - Port::Target(port) => { - messages.push((location.node, port, time.clone(), *diff)) - }, - Port::Source(port) => { - internal.push((location.node, port, time.clone(), *diff)) - } + if location.is_target() { + messages.push((*location, time.clone(), *diff)) + } else { + internal.push((*location, time.clone(), *diff)) } } diff --git a/timely/src/progress/reachability.rs b/timely/src/progress/reachability.rs index 67f3a20c8..fb4038dd0 100644 --- a/timely/src/progress/reachability.rs +++ b/timely/src/progress/reachability.rs @@ -565,25 +565,18 @@ impl Tracker { // Step 0: If logging is enabled, construct and log inbound changes. if let Some(logger) = &mut self.logger { + let changes_count = self.target_changes.len() + self.source_changes.len(); + let mut changes = Vec::with_capacity(changes_count); - let target_changes = - self.target_changes - .iter() - .map(|((target, time), diff)| (target.node, target.port, time.clone(), *diff)) - .collect::>(); - - if !target_changes.is_empty() { - logger.log_target_pointstamp_updates(Box::new(target_changes)); + for ((target, time), diff) in self.target_changes.iter() { + changes.push((Location::from(*target), time.clone(), *diff)); + } + for ((source, time), diff) in self.source_changes.iter() { + changes.push((Location::from(*source), time.clone(), *diff)); } - let source_changes = - self.source_changes - .iter() - .map(|((source, time), diff)| (source.node, source.port, time.clone(), *diff)) - .collect::>(); - - if !source_changes.is_empty() { - logger.log_source_pointstamp_updates(Box::new(source_changes)); + if !changes.is_empty() { + logger.log_pointstamp_updates(Box::new(changes)); } } @@ -699,25 +692,14 @@ impl Tracker { // Step 3: If logging is enabled, construct and log outbound changes. if let Some(logger) = &mut self.logger { - let mut target_changes = Vec::new(); - let mut source_changes = Vec::new(); - - for ((location, time), diff) in self.pushed_changes.iter() { - match location.port { - Port::Target(port) => { - target_changes.push((location.node, port, time.clone(), *diff)) - } - Port::Source(port) => { - source_changes.push((location.node, port, time.clone(), *diff)) - } - } - } + let changes: Vec<_> = self + .pushed_changes + .iter() + .map(|((location, time), diff)| (*location, time.clone(), *diff)) + .collect(); - if !target_changes.is_empty() || !source_changes.is_empty() { - logger.log_frontier_updates( - Box::new(target_changes), - Box::new(source_changes), - ); + if !changes.is_empty() { + logger.log_frontier_updates(Box::new(changes)); } } @@ -864,112 +846,60 @@ pub mod logging { Self { path, logger } } - /// Log source pointstamp update events with additional identifying information. - pub fn log_source_pointstamp_updates(&mut self, updates: Box) { - self.logger.log({ - SourcePointstampUpdate { + /// Log pointstamp update events with additional identifying information. + pub fn log_pointstamp_updates(&mut self, updates: Box) { + self.logger.log( + PointstampUpdates { tracker_id: self.path.clone(), updates, } - }) + ) } - /// Log target pointstamp update events with additional identifying information. - pub fn log_target_pointstamp_updates(&mut self, updates: Box) { - self.logger.log({ - TargetPointstampUpdate { + + /// Log frontier update events with additional identifying information. + pub fn log_frontier_updates(&mut self, updates: Box) { + self.logger.log( + FrontierUpdates { tracker_id: self.path.clone(), updates, } - }) - } - - /// Log frontier update events with additional identifying information. - /// - /// We want to log source and target updates at the same time to ensure callers observe - /// consistent frontiers at any point in time. - pub fn log_frontier_updates( - &mut self, - source_updates: Box, - target_updates: Box, - ) { - let source_event: TrackerEvent = SourceFrontierUpdate { - tracker_id: self.path.clone(), - updates: source_updates, - }.into(); - let target_event: TrackerEvent = TargetFrontierUpdate { - tracker_id: self.path.clone(), - updates: target_updates, - }.into(); - - self.logger.log_many([source_event, target_event]); + ) } } /// Events that the tracker may record. pub enum TrackerEvent { - /// Pointstamp updates made at a source of data. - SourcePointstampUpdate(SourcePointstampUpdate), - /// Pointstamp updates made at a target of data. - TargetPointstampUpdate(TargetPointstampUpdate), - /// Frontier updates made at a source of data. - SourceFrontierUpdate(SourceFrontierUpdate), - /// Frontier updates made at a target of data. - TargetFrontierUpdate(TargetFrontierUpdate), - } - - /// A pointstamp update made at a source of data. - pub struct SourcePointstampUpdate { - /// An identifier for the tracker. - pub tracker_id: Vec, - /// Updates themselves, as `(node, port, time, diff)`. - pub updates: Box, - } - - /// A pointstamp update made at a target of data. - pub struct TargetPointstampUpdate { - /// An identifier for the tracker. - pub tracker_id: Vec, - /// Updates themselves, as `(node, port, time, diff)`. - pub updates: Box, + /// Pointstamp updates made. + PointstampUpdates(PointstampUpdates), + /// Frontier updates made. + FrontierUpdates(FrontierUpdates), } - /// A frontier update at a source of data. - pub struct SourceFrontierUpdate { + /// Pointstamp updates reported by a tracker. + pub struct PointstampUpdates { /// An identifier for the tracker. pub tracker_id: Vec, - /// Updates themselves, as `(node, port, time, diff)`. + /// Updates themselves, as `(location, time, diff)`. pub updates: Box, } - /// A frontier update at a target of data. - pub struct TargetFrontierUpdate { + /// Frontier updates reported by a tracker. + pub struct FrontierUpdates { /// An identifier for the tracker. pub tracker_id: Vec, - /// Updates themselves, as `(node, port, time, diff)`. + /// Updates themselves, as `(location, time, diff)`. pub updates: Box, } - impl From for TrackerEvent { - fn from(v: SourcePointstampUpdate) -> Self { - Self::SourcePointstampUpdate(v) - } - } - - impl From for TrackerEvent { - fn from(v: TargetPointstampUpdate) -> Self { - Self::TargetPointstampUpdate(v) + impl From for TrackerEvent { + fn from(v: PointstampUpdates) -> Self { + Self::PointstampUpdates(v) } } - impl From for TrackerEvent { - fn from(v: SourceFrontierUpdate) -> Self { - Self::SourceFrontierUpdate(v) - } - } - - impl From for TrackerEvent { - fn from(v: TargetFrontierUpdate) -> Self { - Self::TargetFrontierUpdate(v) + impl From for TrackerEvent { + fn from(v: FrontierUpdates) -> Self { + Self::FrontierUpdates(v) } } } @@ -988,52 +918,46 @@ impl Drop for Tracker { }; // Retract pending data that `propagate_all` would normally log. - let mut target_pointstamp_changes = Vec::new(); - let mut source_pointstamp_changes = Vec::new(); - let mut target_frontier_changes = Vec::new(); - let mut source_frontier_changes = Vec::new(); + let mut pointstamp_changes = Vec::new(); + let mut frontier_changes = Vec::new(); for (index, per_operator) in self.per_operator.iter_mut().enumerate() { for (port, target) in per_operator.targets.iter_mut().enumerate() { + let location = Location::new_target(index, port); let pointstamp_retractions = target.pointstamps .updates() - .map(|(time, diff)| (index, port, time.clone(), -diff)); - target_pointstamp_changes.extend(pointstamp_retractions); + .map(|(time, diff)| (location, time.clone(), -diff)); + pointstamp_changes.extend(pointstamp_retractions); let frontier = target.implications.frontier().to_owned(); let frontier_retractions = frontier .into_iter() - .map(|time| (index, port, time, -1)); - target_frontier_changes.extend(frontier_retractions); + .map(|time| (location, time, -1)); + frontier_changes.extend(frontier_retractions); } } for (index, per_operator) in self.per_operator.iter_mut().enumerate() { for (port, source) in per_operator.sources.iter_mut().enumerate() { + let location = Location::new_source(index, port); let pointstamp_retractions = source.pointstamps .updates() - .map(|(time, diff)| (index, port, time.clone(), -diff)); - source_pointstamp_changes.extend(pointstamp_retractions); + .map(|(time, diff)| (location, time.clone(), -diff)); + pointstamp_changes.extend(pointstamp_retractions); let frontier = source.implications.frontier().to_owned(); let frontier_retractions = frontier .into_iter() - .map(|time| (index, port, time, -1)); - source_frontier_changes.extend(frontier_retractions); + .map(|time| (location, time, -1)); + frontier_changes.extend(frontier_retractions); } } - if !target_pointstamp_changes.is_empty() { - logger.log_target_pointstamp_updates(Box::new(target_pointstamp_changes)); - } - if !source_pointstamp_changes.is_empty() { - logger.log_source_pointstamp_updates(Box::new(source_pointstamp_changes)); + if !pointstamp_changes.is_empty() { + logger.log_pointstamp_updates(Box::new(pointstamp_changes)); } - if !source_frontier_changes.is_empty() || !target_frontier_changes.is_empty() { - logger.log_frontier_updates( - Box::new(source_frontier_changes), - Box::new(target_frontier_changes), - ); + if !frontier_changes.is_empty() { + logger.log_frontier_updates(Box::new(frontier_changes)); } } }