Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
308d804
.
AdamGS Feb 13, 2025
6978c28
.
AdamGS Feb 13, 2025
bea1dbc
.
AdamGS Feb 13, 2025
29e1da2
enable feature
AdamGS Feb 13, 2025
c78ed3a
.
AdamGS Feb 13, 2025
ef2bf80
Merge branch 'develop' into adamg/spawn-evaluate
AdamGS Feb 14, 2025
322c72a
nicer version
AdamGS Feb 14, 2025
b4a5b08
.
AdamGS Feb 14, 2025
a90fd9d
.
AdamGS Feb 14, 2025
96e119d
.
AdamGS Feb 14, 2025
aead48a
.
AdamGS Feb 14, 2025
be57a8f
some work, need to unifiy to one spawn
AdamGS Feb 14, 2025
c744f90
.
AdamGS Feb 14, 2025
bd4c6ff
Merge branch 'develop' into adamg/spawn-evaluate
AdamGS Feb 14, 2025
6dbab42
.
AdamGS Feb 14, 2025
defc115
VortexResult::flatten -> unnest (#2361)
AdamGS Feb 14, 2025
45b39a1
.
AdamGS Feb 14, 2025
fadcc6d
.
AdamGS Feb 14, 2025
8a127ec
Merge branch 'develop' into adamg/spawn-evaluate
AdamGS Feb 14, 2025
bf14120
.
AdamGS Feb 14, 2025
539bacb
.
AdamGS Feb 14, 2025
6032cbe
go back to higher default concurrency, not sure how to pick that number
AdamGS Feb 14, 2025
625789d
.
AdamGS Feb 14, 2025
3e91ef8
.
AdamGS Feb 14, 2025
8c568e9
Merge branch 'develop' into adamg/spawn-evaluate
AdamGS Feb 14, 2025
d6381e4
.
AdamGS Feb 17, 2025
cab3012
Merge branch 'develop' into adamg/spawn-evaluate
AdamGS Feb 17, 2025
df579e9
.
AdamGS Feb 17, 2025
78f4eb0
.
AdamGS Feb 17, 2025
41aa3d1
Merge branch 'develop' into adamg/spawn-evaluate
AdamGS Feb 17, 2025
a643a5d
Merge branch 'develop' into adamg/spawn-evaluate
AdamGS Feb 18, 2025
b1133f1
cleanup ScanExecutor, move some tests to rstest
AdamGS Feb 18, 2025
169bea9
Merge branch 'develop' into adamg/spawn-evaluate
AdamGS Feb 18, 2025
f210bbc
Merge branch 'develop' into adamg/spawn-evaluate
AdamGS Feb 18, 2025
5e171f2
Merge branch 'develop' into adamg/spawn-evaluate
gatesn Feb 18, 2025
f3ecbb7
typo
AdamGS Feb 18, 2025
32067e1
CR comments
AdamGS Feb 18, 2025
ea6103c
.
AdamGS Feb 18, 2025
a775f89
remove unused code
AdamGS Feb 18, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vortex-datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ vortex-error = { workspace = true, features = ["datafusion"] }
vortex-expr = { workspace = true, features = ["datafusion"] }
vortex-file = { workspace = true, features = ["object_store", "tokio"] }
vortex-io = { workspace = true, features = ["object_store", "tokio"] }
vortex-layout = { workspace = true }
vortex-layout = { workspace = true, features = ["tokio"] }
[features]
tracing = ["dep:tracing", "dep:tracing-futures", "vortex-io/tracing"]

Expand Down
4 changes: 4 additions & 0 deletions vortex-datafusion/src/persistent/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener
use datafusion_common::Result as DFResult;
use futures::{FutureExt as _, StreamExt};
use object_store::{ObjectStore, ObjectStoreScheme};
use tokio::runtime::Handle;
use vortex_array::{ContextRef, IntoArrayVariant};
use vortex_error::VortexResult;
use vortex_expr::{ExprRef, VortexExpr};
use vortex_file::executor::{TaskExecutor, TokioExecutor};
use vortex_file::{SplitBy, VortexOpenOptions};
use vortex_io::ObjectStoreReadAt;

Expand Down Expand Up @@ -65,6 +67,7 @@ impl FileOpener for VortexFileOpener {
let object_store = self.object_store.clone();
let projected_arrow_schema = self.projected_arrow_schema.clone();
let batch_size = self.batch_size;
let executor = TaskExecutor::Tokio(TokioExecutor::new(Handle::current()));

Ok(async move {
let vxf = VortexOpenOptions::file(read_at)
Expand All @@ -86,6 +89,7 @@ impl FileOpener for VortexFileOpener {
// but at the moment our scanner has too much overhead to process small
// batches efficiently.
.with_split_by(SplitBy::RowCount(8 * batch_size))
.with_task_executor(executor)
.into_array_stream()?
.map(move |array| {
let st = array?.into_struct()?;
Expand Down
1 change: 1 addition & 0 deletions vortex-error/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ rancor = { workspace = true, optional = true }
thiserror = { workspace = true }
url = { workspace = true }
worker = { workspace = true, optional = true }
tokio = { workspace = true, optional = true, features = ["rt"] }

[lints]
workspace = true
8 changes: 8 additions & 0 deletions vortex-error/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,14 @@ pub enum VortexError {
#[backtrace]
TryFromIntError,
),
/// A wrapper for Tokio join error.
#[cfg(feature = "tokio")]
#[error(transparent)]
JoinError(
#[from]
#[backtrace]
tokio::task::JoinError,
),
}

impl VortexError {
Expand Down
6 changes: 4 additions & 2 deletions vortex-layout/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,20 @@ bit-vec = { workspace = true }
bytes = { workspace = true }
exponential-decay-histogram = { workspace = true }
flatbuffers = { workspace = true }
futures = { workspace = true, features = ["alloc"] }
flume = { workspace = true }
Comment thread
AdamGS marked this conversation as resolved.
futures = { workspace = true, features = ["alloc", "executor"] }
itertools = { workspace = true }
log = { workspace = true }
pin-project-lite = { workspace = true }
vortex-array = { workspace = true }
vortex-buffer = { workspace = true }
vortex-dtype = { workspace = true }
vortex-error = { workspace = true }
vortex-error = { workspace = true, features = ["tokio"] }
vortex-expr = { workspace = true }
vortex-flatbuffers = { workspace = true, features = ["layout"] }
vortex-mask = { workspace = true }
vortex-scalar = { workspace = true }
tokio = { workspace = true, optional = true }

[dev-dependencies]
futures = { workspace = true, features = ["executor"] }
Expand Down
2 changes: 1 addition & 1 deletion vortex-layout/src/layouts/chunked/eval_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ mod test {
],
)
.unwrap();
(ScanExecutor::inline(Arc::new(segments)), layout)
(ScanExecutor::new(Arc::new(segments)), layout)
}

#[test]
Expand Down
8 changes: 4 additions & 4 deletions vortex-layout/src/layouts/flat/eval_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl ExprEvaluator for FlatReader {
tasks.push(ScanTask::Expr(expr));
}

self.executor().evaluate(&array, &tasks).await
self.executor().evaluate(&array, &tasks)
}

async fn prune_mask(&self, row_mask: RowMask, _expr: ExprRef) -> VortexResult<RowMask> {
Expand Down Expand Up @@ -72,7 +72,7 @@ mod test {
.unwrap();

let result = layout
.reader(ScanExecutor::inline(Arc::new(segments)), Default::default())
.reader(ScanExecutor::new(Arc::new(segments)), Default::default())
.unwrap()
.evaluate_expr(
RowMask::new_valid_between(0, layout.row_count()),
Expand All @@ -98,7 +98,7 @@ mod test {

let expr = gt(Identity::new_expr(), lit(3i32));
let result = layout
.reader(ScanExecutor::inline(Arc::new(segments)), Default::default())
.reader(ScanExecutor::new(Arc::new(segments)), Default::default())
.unwrap()
.evaluate_expr(RowMask::new_valid_between(0, layout.row_count()), expr)
.await
Expand All @@ -123,7 +123,7 @@ mod test {
.unwrap();

let result = layout
.reader(ScanExecutor::inline(Arc::new(segments)), Default::default())
.reader(ScanExecutor::new(Arc::new(segments)), Default::default())
.unwrap()
.evaluate_expr(RowMask::new_valid_between(2, 4), ident())
.await
Expand Down
2 changes: 1 addition & 1 deletion vortex-layout/src/layouts/flat/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ mod tests {
.unwrap();

let result = layout
.reader(ScanExecutor::inline(Arc::new(segments)), Default::default())
.reader(ScanExecutor::new(Arc::new(segments)), Default::default())
.unwrap()
.evaluate_expr(RowMask::new_valid_between(0, layout.row_count()), ident())
.await
Expand Down
3 changes: 1 addition & 2 deletions vortex-layout/src/layouts/struct_/eval_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ impl ExprEvaluator for StructReader {

self.executor()
.evaluate(&root_scope, &[ScanTask::Expr(partitioned.root.clone())])
.await
}

async fn prune_mask(&self, row_mask: RowMask, expr: ExprRef) -> VortexResult<RowMask> {
Expand Down Expand Up @@ -128,7 +127,7 @@ mod tests {
.map(IntoArray::into_array)],
)
.unwrap();
(ScanExecutor::inline(Arc::new(segments)), layout)
(ScanExecutor::new(Arc::new(segments)), layout)
}

#[test]
Expand Down
49 changes: 49 additions & 0 deletions vortex-layout/src/scan/executor/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#[cfg(feature = "tokio")]
mod tokio;

mod threads;

use std::future::Future;

use futures::future::BoxFuture;
pub use threads::*;
#[cfg(feature = "tokio")]
pub use tokio::*;
use vortex_error::VortexResult;

pub trait Executor {
/// Spawns a future to run on a different runtime. The shouldn't be polled to make progress.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix up doc

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LMK what you think of the new one

fn spawn<F>(&self, f: F) -> VortexResult<BoxFuture<'static, VortexResult<F::Output>>>

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to return a result? Can it just be BoxFuture<'static, VortexResult<..>>? Since the impl can always return ready(Err(...)) if it needs to.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don' think we want to return ready(Err(...)) here, but if we're ok with panicking in cases where we can't submit work/tasks start getting cancelled, we can get the nicer function signature.

where
F: Future + Send + 'static,
<F as Future>::Output: Send + 'static;
}

/// Generic wrapper around different async runtimes. Used to spawn async tasks to run in the background, concurrently with other tasks.
#[derive(Clone)]
pub enum TaskExecutor {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder what the performance hit is to just have a dyn Executor? Seems like a lot of code here for basically |f| tokio::spawn(f)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IDK if we can have dyn Executor here without a major change due to the generics

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You would have to take / return BoxFuture. You already return one

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think its that much code (probably less than what happens in Arc), but in order to make it dyn-compatible we'll also have to move the generic bound from the function to the trait itself (example), which makes this thing less useful in other cases IMO.

Threads(ThreadsExecutor),
#[cfg(feature = "tokio")]
Tokio(TokioExecutor),
}

pub enum TaskExecutorMode {
Comment thread
AdamGS marked this conversation as resolved.
Outdated
Threads,
#[cfg(feature = "tokio")]
Tokio,
}

#[async_trait::async_trait]
impl Executor for TaskExecutor {
fn spawn<F>(&self, f: F) -> VortexResult<BoxFuture<'static, VortexResult<F::Output>>>
where
F: Future + Send + 'static,
<F as Future>::Output: Send + 'static,
{
match self {
TaskExecutor::Threads(threads_executor) => threads_executor.spawn(f),
#[cfg(feature = "tokio")]
TaskExecutor::Tokio(tokio_executor) => tokio_executor.spawn(f),
}
}
}
96 changes: 96 additions & 0 deletions vortex-layout/src/scan/executor/threads.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use std::future::Future;
use std::num::NonZeroUsize;
use std::sync::Arc;

use futures::channel::oneshot;
use futures::future::BoxFuture;
use futures::{FutureExt as _, TryFutureExt as _};
use vortex_error::{vortex_err, VortexResult};

use super::Executor;

trait Task {
fn run(self: Box<Self>);
}

struct ExecutorTask<F, R> {
task: F,
result: oneshot::Sender<R>,
}

impl<F, R> Task for ExecutorTask<F, R>

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

definitely not taken from @a10y 😇

where
F: Future<Output = R> + Send,
R: Send,
{
fn run(self: Box<Self>) {
let Self { task, result } = *self;
futures::executor::block_on(async move {
let output = task.await;
_ = result.send(output);
})
}
}

/// Multithreaded task executor, runs tasks on a dedicated thread pool.
#[derive(Clone, Default)]
pub struct ThreadsExecutor {
inner: Arc<Inner>,
}

impl ThreadsExecutor {
pub fn new(num_threads: NonZeroUsize) -> Self {
Self {
inner: Arc::new(Inner::new(num_threads)),
}
}
}

struct Inner {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This structure is needed to achieve the behavior we want, where arbitrary futures can be spawned on this runtime. I've tried to structure it differently but I honestly think this is pretty nice, and we can definitely try and improve that later.

submitter: flume::Sender<Box<dyn Task + Send>>,
}

impl Default for Inner {
fn default() -> Self {
// Safety:
// 1 isn't 0
Self::new(unsafe { NonZeroUsize::new_unchecked(1) })
}
}

impl Inner {
fn new(num_threads: NonZeroUsize) -> Self {
let (tx, rx) = flume::unbounded::<Box<dyn Task + Send>>();
(0..num_threads.get()).for_each(|_| {
let rx = rx.clone();
std::thread::spawn(move || {
// The channel errors if all senders are dropped, which means we probably don't care about the task anymore,
// and we can break and let the thread end.
while let Ok(task) = rx.recv() {
task.run()
}
});
});

Self { submitter: tx }
}
}

impl Executor for ThreadsExecutor {
fn spawn<F>(&self, f: F) -> VortexResult<BoxFuture<'static, VortexResult<F::Output>>>
where
F: Future + Send + 'static,
<F as Future>::Output: Send + 'static,
{
let (tx, rx) = oneshot::channel();
let task = Box::new(ExecutorTask {
task: f,
result: tx,
});
self.inner
.submitter
.send(task)
.map_err(|e| vortex_err!("Failed to submit work: {e}"))?;
Ok(rx.map_err(|e| vortex_err!("Future canceled: {e}")).boxed())
}
}
29 changes: 29 additions & 0 deletions vortex-layout/src/scan/executor/tokio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use std::future::Future;

use futures::future::BoxFuture;
use futures::{FutureExt, TryFutureExt};
use tokio::runtime::Handle;
use vortex_error::{VortexError, VortexResult};

use super::Executor;

/// Tokio-based async task executor, runs task on the provided runtime.
#[derive(Clone)]
pub struct TokioExecutor(Handle);

impl TokioExecutor {
pub fn new(handle: Handle) -> Self {
Self(handle)
}
}

#[async_trait::async_trait]
impl Executor for TokioExecutor {
fn spawn<F>(&self, f: F) -> VortexResult<BoxFuture<'static, VortexResult<F::Output>>>
where
F: Future + Send + 'static,
<F as Future>::Output: Send + 'static,
{
Ok(self.0.spawn(f).map_err(VortexError::from).boxed())
}
}
Loading