Skip to content

Commit

Permalink
Merge pull request #2 from hymm/pool-intern-experiment
Browse files Browse the repository at this point in the history
Fix stealing
  • Loading branch information
james7132 authored May 19, 2022
2 parents 2f65f20 + fedb3a4 commit b6aa476
Showing 1 changed file with 19 additions and 37 deletions.
56 changes: 19 additions & 37 deletions crates/bevy_tasks/src/task_pool.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use std::{
future::Future,
mem,
pin::Pin,
sync::Arc,
thread::{self, JoinHandle},
};

use futures_lite::{future, pin, FutureExt};
use futures_lite::{future, FutureExt};

use crate::{Task, TaskGroup};

Expand Down Expand Up @@ -303,8 +302,13 @@ impl TaskPool {
let shutdown_rx = shutdown_rx.clone();
make_thread_builder(&builder, "Compute", i)
.spawn(move || {
let future = async {
loop {
compute.tick().await;
}
};
// Use unwrap_err because we expect a Closed error
future::block_on(compute.run(shutdown_rx.recv())).unwrap_err();
future::block_on(shutdown_rx.recv().or(future)).unwrap_err();
})
.expect("Failed to spawn thread.")
})
Expand All @@ -322,7 +326,7 @@ impl TaskPool {
}
};
// Use unwrap_err because we expect a Closed error
future::block_on(future.or(shutdown_rx.recv())).unwrap_err();
future::block_on(shutdown_rx.recv().or(future)).unwrap_err();
})
.expect("Failed to spawn thread.")
})
Expand All @@ -341,7 +345,7 @@ impl TaskPool {
}
};
// Use unwrap_err because we expect a Closed error
future::block_on(future.or(shutdown_rx.recv())).unwrap_err();
future::block_on(shutdown_rx.recv().or(future)).unwrap_err();
})
.expect("Failed to spawn thread.")
})
Expand Down Expand Up @@ -436,12 +440,8 @@ impl TaskPool {

f(&mut scope);

if scope.spawned.is_empty() {
Vec::default()
} else if scope.spawned.len() == 1 {
vec![future::block_on(&mut scope.spawned[0])]
} else {
let fut = async move {
future::block_on(async move {
let get_results = async move {
let mut results = Vec::with_capacity(scope.spawned.len());
for task in scope.spawned {
results.push(task.await);
Expand All @@ -450,32 +450,14 @@ impl TaskPool {
results
};

// Pin the futures on the stack.
pin!(fut);

// SAFETY: This function blocks until all futures complete, so we do not read/write
// the data from futures outside of the 'scope lifetime. However,
// rust has no way of knowing this so we must convert to 'static
// here to appease the compiler as it is unable to validate safety.
let fut: Pin<&mut (dyn Future<Output = Vec<T>>)> = fut;
let fut: Pin<&'static mut (dyn Future<Output = Vec<T>> + 'static)> =
unsafe { mem::transmute(fut) };

// The thread that calls scope() will participate in driving tasks in the pool
// forward until the tasks that are spawned by this scope() call
// complete. (If the caller of scope() happens to be a thread in
// this thread pool, and we only have one thread in the pool, then
// simply calling future::block_on(spawned) would deadlock.)
let mut spawned = local_executor.spawn(fut);
loop {
if let Some(result) = future::block_on(future::poll_once(&mut spawned)) {
break result;
};

executor.try_tick();
local_executor.try_tick();
}
}
let tick_forever = async move {
loop {
local_executor.tick().or(executor.tick()).await;
}
};

get_results.or(tick_forever).await
})
})
}

Expand Down

0 comments on commit b6aa476

Please sign in to comment.