From eed1bec542829e8ecb1202eec14176834dcaa9f2 Mon Sep 17 00:00:00 2001 From: frankmcsherry Date: Mon, 9 Sep 2024 14:31:52 +0000 Subject: [PATCH] =?UTF-8?q?Deploying=20to=20gh-pages=20from=20@=20TimelyDa?= =?UTF-8?q?taflow/timely-dataflow@f54796a83c9ff215a96c1ca1c264a850c9a3c0d5?= =?UTF-8?q?=20=F0=9F=9A=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- chapter_4/chapter_4_5.html | 21 +++++---------------- print.html | 21 +++++---------------- searchindex.js | 2 +- searchindex.json | 2 +- 4 files changed, 12 insertions(+), 34 deletions(-) diff --git a/chapter_4/chapter_4_5.html b/chapter_4/chapter_4_5.html index 73e8d49bc..bd575e954 100644 --- a/chapter_4/chapter_4_5.html +++ b/chapter_4/chapter_4_5.html @@ -182,23 +182,12 @@

The Data trait
#[derive(Clone)]
 struct YourStruct { .. }
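
For instance, a sketch of a custom type that satisfies this bound (the struct and its fields here are hypothetical, not from the book):

#[derive(Clone)]
struct Reading {
    sensor: u64,
    samples: Vec<f64>,
}

Since Data amounts to Clone plus 'static, owned fields like these need nothing beyond the Clone derive.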

The ExchangeData trait

-

The ExchangeData trait is more complicated, and is established in the communication/ module. There are two options for this trait, determined by whether or not you use the --bincode feature at compilation.

-
    -

    If you use --bincode then the trait is a synonym for

    +

    The ExchangeData trait is more complicated, and is established in the communication/ module. The trait is a synonym for

    Send+Sync+Any+serde::Serialize+for<'a>serde::Deserialize<'a>+'static

    where serde is Rust's most popular serialization and deserialization crate. A great many types implement these traits. If your types do not, you should add these decorators to their definition:

    #[derive(Serialize, Deserialize)]

    You must include the serde crate, and if not on Rust 2018 the serde_derive crate.
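
    Putting the pieces together, a minimal sketch of a type satisfying these bounds (the struct and its fields are illustrative):

    #[macro_use]
    extern crate serde_derive;
    extern crate serde;

    #[derive(Clone, Serialize, Deserialize)]
    struct YourStruct {
        name: String,
        values: Vec<u64>,
    }

    Deriving Clone as well keeps the type usable as Data in a dataflow stream.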

    -

    The downside to the --bincode flag is that deserialization will always involve a clone of the data, which has the potential to adversely impact performance. For example, if you have structures that contain lots of strings, timely dataflow will create allocations for each string even if you do not plan to use all of them.

    -

    If you do not use the --bincode feature, then the Serialize and Deserialize requirements are replaced by Abomonation, from the abomonation crate. This trait allows in-place deserialization, but is implemented for fewer types, and has the potential to be a bit scarier (due to in-place pointer correction).

    -

    Your types likely do not implement Abomonation by default, but you can similarly use

    -
    #[derive(Abomonation)]
    -

    You must include the abomonation and abomonation_derive crates for this to work correctly.

    -
+

The downside is that deserialization will always involve a clone of the data, which has the potential to adversely impact performance. For example, if you have structures that contain lots of strings, timely dataflow will create allocations for each string even if you do not plan to use all of them.

An example

Let's imagine you would like to play around with a tree data structure as something you might send around in timely dataflow. I've written the following candidate example:

extern crate timely;
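
Only the first line of the listing survives the diff context here; pieced together from the struct fragment and the printed output later in this patch, the candidate example looks roughly like the following sketch (a reconstruction, with Clone assumed so the type satisfies Data):

extern crate timely;

use timely::dataflow::operators::{ToStream, Inspect};

// A recursive tree type; Debug lets inspect print it.
#[derive(Debug, Clone)]
struct TreeNode<D> {
    data: D,
    children: Vec<TreeNode<D>>,
}

impl<D> TreeNode<D> {
    fn new(data: D) -> Self {
        TreeNode { data, children: Vec::new() }
    }
}

fn main() {
    timely::example(|scope| {
        (0 .. 10).map(|x| TreeNode::new(x))
                 .to_stream(scope)
                 .inspect(|x| println!("seen: {:?}", x));
    });
}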
@@ -278,7 +267,7 @@ 

Exchanging data } }

We get a new error, and not an especially helpful one. It says that it cannot find an exchange method, or more specifically that one exists but it doesn't apply to our type at hand. This is because the data need to satisfy the ExchangeData trait but do not. It would be better if this were clearer in the error messages, I agree.
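
For context, the failing call is an exchange on the tree stream, along these lines (the routing key shown is a plausible guess, not necessarily the book's exact choice):

(0 .. 10).map(|x| TreeNode::new(x))
         .to_stream(scope)
         .exchange(|node| node.data)  // needs TreeNode<u64>: ExchangeData, the bound that fails here
         .inspect(|x| println!("seen: {:?}", x));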

-

We can fix the problem two ways. First, if you would like to use bincode, then we update the source like so:

+

The fix is to update the source like so:

#[macro_use]
 extern crate serde_derive;
 extern crate serde;
@@ -288,8 +277,8 @@ 

Exchanging data data: D, children: Vec<TreeNode<D>>, }
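
Assembled from the fragments above, the updated definition would read something like this (a sketch; the Debug and Clone derives carry over from the earlier candidate):

#[macro_use]
extern crate serde_derive;
extern crate serde;

#[derive(Debug, Clone, Serialize, Deserialize)]
struct TreeNode<D> {
    data: D,
    children: Vec<TreeNode<D>>,
}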

-

and make sure to include the serde_derive and serde crates. Now when we run things (notice the --features flag) we see:

-
    Echidnatron% cargo run --example types --features bincode
+

and make sure to include the serde_derive and serde crates.

+
    Echidnatron% cargo run --example types
         Finished dev [unoptimized + debuginfo] target(s) in 0.07s
          Running `target/debug/examples/types`
     seen: TreeNode { data: 0, children: [] }
diff --git a/print.html b/print.html
index 1c42b11c2..ccd0b5a39 100644
--- a/print.html
+++ b/print.html
@@ -1848,23 +1848,12 @@ 

The Data trait
#[derive(Clone)]
 struct YourStruct { .. }

The ExchangeData trait

-

The ExchangeData trait is more complicated, and is established in the communication/ module. There are two options for this trait, determined by whether or not you use the --bincode feature at compilation.

-
    -

    If you use --bincode then the trait is a synonym for

    +

    The ExchangeData trait is more complicated, and is established in the communication/ module. The trait is a synonym for

    Send+Sync+Any+serde::Serialize+for<'a>serde::Deserialize<'a>+'static

    where serde is Rust's most popular serialization and deserialization crate. A great many types implement these traits. If your types do not, you should add these decorators to their definition:

    #[derive(Serialize, Deserialize)]

    You must include the serde crate, and if not on Rust 2018 the serde_derive crate.

    -

    The downside to the --bincode flag is that deserialization will always involve a clone of the data, which has the potential to adversely impact performance. For example, if you have structures that contain lots of strings, timely dataflow will create allocations for each string even if you do not plan to use all of them.

    -

    If you do not use the --bincode feature, then the Serialize and Deserialize requirements are replaced by Abomonation, from the abomonation crate. This trait allows in-place deserialization, but is implemented for fewer types, and has the potential to be a bit scarier (due to in-place pointer correction).

    -

    Your types likely do not implement Abomonation by default, but you can similarly use

    -
    #[derive(Abomonation)]
    -

    You must include the abomonation and abomonation_derive crates for this to work correctly.

    -
+

The downside is that deserialization will always involve a clone of the data, which has the potential to adversely impact performance. For example, if you have structures that contain lots of strings, timely dataflow will create allocations for each string even if you do not plan to use all of them.

An example

Let's imagine you would like to play around with a tree data structure as something you might send around in timely dataflow. I've written the following candidate example:

extern crate timely;
@@ -1944,7 +1933,7 @@ 

Exchanging data } }

We get a new error, and not an especially helpful one. It says that it cannot find an exchange method, or more specifically that one exists but it doesn't apply to our type at hand. This is because the data need to satisfy the ExchangeData trait but do not. It would be better if this were clearer in the error messages, I agree.

-

We can fix the problem two ways. First, if you would like to use bincode, then we update the source like so:

+

The fix is to update the source like so:

#[macro_use]
 extern crate serde_derive;
 extern crate serde;
@@ -1954,8 +1943,8 @@ 

Exchanging data data: D, children: Vec<TreeNode<D>>, }

-

and make sure to include the serde_derive and serde crates. Now when we run things (notice the --features flag) we see:

-
    Echidnatron% cargo run --example types --features bincode
+

and make sure to include the serde_derive and serde crates.

+
    Echidnatron% cargo run --example types
         Finished dev [unoptimized + debuginfo] target(s) in 0.07s
          Running `target/debug/examples/types`
     seen: TreeNode { data: 0, children: [] }
diff --git a/searchindex.js b/searchindex.js
index f05c422ee..2758193f2 100644
--- a/searchindex.js
+++ b/searchindex.js
@@ -1 +1 @@
-Object.assign(window.search, { /* single minified line: doc_urls plus a full-text documentStore for every chapter; truncated */ });