Skip to content

Commit 6f7212c

Browse files
author
Veronica Zheng
committed
Merge pull request #824 from leapmotion/feature-onaborted
Make `m_aborted` a `once_signal`
2 parents 38e401d + 5a21260 commit 6f7212c

File tree

3 files changed

+46
-16
lines changed

3 files changed

+46
-16
lines changed

autowiring/DispatchQueue.h

+8-4
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#pragma once
33
#include "dispatch_aborted_exception.h"
44
#include "DispatchThunk.h"
5+
#include "once.h"
56
#include <atomic>
67
#include <list>
78
#include <queue>
@@ -33,6 +34,10 @@ class DispatchQueue {
3334
/// </remarks>
3435
virtual ~DispatchQueue(void);
3536

37+
// True if DispatchQueue::Abort has been called. This will cause the dispatch queue's remaining entries
38+
// to be dumped and prevent the introduction of new entries to the queue.
39+
autowiring::once_signal<DispatchQueue> onAborted;
40+
3641
protected:
3742
// The maximum allowed number of pended dispatches before pended calls start getting dropped
3843
size_t m_dispatchCap = 1024;
@@ -56,10 +61,6 @@ class DispatchQueue {
5661
// Notice when the dispatch queue has been updated:
5762
std::condition_variable m_queueUpdated;
5863

59-
// True if DispatchQueue::Abort has been called. This will cause the dispatch queue's remaining entries
60-
// to be dumped and prevent the introduction of new entries to the queue.
61-
bool m_aborted = false;
62-
6364
/// <summary>
6465
/// Moves all ready events from the delayed queue into the dispatch queue
6566
/// </summary>
@@ -163,6 +164,9 @@ class DispatchQueue {
163164
/// Waits until a lambda function is ready to run in this thread's dispatch queue,
164165
/// dispatches the function, and then returns.
165166
/// </summary>
167+
/// <remarks>
168+
/// This method will throw dispatch_aborted_exception if the queue has been aborted at the time of the call
169+
/// </remarks>
166170
void WaitForEvent(void);
167171

168172
/// <summary>

src/autowiring/DispatchQueue.cpp

+12-12
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@ DispatchQueue::DispatchQueue(size_t dispatchCap):
1212

1313
DispatchQueue::DispatchQueue(DispatchQueue&& q):
1414
m_dispatchCap(q.m_dispatchCap),
15-
m_aborted(q.m_aborted)
15+
onAborted(std::move(q.onAborted))
1616
{
17-
if (!m_aborted)
17+
if (!onAborted)
1818
*this += std::move(q);
1919
}
2020

@@ -72,7 +72,7 @@ void DispatchQueue::Abort(void) {
7272
DispatchThunkBase* pHead;
7373
{
7474
std::lock_guard<std::mutex> lk(m_dispatchLock);
75-
m_aborted = true;
75+
onAborted();
7676
m_dispatchCap = 0;
7777
pHead = m_pHead;
7878
m_pHead = nullptr;
@@ -104,15 +104,15 @@ void DispatchQueue::WakeAllWaitingThreads(void) {
104104

105105
void DispatchQueue::WaitForEvent(void) {
106106
std::unique_lock<std::mutex> lk(m_dispatchLock);
107-
if (m_aborted)
107+
if (onAborted)
108108
throw dispatch_aborted_exception("Dispatch queue was aborted prior to waiting for an event");
109109

110110
// Unconditional delay:
111111
uint64_t version = m_version;
112112
m_queueUpdated.wait(
113113
lk,
114114
[this, version] {
115-
if (m_aborted)
115+
if (onAborted)
116116
throw dispatch_aborted_exception("Dispatch queue was aborted while waiting for an event");
117117

118118
return
@@ -153,7 +153,7 @@ bool DispatchQueue::WaitForEvent(std::chrono::steady_clock::time_point wakeTime)
153153
}
154154

155155
bool DispatchQueue::WaitForEventUnsafe(std::unique_lock<std::mutex>& lk, std::chrono::steady_clock::time_point wakeTime) {
156-
if (m_aborted)
156+
if (onAborted)
157157
throw dispatch_aborted_exception("Dispatch queue was aborted prior to waiting for an event");
158158

159159
while (!m_pHead) {
@@ -165,7 +165,7 @@ bool DispatchQueue::WaitForEventUnsafe(std::unique_lock<std::mutex>& lk, std::ch
165165
std::cv_status status = m_queueUpdated.wait_until(lk, wakeTime);
166166

167167
// Short-circuit if the queue was aborted
168-
if (m_aborted)
168+
if (onAborted)
169169
throw dispatch_aborted_exception("Dispatch queue was aborted while waiting for an event");
170170

171171
if (PromoteReadyDispatchersUnsafe())
@@ -223,7 +223,7 @@ bool DispatchQueue::Barrier(std::chrono::nanoseconds timeout) {
223223
std::unique_lock<std::mutex> lk(m_dispatchLock);
224224

225225
// Short-circuit if dispatching has been aborted
226-
if (m_aborted)
226+
if (onAborted)
227227
throw dispatch_aborted_exception("Dispatch queue was aborted before a timed wait was attempted");
228228

229229
// Short-circuit if the queue is already empty
@@ -246,8 +246,8 @@ bool DispatchQueue::Barrier(std::chrono::nanoseconds timeout) {
246246
lk.lock();
247247

248248
// Wait until our variable is satisfied, which might be right away:
249-
bool rv = m_queueUpdated.wait_for(lk, timeout, [&] { return m_aborted || *complete; });
250-
if (m_aborted)
249+
bool rv = m_queueUpdated.wait_for(lk, timeout, [&] { return onAborted || *complete; });
250+
if (onAborted)
251251
throw dispatch_aborted_exception("Dispatch queue was aborted during a timed wait");
252252
return rv;
253253
}
@@ -259,8 +259,8 @@ void DispatchQueue::Barrier(void) {
259259

260260
// Obtain the lock, wait until our variable is satisfied, which might be right away:
261261
std::unique_lock<std::mutex> lk(m_dispatchLock);
262-
m_queueUpdated.wait(lk, [&] { return m_aborted || complete; });
263-
if (m_aborted)
262+
m_queueUpdated.wait(lk, [&] { return onAborted || complete; });
263+
if (onAborted)
264264
// At this point, the dispatch queue MUST be completely run down. We have no outstanding references
265265
// to our stack-allocated "complete" variable. Furthermore, after m_aborted is true, no further
266266
// dispatchers are permitted to be run.

src/autowiring/test/DispatchQueueTest.cpp

+26
Original file line numberDiff line numberDiff line change
@@ -223,3 +223,29 @@ TEST_F(DispatchQueueTest, ZeroIdenticalToPend) {
223223
ASSERT_NE(1, observation) << "Zero-delay lambda was not executed";
224224
ASSERT_EQ(101, observation) << "Zero-delay lambda did not run in the order it was pended";
225225
}
226+
227+
TEST_F(DispatchQueueTest, AbortObserver) {
228+
DispatchQueue dq;
229+
dq += [] {};
230+
231+
std::thread t{ [&dq] {
232+
try {
233+
for (;;)
234+
dq.WaitForEvent();
235+
}
236+
catch (dispatch_aborted_exception&) {}
237+
}};
238+
auto x = MakeAtExit([&] {
239+
dq.Abort();
240+
t.join();
241+
});
242+
243+
// Pend a few operations, barrier for their completion, and then cancel:
244+
for (size_t i = 0; i < 100; i++)
245+
dq += [i] {};
246+
247+
bool onAbortedCalled = false;
248+
dq.onAborted += [&] { onAbortedCalled = true; };
249+
dq.Abort();
250+
ASSERT_TRUE(onAbortedCalled) << "Abort signal handler not asserted as expected";
251+
}

0 commit comments

Comments
 (0)