Possible bug in delta queries #183

Closed
ryzhyk opened this issue Jun 24, 2019 · 4 comments · Fixed by #534

Comments

@ryzhyk
Contributor

ryzhyk commented Jun 24, 2019

Below (in a separate comment) is a modified delta_query.rs that

  • uses a fixed number of workers (32)
  • does not read the graph from files, but instead inserts six hard-coded edges, so that the resulting graph should have two triangles (<1,2,3> and <3,4,1>). All inserts are done from worker 0.
  • IMPORTANT (this is what triggers the bug): after each insert, propagates changes through the pipeline:
    input.insert(v.clone());
    input.advance_to(i+1);
    input.flush();
    while probe.less_than(&(i+1)) {
        worker.step();
    }
  • workers other than 0 simply step until reaching the last timestamp:
    input.advance_to(6);
    input.flush();
    while probe.less_than(&6) {
        worker.step();
    }
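As a reference answer for the six hard-coded edges, here is a minimal brute-force sketch in std-only Rust (no timely or differential involved; `triangles` is a hypothetical helper, not part of dogsdogsdogs). It enumerates every binding of Q(a,b,c) := E1(a,b), E2(b,c), E3(a,c), taking E1 = E2 = E3 = the edge set:

```rust
use std::collections::BTreeSet;

// Hypothetical helper: enumerate all (a, b, c) with E(a,b), E(b,c), E(a,c)
// by brute force. Only meant as an oracle for the expected dataflow output.
fn triangles(edges: &[(u32, u32)]) -> BTreeSet<(u32, u32, u32)> {
    let set: BTreeSet<(u32, u32)> = edges.iter().cloned().collect();
    let mut out = BTreeSet::new();
    for &(a, b) in &set {
        // Extend E(a,b) with every E(b,c), then check that E(a,c) closes it.
        for &(b2, c) in &set {
            if b2 == b && set.contains(&(a, c)) {
                out.insert((a, b, c));
            }
        }
    }
    out
}

fn main() {
    // Same edges, in the same order, as the modified example inserts them.
    let edges = [(1, 2), (2, 3), (1, 3), (3, 4), (4, 1), (3, 1)];
    for t in triangles(&edges) {
        println!("Triangle: {:?}", t);
    }
}
```

Running it prints exactly `(1, 2, 3)` and `(3, 4, 1)`, so any run of the dataflow that reports fewer than two triangles for these inputs is wrong.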

When I run the modified example with `cargo run --example delta_query`, the program nondeterministically computes 0, 1, or 2 triangles:

# 0 triangles
$ cargo run --example delta_query
    Finished dev [unoptimized + debuginfo] target(s) in 0.05s
     Running `/home/lryzhyk/projects/differential-dataflow/dogsdogsdogs/target/debug/examples/delta_query`
worker0 at timestamp 1
worker0 at timestamp 2
worker0 at timestamp 3
worker0 at timestamp 4
worker0 at timestamp 5
worker0 at timestamp 6

# 1 triangle
$ cargo run --example delta_query
    Finished dev [unoptimized + debuginfo] target(s) in 0.05s
     Running `/home/lryzhyk/projects/differential-dataflow/dogsdogsdogs/target/debug/examples/delta_query`
worker0 at timestamp 1
worker0 at timestamp 2
worker0 at timestamp 3
worker0 at timestamp 4
worker0 at timestamp 5
	Triangle: ((3, 4, 1), 5, 1)
worker0 at timestamp 6

If, instead of looping on worker.step() after each insert, I first insert all values (advancing the input timestamp after each one) and then run

while probe.less_than(&6) {
    worker.step();
}

then it computes two triangles as expected.

Am I using the API incorrectly, or is this behavior indeed a bug?

Thanks!

@ryzhyk
Contributor Author

ryzhyk commented Jun 24, 2019

Here is the complete modified delta_query.rs:

extern crate timely;
extern crate graph_map;
extern crate differential_dataflow;

extern crate dogsdogsdogs;

use timely::dataflow::Scope;
use timely::dataflow::operators::probe::Handle;
use timely::communication::initialize::Configuration;
use differential_dataflow::input::Input;
use graph_map::GraphMMap;

use dogsdogsdogs::altneu::AltNeu;

fn main() {

    // snag a filename to use for the input graph.
    //let filename = std::env::args().nth(1).unwrap();
    //let batching = std::env::args().nth(2).unwrap().parse::<usize>().unwrap();
    //let inspect = std::env::args().any(|x| x == "inspect");

    timely::execute(Configuration::Process(32), move |worker| {

        //let timer = std::time::Instant::now();
        //let graph = GraphMMap::new(&filename);

        //let peers = worker.peers();
        //let index = worker.index();

        let mut probe = Handle::new();

        let mut input = worker.dataflow::<usize,_,_>(|scope| {

            let (edges_input, edges) = scope.new_collection();

            // Graph oriented both ways, indexed by key.
            use differential_dataflow::operators::arrange::ArrangeByKey;
            let forward_key = edges.arrange_by_key();
            let reverse_key = edges.map(|(x,y)| (y,x))
                                   .arrange_by_key();

            // Graph oriented both ways, indexed by (key, val).
            use differential_dataflow::operators::arrange::ArrangeBySelf;
            let forward_self = edges.arrange_by_self();
            let reverse_self = edges.map(|(x,y)| (y,x))
                                    .arrange_by_self();

            // // Graph oriented both ways, counts of distinct vals for each key.
            // // Not required without worst-case-optimal join strategy.
            // let forward_count = edges.map(|(x,y)| x).arrange_by_self();
            // let reverse_count = edges.map(|(x,y)| y).arrange_by_self();

            // Q(a,b,c) :=  E1(a,b),  E2(b,c),  E3(a,c)
            let triangles = scope.scoped::<AltNeu<usize>,_,_>("DeltaQuery (Triangles)", |inner| {

                // Grab the stream of changes.
                let changes = edges.enter(inner);

                // Each relation we'll need.
                let forward_key_alt = forward_key.enter_at(inner, |_,_,t| AltNeu::alt(t.clone()));
                let reverse_key_alt = reverse_key.enter_at(inner, |_,_,t| AltNeu::alt(t.clone()));
                let forward_key_neu = forward_key.enter_at(inner, |_,_,t| AltNeu::neu(t.clone()));
                // let reverse_key_neu = reverse_key.enter_at(inner, |_,_,t| AltNeu::neu(t.clone()));

                // let forward_self_alt = forward_self.enter_at(inner, |_,_,t| AltNeu::alt(t.clone()));
                let reverse_self_alt = reverse_self.enter_at(inner, |_,_,t| AltNeu::alt(t.clone()));
                let forward_self_neu = forward_self.enter_at(inner, |_,_,t| AltNeu::neu(t.clone()));
                let reverse_self_neu = reverse_self.enter_at(inner, |_,_,t| AltNeu::neu(t.clone()));

                // For each relation, we form a delta query driven by changes to that relation.
                //
                // The sequence of joined relations are such that we only introduce relations
                // which share some bound attributes with the current stream of deltas.
                // Each joined relation is delayed { alt -> neu } if its position in the
                // sequence is greater than the delta stream.
                // Each joined relation is directed { forward, reverse } by whether the
                // bound variable occurs in the first or second position.

                let key1 = |x: &(u32, u32)| x.0;
                let key2 = |x: &(u32, u32)| x.1;

                use dogsdogsdogs::operators::propose;
                use dogsdogsdogs::operators::validate;

                //   dQ/dE1 := dE1(a,b), E2(b,c), E3(a,c)
                let changes1 = propose(&changes, forward_key_neu.clone(), key2.clone());
                let changes1 = validate(&changes1, forward_self_neu.clone(), key1.clone());
                let changes1 = changes1.map(|((a,b),c)| (a,b,c));

                //   dQ/dE2 := dE2(b,c), E1(a,b), E3(a,c)
                let changes2 = propose(&changes, reverse_key_alt, key1.clone());
                let changes2 = validate(&changes2, reverse_self_neu, key2.clone());
                let changes2 = changes2.map(|((b,c),a)| (a,b,c));

                //   dQ/dE3 := dE3(a,c), E1(a,b), E2(b,c)
                let changes3 = propose(&changes, forward_key_alt, key1.clone());
                let changes3 = validate(&changes3, reverse_self_alt, key2.clone());
                let changes3 = changes3.map(|((a,c),b)| (a,b,c));

                changes1.concat(&changes2).concat(&changes3).leave()
            });

            triangles
                //.filter(move |_| inspect)
                .inspect(|x| println!("\tTriangle: {:?}", x))
                .probe_with(&mut probe);

            edges_input
        });


        if worker.index() == 0 {
            for (i,v) in vec![(1,2), (2,3), (1,3), (3,4), (4,1), (3,1)].iter().enumerate() {
                input.insert(v.clone());
                input.advance_to(i+1);
                input.flush();
                while probe.less_than(&(i+1)) {
                    worker.step();
                }
                println!("worker0 at timestamp {}", i+1);
            }
        } else {
            input.advance_to(6);
            input.flush();
            while probe.less_than(&6) {
                worker.step();
            }
        }

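        /* Inserting all edges first (still advancing the input time after each
         * insert) and then running only this loop, instead of stepping after
         * every insert as above, yields the correct two triangles. */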
        /* while probe.less_than(&6) {
              worker.step();
        }*/

/*        let mut index = index;
        while index < graph.nodes() {
            input.advance_to(index);
            for &edge in graph.edges(index) {
                input.insert((index as u32, edge));
            }
            index += peers;
            input.advance_to(index);
            input.flush();
            if (index / peers) % batching == 0 {
                while probe.less_than(input.time()) {
                    worker.step();
                }
                println!("{:?}\tRound {} complete", timer.elapsed(), index);
            }
        }
*/
    }).unwrap();
}
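The alt/neu comments in the example can be modeled without timely or differential. The std-only sketch below is an illustration under stated assumptions, not the dogsdogsdogs implementation: `delta_triangles` and `run` are hypothetical helpers, and it assumes (following the comment that later relations are "delayed { alt -> neu }") that a relation entered at `neu` is visible to a delta only for strictly earlier timestamps, while one entered at `alt` also includes the delta's own timestamp. It also assumes insert-only input with no duplicate edges. In the example, the i-th edge (0-based) lands at timestamp i, since `advance_to(i+1)` follows each insert.

```rust
use std::collections::BTreeSet;

type Edge = (u32, u32);
type Triangle = (u32, u32, u32);

// Hypothetical helper (not the dogsdogsdogs API): triangles produced when the
// insert-only batch `delta` lands on top of `old`, the edges from earlier times.
// Assumes no edge in `delta` is already present in `old`.
fn delta_triangles(old: &BTreeSet<Edge>, delta: &[Edge]) -> Vec<Triangle> {
    // `new` models the "alt" view (includes this timestamp's updates);
    // `old` models the "neu" view (strictly earlier updates only).
    let mut new = old.clone();
    new.extend(delta.iter().cloned());
    let mut out = Vec::new();
    for &(x, y) in delta {
        // dQ/dE1 := dE1(a,b), E2(b,c), E3(a,c): E2 and E3 follow E1, so read `old`.
        {
            let (a, b) = (x, y);
            for &(_, c) in old.iter().filter(|e| e.0 == b) {
                if old.contains(&(a, c)) {
                    out.push((a, b, c));
                }
            }
        }
        // dQ/dE2 := dE2(b,c), E1(a,b), E3(a,c): E1 precedes (read `new`), E3 follows (read `old`).
        {
            let (b, c) = (x, y);
            for &(a, _) in new.iter().filter(|e| e.1 == b) {
                if old.contains(&(a, c)) {
                    out.push((a, b, c));
                }
            }
        }
        // dQ/dE3 := dE3(a,c), E1(a,b), E2(b,c): E1 and E2 precede E3, so read `new`.
        {
            let (a, c) = (x, y);
            for &(_, b) in new.iter().filter(|e| e.0 == a) {
                if new.contains(&(b, c)) {
                    out.push((a, b, c));
                }
            }
        }
    }
    out
}

// Hypothetical driver: one batch per timestamp; tags each triangle with its time.
fn run(batches: &[Vec<Edge>]) -> Vec<(Triangle, usize)> {
    let mut state = BTreeSet::new();
    let mut out = Vec::new();
    for (time, batch) in batches.iter().enumerate() {
        for tri in delta_triangles(&state, batch) {
            out.push((tri, time));
        }
        state.extend(batch.iter().cloned());
    }
    out
}

fn main() {
    let edges: Vec<Edge> = vec![(1, 2), (2, 3), (1, 3), (3, 4), (4, 1), (3, 1)];
    // One edge per timestamp, like the per-insert loop on worker 0.
    let per_insert: Vec<Vec<Edge>> = edges.iter().map(|&e| vec![e]).collect();
    // All six edges at a single timestamp.
    let batched = vec![edges.clone()];
    println!("per insert: {:?}", run(&per_insert));
    println!("batched:    {:?}", run(&batched));
}
```

The three rules telescope (new·new·new − old·old·old = dE·old·old + new·dE·old + new·new·dE), so each new triangle is emitted exactly once. Under these assumptions the per-insert schedule reports `(1, 2, 3)` at timestamp 2 (the triangle missing from the one-triangle run above) and `(3, 4, 1)` at timestamp 5 (matching the `((3, 4, 1), 5, 1)` line), while the batched schedule reports both at timestamp 0. Both schedules total two triangles, which suggests the nondeterminism lies in the operators rather than in the rule ordering.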

@frankmcsherry
Member

I will investigate, thanks! :D

@frankmcsherry
Member

Non-deterministic output is almost certainly a bug. I accept responsibility!

@frankmcsherry
Member

I'm pretty sure #184 fixes this. At least, it fixes what was certainly a bug and seems to cause your example to consistently produce correct output.

Thanks very much for the easily reproduced error, and the bug report (so embarrassing!).

I apologize that the stuff in this repo is still very DIY, and I'll try to bring a bit more helpful structure as I start using it more myself (we're running TPCH plans now, and will want to start thinking about these plans soon).

This was referenced Oct 29, 2024