Skip to content

Commit

Permalink
Async: Make AsyncSocket{Send | Receive} buffer and handle public
Browse files Browse the repository at this point in the history
  • Loading branch information
Pagghiu committed Sep 10, 2024
1 parent 3c78017 commit 15ec185
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 11 deletions.
27 changes: 23 additions & 4 deletions Libraries/Async/Async.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,20 +152,35 @@ SC::Result SC::AsyncSocketConnect::start(AsyncEventLoop& loop, const SocketDescr
SC::Result SC::AsyncSocketSend::start(AsyncEventLoop& loop, const SocketDescriptor& socketDescriptor,
Span<const char> dataToSend)
{
SC_TRY(validateAsync());
SC_TRY(socketDescriptor.get(handle, SC::Result::Error("Invalid handle")));
buffer = dataToSend;
buffer = dataToSend;
return start(loop);
}

SC::Result SC::AsyncSocketSend::start(AsyncEventLoop& loop)
{
SC_TRY_MSG(buffer.sizeInBytes() > 0, "AsyncSocketSend::start - Zero sized write buffer");
SC_TRY_MSG(handle != SocketDescriptor::Invalid, "AsyncSocketSend::start - Invalid file descriptor");
SC_TRY(validateAsync());
#if SC_PLATFORM_WINDOWS
#else
totalBytesSent = 0;
#endif
SC_TRY(queueSubmission(loop));
return SC::Result(true);
}

SC::Result SC::AsyncSocketReceive::start(AsyncEventLoop& loop, const SocketDescriptor& socketDescriptor,
Span<char> receiveData)
{
SC_TRY(validateAsync());
SC_TRY(socketDescriptor.get(handle, SC::Result::Error("Invalid handle")));
buffer = receiveData;
return start(loop);
}

SC::Result SC::AsyncSocketReceive::start(AsyncEventLoop& loop)
{
SC_TRY(validateAsync());
SC_TRY(queueSubmission(loop));
return SC::Result(true);
}
Expand Down Expand Up @@ -652,6 +667,7 @@ SC::Result SC::AsyncEventLoop::Internal::completeAndEventuallyReactivate(KernelE
}
if (not returnCode)
{
// TODO: We shouldn't probably access async if it has not been reactivated...
reportError(kernelEvents, async, move(returnCode));
}
return Result(true);
Expand Down Expand Up @@ -701,7 +717,9 @@ SC::Result SC::AsyncEventLoop::Internal::blockingPoll(SyncMode syncMode, AsyncKe
{
// We may have some manualCompletions queued (for SocketClose for example) but no active handles
SC_LOG_MESSAGE("Active Requests Before Poll = {}\n", getTotalNumberOfActiveHandle());
SC_TRY(kernelEvents.syncWithKernel(*loop, syncMode));

// If there are manual completions the loop can't block waiting for I/O, to dispatch them immediately
SC_TRY(kernelEvents.syncWithKernel(*loop, numberOfManualCompletions == 0 ? syncMode : SyncMode::NoWait));
SC_LOG_MESSAGE("Active Requests After Poll = {}\n", getTotalNumberOfActiveHandle());
}
return SC::Result(true);
Expand Down Expand Up @@ -986,6 +1004,7 @@ void SC::AsyncEventLoop::Internal::prepareTeardown(AsyncRequest& async, AsyncTea
SC::Result SC::AsyncEventLoop::Internal::setupAsync(KernelEvents& kernelEvents, AsyncRequest& async)
{
SC_LOG_MESSAGE("{} {} SETUP\n", async.debugName, AsyncRequest::TypeToString(async.type));
async.flags = 0; // Reset flags that may have been left by previous activations
return Internal::applyOnAsync(async, SetupAsyncPhase(kernelEvents));
}

Expand Down
34 changes: 27 additions & 7 deletions Libraries/Async/Async.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ struct SC::AsyncRequest
/// @brief Get the event loop associated with this AsyncRequest
[[nodiscard]] AsyncEventLoop* getEventLoop() const { return eventLoop; }

/// @brief Caches the event loop associated with this AsyncRequest.
/// Used to cache eventLoop pointer before starting the AsyncRequest.
void cacheInternalEventLoop(AsyncEventLoop& loop) { eventLoop = &loop; }

/// @brief Type of async request
enum class Type : uint8_t
{
Expand Down Expand Up @@ -553,16 +557,25 @@ struct AsyncSocketSend : public AsyncRequest
[[nodiscard]] SC::Result start(AsyncEventLoop& eventLoop, const SocketDescriptor& socketDescriptor,
Span<const char> data);

/// @brief Starts a socket send operation.
/// Callback will be called when the given socket is ready to send more data.
/// @param eventLoop The event loop where queuing this async request
/// @return Valid Result if the request has been successfully queued
/// @note Remeber to fill AsyncSocketSend::buffer and AsyncSocketSend::handle before calling start
[[nodiscard]] SC::Result start(AsyncEventLoop& eventLoop);

Function<void(Result&)> callback; ///< Called when socket is ready to send more data.

Span<const char> buffer; ///< Span of bytes to send
SocketDescriptor::Handle handle = SocketDescriptor::Invalid; ///< The socket to send data to

private:
friend struct AsyncEventLoop;

SocketDescriptor::Handle handle = SocketDescriptor::Invalid;
Span<const char> buffer;
size_t totalBytesSent = 0;
#if SC_PLATFORM_WINDOWS
detail::WinOverlappedOpaque overlapped;
#else
size_t totalBytesSent = 0;
#endif
};
struct AsyncSocketReceive;
Expand Down Expand Up @@ -613,14 +626,21 @@ struct AsyncSocketReceive : public AsyncRequest
[[nodiscard]] SC::Result start(AsyncEventLoop& eventLoop, const SocketDescriptor& socketDescriptor,
Span<char> data);

/// @brief Starts a socket receive operation.
/// Callback will be called when some data is read from socket.
/// @param eventLoop The event loop where queuing this async request
/// @return Valid Result if the request has been successfully queued
/// @note Remeber to fill AsyncSocketReceive::buffer and AsyncSocketReceive::handle before calling start
[[nodiscard]] SC::Result start(AsyncEventLoop& eventLoop);

Function<void(Result&)> callback; ///< Called after data has been received

private:
friend struct AsyncEventLoop;
Span<char> buffer; ///< The writeable span of memory where to data will be written
SocketDescriptor::Handle handle = SocketDescriptor::Invalid; /// The Socket Descriptor handle to read data from.

SocketDescriptor::Handle handle = SocketDescriptor::Invalid;
Span<char> buffer;
private:
#if SC_PLATFORM_WINDOWS
friend struct AsyncEventLoop;
detail::WinOverlappedOpaque overlapped;
#endif
};
Expand Down

0 comments on commit 15ec185

Please sign in to comment.