
Timely dataflow is a framework for data-parallel computation.

In this document, we'll learn how to construct timely dataflow computations. By the end, you'll be able to easily write and run computations on your personal computer, and seamlessly scale them up to multiple threads, processes, and computers.

Hello dataflow

Let's start with a simple example computation.

This example shows off some of the moving parts of timely dataflow. After seeing them for the first time, we'll walk through each of the parts to see what is going on, and get a rough idea for how these computations look.

extern crate timely;

use timely::dataflow::*;
use timely::dataflow::operators::{Input, Inspect};

fn main() {

    // initializes and runs a timely dataflow computation
    timely::execute_from_args(std::env::args(), |root| {

        // create a new input and inspect its output
        let mut input = root.dataflow(|scope| {
            let (input, stream) = scope.new_input();
            stream.inspect(|x| println!("hello {}", x));
            input
        });

        // introduce data and watch!
        for round in 0..10 {
            input.send(round);
            input.advance_to(round + 1);
            root.step();
        }

        // seal the input
        input.close();

        // finish off any remaining work
        while root.step() { }

    });
}

Ok. That isn't so bad. There are definitely some things that need explanation, but at least it all fits on one screen. Let's talk through each of the parts.

Preamble

At the top of the example we see the following:

extern crate timely;

use timely::dataflow::*;
use timely::dataflow::operators::{Input, Inspect};

The first line is Rust's way of saying that we have an external dependence on the timely crate, which is where all the code for timely dataflow exists. The other two lines explicitly import some types we will need in the program. I'll call them out as we get to them.

Main

A timely dataflow program is still just a Rust program, so it should have a main function somewhere in it. In our example, it doesn't really do anything other than make a call in to timely::execute_from_args:

fn main() {

    // initializes and runs a timely dataflow computation
    timely::execute_from_args(std::env::args(), |root| {

The call to timely::execute_from_args spins up the timely dataflow infrastructure. The method has two parameters: arguments for execution (the number of workers, where they live, etc) and what each worker should do once it is started up.
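
To see the per-worker nature of this closure, here is a minimal sketch (assuming the worker handle exposes index and peers methods reporting this worker's identity and the total number of workers, as in examples from this era of the library):

extern crate timely;

fn main() {
    // every worker executes this closure once the infrastructure is up
    timely::execute_from_args(std::env::args(), |root| {
        println!("worker {} of {} started", root.index(), root.peers());
    });
}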

Worker

We now get in to writing the code each worker should execute. I like to think of this as just an extension of main, where I'm writing as if for a single-threaded computation and not stressing about the whole "deployed across a cluster of machines" business.

The worker logic is passed root as an argument. This argument wraps up some information about the worker and its environment, but for our purposes is the blank canvas on which we are going to define a dataflow computation.

Constructing a dataflow

The first thing we do is call dataflow, which lets us define a group of dataflow operators in a new dataflow scope. In this group we place an input, inspect its output stream, and return a handle to the input back up.

        // create a new input and inspect its output
        let mut input = root.dataflow(|scope| {
            let (input, stream) = scope.new_input();
            stream.inspect(|x| println!("hello {}", x));
            input
        });

Importantly, we haven't actually done any data processing yet. We've only described a dataflow computation in which data pushed in to input flow out through stream and in to the inspect logic.

Executing a dataflow

With a dataflow graph constructed, we can now start running it.

        // introduce data and watch!
        for round in 0..10 {
            input.send(round);
            input.advance_to(round + 1);
            root.step();
        }

Recall that input was our handle to the input stream. It takes data from outside the dataflow and introduces it in to the dataflow computation. Here we repeatedly use the send method to introduce records, the numbers 0 through 9.

After introducing each number we call advance_to. This is part of timely dataflow's advanced technology: it signals to the system that we are not going to produce any more data with a timestamp less than or equal to round. This would be very useful if the dataflow we constructed needed to know when a group of records was complete, for example if we wanted to count the records submitted in each round. We aren't doing that here, so it is a little hard to justify (we could have left it out, but advance_to is also what flushes internal buffers in input).

The call to root.step() tells timely dataflow to do some work. Internally, it gives each operator a chance to do some computation, which means sending records and applying operator logic. For example, this is where the println! in inspect will run. We could skip this step as well, but records would start to back up in the system, and in a streaming system it is generally healthiest to keep records moving.
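
Nothing requires exactly one record per step, by the way. As a small (purely illustrative) variant, we could introduce a batch of records each round before advancing the timestamp:

        // a sketch: a batch of records per round, one step per round
        for round in 0..10 {
            for i in 0..5 {
                input.send(5 * round + i);
            }
            input.advance_to(round + 1);
            root.step();
        }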

Completing a dataflow

The last thing we do is shut down the dataflow computation.

        // seal the input
        input.close();

        // finish off any remaining work
        while root.step() { }

    });
}

The call to input.close() indicates that we have no more data to supply through this input. It allows timely dataflow to reach the conclusion that we may be "done". To reach this conclusion, we may need to step the computation several more times (as records may still be in flight in the system). When step() returns false, it means that all inputs are closed, all messages are processed, and no operators have outstanding work.

Saying "hello"

To run the hello dataflow computation, we need a Rust project. The easiest way to do this is with Rust's cargo tool for package management. In some directory, type

% cargo new hello --bin
% cd hello

This directory contains a Cargo.toml file, which you will need to edit to look like the following (the [package] section should already be there for you):

% cat Cargo.toml
[package]
name = "hello"
version = "0.1.0"
authors = ["Your name <[email protected]>"]

[dependencies]
timely="*"

If you make sure to put the code we've walked through as the contents of src/main.rs, you should now be able to run your program:

% cargo run
Running `target/debug/hello`
hello 0
hello 1
hello 2
hello 3
hello 4
hello 5
hello 6
hello 7
hello 8
hello 9
% 

Welcome to timely dataflow!

Scaling and distributing

Your timely dataflow program can be run with multiple worker threads, and on multiple computers. Whenever you move data between operators, the system will take care of getting the data to the correct worker.

To run a program with multiple worker threads, use the -w or --workers option followed by a number of workers. This many threads will start up, and each will construct the dataflow graph you indicated.
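
For example, to run the hello computation with two worker threads in a single process:

% cargo run -- -w 2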

To run a program with multiple processes, use the -n or --processes option followed by a number of processes. You will actually need to launch the processes yourself, using the same -w and -n options for each, but providing each with a different -p or --process argument (from zero up to the number of processes minus one).

To run a program on multiple computers, you start multiple processes in the same way as above, but you must also use -h or --hostfile to indicate a text file whose lines are of the form

host:port

which indicate where each of the processes (in order) may be found. For example, to start up four processes with two workers each across three machines, you might plan a hosts.txt file like so:

% cat hosts.txt
host1:port1
host1:port2
host2:port3
host3:port4

and then execute the following on each of the machines:

host1% cargo run -- -w 2 -n 4 --hostfile hosts.txt -p 0
host1% cargo run -- -w 2 -n 4 --hostfile hosts.txt -p 1
host2% cargo run -- -w 2 -n 4 --hostfile hosts.txt -p 2
host3% cargo run -- -w 2 -n 4 --hostfile hosts.txt -p 3

This won't do anything especially interesting with the hello program we've written, because it doesn't exchange any data (each worker locally produces and consumes its own data). Let's make something more interesting!

Exchanging data between workers

Let's write a more interesting program that moves data between workers.

We will use essentially the same program as before, but we will introduce the exchange operator, which moves data between workers based on a function applied to each record: the function must produce an integer, and we use that integer to identify a worker (taking it modulo the number of workers if it is too large). Because we are exchanging integer records, we will just use the record itself to indicate the target worker.

extern crate timely;

use timely::dataflow::*;
use timely::dataflow::operators::*;

fn main() {

    // initializes and runs a timely dataflow computation
    timely::execute_from_args(std::env::args(), |root| {

        // create a new input and inspect its output
        let mut input = root.dataflow(|scope| {
            let (input, stream) = scope.new_input();
            stream.exchange(|&x| x)
                  .inspect(|x| println!("hello {}", x));
            input
        });

        // introduce data and watch!
        for round in 0..10 {
            input.send(round);
            input.advance_to(round + 1);
            root.step();
        }

        // seal the input
        input.close();

        // finish off any remaining work
        while root.step() { }

    });
}

If we run this program with two processes on the same computer, we should see

% cargo run -- -n 2 -p 0
Running `target/debug/hello`
hello 0
hello 0
hello 2
hello 2
hello 4
hello 4
hello 6
hello 6
hello 8
hello 8
% 

and

% cargo run -- -n 2 -p 1
Running `target/debug/hello`
hello 1
hello 1
hello 3
hello 3
hello 5
hello 5
hello 7
hello 7
hello 9
hello 9
% 

Each worker introduces a copy of the data, and then after the shuffle we have all the even numbers at worker zero and the odd numbers at worker one.
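
The same shuffle works just as well with threads as with processes; running a single process with two worker threads should print the same lines, interleaved in one terminal:

% cargo run -- -w 2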

More general operators

Although exchanging data can be great fun (you just had fun, right?), we would rather write higher-level operators that do the exchanging for us, and then do something interesting.

There is a fairly handy general operator, unary, which can implement any unary operator (one input, one output) using logic that you provide. The unary operator comes in two flavors, unary_stream and unary_notify, distinguished by whether your operator cares about progress in the data stream or not.

unary_stream

The unary_stream operator is simpler, so let's start there. It takes three parameters:

  1. Instructions on whether and how its input should be exchanged.
  2. A descriptive name.
  3. Instructions for how to use its input to produce its output.

Let's just write an example:

// this is actually the implementation of the `exchange` operator!
stream.unary_stream(Exchange::new(|&x| x), "Example", |input, output| {
    while let Some((time, data)) = input.next() {
        output.session(time).give_content(data);
    }
});

The first argument is a new instance of Exchange. This is a type that describes how data should be exchanged, and it just takes the same type of function we used in the exchange method earlier. The only other valid type is Pipeline, which says that we shouldn't bother exchanging data. There are many natural extensions that we might see in the future, like Broadcast, or RoundRobin, or others!
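
For example, a pass-through operator that leaves records wherever they already are could use Pipeline in place of Exchange. Here is a minimal sketch reusing the logic above (the name "PassThrough" is just illustrative):

// a sketch: the same forwarding logic, without exchanging any data
stream.unary_stream(Pipeline, "PassThrough", |input, output| {
    while let Some((time, data)) = input.next() {
        output.session(time).give_content(data);
    }
});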

The second argument is the descriptive name. You can write whatever you like here.

The final argument is a Rust closure, where we write some code that is passed two parameters, input and output, and then executed. I have written code that pulls each message (a time and a batch of data) from the input and sends it out through the output. The only thing this operator does is exchange its input.
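
Of course, we can also transform records before sending them. A minimal sketch of an operator that doubles each record might look like the following (the name "Double" is illustrative, and I'm assuming we can drain the input batch and use the session's give method, per the patterns above):

// a sketch: double each record before passing it along
stream.unary_stream(Pipeline, "Double", |input, output| {
    while let Some((time, data)) = input.next() {
        let mut session = output.session(time);
        for datum in data.drain(..) {
            session.give(2 * datum);
        }
    }
});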

unary_notify

The unary_notify operator is like unary_stream, but it adds the ability to request and receive notifications from the timely dataflow system about when the operator will no longer receive messages bearing an indicated timestamp.

The operator requires four parameters, the first two of which are the same as with unary_stream. The third is a vector of any initial notification requests, a Vec<T>. Finally, the operator logic is now specified as a closure of three parameters: an input, an output, and a barrier object that supports notification.

  1. Instructions on whether and how its input should be exchanged.
  2. A descriptive name.
  3. A vector of initial notification requests.
  4. Instructions for how to use its input to produce its output, plus a barrier.

Let's write an example:

// behave like our `unary_stream` example ...
stream.unary_notify(Exchange::new(|&x| x), "Example", vec![], |input, output, barrier| {
    while let Some((time, data)) = input.next() {
        barrier.notify_at(time);
        output.session(time).give_content(data);
    }
    // ... except print out when each time completes.
    while let Some((time, _count)) = barrier.next() {
        println!("time {:?} complete!", time);
    }
});