Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
308d804
.
AdamGS Feb 13, 2025
6978c28
.
AdamGS Feb 13, 2025
bea1dbc
.
AdamGS Feb 13, 2025
29e1da2
enable feature
AdamGS Feb 13, 2025
c78ed3a
.
AdamGS Feb 13, 2025
ef2bf80
Merge branch 'develop' into adamg/spawn-evaluate
AdamGS Feb 14, 2025
322c72a
nicer version
AdamGS Feb 14, 2025
b4a5b08
.
AdamGS Feb 14, 2025
a90fd9d
.
AdamGS Feb 14, 2025
96e119d
.
AdamGS Feb 14, 2025
aead48a
.
AdamGS Feb 14, 2025
be57a8f
some work, need to unify to one spawn
AdamGS Feb 14, 2025
c744f90
.
AdamGS Feb 14, 2025
bd4c6ff
Merge branch 'develop' into adamg/spawn-evaluate
AdamGS Feb 14, 2025
6dbab42
.
AdamGS Feb 14, 2025
defc115
VortexResult::flatten -> unnest (#2361)
AdamGS Feb 14, 2025
45b39a1
.
AdamGS Feb 14, 2025
fadcc6d
.
AdamGS Feb 14, 2025
8a127ec
Merge branch 'develop' into adamg/spawn-evaluate
AdamGS Feb 14, 2025
bf14120
.
AdamGS Feb 14, 2025
539bacb
.
AdamGS Feb 14, 2025
6032cbe
go back to higher default concurrency, not sure how to pick that number
AdamGS Feb 14, 2025
625789d
.
AdamGS Feb 14, 2025
3e91ef8
.
AdamGS Feb 14, 2025
8c568e9
Merge branch 'develop' into adamg/spawn-evaluate
AdamGS Feb 14, 2025
d6381e4
.
AdamGS Feb 17, 2025
cab3012
Merge branch 'develop' into adamg/spawn-evaluate
AdamGS Feb 17, 2025
df579e9
.
AdamGS Feb 17, 2025
78f4eb0
.
AdamGS Feb 17, 2025
41aa3d1
Merge branch 'develop' into adamg/spawn-evaluate
AdamGS Feb 17, 2025
a643a5d
Merge branch 'develop' into adamg/spawn-evaluate
AdamGS Feb 18, 2025
b1133f1
cleanup ScanExecutor, move some tests to rstest
AdamGS Feb 18, 2025
169bea9
Merge branch 'develop' into adamg/spawn-evaluate
AdamGS Feb 18, 2025
f210bbc
Merge branch 'develop' into adamg/spawn-evaluate
AdamGS Feb 18, 2025
5e171f2
Merge branch 'develop' into adamg/spawn-evaluate
gatesn Feb 18, 2025
f3ecbb7
typo
AdamGS Feb 18, 2025
32067e1
CR comments
AdamGS Feb 18, 2025
ea6103c
.
AdamGS Feb 18, 2025
a775f89
remove unused code
AdamGS Feb 18, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vortex-datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ vortex-error = { workspace = true, features = ["datafusion"] }
vortex-expr = { workspace = true, features = ["datafusion"] }
vortex-file = { workspace = true, features = ["object_store", "tokio"] }
vortex-io = { workspace = true, features = ["object_store", "tokio"] }
vortex-layout = { workspace = true }
vortex-layout = { workspace = true, features = ["tokio"] }
[features]
tracing = ["dep:tracing", "dep:tracing-futures", "vortex-io/tracing"]

Expand Down
4 changes: 4 additions & 0 deletions vortex-datafusion/src/persistent/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener
use datafusion_common::Result as DFResult;
use futures::{FutureExt as _, StreamExt};
use object_store::{ObjectStore, ObjectStoreScheme};
use tokio::runtime::Handle;
use vortex_array::{ContextRef, IntoArrayVariant};
use vortex_error::VortexResult;
use vortex_expr::{ExprRef, VortexExpr};
use vortex_file::executor::{TaskExecutor, TokioExecutor};
use vortex_file::{SplitBy, VortexOpenOptions};
use vortex_io::ObjectStoreReadAt;

Expand Down Expand Up @@ -65,6 +67,7 @@ impl FileOpener for VortexFileOpener {
let object_store = self.object_store.clone();
let projected_arrow_schema = self.projected_arrow_schema.clone();
let batch_size = self.batch_size;
let executor = TaskExecutor::Tokio(TokioExecutor::new(Handle::current()));

Ok(async move {
let vxf = VortexOpenOptions::file(read_at)
Expand All @@ -86,6 +89,7 @@ impl FileOpener for VortexFileOpener {
// but at the moment our scanner has too much overhead to process small
// batches efficiently.
.with_split_by(SplitBy::RowCount(8 * batch_size))
.with_task_executor(executor)
.into_array_stream()?
.map(move |array| {
let st = array?.into_struct()?;
Expand Down
26 changes: 0 additions & 26 deletions vortex-file/src/exec/inline.rs

This file was deleted.

23 changes: 0 additions & 23 deletions vortex-file/src/exec/mod.rs

This file was deleted.

51 changes: 0 additions & 51 deletions vortex-file/src/exec/mode.rs

This file was deleted.

31 changes: 0 additions & 31 deletions vortex-file/src/exec/tokio.rs

This file was deleted.

1 change: 0 additions & 1 deletion vortex-file/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@
//! buffers, and [cloud storage](vortex_io::ObjectStoreReadAt), can be used as the "linear and
//! contiguous memory".

pub mod exec;
mod file;
mod footer;
mod generic;
Expand Down
6 changes: 4 additions & 2 deletions vortex-layout/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,20 @@ bit-vec = { workspace = true }
bytes = { workspace = true }
exponential-decay-histogram = { workspace = true }
flatbuffers = { workspace = true }
futures = { workspace = true, features = ["alloc"] }
flume = { workspace = true }
futures = { workspace = true, features = ["alloc", "executor"] }
itertools = { workspace = true }
log = { workspace = true }
pin-project-lite = { workspace = true }
vortex-array = { workspace = true }
vortex-buffer = { workspace = true }
vortex-dtype = { workspace = true }
vortex-error = { workspace = true }
vortex-error = { workspace = true, features = ["tokio"] }
vortex-expr = { workspace = true }
vortex-flatbuffers = { workspace = true, features = ["layout"] }
vortex-mask = { workspace = true }
vortex-scalar = { workspace = true }
tokio = { workspace = true, optional = true }

[dev-dependencies]
futures = { workspace = true, features = ["executor"] }
Expand Down
7 changes: 3 additions & 4 deletions vortex-layout/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ use vortex_flatbuffers::{layout as fb, layout, FlatBufferRoot, WriteFlatBuffer};

use crate::context::LayoutContextRef;
use crate::reader::LayoutReader;
use crate::scan::ScanExecutor;
use crate::segments::SegmentId;
use crate::segments::{AsyncSegmentReader, SegmentId};
use crate::vtable::LayoutVTableRef;
use crate::LayoutId;

Expand Down Expand Up @@ -292,10 +291,10 @@ impl Layout {
/// Create a reader for this layout.
pub fn reader(
&self,
executor: Arc<ScanExecutor>,
segment_reader: Arc<dyn AsyncSegmentReader>,
ctx: ContextRef,
) -> VortexResult<Arc<dyn LayoutReader + 'static>> {
self.encoding().reader(self.clone(), ctx, executor)
self.encoding().reader(self.clone(), ctx, segment_reader)
}

/// Register splits for this layout.
Expand Down
23 changes: 13 additions & 10 deletions vortex-layout/src/layouts/chunked/eval_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ mod test {
use std::sync::Arc;

use futures::executor::block_on;
use rstest::{fixture, rstest};
use vortex_array::array::{BoolArray, ChunkedArray, ConstantArray};
use vortex_array::{IntoArray, IntoArrayVariant};
use vortex_buffer::buffer;
Expand All @@ -154,13 +155,14 @@ mod test {
use vortex_expr::{gt, lit, Identity};

use crate::layouts::chunked::writer::ChunkedLayoutWriter;
use crate::scan::ScanExecutor;
use crate::segments::test::TestSegments;
use crate::segments::AsyncSegmentReader;
use crate::writer::LayoutWriterExt;
use crate::{Layout, RowMask};

#[fixture]
/// Create a chunked layout with three chunks of primitive arrays.
fn chunked_layout() -> (Arc<ScanExecutor>, Layout) {
fn chunked_layout() -> (Arc<dyn AsyncSegmentReader>, Layout) {
let mut segments = TestSegments::default();
let layout = ChunkedLayoutWriter::new(
&DType::Primitive(PType::I32, NonNullable),
Expand All @@ -175,14 +177,14 @@ mod test {
],
)
.unwrap();
(ScanExecutor::inline(Arc::new(segments)), layout)
(Arc::new(segments), layout)
}

#[test]
fn test_chunked_evaluator() {
#[rstest]
fn test_chunked_evaluator(
#[from(chunked_layout)] (segments, layout): (Arc<dyn AsyncSegmentReader>, Layout),
) {
block_on(async {
let (segments, layout) = chunked_layout();

let result = layout
.reader(segments, Default::default())
.unwrap()
Expand All @@ -200,10 +202,11 @@ mod test {
})
}

#[test]
fn test_chunked_pruning_mask() {
#[rstest]
fn test_chunked_pruning_mask(
#[from(chunked_layout)] (segments, layout): (Arc<dyn AsyncSegmentReader>, Layout),
) {
block_on(async {
let (segments, layout) = chunked_layout();
let row_count = layout.row_count();
let reader = layout.reader(segments, Default::default()).unwrap();

Expand Down
6 changes: 3 additions & 3 deletions vortex-layout/src/layouts/chunked/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use vortex_error::VortexResult;
use crate::data::Layout;
use crate::layouts::chunked::reader::ChunkedReader;
use crate::reader::{LayoutReader, LayoutReaderExt};
use crate::scan::ScanExecutor;
use crate::segments::AsyncSegmentReader;
use crate::vtable::LayoutVTable;
use crate::{LayoutId, CHUNKED_LAYOUT_ID};

Expand All @@ -33,9 +33,9 @@ impl LayoutVTable for ChunkedLayout {
&self,
layout: Layout,
ctx: ContextRef,
executor: Arc<ScanExecutor>,
segment_reader: Arc<dyn AsyncSegmentReader>,
) -> VortexResult<Arc<dyn LayoutReader>> {
Ok(ChunkedReader::try_new(layout, ctx, executor)?.into_arc())
Ok(ChunkedReader::try_new(layout, ctx, segment_reader)?.into_arc())
}

fn register_splits(
Expand Down
12 changes: 6 additions & 6 deletions vortex-layout/src/layouts/chunked/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use vortex_mask::Mask;
use crate::layouts::chunked::stats_table::StatsTable;
use crate::layouts::chunked::ChunkedLayout;
use crate::reader::LayoutReader;
use crate::scan::ScanExecutor;
use crate::segments::AsyncSegmentReader;
use crate::{ExprEvaluator, Layout, LayoutVTable, RowMask};

type PruningCache = Arc<OnceCell<Option<Mask>>>;
Expand All @@ -23,7 +23,7 @@ type PruningCache = Arc<OnceCell<Option<Mask>>>;
pub struct ChunkedReader {
layout: Layout,
ctx: ContextRef,
executor: Arc<ScanExecutor>,
segment_reader: Arc<dyn AsyncSegmentReader>,

/// A cache of expr -> optional pruning result (applying the pruning expr to the stats table)
pruning_result: Arc<RwLock<HashMap<ExprRef, PruningCache>>>,
Expand All @@ -39,7 +39,7 @@ impl ChunkedReader {
pub(super) fn try_new(
layout: Layout,
ctx: ContextRef,
executor: Arc<ScanExecutor>,
segment_reader: Arc<dyn AsyncSegmentReader>,
) -> VortexResult<Self> {
if layout.encoding().id() != ChunkedLayout.id() {
vortex_panic!("Mismatched layout ID")
Expand Down Expand Up @@ -71,7 +71,7 @@ impl ChunkedReader {
Ok(Self {
layout,
ctx,
executor,
segment_reader,
pruning_result: Arc::new(RwLock::new(HashMap::new())),
stats_table: Arc::new(OnceCell::new()),
chunk_readers,
Expand Down Expand Up @@ -101,7 +101,7 @@ impl ChunkedReader {
let stats_layout = self.layout.child(nchunks, stats_dtype.clone(), "stats")?;

let stats_array = stats_layout
.reader(self.executor.clone(), self.ctx.clone())?
.reader(self.segment_reader.clone(), self.ctx.clone())?
.evaluate_expr(
RowMask::new_valid_between(0, nchunks as u64),
Identity::new_expr(),
Expand Down Expand Up @@ -161,7 +161,7 @@ impl ChunkedReader {
let child_layout =
self.layout
.child(idx, self.layout.dtype().clone(), format!("[{}]", idx))?;
child_layout.reader(self.executor.clone(), self.ctx.clone())
child_layout.reader(self.segment_reader.clone(), self.ctx.clone())
})
}

Expand Down
Loading
Loading