Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,10 @@ StopReason Driver::runInternal(
std::shared_ptr<Driver>& self,
std::shared_ptr<BlockingState>& blockingState,
RowVectorPtr& result) {
auto queuedTime = (getCurrentTimeMicro() - queueTimeStartMicros_) * 1'000;
const auto now = getCurrentTimeMicro();
const auto queuedTime = (now - queueTimeStartMicros_) * 1'000;
// Update the next operator's queueTime.
auto stop = closed_ ? StopReason::kTerminate : task()->enter(state_);
auto stop = closed_ ? StopReason::kTerminate : task()->enter(state_, now);
if (stop != StopReason::kNone) {
if (stop == StopReason::kTerminate) {
// ctx_ still has a reference to the Task. 'this' is not on
Expand Down
18 changes: 17 additions & 1 deletion velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1890,7 +1890,7 @@ std::string Task::errorMessage() const {
return errorMessageLocked();
}

StopReason Task::enter(ThreadState& state) {
StopReason Task::enter(ThreadState& state, uint64_t nowMicros) {
std::lock_guard<std::mutex> l(mutex_);
VELOX_CHECK(state.isEnqueued);
state.isEnqueued = false;
Expand All @@ -1906,6 +1906,9 @@ StopReason Task::enter(ThreadState& state) {
}
if (reason == StopReason::kNone) {
++numThreads_;
if (numThreads_ == 1) {
onThreadSince_ = nowMicros;
}
state.setThread();
state.hasBlockingFuture = false;
}
Expand Down Expand Up @@ -2006,6 +2009,19 @@ StopReason Task::shouldStop() {
return StopReason::kNone;
}

int32_t Task::yieldIfDue(uint64_t startTimeMicros) {
if (onThreadSince_ < startTimeMicros) {
std::lock_guard<std::mutex> l(mutex_);
// Reread inside the mutex
if (onThreadSince_ < startTimeMicros && numThreads_ && !toYield_ &&
!terminateRequested_ && !pauseRequested_) {
toYield_ = numThreads_;
return numThreads_;
}
}
return 0;
}

void Task::finishedLocked() {
for (auto& promise : threadFinishPromises_) {
promise.setValue();
Expand Down
15 changes: 13 additions & 2 deletions velox/exec/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -445,8 +445,10 @@ class Task : public std::enable_shared_from_this<Task> {

/// Returns kNone if no pause or terminate is requested. The thread count is
/// incremented if kNone is returned. If something else is returned the
/// calling thread should unwind and return itself to its pool.
StopReason enter(ThreadState& state);
/// calling thread should unwind and return itself to its pool. If 'this' goes
/// from no threads running to one thread running, sets 'onThreadSince_' to
/// 'nowMicros'.
StopReason enter(ThreadState& state, uint64_t nowMicros = 0);

/// Sets the state to terminated. Returns kAlreadyOnThread if the
/// Driver is running. In this case, the Driver will free resources
Expand Down Expand Up @@ -511,6 +513,11 @@ class Task : public std::enable_shared_from_this<Task> {
toYield_ = numThreads_;
}

/// Requests yield if 'this' is running and has had at least one Driver on
/// thread since before 'startTimeMicros'. Returns the number of threads in
/// 'this' at the time of requesting yield. Returns 0 if yield not requested.
int32_t yieldIfDue(uint64_t startTimeMicros);

/// Once 'pauseRequested_' is set, it will not be cleared until
/// task::resume(). It is therefore OK to read it without a mutex
/// from a thread that this flag concerns.
Expand Down Expand Up @@ -972,6 +979,10 @@ class Task : public std::enable_shared_from_this<Task> {
std::atomic<bool> terminateRequested_{false};
std::atomic<int32_t> toYield_ = 0;
int32_t numThreads_ = 0;
// Microsecond real time when 'this' last went from no threads to
// one thread running. Used to decide if continuous run should be
// interrupted by yieldIfDue().
tsan_atomic<uint64_t> onThreadSince_{0};
// Promises for the futures returned to callers of requestPause() or
// terminate(). They are fulfilled when the last thread stops
// running for 'this'.
Expand Down
7 changes: 6 additions & 1 deletion velox/exec/tests/DriverTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,12 @@ class DriverTest : public OperatorTestBase {
} else if (operation == ResultOperation::kTerminate) {
cancelFuture_ = cursor->task()->requestAbort();
} else if (operation == ResultOperation::kYield) {
cursor->task()->requestYield();
if (*counter % 2 == 0) {
auto time = getCurrentTimeMicro();
cursor->task()->yieldIfDue(time - 10);
} else {
cursor->task()->requestYield();
}
} else if (operation == ResultOperation::kPause) {
auto& executor = folly::QueuedImmediateExecutor::instance();
auto future = cursor->task()->requestPause().via(&executor);
Expand Down