-
Notifications
You must be signed in to change notification settings - Fork 183
/
hello.rs
89 lines (69 loc) · 3.02 KB
/
hello.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
use rand::{Rng, SeedableRng, StdRng};
use differential_dataflow::input::Input;
// use differential_dataflow::operators::Count;
use differential_dataflow::operators::count::CountTotal;
/// Takes the next positional argument from `args` and parses it as a `T`.
///
/// `name` is used only to label the argument in the panic message.
///
/// # Panics
///
/// Panics if `args` is exhausted or if the argument does not parse as a `T`.
/// The message names the offending argument and lists the expected arguments.
fn parse_arg<T, I>(args: &mut I, name: &str) -> T
where
    T: std::str::FromStr,
    T::Err: std::fmt::Debug,
    I: Iterator<Item = String>,
{
    const USAGE: &str = "expected arguments: <nodes> <edges> <batch> <inspect> [timely options]";

    let raw = args
        .next()
        .unwrap_or_else(|| panic!("missing <{}> argument; {}", name, USAGE));
    match raw.parse() {
        Ok(value) => value,
        Err(err) => panic!(
            "invalid <{}> argument {:?}: {:?}; {}",
            name, raw, err, USAGE
        ),
    }
}

/// Builds an out-degree-distribution dataflow over a random graph and times it.
///
/// Positional arguments (after the program name):
///
/// 1. `nodes`   - exclusive upper bound for the random node identifiers.
/// 2. `edges`   - total number of edges loaded initially, split across workers.
/// 3. `batch`   - number of rounds between progress reports; `0` stops after loading.
/// 4. `inspect` - the literal `inspect` prints every output record; any other
///    value (or no value at all) suppresses that output.
///
/// Everything after the first five process arguments is handed to timely.
///
/// # Panics
///
/// Panics if `nodes`, `edges` or `batch` is missing or not a valid number, or
/// if the timely computation returns an error.
fn main() {
    let mut args = std::env::args();
    // The first entry is the program name, not a user argument.
    args.next();

    let nodes: u32 = parse_arg(&mut args, "nodes");
    let edges: usize = parse_arg(&mut args, "edges");
    let batch: u32 = parse_arg(&mut args, "batch");
    // The flag is only ever compared against a literal, so an absent value is
    // treated the same as any non-matching value instead of aborting.
    let inspect: bool = args.next().map_or(false, |flag| flag == "inspect");

    // Start the timely workers. The five entries skipped here are the program
    // name plus the four positional arguments consumed above.
    timely::execute_from_args(std::env::args().skip(5), move |worker| {
        let timer = ::std::time::Instant::now();

        let index = worker.index();
        let peers = worker.peers();

        // Create a degree counting differential dataflow.
        let (mut input, probe) = worker.dataflow::<u32, _, _>(|scope| {
            // Create the edge input; updates carry `i32` differences.
            let (input, edges) = scope.new_collection::<_, i32>();

            // Count per source, then count how often each per-source count occurs.
            let out_degr_distr = edges
                .map(|(src, _dst)| src)
                .count_total()
                .map(|(_src, cnt)| cnt as usize)
                .count_total();

            // Show us something about the collection, notice when done.
            // The filter drops every record unless `inspect` was requested, so
            // the print only fires in that case; the probe is attached either way.
            let probe = out_degr_distr
                .filter(move |_| inspect)
                .inspect(|x| println!("observed: {:?}", x))
                .probe();

            (input, probe)
        });

        // Both generators start from the same per-worker seed, so `rng2` is
        // expected to replay the pairs `rng1` produced (assuming `StdRng` is
        // deterministic for a given seed).
        let seed: &[_] = &[1, 2, 3, index];
        let mut rng1: StdRng = SeedableRng::from_seed(seed); // rng for edge additions
        let mut rng2: StdRng = SeedableRng::from_seed(seed); // rng for edge deletions

        // Load up graph data. Round-robin among workers: the first
        // `edges % peers` workers take one extra edge each.
        let share = (edges / peers) + if index < (edges % peers) { 1 } else { 0 };
        for _ in 0..share {
            input.update((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)), 1)
        }

        input.advance_to(1);
        input.flush();
        worker.step_while(|| probe.less_than(input.time()));

        if index == 0 {
            println!("round 0 finished after {:?} (loading)", timer.elapsed());
        }

        // Just have worker zero drive input production. Other workers fall
        // through and leave the closure.
        if batch > 0 && index == 0 {
            let mut next = batch;
            // Open-ended range: this loop has no exit condition of its own.
            for round in 1.. {
                input.advance_to(round);
                // One insertion and one retraction per round.
                input.update((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)), 1);
                input.update((rng2.gen_range(0, nodes), rng2.gen_range(0, nodes)), -1);

                // Once `round` has moved past the reporting threshold, flush
                // and wait for the probe to catch up with the input's time.
                if round > next {
                    let timer = ::std::time::Instant::now();
                    input.flush();
                    worker.step_while(|| probe.less_than(input.time()));
                    println!("round {} finished after {:?}", next, timer.elapsed());
                    next += batch;
                }
            }
        }
    })
    .unwrap();
}