Skip to content

Commit

Permalink
Async: Split runOnce into submitRequests, blockingPoll and dispatchCo…
Browse files Browse the repository at this point in the history
…mpletions

This allows easy integration with third party event loops, including GUI event Loops.
It allows polling in a secondary thread and submitting or running completions on the third party event loop (or GUI) thread.
  • Loading branch information
Pagghiu committed May 19, 2024
1 parent 138d1d8 commit 0526b5c
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 39 deletions.
13 changes: 11 additions & 2 deletions Documentation/Libraries/Async.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,24 @@ This is usable but needs some more testing and a few more features.

### Run modes

Event loop can be run in different ways to allow integrated it in multiple ways in applications that may already have an existing event loop (example GUI applications).
Event loop can be run in different ways to allow integrated it in multiple ways in applications.

| Run mode | Description |
|:------------------------------|:------------------------------------------|
| SC::AsyncEventLoop::run | @copydoc SC::AsyncEventLoop::run |
| SC::AsyncEventLoop::runOnce | @copydoc SC::AsyncEventLoop::runOnce |
| SC::AsyncEventLoop::runNoWait | @copydoc SC::AsyncEventLoop::runNoWait |


Alternatively user can explicitly use three methods to submit, poll and dispatch events.
This is very useful to integrate the event loop into applications with other event loops (for example GUI applications).

| Run mode | Description |
|:------------------------------------------|:--------------------------------------------------|
| SC::AsyncEventLoop::submitRequests | @copydoc SC::AsyncEventLoop::submitRequests |
| SC::AsyncEventLoop::blockingPoll | @copydoc SC::AsyncEventLoop::blockingPoll |
| SC::AsyncEventLoop::dispatchCompletions | @copydoc SC::AsyncEventLoop::dispatchCompletions |

## AsyncLoopTimeout
@copydoc SC::AsyncLoopTimeout

Expand Down Expand Up @@ -105,7 +115,6 @@ SC::ArenaMap from the [Containers](@ref library_containers) can be used to preal
# Roadmap

🟩 Usable Features:
- Implement option to do blocking poll check without dispatching callbacks (needed for efficient gui event loop integration)
- More comprehensive test suite, testing all cancellations
- FS operations (open stat read write unlink copyfile mkdir chmod etc.)
- UDP Send/Receive
Expand Down
62 changes: 56 additions & 6 deletions Libraries/Async/Async.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,21 @@ SC::Result SC::AsyncEventLoop::run()
return SC::Result(true);
}

SC::Result SC::AsyncEventLoop::submitRequests(AsyncKernelEvents& kernelEvents)
{
return internal.submitRequests(kernelEvents);
}

SC::Result SC::AsyncEventLoop::blockingPoll(AsyncKernelEvents& kernelEvents)
{
return internal.blockingPoll(Internal::SyncMode::ForcedForwardProgress, kernelEvents);
}

SC::Result SC::AsyncEventLoop::dispatchCompletions(AsyncKernelEvents& kernelEvents)
{
return internal.dispatchCompletions(Internal::SyncMode::ForcedForwardProgress, kernelEvents);
}

template <>
void SC::AsyncEventLoop::InternalOpaque::construct(Handle& buffer)
{
Expand Down Expand Up @@ -369,14 +384,14 @@ SC::AsyncLoopTimeout* SC::AsyncEventLoop::Internal::findEarliestLoopTimeout() co
return earliestTime;
}

void SC::AsyncEventLoop::Internal::invokeExpiredTimers(AsyncLoopTimeout& timeout)
void SC::AsyncEventLoop::Internal::invokeExpiredTimers(Time::HighResolutionCounter currentTime)
{
AsyncLoopTimeout* async;
for (async = activeLoopTimeouts.front; //
async != nullptr; //
async = static_cast<AsyncLoopTimeout*>(async->next))
{
if (timeout.expirationTime.isLaterThanOrEqualTo(async->expirationTime))
if (currentTime.isLaterThanOrEqualTo(async->expirationTime))
{
removeActiveHandle(*async);
AsyncLoopTimeout::Result result(*async, Result(true));
Expand Down Expand Up @@ -533,7 +548,20 @@ SC::Result SC::AsyncEventLoop::Internal::completeAndEventuallyReactivate(KernelE

SC::Result SC::AsyncEventLoop::Internal::runStep(SyncMode syncMode)
{
KernelEvents kernelEvents(loop->internal.kernelQueue.get());
alignas(uint64_t) uint8_t buffer[8 * 1024]; // 8 Kb of kernel events
AsyncKernelEvents kernelEvents;
kernelEvents.eventsMemory = buffer;
SC_TRY(submitRequests(kernelEvents));
SC_TRY(blockingPoll(syncMode, kernelEvents));
return dispatchCompletions(syncMode, kernelEvents);
}

SC::Result SC::AsyncEventLoop::Internal::submitRequests(AsyncKernelEvents& asyncKernelEvents)
{
KernelEvents kernelEvents(loop->internal.kernelQueue.get(), asyncKernelEvents);
asyncKernelEvents.numberOfEvents = 0;
// TODO: Check if it's possible to avoid zeroing kernel events memory
memset(asyncKernelEvents.eventsMemory.data(), 0, asyncKernelEvents.eventsMemory.sizeInBytes());
SC_LOG_MESSAGE("---------------\n");

while (AsyncRequest* async = submissions.dequeueFront())
Expand All @@ -545,6 +573,12 @@ SC::Result SC::AsyncEventLoop::Internal::runStep(SyncMode syncMode)
}
}

return SC::Result(true);
}

SC::Result SC::AsyncEventLoop::Internal::blockingPoll(SyncMode syncMode, AsyncKernelEvents& asyncKernelEvents)
{
KernelEvents kernelEvents(loop->internal.kernelQueue.get(), asyncKernelEvents);
if (getTotalNumberOfActiveHandle() <= 0 and numberOfManualCompletions == 0)
{
// happens when we do cancelAsync on the last active async for example
Expand All @@ -558,11 +592,27 @@ SC::Result SC::AsyncEventLoop::Internal::runStep(SyncMode syncMode)
SC_TRY(kernelEvents.syncWithKernel(*loop, syncMode));
SC_LOG_MESSAGE("Active Requests After Poll = {}\n", getTotalNumberOfActiveHandle());
}
return SC::Result(true);
}

if (expiredTimer)
SC::Result SC::AsyncEventLoop::Internal::dispatchCompletions(SyncMode syncMode, AsyncKernelEvents& asyncKernelEvents)
{
KernelEvents kernelEvents(loop->internal.kernelQueue.get(), asyncKernelEvents);
switch (syncMode)
{
invokeExpiredTimers(*expiredTimer);
expiredTimer = nullptr;
case SyncMode::NoWait: {
updateTime();
invokeExpiredTimers(loopTime);
}
break;
case SyncMode::ForcedForwardProgress: {
if (expiredTimer)
{
invokeExpiredTimers(expiredTimer->expirationTime);
expiredTimer = nullptr;
}
}
break;
}
runStepExecuteCompletions(kernelEvents);
runStepExecuteManualCompletions(kernelEvents);
Expand Down
67 changes: 59 additions & 8 deletions Libraries/Async/Async.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ struct ThreadPool;
struct ThreadPoolTask;

struct EventObject;
struct AsyncKernelEvents;
struct AsyncEventLoop;

struct AsyncRequest;
Expand Down Expand Up @@ -848,6 +849,18 @@ struct AsyncFilePoll : public AsyncRequest

} // namespace SC

/// @brief Allows user to supply a block of memory that will store kernel I/O events retrieved from
/// AsyncEventLoop::runOnce. Such events can then be later passed to AsyncEventLoop::dispatchCompletions.
/// @see AsyncEventLoop::runOnce
struct SC::AsyncKernelEvents
{
Span<uint8_t> eventsMemory; ///< User supplied block of memory used to store kernel I/O events

private:
int numberOfEvents = 0;
friend struct AsyncEventLoop;
};

/// @brief Asynchronous I/O (files, sockets, timers, processes, fs events, threads wake-up) (see @ref library_async)
/// AsyncEventLoop pushes all AsyncRequest derived classes to I/O queues in the OS.
/// Basic lifetime for an event loop is:
Expand Down Expand Up @@ -876,25 +889,63 @@ struct SC::AsyncEventLoop
/// Closes the event loop kernel object
[[nodiscard]] Result close();

/// Blocks until there are no more active queued requests.
/// Blocks until there are no more active queued requests, dispatching all completions.
/// It's useful for applications where the eventLoop is the only (or the main) loop.
/// One example could be a console based app doing socket IO or a web server.
/// Waiting on requests blocks the current thread with 0% CPU utilization.
/// Waiting on kernel events blocks the current thread with 0% CPU utilization.
/// @see AsyncEventLoop::blockingPoll to integrate the loop with a GUI event loop
[[nodiscard]] Result run();

/// Blocks until at least one request proceeds, ensuring forward progress.
/// It's useful for applications where the eventLoop events needs to be interleaved with other work.
/// For example one possible way of integrating with a UI event loop could be to schedule a recurrent timeout
/// timer every 1/60 seconds where calling GUI event loop updates every 60 seconds, blocking for I/O for
/// the remaining time. Waiting on requests blocks the current thread with 0% CPU utilization.
/// Blocks until at least one request proceeds, ensuring forward progress, dispatching all completions.
/// It's useful for application where it's needed to run some idle work after every IO event.
/// Waiting on requests blocks the current thread with 0% CPU utilization.
///
/// This function is a shortcut invoking async event loop building blocks:
/// - AsyncEventLoop::submitRequests
/// - AsyncEventLoop::blockingPoll
/// - AsyncEventLoop::dispatchCompletions
/// @see AsyncEventLoop::blockingPoll for a description on how to integrate AsyncEventLoop with another event loop
[[nodiscard]] Result runOnce();

/// Process active requests if they exist or returns immediately without blocking.
/// Process active requests if any, dispatching their completions, or returns immediately without blocking.
/// It's useful for game-like applications where the event loop runs every frame and one would like to check
/// and dispatch its I/O callbacks in-between frames.
/// This call allows poll-checking I/O without blocking.
/// @see AsyncEventLoop::blockingPoll to integrate the loop with a GUI event loop
[[nodiscard]] Result runNoWait();

/// Submits all queued async requests.
/// An AsyncRequest becomes queued after user calls its specific AsyncRequest::start method.
///
/// @see AsyncEventLoop::blockingPoll for a description on how to integrate AsyncEventLoop with another event loop
[[nodiscard]] Result submitRequests(AsyncKernelEvents& kernelEvents);

/// Blocks until at least one event happens, ensuring forward progress, without executing completions.
/// It's one of the three building blocks of AsyncEventLoop::runOnce allowing co-operation of AsyncEventLoop
/// within another event loop (for example a GUI event loop or another IO event loop).
///
/// One possible example of such integration with a GUI event loop could:
///
/// - Call AsyncEventLoop::submitRequests on the GUI thread to queue some requests
/// - Call AsyncEventLoop::blockingPoll on a secondary thread, storying AsyncKernelEvents
/// - Wake up the GUI event loop from the secondary thread after AsyncEventLoop::blockingPoll returns
/// - Call AsyncEventLoop:dispatchCompletions on the GUI event loop to dispatch callbacks on GUI thread
/// - Repeat all steps
///
/// Waiting on requests blocks the current thread with 0% CPU utilization.
/// @param kernelEvents Mandatory parameter to store kernel IO events WITHOUT running their completions.
/// In that case user is expected to run completions passing it to AsyncEventLoop::dispatchCompletions.
/// @see AsyncEventLoop::submitRequests sends async requests to kernel before calling blockingPoll
/// @see AsyncEventLoop::dispatchCompletions invokes callbacks associated with kernel events after blockingPoll
[[nodiscard]] Result blockingPoll(AsyncKernelEvents& kernelEvents);

/// Invokes completions for the AsyncKernelEvents collected by a call to AsyncEventLoop::blockingPoll.
/// This is typically done when user wants to pool for events on a thread (calling AsyncEventLoop::blockingPoll)
/// and dispatch the callbacks on another thread (calling AsyncEventLoop::dispatchCompletions).
/// The typical example would be integrating AsyncEventLoop with a GUI event loop.
/// @see AsyncEventLoop::blockingPoll for a description on how to integrate AsyncEventLoop with another event loop
[[nodiscard]] Result dispatchCompletions(AsyncKernelEvents& kernelEvents);

/// Wake up the event loop from a thread different than the one where run() is called (and potentially blocked).
/// The parameter is an AsyncLoopWakeUp that must have been previously started (with AsyncLoopWakeUp::start).
[[nodiscard]] Result wakeUpFromExternalThread(AsyncLoopWakeUp& wakeUp);
Expand Down
6 changes: 5 additions & 1 deletion Libraries/Async/Internal/AsyncInternal.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ struct SC::AsyncEventLoop::Internal
// Timers
[[nodiscard]] AsyncLoopTimeout* findEarliestLoopTimeout() const;

void invokeExpiredTimers(AsyncLoopTimeout& timeout);
void invokeExpiredTimers(Time::HighResolutionCounter currentTime);
void updateTime();

[[nodiscard]] Result cancelAsync(AsyncRequest& async);
Expand Down Expand Up @@ -131,6 +131,10 @@ struct SC::AsyncEventLoop::Internal

[[nodiscard]] Result runStep(SyncMode syncMode);

[[nodiscard]] Result submitRequests(AsyncKernelEvents& kernelEvents);
[[nodiscard]] Result blockingPoll(SyncMode syncMode, AsyncKernelEvents& kernelEvents);
[[nodiscard]] Result dispatchCompletions(SyncMode syncMode, AsyncKernelEvents& kernelEvents);

void runStepExecuteCompletions(KernelEvents& kernelEvents);
void runStepExecuteManualCompletions(KernelEvents& kernelEvents);
void runStepExecuteManualThreadPoolCompletions(KernelEvents& kernelEvents);
Expand Down
23 changes: 14 additions & 9 deletions Libraries/Async/Internal/AsyncLinux.inl
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ struct SC::AsyncEventLoop::Internal::KernelEvents
bool isEpoll = true;
AlignedStorage<16400> storage;

KernelEvents(KernelQueue& kernelQueue);
KernelEvents(KernelQueue& kernelQueue, AsyncKernelEvents& asyncKernelEvents);
~KernelEvents();

KernelEventsIoURing& getUring();
Expand Down Expand Up @@ -171,15 +171,20 @@ struct SC::AsyncEventLoop::Internal::KernelQueueIoURing
struct SC::AsyncEventLoop::Internal::KernelEventsIoURing
{
private:
static constexpr int totalNumEvents = 256;

int newEvents = 0;
io_uring_cqe events[totalNumEvents];
io_uring_cqe* events;

KernelEvents& parentKernelEvents;

int& newEvents;
const int totalNumEvents;

public:
KernelEventsIoURing(KernelEvents& kq) : parentKernelEvents(kq) {}
KernelEventsIoURing(KernelEvents& kq, AsyncKernelEvents& kernelEvents)
: parentKernelEvents(kq), newEvents(kernelEvents.numberOfEvents),
totalNumEvents(static_cast<int>(kernelEvents.eventsMemory.sizeInBytes() / sizeof(events[0])))
{
events = reinterpret_cast<decltype(events)>(kernelEvents.eventsMemory.data());
}

[[nodiscard]] AsyncRequest* getAsyncRequest(uint32_t idx)
{
Expand Down Expand Up @@ -634,11 +639,11 @@ SC::Result SC::AsyncEventLoop::Internal::KernelQueue::wakeUpFromExternalThread()
// AsyncEventLoop::Internal::KernelEvents
//----------------------------------------------------------------------------------------

SC::AsyncEventLoop::Internal::KernelEvents::KernelEvents(KernelQueue& kernelQueue)
SC::AsyncEventLoop::Internal::KernelEvents::KernelEvents(KernelQueue& kernelQueue, AsyncKernelEvents& asyncKernelEvents)
{
isEpoll = kernelQueue.isEpoll;
isEpoll ? placementNew(storage.reinterpret_as<KernelEventsPosix>(), *this)
: placementNew(storage.reinterpret_as<KernelEventsIoURing>(), *this);
isEpoll ? placementNew(storage.reinterpret_as<KernelEventsPosix>(), *this, asyncKernelEvents)
: placementNew(storage.reinterpret_as<KernelEventsIoURing>(), *this, asyncKernelEvents);
}

SC::AsyncEventLoop::Internal::KernelEvents::~KernelEvents()
Expand Down
24 changes: 16 additions & 8 deletions Libraries/Async/Internal/AsyncPosix.inl
Original file line number Diff line number Diff line change
Expand Up @@ -281,22 +281,30 @@ struct SC::AsyncEventLoop::Internal::KernelQueuePosix
struct SC::AsyncEventLoop::Internal::KernelEventsPosix
{
private:
static constexpr int totalNumEvents = 1024;

#if SC_ASYNC_USE_EPOLL
epoll_event events[totalNumEvents];
epoll_event* events;
#else
struct kevent events[totalNumEvents];
struct kevent* events;
#endif
int newEvents = 0;

KernelEvents& parentKernelEvents;

int& newEvents;
const int totalNumEvents;

public:
KernelEventsPosix(KernelEvents& kq) : parentKernelEvents(kq) { memset(events, 0, sizeof(events)); }
#if SC_PLATFORM_APPLE
KernelEventsPosix(KernelQueuePosix&) : parentKernelEvents(*this) { memset(events, 0, sizeof(events)); }
KernelEventsPosix(KernelQueue&, AsyncKernelEvents& kernelEvents)
: parentKernelEvents(*this),
#else
KernelEventsPosix(KernelEvents& ke, AsyncKernelEvents& kernelEvents)
: parentKernelEvents(ke),
#endif
newEvents(kernelEvents.numberOfEvents),
totalNumEvents(static_cast<int>(kernelEvents.eventsMemory.sizeInBytes() / sizeof(events[0])))
{
events = reinterpret_cast<decltype(events)>(kernelEvents.eventsMemory.data());
}

uint32_t getNumEvents() const { return static_cast<uint32_t>(newEvents); }

[[nodiscard]] AsyncRequest* getAsyncRequest(uint32_t idx) const
Expand Down
18 changes: 13 additions & 5 deletions Libraries/Async/Internal/AsyncWindows.inl
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,18 @@ struct SC::AsyncEventLoop::Internal::KernelQueue

struct SC::AsyncEventLoop::Internal::KernelEvents
{
static constexpr int totalNumEvents = 128;

OVERLAPPED_ENTRY events[totalNumEvents];
ULONG newEvents = 0;
OVERLAPPED_ENTRY* events;

KernelEvents(KernelQueue&) { ::memset(events, 0, totalNumEvents * sizeof(events[0])); }
int& newEvents;
const int totalNumEvents = 0;

KernelEvents(KernelQueue&, AsyncKernelEvents& kernelEvents)
: newEvents(kernelEvents.numberOfEvents),
totalNumEvents(static_cast<int>(kernelEvents.eventsMemory.sizeInBytes() / sizeof(events[0])))
{
events = reinterpret_cast<decltype(events)>(kernelEvents.eventsMemory.data());
}

uint32_t getNumEvents() const { return static_cast<uint32_t>(newEvents); }

Expand Down Expand Up @@ -230,7 +236,9 @@ struct SC::AsyncEventLoop::Internal::KernelEvents
}
const DWORD ms =
nextTimer or syncMode == Internal::SyncMode::NoWait ? static_cast<ULONG>(timeout.ms) : INFINITE;
const BOOL res = ::GetQueuedCompletionStatusEx(loopFd, events, totalNumEvents, &newEvents, ms, FALSE);
ULONG ulongEvents = static_cast<ULONG>(newEvents);
const BOOL res = ::GetQueuedCompletionStatusEx(loopFd, events, totalNumEvents, &ulongEvents, ms, FALSE);
newEvents = static_cast<int>(ulongEvents);
if (res == FALSE)
{
if (::GetLastError() == WAIT_TIMEOUT)
Expand Down

0 comments on commit 0526b5c

Please sign in to comment.