Skip to content

Commit

Permalink
DNM: Revisiting ownership of key and value types (#429)
Browse files Browse the repository at this point in the history
* WIP checkin

* Add support for ReduceCore

* Support values as ToOwned also

* Correct merge skew

* Relax Arrange trait to input types

* Frame join and reduce as free functions

* Restore layers and self-arrangement
  • Loading branch information
frankmcsherry authored Nov 27, 2023
1 parent 32a312f commit df03c4e
Show file tree
Hide file tree
Showing 13 changed files with 877 additions and 663 deletions.
17 changes: 17 additions & 0 deletions examples/spines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,23 @@ fn main() {
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
"slc" => {

use differential_dataflow::trace::implementations::ord_neu::PreferredSpine;
use differential_dataflow::operators::reduce::ReduceCore;

let data =
data.map(|x| (x.clone().into_bytes(), x.into_bytes()))
.arrange::<PreferredSpine<[u8],[u8],_,_>>()
.reduce_abelian::<_, PreferredSpine<_,_,_,_>>("distinct", |_,_,output| output.push(((), 1)));
let keys =
keys.map(|x| (x.clone().into_bytes(), 7))
.arrange::<PreferredSpine<[u8],u8,_,_>>()
.reduce_abelian::<_, PreferredSpine<_,_,_,_>>("distinct", |_,_,output| output.push(((), 1)));

keys.join_core(&data, |_k,&(),&()| Option::<()>::None)
.probe_with(&mut probe);
},
_ => {
println!("unreconized mode: {:?}", mode)
}
Expand Down
76 changes: 26 additions & 50 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,45 +434,43 @@ where
}
}

/// A type that can be arranged into a trace of type `T`.
/// A type that can be arranged as if a collection of updates shaped as `((K,V),G::Timestamp,R)`.
///
/// This trait is implemented for appropriately typed collections and all traces that might accommodate them,
/// as well as by arranged data for their corresponding trace type.
pub trait Arrange<G: Scope, K, V, R: Semigroup>
/// This trait is primarily implemented by `Collection<G,(K,V),R>`.
///
/// The resulting arrangements may not present as `((K,V),T,R)`, as their output types are unconstrained.
/// This allows e.g. for `Vec<u8>` inputs to present as `&[u8]` when read, but that relationship is not
/// constrained by this trait.
pub trait Arrange<G, K, V, R>
where
G: Scope,
G::Timestamp: Lattice,
K: Data,
V: Data,
{
/// Arranges a stream of `(Key, Val)` updates by `Key`. Accepts an empty instance of the trace type.
/// Arranges a stream of `(Key, Val)` updates by `Key`.
///
/// This operator arranges a stream of values into a shared trace, whose contents it maintains.
/// This trace is current for all times marked completed in the output stream, and probing this stream
/// is the correct way to determine that times in the shared trace are committed.
fn arrange<Tr>(&self) -> Arranged<G, TraceAgent<Tr>>
where
K: ExchangeData+Hashable,
Tr: Trace<Time=G::Timestamp> + 'static,
K: ExchangeData + Hashable,
V: ExchangeData,
R: ExchangeData,
Tr: Trace+TraceReader<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
{
self.arrange_named("Arrange")
}

/// Arranges a stream of `(Key, Val)` updates by `Key`. Accepts an empty instance of the trace type.
/// Arranges a stream of `(Key, Val)` updates by `Key`, and presents with a `name` argument.
///
/// This operator arranges a stream of values into a shared trace, whose contents it maintains.
/// This trace is current for all times marked completed in the output stream, and probing this stream
/// is the correct way to determine that times in the shared trace are committed.
fn arrange_named<Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
K: ExchangeData+Hashable,
Tr: Trace<Time=G::Timestamp> + 'static,
K: ExchangeData + Hashable,
V: ExchangeData,
R: ExchangeData,
Tr: Trace+TraceReader<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
Expand All @@ -481,15 +479,18 @@ where
self.arrange_core(exchange, name)
}

/// Arranges a stream of `(Key, Val)` updates by `Key`. Accepts an empty instance of the trace type.
/// Arranges a stream of `(Key, Val)` updates by `Key`, configured with a name and a parallelization contract.
///
/// This operator arranges a stream of values into a shared trace, whose contents it maintains.
/// This trace is current for all times marked completed in the output stream, and probing this stream
/// is the correct way to determine that times in the shared trace are committed.
/// It uses the supplied parallelization contract to distribute the data, which does not need to
/// be consistently by key (though this is the most common).
fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
P: ParallelizationContract<G::Timestamp, ((K,V),G::Timestamp,R)>,
Tr: Trace+TraceReader<Time=G::Timestamp>+'static,
K: Clone,
V: Clone,
R: Clone,
Tr: Trace<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
Expand All @@ -499,40 +500,15 @@ where
impl<G, K, V, R> Arrange<G, K, V, R> for Collection<G, (K, V), R>
where
G: Scope,
G::Timestamp: Lattice+Ord,
K: Data,
V: Data,
G::Timestamp: Lattice,
K: Clone + 'static,
V: Clone + 'static,
R: Semigroup,
{
fn arrange<Tr>(&self) -> Arranged<G, TraceAgent<Tr>>
where
K: ExchangeData + Hashable,
V: ExchangeData,
R: ExchangeData,
Tr: Trace + TraceReader<Time=G::Timestamp> + 'static, Tr::Batch: Batch,
Tr::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
{
self.arrange_named("Arrange")
}

fn arrange_named<Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
K: ExchangeData + Hashable,
V: ExchangeData,
R: ExchangeData,
Tr: Trace + TraceReader<Time=G::Timestamp> + 'static, Tr::Batch: Batch,
Tr::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
{
let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into());
self.arrange_core(exchange, name)
}

fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
P: ParallelizationContract<G::Timestamp, ((K,V),G::Timestamp,R)>,
Tr: Trace+TraceReader<Time=G::Timestamp>+'static,
Tr: Trace<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
Expand Down Expand Up @@ -697,7 +673,7 @@ where
fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
P: ParallelizationContract<G::Timestamp, ((K,()),G::Timestamp,R)>,
Tr: Trace+TraceReader<Time=G::Timestamp>+'static,
Tr: Trace<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Item = ((K,()),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K,()),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
Expand Down
Loading

0 comments on commit df03c4e

Please sign in to comment.