Skip to content

Commit c8fdffb

Browse files
committed
Merge pull request #955 from leapmotion/feature-rundown
Add Rundown behavior to DispatchQueue
2 parents 3a64e9f + f2410e5 commit c8fdffb

File tree

4 files changed

+94
-36
lines changed

4 files changed

+94
-36
lines changed

src/autowiring/CoreThread.cpp

+3-8
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,10 @@ CoreThread::CoreThread(const char* pName):
1111
CoreThread::~CoreThread(void){}
1212

1313
void CoreThread::DoRunLoopCleanup(std::shared_ptr<CoreContext>&& ctxt, std::shared_ptr<CoreObject>&& refTracker) {
14-
try {
15-
// If we are asked to rundown while we still have elements in our dispatch queue,
16-
// we must try to process them:
14+
// Kill everything in the dispatch queue and also run it down
15+
{
1716
CurrentContextPusher pshr(ctxt);
18-
DispatchAllEvents();
19-
}
20-
catch(...) {
21-
// We failed to run down the dispatch queue gracefully, we now need to abort it
22-
Abort();
17+
Rundown();
2318
}
2419

2520
// Handoff to base class:

src/autowiring/DispatchQueue.cpp

+54-28
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,56 @@ DispatchQueue::~DispatchQueue(void) {
2929
}
3030
}
3131

32+
void DispatchQueue::ClearQueueInternal(bool executeDispatchers) {
33+
// Do not permit any more lambdas to be pended to our queue
34+
DispatchThunkBase* pHead;
35+
{
36+
std::priority_queue<autowiring::DispatchThunkDelayed> delayedQueue;
37+
std::lock_guard<std::mutex> lk(m_dispatchLock);
38+
onAborted();
39+
m_dispatchCap = 0;
40+
pHead = m_pHead;
41+
m_pHead = nullptr;
42+
m_pTail = nullptr;
43+
delayedQueue = std::move(m_delayedQueue);
44+
}
45+
46+
// Execute dispatchers if asked to do so
47+
if(executeDispatchers)
48+
while(pHead)
49+
try {
50+
auto next = pHead->m_pFlink;
51+
(*pHead)();
52+
delete pHead;
53+
pHead = next;
54+
55+
// Need to update this as we go along due to the requirements of rundown behavior
56+
m_count--;
57+
}
58+
catch (dispatch_aborted_exception&) {
59+
// Silently ignore, as per documentation
60+
} catch(...) {
61+
// Stop executing dispatchers, nothing we can do here
62+
break;
63+
}
64+
65+
// Destroy everything else. Do so in an unsynchronized context in order to prevent reentrancy.
66+
size_t nTraversed = 0;
67+
for (auto cur = pHead; cur;) {
68+
auto next = cur->m_pFlink;
69+
delete cur;
70+
cur = next;
71+
nTraversed++;
72+
}
73+
74+
// Decrement the count by the number of entries we actually traversed. Abort may potentially
75+
// be called from a lambda function, so assigning this value directly to zero would be an error.
76+
m_count -= nTraversed;
77+
78+
// Wake up anyone who is still waiting:
79+
m_queueUpdated.notify_all();
80+
}
81+
3282
bool DispatchQueue::PromoteReadyDispatchersUnsafe(void) {
3383
// Move all ready elements out of the delayed queue and into the dispatch queue:
3484
size_t nInitial = m_delayedQueue.size();
@@ -97,35 +147,11 @@ void DispatchQueue::TryDispatchEventUnsafe(std::unique_lock<std::mutex>& lk) {
97147
}
98148

99149
void DispatchQueue::Abort(void) {
100-
// Do not permit any more lambdas to be pended to our queue
101-
DispatchThunkBase* pHead;
102-
{
103-
std::priority_queue<autowiring::DispatchThunkDelayed> delayedQueue;
104-
std::lock_guard<std::mutex> lk(m_dispatchLock);
105-
onAborted();
106-
m_dispatchCap = 0;
107-
pHead = m_pHead;
108-
m_pHead = nullptr;
109-
m_pTail = nullptr;
110-
delayedQueue = std::move(m_delayedQueue);
111-
}
112-
113-
// Destroy the whole dispatch queue. Do so in an unsynchronized context in order to prevent
114-
// reentrancy.
115-
size_t nTraversed = 0;
116-
for (auto cur = pHead; cur;) {
117-
auto next = cur->m_pFlink;
118-
delete cur;
119-
cur = next;
120-
nTraversed++;
121-
}
122-
123-
// Decrement the count by the number of entries we actually traversed. Abort may potentially
124-
// be called from a lambda function, so assigning this value directly to zero would be an error.
125-
m_count -= nTraversed;
150+
ClearQueueInternal(false);
151+
}
126152

127-
// Wake up anyone who is still waiting:
128-
m_queueUpdated.notify_all();
153+
void DispatchQueue::Rundown(void) {
154+
ClearQueueInternal(true);
129155
}
130156

131157
bool DispatchQueue::Cancel(void) {

src/autowiring/DispatchQueue.h

+22
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,9 @@ class DispatchQueue {
114114
/// </summary>
115115
void SetDispatcherCap(size_t dispatchCap) { m_dispatchCap = dispatchCap; }
116116

117+
// Internal implementation for abort/rundown
118+
void ClearQueueInternal(bool executeDispatchers);
119+
117120
public:
118121
/// <returns>
119122
/// True if there are curerntly any dispatchers ready for execution--IE, DispatchEvent would return true
@@ -144,6 +147,25 @@ class DispatchQueue {
144147
/// </remarks>
145148
void Abort(void);
146149

150+
/// <summary>
151+
/// Graceful version of Abort
152+
/// </summary>
153+
/// <remarks>
154+
/// In a synchronized context, all attached lambdas are guaranteed to be called when this function returns.
155+
/// No guarantees are made in an unsynchronized context.
156+
///
157+
/// Any delayed dispatchers that are ready at the time of the call will be invoked. All oter delayed
158+
/// dispatchers will be aborted.
159+
///
160+
/// If a dispatcher throws any exception other than dispatch_aborted_exception, the remaining dispatchers
161+
/// will be aborted.
162+
///
163+
/// This method may be safely called from within a dispatcher.
164+
///
165+
/// This method is idempotent.
166+
/// </remarks>
167+
void Rundown(void);
168+
147169
/// <summary>
148170
/// Causes the very first lambda on the dispatch queue to be deleted without running it
149171
/// </summary>

src/autowiring/test/DispatchQueueTest.cpp

+15
Original file line numberDiff line numberDiff line change
@@ -344,3 +344,18 @@ TEST_F(DispatchQueueTest, DelayedAbort) {
344344
dq.Abort();
345345
ASSERT_TRUE(v.unique()) << "A delayed dispatcher was leaked after a call to Abort";
346346
}
347+
348+
TEST_F(DispatchQueueTest, Rundown) {
349+
auto called = std::make_shared<bool>(false);
350+
auto notCalled = std::make_shared<bool>(false);
351+
352+
DispatchQueue dq;
353+
dq += [called] { *called = true; };
354+
dq.Rundown();
355+
ASSERT_TRUE(*called) << "Dispatcher was not invoked during rundown as expected";
356+
ASSERT_TRUE(called.unique()) << "Rundown dispatcher was leaked";
357+
358+
dq += [notCalled] { *notCalled = true; };
359+
ASSERT_FALSE(*notCalled) << "Dispatcher was incorrectly invoked during rundown";
360+
ASSERT_TRUE(notCalled.unique()) << "Rejected dispatcher was leaked";
361+
}

0 commit comments

Comments
 (0)