Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use intrusive_heap in timed_single_thread_context #650

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion include/unifex/detail/intrusive_heap.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,12 @@ class intrusive_heap {
}
}

void remove(T* item) noexcept {
// Returns true if the item was in the heap
bool remove(T* item) noexcept {
if (item->*Prev == nullptr && item != head_) {
return false;
}

auto* prev = item->*Prev;
auto* next = item->*Next;
if (prev != nullptr) {
Expand All @@ -101,6 +106,7 @@ class intrusive_heap {
if (next != nullptr) {
next->*Prev = prev;
}
return true;
}

private:
Expand Down
21 changes: 15 additions & 6 deletions include/unifex/timed_single_thread_context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <unifex/manual_lifetime.hpp>
#include <unifex/receiver_concepts.hpp>
#include <unifex/stop_token_concepts.hpp>
#include <unifex/detail/intrusive_heap.hpp>

#include <chrono>
#include <condition_variable>
Expand All @@ -46,7 +47,7 @@ struct task_base {

timed_single_thread_context* const context_;
task_base* next_ = nullptr;
task_base** prevNextPtr_ = nullptr;
task_base* prev_ = nullptr;
execute_fn* execute_;
time_point dueTime_;

Expand Down Expand Up @@ -267,6 +268,11 @@ class scheduler {
} // namespace _timed_single_thread_context

class timed_single_thread_context {
public:
using clock_t = _timed_single_thread_context::clock_t;
using time_point = _timed_single_thread_context::time_point;

private:
using scheduler = _timed_single_thread_context::scheduler;
using task_base = _timed_single_thread_context::task_base;
using cancel_callback = _timed_single_thread_context::cancel_callback;
Expand All @@ -283,16 +289,19 @@ class timed_single_thread_context {
std::mutex mutex_;
std::condition_variable cv_;

// Head of a linked-list in ascending order of due-time.
task_base* head_ = nullptr;
using schedule_heap = intrusive_heap<
task_base,
&task_base::next_,
&task_base::prev_,
time_point,
&task_base::dueTime_>;

schedule_heap heap_;
bool stop_ = false;

std::thread thread_;

public:
using clock_t = _timed_single_thread_context::clock_t;
using time_point = _timed_single_thread_context::time_point;

timed_single_thread_context();
~timed_single_thread_context();

Expand Down
58 changes: 15 additions & 43 deletions source/timed_single_thread_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,59 +29,38 @@ timed_single_thread_context::~timed_single_thread_context() {
}
thread_.join();

UNIFEX_ASSERT(head_ == nullptr);
UNIFEX_ASSERT(heap_.empty());
}

void timed_single_thread_context::enqueue(task_base* task) noexcept {
std::lock_guard lock{mutex_};

if (head_ == nullptr || task->dueTime_ < head_->dueTime_) {
// Insert at the head of the queue.
task->next_ = head_;
task->prevNextPtr_ = &head_;
if (head_ != nullptr) {
head_->prevNextPtr_ = &task->next_;
}
head_ = task;

// New minimum due-time has changed, wake the thread.
cv_.notify_one();
bool wasEmpty = heap_.empty();
heap_.insert(task);
if (wasEmpty) {
cv_.notify_one();
} else {
auto* queuedTask = head_;
while (queuedTask->next_ != nullptr &&
queuedTask->next_->dueTime_ <= task->dueTime_) {
queuedTask = queuedTask->next_;
}

// Insert after queuedTask
task->prevNextPtr_ = &queuedTask->next_;
task->next_ = queuedTask->next_;
if (task->next_ != nullptr) {
task->next_->prevNextPtr_ = &task->next_;
auto now = clock_t::now();
if (task->dueTime_ <= now) {
// New task is ready, wake the thread.
cv_.notify_one();
}
queuedTask->next_ = task;
}

}

void timed_single_thread_context::run() {
std::unique_lock lock{mutex_};

while (!stop_) {
if (head_ != nullptr) {
if (!heap_.empty()) {
auto now = clock_t::now();
auto nextDueTime = head_->dueTime_;
auto nextDueTime = heap_.top()->dueTime_;
if (nextDueTime <= now) {
// Ready to run

// Dequeue item
auto* task = head_;
head_ = task->next_;
if (head_ != nullptr) {
head_->prevNextPtr_ = &head_;
}

// Flag the task as dequeued.
task->prevNextPtr_ = nullptr;
auto* task = heap_.pop();
lock.unlock();

task->execute();
Expand All @@ -104,15 +83,8 @@ void _timed_single_thread_context::cancel_callback::operator()() noexcept {
if (now < task_->dueTime_) {
task_->dueTime_ = now;

if (task_->prevNextPtr_ != nullptr) {
// Task is still in the queue, dequeue and requeue it.

// Remove from the queue.
*task_->prevNextPtr_ = task_->next_;
if (task_->next_ != nullptr) {
task_->next_->prevNextPtr_ = task_->prevNextPtr_;
}
task_->prevNextPtr_ = nullptr;
if (task_->context_->heap_.remove(task_)) {
// Task was still in the queue, requeue it.
lock.unlock();

// And requeue with an updated time.
Expand Down
Loading