From 029faf148d3ad4e01594afe2c77df245f7725202 Mon Sep 17 00:00:00 2001 From: Chris Cotter Date: Mon, 6 Jan 2025 14:50:13 -0500 Subject: [PATCH] Use intrusive_heap in timed_single_thread_context Use a heap to avoid possibly linear enqueue operation time. Fixes #648 --- include/unifex/detail/intrusive_heap.hpp | 8 ++- .../unifex/timed_single_thread_context.hpp | 21 +++++-- source/timed_single_thread_context.cpp | 58 +++++-------------- 3 files changed, 37 insertions(+), 50 deletions(-) diff --git a/include/unifex/detail/intrusive_heap.hpp b/include/unifex/detail/intrusive_heap.hpp index c9b01a901..153c07072 100644 --- a/include/unifex/detail/intrusive_heap.hpp +++ b/include/unifex/detail/intrusive_heap.hpp @@ -89,7 +89,12 @@ class intrusive_heap { } } - void remove(T* item) noexcept { + // Returns true if the item was in the heap + bool remove(T* item) noexcept { + if (item->*Prev == nullptr && item != head_) { + return false; + } + auto* prev = item->*Prev; auto* next = item->*Next; if (prev != nullptr) { @@ -101,6 +106,7 @@ class intrusive_heap { if (next != nullptr) { next->*Prev = prev; } + return true; } private: diff --git a/include/unifex/timed_single_thread_context.hpp b/include/unifex/timed_single_thread_context.hpp index e1cfd8a56..ab831727b 100644 --- a/include/unifex/timed_single_thread_context.hpp +++ b/include/unifex/timed_single_thread_context.hpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include @@ -46,7 +47,7 @@ struct task_base { timed_single_thread_context* const context_; task_base* next_ = nullptr; - task_base** prevNextPtr_ = nullptr; + task_base* prev_ = nullptr; execute_fn* execute_; time_point dueTime_; @@ -267,6 +268,11 @@ class scheduler { } // namespace _timed_single_thread_context class timed_single_thread_context { +public: + using clock_t = _timed_single_thread_context::clock_t; + using time_point = _timed_single_thread_context::time_point; + +private: using scheduler = _timed_single_thread_context::scheduler; using task_base = _timed_single_thread_context::task_base; using cancel_callback = _timed_single_thread_context::cancel_callback; @@ -283,16 +289,19 @@ class timed_single_thread_context { std::mutex mutex_; std::condition_variable cv_; - // Head of a linked-list in ascending order of due-time. - task_base* head_ = nullptr; + using schedule_heap = intrusive_heap< + task_base, + &task_base::next_, + &task_base::prev_, + time_point, + &task_base::dueTime_>; + + schedule_heap heap_; bool stop_ = false; std::thread thread_; public: - using clock_t = _timed_single_thread_context::clock_t; - using time_point = _timed_single_thread_context::time_point; - timed_single_thread_context(); ~timed_single_thread_context(); diff --git a/source/timed_single_thread_context.cpp b/source/timed_single_thread_context.cpp index 4e6979c84..cb79bd765 100644 --- a/source/timed_single_thread_context.cpp +++ b/source/timed_single_thread_context.cpp @@ -29,59 +29,38 @@ timed_single_thread_context::~timed_single_thread_context() { } thread_.join(); - UNIFEX_ASSERT(head_ == nullptr); + UNIFEX_ASSERT(heap_.empty()); } void timed_single_thread_context::enqueue(task_base* task) noexcept { std::lock_guard lock{mutex_}; - if (head_ == nullptr || task->dueTime_ < head_->dueTime_) { - // Insert at the head of the queue. - task->next_ = head_; - task->prevNextPtr_ = &head_; - if (head_ != nullptr) { - head_->prevNextPtr_ = &task->next_; - } - head_ = task; - - // New minimum due-time has changed, wake the thread. - cv_.notify_one(); + bool wasEmpty = heap_.empty(); + heap_.insert(task); + if (wasEmpty) { + cv_.notify_one(); } else { - auto* queuedTask = head_; - while (queuedTask->next_ != nullptr && - queuedTask->next_->dueTime_ <= task->dueTime_) { - queuedTask = queuedTask->next_; - } - - // Insert after queuedTask - task->prevNextPtr_ = &queuedTask->next_; - task->next_ = queuedTask->next_; - if (task->next_ != nullptr) { - task->next_->prevNextPtr_ = &task->next_; + auto now = clock_t::now(); + if (task->dueTime_ <= now) { + // New task is ready, wake the thread. + cv_.notify_one(); } - queuedTask->next_ = task; } + } void timed_single_thread_context::run() { std::unique_lock lock{mutex_}; while (!stop_) { - if (head_ != nullptr) { + if (!heap_.empty()) { auto now = clock_t::now(); - auto nextDueTime = head_->dueTime_; + auto nextDueTime = heap_.top()->dueTime_; if (nextDueTime <= now) { // Ready to run // Dequeue item - auto* task = head_; - head_ = task->next_; - if (head_ != nullptr) { - head_->prevNextPtr_ = &head_; - } - - // Flag the task as dequeued. - task->prevNextPtr_ = nullptr; + auto* task = heap_.pop(); lock.unlock(); task->execute(); @@ -104,15 +83,8 @@ void _timed_single_thread_context::cancel_callback::operator()() noexcept { if (now < task_->dueTime_) { task_->dueTime_ = now; - if (task_->prevNextPtr_ != nullptr) { - // Task is still in the queue, dequeue and requeue it. - - // Remove from the queue. - *task_->prevNextPtr_ = task_->next_; - if (task_->next_ != nullptr) { - task_->next_->prevNextPtr_ = task_->prevNextPtr_; - } - task_->prevNextPtr_ = nullptr; + if (task_->context_->heap_.remove(task_)) { + // Task was still in the queue, requeue it. lock.unlock(); // And requeue with an updated time.