Skip to content

Commit

Permalink
Update labelprop experiment
Browse files Browse the repository at this point in the history
  • Loading branch information
comnik committed Mar 28, 2019
1 parent 1fb9a4e commit 37a5f18
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 32 deletions.
2 changes: 1 addition & 1 deletion experiments/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ graph_map = "0.1"

[dependencies.declarative-dataflow]
path = ".."
features = ["csv-source"]
features = ["csv-source", "real-time"]
2 changes: 1 addition & 1 deletion experiments/src/bin/hector_wco.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use graph_map::GraphMMap;

use declarative_dataflow::server::Server;
use declarative_dataflow::{q, InputSemantics, Binding, Rule, TxData, Value};
use declarative_dataflow::{q, Binding, InputSemantics, Rule, TxData, Value};
use Value::Eid;

fn main() {
Expand Down
54 changes: 24 additions & 30 deletions experiments/src/bin/labelprop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@
static ALLOCATOR: jemallocator::Jemalloc = jemallocator::Jemalloc;

use std::sync::mpsc::channel;
use std::time::Instant;
use std::time::{Duration, Instant};

use timely::Configuration;

use differential_dataflow::operators::{Consolidate, Count};

use declarative_dataflow::plan::{Aggregate, AggregationFn, Join, Union};
use declarative_dataflow::server::{Register, RegisterSource, Server};
use declarative_dataflow::server::{Register, Server};
use declarative_dataflow::sources::{CsvFile, Source};
use declarative_dataflow::{Plan, Rule, Value};
use Value::Eid;

fn main() {
timely::execute(Configuration::Thread, move |worker| {
let mut server = Server::<u64, u64>::new(Default::default());
let mut server = Server::<Duration, u64>::new(Default::default());
let (send_results, results) = channel();

let (x, y, z) = (0, 1, 2);
Expand Down Expand Up @@ -47,34 +47,28 @@ fn main() {
},
];

let edge_source = RegisterSource {
names: vec![":edge".to_string()],
source: Source::CsvFile(CsvFile {
has_headers: false,
delimiter: b' ',
path: "/Users/niko/data/labelprop/edges.httpd_df".to_string(),
eid_offset: 0,
timestamp_offset: None,
flexible: false,
comment: None,
schema: vec![(1, Eid(0))],
}),
};
let node_source = RegisterSource {
names: vec![":node".to_string()],
source: Source::CsvFile(CsvFile {
has_headers: false,
delimiter: b' ',
path: "/Users/niko/data/labelprop/nodes.httpd_df".to_string(),
eid_offset: 0,
timestamp_offset: None,
flexible: false,
comment: None,
schema: vec![(1, Eid(0))],
}),
};
let edge_source = Source::CsvFile(CsvFile {
has_headers: false,
delimiter: b' ',
path: "/Users/niko/data/labelprop/edges.httpd_df".to_string(),
eid_offset: 0,
timestamp_offset: None,
flexible: false,
comment: None,
schema: vec![(":edge".to_string(), (1, Eid(0)))],
});
let node_source = Source::CsvFile(CsvFile {
has_headers: false,
delimiter: b' ',
path: "/Users/niko/data/labelprop/nodes.httpd_df".to_string(),
eid_offset: 0,
timestamp_offset: None,
flexible: false,
comment: None,
schema: vec![(":node".to_string(), (1, Eid(0)))],
});

worker.dataflow::<u64, _, _>(|scope| {
worker.dataflow::<Duration, _, _>(|scope| {
server.register_source(edge_source, scope).unwrap();
server.register_source(node_source, scope).unwrap();

Expand Down

0 comments on commit 37a5f18

Please sign in to comment.