parallel

parallel is a library of parallel-enabled (not distributed) Clojure functions. Some are designed to emulate existing functions in the standard library, sometimes as drop-in replacements, sometimes with very different semantics. If you see a function listed below in your project, or if you use transducers, chances are you can speed up your application using parallel. As with any library claiming to speed up your code, too many variables influence performance for them to be tested in isolation: please keep a benchmarking tool ready and measure each change.

The library also provides additional transducers (not necessarily for parallel use) and supporting utilities. The functions documented below have been tested and benchmarked and are ready to use. Please report any issues or ideas for improvement; I'll be happy to help.

Functions and macros:

Name Description
p/let Parallel let bindings.
p/args Invoke a function with arguments evaluated in parallel.
p/and Like core/and with expressions evaluated in parallel.
p/or Like core/or with arguments evaluated in parallel.
p/do Parallel do forms.
p/doto Parallel doto forms.
p/slurp Parallel slurping files.
p/count Transducer-aware parallel core/count.
p/frequencies Parallel core/frequencies.
p/group-by Parallel core/group-by.
p/update-vals Updates values in a map in parallel.
p/sort Parallel core/sort.
p/external-sort Memory efficient, file-based, parallel merge-sort.
p/fold Transducer-aware r/fold.
p/transduce Parallel version of transduce based on p/fold.
p/process-folder Process the files in a folder in parallel.
p/min and p/max Parallel core/min and core/max functions.
p/distinct Parallel version of core/distinct.
p/pmap Like core/pmap but running on a given number n of threads.
p/amap Parallel array transformation.
p/armap Parallel array reversal with transformation.

Transducers:

Name Description
xf/interleave Like core/interleave, transducer version.
xf/pmap Like core/pmap, transducer version.
xf/identity Alternative identity transducer to core/identity.

In the pipeline:

Name Description
p/split-by Splitting transducer based on contiguous elements.

How to use the library

All functions are available through the parallel.core namespace. Pure transducers are in parallel.xf. Add the following to your project dependencies:

[parallel "0.10"]

Require at the REPL with:

(require '[parallel.core :as p]
         '[parallel.xf :as xf])

Or in your namespace as:

(ns mynamespace
  (:require [parallel.core :as p]
            [parallel.xf :as xf]))

API Docs

p/let

p/let works like clojure.core/let but evaluates its binding expressions in parallel:

(time
  (p/let [a (Thread/sleep 1000)
          b (Thread/sleep 1000)
          c (Thread/sleep 1000)]
    (= a b c)))
;; "Elapsed time: 1002.519823 msecs"

Don't use p/let if:

  • The expressions have dependencies. p/let cannot resolve cross-references between expressions and will throw an exception.
  • The expressions are trivial. In this case the cost of thread orchestration outweighs the benefit of parallel execution. Good candidates for parallelization are, for example, independent network API calls, file system calls or other non-trivial computations.
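The following is a minimal sketch of a parallel let built on core futures. Each binding expression starts in its own future, and the body runs only after every result has been dereferenced. The macro name plet-sketch is hypothetical, and this is not necessarily how p/let is implemented. It supports only plain symbols (no destructuring). An expression also cannot refer to an earlier binding, because that symbol is not yet bound when the future starts, which mirrors the restriction above.

```clojure
(defmacro plet-sketch
  "Hypothetical parallel let: every binding expression runs in its own
  future; the body is evaluated after all futures are dereferenced.
  Only plain symbols are supported as binding names (no destructuring)."
  [bindings & body]
  (let [pairs (partition 2 bindings)
        fsyms (vec (repeatedly (count pairs) #(gensym "fut")))]
    `(let [~@(mapcat (fn [fsym [_ expr]] [fsym `(future ~expr)]) fsyms pairs)
           ~@(mapcat (fn [fsym [sym _]] [sym `(deref ~fsym)]) fsyms pairs)]
       ~@body)))

;; The three sleeps overlap, so this should take about 1 second rather than 3.
(time
  (plet-sketch [a (do (Thread/sleep 1000) 1)
                b (do (Thread/sleep 1000) 2)
                c (do (Thread/sleep 1000) 3)]
    (+ a b c)))
```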

p/args

p/args calls a function with arguments that are evaluated in parallel:

(time
  (p/args +
    (do (Thread/sleep 1000) 1)
    (do (Thread/sleep 1000) 2)
    (do (Thread/sleep 1000) 3)))
;; "Elapsed time: 1000.613791 msecs"
;; 6

p/args improves performance when the arguments to a function require non-trivial evaluation, for example when their side effects involve input/output. p/args should not be used when there is any dependency between the arguments (which can also arise as a side effect of their evaluation).
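The same idea applies to function arguments. This sketch uses a hypothetical pargs-sketch macro, not the library's code. It starts each argument expression in its own future, waits for all of them, and then applies the function to the results in their original order.

```clojure
(defmacro pargs-sketch
  "Hypothetical sketch: evaluates each argument expression in its own
  future, waits for all of them, then applies f to the results in order."
  [f & args]
  `(let [futs# [~@(map (fn [arg] `(future ~arg)) args)]]
     (apply ~f (map deref futs#))))

;; All three arguments are already running before the first deref.
(pargs-sketch + (do (Thread/sleep 100) 1) (do (Thread/sleep 100) 2) 3)
```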

p/and

p/and works similarly to core/and but the expressions in the body are evaluated in parallel:

(let [x 11]
  (if (p/and
        (odd? x)
        (number? x)
        (even? (count (str x))))
    "true"
    "false"))
;; "true"

There are a couple of important differences to consider:

  • Unlike core/and, p/and does not short-circuit: even if the first expression is false, p/and still evaluates all the other expressions. p/and can therefore perform worse when the first expression is very likely to be false and evaluates faster than the others:
(time (let [x 11]
  (if (and
        (do (Thread/sleep 100) (even? x))
        (do (Thread/sleep 1000) (number? x))
        (do (Thread/sleep 1000) (even? (count (str x)))))
    "true"
    "false")))
;; "Elapsed time: 104.481973 msecs"
;; "false"

(time (let [x 11]
  (if (p/and
        (do (Thread/sleep 100) (even? x))
        (do (Thread/sleep 1000) (number? x))
        (do (Thread/sleep 1000) (even? (count (str x)))))
    "true"
    "false")))
;; "Elapsed time: 1001.878881 msecs"
;; "false"
  • You should not rely on the evaluation order of the expressions. For instance, the following idiomatic use of core/and might not work with p/and:
(require '[clojure.java.io :as io])

(def file 1)

(p/and
  (instance? java.io.File file)
  (.exists file)
  (.isDirectory file))
;; IllegalArgumentException No matching field found: exists for class java.lang.Long
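Both differences follow from running every expression concurrently. The hypothetical pand-sketch macro below shows this. It is not p/and's implementation, just an illustration of a non-short-circuiting parallel and. It waits for every result before reducing them the way core/and would: it returns the first falsey value, or otherwise the last value.

```clojure
(defmacro pand-sketch
  "Hypothetical non-short-circuiting parallel `and`: all expressions run
  concurrently and every result is awaited before the answer is computed.
  Like core/and, it returns the first falsey value or else the last value."
  [& exprs]
  `(let [results# (mapv deref [~@(map (fn [e] `(future ~e)) exprs)])]
     (reduce (fn [_# v#] (if v# v# (reduced v#))) true results#)))

;; Both swap! calls run even though the first expression is already false.
(def hits (atom 0))
(pand-sketch false (swap! hits inc) (swap! hits inc))
;; @hits => 2
```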

p/or

p/or works similarly to core/or but the expressions in the body are evaluated in parallel:

(let [x 11]
  (if (p/or
        (odd? x)
        (string? x)
        (double? x))
    "true"
    "false"))
;; "true"

Like p/and, p/or does not short-circuit, so it can take longer than sequential core/or. This happens, for example, when the first expression is true: p/or cannot return until all the other expressions are evaluated. Also like p/and, p/or should not be used if there is an implicit order between the expressions. For example, (p/or (nil? s) (.isEmpty s)) might throw a NullPointerException when s is nil, while the same expression with core/or would safely return true.

p/do

p/do works like core/do, grouping the evaluation of multiple forms (presumably for side effects). It returns the last evaluated form:

(def counter (atom 0))

(p/do
  (swap! counter inc)
  (println "counter incremented" @counter)
  (map inc (range 1000))
  (println "more stuff to do"))
;; counter incremented more stuff to do0

As demonstrated by the output, there is no guarantee about the order in which the forms are evaluated, so the use of p/do should be restricted to side effecting forms without an ordering requirement.

p/doto

Similarly to core/doto, p/doto threads an expression into the following forms (presumably for side effects) and returns the initial expression at the end. Threading through forms happens in parallel so:

  • The forms following the threaded expression can produce their side effects in any order.
  • If side effects are against a shared collection, the collection has to be thread safe (or one of the Clojure ref types).

The following example uses a ConcurrentLinkedQueue to add items concurrently:

(import 'java.util.concurrent.ConcurrentLinkedQueue)

(p/doto
  (ConcurrentLinkedQueue.)
  (.add 1)
  (.add 2))

;; #object[java.util.concurrent.ConcurrentLinkedQueue 0x5fbc5177 "[1, 2]"]

Like the other parallel macros, p/doto is effective when the operations it performs are non-trivial. For example, the following expression executes in about a quarter of the time:

(require '[clojure.xml :as xml])
(import 'java.util.concurrent.ConcurrentHashMap)

(defn heavy-stuff [n] (Thread/sleep 1000) n)

(time
  (doto (ConcurrentHashMap.)
    (.put :a (heavy-stuff 1))
    (.put :b (heavy-stuff 2))
    (.put :c (heavy-stuff 3))
    (.put :d (heavy-stuff 4))))
;; "Elapsed time: 4009.656834 msecs"
;; {:d 4, :b 2, :c 3, :a 1}

(time
  (p/doto (ConcurrentHashMap.)
    (.put :a (heavy-stuff 1))
    (.put :b (heavy-stuff 2))
    (.put :c (heavy-stuff 3))
    (.put :d (heavy-stuff 4))))
;; "Elapsed time: 1006.563343 msecs"
;; {:d 4, :b 2, :c 3, :a 1}
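Here is a minimal sketch of a parallel doto, assuming every form is a list such as (.put :a 1). The macro name pdoto-sketch is hypothetical. It evaluates the target expression once, runs each form in its own future with the target threaded in as the first argument, waits for all of them, and returns the target. As noted above, the target must be thread safe.

```clojure
(import 'java.util.concurrent.ConcurrentHashMap)

(defmacro pdoto-sketch
  "Hypothetical parallel doto: evaluates x once, runs each form with x
  threaded in as the first argument in its own future, waits for all of
  them and returns x. Assumes every form is a list, e.g. (.put :a 1)."
  [x & forms]
  (let [gx (gensym "x")]
    `(let [~gx ~x]
       (run! deref [~@(map (fn [[op & args]] `(future (~op ~gx ~@args))) forms)])
       ~gx)))

(defn heavy-stuff [n] (Thread/sleep 1000) n)

;; The two puts overlap, so this should take about 1 second rather than 2.
(time
  (pdoto-sketch (ConcurrentHashMap.)
    (.put :a (heavy-stuff 1))
    (.put :b (heavy-stuff 2))))
```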

p/slurp

p/slurp loads the content of a file in parallel. Unlike core/slurp, it only supports local files (no URLs or other input streams):

(import 'java.io.File)
(take 10 (.split (p/slurp (File. "test/words")) "\n"))
;; ("A" "a" "aa" "aal" "aalii" "aam" "Aani" "aardvark" "aardwolf" "Aaron")

p/slurp accepts an optional function to interpret the loaded byte array as something other than a string, for example to read an entry from a zip file:

(import '[java.io File ByteArrayInputStream]
        '[java.util.zip ZipFile ZipInputStream])

(defn filenames-in-zip [bytes]
  (let [z (ZipInputStream. (ByteArrayInputStream. bytes))]
    (.getName (.getNextEntry z))))

(p/slurp (File. "target/parallel-0.7.jar") filenames-in-zip)
;; "META-INF/MANIFEST.MF"

When p/*mutable* is bound to true, the transformation step is skipped altogether and the raw byte array is returned:

(import 'java.io.File)
(binding [p/*mutable* true] (p/slurp (File. "test/words")))
;; #object["[B" 0x705709a4 "[B@705709a4"]

p/slurp performs better than core/slurp on large files (over about 500KB). Here is, for example, a benchmark loading a 2.4MB file:

(require '[criterium.core :refer [bench]])
(import 'java.io.File)
(let [fname "test/words" file (File. fname)] (bench (slurp file))) ; 8.84ms
(let [fname "test/words" file (File. fname)] (bench (p/slurp file))) ; 2.87ms
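A common way to read a local file in parallel is to split it into byte ranges and let several threads perform positional reads into disjoint slices of a single byte array. The sketch below shows that technique with java.nio's FileChannel, whose read(ByteBuffer, long) does not move the channel's own position. The function name is hypothetical, and this is not necessarily how p/slurp is implemented. Decoding the bytes is left to the caller, much like the optional transformation function that p/slurp accepts.

```clojure
(import '[java.io File]
        '[java.nio ByteBuffer]
        '[java.nio.channels FileChannel]
        '[java.nio.file OpenOption StandardOpenOption])

(defn pslurp-bytes-sketch
  "Hypothetical sketch: reads `file` into a byte array using `nchunks`
  concurrent positional reads, each filling its own slice of the array."
  [^File file nchunks]
  (with-open [ch (FileChannel/open (.toPath file)
                                   (into-array OpenOption [StandardOpenOption/READ]))]
    (let [size  (.size ch)
          out   (byte-array size)
          step  (max 1 (long (Math/ceil (/ size (double nchunks)))))
          ;; doall starts all futures before any of them is dereferenced.
          tasks (doall
                  (for [start (range 0 size step)]
                    (future
                      ;; buf is a view over the slice [start, start + len) of out.
                      (let [len (min step (- size start))
                            buf (ByteBuffer/wrap out (int start) (int len))]
                        ;; Positional reads leave the channel position untouched,
                        ;; so several threads can read at the same time.
                        (loop [pos start]
                          (when (.hasRemaining buf)
                            (let [n (.read ch buf (long pos))]
                              (when (pos? n)
                                (recur (+ pos n))))))))))]
      (run! deref tasks)
      out)))

;; Decoding the bytes as UTF-8 text:
;; (String. ^bytes (pslurp-bytes-sketch (File. "test/words") 4) "UTF-8")
```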

p/count

p/count can speed up counting when non-trivial transformations are involved. It takes a composition of transducers and the collection to count, applies the transducers to the collection and counts the resulting elements (1.2M in this case):

(def xform
  (comp
    (filter odd?)
    (map inc)
    (map #(mod % 50))
    (mapcat range)
    (map str)))

(p/count xform (range 100000))
;; 1200000

p/count is eager: it transforms "coll" into a vector if it's not already a foldable collection (vectors, maps or reducers/Cat objects). Use p/count only if the transformations change the number of elements in the input collection; otherwise core/count will likely outperform p/count in most situations.

p/count supports stateful transducers. In this example we drop 6250 elements from each of the 32 chunks (32 is the default number of chunks p/count operates on, so 32 x 6250 = 200k elements are removed):

(def xform
  (comp
    (filter odd?)
    (map inc)
    (map #(mod % 50))
    (mapcat range)
    (map str)
    (drop 6250)))

(p/count xform (range 100000))
;; 1000000

See bcount.clj for additional benchmarks.
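The per-chunk semantics described above can be reproduced with core functions alone. The hypothetical chunked-count-sketch below is not the library's implementation. It splits the input into n equal-sized chunks with partition-all, runs the transducers on each chunk in its own future and adds up the partial counts. Every chunk gets a fresh transducer stack, so (drop 6250) removes 6250 items from each of the 32 chunks. This matches the 1000000 figure above for an input that divides evenly into 32 chunks.

```clojure
(defn chunked-count-sketch
  "Hypothetical sketch: counts the items produced by `xform` when it is
  applied independently to `n` chunks of `coll`, one future per chunk."
  [n xform coll]
  (let [v    (vec coll)
        size (max 1 (long (Math/ceil (/ (count v) (double n)))))]
    (->> (partition-all size v)
         ;; transduce instantiates a fresh transducer stack per chunk,
         ;; so stateful transducers keep separate state for every chunk.
         (mapv (fn [chunk]
                 (future (transduce xform (completing (fn [acc _] (inc acc))) 0 chunk))))
         (map deref)
         (reduce + 0))))

(def xform
  (comp (filter odd?) (map inc) (map #(mod % 50)) (mapcat range)))
```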

p/frequencies

p/frequencies works like core/frequencies but executes in parallel. It takes optional transducers (stateless or stateful) to apply to coll before the frequencies are calculated. It does not support nil values. The following is the classic word-frequencies example:

(require '[clojure.string :as s])
(def war-and-peace "http://www.gutenberg.org/files/2600/2600-0.txt")
(def book (slurp war-and-peace))
(let [freqs (p/frequencies (re-seq #"\S+" book) (map s/lower-case))]
  (take 5 (sort-by last > freqs)))
;; (["the" 34258] ["and" 21396] ["to" 16500] ["of" 14904] ["a" 10388])

(require '[criterium.core :refer [quick-bench]])
(quick-bench (p/frequencies (re-seq #"\S+" book) (map s/lower-case))) ;; 111ms
(quick-bench (frequencies (map s/lower-case (re-seq #"\S+" book)))) ;; 349ms
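Here is a minimal sketch of the split-and-merge idea, using a hypothetical pfrequencies-sketch helper built only on core functions. Each chunk is counted in its own future, optionally after applying transducers, and the partial maps are combined with merge-with +. It illustrates the technique rather than reproducing the library's code. In particular, it does not reject nil values.

```clojure
(defn pfrequencies-sketch
  "Hypothetical sketch: computes frequencies for `nchunks` slices of `coll`
  in parallel (one future each), applying optional transducers first,
  then merges the partial maps by adding up the counts."
  [nchunks coll & xforms]
  (let [xf       (apply comp xforms) ; (comp) with no args is identity
        v        (vec coll)
        size     (max 1 (long (Math/ceil (/ (count v) (double nchunks)))))
        count-rf (completing (fn [m x] (assoc m x (inc (get m x 0)))))]
    (->> (partition-all size v)
         (mapv #(future (transduce xf count-rf {} %)))
         (map deref)
         (reduce #(merge-with + %1 %2) {}))))
```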

p/group-by

p/group-by is similar to clojure.core/group-by, but the grouping happens in parallel. Here is an example that searches for the most frequent anagrams in a large text:

(require '[clojure.string :as s])

(def war-and-peace
  (s/split (slurp "http://gutenberg.org/files/2600/2600-0.txt") #"\W+"))

(def anagrams
  (p/group-by sort war-and-peace (map s/lower-case)))

(->> anagrams
  (map (comp distinct second))
  (sort-by count >)
  first)

;; ("stop" "post" "spot" "pots" "tops")

p/group-by takes an optional list of transducers to apply to the items in coll before generating the groups; the example uses one to lower-case each word. Note that, unlike clojure.core/group-by:

  • The order of the items in each value vector can change between runs (this can be a problem or not, depending on your use case).
  • It does not support nil values in the input collection.

p/group-by is generally 2x-5x faster than clojure.core/group-by:

(require '[criterium.core :refer [quick-bench]])

;; with transformation (which boosts p/group-by even further)
(quick-bench (group-by sort (map s/lower-case war-and-peace)))   ;; 957ms
(quick-bench (p/group-by sort war-and-peace (map s/lower-case))) ;; 259ms

;; fair comparison without transformations
(quick-bench (group-by sort war-and-peace))   ;; 936ms
(quick-bench (p/group-by sort war-and-peace)) ;; 239ms

A further boost can be achieved by avoiding conversion back to immutable data structures:

(quick-bench
  (binding [p/*mutable* true]
    (p/group-by sort war-and-peace (map s/lower-case)))) ;; 168ms

When p/*mutable* is bound to true, p/group-by returns a Java ConcurrentHashMap with ConcurrentLinkedQueue values. Both are easy to work with from Clojure:

(def anagrams
  (binding [p/*mutable* true]
    (p/group-by sort war-and-peace (map s/lower-case))))

(distinct (into [] (.get anagrams (sort "stop"))))
;; ("post" "spot" "stop" "tops" "pots")
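The mutable result type suggests one way to group concurrently. The following sketch uses a hypothetical concurrent-group-by-sketch function. It fills a ConcurrentHashMap of ConcurrentLinkedQueues from several futures, and computeIfAbsent makes sure each key's queue is created exactly once. This illustrates the technique under those assumptions; it is not p/group-by's implementation. Both Java classes reject null, so a sketch like this cannot hold nil keys or items either.

```clojure
(import '[java.util.concurrent ConcurrentHashMap ConcurrentLinkedQueue]
        '[java.util.function Function])

(defn concurrent-group-by-sketch
  "Hypothetical sketch: groups `coll` by `f` into a ConcurrentHashMap whose
  values are ConcurrentLinkedQueues, with `nthreads` futures adding items
  concurrently. Both Java classes reject null, so nil keys or items throw."
  [f coll nthreads]
  (let [m         (ConcurrentHashMap.)
        new-queue (reify Function
                    (apply [_ k] (ConcurrentLinkedQueue.)))
        v         (vec coll)
        size      (max 1 (long (Math/ceil (/ (count v) (double nthreads)))))]
    (->> (partition-all size v)
         (mapv (fn [chunk]
                 (future
                   (doseq [x chunk]
                     ;; computeIfAbsent atomically creates the queue for a new key.
                     (.add ^ConcurrentLinkedQueue (.computeIfAbsent m (f x) new-queue) x)))))
         (run! deref))
    m))

;; Converting back to Clojure data, e.g. a map of vectors:
;; (into {} (map (fn [[k q]] [k (vec q)])) (concurrent-group-by-sketch first words 4))
```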

p/update-vals

p/update-vals updates the values of a map in parallel. Going back to the p/group-by example of the most frequent anagrams, we can compute the distinct words for each key of the map in parallel ("anagrams" is the map resulting from applying p/group-by to a large text):

(first anagrams)
;; [(\a \d \e \e \h \t) ["heated" "heated" "heated" "heated" "heated" "heated" "heated" "heated"]]

(first (p/update-vals anagrams distinct))
;; [(\a \d \e \e \h \t) ("heated")]

As with other functions in the library, p/update-vals can be made faster by skipping the conversion back into an immutable data structure:

(time (dorun (p/update-vals anagrams distinct)))
;; "Elapsed time: 18.462031 msecs"
(time (dorun (binding [p/*mutable* true] (p/update-vals anagrams distinct))))
;; "Elapsed time: 9.908815 msecs"
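Here is a sketch of a parallel update-vals under simple assumptions. The map entries are split into chunks, each chunk is transformed in its own future, and the results are poured back into a persistent map. The name update-vals-sketch is hypothetical. For comparison, Clojure 1.11 added a sequential clojure.core/update-vals that takes the same [m f] arguments.

```clojure
(defn update-vals-sketch
  "Hypothetical sketch: applies f to every value of map m, splitting the
  entries into `nchunks` groups processed by separate futures."
  [m f nchunks]
  (let [entries (vec m)
        size    (max 1 (long (Math/ceil (/ (count entries) (double nchunks)))))]
    (->> (partition-all size entries)
         (mapv (fn [chunk] (future (mapv (fn [[k v]] [k (f v)]) chunk))))
         ;; Each future yields a vector of [k v] pairs; concatenate them into a map.
         (into {} (mapcat deref)))))
```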

In the context of the most-frequent-anagrams computation, we can combine mutable p/sort and p/update-vals and compare the result with the previous solution:

(import '[java.util Map$Entry])

(defn cmp [^Map$Entry e1 ^Map$Entry e2]
  (> (count (.getValue e1))
     (count (.getValue e2))))

(time (binding [p/*mutable* true]
  (let [a (p/sort cmp (p/update-vals anagrams distinct))]
    (.getValue ^Map$Entry (aget ^objects a 0)))))
;; "Elapsed time: 128.422734 msecs"
;; ("post" "spot" "stop" "tops" "pots")

(time (->> anagrams
  (map (comp distinct second))
  (sort-by count >)
  first))
;; "Elapsed time: 251.277616 msecs"
;; ("post" "spot" "stop" "tops" "pots")

The mutable version takes roughly half the time, but it's verbose and requires type annotations.

p/sort

p/sort is a parallel merge-sort implementation. It splits the input into smaller chunks, sorts them sequentially once they fall below a threshold (8192 by default) and merges the results. p/sort offers features similar to clojure.core/sort and it's not lazy. The following uses the default comparator < to sort a collection of 2M numbers (compared with core/sort doing the same):

(let [coll (shuffle (range 2e6))] (time (dorun (p/sort coll))))
;; "Elapsed time: 1335.769356 msecs"

(let [coll (shuffle (range 2e6))] (time (dorun (sort coll))))
;; "Elapsed time: 2098.151666 msecs"

Or reverse sorting strings:

(let [coll (shuffle (map str (range 2e6)))] (time (dorun (p/sort #(compare %2 %1) coll))))
;; "Elapsed time: 1954.57439 msecs"

(let [coll (shuffle (map str (range 2e6)))] (time (dorun (sort #(compare %2 %1) coll))))
;; "Elapsed time: 2540.829781 msecs"

p/sort is implemented on top of mutable native arrays. By default it converts the input into an array and the output back into an immutable vector. There are a few ways to speed up sorting with p/sort:

  • Vector inputs are preferable to sequences.
  • Shave off additional milliseconds by using the raw array output: wrap p/sort in a binding like (binding [p/*mutable* true] (p/sort coll)). In this case p/sort returns an object array instead of a vector.
  • If you happen to be working natively with arrays, be sure to feed p/sort the native array to avoid conversion.

In order of increasing speed:

(require '[criterium.core :refer [quick-bench]])

(let [c (into [] (shuffle (range 2e6)))
      a (to-array c)]
  (quick-bench (p/sort c))
  (quick-bench (binding [p/*mutable* true] (p/sort c)))
  (quick-bench (binding [p/*mutable* true] (p/sort a))))

;; 1185ms
;; 1052ms
;; 46ms

As you can see, the conversion into an array is responsible for most of the sorting time. If you are lucky enough to work with arrays, sorting is more than an order of magnitude faster (46ms against 1052ms here) and more memory-efficient.
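p/sort is described above as a parallel merge sort with a sequential threshold. The following is a minimal fork/join sketch of that technique in plain Clojure, assuming the java.util.concurrent fork/join classes and java.util.Arrays/sort for the sequential leaves. The function names are hypothetical and the code is not taken from the library. It only shows how splitting, sequential sorting below a threshold and merging fit together.

```clojure
(import '[java.util Arrays Comparator]
        '[java.util.concurrent ForkJoinPool ForkJoinTask RecursiveAction])

(defn- merge-halves!
  "Merges the sorted ranges [lo, mid) and [mid, hi) of `a` using `tmp` as scratch."
  [^objects a ^objects tmp lo mid hi ^Comparator cmp]
  (System/arraycopy a (int lo) tmp (int lo) (int (- hi lo)))
  (loop [i lo, j mid, k lo]
    (when (< k hi)
      ;; Take from the left run when the right one is exhausted or not smaller.
      (if (or (>= j hi)
              (and (< i mid) (<= (.compare cmp (aget tmp i) (aget tmp j)) 0)))
        (do (aset a k (aget tmp i)) (recur (inc i) j (inc k)))
        (do (aset a k (aget tmp j)) (recur i (inc j) (inc k)))))))

(defn- sort-task
  "Fork/join task sorting a[lo, hi): small ranges are sorted sequentially,
  larger ones are split in two, sorted in parallel and then merged."
  [^objects a ^objects tmp lo hi ^Comparator cmp threshold]
  (proxy [RecursiveAction] []
    (compute []
      (if (<= (- hi lo) threshold)
        (Arrays/sort a (int lo) (int hi) cmp)
        (let [mid (quot (+ lo hi) 2)]
          (ForkJoinTask/invokeAll ^ForkJoinTask (sort-task a tmp lo mid cmp threshold)
                                  ^ForkJoinTask (sort-task a tmp mid hi cmp threshold))
          (merge-halves! a tmp lo mid hi cmp))))))

(defn parallel-merge-sort-sketch
  "Hypothetical sketch: sorts coll with a fork/join merge sort and returns
  a vector. Note that to-array returns an Object array argument as-is,
  so an array input is sorted in place."
  ([cmp coll] (parallel-merge-sort-sketch cmp coll 8192))
  ([cmp coll threshold]
   (let [^objects a (to-array coll)
         tmp        (object-array (alength a))]
     (.invoke (ForkJoinPool/commonPool)
              ^ForkJoinTask (sort-task a tmp 0 (alength a) cmp (max 1 threshold)))
     (into [] a))))
```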

p/external-sort

Merge sort is a well-known example of a parallelizable sorting algorithm. There was a time when machines had to use tapes to process large amounts of data, loading smaller chunks into memory one at a time, and merge sort is well suited to this kind of processing. Today we have bigger memories, but also big data. File-based merge-sort implementations can still be useful when working with external storage such as S3.

p/external-sort can be used to fetch large amounts of data from slow storage, order them by some attribute and consume only the part that is actually needed (e.g. "find the top N" kinds of problems). A working, but not very useful, p/external-sort example is the following:

(let [fetchf (fn [ids] ids)
      v (into [] (reverse (range 10000)))]
  (take 5 (p/external-sort 1000 compare fetchf v)))
;; (0 1 2 3 4)

p/external-sort accepts a vector "v" of IDs as input. These unique identifiers are used to fetch data objects from remote storage: "fetchf" tells p/external-sort how to retrieve the objects for a group of IDs (in this example, fetching is simulated by returning the IDs themselves). Input IDs are split into chunks no bigger than "1000" (the default is 512).

Once all the data for a chunk has been retrieved, it is sorted using the given comparator ("core/compare" by default) and the result is stored in a temporary file on disk. The example above creates 16 files. The number of files needs to be a power of two, and (/ 10000 16) = 625 is the first such split that produces chunks smaller than 1000 ((/ 10000 8) = 1250 is still too big).

Once all chunks are retrieved and sorted on disk, the result is available as a lazy sequence, which is the type returned by p/external-sort. If the lazy sequence is not fully consumed, the related files are never loaded into memory: in the example above, some files are never loaded. A call to last (instead of take 5) would load all of them. If the head of the sequence is not retained, the content of the files is garbage collected as the sequence is consumed.
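A lazy merge step is what makes it possible to consume only the head of the result. To stay self-contained, the sketch below keeps everything in memory. It fetches and sorts chunks of IDs in parallel with futures, then merges the sorted chunks pairwise with a lazy two-way merge. The names are hypothetical, and the chunks come from partition-all rather than the power-of-two split described above. A real external sort would write each sorted chunk to a temporary file and read it back lazily.

```clojure
(import 'java.util.Comparator)

(defn- lazy-merge
  "Lazily merges two sequences that are already sorted by `cmp`."
  [^Comparator cmp xs ys]
  (lazy-seq
    (cond
      (empty? xs) ys
      (empty? ys) xs
      (<= (.compare cmp (first xs) (first ys)) 0)
      (cons (first xs) (lazy-merge cmp (rest xs) ys))
      :else
      (cons (first ys) (lazy-merge cmp xs (rest ys))))))

(defn external-sort-sketch
  "Hypothetical in-memory sketch: fetches and sorts each chunk of ids in
  parallel, then lazily merges the sorted chunks pairwise. A real external
  sort would write each sorted chunk to a temporary file instead."
  [chunk-size cmp fetchf ids]
  (let [sorted-chunks (->> (partition-all chunk-size ids)
                           (mapv #(future (sort cmp (fetchf %))))
                           (mapv deref))]
    ;; Merge pairs of chunks until a single lazy sequence is left.
    (loop [chunks sorted-chunks]
      (if (<= (count chunks) 1)
        (or (first chunks) ())
        (recur (mapv (fn [[a b]] (if b (lazy-merge cmp a b) a))
                     (partition-all 2 chunks)))))))
```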

The next example verifies these assumptions with a large dataset of around 20M played songs. Each record contains the user id, the track title, the time it was played and other information. We want to print the most recently played songs, but we can't load the 2.5GB file in memory to sort it without blowing the heap (on a normal laptop).

You can download the dataset from this page: http://www.dtic.upf.edu/~ocelma/MusicRecommendationDataset/lastfm-1K.html. We are then going to split the file on disk into smaller but still unordered files with:

split -a 4 -l 18702 userid-timestamp-artid-artname-traid-traname.tsv
num=0; for i in *; do mv "$i" "$num"; ((num++)); done

The big tsv file contains exactly 19150868 played songs. We pick a split size that produces a number of files as close as possible to (Math/pow 2 10): 1024 files of a reasonable size (18702 lines each, 18702 x 1024 = 19150848) plus a last one containing the remaining 20 lines. We also rename the files with an incremental number, so we can quickly tell which file contains which lines. You should now have a folder with 1025 files named 0 to 1024 (no extension). Here's how to use p/external-sort to retrieve the 3 most recently played tracks:

(require '[clojure.string :as s])

(let [lines 19150868
      chunk-size 18702
      chunk-folder "../resources/lastfm-dataset-1K/splits/"
      fetchf (fn [ids]
               (->> (quot (last ids) chunk-size)
                    (str chunk-folder)
                    slurp
                    s/split-lines
                    (mapv #(s/split % #"\t"))))]
  (pprint (time (take 3
    (p/external-sort
      chunk-size
      #(compare (nth %2 1) (nth %1 1))
      fetchf
      (range lines))))))

The degree of parallelism with which "fetchf" is invoked is equal to the number of cores (physical or virtual) available on the running system. If the collection of IDs is not a vector, it is converted into one. fetchf receives a group of IDs, and we can calculate which file contains them because we know the file names and sizes. The custom comparator uses the timestamp found at index 1 after each line is split by tabs (the format of the file). After about 1 minute (on my machine) we get:

(["user_000762"
  "2013-09-29T18:32:04Z"
  "d8354b38-e942-4c89-ba93-29323432abc3"
  "30 Seconds To Mars"
  "b5b40605-5a81-46b4-a51e-2b1ec7964c1a"
  "A Beautiful Lie"]
 ["user_000762"
  "2009-05-02T02:01:47Z"
  "91f7a868-d82e-4cfb-9cd9-a2ffd7faac25"
  "The Cab"
  "7ede8578-bf9c-4e68-a060-56924202cdf0"
  "This City Is Contagious"]
 ["user_000762"
  "2009-05-02T01:58:09Z"
  "91f7a868-d82e-4cfb-9cd9-a2ffd7faac25"
  "The Cab"
  "14298942-7452-444f-9fb7-3199464957d6"
  "Can You Keep A Secret?"])

Taking more than the top 3 results requires loading more files into memory. If you don't hold on to the head of the sequence, you can consume any other part of the ordered sequence, including the last element, without running out of memory (this took about 2 minutes on my machine).

p/fold, p/xrf and p/folder

p/fold is modeled on the clojure.core.reducers/fold function, the entry point into the Clojure reduce-combine (Java fork-join) parallel computation framework. It can be used with transducers as you would with normal r/fold:

(def v (vec (range 1000)))
(p/fold + ((comp (map inc) (filter odd?)) +) v)
;; 250000

Exactly as with normal r/fold, this gives inconsistent results when a stateful transducer like (drop 1) is introduced:

(distinct (for [i (range 1000)]
  (p/fold + ((comp (map inc) (drop 1) (filter odd?)) +) v)))
;; (249999 249498 249499)

This is what p/xrf is designed for. p/xrf is a wrapping utility that hides the way the transducers are combined with the reducing function. More importantly, it takes care of the potential presence of stateful transducers in the chain (like drop, take, partition and so on).

(distinct (for [i (range 1000)]
  (p/fold (p/xrf + (map inc) (drop 1) (filter odd?)) v)))
;; (242240)

p/xrf makes sure that stateful transducer state is allocated for each chunk instead of each thread (a "chunk" is a portion of the initial collection that is not subject to any further splitting). This is a drastic departure from the semantics of the same transducers when used sequentially on the whole input. The first practical implication is that operations like take, drop, partition etc. are isolated in their own chunk and don't see each other's state. For example, (drop 1) removes the first element from each chunk, not just the first element of the whole input. The second consequence is that the result now depends (consistently) on the number of chunks.

To make parallel algorithms easier to design, you can pass p/fold the desired number "n" of chunks for the parallel computation (n has to be a power of 2 and defaults to 32). Note the difference: with r/fold, "n" is the desired chunk size (512 by default), so the computation is chunk-size driven. With p/fold, "n" is the number of chunks to compute in parallel, so the computation is chunk-number driven:

(p/fold 4 + (p/xrf + (map inc) (drop 1) (filter odd?)) v)
;; 248496

Assuming there are 4 cores available, the example above executes on 4 parallel threads. Let's dissect it chunk by chunk:

  • We ask p/fold to create 4 chunks from the initial vector "v" of 1000 elements. Each chunk ends up with 250 items.
  • The content of each chunk can be expressed by the following ranges (the actual type is a subvec, not a range, but the content is the same): (range 0 250), (range 250 500), (range 500 750), (range 750 1000)
  • The transducers transform each chunk (composition reads backward, as with normal transducers): (filter odd? (drop 1 (map inc (range 0 250)))), (filter odd? (drop 1 (map inc (range 250 500)))), (filter odd? (drop 1 (map inc (range 500 750)))), (filter odd? (drop 1 (map inc (range 750 1000))))
  • The reducing function "+" is applied to the items in each chunk, giving: 15624, 46624, 77624, 108624
  • The combining function is again "+", resulting in the final sum: (+ (+ 15624 46624) (+ 77624 108624)), which is 248496.
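The per-chunk arithmetic above can be checked sequentially with core functions. The hypothetical helper per-chunk-reductions below assumes that the vector splits evenly into n chunks, which is true for 1000 items and 4 chunks. It runs transduce on each subvec, so every chunk gets its own fresh (drop 1) state.

```clojure
(defn per-chunk-reductions
  "Hypothetical helper: reproduces the per-chunk reduction step
  sequentially, assuming the vector splits evenly into n chunks."
  [n xform rf v]
  (let [size (quot (count v) n)]
    (mapv (fn [i] (transduce xform rf (subvec v (* i size) (* (inc i) size))))
          (range n))))

(def v (vec (range 1000)))
(def partials (per-chunk-reductions 4 (comp (map inc) (drop 1) (filter odd?)) + v))
;; partials         => [15624 46624 77624 108624]
;; (reduce + partials) => 248496
```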

For arbitrary collection sizes it can be tricky to find the best strategy in terms of chunk size or number of chunks. The utility function p/show-chunks predicts how a collection will be split for a parallel calculation, so p/fold parameters can be adjusted accordingly. Here is what happens with a vector of 9629 items when you ask for 8 chunks: some of them are bigger, others smaller:

(p/show-chunks (vec (range 9629)) 8)
;; (1203 1204 1203 1204 1203 1204 1204 1204)
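The output above is consistent with recursively halving the vector and giving the left half (quot size 2) items at each step. The hypothetical helper halving-chunks below reproduces that arithmetic. The splitting rule is an assumption checked only against the example shown; the helper is not a copy of p/show-chunks.

```clojure
(defn halving-chunks
  "Hypothetical helper: chunk sizes obtained by recursively halving `size`
  until there are n chunks (n a power of 2); the left half gets (quot size 2)."
  [size n]
  (if (<= n 1)
    [size]
    (let [left (quot size 2)]
      (into (halving-chunks left (quot n 2))
            (halving-chunks (- size left) (quot n 2))))))

(halving-chunks 9629 8)
;; => [1203 1204 1203 1204 1203 1204 1204 1204]
```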

p/fold also allows transducers on hash-maps, not just vectors. A hash-map can be folded with transducers (in parallel) like this:

(require '[clojure.core.reducers :refer [monoid]])
(def input (zipmap (range 10000) (range 10000)))

(def output
 (p/fold
  (monoid merge (constantly {}))
  (p/xrf conj
    (filter (fn [[k v]] (even? k)))
    (map (fn [[k v]] [k (inc v)]))
    (map (fn [[k v]] [(str k) v])))
  input))
(output "664")
;; 665

Each item passed to the transducers is a [key value] vector pair. In this case each transducer returns another pair so that a new map can be built (but that's not required).
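As a sanity check, the same transducers can be applied sequentially to the map with core/into. Reducing a map yields [key value] entries, so the same destructuring works. We would expect the parallel output above to be equal to this map.

```clojure
(def input (zipmap (range 10000) (range 10000)))

;; The same transducers applied sequentially with core/into.
(def expected
  (into {}
        (comp (filter (fn [[k _]] (even? k)))
              (map (fn [[k v]] [k (inc v)]))
              (map (fn [[k v]] [(str k) v])))
        input))

(expected "664")
;; => 665
```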

Caveats and known problems:

  • Stateful transducers like dedupe and distinct, which operate correctly at the chunk level, can bring back duplicates once the chunks are combined into the final result. Keep that in mind if absolute uniqueness is a requirement: you might need an additional step outside p/fold to eliminate the remaining duplicates. In the meantime, I'm thinking about what else can be done to avoid the problem.

p/transduce

p/transduce is a parallel version of the same function present in core:

(p/transduce (comp (filter odd?) (map inc)) + (vec (range 1000)))
;; 250500

Similarly to p/fold, you can use stateful transducers with p/transduce. When you do, it's better to design your computation around the number of chunks that are processed in parallel. p/transduce accepts the number of desired chunks and an additional "combinef" to know how to merge chunks back together.

The example below takes 1000 items and operates on 4 parallel chunks of 250 each, dropping 240 items from each chunk and partitioning the remaining 10 into groups of 5. The results from each parallel thread are combined with into:

(p/transduce 4 (comp (drop 240) (partition-all 5)) conj into (vec (range 1000)))
;; [[240 241 242 243 244]
;;  [245 246 247 248 249]
;;  [490 491 492 493 494]
;;  [495 496 497 498 499]
;;  [740 741 742 743 744]
;;  [745 746 747 748 749]
;;  [990 991 992 993 994]
;;  [995 996 997 998 999]]
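The chunk-level semantics of this example can be reproduced with core functions. The input is split into chunks of 250, into runs the transducers on each chunk in its own future, and the partial results are concatenated in order. Each call to into builds a fresh transducer stack, so drop and partition-all never share state across threads. This is a sketch of the semantics, not of p/transduce's implementation.

```clojure
(def xform (comp (drop 240) (partition-all 5)))

;; One future per chunk of 250 items; `into` instantiates a fresh
;; transducer stack (and therefore fresh drop/partition-all state) per call.
(def result
  (->> (partition-all 250 (range 1000))
       (mapv (fn [chunk] (future (into [] xform chunk))))
       (map deref)
       (reduce into [])))
```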

The equivalent operation attempted with reducers/fold gives inconsistent results: the result differs at each run, or an exception is thrown, because the state of stateful transducers is shared across concurrent threads:

(require '[clojure.core.reducers :as r])

(r/fold
  250
  (r/monoid into conj)
  ((comp (drop 240) (partition-all 5)) conj)
  (vec (range 1000)))
;; Sometimes ArrayIndexOutOfBoundsException, sometimes a bunch of random partitions.

p/process-folder

p/process-folder applies a composition of transducers to all the files in a folder in parallel. The first transducer in the pipeline should expect a line of text. You can use something like split -l 10000 -a 4 <filename> segment- to split a large file into many smaller ones of 10k lines each. After you move them into a folder (please make sure it contains only the files that need processing), you're good to go, for example:

(require '[clojure.string :as s])

(p/process-folder
  "folder-name-as-string"
  (comp (map s/trim)
        (remove s/blank?)
        (map #(s/split % #"\,"))
        (map peek)))

The snippet above takes the last value of each line of each CSV file in a folder. p/process-folder is eager: if there are many files or the lines are big, there is nothing p/process-folder can do to avoid running out of memory. Try to compose your transducers so that they process and aggregate data as needed, returning a result that fits into memory.
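Here is a minimal sketch of the same idea using core functions and clojure.java.io. It assumes one future per file and uses the hypothetical name process-folder-sketch. Each file is read with line-seq inside with-open, the transducers are applied to its lines, and all results are concatenated eagerly. Like p/process-folder as described above, it keeps every result in memory.

```clojure
(require '[clojure.java.io :as io])

(defn process-folder-sketch
  "Hypothetical sketch: applies `xform` to the lines of every regular file
  in `folder`, one future per file, and eagerly concatenates the results."
  [folder xform]
  (->> (.listFiles (io/file folder))
       (filter #(.isFile ^java.io.File %))
       (mapv (fn [f]
               (future
                 ;; with-open closes the reader once the file has been reduced.
                 (with-open [r (io/reader f)]
                   (into [] xform (line-seq r))))))
       (into [] (mapcat deref))))
```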

p/min and p/max

p/min and p/max find the minimum or maximum in a vector of numbers in parallel (the input collection is converted into a vector if it's not already):

(let [c (shuffle (conj (range 100000) -9))]
  (p/min c))
;; -9

They also allow any combination of transducers (stateless or stateful) to be passed in as arguments:

(let [c (into [] (range 100000))]
  (p/min c
    (map dec)
    (drop 20)
    (partition-all 30)
    (map last)
    (filter odd?))) ;; 3173
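The divide-and-combine strategy behind a parallel minimum can be sketched with futures. Each chunk's minimum is computed independently, and the partial minima are reduced once more. The name pmin-sketch is hypothetical. It expects a non-empty collection and leaves out the transducer arguments that p/min accepts.

```clojure
(defn pmin-sketch
  "Hypothetical sketch: splits the (non-empty) collection into `nchunks`
  slices, finds each slice's minimum in its own future, then takes the
  minimum of those partial results."
  [coll nchunks]
  (let [v    (vec coll)
        size (max 1 (long (Math/ceil (/ (count v) (double nchunks)))))]
    (->> (partition-all size v)
         (mapv (fn [chunk] (future (reduce min chunk))))
         (map deref)
         (reduce min))))
```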

p/min and p/max outperform sequential core/min and core/max from about 10k items upward (depending on the hardware). On a 4-core machine, execution time is roughly halved:

(require '[criterium.core :refer [bench]])
(require '[parallel.core :as p])

(def one-million (shuffle (range 1000000)))

(bench (reduce min one-million)) ;; 9.963971 ms
(bench (p/min one-million))      ;; 5.474384 ms

(bench (transduce (comp (map inc) (filter odd?)) min ##Inf one-million)) ;; 22.701385 ms
(bench (p/min one-million (map inc) (filter odd?)))                      ;; 12.085497 ms

p/distinct

p/distinct returns a sequence of the distinct items in "coll":

(let [c (apply concat (repeat 20 (range 100)))]
  (take 10 (p/distinct c)))
;; (0 1 2 3 4 5 6 7 8 9)

The sequence is not lazy and its items can come in any order. We can see this by supplying transducers as separate arguments (without using comp) that turn integers into keywords:

(let [c (apply concat (repeat 20 (range 100)))]
  (take 10 (p/distinct c (map str) (map keyword))))
;; (:59 :16 :39 :47 :28 :58 :36 :15 :25 :18)

p/distinct does not support nil, which needs to be removed (you can pass (remove nil?) as one of the transducers). p/distinct performs well on both small and large collections:

(require '[criterium.core :refer [quick-bench]])

(let [small (apply concat (repeat 20 (range 100)))
      large (apply concat (repeat 200 (range 10000)))]
  (quick-bench (p/distinct small))
  (quick-bench (p/distinct large)))
;; Execution time mean : 160.949448 µs
;; Execution time mean : 77.772233 ms

(let [small (apply concat (repeat 20 (range 100)))
      large (apply concat (repeat 200 (range 10000)))]
  (quick-bench (doall (distinct small)))
  (quick-bench (doall (distinct large))))
;; Execution time mean : 565.503835 µs
;; Execution time mean : 862.702828 ms

You can further increase p/distinct speed by using a vector input and forcing mutable output (in this case p/distinct returns a java.util.Set):

(let [large (into [] (apply concat (repeat 200 (range 10000))))]
  (quick-bench (binding [p/*mutable* true] (p/distinct large))))
;; Execution time mean : 37.703288 ms
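Here is a sketch of a parallel distinct, assuming several futures share a single concurrent set created with ConcurrentHashMap/newKeySet. Every item is added to the set, and the set itself absorbs duplicates. The function name is hypothetical. Like the mutable p/distinct output, it returns a java.util.Set in no particular order. ConcurrentHashMap rejects null, so nil items would throw.

```clojure
(import '[java.util.concurrent ConcurrentHashMap])

(defn pdistinct-sketch
  "Hypothetical sketch: adds every item of coll to a concurrent set from
  `nchunks` futures and returns the set (a java.util.Set, in no particular
  order). ConcurrentHashMap rejects null, so nil items throw."
  [coll nchunks]
  (let [s    (ConcurrentHashMap/newKeySet)
        v    (vec coll)
        size (max 1 (long (Math/ceil (/ (count v) (double nchunks)))))]
    (->> (partition-all size v)
         (mapv (fn [chunk] (future (doseq [x chunk] (.add s x)))))
         (run! deref))
    s))
```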

p/pmap

p/pmap has a similar interface as core/pmap:

(p/pmap inc (range 10))
;; [1 3 2 6 4 5 7 8 10 9]

As you can see, however, the output is a vector of results in any order. p/pmap also differs from core/pmap in the following ways:

  • It executes on n parallel threads (100 by default), regardless of the input collection chunk size or the number of available cores.
  • It is not lazy.
  • It does not support multiple collections as input.

p/pmap is useful when you want to control the amount of parallelism while executing the same task over a collection of inputs. If you are making requests to a highly scalable service, for example, p/pmap lets you throw up to 100 (or more) threads at the problem, whereas core/pmap is bound to the chunk size (32) plus the number of cores plus 2. To change the number of threads, use the optional "n" parameter. In the following example, up to 200 threads are working on "heavyf":

(defn heavyf [x] (Thread/sleep 500) (inc x))

(time (count (p/pmap heavyf (range 1000) 200)))
;; "Elapsed time: 2552.601996 msecs"

(time (count (pmap heavyf (range 1000))))
;; "Elapsed time: 16115.643296 msecs"
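The idea of a fixed degree of parallelism, independent of chunk size and core count, can be sketched with a plain java.util.concurrent fixed thread pool. fixed-pmap below is a hypothetical helper, not the library's code, and it assumes the same (f coll n) argument order as the p/pmap call above. Unlike p/pmap, it returns results in input order, because it waits on the futures in submission order:

```clojure
(import '[java.util.concurrent Executors ExecutorService Callable Future])

(defn fixed-pmap
  "Hypothetical sketch: eagerly applies f to every item of coll on a pool
  of n threads (default 100) and returns a vector of results in input order."
  ([f coll] (fixed-pmap f coll 100))
  ([f coll n]
   (let [^ExecutorService pool (Executors/newFixedThreadPool n)]
     (try
       ;; mapv is eager, so every task is submitted before we start waiting
       (let [futures (mapv (fn [x] (.submit pool ^Callable (fn [] (f x)))) coll)]
         (mapv #(.get ^Future %) futures))
       ;; always release the pool threads, even if f throws
       (finally (.shutdown pool))))))
```

With this design the pool size, not the input, caps parallelism: (fixed-pmap heavyf (range 1000) 200) keeps at most 200 calls to heavyf in flight.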

p/amap

p/amap is a parallel version of core/amap. It takes a transformation "f" and an array of objects, and mutates the array in place, replacing each element with its transformed value:

(def c (range 2e6))
(defn f [x] (if (zero? (rem x 2)) (* 0.3 x) (Math/sqrt x)))

(let [a (to-array c)] (time (p/amap f a)))
;; "Elapsed time: 34.955138 msecs"

(let [^objects a (to-array c)] (time (amap a idx ret (f (aget a idx)))))
;; "Elapsed time: 53.058256 msecs"

p/amap uses the fork-join framework to update the array in parallel. It outperforms the sequential version only for non-trivial transformations; otherwise, thread orchestration dominates the computational cost. You can optionally pass a "threshold", the chunk size below which the computation goes sequential; if omitted, it defaults to (/ alength (* 2 ncores)).
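The threshold-based splitting described above can be sketched with java.util.concurrent's RecursiveAction: a task either transforms its slice sequentially or splits it in half and forks both halves. fj-amap! is a hypothetical name and this is not the library's source; the default threshold mirrors the documented formula (using integer division), clamped to at least 1 so splitting always terminates:

```clojure
(import '[java.util.concurrent ForkJoinPool ForkJoinTask RecursiveAction])

(defn fj-amap!
  "Hypothetical sketch: replaces every element of a with (f element) using
  fork/join. Slices no longer than threshold are transformed sequentially."
  ([f ^objects a]
   (let [ncores (.availableProcessors (Runtime/getRuntime))]
     (fj-amap! f a (quot (alength a) (* 2 ncores)))))
  ([f ^objects a threshold]
   (let [t (max 1 threshold)] ; a threshold of 0 would split forever
     (letfn [(task [lo hi]
               (proxy [RecursiveAction] []
                 (compute []
                   (if (<= (- hi lo) t)
                     ;; small slice: transform it sequentially
                     (loop [i lo]
                       (when (< i hi)
                         (aset a i (f (aget a i)))
                         (recur (inc i))))
                     ;; large slice: fork both halves and wait for them
                     (let [mid (quot (+ lo hi) 2)]
                       (ForkJoinTask/invokeAll ^ForkJoinTask (task lo mid)
                                               ^ForkJoinTask (task mid hi)))))))]
       (.invoke (ForkJoinPool/commonPool) ^ForkJoinTask (task 0 (alength a)))
       a))))
```

Since every task writes only to its own index range, no locking is needed; the array returned is the same (mutated) input.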

p/armap

p/armap is similar to p/amap, but it also reverses the array. It takes a transformation "f" and an array of objects, and mutates the array in place, producing the transformed and reversed version of the input.

(let [a (object-array [0 9 8 2 0 9 2 2 90 1 2])]
  (p/armap (comp keyword str) a)
  (into [] a))
;; [:2 :1 :90 :2 :2 :9 :0 :2 :8 :9 :0]

Like p/amap, p/armap outperforms the sequential version only for non-trivial transformations; otherwise, thread orchestration dominates the computational cost. For example, here is the reverse complement of a random DNA strand:

(require '[criterium.core :refer [quick-bench]])

(defn random-dna [n] (repeatedly n #(rand-nth [\a \c \g \t])))
(def compl {\a \t \t \a \c \g \g \c})

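(defn armap
  "A fair sequential comparison: reverses a in place, applying f to every element."
  [f ^objects a]
  (let [n (alength a)
        half (quot n 2)]
    (loop [i 0]
      (when (< i half)
        (let [tmp (f (aget a i))
              j (- n i 1)]
          (aset a i (f (aget a j)))
          (aset a j tmp))
        (recur (unchecked-inc i))))
    ;; with an odd length the middle element is never swapped, but still needs f
    (when (odd? n)
      (aset a half (f (aget a half))))
    a))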

(let [a (to-array (random-dna 1e6))]
  (time (p/armap compl a)))
;; "Elapsed time: 39.195143 msecs"

(let [a (to-array (random-dna 1e6))]
  (time (armap compl a)))
;; "Elapsed time: 70.286622 msecs"

As with p/amap, you can optionally pass a "threshold", the chunk size below which the computation goes sequential; if omitted, it defaults to (/ alength (* 2 ncores)).
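Because each swap position i only touches indices i and (n - 1 - i), the swaps are independent and can be split across threads. The sketch below uses plain futures over k chunks instead of fork/join; chunked-armap! and its k parameter are hypothetical and not part of the library. Like the sequential armap above, it also transforms the middle element of odd-length arrays:

```clojure
(defn chunked-armap!
  "Hypothetical sketch: reverses a in place applying f to every element.
  Swap positions [0, n/2) are split into k chunks, each run in a future;
  position i only touches i and n-1-i, so chunks never overlap."
  [f ^objects a k]
  (let [n (alength a)
        half (quot n 2)
        step (max 1 (long (Math/ceil (/ half (double k)))))
        work (fn [lo hi]
               (loop [i lo]
                 (when (< i hi)
                   (let [j (- n i 1)
                         tmp (f (aget a i))]
                     (aset a i (f (aget a j)))
                     (aset a j tmp)
                     (recur (inc i))))))]
    ;; start every chunk first (mapv is eager), then wait for all of them
    (->> (range 0 half step)
         (mapv (fn [lo] (future (work lo (min half (+ lo step))))))
         (run! deref))
    ;; the middle element of an odd-length array is not part of any swap
    (when (odd? n)
      (aset a half (f (aget a half))))
    a))
```

Applied to the earlier example, (chunked-armap! (comp keyword str) (object-array [0 9 8 2 0 9 2 2 90 1 2]) 3) leaves the array holding :2 :1 :90 :2 :2 :9 :0 :2 :8 :9 :0.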

xf/interleave

Like clojure.core/interleave, but as a transducer. xf/interleave is instantiated with a "filler" collection, whose items are interleaved with the items coming from the main transducing sequence:

(sequence
  (comp
    (map inc)
    (xf/interleave [100 101 102 103 104 105])
    (filter odd?)
    (map str))
  [3 6 9 12 15 18 21 24 37 30])
;; ("7" "101" "13" "103" "19" "105")

The main transducing process runs only as long as there are items left in the filler sequence (those starting at 100 in the example). You can provide an infinite sequence to make sure all results are interleaved:

(sequence
  (comp
    (map inc)
    (xf/interleave (range))
    (filter odd?)
    (map str))
  [3 6 9 12 15 18 21 24 37 30])
;; ("7" "1" "13" "3" "19" "5" "25" "7" "31" "9")
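A stateful transducer with the behaviour shown in these examples can be sketched in a few lines: after each item from the main sequence it emits the next filler item, and it ends the whole process with reduced once the filler runs out. interleave-xf is a hypothetical name, not the library's implementation:

```clojure
(defn interleave-xf
  "Hypothetical sketch: after each input, also emits the next item of filler.
  Stops the transducing process (via reduced) once filler is exhausted."
  [filler]
  (fn [rf]
    (let [remaining (volatile! (seq filler))]
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result input]
         (if-let [[x & more] @remaining]
           (let [result (rf result input)]
             (vreset! remaining more)
             ;; respect early termination requested downstream
             (if (reduced? result)
               result
               (rf result x)))
           ;; no filler left: end the whole process
           (reduced result)))))))
```

Dropped into the first example in place of xf/interleave, this sketch also yields ("7" "101" "13" "103" "19" "105").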

xf/pmap

xf/pmap is a transducer version of core/pmap. When added to a transducer chain, it works like the clojure.core/map transducer, applying the function "f" to all the items passing through it. Unlike clojure.core/map, xf/pmap processes a fixed number of items in parallel (competing for the available physical cores). So if you have 12 cores and you're transducing a Clojure collection (the vast majority have a chunk size of 32), you can achieve a maximum of 12+32+2 threads working in parallel:

(defn heavyf [x] (Thread/sleep 1000) (inc x))

(time (transduce (comp (map heavyf) (filter odd?)) + (range 10)))
;; 10025ms
(time (transduce (comp (xf/pmap heavyf) (filter odd?)) + (range 10)))
;; 1006ms

xf/pmap has limitations similar to those of clojure.core/pmap. It works best when "f" is non-trivial and its elapsed time is roughly uniform across the input. If one (f item) takes much longer than the others, the current N-chunk keeps running with parallelism 1 before moving to the next chunk, wasting resources. Use xf/pmap when your transducing transformation is reasonably big and complex. Besides transduce, you can also use it with sequence:

(time (doall (pmap heavyf (range 10))))
;; "Elapsed time: 1005.330409 msecs"

(time (doall (sequence (xf/pmap heavyf) (range 10))))
;; "Elapsed time: 1002.868326 msecs"
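The chunk-at-a-time behaviour described above can be approximated by composing two core transducers: partition-all to form groups of n items and mapcat to run each group with futures. batch-pmap-xf is a hypothetical name and this is not how xf/pmap is implemented, but it reproduces the same limitation, since a slow item holds up its whole group:

```clojure
(defn batch-pmap-xf
  "Hypothetical sketch: groups n items at a time, runs f on every item of
  a group in a future, and waits for the whole group before moving on."
  [f n]
  (comp
    (partition-all n)
    (mapcat (fn [batch]
              ;; the inner mapv is eager: all futures start before any deref
              (mapv deref (mapv #(future (f %)) batch))))))
```

For instance, (transduce (comp (batch-pmap-xf inc 3) (filter odd?)) + (range 10)) returns 25, the same result as the sequential (map inc) version. Output order is preserved because each group is dereferenced in order.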

xf/identity

xf/identity works like (map identity), or like plain identity used as a transducer:

(sequence (map identity) (range 10))
(sequence clojure.core/identity (range 10))
(sequence xf/identity (range 10))
;; each returns (0 1 2 3 4 5 6 7 8 9)

The identity transducer works as a placeholder when no transformation is needed, for example:

(def config false)

(defn build-massive-xform []
  (when config
    (comp (map inc) (filter odd?))))

(sequence (or (build-massive-xform) identity) (range 5))
;; (0 1 2 3 4)

core/identity works fine as a transducer in most cases, except with multiple inputs, where we first need to define what "identity" means. We could, for example, agree that core/identity used with multiple inputs must be paired with another transducer, such as (map list):

(sequence (or (build-massive-xform) identity) (range 5) (range 5))
;; Throws exception

(sequence (or (build-massive-xform) (comp (map list) identity)) (range 5) (range 5))
;; ((0 0) (1 1) (2 2) (3 3) (4 4))

xf/identity is a simple transducer that takes care of this case, assuming "identity" means "wrap around" (collect the inputs into a list) in case of multiple inputs:

(sequence (or (build-massive-xform) xf/identity) (range 5))
;; (0 1 2 3 4)

(sequence (or (build-massive-xform) xf/identity) (range 5) (range 5))
;; ((0 0) (1 1) (2 2) (3 3) (4 4))

(sequence (or (build-massive-xform) xf/identity) (range 5) (range 5) (range 5))
;; ((0 0 0) (1 1 1) (2 2 2) (3 3 3) (4 4 4))
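A transducer with these "wrap around" semantics needs only one extra arity compared to a pass-through transducer. identity-xf below is a hypothetical sketch, not the library's source:

```clojure
(def identity-xf
  "Hypothetical sketch: passes a single input through untouched and wraps
  multiple inputs into a list, like (comp (map list) identity)."
  (fn [rf]
    (fn
      ([] (rf))
      ([result] (rf result))
      ([result input] (rf result input))
      ;; multiple inputs: collect them into one list item
      ([result input & inputs] (rf result (apply list input inputs))))))
```

Skipping the intermediate (map list) step is one plausible reason a dedicated transducer can be cheaper than the composed version.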

Compared to (comp (map list) identity), the custom xf/identity transducer also performs better:

(let [items (range 10000)
      xform (comp (map list) identity)]
  (quick-bench
    (dorun
      (sequence xform items items))))
;; 4.09ms

(let [items (range 10000)]
  (quick-bench
    (dorun
      (sequence xf/identity items items))))
;; 2.67ms

Development

There are no dependencies other than Java and Clojure.

  • lein test runs the test suite.

misc todo

  • p/fold: extend to (thread-safe) Java collections.
  • p/fold: extend to Cat objects.
  • p/fold: operate on groups of keys for hash-maps.
  • A foldable reader of some sort for large files.

License

Copyright © 2018 Renzo Borgatti @reborg http://reborg.net

Distributed under the Eclipse Public License, either version 1.0 or (at your option) any later version.