Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/bevy_tasks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ multi-threaded = []
[dependencies]
futures-lite = "2.0.1"
async-executor = "1.7.2"
async-channel = "2.1.0"
event-listener = "4.0.3"
async-io = { version = "2.0.0", optional = true }
async-task = "4.2.0"
concurrent-queue = "2.0.0"
Expand Down
18 changes: 8 additions & 10 deletions crates/bevy_tasks/src/task_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::{

use async_task::FallibleTask;
use concurrent_queue::ConcurrentQueue;
use event_listener::Event;
use futures_lite::FutureExt;

use crate::{
Expand Down Expand Up @@ -115,7 +116,7 @@ pub struct TaskPool {

/// Inner state of the pool
threads: Vec<JoinHandle<()>>,
shutdown_tx: async_channel::Sender<()>,
shutdown: Arc<event_listener::Event>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for fully qualified path

}

impl TaskPool {
Expand All @@ -135,8 +136,7 @@ impl TaskPool {
}

fn new_internal(builder: TaskPoolBuilder) -> Self {
let (shutdown_tx, shutdown_rx) = async_channel::unbounded::<()>();

let shutdown = Arc::new(Event::new());
let executor = Arc::new(async_executor::Executor::new());

let num_threads = builder
Expand All @@ -146,7 +146,7 @@ impl TaskPool {
let threads = (0..num_threads)
.map(|i| {
let ex = Arc::clone(&executor);
let shutdown_rx = shutdown_rx.clone();
let shutdown = shutdown.clone();

let thread_name = if let Some(thread_name) = builder.thread_name.as_deref() {
format!("{thread_name} ({i})")
Expand Down Expand Up @@ -177,11 +177,9 @@ impl TaskPool {
local_executor.tick().await;
}
};
block_on(ex.run(tick_forever.or(shutdown_rx.recv())))
block_on(ex.run(tick_forever.or(shutdown.listen())));
});
if let Ok(value) = res {
// Use unwrap_err because we expect a Closed error
value.unwrap_err();
if res.is_ok() {
break;
}
}
Expand All @@ -194,7 +192,7 @@ impl TaskPool {
Self {
executor,
threads,
shutdown_tx,
shutdown,
}
}

Expand Down Expand Up @@ -584,7 +582,7 @@ impl Default for TaskPool {

impl Drop for TaskPool {
fn drop(&mut self) {
self.shutdown_tx.close();
self.shutdown.notify(self.threads.len());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might be worth trying usize::MAX here and seeing if it helps.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I already tried this above and it doesn't seem to work.


let panicking = thread::panicking();
for join_handle in self.threads.drain(..) {
Expand Down