From 37a5f18e15f8bace793467b88e3a668b76d1a7a0 Mon Sep 17 00:00:00 2001 From: niko Date: Thu, 28 Mar 2019 23:53:07 +0100 Subject: [PATCH] Update labelprop experiment --- experiments/Cargo.toml | 2 +- experiments/src/bin/hector_wco.rs | 2 +- experiments/src/bin/labelprop.rs | 54 ++++++++++++++----------------- 3 files changed, 26 insertions(+), 32 deletions(-) diff --git a/experiments/Cargo.toml b/experiments/Cargo.toml index c42a708..9976beb 100644 --- a/experiments/Cargo.toml +++ b/experiments/Cargo.toml @@ -12,4 +12,4 @@ graph_map = "0.1" [dependencies.declarative-dataflow] path = ".." -features = ["csv-source"] \ No newline at end of file +features = ["csv-source", "real-time"] \ No newline at end of file diff --git a/experiments/src/bin/hector_wco.rs b/experiments/src/bin/hector_wco.rs index 7d3d109..e39d0be 100644 --- a/experiments/src/bin/hector_wco.rs +++ b/experiments/src/bin/hector_wco.rs @@ -1,7 +1,7 @@ use graph_map::GraphMMap; use declarative_dataflow::server::Server; -use declarative_dataflow::{q, InputSemantics, Binding, Rule, TxData, Value}; +use declarative_dataflow::{q, Binding, InputSemantics, Rule, TxData, Value}; use Value::Eid; fn main() { diff --git a/experiments/src/bin/labelprop.rs b/experiments/src/bin/labelprop.rs index d9b7dc6..4af11ad 100644 --- a/experiments/src/bin/labelprop.rs +++ b/experiments/src/bin/labelprop.rs @@ -2,21 +2,21 @@ static ALLOCATOR: jemallocator::Jemalloc = jemallocator::Jemalloc; use std::sync::mpsc::channel; -use std::time::Instant; +use std::time::{Duration, Instant}; use timely::Configuration; use differential_dataflow::operators::{Consolidate, Count}; use declarative_dataflow::plan::{Aggregate, AggregationFn, Join, Union}; -use declarative_dataflow::server::{Register, RegisterSource, Server}; +use declarative_dataflow::server::{Register, Server}; use declarative_dataflow::sources::{CsvFile, Source}; use declarative_dataflow::{Plan, Rule, Value}; use Value::Eid; fn main() { timely::execute(Configuration::Thread, move |worker| { - let mut server = Server::::new(Default::default()); + let mut server = Server::::new(Default::default()); let (send_results, results) = channel(); let (x, y, z) = (0, 1, 2); @@ -47,34 +47,28 @@ fn main() { }, ]; - let edge_source = RegisterSource { - names: vec![":edge".to_string()], - source: Source::CsvFile(CsvFile { - has_headers: false, - delimiter: b' ', - path: "/Users/niko/data/labelprop/edges.httpd_df".to_string(), - eid_offset: 0, - timestamp_offset: None, - flexible: false, - comment: None, - schema: vec![(1, Eid(0))], - }), - }; - let node_source = RegisterSource { - names: vec![":node".to_string()], - source: Source::CsvFile(CsvFile { - has_headers: false, - delimiter: b' ', - path: "/Users/niko/data/labelprop/nodes.httpd_df".to_string(), - eid_offset: 0, - timestamp_offset: None, - flexible: false, - comment: None, - schema: vec![(1, Eid(0))], - }), - }; + let edge_source = Source::CsvFile(CsvFile { + has_headers: false, + delimiter: b' ', + path: "/Users/niko/data/labelprop/edges.httpd_df".to_string(), + eid_offset: 0, + timestamp_offset: None, + flexible: false, + comment: None, + schema: vec![(":edge".to_string(), (1, Eid(0)))], + }); + let node_source = Source::CsvFile(CsvFile { + has_headers: false, + delimiter: b' ', + path: "/Users/niko/data/labelprop/nodes.httpd_df".to_string(), + eid_offset: 0, + timestamp_offset: None, + flexible: false, + comment: None, + schema: vec![(":node".to_string(), (1, Eid(0)))], + }); - worker.dataflow::(|scope| { + worker.dataflow::(|scope| { server.register_source(edge_source, scope).unwrap(); server.register_source(node_source, scope).unwrap();