Skip to content

Commit

Permalink
Introduce trait constraints; simplify elsewhere (#445)
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry authored Dec 23, 2023
1 parent cd3914e commit 30ac63d
Show file tree
Hide file tree
Showing 27 changed files with 132 additions and 371 deletions.
5 changes: 2 additions & 3 deletions dogsdogsdogs/src/operators/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ pub fn count<G, Tr, R, F, P>(
index: usize,
) -> Collection<G, (P, usize, usize), R>
where
G: Scope,
G::Timestamp: Lattice,
Tr: TraceReader<ValOwned=(), Time=G::Timestamp, Diff=isize>+Clone+'static,
G: Scope<Timestamp=Tr::Time>,
Tr: TraceReader<ValOwned=(), Diff=isize>+Clone+'static,
Tr::KeyOwned: Hashable + Default,
R: Monoid+Multiply<Output = R>+ExchangeData,
F: Fn(&P)->Tr::KeyOwned+Clone+'static,
Expand Down
16 changes: 6 additions & 10 deletions dogsdogsdogs/src/operators/half_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,11 @@ pub fn half_join<G, V, R, Tr, FF, CF, DOut, S>(
mut output_func: S,
) -> Collection<G, (DOut, G::Timestamp), <R as Mul<Tr::Diff>>::Output>
where
G: Scope,
G::Timestamp: Lattice,
Tr::KeyOwned: Ord + Hashable + ExchangeData,
G: Scope<Timestamp = Tr::Time>,
Tr::KeyOwned: Hashable + ExchangeData,
V: ExchangeData,
R: ExchangeData + Monoid,
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
Tr::Diff: Semigroup,
Tr: TraceReader+Clone+'static,
R: Mul<Tr::Diff>,
<R as Mul<Tr::Diff>>::Output: Semigroup,
FF: Fn(&G::Timestamp) -> G::Timestamp + 'static,
Expand Down Expand Up @@ -131,13 +129,11 @@ pub fn half_join_internal_unsafe<G, V, R, Tr, FF, CF, DOut, ROut, Y, I, S>(
mut output_func: S,
) -> Collection<G, DOut, ROut>
where
G: Scope,
G::Timestamp: Lattice,
Tr::KeyOwned: Ord + Hashable + ExchangeData,
G: Scope<Timestamp = Tr::Time>,
Tr::KeyOwned: Hashable + ExchangeData,
V: ExchangeData,
R: ExchangeData + Monoid,
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
Tr::Diff: Semigroup,
Tr: TraceReader+Clone+'static,
FF: Fn(&G::Timestamp) -> G::Timestamp + 'static,
CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static,
DOut: Clone+'static,
Expand Down
5 changes: 2 additions & 3 deletions dogsdogsdogs/src/operators/lookup_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@ pub fn lookup_map<G, D, R, Tr, F, DOut, ROut, S>(
supplied_key2: Tr::KeyOwned,
) -> Collection<G, DOut, ROut>
where
G: Scope,
G::Timestamp: Lattice,
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
G: Scope<Timestamp=Tr::Time>,
Tr: TraceReader+Clone+'static,
Tr::KeyOwned: Hashable,
Tr::Diff: Monoid+ExchangeData,
F: FnMut(&D, &mut Tr::KeyOwned)+Clone+'static,
Expand Down
10 changes: 4 additions & 6 deletions dogsdogsdogs/src/operators/propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ pub fn propose<G, Tr, F, P>(
key_selector: F,
) -> Collection<G, (P, Tr::ValOwned), Tr::Diff>
where
G: Scope,
G::Timestamp: Lattice,
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
G: Scope<Timestamp=Tr::Time>,
Tr: TraceReader+Clone+'static,
Tr::KeyOwned: Hashable + Default,
Tr::Diff: Monoid+Multiply<Output = Tr::Diff>+ExchangeData,
F: Fn(&P)->Tr::KeyOwned+Clone+'static,
Expand Down Expand Up @@ -51,9 +50,8 @@ pub fn propose_distinct<G, Tr, F, P>(
key_selector: F,
) -> Collection<G, (P, Tr::ValOwned), Tr::Diff>
where
G: Scope,
G::Timestamp: Lattice,
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
G: Scope<Timestamp=Tr::Time>,
Tr: TraceReader+Clone+'static,
Tr::KeyOwned: Hashable + Default,
Tr::Diff: Monoid+Multiply<Output = Tr::Diff>+ExchangeData,
F: Fn(&P)->Tr::KeyOwned+Clone+'static,
Expand Down
5 changes: 2 additions & 3 deletions dogsdogsdogs/src/operators/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ pub fn validate<G, K, V, Tr, F, P>(
key_selector: F,
) -> Collection<G, (P, V), Tr::Diff>
where
G: Scope,
G::Timestamp: Lattice,
Tr: TraceReader<KeyOwned=(K,V), ValOwned=(), Time=G::Timestamp>+Clone+'static,
G: Scope<Timestamp=Tr::Time>,
Tr: TraceReader<KeyOwned=(K,V), ValOwned=()>+Clone+'static,
K: Ord+Hash+Clone+Default,
V: ExchangeData+Hash+Default,
Tr::Diff: Monoid+Multiply<Output = Tr::Diff>+ExchangeData,
Expand Down
8 changes: 4 additions & 4 deletions examples/cursors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,10 @@ fn main() {
fn dump_cursor<Tr>(round: u32, index: usize, trace: &mut Tr)
where
Tr: TraceReader,
Tr::KeyOwned: Debug + Clone,
Tr::ValOwned: Debug + Clone,
Tr::Time: Debug + Clone,
Tr::Diff: Debug + Clone,
Tr::KeyOwned: Debug,
Tr::ValOwned: Debug,
Tr::Time: Debug,
Tr::Diff: Debug,
{
let (mut cursor, storage) = trace.cursor();
for ((k, v), diffs) in cursor.to_vec(&storage).iter() {
Expand Down
5 changes: 2 additions & 3 deletions src/algorithms/graphs/bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,9 @@ use crate::operators::arrange::Arranged;
/// Returns pairs (node, dist) indicating distance of each node from a root.
pub fn bfs_arranged<G, N, Tr>(edges: &Arranged<G, Tr>, roots: &Collection<G, N>) -> Collection<G, (N, u32)>
where
G: Scope,
G::Timestamp: Lattice+Ord,
G: Scope<Timestamp=Tr::Time>,
N: ExchangeData+Hash,
Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Time=G::Timestamp, Diff=isize>+Clone+'static,
Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Diff=isize>+Clone+'static,
{
// initialize roots as reaching themselves at distance 0
let nodes = roots.map(|x| (x, 0));
Expand Down
5 changes: 2 additions & 3 deletions src/algorithms/graphs/bijkstra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,9 @@ pub fn bidijkstra_arranged<G, N, Tr>(
goals: &Collection<G, (N,N)>
) -> Collection<G, ((N,N), u32)>
where
G: Scope,
G::Timestamp: Lattice+Ord,
G: Scope<Timestamp=Tr::Time>,
N: ExchangeData+Hash,
Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Time=G::Timestamp, Diff=isize>+Clone+'static,
Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Diff=isize>+Clone+'static,
{
forward
.stream
Expand Down
5 changes: 2 additions & 3 deletions src/algorithms/graphs/propagate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,13 @@ use crate::operators::arrange::arrangement::Arranged;
/// of `logic should be a number in the interval [0,64],
pub fn propagate_core<G, N, L, Tr, F, R>(edges: &Arranged<G,Tr>, nodes: &Collection<G,(N,L),R>, logic: F) -> Collection<G,(N,L),R>
where
G: Scope,
G::Timestamp: Lattice+Ord,
G: Scope<Timestamp=Tr::Time>,
N: ExchangeData+Hash,
R: ExchangeData+Abelian,
R: Multiply<R, Output=R>,
R: From<i8>,
L: ExchangeData,
Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Time=G::Timestamp, Diff=R>+Clone+'static,
Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Diff=R>+Clone+'static,
F: Fn(&L)->u64+Clone+'static,
{
// Morally the code performs the following iterative computation. However, in the interest of a simplified
Expand Down
17 changes: 1 addition & 16 deletions src/operators/arrange/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use timely::progress::Timestamp;
use timely::progress::{Antichain, frontier::AntichainRef};
use timely::dataflow::operators::CapabilitySet;

use crate::lattice::Lattice;
use crate::trace::{Trace, TraceReader, Batch, BatchReader};
use crate::trace::wrappers::rc::TraceBox;

Expand All @@ -29,7 +28,6 @@ use crate::trace::wrappers::frontier::{TraceFrontier, BatchFrontier};
pub struct TraceAgent<Tr>
where
Tr: TraceReader,
Tr::Time: Lattice+Ord+Clone+'static,
{
trace: Rc<RefCell<TraceBox<Tr>>>,
queues: Weak<RefCell<Vec<TraceAgentQueueWriter<Tr>>>>,
Expand All @@ -44,7 +42,6 @@ where
impl<Tr> TraceReader for TraceAgent<Tr>
where
Tr: TraceReader,
Tr::Time: Lattice+Ord+Clone+'static,
{
type Key<'a> = Tr::Key<'a>;
type KeyOwned = Tr::KeyOwned;
Expand Down Expand Up @@ -85,11 +82,7 @@ where
fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F) { self.trace.borrow().trace.map_batches(f) }
}

impl<Tr> TraceAgent<Tr>
where
Tr: TraceReader,
Tr::Time: Timestamp+Lattice,
{
impl<Tr: TraceReader> TraceAgent<Tr> {
/// Creates a new agent from a trace reader.
pub fn new(trace: Tr, operator: OperatorInfo, logging: Option<crate::logging::Logger>) -> (Self, TraceWriter<Tr>)
where
Expand Down Expand Up @@ -177,7 +170,6 @@ where
impl<Tr> TraceAgent<Tr>
where
Tr: TraceReader+'static,
Tr::Time: Lattice+Ord+Clone+'static,
{
/// Copies an existing collection into the supplied scope.
///
Expand Down Expand Up @@ -233,7 +225,6 @@ where
pub fn import<G>(&mut self, scope: &G) -> Arranged<G, TraceAgent<Tr>>
where
G: Scope<Timestamp=Tr::Time>,
Tr::Time: Timestamp,
{
self.import_named(scope, "ArrangedSource")
}
Expand All @@ -242,7 +233,6 @@ where
pub fn import_named<G>(&mut self, scope: &G, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
G: Scope<Timestamp=Tr::Time>,
Tr::Time: Timestamp,
{
// Drop ShutdownButton and return only the arrangement.
self.import_core(scope, name).0
Expand Down Expand Up @@ -300,7 +290,6 @@ where
pub fn import_core<G>(&mut self, scope: &G, name: &str) -> (Arranged<G, TraceAgent<Tr>>, ShutdownButton<CapabilitySet<Tr::Time>>)
where
G: Scope<Timestamp=Tr::Time>,
Tr::Time: Timestamp,
{
let trace = self.clone();

Expand Down Expand Up @@ -418,7 +407,6 @@ where
pub fn import_frontier<G>(&mut self, scope: &G, name: &str) -> (Arranged<G, TraceFrontier<TraceAgent<Tr>>>, ShutdownButton<CapabilitySet<Tr::Time>>)
where
G: Scope<Timestamp=Tr::Time>,
Tr::Time: Timestamp+ Lattice+Ord+Clone+'static,
Tr: TraceReader,
{
// This frontier describes our only guarantee on the compaction frontier.
Expand All @@ -437,7 +425,6 @@ where
pub fn import_frontier_core<G>(&mut self, scope: &G, name: &str, since: Antichain<Tr::Time>, until: Antichain<Tr::Time>) -> (Arranged<G, TraceFrontier<TraceAgent<Tr>>>, ShutdownButton<CapabilitySet<Tr::Time>>)
where
G: Scope<Timestamp=Tr::Time>,
Tr::Time: Timestamp+ Lattice+Ord+Clone+'static,
Tr: TraceReader,
{
let trace = self.clone();
Expand Down Expand Up @@ -541,7 +528,6 @@ impl<T> Drop for ShutdownDeadmans<T> {
impl<Tr> Clone for TraceAgent<Tr>
where
Tr: TraceReader,
Tr::Time: Lattice+Ord+Clone+'static,
{
fn clone(&self) -> Self {

Expand Down Expand Up @@ -571,7 +557,6 @@ where
impl<Tr> Drop for TraceAgent<Tr>
where
Tr: TraceReader,
Tr::Time: Lattice+Ord+Clone+'static,
{
fn drop(&mut self) {

Expand Down
Loading

0 comments on commit 30ac63d

Please sign in to comment.