Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add AsyncioRunnable #411

Merged
merged 51 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
06258e5
move pycoro to mrc
cwharris Oct 20, 2023
44c3575
python coroutine tests
cwharris Oct 20, 2023
49fc835
remove commented code
cwharris Oct 24, 2023
2b2363e
adjust indentation of `\`
cwharris Oct 24, 2023
5a807f6
fix styles
cwharris Oct 24, 2023
12e54fb
adjust styles
cwharris Oct 25, 2023
4822967
add back some timing and hit-count logic to coro tests
cwharris Oct 25, 2023
ff44453
remove unused namespace and rename pycoro
cwharris Oct 25, 2023
d54ca8f
fix styles
cwharris Oct 25, 2023
5412e76
fix styles
cwharris Oct 25, 2023
26fae58
add asynciorunnable
cwharris Oct 27, 2023
4ea145a
Merge branch 'branch-23.11' of github.com:nv-morpheus/MRC into fea-sh…
cwharris Oct 27, 2023
7eb5da7
remove unncessary using
cwharris Oct 27, 2023
622fbac
adjust includes for scheduler.cpp/hpp
cwharris Oct 27, 2023
a2a0938
remove unncessary bool return for await_suspend
cwharris Oct 27, 2023
64a346b
add copyright header
cwharris Oct 27, 2023
e157311
asyncio_runnable test
cwharris Oct 30, 2023
d964e00
fix imports
cwharris Oct 31, 2023
d456588
remove unused includes
cwharris Oct 31, 2023
222edcd
move exception catcher to MRC
cwharris Oct 31, 2023
fc49a12
rename and document BoostFutureAwaiter -> BoostFutureAwaitableOperation
cwharris Oct 31, 2023
5f5f7ae
remove unneccessary ireader and iwriter interfaces
cwharris Nov 1, 2023
48dad3d
simplify asyncio_runnable
cwharris Nov 1, 2023
521abc2
simplify asyncio_runnable
cwharris Nov 1, 2023
a0d136e
simplify asyncio_runnable
cwharris Nov 1, 2023
e1da112
simplify asyncio_runnable
cwharris Nov 1, 2023
2126de9
rename coroutinerunnable sink/source to async sink/source, add docs
cwharris Nov 1, 2023
1bf51c8
adjust comments
cwharris Nov 1, 2023
760c314
remove concurrency from asyncioscheduler
cwharris Nov 1, 2023
b835b24
move ownership of event loop out of asyncioscheduler
cwharris Nov 1, 2023
8be2867
add info log
cwharris Nov 1, 2023
efd7fe9
fix bug related to r-value reference and suspended coroutine
cwharris Nov 1, 2023
e509bbb
add python-coroutine-based test for asynciorunnable
cwharris Nov 1, 2023
2b2f26e
fix test_asyncio_runnable test class to support multiple test cases
cwharris Nov 1, 2023
df8feb8
refactor scheduler to be simpler
cwharris Nov 1, 2023
ce77e06
fix includes
cwharris Nov 1, 2023
e733106
add boost future awaitable operation test
cwharris Nov 1, 2023
ce7d371
fix tests
cwharris Nov 2, 2023
98e093a
add comments to scheduler and asyncio_scheduler
cwharris Nov 2, 2023
e650997
comment asyncio_runnable's functions
cwharris Nov 2, 2023
4b1999d
add exception test case for asyncio_runnable
cwharris Nov 2, 2023
1591e3d
add asyncgenerator failure test to asyncio_runnable
cwharris Nov 2, 2023
c2942c2
formatting
cwharris Nov 2, 2023
00253e6
remove inspect
cwharris Nov 2, 2023
a4e49f2
fix includes
cwharris Nov 2, 2023
7fa1f79
fix scheduler comment styles
cwharris Nov 2, 2023
519d0e7
fix asyncio_scheduler styles
cwharris Nov 2, 2023
229dd3a
fix styles
cwharris Nov 2, 2023
5dbfbe0
add copyright header to exception_catcher.hpp
cwharris Nov 2, 2023
6c070c8
styles
cwharris Nov 2, 2023
f2369db
fix exception_catch.cpp copyright header
cwharris Nov 2, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/mrc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ add_library(libmrc
src/public/core/logging.cpp
src/public/core/thread.cpp
src/public/coroutines/event.cpp
src/public/coroutines/scheduler.cpp
src/public/coroutines/sync_wait.cpp
src/public/coroutines/task_container.cpp
src/public/coroutines/thread_local_context.cpp
Expand All @@ -124,6 +123,7 @@ add_library(libmrc
src/public/cuda/sync.cpp
src/public/edge/edge_adapter_registry.cpp
src/public/edge/edge_builder.cpp
src/public/exceptions/exception_catcher.cpp
src/public/manifold/manifold.cpp
src/public/memory/buffer_view.cpp
src/public/memory/codable/buffer.cpp
Expand Down
91 changes: 6 additions & 85 deletions cpp/mrc/include/mrc/coroutines/scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,109 +25,30 @@
#include <mutex>
#include <string>

// IWYU thinks this is needed, but it's not
// IWYU pragma: no_include "mrc/coroutines/task_container.hpp"

namespace mrc::coroutines {

class TaskContainer; // IWYU pragma: keep

/**
* @brief Scheduler base class
*
* Allows all schedulers to be discovered via the mrc::this_thread::current_scheduler()
*/
class Scheduler : public std::enable_shared_from_this<Scheduler>
{
public:
struct Operation
{
Operation(Scheduler& scheduler);

constexpr static auto await_ready() noexcept -> bool
{
return false;
}

std::coroutine_handle<> await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept;

constexpr static auto await_resume() noexcept -> void {}

Scheduler& m_scheduler;
std::coroutine_handle<> m_awaiting_coroutine;
Operation* m_next{nullptr};
};

Scheduler();
virtual ~Scheduler() = default;

/**
* @brief Description of Scheduler
*/
virtual std::string description() const = 0;

/**
* Schedules the currently executing coroutine to be run on this thread pool. This must be
* called from within the coroutines function body to schedule the coroutine on the thread pool.
* @throw std::runtime_error If the thread pool is `shutdown()` scheduling new tasks is not permitted.
* @return The operation to switch from the calling scheduling thread to the executor thread
* pool thread.
*/
[[nodiscard]] virtual auto schedule() -> Operation;

// Enqueues a message without waiting for it. Must return void since the caller will not get the return value
virtual void schedule(Task<void>&& task);

/**
* Schedules any coroutine handle that is ready to be resumed.
* @param handle The coroutine handle to schedule.
*/
virtual auto resume(std::coroutine_handle<> coroutine) -> void = 0;

/**
* Yields the current task to the end of the queue of waiting tasks.
*/
[[nodiscard]] auto yield() -> Operation;

/**
* If the calling thread controlled by a Scheduler, return a pointer to the Scheduler
* @brief Resumes a coroutine according to the scheduler's implementation.
*/
static auto from_current_thread() noexcept -> Scheduler*;
virtual void resume(std::coroutine_handle<> handle) noexcept = 0;

/**
* If the calling thread is owned by a thread_pool, return the thread index (rank) of the current thread with
* respect the threads in the pool; otherwise, return the std::hash of std::this_thread::get_id
* @brief Suspends the current function and resumes it according to the scheduler's implementation.
*/
static auto get_thread_id() noexcept -> std::size_t;
[[nodiscard]] virtual Task<> schedule() = 0;

protected:
virtual auto on_thread_start(std::size_t) -> void;

/**
* @brief Get the task container object
*
* @return TaskContainer&
*/
TaskContainer& get_task_container() const;

private:
/**
* @brief When co_await schedule() is called, this function will be executed by the awaiter. Each scheduler
* implementation should determine how and when to execute the operation.
*
* @param operation The schedule() awaitable pointer
* @return std::coroutine_handle<> Return a coroutine handle to which will be
* used as the return value for await_suspend().
* @brief Suspends the current function and resumes it according to the scheduler's implementation.
*/
virtual std::coroutine_handle<> schedule_operation(Operation* operation) = 0;

mutable std::mutex m_mutex;

// Maintains the lifetime of fire-and-forget tasks scheduled with schedule(Task<void>&& task)
std::unique_ptr<TaskContainer> m_task_container;

thread_local static Scheduler* m_thread_local_scheduler;
thread_local static std::size_t m_thread_id;
[[nodiscard]] virtual Task<> yield() = 0;
};

} // namespace mrc::coroutines
53 changes: 53 additions & 0 deletions cpp/mrc/include/mrc/exceptions/exception_catcher.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/**
* SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <exception>
#include <mutex>
#include <queue>

#pragma once

namespace mrc {

/**
* @brief A utility for catching out-of-stack exceptions in a thread-safe manner such that they
* can be checked and throw from a parent thread.
*/
class ExceptionCatcher
{
public:
/**
* @brief "catches" an exception to the catcher
*/
void push_exception(std::exception_ptr ex);

/**
* @brief checks to see if any exceptions have been "caught" by the catcher.
*/
bool has_exception();

/**
* @brief rethrows the next exception (in the order in which it was "caught").
*/
void rethrow_next_exception();

private:
std::mutex m_mutex{};
std::queue<std::exception_ptr> m_exceptions{};
};

} // namespace mrc
85 changes: 0 additions & 85 deletions cpp/mrc/src/public/coroutines/scheduler.cpp

This file was deleted.

50 changes: 50 additions & 0 deletions cpp/mrc/src/public/exceptions/exception_catcher.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <mrc/exceptions/exception_catcher.hpp>

namespace mrc {

void ExceptionCatcher::push_exception(std::exception_ptr ex)
{
auto lock = std::lock_guard(m_mutex);
m_exceptions.push(ex);
}

bool ExceptionCatcher::has_exception()
{
auto lock = std::lock_guard(m_mutex);
return not m_exceptions.empty();
}

void ExceptionCatcher::rethrow_next_exception()
{
auto lock = std::lock_guard(m_mutex);

if (m_exceptions.empty())
{
return;
}

auto ex = m_exceptions.front();

m_exceptions.pop();

std::rethrow_exception(ex);
}

} // namespace mrc
Loading