From 30ffdf9d682b500c5eff5ecbde323a31b1006a3e Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Sun, 14 Dec 2025 15:40:18 +0800 Subject: [PATCH 1/3] Previously, equality delete predicates in DeleteFilter were case-insensitive. This change adds optional case-sensitive matching when binding equality delete predicates. - Updated build_equality_delete_predicate to handle case-sensitive option. - Added unit tests for case-sensitive equality deletes. --- .../src/arrow/caching_delete_file_loader.rs | 1 + crates/iceberg/src/arrow/delete_filter.rs | 74 ++++++++++++++++++- crates/iceberg/src/arrow/reader.rs | 15 ++++ crates/iceberg/src/scan/context.rs | 5 ++ crates/iceberg/src/scan/mod.rs | 2 + crates/iceberg/src/scan/task.rs | 3 + 6 files changed, 98 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs index 250fc5e8d9..aceeae49f7 100644 --- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs +++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs @@ -911,6 +911,7 @@ mod tests { partition: None, partition_spec: None, name_mapping: None, + case_sensitive: false, }; // Load the deletes - should handle both types without error diff --git a/crates/iceberg/src/arrow/delete_filter.rs b/crates/iceberg/src/arrow/delete_filter.rs index 14b5124ee6..8c365b6d93 100644 --- a/crates/iceberg/src/arrow/delete_filter.rs +++ b/crates/iceberg/src/arrow/delete_filter.rs @@ -141,8 +141,7 @@ impl DeleteFilter { return Ok(None); } - // TODO: handle case-insensitive case - let bound_predicate = combined_predicate.bind(file_scan_task.schema.clone(), false)?; + let bound_predicate = combined_predicate.bind(file_scan_task.schema.clone(), file_scan_task.case_sensitive)?; Ok(Some(bound_predicate)) } @@ -344,6 +343,7 @@ pub(crate) mod tests { partition: None, partition_spec: None, name_mapping: None, + case_sensitive: false, }, FileScanTask { start: 0, @@ -358,6 +358,7 @@ pub(crate) mod tests { partition: None, partition_spec: None, name_mapping: None, + case_sensitive: false, }, ]; @@ -380,4 +381,73 @@ pub(crate) mod tests { ]; Arc::new(arrow_schema::Schema::new(fields)) } + + #[cfg(test)] + mod tests { + use super::*; + use std::sync::Arc; + use crate::expr::{Reference}; + use crate::scan::{FileScanTask, FileScanTaskDeleteFile}; + use crate::spec::{NestedField, PrimitiveType, Type}; + use crate::spec::{DataContentType, Datum, Schema}; + + #[tokio::test] + async fn test_build_equality_delete_predicate_case_sensitive() { + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required( + 1, + "Id", + Type::Primitive(PrimitiveType::Long), + ) + .into(), + ]) + .build() + .unwrap(), + ); + + // ---------- fake FileScanTask ---------- + let task = FileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: "data.parquet".to_string(), + data_file_format: crate::spec::DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![], + predicate: None, + deletes: vec![FileScanTaskDeleteFile { + file_path: "eq-del.parquet".to_string(), + file_type: DataContentType::EqualityDeletes, + partition_spec_id: 0, + equality_ids: None, + }], + partition: None, + partition_spec: None, + name_mapping: None, + case_sensitive: true, + }; + + let filter = DeleteFilter::default(); + + // ---------- insert equality delete predicate ---------- + let pred = Reference::new("id") + .equal_to(Datum::long(10)); + + let (tx, rx) = tokio::sync::oneshot::channel(); + filter.insert_equality_delete("eq-del.parquet", rx); + + tx.send(pred).unwrap(); + + // ---------- should FAIL ---------- + let result = filter.build_equality_delete_predicate(&task).await; + + assert!( + result.is_err(), + "case_sensitive=true should fail when column case mismatches" + ); + } + } } diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 6209c1e261..5ddbaec82c 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -2082,6 +2082,7 @@ message schema { partition: None, partition_spec: None, name_mapping: None, + case_sensitive: false, })] .into_iter(), )) as FileScanTaskStream; @@ -2403,6 +2404,7 @@ message schema { partition: None, partition_spec: None, name_mapping: None, + case_sensitive: false }; // Task 2: read the second and third row groups @@ -2419,6 +2421,7 @@ message schema { partition: None, partition_spec: None, name_mapping: None, + case_sensitive: false }; let tasks1 = Box::pin(futures::stream::iter(vec![Ok(task1)])) as FileScanTaskStream; @@ -2546,6 +2549,7 @@ message schema { partition: None, partition_spec: None, name_mapping: None, + case_sensitive: false })] .into_iter(), )) as FileScanTaskStream; @@ -2717,6 +2721,7 @@ message schema { partition: None, partition_spec: None, name_mapping: None, + case_sensitive: false }; let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream; @@ -2934,6 +2939,7 @@ message schema { partition: None, partition_spec: None, name_mapping: None, + case_sensitive: false }; let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream; @@ -3144,6 +3150,7 @@ message schema { partition: None, partition_spec: None, name_mapping: None, + case_sensitive: false, }; let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream; @@ -3247,6 +3254,7 @@ message schema { partition: None, partition_spec: None, name_mapping: None, + case_sensitive: false, })] .into_iter(), )) as FileScanTaskStream; @@ -3344,6 +3352,7 @@ message schema { partition: None, partition_spec: None, name_mapping: None, + case_sensitive: false, })] .into_iter(), )) as FileScanTaskStream; @@ -3430,6 +3439,7 @@ message schema { partition: None, partition_spec: None, name_mapping: None, + case_sensitive: false })] .into_iter(), )) as FileScanTaskStream; @@ -3530,6 +3540,7 @@ message schema { partition: None, partition_spec: None, name_mapping: None, + case_sensitive: false })] .into_iter(), )) as FileScanTaskStream; @@ -3659,6 +3670,7 @@ message schema { partition: None, partition_spec: None, name_mapping: None, + case_sensitive: false })] .into_iter(), )) as FileScanTaskStream; @@ -3755,6 +3767,7 @@ message schema { partition: None, partition_spec: None, name_mapping: None, + case_sensitive: false, })] .into_iter(), )) as FileScanTaskStream; @@ -3864,6 +3877,7 @@ message schema { partition: None, partition_spec: None, name_mapping: None, + case_sensitive: false })] .into_iter(), )) as FileScanTaskStream; @@ -4003,6 +4017,7 @@ message schema { partition: Some(partition_data), partition_spec: Some(partition_spec), name_mapping: None, + case_sensitive: false, })] .into_iter(), )) as FileScanTaskStream; diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs index fe3f5c8f7e..da7fb46312 100644 --- a/crates/iceberg/src/scan/context.rs +++ b/crates/iceberg/src/scan/context.rs @@ -46,6 +46,7 @@ pub(crate) struct ManifestFileContext { snapshot_schema: SchemaRef, expression_evaluator_cache: Arc, delete_file_index: DeleteFileIndex, + case_sensitive: bool, } /// Wraps a [`ManifestEntryRef`] alongside the objects that are needed @@ -59,6 +60,7 @@ pub(crate) struct ManifestEntryContext { pub partition_spec_id: i32, pub snapshot_schema: SchemaRef, pub delete_file_index: DeleteFileIndex, + pub case_sensitive: bool, } impl ManifestFileContext { @@ -89,6 +91,7 @@ impl ManifestFileContext { bound_predicates: bound_predicates.clone(), snapshot_schema: snapshot_schema.clone(), delete_file_index: delete_file_index.clone(), + case_sensitive: self.case_sensitive, }; sender @@ -135,6 +138,7 @@ impl ManifestEntryContext { partition_spec: None, // TODO: Extract name_mapping from table metadata property "schema.name-mapping.default" name_mapping: None, + case_sensitive: self.case_sensitive, }) } } @@ -267,6 +271,7 @@ impl PlanContext { field_ids: self.field_ids.clone(), expression_evaluator_cache: self.expression_evaluator_cache.clone(), delete_file_index, + case_sensitive: self.case_sensitive, } } } diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index d83da8a879..9ec61dcd7d 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -1794,6 +1794,7 @@ pub mod tests { partition: None, partition_spec: None, name_mapping: None, + case_sensitive: false, }; test_fn(task); @@ -1811,6 +1812,7 @@ pub mod tests { partition: None, partition_spec: None, name_mapping: None, + case_sensitive: false, }; test_fn(task); } diff --git a/crates/iceberg/src/scan/task.rs b/crates/iceberg/src/scan/task.rs index e1ef241a57..5349a9bdd2 100644 --- a/crates/iceberg/src/scan/task.rs +++ b/crates/iceberg/src/scan/task.rs @@ -104,6 +104,9 @@ pub struct FileScanTask { #[serde(serialize_with = "serialize_not_implemented")] #[serde(deserialize_with = "deserialize_not_implemented")] pub name_mapping: Option>, + + /// Whether this scan task should treat column names as case-sensitive when binding predicates. + pub case_sensitive: bool, } impl FileScanTask { From a15289f3d01d70268534b870fe0bcca79a50beed Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Sun, 14 Dec 2025 15:43:57 +0800 Subject: [PATCH 2/3] Fix CheckStyle Issue. --- crates/iceberg/src/arrow/delete_filter.rs | 21 ++++++++------------- crates/iceberg/src/arrow/reader.rs | 18 +++++++++--------- 2 files changed, 17 insertions(+), 22 deletions(-) diff --git a/crates/iceberg/src/arrow/delete_filter.rs b/crates/iceberg/src/arrow/delete_filter.rs index 8c365b6d93..2dc6bd4b5a 100644 --- a/crates/iceberg/src/arrow/delete_filter.rs +++ b/crates/iceberg/src/arrow/delete_filter.rs @@ -141,7 +141,8 @@ impl DeleteFilter { return Ok(None); } - let bound_predicate = combined_predicate.bind(file_scan_task.schema.clone(), file_scan_task.case_sensitive)?; + let bound_predicate = combined_predicate + .bind(file_scan_task.schema.clone(), file_scan_task.case_sensitive)?; Ok(Some(bound_predicate)) } @@ -384,12 +385,12 @@ pub(crate) mod tests { #[cfg(test)] mod tests { - use super::*; use std::sync::Arc; - use crate::expr::{Reference}; + + use super::*; + use crate::expr::Reference; use crate::scan::{FileScanTask, FileScanTaskDeleteFile}; - use crate::spec::{NestedField, PrimitiveType, Type}; - use crate::spec::{DataContentType, Datum, Schema}; + use crate::spec::{DataContentType, Datum, NestedField, PrimitiveType, Schema, Type}; #[tokio::test] async fn test_build_equality_delete_predicate_case_sensitive() { @@ -397,12 +398,7 @@ pub(crate) mod tests { Schema::builder() .with_schema_id(1) .with_fields(vec![ - NestedField::required( - 1, - "Id", - Type::Primitive(PrimitiveType::Long), - ) - .into(), + NestedField::required(1, "Id", Type::Primitive(PrimitiveType::Long)).into(), ]) .build() .unwrap(), @@ -433,8 +429,7 @@ pub(crate) mod tests { let filter = DeleteFilter::default(); // ---------- insert equality delete predicate ---------- - let pred = Reference::new("id") - .equal_to(Datum::long(10)); + let pred = Reference::new("id").equal_to(Datum::long(10)); let (tx, rx) = tokio::sync::oneshot::channel(); filter.insert_equality_delete("eq-del.parquet", rx); diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 5ddbaec82c..f7f90663a5 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -2404,7 +2404,7 @@ message schema { partition: None, partition_spec: None, name_mapping: None, - case_sensitive: false + case_sensitive: false, }; // Task 2: read the second and third row groups @@ -2421,7 +2421,7 @@ message schema { partition: None, partition_spec: None, name_mapping: None, - case_sensitive: false + case_sensitive: false, }; let tasks1 = Box::pin(futures::stream::iter(vec![Ok(task1)])) as FileScanTaskStream; @@ -2549,7 +2549,7 @@ message schema { partition: None, partition_spec: None, name_mapping: None, - case_sensitive: false + case_sensitive: false, })] .into_iter(), )) as FileScanTaskStream; @@ -2721,7 +2721,7 @@ message schema { partition: None, partition_spec: None, name_mapping: None, - case_sensitive: false + case_sensitive: false, }; let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream; @@ -2939,7 +2939,7 @@ message schema { partition: None, partition_spec: None, name_mapping: None, - case_sensitive: false + case_sensitive: false, }; let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream; @@ -3439,7 +3439,7 @@ message schema { partition: None, partition_spec: None, name_mapping: None, - case_sensitive: false + case_sensitive: false, })] .into_iter(), )) as FileScanTaskStream; @@ -3540,7 +3540,7 @@ message schema { partition: None, partition_spec: None, name_mapping: None, - case_sensitive: false + case_sensitive: false, })] .into_iter(), )) as FileScanTaskStream; @@ -3670,7 +3670,7 @@ message schema { partition: None, partition_spec: None, name_mapping: None, - case_sensitive: false + case_sensitive: false, })] .into_iter(), )) as FileScanTaskStream; @@ -3877,7 +3877,7 @@ message schema { partition: None, partition_spec: None, name_mapping: None, - case_sensitive: false + case_sensitive: false, })] .into_iter(), )) as FileScanTaskStream; From e711f8b4a30f5fd0092e67848dfa08dd79b68d11 Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Sun, 14 Dec 2025 17:20:04 +0800 Subject: [PATCH 3/3] Fix CheckStyle Issue. --- crates/iceberg/src/arrow/delete_filter.rs | 103 ++++++++++------------ 1 file changed, 47 insertions(+), 56 deletions(-) diff --git a/crates/iceberg/src/arrow/delete_filter.rs b/crates/iceberg/src/arrow/delete_filter.rs index 2dc6bd4b5a..d05e028997 100644 --- a/crates/iceberg/src/arrow/delete_filter.rs +++ b/crates/iceberg/src/arrow/delete_filter.rs @@ -211,8 +211,9 @@ pub(crate) mod tests { use super::*; use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader; + use crate::expr::Reference; use crate::io::FileIO; - use crate::spec::{DataFileFormat, Schema}; + use crate::spec::{DataFileFormat, Datum, NestedField, PrimitiveType, Schema, Type}; type ArrowSchemaRef = Arc; @@ -383,66 +384,56 @@ pub(crate) mod tests { Arc::new(arrow_schema::Schema::new(fields)) } - #[cfg(test)] - mod tests { - use std::sync::Arc; - - use super::*; - use crate::expr::Reference; - use crate::scan::{FileScanTask, FileScanTaskDeleteFile}; - use crate::spec::{DataContentType, Datum, NestedField, PrimitiveType, Schema, Type}; - - #[tokio::test] - async fn test_build_equality_delete_predicate_case_sensitive() { - let schema = Arc::new( - Schema::builder() - .with_schema_id(1) - .with_fields(vec![ - NestedField::required(1, "Id", Type::Primitive(PrimitiveType::Long)).into(), - ]) - .build() - .unwrap(), - ); - - // ---------- fake FileScanTask ---------- - let task = FileScanTask { - start: 0, - length: 0, - record_count: None, - data_file_path: "data.parquet".to_string(), - data_file_format: crate::spec::DataFileFormat::Parquet, - schema: schema.clone(), - project_field_ids: vec![], - predicate: None, - deletes: vec![FileScanTaskDeleteFile { - file_path: "eq-del.parquet".to_string(), - file_type: DataContentType::EqualityDeletes, - partition_spec_id: 0, - equality_ids: None, - }], - partition: None, - partition_spec: None, - name_mapping: None, - case_sensitive: true, - }; + #[tokio::test] + async fn test_build_equality_delete_predicate_case_sensitive() { + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "Id", Type::Primitive(PrimitiveType::Long)).into(), + ]) + .build() + .unwrap(), + ); + + // ---------- fake FileScanTask ---------- + let task = FileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: "data.parquet".to_string(), + data_file_format: crate::spec::DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![], + predicate: None, + deletes: vec![FileScanTaskDeleteFile { + file_path: "eq-del.parquet".to_string(), + file_type: DataContentType::EqualityDeletes, + partition_spec_id: 0, + equality_ids: None, + }], + partition: None, + partition_spec: None, + name_mapping: None, + case_sensitive: true, + }; - let filter = DeleteFilter::default(); + let filter = DeleteFilter::default(); - // ---------- insert equality delete predicate ---------- - let pred = Reference::new("id").equal_to(Datum::long(10)); + // ---------- insert equality delete predicate ---------- + let pred = Reference::new("id").equal_to(Datum::long(10)); - let (tx, rx) = tokio::sync::oneshot::channel(); - filter.insert_equality_delete("eq-del.parquet", rx); + let (tx, rx) = tokio::sync::oneshot::channel(); + filter.insert_equality_delete("eq-del.parquet", rx); - tx.send(pred).unwrap(); + tx.send(pred).unwrap(); - // ---------- should FAIL ---------- - let result = filter.build_equality_delete_predicate(&task).await; + // ---------- should FAIL ---------- + let result = filter.build_equality_delete_predicate(&task).await; - assert!( - result.is_err(), - "case_sensitive=true should fail when column case mismatches" - ); - } + assert!( + result.is_err(), + "case_sensitive=true should fail when column case mismatches" + ); } }