Skip to content

Commit

Permalink
chore: refactor ExecutorGroup
Browse files Browse the repository at this point in the history
  • Loading branch information
vinniefalco committed Jun 16, 2023
1 parent 5d78642 commit 65b5279
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 98 deletions.
145 changes: 47 additions & 98 deletions include/mrdox/Support/ExecutorGroup.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
#define MRDOX_SUPPORT_EXECUTORGROUP_HPP

#include <mrdox/Platform.hpp>
#include <mrdox/Support/any_callable.hpp>
#include <mrdox/Support/ThreadPool.hpp>
#include <mrdox/Support/unlock_guard.hpp>
#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>
#include <vector>

Expand All @@ -25,48 +25,65 @@ namespace mrdox {
class MRDOX_DECL
ExecutorGroupBase
{
class scoped_agent;

protected:
struct Impl;

struct MRDOX_DECL
AnyAgent
{
virtual ~AnyAgent() = 0;
virtual void* get() noexcept = 0;
};

std::unique_ptr<Impl> impl_;
std::vector<std::unique_ptr<AnyAgent>> agents_;
std::deque<any_callable<void(void*)>> work_;

explicit ExecutorGroupBase(ThreadPool&);
void post(any_callable<void(void*)>);
void run(std::unique_lock<std::mutex>);

public:
template<class T>
using arg_t = ThreadPool::arg_t<T>;

~ExecutorGroupBase();
ExecutorGroupBase(ExecutorGroupBase&&) noexcept;

/** Block until all work has completed.
*/
void
wait() noexcept;
};

/** A set of execution agents for performing concurrent work.
*/
template<class Agent>
class ExecutorGroup : public ExecutorGroupBase
{
struct Impl
struct AgentImpl : AnyAgent
{
std::mutex mutex_;
std::condition_variable cv_;
};
Agent agent_;

ThreadPool& threadPool_;
std::unique_ptr<Impl> impl_;
std::vector<std::unique_ptr<Agent>> agents_;
std::deque<any_callable<void(Agent&)>> work_;
std::size_t busy_ = 0;

public:
template<class T>
using arg_t = ThreadPool::arg_t<T>;
template<class... Args>
AgentImpl(Args&&... args)
: agent_(std::forward<Args>(args)...)
{
}

ExecutorGroup(ExecutorGroup const&) = delete;
ExecutorGroup& operator=(ExecutorGroup&&) = delete;
ExecutorGroup& operator=(ExecutorGroup const&) = delete;
ExecutorGroup(ExecutorGroup&&) = default;
void* get() noexcept override
{
return &agent_;
}
};

public:
explicit
ExecutorGroup(
ThreadPool& threadPool) noexcept
: threadPool_(threadPool)
, impl_(std::make_unique<Impl>())
ThreadPool& threadPool)
: ExecutorGroupBase(threadPool)
{
}

Expand All @@ -79,8 +96,9 @@ class ExecutorGroup : public ExecutorGroupBase
void
emplace(Args&&... args)
{
agents_.emplace_back(std::make_unique<Agent>(
std::forward<Args>(args)...));
agents_.emplace_back(
std::make_unique<AgentImpl>(
std::forward<Args>(args)...));
}

/** Submit work to be executed.
Expand All @@ -96,86 +114,17 @@ class ExecutorGroup : public ExecutorGroupBase
async(F&& f, Args&&... args)
{
static_assert(std::is_invocable_v<F, Agent&, arg_t<Args>...>);
std::unique_lock<std::mutex> lock(impl_->mutex_);
work_.emplace_back(
post(
[
f = std::forward<F>(f),
args = std::tuple<arg_t<Args>...>(args...)
](Agent& agent)
](void* agent)
{
std::apply(f, std::tuple_cat(
std::tuple<Agent&>(agent),
std::apply(f,
std::tuple_cat(std::tuple<Agent&>(
*reinterpret_cast<Agent*>(agent)),
std::move(args)));
});
if(agents_.empty())
return;
run(std::move(lock));
}

/** Block until all work has completed.
*/
void
wait()
{
std::unique_lock<std::mutex> lock(impl_->mutex_);
impl_->cv_.wait(lock,
[&]
{
return work_.empty() && busy_ == 0;
});
}

private:
class scoped_agent
{
ExecutorGroup& group_;
std::unique_ptr<Agent> agent_;

public:
scoped_agent(
ExecutorGroup& group,
std::unique_ptr<Agent> agent) noexcept
: group_(group)
, agent_(std::move(agent))
{
}

~scoped_agent()
{
--group_.busy_;
group_.agents_.emplace_back(std::move(agent_));
group_.impl_->cv_.notify_all();
}

Agent& operator*() const noexcept
{
return *agent_;
}
};

void
run(std::unique_lock<std::mutex> lock)
{
std::unique_ptr<Agent> agent(std::move(agents_.back()));
agents_.pop_back();
++busy_;

threadPool_.async(
[this, agent = std::move(agent)]() mutable
{
std::unique_lock<std::mutex> lock(impl_->mutex_);
scoped_agent scope(*this, std::move(agent));
for(;;)
{
if(work_.empty())
break;
any_callable<void(Agent&)> work(
std::move(work_.front()));
work_.pop_front();
unlock_guard unlock(impl_->mutex_);
work(*scope);
}
});
}
};

Expand Down
1 change: 1 addition & 0 deletions source/-adoc/SinglePageVisitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
//

#include "SinglePageVisitor.hpp"
#include <mrdox/Support/unlock_guard.hpp>

namespace clang {
namespace mrdox {
Expand Down
112 changes: 112 additions & 0 deletions source/Support/ExecutorGroup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,121 @@
//

#include <mrdox/Support/ExecutorGroup.hpp>
#include <mrdox/Support/unlock_guard.hpp>
#include <condition_variable>

namespace clang {
namespace mrdox {

struct ExecutorGroupBase::
Impl
{
ThreadPool& threadPool;
std::mutex mutex;
std::condition_variable cv;
std::size_t busy = 0;

explicit
Impl(ThreadPool& threadPool_)
: threadPool(threadPool_)
{
}
};

class ExecutorGroupBase::
scoped_agent
{
ExecutorGroupBase& group_;
std::unique_ptr<AnyAgent> agent_;

public:
scoped_agent(
ExecutorGroupBase& group,
std::unique_ptr<AnyAgent> agent) noexcept
: group_(group)
, agent_(std::move(agent))
{
}

~scoped_agent()
{
--group_.impl_->busy;
group_.agents_.emplace_back(std::move(agent_));
group_.impl_->cv.notify_all();
}

void* get() const noexcept
{
return agent_->get();
}
};

ExecutorGroupBase::
AnyAgent::
~AnyAgent() = default;

ExecutorGroupBase::
ExecutorGroupBase(
ThreadPool& threadPool)
: impl_(std::make_unique<Impl>(threadPool))
{
}

ExecutorGroupBase::
~ExecutorGroupBase() = default;

ExecutorGroupBase::
ExecutorGroupBase(
ExecutorGroupBase&&) noexcept = default;

void
ExecutorGroupBase::
post(any_callable<void(void*)> work)
{
std::unique_lock<std::mutex> lock(impl_->mutex);
work_.emplace_back(std::move(work));
if(agents_.empty())
return;
run(std::move(lock));
}

void
ExecutorGroupBase::
run(std::unique_lock<std::mutex> lock)
{
std::unique_ptr<AnyAgent> agent(std::move(agents_.back()));
agents_.pop_back();
++impl_->busy;

impl_->threadPool.async(
[this, agent = std::move(agent)]() mutable
{
std::unique_lock<std::mutex> lock(impl_->mutex);
scoped_agent scope(*this, std::move(agent));
for(;;)
{
if(work_.empty())
break;
any_callable<void(void*)> work(
std::move(work_.front()));
work_.pop_front();
unlock_guard unlock(impl_->mutex);
work(scope.get());
}
});
}

void
ExecutorGroupBase::
wait() noexcept
{
std::unique_lock<std::mutex> lock(impl_->mutex);
impl_->cv.wait(lock,
[&]
{
return work_.empty() && impl_->busy == 0;
});
}

} // mrdox
} // clang

0 comments on commit 65b5279

Please sign in to comment.