From 95792c53a9d4f884d9c77a30a142b5c0fc90aa2c Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 28 Nov 2023 20:14:14 -0500 Subject: [PATCH] Remove Container: Clone + 'static Remove the requirement that all Container implementations are Clone and 'static. This makes implementing types simpler that depend on Container, and requires us to explicitly mark various places as `Data` such that they comply with Timely's type requirements. Signed-off-by: Moritz Hoffmann --- container/src/columnation.rs | 4 +- container/src/lib.rs | 7 ++- timely/examples/rc.rs | 4 +- timely/src/dataflow/channels/pact.rs | 14 +++--- .../src/dataflow/channels/pushers/exchange.rs | 2 +- timely/src/dataflow/channels/pushers/tee.rs | 2 +- timely/src/dataflow/operators/branch.rs | 2 +- .../src/dataflow/operators/capture/capture.rs | 8 ++-- .../src/dataflow/operators/capture/replay.rs | 4 +- timely/src/dataflow/operators/concat.rs | 8 ++-- timely/src/dataflow/operators/enterleave.rs | 6 +-- timely/src/dataflow/operators/feedback.rs | 14 +++--- .../dataflow/operators/generic/builder_rc.rs | 6 +-- .../dataflow/operators/generic/operator.rs | 44 +++++++++---------- timely/src/dataflow/operators/input.rs | 14 +++--- timely/src/dataflow/operators/inspect.rs | 4 +- timely/src/dataflow/operators/probe.rs | 4 +- timely/src/dataflow/operators/rc.rs | 4 +- timely/src/dataflow/operators/reclock.rs | 8 ++-- timely/src/dataflow/operators/to_stream.rs | 2 +- .../src/dataflow/operators/unordered_input.rs | 8 ++-- timely/src/synchronization/sequence.rs | 4 +- 22 files changed, 86 insertions(+), 87 deletions(-) diff --git a/container/src/columnation.rs b/container/src/columnation.rs index 54a00ca4e3..e131e7548f 100644 --- a/container/src/columnation.rs +++ b/container/src/columnation.rs @@ -295,7 +295,7 @@ mod container { use crate::columnation::{Columnation, TimelyStack}; - impl Container for TimelyStack { + impl Container for TimelyStack { type Item = T; fn len(&self) -> usize { @@ -315,7 +315,7 @@ mod container { } } - impl PushPartitioned for TimelyStack { + impl PushPartitioned for TimelyStack { fn push_partitioned(&mut self, buffers: &mut [Self], mut index: I, mut flush: F) where I: FnMut(&Self::Item) -> usize, diff --git a/container/src/lib.rs b/container/src/lib.rs index eb9b3ce77d..69358524c7 100644 --- a/container/src/lib.rs +++ b/container/src/lib.rs @@ -15,8 +15,7 @@ pub mod columnation; /// We require the container to be cloneable to enable efficient copies when providing references /// of containers to operators. Care must be taken that the type's `clone_from` implementation /// is efficient (which is not necessarily the case when deriving `Clone`.) -/// TODO: Don't require `Container: Clone` -pub trait Container: Default + Clone + 'static { +pub trait Container: Default { /// The type of elements this container holds. type Item; @@ -40,7 +39,7 @@ pub trait Container: Default + Clone + 'static { fn clear(&mut self); } -impl Container for Vec { +impl Container for Vec { type Item = T; fn len(&self) -> usize { @@ -132,7 +131,7 @@ pub trait PushPartitioned: Container { F: FnMut(usize, &mut Self); } -impl PushPartitioned for Vec { +impl PushPartitioned for Vec { fn push_partitioned(&mut self, buffers: &mut [Self], mut index: I, mut flush: F) where I: FnMut(&Self::Item) -> usize, diff --git a/timely/examples/rc.rs b/timely/examples/rc.rs index 18f6fe972e..f79ef74729 100644 --- a/timely/examples/rc.rs +++ b/timely/examples/rc.rs @@ -8,7 +8,7 @@ use abomonation::Abomonation; #[derive(Debug, Clone)] pub struct Test { - field: Rc, + _field: Rc, } impl Abomonation for Test { @@ -32,7 +32,7 @@ fn main() { // introduce data and watch! for round in 0..10 { - input.send(Test { field: Rc::new(round) } ); + input.send(Test { _field: Rc::new(round) } ); input.advance_to(round + 1); worker.step_while(|| probe.less_than(input.time())); } diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs index 976023a19f..969bac91fa 100644 --- a/timely/src/dataflow/channels/pact.rs +++ b/timely/src/dataflow/channels/pact.rs @@ -10,9 +10,9 @@ use std::{fmt::{self, Debug}, marker::PhantomData}; use timely_container::PushPartitioned; -use crate::communication::{Push, Pull, Data}; +use crate::communication::{Push, Pull}; use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller}; -use crate::Container; +use crate::{Container, ExchangeData}; use crate::worker::AsWorker; use crate::dataflow::channels::pushers::Exchange as ExchangePusher; @@ -33,14 +33,14 @@ pub trait ParallelizationContractCore { /// A `ParallelizationContractCore` specialized for `Vec` containers /// TODO: Use trait aliases once stable. -pub trait ParallelizationContract: ParallelizationContractCore> { } -impl>> ParallelizationContract for P { } +pub trait ParallelizationContract: ParallelizationContractCore> { } +impl>> ParallelizationContract for P { } /// A direct connection #[derive(Debug)] pub struct Pipeline; -impl ParallelizationContractCore for Pipeline { +impl ParallelizationContractCore for Pipeline { type Pusher = LogPusher>>; type Puller = LogPuller>>; fn connect(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option) -> (Self::Pusher, Self::Puller) { @@ -69,9 +69,9 @@ implu64+'static> ExchangeCore { } // Exchange uses a `Box` because it cannot know what type of pushable will return from the allocator. -implu64+'static> ParallelizationContractCore for ExchangeCore +implu64+'static> ParallelizationContractCore for ExchangeCore where - C: Data + Container + PushPartitioned, + C: ExchangeData + Container + PushPartitioned, { type Pusher = ExchangePusher>>>, F>; type Puller = LogPuller>>>; diff --git a/timely/src/dataflow/channels/pushers/exchange.rs b/timely/src/dataflow/channels/pushers/exchange.rs index 9ea271d310..f8f0b2f673 100644 --- a/timely/src/dataflow/channels/pushers/exchange.rs +++ b/timely/src/dataflow/channels/pushers/exchange.rs @@ -40,7 +40,7 @@ impl>, H: FnMut(&D) -> } } -impl>, H: FnMut(&D) -> u64> Push> for Exchange +impl>, H: FnMut(&D) -> u64> Push> for Exchange where C: PushPartitioned { diff --git a/timely/src/dataflow/channels/pushers/tee.rs b/timely/src/dataflow/channels/pushers/tee.rs index 1ec05e4e75..b15aa9a6f8 100644 --- a/timely/src/dataflow/channels/pushers/tee.rs +++ b/timely/src/dataflow/channels/pushers/tee.rs @@ -20,7 +20,7 @@ pub struct TeeCore { /// [TeeCore] specialized to `Vec`-based container. pub type Tee = TeeCore>; -impl Push> for TeeCore { +impl Push> for TeeCore { #[inline] fn push(&mut self, message: &mut Option>) { let mut pushers = self.shared.borrow_mut(); diff --git a/timely/src/dataflow/operators/branch.rs b/timely/src/dataflow/operators/branch.rs index 70e087abde..b3dacac8bb 100644 --- a/timely/src/dataflow/operators/branch.rs +++ b/timely/src/dataflow/operators/branch.rs @@ -94,7 +94,7 @@ pub trait BranchWhen: Sized { fn branch_when(&self, condition: impl Fn(&T) -> bool + 'static) -> (Self, Self); } -impl BranchWhen for StreamCore { +impl BranchWhen for StreamCore { fn branch_when(&self, condition: impl Fn(&S::Timestamp) -> bool + 'static) -> (Self, Self) { let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope()); diff --git a/timely/src/dataflow/operators/capture/capture.rs b/timely/src/dataflow/operators/capture/capture.rs index c62b95417b..1720ef8214 100644 --- a/timely/src/dataflow/operators/capture/capture.rs +++ b/timely/src/dataflow/operators/capture/capture.rs @@ -10,14 +10,14 @@ use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::channels::pullers::Counter as PullCounter; use crate::dataflow::operators::generic::builder_raw::OperatorBuilder; -use crate::Container; +use crate::{Container, Data}; use crate::progress::ChangeBatch; use crate::progress::Timestamp; use super::{EventCore, EventPusherCore}; /// Capture a stream of timestamped data for later replay. -pub trait Capture { +pub trait Capture { /// Captures a stream of timestamped data for later replay. /// /// # Examples @@ -113,7 +113,7 @@ pub trait Capture { } } -impl Capture for StreamCore { +impl Capture for StreamCore { fn capture_into+'static>(&self, mut event_pusher: P) { let mut builder = OperatorBuilder::new("Capture".to_owned(), self.scope()); @@ -142,7 +142,7 @@ impl Capture for StreamCore { RefOrMut::Ref(reference) => (&reference.time, RefOrMut::Ref(&reference.data)), RefOrMut::Mut(reference) => (&reference.time, RefOrMut::Mut(&mut reference.data)), }; - let vector = data.replace(Default::default()); + let vector = data.take(); event_pusher.push(EventCore::Messages(time.clone(), vector)); } input.consumed().borrow_mut().drain_into(&mut progress.consumeds[0]); diff --git a/timely/src/dataflow/operators/capture/replay.rs b/timely/src/dataflow/operators/capture/replay.rs index 2cc3254196..44a5462ecd 100644 --- a/timely/src/dataflow/operators/capture/replay.rs +++ b/timely/src/dataflow/operators/capture/replay.rs @@ -46,7 +46,7 @@ use crate::progress::Timestamp; use super::EventCore; use super::event::EventIteratorCore; -use crate::Container; +use crate::{Container, Data}; /// Replay a capture stream into a scope with the same timestamp. pub trait Replay : Sized { @@ -62,7 +62,7 @@ pub trait Replay : Sized { fn replay_core>(self, scope: &mut S, period: Option) -> StreamCore; } -impl Replay for I +impl Replay for I where I : IntoIterator, ::Item: EventIteratorCore+'static { fn replay_core>(self, scope: &mut S, period: Option) -> StreamCore{ diff --git a/timely/src/dataflow/operators/concat.rs b/timely/src/dataflow/operators/concat.rs index 449609f5bd..e9f1179eb7 100644 --- a/timely/src/dataflow/operators/concat.rs +++ b/timely/src/dataflow/operators/concat.rs @@ -1,7 +1,7 @@ //! Merges the contents of multiple streams. -use crate::Container; +use crate::{Container, Data}; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::{StreamCore, Scope}; @@ -23,7 +23,7 @@ pub trait Concat { fn concat(&self, _: &StreamCore) -> StreamCore; } -impl Concat for StreamCore { +impl Concat for StreamCore { fn concat(&self, other: &StreamCore) -> StreamCore { self.scope().concatenate([self.clone(), other.clone()]) } @@ -52,7 +52,7 @@ pub trait Concatenate { I: IntoIterator>; } -impl Concatenate for StreamCore { +impl Concatenate for StreamCore { fn concatenate(&self, sources: I) -> StreamCore where I: IntoIterator> @@ -62,7 +62,7 @@ impl Concatenate for StreamCore { } } -impl Concatenate for G { +impl Concatenate for G { fn concatenate(&self, sources: I) -> StreamCore where I: IntoIterator> diff --git a/timely/src/dataflow/operators/enterleave.rs b/timely/src/dataflow/operators/enterleave.rs index 24c8fa12eb..42e521ab7a 100644 --- a/timely/src/dataflow/operators/enterleave.rs +++ b/timely/src/dataflow/operators/enterleave.rs @@ -133,7 +133,7 @@ pub trait Leave { fn leave(&self) -> StreamCore; } -impl<'a, G: Scope, D: Clone+Container, T: Timestamp+Refines> Leave for StreamCore, D> { +impl<'a, G: Scope, D: Container + Data, T: Timestamp+Refines> Leave for StreamCore, D> { fn leave(&self) -> StreamCore { let scope = self.scope(); @@ -160,14 +160,14 @@ impl<'a, G: Scope, D: Clone+Container, T: Timestamp+Refines> Leave } -struct IngressNub, TData: Container> { +struct IngressNub, TData: Container + Data> { targets: CounterCore>, phantom: ::std::marker::PhantomData, activator: crate::scheduling::Activator, active: bool, } -impl, TData: Container> Push> for IngressNub { +impl, TData: Container + Data> Push> for IngressNub { fn push(&mut self, element: &mut Option>) { if let Some(message) = element { let outer_message = message.as_mut(); diff --git a/timely/src/dataflow/operators/feedback.rs b/timely/src/dataflow/operators/feedback.rs index a7eb90c658..76bf98a557 100644 --- a/timely/src/dataflow/operators/feedback.rs +++ b/timely/src/dataflow/operators/feedback.rs @@ -59,7 +59,7 @@ pub trait Feedback { /// .connect_loop(handle); /// }); /// ``` - fn feedback_core(&mut self, summary: ::Summary) -> (HandleCore, StreamCore); + fn feedback_core(&mut self, summary: ::Summary) -> (HandleCore, StreamCore); } /// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`. @@ -87,7 +87,7 @@ pub trait LoopVariable<'a, G: Scope, T: Timestamp> { /// }); /// }); /// ``` - fn loop_variable(&mut self, summary: T::Summary) -> (HandleCore, D>, StreamCore, D>); + fn loop_variable(&mut self, summary: T::Summary) -> (HandleCore, D>, StreamCore, D>); } impl Feedback for G { @@ -95,7 +95,7 @@ impl Feedback for G { self.feedback_core(summary) } - fn feedback_core(&mut self, summary: ::Summary) -> (HandleCore, StreamCore) { + fn feedback_core(&mut self, summary: ::Summary) -> (HandleCore, StreamCore) { let mut builder = OperatorBuilder::new("Feedback".to_owned(), self.clone()); let (output, stream) = builder.new_output(); @@ -105,13 +105,13 @@ impl Feedback for G { } impl<'a, G: Scope, T: Timestamp> LoopVariable<'a, G, T> for Iterative<'a, G, T> { - fn loop_variable(&mut self, summary: T::Summary) -> (HandleCore, D>, StreamCore, D>) { + fn loop_variable(&mut self, summary: T::Summary) -> (HandleCore, D>, StreamCore, D>) { self.feedback_core(Product::new(Default::default(), summary)) } } /// Connect a `Stream` to the input of a loop variable. -pub trait ConnectLoop { +pub trait ConnectLoop { /// Connect a `Stream` to be the input of a loop variable. /// /// # Examples @@ -132,7 +132,7 @@ pub trait ConnectLoop { fn connect_loop(&self, _: HandleCore); } -impl ConnectLoop for StreamCore { +impl ConnectLoop for StreamCore { fn connect_loop(&self, helper: HandleCore) { let mut builder = helper.builder; @@ -159,7 +159,7 @@ impl ConnectLoop for StreamCore { /// A handle used to bind the source of a loop variable. #[derive(Debug)] -pub struct HandleCore { +pub struct HandleCore { builder: OperatorBuilder, summary: ::Summary, output: OutputWrapper>, diff --git a/timely/src/dataflow/operators/generic/builder_rc.rs b/timely/src/dataflow/operators/generic/builder_rc.rs index ca80e31828..a2a06ab571 100644 --- a/timely/src/dataflow/operators/generic/builder_rc.rs +++ b/timely/src/dataflow/operators/generic/builder_rc.rs @@ -8,7 +8,7 @@ use crate::progress::{ChangeBatch, Timestamp}; use crate::progress::operate::SharedProgress; use crate::progress::frontier::{Antichain, MutableAntichain}; -use crate::Container; +use crate::{Container, Data}; use crate::dataflow::{Scope, StreamCore}; use crate::dataflow::channels::pushers::TeeCore; use crate::dataflow::channels::pushers::CounterCore as PushCounter; @@ -92,7 +92,7 @@ impl OperatorBuilder { } /// Adds a new output to a generic operator builder, returning the `Push` implementor to use. - pub fn new_output(&mut self) -> (OutputWrapper>, StreamCore) { + pub fn new_output(&mut self) -> (OutputWrapper>, StreamCore) { let connection = vec![Antichain::from_elem(Default::default()); self.builder.shape().inputs()]; self.new_output_connection(connection) } @@ -105,7 +105,7 @@ impl OperatorBuilder { /// /// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty /// antichain indicating that there is no connection from the input to the output. - pub fn new_output_connection(&mut self, connection: Vec::Summary>>) -> (OutputWrapper>, StreamCore) { + pub fn new_output_connection(&mut self, connection: Vec::Summary>>) -> (OutputWrapper>, StreamCore) { let (tee, stream) = self.builder.new_output_connection(connection.clone()); diff --git a/timely/src/dataflow/operators/generic/operator.rs b/timely/src/dataflow/operators/generic/operator.rs index afb7a25d11..05c8e7db38 100644 --- a/timely/src/dataflow/operators/generic/operator.rs +++ b/timely/src/dataflow/operators/generic/operator.rs @@ -12,7 +12,7 @@ use crate::dataflow::{Scope, StreamCore}; use super::builder_rc::OperatorBuilder; use crate::dataflow::operators::generic::OperatorInfo; use crate::dataflow::operators::generic::notificator::{Notificator, FrontierNotificator}; -use crate::Container; +use crate::{Container, Data}; /// Methods to construct generic streaming and blocking operators. pub trait Operator { @@ -57,7 +57,7 @@ pub trait Operator { /// ``` fn unary_frontier(&self, pact: P, name: &str, constructor: B) -> StreamCore where - D2: Container, + D2: Container + Data, B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(&mut FrontieredInputHandleCore, &mut OutputHandleCore>)+'static, @@ -92,7 +92,7 @@ pub trait Operator { /// }); /// } /// ``` - fn unary_notify, &mut OutputHandleCore>, &mut Notificator)+'static, @@ -129,7 +129,7 @@ pub trait Operator { /// ``` fn unary(&self, pact: P, name: &str, constructor: B) -> StreamCore where - D2: Container, + D2: Container + Data, B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(&mut InputHandleCore, &mut OutputHandleCore>)+'static, @@ -187,8 +187,8 @@ pub trait Operator { /// ``` fn binary_frontier(&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore where - D2: Container, - D3: Container, + D2: Container + Data, + D3: Container + Data, B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(&mut FrontieredInputHandleCore, &mut FrontieredInputHandleCore, @@ -241,8 +241,8 @@ pub trait Operator { /// } /// }).unwrap(); /// ``` - fn binary_notify, &mut InputHandleCore, &mut OutputHandleCore>, @@ -287,8 +287,8 @@ pub trait Operator { /// ``` fn binary(&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore where - D2: Container, - D3: Container, + D2: Container + Data, + D3: Container + Data, B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(&mut InputHandleCore, &mut InputHandleCore, @@ -325,11 +325,11 @@ pub trait Operator { P: ParallelizationContractCore; } -impl Operator for StreamCore { +impl Operator for StreamCore { fn unary_frontier(&self, pact: P, name: &str, constructor: B) -> StreamCore where - D2: Container, + D2: Container + Data, B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(&mut FrontieredInputHandleCore, &mut OutputHandleCore>)+'static, @@ -355,7 +355,7 @@ impl Operator for StreamCore { stream } - fn unary_notify, &mut OutputHandleCore>, &mut Notificator)+'static, @@ -379,7 +379,7 @@ impl Operator for StreamCore { fn unary(&self, pact: P, name: &str, constructor: B) -> StreamCore where - D2: Container, + D2: Container + Data, B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(&mut InputHandleCore, &mut OutputHandleCore>)+'static, @@ -407,8 +407,8 @@ impl Operator for StreamCore { fn binary_frontier(&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore where - D2: Container, - D3: Container, + D2: Container + Data, + D3: Container + Data, B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(&mut FrontieredInputHandleCore, &mut FrontieredInputHandleCore, @@ -438,8 +438,8 @@ impl Operator for StreamCore { stream } - fn binary_notify, &mut InputHandleCore, &mut OutputHandleCore>, @@ -467,8 +467,8 @@ impl Operator for StreamCore { fn binary(&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore where - D2: Container, - D3: Container, + D2: Container + Data, + D3: Container + Data, B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(&mut InputHandleCore, &mut InputHandleCore, @@ -557,7 +557,7 @@ impl Operator for StreamCore { /// ``` pub fn source(scope: &G, name: &str, constructor: B) -> StreamCore where - D: Container, + D: Container + Data, B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(&mut OutputHandleCore>)+'static { @@ -599,7 +599,7 @@ where /// /// }); /// ``` -pub fn empty(scope: &G) -> StreamCore { +pub fn empty(scope: &G) -> StreamCore { source(scope, "Empty", |_capability, _info| |_output| { // drop capability, do nothing }) diff --git a/timely/src/dataflow/operators/input.rs b/timely/src/dataflow/operators/input.rs index 72719c5f85..a9d928b03b 100644 --- a/timely/src/dataflow/operators/input.rs +++ b/timely/src/dataflow/operators/input.rs @@ -93,7 +93,7 @@ pub trait Input : Scope { /// } /// }); /// ``` - fn new_input_core(&mut self) -> (HandleCore<::Timestamp, D>, StreamCore); + fn new_input_core(&mut self) -> (HandleCore<::Timestamp, D>, StreamCore); /// Create a new stream from a supplied interactive handle. /// @@ -157,7 +157,7 @@ pub trait Input : Scope { /// } /// }); /// ``` - fn input_from_core(&mut self, handle: &mut HandleCore<::Timestamp, D>) -> StreamCore; + fn input_from_core(&mut self, handle: &mut HandleCore<::Timestamp, D>) -> StreamCore; } use crate::order::TotalOrder; @@ -170,13 +170,13 @@ impl Input for G where ::Timestamp: TotalOrder { self.input_from_core(handle) } - fn new_input_core(&mut self) -> (HandleCore<::Timestamp, D>, StreamCore) { + fn new_input_core(&mut self) -> (HandleCore<::Timestamp, D>, StreamCore) { let mut handle = HandleCore::new(); let stream = self.input_from_core(&mut handle); (handle, stream) } - fn input_from_core(&mut self, handle: &mut HandleCore<::Timestamp, D>) -> StreamCore { + fn input_from_core(&mut self, handle: &mut HandleCore<::Timestamp, D>) -> StreamCore { let (output, registrar) = TeeCore::<::Timestamp, D>::new(); let counter = CounterCore::new(output); let produced = counter.produced().clone(); @@ -246,7 +246,7 @@ impl Operate for Operator { /// A handle to an input `Stream`, used to introduce data to a timely dataflow computation. #[derive(Debug)] -pub struct HandleCore { +pub struct HandleCore { activate: Vec, progress: Vec>>>, pushers: Vec>>, @@ -258,7 +258,7 @@ pub struct HandleCore { /// A handle specialized to vector-based containers. pub type Handle = HandleCore>; -impl HandleCore { +impl HandleCore { /// Allocates a new input handle, from which one can create timely streams. /// /// # Examples @@ -506,7 +506,7 @@ impl Default for Handle { } } -impl Drop for HandleCore { +impl Drop for HandleCore { fn drop(&mut self) { self.close_epoch(); } diff --git a/timely/src/dataflow/operators/inspect.rs b/timely/src/dataflow/operators/inspect.rs index 6e26856d18..fcbe72923b 100644 --- a/timely/src/dataflow/operators/inspect.rs +++ b/timely/src/dataflow/operators/inspect.rs @@ -99,7 +99,7 @@ impl Inspect> for StreamCore Inspect> for StreamCore> +impl Inspect> for StreamCore> where C: AsRef<[C::Item]> { fn inspect_core(&self, mut func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &[C::Item]), &[G::Timestamp]>) + 'static { @@ -131,7 +131,7 @@ pub trait InspectCore { fn inspect_container(&self, func: F) -> StreamCore where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static; } -impl InspectCore for StreamCore { +impl InspectCore for StreamCore { fn inspect_container(&self, mut func: F) -> StreamCore where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static diff --git a/timely/src/dataflow/operators/probe.rs b/timely/src/dataflow/operators/probe.rs index 7c5a8567ec..2208680bd2 100644 --- a/timely/src/dataflow/operators/probe.rs +++ b/timely/src/dataflow/operators/probe.rs @@ -13,7 +13,7 @@ use crate::dataflow::operators::generic::builder_raw::OperatorBuilder; use crate::dataflow::{StreamCore, Scope}; -use crate::Container; +use crate::{Container, Data}; /// Monitors progress at a `Stream`. pub trait Probe { @@ -79,7 +79,7 @@ pub trait Probe { fn probe_with(&self, handle: &mut Handle) -> StreamCore; } -impl Probe for StreamCore { +impl Probe for StreamCore { fn probe(&self) -> Handle { // the frontier is shared state; scope updates, handle reads. diff --git a/timely/src/dataflow/operators/rc.rs b/timely/src/dataflow/operators/rc.rs index eaae55093e..e2be883814 100644 --- a/timely/src/dataflow/operators/rc.rs +++ b/timely/src/dataflow/operators/rc.rs @@ -3,7 +3,7 @@ use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::operators::Operator; use crate::dataflow::{Scope, StreamCore}; -use crate::Container; +use crate::{Container, Data}; use std::rc::Rc; /// Convert a stream into a stream of shared containers @@ -24,7 +24,7 @@ pub trait SharedStream { fn shared(&self) -> StreamCore>; } -impl SharedStream for StreamCore { +impl SharedStream for StreamCore { fn shared(&self) -> StreamCore> { let mut container = Default::default(); self.unary(Pipeline, "Shared", move |_, _| { diff --git a/timely/src/dataflow/operators/reclock.rs b/timely/src/dataflow/operators/reclock.rs index b656e0aaf8..20562c9988 100644 --- a/timely/src/dataflow/operators/reclock.rs +++ b/timely/src/dataflow/operators/reclock.rs @@ -1,6 +1,6 @@ //! Extension methods for `Stream` based on record-by-record transformation. -use crate::Container; +use crate::{Container, Data}; use crate::order::PartialOrder; use crate::dataflow::{Scope, StreamCore}; use crate::dataflow::channels::pact::Pipeline; @@ -45,11 +45,11 @@ pub trait Reclock { /// assert_eq!(extracted[1], (5, vec![4,5])); /// assert_eq!(extracted[2], (8, vec![6,7,8])); /// ``` - fn reclock>(&self, clock: &StreamCore) -> Self; + fn reclock + Data>(&self, clock: &StreamCore) -> Self; } -impl Reclock for StreamCore { - fn reclock>(&self, clock: &StreamCore) -> StreamCore { +impl Reclock for StreamCore { + fn reclock + Data>(&self, clock: &StreamCore) -> StreamCore { let mut stash = vec![]; diff --git a/timely/src/dataflow/operators/to_stream.rs b/timely/src/dataflow/operators/to_stream.rs index 597ec9163c..6d37e9fab5 100644 --- a/timely/src/dataflow/operators/to_stream.rs +++ b/timely/src/dataflow/operators/to_stream.rs @@ -83,7 +83,7 @@ pub trait ToStreamCore { fn to_stream_core>(self, scope: &mut S) -> StreamCore; } -impl ToStreamCore for I where I::Item: Container { +impl ToStreamCore for I where I::Item: Container + Data { fn to_stream_core>(self, scope: &mut S) -> StreamCore { source(scope, "ToStreamCore", |capability, info| { diff --git a/timely/src/dataflow/operators/unordered_input.rs b/timely/src/dataflow/operators/unordered_input.rs index c7e6002341..805d54c085 100644 --- a/timely/src/dataflow/operators/unordered_input.rs +++ b/timely/src/dataflow/operators/unordered_input.rs @@ -142,12 +142,12 @@ pub trait UnorderedInputCore { /// assert_eq!(extract[i], (i, vec![i])); /// } /// ``` - fn new_unordered_input_core(&mut self) -> ((UnorderedHandleCore, ActivateCapability), StreamCore); + fn new_unordered_input_core(&mut self) -> ((UnorderedHandleCore, ActivateCapability), StreamCore); } impl UnorderedInputCore for G { - fn new_unordered_input_core(&mut self) -> ((UnorderedHandleCore, ActivateCapability), StreamCore) { + fn new_unordered_input_core(&mut self) -> ((UnorderedHandleCore, ActivateCapability), StreamCore) { let (output, registrar) = TeeCore::::new(); let internal = Rc::new(RefCell::new(ChangeBatch::new())); @@ -215,11 +215,11 @@ impl Operate for UnorderedOperator { /// A handle to an input [StreamCore], used to introduce data to a timely dataflow computation. #[derive(Debug)] -pub struct UnorderedHandleCore { +pub struct UnorderedHandleCore { buffer: PushBuffer>>, } -impl UnorderedHandleCore { +impl UnorderedHandleCore { fn new(pusher: PushCounter>) -> UnorderedHandleCore { UnorderedHandleCore { buffer: PushBuffer::new(pusher), diff --git a/timely/src/synchronization/sequence.rs b/timely/src/synchronization/sequence.rs index 7525d33892..320a3dcb87 100644 --- a/timely/src/synchronization/sequence.rs +++ b/timely/src/synchronization/sequence.rs @@ -157,7 +157,7 @@ impl Sequencer { } let mut activator_borrow = activator_source.borrow_mut(); - let mut activator = activator_borrow.as_mut().unwrap(); + let activator = activator_borrow.as_mut().unwrap(); if let Some(t) = activator.catchup_until { if capability.time().less_than(&t) { @@ -190,7 +190,7 @@ impl Sequencer { if let Some(last) = recvd.last() { let mut activator_borrow = activator_sink.borrow_mut(); - let mut activator = activator_borrow.as_mut().unwrap(); + let activator = activator_borrow.as_mut().unwrap(); activator.catchup_until = Some((last.0).0); activator.activate();