Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 105 additions & 60 deletions crates/rspack_core/src/utils/task_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::{
use rspack_error::Result;
use rspack_util::ext::AsAny;
use tokio::{
sync::mpsc::{self, error::TryRecvError},
sync::mpsc::{self, error::TryRecvError, UnboundedReceiver, UnboundedSender},
task,
};
use tracing::Instrument;
Expand Down Expand Up @@ -51,78 +51,123 @@ pub trait Task<Ctx>: Debug + Send + Any + AsAny {
}
}

/// Run task loop
pub async fn run_task_loop<Ctx: 'static>(
ctx: &mut Ctx,
init_tasks: Vec<Box<dyn Task<Ctx>>>,
) -> Result<()> {
// create channel to receive async task result
let (tx, mut rx) = mpsc::unbounded_channel::<TaskResult<Ctx>>();
// mark whether the task loop has been returned
// the async task should not call `tx.send` after this mark to true
let is_expected_shutdown: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
let mut queue: VecDeque<Box<dyn Task<Ctx>>> = VecDeque::from(init_tasks);
let mut active_task_count = 0;
loop {
let task = queue.pop_front();
if task.is_none() && active_task_count == 0 {
return Ok(());
struct TaskLoop<Ctx> {
/// Main tasks run sequentially in the queue
main_task_queue: VecDeque<Box<dyn Task<Ctx>>>,
/// The count of the running background tasks which run immediately in tokio thread workers when they are returned
background_task_count: u32,
/// Mark whether the task loop has been returned.
/// The async task should not call `tx.send` after this mark to true
is_expected_shutdown: Arc<AtomicBool>,
/// Used for sending async task results in background tasks
task_result_sender: UnboundedSender<TaskResult<Ctx>>,
/// Used for receiving async task results
task_result_receiver: UnboundedReceiver<TaskResult<Ctx>>,
}

impl<Ctx: 'static> TaskLoop<Ctx> {
fn new(init_main_tasks: Vec<Box<dyn Task<Ctx>>>) -> Self {
let (tx, rx) = mpsc::unbounded_channel::<TaskResult<Ctx>>();
Self {
main_task_queue: VecDeque::from(init_main_tasks),
is_expected_shutdown: Arc::new(AtomicBool::new(false)),
background_task_count: 0,
task_result_sender: tx,
task_result_receiver: rx,
}
}

async fn run_task_loop(
&mut self,
ctx: &mut Ctx,
init_background_tasks: Vec<Box<dyn Task<Ctx>>>,
) -> Result<()> {
for background_task in init_background_tasks {
self.spawn_background(background_task);
}

loop {
let task = self.main_task_queue.pop_front();

// If there's no main tasks and background tasksm
if task.is_none() && self.background_task_count == 0 {
return Ok(());
}

// Background tasks are launched as soon as they are returned, so we don't put them into the queue.
if let Some(task) = task {
debug_assert!(matches!(task.get_task_type(), TaskType::Main));
self.handle_task_result(task.main_run(ctx).await)?;
}

let data = if self.main_task_queue.is_empty() && self.background_task_count != 0 {
let res = self
.task_result_receiver
.recv()
.await
.expect("should recv success");
Ok(res)
} else {
self.task_result_receiver.try_recv()
};

if let Some(task) = task {
match task.get_task_type() {
TaskType::Background => {
let tx = tx.clone();
let is_expected_shutdown = is_expected_shutdown.clone();
active_task_count += 1;
tokio::spawn(task::unconstrained(
async move {
let r = task.background_run().await;
if !is_expected_shutdown.load(Ordering::Relaxed) {
tx.send(r).expect("failed to send task result");
}
}
.in_current_span(),
));
match data {
Ok(r) => {
self.background_task_count -= 1;
self.handle_task_result(r)?;
}
TaskType::Main => {
// merge sync task result directly
match task.main_run(ctx).await {
Ok(r) => queue.extend(r),
Err(e) => {
is_expected_shutdown.store(true, Ordering::Relaxed);
return Err(e);
}
}
Err(TryRecvError::Empty) => {}
_ => {
panic!("unexpected recv error")
}
}
}
}

let data = if queue.is_empty() && active_task_count != 0 {
let res = rx.recv().await.expect("should recv success");
Ok(res)
} else {
rx.try_recv()
};

match data {
Ok(r) => {
active_task_count -= 1;
// merge async task result
match r {
Ok(r) => queue.extend(r),
Err(e) => {
is_expected_shutdown.store(true, Ordering::Relaxed);
return Err(e);
/// Merge sync task result directly
fn handle_task_result(&mut self, result: TaskResult<Ctx>) -> Result<()> {
match result {
Ok(tasks) => {
for task in tasks {
match task.get_task_type() {
TaskType::Main => self.main_task_queue.push_back(task),
TaskType::Background => self.spawn_background(task),
}
}
Ok(())
}
Err(TryRecvError::Empty) => {}
_ => {
panic!("unexpected recv error")
Err(e) => {
self.is_expected_shutdown.store(true, Ordering::Relaxed);
Err(e)
}
}
}

fn spawn_background(&mut self, task: Box<dyn Task<Ctx>>) {
let tx = self.task_result_sender.clone();
let is_expected_shutdown = self.is_expected_shutdown.clone();
self.background_task_count += 1;
tokio::spawn(task::unconstrained(
async move {
let r = task.background_run().await;
if !is_expected_shutdown.load(Ordering::Relaxed) {
tx.send(r).expect("failed to send task result");
}
}
.in_current_span(),
));
}
}

pub async fn run_task_loop<Ctx: 'static>(
ctx: &mut Ctx,
init_tasks: Vec<Box<dyn Task<Ctx>>>,
) -> Result<()> {
let (background_tasks, main_tasks) = init_tasks
.into_iter()
.partition(|task| matches!(task.get_task_type(), TaskType::Background));
let mut task_loop = TaskLoop::new(main_tasks);
task_loop.run_task_loop(ctx, background_tasks).await
}

#[cfg(test)]
Expand Down
Loading