
Folding Data

Matt Bossenbroek edited this page Mar 5, 2014 · 2 revisions

What is folding?

When working with very large groupings of data, it is often useful to aggregate each group incrementally and in a distributed fashion.

For example, say we want to find the maximum value in a set of numbers. We don't have to collect all of the numbers on one machine to find the largest. We can first partition the data, find the max of each partition, and then find the global maximum.
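The partition-then-combine idea can be sketched in plain Clojure, without PigPen. This is only an illustration: the `partitions` vector is made-up data in which each inner vector stands in for the numbers held on one machine.

```clojure
;; Plain Clojure, no PigPen. Each inner vector stands in for one partition.
(def partitions [[3 9 4] [12 7] [5 1 8]])

;; Step 1: find the max of each partition independently.
(def partition-maxes (map #(apply max %) partitions))

;; Step 2: combine the per-partition results into the global max.
(def global-max (apply max partition-maxes))
```

Here `partition-maxes` is `(9 12 8)` and `global-max` is `12`, the same answer as taking the max over all of the numbers at once.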

We can use a similar mechanism to compute the average of a large set of numbers. We compute both the sum and count of each partition, combine those intermediate products, and finally divide to compute the average.
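As a rough sketch of the same idea in plain Clojure (not PigPen), each partition can produce a `[sum count]` pair that is merged before the final division. The names `sum-and-count` and `combine` and the sample data are illustrative assumptions.

```clojure
;; Plain Clojure, no PigPen. Each inner vector stands in for one partition.
(def partitions [[1 2 3] [4 5] [6]])

;; Each partition yields an intermediate product: [sum count].
(defn sum-and-count [xs]
  [(reduce + 0 xs) (count xs)])

;; Intermediate products combine by element-wise addition.
(defn combine [[s1 c1] [s2 c2]]
  [(+ s1 s2) (+ c1 c2)])

;; Merge all intermediate products, starting from the empty product [0 0].
(def total (reduce combine [0 0] (map sum-and-count partitions)))

;; Final step: divide the combined sum by the combined count.
(def average (let [[s c] total] (/ s c)))
```

Here `total` is `[21 6]` and `average` is `7/2`. Carrying the count alongside the sum matters: averaging the three per-partition averages (2, 9/2, and 6) would give 25/6 instead.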

Where to use folding

There are three places to use a fold operation. You can reduce the entire dataset using pig/fold:

(require '[pigpen.core :as pig]
         '[pigpen.fold :as fold])

=> (->>
     (pig/return [1 2 3 4])
     (pig/fold (fold/count))
     (pig/dump))

[4]

Or apply a fold to the groupings produced by pig/group-by:

=> (->>
     (pig/return [{:foo "a"} {:foo "b"} {:foo "b"} {:foo "c"} {:foo "c"} {:foo "c"}])
     (pig/group-by :foo
       {:fold (fold/count)})
     (pig/dump)
     set)

#{["a" 1] ["b" 2] ["c" 3]}

Or to the groupings produced by pig/cogroup:

(def foos (pig/return [1 2 2 3 3 3]))
(def bars (pig/return [1 1 1 2 2 3]))

=> (set (pig/dump
     (pig/cogroup [(foos :on identity, :fold (fold/sum))
                   (bars :on identity, :fold (fold/avg))]
       (fn [key f b] [key f b]))))

#{[1 1 1] [2 4 2] [3 9 3]}

In the last example, f and b are both numeric values rather than the usual sequences of grouped values. To perform multiple aggregations over the same group, use fold/juxt:

=> (->>
     (pig/return [{:foo 1, :bar 12}
                  {:foo 2, :bar 13}
                  {:foo 2, :bar 14}
                  {:foo 3, :bar 15}
                  {:foo 3, :bar 16}
                  {:foo 3, :bar 17}])
     (pig/group-by :foo
       {:fold (fold/juxt (fold/count)
                         (->> (fold/map :bar) (fold/avg))
                         (->> (fold/map :bar) (fold/sum)))})
     (pig/map (fn [[key [count avg sum]]] [key count avg sum]))
     (pig/dump)
     set)

#{[1 1 12 12] [2 2 27/2 27] [3 3 16 48]}

Composing fold operations

Each fold operation consists of 3 parts: pre-processing the input values, a parallel reduce, and post-processing the output value.

There are many pre-defined fold functions that compose easily, but they must be composed in a specific order: pre-processing functions first, then a reducer, and finally post-processing functions. The post-processing sequence operators can only be used with reducers that produce seqs. See below for which is which.

For example, you can do this:

(->> (fold/map :foo) (fold/sort) (fold/take 40))

But you cannot do this, because the pre-processing function fold/map comes after the reducer fold/count:

(->> (fold/count) (fold/map :foo))

In the first example, there's an implicit (vec) operation to reduce the values into a vector. It's actually doing this:

(->> (fold/map :foo) (fold/vec) (fold/sort) (fold/take 40))

The (vec) operation reduces into a vector, which treats the distributed sequence as a local sequence. It uses conj and concat to build the vector. This can be useful if you just want to project a single value from a larger data structure.
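To make the conj and concat steps concrete, here is a plain-Clojure sketch of what the composed fold does conceptually. It is not PigPen's implementation; the `chunks` data and the var names are assumptions for illustration, and the take count is shortened to 3 to fit the sample data.

```clojure
;; Plain Clojure, no PigPen. Each inner vector stands in for one chunk
;; of a distributed group.
(def chunks [[{:foo 5} {:foo 2}] [{:foo 9}] [{:foo 1} {:foo 7}]])

;; Pre-processing plus reduce: project :foo from each value, then conj
;; the projected values of each chunk into a vector.
(def partials
  (map (fn [chunk] (reduce conj [] (map :foo chunk))) chunks))

;; Combine: concat the partial vectors into a single local sequence.
(def combined (vec (apply concat partials)))

;; Post-processing: ordinary sequence operations on the local result.
(def result (->> combined sort (take 3)))
```

Here `combined` is `[5 2 9 1 7]` and `result` is `(1 2 5)`. Because every value ends up in one local vector, this approach only suits groups small enough to fit in memory.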

Pre-processing operations:

Reduce operations:

Post-processing operations:

And check out the full fold API

Custom fold functions

Use pigpen.fold/fold-fn to create a custom fold function. The pre function takes a sequence and returns a sequence. The combinef and reducef functions aggregate the collection: calling (combinef) with no arguments produces a seed value, reducef then aggregates chunks of data into intermediate products in parallel, and finally combinef is optionally called to combine those intermediate products. An optional post function is called on the final result. If combinef is not specified, reducef is used for both. The pre and post functions default to identity.
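The contract between these four functions can be simulated in plain Clojure. The sketch below does not call PigPen: `run-fold` is a hypothetical helper written for illustration, and it assumes the input is already split into chunks.

```clojure
;; Hypothetical helper, not part of PigPen: applies pre, reducef,
;; combinef, and post in the order described above.
(defn run-fold [pre combinef reducef post chunks]
  (->> chunks
       ;; Reduce each pre-processed chunk, starting from the seed (combinef).
       (map (fn [chunk] (reduce reducef (combinef) (pre chunk))))
       ;; Merge the intermediate products.
       (reduce combinef (combinef))
       ;; Post-process the final result.
       post))

;; A count: reducef ignores the value and increments the accumulator.
;; combinef is +, and (+) with no arguments returns the seed 0.
(def n
  (run-fold identity + (fn [acc _] (inc acc)) identity
            [[:a :b] [:c] [:d :e :f]]))

;; An average of the even values, using all four functions.
(def even-avg
  (run-fold (fn [xs] (filter even? xs))                  ; pre: seq -> seq
            (fn ([] [0 0])                               ; seed
                ([[s1 c1] [s2 c2]] [(+ s1 s2) (+ c1 c2)])) ; combine products
            (fn [[s c] x] [(+ s x) (inc c)])             ; reducef
            (fn [[s c]] (when (pos? c) (/ s c)))         ; post
            [[1 2 3] [4 5 6] [7 8]]))
```

Here `n` is `6` and `even-avg` is `5`. The average needs a separate combinef because its intermediate product, a `[sum count]` pair, has a different shape from the input values that reducef consumes.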