-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtask.cc
179 lines (164 loc) · 3.97 KB
/
task.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
#include "task.hh"
//static members of the Task class
//---------------------------------------------------------------------------//
bool Task::_taskLibInited = false;
std::shared_ptr<std::thread> Task::_reaper;
bool Task::_reaperAskedToExit = false;
std::mutex Task::_reaperMutex;
std::condition_variable Task::_reaperCondVar;
Task* Task::_reaperObject = NULL;
bool Task::_reaperSignalled = false;
//---------------------------------------------------------------------------//
//our reaper logic is simple wait on the reaper promise and
//future for 2 seconds and reap the objects who exited.
void
Task::_reaperLogic()
{
//std::cerr<<"\nReaper started";
std::unique_lock<std::mutex> _reaperLock(Task::_reaperMutex);
while(!_reaperAskedToExit)
{
std::cv_status rv = _reaperCondVar.wait_for(_reaperLock, std::chrono::seconds(START_WAIT_TIME));
if ((rv != std::cv_status::timeout) && Task::_reaperSignalled) {
Task *tptr = Task::_reaperObject;
assert(tptr);
delete tptr;
Task::_reaperObject = NULL;
Task::_reaperSignalled = false;
//std::cerr<<"\nReaper just reaped:"<<tptr;
}
}
//std::cerr<<"\nReaper no more active";
return;
}
//initialize the akorp book keeping information
void
Task::_initializeTaskLib()
{
if (Task::_taskLibInited) return;
Task::_reaper = std::make_shared<std::thread>(Task::_reaperLogic);
Task::_taskLibInited = true;
atexit(Task::_finalizeTaskLib);
return;
}
void
Task::_finalizeTaskLib()
{
if (!Task::_taskLibInited) return;
Task::_reaperAskedToExit = true;
Task::_reaper->join();
Task::_taskLibInited = false;
return;
}
//inform the reaper to reap our exit status.
void Task::_informReaper()
{
Task::_reaperObject = getDerivedObjPtr();
std::unique_lock<std::mutex> _reaperLock(_reaperMutex);
Task::_reaperSignalled = true;
Task::_reaperCondVar.notify_one();
return;
}
//the executor which calls the thread function up on a start trigger.
void Task::_run()
{
//wait for the start trigger to run on the future.
std::future_status rv = _runFuture.wait_for(std::chrono::seconds(START_WAIT_TIME));
if (rv == std::future_status::ready)
{
if (_runFuture.get() == true)
{
_aboutToRun = false;
_thrFunc();
if (!_askedToStop) _informReaper();
return;
}
}
else if (rv == std::future_status::timeout) {
std::cerr<<"\nTimed out waiting for run trigger.";
assert(0);
}
return;
}
Task::Task(std::function<void(void)>f) :
_runFuture(_runPromise.get_future()),
_askedToStop(false),
_aboutToRun(true),
_joinDone(false),
_thrFunc(f)
{
if (!Task::_taskLibInited) Task::_initializeTaskLib();
_thread = std::make_shared<std::thread>(std::bind(&Task::_run, this));
return;
};
//The thread logic should always check this flag whether its asked to stop or not.
bool Task::askedToStop()
{
return _askedToStop;
}
//This is always called from another thread context so we need to
//invoke join.
Task::~Task()
{
if(_aboutToRun) _runPromise.set_value(false);
_askedToStop = true;
if(!_joinDone) { _thread->join(); _joinDone = true; }
return;
}
//method to be called by the other thread to stop us.
void Task::stop()
{
assert(!_aboutToRun);
_askedToStop = true;
return;
}
//method to be called by the other thread to wait for us.
void Task::join()
{
assert(!_joinDone);
_thread->join();
_joinDone = true;
return;
}
//to be called by the object itself when it has done its construction successfully
//infact this should be the last call in the constructor.
void Task::start()
{
assert(_aboutToRun);
_runPromise.set_value(true);
return;
}
#if 0
//compile with g++-4.7 -g -pthread -std=c++0x task.cc -o task
class lazyObject : public Task
{
int count;
public:
lazyObject(int c) :
Task([&](){
//Logic of the task
for (int i = 0 ;i < count ; i++) {
std::cerr<<"\nneverending nonsense";
sleep(1);
}
}),
count(c)
{
start();
return;
}
~lazyObject()
{
stop();
return;
};
Task*getDerivedObjPtr() { return this; }
};
int
main(int ac, char **av)
{
lazyObject *l = new lazyObject(5);
l->join();
return 0;
}
#endif