-
Notifications
You must be signed in to change notification settings - Fork 172
feat: Spawn layout evaluation #2348
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
308d804
6978c28
bea1dbc
29e1da2
c78ed3a
ef2bf80
322c72a
b4a5b08
a90fd9d
96e119d
aead48a
be57a8f
c744f90
bd4c6ff
6dbab42
defc115
45b39a1
fadcc6d
8a127ec
bf14120
539bacb
6032cbe
625789d
3e91ef8
8c568e9
d6381e4
cab3012
df579e9
78f4eb0
41aa3d1
a643a5d
b1133f1
169bea9
f210bbc
5e171f2
f3ecbb7
32067e1
ea6103c
a775f89
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,43 @@ | ||
| #[cfg(feature = "tokio")] | ||
| mod tokio; | ||
|
|
||
| mod threads; | ||
|
|
||
| use std::future::Future; | ||
|
|
||
| use futures::future::BoxFuture; | ||
| pub use threads::*; | ||
| #[cfg(feature = "tokio")] | ||
| pub use tokio::*; | ||
| use vortex_error::VortexResult; | ||
|
|
||
| pub trait Spawn { | ||
| // Spawns a future to run on a different runtime. The returning future should be polled to ensure its running. | ||
| fn spawn<F>(&self, f: F) -> VortexResult<BoxFuture<'static, VortexResult<F::Output>>> | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this need to return a result? Can it just be
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don' think we want to return |
||
| where | ||
| F: Future + Send + 'static, | ||
| <F as Future>::Output: Send + 'static; | ||
| } | ||
|
|
||
| /// Generic wrapper around different async runtimes. Can be used to spawn futures to run in the background, concurrently with other tasks. | ||
| #[derive(Clone)] | ||
| pub enum Executor { | ||
| Threads(ThreadsExecutor), | ||
| #[cfg(feature = "tokio")] | ||
| Tokio(TokioExecutor), | ||
| } | ||
|
|
||
| #[async_trait::async_trait] | ||
| impl Spawn for Executor { | ||
| fn spawn<F>(&self, f: F) -> VortexResult<BoxFuture<'static, VortexResult<F::Output>>> | ||
| where | ||
| F: Future + Send + 'static, | ||
| <F as Future>::Output: Send + 'static, | ||
| { | ||
| match self { | ||
| Executor::Threads(threads_executor) => threads_executor.spawn(f), | ||
| #[cfg(feature = "tokio")] | ||
| Executor::Tokio(tokio_executor) => tokio_executor.spawn(f), | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,96 @@ | ||
| use std::future::Future; | ||
| use std::num::NonZeroUsize; | ||
| use std::sync::Arc; | ||
|
|
||
| use futures::channel::oneshot; | ||
| use futures::future::BoxFuture; | ||
| use futures::{FutureExt as _, TryFutureExt as _}; | ||
| use vortex_error::{vortex_err, VortexResult}; | ||
|
|
||
| use super::Spawn; | ||
|
|
||
| trait Task { | ||
| fn run(self: Box<Self>); | ||
| } | ||
|
|
||
| struct ExecutorTask<F, R> { | ||
| task: F, | ||
| result: oneshot::Sender<R>, | ||
| } | ||
|
|
||
| impl<F, R> Task for ExecutorTask<F, R> | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. definitely not taken from @a10y 😇 |
||
| where | ||
| F: Future<Output = R> + Send, | ||
| R: Send, | ||
| { | ||
| fn run(self: Box<Self>) { | ||
| let Self { task, result } = *self; | ||
| futures::executor::block_on(async move { | ||
| let output = task.await; | ||
| _ = result.send(output); | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| #[derive(Clone, Default)] | ||
| pub struct ThreadsExecutor { | ||
| inner: Arc<Inner>, | ||
| } | ||
|
|
||
| impl ThreadsExecutor { | ||
| pub fn new(num_threads: NonZeroUsize) -> Self { | ||
| Self { | ||
| inner: Arc::new(Inner::new(num_threads)), | ||
| } | ||
| } | ||
| } | ||
|
|
||
| struct Inner { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This structure is needed to achieve the behavior we want, where arbitrary futures can be spawned on this runtime. I've tried to structure it differently but I honestly think this is pretty nice, and we can definitely try and improve that later. |
||
| submitter: flume::Sender<Box<dyn Task + Send>>, | ||
| } | ||
|
|
||
| impl Default for Inner { | ||
| fn default() -> Self { | ||
| // Safety: | ||
| // 1 isn't 0 | ||
| Self::new(unsafe { NonZeroUsize::new_unchecked(1) }) | ||
| } | ||
| } | ||
|
|
||
| impl Inner { | ||
| fn new(num_threads: NonZeroUsize) -> Self { | ||
| let (tx, rx) = flume::unbounded::<Box<dyn Task + Send>>(); | ||
| (0..num_threads.get()).for_each(|_| { | ||
| let rx = rx.clone(); | ||
| std::thread::spawn(move || loop { | ||
| match rx.recv() { | ||
| Ok(task) => task.run(), | ||
| // we error if all senders dropped, which means we probably don't care about the task anymore, | ||
| // and we can break and let the thread end. | ||
| Err(_e) => break, | ||
| } | ||
| }); | ||
| }); | ||
|
|
||
| Self { submitter: tx } | ||
| } | ||
| } | ||
|
|
||
| impl Spawn for ThreadsExecutor { | ||
| fn spawn<F>(&self, f: F) -> VortexResult<BoxFuture<'static, VortexResult<F::Output>>> | ||
| where | ||
| F: Future + Send + 'static, | ||
| <F as Future>::Output: Send + 'static, | ||
| { | ||
| let (tx, rx) = oneshot::channel(); | ||
| let task = Box::new(ExecutorTask { | ||
| task: f, | ||
| result: tx, | ||
| }); | ||
| self.inner | ||
| .submitter | ||
| .send(task) | ||
| .map_err(|e| vortex_err!("Failed to submit work: {e}"))?; | ||
| Ok(rx.map_err(|e| vortex_err!("Future canceled: {e}")).boxed()) | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,28 @@ | ||
| use std::future::Future; | ||
|
|
||
| use futures::future::BoxFuture; | ||
| use futures::{FutureExt, TryFutureExt}; | ||
| use tokio::runtime::Handle; | ||
| use vortex_error::{VortexError, VortexResult}; | ||
|
|
||
| use super::Spawn; | ||
|
|
||
| #[derive(Clone)] | ||
| pub struct TokioExecutor(Handle); | ||
|
|
||
| impl TokioExecutor { | ||
| pub fn new(handle: Handle) -> Self { | ||
| Self(handle) | ||
| } | ||
| } | ||
|
|
||
| #[async_trait::async_trait] | ||
| impl Spawn for TokioExecutor { | ||
| fn spawn<F>(&self, f: F) -> VortexResult<BoxFuture<'static, VortexResult<F::Output>>> | ||
| where | ||
| F: Future + Send + 'static, | ||
| <F as Future>::Output: Send + 'static, | ||
| { | ||
| Ok(self.0.spawn(f).map_err(VortexError::from).boxed()) | ||
| } | ||
| } |
Uh oh!
There was an error while loading. Please reload this page.