Skip to content

3.9 Parallel forEach and mapReduce

Alexander Damian edited this page May 13, 2019 · 4 revisions

Parallel forEach

Modeled after std::for_each, the forEach function in quantum extends the concept of running a unary function on each element of an array in parallel. This allows for faster processing of large batches of data and then waiting for the total results to complete.

A simple use-case scenario is shown below:

    //this example shows adding a small integer to a char and returning the result.
    std::vector<int> start{0,1,2,3,4,5,6,7,8,9};
    std::vector<char> end{'a','b','c','d','e','f','g','h','i','j'};

    //run the unary function on the array of integers
    std::vector<char> results = dispatcher.forEach<char>(start.begin(), start.size(),
        [](VoidContextPtr, int val)->char {
        return 'a'+val;
    })->get();

    //check to see that we got the expected result. In this case we generated the first 10 letters of the alphabet.
    assert(end == results);

Parallel forEach also allows to build higher level meta functions like mapReduce where the pipeline of tasks is broken down into parallel stages followed by merge stages followed by other parallel stages and so on.

forEach has two modes of operation, batched and non-batched. In the non-batched mode (shown above), a new coroutine is started for each element in the array. In the batched mode (see forEachBatch) the data array is split into equal sized chunks which are then run individually on a single coroutine per thread. This will result in greater performance since it limits the number of coroutines yet still maximizes parallelization. The batch mode is particularly useful when the unary function is very CPU-bound (i.e. non-blocking and w/o IO).

MapReduce

Quantum mapReduce and mapReduceBatch apply the map-reduce paradigm introduced by Hadoop and Spark frameworks to coroutines. Instead of running the mapper and reducer functions on a distributed system, which makes the setup complex (especially at the indexing stages), quantum's mapReduce efficiently spreads the processing on all coroutine threads and manages internally all indexing stages via future joining. mapReduce is somewhat similar to forEach in the sense that it takes two functions (a mapper and a reducer) as well as an initial array of data to be processed.

The following code is the classic textbook map-reduce example of word-length counting:

    //These are the original data vectors ingested by the mapping stage. Each vector is passed to one mapper function.
    //Note that each array could have been passed as a single value and then tokenized inside the mapper
    (i.e. "a b aa aaa ccc", etc.)
    std::vector<std::vector<std::string>> input = {
        {"a", "b", "aa", "aaa", "cccc" },
        {"bb", "bbb", "bbbb", "a", "bb"},
        {"aaa", "bb", "eee", "cccc", "d", "ddddd"},
        {"eee", "d", "a" }
    };

    std::map<std::string, size_t> result = dispatcher.mapReduce<std::string, size_t, size_t>(input.begin(), input.size(),
        //mapper function
        [](VoidContextPtr, const std::vector<std::string>& input)->std::vector<std::pair<std::string, size_t>>
        {
            std::vector<std::pair<std::string, size_t>> out;
            for (auto&& i : input) {
                out.push_back({i, 1});
            }
            return out;
        },
        //reducer function
        [](VoidContextPtr, std::pair<std::string, std::vector<size_t>>&& input)->std::pair<std::string, size_t>
        {
            size_t sum = 0;
            for (auto&& i : input.second) {
                sum += i;
            }
            return {std::move(input.first), sum};
        })->get();

    // Validate the result
    assert(result.size() == 11);
    assert(result["a"] = 3);
    assert(result["aa"] == 1);
    assert(result["aaa"] == 2);
    assert(result["b"] == 1);
    assert(result["bb"] == 3);
    assert(result["bbb"] == 1);
    assert(result["bbbb"] == 1);
    assert(result["cccc"] == 2);
    assert(result["d"] == 2);
    assert(result["ddddd"] == 1);
    assert(result["eee"] == 2);

For more examples on how to use mapReduce and mapReduceBatch please see the gtests included with this library.