diff --git a/timely/src/dataflow/operators/generic/handles.rs b/timely/src/dataflow/operators/generic/handles.rs index 7e63434f3..15e6eb69e 100644 --- a/timely/src/dataflow/operators/generic/handles.rs +++ b/timely/src/dataflow/operators/generic/handles.rs @@ -235,6 +235,11 @@ impl<'a, T: Timestamp, CB: ContainerBuilder, P: Push>> assert!(cap.valid_for_output(&self.internal_buffer), "Attempted to open output session with invalid capability"); self.push_buffer.session_with_builder(cap.time()) } + + /// Flushes all pending data and indicate that no more data immediately follows. + pub fn cease(&mut self) { + self.push_buffer.cease(); + } } impl<'a, T: Timestamp, C: Container, P: Push>> OutputHandleCore<'a, T, CapacityContainerBuilder, P> { @@ -264,11 +269,6 @@ impl<'a, T: Timestamp, C: Container, P: Push>> OutputHandleCore<'a, pub fn session<'b, CT: CapabilityTrait>(&'b mut self, cap: &'b CT) -> Session<'b, T, CapacityContainerBuilder, PushCounter> where 'a: 'b { self.session_with_builder(cap) } - - /// Flushes all pending data and indicate that no more data immediately follows. - pub fn cease(&mut self) { - self.push_buffer.cease(); - } } impl<'a, T: Timestamp, CB: ContainerBuilder, P: Push>> Drop for OutputHandleCore<'a, T, CB, P> {