2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

99 changes: 99 additions & 0 deletions protos/filtered_read.proto
@@ -0,0 +1,99 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

syntax = "proto3";

package lance.datafusion;

import "table_identifier.proto";

message U64Range {
uint64 start = 1;
uint64 end = 2;
}

message ProjectionProto {
repeated int32 field_ids = 1;
bool with_row_id = 2;
bool with_row_addr = 3;
bool with_row_last_updated_at_version = 4;
bool with_row_created_at_version = 5;
BlobHandlingProto blob_handling = 6;
}

message BlobHandlingProto {
oneof mode {
// All blobs read as binary
bool all_binary = 1;
// Blobs as descriptions, other binary as binary (default)
bool blobs_descriptions = 2;
// All binary columns as descriptions
bool all_descriptions = 3;
// Specific blobs read as binary, rest as descriptions (non-blob binary stays binary)
FieldIdSet some_blobs_binary = 4;
// Specific columns as binary, all other binary as descriptions
FieldIdSet some_binary = 5;
}
}

message FieldIdSet {
repeated uint32 field_ids = 1;
}

message FilteredReadThreadingModeProto {
oneof mode {
uint64 one_partition_multiple_threads = 1;
uint64 multiple_partitions = 2;
}
}

// Serializable form of FilteredReadOptions.
message FilteredReadOptionsProto {
optional U64Range scan_range_before_filter = 1;
optional U64Range scan_range_after_filter = 2;
bool with_deleted_rows = 3;
optional uint32 batch_size = 4;
optional uint64 fragment_readahead = 5;
repeated uint64 fragment_ids = 6;
ProjectionProto projection = 7;
optional bytes refine_filter_substrait = 8;
optional bytes full_filter_substrait = 9;
FilteredReadThreadingModeProto threading_mode = 10;
optional uint64 io_buffer_size_bytes = 11;
// Arrow IPC schema for decoding Substrait filters (may be wider than projection).
optional bytes filter_schema_ipc = 12;
}

// Serializable form of FilteredReadPlan (planned/distributed mode).
// RowAddrTreeMap serialized via its built-in serialize_into/deserialize_from.
// Per-fragment filters are Substrait-encoded and deduplicated.
message FilteredReadPlanProto {
bytes row_addr_tree_map = 1;
Contributor:

TL;DR does it make sense to make this a oneof over bytes and repeated U64Range? Then we could just use the smaller representation on serialization (easy computation).

In my plan_splits PR I merged FilteredReadPlan and FilteredReadInternalPlan because, IIUC, the only reason we differentiate them is that serializing a bitmap is smaller than serializing ranges. I would challenge that under the intuition (or maybe "hope") that we have many dense splits where the serialization is actually smaller for ranges (e.g. for all 4k values, a single range is 16 bytes (u64 start, u64 end), whereas a bitmap is 4096 / 8 = 512 bytes). If this is the case, then we would do a series of inefficient serializations to move to bitmap for every query.

Contributor Author:

The reason for keeping FilteredReadPlan as an external plan is mainly not serialization / deserialization. I keep FilteredReadPlan as the external plan, backed by RoaringBitmap, for the following reasons (a rough size sketch follows the list):

  • Range is ambiguous in Lance. Most user-facing APIs use logical rows (without deletions). Vec<Range> here means physical rows (with deletions). This is confusing. RowAddrTreeMap/RoaringBitmap avoids that ambiguity because it's clearly a set of physical row addresses, not a "range" that could be misread as logical.
  • Ranges fragment badly with deletions. A dataset with scattered deletions turns a single range 0..4096 into many small ranges. The Vec<Range> grows proportionally to the number of deletion holes. RoaringBitmap handles this natively with compressed containers.
  • Bitmap enables efficient row unselection. Whether it's 4K row group pruning or custom index results, you need to remove specific rows from the selection. Flipping bits is O(1); splitting ranges is O(n) and produces more ranges.
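
A rough back-of-envelope sketch of the size trade-off, assuming the roaring crate's RoaringTreemap API (serialized_size / serialize_into) as a stand-in for RowAddrTreeMap's built-in serialization; the printed numbers are illustrative, not benchmarks:

use roaring::RoaringTreemap;

fn main() {
    // Dense selection of rows 0..4096: one contiguous range is just two u64s.
    let range_bytes = 2 * std::mem::size_of::<u64>(); // 16 bytes
    let dense: RoaringTreemap = (0u64..4096).collect();
    println!(
        "dense 0..4096 -> range: {} B, bitmap: {} B",
        range_bytes,
        dense.serialized_size()
    );

    // Scattered deletions (every 7th row removed): the range encoding now needs
    // roughly one (start, end) pair per hole, while the bitmap stays one buffer.
    let holey: RoaringTreemap = (0u64..4096).filter(|i| i % 7 != 0).collect();
    let approx_ranges = 4096 / 7;
    println!(
        "with holes -> ranges: ~{} B, bitmap: {} B",
        approx_ranges * range_bytes,
        holey.serialized_size()
    );
}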

Member:

Hmm, I don't have a good sense for where the trade-offs are for the two approaches but it sounds like you've both thought about it. My preference is to move forward with what we've got written and revisit if serialization time seems to be a bottleneck?

optional U64Range scan_range_after_filter = 2;
// Arrow IPC schema for decoding Substrait filters (matches the schema used at encode time).
optional bytes filter_schema_ipc = 3;
// Per-fragment filter mapping. Key is fragment id, value is a list index into
// filter_expressions. Multiple fragments can share the same list index when
// they have the same filter, avoiding duplicate Substrait encoding.
map<uint32, uint32> fragment_filter_ids = 4;
// Deduplicated Substrait-encoded filter expressions. Each entry is referenced
// by one or more values in fragment_filter_ids.
repeated bytes filter_expressions = 5;
}
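
To make the fragment_filter_ids / filter_expressions layout concrete, here is a small sketch of how the dedup map could be populated; the helper name and input shape are invented, only the map/list layout comes from the message above:

use std::collections::HashMap;

/// Build the (fragment id -> index) map and the deduplicated expression list
/// from per-fragment Substrait-encoded filters.
fn dedup_filters(per_fragment: &[(u32, Vec<u8>)]) -> (HashMap<u32, u32>, Vec<Vec<u8>>) {
    let mut filter_expressions: Vec<Vec<u8>> = Vec::new();
    let mut index_by_bytes: HashMap<Vec<u8>, u32> = HashMap::new();
    let mut fragment_filter_ids: HashMap<u32, u32> = HashMap::new();
    for (frag_id, encoded) in per_fragment {
        let idx = *index_by_bytes.entry(encoded.clone()).or_insert_with(|| {
            // First time we see this filter: append it and record its index.
            filter_expressions.push(encoded.clone());
            (filter_expressions.len() - 1) as u32
        });
        fragment_filter_ids.insert(*frag_id, idx);
    }
    (fragment_filter_ids, filter_expressions)
}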

// Top-level wrapper for FilteredReadExec serialization.
message FilteredReadExecProto {
TableIdentifier table = 1;
FilteredReadOptionsProto options = 2;
// FilteredRead has two modes
// Plan-then-execute (distributed): The planner creates a FilteredReadPlan and sends it to a remote executor.
// Plan-and-execute (local): The executor creates the plan itself at execution time.
optional FilteredReadPlanProto plan = 3;
Member:

Can you explain why this is optional?

Contributor Author:

FilteredReadExec can run in two modes:

  1. Plan-then-execute (distributed): The planner creates a FilteredReadPlan and sends it to a remote executor. plan is Some.
  2. Plan-and-execute (local): The executor creates the plan itself at execution time. No pre-computed plan is needed, so plan is None. The main case I can think of for sending a pre-computed plan is when you know one executor has good locality for the dataset, so you send it the whole plan to execute.

// Note: FilteredReadExec.index_input (child ExecutionPlan) is NOT serialized here.
// DataFusion's PhysicalExtensionCodec handles child plans automatically: it walks
// the plan tree via children() / with_new_children(), serializes each node, and
// passes deserialized children back as the `inputs` parameter in try_decode.
// This means any ExecutionPlan in the tree (including index_input) must also
// implement try_encode/try_decode in the PhysicalExtensionCodec.
// TODO: implement serialize/deserialize for lance-specific index input ExecutionPlans.
}
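
To make the note above concrete, here is a hypothetical sketch of the codec wiring: the FilteredReadExecCodec name, the module paths, and the to_proto / from_proto helpers are invented for illustration, while the PhysicalExtensionCodec trait and its try_encode / try_decode signatures come from datafusion-proto.

use std::sync::Arc;

use datafusion::error::{DataFusionError, Result};
use datafusion::execution::FunctionRegistry;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use prost::Message;

// Assumed paths: FilteredReadExec lives in the lance crate, the generated protos in lance-datafusion.
use lance::io::exec::filtered_read::FilteredReadExec;
use lance_datafusion::pb;

#[derive(Debug)]
struct FilteredReadExecCodec;

impl PhysicalExtensionCodec for FilteredReadExecCodec {
    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
        // Only this node is encoded; DataFusion walks children() and serializes
        // index_input (if present) through the same codec on its own.
        let exec = node
            .as_any()
            .downcast_ref::<FilteredReadExec>()
            .ok_or_else(|| DataFusionError::Internal("not a FilteredReadExec".into()))?;
        let proto: pb::FilteredReadExecProto = exec.to_proto()?; // hypothetical helper
        proto
            .encode(buf)
            .map_err(|e| DataFusionError::External(Box::new(e)))
    }

    fn try_decode(
        &self,
        buf: &[u8],
        inputs: &[Arc<dyn ExecutionPlan>],
        _registry: &dyn FunctionRegistry,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let proto = pb::FilteredReadExecProto::decode(buf)
            .map_err(|e| DataFusionError::External(Box::new(e)))?;
        // `inputs` already holds the deserialized children, i.e. index_input
        // when the original node had one.
        let index_input = inputs.first().cloned();
        let exec = FilteredReadExec::from_proto(proto, index_input)?; // hypothetical helper
        Ok(Arc::new(exec))
    }
}
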
18 changes: 18 additions & 0 deletions protos/table_identifier.proto
@@ -0,0 +1,18 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

syntax = "proto3";

package lance.datafusion;

// Identifies a Lance dataset for remote reconstruction.
//
// Two modes:
// 1. uri + serialized_manifest (fast): remote executor skips manifest read.
// 2. uri + version + etag (lightweight): remote executor loads manifest from storage.
message TableIdentifier {
string uri = 1;
uint64 version = 2;
optional string manifest_etag = 3;
optional bytes serialized_manifest = 4;
Contributor Author:

@wjones127 do you think we need to serialize the manifest_etag and even send the serialized manifest itself?

}
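
A hypothetical sketch of the two modes described above, using the generated TableIdentifier type; the field values are illustrative:

use lance_datafusion::pb;

// Mode 1 (fast): ship the serialized manifest so the remote executor skips the manifest read.
fn with_manifest(uri: &str, version: u64, manifest_bytes: Vec<u8>) -> pb::TableIdentifier {
    pb::TableIdentifier {
        uri: uri.to_string(),
        version,
        manifest_etag: None,
        serialized_manifest: Some(manifest_bytes),
    }
}

// Mode 2 (lightweight): only uri + version (+ optional etag); the executor loads the manifest from storage.
fn by_version(uri: &str, version: u64, etag: Option<String>) -> pb::TableIdentifier {
    pb::TableIdentifier {
        uri: uri.to_string(),
        version,
        manifest_etag: etag,
        serialized_manifest: None,
    }
}
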
1 change: 1 addition & 0 deletions python/Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion rust/lance-core/src/datatypes/schema.rs
@@ -1024,7 +1024,7 @@ impl Projectable for Schema {
}

/// Specifies how to handle blob columns when projecting
#[derive(Debug, Clone, Default)]
#[derive(Debug, Clone, Default, PartialEq)]
pub enum BlobHandling {
/// Read all blobs as binary
AllBinary,
5 changes: 5 additions & 0 deletions rust/lance-datafusion/Cargo.toml
@@ -36,11 +36,16 @@ snafu.workspace = true
tokio.workspace = true
tracing.workspace = true

[build-dependencies]
prost-build.workspace = true
protobuf-src = {version = "2.1", optional = true}

[dev-dependencies]
lance-datagen.workspace = true

[features]
substrait = ["dep:datafusion-substrait"]
protoc = ["dep:protobuf-src"]

[lints]
workspace = true
25 changes: 25 additions & 0 deletions rust/lance-datafusion/build.rs
@@ -0,0 +1,25 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::io::Result;

fn main() -> Result<()> {
println!("cargo:rerun-if-changed=protos");

#[cfg(feature = "protoc")]
// Use vendored protobuf compiler if requested.
std::env::set_var("PROTOC", protobuf_src::protoc());

let mut prost_build = prost_build::Config::new();
prost_build.protoc_arg("--experimental_allow_proto3_optional");
prost_build.enable_type_names();
prost_build.compile_protos(
&[
"./protos/table_identifier.proto",
"./protos/filtered_read.proto",
],
&["./protos"],
)?;

Ok(())
}
1 change: 1 addition & 0 deletions rust/lance-datafusion/protos
11 changes: 11 additions & 0 deletions rust/lance-datafusion/src/lib.rs
@@ -10,6 +10,17 @@ pub mod expr;
pub mod logical_expr;
pub mod planner;
pub mod projection;
pub mod pb {
#![allow(clippy::all)]
#![allow(non_upper_case_globals)]
#![allow(non_camel_case_types)]
#![allow(non_snake_case)]
#![allow(unused)]
#![allow(improper_ctypes)]
#![allow(clippy::upper_case_acronyms)]
#![allow(clippy::use_self)]
include!(concat!(env!("OUT_DIR"), "/lance.datafusion.rs"));
}
pub mod spill;
pub mod sql;
#[cfg(feature = "substrait")]
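
Once this generated module is exposed as lance_datafusion::pb, the messages round-trip through prost's Message derive; a minimal sketch with illustrative field values:

use lance_datafusion::pb;
use prost::Message;

fn main() -> Result<(), prost::DecodeError> {
    // Most fields fall back to proto3 defaults; only a couple are set here.
    let options = pb::FilteredReadOptionsProto {
        batch_size: Some(8192),
        with_deleted_rows: false,
        ..Default::default()
    };
    // encode_to_vec / decode come from the prost::Message impl that
    // prost-build derives for every generated message.
    let bytes = options.encode_to_vec();
    let decoded = pb::FilteredReadOptionsProto::decode(bytes.as_slice())?;
    assert_eq!(options, decoded);
    Ok(())
}
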
67 changes: 66 additions & 1 deletion rust/lance-datafusion/src/substrait.rs
@@ -1,7 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use arrow_schema::Schema as ArrowSchema;
use arrow_schema::{DataType, Schema as ArrowSchema};
use datafusion::{execution::SessionState, logical_expr::Expr};

use crate::aggregate::Aggregate;
@@ -27,6 +27,33 @@ use snafu::location;
use std::collections::HashMap;
use std::sync::Arc;

/// FixedSizeList has no Substrait producer support in datafusion-substrait.
/// Other unsupported types (Null, Float16) are encoded as UserDefined and
/// handled by `remove_extension_types` on the decode side.
fn is_substrait_compatible(data_type: &DataType) -> bool {
match data_type {
DataType::FixedSizeList(_, _) => false,
DataType::List(inner) => is_substrait_compatible(inner.data_type()),
DataType::Struct(fields) => fields
.iter()
.all(|f| is_substrait_compatible(f.data_type())),
_ => true,
}
}

/// Removes top-level fields that contain data types that the Substrait
/// producer cannot encode (currently only FixedSizeList).
pub fn prune_schema_for_substrait(schema: &ArrowSchema) -> ArrowSchema {
ArrowSchema::new(
schema
.fields()
.iter()
.filter(|f| is_substrait_compatible(f.data_type()))
.cloned()
.collect::<Vec<_>>(),
)
}

/// Convert a DF Expr into a Substrait ExtendedExpressions message
///
/// The schema needs to contain all of the fields that are referenced in the expression.
@@ -824,6 +851,44 @@ mod tests {
assert_substrait_roundtrip(schema, id_filter("test-id")).await;
}

#[tokio::test]
async fn test_substrait_roundtrip_with_null_and_float16_columns() {
// Float16 and Null are encoded as UserDefined types in Substrait.
// The decode side (remove_extension_types) strips them and remaps
// field references, so filters on other columns still work.
let schema = Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("embedding", DataType::Float16, true),
Field::new("empty", DataType::Null, true),
Field::new("name", DataType::Utf8, true),
]);

assert_substrait_roundtrip(schema, id_filter("test-id")).await;
}

#[tokio::test]
async fn test_substrait_roundtrip_with_fixed_size_list_column() {
// FixedSizeList has no Substrait producer support, so it must be
// pruned from the schema before encoding. Verify that a schema with
// FSL columns works when the filter references a different column.
use crate::substrait::prune_schema_for_substrait;

let schema = Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new(
"vector",
DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), 128),
true,
),
Field::new("name", DataType::Utf8, true),
]);

// Encoding with the full schema would fail, but pruning removes the FSL column
let pruned = prune_schema_for_substrait(&schema);
assert_eq!(pruned.fields().len(), 2); // id and name only
assert_substrait_roundtrip(pruned, id_filter("test-id")).await;
}

// ==================== Aggregate parsing tests ====================

use datafusion_substrait::substrait::proto::{
2 changes: 2 additions & 0 deletions rust/lance/src/io/exec.rs
@@ -7,6 +7,8 @@

mod filter;
pub mod filtered_read;
#[cfg(feature = "substrait")]
pub mod filtered_read_proto;
pub mod fts;
pub(crate) mod knn;
mod optimizer;
5 changes: 5 additions & 0 deletions rust/lance/src/io/exec/filtered_read.rs
@@ -1751,6 +1751,11 @@ impl FilteredReadExec {
pub fn index_input(&self) -> Option<&Arc<dyn ExecutionPlan>> {
self.index_input.as_ref()
}

/// Return the pre-computed plan if one exists, without triggering initialization.
pub fn plan(&self) -> Option<FilteredReadPlan> {
self.plan.get().map(|p| p.to_external_plan())
}
}

impl DisplayAs for FilteredReadExec {