diff --git a/container/src/lib.rs b/container/src/lib.rs index d1cd022ad..eb5e98852 100644 --- a/container/src/lib.rs +++ b/container/src/lib.rs @@ -23,7 +23,7 @@ pub trait Container: Default + Clone + 'static { /// The type of elements when reading non-destructively from the container. type ItemRef<'a> where Self: 'a; - /// The type of elements when draining the continer. + /// The type of elements when draining the container. type Item<'a> where Self: 'a; /// Push `item` into self @@ -75,6 +75,9 @@ pub trait PushInto { /// chunked into individual containers, but is free to change the data representation to /// better fit the properties of the container. /// +/// Types implementing this trait should provide appropriate [`PushInto`] implementations such +/// that users can push the expected item types. +/// /// The owner extracts data in two ways. The opportunistic [`Self::extract`] method returns /// any ready data, but doesn't need to produce partial outputs. In contrast, [`Self::finish`] /// needs to produce all outputs, even partial ones. @@ -83,23 +86,15 @@ pub trait PushInto { /// to ensure that it preserves the intended information. /// /// The trait does not prescribe any specific ordering guarantees, and each implementation can -/// decide to represent a `push`/`push_container` order for `extract` and `finish`, or not. -// TODO: Consider adding `push_iterator` to receive an iterator of data. +/// decide to represent a push order for `extract` and `finish`, or not. pub trait ContainerBuilder: Default + 'static { /// The container type we're building. type Container: Container; - /// Add an item to a container. - /// - /// The restriction to [`SizeableContainer`] only exists so that types - /// relying on [`CapacityContainerBuilder`] only need to constrain their container - /// to [`Container`] instead of [`SizableContainer`], which otherwise would be a pervasive - /// requirement. - fn push(&mut self, item: T) where Self::Container: SizableContainer + PushInto; - /// Push a pre-built container. - fn push_container(&mut self, container: &mut Self::Container); - /// Extract assembled containers, potentially leaving unfinished data behind. + /// Extract assembled containers, potentially leaving unfinished data behind. Should + /// be called repeatedly until it returns `None`. fn extract(&mut self) -> Option<&mut Self::Container>; - /// Extract assembled containers and any unfinished data. + /// Extract assembled containers and any unfinished data. Should + /// be called repeatedly until it returns `None`. fn finish(&mut self) -> Option<&mut Self::Container>; } @@ -126,11 +121,8 @@ pub trait SizableContainer: Container { fn reserve(&mut self, additional: usize); } -impl ContainerBuilder for CapacityContainerBuilder { - type Container = C; - - #[inline] - fn push(&mut self, item: T) where C: SizableContainer + PushInto { +impl> PushInto for CapacityContainerBuilder { + fn push_into(&mut self, item: T) { if self.current.capacity() == 0 { self.current = self.empty.take().unwrap_or_default(); // Discard any non-uniform capacity container. @@ -153,9 +145,35 @@ impl ContainerBuilder for CapacityContainerBuilder { self.pending.push_back(std::mem::take(&mut self.current)); } } +} + +impl ContainerBuilder for CapacityContainerBuilder { + type Container = C; + + #[inline] + fn extract(&mut self) -> Option<&mut C> { + if let Some(container) = self.pending.pop_front() { + self.empty = Some(container); + self.empty.as_mut() + } else { + None + } + } #[inline] - fn push_container(&mut self, container: &mut Self::Container) { + fn finish(&mut self) -> Option<&mut C> { + if self.current.len() > 0 { + self.pending.push_back(std::mem::take(&mut self.current)); + } + self.extract() + } +} + +impl CapacityContainerBuilder { + /// Push a pre-formed container at this builder. This exists to maintain + /// API compatibility. + #[inline] + pub fn push_container(&mut self, container: &mut C) { if !container.is_empty() { // Flush to maintain FIFO ordering. if self.current.len() > 0 { @@ -170,26 +188,58 @@ impl ContainerBuilder for CapacityContainerBuilder { self.pending.push_back(std::mem::replace(container, empty)); } } +} + +/// A container builder that absorbs entire containers. Maintains FIFO order. +pub struct BufferingContainerBuilder { + /// Container that we're extracting. + current: Option, + /// Completed containers pending to be sent. + pending: VecDeque, +} +impl Default for BufferingContainerBuilder { #[inline] - fn extract(&mut self) -> Option<&mut C> { + fn default() -> Self { + Self { + current: None, + pending: VecDeque::default(), + } + } +} + +impl ContainerBuilder for BufferingContainerBuilder { + type Container = C; + + #[inline] + fn extract(&mut self) -> Option<&mut Self::Container> { if let Some(container) = self.pending.pop_front() { - self.empty = Some(container); - self.empty.as_mut() + self.current = Some(container); + self.current.as_mut() } else { None } } #[inline] - fn finish(&mut self) -> Option<&mut C> { - if self.current.len() > 0 { - self.pending.push_back(std::mem::take(&mut self.current)); - } + fn finish(&mut self) -> Option<&mut Self::Container> { self.extract() } } +impl PushInto<&mut C> for BufferingContainerBuilder { + #[inline] + fn push_into(&mut self, item: &mut C) { + if !item.is_empty() { + // Grab the last returned container, or a default one, to pass back to the caller + let mut empty = self.current.take().unwrap_or_default(); + empty.clear(); + let container = std::mem::replace(item, empty); + self.pending.push_back(container); + } + } +} + impl Container for Vec { type ItemRef<'a> = &'a T where T: 'a; type Item<'a> = T where T: 'a; @@ -347,7 +397,7 @@ pub trait PushPartitioned: SizableContainer { F: FnMut(usize, &mut Self); } -impl PushPartitioned for T where for<'a> T: PushInto> { +impl PushPartitioned for C where for<'a> C: PushInto> { fn push_partitioned(&mut self, buffers: &mut [Self], mut index: I, mut flush: F) where for<'a> I: FnMut(&Self::Item<'a>) -> usize, @@ -383,7 +433,7 @@ pub mod buffer { /// The maximum buffer capacity in elements. Returns a number between [BUFFER_SIZE_BYTES] /// and 1, inclusively. pub const fn default_capacity() -> usize { - let size = ::std::mem::size_of::(); + let size = std::mem::size_of::(); if size == 0 { BUFFER_SIZE_BYTES } else if size <= BUFFER_SIZE_BYTES { diff --git a/timely/src/dataflow/channels/pushers/buffer.rs b/timely/src/dataflow/channels/pushers/buffer.rs index ae878358f..d48d44772 100644 --- a/timely/src/dataflow/channels/pushers/buffer.rs +++ b/timely/src/dataflow/channels/pushers/buffer.rs @@ -2,7 +2,7 @@ //! with the performance of batched sends. use crate::communication::Push; -use crate::container::{ContainerBuilder, CapacityContainerBuilder, SizableContainer, PushInto}; +use crate::container::{ContainerBuilder, CapacityContainerBuilder, PushInto}; use crate::dataflow::channels::{Bundle, Message}; use crate::dataflow::operators::Capability; use crate::progress::Timestamp; @@ -56,6 +56,16 @@ impl>> Buffer) -> AutoflushSession, P> where T: Timestamp { self.autoflush_session_with_builder(cap) } + + /// Gives an entire container at the current time. Only provided for + /// buffers of [`CapacityContainerBuilder`]s. Other container builders + /// should use [`PushInto`] instead. + fn give_container(&mut self, container: &mut C) { + if !container.is_empty() { + self.builder.push_container(container); + self.extract(); + } + } } impl>> Buffer where T: Eq+Clone { @@ -101,25 +111,18 @@ impl>> Buffer>> Buffer +impl Buffer where T: Eq+Clone, - CB::Container: SizableContainer, + CB: ContainerBuilder, + P: Push> { // Push a single item into the builder. Internal method for use by `Session`. #[inline] - fn give(&mut self, data: D) where CB::Container: PushInto { - self.builder.push(data); + fn give(&mut self, data: D) where CB: PushInto { + self.builder.push_into(data); self.extract(); } } @@ -133,33 +136,34 @@ pub struct Session<'a, T, CB, P> { buffer: &'a mut Buffer, } -impl<'a, T, CB, P> Session<'a, T, CB, P> +impl<'a, T, C: Container, P> Session<'a, T, CapacityContainerBuilder, P> where T: Eq + Clone + 'a, - CB: ContainerBuilder + 'a, - P: Push> + 'a + P: Push> + 'a, { - /// Provide a container at the time specified by the [Session]. - pub fn give_container(&mut self, container: &mut CB::Container) { + /// Provide a container at the time specified by the [Session]. Only provided for + /// buffers of [`CapacityContainerBuilder`]s. Other container builders + /// should use [`PushInto`] instead. + pub fn give_container(&mut self, container: &mut C) { self.buffer.give_container(container) } +} +impl<'a, T, CB, P> Session<'a, T, CB, P> +where + T: Eq + Clone + 'a, + CB: ContainerBuilder + 'a, + P: Push> + 'a +{ /// Access the builder. Immutable access to prevent races with flushing /// the underlying buffer. pub fn builder(&self) -> &CB { self.buffer.builder() } -} -impl<'a, T, CB, P: Push>+'a> Session<'a, T, CB, P> -where - T: Eq + Clone + 'a, - CB: ContainerBuilder + 'a, - CB::Container: SizableContainer, -{ /// Provides one record at the time specified by the `Session`. #[inline] - pub fn give(&mut self, data: D) where CB::Container: PushInto { + pub fn give(&mut self, data: D) where CB: PushInto { self.buffer.give(data); } @@ -168,7 +172,7 @@ where pub fn give_iterator(&mut self, iter: I) where I: Iterator, - CB::Container: PushInto, + CB: PushInto, { for item in iter { self.give(item); @@ -197,15 +201,19 @@ where { /// Transmits a single record. #[inline] - pub fn give(&mut self, data: D) where CB::Container: SizableContainer + PushInto { + pub fn give(&mut self, data: D) + where + CB: PushInto, + { self.buffer.give(data); } + /// Transmits records produced by an iterator. #[inline] pub fn give_iterator(&mut self, iter: I) where I: Iterator, - CB::Container: SizableContainer + PushInto, + CB: PushInto, { for item in iter { self.give(item); diff --git a/timely/src/dataflow/operators/core/input.rs b/timely/src/dataflow/operators/core/input.rs index 40d42cf19..a9a24fdbe 100644 --- a/timely/src/dataflow/operators/core/input.rs +++ b/timely/src/dataflow/operators/core/input.rs @@ -3,7 +3,7 @@ use std::rc::Rc; use std::cell::RefCell; -use crate::container::{SizableContainer, PushInto}; +use crate::container::{CapacityContainerBuilder, ContainerBuilder, PushInto}; use crate::scheduling::{Schedule, Activator}; @@ -60,7 +60,44 @@ pub trait Input : Scope { /// } /// }); /// ``` - fn new_input(&mut self) -> (Handle<::Timestamp, C>, StreamCore); + fn new_input(&mut self) -> (Handle<::Timestamp, CapacityContainerBuilder>, StreamCore); + + /// Create a new [StreamCore] and [Handle] through which to supply input. + /// + /// The `new_input` method returns a pair `(Handle, StreamCore)` where the [StreamCore] can be used + /// immediately for timely dataflow construction, and the `Handle` is later used to introduce + /// data into the timely dataflow computation. + /// + /// The `Handle` also provides a means to indicate + /// to timely dataflow that the input has advanced beyond certain timestamps, allowing timely + /// to issue progress notifications. + /// + /// # Examples + /// ``` + /// use std::rc::Rc; + /// use timely::*; + /// use timely::dataflow::operators::core::{Input, Inspect}; + /// use timely::container::BufferingContainerBuilder; + /// + /// // construct and execute a timely dataflow + /// timely::execute(Config::thread(), |worker| { + /// + /// // add an input and base computation off of it + /// let mut input = worker.dataflow(|scope| { + /// let (input, stream) = scope.new_input_with_builder::>>>(); + /// stream.inspect(|x| println!("hello {:?}", x)); + /// input + /// }); + /// + /// // introduce input, advance computation + /// for round in 0..10 { + /// input.send(&mut Rc::new(vec![round])); + /// input.advance_to(round + 1); + /// worker.step(); + /// } + /// }); + /// ``` + fn new_input_with_builder(&mut self) -> (Handle<::Timestamp, CB>, StreamCore); /// Create a new stream from a supplied interactive handle. /// @@ -93,19 +130,25 @@ pub trait Input : Scope { /// } /// }); /// ``` - fn input_from(&mut self, handle: &mut Handle<::Timestamp, C>) -> StreamCore; + fn input_from(&mut self, handle: &mut Handle<::Timestamp, CB>) -> StreamCore; } use crate::order::TotalOrder; impl Input for G where ::Timestamp: TotalOrder { - fn new_input(&mut self) -> (Handle<::Timestamp, C>, StreamCore) { + fn new_input(&mut self) -> (Handle<::Timestamp, CapacityContainerBuilder>, StreamCore) { let mut handle = Handle::new(); let stream = self.input_from(&mut handle); (handle, stream) } - fn input_from(&mut self, handle: &mut Handle<::Timestamp, C>) -> StreamCore { - let (output, registrar) = Tee::<::Timestamp, C>::new(); + fn new_input_with_builder(&mut self) -> (Handle<::Timestamp, CB>, StreamCore) { + let mut handle = Handle::new_with_builder(); + let stream = self.input_from(&mut handle); + (handle, stream) + } + + fn input_from(&mut self, handle: &mut Handle<::Timestamp, CB>) -> StreamCore { + let (output, registrar) = Tee::<::Timestamp, CB::Container>::new(); let counter = Counter::new(output); let produced = counter.produced().clone(); @@ -174,16 +217,16 @@ impl Operate for Operator { /// A handle to an input `StreamCore`, used to introduce data to a timely dataflow computation. #[derive(Debug)] -pub struct Handle { +pub struct Handle { activate: Vec, progress: Vec>>>, - pushers: Vec>>, - buffer1: C, - buffer2: C, + pushers: Vec>>, + builder: CB, + buffer: CB::Container, now_at: T, } -impl Handle { +impl Handle> { /// Allocates a new input handle, from which one can create timely streams. /// /// # Examples @@ -216,8 +259,48 @@ impl Handle { activate: Vec::new(), progress: Vec::new(), pushers: Vec::new(), - buffer1: Default::default(), - buffer2: Default::default(), + builder: CapacityContainerBuilder::default(), + buffer: Default::default(), + now_at: T::minimum(), + } + } +} + +impl Handle { + /// Allocates a new input handle, from which one can create timely streams. + /// + /// # Examples + /// ``` + /// use timely::*; + /// use timely::dataflow::operators::core::{Input, Inspect}; + /// use timely::dataflow::operators::core::input::Handle; + /// + /// // construct and execute a timely dataflow + /// timely::execute(Config::thread(), |worker| { + /// + /// // add an input and base computation off of it + /// let mut input = Handle::new(); + /// worker.dataflow(|scope| { + /// scope.input_from(&mut input) + /// .container::>() + /// .inspect(|x| println!("hello {:?}", x)); + /// }); + /// + /// // introduce input, advance computation + /// for round in 0..10 { + /// input.send(round); + /// input.advance_to(round + 1); + /// worker.step(); + /// } + /// }); + /// ``` + pub fn new_with_builder() -> Self { + Self { + activate: Vec::new(), + progress: Vec::new(), + pushers: Vec::new(), + builder: CB::default(), + buffer: Default::default(), now_at: T::minimum(), } } @@ -249,21 +332,21 @@ impl Handle { /// } /// }); /// ``` - pub fn to_stream(&mut self, scope: &mut G) -> StreamCore + pub fn to_stream(&mut self, scope: &mut G) -> StreamCore where T: TotalOrder, - G: ScopeParent, + G: Scope, { scope.input_from(self) } fn register( &mut self, - pusher: Counter>, + pusher: Counter>, progress: Rc>>, ) { // flush current contents, so new registrant does not see existing data. - if !self.buffer1.is_empty() { self.flush(); } + self.flush(); // we need to produce an appropriate update to the capabilities for `progress`, in case a // user has decided to drive the handle around a bit before registering it. @@ -274,26 +357,48 @@ impl Handle { self.pushers.push(pusher); } - // flushes our buffer at each of the destinations. there can be more than one; clone if needed. - #[inline(never)] + /// Extract all ready contents from the builder and distribute to downstream operators. + #[inline] + fn extract(&mut self) { + while let Some(container) = self.builder.extract() { + Self::flush_container(container, &mut self.buffer, &mut self.pushers, &self.now_at); + } + } + + /// Flush all contents and distribute to downstream operators. + #[inline] fn flush(&mut self) { - for index in 0 .. self.pushers.len() { - if index < self.pushers.len() - 1 { - self.buffer2.clone_from(&self.buffer1); - Message::push_at(&mut self.buffer2, self.now_at.clone(), &mut self.pushers[index]); - debug_assert!(self.buffer2.is_empty()); + while let Some(container) = self.builder.finish() { + Self::flush_container(container, &mut self.buffer, &mut self.pushers, &self.now_at); + } + } + + /// flushes our buffer at each of the destinations. there can be more than one; clone if needed. + /// Does not take `self` because `flush` and `extract` borrow `self` mutably. + #[inline(never)] + fn flush_container( + container: &mut CB::Container, + buffer: &mut CB::Container, + pushers: &mut [Counter>], + now_at: &T + ) { + for index in 0 .. pushers.len() { + if index < pushers.len() - 1 { + buffer.clone_from(container); + Message::push_at(buffer, now_at.clone(), &mut pushers[index]); + debug_assert!(buffer.is_empty()); } else { - Message::push_at(&mut self.buffer1, self.now_at.clone(), &mut self.pushers[index]); - debug_assert!(self.buffer1.is_empty()); + Message::push_at(container, now_at.clone(), &mut pushers[index]); + debug_assert!(container.is_empty()); } } - self.buffer1.clear(); + container.clear(); } // closes the current epoch, flushing if needed, shutting if needed, and updating the frontier. fn close_epoch(&mut self) { - if !self.buffer1.is_empty() { self.flush(); } + self.flush(); for pusher in self.pushers.iter_mut() { pusher.done(); } @@ -334,25 +439,11 @@ impl Handle { /// } /// }); /// ``` - pub fn send_batch(&mut self, buffer: &mut C) { - + pub fn send_batch(&mut self, buffer: &mut CB::Container) { if !buffer.is_empty() { // flush buffered elements to ensure local fifo. - if !self.buffer1.is_empty() { self.flush(); } - - // push buffer (or clone of buffer) at each destination. - for index in 0 .. self.pushers.len() { - if index < self.pushers.len() - 1 { - self.buffer2.clone_from(&buffer); - Message::push_at(&mut self.buffer2, self.now_at.clone(), &mut self.pushers[index]); - assert!(self.buffer2.is_empty()); - } - else { - Message::push_at(buffer, self.now_at.clone(), &mut self.pushers[index]); - assert!(buffer.is_empty()); - } - } - buffer.clear(); + self.flush(); + Self::flush_container(buffer, &mut self.buffer, &mut self.pushers, &self.now_at); } } @@ -390,8 +481,19 @@ impl Handle { } } -impl Handle { +impl PushInto for Handle +where + T: Timestamp, + CB: ContainerBuilder + PushInto, +{ #[inline] + fn push_into(&mut self, item: D) { + self.builder.push_into(item); + self.extract(); + } +} + +impl Handle { /// Sends one record into the corresponding timely dataflow `Stream`, at the current epoch. /// /// # Examples @@ -419,21 +521,19 @@ impl Handle { /// } /// }); /// ``` - pub fn send(&mut self, data: D) where C: PushInto { - self.buffer1.push(data); - if self.buffer1.len() == self.buffer1.capacity() { - self.flush(); - } + #[inline] + pub fn send(&mut self, data: D) where CB: PushInto { + self.push_into(data) } } -impl Default for Handle { +impl Default for Handle { fn default() -> Self { - Self::new() + Self::new_with_builder() } } -impl Drop for Handle { +impl Drop for Handle { fn drop(&mut self) { self.close_epoch(); } diff --git a/timely/src/dataflow/operators/core/mod.rs b/timely/src/dataflow/operators/core/mod.rs index 19cba0afd..5074014bd 100644 --- a/timely/src/dataflow/operators/core/mod.rs +++ b/timely/src/dataflow/operators/core/mod.rs @@ -28,6 +28,6 @@ pub use inspect::{Inspect, InspectCore}; pub use map::Map; pub use ok_err::OkErr; pub use probe::Probe; -pub use to_stream::ToStream; +pub use to_stream::{ToStream, ToStreamBuilder}; pub use reclock::Reclock; pub use unordered_input::{UnorderedInput, UnorderedHandle}; diff --git a/timely/src/dataflow/operators/core/rc.rs b/timely/src/dataflow/operators/core/rc.rs index c00108a7c..34403f39c 100644 --- a/timely/src/dataflow/operators/core/rc.rs +++ b/timely/src/dataflow/operators/core/rc.rs @@ -1,5 +1,6 @@ //! Shared containers +use crate::container::BufferingContainerBuilder; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::operators::Operator; use crate::dataflow::{Scope, StreamCore}; @@ -27,13 +28,13 @@ pub trait SharedStream { impl SharedStream for StreamCore { fn shared(&self) -> StreamCore> { let mut container = Default::default(); - self.unary(Pipeline, "Shared", move |_, _| { + self.unary::, _, _, _>(Pipeline, "Shared", move |_, _| { move |input, output| { input.for_each(|time, data| { data.swap(&mut container); output - .session(&time) - .give_container(&mut Rc::new(std::mem::take(&mut container))); + .session_with_builder(&time) + .give(&mut Rc::new(std::mem::take(&mut container))); }); } }) diff --git a/timely/src/dataflow/operators/core/to_stream.rs b/timely/src/dataflow/operators/core/to_stream.rs index dcb9aa684..c37092ea5 100644 --- a/timely/src/dataflow/operators/core/to_stream.rs +++ b/timely/src/dataflow/operators/core/to_stream.rs @@ -1,35 +1,40 @@ //! Conversion to the `StreamCore` type from iterators. -use crate::container::{SizableContainer, PushInto}; +use crate::container::{CapacityContainerBuilder, ContainerBuilder, SizableContainer, PushInto}; use crate::Container; use crate::dataflow::operators::generic::operator::source; use crate::dataflow::{StreamCore, Scope}; -/// Converts to a timely [StreamCore]. -pub trait ToStream { - /// Converts to a timely [StreamCore]. +/// Converts to a timely [StreamCore], using a container builder. +pub trait ToStreamBuilder { + /// Converts to a timely [StreamCore], using the supplied container builder type. /// /// # Examples /// /// ``` - /// use timely::dataflow::operators::core::{ToStream, Capture}; + /// use timely::dataflow::operators::core::{ToStreamBuilder, Capture}; /// use timely::dataflow::operators::core::capture::Extract; + /// use timely::container::CapacityContainerBuilder; /// /// let (data1, data2) = timely::example(|scope| { - /// let data1 = (0..3).to_stream(scope).container::>().capture(); - /// let data2 = vec![0,1,2].to_stream(scope).container::>().capture(); + /// let data1 = ToStreamBuilder::>::to_stream_with_builder(0..3, scope) + /// .container::>() + /// .capture(); + /// let data2 = ToStreamBuilder::>::to_stream_with_builder(vec![0,1,2], scope) + /// .container::>() + /// .capture(); /// (data1, data2) /// }); /// /// assert_eq!(data1.extract(), data2.extract()); /// ``` - fn to_stream(self, scope: &mut S) -> StreamCore; + fn to_stream_with_builder(self, scope: &mut S) -> StreamCore; } -impl ToStream for I where C: PushInto { - fn to_stream(self, scope: &mut S) -> StreamCore { +impl ToStreamBuilder for I where CB: PushInto { + fn to_stream_with_builder(self, scope: &mut S) -> StreamCore { - source(scope, "ToStream", |capability, info| { + source::<_, CB, _, _>(scope, "ToStreamBuilder", |capability, info| { // Acquire an activator, so that the operator can rescheduled itself. let activator = scope.activator_for(&info.address[..]); @@ -40,7 +45,7 @@ impl ToStream for I where C: Pu move |output| { if let Some(element) = iterator.next() { - let mut session = output.session(capability.as_ref().unwrap()); + let mut session = output.session_with_builder(capability.as_ref().unwrap()); session.give(element); let n = 256 * crate::container::buffer::default_capacity::(); for element in iterator.by_ref().take(n - 1) { @@ -55,3 +60,31 @@ impl ToStream for I where C: Pu }) } } + +/// Converts to a timely [StreamCore]. Equivalent to [`ToStreamBuilder`] but +/// uses a [`CapacityContainerBuilder`]. +pub trait ToStream { + /// Converts to a timely [StreamCore]. + /// + /// # Examples + /// + /// ``` + /// use timely::dataflow::operators::core::{ToStream, Capture}; + /// use timely::dataflow::operators::core::capture::Extract; + /// + /// let (data1, data2) = timely::example(|scope| { + /// let data1 = (0..3).to_stream(scope).container::>().capture(); + /// let data2 = vec![0,1,2].to_stream(scope).container::>().capture(); + /// (data1, data2) + /// }); + /// + /// assert_eq!(data1.extract(), data2.extract()); + /// ``` + fn to_stream(self, scope: &mut S) -> StreamCore; +} + +impl ToStream for I where C: PushInto { + fn to_stream(self, scope: &mut S) -> StreamCore { + ToStreamBuilder::>::to_stream_with_builder(self, scope) + } +} diff --git a/timely/src/dataflow/operators/input.rs b/timely/src/dataflow/operators/input.rs index d1e30280b..30d243ab4 100644 --- a/timely/src/dataflow/operators/input.rs +++ b/timely/src/dataflow/operators/input.rs @@ -1,6 +1,7 @@ //! Create new `Streams` connected to external inputs. use crate::Data; +use crate::container::CapacityContainerBuilder; use crate::dataflow::{Stream, ScopeParent, Scope}; use crate::dataflow::operators::core::{Input as InputCore}; @@ -93,4 +94,4 @@ impl Input for G where ::Timestamp: TotalOrder { } /// A handle to an input `Stream`, used to introduce data to a timely dataflow computation. -pub type Handle = crate::dataflow::operators::core::input::Handle>; +pub type Handle = crate::dataflow::operators::core::input::Handle>>;