From 6c5567799fe01fef0b6067943730c8ef59d5e26b Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 23 Mar 2024 03:13:49 -0400 Subject: [PATCH 1/3] Remove non-Core capture logic --- timely/examples/capture_recv.rs | 2 +- timely/examples/logging-recv.rs | 2 +- .../src/dataflow/operators/capture/capture.rs | 18 +-- .../src/dataflow/operators/capture/event.rs | 109 ++++++----------- .../src/dataflow/operators/capture/extract.rs | 112 +++++------------- timely/src/dataflow/operators/capture/mod.rs | 16 +-- .../src/dataflow/operators/capture/replay.rs | 18 +-- timely/src/dataflow/operators/core/rc.rs | 1 + timely/src/dataflow/operators/core/reclock.rs | 3 +- .../src/dataflow/operators/core/to_stream.rs | 2 +- timely/src/execute.rs | 8 +- timely/src/logging.rs | 6 +- 12 files changed, 110 insertions(+), 187 deletions(-) diff --git a/timely/examples/capture_recv.rs b/timely/examples/capture_recv.rs index d6f7291c11..5cb48b055f 100644 --- a/timely/examples/capture_recv.rs +++ b/timely/examples/capture_recv.rs @@ -17,7 +17,7 @@ fn main() { .collect::>() .into_iter() .map(|l| l.incoming().next().unwrap().unwrap()) - .map(|r| EventReader::<_,u64,_>::new(r)) + .map(|r| EventReader::<_,Vec,_>::new(r)) .collect::>(); worker.dataflow::(|scope| { diff --git a/timely/examples/logging-recv.rs b/timely/examples/logging-recv.rs index 1c2a5ccda8..4608cad3da 100644 --- a/timely/examples/logging-recv.rs +++ b/timely/examples/logging-recv.rs @@ -20,7 +20,7 @@ fn main() { .collect::>() .into_iter() .map(|l| l.incoming().next().unwrap().unwrap()) - .map(|r| EventReader::::new(r)) + .map(|r| EventReader::,_>::new(r)) .collect::>(); worker.dataflow(|scope| { diff --git a/timely/src/dataflow/operators/capture/capture.rs b/timely/src/dataflow/operators/capture/capture.rs index 9aa3827de2..b14d3a6b56 100644 --- a/timely/src/dataflow/operators/capture/capture.rs +++ b/timely/src/dataflow/operators/capture/capture.rs @@ -14,7 +14,7 @@ use crate::Container; use crate::progress::ChangeBatch; use crate::progress::Timestamp; -use super::{EventCore, EventPusherCore}; +use super::{Event, EventPusher}; /// Capture a stream of timestamped data for later replay. pub trait Capture { @@ -30,7 +30,7 @@ pub trait Capture { /// use std::sync::{Arc, Mutex}; /// use timely::dataflow::Scope; /// use timely::dataflow::operators::{Capture, ToStream, Inspect}; - /// use timely::dataflow::operators::capture::{EventLinkCore, Replay, Extract}; + /// use timely::dataflow::operators::capture::{EventLink, Replay, Extract}; /// /// // get send and recv endpoints, wrap send to share /// let (send, recv) = ::std::sync::mpsc::channel(); @@ -42,7 +42,7 @@ pub trait Capture { /// let send = send.lock().unwrap().clone(); /// /// // these are to capture/replay the stream. - /// let handle1 = Rc::new(EventLinkCore::new()); + /// let handle1 = Rc::new(EventLink::new()); /// let handle2 = Some(handle1.clone()); /// /// worker.dataflow::(|scope1| @@ -95,7 +95,7 @@ pub trait Capture { /// ); /// /// worker.dataflow::(|scope2| { - /// Some(EventReader::<_,u64,_>::new(recv)) + /// Some(EventReader::<_,Vec,_>::new(recv)) /// .replay_into(scope2) /// .capture_into(send0) /// }); @@ -103,10 +103,10 @@ pub trait Capture { /// /// assert_eq!(recv0.extract()[0].1, (0..10).collect::>()); /// ``` - fn capture_into+'static>(&self, pusher: P); + fn capture_into+'static>(&self, pusher: P); /// Captures a stream using Rust's MPSC channels. - fn capture(&self) -> ::std::sync::mpsc::Receiver> { + fn capture(&self) -> ::std::sync::mpsc::Receiver> { let (send, recv) = ::std::sync::mpsc::channel(); self.capture_into(send); recv @@ -114,7 +114,7 @@ pub trait Capture { } impl Capture for StreamCore { - fn capture_into+'static>(&self, mut event_pusher: P) { + fn capture_into+'static>(&self, mut event_pusher: P) { let mut builder = OperatorBuilder::new("Capture".to_owned(), self.scope()); let mut input = PullCounter::new(builder.new_input(self, Pipeline)); @@ -131,7 +131,7 @@ impl Capture for StreamCore { if !progress.frontiers[0].is_empty() { // transmit any frontier progress. let to_send = ::std::mem::replace(&mut progress.frontiers[0], ChangeBatch::new()); - event_pusher.push(EventCore::Progress(to_send.into_inner())); + event_pusher.push(Event::Progress(to_send.into_inner())); } use crate::communication::message::RefOrMut; @@ -143,7 +143,7 @@ impl Capture for StreamCore { RefOrMut::Mut(reference) => (&reference.time, RefOrMut::Mut(&mut reference.data)), }; let vector = data.replace(Default::default()); - event_pusher.push(EventCore::Messages(time.clone(), vector)); + event_pusher.push(Event::Messages(time.clone(), vector)); } input.consumed().borrow_mut().drain_into(&mut progress.consumeds[0]); false diff --git a/timely/src/dataflow/operators/capture/event.rs b/timely/src/dataflow/operators/capture/event.rs index a5b5f07b8c..3f7961e288 100644 --- a/timely/src/dataflow/operators/capture/event.rs +++ b/timely/src/dataflow/operators/capture/event.rs @@ -6,55 +6,33 @@ /// Data and progress events of the captured stream. #[derive(Debug, Clone, Abomonation, Hash, Ord, PartialOrd, Eq, PartialEq, Deserialize, Serialize)] -pub enum EventCore { +pub enum Event { /// Progress received via `push_external_progress`. Progress(Vec<(T, i64)>), /// Messages received via the data stream. Messages(T, C), } -/// Data and progress events of the captured stream, specialized to vector-based containers. -pub type Event = EventCore>; - -/// Iterates over contained `EventCore`. +/// Iterates over contained `Event`. /// /// The `EventIterator` trait describes types that can iterate over references to events, /// and which can be used to replay a stream into a new timely dataflow computation. /// /// This method is not simply an iterator because of the lifetime in the result. -pub trait EventIteratorCore { - /// Iterates over references to `EventCore` elements. - fn next(&mut self) -> Option<&EventCore>; -} - -/// A [EventIteratorCore] specialized to vector-based containers. -// TODO: use trait aliases once stable. -pub trait EventIterator: EventIteratorCore> { - /// Iterates over references to `Event` elements. - fn next(&mut self) -> Option<&Event>; +pub trait EventIterator { + /// Iterates over references to `Event` elements. + fn next(&mut self) -> Option<&Event>; } -impl>> EventIterator for E { - fn next(&mut self) -> Option<&Event> { - >::next(self) - } -} - -/// Receives `EventCore` events. -pub trait EventPusherCore { +/// Receives `Event` events. +pub trait EventPusher { /// Provides a new `Event` to the pusher. - fn push(&mut self, event: EventCore); + fn push(&mut self, event: Event); } -/// A [EventPusherCore] specialized to vector-based containers. -// TODO: use trait aliases once stable. -pub trait EventPusher: EventPusherCore> {} -impl>> EventPusher for E {} - - // implementation for the linked list behind a `Handle`. -impl EventPusherCore for ::std::sync::mpsc::Sender> { - fn push(&mut self, event: EventCore) { +impl EventPusher for ::std::sync::mpsc::Sender> { + fn push(&mut self, event: Event) { // NOTE: An Err(x) result just means "data not accepted" most likely // because the receiver is gone. No need to panic. let _ = self.send(event); @@ -67,40 +45,37 @@ pub mod link { use std::rc::Rc; use std::cell::RefCell; - use super::{EventCore, EventPusherCore, EventIteratorCore}; + use super::{Event, EventPusher, EventIterator}; - /// A linked list of EventCore. - pub struct EventLinkCore { + /// A linked list of Event. + pub struct EventLink { /// An event, if one exists. /// /// An event might not exist, if either we want to insert a `None` and have the output iterator pause, /// or in the case of the very first linked list element, which has no event when constructed. - pub event: Option>, + pub event: Option>, /// The next event, if it exists. - pub next: RefCell>>>, + pub next: RefCell>>>, } - /// A [EventLinkCore] specialized to vector-based containers. - pub type EventLink = EventLinkCore>; - - impl EventLinkCore { + impl EventLink { /// Allocates a new `EventLink`. - pub fn new() -> EventLinkCore { - EventLinkCore { event: None, next: RefCell::new(None) } + pub fn new() -> EventLink { + EventLink { event: None, next: RefCell::new(None) } } } // implementation for the linked list behind a `Handle`. - impl EventPusherCore for Rc> { - fn push(&mut self, event: EventCore) { - *self.next.borrow_mut() = Some(Rc::new(EventLinkCore { event: Some(event), next: RefCell::new(None) })); + impl EventPusher for Rc> { + fn push(&mut self, event: Event) { + *self.next.borrow_mut() = Some(Rc::new(EventLink { event: Some(event), next: RefCell::new(None) })); let next = self.next.borrow().as_ref().unwrap().clone(); *self = next; } } - impl EventIteratorCore for Rc> { - fn next(&mut self) -> Option<&EventCore> { + impl EventIterator for Rc> { + fn next(&mut self) -> Option<&Event> { let is_some = self.next.borrow().is_some(); if is_some { let next = self.next.borrow().as_ref().unwrap().clone(); @@ -114,7 +89,7 @@ pub mod link { } // Drop implementation to prevent stack overflow through naive drop impl. - impl Drop for EventLinkCore { + impl Drop for EventLink { fn drop(&mut self) { while let Some(link) = self.next.replace(None) { if let Ok(head) = Rc::try_unwrap(link) { @@ -124,7 +99,7 @@ pub mod link { } } - impl Default for EventLinkCore { + impl Default for EventLink { fn default() -> Self { Self::new() } @@ -132,10 +107,10 @@ pub mod link { #[test] fn avoid_stack_overflow_in_drop() { - let mut event1 = Rc::new(EventLinkCore::<(),()>::new()); + let mut event1 = Rc::new(EventLink::<(),()>::new()); let _event2 = event1.clone(); for _ in 0 .. 1_000_000 { - event1.push(EventCore::Progress(vec![])); + event1.push(Event::Progress(vec![])); } } } @@ -145,18 +120,15 @@ pub mod binary { use std::io::Write; use abomonation::Abomonation; - use super::{EventCore, EventPusherCore, EventIteratorCore}; + use super::{Event, EventPusher, EventIterator}; - /// A wrapper for `W: Write` implementing `EventPusherCore`. - pub struct EventWriterCore { + /// A wrapper for `W: Write` implementing `EventPusher`. + pub struct EventWriter { stream: W, phant: ::std::marker::PhantomData<(T, C)>, } - /// [EventWriterCore] specialized to vector-based containers. - pub type EventWriter = EventWriterCore, W>; - - impl EventWriterCore { + impl EventWriter { /// Allocates a new `EventWriter` wrapping a supplied writer. pub fn new(w: W) -> Self { Self { @@ -166,15 +138,15 @@ pub mod binary { } } - impl EventPusherCore for EventWriterCore { - fn push(&mut self, event: EventCore) { + impl EventPusher for EventWriter { + fn push(&mut self, event: Event) { // TODO: `push` has no mechanism to report errors, so we `unwrap`. unsafe { ::abomonation::encode(&event, &mut self.stream).expect("Event abomonation/write failed"); } } } /// A Wrapper for `R: Read` implementing `EventIterator`. - pub struct EventReaderCore { + pub struct EventReader { reader: R, bytes: Vec, buff1: Vec, @@ -184,10 +156,7 @@ pub mod binary { phant: ::std::marker::PhantomData<(T, C)>, } - /// [EventReaderCore] specialized to vector-based containers. - pub type EventReader = EventReaderCore, R>; - - impl EventReaderCore { + impl EventReader { /// Allocates a new `EventReader` wrapping a supplied reader. pub fn new(r: R) -> Self { Self { @@ -202,12 +171,12 @@ pub mod binary { } } - impl EventIteratorCore for EventReaderCore { - fn next(&mut self) -> Option<&EventCore> { + impl EventIterator for EventReader { + fn next(&mut self) -> Option<&Event> { // if we can decode something, we should just return it! :D - if unsafe { ::abomonation::decode::>(&mut self.buff1[self.consumed..]) }.is_some() { - let (item, rest) = unsafe { ::abomonation::decode::>(&mut self.buff1[self.consumed..]) }.unwrap(); + if unsafe { ::abomonation::decode::>(&mut self.buff1[self.consumed..]) }.is_some() { + let (item, rest) = unsafe { ::abomonation::decode::>(&mut self.buff1[self.consumed..]) }.unwrap(); self.consumed = self.valid - rest.len(); return Some(item); } diff --git a/timely/src/dataflow/operators/capture/extract.rs b/timely/src/dataflow/operators/capture/extract.rs index dcf57ae3bb..a496202855 100644 --- a/timely/src/dataflow/operators/capture/extract.rs +++ b/timely/src/dataflow/operators/capture/extract.rs @@ -1,14 +1,13 @@ //! Traits and types for extracting captured timely dataflow streams. -use super::EventCore; -use crate::Container; -use crate::Data; +use super::Event; +use crate::{container::{PushContainer, PushInto}}; /// Supports extracting a sequence of timestamp and data. -pub trait Extract { +pub trait Extract { /// Converts `self` into a sequence of timestamped data. /// - /// Currently this is only implemented for `Receiver>>`, and is used only + /// Currently this is only implemented for `Receiver>`, and is used only /// to easily pull data out of a timely dataflow computation once it has completed. /// /// # Examples @@ -18,7 +17,7 @@ pub trait Extract { /// use std::sync::{Arc, Mutex}; /// use timely::dataflow::Scope; /// use timely::dataflow::operators::{Capture, ToStream, Inspect}; - /// use timely::dataflow::operators::capture::{EventLinkCore, Replay, Extract}; + /// use timely::dataflow::operators::capture::{EventLink, Replay, Extract}; /// /// // get send and recv endpoints, wrap send to share /// let (send, recv) = ::std::sync::mpsc::channel(); @@ -30,7 +29,7 @@ pub trait Extract { /// let send = send.lock().unwrap().clone(); /// /// // these are to capture/replay the stream. - /// let handle1 = Rc::new(EventLinkCore::new()); + /// let handle1 = Rc::new(EventLink::new()); /// let handle2 = Some(handle1.clone()); /// /// worker.dataflow::(|scope1| @@ -44,88 +43,39 @@ pub trait Extract { /// }); /// }).unwrap(); /// - /// assert_eq!(recv.extract()[0].1, (0..10).collect::>()); + /// assert_eq!(recv.extract().into_iter().flat_map(|x| x.1).collect::>(), (0..10).collect::>()); /// ``` - fn extract(self) -> Vec<(T, Vec)>; + fn extract(self) -> Vec<(T, C)>; } -impl Extract for ::std::sync::mpsc::Receiver>> { - fn extract(self) -> Vec<(T, Vec)> { - let mut result = self.extract_core(); - - let mut current = 0; - for i in 1 .. result.len() { - if result[current].0 == result[i].0 { - let dataz = ::std::mem::replace(&mut result[i].1, Vec::new()); - result[current].1.extend(dataz); - } - else { - current = i; +impl Extract for ::std::sync::mpsc::Receiver> +where + for<'a> C::Item<'a>: PushInto + Ord, +{ + fn extract(self) -> Vec<(T, C)> { + let mut staged = std::collections::BTreeMap::new(); + for event in self { + if let Event::Messages(time, data) = event { + staged.entry(time) + .or_insert_with(Vec::new) + .push(data); } } - - for &mut (_, ref mut data) in &mut result { - data.sort(); - } - result.retain(|x| !x.1.is_empty()); - result - } -} - -/// Supports extracting a sequence of timestamp and data. -pub trait ExtractCore { - /// Converts `self` into a sequence of timestamped data. - /// - /// Currently this is only implemented for `Receiver>`, and is used only - /// to easily pull data out of a timely dataflow computation once it has completed. - /// - /// # Examples - /// - /// ```rust - /// use std::rc::Rc; - /// use std::sync::{Arc, Mutex}; - /// use timely::dataflow::Scope; - /// use timely::dataflow::operators::{Capture, ToStream, Inspect}; - /// use timely::dataflow::operators::capture::{EventLinkCore, Replay, ExtractCore}; - /// - /// // get send and recv endpoints, wrap send to share - /// let (send, recv) = ::std::sync::mpsc::channel(); - /// let send = Arc::new(Mutex::new(send)); - /// - /// timely::execute(timely::Config::thread(), move |worker| { - /// - /// // this is only to validate the output. - /// let send = send.lock().unwrap().clone(); - /// - /// // these are to capture/replay the stream. - /// let handle1 = Rc::new(EventLinkCore::new()); - /// let handle2 = Some(handle1.clone()); - /// - /// worker.dataflow::(|scope1| - /// (0..10).to_stream(scope1) - /// .capture_into(handle1) - /// ); - /// - /// worker.dataflow(|scope2| { - /// handle2.replay_into(scope2) - /// .capture_into(send) - /// }); - /// }).unwrap(); - /// - /// assert_eq!(recv.extract_core().into_iter().flat_map(|x| x.1).collect::>(), (0..10).collect::>()); - /// ``` - fn extract_core(self) -> Vec<(T, C)>; -} - -impl ExtractCore for ::std::sync::mpsc::Receiver> { - fn extract_core(self) -> Vec<(T, C)> { let mut result = Vec::new(); - for event in self { - if let EventCore::Messages(time, data) = event { - result.push((time, data)); + for (time, mut dataz) in staged.into_iter() { + let mut to_sort = Vec::new(); + for data in dataz.iter_mut() { + to_sort.extend(data.drain()); + } + to_sort.sort(); + let mut sorted = C::default(); + for datum in to_sort.into_iter() { + sorted.push(datum); + } + if !sorted.is_empty() { + result.push((time, sorted)); } } - result.retain(|x| !x.1.is_empty()); result } } diff --git a/timely/src/dataflow/operators/capture/mod.rs b/timely/src/dataflow/operators/capture/mod.rs index 22d332ea02..84fb68aab9 100644 --- a/timely/src/dataflow/operators/capture/mod.rs +++ b/timely/src/dataflow/operators/capture/mod.rs @@ -22,10 +22,10 @@ //! use std::rc::Rc; //! use timely::dataflow::Scope; //! use timely::dataflow::operators::{Capture, ToStream, Inspect}; -//! use timely::dataflow::operators::capture::{EventLinkCore, Replay}; +//! use timely::dataflow::operators::capture::{EventLink, Replay}; //! //! timely::execute(timely::Config::thread(), |worker| { -//! let handle1 = Rc::new(EventLinkCore::new()); +//! let handle1 = Rc::new(EventLink::new()); //! let handle2 = Some(handle1.clone()); //! //! worker.dataflow::(|scope1| @@ -66,7 +66,7 @@ //! ); //! //! worker.dataflow::(|scope2| { -//! Some(EventReader::<_,u64,_>::new(recv)) +//! Some(EventReader::<_,Vec,_>::new(recv)) //! .replay_into(scope2) //! .inspect(|x| println!("replayed: {:?}", x)); //! }) @@ -75,11 +75,11 @@ pub use self::capture::Capture; pub use self::replay::Replay; -pub use self::extract::{Extract, ExtractCore}; -pub use self::event::{Event, EventCore, EventPusher, EventPusherCore}; -pub use self::event::link::{EventLink, EventLinkCore}; -pub use self::event::binary::{EventReader, EventReaderCore}; -pub use self::event::binary::{EventWriter, EventWriterCore}; +pub use self::extract::Extract; +pub use self::event::{Event, EventPusher}; +pub use self::event::link::EventLink; +pub use self::event::binary::EventReader; +pub use self::event::binary::EventWriter; pub mod capture; pub mod replay; diff --git a/timely/src/dataflow/operators/capture/replay.rs b/timely/src/dataflow/operators/capture/replay.rs index 5fdb237040..bdf6558a07 100644 --- a/timely/src/dataflow/operators/capture/replay.rs +++ b/timely/src/dataflow/operators/capture/replay.rs @@ -44,17 +44,17 @@ use crate::dataflow::channels::pushers::buffer::Buffer as PushBuffer; use crate::dataflow::operators::generic::builder_raw::OperatorBuilder; use crate::progress::Timestamp; -use super::EventCore; -use super::event::EventIteratorCore; +use super::Event; +use super::event::EventIterator; use crate::Container; /// Replay a capture stream into a scope with the same timestamp. pub trait Replay : Sized { - /// Replays `self` into the provided scope, as a `Stream`. + /// Replays `self` into the provided scope, as a `StreamCore`. fn replay_into>(self, scope: &mut S) -> StreamCore { self.replay_core(scope, Some(std::time::Duration::new(0, 0))) } - /// Replays `self` into the provided scope, as a `Stream'. + /// Replays `self` into the provided scope, as a `StreamCore'. /// /// The `period` argument allows the specification of a re-activation period, where the operator /// will re-activate itself every so often. The `None` argument instructs the operator not to @@ -63,8 +63,10 @@ pub trait Replay : Sized { } impl Replay for I -where I : IntoIterator, - ::Item: EventIteratorCore+'static { +where + I : IntoIterator, + ::Item: EventIterator+'static, +{ fn replay_core>(self, scope: &mut S, period: Option) -> StreamCore{ let mut builder = OperatorBuilder::new("Replay".to_owned(), scope.clone()); @@ -93,10 +95,10 @@ where I : IntoIterator, for event_stream in event_streams.iter_mut() { while let Some(event) = event_stream.next() { match event { - EventCore::Progress(vec) => { + Event::Progress(vec) => { progress.internals[0].extend(vec.iter().cloned()); }, - EventCore::Messages(ref time, data) => { + Event::Messages(ref time, data) => { allocation.clone_from(data); output.session(time).give_container(&mut allocation); } diff --git a/timely/src/dataflow/operators/core/rc.rs b/timely/src/dataflow/operators/core/rc.rs index fcdc646cf2..c00108a7c3 100644 --- a/timely/src/dataflow/operators/core/rc.rs +++ b/timely/src/dataflow/operators/core/rc.rs @@ -72,6 +72,7 @@ mod test { } }), ]) + .container::>() .capture() }); let output = &mut output.extract()[0].1; diff --git a/timely/src/dataflow/operators/core/reclock.rs b/timely/src/dataflow/operators/core/reclock.rs index 9510e87223..ec9617fb7f 100644 --- a/timely/src/dataflow/operators/core/reclock.rs +++ b/timely/src/dataflow/operators/core/reclock.rs @@ -19,7 +19,7 @@ pub trait Reclock { /// # Examples /// /// ``` - /// use timely::dataflow::operators::{ToStream, Delay, Map, Reclock, Capture}; + /// use timely::dataflow::operators::{Inspect, ToStream, Delay, Map, Reclock, Capture}; /// use timely::dataflow::operators::capture::Extract; /// /// let captured = timely::example(|scope| { @@ -36,6 +36,7 @@ pub trait Reclock { /// /// // reclock the data. /// data.reclock(&clock) + /// .inspect_batch(|t,x| println!("SEEN: {:?}, {:?}", t, x)) /// .capture() /// }); /// diff --git a/timely/src/dataflow/operators/core/to_stream.rs b/timely/src/dataflow/operators/core/to_stream.rs index 3775e0267b..848adf3c1d 100644 --- a/timely/src/dataflow/operators/core/to_stream.rs +++ b/timely/src/dataflow/operators/core/to_stream.rs @@ -12,7 +12,7 @@ pub trait ToStream { /// # Examples /// /// ``` - /// use timely::dataflow::operators::core::ToStream; + /// use timely::dataflow::operators::ToStream; /// use timely::dataflow::operators::Capture; /// use timely::dataflow::operators::capture::Extract; /// diff --git a/timely/src/execute.rs b/timely/src/execute.rs index 3564d35928..9a8340dbfe 100644 --- a/timely/src/execute.rs +++ b/timely/src/execute.rs @@ -235,12 +235,12 @@ where use ::std::net::TcpStream; use crate::logging::BatchLogger; - use crate::dataflow::operators::capture::EventWriterCore; + use crate::dataflow::operators::capture::EventWriter; eprintln!("enabled COMM logging to {}", addr); if let Ok(stream) = TcpStream::connect(&addr) { - let writer = EventWriterCore::new(stream); + let writer = EventWriter::new(stream); let mut logger = BatchLogger::new(writer); result = Some(crate::logging_core::Logger::new( ::std::time::Instant::now(), @@ -269,10 +269,10 @@ where use ::std::net::TcpStream; use crate::logging::{BatchLogger, TimelyEvent}; - use crate::dataflow::operators::capture::EventWriterCore; + use crate::dataflow::operators::capture::EventWriter; if let Ok(stream) = TcpStream::connect(&addr) { - let writer = EventWriterCore::new(stream); + let writer = EventWriter::new(stream); let mut logger = BatchLogger::new(writer); worker.log_register() .insert::("timely", move |time, data| diff --git a/timely/src/logging.rs b/timely/src/logging.rs index ca9868fbe5..39944ade05 100644 --- a/timely/src/logging.rs +++ b/timely/src/logging.rs @@ -13,14 +13,14 @@ use std::time::Duration; use crate::dataflow::operators::capture::{Event, EventPusher}; /// Logs events as a timely stream, with progress statements. -pub struct BatchLogger where P: EventPusher { +pub struct BatchLogger where P: EventPusher> { // None when the logging stream is closed time: Duration, event_pusher: P, _phantom: ::std::marker::PhantomData<(E, T)>, } -impl BatchLogger where P: EventPusher { +impl BatchLogger where P: EventPusher> { /// Creates a new batch logger. pub fn new(event_pusher: P) -> Self { BatchLogger { @@ -42,7 +42,7 @@ impl BatchLogger where P: EventPusher Drop for BatchLogger where P: EventPusher { +impl Drop for BatchLogger where P: EventPusher> { fn drop(&mut self) { self.event_pusher.push(Event::Progress(vec![(self.time, -1)])); } From fea3459c92f6866c079e6f95fab790a76015c784 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 23 Mar 2024 03:15:52 -0400 Subject: [PATCH 2/3] Update Capture --- timely/src/dataflow/operators/{ => core}/capture/capture.rs | 0 timely/src/dataflow/operators/{ => core}/capture/event.rs | 0 timely/src/dataflow/operators/{ => core}/capture/extract.rs | 0 timely/src/dataflow/operators/{ => core}/capture/mod.rs | 0 timely/src/dataflow/operators/{ => core}/capture/replay.rs | 0 timely/src/dataflow/operators/core/mod.rs | 2 ++ timely/src/dataflow/operators/mod.rs | 3 +-- 7 files changed, 3 insertions(+), 2 deletions(-) rename timely/src/dataflow/operators/{ => core}/capture/capture.rs (100%) rename timely/src/dataflow/operators/{ => core}/capture/event.rs (100%) rename timely/src/dataflow/operators/{ => core}/capture/extract.rs (100%) rename timely/src/dataflow/operators/{ => core}/capture/mod.rs (100%) rename timely/src/dataflow/operators/{ => core}/capture/replay.rs (100%) diff --git a/timely/src/dataflow/operators/capture/capture.rs b/timely/src/dataflow/operators/core/capture/capture.rs similarity index 100% rename from timely/src/dataflow/operators/capture/capture.rs rename to timely/src/dataflow/operators/core/capture/capture.rs diff --git a/timely/src/dataflow/operators/capture/event.rs b/timely/src/dataflow/operators/core/capture/event.rs similarity index 100% rename from timely/src/dataflow/operators/capture/event.rs rename to timely/src/dataflow/operators/core/capture/event.rs diff --git a/timely/src/dataflow/operators/capture/extract.rs b/timely/src/dataflow/operators/core/capture/extract.rs similarity index 100% rename from timely/src/dataflow/operators/capture/extract.rs rename to timely/src/dataflow/operators/core/capture/extract.rs diff --git a/timely/src/dataflow/operators/capture/mod.rs b/timely/src/dataflow/operators/core/capture/mod.rs similarity index 100% rename from timely/src/dataflow/operators/capture/mod.rs rename to timely/src/dataflow/operators/core/capture/mod.rs diff --git a/timely/src/dataflow/operators/capture/replay.rs b/timely/src/dataflow/operators/core/capture/replay.rs similarity index 100% rename from timely/src/dataflow/operators/capture/replay.rs rename to timely/src/dataflow/operators/core/capture/replay.rs diff --git a/timely/src/dataflow/operators/core/mod.rs b/timely/src/dataflow/operators/core/mod.rs index d2ae145d97..19cba0afdc 100644 --- a/timely/src/dataflow/operators/core/mod.rs +++ b/timely/src/dataflow/operators/core/mod.rs @@ -1,6 +1,7 @@ //! Extension traits for `StreamCore` implementing various operators that //! are independent of specific container types. +pub mod capture; pub mod concat; pub mod enterleave; pub mod exchange; @@ -16,6 +17,7 @@ pub mod reclock; pub mod to_stream; pub mod unordered_input; +pub use capture::Capture; pub use concat::{Concat, Concatenate}; pub use enterleave::{Enter, Leave}; pub use exchange::Exchange; diff --git a/timely/src/dataflow/operators/mod.rs b/timely/src/dataflow/operators/mod.rs index 53b504d3ba..2644e7ce06 100644 --- a/timely/src/dataflow/operators/mod.rs +++ b/timely/src/dataflow/operators/mod.rs @@ -17,7 +17,6 @@ pub use self::filter::Filter; pub use self::delay::Delay; pub use self::exchange::Exchange; pub use self::broadcast::Broadcast; -pub use self::capture::Capture; pub use self::branch::{Branch, BranchWhen}; pub use self::result::ResultStream; pub use self::to_stream::ToStream; @@ -46,7 +45,7 @@ pub use self::core::exchange; pub mod broadcast; pub use self::core::probe::{self, Probe}; pub mod to_stream; -pub mod capture; +pub use self::core::capture::{self, Capture}; pub mod branch; pub use self::core::ok_err::{self, OkErr}; pub use self::core::rc; From 0f42c3e243cff231e591dbf34e141a515ef6e581 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 23 Mar 2024 03:23:43 -0400 Subject: [PATCH 3/3] comment/doctest clean-up --- timely/src/dataflow/operators/core/capture/replay.rs | 2 +- timely/src/dataflow/operators/core/reclock.rs | 3 +-- timely/src/dataflow/operators/core/to_stream.rs | 9 ++++----- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/timely/src/dataflow/operators/core/capture/replay.rs b/timely/src/dataflow/operators/core/capture/replay.rs index bdf6558a07..2ab843f860 100644 --- a/timely/src/dataflow/operators/core/capture/replay.rs +++ b/timely/src/dataflow/operators/core/capture/replay.rs @@ -50,7 +50,7 @@ use crate::Container; /// Replay a capture stream into a scope with the same timestamp. pub trait Replay : Sized { - /// Replays `self` into the provided scope, as a `StreamCore`. + /// Replays `self` into the provided scope, as a `StreamCore`. fn replay_into>(self, scope: &mut S) -> StreamCore { self.replay_core(scope, Some(std::time::Duration::new(0, 0))) } diff --git a/timely/src/dataflow/operators/core/reclock.rs b/timely/src/dataflow/operators/core/reclock.rs index ec9617fb7f..9510e87223 100644 --- a/timely/src/dataflow/operators/core/reclock.rs +++ b/timely/src/dataflow/operators/core/reclock.rs @@ -19,7 +19,7 @@ pub trait Reclock { /// # Examples /// /// ``` - /// use timely::dataflow::operators::{Inspect, ToStream, Delay, Map, Reclock, Capture}; + /// use timely::dataflow::operators::{ToStream, Delay, Map, Reclock, Capture}; /// use timely::dataflow::operators::capture::Extract; /// /// let captured = timely::example(|scope| { @@ -36,7 +36,6 @@ pub trait Reclock { /// /// // reclock the data. /// data.reclock(&clock) - /// .inspect_batch(|t,x| println!("SEEN: {:?}, {:?}", t, x)) /// .capture() /// }); /// diff --git a/timely/src/dataflow/operators/core/to_stream.rs b/timely/src/dataflow/operators/core/to_stream.rs index 848adf3c1d..a7d874be4e 100644 --- a/timely/src/dataflow/operators/core/to_stream.rs +++ b/timely/src/dataflow/operators/core/to_stream.rs @@ -12,13 +12,12 @@ pub trait ToStream { /// # Examples /// /// ``` - /// use timely::dataflow::operators::ToStream; - /// use timely::dataflow::operators::Capture; - /// use timely::dataflow::operators::capture::Extract; + /// use timely::dataflow::operators::core::{ToStream, Capture}; + /// use timely::dataflow::operators::core::capture::Extract; /// /// let (data1, data2) = timely::example(|scope| { - /// let data1 = (0..3).to_stream(scope).capture(); - /// let data2 = vec![0,1,2].to_stream(scope).capture(); + /// let data1 = (0..3).to_stream(scope).container::>().capture(); + /// let data2 = vec![0,1,2].to_stream(scope).container::>().capture(); /// (data1, data2) /// }); ///