Skip to content

Commit 5915bef

Browse files
committed
chore: executor group
1 parent ed8e91f commit 5915bef

File tree

2 files changed

+231
-33
lines changed

2 files changed

+231
-33
lines changed

include/mrdox/Support/Thread.hpp

+209-23
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,15 @@
1414

1515
#include <mrdox/Platform.hpp>
1616
#include <mrdox/Support/Error.hpp>
17-
#include <functional>
17+
#include <atomic>
18+
#include <condition_variable>
19+
#include <deque>
1820
#include <iterator>
21+
#include <memory>
1922
#include <mutex>
2023
#include <thread>
24+
#include <tuple>
25+
#include <type_traits>
2126
#include <utility>
2227
#include <vector>
2328

@@ -33,6 +38,74 @@ class TaskGroup;
3338

3439
//------------------------------------------------
3540

41+
class unlock_guard
42+
{
43+
std::mutex& m_;
44+
45+
public:
46+
explicit
47+
unlock_guard(std::mutex& m)
48+
: m_(m)
49+
{
50+
m_.unlock();
51+
}
52+
53+
~unlock_guard()
54+
{
55+
m_.lock();
56+
}
57+
};
58+
59+
//------------------------------------------------
60+
61+
template<class>
62+
class any_callable;
63+
64+
template<class R, class... Args>
65+
class any_callable<R(Args...)>
66+
{
67+
struct base
68+
{
69+
virtual ~base() = default;
70+
virtual R invoke(Args&&...args) = 0;
71+
};
72+
73+
std::unique_ptr<base> p_;
74+
75+
public:
76+
any_callable() = delete;
77+
78+
template<class Callable>
79+
requires std::is_invocable_r_v<R, Callable, Args...>
80+
any_callable(Callable&& f)
81+
{
82+
class impl : public base
83+
{
84+
Callable f_;
85+
86+
public:
87+
explicit impl(Callable&& f)
88+
: f_(std::forward<Callable>(f))
89+
{
90+
}
91+
92+
R invoke(Args&&... args) override
93+
{
94+
return f_(std::forward<Args>(args)...);
95+
}
96+
};
97+
98+
p_ = std::make_unique<impl>(std::forward<Callable>(f));
99+
}
100+
101+
R operator()(Args&&...args) const
102+
{
103+
return p_->invoke(std::forward<Args>(args)...);
104+
}
105+
};
106+
107+
//------------------------------------------------
108+
36109
/** A pool of threads for executing work concurrently.
37110
*/
38111
class MRDOX_VISIBLE
@@ -43,6 +116,11 @@ class MRDOX_VISIBLE
43116
friend class TaskGroup;
44117

45118
public:
119+
template<class Agent> struct arg_ty { using type = Agent; };
120+
template<class Agent> struct arg_ty<Agent&> { using type =
121+
std::conditional_t< std::is_const_v<Agent>, Agent, Agent&>; };
122+
template<class Agent> using arg_t = typename arg_ty<Agent>::type;
123+
46124
/** Destructor.
47125
*/
48126
MRDOX_DECL
@@ -82,8 +160,12 @@ class MRDOX_VISIBLE
82160
The signature of the submitted function
83161
object should be `void(void)`.
84162
*/
163+
template<class F>
85164
void
86-
async(std::function<void(void)> f);
165+
async(F&& f)
166+
{
167+
post(std::forward<F>(f));
168+
}
87169

88170
/** Invoke a function object for each element of a range.
89171
*/
@@ -95,6 +177,9 @@ class MRDOX_VISIBLE
95177
MRDOX_DECL
96178
void
97179
wait();
180+
181+
private:
182+
MRDOX_DECL void post(any_callable<void(void)>);
98183
};
99184

100185
//------------------------------------------------
@@ -120,14 +205,21 @@ class MRDOX_VISIBLE
120205
The signature of the submitted function
121206
object should be `void(void)`.
122207
*/
208+
template<class F>
123209
void
124-
async(std::function<void(void)> f);
210+
async(F&& f)
211+
{
212+
post(std::forward<F>(f));
213+
}
125214

126215
/** Block until all work has completed.
127216
*/
128217
MRDOX_DECL
129218
void
130219
wait();
220+
221+
private:
222+
MRDOX_DECL void post(any_callable<void(void)>);
131223
};
132224

133225
//------------------------------------------------
@@ -153,48 +245,142 @@ forEach(
153245

154246
//------------------------------------------------
155247

156-
#if 0
157248
/** A set of execution agents for performing concurrent work.
158249
*/
159-
template<class... Args>
160-
class ExecutionGroup
250+
template<class Agent>
251+
class ExecutorGroup
161252
{
162-
std::vector<Agent*> agents_;
163-
std::vector<std::function<void(Agent&)>> work_;
164-
ThreadPool threadPool_;
165-
TaskGroup taskGroup_;
253+
struct Impl
254+
{
255+
std::mutex mutex_;
256+
std::condition_variable cv_;
257+
};
258+
259+
ThreadPool& threadPool_;
260+
std::unique_ptr<Impl> impl_;
261+
std::vector<std::unique_ptr<Agent>> agents_;
262+
std::deque<any_callable<void(Agent&)>> work_;
263+
std::size_t busy_ = 0;
166264

167265
public:
168-
template<class Agents>
266+
template<class T>
267+
using arg_t = ThreadPool::arg_t<T>;
268+
269+
ExecutorGroup(ExecutorGroup const&) = delete;
270+
ExecutorGroup& operator=(ExecutorGroup&&) = delete;
271+
ExecutorGroup& operator=(ExecutorGroup const&) = delete;
272+
ExecutorGroup(ExecutorGroup&&) = default;
273+
169274
explicit
170-
Executors(
171-
Agents&& agents,
172-
ThreadPool &threadPool)
275+
ExecutorGroup(
276+
ThreadPool& threadPool) noexcept
173277
: threadPool_(threadPool)
278+
, impl_(std::make_unique<Impl>())
279+
{
280+
}
281+
282+
/** Construct a new agent in the group.
283+
284+
The behavior is undefined if there is
285+
any outstanding work or busy threads.
286+
*/
287+
template<class... Args>
288+
void
289+
emplace(Args&&... args)
174290
{
175-
agents_.reserve(std::distance(
176-
std::begin(agents), std::end(agents)));
177-
for(auto& agent : agents)
178-
agents_.push_back(&agent);
291+
agents_.emplace_back(std::make_unique<Agent>(
292+
std::forward<Args>(args)...));
179293
}
180294

181295
/** Submit work to be executed.
182296
183-
The signature of the submitted function
184-
object should be `void(Agent&)`.
297+
The function object must have this
298+
equivalent signature:
299+
@code
300+
void( Agent&, Args... );
301+
@endcode
185302
*/
303+
template<class F, class... Args>
186304
void
187-
async(std::function<void(void)> f);
305+
async(F&& f, Args&&... args)
306+
{
307+
static_assert(std::is_invocable_v<F, Agent&, arg_t<Args>...>);
308+
std::unique_lock<std::mutex> lock(impl_->mutex_);
309+
work_.emplace_back(
310+
[
311+
f = std::forward<F>(f),
312+
args = std::tuple<arg_t<Args>...>(args...)
313+
](Agent& agent)
314+
{
315+
std::apply(f, std::tuple_cat(
316+
std::tuple<Agent&>(agent),
317+
std::move(args)));
318+
});
319+
if(agents_.empty())
320+
return;
321+
run(std::move(lock));
322+
}
188323

189324
/** Block until all work has completed.
190325
*/
191326
void
192327
wait()
193328
{
194-
taskGroup_.wait();
329+
std::unique_lock<std::mutex> lock(impl_->mutex_);
330+
impl_->cv_.wait(lock,
331+
[&]
332+
{
333+
return work_.empty() && busy_ == 0;
334+
});
335+
}
336+
337+
private:
338+
class scoped_agent
339+
{
340+
ExecutorGroup& group_;
341+
std::unique_ptr<Agent> agent_;
342+
343+
public:
344+
scoped_agent(
345+
ExecutorGroup& group,
346+
std::unique_ptr<Agent> agent) noexcept
347+
: group_(group)
348+
, agent_(std::move(agent))
349+
{
350+
}
351+
352+
~scoped_agent()
353+
{
354+
--group_.busy_;
355+
group_.agents_.emplace_back(std::move(agent_));
356+
group_.impl_->cv_.notify_all();
357+
}
358+
};
359+
360+
void
361+
run(std::unique_lock<std::mutex> lock)
362+
{
363+
std::unique_ptr<Agent> agent(std::move(agents_.back()));
364+
agents_.pop_back();
365+
++busy_;
366+
367+
threadPool_.async(
368+
[this, agent = std::move(agent)]() mutable
369+
{
370+
scoped_agent scope(*this, std::move(agent));
371+
std::unique_lock<std::mutex> lock(impl_->mutex_);
372+
for(;;)
373+
{
374+
if(work_.empty())
375+
break;
376+
any_callable<void(Agent&)> work(std::move(work_.front()));
377+
work_.pop_front();
378+
unlock_guard unlock(impl_->mutex_);
379+
work(*agent);
380+
}
381+
});
195382
}
196383
};
197-
#endif
198384

199385
} // mrdox
200386
} // clang

source/Support/Thread.cpp

+22-10
Original file line numberDiff line numberDiff line change
@@ -57,17 +57,23 @@ getThreadCount() const noexcept
5757

5858
void
5959
ThreadPool::
60-
async(
61-
std::function<void(void)> f)
60+
wait()
6261
{
63-
impl_->async(std::move(f));
62+
impl_->wait();
6463
}
6564

6665
void
6766
ThreadPool::
68-
wait()
67+
post(
68+
any_callable<void(void)> f)
6969
{
70-
impl_->wait();
70+
auto sp = std::make_shared<
71+
any_callable<void(void)>>(std::move(f));
72+
impl_->async(
73+
[sp]
74+
{
75+
(*sp)();
76+
});
7177
}
7278

7379
//------------------------------------------------
@@ -90,17 +96,23 @@ TaskGroup(
9096

9197
void
9298
TaskGroup::
93-
async(
94-
std::function<void(void)> f)
99+
wait()
95100
{
96-
impl_->async(std::move(f));
101+
impl_->wait();
97102
}
98103

99104
void
100105
TaskGroup::
101-
wait()
106+
post(
107+
any_callable<void(void)> f)
102108
{
103-
impl_->wait();
109+
auto sp = std::make_shared<
110+
any_callable<void(void)>>(std::move(f));
111+
impl_->async(
112+
[sp]
113+
{
114+
(*sp)();
115+
});
104116
}
105117

106118
} // mrdox

0 commit comments

Comments
 (0)