Skip to content

Commit

Permalink
Address feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Jun 12, 2024
1 parent e639eb1 commit c634ee5
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 97 deletions.
91 changes: 26 additions & 65 deletions container/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,16 @@ pub trait Container: Default + Clone + 'static {
fn drain(&mut self) -> Self::DrainIter<'_>;
}

/// A [`Container`] that can be sized and that exposes its capacity.
pub trait SizableContainer: Container {
    /// Returns the capacity of this container.
    fn capacity(&self) -> usize;

    /// Returns the preferred capacity for containers of this type.
    ///
    /// This is an associated function; it is not tied to a particular instance.
    fn preferred_capacity() -> usize;

    /// Reserves space for `additional` elements, which may increase the
    /// capacity of the container.
    fn reserve(&mut self, additional: usize);
}

/// A container that can absorb items of a specific type.
pub trait PushInto<T> {
/// Push item into self.
Expand All @@ -80,7 +90,12 @@ pub trait PushInto<T> {
///
/// The owner extracts data in two ways. The opportunistic [`Self::extract`] method returns
/// any ready data, but doesn't need to produce partial outputs. In contrast, [`Self::finish`]
/// needs to produce all outputs, even partial ones.
/// needs to produce all outputs, even partial ones. Callers should call these functions
/// repeatedly to drain pending or finished data.
///
/// The caller is responsible for fully consuming the containers returned by [`Self::extract`] and
/// [`Self::finish`]. Implementations can recycle buffers, but should ensure that they clear
/// any remaining elements.
///
/// For example, a consolidating builder can aggregate differences in-place, but it has
/// to ensure that it preserves the intended information.
Expand All @@ -90,8 +105,10 @@ pub trait PushInto<T> {
pub trait ContainerBuilder: Default + 'static {
/// The container type we're building.
type Container: Container;
/// Extract assembled containers, potentially leaving unfinished data behind. Should
/// be called repeatedly until it returns `None`.
/// Extract assembled containers, potentially leaving unfinished data behind. Can
/// be called repeatedly, for example while the caller can send data.
///
/// Returns a `Some` if there is data ready to be shipped, and `None` otherwise.
fn extract(&mut self) -> Option<&mut Self::Container>;
/// Extract assembled containers and any unfinished data. Should
/// be called repeatedly until it returns `None`.
Expand All @@ -100,6 +117,9 @@ pub trait ContainerBuilder: Default + 'static {

/// A default container builder that uses length and preferred capacity to chunk data.
///
/// Maintains a single empty allocation between [`Self::push_into`] and [`Self::extract`], but not
/// across [`Self::finish`] to maintain a low memory footprint.
///
/// Maintains FIFO order.
#[derive(Default, Debug)]
pub struct CapacityContainerBuilder<C>{
Expand All @@ -111,16 +131,6 @@ pub struct CapacityContainerBuilder<C>{
pending: VecDeque<C>,
}

/// A [`Container`] that can be sized and that exposes its capacity.
pub trait SizableContainer: Container {
    /// Returns the capacity of this container.
    fn capacity(&self) -> usize;

    /// Returns the preferred capacity for containers of this type.
    ///
    /// This is an associated function; it is not tied to a particular instance.
    fn preferred_capacity() -> usize;

    /// Reserves space for `additional` elements, which may increase the
    /// capacity of the container.
    fn reserve(&mut self, additional: usize);
}

impl<T, C: SizableContainer + PushInto<T>> PushInto<T> for CapacityContainerBuilder<C> {
fn push_into(&mut self, item: T) {
if self.current.capacity() == 0 {
Expand Down Expand Up @@ -162,10 +172,11 @@ impl<C: Container> ContainerBuilder for CapacityContainerBuilder<C> {

#[inline]
fn finish(&mut self) -> Option<&mut C> {
if self.current.len() > 0 {
if !self.current.is_empty() {
self.pending.push_back(std::mem::take(&mut self.current));
}
self.extract()
self.empty = self.pending.pop_front();
self.empty.as_mut()
}
}

Expand All @@ -190,56 +201,6 @@ impl<C: Container> CapacityContainerBuilder<C> {
}
}

/// A container builder that absorbs entire containers. Maintains FIFO order.
///
/// Derives `Debug` (available whenever `C: Debug`) so that the builder can be
/// inspected like `CapacityContainerBuilder`, which derives it as well.
#[derive(Debug)]
pub struct BufferingContainerBuilder<C> {
    /// The container most recently handed out by `extract`, if any. It is kept
    /// so that a later `push_into` can take it back and reuse it.
    current: Option<C>,
    /// Completed containers pending to be sent, oldest at the front.
    pending: VecDeque<C>,
}

impl<C> Default for BufferingContainerBuilder<C> {
    /// Creates a builder that holds no current container and has nothing pending.
    ///
    /// This impl places no `Default` bound on `C`.
    #[inline]
    fn default() -> Self {
        let current = None;
        let pending = VecDeque::new();
        Self { current, pending }
    }
}

impl<C: Container> ContainerBuilder for BufferingContainerBuilder<C> {
    type Container = C;

    /// Hands out the oldest pending container, if there is one.
    ///
    /// The container is moved into `self.current` and a mutable reference to it
    /// is returned. When nothing is pending, `self.current` is left untouched
    /// and `None` is returned.
    #[inline]
    fn extract(&mut self) -> Option<&mut Self::Container> {
        // `?` returns early without touching `current` when the queue is empty.
        let next = self.pending.pop_front()?;
        Some(self.current.insert(next))
    }

    /// Delegates to [`Self::extract`].
    #[inline]
    fn finish(&mut self) -> Option<&mut Self::Container> {
        self.extract()
    }
}

impl<C: Container> PushInto<&mut C> for BufferingContainerBuilder<C> {
    /// Takes the contents of `item` and queues them; empty containers are ignored.
    ///
    /// On return, a non-empty `item` has been swapped for a cleared container:
    /// the one last handed out by `extract` if there is one, otherwise a default one.
    #[inline]
    fn push_into(&mut self, item: &mut C) {
        if item.is_empty() {
            return;
        }
        // Reuse the last returned container, or a default one, as the
        // replacement handed back to the caller.
        let mut replacement = self.current.take().unwrap_or_default();
        replacement.clear();
        let full = std::mem::replace(item, replacement);
        self.pending.push_back(full);
    }
}

impl<T: Clone + 'static> Container for Vec<T> {
type ItemRef<'a> = &'a T where T: 'a;
type Item<'a> = T where T: 'a;
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/channels/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl<T, C: Container> Message<T, C> {
}

/// Forms a message, and pushes contents at `pusher`. Replaces `buffer` with what the pusher
/// leaves in place, or the container's default element.
/// leaves in place, or the container's default element. The buffer is cleared.
#[inline]
pub fn push_at<P: Push<Bundle<T, C>>>(buffer: &mut C, time: T, pusher: &mut P) {

Expand Down
14 changes: 5 additions & 9 deletions timely/src/dataflow/channels/pushers/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,11 @@ impl<T, C: Container, P: Push<Bundle<T, C>>> Buffer<T, CapacityContainerBuilder<
self.autoflush_session_with_builder(cap)
}

/// Gives an entire container at the current time. Only provided for
/// buffers of [`CapacityContainerBuilder`]s. Other container builders
/// should use [`PushInto`] instead.
/// Gives an entire container at the current time.
fn give_container(&mut self, container: &mut C) {
if !container.is_empty() {
self.builder.push_container(container);
self.extract();
self.extract_and_send();
}
}
}
Expand Down Expand Up @@ -96,7 +94,7 @@ impl<T, CB: ContainerBuilder, P: Push<Bundle<T, CB::Container>>> Buffer<T, CB, P

/// Extract pending data from the builder, but not forcing a flush.
#[inline]
fn extract(&mut self) {
fn extract_and_send(&mut self) {
while let Some(container) = self.builder.extract() {
let time = self.time.as_ref().unwrap().clone();
Message::push_at(container, time, &mut self.pusher);
Expand All @@ -122,7 +120,7 @@ where
#[inline]
fn push_into(&mut self, item: D) {
self.builder.push_into(item);
self.extract();
self.extract_and_send();
}
}

Expand All @@ -140,9 +138,7 @@ where
T: Eq + Clone + 'a,
P: Push<Bundle<T, C>> + 'a,
{
/// Provide a container at the time specified by the [Session]. Only provided for
/// buffers of [`CapacityContainerBuilder`]s. Other container builders
/// should use [`PushInto`] instead.
/// Provide a container at the time specified by the [Session].
///
/// Forwards `container` unchanged to the `give_container` method of the
/// underlying buffer.
pub fn give_container(&mut self, container: &mut C) {
self.buffer.give_container(container)
}
Expand Down
27 changes: 14 additions & 13 deletions timely/src/dataflow/operators/core/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,21 +77,21 @@ pub trait Input : Scope {
/// use std::rc::Rc;
/// use timely::*;
/// use timely::dataflow::operators::core::{Input, Inspect};
/// use timely::container::BufferingContainerBuilder;
/// use timely::container::CapacityContainerBuilder;
///
/// // construct and execute a timely dataflow
/// timely::execute(Config::thread(), |worker| {
///
/// // add an input and base computation off of it
/// let mut input = worker.dataflow(|scope| {
/// let (input, stream) = scope.new_input_with_builder::<BufferingContainerBuilder<Rc<Vec<_>>>>();
/// let (input, stream) = scope.new_input_with_builder::<CapacityContainerBuilder<Rc<Vec<_>>>>();
/// stream.inspect(|x| println!("hello {:?}", x));
/// input
/// });
///
/// // introduce input, advance computation
/// for round in 0..10 {
/// input.send(&mut Rc::new(vec![round]));
/// input.send_batch(&mut Rc::new(vec![round]));
/// input.advance_to(round + 1);
/// worker.step();
/// }
Expand Down Expand Up @@ -274,12 +274,13 @@ impl<T: Timestamp, CB: ContainerBuilder> Handle<T, CB> {
/// use timely::*;
/// use timely::dataflow::operators::core::{Input, Inspect};
/// use timely::dataflow::operators::core::input::Handle;
/// use timely_container::CapacityContainerBuilder;
///
/// // construct and execute a timely dataflow
/// timely::execute(Config::thread(), |worker| {
///
/// // add an input and base computation off of it
/// let mut input = Handle::new();
/// let mut input = Handle::<_, CapacityContainerBuilder<_>>::new_with_builder();
/// worker.dataflow(|scope| {
/// scope.input_from(&mut input)
/// .container::<Vec<_>>()
Expand Down Expand Up @@ -361,22 +362,23 @@ impl<T: Timestamp, CB: ContainerBuilder> Handle<T, CB> {
#[inline]
fn extract(&mut self) {
while let Some(container) = self.builder.extract() {
Self::flush_container(container, &mut self.buffer, &mut self.pushers, &self.now_at);
Self::send_container(container, &mut self.buffer, &mut self.pushers, &self.now_at);
}
}

/// Flush all contents and distribute to downstream operators.
#[inline]
fn flush(&mut self) {
while let Some(container) = self.builder.finish() {
Self::flush_container(container, &mut self.buffer, &mut self.pushers, &self.now_at);
Self::send_container(container, &mut self.buffer, &mut self.pushers, &self.now_at);
}
}

/// flushes our buffer at each of the destinations. there can be more than one; clone if needed.
/// Sends a container to each of the destinations. There can be more than one; clone if needed.
/// Does not take `self` because `flush` and `extract` borrow `self` mutably.
#[inline(never)]
fn flush_container(
/// Clears the container.
// TODO: Find a better name for this function.
fn send_container(
container: &mut CB::Container,
buffer: &mut CB::Container,
pushers: &mut [Counter<T, CB::Container, Tee<T, CB::Container>>],
Expand All @@ -386,17 +388,16 @@ impl<T: Timestamp, CB: ContainerBuilder> Handle<T, CB> {
if index < pushers.len() - 1 {
buffer.clone_from(container);
Message::push_at(buffer, now_at.clone(), &mut pushers[index]);
debug_assert!(buffer.is_empty());
}
else {
Message::push_at(container, now_at.clone(), &mut pushers[index]);
debug_assert!(container.is_empty());
}
}
container.clear();
}

// closes the current epoch, flushing if needed, shutting if needed, and updating the frontier.
/// Closes the current epoch, flushing if needed, shutting if needed, and updating the frontier.
// TODO: Find a better name for this function.
fn close_epoch(&mut self) {
self.flush();
for pusher in self.pushers.iter_mut() {
Expand Down Expand Up @@ -443,7 +444,7 @@ impl<T: Timestamp, CB: ContainerBuilder> Handle<T, CB> {
if !buffer.is_empty() {
// flush buffered elements to ensure local fifo.
self.flush();
Self::flush_container(buffer, &mut self.buffer, &mut self.pushers, &self.now_at);
Self::send_container(buffer, &mut self.buffer, &mut self.pushers, &self.now_at);
}
}

Expand Down
7 changes: 3 additions & 4 deletions timely/src/dataflow/operators/core/rc.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
//! Shared containers

use crate::container::BufferingContainerBuilder;
use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::operators::Operator;
use crate::dataflow::{Scope, StreamCore};
Expand Down Expand Up @@ -28,13 +27,13 @@ pub trait SharedStream<S: Scope, C: Container> {
impl<S: Scope, C: Container> SharedStream<S, C> for StreamCore<S, C> {
fn shared(&self) -> StreamCore<S, Rc<C>> {
let mut container = Default::default();
self.unary::<BufferingContainerBuilder<_>, _, _, _>(Pipeline, "Shared", move |_, _| {
self.unary(Pipeline, "Shared", move |_, _| {
move |input, output| {
input.for_each(|time, data| {
data.swap(&mut container);
output
.session_with_builder(&time)
.give(&mut Rc::new(std::mem::take(&mut container)));
.session(&time)
.give_container(&mut Rc::new(std::mem::take(&mut container)));
});
}
})
Expand Down
4 changes: 1 addition & 3 deletions timely/src/dataflow/operators/core/to_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@ impl<CB: ContainerBuilder, I: IntoIterator+'static> ToStreamBuilder<CB> for I wh
let mut session = output.session_with_builder(capability.as_ref().unwrap());
session.give(element);
let n = 256 * crate::container::buffer::default_capacity::<I::Item>();
for element in iterator.by_ref().take(n - 1) {
session.give(element);
}
session.give_iterator(iterator.by_ref().take(n - 1));
activator.activate();
}
else {
Expand Down
4 changes: 2 additions & 2 deletions timely/src/dataflow/operators/to_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@ pub trait ToStream<D: Data> {

impl<I: IntoIterator+'static> ToStream<I::Item> for I where I::Item: Data {
fn to_stream<S: Scope>(self, scope: &mut S) -> Stream<S, I::Item> {
ToStreamCore::to_stream(self, scope)
ToStreamCore::<_>::to_stream(self, scope)
}
}
}

0 comments on commit c634ee5

Please sign in to comment.