From 9f1338ddde40f5e746a26493c5b58bad3f8f6fcb Mon Sep 17 00:00:00 2001 From: Orri Erling Date: Fri, 14 Apr 2023 13:00:04 -0700 Subject: [PATCH] Add Task::yieldIfDue Adds a function to get a Task off thread after a minimum slice of running time. --- velox/exec/Driver.cpp | 5 +++-- velox/exec/Task.cpp | 18 +++++++++++++++++- velox/exec/Task.h | 15 +++++++++++++-- velox/exec/tests/DriverTest.cpp | 7 ++++++- 4 files changed, 39 insertions(+), 6 deletions(-) diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index 8d7cf5a5d06..e8d58ea2075 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -281,9 +281,10 @@ StopReason Driver::runInternal( std::shared_ptr& self, std::shared_ptr& blockingState, RowVectorPtr& result) { - auto queuedTime = (getCurrentTimeMicro() - queueTimeStartMicros_) * 1'000; + const auto now = getCurrentTimeMicro(); + const auto queuedTime = (now - queueTimeStartMicros_) * 1'000; // Update the next operator's queueTime. - auto stop = closed_ ? StopReason::kTerminate : task()->enter(state_); + auto stop = closed_ ? StopReason::kTerminate : task()->enter(state_, now); if (stop != StopReason::kNone) { if (stop == StopReason::kTerminate) { // ctx_ still has a reference to the Task. 'this' is not on diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index 5bcebd6c90e..787d1b69a1d 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -1890,7 +1890,7 @@ std::string Task::errorMessage() const { return errorMessageLocked(); } -StopReason Task::enter(ThreadState& state) { +StopReason Task::enter(ThreadState& state, uint64_t nowMicros) { std::lock_guard l(mutex_); VELOX_CHECK(state.isEnqueued); state.isEnqueued = false; @@ -1906,6 +1906,9 @@ StopReason Task::enter(ThreadState& state) { } if (reason == StopReason::kNone) { ++numThreads_; + if (numThreads_ == 1) { + onThreadSince_ = nowMicros; + } state.setThread(); state.hasBlockingFuture = false; } @@ -2006,6 +2009,19 @@ StopReason Task::shouldStop() { return StopReason::kNone; } +int32_t Task::yieldIfDue(uint64_t startTimeMicros) { + if (onThreadSince_ < startTimeMicros) { + std::lock_guard l(mutex_); + // Reread inside the mutex + if (onThreadSince_ < startTimeMicros && numThreads_ && !toYield_ && + !terminateRequested_ && !pauseRequested_) { + toYield_ = numThreads_; + return numThreads_; + } + } + return 0; +} + void Task::finishedLocked() { for (auto& promise : threadFinishPromises_) { promise.setValue(); diff --git a/velox/exec/Task.h b/velox/exec/Task.h index 663cd2441e5..3f3842e3795 100644 --- a/velox/exec/Task.h +++ b/velox/exec/Task.h @@ -445,8 +445,10 @@ class Task : public std::enable_shared_from_this { /// Returns kNone if no pause or terminate is requested. The thread count is /// incremented if kNone is returned. If something else is returned the - /// calling thread should unwind and return itself to its pool. - StopReason enter(ThreadState& state); + /// calling thread should unwind and return itself to its pool. If 'this' goes + /// from no threads running to one thread running, sets 'onThreadSince_' to + /// 'nowMicros'. + StopReason enter(ThreadState& state, uint64_t nowMicros = 0); /// Sets the state to terminated. Returns kAlreadyOnThread if the /// Driver is running. In this case, the Driver will free resources @@ -511,6 +513,11 @@ class Task : public std::enable_shared_from_this { toYield_ = numThreads_; } + /// Requests yield if 'this' is running and has had at least one Driver on + /// thread since before 'startTimeMicros'. Returns the number of threads in + /// 'this' at the time of requesting yield. Returns 0 if yield not requested. + int32_t yieldIfDue(uint64_t startTimeMicros); + /// Once 'pauseRequested_' is set, it will not be cleared until /// task::resume(). It is therefore OK to read it without a mutex /// from a thread that this flag concerns. @@ -972,6 +979,10 @@ class Task : public std::enable_shared_from_this { std::atomic terminateRequested_{false}; std::atomic toYield_ = 0; int32_t numThreads_ = 0; + // Microsecond real time when 'this' last went from no threads to + // one thread running. Used to decide if continuous run should be + // interrupted by yieldIfDue(). + tsan_atomic onThreadSince_{0}; // Promises for the futures returned to callers of requestPause() or // terminate(). They are fulfilled when the last thread stops // running for 'this'. diff --git a/velox/exec/tests/DriverTest.cpp b/velox/exec/tests/DriverTest.cpp index 32fdee3503e..b83927953bc 100644 --- a/velox/exec/tests/DriverTest.cpp +++ b/velox/exec/tests/DriverTest.cpp @@ -203,7 +203,12 @@ class DriverTest : public OperatorTestBase { } else if (operation == ResultOperation::kTerminate) { cancelFuture_ = cursor->task()->requestAbort(); } else if (operation == ResultOperation::kYield) { - cursor->task()->requestYield(); + if (*counter % 2 == 0) { + auto time = getCurrentTimeMicro(); + cursor->task()->yieldIfDue(time - 10); + } else { + cursor->task()->requestYield(); + } } else if (operation == ResultOperation::kPause) { auto& executor = folly::QueuedImmediateExecutor::instance(); auto future = cursor->task()->requestPause().via(&executor);