Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/iceberg/src/arrow/caching_delete_file_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -911,6 +911,7 @@ mod tests {
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
};

// Load the deletes - should handle both types without error
Expand Down
62 changes: 59 additions & 3 deletions crates/iceberg/src/arrow/delete_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ impl DeleteFilter {
return Ok(None);
}

// TODO: handle case-insensitive case
let bound_predicate = combined_predicate.bind(file_scan_task.schema.clone(), false)?;
let bound_predicate = combined_predicate
.bind(file_scan_task.schema.clone(), file_scan_task.case_sensitive)?;
Ok(Some(bound_predicate))
}

Expand Down Expand Up @@ -211,8 +211,9 @@ pub(crate) mod tests {

use super::*;
use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
use crate::expr::Reference;
use crate::io::FileIO;
use crate::spec::{DataFileFormat, Schema};
use crate::spec::{DataFileFormat, Datum, NestedField, PrimitiveType, Schema, Type};

type ArrowSchemaRef = Arc<ArrowSchema>;

Expand Down Expand Up @@ -344,6 +345,7 @@ pub(crate) mod tests {
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
},
FileScanTask {
start: 0,
Expand All @@ -358,6 +360,7 @@ pub(crate) mod tests {
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
},
];

Expand All @@ -380,4 +383,57 @@ pub(crate) mod tests {
];
Arc::new(arrow_schema::Schema::new(fields))
}

#[tokio::test]
async fn test_build_equality_delete_predicate_case_sensitive() {
    // Table schema with a capitalized column name ("Id"); a lowercase
    // reference ("id") can only resolve when binding is case-insensitive.
    let table_schema = Arc::new(
        Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                NestedField::required(1, "Id", Type::Primitive(PrimitiveType::Long)).into(),
            ])
            .build()
            .unwrap(),
    );

    // Fake scan task carrying a single equality-delete file and requesting
    // case-sensitive predicate binding.
    let scan_task = FileScanTask {
        start: 0,
        length: 0,
        record_count: None,
        data_file_path: "data.parquet".to_string(),
        data_file_format: crate::spec::DataFileFormat::Parquet,
        schema: table_schema.clone(),
        project_field_ids: vec![],
        predicate: None,
        deletes: vec![FileScanTaskDeleteFile {
            file_path: "eq-del.parquet".to_string(),
            file_type: DataContentType::EqualityDeletes,
            partition_spec_id: 0,
            equality_ids: None,
        }],
        partition: None,
        partition_spec: None,
        name_mapping: None,
        case_sensitive: true,
    };

    let delete_filter = DeleteFilter::default();

    // Register an equality-delete predicate whose column name differs from
    // the schema only in case ("id" vs "Id").
    let lowercase_pred = Reference::new("id").equal_to(Datum::long(10));

    let (tx, rx) = tokio::sync::oneshot::channel();
    delete_filter.insert_equality_delete("eq-del.parquet", rx);
    tx.send(lowercase_pred).unwrap();

    // Case-sensitive binding must fail: "id" does not match column "Id".
    let result = delete_filter.build_equality_delete_predicate(&scan_task).await;

    assert!(
        result.is_err(),
        "case_sensitive=true should fail when column case mismatches"
    );
}
}
15 changes: 15 additions & 0 deletions crates/iceberg/src/arrow/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2082,6 +2082,7 @@ message schema {
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
})]
.into_iter(),
)) as FileScanTaskStream;
Expand Down Expand Up @@ -2403,6 +2404,7 @@ message schema {
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
};

// Task 2: read the second and third row groups
Expand All @@ -2419,6 +2421,7 @@ message schema {
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
};

let tasks1 = Box::pin(futures::stream::iter(vec![Ok(task1)])) as FileScanTaskStream;
Expand Down Expand Up @@ -2546,6 +2549,7 @@ message schema {
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
})]
.into_iter(),
)) as FileScanTaskStream;
Expand Down Expand Up @@ -2717,6 +2721,7 @@ message schema {
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
};

let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
Expand Down Expand Up @@ -2934,6 +2939,7 @@ message schema {
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
};

let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
Expand Down Expand Up @@ -3144,6 +3150,7 @@ message schema {
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
};

let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
Expand Down Expand Up @@ -3247,6 +3254,7 @@ message schema {
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
})]
.into_iter(),
)) as FileScanTaskStream;
Expand Down Expand Up @@ -3344,6 +3352,7 @@ message schema {
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
})]
.into_iter(),
)) as FileScanTaskStream;
Expand Down Expand Up @@ -3430,6 +3439,7 @@ message schema {
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
})]
.into_iter(),
)) as FileScanTaskStream;
Expand Down Expand Up @@ -3530,6 +3540,7 @@ message schema {
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
})]
.into_iter(),
)) as FileScanTaskStream;
Expand Down Expand Up @@ -3659,6 +3670,7 @@ message schema {
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
})]
.into_iter(),
)) as FileScanTaskStream;
Expand Down Expand Up @@ -3755,6 +3767,7 @@ message schema {
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
})]
.into_iter(),
)) as FileScanTaskStream;
Expand Down Expand Up @@ -3864,6 +3877,7 @@ message schema {
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
})]
.into_iter(),
)) as FileScanTaskStream;
Expand Down Expand Up @@ -4003,6 +4017,7 @@ message schema {
partition: Some(partition_data),
partition_spec: Some(partition_spec),
name_mapping: None,
case_sensitive: false,
})]
.into_iter(),
)) as FileScanTaskStream;
Expand Down
5 changes: 5 additions & 0 deletions crates/iceberg/src/scan/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub(crate) struct ManifestFileContext {
snapshot_schema: SchemaRef,
expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
delete_file_index: DeleteFileIndex,
case_sensitive: bool,
}

/// Wraps a [`ManifestEntryRef`] alongside the objects that are needed
Expand All @@ -59,6 +60,7 @@ pub(crate) struct ManifestEntryContext {
pub partition_spec_id: i32,
pub snapshot_schema: SchemaRef,
pub delete_file_index: DeleteFileIndex,
pub case_sensitive: bool,
}

impl ManifestFileContext {
Expand Down Expand Up @@ -89,6 +91,7 @@ impl ManifestFileContext {
bound_predicates: bound_predicates.clone(),
snapshot_schema: snapshot_schema.clone(),
delete_file_index: delete_file_index.clone(),
case_sensitive: self.case_sensitive,
};

sender
Expand Down Expand Up @@ -135,6 +138,7 @@ impl ManifestEntryContext {
partition_spec: None,
// TODO: Extract name_mapping from table metadata property "schema.name-mapping.default"
name_mapping: None,
case_sensitive: self.case_sensitive,
})
}
}
Expand Down Expand Up @@ -277,6 +281,7 @@ impl PlanContext {
field_ids: self.field_ids.clone(),
expression_evaluator_cache: self.expression_evaluator_cache.clone(),
delete_file_index,
case_sensitive: self.case_sensitive,
}
}
}
2 changes: 2 additions & 0 deletions crates/iceberg/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1885,6 +1885,7 @@ pub mod tests {
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
};
test_fn(task);

Expand All @@ -1902,6 +1903,7 @@ pub mod tests {
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
};
test_fn(task);
}
Expand Down
3 changes: 3 additions & 0 deletions crates/iceberg/src/scan/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ pub struct FileScanTask {
#[serde(serialize_with = "serialize_not_implemented")]
#[serde(deserialize_with = "deserialize_not_implemented")]
pub name_mapping: Option<Arc<NameMapping>>,

/// Whether this scan task should treat column names as case-sensitive when binding predicates.
pub case_sensitive: bool,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be set according to the TableScan?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@liurenjie1024 Thank you very much for reviewing the code! Yes, the case_sensitive flag should indeed be set according to the TableScan. It represents a query-level property that needs to flow down through the plan context and manifest contexts into each FileScanTask, so that DeleteFilter can correctly bind equality delete predicates with the intended case-sensitivity.

The overall call chain is:

TableScan → PlanContext → ManifestFileContext → ManifestEntryContext → FileScanTask

}

impl FileScanTask {
Expand Down
Loading