Skip to content

Commit

Permalink
Async: Use offsets only if explicitly set in AsyncFile{Read | Write}
Browse files Browse the repository at this point in the history
Also give up trying to use EVFILT_READ for buffered files on kqueue (macOS) as it can't report EOF reliably.
  • Loading branch information
Pagghiu committed Sep 10, 2024
1 parent 15ec185 commit c251205
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 22 deletions.
30 changes: 27 additions & 3 deletions Libraries/Async/Async.h
Original file line number Diff line number Diff line change
Expand Up @@ -740,15 +740,27 @@ struct AsyncFileRead : public AsyncRequest
Function<void(Result&)> callback; /// Callback called when some data has been read from the file into the buffer

Span<char> buffer; /// The writeable span of memory where to data will be written
uint64_t offset = 0; /// Offset from file start where to start reading. Not supported on pipes.
FileDescriptor::Handle fileDescriptor; /// The file/pipe descriptor handle to read data from.
/// Use SC::FileDescriptor or SC::PipeDescriptor to open it, with
/// SC::FileDescriptorOpenOptions::blocking == false

/// @brief Returns the last offset set with AsyncFileRead::setOffset
uint64_t getOffset() const { return offset; }

/// @brief Sets the offset in bytes at which start reading.
/// @note Setting file offset when reading is only possible on seekable files
void setOffset(uint64_t fileOffset)
{
useOffset = true;
offset = fileOffset;
}

private:
friend struct AsyncEventLoop;

bool useOffset = false;
uint64_t offset = 0; /// Offset from file start where to start reading. Not supported on pipes.
#if SC_PLATFORM_WINDOWS
uint64_t readCursor = 0;
detail::WinOverlappedOpaque overlapped;
#endif
};
Expand Down Expand Up @@ -812,13 +824,25 @@ struct AsyncFileWrite : public AsyncRequest
Function<void(Result&)> callback; /// Callback called when descriptor is ready to be written with more data

Span<const char> buffer; /// The read-only span of memory where to read the data from
uint64_t offset = 0; /// Offset to start writing from. Not supported on pipes.
FileDescriptor::Handle fileDescriptor; /// The file/pipe descriptor to write data to.
/// Use SC::FileDescriptor or SC::PipeDescriptor to open it, with
/// SC::FileDescriptorOpenOptions::blocking == false

/// @brief Returns the last offset set with AsyncFileWrite::setOffset
uint64_t getOffset() const { return offset; }

/// @brief Sets the offset in bytes at which start writing.
/// @note Setting write file offset when reading is only possible on seekable files
void setOffset(uint64_t fileOffset)
{
useOffset = true;
offset = fileOffset;
}

private:
friend struct AsyncEventLoop;
bool useOffset = false;
uint64_t offset = 0xffffffffffffffff; /// Offset to start writing from. Not supported on pipes.
#if SC_PLATFORM_WINDOWS
detail::WinOverlappedOpaque overlapped;
#endif
Expand Down
4 changes: 2 additions & 2 deletions Libraries/Async/Internal/AsyncLinux.inl
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ struct SC::AsyncEventLoop::Internal::KernelEventsIoURing
io_uring_sqe* submission;
SC_TRY(getNewSubmission(async, submission));
globalLibURing.io_uring_prep_read(submission, async.fileDescriptor, async.buffer.data(),
async.buffer.sizeInBytes(), async.offset);
async.buffer.sizeInBytes(), async.useOffset ? async.offset : -1);
globalLibURing.io_uring_sqe_set_data(submission, &async);
return Result(true);
}
Expand All @@ -495,7 +495,7 @@ struct SC::AsyncEventLoop::Internal::KernelEventsIoURing
io_uring_sqe* submission;
SC_TRY(getNewSubmission(async, submission));
globalLibURing.io_uring_prep_write(submission, async.fileDescriptor, async.buffer.data(),
async.buffer.sizeInBytes(), 0);
async.buffer.sizeInBytes(), async.useOffset ? async.offset : -1);
globalLibURing.io_uring_sqe_set_data(submission, &async);
return Result(true);
}
Expand Down
52 changes: 39 additions & 13 deletions Libraries/Async/Internal/AsyncPosix.inl
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@
#include <errno.h> // For error handling
#include <netdb.h> // socketlen_t/getsocketopt/send/recv
#include <sys/event.h> // kqueue
#include <sys/time.h> // timespec
#include <sys/wait.h> // WIFEXITED / WEXITSTATUS
#include <unistd.h> // read/write/pread/pwrite
#include <sys/stat.h>
#include <sys/time.h> // timespec
#include <sys/wait.h> // WIFEXITED / WEXITSTATUS
#include <unistd.h> // read/write/pread/pwrite

#endif

Expand Down Expand Up @@ -327,7 +328,7 @@ struct SC::AsyncEventLoop::Internal::KernelEventsPosix
return KernelQueuePosix::setEventWatcher(async, fileDescriptor, filter);
}

[[nodiscard]] static bool isDescriptorWatchable(int fd, bool& canBeWatched)
[[nodiscard]] static bool isDescriptorWriteWatchable(int fd, bool& canBeWatched)
{
struct stat file_stat;
if (::fstat(fd, &file_stat) == -1)
Expand Down Expand Up @@ -386,7 +387,7 @@ struct SC::AsyncEventLoop::Internal::KernelEventsPosix
return Result(true);
}

[[nodiscard]] static constexpr bool isDescriptorWatchable(int, bool& canBeWatched)
[[nodiscard]] static constexpr bool isDescriptorWriteWatchable(int, bool& canBeWatched)
{
canBeWatched = true; // kevent can also watch regular buffered files (differently from epoll)
return true;
Expand All @@ -409,6 +410,19 @@ struct SC::AsyncEventLoop::Internal::KernelEventsPosix
}
#endif

[[nodiscard]] static bool isDescriptorReadWatchable(int fd, bool& canBeWatched)
{
struct stat file_stat;
if (::fstat(fd, &file_stat) == -1)
{
return false;
}
// epoll doesn't support regular file descriptors
// kqueue doesn't report EOF on vnodes (regular files) for EVFILT_READ
canBeWatched = S_ISREG(file_stat.st_mode) == 0;
return true;
}

static struct timespec timerToTimespec(const Time::HighResolutionCounter& loopTime,
const Time::HighResolutionCounter* nextTimer)
{
Expand Down Expand Up @@ -683,7 +697,7 @@ struct SC::AsyncEventLoop::Internal::KernelEventsPosix
[[nodiscard]] Result setupAsync(AsyncFileRead& async)
{
bool canBeWatched;
SC_TRY(isDescriptorWatchable(async.fileDescriptor, canBeWatched));
SC_TRY(isDescriptorReadWatchable(async.fileDescriptor, canBeWatched));
if (canBeWatched)
{
return setEventWatcher(async, async.fileDescriptor, INPUT_EVENTS_MASK);
Expand All @@ -705,19 +719,25 @@ struct SC::AsyncEventLoop::Internal::KernelEventsPosix
return KernelQueuePosix::stopSingleWatcherImmediate(*async.eventLoop, async.fileDescriptor, INPUT_EVENTS_MASK);
}

[[nodiscard]] static Result teardownAsync(AsyncFileRead*, AsyncTeardown& teardown)
{
return KernelQueuePosix::stopSingleWatcherImmediate(*teardown.eventLoop, teardown.fileHandle,
INPUT_EVENTS_MASK);
}

[[nodiscard]] static Result executeOperation(AsyncFileRead& async, AsyncFileRead::CompletionData& completionData)
{
auto span = async.buffer;
ssize_t res;
do
{
if (async.offset == 0)
if (async.useOffset)
{
res = ::read(async.fileDescriptor, span.data(), span.sizeInBytes());
res = ::pread(async.fileDescriptor, span.data(), span.sizeInBytes(), static_cast<off_t>(async.offset));
}
else
{
res = ::pread(async.fileDescriptor, span.data(), span.sizeInBytes(), static_cast<off_t>(async.offset));
res = ::read(async.fileDescriptor, span.data(), span.sizeInBytes());
}
} while ((res == -1) and (errno == EINTR));

Expand All @@ -736,7 +756,7 @@ struct SC::AsyncEventLoop::Internal::KernelEventsPosix
[[nodiscard]] Result setupAsync(AsyncFileWrite& async)
{
bool canBeWatched;
SC_TRY(isDescriptorWatchable(async.fileDescriptor, canBeWatched));
SC_TRY(isDescriptorWriteWatchable(async.fileDescriptor, canBeWatched));
if (canBeWatched)
{
return setEventWatcher(async, async.fileDescriptor, OUTPUT_EVENTS_MASK);
Expand All @@ -758,19 +778,25 @@ struct SC::AsyncEventLoop::Internal::KernelEventsPosix
return KernelQueuePosix::stopSingleWatcherImmediate(*async.eventLoop, async.fileDescriptor, OUTPUT_EVENTS_MASK);
}

[[nodiscard]] static Result teardownAsync(AsyncFileWrite*, AsyncTeardown& teardown)
{
return KernelQueuePosix::stopSingleWatcherImmediate(*teardown.eventLoop, teardown.fileHandle,
OUTPUT_EVENTS_MASK);
}

[[nodiscard]] static Result executeOperation(AsyncFileWrite& async, AsyncFileWrite::CompletionData& completionData)
{
auto span = async.buffer;
ssize_t res;
do
{
if (async.offset == 0)
if (async.useOffset)
{
res = ::write(async.fileDescriptor, span.data(), span.sizeInBytes());
res = ::pwrite(async.fileDescriptor, span.data(), span.sizeInBytes(), static_cast<off_t>(async.offset));
}
else
{
res = ::pwrite(async.fileDescriptor, span.data(), span.sizeInBytes(), static_cast<off_t>(async.offset));
res = ::write(async.fileDescriptor, span.data(), span.sizeInBytes());
}
} while ((res == -1) and (errno == EINTR));
SC_TRY_MSG(res >= 0, "::write failed");
Expand Down
12 changes: 11 additions & 1 deletion Libraries/Async/Internal/AsyncWindows.inl
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,13 @@ struct SC::AsyncEventLoop::Internal::KernelEvents
[[nodiscard]] static Result executeOperation(AsyncFileRead& async, AsyncFileRead::CompletionData& completionData,
bool synchronous = true)
{
return executeFileOperation(&::ReadFile, async, completionData, synchronous, &completionData.endOfFile);
if (not async.useOffset)
{
async.offset = async.readCursor;
}
SC_TRY(executeFileOperation(&::ReadFile, async, completionData, synchronous, &completionData.endOfFile));
async.readCursor = async.offset + async.buffer.sizeInBytes();
return Result(true);
}

[[nodiscard]] static Result completeAsync(AsyncFileRead::Result& result)
Expand All @@ -601,6 +607,10 @@ struct SC::AsyncEventLoop::Internal::KernelEvents
[[nodiscard]] static Result executeOperation(AsyncFileWrite& async, AsyncFileWrite::CompletionData& completionData,
bool synchronous = true)
{
// TODO: Do the same as AsyncFileRead
// To write to the end of file, specify both the Offset and OffsetHigh members of the OVERLAPPED structure as
// 0xFFFFFFFF. This is functionally equivalent to previously calling the CreateFile function to open hFile using
// FILE_APPEND_DATA access.
return executeFileOperation(&::WriteFile, async, completionData, synchronous, nullptr);
}

Expand Down
120 changes: 117 additions & 3 deletions Libraries/Async/Tests/AsyncTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ struct SC::AsyncTest : public SC::TestCase
fileReadWrite(false); // do not use thread-pool
fileReadWrite(true); // use thread-pool
}
if (test_section("file endOfFile"))
{
fileEndOfFile(false); // do not use thread-pool
fileEndOfFile(true); // use thread-pool
}
if (test_section("file close"))
{
fileClose();
Expand Down Expand Up @@ -111,6 +116,7 @@ struct SC::AsyncTest : public SC::TestCase
void socketClose();
void socketSendReceiveError();
void fileReadWrite(bool useThreadPool);
void fileEndOfFile(bool useThreadPool);
void fileClose();
};

Expand Down Expand Up @@ -914,7 +920,7 @@ void SC::AsyncTest::fileReadWrite(bool useThreadPool)
{
SC_TEST_EXPECT(readData.sizeInBytes() == 1);
params.readBuffer[params.readCount++] = readData.data()[0];
res.getAsync().offset += readData.sizeInBytes();
res.getAsync().setOffset(res.getAsync().getOffset() + readData.sizeInBytes());
res.reactivateRequest(true);
}
else
Expand Down Expand Up @@ -949,6 +955,114 @@ void SC::AsyncTest::fileReadWrite(bool useThreadPool)
SC_TEST_EXPECT(fs.removeEmptyDirectory(name));
}

void SC::AsyncTest::fileEndOfFile(bool useThreadPool)
{
// This tests a weird edge case where doing a single file read of the entire size of file
// will not produce end of file flag

// 1. Create ThreadPool and tasks
ThreadPool threadPool;
if (useThreadPool)
{
SC_TEST_EXPECT(threadPool.create(4));
}

// 2. Create EventLoop
AsyncEventLoop eventLoop;
SC_TEST_EXPECT(eventLoop.create(options));

// 3. Create some files on disk
StringNative<255> filePath = StringEncoding::Native;
StringNative<255> dirPath = StringEncoding::Native;
const StringView name = "AsyncTest";
const StringView fileName = "test.txt";
SC_TEST_EXPECT(Path::join(dirPath, {report.applicationRootDirectory, name}));
SC_TEST_EXPECT(Path::join(filePath, {dirPath.view(), fileName}));

FileSystem fs;
SC_TEST_EXPECT(fs.init(report.applicationRootDirectory));
SC_TEST_EXPECT(fs.makeDirectoryIfNotExists(name));
SC_TEST_EXPECT(fs.changeDirectory(dirPath.view()));
{
char data[1024] = {0};
SC_TEST_EXPECT(fs.write(fileName, {data, sizeof(data)}));
}

FileDescriptor::OpenOptions openOptions;
openOptions.blocking = useThreadPool;

FileDescriptor::Handle handle = FileDescriptor::Invalid;
FileDescriptor fd;
SC_TEST_EXPECT(fd.open(filePath.view(), FileDescriptor::ReadOnly, openOptions));
if (not useThreadPool)
{
SC_TEST_EXPECT(eventLoop.associateExternallyCreatedFileDescriptor(fd));
}
SC_TEST_EXPECT(fd.get(handle, Result::Error("asd")));

struct Context
{
int readCount = 0;
size_t readSize = 0;
} context;
AsyncFileRead asyncReadFile;
AsyncFileRead::Task asyncReadTask;
asyncReadFile.setDebugName("FileRead");
asyncReadFile.callback = [this, &context](AsyncFileRead::Result& res)
{
Span<char> readData;
SC_TEST_EXPECT(res.get(readData));
if (context.readCount == 0)
{
context.readSize += readData.sizeInBytes();
res.reactivateRequest(true);
}
else if (context.readCount == 1)
{
context.readSize += readData.sizeInBytes();
}
else if (context.readCount == 2)
{
SC_TEST_EXPECT(res.completionData.endOfFile);
SC_TEST_EXPECT(readData.empty()); // EOF
}
else
{
SC_TEST_EXPECT(context.readCount <= 3);
}
context.readCount++;
};
char buffer[512] = {0};
asyncReadFile.fileDescriptor = handle;
asyncReadFile.buffer = {buffer, sizeof(buffer)};
if (useThreadPool)
{
SC_TEST_EXPECT(asyncReadFile.start(eventLoop, threadPool, asyncReadTask));
}
else
{
SC_TEST_EXPECT(asyncReadFile.start(eventLoop));
}

SC_TEST_EXPECT(eventLoop.run());
SC_TEST_EXPECT(context.readCount == 2);
if (useThreadPool)
{
SC_TEST_EXPECT(asyncReadFile.start(eventLoop, threadPool, asyncReadTask));
}
else
{
SC_TEST_EXPECT(asyncReadFile.start(eventLoop));
}
SC_TEST_EXPECT(eventLoop.run());
SC_TEST_EXPECT(context.readCount == 3);
SC_TEST_EXPECT(fd.close());

SC_TEST_EXPECT(fs.removeFile(fileName));
SC_TEST_EXPECT(fs.changeDirectory(report.applicationRootDirectory));
SC_TEST_EXPECT(fs.removeEmptyDirectory(name));
}

void SC::AsyncTest::fileClose()
{
AsyncEventLoop eventLoop;
Expand Down Expand Up @@ -1287,8 +1401,8 @@ asyncReadFile.callback = [&](AsyncFileRead::Result& res)
// readData is a slice of receivedData with the received bytes
console.print("Read {} bytes from file", readData.sizeInBytes());

// IMPORTANT: Update file offset to receive next range of bytes
res.getAsync().offset += readData.sizeInBytes();
// OPTIONAL: Update file offset to receive a different range of bytes
res.getAsync().setOffset(res.getAsync().getOffset() + readData.sizeInBytes());

// IMPORTANT: Reactivate the request to receive more data
res.reactivateRequest(true);
Expand Down

0 comments on commit c251205

Please sign in to comment.