Skip to content

Commit

Permalink
Extended functionality adding stream-like processing of the tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
Alk authored and Alk committed Dec 25, 2018
1 parent 10387db commit fe04128
Show file tree
Hide file tree
Showing 14 changed files with 754 additions and 322 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#pragma once

#include "internal/ExecutionStream.h"
#include "internal/ExecutionQueue.h"
#include "internal/TaskProviderList.h"
#include "internal/TaskProviderUtils.h"
Expand All @@ -33,37 +34,50 @@
namespace execq
{
/**
* @class ExecutionQueueSource
* @class ExecutionPool
* @brief ThreadPool-like object that provides concurrent task execution. Tasks are provided by 'ExecutionQueue' instances.
*/
class ExecutionQueueSource: private details::IExecutionQueueDelegate
class ExecutionPool: private details::IExecutionQueueDelegate, private details::IExecutionStreamDelegate
{
public:
/**
* @brief Creates pool that works as factory for execution queues.
* Usually you should create single instance of ExecutionQueueSource with multiple IExecutionQueue's to achive best performance.
* @brief Creates pool that works as factory for execution queues and streams.
* Usually you should create single instance of ExecutionPool with multiple IExecutionQueue/IExecutionStream to achive best performance.
*/
ExecutionQueueSource();
~ExecutionQueueSource();
ExecutionPool();
~ExecutionPool();

/**
* @brief Creates execution queue with specific processing function.
* @discussion All objects pushed into the queue will be processed on either one of pool threads or on the queue-specific thread.
*/
template <typename T>
std::unique_ptr<IExecutionQueue<T>> createExecutionQueue(std::function<void(const std::atomic_bool& shouldStop, T object)> executor);
std::unique_ptr<IExecutionQueue<T>> createExecutionQueue(std::function<void(const std::atomic_bool& shouldQuit, T object)> executor);

/**
* @brief Creates execution stream with specific executee function. Stream is stopped by default.
* @discussion When stream started, 'executee' function will be called each time when ExecutionSource have free thread.
*/
std::unique_ptr<IExecutionStream> createExecutionStream(std::function<void(const std::atomic_bool& shouldQuit)> executee);

private: // details::IExecutionQueueDelegate
virtual void registerTaskProvider(details::ITaskProvider& taskProvider) final;
virtual void unregisterTaskProvider(const details::ITaskProvider& taskProvider) final;
virtual void taskProviderDidReceiveNewTask() final;
virtual void registerQueueTaskProvider(details::ITaskProvider& taskProvider) final;
virtual void unregisterQueueTaskProvider(const details::ITaskProvider& taskProvider) final;
virtual void queueDidReceiveNewTask() final;

private: // details::IExecutionStreamDelegate
virtual void registerStreamTaskProvider(details::ITaskProvider& taskProvider) final;
virtual void unregisterStreamTaskProvider(const details::ITaskProvider& taskProvider) final;
virtual void streamDidStart() final;

private:
void registerTaskProvider(details::ITaskProvider& taskProvider);
void unregisterTaskProvider(const details::ITaskProvider& taskProvider);
void workerThread();
void shutdown();

private:
std::atomic_bool m_shouldStop { false };
std::atomic_bool m_shouldQuit { false };

details::TaskProviderList m_taskProviders;
std::mutex m_providersMutex;
Expand All @@ -73,56 +87,8 @@ namespace execq
};
}

execq::ExecutionQueueSource::ExecutionQueueSource()
{
const uint32_t defaultThreadCount = 4;
const uint32_t hardwareThreadCount = std::thread::hardware_concurrency();

const uint32_t threadCount = hardwareThreadCount ? hardwareThreadCount : defaultThreadCount;
for (uint32_t i = 0; i < threadCount; i++)
{
m_threads.emplace_back(std::async(std::launch::async, std::bind(&ExecutionQueueSource::workerThread, this)));
}
}

execq::ExecutionQueueSource::~ExecutionQueueSource()
{
shutdown();
}

void execq::ExecutionQueueSource::shutdown()
{
std::lock_guard<std::mutex> lock(m_providersMutex);
m_shouldStop = true;
m_providersCondition.notify_all();
}

template <typename T>
std::unique_ptr<execq::IExecutionQueue<T>> execq::ExecutionQueueSource::createExecutionQueue(std::function<void(const std::atomic_bool& shouldStop, T object)> executor)
std::unique_ptr<execq::IExecutionQueue<T>> execq::ExecutionPool::createExecutionQueue(std::function<void(const std::atomic_bool& shouldQuit, T object)> executor)
{
return std::unique_ptr<details::ExecutionQueue<T>>(new details::ExecutionQueue<T>(*this, std::move(executor)));
}

void execq::ExecutionQueueSource::registerTaskProvider(details::ITaskProvider& taskProvider)
{
std::lock_guard<std::mutex> lock(m_providersMutex);

m_taskProviders.add(taskProvider);
}

void execq::ExecutionQueueSource::unregisterTaskProvider(const details::ITaskProvider& taskProvider)
{
std::lock_guard<std::mutex> lock(m_providersMutex);

m_taskProviders.remove(taskProvider);
}

void execq::ExecutionQueueSource::taskProviderDidReceiveNewTask()
{
m_providersCondition.notify_one();
}

void execq::ExecutionQueueSource::workerThread()
{
WorkerThread(m_taskProviders, m_providersCondition, m_providersMutex, m_shouldStop);
}
9 changes: 8 additions & 1 deletion include/execq/IExecutionQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,16 @@ namespace execq
* @brief Pushed an object to be process on the queue.
*/
template <typename Y = T>
void push(Y&& object) { pushImpl(std::make_unique<T>(std::forward<Y>(object))); }
void push(Y&& object);

private:
virtual void pushImpl(std::unique_ptr<T> object) = 0;
};
}

template <typename T>
template <typename Y>
void execq::IExecutionQueue<T>::push(Y&& object)
{
pushImpl(std::unique_ptr<T>(new T(std::forward<Y>(object))));
}
49 changes: 49 additions & 0 deletions include/execq/IExecutionStream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* MIT License
*
* Copyright (c) 2018 Alkenso (Vladimir Vashurkin)
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

#pragma once

#include <memory>

namespace execq
{
class IExecutionStream
{
public:
virtual ~IExecutionStream() = default;

/**
* @brief Starts execution stream.
* Each time when thread in pool becomes free, execution stream will be prompted of next task to execute.
*/
virtual void start() = 0;

/**
* @brief Stops execution stream.
* Execution stream will not be prompted of next tasks to execute until 'start' is called.
* All tasks being executed during stop will normally continue.
*/
virtual void stop() = 0;
};
}
43 changes: 14 additions & 29 deletions include/execq/internal/ExecutionQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@

#pragma once

#include "ITaskProvider.h"
#include "IExecutionQueue.h"
#include "TaskProviderUtils.h"

#include <queue>
#include <mutex>
Expand All @@ -42,17 +42,17 @@ namespace execq
public:
virtual ~IExecutionQueueDelegate() = default;

virtual void registerTaskProvider(ITaskProvider& taskProvider) = 0;
virtual void unregisterTaskProvider(const details::ITaskProvider& taskProvider) = 0;
virtual void taskProviderDidReceiveNewTask() = 0;
virtual void registerQueueTaskProvider(ITaskProvider& taskProvider) = 0;
virtual void unregisterQueueTaskProvider(const details::ITaskProvider& taskProvider) = 0;
virtual void queueDidReceiveNewTask() = 0;
};


template <typename T>
class ExecutionQueue: public IExecutionQueue<T>, private ITaskProvider
{
public:
ExecutionQueue(IExecutionQueueDelegate& delegate, std::function<void(const std::atomic_bool& shouldStop, T object)> executor);
ExecutionQueue(IExecutionQueueDelegate& delegate, std::function<void(const std::atomic_bool& shouldQuit, T object)> executor);
~ExecutionQueue();

private: // IExecutionQueue
Expand All @@ -66,7 +66,7 @@ namespace execq
void waitPendingTasks();

private:
std::atomic_bool m_shouldStop { false };
std::atomic_bool m_shouldQuit { false };

std::atomic_size_t m_pendingTaskCount { 0 };
std::mutex m_taskCompleteMutex;
Expand All @@ -77,29 +77,27 @@ namespace execq
std::condition_variable m_taskQueueCondition;

std::reference_wrapper<IExecutionQueueDelegate> m_delegate;
std::function<void(const std::atomic_bool& shouldStop, T object)> m_executor;
std::function<void(const std::atomic_bool& shouldQuit, T object)> m_executor;

std::future<void> m_thread;
};

Task PopTaskFromQueue(std::queue<Task>& queue);
}
}

template <typename T>
execq::details::ExecutionQueue<T>::ExecutionQueue(IExecutionQueueDelegate& delegate, std::function<void(const std::atomic_bool& shouldStop, T object)> executor)
execq::details::ExecutionQueue<T>::ExecutionQueue(IExecutionQueueDelegate& delegate, std::function<void(const std::atomic_bool& shouldQuit, T object)> executor)
: m_delegate(delegate)
, m_executor(std::move(executor))
, m_thread(std::async(std::launch::async, std::bind(&ExecutionQueue::queueThreadWorker, this)))
{
m_delegate.get().registerTaskProvider(*this);
m_delegate.get().registerQueueTaskProvider(*this);
}

template <typename T>
execq::details::ExecutionQueue<T>::~ExecutionQueue()
{
m_shouldStop = true;
m_delegate.get().unregisterTaskProvider(*this);
m_shouldQuit = true;
m_delegate.get().unregisterQueueTaskProvider(*this);
m_taskQueueCondition.notify_all();
waitPendingTasks();
}
Expand All @@ -113,13 +111,13 @@ void execq::details::ExecutionQueue<T>::pushImpl(std::unique_ptr<T> object)

std::shared_ptr<T> sharedObject = std::move(object);
m_taskQueue.push(Task([this, sharedObject] () {
m_executor(m_shouldStop, std::move(*sharedObject));
m_executor(m_shouldQuit, std::move(*sharedObject));

m_pendingTaskCount--;
m_taskCompleteCondition.notify_one();
}));

m_delegate.get().taskProviderDidReceiveNewTask();
m_delegate.get().queueDidReceiveNewTask();
m_taskQueueCondition.notify_one();
}

Expand Down Expand Up @@ -148,7 +146,7 @@ void execq::details::ExecutionQueue<T>::queueThreadWorker()
};

NonblockingTaskProvider taskProvider(m_taskQueue);
WorkerThread(taskProvider, m_taskQueueCondition, m_taskQueueMutex, m_shouldStop);
WorkerThread(taskProvider, m_taskQueueCondition, m_taskQueueMutex, m_shouldQuit);
}

template <typename T>
Expand All @@ -160,16 +158,3 @@ void execq::details::ExecutionQueue<T>::waitPendingTasks()
m_taskCompleteCondition.wait(lock);
}
}

execq::details::Task execq::details::PopTaskFromQueue(std::queue<Task>& queue)
{
if (queue.empty())
{
return Task();
}

Task task = std::move(queue.front());
queue.pop();

return task;
}
85 changes: 85 additions & 0 deletions include/execq/internal/ExecutionStream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* MIT License
*
* Copyright (c) 2018 Alkenso (Vladimir Vashurkin)
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

#pragma once

#include "IExecutionStream.h"
#include "TaskProviderUtils.h"

#include <queue>
#include <mutex>
#include <thread>
#include <atomic>
#include <condition_variable>

namespace execq
{
namespace details
{
class IExecutionStreamDelegate
{
public:
virtual ~IExecutionStreamDelegate() = default;

virtual void registerStreamTaskProvider(details::ITaskProvider& taskProvider) = 0;
virtual void unregisterStreamTaskProvider(const details::ITaskProvider& taskProvider) = 0;
virtual void streamDidStart() = 0;
};


class ExecutionStream: public IExecutionStream, private ITaskProvider
{
public:
ExecutionStream(IExecutionStreamDelegate& delegate, std::function<void(const std::atomic_bool& shouldQuit)> executee);
~ExecutionStream();

public: // IExecutionStream
virtual void start() final;
virtual void stop() final;

private: // ITaskProvider
virtual Task nextTask() final;

private:
void streamThreadWorker();
void waitPendingTasks();

private:
std::atomic_bool m_shouldQuit { false };
std::atomic_bool m_started { false };

std::atomic_size_t m_pendingTaskCount { 0 };
std::mutex m_taskCompleteMutex;
std::condition_variable m_taskCompleteCondition;

std::mutex m_taskStartMutex;
std::condition_variable m_taskStartCondition;

std::reference_wrapper<IExecutionStreamDelegate> m_delegate;
std::function<void(const std::atomic_bool& shouldQuit)> m_executee;

std::future<void> m_thread;
};
}
}
Loading

0 comments on commit fe04128

Please sign in to comment.