Skip to content

Commit f2b2657

Browse files
committed
cardinality example test
1 parent 76e501a commit f2b2657

3 files changed

+54
-27
lines changed

hydroflow/tests/snapshots/surface_codegen__surface_syntax_reachability_generated@graphvis_dot.snap

+17-13
Original file line numberDiff line numberDiff line change
@@ -8,39 +8,43 @@ digraph {
88
n1v1 [label="(n1v1) union()", shape=invhouse, fillcolor="#88aaff"]
99
n2v1 [label="(n2v1) map(|v| (v, ()))", shape=invhouse, fillcolor="#88aaff"]
1010
n3v1 [label="(n3v1) source_iter(vec![0])", shape=invhouse, fillcolor="#88aaff"]
11-
n4v1 [label="(n4v1) join()", shape=invhouse, fillcolor="#88aaff"]
12-
n5v1 [label="(n5v1) map(|(_src, ((), dst))| dst)", shape=invhouse, fillcolor="#88aaff"]
13-
n6v1 [label="(n6v1) tee()", shape=house, fillcolor="#ffff88"]
14-
n7v1 [label="(n7v1) source_stream(pairs_recv)", shape=invhouse, fillcolor="#88aaff"]
15-
n8v1 [label="(n8v1) for_each(|x| println!(\"Reached: {}\", x))", shape=house, fillcolor="#ffff88"]
16-
n9v1 [label="(n9v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
11+
n4v1 [label="(n4v1) join::<'static, 'static>()", shape=invhouse, fillcolor="#88aaff"]
12+
n5v1 [label="(n5v1) inspect(|_| { my_card_inner.set(1 + my_card_inner.get()) })", shape=invhouse, fillcolor="#88aaff"]
13+
n6v1 [label="(n6v1) map(|(_src, ((), dst))| dst)", shape=invhouse, fillcolor="#88aaff"]
14+
n7v1 [label="(n7v1) tee()", shape=house, fillcolor="#ffff88"]
15+
n8v1 [label="(n8v1) source_stream(pairs_recv)", shape=invhouse, fillcolor="#88aaff"]
16+
n9v1 [label="(n9v1) for_each(|x| println!(\"Reached: {}\", x))", shape=house, fillcolor="#ffff88"]
17+
n10v1 [label="(n10v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
1718
n1v1 -> n2v1
1819
n3v1 -> n1v1 [label="0"]
20+
n6v1 -> n7v1
1921
n5v1 -> n6v1
2022
n4v1 -> n5v1
2123
n2v1 -> n4v1 [label="0"]
22-
n7v1 -> n4v1 [label="1"]
23-
n6v1 -> n9v1 [label="0"]
24-
n6v1 -> n8v1 [label="1"]
25-
n9v1 -> n1v1 [label="1"]
24+
n8v1 -> n4v1 [label="1"]
25+
n7v1 -> n10v1 [label="0"]
26+
n7v1 -> n9v1 [label="1"]
27+
n10v1 -> n1v1 [label="1"]
2628
subgraph "cluster n1v1" {
2729
fillcolor="#dddddd"
2830
style=filled
2931
label = "sg_1v1\nstratum 0"
30-
n9v1
32+
n10v1
3133
n3v1
3234
n1v1
3335
n2v1
34-
n7v1
36+
n8v1
3537
n4v1
3638
n5v1
3739
n6v1
38-
n8v1
40+
n7v1
41+
n9v1
3942
subgraph "cluster_sg_1v1_var_my_join_tee" {
4043
label="var my_join_tee"
4144
n4v1
4245
n5v1
4346
n6v1
47+
n7v1
4448
}
4549
subgraph "cluster_sg_1v1_var_reached_vertices" {
4650
label="var reached_vertices"

hydroflow/tests/snapshots/surface_codegen__surface_syntax_reachability_generated@graphvis_mermaid.snap

+17-13
Original file line numberDiff line numberDiff line change
@@ -11,35 +11,39 @@ linkStyle default stroke:#aaa
1111
1v1[\"(1v1) <code>union()</code>"/]:::pullClass
1212
2v1[\"(2v1) <code>map(|v| (v, ()))</code>"/]:::pullClass
1313
3v1[\"(3v1) <code>source_iter(vec![0])</code>"/]:::pullClass
14-
4v1[\"(4v1) <code>join()</code>"/]:::pullClass
15-
5v1[\"(5v1) <code>map(|(_src, ((), dst))| dst)</code>"/]:::pullClass
16-
6v1[/"(6v1) <code>tee()</code>"\]:::pushClass
17-
7v1[\"(7v1) <code>source_stream(pairs_recv)</code>"/]:::pullClass
18-
8v1[/"(8v1) <code>for_each(|x| println!(&quot;Reached: {}&quot;, x))</code>"\]:::pushClass
19-
9v1["(9v1) <code>handoff</code>"]:::otherClass
14+
4v1[\"(4v1) <code>join::&lt;'static, 'static&gt;()</code>"/]:::pullClass
15+
5v1[\"(5v1) <code>inspect(|_| { my_card_inner.set(1 + my_card_inner.get()) })</code>"/]:::pullClass
16+
6v1[\"(6v1) <code>map(|(_src, ((), dst))| dst)</code>"/]:::pullClass
17+
7v1[/"(7v1) <code>tee()</code>"\]:::pushClass
18+
8v1[\"(8v1) <code>source_stream(pairs_recv)</code>"/]:::pullClass
19+
9v1[/"(9v1) <code>for_each(|x| println!(&quot;Reached: {}&quot;, x))</code>"\]:::pushClass
20+
10v1["(10v1) <code>handoff</code>"]:::otherClass
2021
1v1-->2v1
2122
3v1-->|0|1v1
23+
6v1-->7v1
2224
5v1-->6v1
2325
4v1-->5v1
2426
2v1-->|0|4v1
25-
7v1-->|1|4v1
26-
6v1-->|0|9v1
27-
6v1-->|1|8v1
28-
9v1-->|1|1v1
27+
8v1-->|1|4v1
28+
7v1-->|0|10v1
29+
7v1-->|1|9v1
30+
10v1-->|1|1v1
2931
subgraph sg_1v1 ["sg_1v1 stratum 0"]
30-
9v1
32+
10v1
3133
3v1
3234
1v1
3335
2v1
34-
7v1
36+
8v1
3537
4v1
3638
5v1
3739
6v1
38-
8v1
40+
7v1
41+
9v1
3942
subgraph sg_1v1_var_my_join_tee ["var <tt>my_join_tee</tt>"]
4043
4v1
4144
5v1
4245
6v1
46+
7v1
4347
end
4448
subgraph sg_1v1_var_reached_vertices ["var <tt>reached_vertices</tt>"]
4549
1v1

hydroflow/tests/surface_codegen.rs

+20-1
Original file line numberDiff line numberDiff line change
@@ -574,11 +574,18 @@ pub fn test_surface_syntax_reachability_generated() {
574574
// An edge in the input data = a pair of `usize` vertex IDs.
575575
let (pairs_send, pairs_recv) = hydroflow::util::unbounded_channel::<(usize, usize)>();
576576

577+
let my_card = std::rc::Rc::new(std::cell::Cell::new(0));
578+
let my_card_inner = std::rc::Rc::clone(&my_card);
579+
577580
let mut df: Hydroflow = hydroflow_syntax! {
578581
reached_vertices = union() -> map(|v| (v, ()));
579582
source_iter(vec![0]) -> [0]reached_vertices;
580583

581-
my_join_tee = join() -> map(|(_src, ((), dst))| dst) -> tee();
584+
my_join_tee = join::<'static, 'static>()
585+
-> inspect(|_| {
586+
my_card_inner.set(1 + my_card_inner.get())
587+
})
588+
-> map(|(_src, ((), dst))| dst) -> tee();
582589
reached_vertices -> [0]my_join_tee;
583590
source_stream(pairs_recv) -> [1]my_join_tee;
584591

@@ -588,22 +595,34 @@ pub fn test_surface_syntax_reachability_generated() {
588595
assert_graphvis_snapshots!(df);
589596
df.run_available();
590597

598+
println!("{} {} {}", line!(), df.current_tick(), my_card.get());
599+
591600
pairs_send.send((0, 1)).unwrap();
592601
df.run_available();
593602

603+
println!("{} {} {}", line!(), df.current_tick(), my_card.get());
604+
594605
pairs_send.send((2, 4)).unwrap();
595606
pairs_send.send((3, 4)).unwrap();
596607
df.run_available();
597608

609+
println!("{} {} {}", line!(), df.current_tick(), my_card.get());
610+
598611
pairs_send.send((1, 2)).unwrap();
599612
df.run_available();
600613

614+
println!("{} {} {}", line!(), df.current_tick(), my_card.get());
615+
601616
pairs_send.send((0, 3)).unwrap();
602617
df.run_available();
603618

619+
println!("{} {} {}", line!(), df.current_tick(), my_card.get());
620+
604621
pairs_send.send((0, 3)).unwrap();
605622
df.run_available();
606623

624+
println!("{} {} {}", line!(), df.current_tick(), my_card.get());
625+
607626
// Reached: 1
608627
// Reached: 2
609628
// Reached: 4

0 commit comments

Comments
 (0)