
Commit 7ba3af0

feat!: Add support for sparse transform expressions (#1199)
## What changes are proposed in this pull request?

Log replay needs to define per-file transforms for each row of metadata that survives data skipping. Unfortunately, `Expression::Struct` is "dense" (mentions _every_ output field) and this produces excessive overhead when injecting the (usually very few) partition columns for tables with wide schemas (hundreds or thousands of columns). Column mapping tables are even worse, because they don't _change_ the columns at all -- they just need to apply the output schema to the input data.

Solution: Define a new `Expression::Transform` that is a "sparse" representation of the _changes_ to be made to a given top-level schema or nested struct. Input columns can be dropped or replaced, and new output columns can be injected after an input column of choice (or prepended to the output schema). The engine's expression evaluator does the actual work to transfer unchanged input columns across while building the output `EngineData`.

Update log replay to use the new transform capability, so that the cost is `O(partition_columns)` instead of `O(schema_width)`. For non-partitioned tables with column mapping mode enabled, this translates to an empty (identity) transform, which the default engine expression evaluator has been updated to optimize as a special case (just `apply_schema` directly to the input and return).

Result: Scan times are cut by nearly half in the `metadata_bench` benchmark:

```
Benchmarking scan_metadata/scan_metadata: Warming up for 3.0000 s
Warning: Unable to complete 20 samples in 5.0s. You may wish to increase target time to 5.1s, or reduce sample count to 10.
scan_metadata/scan_metadata
                        time:   [250.51 ms 253.72 ms 257.45 ms]
                        change: [-45.173% -44.306% -43.415%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 1 outliers among 20 measurements (5.00%)
  1 (5.00%) high mild
```

### This PR affects the following public APIs

Added a new `Expression::Transform` enum variant.

## How was this change tested?

New and existing unit tests, existing benchmarks.
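
To make the "sparse" idea from the description above concrete, here is a minimal standalone sketch. It is not the kernel's actual `Expression::Transform` definition: `SparseTransform`, `FieldOp`, `Column`, `apply`, and `demo` are hypothetical names invented for illustration, and plain strings stand in for real column data. The sketch records only the changes (prepend, drop, replace, insert-after), passes unmentioned input columns through untouched, and short-circuits on an empty (identity) transform.

```rust
use std::collections::HashMap;

/// One column as (name, value). Strings stand in for real column data.
type Column = (String, String);

/// Hypothetical per-field change; fields not mentioned pass through as-is.
enum FieldOp {
    Drop,
    Replace(String),
}

/// Hypothetical sparse transform: stores only the changes, so its size
/// tracks the number of changed columns rather than the schema width.
#[derive(Default)]
struct SparseTransform {
    /// Columns emitted before any input column.
    prepend: Vec<Column>,
    /// Input columns to drop or replace, keyed by input column name.
    field_ops: HashMap<String, FieldOp>,
    /// New columns to emit right after the named input column.
    insert_after: HashMap<String, Vec<Column>>,
}

impl SparseTransform {
    /// An empty transform changes nothing.
    fn is_identity(&self) -> bool {
        self.prepend.is_empty() && self.field_ops.is_empty() && self.insert_after.is_empty()
    }

    /// Builds the output row; the "evaluator" carries unchanged columns across.
    fn apply(&self, input: &[Column]) -> Vec<Column> {
        if self.is_identity() {
            // Fast path: nothing to change, return the input columns as they are.
            return input.to_vec();
        }
        let mut out = self.prepend.clone();
        for (name, value) in input {
            match self.field_ops.get(name) {
                Some(FieldOp::Drop) => {}
                Some(FieldOp::Replace(v)) => out.push((name.clone(), v.clone())),
                None => out.push((name.clone(), value.clone())),
            }
            // In this sketch, insertions apply even if the anchor column was dropped.
            if let Some(extra) = self.insert_after.get(name) {
                out.extend(extra.iter().cloned());
            }
        }
        out
    }
}

/// Example: inject a partition-like column after `id`, replace `name`, drop `tmp`.
fn demo() -> Vec<Column> {
    let col = |n: &str, v: &str| (n.to_string(), v.to_string());
    let input = vec![col("id", "1"), col("name", "alice"), col("tmp", "x")];

    let mut t = SparseTransform::default();
    t.insert_after.insert("id".to_string(), vec![col("year", "2024")]);
    t.field_ops.insert("name".to_string(), FieldOp::Replace("ALICE".to_string()));
    t.field_ops.insert("tmp".to_string(), FieldOp::Drop);
    t.apply(&input)
}
```

With the three-column input in `demo`, the transform holds three entries no matter how many other columns the row might have, which is the property the description credits for the `O(partition_columns)` cost.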
1 parent a4724de commit 7ba3af0

12 files changed, +847 -111 lines changed


ffi/src/expressions/engine_visitor.rs

Lines changed: 5 additions & 0 deletions
@@ -482,6 +482,11 @@ fn visit_expression_impl(
         Expression::Opaque(OpaqueExpression { op, exprs }) => {
             visit_expression_opaque(visitor, op, exprs, sibling_list_id)
         }
+        Expression::Transform(_) => {
+            // Minimal FFI support: Transform expressions are treated as unknown
+            // TODO: Implement full Transform FFI support in future version
+            visit_unknown(visitor, sibling_list_id, "Transform")
+        }
         Expression::Unknown(name) => visit_unknown(visitor, sibling_list_id, name),
     }
 }

kernel/src/engine/arrow_expression/apply_schema.rs

Lines changed: 56 additions & 0 deletions
@@ -183,3 +183,59 @@ pub(crate) fn apply_schema_to(array: &ArrayRef, schema: &DataType) -> DeltaResul
     };
     Ok(array)
 }
+
+#[cfg(test)]
+mod apply_schema_validation_tests {
+    use super::*;
+
+    use std::sync::Arc;
+
+    use crate::arrow::array::{Int32Array, StructArray};
+    use crate::arrow::datatypes::{
+        DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema,
+    };
+    use crate::schema::{DataType, StructField, StructType};
+
+    #[test]
+    fn test_apply_schema_basic_functionality() {
+        // Test that apply_schema works for basic field transformation
+        let input_array = create_test_struct_array_2_fields();
+        let target_schema = create_target_schema_2_fields();
+
+        // This should succeed - basic schema application
+        let result = apply_schema_to_struct(&input_array, &target_schema);
+        assert!(result.is_ok(), "Basic schema application should succeed");
+
+        let result_array = result.unwrap();
+        assert_eq!(
+            result_array.len(),
+            input_array.len(),
+            "Row count should be preserved"
+        );
+        assert_eq!(result_array.num_columns(), 2, "Should have 2 columns");
+    }
+
+    // Helper functions to create test data
+    fn create_test_struct_array_2_fields() -> StructArray {
+        let field1 = ArrowField::new("a", ArrowDataType::Int32, false);
+        let field2 = ArrowField::new("b", ArrowDataType::Int32, false);
+        let schema = ArrowSchema::new(vec![field1, field2]);
+
+        let a_data = Int32Array::from(vec![1, 2, 3]);
+        let b_data = Int32Array::from(vec![4, 5, 6]);
+
+        StructArray::try_new(
+            schema.fields.clone(),
+            vec![Arc::new(a_data), Arc::new(b_data)],
+            None,
+        )
+        .unwrap()
+    }
+
+    fn create_target_schema_2_fields() -> StructType {
+        StructType::new([
+            StructField::new("a", DataType::INTEGER, false),
+            StructField::new("b", DataType::INTEGER, false),
+        ])
+    }
+}
