3.3 Futures and promises
Futures and promises implemented by quantum follow the behavior of their STL counterparts as closely as possible. The user is nevertheless encouraged to read the interface definitions in order to understand the API fully (see the files for the IPromise and IFuture interfaces).
quantum makes the programmer's life easier by completely wrapping and managing the lifetime of promises and futures for each task invocation. This means that futures and promises don't have to be explicitly created (although that is also possible if desired) or disposed of when going out of scope. The simplest way to set/get a value via a promise is to use the coroutine context (callee thread) to write the value and the dispatcher context (caller thread) to read it. A similar concept applies to an IO task.
Writing a value into a promise never blocks, whereas reading or waiting on the associated future does. A promise is written with set() and the future is read with get(), which blocks until the value is available. The future can also be waited on with wait() or waitFor(), both of which block; once the future is ready, the user can call get(), which returns the value without blocking.
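Since quantum is described as following the STL behavior, the same set/wait/get sequence can be sketched with the standard library alone. This is an STL analogue and not quantum code: std::promise and std::future stand in for the quantum types, set_value() and wait_for() stand in for set() and waitFor(), and the helper name waitThenGet is hypothetical.

```cpp
#include <chrono>
#include <future>
#include <string>
#include <thread>

// STL analogue only: std::promise/std::future stand in for the quantum types.
// waitThenGet is a hypothetical helper name used for illustration.
std::string waitThenGet()
{
    std::promise<std::string> promise;
    std::future<std::string> future = promise.get_future();

    // The writer thread sets the value; set_value() does not block.
    std::thread writer([&promise]()
    {
        promise.set_value("The quick brown fox");
    });

    // wait_for() blocks for at most the given duration and reports the state.
    while (future.wait_for(std::chrono::milliseconds(10)) != std::future_status::ready)
    {
        // Not ready yet; a real caller could do other work here.
    }

    // The future is ready at this point, so get() returns without blocking.
    std::string value = future.get();
    writer.join();
    return value;
}
```

Calling waitThenGet() from main() yields the string written by the writer thread.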
The following code snippet shows how to return a promised value from a coroutine. In this case, we are returning a string.
    ThreadContextPtr<std::string> tctx = dispatcher.post(
        [](CoroContextPtr<std::string> ctx)->int
        {
            return ctx->set("The quick brown fox"); //Set the value and return
        });
    std::string s = tctx->get(); //Block until the future is set.
    //Alternatively:
    tctx->wait(); //Block until the future is set
    std::string s = tctx->get(); //Extract the value without blocking
Returning a promise from an asynchronous IO task is very similar to returning one from a coroutine. In this case, however, the promise object is not wrapped and is passed as-is via the first parameter in the function signature. The reason is that this prevents IO tasks from spawning further coroutines (or IO tasks), which could spawn others and so on, creating a very deep and often confusing call tree. As such, IO tasks are meant to perform a single operation and return the data to the caller (thread or coroutine). If another IO operation is needed, another IO task can be scheduled.
    ThreadFuturePtr<std::string> future = dispatcher.postAsyncIo(
        [](ThreadPromisePtr<std::string> promise)->int
        {
            return promise->set("The quick brown fox"); //Set the value
        });
    std::string s = future->get(); //Block until the future is set.
Similarly, if the async IO is posted from within a coroutine:
    dispatcher.post([](CoroContextPtr<std::string> ctx)->int
    {
        CoroFuturePtr<std::string> future = ctx->postAsyncIo(
            [](ThreadPromisePtr<std::string> promise)->int
            {
                return promise->set("The quick brown fox"); //Set the value
            });
        return ctx->set(future->get(ctx)); //Return the value from the async IO thread
    });
One difference to note is that when future->get(ctx) is called inside the coroutine, the ICoroSync interface is passed as a parameter, which causes the coroutine to yield instead of blocking.
In order to pass an exception from a coroutine or IO task to the caller thread, the exception pointer must be captured via std::current_exception() and set inside the promise as follows:
    ThreadContextPtr<int> tctx = dispatcher.post([](CoroContextPtr<int> ctx)->int
    {
        std::exception_ptr eptr;
        try
        {
            if (some_error_condition)
            {
                throw std::logic_error("The quick brown fox is slow");
            }
            ...
        }
        catch (...)
        {
            //Get pointer to current exception
            eptr = std::current_exception();
            //In this case we can also call std::rethrow_exception(eptr) and let
            //the scheduler catch it again.
            return ctx->setException(eptr);
        }
        return 0; //no error
    });
    try
    {
        int number = tctx->get(); //blocks until the exception is thrown
    }
    catch (std::exception& ex)
    {
        std::cerr << ex.what() << std::endl; //prints "The quick brown fox is slow"
    }
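The same capture-and-forward pattern exists in the standard library, which may help when reasoning about what the caller sees. The sketch below is an STL analogue and not quantum code: std::promise::set_exception() stands in for setException(), and the helper name forwardException is hypothetical.

```cpp
#include <exception>
#include <future>
#include <stdexcept>
#include <string>
#include <thread>

// STL analogue only: shows the current_exception()/set_exception() pattern.
// forwardException is a hypothetical helper name used for illustration.
std::string forwardException()
{
    std::promise<int> promise;
    std::future<int> future = promise.get_future();

    std::thread worker([&promise]()
    {
        try
        {
            throw std::logic_error("The quick brown fox is slow");
        }
        catch (...)
        {
            // Store the in-flight exception in the promise instead of a value.
            promise.set_exception(std::current_exception());
        }
    });

    std::string message;
    try
    {
        (void)future.get(); // rethrows the stored exception in the caller thread
    }
    catch (const std::exception& ex)
    {
        message = ex.what();
    }
    worker.join();
    return message;
}
```

The exception object thrown in the worker thread is the one rethrown by get(), so the original message reaches the caller unchanged.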
Everyone hates it when someone makes them a promise which is not kept. In this case, if the coroutine exits without setting the promise and the caller happens to wait on the future, an exception of type BrokenPromiseException will be thrown. The user should also check which other exceptions promises and futures might throw.
    ThreadContextPtr<int> tctx = dispatcher.post([](CoroContextPtr<int> ctx)->int
    {
        ...
        return 0; //exit without setting the 'int' promise
    });
    try
    {
        int number = tctx->get(); //blocks until the exception is thrown
    }
    catch (std::exception& ex)
    {
        std::cerr << ex.what() << std::endl; //prints "Broken promise"
    }
Alternatively, a broken promise can happen if an exception is thrown inside the coroutine and is allowed to propagate out.
    ThreadContextPtr<int> tctx = dispatcher.post([](CoroContextPtr<int> ctx)->int
    {
        ...
        function_which_throws(); //this throws and we don't catch it
        return ctx->set(55); //set the promise (never reached)
    });
    try
    {
        int number = tctx->get(); //blocks until the exception is thrown
    }
    catch (std::exception& ex)
    {
        std::cerr << ex.what() << std::endl; //prints "Broken promise"
    }
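For comparison, the standard library reports the same situation through std::future_error with the std::future_errc::broken_promise code. The sketch below is an STL analogue and not quantum code (quantum uses its own BrokenPromiseException as described above); the helper name brokenPromiseDetected is hypothetical.

```cpp
#include <future>
#include <system_error>

// STL analogue only: a promise destroyed without a value breaks its future.
// brokenPromiseDetected is a hypothetical helper name used for illustration.
bool brokenPromiseDetected()
{
    std::future<int> future;
    {
        std::promise<int> promise;
        future = promise.get_future();
        // 'promise' is destroyed here without set_value() or set_exception().
    }
    try
    {
        (void)future.get(); // throws because the promise was never fulfilled
    }
    catch (const std::future_error& err)
    {
        return err.code() == std::future_errc::broken_promise;
    }
    return false;
}
```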
When creating an IO task, it is not necessary to set the value inside the task itself. There are occasions when certain 3rd-party APIs might deliver the result of an operation asynchronously via a callback. In order to capture this result via the callback, you can simply pass the promise by copy. Since the promise is wrapped in a shared_ptr, its lifetime will automatically be extended.
    ThreadFuturePtr<std::string> tctx = dispatcher.postAsyncIo(
        [](ThreadPromisePtr<std::string> promise)->int
        {
            //Schedule a callback in a 3rd party library and capture promise by copy.
            set_third_party_async_callback([promise]()
            {
                //set inside the callback when 3rd-party async operation completes.
                //NOTE: This will be running in a thread outside quantum
                promise->set("The quick brown fox");
            });
            return 0; //return immediately without setting the promise
        });
    std::string s = tctx->get(); //Returns when the future is set
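The lifetime extension can be illustrated without quantum by holding a std::promise in a shared_ptr and capturing that pointer by copy. This is a sketch under stated assumptions: fakeThirdPartyAsync is a hypothetical stand-in for a 3rd-party API that invokes the callback on its own thread, and valueFromCallback is a hypothetical helper name.

```cpp
#include <functional>
#include <future>
#include <memory>
#include <string>
#include <thread>
#include <utility>

// Hypothetical stand-in for a 3rd-party API: runs the callback on its own thread.
std::thread fakeThirdPartyAsync(std::function<void()> callback)
{
    return std::thread(std::move(callback));
}

// STL analogue only: the shared_ptr copy inside the callback keeps the promise alive.
std::string valueFromCallback()
{
    std::future<std::string> future;
    std::thread external;
    {
        auto promise = std::make_shared<std::promise<std::string>>();
        future = promise->get_future();
        // Capture the shared_ptr by copy so the callback co-owns the promise.
        external = fakeThirdPartyAsync([promise]()
        {
            promise->set_value("The quick brown fox");
        });
        // The local 'promise' handle goes out of scope here; the copy survives.
    }
    std::string value = future.get(); // returns once the callback sets the value
    external.join();
    return value;
}
```

Capturing the promise by reference instead would leave the callback with a dangling reference once the enclosing scope ends, which is why the copy matters.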
Imagine that a coroutine needs to perform some work item and, as part of that, needs to get data from a remote source which delivers its content asynchronously (say, receiving a message from a facility like Kafka or RabbitMq). This could well be retrieved via postAsyncIo(), but the problem is that the remote source might take an indeterminate amount of time, and as a result a whole IO thread is blocked waiting. In order not to block an IO thread, we can simply create a quantum::Promise<T> outside of the coroutine and pass it in via lambda capture or as a function argument. The coroutine can then wait on it for as long as it needs.
    Promise<int> promise; //create a promise externally which will be set somewhere else
    dispatcher.post([&promise](CoroContext<int>::Ptr ctx)->int
    {
        //get a future from the captured promise
        CoroFuture<int>::Ptr future = promise.getICoroFuture();
        int value = future->get(ctx); //get value
        //do something with value...
        return 0;
    });
When get() is called to retrieve a future, the value inside is moved and the future object is invalidated. This means that any subsequent calls to get() will throw a FutureAlreadyRetrieved exception. If the user desires to access a future more than once, such as from different threads, the method getRef() should be called instead. This method returns a constant reference to the future value and will not modify its internal state. This is a concept borrowed from std::shared_future.
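The STL concept this borrows from can be sketched as follows. It is an STL analogue and not quantum code: std::future::get() moves the value out and leaves the future invalid, while std::shared_future::get() returns a const reference that can be read repeatedly, much like getRef() is described above. Note that the STL does not promise an exception on a second std::future::get() call, so the sketch only checks valid(). Both helper names are hypothetical.

```cpp
#include <future>
#include <string>

// STL analogue only: get() on std::future consumes the value.
bool futureInvalidAfterGet()
{
    std::promise<std::string> promise;
    std::future<std::string> future = promise.get_future();
    promise.set_value("fox");
    std::string value = future.get(); // moves the value out of the shared state
    return !future.valid() && value == "fox";
}

// STL analogue only: std::shared_future can be read more than once.
bool sharedFutureReadableTwice()
{
    std::promise<std::string> promise;
    std::shared_future<std::string> shared = promise.get_future().share();
    promise.set_value("fox");
    const std::string& first = shared.get();  // const reference, state untouched
    const std::string& second = shared.get(); // same stored object again
    return first == "fox" && &first == &second;
}
```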
One very important use case for futures is streaming values. Traditional usage requires a single value to be passed between a promise and a future, but there are many cases when multiple values need to be extracted and returned, one at a time. For example, suppose an async IO task executes a database query which returns thousands of rows. With the traditional approach, all fetched rows have to be aggregated into a single object before being returned, so the caller sees nothing until the last row has arrived. With the streaming approach, the result of each fetch operation can be returned to the caller immediately, allowing the application to react faster and reduce delays.
For this to work, the future value must be wrapped inside a Buffer object. The Buffer futures and promises have two new methods for passing data: push(), which inserts the next element into the buffer (tail), and pull(), which extracts the oldest element (head) from the buffer. When the writer thread finishes, it must call closeBuffer() to end transmission.
The following example shows how to make use of a buffered future:
    ThreadContextPtr<Buffer<int>> ctx = dispatcher.post(
        [](CoroContextPtr<Buffer<int>> ctx)->int
        {
            for (int d = 0; d < 5; d++)
            {
                ctx->push(d);
                ctx->yield(); //simulate some arbitrary delay
            }
            return ctx->closeBuffer();
        });
    std::vector<int> v;
    while (true)
    {
        bool isBufferClosed = false;
        int value = ctx->pull(isBufferClosed);
        if (isBufferClosed) break;
        v.push_back(value);
    }
    //'v' now contains {0, 1, 2, 3, 4}
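To make the push/pull/closeBuffer protocol concrete, here is a minimal sketch of how such a buffer could behave, built only on the standard library. It is not quantum's Buffer implementation: the SketchBuffer class and the streamFiveValues helper are hypothetical, and the sketch assumes that pull() blocks until an element is available or the buffer has been closed and drained.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical buffer mirroring the push/pull/closeBuffer protocol; not quantum's Buffer.
template <typename T>
class SketchBuffer
{
public:
    // Insert the next element at the tail and wake a waiting reader.
    void push(T value)
    {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _items.push_back(std::move(value));
        }
        _cond.notify_one();
    }
    // Signal the end of transmission.
    void closeBuffer()
    {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _closed = true;
        }
        _cond.notify_all();
    }
    // Block until an element is available or the buffer is closed and drained.
    T pull(bool& isBufferClosed)
    {
        std::unique_lock<std::mutex> lock(_mutex);
        _cond.wait(lock, [this]() { return !_items.empty() || _closed; });
        if (_items.empty())
        {
            isBufferClosed = true; // closed and nothing left to read
            return T();
        }
        isBufferClosed = false;
        T value = std::move(_items.front()); // extract the oldest element (head)
        _items.pop_front();
        return value;
    }
private:
    std::mutex _mutex;
    std::condition_variable _cond;
    std::deque<T> _items;
    bool _closed = false;
};

// Writer thread streams five values; the caller drains them one at a time.
std::vector<int> streamFiveValues()
{
    SketchBuffer<int> buffer;
    std::thread writer([&buffer]()
    {
        for (int d = 0; d < 5; d++) buffer.push(d);
        buffer.closeBuffer();
    });
    std::vector<int> v;
    while (true)
    {
        bool isBufferClosed = false;
        int value = buffer.pull(isBufferClosed);
        if (isBufferClosed) break;
        v.push_back(value);
    }
    writer.join();
    return v;
}
```

Elements pushed before closeBuffer() are still delivered, because pull() only reports closure once the queue is empty.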