Skip to content

Commit

Permalink
Add Relation::projected
Browse files Browse the repository at this point in the history
  • Loading branch information
comnik committed Mar 28, 2019
1 parent aa93fad commit 1fb9a4e
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 19 deletions.
29 changes: 29 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,13 @@ where
/// A collection containing all tuples.
fn tuples(self) -> Collection<Iterative<'a, G, u64>, Vec<Value>, isize>;

/// A collection containing all tuples projected onto the
/// specified variables.
fn projected(
self,
target_variables: &[Var],
) -> Collection<Iterative<'a, G, u64>, Vec<Value>, isize>;

/// A collection with tuples partitioned by `variables`.
///
/// Variables present in `variables` are collected in order and populate a first "key"
Expand Down Expand Up @@ -592,6 +599,28 @@ where
self.tuples
}

fn projected(
self,
target_variables: &[Var],
) -> Collection<Iterative<'a, G, u64>, Vec<Value>, isize> {
if self.variables() == target_variables {
self.tuples
} else {
let relation_variables = self.variables();
let target_variables = target_variables.to_vec();

self.tuples().map(move |tuple| {
target_variables
.iter()
.map(|x| {
let idx = relation_variables.binds(*x).unwrap();
tuple[idx].clone()
})
.collect()
})
}
}

/// Separates tuple fields by those in `variables` and those not.
///
/// Each tuple is mapped to a pair `(Vec<Value>, Vec<Value>)`
Expand Down
7 changes: 1 addition & 6 deletions src/plan/antijoin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,7 @@ impl<P1: Implementable, P2: Implementable> Implementable for Antijoin<P1, P2> {
let tuples = left
.tuples_by_variables(&self.variables)
.distinct()
.antijoin(
&right
.tuples_by_variables(&self.variables)
.map(|(key, _)| key)
.distinct(),
)
.antijoin(&right.projected(&self.variables).distinct())
.map(|(key, tuple)| key.iter().cloned().chain(tuple.iter().cloned()).collect());

let shutdown_handle = ShutdownHandle::merge(shutdown_left, shutdown_right);
Expand Down
5 changes: 1 addition & 4 deletions src/plan/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,10 @@ impl<P: Implementable> Implementable for Project<P> {
S: Scope<Timestamp = T>,
{
let (relation, shutdown_handle) = self.plan.implement(nested, local_arrangements, context);
let tuples = relation
.tuples_by_variables(&self.variables)
.map(|(key, _tuple)| key);

let projected = CollectionRelation {
variables: self.variables.to_vec(),
tuples,
tuples: relation.projected(&self.variables),
};

(projected, shutdown_handle)
Expand Down
11 changes: 2 additions & 9 deletions src/plan/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use timely::progress::Timestamp;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::Threshold;

use crate::binding::Binding;
use crate::binding::{AsBinding, Binding};
use crate::plan::{Dependencies, ImplContext, Implementable};
use crate::{CollectionRelation, Relation, ShutdownHandle, Var, VariableMap};

Expand Down Expand Up @@ -63,14 +63,7 @@ impl<P: Implementable> Implementable for Union<P> {

shutdown_handle.merge_with(shutdown);

if relation.variables == self.variables {
relation.tuples().inner
} else {
relation
.tuples_by_variables(&self.variables)
.map(|(key, _vals)| key)
.inner
}
relation.projected(&self.variables).inner
});

let concat = nested.concatenate(streams).as_collection();
Expand Down

0 comments on commit 1fb9a4e

Please sign in to comment.