Skip to content

Commit

Permalink
Async: Rename AsyncPipeline::Sink to AsyncPipeline::Pipe
Browse files Browse the repository at this point in the history
  • Loading branch information
Pagghiu committed Nov 23, 2024
1 parent a075702 commit a6bdc11
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 16 deletions.
8 changes: 4 additions & 4 deletions Libraries/Async/AsyncStreams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -469,9 +469,9 @@ SC::Result SC::AsyncPipeline::start()
SC_TRY_MSG(source != nullptr, "AsyncPipeline::start - Missing source");

AsyncBuffersPool& buffers = source->getBuffersPool();
for (Sink sink : sinks)
for (const Pipe& pipe : pipes)
{
if (&sink.sink->getBuffersPool() != &buffers)
if (&pipe.sink->getBuffersPool() != &buffers)
{
return Result::Error("AsyncPipeline::start - all streams must use the same AsyncBuffersPool");
}
Expand All @@ -484,12 +484,12 @@ SC::Result SC::AsyncPipeline::start()

void SC::AsyncPipeline::onBufferRead(AsyncBufferView::ID bufferID)
{
for (Sink sink : sinks)
for (const Pipe& pipe : pipes)
{
source->getBuffersPool().refBuffer(bufferID); // 4a. AsyncPipeline::onBufferWritten
Function<void(AsyncBufferView::ID)> cb;
cb.bind<AsyncPipeline, &AsyncPipeline::onBufferWritten>(*this);
Result res = sink.sink->write(bufferID, move(cb));
Result res = pipe.sink->write(bufferID, move(cb));
if (not res)
{
eventError.emit(res);
Expand Down
7 changes: 4 additions & 3 deletions Libraries/Async/AsyncStreams.h
Original file line number Diff line number Diff line change
Expand Up @@ -240,17 +240,18 @@ struct AsyncWritableStream
/// @note It's crucial to use the same AsyncBuffersPool for the AsyncReadableStream and all AsyncWritableStream
struct AsyncPipeline
{
static constexpr int MaxListeners = 8;
static constexpr int MaxListeners = 8;

Event<MaxListeners, Result> eventError; /// Emitted when an error occurs

// TODO: Make all these private
AsyncReadableStream* source = nullptr; /// User specified source

struct Sink
struct Pipe
{
AsyncWritableStream* sink = nullptr;
};
Span<Sink> sinks; /// User specified sinks
Span<Pipe> pipes; /// User specified sinks

/// @brief Starts the pipeline
/// @note Both source and sinks must have been already setup by the caller
Expand Down
18 changes: 9 additions & 9 deletions Libraries/Async/Tests/AsyncRequestStreamsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,10 @@ void SC::AsyncRequestStreamsTest::fileToFile()
// Create Pipeline
AsyncPipeline pipeline;

AsyncPipeline::Sink destinations[1];
pipeline.source = &readable;
pipeline.sinks = {destinations, 1};
destinations[0].sink = &writable;
AsyncPipeline::Pipe pipe[1];
pipeline.source = &readable;
pipeline.pipes = {pipe, 1};
pipe[0].sink = &writable;

SC_TEST_EXPECT(pipeline.start());

Expand Down Expand Up @@ -297,19 +297,19 @@ void SC::AsyncRequestStreamsTest::fileToSocketToFile()
// Create Pipelines
AsyncPipeline pipelines[2];

AsyncPipeline::Sink sinks[2];
AsyncPipeline::Pipe pipes[2];

// Create first Async Pipeline (file to socket)
pipelines[0].source = &readFileStream;
pipelines[0].sinks = {&sinks[0], 1};
pipelines[0].pipes = {&pipes[0], 1};

pipelines[0].sinks[0].sink = &writeSocketStream;
pipelines[0].pipes[0].sink = &writeSocketStream;

// Create second Async Pipeline (socket to file)
pipelines[1].source = &readSocketStream;
pipelines[1].sinks = {&sinks[1], 1};
pipelines[1].pipes = {&pipes[1], 1};

pipelines[1].sinks[0].sink = &writeFileStream;
pipelines[1].pipes[0].sink = &writeFileStream;

// Start Async Pipeline
SC_TEST_EXPECT(pipelines[0].start());
Expand Down

0 comments on commit a6bdc11

Please sign in to comment.