Skip to content

Commit

Permalink
More refining
Browse files Browse the repository at this point in the history
Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Jun 12, 2024
1 parent c634ee5 commit 4b18162
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 5 deletions.
5 changes: 4 additions & 1 deletion container/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ pub trait PushInto<T> {
/// needs to produce all outputs, even partial ones. Caller should repeatedly call the functions
/// to drain pending or finished data.
///
/// The caller is responsible to fully consume the containers returned by [`Self::extract`] and
/// The caller should consume the containers returned by [`Self::extract`] and
/// [`Self::finish`]. Implementations can recycle buffers, but should ensure that they clear
/// any remaining elements.
///
Expand All @@ -109,9 +109,11 @@ pub trait ContainerBuilder: Default + 'static {
/// be called repeatedly, for example while the caller can send data.
///
/// Returns a `Some` if there is data ready to be shipped, and `None` otherwise.
#[must_use]
fn extract(&mut self) -> Option<&mut Self::Container>;
/// Extract assembled containers and any unfinished data. Should
/// be called repeatedly until it returns `None`.
#[must_use]
fn finish(&mut self) -> Option<&mut Self::Container>;
}

Expand All @@ -132,6 +134,7 @@ pub struct CapacityContainerBuilder<C>{
}

impl<T, C: SizableContainer + PushInto<T>> PushInto<T> for CapacityContainerBuilder<C> {
#[inline]
fn push_into(&mut self, item: T) {
if self.current.capacity() == 0 {
self.current = self.empty.take().unwrap_or_default();
Expand Down
7 changes: 4 additions & 3 deletions timely/src/dataflow/operators/core/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ impl<T: Timestamp, CB: ContainerBuilder> Handle<T, CB> {

/// Extract all ready contents from the builder and distribute to downstream operators.
#[inline]
fn extract(&mut self) {
fn extract_and_send(&mut self) {
while let Some(container) = self.builder.extract() {
Self::send_container(container, &mut self.buffer, &mut self.pushers, &self.now_at);
}
Expand All @@ -378,6 +378,7 @@ impl<T: Timestamp, CB: ContainerBuilder> Handle<T, CB> {
/// Does not take `self` because `flush` and `extract` borrow `self` mutably.
/// Clears the container.
// TODO: Find a better name for this function.
#[inline]
fn send_container(
container: &mut CB::Container,
buffer: &mut CB::Container,
Expand All @@ -396,7 +397,7 @@ impl<T: Timestamp, CB: ContainerBuilder> Handle<T, CB> {
container.clear();
}

/// closes the current epoch, flushing if needed, shutting if needed, and updating the frontier.
/// Closes the current epoch, flushing if needed, shutting if needed, and updating the frontier.
// TODO: Find a better name for this function.
fn close_epoch(&mut self) {
self.flush();
Expand Down Expand Up @@ -490,7 +491,7 @@ where
#[inline]
fn push_into(&mut self, item: D) {
self.builder.push_into(item);
self.extract();
self.extract_and_send();
}
}

Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/to_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@ pub trait ToStream<D: Data> {

impl<I: IntoIterator+'static> ToStream<I::Item> for I where I::Item: Data {
fn to_stream<S: Scope>(self, scope: &mut S) -> Stream<S, I::Item> {
ToStreamCore::<_>::to_stream(self, scope)
ToStreamCore::to_stream(self, scope)
}
}

0 comments on commit 4b18162

Please sign in to comment.