3.10 Sequencing tasks
The Sequencer class is an important addition to the quantum library, as it allows applications to execute tasks that must be kept in strict FIFO order based on a certain sequenceKey. Any number of sequences can be executed in parallel, but within each sequence only one task runs at any given time. The others are enqueued but remain blocked until the previous task in the sequence completes.
The difference from continuations is that sequenced tasks are conceptually isolated from and unrelated to one another. If one task in the sequence fails (or throws), the next task in the wait queue is scheduled to run irrespective of the outcome. Continuations, on the other hand, represent a logical chain of work items that must be done one after the other; if one item fails, the rest are skipped. Consider, for example, writing to a write-through cache: first you serialize the data, then you push it into the cache, then you update a database, and finally you might want to log the event. If the serialization fails, there is no point in executing the cache or the permanent store updates. The sequencer, by contrast, comes in handy when processing a series of stock prices: you must process the prices in the order they are received, but if one fails, the application can simply skip to the next.
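The two failure policies can be contrasted with a small standalone sketch. This is not quantum code: `Step`, `runAsChain` and `runAsSequence` are hypothetical names, and everything runs synchronously on one thread, so it only models what happens after a failure, not the scheduling.

```cpp
#include <cassert>
#include <cstddef>
#include <exception>
#include <functional>
#include <vector>

// Hypothetical alias used only in this sketch; not a quantum type.
using Step = std::function<void()>;

// Continuation-style chain: the first failure aborts the remaining steps.
// Returns the number of steps that completed successfully.
std::size_t runAsChain(const std::vector<Step>& steps) {
    std::size_t completed = 0;
    try {
        for (const Step& step : steps) {
            assert(step && "step must be callable");
            step();
            ++completed;
        }
    } catch (const std::exception&) {
        // Everything after the failing step is skipped.
    }
    return completed;
}

// Sequence-style execution: every task is attempted in FIFO order and a
// failure stays isolated to the task that threw.
// Returns the number of tasks that completed successfully.
std::size_t runAsSequence(const std::vector<Step>& tasks) {
    std::size_t completed = 0;
    for (const Step& task : tasks) {
        assert(task && "task must be callable");
        try {
            task();
            ++completed;
        } catch (const std::exception&) {
            // Skip to the next task in the sequence.
        }
    }
    return completed;
}
```

Given three work items where the second one throws, `runAsChain` completes only the first, whereas `runAsSequence` completes the first and the third.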
// Create a sequencer using size_t keys and inject the dispatcher into it
Sequencer<size_t> sequencer(dispatcher);

// Create 2 sequences of tasks (even and odd numbers with keys '0' and '1'
// respectively) and run them
std::vector<size_t> even, odd;
for (size_t i = 0; i < 10; ++i) {
    if (i & 1) { // odd -- key is '1'
        sequencer.enqueue(1, [&odd](VoidContextPtr ctx, size_t num)->int {
            odd.push_back(num);
            return 0;
        }, size_t(i));
    }
    else { // even -- key is '0'
        sequencer.enqueue(0, [&even](VoidContextPtr ctx, size_t num)->int {
            even.push_back(num);
            return 0;
        }, size_t(i));
    }
}
dispatcher.drain();

// 'even' now contains {0,2,4,6,8} and 'odd' contains {1,3,5,7,9}
// in this exact order. If the sequencer had not been used, the vectors
// could have contained the numbers in any order.
If the application needs to sequence a task that must wait on multiple keys, it can do so by providing a vector of sequence keys.
Imagine that in the previous example we want to insert the value 100 into each array after the second position. The arrays would then look like this: even={0,2,100,4,6,8} and odd={1,3,100,5,7,9}.
std::vector<size_t> even, odd;
for (size_t i = 0; i < 10; ++i) {
    if (i == 4) {
        // Sequence a task which will run when the last tasks enqueued so far
        // in both sequences 0 and 1 have completed, i.e. 2 and 3 respectively
        sequencer.enqueue(std::vector<size_t>{0,1}, [&](VoidContextPtr)->int {
            even.push_back(100);
            odd.push_back(100);
            return 0;
        });
    }
    if (i & 1) { // odd
        sequencer.enqueue(1, [&odd](VoidContextPtr, size_t num)->int {
            odd.push_back(num);
            return 0;
        }, size_t(i));
    }
    else { // even
        sequencer.enqueue(0, [&even](VoidContextPtr, size_t num)->int {
            even.push_back(num);
            return 0;
        }, size_t(i));
    }
}
dispatcher.drain();

// 'even' now contains {0,2,100,4,6,8} and 'odd' contains {1,3,100,5,7,9}
// in this exact order.
Note that it is also possible to wait for all sequences to complete before running a certain task. Such a task is said to have the universal sequence key (or wildcard sequence key), since it blocks on all other keys before running. The above code could be rewritten as follows using the enqueueAll() overload:
std::vector<size_t> even, odd;
for (size_t i = 0; i < 10; ++i) {
    if (i == 4) {
        // Sequence a task which will run when the last tasks enqueued so far
        // in all sequences (0 and 1) have completed, i.e. 2 and 3 respectively
        sequencer.enqueueAll([&](VoidContextPtr)->int {
            even.push_back(100);
            odd.push_back(100);
            return 0;
        });
    }
    if (i & 1) { // odd
        sequencer.enqueue(1, [&odd](VoidContextPtr, size_t num)->int {
            odd.push_back(num);
            return 0;
        }, size_t(i));
    }
    else { // even
        sequencer.enqueue(0, [&even](VoidContextPtr, size_t num)->int {
            even.push_back(num);
            return 0;
        }, size_t(i));
    }
}
dispatcher.drain();

// 'even' now contains {0,2,100,4,6,8} and 'odd' contains {1,3,100,5,7,9}
// in this exact order.
This example may be trivial, but if an application has thousands of sequences running in parallel, enqueueAll() is the only practical option, since you don't need to track every individual sequenceKey. Note that any task enqueued after enqueueAll() will be blocked until the enqueueAll() task completes.
Since tasks enqueued by the sequencer are marshaled internally onto separate queues and dispatched later, it is impossible to return a ThreadContextPtr<> (future) when enqueuing a task, because the future is not generated immediately. Therefore, if a task needs to return a computed value, it should do so through an out parameter or a lambda capture (see the examples above, where the even and odd result arrays were captured by reference).
If a task throws, the exception is caught by the Sequencer and an exception handler is called (if the application registered one via SequencerConfiguration::setExceptionCallback()). The application can also pass in an opaque address, which is returned as-is to the exception handler. This allows the application to track or correlate which task has failed. Following an exception, the next task in the sequence is scheduled to run.
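The handler contract described above can be pictured with a small standalone sketch. It does not use the quantum API: `ExceptionCallback`, `PendingTask` and `runSequence` are hypothetical, and the callback shape (an exception pointer plus the opaque address) is an assumption made for illustration, so check the SequencerConfiguration header for the actual signature.

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <vector>

// Assumed callback shape for this sketch only: the caught exception plus the
// opaque address that was supplied when the task was enqueued.
using ExceptionCallback = std::function<void(std::exception_ptr, void*)>;

// Hypothetical record pairing a task with its opaque correlation address.
struct PendingTask {
    std::function<void()> work;
    void* opaque; // handed back untouched to the callback on failure
};

// Runs the tasks in FIFO order. A throwing task is reported through the
// callback (if one is set) and the next task still runs.
void runSequence(const std::vector<PendingTask>& tasks,
                 const ExceptionCallback& onError) {
    for (const PendingTask& task : tasks) {
        assert(task.work && "task must be callable");
        try {
            task.work();
        } catch (...) {
            if (onError) {
                onError(std::current_exception(), task.opaque);
            }
        }
    }
}
```

In this model the opaque address could point at, say, the request object that produced the task. The callback casts it back to the known type to identify which task failed; the runner itself never dereferences it.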
If the application needs to wait for all enqueued tasks to complete, there are two ways: either monitor (poll) Sequencer::getStatistics(), which indicates how many tasks are still pending, or enqueue a notification task using enqueueAll(), which will execute once all sequences are complete.
When using the Sequencer, the last expired future from each sequence remains in the scheduler's tracker map, so it is important to call Sequencer::trimSequenceKeys() once in a while. Note that calling enqueueAll() also trims stale futures. The internal tracker map size can also be controlled by specifying a predetermined bucket count via SequencerConfiguration::setBucketCount() and by overriding the hasher and comparator functions.
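To see why periodic trimming matters, here is a toy, single-threaded model of the per-key bookkeeping. It is only a sketch under loud assumptions: `ToyTracker` and all of its methods are hypothetical, tasks are pumped by hand instead of being dispatched as coroutines, and an empty queue stands in for the expired future that the real tracker map would hold.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <unordered_map>
#include <utility>

// Hypothetical model of a sequencer's tracker map; not the quantum class.
class ToyTracker {
public:
    using Task = std::function<void()>;

    // Append a task to the FIFO queue of 'key', creating the entry if needed.
    void enqueue(std::size_t key, Task task) {
        assert(task && "task must be callable");
        _entries[key].push_back(std::move(task));
    }

    // Run the oldest pending task of 'key'; returns false if none is pending.
    // The entry stays in the map even once its queue is empty.
    bool runNext(std::size_t key) {
        auto it = _entries.find(key);
        if (it == _entries.end() || it->second.empty()) {
            return false;
        }
        Task task = std::move(it->second.front());
        it->second.pop_front();
        task();
        return true;
    }

    // Number of tasks still waiting on 'key' (what polling would observe).
    std::size_t pending(std::size_t key) const {
        auto it = _entries.find(key);
        return it == _entries.end() ? 0 : it->second.size();
    }

    // Number of keys currently held in the map, finished or not.
    std::size_t trackedKeys() const { return _entries.size(); }

    // Erase entries with nothing left to run; returns how many were removed.
    std::size_t trim() {
        std::size_t removed = 0;
        for (auto it = _entries.begin(); it != _entries.end();) {
            if (it->second.empty()) {
                it = _entries.erase(it);
                ++removed;
            } else {
                ++it;
            }
        }
        return removed;
    }

private:
    std::unordered_map<std::size_t, std::deque<Task>> _entries;
};
```

In this model a key that has finished all its work still occupies a map slot until `trim()` is called. With many short-lived keys the map would keep growing, which is the situation that periodic trimming is meant to prevent.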
Using the Sequencer class as-is has some drawbacks. By default, the control coroutine queue used to schedule work items is queue 0. Because the quantum dispatcher can be shared with the rest of the application, any calls to enqueue() that do not specify a queue id may result in other coroutines being scheduled on queue id 0, which will slow down the Sequencer.
The proper way to fix this is as follows:
- Configure the Sequencer by specifying a dedicated control queue via SequencerConfiguration::setControlQueueId(), e.g. queue id 0.
- Configure the Quantum dispatcher to use a range of queues outside 0 for tasks enqueued to the Any queue, i.e. via the generic enqueue() method. To do this, set Configuration::setCoroQueueIdRangeForAny() to the range [1, Configuration::getNumCoroutineThreads()-1].

By doing this, you reserve control queue id 0 for the Sequencer only.