Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove use of timely_sort crate #313

Merged
merged 2 commits into from
Mar 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ serde = "1.0"
serde_derive = "1.0"
abomonation = "0.7"
abomonation_derive = "0.5"
timely_sort="0.1.6"
#timely = { version = "0.11", default-features = false }
timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false }
#timely = { path = "../timely-dataflow/timely/", default-features = false }
Expand Down
1 change: 0 additions & 1 deletion dogsdogsdogs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ license = "MIT"
abomonation = "0.7"
abomonation_derive = "0.5"
timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false }
timely_sort = "0.1.6"
differential-dataflow = { path = "../", default-features = false }
serde = "1"
serde_derive = "1"
Expand Down
4 changes: 1 addition & 3 deletions dogsdogsdogs/src/operators/lookup_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ use timely::dataflow::channels::pact::{Pipeline, Exchange};
use timely::dataflow::operators::Operator;
use timely::progress::Antichain;

use timely_sort::Unsigned;

use differential_dataflow::{ExchangeData, Collection, AsCollection, Hashable};
use differential_dataflow::difference::{Semigroup, Monoid};
use differential_dataflow::lattice::Lattice;
Expand Down Expand Up @@ -58,7 +56,7 @@ where
let mut key: Tr::Key = supplied_key0;
let exchange = Exchange::new(move |update: &(D,G::Timestamp,R)| {
logic1(&update.0, &mut key);
key.hashed().as_u64()
key.hashed().into()
});

let mut key1: Tr::Key = supplied_key1;
Expand Down
143 changes: 1 addition & 142 deletions src/hashable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,6 @@
//! distributed integers can perhaps do something simpler (like report their own value).

use std::hash::Hasher;
use std::ops::Deref;

use abomonation::Abomonation;

use timely_sort::Unsigned;

/// Types with a `hashed` method, producing an unsigned output of some type.
///
Expand All @@ -28,7 +23,7 @@ use timely_sort::Unsigned;
/// can take advantage of the smaller size.
pub trait Hashable {
// NOTE(review): two `Output` declarations appear below; this looks like the removed and
// added lines of a diff shown together — confirm which bound is current.
/// The type of the output value.
type Output: Unsigned+Copy;
type Output: Into<u64>+Copy;
/// A well-distributed integer derived from the data.
///
/// Takes `self` by reference and returns the value as `Self::Output`.
fn hashed(&self) -> Self::Output;
}
Expand All @@ -41,139 +36,3 @@ impl<T: ::std::hash::Hash> Hashable for T {
h.finish()
}
}

/// Marker for types whose `Ord` implementation compares `hashed()` values first.
///
/// Implementors *must* implement `Ord`, and must guarantee that any two values whose
/// hashes differ are ordered the same way as those hashes. Implementations that sort by
/// hash value can then rely on the type's own `Ord` implementation.
pub trait HashOrdered: Ord + Hashable {}

impl<T> HashOrdered for OrdWrapper<T> where T: Ord + Hashable {}
impl<T> HashOrdered for HashableWrapper<T> where T: Ord + Hashable {}
impl<T> HashOrdered for UnsignedWrapper<T> where T: Unsigned + Copy {}

// It would be great to use the macros for these, but I couldn't figure out how to get it
// to work with constraints (i.e. `Hashable`) on the generic parameters.
//
// The wrapper has a single field, so both methods forward to `item`.
impl<T: Ord+Hashable+Abomonation> Abomonation for OrdWrapper<T> {
    /// Forwards to `item.entomb`, passing `write` through and returning its result.
    #[inline]
    unsafe fn entomb<W: ::std::io::Write>(&self, write: &mut W) -> ::std::io::Result<()> {
        self.item.entomb(write)
    }

    /// Forwards to `item.exhume` and returns whatever it returns, including `None`.
    #[inline]
    unsafe fn exhume<'a,'b>(&'a mut self, bytes: &'b mut [u8]) -> Option<&'b mut [u8]> {
        // The result is already `Option<&'b mut [u8]>`; unwrapping it with `?` only to
        // re-wrap it in `Some` was a no-op.
        self.item.exhume(bytes)
    }
}

// It would be great to use the macros for these, but I couldn't figure out how to get it
// to work with constraints (i.e. `Hashable`) on the generic parameters.
//
// Only `item` is passed to the inner implementation; the `hash` field is not touched here.
impl<T: Hashable+Abomonation> Abomonation for HashableWrapper<T> {
    /// Forwards to `item.entomb`, passing `write` through and returning its result.
    #[inline]
    unsafe fn entomb<W: ::std::io::Write>(&self, write: &mut W) -> ::std::io::Result<()> {
        self.item.entomb(write)
    }

    /// Forwards to `item.exhume` and returns whatever it returns, including `None`.
    #[inline]
    unsafe fn exhume<'a,'b>(&'a mut self, bytes: &'b mut [u8]) -> Option<&'b mut [u8]> {
        // The result is already `Option<&'b mut [u8]>`; unwrapping it with `?` only to
        // re-wrap it in `Some` was a no-op.
        self.item.exhume(bytes)
    }
}

// The wrapper has a single field, so both methods forward to `item`.
impl<T: Unsigned+Copy+Hashable+Abomonation> Abomonation for UnsignedWrapper<T> {
    /// Forwards to `item.entomb`, passing `write` through and returning its result.
    #[inline]
    unsafe fn entomb<W: ::std::io::Write>(&self, write: &mut W) -> ::std::io::Result<()> {
        self.item.entomb(write)
    }

    /// Forwards to `item.exhume` and returns whatever it returns, including `None`.
    #[inline]
    unsafe fn exhume<'a,'b>(&'a mut self, bytes: &'b mut [u8]) -> Option<&'b mut [u8]> {
        // The result is already `Option<&'b mut [u8]>`; unwrapping it with `?` only to
        // re-wrap it in `Some` was a no-op.
        self.item.exhume(bytes)
    }
}


/// Wraps a hashable, ordered value so that its `Ord` implementation compares hash
/// values before the values themselves.
#[derive(Clone, Eq, PartialEq, Debug, Default)]
pub struct OrdWrapper<T>
where
    T: Ord + Hashable,
{
    /// The wrapped value, publicly accessible.
    pub item: T,
}

impl<T: Ord+Hashable> PartialOrd for OrdWrapper<T> {
    /// Always returns `Some`: the ordering is total and is defined by the `Ord`
    /// implementation for `OrdWrapper`.
    #[inline]
    fn partial_cmp(&self, other: &Self) -> Option<::std::cmp::Ordering> {
        // Delegate to `Ord::cmp` so the two implementations cannot drift apart.
        Some(Ord::cmp(self, other))
    }
}
impl<T: Ord+Hashable> Ord for OrdWrapper<T> {
    /// Orders by `hashed()` first; the wrapped items break ties between equal hashes.
    #[inline]
    fn cmp(&self, other: &Self) -> ::std::cmp::Ordering {
        let lhs_hash = self.item.hashed();
        let rhs_hash = other.item.hashed();
        match lhs_hash.cmp(&rhs_hash) {
            ::std::cmp::Ordering::Equal => self.item.cmp(&other.item),
            unequal => unequal,
        }
    }
}

impl<T> Hashable for OrdWrapper<T>
where
    T: Ord + Hashable,
{
    type Output = T::Output;

    /// Returns the hash of the wrapped item.
    fn hashed(&self) -> Self::Output {
        self.item.hashed()
    }
}

impl<T> Deref for OrdWrapper<T>
where
    T: Ord + Hashable,
{
    type Target = T;

    /// Borrows the wrapped item.
    fn deref(&self) -> &Self::Target {
        &self.item
    }
}


/// Stores a value together with its hash value.
#[derive(Clone, Default, Ord, PartialOrd, Eq, PartialEq, Debug, Copy)]
pub struct HashableWrapper<T>
where
    T: Hashable,
{
    // Declared before `item`, so the derived comparisons look at the hash first.
    hash: T::Output,
    /// The wrapped value, for reference.
    pub item: T,
}

impl<T> Hashable for HashableWrapper<T>
where
    T: Hashable,
{
    type Output = T::Output;

    /// Returns the stored `hash` field; nothing is recomputed.
    #[inline]
    fn hashed(&self) -> Self::Output {
        self.hash
    }
}

impl<T> Deref for HashableWrapper<T>
where
    T: Hashable,
{
    type Target = T;

    /// Borrows the wrapped item.
    #[inline]
    fn deref(&self) -> &Self::Target {
        &self.item
    }
}

impl<T> From<T> for HashableWrapper<T>
where
    T: Hashable,
{
    /// Computes `item.hashed()` once and stores it alongside `item`.
    #[inline]
    fn from(item: T) -> Self {
        // Hash before moving `item` into the struct.
        let hash = item.hashed();
        Self { hash, item }
    }
}

/// Wraps an unsigned integer whose `hashed` value is the integer itself.
#[derive(Clone, Ord, PartialOrd, Eq, PartialEq, Default, Debug, Copy)]
pub struct UnsignedWrapper<T>
where
    T: Unsigned + Copy,
{
    /// The wrapped value.
    pub item: T,
}

impl<T> Hashable for UnsignedWrapper<T>
where
    T: Unsigned + Copy,
{
    type Output = T;

    /// Returns a copy of the wrapped value; no hashing is performed.
    #[inline]
    fn hashed(&self) -> T {
        self.item
    }
}

impl<T> Deref for UnsignedWrapper<T>
where
    T: Unsigned + Copy,
{
    type Target = T;

    /// Borrows the wrapped value.
    #[inline]
    fn deref(&self) -> &Self::Target {
        &self.item
    }
}

impl<T> From<T> for UnsignedWrapper<T>
where
    T: Unsigned + Copy,
{
    /// Wraps `item` unchanged.
    #[inline]
    fn from(item: T) -> Self {
        Self { item }
    }
}
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ impl<T: timely::ExchangeData + Ord + Debug> ExchangeData for T { }

extern crate fnv;
extern crate timely;
extern crate timely_sort;

#[macro_use]
extern crate abomonation_derive;
Expand Down
3 changes: 0 additions & 3 deletions src/operators/arrange/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,6 @@ where
/// use differential_dataflow::operators::reduce::Reduce;
/// use differential_dataflow::trace::Trace;
/// use differential_dataflow::trace::implementations::ord::OrdValSpine;
/// use differential_dataflow::hashable::OrdWrapper;
///
/// fn main() {
/// ::timely::execute(Config::thread(), |worker| {
Expand Down Expand Up @@ -247,7 +246,6 @@ where
/// use differential_dataflow::operators::reduce::Reduce;
/// use differential_dataflow::trace::Trace;
/// use differential_dataflow::trace::implementations::ord::OrdValSpine;
/// use differential_dataflow::hashable::OrdWrapper;
///
/// fn main() {
/// ::timely::execute(Config::thread(), |worker| {
Expand Down Expand Up @@ -361,7 +359,6 @@ where
/// use differential_dataflow::trace::Trace;
/// use differential_dataflow::trace::TraceReader;
/// use differential_dataflow::trace::implementations::ord::OrdValSpine;
/// use differential_dataflow::hashable::OrdWrapper;
/// use differential_dataflow::input::Input;
///
/// fn main() {
Expand Down
6 changes: 2 additions & 4 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ use timely::progress::Timestamp;
use timely::progress::{Antichain, frontier::AntichainRef};
use timely::dataflow::operators::Capability;

use timely_sort::Unsigned;

use ::{Data, ExchangeData, Collection, AsCollection, Hashable};
use ::difference::Semigroup;
use lattice::Lattice;
Expand Down Expand Up @@ -274,7 +272,7 @@ where
Tr: 'static,
{
// while the arrangement is already correctly distributed, the query stream may not be.
let exchange = Exchange::new(move |update: &(Tr::Key,G::Timestamp)| update.0.hashed().as_u64());
let exchange = Exchange::new(move |update: &(Tr::Key,G::Timestamp)| update.0.hashed().into());
queries.binary_frontier(&self.stream, exchange, Pipeline, "TraceQuery", move |_capability, _info| {

let mut trace = Some(self.trace.clone());
Expand Down Expand Up @@ -484,7 +482,7 @@ where
Tr::Batch: Batch<K, V, G::Timestamp, R>,
Tr::Cursor: Cursor<K, V, G::Timestamp, R>,
{
let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().as_u64());
let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into());
self.arrange_core(exchange, name)
}

Expand Down
4 changes: 1 addition & 3 deletions src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,6 @@ use timely::progress::Timestamp;
use timely::progress::Antichain;
use timely::dataflow::operators::Capability;

use timely_sort::Unsigned;

use ::{ExchangeData, Hashable};
use lattice::Lattice;
use trace::{Trace, TraceReader, Batch, Cursor};
Expand Down Expand Up @@ -157,7 +155,7 @@ where

let reader = &mut reader;

let exchange = Exchange::new(move |update: &(Tr::Key,Option<Tr::Val>,G::Timestamp)| (update.0).hashed().as_u64());
let exchange = Exchange::new(move |update: &(Tr::Key,Option<Tr::Val>,G::Timestamp)| (update.0).hashed().into());

stream.unary_frontier(exchange, name, move |_capability, info| {

Expand Down
1 change: 0 additions & 1 deletion src/operators/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,6 @@ pub trait JoinCore<G: Scope, K: 'static, V: 'static, R: Semigroup> where G::Time
/// use differential_dataflow::operators::join::JoinCore;
/// use differential_dataflow::trace::Trace;
/// use differential_dataflow::trace::implementations::ord::OrdValSpine;
/// use differential_dataflow::hashable::OrdWrapper;
///
/// fn main() {
/// ::timely::example(|scope| {
Expand Down
1 change: 0 additions & 1 deletion src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,6 @@ pub trait ReduceCore<G: Scope, K: Data, V: Data, R: Semigroup> where G::Timestam
/// use differential_dataflow::operators::reduce::ReduceCore;
/// use differential_dataflow::trace::Trace;
/// use differential_dataflow::trace::implementations::ord::OrdValSpine;
/// use differential_dataflow::hashable::OrdWrapper;
///
/// fn main() {
/// ::timely::example(|scope| {
Expand Down
24 changes: 11 additions & 13 deletions tests/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,25 @@ use std::rc::Rc;
use timely::dataflow::operators::generic::OperatorInfo;
use timely::progress::{Antichain, frontier::AntichainRef};

use differential_dataflow::hashable::UnsignedWrapper;

use differential_dataflow::trace::implementations::ord::OrdValBatch;
use differential_dataflow::trace::{Trace, TraceReader, Batch, Batcher};
use differential_dataflow::trace::cursor::CursorDebug;
use differential_dataflow::trace::implementations::spine_fueled::Spine;

pub type OrdValSpine<K, V, T, R> = Spine<K, V, T, R, Rc<OrdValBatch<K, V, T, R>>>;

type IntegerTrace = OrdValSpine<UnsignedWrapper<u64>, u64, usize, i64>;
type IntegerTrace = OrdValSpine<u64, u64, usize, i64>;

fn get_trace() -> Spine<UnsignedWrapper<u64>, u64, usize, i64, Rc<OrdValBatch<UnsignedWrapper<u64>, u64, usize, i64>>> {
fn get_trace() -> Spine<u64, u64, usize, i64, Rc<OrdValBatch<u64, u64, usize, i64>>> {
let op_info = OperatorInfo::new(0, 0, &[]);
let mut trace = IntegerTrace::new(op_info, None, None);
{
let mut batcher = <<IntegerTrace as TraceReader>::Batch as Batch<UnsignedWrapper<u64>, u64, usize, i64>>::Batcher::new();
let mut batcher = <<IntegerTrace as TraceReader>::Batch as Batch<u64, u64, usize, i64>>::Batcher::new();

batcher.push_batch(&mut vec![
((1.into(), 2), 0, 1),
((2.into(), 3), 1, 1),
((2.into(), 3), 2, -1),
((1, 2), 0, 1),
((2, 3), 1, 1),
((2, 3), 2, -1),
]);

let batch_ts = &[1, 2, 3];
Expand All @@ -44,21 +42,21 @@ fn test_trace() {

let (mut cursor1, storage1) = trace.cursor_through(AntichainRef::new(&[1])).unwrap();
let vec_1 = cursor1.to_vec(&storage1);
assert_eq!(vec_1, vec![((1.into(), 2), vec![(0, 1)])]);
assert_eq!(vec_1, vec![((1, 2), vec![(0, 1)])]);

let (mut cursor2, storage2) = trace.cursor_through(AntichainRef::new(&[2])).unwrap();
let vec_2 = cursor2.to_vec(&storage2);
println!("--> {:?}", vec_2);
assert_eq!(vec_2, vec![
((1.into(), 2), vec![(0, 1)]),
((2.into(), 3), vec![(1, 1)]),
((1, 2), vec![(0, 1)]),
((2, 3), vec![(1, 1)]),
]);

let (mut cursor3, storage3) = trace.cursor_through(AntichainRef::new(&[3])).unwrap();
let vec_3 = cursor3.to_vec(&storage3);
assert_eq!(vec_3, vec![
((1.into(), 2), vec![(0, 1)]),
((2.into(), 3), vec![(1, 1), (2, -1)]),
((1, 2), vec![(0, 1)]),
((2, 3), vec![(1, 1), (2, -1)]),
]);

let (mut cursor4, storage4) = trace.cursor();
Expand Down