Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Arrange cleanup #472

Merged
merged 3 commits into from
Apr 12, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 38 additions & 51 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -489,90 +489,70 @@ where
}
}

/// A type that can be arranged as if a collection of updates shaped as `((K,V),G::Timestamp,R)`.
///
/// This trait is primarily implemented by `Collection<G,(K,V),R>`.
///
/// The resulting arrangements may not present as `((K,V),T,R)`, as their output types are unconstrained.
/// This allows e.g. for `Vec<u8>` inputs to present as `&[u8]` when read, but that relationship is not
/// constrained by this trait.
pub trait Arrange<G, K, V, R>
/// A type that can be arranged as if a collection of updates.
pub trait Arrange<G, C>
where
G: Scope,
G::Timestamp: Lattice,
{
/// Arranges a stream of `(Key, Val)` updates by `Key`.
///
/// This operator arranges a stream of values into a shared trace, whose contents it maintains.
/// Arranges updates into a shared trace.
fn arrange<Tr>(&self) -> Arranged<G, TraceAgent<Tr>>
where
Tr: Trace<Time=G::Timestamp> + 'static,
K: ExchangeData + Hashable,
V: ExchangeData,
R: ExchangeData,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Input=Vec<((K,V),G::Timestamp,R)>, Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
Tr::Batcher: Batcher<Input=C>,
{
self.arrange_named("Arrange")
}

/// Arranges a stream of `(Key, Val)` updates by `Key`, and presents with a `name` argument.
///
/// This operator arranges a stream of values into a shared trace, whose contents it maintains.
/// Arranges updates into a shared trace, with a supplied name.
fn arrange_named<Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
Tr: Trace<Time=G::Timestamp> + 'static,
K: ExchangeData + Hashable,
V: ExchangeData,
R: ExchangeData,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Input=Vec<((K,V),G::Timestamp,R)>, Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
{
let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into());
self.arrange_core(exchange, name)
}
Tr::Batcher: Batcher<Input=C>,
;

/// Arranges a stream of `(Key, Val)` updates by `Key`, configured with a name and a parallelization contract.
///
/// This operator arranges a stream of values into a shared trace, whose contents it maintains.
/// It uses the supplied parallelization contract to distribute the data, which does not need to
/// be consistently by key (though this is the most common).
/// Arranges updates into a shared trace, using a supplied parallelization contract, with a supplied name.
fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
P: ParallelizationContract<G::Timestamp, Vec<((K,V),G::Timestamp,R)>>,
K: Clone,
V: Clone,
R: Clone,
P: ParallelizationContract<G::Timestamp, C>,
Tr: Trace<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Input=Vec<((K,V),G::Timestamp,R)>, Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
Tr::Batcher: Batcher<Input=C>,
;
}

impl<G, K, V, R> Arrange<G, K, V, R> for Collection<G, (K, V), R>
impl<G, K, V, R> Arrange<G, Vec<((K, V), G::Timestamp, R)>> for Collection<G, (K, V), R>
where
G: Scope,
G::Timestamp: Lattice,
K: Clone + 'static,
V: Clone + 'static,
R: Semigroup,
K: ExchangeData + Hashable,
V: ExchangeData,
R: ExchangeData + Semigroup,
{
fn arrange_named<Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
Tr: Trace<Time=G::Timestamp> + 'static,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Input=Vec<((K, V), G::Timestamp, R)>>,
{
let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into());
self.arrange_core(exchange, name)
}

fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
P: ParallelizationContract<G::Timestamp, Vec<((K,V),G::Timestamp,R)>>,
Tr: Trace<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Input=Vec<((K,V),G::Timestamp,R)>, Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
Tr::Batcher: Batcher<Input=Vec<((K,V),G::Timestamp,R)>>,
{
arrange_core(&self.inner, pact, name)
}
}

/// Arranges a stream of updates by a key, configured with a name and a parallelization contract.
/// Arranges a stream of updates by a key, configured with a name and a parallelization contract.
///
/// This operator arranges a stream of values into a shared trace, whose contents it maintains.
/// It uses the supplied parallelization contract to distribute the data, which does not need to
Expand All @@ -584,9 +564,7 @@ where
P: ParallelizationContract<G::Timestamp, <Tr::Batcher as Batcher>::Input>,
Tr: Trace<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Time = G::Timestamp>,
<Tr::Batcher as Batcher>::Input: timely::Container,
Tr::Builder: Builder<Time = G::Timestamp, Output = Tr::Batch>,
{
// The `Arrange` operator is tasked with reacting to an advancing input
// frontier by producing the sequence of batches whose lower and upper
Expand Down Expand Up @@ -737,17 +715,26 @@ where
Arranged { stream, trace: reader.unwrap() }
}

impl<G: Scope, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> Arrange<G, K, (), R> for Collection<G, K, R>
impl<G: Scope, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> Arrange<G, Vec<((K, ()), G::Timestamp, R)>> for Collection<G, K, R>
where
G::Timestamp: Lattice+Ord,
{
fn arrange_named<Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
Tr: Trace<Time=G::Timestamp> + 'static,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Input=Vec<((K, ()), G::Timestamp, R)>>,
{
let exchange = Exchange::new(move |update: &((K,()),G::Timestamp,R)| (update.0).0.hashed().into());
self.arrange_core(exchange, name)
}

fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
P: ParallelizationContract<G::Timestamp, Vec<((K,()),G::Timestamp,R)>>,
Tr: Trace<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Input=Vec<((K,()),G::Timestamp,R)>, Item = ((K,()),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K,()),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
Tr::Batcher: Batcher<Input=Vec<((K,()),G::Timestamp,R)>>,
{
self.map(|k| (k, ()))
.arrange_core(pact, name)
Expand Down