Skip to content
This repository was archived by the owner on Feb 25, 2025. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 49 additions & 47 deletions fml/message_loop_task_queues_unittests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

#include "flutter/fml/message_loop_task_queues.h"

#include <algorithm>
#include <cstdlib>
#include <thread>

#include "flutter/fml/synchronization/count_down_latch.h"
Expand Down Expand Up @@ -192,63 +194,63 @@ TEST(MessageLoopTaskQueue, QueueDoNotOwnUnmergedTaskQueueId) {
ASSERT_FALSE(task_queue->Owns(_kUnmerged, _kUnmerged));
}

// TODO(chunhtai): This unit-test is flaky and sometimes fails asynchronizely
// after the test has finished.
// https://github.com/flutter/flutter/issues/43858
TEST(MessageLoopTaskQueue, DISABLED_ConcurrentQueueAndTaskCreatingCounts) {
//------------------------------------------------------------------------------
/// Verifies that tasks can be added to task queues concurrently.
///
TEST(MessageLoopTaskQueue, ConcurrentQueueAndTaskCreatingCounts) {
auto task_queues = fml::MessageLoopTaskQueues::GetInstance();
const int base_queue_id = task_queues->CreateTaskQueue();

const int num_queues = 100;
std::atomic_bool created[num_queues * 3];
std::atomic_int num_tasks[num_queues * 3];
std::mutex task_count_mutex[num_queues * 3];
std::atomic_int done = 0;
// kThreadCount threads post kThreadTaskCount tasks each to kTaskQueuesCount
// task queues. Each thread picks a task queue randomly for each task.
constexpr size_t kThreadCount = 4;
constexpr size_t kTaskQueuesCount = 2;
constexpr size_t kThreadTaskCount = 500;

for (int i = 0; i < num_queues * 3; i++) {
num_tasks[i] = 0;
created[i] = false;
std::vector<TaskQueueId> task_queue_ids;
for (size_t i = 0; i < kTaskQueuesCount; ++i) {
task_queue_ids.emplace_back(task_queues->CreateTaskQueue());
}

auto creation_func = [&] {
for (int i = 0; i < num_queues; i++) {
fml::TaskQueueId queue_id = task_queues->CreateTaskQueue();
int limit = queue_id - base_queue_id;
created[limit] = true;

for (int cur_q = 1; cur_q < limit; cur_q++) {
if (created[cur_q]) {
std::scoped_lock counter(task_count_mutex[cur_q]);
int cur_num_tasks = rand() % 10;
for (int k = 0; k < cur_num_tasks; k++) {
task_queues->RegisterTask(
fml::TaskQueueId(base_queue_id + cur_q), [] {},
fml::TimePoint::Now());
}
num_tasks[cur_q] += cur_num_tasks;
}
}
ASSERT_EQ(task_queue_ids.size(), kTaskQueuesCount);

fml::CountDownLatch tasks_posted_latch(kThreadCount);

auto thread_main = [&]() {
for (size_t i = 0; i < kThreadTaskCount; i++) {
const auto current_task_queue_id =
task_queue_ids[std::rand() % kTaskQueuesCount];
const auto empty_task = []() {};
// The timepoint doesn't matter as the queue is never going to be drained.
const auto task_timepoint = fml::TimePoint::Now();

task_queues->RegisterTask(current_task_queue_id, empty_task,
task_timepoint);
}
done++;

tasks_posted_latch.CountDown();
};

std::thread creation_1(creation_func);
std::thread creation_2(creation_func);

while (done < 2) {
for (int i = 0; i < num_queues * 3; i++) {
if (created[i]) {
std::scoped_lock counter(task_count_mutex[i]);
int num_pending = task_queues->GetNumPendingTasks(
fml::TaskQueueId(base_queue_id + i));
int num_added = num_tasks[i];
ASSERT_EQ(num_pending, num_added);
}
}
std::vector<std::thread> threads;

for (size_t i = 0; i < kThreadCount; i++) {
threads.emplace_back(std::thread{thread_main});
}

creation_1.join();
creation_2.join();
ASSERT_EQ(threads.size(), kThreadCount);

for (size_t i = 0; i < kThreadCount; i++) {
threads[i].join();
}

// All tasks have been posted by now. Check that they are all pending.

size_t pending_tasks = 0u;
std::for_each(task_queue_ids.begin(), task_queue_ids.end(),
[&](const auto& queue) {
pending_tasks += task_queues->GetNumPendingTasks(queue);
});

ASSERT_EQ(pending_tasks, kThreadCount * kThreadTaskCount);
}

TEST(MessageLoopTaskQueue, RegisterTaskWakesUpOwnerQueue) {
Expand Down