Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spinlock improvements and TaskId #127

Merged
merged 1 commit into from
May 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,30 @@ addons:
packages:
- libboost1.70-dev
- valgrind
- gdb
env:
- CXXSTANDARD=11
- CXXSTANDARD=14
- CXXSTANDARD=17

after_failure:
- COREFILE=$(find $TRAVIS_BUILD_DIR/cores -maxdepth 1 -name "core.*" -print | head -1)
- echo $COREFILE
- if [[ -f "$COREFILE" ]]; then gdb -c "$COREFILE" $EXEC -ex "thread apply all bt" -ex "set pagination 0" -batch; fi

before_script:
- ulimit -c
- ulimit -a -S
- ulimit -a -H
- mkdir -p -m 0777 $TRAVIS_BUILD_DIR/cores
- sudo sysctl -w kernel.core_pattern=$TRAVIS_BUILD_DIR/cores/core.%e.%p
- cat /proc/sys/kernel/core_pattern
- git clone https://github.com/google/googletest.git
- cd googletest && cmake -DCMAKE_INSTALL_PREFIX=install .
- make install
- cd ..

script:
- cmake -DQUANTUM_ENABLE_TESTS=ON -DQUANTUM_BOOST_USE_VALGRIND=ON -DCMAKE_INSTALL_PREFIX=tests -DGTEST_ROOT=googletest/install .
- ulimit -c unlimited -S #enable core dumps
- cmake -DCMAKE-BUILD_TYPE=Debug -DQUANTUM_ENABLE_TESTS=ON -DQUANTUM_BOOST_USE_VALGRIND=ON -DCMAKE_INSTALL_PREFIX=tests -DGTEST_ROOT=googletest/install .
- make QuantumTests && ./tests/QuantumTests.Linux64
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ if (NOT CMAKE_CXX_STANDARD)
endif()
#Set compile flags if CXXFLAGS environment variable is not set
if (NOT CMAKE_CXX_FLAGS)
set(CMAKE_CXX_FLAGS "-Wall -Wextra -O0 -m${MODE} -std=c++${CMAKE_CXX_STANDARD} -ftemplate-backtrace-limit=0")
set(CMAKE_CXX_FLAGS "-Wall -Wextra -O0 -m${MODE} -std=c++${CMAKE_CXX_STANDARD} -ftemplate-backtrace-limit=0 -faligned-new")
endif()
if (QUANTUM_VERBOSE_MAKEFILE)
message(STATUS "CMAKE_CXX_FLAGS = ${CMAKE_CXX_FLAGS}")
Expand Down
41 changes: 12 additions & 29 deletions quantum/impl/quantum_condition_variable_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,18 @@ ConditionVariable::ConditionVariable() :
inline
ConditionVariable::~ConditionVariable()
{
Mutex::Guard lock(_thisLock);
Mutex::Guard lock(local::context(), _thisLock);
_destroyed = true;
}

inline
void ConditionVariable::notifyOne()
{
notifyOneImpl(nullptr);
notifyOne(nullptr);
}

inline
void ConditionVariable::notifyOne(ICoroSync::Ptr sync)
{
notifyOneImpl(sync);
}

inline
void ConditionVariable::notifyOneImpl(ICoroSync::Ptr sync)
{
//LOCKED OR UNLOCKED SCOPE
Mutex::Guard lock(sync, _thisLock);
Expand All @@ -64,17 +58,11 @@ void ConditionVariable::notifyOneImpl(ICoroSync::Ptr sync)
inline
void ConditionVariable::notifyAll()
{
notifyAllImpl(nullptr);
notifyAll(nullptr);
}

inline
void ConditionVariable::notifyAll(ICoroSync::Ptr sync)
{
notifyAllImpl(sync);
}

inline
void ConditionVariable::notifyAllImpl(ICoroSync::Ptr sync)
{
//LOCKED OR UNLOCKED SCOPE
Mutex::Guard lock(sync, _thisLock);
Expand All @@ -88,7 +76,7 @@ void ConditionVariable::notifyAllImpl(ICoroSync::Ptr sync)
inline
void ConditionVariable::wait(Mutex& mutex)
{
waitImpl(nullptr, mutex);
wait(nullptr, mutex);
}

inline
Expand All @@ -102,7 +90,7 @@ template <class PREDICATE>
void ConditionVariable::wait(Mutex& mutex,
PREDICATE predicate)
{
waitImpl(nullptr, mutex, std::move(predicate));
wait(nullptr, mutex, std::move(predicate));
}

template <class PREDICATE>
Expand All @@ -117,12 +105,7 @@ template <class REP, class PERIOD>
bool ConditionVariable::waitFor(Mutex& mutex,
const std::chrono::duration<REP, PERIOD>& time)
{
if (time == std::chrono::milliseconds(-1))
{
waitImpl(nullptr, mutex);
return true;
}
return waitForImpl(nullptr, mutex, time);
return waitFor(nullptr, mutex, time);
}

template <class REP, class PERIOD>
Expand All @@ -143,12 +126,7 @@ bool ConditionVariable::waitFor(Mutex& mutex,
const std::chrono::duration<REP, PERIOD>& time,
PREDICATE predicate)
{
if (time == std::chrono::milliseconds(-1))
{
waitImpl(nullptr, mutex, std::move(predicate));
return true;
}
return waitForImpl(nullptr, mutex, time, std::move(predicate));
return waitFor(nullptr, mutex, time, predicate);
}

template <class REP, class PERIOD, class PREDICATE>
Expand All @@ -157,6 +135,11 @@ bool ConditionVariable::waitFor(ICoroSync::Ptr sync,
const std::chrono::duration<REP, PERIOD>& time,
PREDICATE predicate)
{
if (time == std::chrono::milliseconds(-1))
{
waitImpl(sync, mutex, predicate);
return true;
}
return waitForImpl(sync, mutex, time, std::move(predicate));
}

Expand Down
68 changes: 37 additions & 31 deletions quantum/impl/quantum_context_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,8 @@ template <class RET>
template <class V, class>
int ICoroContext<RET>::closeBuffer()
{
return static_cast<Impl*>(this)->closeBuffer();
std::shared_ptr<Impl> ctx = static_cast<Impl*>(this)->shared_from_this();
return ctx->closeBuffer(ctx);
}

template <class RET>
Expand Down Expand Up @@ -664,6 +665,12 @@ void Context<RET>::terminate()
}
}

template <class RET>
TaskId Context<RET>::taskId() const
{
return _task->getTaskId();
}

template <class RET>
bool Context<RET>::validAt(int num) const
{
Expand All @@ -679,7 +686,7 @@ bool Context<RET>::valid() const
template <class RET>
int Context<RET>::setException(std::exception_ptr ex)
{
return _promises.back()->setException(ex);
return _promises.back()->setException(this->shared_from_this(), ex);
}

template <class RET>
Expand Down Expand Up @@ -1109,18 +1116,11 @@ Context<RET>::mapReduceBatch(INPUT_IT first,
Functions::ReduceFunc<KEY, MAPPED_TYPE, REDUCED_TYPE>{std::move(reducer)});
}

template <class RET>
template <class V, class>
int Context<RET>::set(V&& value)
{
return std::static_pointer_cast<Promise<RET>>(_promises.back())->set(std::forward<V>(value));
}

template <class RET>
template <class V, class>
void Context<RET>::push(V&& value)
{
std::static_pointer_cast<Promise<RET>>(_promises.back())->push(std::forward<V>(value));
push(nullptr, std::forward<V>(value));
}

template <class RET>
Expand All @@ -1134,7 +1134,7 @@ template <class RET>
template <class V>
BufferRetType<V> Context<RET>::pull(bool& isBufferClosed)
{
return std::static_pointer_cast<Promise<RET>>(_promises.back())->getIThreadFuture()->pull(isBufferClosed);
return pull(nullptr, isBufferClosed);
}

template <class RET>
Expand All @@ -1148,73 +1148,79 @@ template <class RET>
template <class V, class>
int Context<RET>::closeBuffer()
{
return std::static_pointer_cast<Promise<RET>>(_promises.back())->closeBuffer();
return closeBuffer(nullptr);
}

template <class RET>
template <class V, class>
int Context<RET>::closeBuffer(ICoroSync::Ptr sync)
{
return std::static_pointer_cast<Promise<RET>>(_promises.back())->closeBuffer(sync);
}

template <class RET>
template <class OTHER_RET>
NonBufferRetType<OTHER_RET> Context<RET>::getAt(int num)
{
return std::static_pointer_cast<Promise<OTHER_RET>>(_promises[index(num)])->getIThreadFuture()->get();
return getAt<OTHER_RET>(num, nullptr);
}

template <class RET>
template <class OTHER_RET>
const NonBufferRetType<OTHER_RET>& Context<RET>::getRefAt(int num) const
{
return std::static_pointer_cast<Promise<OTHER_RET>>(_promises[index(num)])->getIThreadFuture()->getRef();
return getRefAt<OTHER_RET>(num, nullptr);
}

template <class RET>
template <class V>
NonBufferRetType<V> Context<RET>::get()
{
return getAt<RET>(-1);
return get(nullptr);
}

template <class RET>
template <class V>
const NonBufferRetType<V>& Context<RET>::getRef() const
{
return getRefAt<RET>(-1);
return getRef(nullptr);
}

template <class RET>
void Context<RET>::waitAt(int num) const
{
_promises[index(num)]->getIThreadFutureBase()->wait();
waitAt(num, nullptr);
}

template <class RET>
std::future_status Context<RET>::waitForAt(int num, std::chrono::milliseconds timeMs) const
{
return _promises[index(num)]->getIThreadFutureBase()->waitFor(timeMs);
return waitForAt(num, nullptr, timeMs);
}

template <class RET>
void Context<RET>::wait() const
{
waitAt(-1);
wait(nullptr);
}

template <class RET>
std::future_status Context<RET>::waitFor(std::chrono::milliseconds timeMs) const
{
return waitForAt(-1, timeMs);
return waitFor(nullptr, timeMs);
}

template <class RET>
void Context<RET>::waitAll() const
{
for (auto&& promise : _promises)
{
try
{
promise->getIThreadFutureBase()->wait();
}
catch(...) //catch all broken promises or any other exception
{}
}
waitAll(nullptr);
}

template <class RET>
template <class V, class>
int Context<RET>::set(V&& value)
{
return set(nullptr, std::forward<V>(value));
}

template <class RET>
Expand All @@ -1227,7 +1233,7 @@ int Context<RET>::set(ICoroSync::Ptr sync, V&& value)
template <class RET>
template <class OTHER_RET>
NonBufferRetType<OTHER_RET> Context<RET>::getAt(int num,
ICoroSync::Ptr sync)
ICoroSync::Ptr sync)
{
validateContext(sync);
return std::static_pointer_cast<Promise<OTHER_RET>>(_promises[index(num)])->getICoroFuture()->get(sync);
Expand All @@ -1236,7 +1242,7 @@ NonBufferRetType<OTHER_RET> Context<RET>::getAt(int num,
template <class RET>
template <class OTHER_RET>
const NonBufferRetType<OTHER_RET>& Context<RET>::getRefAt(int num,
ICoroSync::Ptr sync) const
ICoroSync::Ptr sync) const
{
validateContext(sync);
return std::static_pointer_cast<Promise<OTHER_RET>>(_promises[index(num)])->getICoroFuture()->getRef(sync);
Expand Down
15 changes: 5 additions & 10 deletions quantum/impl/quantum_future_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,30 +99,26 @@ template <class T>
template <class V>
NonBufferRetType<V> Future<T>::get()
{
if (!_sharedState) ThrowFutureException(FutureState::NoState);
return _sharedState->get();
return get(nullptr);
}

template <class T>
template <class V>
const NonBufferRetType<V>& Future<T>::getRef() const
{
if (!_sharedState) ThrowFutureException(FutureState::NoState);
return _sharedState->getRef();
return getRef(nullptr);
}

template <class T>
void Future<T>::wait() const
{
if (!_sharedState) ThrowFutureException(FutureState::NoState);
return _sharedState->wait();
return wait(nullptr);
}

template <class T>
std::future_status Future<T>::waitFor(std::chrono::milliseconds timeMs) const
{
if (!_sharedState) ThrowFutureException(FutureState::NoState);
return _sharedState->waitFor(timeMs);
return waitFor(nullptr, timeMs);
}

template <class T>
Expand Down Expand Up @@ -160,8 +156,7 @@ template <class T>
template <class V>
BufferRetType<V> Future<T>::pull(bool& isBufferClosed)
{
if (!_sharedState) ThrowFutureException(FutureState::NoState);
return _sharedState->pull(isBufferClosed);
return pull(nullptr, isBufferClosed);
}

template <class T>
Expand Down
2 changes: 2 additions & 0 deletions quantum/impl/quantum_io_queue_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ void IoQueue::run()
}
}

// set the current task
IQueue::TaskSetterGuard taskSetter(*this, task);
//========================= START TASK =========================
int rc = task->run();
//========================== END TASK ==========================
Expand Down
Loading