Skip to content

Commit

Permalink
Fix race condition in scheduler (#1763)
Browse files Browse the repository at this point in the history
lodoyun authored Oct 18, 2021
1 parent dd417f6 commit 10414fa
Showing 4 changed files with 40 additions and 9 deletions.
15 changes: 10 additions & 5 deletions erizo/src/erizo/thread/Scheduler.cpp
Original file line number Diff line number Diff line change
@@ -20,8 +20,8 @@ Scheduler::~Scheduler() {
assert(n_threads_servicing_queue_ == 0);
}

std::chrono::system_clock::time_point Scheduler::getFirstTime() {
return task_queue_.empty() ? std::chrono::system_clock::now() : task_queue_.begin()->first;
std::chrono::steady_clock::time_point Scheduler::getFirstTime() {
return task_queue_.empty() ? std::chrono::steady_clock::now() : task_queue_.begin()->first;
}

void Scheduler::serviceQueue() {
@@ -33,11 +33,16 @@ void Scheduler::serviceQueue() {
new_task_scheduled_.wait(lock);
}

std::chrono::system_clock::time_point time = getFirstTime();
std::chrono::steady_clock::time_point time = getFirstTime();
while (!stop_requested_ && !task_queue_.empty() &&
new_task_scheduled_.wait_until(lock, time) != std::cv_status::timeout) {
time = getFirstTime();
}
// check time again after acquiring lock to ensure the first task is due
time = getFirstTime();
if (std::chrono::steady_clock::now() < time) {
continue;
}
if (stop_requested_) {
break;
}
@@ -73,7 +78,7 @@ void Scheduler::stop(bool drain) {
group_.join_all();
}

void Scheduler::schedule(Scheduler::Function f, std::chrono::system_clock::time_point t) {
void Scheduler::schedule(Scheduler::Function f, std::chrono::steady_clock::time_point t) {
{
std::unique_lock<std::mutex> lock(new_task_mutex_);
// Pairs in this multimap are sorted by the Key value, so begin() will always point to the
@@ -84,7 +89,7 @@ void Scheduler::schedule(Scheduler::Function f, std::chrono::system_clock::time_
}

void Scheduler::scheduleFromNow(Scheduler::Function f, std::chrono::milliseconds delta_ms) {
schedule(f, std::chrono::system_clock::now() + delta_ms);
schedule(f, std::chrono::steady_clock::now() + delta_ms);
}

// TODO(javier): Make it possible to unschedule repeated tasks before enable this code
6 changes: 3 additions & 3 deletions erizo/src/erizo/thread/Scheduler.h
Original file line number Diff line number Diff line change
@@ -21,7 +21,7 @@ class Scheduler {

typedef boost::function<void(void)> Function;

void schedule(Function f, std::chrono::system_clock::time_point t);
void schedule(Function f, std::chrono::steady_clock::time_point t);

void scheduleFromNow(Function f, std::chrono::milliseconds delta_ms);

@@ -34,10 +34,10 @@ class Scheduler {

private:
void serviceQueue();
std::chrono::system_clock::time_point getFirstTime();
std::chrono::steady_clock::time_point getFirstTime();

private:
std::multimap<std::chrono::system_clock::time_point, Function> task_queue_;
std::multimap<std::chrono::steady_clock::time_point, Function> task_queue_;
std::condition_variable new_task_scheduled_;
mutable std::mutex new_task_mutex_;
std::atomic<int> n_threads_servicing_queue_;
26 changes: 26 additions & 0 deletions erizo/src/test/thread/SchedulerTest.cpp
Original file line number Diff line number Diff line change
@@ -70,6 +70,19 @@ TEST_P(SchedulerTest, execute_multiple_concurrent_tasks) {
EXPECT_THAT(counter, Eq(counter_limit));
}

TEST_P(SchedulerTest, timely_execute_multiple_concurrent_tasks) {
counter_limit = 2;
scheduleCounterIncrement(20);
scheduleCounterIncrement(20);
scheduleCounterIncrement(10000);
scheduleCounterIncrement(20000);

auto reason = wait_until(300);

EXPECT_THAT(reason, Not(Eq(std::cv_status::timeout)));
EXPECT_THAT(counter, Eq(counter_limit));
}

TEST_P(SchedulerTest, execute_tasks_on_different_times) {
counter_limit = 4;
scheduleCounterIncrement(20);
@@ -83,6 +96,19 @@ TEST_P(SchedulerTest, execute_tasks_on_different_times) {
EXPECT_THAT(counter, Eq(counter_limit));
}

TEST_P(SchedulerTest, timely_execute_tasks_on_different_times) {
counter_limit = 3;
scheduleCounterIncrement(20);
scheduleCounterIncrement(40);
scheduleCounterIncrement(100);
scheduleCounterIncrement(600);

auto reason = wait_until(500);

EXPECT_THAT(reason, Not(Eq(std::cv_status::timeout)));
EXPECT_THAT(counter, Eq(counter_limit));
}

TEST_P(SchedulerTest, stop_can_drain_scheduled_tasks) {
counter_limit = 2;
scheduleCounterIncrement(200);
2 changes: 1 addition & 1 deletion scripts/installMacDeps.sh
Original file line number Diff line number Diff line change
@@ -92,7 +92,7 @@ install_homebrew(){
}

install_brew_deps(){
brew install pkg-config cmake yasm gettext coreutils conan
brew install pkg-config glib cmake yasm gettext coreutils conan
install_nvm_node
nvm use
npm install

0 comments on commit 10414fa

Please sign in to comment.