From 03692226f48e9ea8e7eaaf45dee3fffdffb6232d Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Sat, 3 Feb 2024 21:37:54 -0500 Subject: [PATCH 1/3] Bump dependency versions Signed-off-by: Moritz Hoffmann --- .github/workflows/deploy.yml | 2 +- .github/workflows/test.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index 7a6b87b18..b9c4638c7 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -10,7 +10,7 @@ jobs: runs-on: ubuntu-22.04 steps: - uses: actions/checkout@v3 - - run: cargo install mdbook --version 0.4.27 + - run: cargo install mdbook --version 0.4.36 - run: cd mdbook && mdbook build - uses: JamesIves/github-pages-deploy-action@v4 with: diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 9a1a6be95..b5431e2f2 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -7,7 +7,7 @@ jobs: runs-on: ubuntu-22.04 steps: - uses: actions/checkout@v3 - - run: rustup update 1.64 --no-self-update && rustup default 1.64 + - run: rustup update 1.65 --no-self-update && rustup default 1.65 - run: cargo build - name: test mdBook # rustdoc doesn't build dependencies, so it needs to run after `cargo build`, From 20d335d5d0908df3ea620d9e8d62fb8d73846f7e Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 1 Mar 2024 10:44:10 -0500 Subject: [PATCH 2/3] Introduce GATs for containers Containers are generic over their contents, and over reading and draining. Signed-off-by: Moritz Hoffmann --- container/src/columnation.rs | 74 +++++---- container/src/lib.rs | 156 ++++++++++++++---- mdbook/src/chapter_5/chapter_5_3.md | 82 +++------ timely/src/dataflow/channels/pact.rs | 33 ++-- .../src/dataflow/channels/pushers/buffer.rs | 20 ++- .../src/dataflow/channels/pushers/exchange.rs | 23 ++- .../dataflow/operators/{ => core}/exchange.rs | 14 +- .../dataflow/operators/{ => core}/inspect.rs | 39 ++--- timely/src/dataflow/operators/core/mod.rs | 10 ++ .../dataflow/operators/{ => core}/reclock.rs | 4 +- timely/src/dataflow/operators/mod.rs | 8 +- 11 files changed, 276 insertions(+), 187 deletions(-) rename timely/src/dataflow/operators/{ => core}/exchange.rs (78%) rename timely/src/dataflow/operators/{ => core}/inspect.rs (79%) create mode 100644 timely/src/dataflow/operators/core/mod.rs rename timely/src/dataflow/operators/{ => core}/reclock.rs (94%) diff --git a/container/src/columnation.rs b/container/src/columnation.rs index 54a00ca4e..83d986d62 100644 --- a/container/src/columnation.rs +++ b/container/src/columnation.rs @@ -124,6 +124,24 @@ impl TimelyStack { }); (length, capacity) } + + /// The length in items. + #[inline] + pub fn len(&self) -> usize { + self.local.len() + } + + /// The capacity of the local vector. + #[inline] + pub fn capacity(&self) -> usize { + self.local.capacity() + } + + /// Reserve space for `additional` elements. 
+ #[inline] + pub fn reserve(&mut self, additional: usize) { + self.local.reserve(additional) + } } impl TimelyStack<(A, B)> { @@ -199,7 +217,7 @@ impl Eq for TimelyStack {} impl std::fmt::Debug for TimelyStack { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - (&self[..]).fmt(f) + self[..].fmt(f) } } @@ -291,12 +309,14 @@ mod serde { } mod container { - use crate::{Container, PushPartitioned}; + use std::ops::Deref; + use crate::{Container, PushContainer}; use crate::columnation::{Columnation, TimelyStack}; impl Container for TimelyStack { - type Item = T; + type ItemRef<'a> = &'a T where Self: 'a; + type Item<'a> = &'a T where Self: 'a; fn len(&self) -> usize { self.local.len() @@ -306,38 +326,34 @@ mod container { self.local.is_empty() } - fn capacity(&self) -> usize { - self.local.capacity() - } - fn clear(&mut self) { TimelyStack::clear(self) } + + type Iter<'a> = std::slice::Iter<'a, T>; + + fn iter(&self) -> Self::Iter<'_> { + self.deref().iter() + } + + type DrainIter<'a> = std::slice::Iter<'a, T>; + + fn drain(&mut self) -> Self::DrainIter<'_> { + (*self).iter() + } } - impl PushPartitioned for TimelyStack { - fn push_partitioned(&mut self, buffers: &mut [Self], mut index: I, mut flush: F) - where - I: FnMut(&Self::Item) -> usize, - F: FnMut(usize, &mut Self), - { - fn ensure_capacity(this: &mut TimelyStack) { - let capacity = this.local.capacity(); - let desired_capacity = crate::buffer::default_capacity::(); - if capacity < desired_capacity { - this.local.reserve(desired_capacity - capacity); - } - } + impl PushContainer for TimelyStack { + fn capacity(&self) -> usize { + self.capacity() + } - for datum in &self[..] { - let index = index(&datum); - ensure_capacity(&mut buffers[index]); - buffers[index].copy(datum); - if buffers[index].len() == buffers[index].local.capacity() { - flush(index, &mut buffers[index]); - } - } - self.clear(); + fn preferred_capacity() -> usize { + crate::buffer::default_capacity::() + } + + fn reserve(&mut self, additional: usize) { + self.reserve(additional) } } } diff --git a/container/src/lib.rs b/container/src/lib.rs index eb9b3ce77..6a56295f9 100644 --- a/container/src/lib.rs +++ b/container/src/lib.rs @@ -17,14 +17,18 @@ pub mod columnation; /// is efficient (which is not necessarily the case when deriving `Clone`.) /// TODO: Don't require `Container: Clone` pub trait Container: Default + Clone + 'static { - /// The type of elements this container holds. - type Item; + /// The type of elements when reading non-destructively from the container. + type ItemRef<'a> where Self: 'a; + + /// The type of elements when draining the continer. + type Item<'a> where Self: 'a; /// The number of elements in this container /// /// The length of a container must be consistent between sending and receiving it. /// When exchanging a container and partitioning it into pieces, the sum of the length - /// of all pieces must be equal to the length of the original container. + /// of all pieces must be equal to the length of the original container. When combining + /// containers, the length of the result must be the sum of the individual parts. fn len(&self) -> usize; /// Determine if the container contains any elements, corresponding to `len() == 0`. @@ -32,16 +36,55 @@ pub trait Container: Default + Clone + 'static { self.len() == 0 } - /// The capacity of the underlying container - fn capacity(&self) -> usize; - /// Remove all contents from `self` while retaining allocated memory. 
/// After calling `clear`, `is_empty` must return `true` and `len` 0. fn clear(&mut self); + + /// Iterator type when reading from the container. + type Iter<'a>: Iterator>; + + /// Returns an iterator that reads the contents of this container. + fn iter(&self) -> Self::Iter<'_>; + + /// Iterator type when draining the container. + type DrainIter<'a>: Iterator>; + + /// Returns an iterator that drains the contents of this container. + /// Drain leaves the container in an undefined state. + fn drain(&mut self) -> Self::DrainIter<'_>; +} + +/// A type that can push itself into a container. +pub trait PushInto { + /// Push self into the target container. + fn push_into(self, target: &mut C); +} + +/// A type that has the necessary infrastructure to push elements, without specifying how pushing +/// itself works. For this, pushable types should implement [`PushInto`]. +// TODO: Reconsider this interface because it assumes +// * Containers have a capacity +// * Push presents single elements. +// * Instead of testing `len == cap`, we could have a `is_full` to test that we might +// not be able to absorb more data. +// * Example: A FlatStack with optimized offsets and deduplication can absorb many elements without reallocation. What does capacity mean in this context? +pub trait PushContainer: Container { + /// Push `item` into self + #[inline] + fn push>(&mut self, item: T) { + item.push_into(self) + } + /// Return the capacity of the container. + fn capacity(&self) -> usize; + /// Return the preferred capacity of the container. + fn preferred_capacity() -> usize; + /// Reserve space for `additional` elements, possibly increasing the capacity of the container. + fn reserve(&mut self, additional: usize); } impl Container for Vec { - type Item = T; + type ItemRef<'a> = &'a T where T: 'a; + type Item<'a> = T where T: 'a; fn len(&self) -> usize { Vec::len(self) @@ -51,20 +94,58 @@ impl Container for Vec { Vec::is_empty(self) } + fn clear(&mut self) { Vec::clear(self) } + + type Iter<'a> = std::slice::Iter<'a, T>; + + fn iter(&self) -> Self::Iter<'_> { + self.as_slice().iter() + } + + type DrainIter<'a> = std::vec::Drain<'a, T>; + + fn drain(&mut self) -> Self::DrainIter<'_> { + self.drain(..) 
+ } +} + +impl PushContainer for Vec { fn capacity(&self) -> usize { - Vec::capacity(self) + self.capacity() } - fn clear(&mut self) { Vec::clear(self) } + fn preferred_capacity() -> usize { + buffer::default_capacity::() + } + + fn reserve(&mut self, additional: usize) { + self.reserve(additional); + } +} + +impl PushInto> for T { + #[inline] + fn push_into(self, target: &mut Vec) { + target.push(self) + } +} + +impl<'a, T: Clone> PushInto> for &'a T { + #[inline] + fn push_into(self, target: &mut Vec) { + target.push(self.clone()) + } } mod rc { + use std::ops::Deref; use std::rc::Rc; use crate::Container; impl Container for Rc { - type Item = T::Item; + type ItemRef<'a> = T::ItemRef<'a> where Self: 'a; + type Item<'a> = T::ItemRef<'a> where Self: 'a; fn len(&self) -> usize { std::ops::Deref::deref(self).len() @@ -74,10 +155,6 @@ mod rc { std::ops::Deref::deref(self).is_empty() } - fn capacity(&self) -> usize { - std::ops::Deref::deref(self).capacity() - } - fn clear(&mut self) { // Try to reuse the allocation if possible if let Some(inner) = Rc::get_mut(self) { @@ -86,16 +163,30 @@ mod rc { *self = Self::default(); } } + + type Iter<'a> = T::Iter<'a>; + + fn iter(&self) -> Self::Iter<'_> { + self.deref().iter() + } + + type DrainIter<'a> = T::Iter<'a>; + + fn drain(&mut self) -> Self::DrainIter<'_> { + self.iter() + } } } mod arc { + use std::ops::Deref; use std::sync::Arc; use crate::Container; impl Container for Arc { - type Item = T::Item; + type ItemRef<'a> = T::ItemRef<'a> where Self: 'a; + type Item<'a> = T::ItemRef<'a> where Self: 'a; fn len(&self) -> usize { std::ops::Deref::deref(self).len() @@ -105,10 +196,6 @@ mod arc { std::ops::Deref::deref(self).is_empty() } - fn capacity(&self) -> usize { - std::ops::Deref::deref(self).capacity() - } - fn clear(&mut self) { // Try to reuse the allocation if possible if let Some(inner) = Arc::get_mut(self) { @@ -117,43 +204,56 @@ mod arc { *self = Self::default(); } } + + type Iter<'a> = T::Iter<'a>; + + fn iter(&self) -> Self::Iter<'_> { + self.deref().iter() + } + + type DrainIter<'a> = T::Iter<'a>; + + fn drain(&mut self) -> Self::DrainIter<'_> { + self.iter() + } } } /// A container that can partition itself into pieces. -pub trait PushPartitioned: Container { +pub trait PushPartitioned: PushContainer { /// Partition and push this container. /// /// Drain all elements from `self`, and use the function `index` to determine which `buffer` to /// append an element to. Call `flush` with an index and a buffer to send the data downstream. fn push_partitioned(&mut self, buffers: &mut [Self], index: I, flush: F) where - I: FnMut(&Self::Item) -> usize, + for<'a> I: FnMut(&Self::Item<'a>) -> usize, F: FnMut(usize, &mut Self); } -impl PushPartitioned for Vec { +impl PushPartitioned for T where for<'a> T::Item<'a>: PushInto { fn push_partitioned(&mut self, buffers: &mut [Self], mut index: I, mut flush: F) where - I: FnMut(&Self::Item) -> usize, + for<'a> I: FnMut(&Self::Item<'a>) -> usize, F: FnMut(usize, &mut Self), { - fn ensure_capacity(this: &mut Vec) { + let ensure_capacity = |this: &mut Self| { let capacity = this.capacity(); - let desired_capacity = buffer::default_capacity::(); + let desired_capacity = Self::preferred_capacity(); if capacity < desired_capacity { this.reserve(desired_capacity - capacity); } - } + }; - for datum in self.drain(..) 
{ + for datum in self.drain() { let index = index(&datum); ensure_capacity(&mut buffers[index]); buffers[index].push(datum); - if buffers[index].len() == buffers[index].capacity() { + if buffers[index].len() >= buffers[index].capacity() { flush(index, &mut buffers[index]); } } + self.clear(); } }
diff --git a/mdbook/src/chapter_5/chapter_5_3.md b/mdbook/src/chapter_5/chapter_5_3.md index 5cd38763e..fceba4b47 100644 --- a/mdbook/src/chapter_5/chapter_5_3.md +++ b/mdbook/src/chapter_5/chapter_5_3.md @@ -1,76 +1,34 @@ # Containers -Timely's core isn't tied to a specific representation of data that flows along dataflow edges. -While some convenience operators make assumptions about the type of batches, the core infrastructure is generic in what containers are exchanged. -This section explains what containers are and what their contract with the Timely APIs is. +On a high level, Timely sends data along dataflow edges. Underneath, however, it needs to send and process batches of data to amortize processing overhead. In this section, we walk through the container abstraction, which is how Timely interfaces with batches of data. -Many parts of Timely assume that data is organized into `Vec`, i.e., batches store data as consecutive elements in memory. -This abstractions works well for many cases but precludes some advanced techniques, such as transferring translated or columnar data between operators. -With the container abstraction, Timely specifies a minimal interface it requires tracking progress and provide data to operators. +A container represents a batch of data with an interface to enable progress tracking and extracting data. Progress tracking requires that containers have a length, which does not change unless the container itself is modified. Reading data out of a container is useful for several high-level operators and sessions, but not otherwise required. -## Core operators - -In Timely, we provide a set of `Core` operators that are generic on the container type they can handle. -In most cases, the `Core` operators are a immediate generalization of their non-core variant, providing the semantically equivalent functionality. - -## Limitations - -A challenge when genericizing Timely operators is that all interfaces need to be declared independent of a concrete type, for example as part of a trait. -For this reason, Timely doesn't currently support operators that require knowledge of the elements of a container or how to partition a container, with the only exception being the `Vec` type. +Progress tracking requires that the length of a container stays the same between sending and receiving a container. The length must be additive, meaning that two containers combined must result in a container that has the sum of the lengths of the original containers. (Vectors satisfy these properties, but sets do not.) -## A custom container +The container trait defines two associated types, `Item` for moving data, and `ItemRef` for reading data. Both are lifetimed, meaning an implementation can choose to return owned data or references, depending on the constraints imposed by its data representation. Relatedly, the trait defines an `iter()` function that reads the contents, and a `drain()` that removes the contents. -Let's walk through an example container that resembles a `Result` type, but moves the storage to within the result. +In Timely, we decouple the write and read halves of a container, as expressed by the `PushInto` trait.
It describes how a datum can push itself into a target container, without the container having to know about the datum's concrete type. This allows a container to absorb an owned type and merely present dereferenced variants of the owned type when reading. For example, a container could accept `String` objects, but only permit reading `&str`.

## Capacity considerations

Timely wants to send containers of consistent size. In the past, this was a vector of 1024 elements, but this concept scales poorly to data that has a different layout.

What we want to achieve is:
* Timely makes progress when processing large amounts of data. An implementation that buffers all data and only releases it once time advances does not have this property.
* Containers have consistent size to minimize stress on the memory allocator.

## Core operators

In Timely, we provide a set of `core` operators that are generic on the container type they can handle.
In most cases, the `core` operators are an immediate generalization of their non-core variant, providing the semantically equivalent functionality.

## Limitations

* Explain why it's hard to build container-generic operators from smaller operators (unless we have higher-kinded types).

Each operator over arbitrary containers either returns an equal container, or needs to be parameterized to indicate the desired output type.
This is problematic when composing a high-level operator from smaller building blocks, such as the broadcast operator.
The broadcast operator for vectors maps each datum to a `(target, datum)` pair, where it repeats `datum` for each target.
Subsequent operators exchange the data and unwrap the `datum` from the tuple.
Defining this with specific containers in mind is simple; however, we do not have a mechanism to express this for arbitrary containers.
diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs index 976023a19..19648b285 100644 --- a/timely/src/dataflow/channels/pact.rs +++ b/timely/src/dataflow/channels/pact.rs @@ -8,18 +8,16 @@ //! The progress tracking logic assumes that this number is independent of the pact used.
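The pact and pusher changes below build on the blanket `PushPartitioned` implementation above: a container drains itself and routes each item into per-target buffers, flushing a buffer once it fills up. A minimal sketch of that behavior, assuming the `timely_container` crate as modified by this patch (the data and the parity-based router are illustrative, not part of the patch):

```rust
use timely_container::PushPartitioned;

fn main() {
    // A batch of keyed records; for `Vec<(u64, &str)>`, `Item<'a>` is the owned pair.
    let mut batch = vec![(0u64, "a"), (1, "b"), (2, "c"), (3, "d")];
    let mut parts = vec![Vec::new(), Vec::new()];

    batch.push_partitioned(
        &mut parts,
        // Route by key parity, as an exchange pact's distribution function would.
        |(key, _): &(u64, &str)| (*key % 2) as usize,
        // Flush callback; a no-op here because the buffers stay below capacity.
        |_index, _buffer| {},
    );

    // The source is drained and cleared; the routed items remain in the buffers.
    assert!(batch.is_empty());
    assert_eq!(parts[0], vec![(0u64, "a"), (2, "c")]);
    assert_eq!(parts[1], vec![(1u64, "b"), (3, "d")]);
}
```

The `ExchangeCore` pact does exactly this inside its pusher, which is why its distribution closure is now bounded by `for<'a> FnMut(&C::Item<'a>) -> u64` instead of `FnMut(&D) -> u64`.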
use std::{fmt::{self, Debug}, marker::PhantomData}; -use timely_container::PushPartitioned; -use crate::communication::{Push, Pull, Data}; -use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller}; use crate::Container; - -use crate::worker::AsWorker; +use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller}; +use crate::communication::{Push, Pull, Data}; +use crate::container::PushPartitioned; use crate::dataflow::channels::pushers::Exchange as ExchangePusher; -use super::{BundleCore, Message}; - +use crate::dataflow::channels::{BundleCore, Message}; use crate::logging::{TimelyLogger as Logger, MessagesEvent}; use crate::progress::Timestamp; +use crate::worker::AsWorker; /// A `ParallelizationContractCore` allocates paired `Push` and `Pull` implementors. pub trait ParallelizationContractCore { @@ -53,14 +51,18 @@ impl ParallelizationContractCore for Pipeline { } /// An exchange between multiple observers by data -pub struct ExchangeCore { hash_func: F, phantom: PhantomData<(C, D)> } +pub struct ExchangeCore { hash_func: F, phantom: PhantomData } /// [ExchangeCore] specialized to vector-based containers. -pub type Exchange = ExchangeCore, D, F>; +pub type Exchange = ExchangeCore, F>; -implu64+'static> ExchangeCore { +impl ExchangeCore +where + C: PushPartitioned, + for<'a> F: FnMut(&C::Item<'a>)->u64 +{ /// Allocates a new `Exchange` pact from a distribution function. - pub fn new(func: F) -> ExchangeCore { + pub fn new(func: F) -> ExchangeCore { ExchangeCore { hash_func: func, phantom: PhantomData, @@ -69,11 +71,12 @@ implu64+'static> ExchangeCore { } // Exchange uses a `Box` because it cannot know what type of pushable will return from the allocator. -implu64+'static> ParallelizationContractCore for ExchangeCore +impl ParallelizationContractCore for ExchangeCore where - C: Data + Container + PushPartitioned, + C: Data + PushPartitioned, + for<'a> H: FnMut(&C::Item<'a>) -> u64 { - type Pusher = ExchangePusher>>>, F>; + type Pusher = ExchangePusher>>>, H>; type Puller = LogPuller>>>; fn connect(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option) -> (Self::Pusher, Self::Puller) { @@ -83,7 +86,7 @@ where } } -impl Debug for ExchangeCore { +impl Debug for ExchangeCore { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Exchange").finish() } diff --git a/timely/src/dataflow/channels/pushers/buffer.rs b/timely/src/dataflow/channels/pushers/buffer.rs index d18a0f84d..5a118e8ae 100644 --- a/timely/src/dataflow/channels/pushers/buffer.rs +++ b/timely/src/dataflow/channels/pushers/buffer.rs @@ -1,10 +1,11 @@ //! Buffering and session mechanisms to provide the appearance of record-at-a-time sending, //! with the performance of batched sends. -use crate::dataflow::channels::{Bundle, BundleCore, Message}; -use crate::progress::Timestamp; -use crate::dataflow::operators::Capability; use crate::communication::Push; +use crate::container::{PushContainer, PushInto}; +use crate::dataflow::channels::{BundleCore, Message}; +use crate::dataflow::operators::Capability; +use crate::progress::Timestamp; use crate::{Container, Data}; /// Buffers data sent at the same time, for efficient communication. @@ -81,21 +82,22 @@ impl>> BufferCore where T: Eq } } -impl>> Buffer where T: Eq+Clone { +impl>> BufferCore where T: Eq+Clone { // internal method for use by `Session`. 
#[inline] - fn give(&mut self, data: D) { - if self.buffer.capacity() < crate::container::buffer::default_capacity::() { - let to_reserve = crate::container::buffer::default_capacity::() - self.buffer.capacity(); + fn give>(&mut self, data: D) { + if self.buffer.capacity() < C::preferred_capacity() { + let to_reserve = C::preferred_capacity() - self.buffer.capacity(); self.buffer.reserve(to_reserve); } self.buffer.push(data); - // assert!(self.buffer.capacity() == Message::::default_length()); - if self.buffer.len() == self.buffer.capacity() { + if self.buffer.len() >= C::preferred_capacity() { self.flush(); } } +} +impl>>> Buffer where T: Eq+Clone { // Gives an entire message at a specific time. fn give_vec(&mut self, vector: &mut Vec) { // flush to ensure fifo-ness diff --git a/timely/src/dataflow/channels/pushers/exchange.rs b/timely/src/dataflow/channels/pushers/exchange.rs index 9ea271d31..e1b49cbed 100644 --- a/timely/src/dataflow/channels/pushers/exchange.rs +++ b/timely/src/dataflow/channels/pushers/exchange.rs @@ -1,23 +1,28 @@ //! The exchange pattern distributes pushed data between many target pushees. -use timely_container::PushPartitioned; -use crate::{Container, Data}; use crate::communication::Push; +use crate::container::PushPartitioned; use crate::dataflow::channels::{BundleCore, Message}; +use crate::{Container, Data}; // TODO : Software write combining /// Distributes records among target pushees according to a distribution function. -pub struct Exchange>, H: FnMut(&D) -> u64> { +pub struct Exchange>, H> +where + for<'a> H: FnMut(&C::Item<'a>) -> u64 +{ pushers: Vec
<P>
, buffers: Vec, current: Option, hash_func: H, - phantom: std::marker::PhantomData, } -impl>, H: FnMut(&D) -> u64> Exchange { +impl>, H> Exchange +where + for<'a> H: FnMut(&C::Item<'a>) -> u64 +{ /// Allocates a new `Exchange` from a supplied set of pushers and a distribution function. - pub fn new(pushers: Vec
<P>
, key: H) -> Exchange { + pub fn new(pushers: Vec
<P>
, key: H) -> Exchange { let mut buffers = vec![]; for _ in 0..pushers.len() { buffers.push(Default::default()); @@ -27,7 +32,6 @@ impl>, H: FnMut(&D) -> hash_func: key, buffers, current: None, - phantom: std::marker::PhantomData, } } #[inline] @@ -40,9 +44,10 @@ impl>, H: FnMut(&D) -> } } -impl>, H: FnMut(&D) -> u64> Push> for Exchange +impl>, H, > Push> for Exchange where - C: PushPartitioned + C: PushPartitioned, + for<'a> H: FnMut(&C::Item<'a>) -> u64 { #[inline(never)] fn push(&mut self, message: &mut Option>) { diff --git a/timely/src/dataflow/operators/exchange.rs b/timely/src/dataflow/operators/core/exchange.rs similarity index 78% rename from timely/src/dataflow/operators/exchange.rs rename to timely/src/dataflow/operators/core/exchange.rs index 1f603df25..499e44165 100644 --- a/timely/src/dataflow/operators/exchange.rs +++ b/timely/src/dataflow/operators/core/exchange.rs @@ -7,7 +7,7 @@ use crate::dataflow::operators::generic::operator::Operator; use crate::dataflow::{Scope, StreamCore}; /// Exchange records between workers. -pub trait Exchange { +pub trait Exchange { /// Exchange records between workers. /// /// The closure supplied should map a reference to a record to a `u64`, @@ -23,15 +23,19 @@ pub trait Exchange { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn exchange(&self, route: impl FnMut(&D) -> u64 + 'static) -> Self; + fn exchange(&self, route: F) -> Self + where + for<'a> F: FnMut(&C::Item<'a>) -> u64; } -impl Exchange for StreamCore +impl Exchange for StreamCore where C: PushPartitioned + ExchangeData, - C::Item: ExchangeData, { - fn exchange(&self, route: impl FnMut(&C::Item) -> u64 + 'static) -> StreamCore { + fn exchange(&self, route: F) -> StreamCore + where + for<'a> F: FnMut(&C::Item<'a>) -> u64, + { let mut container = Default::default(); self.unary(ExchangeCore::new(route), "Exchange", |_, _| { move |input, output| { diff --git a/timely/src/dataflow/operators/inspect.rs b/timely/src/dataflow/operators/core/inspect.rs similarity index 79% rename from timely/src/dataflow/operators/inspect.rs rename to timely/src/dataflow/operators/core/inspect.rs index 6e26856d1..d26ff1446 100644 --- a/timely/src/dataflow/operators/inspect.rs +++ b/timely/src/dataflow/operators/core/inspect.rs @@ -1,9 +1,6 @@ //! Extension trait and implementation for observing and action on streamed data. 
-use std::rc::Rc; -use timely_container::columnation::{Columnation, TimelyStack}; use crate::Container; -use crate::Data; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::{Scope, StreamCore}; use crate::dataflow::operators::generic::Operator; @@ -21,7 +18,10 @@ pub trait Inspect: InspectCore + Sized { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn inspect(&self, mut func: impl FnMut(&C::Item)+'static) -> Self { + fn inspect(&self, mut func: F) -> Self + where + for<'a> F: FnMut(C::ItemRef<'a>) + { self.inspect_batch(move |_, data| { for datum in data.iter() { func(datum); } }) @@ -38,10 +38,13 @@ pub trait Inspect: InspectCore + Sized { /// .inspect_time(|t, x| println!("seen at: {:?}\t{:?}", t, x)); /// }); /// ``` - fn inspect_time(&self, mut func: impl FnMut(&G::Timestamp, &C::Item)+'static) -> Self { + fn inspect_time(&self, mut func: F) -> Self + where + for <'a> F: FnMut(&G::Timestamp, C::ItemRef<'a>), + { self.inspect_batch(move |time, data| { for datum in data.iter() { - func(&time, &datum); + func(&time, datum); } }) } @@ -57,7 +60,7 @@ pub trait Inspect: InspectCore + Sized { /// .inspect_batch(|t,xs| println!("seen at: {:?}\t{:?} records", t, xs.len())); /// }); /// ``` - fn inspect_batch(&self, mut func: impl FnMut(&G::Timestamp, &[C::Item])+'static) -> Self { + fn inspect_batch(&self, mut func: impl FnMut(&G::Timestamp, &C)+'static) -> Self { self.inspect_core(move |event| { if let Ok((time, data)) = event { func(time, data); @@ -84,26 +87,12 @@ pub trait Inspect: InspectCore + Sized { /// }); /// }); /// ``` - fn inspect_core(&self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &[C::Item]), &[G::Timestamp]>)+'static; -} - -impl Inspect> for StreamCore> { - fn inspect_core(&self, mut func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &[D]), &[G::Timestamp]>) + 'static { - self.inspect_container(move |r| func(r.map(|(t, c)| (t, &c[..])))) - } -} - -impl Inspect> for StreamCore> { - fn inspect_core(&self, mut func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &[D]), &[G::Timestamp]>) + 'static { - self.inspect_container(move |r| func(r.map(|(t, c)| (t, &c[..])))) - } + fn inspect_core(&self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static; } -impl Inspect> for StreamCore> - where C: AsRef<[C::Item]> -{ - fn inspect_core(&self, mut func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &[C::Item]), &[G::Timestamp]>) + 'static { - self.inspect_container(move |r| func(r.map(|(t, c)| (t, c.as_ref().as_ref())))) +impl Inspect for StreamCore { + fn inspect_core(&self, mut func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>) + 'static { + self.inspect_container(move |r| func(r)) } } diff --git a/timely/src/dataflow/operators/core/mod.rs b/timely/src/dataflow/operators/core/mod.rs new file mode 100644 index 000000000..c70c5d945 --- /dev/null +++ b/timely/src/dataflow/operators/core/mod.rs @@ -0,0 +1,10 @@ +//! Extension traits for `Stream` implementing various operators that +//! are independent of specific container types. 
+ +pub mod exchange; +pub mod inspect; +pub mod reclock; + +pub use exchange::Exchange; +pub use inspect::{Inspect, InspectCore}; +pub use reclock::Reclock; diff --git a/timely/src/dataflow/operators/reclock.rs b/timely/src/dataflow/operators/core/reclock.rs similarity index 94% rename from timely/src/dataflow/operators/reclock.rs rename to timely/src/dataflow/operators/core/reclock.rs index b656e0aaf..9510e8722 100644 --- a/timely/src/dataflow/operators/reclock.rs +++ b/timely/src/dataflow/operators/core/reclock.rs @@ -45,11 +45,11 @@ pub trait Reclock { /// assert_eq!(extracted[1], (5, vec![4,5])); /// assert_eq!(extracted[2], (8, vec![6,7,8])); /// ``` - fn reclock>(&self, clock: &StreamCore) -> Self; + fn reclock(&self, clock: &StreamCore) -> Self; } impl Reclock for StreamCore { - fn reclock>(&self, clock: &StreamCore) -> StreamCore { + fn reclock(&self, clock: &StreamCore) -> StreamCore { let mut stash = vec![]; diff --git a/timely/src/dataflow/operators/mod.rs b/timely/src/dataflow/operators/mod.rs index 1eec15cba..5650f6e97 100644 --- a/timely/src/dataflow/operators/mod.rs +++ b/timely/src/dataflow/operators/mod.rs @@ -33,6 +33,8 @@ pub use self::generic::{Notificator, FrontierNotificator}; pub use self::reclock::Reclock; pub use self::count::Accumulate; +pub mod core; + pub mod enterleave; pub mod input; pub mod flow_controlled; @@ -41,10 +43,10 @@ pub mod feedback; pub mod concat; pub mod partition; pub mod map; -pub mod inspect; +pub use self::core::inspect; pub mod filter; pub mod delay; -pub mod exchange; +pub use self::core::exchange; pub mod broadcast; pub mod probe; pub mod to_stream; @@ -57,7 +59,7 @@ pub mod result; pub mod aggregation; pub mod generic; -pub mod reclock; +pub use self::core::reclock; pub mod count; // keep "mint" module-private From 6e1a918bedafd3ecb5d38210204447362e057042 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 19 Mar 2024 14:15:24 -0400 Subject: [PATCH 3/3] Session::give for containers Signed-off-by: Moritz Hoffmann --- timely/src/dataflow/channels/pushers/buffer.rs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/timely/src/dataflow/channels/pushers/buffer.rs b/timely/src/dataflow/channels/pushers/buffer.rs index 5a118e8ae..e241587df 100644 --- a/timely/src/dataflow/channels/pushers/buffer.rs +++ b/timely/src/dataflow/channels/pushers/buffer.rs @@ -125,19 +125,26 @@ impl<'a, T, C: Container, P: Push>+'a> Session<'a, T, C, P> wh } } -impl<'a, T, D: Data, P: Push>>+'a> Session<'a, T, Vec, P> where T: Eq+Clone+'a, D: 'a { +impl<'a, T, C, P: Push>+'a> Session<'a, T, C, P> +where + T: Eq+Clone+'a, + C: 'a + PushContainer, +{ /// Provides one record at the time specified by the `Session`. #[inline] - pub fn give(&mut self, data: D) { + pub fn give>(&mut self, data: D) { self.buffer.give(data); } /// Provides an iterator of records at the time specified by the `Session`. #[inline] - pub fn give_iterator>(&mut self, iter: I) { + pub fn give_iterator, D: PushInto>(&mut self, iter: I) { for item in iter { self.give(item); } } +} + +impl<'a, T, D: Data, P: Push>>+'a> Session<'a, T, Vec, P> where T: Eq+Clone+'a, D: 'a { /// Provides a fully formed `Content` message for senders which can use this type. /// /// The `Content` type is the backing memory for communication in timely, and it can
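Since this patch drops the mdbook's `ResultContainer` walk-through without a replacement, here is a rough sketch of a custom container under the new GAT-based traits. It assumes the `timely_container` traits exactly as introduced above; the `Strings` type and its `PushInto` implementations are illustrative and not part of the patch:

```rust
use timely_container::{buffer, Container, PushContainer, PushInto};

/// A batch of strings: owned `String`s are pushed in, `&String`s are handed out when reading.
#[derive(Clone, Default)]
struct Strings {
    data: Vec<String>,
}

impl Container for Strings {
    // Reading borrows the stored strings...
    type ItemRef<'a> = &'a String where Self: 'a;
    // ...while draining hands out owned strings.
    type Item<'a> = String where Self: 'a;

    fn len(&self) -> usize { self.data.len() }

    fn clear(&mut self) { self.data.clear() }

    type Iter<'a> = std::slice::Iter<'a, String>;

    fn iter(&self) -> Self::Iter<'_> { self.data.iter() }

    type DrainIter<'a> = std::vec::Drain<'a, String>;

    fn drain(&mut self) -> Self::DrainIter<'_> { self.data.drain(..) }
}

impl PushContainer for Strings {
    fn capacity(&self) -> usize { self.data.capacity() }

    fn preferred_capacity() -> usize { buffer::default_capacity::<String>() }

    fn reserve(&mut self, additional: usize) { self.data.reserve(additional) }
}

// Owned strings move straight in...
impl PushInto<Strings> for String {
    fn push_into(self, target: &mut Strings) { target.data.push(self) }
}

// ...and borrowed string slices are copied on the way in.
impl<'a> PushInto<Strings> for &'a str {
    fn push_into(self, target: &mut Strings) { target.data.push(self.to_owned()) }
}

fn main() {
    let mut batch = Strings::default();
    batch.push("hello".to_string());
    batch.push("world");
    assert_eq!(batch.iter().count(), 2);
}
```

With these implementations, the `PushInto`-based `give` from the third patch would let a `Session` writing to a `Strings` output accept either owned `String`s or `&str` slices, cloning only in the borrowed case.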