diff --git a/.travis.yml b/.travis.yml index 44b092a..500689e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -16,17 +16,30 @@ addons: packages: - libboost1.70-dev - valgrind + - gdb env: - CXXSTANDARD=11 - CXXSTANDARD=14 - CXXSTANDARD=17 +after_failure: + - COREFILE=$(find $TRAVIS_BUILD_DIR/cores -maxdepth 1 -name "core.*" -print | head -1) + - echo $COREFILE + - if [[ -f "$COREFILE" ]]; then gdb -c "$COREFILE" $EXEC -ex "thread apply all bt" -ex "set pagination 0" -batch; fi + before_script: + - ulimit -c + - ulimit -a -S + - ulimit -a -H + - mkdir -p -m 0777 $TRAVIS_BUILD_DIR/cores + - sudo sysctl -w kernel.core_pattern=$TRAVIS_BUILD_DIR/cores/core.%e.%p + - cat /proc/sys/kernel/core_pattern - git clone https://github.com/google/googletest.git - cd googletest && cmake -DCMAKE_INSTALL_PREFIX=install . - make install - cd .. script: - - cmake -DQUANTUM_ENABLE_TESTS=ON -DQUANTUM_BOOST_USE_VALGRIND=ON -DCMAKE_INSTALL_PREFIX=tests -DGTEST_ROOT=googletest/install . + - ulimit -c unlimited -S #enable core dumps + - cmake -DCMAKE-BUILD_TYPE=Debug -DQUANTUM_ENABLE_TESTS=ON -DQUANTUM_BOOST_USE_VALGRIND=ON -DCMAKE_INSTALL_PREFIX=tests -DGTEST_ROOT=googletest/install . - make QuantumTests && ./tests/QuantumTests.Linux64 diff --git a/CMakeLists.txt b/CMakeLists.txt index b9e7c1d..c05076b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -53,7 +53,7 @@ if (NOT CMAKE_CXX_STANDARD) endif() #Set compile flags if CXXFLAGS environment variable is not set if (NOT CMAKE_CXX_FLAGS) - set(CMAKE_CXX_FLAGS "-Wall -Wextra -O0 -m${MODE} -std=c++${CMAKE_CXX_STANDARD} -ftemplate-backtrace-limit=0") + set(CMAKE_CXX_FLAGS "-Wall -Wextra -O0 -m${MODE} -std=c++${CMAKE_CXX_STANDARD} -ftemplate-backtrace-limit=0 -faligned-new") endif() if (QUANTUM_VERBOSE_MAKEFILE) message(STATUS "CMAKE_CXX_FLAGS = ${CMAKE_CXX_FLAGS}") diff --git a/quantum/impl/quantum_condition_variable_impl.h b/quantum/impl/quantum_condition_variable_impl.h index 9ca1bee..b767c3b 100644 --- a/quantum/impl/quantum_condition_variable_impl.h +++ b/quantum/impl/quantum_condition_variable_impl.h @@ -32,24 +32,18 @@ ConditionVariable::ConditionVariable() : inline ConditionVariable::~ConditionVariable() { - Mutex::Guard lock(_thisLock); + Mutex::Guard lock(local::context(), _thisLock); _destroyed = true; } inline void ConditionVariable::notifyOne() { - notifyOneImpl(nullptr); + notifyOne(nullptr); } inline void ConditionVariable::notifyOne(ICoroSync::Ptr sync) -{ - notifyOneImpl(sync); -} - -inline -void ConditionVariable::notifyOneImpl(ICoroSync::Ptr sync) { //LOCKED OR UNLOCKED SCOPE Mutex::Guard lock(sync, _thisLock); @@ -64,17 +58,11 @@ void ConditionVariable::notifyOneImpl(ICoroSync::Ptr sync) inline void ConditionVariable::notifyAll() { - notifyAllImpl(nullptr); + notifyAll(nullptr); } inline void ConditionVariable::notifyAll(ICoroSync::Ptr sync) -{ - notifyAllImpl(sync); -} - -inline -void ConditionVariable::notifyAllImpl(ICoroSync::Ptr sync) { //LOCKED OR UNLOCKED SCOPE Mutex::Guard lock(sync, _thisLock); @@ -88,7 +76,7 @@ void ConditionVariable::notifyAllImpl(ICoroSync::Ptr sync) inline void ConditionVariable::wait(Mutex& mutex) { - waitImpl(nullptr, mutex); + wait(nullptr, mutex); } inline @@ -102,7 +90,7 @@ template void ConditionVariable::wait(Mutex& mutex, PREDICATE predicate) { - waitImpl(nullptr, mutex, std::move(predicate)); + wait(nullptr, mutex, std::move(predicate)); } template @@ -117,12 +105,7 @@ template bool ConditionVariable::waitFor(Mutex& mutex, const std::chrono::duration& time) { - if (time == std::chrono::milliseconds(-1)) - { - waitImpl(nullptr, mutex); - return true; - } - return waitForImpl(nullptr, mutex, time); + return waitFor(nullptr, mutex, time); } template @@ -143,12 +126,7 @@ bool ConditionVariable::waitFor(Mutex& mutex, const std::chrono::duration& time, PREDICATE predicate) { - if (time == std::chrono::milliseconds(-1)) - { - waitImpl(nullptr, mutex, std::move(predicate)); - return true; - } - return waitForImpl(nullptr, mutex, time, std::move(predicate)); + return waitFor(nullptr, mutex, time, predicate); } template @@ -157,6 +135,11 @@ bool ConditionVariable::waitFor(ICoroSync::Ptr sync, const std::chrono::duration& time, PREDICATE predicate) { + if (time == std::chrono::milliseconds(-1)) + { + waitImpl(sync, mutex, predicate); + return true; + } return waitForImpl(sync, mutex, time, std::move(predicate)); } diff --git a/quantum/impl/quantum_context_impl.h b/quantum/impl/quantum_context_impl.h index a00ea2a..992ec16 100644 --- a/quantum/impl/quantum_context_impl.h +++ b/quantum/impl/quantum_context_impl.h @@ -239,7 +239,8 @@ template template int ICoroContext::closeBuffer() { - return static_cast(this)->closeBuffer(); + std::shared_ptr ctx = static_cast(this)->shared_from_this(); + return ctx->closeBuffer(ctx); } template @@ -664,6 +665,12 @@ void Context::terminate() } } +template +TaskId Context::taskId() const +{ + return _task->getTaskId(); +} + template bool Context::validAt(int num) const { @@ -679,7 +686,7 @@ bool Context::valid() const template int Context::setException(std::exception_ptr ex) { - return _promises.back()->setException(ex); + return _promises.back()->setException(this->shared_from_this(), ex); } template @@ -1109,18 +1116,11 @@ Context::mapReduceBatch(INPUT_IT first, Functions::ReduceFunc{std::move(reducer)}); } -template -template -int Context::set(V&& value) -{ - return std::static_pointer_cast>(_promises.back())->set(std::forward(value)); -} - template template void Context::push(V&& value) { - std::static_pointer_cast>(_promises.back())->push(std::forward(value)); + push(nullptr, std::forward(value)); } template @@ -1134,7 +1134,7 @@ template template BufferRetType Context::pull(bool& isBufferClosed) { - return std::static_pointer_cast>(_promises.back())->getIThreadFuture()->pull(isBufferClosed); + return pull(nullptr, isBufferClosed); } template @@ -1148,73 +1148,79 @@ template template int Context::closeBuffer() { - return std::static_pointer_cast>(_promises.back())->closeBuffer(); + return closeBuffer(nullptr); +} + +template +template +int Context::closeBuffer(ICoroSync::Ptr sync) +{ + return std::static_pointer_cast>(_promises.back())->closeBuffer(sync); } template template NonBufferRetType Context::getAt(int num) { - return std::static_pointer_cast>(_promises[index(num)])->getIThreadFuture()->get(); + return getAt(num, nullptr); } template template const NonBufferRetType& Context::getRefAt(int num) const { - return std::static_pointer_cast>(_promises[index(num)])->getIThreadFuture()->getRef(); + return getRefAt(num, nullptr); } template template NonBufferRetType Context::get() { - return getAt(-1); + return get(nullptr); } template template const NonBufferRetType& Context::getRef() const { - return getRefAt(-1); + return getRef(nullptr); } template void Context::waitAt(int num) const { - _promises[index(num)]->getIThreadFutureBase()->wait(); + waitAt(num, nullptr); } template std::future_status Context::waitForAt(int num, std::chrono::milliseconds timeMs) const { - return _promises[index(num)]->getIThreadFutureBase()->waitFor(timeMs); + return waitForAt(num, nullptr, timeMs); } template void Context::wait() const { - waitAt(-1); + wait(nullptr); } template std::future_status Context::waitFor(std::chrono::milliseconds timeMs) const { - return waitForAt(-1, timeMs); + return waitFor(nullptr, timeMs); } template void Context::waitAll() const { - for (auto&& promise : _promises) - { - try - { - promise->getIThreadFutureBase()->wait(); - } - catch(...) //catch all broken promises or any other exception - {} - } + waitAll(nullptr); +} + +template +template +int Context::set(V&& value) +{ + return set(nullptr, std::forward(value)); } template @@ -1227,7 +1233,7 @@ int Context::set(ICoroSync::Ptr sync, V&& value) template template NonBufferRetType Context::getAt(int num, - ICoroSync::Ptr sync) + ICoroSync::Ptr sync) { validateContext(sync); return std::static_pointer_cast>(_promises[index(num)])->getICoroFuture()->get(sync); @@ -1236,7 +1242,7 @@ NonBufferRetType Context::getAt(int num, template template const NonBufferRetType& Context::getRefAt(int num, - ICoroSync::Ptr sync) const + ICoroSync::Ptr sync) const { validateContext(sync); return std::static_pointer_cast>(_promises[index(num)])->getICoroFuture()->getRef(sync); diff --git a/quantum/impl/quantum_future_impl.h b/quantum/impl/quantum_future_impl.h index f089db5..e98e4ce 100644 --- a/quantum/impl/quantum_future_impl.h +++ b/quantum/impl/quantum_future_impl.h @@ -99,30 +99,26 @@ template template NonBufferRetType Future::get() { - if (!_sharedState) ThrowFutureException(FutureState::NoState); - return _sharedState->get(); + return get(nullptr); } template template const NonBufferRetType& Future::getRef() const { - if (!_sharedState) ThrowFutureException(FutureState::NoState); - return _sharedState->getRef(); + return getRef(nullptr); } template void Future::wait() const { - if (!_sharedState) ThrowFutureException(FutureState::NoState); - return _sharedState->wait(); + return wait(nullptr); } template std::future_status Future::waitFor(std::chrono::milliseconds timeMs) const { - if (!_sharedState) ThrowFutureException(FutureState::NoState); - return _sharedState->waitFor(timeMs); + return waitFor(nullptr, timeMs); } template @@ -160,8 +156,7 @@ template template BufferRetType Future::pull(bool& isBufferClosed) { - if (!_sharedState) ThrowFutureException(FutureState::NoState); - return _sharedState->pull(isBufferClosed); + return pull(nullptr, isBufferClosed); } template diff --git a/quantum/impl/quantum_io_queue_impl.h b/quantum/impl/quantum_io_queue_impl.h index fd939c5..25b4c68 100644 --- a/quantum/impl/quantum_io_queue_impl.h +++ b/quantum/impl/quantum_io_queue_impl.h @@ -126,6 +126,8 @@ void IoQueue::run() } } + // set the current task + IQueue::TaskSetterGuard taskSetter(*this, task); //========================= START TASK ========================= int rc = task->run(); //========================== END TASK ========================== diff --git a/quantum/impl/quantum_io_task_impl.h b/quantum/impl/quantum_io_task_impl.h index 6a025b7..dfe1e0c 100644 --- a/quantum/impl/quantum_io_task_impl.h +++ b/quantum/impl/quantum_io_task_impl.h @@ -43,7 +43,8 @@ IoTask::IoTask(std::true_type, _func(Util::bindIoCaller(promise, std::forward(func), std::forward(args)...)), _terminated(false), _queueId(queueId), - _isHighPriority(isHighPriority) + _isHighPriority(isHighPriority), + _taskId(ThreadContextTag{}) { } @@ -57,7 +58,8 @@ IoTask::IoTask(std::false_type, _func(Util::bindIoCaller2(promise, std::forward(func), std::forward(args)...)), _terminated(false), _queueId(queueId), - _isHighPriority(isHighPriority) + _isHighPriority(isHighPriority), + _taskId(ThreadContextTag{}) { } @@ -80,7 +82,11 @@ void IoTask::terminate() inline int IoTask::run() { - return _func ? _func() : (int)ITask::RetCode::NotCallable; + if (_func) { + _taskId.assignCurrentThread(); + return _func(); + } + return (int)ITask::RetCode::NotCallable; } inline @@ -90,7 +96,7 @@ void IoTask::setQueueId(int queueId) } inline -int IoTask::getQueueId() +int IoTask::getQueueId() const { return _queueId; } @@ -101,6 +107,12 @@ ITask::Type IoTask::getType() const return ITask::Type::IO; } +inline +TaskId IoTask::getTaskId() const +{ + return _taskId; +} + inline bool IoTask::isBlocked() const { @@ -125,6 +137,12 @@ bool IoTask::isSuspended() const return false; } +inline +ITask::LocalStorage& IoTask::getLocalStorage() +{ + return _localStorage; +} + inline void* IoTask::operator new(size_t) { diff --git a/quantum/impl/quantum_local_impl.h b/quantum/impl/quantum_local_impl.h index 200a553..c378a4a 100644 --- a/quantum/impl/quantum_local_impl.h +++ b/quantum/impl/quantum_local_impl.h @@ -19,22 +19,22 @@ //#################################### IMPLEMENTATIONS ######################################### //############################################################################################## #include -#include #include #include namespace Bloomberg { namespace quantum { + namespace local { template T*& variable(const std::string& key) { // default thread local map to be used outside of coroutines - thread_local Task::CoroLocalStorage defaultStorage; + thread_local ITask::LocalStorage defaultStorage; - Task* task = TaskQueue::getCurrentTask(); - Task::CoroLocalStorage& storage = task ? task->getCoroLocalStorage() : defaultStorage; + ITask::Ptr task = IQueue::getCurrentTask(); + ITask::LocalStorage& storage = task ? task->getLocalStorage() : defaultStorage; void** r = &storage.emplace(key, nullptr).first->second; return *reinterpret_cast(r); @@ -43,12 +43,20 @@ T*& variable(const std::string& key) inline VoidContextPtr context() { - Task* task = TaskQueue::getCurrentTask(); - if (!task) + ITask::Ptr task = IQueue::getCurrentTask(); + if (!task || (task->getType() == ITask::Type::IO)) { return nullptr; } - return std::static_pointer_cast, ITaskAccessor>(task->getTaskAccessor()); + return std::static_pointer_cast, ITaskAccessor> + (std::static_pointer_cast(task)->getTaskAccessor()); +} + +inline +TaskId taskId() +{ + ITask::Ptr task = IQueue::getCurrentTask(); + return task ? task->getTaskId() : TaskId(TaskId::ThisThreadTag{}); } }}} diff --git a/quantum/impl/quantum_mutex_impl.h b/quantum/impl/quantum_mutex_impl.h index cebc028..b2c9c67 100644 --- a/quantum/impl/quantum_mutex_impl.h +++ b/quantum/impl/quantum_mutex_impl.h @@ -18,10 +18,17 @@ //############################################################################################## //#################################### IMPLEMENTATIONS ######################################### //############################################################################################## +#include namespace Bloomberg { namespace quantum { +//fwd declarations +namespace local { + VoidContextPtr context(); + TaskId taskId(); +} + inline void yield(ICoroSync::Ptr sync) { @@ -37,24 +44,16 @@ void yield(ICoroSync::Ptr sync) //============================================================================================== // class Mutex //============================================================================================== -inline -Mutex::Mutex() -{} - inline void Mutex::lock() { - lockImpl(nullptr); + //Application must use the other lock() overload if we are running inside a coroutine + assert(!local::context()); + lock(nullptr); } inline void Mutex::lock(ICoroSync::Ptr sync) -{ - lockImpl(sync); -} - -inline -void Mutex::lockImpl(ICoroSync::Ptr sync) { while (!tryLock()) { @@ -65,12 +64,24 @@ void Mutex::lockImpl(ICoroSync::Ptr sync) inline bool Mutex::tryLock() { - return _spinlock.tryLock(); + bool rc = _spinlock.tryLock(); + if (rc) { + //mutex is locked + _taskId = local::taskId(); + //task id must be valid + assert(_taskId != TaskId{}); + } + return rc; } inline void Mutex::unlock() { + if (_taskId != local::taskId()) { + //invalid operation + assert(false); + } + _taskId = TaskId{}; //reset the task id _spinlock.unlock(); } @@ -78,36 +89,35 @@ void Mutex::unlock() // class Mutex::Guard //============================================================================================== inline -Mutex::Guard::Guard(Mutex& mutex, - bool tryLock) : - _mutex(mutex) +Mutex::Guard::Guard(Mutex& mutex) : + Mutex::Guard::Guard(nullptr, mutex) { - if (tryLock) - { - _ownsLock = _mutex.tryLock(); - } - else - { - _mutex.lock(); - _ownsLock = true; - } } inline Mutex::Guard::Guard(ICoroSync::Ptr sync, - Mutex& mutex, - bool tryLock) : - _mutex(mutex) + Mutex& mutex) : + _mutex(&mutex), + _ownsLock(true) { - if (tryLock) - { - _ownsLock = _mutex.tryLock(); - } - else - { - _mutex.lock(sync); - _ownsLock = true; - } + _mutex->lock(std::move(sync)); +} + +inline +Mutex::Guard::Guard(Mutex& mutex, + Mutex::TryToLock) : + _mutex(&mutex), + _ownsLock(_mutex->tryLock()) +{ +} + +inline +Mutex::Guard::Guard(Mutex& mutex, + Mutex::AdoptLock) : + _mutex(&mutex), + _ownsLock(true) +{ + assert(!_mutex->tryLock()); } inline @@ -119,7 +129,46 @@ bool Mutex::Guard::ownsLock() const inline Mutex::Guard::~Guard() { - _mutex.unlock(); + unlock(); +} + +inline +void Mutex::Guard::lock() +{ + lock(nullptr); +} + +inline +void Mutex::Guard::lock(ICoroSync::Ptr sync) +{ + if (_ownsLock) return; + assert(_mutex); + _mutex->lock(std::move(sync)); + _ownsLock = true; +} + +inline +bool Mutex::Guard::tryLock() +{ + if (_ownsLock) return true; + assert(_mutex); + _ownsLock = _mutex->tryLock(); + return _ownsLock; +} + +inline +void Mutex::Guard::unlock() +{ + if (!_ownsLock) return; + _mutex->unlock(); + _ownsLock = false; +} + +inline +void Mutex::Guard::release() +{ + _ownsLock = false; + _mutex = nullptr; } //============================================================================================== @@ -127,24 +176,23 @@ Mutex::Guard::~Guard() //============================================================================================== inline Mutex::ReverseGuard::ReverseGuard(Mutex& mutex) : - _mutex(mutex) + Mutex::ReverseGuard::ReverseGuard(nullptr, mutex) { - _mutex.unlock(); } inline Mutex::ReverseGuard::ReverseGuard(ICoroSync::Ptr sync, Mutex& mutex) : - _mutex(mutex), + _mutex(&mutex), _sync(sync) { - _mutex.unlock(); + _mutex->unlock(); } inline Mutex::ReverseGuard::~ReverseGuard() { - _mutex.lock(_sync); + _mutex->lock(_sync); } }} diff --git a/quantum/impl/quantum_promise_impl.h b/quantum/impl/quantum_promise_impl.h index a5105be..9ca7912 100644 --- a/quantum/impl/quantum_promise_impl.h +++ b/quantum/impl/quantum_promise_impl.h @@ -105,7 +105,7 @@ void Promise::terminate() bool value{false}; if (_terminated.compare_exchange_strong(value, true)) { - if (_sharedState) _sharedState->breakPromise(); + if (_sharedState) _sharedState->breakPromise(local::context()); } } @@ -117,9 +117,15 @@ bool Promise::valid() const template int Promise::setException(std::exception_ptr ex) +{ + return setException(nullptr, ex); +} + +template +int Promise::setException(ICoroSync::Ptr sync, std::exception_ptr ex) { if (!_sharedState) ThrowFutureException(FutureState::NoState); - return _sharedState->setException(ex); + return _sharedState->setException(sync, ex); } template @@ -140,8 +146,7 @@ template template int Promise::set(V&& value) { - if (!_sharedState) ThrowFutureException(FutureState::NoState); - return _sharedState->set(std::forward(value)); + return set(nullptr, std::forward(value)); } template @@ -170,8 +175,7 @@ template template void Promise::push(V&& value) { - if (!_sharedState) ThrowFutureException(FutureState::NoState); - _sharedState->push(std::forward(value)); + push(nullptr, std::forward(value)); } template @@ -185,9 +189,16 @@ void Promise::push(ICoroSync::Ptr sync, V&& value) template template int Promise::closeBuffer() +{ + return closeBuffer(nullptr); +} + +template +template +int Promise::closeBuffer(ICoroSync::Ptr sync) { if (!_sharedState) ThrowFutureException(FutureState::NoState); - return _sharedState->closeBuffer(); + return _sharedState->closeBuffer(sync); } template diff --git a/quantum/impl/quantum_read_write_mutex_impl.h b/quantum/impl/quantum_read_write_mutex_impl.h new file mode 100644 index 0000000..8d2f42b --- /dev/null +++ b/quantum/impl/quantum_read_write_mutex_impl.h @@ -0,0 +1,298 @@ +/* +** Copyright 2020 Bloomberg Finance L.P. +** +** Licensed under the Apache License, Version 2.0 (the "License"); +** you may not use this file except in compliance with the License. +** You may obtain a copy of the License at +** +** http://www.apache.org/licenses/LICENSE-2.0 +** +** Unless required by applicable law or agreed to in writing, software +** distributed under the License is distributed on an "AS IS" BASIS, +** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +** See the License for the specific language governing permissions and +** limitations under the License. +*/ +//NOTE: DO NOT INCLUDE DIRECTLY + +//############################################################################################## +//#################################### IMPLEMENTATIONS ######################################### +//############################################################################################## + +namespace Bloomberg { +namespace quantum { + +void yield(ICoroSync::Ptr sync); + +//============================================================================================== +// class ReadWriteMutex +//============================================================================================== +inline +void ReadWriteMutex::lockRead() +{ + //Application must use the other lockRead() overload if we are running inside a coroutine + assert(!local::context()); + lockRead(nullptr); +} + +inline +void ReadWriteMutex::lockRead(ICoroSync::Ptr sync) +{ + while (!tryLockRead()) + { + yield(sync); + } +} + +inline +void ReadWriteMutex::lockWrite() +{ + //Application must use the other lockWrite() overload if we are running inside a coroutine + assert(!local::context()); + lockWrite(nullptr); +} + +inline +void ReadWriteMutex::lockWrite(ICoroSync::Ptr sync) +{ + while (!tryLockWrite()) + { + yield(sync); + } +} + +inline +bool ReadWriteMutex::tryLockRead() +{ + return _spinlock.tryLockRead(); +} + +inline +bool ReadWriteMutex::tryLockWrite() +{ + bool rc = _spinlock.tryLockWrite(); + if (rc) { + //mutex is write-locked + _taskId = local::taskId(); + //task id must be valid + assert(_taskId != TaskId{}); + } + return rc; +} + +inline +void ReadWriteMutex::unlockRead() +{ + _spinlock.unlockRead(); +} + +inline +void ReadWriteMutex::unlockWrite() +{ + if (_taskId != local::taskId()) { + //invalid operation + assert(false); + } + _taskId = TaskId{}; //reset the task id + _spinlock.unlockWrite(); +} + +inline +bool ReadWriteMutex::isLocked() const +{ + return _spinlock.isLocked(); +} + +inline +bool ReadWriteMutex::isReadLocked() const +{ + return _spinlock.isReadLocked(); +} + +inline +bool ReadWriteMutex::isWriteLocked() const +{ + return _spinlock.isWriteLocked(); +} + +inline +int ReadWriteMutex::numReaders() const +{ + return _spinlock.numReaders(); +} + +//============================================================================================== +// class ReadGuard +//============================================================================================== +inline +ReadWriteMutex::ReadGuard::ReadGuard(ReadWriteMutex& lock) : + ReadWriteMutex::ReadGuard::ReadGuard(nullptr, lock) +{ +} + +inline +ReadWriteMutex::ReadGuard::ReadGuard(ICoroSync::Ptr sync, + ReadWriteMutex& lock) : + _mutex(&lock), + _ownsLock(true) +{ + _mutex->lockRead(sync); +} + +inline +ReadWriteMutex::ReadGuard::ReadGuard(ReadWriteMutex& lock, + ReadWriteMutex::TryToLock) : + _mutex(&lock), + _ownsLock(_mutex->tryLockRead()) +{ +} + +inline +ReadWriteMutex::ReadGuard::ReadGuard(ReadWriteMutex& lock, + ReadWriteMutex::AdoptLock) : + _mutex(&lock), + _ownsLock(true) +{ +} + +inline +ReadWriteMutex::ReadGuard::~ReadGuard() +{ + unlock(); +} + +inline +void ReadWriteMutex::ReadGuard::lock() +{ + lock(nullptr); +} + +inline +void ReadWriteMutex::ReadGuard::lock(ICoroSync::Ptr sync) +{ + if (_ownsLock) return; + assert(_mutex); + _mutex->lockRead(sync); + _ownsLock = true; +} + +inline +bool ReadWriteMutex::ReadGuard::tryLock() +{ + + if (_ownsLock) return true; + assert(_mutex); + _ownsLock = _mutex->tryLockRead(); + return _ownsLock; +} + +inline +void ReadWriteMutex::ReadGuard::unlock() +{ + if (!_ownsLock) return; + _mutex->unlockRead(); + _ownsLock = false; +} + +inline +void ReadWriteMutex::ReadGuard::release() +{ + _ownsLock = false; + _mutex = nullptr; +} + +inline +bool ReadWriteMutex::ReadGuard::ownsLock() const +{ + return _ownsLock; +} + +//============================================================================================== +// class WriteGuard +//============================================================================================== +inline +ReadWriteMutex::WriteGuard::WriteGuard(ReadWriteMutex& lock) : + ReadWriteMutex::WriteGuard::WriteGuard(nullptr, lock) +{ +} + +inline +ReadWriteMutex::WriteGuard::WriteGuard(ICoroSync::Ptr sync, + ReadWriteMutex& lock) : + _mutex(&lock), + _ownsLock(true) +{ + _mutex->lockWrite(sync); +} + +inline +ReadWriteMutex::WriteGuard::WriteGuard(ReadWriteMutex& lock, + ReadWriteMutex::TryToLock) : + _mutex(&lock), + _ownsLock(_mutex->tryLockWrite()) +{ +} + +inline +ReadWriteMutex::WriteGuard::WriteGuard(ReadWriteMutex& lock, + ReadWriteMutex::AdoptLock) : + _mutex(&lock), + _ownsLock(true) +{ + assert(!_mutex->tryLockWrite()); +} + +inline +ReadWriteMutex::WriteGuard::~WriteGuard() +{ + unlock(); +} + +inline +void ReadWriteMutex::WriteGuard::lock() +{ + lock(nullptr); +} + +inline +void ReadWriteMutex::WriteGuard::lock(ICoroSync::Ptr sync) +{ + if (_ownsLock) return; + assert(_mutex); + _mutex->lockWrite(sync); + _ownsLock = true; +} + +inline +bool ReadWriteMutex::WriteGuard::tryLock() +{ + + if (_ownsLock) return true; + assert(_mutex); + _ownsLock = _mutex->tryLockWrite(); + return _ownsLock; +} + +inline +void ReadWriteMutex::WriteGuard::unlock() +{ + if (!_ownsLock) return; + _mutex->unlockWrite(); + _ownsLock = false; +} + +inline +void ReadWriteMutex::WriteGuard::release() +{ + _ownsLock = false; + _mutex = nullptr; +} + +inline +bool ReadWriteMutex::WriteGuard::ownsLock() const +{ + return _ownsLock; +} + +}} + diff --git a/quantum/impl/quantum_read_write_spinlock_impl.h b/quantum/impl/quantum_read_write_spinlock_impl.h index 8053c2c..d9aa0ca 100644 --- a/quantum/impl/quantum_read_write_spinlock_impl.h +++ b/quantum/impl/quantum_read_write_spinlock_impl.h @@ -19,35 +19,21 @@ //#################################### IMPLEMENTATIONS ######################################### //############################################################################################## +#include + namespace Bloomberg { namespace quantum { inline void ReadWriteSpinLock::lockRead() { - int oldValue = 0; - int newValue = 1; - while(!_count.compare_exchange_weak(oldValue, newValue, std::memory_order_acq_rel)) - { - if (oldValue == -1) - { - oldValue = 0; - newValue = 1; - } - else - { - newValue = oldValue + 1; - } - } + SpinLockUtil::lockShared(_count, -1, 0, 1); } inline void ReadWriteSpinLock::lockWrite() { - int i{0}; - while (!_count.compare_exchange_weak(i, -1, std::memory_order_acq_rel)) { - i = 0; - } + SpinLockUtil::lockExclusive(_count, -1, 0); } inline @@ -55,16 +41,17 @@ bool ReadWriteSpinLock::tryLockRead() { int oldValue = 0; int newValue = 1; - while(!_count.compare_exchange_weak(oldValue, newValue, std::memory_order_acq_rel)) + while (!_count.compare_exchange_weak(oldValue, newValue, std::memory_order_acq_rel)) { if (oldValue == -1) { - return false; + return false; //write-locked } else { newValue = oldValue + 1; } + SpinLockUtil::pauseCPU(); } return true; } @@ -72,20 +59,35 @@ bool ReadWriteSpinLock::tryLockRead() inline bool ReadWriteSpinLock::tryLockWrite() { - int i{0}; + int i = 0; return _count.compare_exchange_strong(i, -1, std::memory_order_acq_rel); } inline -void ReadWriteSpinLock::unlockRead() +bool ReadWriteSpinLock::unlockRead() { - _count.fetch_sub(1, std::memory_order_acq_rel); + int oldValue = 1; + int newValue = 0; + while (!_count.compare_exchange_weak(oldValue, newValue, std::memory_order_acq_rel)) + { + if (oldValue <= 0) + { + return false; //unlocked or write-locked + } + else + { + newValue = oldValue - 1; + } + SpinLockUtil::pauseCPU(); + } + return true; } inline -void ReadWriteSpinLock::unlockWrite() +bool ReadWriteSpinLock::unlockWrite() { - _count.fetch_add(1, std::memory_order_acq_rel); + int i = -1; + return _count.compare_exchange_strong(i, 0, std::memory_order_acq_rel); } inline @@ -160,6 +162,14 @@ bool ReadWriteSpinLock::ReadGuard::ownsLock() const return _ownsLock; } +inline +void ReadWriteSpinLock::ReadGuard::unlock() +{ + if (_ownsLock) { + _spinlock.unlockRead(); + } +} + inline ReadWriteSpinLock::WriteGuard::WriteGuard(ReadWriteSpinLock& lock) : _spinlock(lock), @@ -207,6 +217,13 @@ bool ReadWriteSpinLock::WriteGuard::ownsLock() const return _ownsLock; } +inline +void ReadWriteSpinLock::WriteGuard::unlock() +{ + if (_ownsLock) { + _spinlock.unlockWrite(); + } +} } diff --git a/quantum/impl/quantum_shared_state_impl.h b/quantum/impl/quantum_shared_state_impl.h index 103be5a..4928723 100644 --- a/quantum/impl/quantum_shared_state_impl.h +++ b/quantum/impl/quantum_shared_state_impl.h @@ -37,18 +37,7 @@ template template int SharedState::set(V&& value) { - { - //========= LOCKED SCOPE ========= - Mutex::Guard lock(_mutex); - if (_state != FutureState::PromiseNotSatisfied) - { - ThrowFutureException(_state); - } - _value = std::forward(value); - _state = FutureState::PromiseAlreadySatisfied; - } - _cond.notifyAll(); - return 0; + return set(nullptr, std::forward(value)); } template @@ -65,27 +54,20 @@ int SharedState::set(ICoroSync::Ptr sync, V&& value) _value = std::forward(value); _state = FutureState::PromiseAlreadySatisfied; } - _cond.notifyAll(); + _cond.notifyAll(sync); return 0; } template T SharedState::get() { - //========= LOCKED SCOPE ========= - Mutex::Guard lock(_mutex); - conditionWait(); - _state = FutureState::FutureAlreadyRetrieved; - return std::move(_value); + return get(nullptr); } template const T& SharedState::getRef() const { - //========= LOCKED SCOPE ========= - Mutex::Guard lock(_mutex); - conditionWait(); - return _value; + return getRef(nullptr); } template @@ -109,26 +91,27 @@ const T& SharedState::getRef(ICoroSync::Ptr sync) const template void SharedState::breakPromise() +{ + return breakPromise(nullptr); +} + +template +void SharedState::breakPromise(ICoroSync::Ptr sync) { {//========= LOCKED SCOPE ========= - Mutex::Guard lock(_mutex); + Mutex::Guard lock(sync, _mutex); if (_state == FutureState::PromiseNotSatisfied) { _state = FutureState::BrokenPromise; } } - _cond.notifyAll(); + _cond.notifyAll(sync); } template void SharedState::wait() const { - //========= LOCKED SCOPE ========= - Mutex::Guard lock(_mutex); - _cond.wait(_mutex, [this]()->bool - { - return stateHasChanged(); - }); + return wait(nullptr); } template @@ -146,13 +129,7 @@ template template std::future_status SharedState::waitFor(const std::chrono::duration &time) const { - //========= LOCKED SCOPE ========= - Mutex::Guard lock(_mutex); - _cond.waitFor(_mutex, time, [this]()->bool - { - return stateHasChanged(); - }); - return _state == FutureState::PromiseNotSatisfied ? std::future_status::timeout : std::future_status::ready; + return waitFor(nullptr, time); } template @@ -172,12 +149,7 @@ std::future_status SharedState::waitFor(ICoroSync::Ptr sync, template int SharedState::setException(std::exception_ptr ex) { - {//========= LOCKED SCOPE ========= - Mutex::Guard lock(_mutex); - _exception = ex; - } - _cond.notifyAll(); - return -1; + return setException(nullptr, ex); } template @@ -188,18 +160,14 @@ int SharedState::setException(ICoroSync::Ptr sync, Mutex::Guard lock(sync, _mutex); _exception = ex; } - _cond.notifyAll(); + _cond.notifyAll(sync); return -1; } template void SharedState::conditionWait() const { - _cond.wait(_mutex, [this]()->bool - { - return stateHasChanged(); - }); - checkPromiseState(); + return conditionWait(nullptr); } template @@ -242,33 +210,28 @@ SharedState>::SharedState() : template void SharedState>::breakPromise() +{ + return breakPromise(nullptr); +} + +template +void SharedState>::breakPromise(ICoroSync::Ptr sync) { {//========= LOCKED SCOPE ========= - Mutex::Guard lock(_mutex); + Mutex::Guard lock(sync, _mutex); if ((_state == FutureState::PromiseNotSatisfied) || (_state == FutureState::BufferingData)) { _state = FutureState::BrokenPromise; } } - _cond.notifyAll(); + _cond.notifyAll(sync); } template void SharedState>::wait() const { - if (!_reader.empty()) - { - return; //there is still data available - } - //========= LOCKED SCOPE ========= - Mutex::Guard lock(_mutex); - _cond.wait(_mutex, [this]()->bool - { - BufferStatus status = _writer.empty() ? - (_writer.isClosed() ? BufferStatus::Closed : BufferStatus::DataPending) : BufferStatus::DataPosted; - return stateHasChanged(status); - }); + return wait(nullptr); } template @@ -292,19 +255,7 @@ template template std::future_status SharedState>::waitFor(const std::chrono::duration &time) const { - if (!_reader.empty()) - { - return std::future_status::ready; //there is still data available - } - //========= LOCKED SCOPE ========= - Mutex::Guard lock(_mutex); - _cond.waitFor(_mutex, time, [this]()->bool - { - BufferStatus status = _writer.empty() ? - (_writer.isClosed() ? BufferStatus::Closed : BufferStatus::DataPending) : BufferStatus::DataPosted; - return stateHasChanged(status); - }); - return (_writer.empty() && !_writer.isClosed()) ? std::future_status::timeout : std::future_status::ready; + return waitFor(nullptr, time); } template @@ -330,12 +281,7 @@ std::future_status SharedState>::waitFor(ICoroSync::Ptr sync, template int SharedState>::setException(std::exception_ptr ex) { - {//========= LOCKED SCOPE ========= - Mutex::Guard lock(_mutex); - _exception = ex; - } - _cond.notifyAll(); - return -1; + return setException(nullptr, ex); } template @@ -354,20 +300,7 @@ template template void SharedState>::push(V&& value) { - {//========= LOCKED SCOPE ========= - Mutex::Guard lock(_mutex); - if ((_state != FutureState::PromiseNotSatisfied) && (_state != FutureState::BufferingData)) - { - ThrowFutureException(_state); - } - BufferStatus status = _writer.push(std::forward(value)); - if (status == BufferStatus::Closed) - { - ThrowFutureException(FutureState::BufferClosed); - } - _state = FutureState::BufferingData; - } - _cond.notifyAll(); + push(nullptr, std::forward(value)); } template @@ -387,42 +320,13 @@ void SharedState>::push(ICoroSync::Ptr sync, V&& value) } _state = FutureState::BufferingData; } - _cond.notifyAll(); + _cond.notifyAll(sync); } template T SharedState>::pull(bool& isBufferClosed) { - T out{}; - if (!_reader.empty()) - { - T out{}; - _reader.pull(out); - return out; - } - {//========= LOCKED SCOPE ========= - Mutex::Guard lock(_mutex); - _cond.wait(_mutex, [this]()->bool - { - BufferStatus status = _writer.empty() ? - (_writer.isClosed() ? BufferStatus::Closed : BufferStatus::DataPending) : BufferStatus::DataPosted; - bool changed = stateHasChanged(status); - if (changed) { - // Move the writer to the reader for consumption - _reader = std::move(_writer); - } - return changed; - }); - } - isBufferClosed = _reader.empty() && _reader.isClosed(); - if (isBufferClosed) { - //Mark the future as fully retrieved - _state = FutureState::FutureAlreadyRetrieved; - return out; - } - _reader.pull(out); - checkPromiseState(); - return out; + return pull(nullptr, isBufferClosed); } template @@ -431,12 +335,11 @@ T SharedState>::pull(ICoroSync::Ptr sync, bool& isBufferClosed) T out{}; if (!_reader.empty()) { - T out{}; _reader.pull(out); return out; } {//========= LOCKED SCOPE ========= - Mutex::Guard lock(_mutex); + Mutex::Guard lock(sync, _mutex); _cond.wait(sync, _mutex, [this]()->bool { BufferStatus status = _writer.empty() ? @@ -462,16 +365,22 @@ T SharedState>::pull(ICoroSync::Ptr sync, bool& isBufferClosed) template int SharedState>::closeBuffer() +{ + return closeBuffer(nullptr); +} + +template +int SharedState>::closeBuffer(ICoroSync::Ptr sync) { {//========= LOCKED SCOPE ========= - Mutex::Guard lock(_mutex); + Mutex::Guard lock(sync, _mutex); if ((_state == FutureState::PromiseNotSatisfied) || (_state == FutureState::BufferingData)) { _state = FutureState::BufferClosed; } _writer.close(); } - _cond.notifyAll(); + _cond.notifyAll(sync); return 0; } diff --git a/quantum/impl/quantum_spinlock_impl.h b/quantum/impl/quantum_spinlock_impl.h index 0e4fd42..53a95ae 100644 --- a/quantum/impl/quantum_spinlock_impl.h +++ b/quantum/impl/quantum_spinlock_impl.h @@ -18,31 +18,34 @@ //############################################################################################## //#################################### IMPLEMENTATIONS ######################################### //############################################################################################## +#include +#include +#include namespace Bloomberg { namespace quantum { -inline -SpinLock::SpinLock() : - _flag ATOMIC_FLAG_INIT -{} - +//============================================================================== +// SpinLock +//============================================================================== inline void SpinLock::lock() { - while (_flag.test_and_set(std::memory_order_acquire)); //spin + SpinLockUtil::lockExclusive(_flag, 1, 0); } inline bool SpinLock::tryLock() { - return !_flag.test_and_set(std::memory_order_acquire); + int i = 0; + return _flag.compare_exchange_strong(i, 1, std::memory_order_acquire); } inline void SpinLock::unlock() { - _flag.clear(std::memory_order_release); + int i = 1; + _flag.compare_exchange_strong(i, 0, std::memory_order_release); } inline @@ -60,6 +63,15 @@ SpinLock::Guard::Guard(SpinLock& lock, SpinLock::TryToLock) : { } +inline +SpinLock::Guard::Guard(SpinLock& lock, SpinLock::AdoptLock) : + _spinlock(lock), + _ownsLock(true) +{ + //Make sure the lock is already taken + assert(!_spinlock.tryLock()); +} + inline SpinLock::Guard::~Guard() { @@ -91,6 +103,13 @@ bool SpinLock::Guard::ownsLock() const { return _ownsLock; } +inline +void SpinLock::Guard::unlock() { + if (_ownsLock) { + _spinlock.unlock(); + } +} + inline SpinLock::ReverseGuard::ReverseGuard(SpinLock& lock) : _spinlock(lock) diff --git a/quantum/impl/quantum_spinlock_traits_impl.h b/quantum/impl/quantum_spinlock_traits_impl.h new file mode 100644 index 0000000..d2b1020 --- /dev/null +++ b/quantum/impl/quantum_spinlock_traits_impl.h @@ -0,0 +1,56 @@ +/* +** Copyright 2020 Bloomberg Finance L.P. +** +** Licensed under the Apache License, Version 2.0 (the "License"); +** you may not use this file except in compliance with the License. +** You may obtain a copy of the License at +** +** http://www.apache.org/licenses/LICENSE-2.0 +** +** Unless required by applicable law or agreed to in writing, software +** distributed under the License is distributed on an "AS IS" BASIS, +** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +** See the License for the specific language governing permissions and +** limitations under the License. +*/ +//NOTE: DO NOT INCLUDE DIRECTLY + +//############################################################################################## +//#################################### IMPLEMENTATIONS ######################################### +//############################################################################################## + +namespace Bloomberg { +namespace quantum { + +inline size_t& +SpinLockTraits::minSpins() { + static size_t minSpins = QUANTUM_SPINLOCK_MIN_SPINS; + return minSpins; +} + +inline size_t& +SpinLockTraits::maxSpins() { + static size_t maxSpins = QUANTUM_SPINLOCK_MAX_SPINS; + return maxSpins; +} + +inline size_t& +SpinLockTraits::numYieldsBeforeSleep() { + static size_t numYieldsBeforeSleep = QUANTUM_SPINLOCK_NUM_YIELDS_BEFORE_SLEEP; + return numYieldsBeforeSleep; +} + +inline std::chrono::microseconds& +SpinLockTraits::sleepDuration() { + static std::chrono::microseconds sleepDuration{QUANTUM_SPINLOCK_SLEEP_DURATION_US}; + return sleepDuration; +} + +inline SpinLockTraits::BackoffPolicy& +SpinLockTraits::backoffPolicy() { + static BackoffPolicy backoffPolicy = static_cast(QUANTUM_SPINLOCK_BACKOFF_POLICY); + return backoffPolicy; +} + +}} + diff --git a/quantum/impl/quantum_task_id_impl.h b/quantum/impl/quantum_task_id_impl.h new file mode 100644 index 0000000..9c10154 --- /dev/null +++ b/quantum/impl/quantum_task_id_impl.h @@ -0,0 +1,161 @@ +/* +** Copyright 2018 Bloomberg Finance L.P. +** +** Licensed under the Apache License, Version 2.0 (the "License"); +** you may not use this file except in compliance with the License. +** You may obtain a copy of the License at +** +** http://www.apache.org/licenses/LICENSE-2.0 +** +** Unless required by applicable law or agreed to in writing, software +** distributed under the License is distributed on an "AS IS" BASIS, +** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +** See the License for the specific language governing permissions and +** limitations under the License. +*/ +//NOTE: DO NOT INCLUDE DIRECTLY + +//############################################################################################## +//#################################### IMPLEMENTATIONS ######################################### +//############################################################################################## +#include +#include +#include +#include + +namespace Bloomberg { +namespace quantum { + +inline +TaskId::TaskId(ThisThreadTag) : + _threadId(std::this_thread::get_id()) +{ +} + +inline +TaskId::TaskId(CoroContextTag) : + _id(-generate()) +{ + //the thread id will be populated when the task runs +} + +inline +TaskId::TaskId(ThreadContextTag) : + _id(generate()) +{ + //the thread id will be populated when the task runs +} + +inline +bool TaskId::operator==(const TaskId& rhs) const +{ + if (isCoroutine() && rhs.isCoroutine()) { + //compare the coroutine id only since a coroutine may run + //on different threads if shared mode is enabled + return _id == rhs._id; + } + return std::tie(_id, _threadId) == std::tie(rhs._id, rhs._threadId); +} + +inline +bool TaskId::operator!=(const TaskId& rhs) const +{ + return !operator==(rhs); +} + +inline +bool TaskId::operator<(const TaskId& rhs) const +{ + if (isCoroutine() && rhs.isCoroutine()) { + //compare the coroutine id only since a coroutine may run + //on different threads if shared mode is enabled + return _id < rhs._id; + } + return std::tie(_id, _threadId) < std::tie(rhs._id, rhs._threadId); +} + +inline +bool TaskId::operator>(const TaskId& rhs) const +{ + return !operator==(rhs) && !operator<(rhs); +} + +inline +bool TaskId::operator<=(const TaskId& rhs) const +{ + return !operator>(rhs); +} + +inline +bool TaskId::operator>=(const TaskId& rhs) const +{ + return !operator<(rhs); +} + +inline +size_t TaskId::hashValue() const +{ + size_t seed = std::hash()(_threadId); + boost::hash_combine(seed, _id); + return seed; +} + +inline +size_t TaskId::id() const +{ + return static_cast(_id); +} + +inline +std::thread::id TaskId::threadId() const +{ + return _threadId; +} + +inline +void TaskId::assignCurrentThread() +{ + _threadId = std::this_thread::get_id(); +} + +inline +bool TaskId::isCoroutine() const +{ + return _id < 0; +} + +inline +ssize_t TaskId::generate() +{ + static std::atomic gen{1}; + ssize_t i = gen.load(std::memory_order_acquire); + ssize_t j = ((i+1) < 0) ? 1 : (i+1); //check for overflow + while (!gen.compare_exchange_weak(i, j, std::memory_order_acq_rel)) { + if (i < 0) { + //rollover + j = 1; + } + else { + j = ((i+1) < 0) ? 1 : (i+1); //check for overflow + } + } + return j; +} + +inline +std::ostream& operator<<(std::ostream& os, const TaskId& tid) +{ + if (tid.isCoroutine()) { + //task running on a coroutine + os << 'C'; + } + else { + //task running on a thread + os << 'T'; + } + os << tid._id << ":"; + os << tid._threadId; + return os; +} + +}} diff --git a/quantum/impl/quantum_task_impl.h b/quantum/impl/quantum_task_impl.h index 1b59fd4..2c79275 100644 --- a/quantum/impl/quantum_task_impl.h +++ b/quantum/impl/quantum_task_impl.h @@ -48,9 +48,9 @@ Task::Task(std::false_type, _queueId(queueId), _isHighPriority(isHighPriority), _type(type), + _taskId(CoroContextTag{}), _terminated(false), - _suspendedState((int)State::Suspended), - _coroLocalStorage() + _suspendedState((int)State::Suspended) {} template @@ -67,9 +67,9 @@ Task::Task(std::true_type, _queueId(queueId), _isHighPriority(isHighPriority), _type(type), + _taskId(CoroContextTag{}), _terminated(false), - _suspendedState((int)State::Suspended), - _coroLocalStorage() + _suspendedState((int)State::Suspended) {} inline @@ -106,8 +106,8 @@ int Task::run() { return (int)ITask::RetCode::Sleeping; } - int rc = (int)ITask::RetCode::Running; + _taskId.assignCurrentThread(); _coro(rc); if (!_coro) { @@ -125,7 +125,7 @@ void Task::setQueueId(int queueId) } inline -int Task::getQueueId() +int Task::getQueueId() const { return _queueId; } @@ -133,6 +133,9 @@ int Task::getQueueId() inline ITask::Type Task::getType() const { return _type; } +inline +TaskId Task::getTaskId() const { return _taskId; } + inline ITaskContinuation::Ptr Task::getNextTask() { return _next; } @@ -196,9 +199,9 @@ bool Task::isSuspended() const } inline -Task::CoroLocalStorage& Task::getCoroLocalStorage() +ITask::LocalStorage& Task::getLocalStorage() { - return _coroLocalStorage; + return _localStorage; } inline diff --git a/quantum/impl/quantum_task_queue_impl.h b/quantum/impl/quantum_task_queue_impl.h index c1e62b9..f199274 100644 --- a/quantum/impl/quantum_task_queue_impl.h +++ b/quantum/impl/quantum_task_queue_impl.h @@ -43,19 +43,6 @@ TaskQueue::ProcessTaskResult::ProcessTaskResult(bool isBlocked, _blockedQueueRound(blockedQueueRound) { } - -inline -TaskQueue::CurrentTaskSetter::CurrentTaskSetter(TaskQueue& taskQueue, const TaskPtr & task) : - _taskQueue(taskQueue) -{ - _taskQueue.setCurrentTask(task.get()); -} - -inline -TaskQueue::CurrentTaskSetter::~CurrentTaskSetter() -{ - _taskQueue.setCurrentTask(nullptr); -} inline TaskQueue::TaskQueue() : @@ -189,7 +176,7 @@ TaskQueue::ProcessTaskResult TaskQueue::processTask() int rc; { // set the current task for local-storage queries - CurrentTaskSetter taskSetter(*this, task); + IQueue::TaskSetterGuard taskSetter(*this, task); //========================= START/RESUME COROUTINE ========================= rc = task->run(); //=========================== END/YIELD COROUTINE ========================== @@ -627,23 +614,5 @@ void TaskQueue::acquireWaiting() } } -inline -Task*& getCurrentTaskImpl() -{ - static thread_local Task* currentTask = nullptr; - return currentTask; -} - -inline -Task* TaskQueue::getCurrentTask() -{ - return getCurrentTaskImpl(); -} - -inline void TaskQueue::setCurrentTask(Task* task) -{ - getCurrentTaskImpl() = task; -} - }} diff --git a/quantum/interface/quantum_icontext_base.h b/quantum/interface/quantum_icontext_base.h index e68b7ab..987ab26 100644 --- a/quantum/interface/quantum_icontext_base.h +++ b/quantum/interface/quantum_icontext_base.h @@ -18,13 +18,11 @@ #include #include +#include namespace Bloomberg { namespace quantum { -struct CoroContextTag{}; -struct ThreadContextTag{}; - //============================================================================================== // interface IContextBase //============================================================================================== @@ -37,6 +35,10 @@ struct IContextBase /// @brief Virtual destructor. virtual ~IContextBase() = default; + /// @brief Returns the id of the currently executing coroutine or IO task + /// @return The task id + virtual TaskId taskId() const = 0; + /// @brief Determines if the future object associated with this context has a valid shared state with the /// corresponding promise. /// @return True if valid, false otherwise. diff --git a/quantum/interface/quantum_ipromise_base.h b/quantum/interface/quantum_ipromise_base.h index af024a0..5213c11 100644 --- a/quantum/interface/quantum_ipromise_base.h +++ b/quantum/interface/quantum_ipromise_base.h @@ -44,6 +44,8 @@ struct IPromiseBase : public ITerminate /// @param[in] ex An exception pointer which has been caught via std::current_exception. /// @return 0 on success virtual int setException(std::exception_ptr ex) = 0; + virtual int setException(ICoroSync::Ptr sync, + std::exception_ptr ex) = 0; /// @brief Get a thread-compatible interface used to access the associated future. /// @return An interface to the associated future. diff --git a/quantum/interface/quantum_iqueue.h b/quantum/interface/quantum_iqueue.h index 13f24ad..e2556d5 100644 --- a/quantum/interface/quantum_iqueue.h +++ b/quantum/interface/quantum_iqueue.h @@ -74,6 +74,22 @@ struct IQueue : public ITerminate int queueId, bool shared, bool any); + + static ITask::Ptr getCurrentTask(); + + static void setCurrentTask(ITask::Ptr task); + +protected: + struct TaskSetterGuard + { + TaskSetterGuard(IQueue& taskQueue, + ITask::Ptr task); + ~TaskSetterGuard(); + IQueue& _taskQueue; + }; + +private: + static ITask::Ptr& currentTask(); }; using IQueuePtr = IQueue::Ptr; @@ -122,6 +138,39 @@ void IQueue::setThreadName(QueueType type, using IoQueueListAllocator = StlAllocator; #endif +inline +ITask::Ptr& IQueue::currentTask() +{ + static thread_local ITask::Ptr currentTask; + return currentTask; +} + +inline +ITask::Ptr IQueue::getCurrentTask() +{ + return currentTask(); +} + +inline +void IQueue::setCurrentTask(ITask::Ptr task) +{ + currentTask() = std::move(task); +} + +inline +IQueue::TaskSetterGuard::TaskSetterGuard(IQueue& taskQueue, + ITask::Ptr task) : + _taskQueue(taskQueue) +{ + _taskQueue.setCurrentTask(std::move(task)); +} + +inline +IQueue::TaskSetterGuard::~TaskSetterGuard() +{ + _taskQueue.setCurrentTask(nullptr); +} + }} #endif //BLOOMBERG_QUANTUM_IQUEUE_H diff --git a/quantum/interface/quantum_itask.h b/quantum/interface/quantum_itask.h index 7916240..00e331b 100644 --- a/quantum/interface/quantum_itask.h +++ b/quantum/interface/quantum_itask.h @@ -18,8 +18,10 @@ #include #include +#include #include #include +#include namespace Bloomberg { namespace quantum { @@ -33,6 +35,7 @@ struct ITask : public ITerminate { using Ptr = std::shared_ptr; using WeakPtr = std::weak_ptr; + using LocalStorage = std::unordered_map; enum class Type : int { @@ -57,17 +60,21 @@ struct ITask : public ITerminate virtual void setQueueId(int queueId) = 0; - virtual int getQueueId() = 0; + virtual int getQueueId() const = 0; + + virtual TaskId getTaskId() const = 0; virtual Type getType() const = 0; virtual bool isBlocked() const = 0; - virtual bool isSleeping(bool updateTimer = false) = 0; + virtual bool isSleeping(bool updateTimer) = 0; virtual bool isHighPriority() const = 0; virtual bool isSuspended() const = 0; + + virtual LocalStorage& getLocalStorage() = 0; }; using ITaskPtr = ITask::Ptr; diff --git a/quantum/interface/quantum_iterminate.h b/quantum/interface/quantum_iterminate.h index 1089b59..83a2c12 100644 --- a/quantum/interface/quantum_iterminate.h +++ b/quantum/interface/quantum_iterminate.h @@ -17,6 +17,7 @@ #define BLOOMBERG_QUANTUM_ITERMINATE_H #include +#include namespace Bloomberg { namespace quantum { diff --git a/quantum/quantum.h b/quantum/quantum.h index 42c1c09..dfeecee 100644 --- a/quantum/quantum.h +++ b/quantum/quantum.h @@ -66,6 +66,7 @@ #include #include #include +#include #include #include #include diff --git a/quantum/quantum_condition_variable.h b/quantum/quantum_condition_variable.h index a715629..39d28b1 100644 --- a/quantum/quantum_condition_variable.h +++ b/quantum/quantum_condition_variable.h @@ -215,10 +215,6 @@ class ConditionVariable PREDICATE predicate); private: - void notifyOneImpl(ICoroSync::Ptr sync); - - void notifyAllImpl(ICoroSync::Ptr sync); - void waitImpl(ICoroSync::Ptr sync, Mutex& mutex); diff --git a/quantum/quantum_configuration.h b/quantum/quantum_configuration.h index c75890c..111075c 100644 --- a/quantum/quantum_configuration.h +++ b/quantum/quantum_configuration.h @@ -17,6 +17,7 @@ #define BLOOMBERG_QUANTUM_CONFIGURATION_H #include +#include #include #include @@ -31,9 +32,10 @@ namespace quantum { class Configuration { public: - enum class BackoffPolicy : int { Linear, ///< Linear backoff - Exponential }; ///< Exponential backoff (doubles every time) - + enum class BackoffPolicy : int { + Linear = QUANTUM_BACKOFF_LINEAR, ///< Linear backoff + Exponential = QUANTUM_BACKOFF_EXPONENTIAL ///< Exponential backoff (doubles every time) + }; /// @brief Get the JSON schema corresponding to this configuration object. /// @return The draft-04 compatible schema. static const std::string& getJsonSchema(); diff --git a/quantum/quantum_context.h b/quantum/quantum_context.h index 39738c5..15cadac 100644 --- a/quantum/quantum_context.h +++ b/quantum/quantum_context.h @@ -69,6 +69,7 @@ class Context : public IThreadContext, //=================================== // ICONTEXTBASE //=================================== + TaskId taskId() const final; bool valid() const final; bool validAt(int num) const final; int setException(std::exception_ptr ex) final; @@ -146,6 +147,8 @@ class Context : public IThreadContext, //=================================== template > int closeBuffer(); + template > + int closeBuffer(ICoroSync::Ptr sync); int getNumCoroutineThreads() const; int getNumIoThreads() const; const std::pair& getCoroQueueIdRangeForAny() const; @@ -328,14 +331,14 @@ class Context : public IThreadContext, void validateContext(ICoroSync::Ptr sync) const; //throws //Members - ITask::Ptr _task; - std::vector _promises; - DispatcherCore* _dispatcher; - std::atomic_bool _terminated; - std::atomic_int _signal; - Traits::Yield* _yield; - std::chrono::microseconds _sleepDuration; - std::chrono::steady_clock::time_point _sleepTimestamp; + ITask::Ptr _task; + std::vector _promises; + DispatcherCore* _dispatcher; + std::atomic_bool _terminated; + std::atomic_int _signal; + Traits::Yield* _yield; + std::chrono::microseconds _sleepDuration; + std::chrono::steady_clock::time_point _sleepTimestamp; }; template diff --git a/quantum/quantum_io_task.h b/quantum/quantum_io_task.h index 8169d66..5428597 100644 --- a/quantum/quantum_io_task.h +++ b/quantum/quantum_io_task.h @@ -66,12 +66,14 @@ class IoTask : public ITask //ITask int run() final; void setQueueId(int queueId) final; - int getQueueId() final; + int getQueueId() const final; Type getType() const final; + TaskId getTaskId() const final; bool isBlocked() const final; bool isSleeping(bool updateTimer = false) final; bool isHighPriority() const final; bool isSuspended() const final; + ITask::LocalStorage& getLocalStorage() final; //=================================== // NEW / DELETE @@ -85,6 +87,8 @@ class IoTask : public ITask std::atomic_bool _terminated; int _queueId; bool _isHighPriority; + TaskId _taskId; + ITask::LocalStorage _localStorage; // local storage of the IO task }; using IoTaskPtr = IoTask::Ptr; diff --git a/quantum/quantum_local.h b/quantum/quantum_local.h index fef0911..8d1284d 100644 --- a/quantum/quantum_local.h +++ b/quantum/quantum_local.h @@ -23,25 +23,29 @@ namespace Bloomberg { namespace quantum { namespace local { -/// @brief Accesses the pointer to a coro-local-variable -/// @param[in] key the variable name -/// @return the pointer to the coro-local-variable with the name @see key -/// @note If the variable with the name @see key has not been created within the current -/// coroutine, a pointer to it will be allocated, set to nullptr, and a reference to the -/// pointer will be returned here for reading/writing. -/// If the variable with the name @see key has already been created within the current -/// coroutine, then a reference to the previously set pointer will be returned. -/// @note If the function is called outside of a coroutine, then the thread-local storage is -/// used with the semantics described above. -/// @note Upon the termination of the coroutine, the storage occupied by the coro-local-variable -/// pointers will be freed. It is up to the user of the API to free the actual variable -/// storage. -template -T*& variable(const std::string& key); - -/// @brief Get the current coroutine context -/// @return The coroutine context if this function is called inside a coroutine or null otherwise -VoidContextPtr context(); + /// @brief Accesses the pointer to a coroutine or IO task local variable + /// @param[in] key The variable name + /// @return the pointer to the local variable with the same name + /// @note If the variable with the 'key' name has not been created within the current + /// coroutine or IO task, a pointer to it will be allocated, set to null, and a reference to the + /// pointer will be returned for reading/writing. + /// If the variable with the 'key' name has already been created, + /// then a reference to the previously set pointer will be returned. + /// @note If the function is called outside of a coroutine or IO task, then a default thread-local + /// storage will be used with the semantics described above. + /// @note Upon the termination of the coroutine, the storage occupied by the coro-local-variable + /// pointers will be freed. It is up to the user of the API to free the actual variable + /// storage. + template + T*& variable(const std::string& key); + + /// @brief Get the current coroutine context + /// @return The coroutine context if this function is called inside a coroutine or null otherwise + VoidContextPtr context(); + + /// @brief Returns the task id of the currently executing coroutine or IO task + /// @return The task id + TaskId taskId(); }}} diff --git a/quantum/quantum_macros.h b/quantum/quantum_macros.h index 5985596..d9a54ba 100644 --- a/quantum/quantum_macros.h +++ b/quantum/quantum_macros.h @@ -24,4 +24,9 @@ #define DEPRECATED #endif +#define QUANTUM_BACKOFF_LINEAR 0 +#define QUANTUM_BACKOFF_EXPONENTIAL 1 +#define QUANTUM_BACKOFF_EQUALSTEP 2 +#define QUANTUM_BACKOFF_RANDOM 3 + #endif //BLOOMBERG_QUANTUM_MACROS_H diff --git a/quantum/quantum_mutex.h b/quantum/quantum_mutex.h index 2394cee..26f60ed 100644 --- a/quantum/quantum_mutex.h +++ b/quantum/quantum_mutex.h @@ -21,6 +21,7 @@ #include #include #include +#include namespace Bloomberg { namespace quantum { @@ -36,9 +37,11 @@ namespace quantum { class Mutex { public: - /// @brief Default constructor. - /// @note Mutex object is in unlocked state. - Mutex(); + using TryToLock = std::try_to_lock_t; + using AdoptLock = std::adopt_lock_t; + + /// @brief Constructor. The object is in the unlocked state. + Mutex() = default; Mutex(const Mutex& other) = delete; Mutex& operator=(const Mutex& other) = delete; @@ -75,35 +78,43 @@ class Mutex public: /// @brief Construct this object and lock the passed-in mutex. /// @param[in] mutex Mutex which protects a scope during the lifetime of the Guard. - /// @param[in] tryLock If set to true, tries to lock the mutex instead of unconditionally locking it. - /// @note If tryLock is set to true, ownership of the mutex may fail in which case it can be verified - /// with ownsLock(). This constructor must be used in a non-coroutine context. - /// @warning Wrongfully calling this method from a coroutine will block all coroutines running in the - /// same queue and thus result in noticeable performance degradation. - explicit Guard(Mutex& mutex, - bool tryLock = false); + /// @param[in] TryToLock If supplied, tries to lock the mutex instead of unconditionally locking it. + /// @param[in] AdoptLock If supplied, assumes the lock is already taken by the application. + explicit Guard(Mutex& mutex); + Guard(Mutex& mutex, + Mutex::TryToLock); + Guard(Mutex& mutex, + Mutex::AdoptLock); - /// @brief Construct this object and lock the passed-in mutex. - /// @param[in] sync Pointer to a coroutine synchronization object. - /// @param[in] mutex Mutex which protects a scope during the lifetime of the Guard. - /// @param[in] tryLock If set to true, tries to lock the mutex instead of unconditionally locking it. - /// @note If tryLock is set to true, ownership of the mutex may fail in which case it can be verified - /// with ownsLock(). This constructor must be used in a coroutine context. + /// @brief Construct this object and lock the passed-in mutex. Same as above but using a coroutine + /// synchronization context. Guard(ICoroSync::Ptr sync, - Mutex& mutex, - bool tryLock = false); + Mutex& mutex); /// @brief Destructor. This will unlock the underlying mutex. ~Guard(); + /// @brief see Mutex::lock() + void lock(); + void lock(ICoroSync::Ptr sync); + + /// @brief see Mutex::tryLock() + bool tryLock(); + + /// @brief see Mutex::unlock() + void unlock(); + + /// @brief Releases the associated mutex without unlocking it + void release(); + /// @brief Determines if this object owns the underlying mutex. /// @return True if mutex is locked, false otherwise. bool ownsLock() const; private: //Members - Mutex& _mutex; - bool _ownsLock; + Mutex* _mutex{nullptr}; + bool _ownsLock{false}; }; //============================================================================================== @@ -134,15 +145,14 @@ class Mutex private: //Members - Mutex& _mutex; + Mutex* _mutex; ICoroSync::Ptr _sync; }; private: - void lockImpl(ICoroSync::Ptr sync); - //Members mutable SpinLock _spinlock; + mutable TaskId _taskId; }; }} diff --git a/quantum/quantum_promise.h b/quantum/quantum_promise.h index 43003b9..e741691 100644 --- a/quantum/quantum_promise.h +++ b/quantum/quantum_promise.h @@ -74,6 +74,7 @@ class Promise : public IPromiseBase, //IPromiseBase bool valid() const final; int setException(std::exception_ptr ex) final; + int setException(ICoroSync::Ptr sync, std::exception_ptr ex) final; //IThreadPromise template > @@ -92,6 +93,9 @@ class Promise : public IPromiseBase, template > int closeBuffer(); + template > + int closeBuffer(ICoroSync::Ptr sync); + //=================================== // NEW / DELETE //=================================== diff --git a/quantum/quantum_read_write_mutex.h b/quantum/quantum_read_write_mutex.h new file mode 100644 index 0000000..b1e321a --- /dev/null +++ b/quantum/quantum_read_write_mutex.h @@ -0,0 +1,210 @@ +/* +** Copyright 2020 Bloomberg Finance L.P. +** +** Licensed under the Apache License, Version 2.0 (the "License"); +** you may not use this file except in compliance with the License. +** You may obtain a copy of the License at +** +** http://www.apache.org/licenses/LICENSE-2.0 +** +** Unless required by applicable law or agreed to in writing, software +** distributed under the License is distributed on an "AS IS" BASIS, +** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +** See the License for the specific language governing permissions and +** limitations under the License. +*/ +#ifndef BLOOMBERG_QUANTUM_READ_WRITE_MUTEX_H +#define BLOOMBERG_QUANTUM_READ_WRITE_MUTEX_H + +#include + +namespace Bloomberg { +namespace quantum { + +class ReadWriteMutex +{ +public: + using TryToLock = std::try_to_lock_t; + using AdoptLock = std::adopt_lock_t; + + /// @brief Constructor. The object is in the unlocked state. + ReadWriteMutex() = default; + + /// @brief Copy constructor + ReadWriteMutex(const ReadWriteMutex&) = delete; + + /// @brief Move constructor + ReadWriteMutex(ReadWriteMutex&&) = default; + + /// @brief Copy assignment operator + ReadWriteMutex& operator=(const ReadWriteMutex&) = delete; + + /// @brief Move assignment operator + ReadWriteMutex& operator=(ReadWriteMutex&&) = default; + + /// @brief Lock this object as a reader (shared with other readers) + /// @details The current context will be yielded until the lock is acquired. + /// @note From a non-coroutine context, call the first. From a coroutine context, + /// call the second. + /// @warning Wrongfully calling the first version from a coroutine context will + /// block all coroutines running on the same queue and result in noticeable + /// performance degradation. + void lockRead(); + void lockRead(ICoroSync::Ptr sync); + + /// @brief Lock this object as a writer (exclusive) + /// @details The current context will be yielded until the lock is acquired. + /// @note From a non-coroutine context, call the first. From a coroutine context, + /// call the second. + /// @warning Wrongfully calling the first version from a coroutine context will + /// block all coroutines running on the same queue and result in noticeable + /// performance degradation. + void lockWrite(); + void lockWrite(ICoroSync::Ptr sync); + + /// @brief Attempts to lock this object as a reader (shared with other readers) + /// @return True if the lock operation succeeded, false otherwise. + bool tryLockRead(); + + /// @brief Attempts to lock this object as a writer (exclusive) + /// @return True if the lock operation succeeded, false otherwise. + bool tryLockWrite(); + + /// @brief Unlocks the reader lock. + /// @warning Locking this object as a writer and incorrectly unlocking it as a reader results in undefined behavior. + void unlockRead(); + + /// @brief Unlocks the writer lock. + /// @warning Locking this object as a reader and incorrectly unlocking it as a writer results in undefined behavior. + void unlockWrite(); + + /// @bried Determines if this object is either read or write locked. + /// @return True if locked, false otherwise. + bool isLocked() const; + + /// @bried Determines if this object is read locked. + /// @return True if locked, false otherwise. + bool isReadLocked() const; + + /// @bried Determines if this object is write locked. + /// @return True if locked, false otherwise. + bool isWriteLocked() const; + + /// @brief Returns the number of readers holding the lock. + /// @return The number of readers. + int numReaders() const; + + class ReadGuard + { + public: + /// @brief Construct this object and lock the passed-in mutex as a reader. + /// @param[in] lock ReadWriteMutex which protects a scope during the lifetime of the Guard. + /// @note Blocks the current thread until the mutex is acquired. + explicit ReadGuard(ReadWriteMutex& lock); + ReadGuard(ICoroSync::Ptr sync, + ReadWriteMutex& lock); + + /// @brief Construct this object and tries to lock the passed-in mutex as a reader. + /// @param[in] lock ReadWriteMutex which protects a scope during the lifetime of the Guard. + /// @note Attempts to lock the mutex. Does not block. + ReadGuard(ReadWriteMutex& lock, + ReadWriteMutex::TryToLock); + + /// @brief Construct this object and does not lock the mutex. Assumes the application already + /// owns the read lock. + /// @param[in] lock ReadWriteMutex which protects a scope during the lifetime of the Guard. + /// @note Attempts to lock the mutex. Does not block. + ReadGuard(ReadWriteMutex& lock, + ReadWriteMutex::AdoptLock); + + /// @brief Destroy this object and unlock the underlying mutex if this object owns it. + ~ReadGuard(); + + /// @brief Acquire the underlying mutex as a reader. + /// @note Blocks. + void lock(); + void lock(ICoroSync::Ptr sync); + + /// @brief Try to acquire the underlying mutex. + /// @return True if mutex is locked, false otherwise. + /// @note Does not block. + bool tryLock(); + + /// @brief Releases the read lock on the underlying mutex. + /// @note Also releases ownership of the underlying mutex. + void unlock(); + + /// @brief Release the associated mutex without unlocking it. + void release(); + + /// @brief Indicates if this object owns the underlying mutex. + /// @return True if ownership is acquired. + bool ownsLock() const; + private: + ReadWriteMutex* _mutex{nullptr}; + bool _ownsLock{false}; + }; + + class WriteGuard + { + public: + /// @brief Construct this object and lock the passed-in mutex as a writer. + /// @param[in] lock ReadWriteMutex which protects a scope during the lifetime of the Guard. + /// @note Blocks the current thread until the mutex is acquired. + explicit WriteGuard(ReadWriteMutex& lock); + WriteGuard(ICoroSync::Ptr sync, + ReadWriteMutex& lock); + + /// @brief Construct this object and tries to lock the passed-in mutex as a writer. + /// @param[in] lock ReadWriteMutex which protects a scope during the lifetime of the Guard. + /// @note Attempts to lock the mutex. Does not block. + WriteGuard(ReadWriteMutex& lock, + ReadWriteMutex::TryToLock); + + /// @brief Construct this object and does not lock the mutex. Assumes the application + /// already owns the write lock. + /// @param[in] lock ReadWriteMutex which protects a scope during the lifetime of the Guard. + /// @note: Does no block. + WriteGuard(ReadWriteMutex& lock, + ReadWriteMutex::AdoptLock); + + /// @brief Destroy this object and unlock the underlying mutex if the object owns it. + ~WriteGuard(); + + /// @brief Acquire the underlying mutex as a writer. + /// @note Blocks. + void lock(); + void lock(ICoroSync::Ptr sync); + + /// @brief Try to acquire the underlying mutex as a writer. + /// @return True if mutex is locked, false otherwise. + /// @note Does not block. + bool tryLock(); + + /// @brief Releases the write lock on the underlying mutex. + /// @note Also releases ownership of the underlying mutex. + void unlock(); + + /// @brief Release the associated mutex without unlocking it. + void release(); + + /// @brief Indicates if this object owns the underlying mutex. + /// @return True if ownership is acquired. + bool ownsLock() const; + private: + ReadWriteMutex* _mutex{nullptr}; + bool _ownsLock{false}; + }; + +private: + // Members + mutable ReadWriteSpinLock _spinlock; + mutable TaskId _taskId; +}; + +} +} + +#include + +#endif //BLOOMBERG_QUANTUM_READ_WRITE_MUTEX_H diff --git a/quantum/quantum_read_write_spinlock.h b/quantum/quantum_read_write_spinlock.h index 374d2af..5093d65 100644 --- a/quantum/quantum_read_write_spinlock.h +++ b/quantum/quantum_read_write_spinlock.h @@ -26,8 +26,9 @@ class ReadWriteSpinLock { public: using TryToLock = std::try_to_lock_t; + using AdoptLock = std::adopt_lock_t; - /// @brief Constructor. The object is in the unlocked state. + /// @brief Spinlock is in unlocked state ReadWriteSpinLock() = default; /// @brief Copy constructor. @@ -57,12 +58,14 @@ class ReadWriteSpinLock bool tryLockWrite(); /// @brief Unlocks the reader lock. + /// @return True if succeeded /// @warning Locking this object as a writer and incorrectly unlocking it as a reader results in undefined behavior. - void unlockRead(); + bool unlockRead(); /// @brief Unlocks the writer lock. + /// @return True if succeeded /// @warning Locking this object as a reader and incorrectly unlocking it as a writer results in undefined behavior. - void unlockWrite(); + bool unlockWrite(); /// @bried Determines if this object is either read or write locked. /// @return True if locked, false otherwise. @@ -108,6 +111,9 @@ class ReadWriteSpinLock /// @brief Indicates if this object owns the underlying spinlock. /// @return True if ownership is acquired. bool ownsLock() const; + + /// @brief Unlocks the underlying spinlock + void unlock(); private: ReadWriteSpinLock& _spinlock; bool _ownsLock; @@ -141,13 +147,16 @@ class ReadWriteSpinLock /// @brief Indicates if this object owns the underlying spinlock. /// @return True if ownership is acquired. bool ownsLock() const; + + /// @brief Unlocks the underlying spinlock + void unlock(); private: ReadWriteSpinLock& _spinlock; bool _ownsLock; }; private: - std::atomic_int _count{0}; + alignas(128) std::atomic_int _count{0}; }; } diff --git a/quantum/quantum_shared_state.h b/quantum/quantum_shared_state.h index 361c7ec..9bf398f 100644 --- a/quantum/quantum_shared_state.h +++ b/quantum/quantum_shared_state.h @@ -57,6 +57,8 @@ class SharedState void breakPromise(); + void breakPromise(ICoroSync::Ptr sync); + void wait() const; void wait(ICoroSync::Ptr sync) const; @@ -113,6 +115,8 @@ class SharedState> void breakPromise(); + void breakPromise(ICoroSync::Ptr sync); + void wait() const; void wait(ICoroSync::Ptr sync) const; @@ -130,6 +134,8 @@ class SharedState> std::exception_ptr ex); int closeBuffer(); + + int closeBuffer(ICoroSync::Ptr sync); private: SharedState(); diff --git a/quantum/quantum_spinlock.h b/quantum/quantum_spinlock.h index 0573f73..6f96c43 100644 --- a/quantum/quantum_spinlock.h +++ b/quantum/quantum_spinlock.h @@ -32,9 +32,10 @@ class SpinLock { public: using TryToLock = std::try_to_lock_t; + using AdoptLock = std::adopt_lock_t; - /// @brief Constructor. The object is in the unlocked state. - SpinLock(); + /// @brief Spinlock is in unlocked state + SpinLock() = default; /// @brief Copy constructor. SpinLock(const SpinLock&) = delete; @@ -78,8 +79,18 @@ class SpinLock /// @brief Construct this object and tries to lock the passed-in spinlock. /// @param[in] lock Spinlock which protects a scope during the lifetime of the Guard. + /// @param[in] tryLock Tag. Not used. /// @note Attempts to lock the spinlock. Does not block. - Guard(SpinLock& lock, SpinLock::TryToLock); + Guard(SpinLock& lock, + SpinLock::TryToLock tryLock); + + /// @brief Construct this object and don't lock the spinlock. It is assumed that the + /// application already owns the lock. + /// @param[in] lock Spinlock which protects a scope during the lifetime of the Guard. + /// @param[in] adoptLock Tag. Not used. + /// @note ownsLock() will always return true. + Guard(SpinLock& lock, + SpinLock::AdoptLock adoptLock); /// @brief Destroy this object and unlock the underlying spinlock. ~Guard(); @@ -96,6 +107,9 @@ class SpinLock /// @brief Indicates if this object owns the underlying spinlock. /// @return True if ownership is acquired. bool ownsLock() const; + + /// @brief Unlocks the underlying spinlock + void unlock(); private: SpinLock& _spinlock; bool _ownsLock; @@ -122,7 +136,7 @@ class SpinLock }; private: - std::atomic_flag _flag; + alignas(128) std::atomic_int _flag{0}; }; }} diff --git a/quantum/quantum_spinlock_traits.h b/quantum/quantum_spinlock_traits.h new file mode 100644 index 0000000..0b01576 --- /dev/null +++ b/quantum/quantum_spinlock_traits.h @@ -0,0 +1,76 @@ +/* +** Copyright 2018 Bloomberg Finance L.P. +** +** Licensed under the Apache License, Version 2.0 (the "License"); +** you may not use this file except in compliance with the License. +** You may obtain a copy of the License at +** +** http://www.apache.org/licenses/LICENSE-2.0 +** +** Unless required by applicable law or agreed to in writing, software +** distributed under the License is distributed on an "AS IS" BASIS, +** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +** See the License for the specific language governing permissions and +** limitations under the License. +*/ +#ifndef QUANTUM_QUANTUM_SPINLOCK_TRAITS_H +#define QUANTUM_QUANTUM_SPINLOCK_TRAITS_H + +#include + +namespace Bloomberg { +namespace quantum { + +#ifndef QUANTUM_SPINLOCK_MIN_SPINS +#define QUANTUM_SPINLOCK_MIN_SPINS 100; +#endif + +#ifndef QUANTUM_SPINLOCK_MAX_SPINS +#define QUANTUM_SPINLOCK_MAX_SPINS 5000 +#endif + +#ifndef QUANTUM_SPINLOCK_SLEEP_DURATION_US +#define QUANTUM_SPINLOCK_SLEEP_DURATION_US 200 +#endif + +#ifndef QUANTUM_SPINLOCK_NUM_YIELDS_BEFORE_SLEEP +#define QUANTUM_SPINLOCK_NUM_YIELDS_BEFORE_SLEEP 3 +#endif + +#ifndef QUANTUM_SPINLOCK_BACKOFF_POLICY +#define QUANTUM_SPINLOCK_BACKOFF_POLICY QUANTUM_BACKOFF_EXPONENTIAL +#endif + +//============================================================================== +// SpinLockTraits +//============================================================================== +struct SpinLockTraits { + enum class BackoffPolicy : int { + Linear = QUANTUM_BACKOFF_LINEAR, ///< Linear backoff + Exponential = QUANTUM_BACKOFF_EXPONENTIAL, ///< Exponential backoff (doubles every time) + EqualStep = QUANTUM_BACKOFF_EQUALSTEP, ///< Identical backoff amount + Random = QUANTUM_BACKOFF_RANDOM ///< Random backoff amount + }; + + /// @brief Initial number of spins for the backoff + static size_t &minSpins(); + + /// @brief The maximum number of spins to wait before yielding, + /// as well as the backoff limit. + static size_t &maxSpins(); + + /// @brief The number of yields before sleeping the thread + static size_t &numYieldsBeforeSleep(); + + /// @brief The sleep duration (after there are no more yields) + static std::chrono::microseconds &sleepDuration(); + + /// @brief Backoff policy while spinning + static BackoffPolicy &backoffPolicy(); +}; + +}} + +#include + +#endif //QUANTUM_QUANTUM_SPINLOCK_TRAITS_H diff --git a/quantum/quantum_task.h b/quantum/quantum_task.h index 329ef94..22cff45 100644 --- a/quantum/quantum_task.h +++ b/quantum/quantum_task.h @@ -20,7 +20,6 @@ #include #include #include -#include #include #include #include @@ -46,8 +45,6 @@ class Task : public ITaskContinuation, using WeakPtr = std::weak_ptr; enum class State : int { Running, Suspended, Terminated }; - - using CoroLocalStorage = std::unordered_map; template Task(std::false_type t, @@ -80,8 +77,9 @@ class Task : public ITaskContinuation, //ITask int run() final; void setQueueId(int queueId) final; - int getQueueId() final; + int getQueueId() const final; Type getType() const final; + TaskId getTaskId() const final; bool isBlocked() const final; bool isSleeping(bool updateTimer = false) final; bool isHighPriority() const final; @@ -97,9 +95,9 @@ class Task : public ITaskContinuation, //Returns a final or error handler task in the chain and in the process frees all //the subsequent continuation tasks ITaskContinuation::Ptr getErrorHandlerOrFinalTask() final; - + //Local storage accessors - CoroLocalStorage& getCoroLocalStorage(); + LocalStorage& getLocalStorage() final; ITaskAccessor::Ptr getTaskAccessor() const; //=================================== @@ -150,9 +148,10 @@ class Task : public ITaskContinuation, ITaskContinuation::Ptr _next; //Task scheduled to run after current completes. ITaskContinuation::WeakPtr _prev; //Previous task in the chain ITask::Type _type; + TaskId _taskId; std::atomic_bool _terminated; std::atomic_int _suspendedState; // stores values of State - CoroLocalStorage _coroLocalStorage; // local storage of the coroutine + ITask::LocalStorage _localStorage; // local storage of the coroutine }; using TaskPtr = Task::Ptr; diff --git a/quantum/quantum_task_id.h b/quantum/quantum_task_id.h new file mode 100644 index 0000000..f0d8ee5 --- /dev/null +++ b/quantum/quantum_task_id.h @@ -0,0 +1,103 @@ +/* +** Copyright 2018 Bloomberg Finance L.P. +** +** Licensed under the Apache License, Version 2.0 (the "License"); +** you may not use this file except in compliance with the License. +** You may obtain a copy of the License at +** +** http://www.apache.org/licenses/LICENSE-2.0 +** +** Unless required by applicable law or agreed to in writing, software +** distributed under the License is distributed on an "AS IS" BASIS, +** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +** See the License for the specific language governing permissions and +** limitations under the License. +*/ +#ifndef BLOOMBERG_QUANTUM_TASK_ID_H +#define BLOOMBERG_QUANTUM_TASK_ID_H + +#include +#include +#include +#include + +namespace Bloomberg { +namespace quantum { + +struct CoroContextTag{}; +struct ThreadContextTag{}; + +// Fwd declarations +class TaskId; +namespace local { + TaskId taskId(); +} + +class TaskId +{ + friend class Task; + friend class IoTask; + friend std::ostream& operator<<(std::ostream&, const TaskId&); + friend TaskId local::taskId(); +public: + TaskId() = default; + + //Equality operators + bool operator==(const TaskId&) const; + bool operator!=(const TaskId&) const; + bool operator<(const TaskId&) const; + bool operator>(const TaskId&) const; + bool operator<=(const TaskId&) const; + bool operator>=(const TaskId&) const; + + /// @brief Produces a hash value suitable for unordered map insertions + /// @return A hash + size_t hashValue() const; + + /// @brief Get the id associated with this coroutine or IO task. + size_t id() const; + + /// @brief Get the thread id where the coroutine or IO task is executing. + std::thread::id threadId() const; + + /// @brief Checks if this id belongs to a coroutine. + /// @return True is it's a coroutine. + bool isCoroutine() const; + +protected: + struct ThisThreadTag{}; + /// @brief Create using current thread id + explicit TaskId(ThisThreadTag); + /// @brief Create a coroutine id. + explicit TaskId(CoroContextTag); + /// @brief Create an IO task id. + explicit TaskId(ThreadContextTag); + + void assignCurrentThread(); +private: + static ssize_t generate(); + + ssize_t _id{0}; //negative values reserved for coroutines + std::thread::id _threadId; +}; + +std::ostream& operator<<(std::ostream&, const TaskId&); + +} +} + +#include + +namespace std { + +template <> +struct hash +{ + size_t operator()(const Bloomberg::quantum::TaskId& rhs) const { + return rhs.hashValue(); + } +}; + +} + +#endif //BLOOMBERG_QUANTUM_TASK_ID_H diff --git a/quantum/quantum_task_queue.h b/quantum/quantum_task_queue.h index f283ca1..4ecde70 100644 --- a/quantum/quantum_task_queue.h +++ b/quantum/quantum_task_queue.h @@ -88,10 +88,6 @@ class TaskQueue : public IQueue const std::shared_ptr& getThread() const final; - static Task* getCurrentTask(); - - static void setCurrentTask(Task* task); - private: struct WorkItem { @@ -100,25 +96,18 @@ class TaskQueue : public IQueue bool isBlocked, unsigned int blockedQueueRound); - TaskPtr _task; // task pointer - TaskListIter _iter; // task iterator - bool _isBlocked; // true if the entire queue is blocked - unsigned int _blockedQueueRound; // blocked queue round id + TaskPtr _task; // task pointer + TaskListIter _iter; // task iterator + bool _isBlocked; // true if the entire queue is blocked + unsigned int _blockedQueueRound; // blocked queue round id }; struct ProcessTaskResult { ProcessTaskResult(bool isBlocked, unsigned int blockedQueueRound); - bool _isBlocked; // true if the entire queue is blocked - unsigned int _blockedQueueRound; // blocked queue round id - }; - struct CurrentTaskSetter - { - CurrentTaskSetter(TaskQueue& taskQueue, const TaskPtr & task); - ~CurrentTaskSetter(); - - TaskQueue& _taskQueue; + bool _isBlocked; // true if the entire queue is blocked + unsigned int _blockedQueueRound; // blocked queue round id }; //Coroutine result handlers bool handleNotCallable(const WorkItem& entry); diff --git a/quantum/quantum_thread_traits.h b/quantum/quantum_thread_traits.h index 5f4f84a..6cb178e 100644 --- a/quantum/quantum_thread_traits.h +++ b/quantum/quantum_thread_traits.h @@ -31,9 +31,10 @@ struct ThreadTraits { /// @brief Dictates how long any thread should sleep on blocking calls when interacting /// with coroutines (e.g. mutexes, condition variables, etc). - /// @return The modifiable sleep interval in microseconds. + /// @return The modifiable sleep interval in microseconds or milliseconds. /// @note: When set to 0, threads will yield() instead of sleeping which results in - /// increased performance at the detriment of higher CPU load. Default is 0ms. + /// increased performance at the detriment of higher CPU load. Default is to sleep for 10us. + /// @note: These two intervals are added together when sleeping. static std::chrono::milliseconds& yieldSleepIntervalMs() { static std::chrono::milliseconds value(0); diff --git a/quantum/util/impl/quantum_sequencer_impl.h b/quantum/util/impl/quantum_sequencer_impl.h index 0367e7b..a8b5dae 100644 --- a/quantum/util/impl/quantum_sequencer_impl.h +++ b/quantum/util/impl/quantum_sequencer_impl.h @@ -556,8 +556,8 @@ Sequencer::drain(std::chrono::millisecon ThreadFuturePtr future = promise->getIThreadFuture(); //enqueue a universal task and wait - enqueueAll([promise](VoidContextPtr)->int{ - return promise->set(0); + enqueueAll([promise](VoidContextPtr ctx)->int{ + return promise->set(ctx, 0); }); DrainGuard guard(_drain, !isFinal); diff --git a/quantum/util/impl/quantum_spinlock_util_impl.h b/quantum/util/impl/quantum_spinlock_util_impl.h new file mode 100644 index 0000000..0c3131a --- /dev/null +++ b/quantum/util/impl/quantum_spinlock_util_impl.h @@ -0,0 +1,219 @@ +/* +** Copyright 2020 Bloomberg Finance L.P. +** +** Licensed under the Apache License, Version 2.0 (the "License"); +** you may not use this file except in compliance with the License. +** You may obtain a copy of the License at +** +** http://www.apache.org/licenses/LICENSE-2.0 +** +** Unless required by applicable law or agreed to in writing, software +** distributed under the License is distributed on an "AS IS" BASIS, +** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +** See the License for the specific language governing permissions and +** limitations under the License. +*/ +//NOTE: DO NOT INCLUDE DIRECTLY + +//############################################################################################## +//#################################### IMPLEMENTATIONS ######################################### +//############################################################################################## + +#include +#include + +//Arch macros: https://sourceforge.net/p/predef/wiki/Architectures +#if defined(_MSC_VER) && \ + (defined(_M_IX86) || \ + defined(_M_IA64) || \ + defined(_M_X64)) +#include +#pragma intrinsic(_mm_pause) +#endif + +namespace Bloomberg { +namespace quantum { + +inline +void SpinLockUtil::lockExclusive(std::atomic_int &flag, + int exclusiveValue, + int unlockedValue) +{ + while (1) + { + spinWait(flag, exclusiveValue); + //Try acquiring the lock + int oldValue = unlockedValue; + if (!flag.compare_exchange_strong(oldValue, exclusiveValue, std::memory_order_acquire)) + { + backoff(); + } + else + { + //We obtained the lock + reset(); + break; + } + } +} + +inline +void SpinLockUtil::lockShared(std::atomic_int &flag, + int exclusiveValue, + int unlockedValue, + int sharedValue) +{ + while (1) + { + spinWait(flag, exclusiveValue); + //Try acquiring the lock + int oldValue = unlockedValue; + int newValue = sharedValue; + while (!flag.compare_exchange_weak(oldValue, newValue, std::memory_order_acquire)) + { + if (oldValue == exclusiveValue) + { + //lock is already taken + backoff(); + break; + } + newValue = oldValue + 1; + pauseCPU(); + } + if (oldValue < newValue) + { + //We obtained the lock + reset(); + break; + } + } +} + +inline +void SpinLockUtil::pauseCPU() +{ +#if defined(_MSC_VER) && \ + (defined(_M_IX86) || \ + defined(_M_IA64) || \ + defined(_M_X64)) + ::_mm_pause(); +#elif (defined(__GNUC__) || \ + defined(__llvm__)) && \ + (defined(__i386__) || \ + defined(__ia64__) || \ + defined(__x86_64__)) + __asm__ __volatile__( "pause" ::: "memory" ); +#else + __asm__ __volatile__( "rep;nop" ::: "memory" ); +#endif +} + +inline +void SpinLockUtil::reset() +{ + numYields() = 0; + numSpins() = 0; +} + +inline +void SpinLockUtil::yieldOrSleep() +{ + if (numYields() < SpinLockTraits::numYieldsBeforeSleep()) { + ++numYields(); + std::this_thread::yield(); + } + else { + assert(SpinLockTraits::sleepDuration() >= std::chrono::microseconds::zero()); + std::this_thread::sleep_for(SpinLockTraits::sleepDuration()); + } +} + +inline +void SpinLockUtil::backoff() +{ + using Distribution = std::uniform_int_distribution; + static thread_local std::mt19937_64 gen(std::random_device{}()); + static thread_local Distribution distribution; + if (numSpins() == 0) + { + //Initialize for the first time + assert(SpinLockTraits::minSpins() <= SpinLockTraits::maxSpins()); + if ((SpinLockTraits::backoffPolicy() == SpinLockTraits::BackoffPolicy::EqualStep) || + (SpinLockTraits::backoffPolicy() == SpinLockTraits::BackoffPolicy::Random)) + { + //Generate a number from the entire range + numSpins() = distribution(gen, Distribution::param_type + {SpinLockTraits::minSpins(), SpinLockTraits::maxSpins()}); + } + else + { + //Generate a number below min and add it to the min. + numSpins() = SpinLockTraits::minSpins() + + distribution(gen, Distribution::param_type + {0, SpinLockTraits::minSpins()}); + } + } + else if (numSpins() < SpinLockTraits::maxSpins()) + { + if (SpinLockTraits::backoffPolicy() == SpinLockTraits::BackoffPolicy::Linear) + { + numSpins() += numSpins(); + } + else if (SpinLockTraits::backoffPolicy() == SpinLockTraits::BackoffPolicy::Exponential) + { + numSpins() *= 2; + } + else if (SpinLockTraits::backoffPolicy() == SpinLockTraits::BackoffPolicy::Random) + { + //Generate a new value each time + numSpins() = distribution(gen, Distribution::param_type + {SpinLockTraits::minSpins(), SpinLockTraits::maxSpins()}); + } + //Check that we don't exceed max spins + if (numSpins() > SpinLockTraits::maxSpins()) + { + numSpins() = SpinLockTraits::maxSpins(); + } + } + //Spin + for (size_t i = 0; i < numSpins(); ++i) + { + pauseCPU(); + } +} + +inline +void SpinLockUtil::spinWait(std::atomic_int& flag, + int spinValue) +{ + size_t numIters = 0; + while (flag.load(std::memory_order_relaxed) == spinValue) + { + if (numIters < SpinLockTraits::maxSpins()) + { + ++numIters; + pauseCPU(); + } + else + { + //Yield or sleep the thread instead of spinning + yieldOrSleep(); + } + } +} + +inline +size_t& SpinLockUtil::numYields() +{ + static thread_local size_t _numYields = 0; + return _numYields; +} + +inline +size_t& SpinLockUtil::numSpins() +{ + static thread_local size_t _numSpins = 0; + return _numSpins; +} + +}} diff --git a/quantum/util/quantum_spinlock_util.h b/quantum/util/quantum_spinlock_util.h new file mode 100644 index 0000000..1c63aeb --- /dev/null +++ b/quantum/util/quantum_spinlock_util.h @@ -0,0 +1,46 @@ +/* +** Copyright 2018 Bloomberg Finance L.P. +** +** Licensed under the Apache License, Version 2.0 (the "License"); +** you may not use this file except in compliance with the License. +** You may obtain a copy of the License at +** +** http://www.apache.org/licenses/LICENSE-2.0 +** +** Unless required by applicable law or agreed to in writing, software +** distributed under the License is distributed on an "AS IS" BASIS, +** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +** See the License for the specific language governing permissions and +** limitations under the License. +*/ +#ifndef QUANTUM_QUANTUM_SPINLOCK_UTIL_H +#define QUANTUM_QUANTUM_SPINLOCK_UTIL_H + +namespace Bloomberg { +namespace quantum { + +//Adapted from https://geidav.wordpress.com/tag/test-and-test-and-set/ +struct SpinLockUtil { + static void lockExclusive(std::atomic_int &flag, + int exclusiveValue, + int unlockedValue); + static void lockShared(std::atomic_int &flag, + int exclusiveValue, + int unlockedValue, + int sharedValue); + static void pauseCPU(); +private: + static void reset(); + static void yieldOrSleep(); + static void backoff(); + static void spinWait(std::atomic_int &flag, + int spinValue); + static size_t& numYields(); + static size_t& numSpins(); +}; + +}} + +#include + +#endif //QUANTUM_QUANTUM_SPINLOCK_UTIL_H diff --git a/tests/quantum_id_tests.cpp b/tests/quantum_id_tests.cpp new file mode 100644 index 0000000..e5c6684 --- /dev/null +++ b/tests/quantum_id_tests.cpp @@ -0,0 +1,151 @@ +/* +** Copyright 2018 Bloomberg Finance L.P. +** +** Licensed under the Apache License, Version 2.0 (the "License"); +** you may not use this file except in compliance with the License. +** You may obtain a copy of the License at +** +** http://www.apache.org/licenses/LICENSE-2.0 +** +** Unless required by applicable law or agreed to in writing, software +** distributed under the License is distributed on an "AS IS" BASIS, +** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +** See the License for the specific language governing permissions and +** limitations under the License. +*/ +#include +#include +#include +#include + +using namespace Bloomberg; + +struct MockTaskId : public quantum::TaskId +{ + struct ThisThreadTag{}; + MockTaskId() : quantum::TaskId() {} + explicit MockTaskId(ThisThreadTag) : quantum::TaskId(quantum::TaskId::ThisThreadTag{}) {} + explicit MockTaskId(quantum::CoroContextTag t) : quantum::TaskId(t){} + explicit MockTaskId(quantum::ThreadContextTag t) : quantum::TaskId(t){} + void assign() { assignCurrentThread(); } +}; + +//============================================================================== +// TEST CASES +//============================================================================== +TEST(TaskId, DefaultInitialization) +{ + MockTaskId idMain; //empty + MockTaskId idMain2(MockTaskId::ThisThreadTag{}); //has main's thread id + MockTaskId idCoro(quantum::CoroContextTag{}); + MockTaskId idCoroCopy = idCoro; + idCoro.assign(); //set current thread id + MockTaskId idCoro2(quantum::CoroContextTag{}); + MockTaskId idIo(quantum::ThreadContextTag{}); + idIo.assign(); //set current thread id + MockTaskId idIo2(quantum::ThreadContextTag{}); + + EXPECT_NE(idMain, idMain2); + EXPECT_EQ(idMain2, quantum::local::taskId()); + EXPECT_NE(idCoro, idCoro2); + EXPECT_EQ(idCoro, idCoroCopy); //do not compare thread ids, only coroutine ids + EXPECT_NE(idIo, idIo2); + EXPECT_NE(idCoro, idIo); + EXPECT_NE(idCoro, idMain); + EXPECT_NE(idIo, idMain); + + //check if it's a coroutine + EXPECT_TRUE(idCoro.isCoroutine()); + EXPECT_FALSE(idMain.isCoroutine()); + EXPECT_FALSE(idIo.isCoroutine()); + + //check the thread ids + EXPECT_NE(idMain.threadId(), idCoro.threadId()); + EXPECT_EQ(idMain2.threadId(), idCoro.threadId()); + EXPECT_EQ(idCoro.threadId(), idIo.threadId()); + EXPECT_EQ(std::this_thread::get_id(), idCoro.threadId()); + EXPECT_EQ(std::thread::id(), idCoro2.threadId()); + EXPECT_EQ(0, idMain.id()); + EXPECT_NE(idCoro.id(), idCoro2.id()); + EXPECT_NE(idIo.id(), idIo2.id()); +} + +TEST(TaskId, Uniqueness) +{ + std::vector coro, io; + MockTaskId firstCoro(quantum::CoroContextTag{}); + for (int i = 0; i < 10; ++i) { + coro.emplace_back(MockTaskId(quantum::CoroContextTag{})); + } + //Check if all id's are different and are in decreasing order + for (const auto& id : coro) { + EXPECT_EQ(firstCoro.id()-1, id.id()); + firstCoro = id; + } + + //IO ids are in increasing order + MockTaskId firstIo(quantum::ThreadContextTag{}); + for (int i = 0; i < 10; ++i) { + io.emplace_back(MockTaskId(quantum::ThreadContextTag{})); + } + //Check if all id's are different and are in decreasing order + for (const auto& id : io) { + EXPECT_EQ(firstIo.id()+1, id.id()); + firstIo = id; + } +} + +TEST(TaskId, LocalContext) +{ + quantum::Configuration config; + config.setNumCoroutineThreads(2); + config.setNumIoThreads(2); + quantum::Dispatcher dispatcher(config); + std::vector coroIds, ioIds; + + auto coroFunc = [](quantum::VoidCoroContextPtr ctx) mutable->quantum::TaskId { + return ctx->taskId(); + }; + auto ioFunc = []() mutable->quantum::TaskId { + return quantum::local::taskId(); + }; + //Collect task ids + coroIds.emplace_back(dispatcher.post(0,false,coroFunc)->get()); + coroIds.emplace_back(dispatcher.post(0,false,coroFunc)->get()); + coroIds.emplace_back(dispatcher.post(1,false,coroFunc)->get()); + coroIds.emplace_back(dispatcher.post(1,false,coroFunc)->get()); + ioIds.emplace_back(dispatcher.postAsyncIo(0,false,ioFunc)->get()); + ioIds.emplace_back(dispatcher.postAsyncIo(0,false,ioFunc)->get()); + ioIds.emplace_back(dispatcher.postAsyncIo(1,false,ioFunc)->get()); + ioIds.emplace_back(dispatcher.postAsyncIo(1,false,ioFunc)->get()); + + //Compare + EXPECT_TRUE(coroIds[0].isCoroutine()); + for (size_t i = 1; i < coroIds.size(); ++i) { + EXPECT_TRUE(coroIds[i].isCoroutine()); + EXPECT_EQ(coroIds[i-1].id()-1, coroIds[i].id()); + } + //Make sure the executing thread id is properly set + EXPECT_EQ(coroIds[0].threadId(), coroIds[1].threadId()); + EXPECT_EQ(coroIds[2].threadId(), coroIds[3].threadId()); + EXPECT_NE(coroIds[0].threadId(), coroIds[2].threadId()); + EXPECT_NE(quantum::TaskId().threadId(), coroIds[0].threadId()); + + EXPECT_FALSE(ioIds[0].isCoroutine()); + for (size_t i = 1; i < ioIds.size(); ++i) { + EXPECT_FALSE(ioIds[i].isCoroutine()); + EXPECT_EQ(ioIds[i-1].id()+1, ioIds[i].id()); + } + //Make sure the executing thread id is properly set + EXPECT_EQ(ioIds[0].threadId(), ioIds[1].threadId()); + EXPECT_EQ(ioIds[2].threadId(), ioIds[3].threadId()); + EXPECT_NE(ioIds[0].threadId(), ioIds[2].threadId()); + EXPECT_NE(quantum::TaskId().threadId(), ioIds[0].threadId()); + + //Check hashing + std::unordered_set idSet; + idSet.insert(coroIds.begin(), coroIds.end()); + idSet.insert(ioIds.begin(), ioIds.end()); + EXPECT_EQ(coroIds.size() + ioIds.size(), idSet.size()); +} + diff --git a/tests/quantum_locks_tests.cpp b/tests/quantum_locks_tests.cpp new file mode 100644 index 0000000..3faec03 --- /dev/null +++ b/tests/quantum_locks_tests.cpp @@ -0,0 +1,502 @@ +/* +** Copyright 2018 Bloomberg Finance L.P. +** +** Licensed under the Apache License, Version 2.0 (the "License"); +** you may not use this file except in compliance with the License. +** You may obtain a copy of the License at +** +** http://www.apache.org/licenses/LICENSE-2.0 +** +** Unless required by applicable law or agreed to in writing, software +** distributed under the License is distributed on an "AS IS" BASIS, +** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +** See the License for the specific language governing permissions and +** limitations under the License. +*/ +#include +#include +#include +#include +#include +#include + +using namespace Bloomberg::quantum; + +#ifdef BOOST_USE_VALGRIND + int spins = 100; +#else + int spins = 1000000; +#endif + int val = 0; + int numThreads = 20; + int numLockAcquires = 100; + +void runnable(SpinLock* exclusiveLock) { + int locksTaken = 0; + while (locksTaken < numLockAcquires) { + exclusiveLock->lock(); + locksTaken++; + val++; + std::this_thread::sleep_for(std::chrono::microseconds(500)); + exclusiveLock->unlock(); + } +} + +void runThreads(int num) +{ + SpinLock exclusiveLock; + //Create 50 threads + exclusiveLock.lock(); //lock it so that all threads block + std::vector> threads; + auto start = std::chrono::high_resolution_clock::now(); + for (int i = 0; i < numThreads; ++i) { + threads.push_back(std::make_shared(runnable, &exclusiveLock)); + } + exclusiveLock.unlock(); //unlock to start contention + for (auto& t : threads) { + t->join(); + } + auto end = std::chrono::high_resolution_clock::now(); + std::cout << "Total spin time " << num << ": " + << std::chrono::duration_cast(end-start).count() + << "ms" << std::endl; +} + +void spinlockSettings( + size_t min, + size_t max, + std::chrono::microseconds sleepUs, + size_t numYields, + int num, + int enable) +{ + if (enable == -1 || enable == num) { + val = 0; + SpinLockTraits::minSpins() = min; + SpinLockTraits::maxSpins() = max; + SpinLockTraits::numYieldsBeforeSleep() = numYields; + SpinLockTraits::sleepDuration() = sleepUs; + runThreads(num); + EXPECT_EQ(numThreads*numLockAcquires, val); + } +} + +TEST(Spinlock, Spinlock) +{ + int num = spins; + val = 0; + SpinLock spin; + std::thread t1([&, num]() mutable { + while (num--) { + SpinLock::Guard guard(spin); + ++val; + } + }); + std::thread t2([&, num]() mutable { + while (num--) { + SpinLock::Guard guard(spin); + --val; + } + }); + t1.join(); + t2.join(); + EXPECT_EQ(0, val); +} + +TEST(Spinlock, HighContention) +{ + val = 0; + int enable = -1; + int i = 0; + spinlockSettings(500, 10000, std::chrono::microseconds(100), 2, i++, enable); //0 + spinlockSettings(0, 20000, std::chrono::microseconds(100), 3, i++, enable); //1 + spinlockSettings(100, 5000, std::chrono::microseconds(200), 3, i++, enable); //2 + spinlockSettings(500, 200000, std::chrono::microseconds(0), 5, i++, enable); //3 + spinlockSettings(500, 20000, std::chrono::microseconds(1000), 0, i++, enable); //4 + spinlockSettings(500, 2000, std::chrono::microseconds(0), 0, i++, enable); //5 + spinlockSettings(0, 0, std::chrono::microseconds(10), 2000, i++, enable); //6 +} + +TEST(ReadWriteSpinLock, LockReadMultipleTimes) +{ + ReadWriteSpinLock spin; + EXPECT_EQ(0, spin.numReaders()); + EXPECT_FALSE(spin.isLocked()); + spin.lockRead(); + EXPECT_TRUE(spin.isLocked()); + EXPECT_EQ(1, spin.numReaders()); + spin.lockRead(); + EXPECT_TRUE(spin.isLocked()); + EXPECT_EQ(2, spin.numReaders()); + spin.unlockRead(); + spin.unlockRead(); + EXPECT_EQ(0, spin.numReaders()); + EXPECT_FALSE(spin.isLocked()); +} + +TEST(ReadWriteSpinLock, LockReadAndWrite) +{ + int num = spins; + int val = 0; + ReadWriteSpinLock spin; + std::thread t1([&, num]() mutable { + while (num--) { + ReadWriteSpinLock::ReadGuard guard(spin); + } + }); + std::thread t2([&, num]() mutable { + while (num--) { + ReadWriteSpinLock::ReadGuard guard(spin); + } + }); + std::thread t3([&, num]() mutable { + while (num--) { + ReadWriteSpinLock::ReadGuard guard(spin); + } + }); + std::thread t4([&, num]() mutable { + while (num--) { + ReadWriteSpinLock::WriteGuard guard(spin); + ++val; + } + }); + std::thread t5([&, num]() mutable { + while (num--) { + ReadWriteSpinLock::WriteGuard guard(spin); + --val; + } + }); + t1.join(); + t2.join(); + t3.join(); + t4.join(); + t5.join(); + EXPECT_EQ(0, val); +} + +TEST(ReadWriteSpinLock, LockReadAndWriteList) +{ + int num = spins; + std::list val; + ReadWriteSpinLock spin; + bool exit = false; + std::thread t1([&]() mutable { + while (!exit) { + ReadWriteSpinLock::ReadGuard guard(spin); + auto it = val.rbegin(); + if (it != val.rend()) --it; + } + }); + std::thread t2([&]() mutable { + while (!exit) { + ReadWriteSpinLock::ReadGuard guard(spin); + auto it = val.rbegin(); + if (it != val.rend()) --it; + } + }); + std::thread t3([&]() mutable { + while (!exit) { + ReadWriteSpinLock::ReadGuard guard(spin); + auto it = val.rbegin(); + if (it != val.rend()) --it; + } + }); + std::thread t4([&, num]() mutable { + while (num--) { + ReadWriteSpinLock::WriteGuard guard(spin); + val.push_back(1); + } + }); + std::thread t5([&, num]() mutable { + while (num) { + ReadWriteSpinLock::WriteGuard guard(spin); + if (!val.empty()) { + val.pop_back(); + --num; + } + } + }); + t4.join(); + t5.join(); + exit = true; + t1.join(); + t2.join(); + t3.join(); + EXPECT_EQ(0, val.size()); +} + +TEST(ReadWriteSpinLock, SingleLocks) +{ + ReadWriteSpinLock lock; + + EXPECT_FALSE(lock.isLocked()); + EXPECT_FALSE(lock.isReadLocked()); + EXPECT_FALSE(lock.isWriteLocked()); + EXPECT_EQ(0, lock.numReaders()); + + lock.lockRead(); + EXPECT_TRUE(lock.isLocked()); + EXPECT_TRUE(lock.isReadLocked()); + EXPECT_FALSE(lock.isWriteLocked()); + EXPECT_EQ(1, lock.numReaders()); + + lock.unlockRead(); + EXPECT_FALSE(lock.isLocked()); + EXPECT_FALSE(lock.isReadLocked()); + EXPECT_FALSE(lock.isWriteLocked()); + EXPECT_EQ(0, lock.numReaders()); + + lock.lockWrite(); + EXPECT_TRUE(lock.isLocked()); + EXPECT_FALSE(lock.isReadLocked()); + EXPECT_TRUE(lock.isWriteLocked()); + EXPECT_EQ(0, lock.numReaders()); +} + +TEST(ReadWriteSpinLock, UnlockingUnlockedIsNoOp) +{ + ReadWriteSpinLock lock; + + ASSERT_FALSE(lock.isLocked()); + + lock.unlockRead(); + EXPECT_FALSE(lock.isLocked()); + + lock.unlockWrite(); + EXPECT_FALSE(lock.isLocked()); +} + +TEST(ReadWriteSpinLock, TryLocks) +{ + ReadWriteSpinLock lock; + + ASSERT_FALSE(lock.isLocked()); + + EXPECT_TRUE(lock.tryLockRead()); + EXPECT_TRUE(lock.isReadLocked()); + EXPECT_FALSE(lock.tryLockWrite()); + + lock.unlockRead(); + EXPECT_TRUE (lock.tryLockWrite()); + EXPECT_TRUE(lock.isWriteLocked()); + + EXPECT_FALSE(lock.tryLockRead()); +} + +TEST(ReadWriteSpinLock, Guards) +{ + ReadWriteSpinLock lock; + + ASSERT_FALSE(lock.isLocked()); + + // ReadGuard + { + ReadWriteSpinLock::ReadGuard guard(lock); + EXPECT_TRUE(lock.isReadLocked()); + } + EXPECT_FALSE(lock.isLocked()); + + // ReadGuard, TryLock + { + ReadWriteSpinLock::ReadGuard guard(lock, ReadWriteSpinLock::TryToLock()); + EXPECT_TRUE(lock.isReadLocked()); + } + EXPECT_FALSE(lock.isLocked()); + + // WriteGuard + { + ReadWriteSpinLock::WriteGuard guard(lock); + EXPECT_TRUE(lock.isWriteLocked()); + } + EXPECT_FALSE(lock.isLocked()); + + // WriteGuard, TryLock + { + ReadWriteSpinLock::WriteGuard guard(lock, ReadWriteSpinLock::TryToLock()); + EXPECT_TRUE(lock.isWriteLocked()); + } + EXPECT_FALSE(lock.isLocked()); + + // ReadGuard, WriteGuard, TryLock (fails) + { + ReadWriteSpinLock::ReadGuard guard(lock); + EXPECT_TRUE(lock.isReadLocked()); + ReadWriteSpinLock::WriteGuard writeGuard(lock, ReadWriteSpinLock::TryToLock()); + EXPECT_FALSE(lock.isWriteLocked()); + } + EXPECT_FALSE(lock.isLocked()); + + // ReadGuard, unlock, WriteGuard + { + ReadWriteSpinLock::ReadGuard guard(lock); + EXPECT_TRUE(lock.isReadLocked()); + guard.unlock(); + EXPECT_FALSE(lock.isLocked()); + ReadWriteSpinLock::WriteGuard writeGuard(lock, ReadWriteSpinLock::TryToLock()); + EXPECT_TRUE(lock.isWriteLocked()); + } + EXPECT_FALSE(lock.isLocked()); + + // ReadGuard, unlock, WriteGuard, unlock + { + ReadWriteSpinLock::ReadGuard guard(lock); + EXPECT_TRUE(lock.isReadLocked()); + guard.unlock(); + EXPECT_FALSE(lock.isLocked()); + ReadWriteSpinLock::WriteGuard writeGuard(lock, ReadWriteSpinLock::TryToLock()); + EXPECT_TRUE(lock.isWriteLocked()); + writeGuard.unlock(); + EXPECT_FALSE(lock.isLocked()); + } + EXPECT_FALSE(lock.isLocked()); +} + +//============================================================================== +// READWRITEMUTEX TESTS +//============================================================================== + +TEST(ReadWriteMutex, SingleLocks) +{ + ReadWriteMutex mutex; + + EXPECT_FALSE(mutex.isLocked()); + EXPECT_FALSE(mutex.isReadLocked()); + EXPECT_FALSE(mutex.isWriteLocked()); + EXPECT_EQ(0, mutex.numReaders()); + + mutex.lockRead(); + EXPECT_TRUE(mutex.isLocked()); + EXPECT_TRUE(mutex.isReadLocked()); + EXPECT_FALSE(mutex.isWriteLocked()); + EXPECT_EQ(1, mutex.numReaders()); + + mutex.unlockRead(); + EXPECT_FALSE(mutex.isLocked()); + EXPECT_FALSE(mutex.isReadLocked()); + EXPECT_FALSE(mutex.isWriteLocked()); + EXPECT_EQ(0, mutex.numReaders()); + + mutex.lockWrite(); + EXPECT_TRUE(mutex.isLocked()); + EXPECT_FALSE(mutex.isReadLocked()); + EXPECT_TRUE(mutex.isWriteLocked()); + EXPECT_EQ(0, mutex.numReaders()); +} + +TEST(ReadWriteMutex, TryLocks) +{ + ReadWriteMutex mutex; + + ASSERT_FALSE(mutex.isLocked()); + + EXPECT_TRUE(mutex.tryLockRead()); + EXPECT_TRUE(mutex.isReadLocked()); + EXPECT_FALSE(mutex.tryLockWrite()); + EXPECT_TRUE(mutex.isReadLocked()); + + mutex.unlockRead(); + + EXPECT_TRUE(mutex.tryLockWrite()); + EXPECT_TRUE(mutex.isWriteLocked()); + EXPECT_FALSE(mutex.tryLockRead()); + EXPECT_FALSE(mutex.isReadLocked()); +} + +TEST(ReadWriteMutex, Guards) +{ + ReadWriteMutex mutex; + + ASSERT_FALSE(mutex.isLocked()); + + // ReadGuard + { + ReadWriteMutex::ReadGuard guard(mutex); + EXPECT_TRUE(mutex.isReadLocked()); + } + EXPECT_FALSE(mutex.isLocked()); + + // ReadGuard, TryLock + { + ReadWriteMutex::ReadGuard guard(mutex, ReadWriteMutex::TryToLock()); + EXPECT_TRUE(mutex.isReadLocked()); + } + EXPECT_FALSE(mutex.isLocked()); + + // WriteGuard + { + ReadWriteMutex::WriteGuard guard(mutex); + EXPECT_TRUE(mutex.isWriteLocked()); + } + EXPECT_FALSE(mutex.isLocked()); + + // WriteGuard, TryLock + { + ReadWriteMutex::WriteGuard guard(mutex, ReadWriteMutex::TryToLock()); + EXPECT_TRUE(mutex.isWriteLocked()); + } + EXPECT_FALSE(mutex.isLocked()); + + // ReadGuard, WriteGuard, TryLock (fails) + { + ReadWriteMutex::ReadGuard guard(mutex); + EXPECT_TRUE(mutex.isReadLocked()); + ReadWriteMutex::WriteGuard writeGuard(mutex, ReadWriteMutex::TryToLock()); + EXPECT_FALSE(mutex.isWriteLocked()); + } + EXPECT_FALSE(mutex.isLocked()); + + // ReadGuard, unlock, WriteGuard + { + ReadWriteMutex::ReadGuard guard(mutex); + EXPECT_TRUE(mutex.isReadLocked()); + guard.unlock(); + EXPECT_FALSE(mutex.isLocked()); + ReadWriteMutex::WriteGuard writeGuard(mutex, ReadWriteMutex::TryToLock()); + EXPECT_TRUE(mutex.isWriteLocked()); + } + EXPECT_FALSE(mutex.isLocked()); + + // ReadGuard, unlock, WriteGuard, unlock + { + ReadWriteMutex::ReadGuard guard(mutex); + EXPECT_TRUE(mutex.isReadLocked()); + guard.unlock(); + EXPECT_FALSE(mutex.isLocked()); + ReadWriteMutex::WriteGuard writeGuard(mutex, ReadWriteMutex::TryToLock()); + EXPECT_TRUE(mutex.isWriteLocked()); + writeGuard.unlock(); + EXPECT_FALSE(mutex.isLocked()); + } + EXPECT_FALSE(mutex.isLocked()); +} + +TEST(ReadWriteMutex, MultipleReadLocks) { + ReadWriteMutex mutex; + bool run = true; + + ASSERT_FALSE(mutex.isLocked()); + + auto getReadLock = [&]() { + ReadWriteMutex::ReadGuard guard(mutex); + while (run) {} + }; + + std::thread t1(getReadLock); + std::thread t2(getReadLock); + std::thread t3(getReadLock); + + std::this_thread::sleep_for(std::chrono::seconds(1)); + + EXPECT_TRUE(mutex.isLocked()); + EXPECT_TRUE(mutex.isReadLocked()); + EXPECT_EQ(3, mutex.numReaders()); + + run = false; + + t1.join(); + t2.join(); + t3.join(); + + EXPECT_FALSE(mutex.isLocked()); + EXPECT_EQ(0, mutex.numReaders()); +} diff --git a/tests/quantum_spinlocks_tests.cpp b/tests/quantum_spinlocks_tests.cpp deleted file mode 100644 index add3af1..0000000 --- a/tests/quantum_spinlocks_tests.cpp +++ /dev/null @@ -1,155 +0,0 @@ -/* -** Copyright 2018 Bloomberg Finance L.P. -** -** Licensed under the Apache License, Version 2.0 (the "License"); -** you may not use this file except in compliance with the License. -** You may obtain a copy of the License at -** -** http://www.apache.org/licenses/LICENSE-2.0 -** -** Unless required by applicable law or agreed to in writing, software -** distributed under the License is distributed on an "AS IS" BASIS, -** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -** See the License for the specific language governing permissions and -** limitations under the License. -*/ -#include -#include - -using namespace Bloomberg::quantum; - -#ifdef BOOST_USE_VALGRIND - int spins = 100; -#else - int spins = 1000000; -#endif - -TEST(Spinlock, Spinlock) -{ - int num = spins; - int val = 0; - SpinLock spin; - std::thread t1([&, num]() mutable { - while (num--) { - SpinLock::Guard guard(spin); - ++val; - } - }); - std::thread t2([&, num]() mutable { - while (num--) { - SpinLock::Guard guard(spin); - --val; - } - }); - t1.join(); - t2.join(); - EXPECT_EQ(0, val); -} - -TEST(ReadWriteSpinLock, LockReadMultipleTimes) -{ - ReadWriteSpinLock spin; - EXPECT_EQ(0, spin.numReaders()); - EXPECT_FALSE(spin.isLocked()); - spin.lockRead(); - EXPECT_TRUE(spin.isLocked()); - EXPECT_EQ(1, spin.numReaders()); - spin.lockRead(); - EXPECT_TRUE(spin.isLocked()); - EXPECT_EQ(2, spin.numReaders()); - spin.unlockRead(); - spin.unlockRead(); - EXPECT_EQ(0, spin.numReaders()); - EXPECT_FALSE(spin.isLocked()); -} - -TEST(ReadWriteSpinLock, LockReadAndWrite) -{ - int num = spins; - int val = 0; - ReadWriteSpinLock spin; - std::thread t1([&, num]() mutable { - while (num--) { - ReadWriteSpinLock::ReadGuard guard(spin); - } - }); - std::thread t2([&, num]() mutable { - while (num--) { - ReadWriteSpinLock::ReadGuard guard(spin); - } - }); - std::thread t3([&, num]() mutable { - while (num--) { - ReadWriteSpinLock::ReadGuard guard(spin); - } - }); - std::thread t4([&, num]() mutable { - while (num--) { - ReadWriteSpinLock::WriteGuard guard(spin); - ++val; - } - }); - std::thread t5([&, num]() mutable { - while (num--) { - ReadWriteSpinLock::WriteGuard guard(spin); - --val; - } - }); - t1.join(); - t2.join(); - t3.join(); - t4.join(); - t5.join(); - EXPECT_EQ(0, val); -} - -TEST(ReadWriteSpinLock, LockReadAndWriteList) -{ - int num = spins; - std::list val; - ReadWriteSpinLock spin; - bool exit = false; - std::thread t1([&]() mutable { - while (!exit) { - ReadWriteSpinLock::ReadGuard guard(spin); - auto it = val.rbegin(); - if (it != val.rend()) --it; - } - }); - std::thread t2([&]() mutable { - while (!exit) { - ReadWriteSpinLock::ReadGuard guard(spin); - auto it = val.rbegin(); - if (it != val.rend()) --it; - } - }); - std::thread t3([&]() mutable { - while (!exit) { - ReadWriteSpinLock::ReadGuard guard(spin); - auto it = val.rbegin(); - if (it != val.rend()) --it; - } - }); - std::thread t4([&, num]() mutable { - while (num--) { - ReadWriteSpinLock::WriteGuard guard(spin); - val.push_back(1); - } - }); - std::thread t5([&, num]() mutable { - while (num) { - ReadWriteSpinLock::WriteGuard guard(spin); - if (!val.empty()) { - val.pop_back(); - --num; - } - } - }); - t4.join(); - t5.join(); - exit = true; - t1.join(); - t2.join(); - t3.join(); - EXPECT_EQ(0, val.size()); -}