Merged
Changes from 5 commits
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

87 changes: 87 additions & 0 deletions protos/filtered_read.proto
@@ -0,0 +1,87 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

syntax = "proto3";

package lance.datafusion;

import "table_identifier.proto";

message U64Range {
uint64 start = 1;
uint64 end = 2;
}

message ProjectionProto {
repeated int32 field_ids = 1;
bool with_row_id = 2;
bool with_row_addr = 3;
bool with_row_last_updated_at_version = 4;
bool with_row_created_at_version = 5;
BlobHandlingProto blob_handling = 6;
}

message BlobHandlingProto {
oneof mode {
// All blobs read as binary
bool all_binary = 1;
// Blobs as descriptions, other binary as binary (default)
bool blobs_descriptions = 2;
// All binary columns as descriptions
bool all_descriptions = 3;
// Specific blobs read as binary, rest as descriptions (non-blob binary stays binary)
FieldIdSet some_blobs_binary = 4;
// Specific columns as binary, all other binary as descriptions
FieldIdSet some_binary = 5;
}
}

message FieldIdSet {
repeated uint32 field_ids = 1;
}

message FilteredReadThreadingModeProto {
oneof mode {
uint64 one_partition_multiple_threads = 1;
uint64 multiple_partitions = 2;
}
}

// Serializable form of FilteredReadOptions.
message FilteredReadOptionsProto {
optional U64Range scan_range_before_filter = 1;
optional U64Range scan_range_after_filter = 2;
bool with_deleted_rows = 3;
optional uint32 batch_size = 4;
optional uint64 fragment_readahead = 5;
repeated uint64 fragment_ids = 6;
ProjectionProto projection = 7;
optional bytes refine_filter_substrait = 8;
optional bytes full_filter_substrait = 9;
FilteredReadThreadingModeProto threading_mode = 10;
optional uint64 io_buffer_size_bytes = 11;
// Arrow IPC schema for decoding Substrait filters (may be wider than projection).
optional bytes filter_schema_ipc = 12;
}

// Serializable form of FilteredReadPlan (planned/distributed mode).
// RowAddrTreeMap serialized via its built-in serialize_into/deserialize_from.
// Per-fragment filters are Substrait-encoded and deduplicated.
message FilteredReadPlanProto {
bytes row_addr_tree_map = 1;

Contributor

TL;DR does it make sense to make this a oneof over bytes and repeated U64Range? Then we could just use the smaller representation on serialization (easy computation).

In my plan_splits PR I merged FilteredReadPlan and FilteredReadInternalPlan because, IIUC, the only reason we differentiate them is that serializing a bitmap is smaller than serializing ranges. I would challenge that, under the intuition (or maybe "hope") that we have many dense splits where the serialization is actually smaller for ranges (e.g. for a fully selected run of 4k rows, a range is 16 bytes (u64 start, u64 end), whereas a plain bitmap is 4096 / 8 = 512 bytes). If that is the case, we would do a series of inefficient serializations to move to a bitmap for every query.
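A minimal sketch of the size comparison behind this suggestion, assuming ranges are encoded as two u64 values each and the bitmap is a plain one-bit-per-row bitmap. The function and enum names are hypothetical, and a real RoaringBitmap may compress dense runs far better than the plain bitmap modelled here:

```rust
/// Bytes needed to encode `n_ranges` ranges as (u64 start, u64 end) pairs.
fn ranges_encoded_bytes(n_ranges: usize) -> usize {
    n_ranges * 2 * std::mem::size_of::<u64>()
}

/// Bytes needed for a plain, uncompressed bitmap over `n_rows` rows.
/// Assumption: one bit per row; Roaring containers can be smaller.
fn plain_bitmap_bytes(n_rows: usize) -> usize {
    (n_rows + 7) / 8
}

/// Hypothetical tag for whichever representation is smaller on the wire.
#[derive(Debug, PartialEq)]
enum Encoding {
    Ranges,
    Bitmap,
}

/// Pick the smaller representation; ties go to ranges.
fn smaller_encoding(n_ranges: usize, n_rows: usize) -> Encoding {
    if ranges_encoded_bytes(n_ranges) <= plain_bitmap_bytes(n_rows) {
        Encoding::Ranges
    } else {
        Encoding::Bitmap
    }
}
```

Under these assumptions a single dense run favors ranges, while a few dozen scattered ranges over the same 4096 rows already favor the bitmap.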

Contributor Author

Keeping FilteredReadPlan as a separate, external plan is not mainly about serialization / deserialization. I keep FilteredReadPlan as the external plan, backed by RoaringBitmap, for the following reasons:

  • Range is ambiguous in Lance. Most user-facing APIs use logical rows (without deletions). Vec<Range> here means physical rows (with deletions). This is confusing. RowAddrTreeMap/RoaringBitmap avoids that ambiguity because it's clearly a set of physical row addresses, not a "range" that could be misread as logical.
  • Ranges fragment badly with deletions. A dataset with scattered deletions turns a single range 0..4096 into many small ranges. The Vec<Range> grows proportionally to the number of deletion holes. RoaringBitmap handles this natively with compressed containers.
  • Bitmap enables efficient row unselection. Whether it's 4K row group pruning or custom index results, you need to remove specific rows from the selection. Flipping bits is O(1); splitting ranges is O(n) and produces more ranges.
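The fragmentation and unselection points can be illustrated with a small sketch. It uses only the standard library: a Vec<u64> bitset stands in for RoaringBitmap, and both helper names are hypothetical rather than anything from Lance:

```rust
use std::ops::Range;

/// Remove one physical row from a sorted, non-overlapping list of ranges.
/// The range containing `row` is split, so the list can grow by one entry.
fn remove_row(ranges: &[Range<u64>], row: u64) -> Vec<Range<u64>> {
    let mut out = Vec::with_capacity(ranges.len() + 1);
    for r in ranges {
        if r.contains(&row) {
            if r.start < row {
                out.push(r.start..row);
            }
            if row + 1 < r.end {
                out.push(row + 1..r.end);
            }
        } else {
            out.push(r.clone());
        }
    }
    out
}

/// Clear one row in a plain bitset (a stand-in for a Roaring container).
/// This touches a single word regardless of how many rows were removed before.
fn clear_bit(words: &mut [u64], row: u64) {
    words[(row / 64) as usize] &= !(1u64 << (row % 64));
}
```

Removing k scattered rows from 0..4096 leaves up to k + 1 ranges, whereas the bitset keeps its fixed size.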

Member

Hmm, I don't have a good sense for where the trade-offs are for the two approaches but it sounds like you've both thought about it. My preference is to move forward with what we've got written and revisit if serialization time seems to be a bottleneck?

optional U64Range scan_range_after_filter = 2;
// Arrow IPC schema for decoding Substrait filters (matches the schema used at encode time).
optional bytes filter_schema_ipc = 3;
// Deduplicated filter storage: frag_id → index into filter_expressions.
map<uint32, uint32> fragment_filter_ids = 4;

Member

Can you expand on what the key and value represent? I think the key is fragment id and the value is an index into filter_expressions? Also, I assume that multiple keys can share the same value?

// Unique Substrait-encoded filter expressions (indexed by fragment_filter_ids values).
repeated bytes filter_expressions = 5;
}
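One way to read the fragment_filter_ids / filter_expressions pair above is sketched here. It assumes deduplication is done by comparing the encoded bytes, which the diff does not confirm, and dedup_filters is a hypothetical helper:

```rust
use std::collections::HashMap;

/// Build the two deduplicated structures from per-fragment encoded filters.
/// Returns (fragment id -> index into expressions, unique expressions).
/// Assumption: two filters are "the same" when their encoded bytes match.
fn dedup_filters(per_fragment: &[(u32, Vec<u8>)]) -> (HashMap<u32, u32>, Vec<Vec<u8>>) {
    let mut fragment_filter_ids = HashMap::new();
    let mut filter_expressions: Vec<Vec<u8>> = Vec::new();
    // Maps already-seen encoded bytes to their index in filter_expressions.
    let mut seen: HashMap<&[u8], u32> = HashMap::new();

    for (frag_id, bytes) in per_fragment {
        let idx = *seen.entry(bytes.as_slice()).or_insert_with(|| {
            filter_expressions.push(bytes.clone());
            (filter_expressions.len() - 1) as u32
        });
        fragment_filter_ids.insert(*frag_id, idx);
    }
    (fragment_filter_ids, filter_expressions)
}
```

With this layout several fragment ids can map to the same index, so a filter shared by many fragments is stored once.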

// Top-level wrapper for FilteredReadExec serialization.
message FilteredReadExecProto {
TableIdentifier table = 1;
FilteredReadOptionsProto options = 2;
optional FilteredReadPlanProto plan = 3;

Member

Can you explain why this is optional?

Contributor Author

FilteredReadExec can run in two modes:

  1. Plan-then-execute (distributed): The planner creates a FilteredReadPlan and sends it to a remote executor. plan is Some.
  2. Plan-and-execute (local): The executor creates the plan itself at execution time. No pre-computed plan is needed. plan is None. The only use case I can think of is when you know that one executor has good locality for a dataset, so you would probably send it the whole plan and let it execute?
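A minimal sketch of how a receiver might branch on the optional plan field. Plan, ExecMessage, Mode, and resolve_plan are hypothetical stand-ins for illustration, not the types in this PR:

```rust
/// Hypothetical stand-in for a decoded FilteredReadPlan.
#[derive(Debug, Clone, PartialEq)]
struct Plan {
    fragment_ids: Vec<u64>,
}

/// Hypothetical stand-in for the decoded exec message.
struct ExecMessage {
    plan: Option<Plan>,
}

#[derive(Debug, PartialEq)]
enum Mode {
    /// A planner shipped a pre-computed plan (plan is Some).
    PlanThenExecute,
    /// No plan was shipped; the executor plans at execution time (plan is None).
    PlanAndExecute,
}

/// Use the shipped plan if present, otherwise fall back to local planning.
fn resolve_plan(msg: &ExecMessage, plan_locally: impl FnOnce() -> Plan) -> (Mode, Plan) {
    match &msg.plan {
        Some(p) => (Mode::PlanThenExecute, p.clone()),
        None => (Mode::PlanAndExecute, plan_locally()),
    }
}
```

The closure is only invoked in the local case, so a shipped plan never triggers planning on the executor.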

// index_input (child plan) handled by DataFusion's codec via inputs[]

Member

I'm not entirely sure what this means

@LuQQiu LuQQiu Feb 17, 2026

Contributor Author

// Top-level wrapper for FilteredReadExec serialization.
message FilteredReadExecProto {
TableIdentifier table = 1;
FilteredReadOptionsProto options = 2;
optional FilteredReadPlanProto plan = 3;
// Note: FilteredReadExec.index_input (child ExecutionPlan) is NOT serialized here.
// DataFusion's PhysicalExtensionCodec serializes child plans separately via the
// plan tree traversal (children() / with_new_children()), and passes them back
// as the inputs parameter in try_decode.
}

Basically, DataFusion hands the deserialized index input to FilteredReadExec when constructing it, so we don't need to deal with that here.
Oh, but if the index_input also needs to be sent over the network, then those execs will need to be serializable as well. I could add a comment there.

PhysicalExtensionCodec's try_encode and try_decode methods are also needed ...
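The division of labor described in this thread can be sketched with a deliberately simplified codec. MiniCodec and Node below are hypothetical stand-ins, not DataFusion's PhysicalExtensionCodec or ExecutionPlan, and the real signatures differ; the sketch only shows that a node encodes its own state and gets its children handed back as inputs on decode:

```rust
use std::sync::Arc;

/// Hypothetical, simplified plan node (not `dyn ExecutionPlan`).
#[derive(Debug, PartialEq)]
struct Node {
    name: String,
    own_state: Vec<u8>,
    children: Vec<Arc<Node>>,
}

/// Hypothetical stand-in for an extension codec.
trait MiniCodec {
    fn try_encode(&self, node: &Node, buf: &mut Vec<u8>) -> Result<(), String>;
    fn try_decode(&self, buf: &[u8], inputs: &[Arc<Node>]) -> Result<Node, String>;
}

struct FilteredReadCodec;

impl MiniCodec for FilteredReadCodec {
    fn try_encode(&self, node: &Node, buf: &mut Vec<u8>) -> Result<(), String> {
        if node.name != "FilteredRead" {
            return Err(format!("unsupported node: {}", node.name));
        }
        // Only the node's own state is written; children are not serialized here.
        buf.extend_from_slice(&node.own_state);
        Ok(())
    }

    fn try_decode(&self, buf: &[u8], inputs: &[Arc<Node>]) -> Result<Node, String> {
        if inputs.len() > 1 {
            return Err(format!("expected at most one input, got {}", inputs.len()));
        }
        // Children arrive already decoded and are simply re-attached.
        Ok(Node {
            name: "FilteredRead".to_string(),
            own_state: buf.to_vec(),
            children: inputs.to_vec(),
        })
    }
}
```

In this toy model the child node would need its own codec support to cross the network, which matches the caveat raised in the comment.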

}
18 changes: 18 additions & 0 deletions protos/table_identifier.proto
@@ -0,0 +1,18 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

syntax = "proto3";

package lance.datafusion;

// Identifies a Lance dataset for remote reconstruction.
//
// Two modes:
// 1. uri + serialized_manifest (fast): remote executor skips manifest read.
// 2. uri + version + etag (lightweight): remote executor loads manifest from storage.
message TableIdentifier {
string uri = 1;
uint64 version = 2;
optional string manifest_etag = 3;
optional bytes serialized_manifest = 4;

Contributor Author

@wjones127 do you think we need to serialize the manifest_etag and even send the serialized manifest itself?

}
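The two modes in the TableIdentifier comment could be resolved on the receiving side roughly as follows. This is a sketch under the assumption that the presence of serialized_manifest selects the fast path; OpenStrategy and open_strategy are hypothetical names, and the struct mirrors the proto fields by hand rather than using generated code:

```rust
/// Hand-written mirror of the TableIdentifier proto fields.
struct TableIdentifier {
    uri: String,
    version: u64,
    manifest_etag: Option<String>,
    serialized_manifest: Option<Vec<u8>>,
}

/// Hypothetical description of how the remote executor would open the dataset.
#[derive(Debug, PartialEq)]
enum OpenStrategy<'a> {
    /// Mode 1 (fast): the manifest was shipped inline, so no manifest read is needed.
    UseInlineManifest { uri: &'a str, manifest: &'a [u8] },
    /// Mode 2 (lightweight): load the manifest for `version` from storage.
    LoadFromStorage {
        uri: &'a str,
        version: u64,
        expected_etag: Option<&'a str>,
    },
}

/// Assumption: an inline manifest, when present, takes precedence.
fn open_strategy(id: &TableIdentifier) -> OpenStrategy<'_> {
    match id.serialized_manifest.as_deref() {
        Some(manifest) => OpenStrategy::UseInlineManifest { uri: &id.uri, manifest },
        None => OpenStrategy::LoadFromStorage {
            uri: &id.uri,
            version: id.version,
            expected_etag: id.manifest_etag.as_deref(),
        },
    }
}
```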
1 change: 1 addition & 0 deletions python/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rust/lance-core/src/datatypes/schema.rs
@@ -1024,7 +1024,7 @@ impl Projectable for Schema {
}

/// Specifies how to handle blob columns when projecting
-#[derive(Debug, Clone, Default)]
+#[derive(Debug, Clone, Default, PartialEq)]
pub enum BlobHandling {
/// Read all blobs as binary
AllBinary,
5 changes: 5 additions & 0 deletions rust/lance-datafusion/Cargo.toml
@@ -36,11 +36,16 @@ snafu.workspace = true
tokio.workspace = true
tracing.workspace = true

[build-dependencies]
prost-build.workspace = true
protobuf-src = {version = "2.1", optional = true}

[dev-dependencies]
lance-datagen.workspace = true

[features]
substrait = ["dep:datafusion-substrait"]
protoc = ["dep:protobuf-src"]

[lints]
workspace = true
25 changes: 25 additions & 0 deletions rust/lance-datafusion/build.rs
@@ -0,0 +1,25 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::io::Result;

fn main() -> Result<()> {
println!("cargo:rerun-if-changed=protos");

#[cfg(feature = "protoc")]
// Use vendored protobuf compiler if requested.
std::env::set_var("PROTOC", protobuf_src::protoc());

let mut prost_build = prost_build::Config::new();
prost_build.protoc_arg("--experimental_allow_proto3_optional");
prost_build.enable_type_names();
prost_build.compile_protos(
&[
"./protos/table_identifier.proto",
"./protos/filtered_read.proto",
],
&["./protos"],
)?;

Ok(())
}
1 change: 1 addition & 0 deletions rust/lance-datafusion/protos
11 changes: 11 additions & 0 deletions rust/lance-datafusion/src/lib.rs
@@ -10,6 +10,17 @@ pub mod expr;
pub mod logical_expr;
pub mod planner;
pub mod projection;
pub mod pb {
#![allow(clippy::all)]
#![allow(non_upper_case_globals)]
#![allow(non_camel_case_types)]
#![allow(non_snake_case)]
#![allow(unused)]
#![allow(improper_ctypes)]
#![allow(clippy::upper_case_acronyms)]
#![allow(clippy::use_self)]
include!(concat!(env!("OUT_DIR"), "/lance.datafusion.rs"));
}
pub mod spill;
pub mod sql;
#[cfg(feature = "substrait")]
27 changes: 26 additions & 1 deletion rust/lance-datafusion/src/substrait.rs
@@ -1,7 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

-use arrow_schema::Schema as ArrowSchema;
+use arrow_schema::{DataType, Schema as ArrowSchema};
use datafusion::{execution::SessionState, logical_expr::Expr};

use crate::aggregate::Aggregate;
@@ -27,6 +27,31 @@ use snafu::location;
use std::collections::HashMap;
use std::sync::Arc;

/// Substrait doesn't yet support all data types.
fn is_substrait_compatible(data_type: &DataType) -> bool {
match data_type {
DataType::Null | DataType::FixedSizeList(_, _) | DataType::Float16 => false,

Contributor Author

@westonpace do you know if this is still the limitation for substrait?

Member

Pretty sure this is correct though maybe there is support for Float16 now in DF if I recall correctly.

DataType::List(inner) => is_substrait_compatible(inner.data_type()),
DataType::Struct(fields) => fields
.iter()
.all(|f| is_substrait_compatible(f.data_type())),
_ => true,
}
}

/// Removes top-level fields that contain data types that Substrait
/// is not capable of serializing.
pub fn prune_schema_for_substrait(schema: &ArrowSchema) -> ArrowSchema {
ArrowSchema::new(
schema
.fields()
.iter()
.filter(|f| is_substrait_compatible(f.data_type()))
.cloned()
.collect::<Vec<_>>(),
)
}

/// Convert a DF Expr into a Substrait ExtendedExpressions message
///
/// The schema needs to contain all of the fields that are referenced in the expression.
2 changes: 2 additions & 0 deletions rust/lance/src/io/exec.rs
@@ -7,6 +7,8 @@

mod filter;
pub mod filtered_read;
#[cfg(feature = "substrait")]
pub mod filtered_read_proto;
pub mod fts;
pub(crate) mod knn;
mod optimizer;
5 changes: 5 additions & 0 deletions rust/lance/src/io/exec/filtered_read.rs
@@ -1751,6 +1751,11 @@ impl FilteredReadExec {
pub fn index_input(&self) -> Option<&Arc<dyn ExecutionPlan>> {
self.index_input.as_ref()
}

/// Return the pre-computed plan if one exists, without triggering initialization.
pub fn plan(&self) -> Option<FilteredReadPlan> {
self.plan.get().map(|p| p.to_external_plan())
}
}

impl DisplayAs for FilteredReadExec {