-
Notifications
You must be signed in to change notification settings - Fork 184
/
pagerank.rs
134 lines (110 loc) · 4.16 KB
/
pagerank.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
use timely::order::Product;
use timely::dataflow::{*, operators::Filter};
use differential_dataflow::Collection;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::{*, iterate::Variable};
use differential_dataflow::input::InputSession;
use differential_dataflow::AsCollection;
use graph_map::GraphMMap;
type Node = u32;
type Edge = (Node, Node);
type Time = u32;
type Iter = u32;
type Diff = isize;
/// Loads a graph from a `GraphMMap` file, computes PageRank over it, and then
/// retracts one edge per round, reporting elapsed time after each round.
///
/// Usage: `pagerank <graph-file> <iterations> [inspect] [timely args...]`
///
/// * `<iterations>` caps the number of PageRank iterations; `0` disables the cap.
/// * Passing `inspect` as the third argument prints the consolidated ranks.
///
/// # Panics
///
/// Panics with a usage message if `<graph-file>` or `<iterations>` is missing,
/// or if `<iterations>` is not a valid non-negative integer.
fn main() {
    const USAGE: &str = "usage: pagerank <graph-file> <iterations> [inspect] [timely args...]";

    // Snag a filename to use for the input graph.
    let filename = std::env::args().nth(1).expect(USAGE);
    let iterations: Iter = std::env::args()
        .nth(2)
        .expect(USAGE)
        .parse()
        .expect("<iterations> must be a non-negative integer");
    let inspect = std::env::args().nth(3).as_deref() == Some("inspect");

    // Everything after the filename is handed to timely (presumably so it can
    // read worker/process configuration flags from it).
    timely::execute_from_args(std::env::args().skip(2), move |worker| {
        let peers = worker.peers();
        let index = worker.index();
        let timer = worker.timer();

        let mut input = InputSession::new();
        let mut probe = ProbeHandle::new();

        worker.dataflow::<Time, _, _>(|scope| {
            let edges = input.to_collection(scope);
            pagerank(iterations, &edges)
                // Drop every output record unless `inspect` was requested.
                .filter(move |_| inspect)
                .consolidate()
                .inspect(|x| println!("{:?}", x))
                .probe_with(&mut probe);
        });

        // Load the graph from a memory-mapped `GraphMMap` file. Each worker
        // introduces the out-edges of the nodes congruent to its index modulo
        // `peers`, so every edge is loaded by exactly one worker.
        let graph = GraphMMap::new(&filename);
        for node in (index..graph.nodes()).step_by(peers) {
            for &edge in graph.edges(node) {
                input.update((node as Node, edge as Node), 1);
            }
        }

        input.advance_to(1);
        input.flush();
        while probe.less_than(input.time()) {
            worker.step();
        }

        println!("{:?}\tinitial compute complete", timer.elapsed());

        // Round `node`: the worker owning `node` retracts that node's first
        // out-edge (if it has one). Then all workers advance to the next
        // timestamp and step until the update has been fully processed.
        for node in 1..graph.nodes() {
            if node % peers == index {
                if let Some(&dst) = graph.edges(node).first() {
                    input.update((node as Node, dst as Node), -1);
                }
            }
            input.advance_to((node + 1) as Time);
            input.flush();
            while probe.less_than(input.time()) {
                worker.step();
            }
            println!("{:?}\tround {} complete", timer.elapsed(), node);
        }
    }).unwrap();
}
/// Computes PageRank by simulating random surfers. Returns a collection in which
/// each node's multiplicity (weight) is proportional to its PageRank in `edges`.
///
/// Every node starts with 6,000,000 surfers. In each iteration a node sends
/// `(5 * surfers) / (6 * out_degree)` surfers (integer division) along each of
/// its out-edges, and every node also receives 1,000,000 freshly reset surfers.
///
/// `iters` caps the number of iterations; when it is `0`, no cap is applied.
///
/// NOTE(review): nodes without out-edges have no entry in the degree
/// collection, so the surfers sitting on them are not forwarded anywhere.
fn pagerank<G>(iters: Iter, edges: &Collection<G, Edge, Diff>) -> Collection<G, Node, Diff>
where
    G: Scope,
    G::Timestamp: Lattice,
{
    // Every node that appears in some edge, as either source or destination.
    let nodes = edges
        .flat_map(|(src, dst)| std::iter::once(src).chain(std::iter::once(dst)))
        .distinct();

    // Out-degree of each node that has at least one outgoing edge.
    let out_degrees = edges.map(|(src, _dst)| src).count();

    edges.scope().iterative::<Iter, _, _>(|subgraph| {
        // Import the outer collections into the iterative scope.
        let edges = edges.enter(subgraph);
        let nodes = nodes.enter(subgraph);
        let out_degrees = out_degrees.enter(subgraph);

        // Starting population, and the population re-injected every round.
        let initial = nodes.explode(|node| Some((node, 6_000_000)));
        let teleport = nodes.explode(|node| Some((node, 1_000_000)));

        // Loop variable holding the current surfer counts. It is seeded with
        // `initial`, and its iteration coordinate advances by 1 per round.
        let surfers = Variable::new_from(initial, Product::new(Default::default(), 1));

        // Pair each node's degree with its surfer count, and keep the scaled-down
        // number of surfers to send along each out-edge.
        let per_edge = out_degrees
            .semijoin(&surfers)
            .threshold(|(_node, degree), count| (5 * count) / (6 * degree))
            .map(|(node, _degree)| node);

        // Move those surfers to their destinations and mix in the reset surfers.
        let arrived = edges
            .semijoin(&per_edge)
            .map(|(_src, dst)| dst)
            .concat(&teleport)
            .consolidate();

        // With a nonzero cap, discard every update whose iteration coordinate
        // is `iters` or greater.
        let arrived = if iters == 0 {
            arrived
        } else {
            arrived
                .inner
                .filter(move |(_data, time, _diff)| time.inner < iters)
                .as_collection()
        };

        // Close the loop and export the surfer counts from the iterative scope.
        surfers.set(&arrived);
        arrived.leave()
    })
}