Skip to content

Commit

Permalink
Deprecate CoreContext thread pools
Browse files Browse the repository at this point in the history
This concept is hard to manage and has very unexpected behavior.  Contexts should be containers used to manage a fixed set of well-defined classes; the thread pool should be used as a utility type for handling pooling operations, and the two types should not really meet.

At a later time it might make sense to have `ThreadPool` inherit `CoreRunnable`, but for now it's fine to keep the interfaces unrelated.
  • Loading branch information
codemercenary committed Feb 10, 2016
1 parent daee07b commit 71d8d41
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 239 deletions.
52 changes: 0 additions & 52 deletions autowiring/CoreContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include "MemoEntry.h"
#include "once.h"
#include "result_or_default.h"
#include "ThreadPool.h"
#include "TypeRegistry.h"
#include "TypeUnifier.h"

Expand All @@ -49,10 +48,6 @@ class CoreContextT;
template<typename T>
class JunctionBox;

namespace autowiring {
class ThreadPool;
}

/// \file
/// CoreContext definitions.

Expand Down Expand Up @@ -231,10 +226,6 @@ class CoreContext:
// Actual core threads:
std::list<CoreRunnable*> m_threads;

// The thread pool used by this context. By default, a context inherits the thread pool of
// its parent, and the global context gets the system thread pool.
std::shared_ptr<autowiring::ThreadPool> m_threadPool;

// The start token for the thread pool, if one exists
std::shared_ptr<void> m_startToken;

Expand Down Expand Up @@ -1066,49 +1057,6 @@ class CoreContext:
);
}

/// <summary>
/// Assigns the thread pool handler for this context
/// </summary>
/// <remarks>
/// If the context is currently running, the thread pool will automatically be started. The pool's
/// start token and shared pointer is reset automatically when the context is torn down. If the
/// context has already been shut down (IE, IsShutdown returns true), this method has no effect.
///
/// Dispatchers that have been attached to the current thread pool will not be transitioned to the
/// new pool. Changing the thread pool may cause the previously assigned thread pool to be stopped.
/// This will cause it to complete all work assigned to it and release resources associated with
/// processing. If there are no other handles to the pool, it may potentially destroy itself.
///
/// It is an error to pass nullptr to this method.
/// </remarks>
void SetThreadPool(const std::shared_ptr<autowiring::ThreadPool>& threadPool);

/// <summary>
/// Returns the current thread pool
/// </summary>
/// <remarks>
/// If the context has been shut down, (IE, IsShutdown returns true), this method returns nullptr. Calling
/// ThreadPool::Start on the returned shared pointer will not cause dispatchers pended to this context to
/// be executed. To do this, invoke CoreContext::Initiate
/// </remarks>
std::shared_ptr<autowiring::ThreadPool> GetThreadPool(void) const;

/// <summary>
/// Submits the specified lambda to this context's ThreadPool for processing
/// </summary>
/// <returns>True if the job has been submitted for execution</returns>
/// <remarks>
/// The passed thunk will not be executed if the current context has already stopped.
/// </remarks>
template<class Fx>
bool operator+=(Fx&& fx) {
auto pool = GetThreadPool();
return
pool ?
pool->Submit(std::make_unique<DispatchThunk<Fx>>(std::forward<Fx&&>(fx))) :
false;
}

/// <summary>
/// Adds a post-attachment listener in this context for a particular autowired member.
/// There is no guarantee for the context in which the listener will be called.
Expand Down
2 changes: 1 addition & 1 deletion autowiring/SystemThreadPoolStl.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#pragma once
#include "DispatchQueue.h"
#include "SystemThreadPool.h"
#include <thread>
#include <thread>
#include <vector>

namespace autowiring {
Expand Down
12 changes: 12 additions & 0 deletions autowiring/ThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,18 @@ class ThreadPool:
/// be submitted for execution.
/// </remarks>
virtual bool Submit(std::unique_ptr<DispatchThunkBase>&& thunk) = 0;

/// <summary>
/// Submits the specified lambda to this context's ThreadPool for processing
/// </summary>
/// <returns>True if the job has been submitted for execution</returns>
/// <remarks>
/// The passed thunk will not be executed if the current context has already stopped.
/// </remarks>
template<class Fx>
bool operator+=(Fx&& fx) {
return Submit(std::make_unique<DispatchThunk<Fx>>(std::forward<Fx&&>(fx)));
}
};

}
87 changes: 1 addition & 86 deletions src/autowiring/CoreContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
#include "NullPool.h"
#include "SystemThreadPool.h"
#include "thread_specific_ptr.h"
#include "ThreadPool.h"
#include <sstream>
#include <stdexcept>

Expand Down Expand Up @@ -58,8 +57,7 @@ CoreContext::CoreContext(const std::shared_ptr<CoreContext>& pParent, t_childLis
m_backReference(backReference),
m_sigilType(sigilType),
m_stateBlock(std::make_shared<CoreContextStateBlock>(pParent ? pParent->m_stateBlock : nullptr)),
m_junctionBoxManager(new JunctionBoxManager),
m_threadPool(std::make_shared<NullPool>())
m_junctionBoxManager(new JunctionBoxManager)
{}

CoreContext::~CoreContext(void) {
Expand Down Expand Up @@ -450,47 +448,9 @@ void CoreContext::Initiate(void) {

// Now we can recover the first thread that will need to be started
auto beginning = m_threads.begin();

// Start our threads before starting any child contexts:
std::shared_ptr<ThreadPool> threadPool;
auto nullPool = std::dynamic_pointer_cast<NullPool>(m_threadPool);
if (nullPool) {
// Decide which pool will become our current thread pool. Global context is the final case,
// which defaults to the system thread pool
if (!nullPool->GetSuccessor())
nullPool->SetSuccessor(m_pParent ? m_pParent->GetThreadPool() : SystemThreadPool::New());

// Trigger null pool destruction at this point:
m_threadPool = nullPool->MoveDispatchersToSuccessor();
}

// The default case should not generally occur, but if it were the case that the null pool were
// updated before the context was initiated, then we would have no work to do as no successors
// exist to be moved. In that case, simply take a record of the current thread pool for the
// call to Start that follows the unlock.
threadPool = m_threadPool;
lk.unlock();
onInitiated();

// Start the thread pool out of the lock, and then update our start token if our thread pool
// reference has not changed. The next pool could potentially be nullptr if the parent is going
// down while we are going up.
if (threadPool) {
// Initiate
auto startToken = threadPool->Start();

// Transfer all dispatchers from the null pool to the new thread pool:
std::lock_guard<std::mutex> lk(m_stateBlock->m_lock);

// If the thread pool was updated while we were trying to start the pool we observed earlier,
// then allow our token to expire and do not do any other work. Whomever caused the thread
// pool pointer to be updated would also have seen that the context is currently started,
// and would have updated both the thread pool pointer and the start token at the same time.
if (m_threadPool == threadPool)
// Swap, not assign; we don't want teardown to happen while synchronized
std::swap(m_startToken, startToken);
}

if (beginning != m_threads.end()) {
auto outstanding = m_stateBlock->IncrementOutstandingThreadCount(shared_from_this());
for (auto q = beginning; q != m_threads.end(); ++q)
Expand Down Expand Up @@ -554,16 +514,13 @@ void CoreContext::SignalShutdown(bool wait, ShutdownMode shutdownMode) {

// Thread pool token and pool pointer
std::shared_ptr<void> startToken;
std::shared_ptr<ThreadPool> threadPool;

// Tear down all the children, evict thread pool:
{
std::lock_guard<std::mutex> lk(m_stateBlock->m_lock);

startToken = std::move(m_startToken);
m_startToken.reset();
threadPool = std::move(m_threadPool);
m_threadPool.reset();

// Fill strong lock series in order to ensure proper teardown interleave:
childrenInterleave.reserve(m_children.size());
Expand Down Expand Up @@ -719,48 +676,6 @@ void CoreContext::BuildCurrentState(void) {
}
}

void CoreContext::SetThreadPool(const std::shared_ptr<ThreadPool>& threadPool) {
if (!threadPool)
throw std::invalid_argument("A context cannot be given a null thread pool");

std::shared_ptr<ThreadPool> priorThreadPool;
{
std::lock_guard<std::mutex> lk(m_stateBlock->m_lock);
if (IsShutdown())
// Nothing to do, context already down
return;

if (!IsRunning()) {
// Just set up the forwarding thread pool
auto nullPool = std::dynamic_pointer_cast<NullPool>(m_threadPool);
if (!nullPool)
throw autowiring_error("Internal error, null pool was deassigned even though the context has not been started");
priorThreadPool = nullPool->GetSuccessor();
nullPool->SetSuccessor(threadPool);
return;
}

priorThreadPool = m_threadPool;
m_threadPool = threadPool;
}

// We are presently running. We need to start the pool, and then attempt to
// update our token
auto startToken = threadPool->Start();
std::lock_guard<std::mutex> lk(m_stateBlock->m_lock);
if (m_threadPool != threadPool)
// Thread pool was updated by someone else, let them complete their operation
return;

// Update our start token and return. Swap, not move; we don't want to risk
// calling destructors while synchronized.
std::swap(m_startToken, startToken);
}

std::shared_ptr<ThreadPool> CoreContext::GetThreadPool(void) const {
return (std::lock_guard<std::mutex>)m_stateBlock->m_lock, m_threadPool;
}

void CoreContext::Dump(std::ostream& os) const {
std::lock_guard<std::mutex> lk(m_stateBlock->m_lock);

Expand Down
121 changes: 21 additions & 100 deletions src/autowiring/test/ThreadPoolTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,124 +11,45 @@
#include "SystemThreadPoolWinLH.hpp"
#endif

template<typename T>
class ThreadPoolTest:
public testing::Test
{};

TEST_F(ThreadPoolTest, SimpleSubmission) {
AutoCurrentContext ctxt;
ctxt->Initiate();

// Simple validation
auto pool = ctxt->GetThreadPool();
ASSERT_NE(nullptr, pool.get()) << "Pool can never be null on an initiated context";
ASSERT_EQ(nullptr, dynamic_cast<autowiring::NullPool*>(pool.get())) << "After context initiation, the pool should not be a null pool";
ASSERT_TRUE(pool->IsStarted()) << "Pool was not started when the enclosing context was initiated";

// Submit a job and then block for its completion. Use a promise to ensure that
// the job is being executed in some other thread context.
auto p = std::make_shared<std::promise<void>>();

*ctxt += [&] {
p->set_value();
};

auto rs = p->get_future();
ASSERT_EQ(std::future_status::ready, rs.wait_for(std::chrono::seconds(5))) << "Thread pool lambda was not dispatched in a timely fashion";
}

TEST_F(ThreadPoolTest, PendBeforeContextStart) {
AutoCurrentContext ctxt;

// Pend
auto barr = std::make_shared<std::promise<void>>();
*ctxt += [barr] { barr->set_value(); };
ASSERT_EQ(2UL, barr.use_count()) << "Lambda was not correctly captured by an uninitiated context";

std::future<void> f = barr->get_future();
ASSERT_EQ(std::future_status::timeout, f.wait_for(std::chrono::milliseconds(1))) << "A pended lambda was completed before the owning context was intiated";

ctxt->Initiate();
ASSERT_EQ(std::future_status::ready, f.wait_for(std::chrono::seconds(5))) << "A lambda did not complete in a timely fashion after its context was started";

// Terminate, verify that we don't capture any more lambdas:
ctxt->SignalShutdown();
ASSERT_EQ(nullptr, ctxt->GetThreadPool()) << "Thread pool was still present on a terminated context";
ASSERT_FALSE(*ctxt += [barr] {}) << "Lambda append operation incorrectly evaluated to true";
ASSERT_TRUE(barr.unique()) << "Lambda was incorrectly captured by a context that was already terminated";
}

TEST_F(ThreadPoolTest, ManualThreadPoolBehavior) {
// Make the manual pool that will be used for this test:
auto pool = std::make_shared<autowiring::ManualThreadPool>();

// Launch a thread that will join the pool:
std::promise<std::shared_ptr<autowiring::ThreadPoolToken>> val;
auto launch = std::async(
std::launch::async,
[pool, &val] {
auto token = pool->PrepareJoin();
val.set_value(token);
pool->Join(token);
}
);

auto valFuture = val.get_future();
ASSERT_EQ(std::future_status::ready, valFuture.wait_for(std::chrono::seconds(5))) << "Join thread took too much time to start up";
auto token = valFuture.get();

// Set up the context
AutoCurrentContext ctxt;
ctxt->SetThreadPool(pool);
ctxt->Initiate();

// Pend some lambdas to be executed by our worker thread:
std::promise<void> hitDone;
std::atomic<int> hitCount{10};
for (size_t i = hitCount; i--; )
*ctxt += [&] {
if (!--hitCount)
hitDone.set_value();
};

// Wait for everything to get hit:
auto hitDoneFuture = hitDone.get_future();
ASSERT_EQ(std::future_status::ready, hitDoneFuture.wait_for(std::chrono::seconds(5))) << "Manual pool did not dispatch lambdas in a timely fashion";
{
public:
ThreadPoolTest(void) {
pool->SuggestThreadPoolSize(2);
token = pool->Start();
}

// Verify that cancellation of the token causes the manual thread to back out
token->Leave();
ASSERT_EQ(std::future_status::ready, launch.wait_for(std::chrono::seconds(5))) << "Token cancellation did not correctly release a single waiting thread";
}
void TearDown(void) {
token.reset();
}

template<typename T>
class SystemThreadPoolTest:
public testing::Test
{};
std::shared_ptr<T> pool = std::make_shared<T>();
std::shared_ptr<void> token;
};

TYPED_TEST_CASE_P(SystemThreadPoolTest);
TYPED_TEST_CASE_P(ThreadPoolTest);

TYPED_TEST_P(SystemThreadPoolTest, PoolOverload) {
TYPED_TEST_P(ThreadPoolTest, PoolOverload) {
AutoCurrentContext ctxt;
auto pool = std::make_shared<TypeParam>();
ctxt->SetThreadPool(pool);
pool->SuggestThreadPoolSize(2);
ctxt->Initiate();

size_t cap = 1000;
auto ctr = std::make_shared<std::atomic<size_t>>(cap);
auto p = std::make_shared<std::promise<void>>();

for (size_t i = cap; i--;)
*ctxt += [=] {
if (!--*ctr)
p->set_value();
};
*pool += [=] {
if (!--*ctr)
p->set_value();
};

auto rs = p->get_future();
ASSERT_EQ(std::future_status::ready, rs.wait_for(std::chrono::seconds(5))) << "Pool saturation did not complete in a timely fashion";
}

REGISTER_TYPED_TEST_CASE_P(SystemThreadPoolTest, PoolOverload);
REGISTER_TYPED_TEST_CASE_P(ThreadPoolTest, PoolOverload);

typedef ::testing::Types<
#ifdef _MSC_VER
Expand All @@ -141,4 +62,4 @@ typedef ::testing::Types<
autowiring::SystemThreadPoolStl
> t_testTypes;

INSTANTIATE_TYPED_TEST_CASE_P(My, SystemThreadPoolTest, t_testTypes);
INSTANTIATE_TYPED_TEST_CASE_P(My, ThreadPoolTest, t_testTypes);

0 comments on commit 71d8d41

Please sign in to comment.