From 1fb9a4edfba328e51059aca2137f0ceee7de3c89 Mon Sep 17 00:00:00 2001 From: niko Date: Thu, 28 Mar 2019 23:03:48 +0100 Subject: [PATCH] Add Relation::projected --- src/lib.rs | 29 +++++++++++++++++++++++++++++ src/plan/antijoin.rs | 7 +------ src/plan/project.rs | 5 +---- src/plan/union.rs | 11 ++--------- 4 files changed, 33 insertions(+), 19 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 8fbb669..0456184 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -535,6 +535,13 @@ where /// A collection containing all tuples. fn tuples(self) -> Collection, Vec, isize>; + /// A collection containing all tuples projected onto the + /// specified variables. + fn projected( + self, + target_variables: &[Var], + ) -> Collection, Vec, isize>; + /// A collection with tuples partitioned by `variables`. /// /// Variables present in `variables` are collected in order and populate a first "key" @@ -592,6 +599,28 @@ where self.tuples } + fn projected( + self, + target_variables: &[Var], + ) -> Collection, Vec, isize> { + if self.variables() == target_variables { + self.tuples + } else { + let relation_variables = self.variables(); + let target_variables = target_variables.to_vec(); + + self.tuples().map(move |tuple| { + target_variables + .iter() + .map(|x| { + let idx = relation_variables.binds(*x).unwrap(); + tuple[idx].clone() + }) + .collect() + }) + } + } + /// Separates tuple fields by those in `variables` and those not. /// /// Each tuple is mapped to a pair `(Vec, Vec)` diff --git a/src/plan/antijoin.rs b/src/plan/antijoin.rs index 41257e9..9f4cbbd 100644 --- a/src/plan/antijoin.rs +++ b/src/plan/antijoin.rs @@ -77,12 +77,7 @@ impl Implementable for Antijoin { let tuples = left .tuples_by_variables(&self.variables) .distinct() - .antijoin( - &right - .tuples_by_variables(&self.variables) - .map(|(key, _)| key) - .distinct(), - ) + .antijoin(&right.projected(&self.variables).distinct()) .map(|(key, tuple)| key.iter().cloned().chain(tuple.iter().cloned()).collect()); let shutdown_handle = ShutdownHandle::merge(shutdown_left, shutdown_right); diff --git a/src/plan/project.rs b/src/plan/project.rs index 637b29a..e8a3424 100644 --- a/src/plan/project.rs +++ b/src/plan/project.rs @@ -59,13 +59,10 @@ impl Implementable for Project

{ S: Scope, { let (relation, shutdown_handle) = self.plan.implement(nested, local_arrangements, context); - let tuples = relation - .tuples_by_variables(&self.variables) - .map(|(key, _tuple)| key); let projected = CollectionRelation { variables: self.variables.to_vec(), - tuples, + tuples: relation.projected(&self.variables), }; (projected, shutdown_handle) diff --git a/src/plan/union.rs b/src/plan/union.rs index 0a8055a..d867528 100644 --- a/src/plan/union.rs +++ b/src/plan/union.rs @@ -8,7 +8,7 @@ use timely::progress::Timestamp; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::Threshold; -use crate::binding::Binding; +use crate::binding::{AsBinding, Binding}; use crate::plan::{Dependencies, ImplContext, Implementable}; use crate::{CollectionRelation, Relation, ShutdownHandle, Var, VariableMap}; @@ -63,14 +63,7 @@ impl Implementable for Union

{ shutdown_handle.merge_with(shutdown); - if relation.variables == self.variables { - relation.tuples().inner - } else { - relation - .tuples_by_variables(&self.variables) - .map(|(key, _vals)| key) - .inner - } + relation.projected(&self.variables).inner }); let concat = nested.concatenate(streams).as_collection();