Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make m_aborted a once_signal #824

Merged
merged 1 commit into from
Dec 1, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions autowiring/DispatchQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#pragma once
#include "dispatch_aborted_exception.h"
#include "DispatchThunk.h"
#include "once.h"
#include <atomic>
#include <list>
#include <queue>
Expand Down Expand Up @@ -33,6 +34,10 @@ class DispatchQueue {
/// </remarks>
virtual ~DispatchQueue(void);

// True if DispatchQueue::Abort has been called. This will cause the dispatch queue's remaining entries
// to be dumped and prevent the introduction of new entries to the queue.
autowiring::once_signal<DispatchQueue> onAborted;

protected:
// The maximum allowed number of pended dispatches before pended calls start getting dropped
size_t m_dispatchCap = 1024;
Expand All @@ -56,10 +61,6 @@ class DispatchQueue {
// Notice when the dispatch queue has been updated:
std::condition_variable m_queueUpdated;

// True if DispatchQueue::Abort has been called. This will cause the dispatch queue's remaining entries
// to be dumped and prevent the introduction of new entries to the queue.
bool m_aborted = false;

/// <summary>
/// Moves all ready events from the delayed queue into the dispatch queue
/// </summary>
Expand Down Expand Up @@ -163,6 +164,9 @@ class DispatchQueue {
/// Waits until a lambda function is ready to run in this thread's dispatch queue,
/// dispatches the function, and then returns.
/// </summary>
/// <remarks>
/// This method will throw dispatch_aborted_exception if the queue has been aborted at the time of the call
/// </remarks>
void WaitForEvent(void);

/// <summary>
Expand Down
24 changes: 12 additions & 12 deletions src/autowiring/DispatchQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ DispatchQueue::DispatchQueue(size_t dispatchCap):

DispatchQueue::DispatchQueue(DispatchQueue&& q):
m_dispatchCap(q.m_dispatchCap),
m_aborted(q.m_aborted)
onAborted(std::move(q.onAborted))
{
if (!m_aborted)
if (!onAborted)
*this += std::move(q);
}

Expand Down Expand Up @@ -72,7 +72,7 @@ void DispatchQueue::Abort(void) {
DispatchThunkBase* pHead;
{
std::lock_guard<std::mutex> lk(m_dispatchLock);
m_aborted = true;
onAborted();
m_dispatchCap = 0;
pHead = m_pHead;
m_pHead = nullptr;
Expand Down Expand Up @@ -104,15 +104,15 @@ void DispatchQueue::WakeAllWaitingThreads(void) {

void DispatchQueue::WaitForEvent(void) {
std::unique_lock<std::mutex> lk(m_dispatchLock);
if (m_aborted)
if (onAborted)
throw dispatch_aborted_exception("Dispatch queue was aborted prior to waiting for an event");

// Unconditional delay:
uint64_t version = m_version;
m_queueUpdated.wait(
lk,
[this, version] {
if (m_aborted)
if (onAborted)
throw dispatch_aborted_exception("Dispatch queue was aborted while waiting for an event");

return
Expand Down Expand Up @@ -153,7 +153,7 @@ bool DispatchQueue::WaitForEvent(std::chrono::steady_clock::time_point wakeTime)
}

bool DispatchQueue::WaitForEventUnsafe(std::unique_lock<std::mutex>& lk, std::chrono::steady_clock::time_point wakeTime) {
if (m_aborted)
if (onAborted)
throw dispatch_aborted_exception("Dispatch queue was aborted prior to waiting for an event");

while (!m_pHead) {
Expand All @@ -165,7 +165,7 @@ bool DispatchQueue::WaitForEventUnsafe(std::unique_lock<std::mutex>& lk, std::ch
std::cv_status status = m_queueUpdated.wait_until(lk, wakeTime);

// Short-circuit if the queue was aborted
if (m_aborted)
if (onAborted)
throw dispatch_aborted_exception("Dispatch queue was aborted while waiting for an event");

if (PromoteReadyDispatchersUnsafe())
Expand Down Expand Up @@ -223,7 +223,7 @@ bool DispatchQueue::Barrier(std::chrono::nanoseconds timeout) {
std::unique_lock<std::mutex> lk(m_dispatchLock);

// Short-circuit if dispatching has been aborted
if (m_aborted)
if (onAborted)
throw dispatch_aborted_exception("Dispatch queue was aborted before a timed wait was attempted");

// Short-circuit if the queue is already empty
Expand All @@ -246,8 +246,8 @@ bool DispatchQueue::Barrier(std::chrono::nanoseconds timeout) {
lk.lock();

// Wait until our variable is satisfied, which might be right away:
bool rv = m_queueUpdated.wait_for(lk, timeout, [&] { return m_aborted || *complete; });
if (m_aborted)
bool rv = m_queueUpdated.wait_for(lk, timeout, [&] { return onAborted || *complete; });
if (onAborted)
throw dispatch_aborted_exception("Dispatch queue was aborted during a timed wait");
return rv;
}
Expand All @@ -259,8 +259,8 @@ void DispatchQueue::Barrier(void) {

// Obtain the lock, wait until our variable is satisfied, which might be right away:
std::unique_lock<std::mutex> lk(m_dispatchLock);
m_queueUpdated.wait(lk, [&] { return m_aborted || complete; });
if (m_aborted)
m_queueUpdated.wait(lk, [&] { return onAborted || complete; });
if (onAborted)
// At this point, the dispatch queue MUST be completely run down. We have no outstanding references
// to our stack-allocated "complete" variable. Furthermore, after m_aborted is true, no further
// dispatchers are permitted to be run.
Expand Down
26 changes: 26 additions & 0 deletions src/autowiring/test/DispatchQueueTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,3 +223,29 @@ TEST_F(DispatchQueueTest, ZeroIdenticalToPend) {
ASSERT_NE(1, observation) << "Zero-delay lambda was not executed";
ASSERT_EQ(101, observation) << "Zero-delay lambda did not run in the order it was pended";
}

TEST_F(DispatchQueueTest, AbortObserver) {
DispatchQueue dq;
dq += [] {};

std::thread t{ [&dq] {
try {
for (;;)
dq.WaitForEvent();
}
catch (dispatch_aborted_exception&) {}
}};
auto x = MakeAtExit([&] {
dq.Abort();
t.join();
});

// Pend a few operations, barrier for their completion, and then cancel:
for (size_t i = 0; i < 100; i++)
dq += [i] {};

bool onAbortedCalled = false;
dq.onAborted += [&] { onAbortedCalled = true; };
dq.Abort();
ASSERT_TRUE(onAbortedCalled) << "Abort signal handler not asserted as expected";
}