diff --git a/container/src/lib.rs b/container/src/lib.rs index 83b3c0e09..a86c09b5a 100644 --- a/container/src/lib.rs +++ b/container/src/lib.rs @@ -93,7 +93,7 @@ pub trait PushInto { /// needs to produce all outputs, even partial ones. Caller should repeatedly call the functions /// to drain pending or finished data. /// -/// The caller is responsible to fully consume the containers returned by [`Self::extract`] and +/// The caller should consume the containers returned by [`Self::extract`] and /// [`Self::finish`]. Implementations can recycle buffers, but should ensure that they clear /// any remaining elements. /// @@ -109,9 +109,11 @@ pub trait ContainerBuilder: Default + 'static { /// be called repeatedly, for example while the caller can send data. /// /// Returns a `Some` if there is data ready to be shipped, and `None` otherwise. + #[must_use] fn extract(&mut self) -> Option<&mut Self::Container>; /// Extract assembled containers and any unfinished data. Should /// be called repeatedly until it returns `None`. + #[must_use] fn finish(&mut self) -> Option<&mut Self::Container>; } @@ -132,6 +134,7 @@ pub struct CapacityContainerBuilder{ } impl> PushInto for CapacityContainerBuilder { + #[inline] fn push_into(&mut self, item: T) { if self.current.capacity() == 0 { self.current = self.empty.take().unwrap_or_default(); diff --git a/timely/src/dataflow/operators/core/input.rs b/timely/src/dataflow/operators/core/input.rs index 298cd9c4b..68553fadc 100644 --- a/timely/src/dataflow/operators/core/input.rs +++ b/timely/src/dataflow/operators/core/input.rs @@ -360,7 +360,7 @@ impl Handle { /// Extract all ready contents from the builder and distribute to downstream operators. #[inline] - fn extract(&mut self) { + fn extract_and_send(&mut self) { while let Some(container) = self.builder.extract() { Self::send_container(container, &mut self.buffer, &mut self.pushers, &self.now_at); } @@ -378,6 +378,7 @@ impl Handle { /// Does not take `self` because `flush` and `extract` borrow `self` mutably. /// Clears the container. // TODO: Find a better name for this function. + #[inline] fn send_container( container: &mut CB::Container, buffer: &mut CB::Container, @@ -396,7 +397,7 @@ impl Handle { container.clear(); } - /// closes the current epoch, flushing if needed, shutting if needed, and updating the frontier. + /// Closes the current epoch, flushing if needed, shutting if needed, and updating the frontier. // TODO: Find a better name for this function. fn close_epoch(&mut self) { self.flush(); @@ -490,7 +491,7 @@ where #[inline] fn push_into(&mut self, item: D) { self.builder.push_into(item); - self.extract(); + self.extract_and_send(); } } diff --git a/timely/src/dataflow/operators/to_stream.rs b/timely/src/dataflow/operators/to_stream.rs index 5e062f194..a85677dc0 100644 --- a/timely/src/dataflow/operators/to_stream.rs +++ b/timely/src/dataflow/operators/to_stream.rs @@ -27,6 +27,6 @@ pub trait ToStream { impl ToStream for I where I::Item: Data { fn to_stream(self, scope: &mut S) -> Stream { - ToStreamCore::<_>::to_stream(self, scope) + ToStreamCore::to_stream(self, scope) } }