Skip to content

Commit

Permalink
Rework container builder to use push into
Browse files Browse the repository at this point in the history
Previously, `ContainerBuilder` had `push` and `push_container` functions,
which were odd in the presence of the `PushInto` trait. This change removes
the functions and instead relies on a `PushInto` implementation. As a
bonus, this removes several `SizableContainer` bounds, which are now
up to the caller to enforce should they push into a capacity-based
container builder.

Specifically, it adds the following new types or APIs:
* ContainerBuilder: remove `push` and `push_container`. Replaces the fromer
  with a `PushInto` implementation, and moves the latter into a function
  on `CapacityContainerBuilder`. All uses of `give_container` are now
  special-cased to said builder. Other builders can accept containers
  through their `PushInto` implementation.
* A `BufferingContainerBuilder` that only accepts complete buffers. Could
  back out that change because it's similar (but not equal!) to the
  capacity-based variant.
* core::Input learns `new_input_with_buider` that allows the user to
  specify a customer builder. Existing APIs should function unchanged and
  use the capacity-based builder.
* core::SharedStream uses the buffering container builder.
* core::to_stream gains a `ToStreamBuilder` trait that allows the user to
  specify the builder. `ToStream` uses that but fixes the builder to the
  capacity-based variant.

Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Jun 7, 2024
1 parent 83a32d5 commit c3ce08b
Show file tree
Hide file tree
Showing 7 changed files with 322 additions and 129 deletions.
108 changes: 79 additions & 29 deletions container/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub trait Container: Default + Clone + 'static {
/// The type of elements when reading non-destructively from the container.
type ItemRef<'a> where Self: 'a;

/// The type of elements when draining the continer.
/// The type of elements when draining the container.
type Item<'a> where Self: 'a;

/// Push `item` into self
Expand Down Expand Up @@ -75,6 +75,9 @@ pub trait PushInto<T> {
/// chunked into individual containers, but is free to change the data representation to
/// better fit the properties of the container.
///
/// Types implementing this trait should provide appropriate [`PushInto`] implementations such
/// that users can push the expected item types.
///
/// The owner extracts data in two ways. The opportunistic [`Self::extract`] method returns
/// any ready data, but doesn't need to produce partial outputs. In contrast, [`Self::finish`]
/// needs to produce all outputs, even partial ones.
Expand All @@ -83,23 +86,15 @@ pub trait PushInto<T> {
/// to ensure that it preserves the intended information.
///
/// The trait does not prescribe any specific ordering guarantees, and each implementation can
/// decide to represent a `push`/`push_container` order for `extract` and `finish`, or not.
// TODO: Consider adding `push_iterator` to receive an iterator of data.
/// decide to represent a push order for `extract` and `finish`, or not.
pub trait ContainerBuilder: Default + 'static {
/// The container type we're building.
type Container: Container;
/// Add an item to a container.
///
/// The restriction to [`SizeableContainer`] only exists so that types
/// relying on [`CapacityContainerBuilder`] only need to constrain their container
/// to [`Container`] instead of [`SizableContainer`], which otherwise would be a pervasive
/// requirement.
fn push<T>(&mut self, item: T) where Self::Container: SizableContainer + PushInto<T>;
/// Push a pre-built container.
fn push_container(&mut self, container: &mut Self::Container);
/// Extract assembled containers, potentially leaving unfinished data behind.
/// Extract assembled containers, potentially leaving unfinished data behind. Should
/// be called repeatedly until it returns `None`.
fn extract(&mut self) -> Option<&mut Self::Container>;
/// Extract assembled containers and any unfinished data.
/// Extract assembled containers and any unfinished data. Should
/// be called repeatedly until it returns `None`.
fn finish(&mut self) -> Option<&mut Self::Container>;
}

Expand All @@ -126,11 +121,8 @@ pub trait SizableContainer: Container {
fn reserve(&mut self, additional: usize);
}

impl<C: Container> ContainerBuilder for CapacityContainerBuilder<C> {
type Container = C;

#[inline]
fn push<T>(&mut self, item: T) where C: SizableContainer + PushInto<T> {
impl<T, C: SizableContainer + PushInto<T>> PushInto<T> for CapacityContainerBuilder<C> {
fn push_into(&mut self, item: T) {
if self.current.capacity() == 0 {
self.current = self.empty.take().unwrap_or_default();
// Discard any non-uniform capacity container.
Expand All @@ -153,9 +145,35 @@ impl<C: Container> ContainerBuilder for CapacityContainerBuilder<C> {
self.pending.push_back(std::mem::take(&mut self.current));
}
}
}

impl<C: Container> ContainerBuilder for CapacityContainerBuilder<C> {
type Container = C;

#[inline]
fn extract(&mut self) -> Option<&mut C> {
if let Some(container) = self.pending.pop_front() {
self.empty = Some(container);
self.empty.as_mut()
} else {
None
}
}

#[inline]
fn push_container(&mut self, container: &mut Self::Container) {
fn finish(&mut self) -> Option<&mut C> {
if self.current.len() > 0 {
self.pending.push_back(std::mem::take(&mut self.current));
}
self.extract()
}
}

impl<C: Container> CapacityContainerBuilder<C> {
/// Push a pre-formed container at this builder. This exists to maintain
/// API compatibility.
#[inline]
pub fn push_container(&mut self, container: &mut C) {
if !container.is_empty() {
// Flush to maintain FIFO ordering.
if self.current.len() > 0 {
Expand All @@ -170,26 +188,58 @@ impl<C: Container> ContainerBuilder for CapacityContainerBuilder<C> {
self.pending.push_back(std::mem::replace(container, empty));
}
}
}

/// A container builder that absorbs entire containers. Maintains FIFO order.
pub struct BufferingContainerBuilder<C> {
/// Container that we're extracting.
current: Option<C>,
/// Completed containers pending to be sent.
pending: VecDeque<C>,
}

impl<C> Default for BufferingContainerBuilder<C> {
#[inline]
fn extract(&mut self) -> Option<&mut C> {
fn default() -> Self {
Self {
current: None,
pending: VecDeque::default(),
}
}
}

impl<C: Container> ContainerBuilder for BufferingContainerBuilder<C> {
type Container = C;

#[inline]
fn extract(&mut self) -> Option<&mut Self::Container> {
if let Some(container) = self.pending.pop_front() {
self.empty = Some(container);
self.empty.as_mut()
self.current = Some(container);
self.current.as_mut()
} else {
None
}
}

#[inline]
fn finish(&mut self) -> Option<&mut C> {
if self.current.len() > 0 {
self.pending.push_back(std::mem::take(&mut self.current));
}
fn finish(&mut self) -> Option<&mut Self::Container> {
self.extract()
}
}

impl<C: Container> PushInto<&mut C> for BufferingContainerBuilder<C> {
#[inline]
fn push_into(&mut self, item: &mut C) {
if !item.is_empty() {
// Grab the last returned container, or a default one, to pass back to the caller
let mut empty = self.current.take().unwrap_or_default();
empty.clear();
let container = std::mem::replace(item, empty);
self.pending.push_back(container);
}
}
}

impl<T: Clone + 'static> Container for Vec<T> {
type ItemRef<'a> = &'a T where T: 'a;
type Item<'a> = T where T: 'a;
Expand Down Expand Up @@ -347,7 +397,7 @@ pub trait PushPartitioned: SizableContainer {
F: FnMut(usize, &mut Self);
}

impl<T: SizableContainer> PushPartitioned for T where for<'a> T: PushInto<T::Item<'a>> {
impl<C: SizableContainer> PushPartitioned for C where for<'a> C: PushInto<C::Item<'a>> {
fn push_partitioned<I, F>(&mut self, buffers: &mut [Self], mut index: I, mut flush: F)
where
for<'a> I: FnMut(&Self::Item<'a>) -> usize,
Expand Down Expand Up @@ -383,7 +433,7 @@ pub mod buffer {
/// The maximum buffer capacity in elements. Returns a number between [BUFFER_SIZE_BYTES]
/// and 1, inclusively.
pub const fn default_capacity<T>() -> usize {
let size = ::std::mem::size_of::<T>();
let size = std::mem::size_of::<T>();
if size == 0 {
BUFFER_SIZE_BYTES
} else if size <= BUFFER_SIZE_BYTES {
Expand Down
66 changes: 37 additions & 29 deletions timely/src/dataflow/channels/pushers/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//! with the performance of batched sends.

use crate::communication::Push;
use crate::container::{ContainerBuilder, CapacityContainerBuilder, SizableContainer, PushInto};
use crate::container::{ContainerBuilder, CapacityContainerBuilder, PushInto};
use crate::dataflow::channels::{Bundle, Message};
use crate::dataflow::operators::Capability;
use crate::progress::Timestamp;
Expand Down Expand Up @@ -56,6 +56,16 @@ impl<T, C: Container, P: Push<Bundle<T, C>>> Buffer<T, CapacityContainerBuilder<
pub fn autoflush_session(&mut self, cap: Capability<T>) -> AutoflushSession<T, CapacityContainerBuilder<C>, P> where T: Timestamp {
self.autoflush_session_with_builder(cap)
}

/// Gives an entire container at the current time. Only provided for
/// buffers of [`CapacityContainerBuilder`]s. Other container builders
/// should use [`PushInto`] instead.
fn give_container(&mut self, container: &mut C) {
if !container.is_empty() {
self.builder.push_container(container);
self.extract();
}
}
}

impl<T, CB: ContainerBuilder, P: Push<Bundle<T, CB::Container>>> Buffer<T, CB, P> where T: Eq+Clone {
Expand Down Expand Up @@ -101,25 +111,18 @@ impl<T, CB: ContainerBuilder, P: Push<Bundle<T, CB::Container>>> Buffer<T, CB, P
Message::push_at(container, time, &mut self.pusher);
}
}

/// Gives an entire container at the current time.
fn give_container(&mut self, container: &mut CB::Container) {
if !container.is_empty() {
self.builder.push_container(container);
self.extract();
}
}
}

impl<T, CB: ContainerBuilder, P: Push<Bundle<T, CB::Container>>> Buffer<T, CB, P>
impl<T, CB, P> Buffer<T, CB, P>
where
T: Eq+Clone,
CB::Container: SizableContainer,
CB: ContainerBuilder,
P: Push<Bundle<T, CB::Container>>
{
// Push a single item into the builder. Internal method for use by `Session`.
#[inline]
fn give<D>(&mut self, data: D) where CB::Container: PushInto<D> {
self.builder.push(data);
fn give<D>(&mut self, data: D) where CB: PushInto<D> {
self.builder.push_into(data);
self.extract();
}
}
Expand All @@ -133,33 +136,34 @@ pub struct Session<'a, T, CB, P> {
buffer: &'a mut Buffer<T, CB, P>,
}

impl<'a, T, CB, P> Session<'a, T, CB, P>
impl<'a, T, C: Container, P> Session<'a, T, CapacityContainerBuilder<C>, P>
where
T: Eq + Clone + 'a,
CB: ContainerBuilder + 'a,
P: Push<Bundle<T, CB::Container>> + 'a
P: Push<Bundle<T, C>> + 'a,
{
/// Provide a container at the time specified by the [Session].
pub fn give_container(&mut self, container: &mut CB::Container) {
/// Provide a container at the time specified by the [Session]. Only provided for
/// buffers of [`CapacityContainerBuilder`]s. Other container builders
/// should use [`PushInto`] instead.
pub fn give_container(&mut self, container: &mut C) {
self.buffer.give_container(container)
}
}

impl<'a, T, CB, P> Session<'a, T, CB, P>
where
T: Eq + Clone + 'a,
CB: ContainerBuilder + 'a,
P: Push<Bundle<T, CB::Container>> + 'a
{
/// Access the builder. Immutable access to prevent races with flushing
/// the underlying buffer.
pub fn builder(&self) -> &CB {
self.buffer.builder()
}
}

impl<'a, T, CB, P: Push<Bundle<T, CB::Container>>+'a> Session<'a, T, CB, P>
where
T: Eq + Clone + 'a,
CB: ContainerBuilder + 'a,
CB::Container: SizableContainer,
{
/// Provides one record at the time specified by the `Session`.
#[inline]
pub fn give<D>(&mut self, data: D) where CB::Container: PushInto<D> {
pub fn give<D>(&mut self, data: D) where CB: PushInto<D> {
self.buffer.give(data);
}

Expand All @@ -168,7 +172,7 @@ where
pub fn give_iterator<I>(&mut self, iter: I)
where
I: Iterator,
CB::Container: PushInto<I::Item>,
CB: PushInto<I::Item>,
{
for item in iter {
self.give(item);
Expand Down Expand Up @@ -197,15 +201,19 @@ where
{
/// Transmits a single record.
#[inline]
pub fn give<D>(&mut self, data: D) where CB::Container: SizableContainer + PushInto<D> {
pub fn give<D>(&mut self, data: D)
where
CB: PushInto<D>,
{
self.buffer.give(data);
}

/// Transmits records produced by an iterator.
#[inline]
pub fn give_iterator<I, D>(&mut self, iter: I)
where
I: Iterator<Item=D>,
CB::Container: SizableContainer + PushInto<D>,
CB: PushInto<D>,
{
for item in iter {
self.give(item);
Expand Down
Loading

0 comments on commit c3ce08b

Please sign in to comment.