Skip to content

Commit

Permalink
chore: refactor thread facilities
Browse files Browse the repository at this point in the history
  • Loading branch information
vinniefalco committed Jun 10, 2023
1 parent bdc9864 commit 9121bef
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 205 deletions.
158 changes: 61 additions & 97 deletions include/mrdox/Support/Thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,32 +14,33 @@

#include <mrdox/Platform.hpp>
#include <mrdox/Support/Error.hpp>
#include <functional>
#include <iterator>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

namespace llvm {
class ThreadPool;
class ThreadPoolTaskGroup;
} // llvm

namespace clang {
namespace mrdox {

class TaskGroup;

//------------------------------------------------

/** A pool of threads for executing work concurrently.
*/
class MRDOX_VISIBLE
ThreadPool
{
class Impl;

struct MRDOX_VISIBLE
Work
{
virtual ~Work() = 0;
virtual void operator()() = 0;
};
std::unique_ptr<llvm::ThreadPool> impl_;

std::unique_ptr<Impl> impl_;
friend class TaskGroup;

public:
/** Destructor.
Expand All @@ -52,117 +53,80 @@ class MRDOX_VISIBLE
MRDOX_DECL
explicit
ThreadPool(
std::size_t concurrency);
unsigned concurrency);

/** Submit work to be executed.
The signature of the submitted function
object should be `void(void)`.
*/
template<class F>
void
post(F&& f);
async(std::function<void(void)> f);

/** Invoke a function object for each element of a range.
*/
template<class Range, class F>
void forEach(Range&& range, F const& f);

/** Block until all work has completed.
*/
MRDOX_DECL
void
wait();

private:
MRDOX_DECL void do_post(std::unique_ptr<Work>);
};

template<class F>
void
ThreadPool::
post(F&& f)
//------------------------------------------------

/** A subset of possible work in a thread pool.
*/
class MRDOX_VISIBLE
TaskGroup
{
if(! impl_)
{
// no threads
f();
return;
}

struct WorkImpl : Work
{
F f;

~WorkImpl() = default;

explicit WorkImpl(F&& f_)
: f(std::forward<F>(f_))
{
}

void operator()() override
{
f();
}
};

do_post(std::make_unique<WorkImpl>(std::forward<F>(f)));
}
std::unique_ptr<llvm::ThreadPoolTaskGroup> impl_;

public:
MRDOX_DECL
~TaskGroup();

MRDOX_DECL
explicit
TaskGroup(
ThreadPool& threadPool);

/** Submit work to be executed.
The signature of the submitted function
object should be `void(void)`.
*/
void
async(std::function<void(void)> f);

/** Block until all work has completed.
*/
MRDOX_DECL
void
wait();
};

//------------------------------------------------

/** Visit all elements of a range concurrently.
/** Invoke a function object for each element of a range.
*/
template<
class Elements,
class Workers,
class... Args>
Error
template<class Range, class F>
void
ThreadPool::
forEach(
Elements& elements,
Workers& workers,
Args&&... args)
Range&& range,
F const& f)
{
if(std::next(std::begin(workers)) == std::end(workers))
{
// Non-concurrent
auto&& worker(*std::begin(workers));
for(auto&& element : elements)
if(! worker(element, std::forward<Args>(args)...))
return Error("canceled");
return Error::success();
}

std::mutex m;
bool cancel = false;
auto it = std::begin(elements);
auto const end = std::end(elements);
auto const do_work =
[&](auto&& agent)
{
std::unique_lock<std::mutex> lock(m);
if(it == end || cancel)
return false;
auto it0 = it;
++it;
lock.unlock();
bool cancel_ = ! agent(*it0,
std::forward<Args>(args)...);
if(! cancel_)
return true;
cancel = true;
return false;
};
std::vector<std::thread> threads;
for(auto& worker : workers)
threads.emplace_back(std::thread(
[&](auto&& agent)
TaskGroup tg(*this);
for(auto&& value : range)
tg.async(
[&f, &value]
{
for(;;)
if(! do_work(agent))
break;
}, worker));
for(auto& t : threads)
t.join();
if(cancel)
return Error("canceled");
return Error::success();
f(value);
});
tg.wait();
}

} // mrdox
Expand Down
157 changes: 51 additions & 106 deletions source/Support/Thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,136 +8,81 @@
// Official repository: https://github.com/cppalliance/mrdox
//

#include "Support/Debug.hpp"
#include <mrdox/Support/Thread.hpp>
#include <condition_variable>
#include <cstdlib>
#include <deque>
#include <iterator>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
#include <llvm/Support/ThreadPool.h>
#include <utility>

namespace clang {
namespace mrdox {

ThreadPool::Work::~Work() = default;

class ThreadPool::Impl
{
std::vector<std::thread> threads_;
std::condition_variable work_cv_;
std::condition_variable join_cv_;
std::mutex mutex_;

std::deque<std::unique_ptr<Work>> work_;
std::size_t busy_ = 0;
bool destroy_ = false;

void
run()
{
std::unique_lock<std::mutex> lock(mutex_);
++busy_;
for(;;)
{
--busy_;
work_cv_.wait(lock,
[&]
{
return destroy_ || ! work_.empty();
});
++busy_;
if(! work_.empty())
{
std::unique_ptr<Work> work =
std::move(work_.front());
work_.pop_front();
lock.unlock();
(*work)();
continue;
}
if(destroy_)
break;
}
--busy_;
join_cv_.notify_all();
}

public:
explicit
Impl(std::size_t concurrency)
{
threads_.resize(concurrency);
for(auto& t : threads_)
t = std::thread(&Impl::run, this);
}

~Impl()
{
for(auto& t : threads_)
t.join();
}

void destroy() noexcept
{
std::unique_lock<std::mutex> lock(mutex_);
destroy_ = true;
work_cv_.notify_all();
}

void post(std::unique_ptr<Work> work)
{
std::unique_lock<std::mutex> lock(mutex_);
work_.emplace_back(std::move(work));
work_cv_.notify_one();
}

void wait()
{
std::unique_lock<std::mutex> lock(mutex_);
join_cv_.wait(lock,
[&]
{
return busy_ == 0;
});
}
};
//------------------------------------------------
//
// ThreadPool
//
//------------------------------------------------

ThreadPool::
~ThreadPool()
{
if(impl_)
{
wait();
impl_->destroy();
}
}

ThreadPool::
ThreadPool(
std::size_t concurrency)
unsigned concurrency)
{
if( concurrency == 0)
concurrency = std::thread::hardware_concurrency();
if(concurrency > 1)
impl_ = std::make_unique<Impl>(concurrency);
llvm::ThreadPoolStrategy S;
S.ThreadsRequested = concurrency;
S.Limit = true;
impl_ = std::make_unique<llvm::ThreadPool>(S);
}

void
ThreadPool::
wait()
async(
std::function<void(void)> f)
{
if( impl_)
impl_->wait();
impl_->async(std::move(f));
}

void
ThreadPool::
do_post(
std::unique_ptr<Work> work)
wait()
{
impl_->wait();
}

//------------------------------------------------
//
// TaskGroup
//
//------------------------------------------------

TaskGroup::
~TaskGroup()
{
}

TaskGroup::
TaskGroup(
ThreadPool& threadPool)
: impl_(std::make_unique<llvm::ThreadPoolTaskGroup>(*threadPool.impl_))
{
}

void
TaskGroup::
async(
std::function<void(void)> f)
{
impl_->async(std::move(f));
}

void
TaskGroup::
wait()
{
impl_->post(std::move(work));
impl_->wait();
}

} // mrdox
Expand Down
Loading

0 comments on commit 9121bef

Please sign in to comment.