1717
1818//! Partition value projection for Iceberg tables.
1919
20- use std:: sync:: { Arc , Mutex } ;
20+ use std:: sync:: Arc ;
2121
2222use datafusion:: arrow:: array:: { ArrayRef , RecordBatch , StructArray } ;
2323use datafusion:: arrow:: datatypes:: { DataType , Schema as ArrowSchema } ;
@@ -28,6 +28,7 @@ use datafusion::physical_expr::expressions::Column;
2828use datafusion:: physical_plan:: projection:: ProjectionExec ;
2929use datafusion:: physical_plan:: { ColumnarValue , ExecutionPlan } ;
3030use iceberg:: arrow:: record_batch_projector:: RecordBatchProjector ;
31+ use iceberg:: arrow:: schema_to_arrow_schema;
3132use iceberg:: spec:: { PartitionSpec , Schema } ;
3233use iceberg:: table:: Table ;
3334use iceberg:: transform:: BoxedTransformFunction ;
@@ -63,6 +64,10 @@ pub fn project_with_partition(
6364 }
6465
6566 let input_schema = input. schema ( ) ;
67+
68+ // Validate that input schema matches the table schema
69+ validate_schema_compatibility ( & input_schema, table_schema. as_ref ( ) ) ?;
70+
6671 let partition_type = build_partition_type ( partition_spec, table_schema. as_ref ( ) ) ?;
6772 let calculator = PartitionValueCalculator :: new (
6873 partition_spec. as_ref ( ) . clone ( ) ,
@@ -88,13 +93,13 @@ pub fn project_with_partition(
8893/// PhysicalExpr implementation for partition value calculation
8994#[ derive( Debug , Clone ) ]
9095struct PartitionExpr {
91- calculator : Arc < Mutex < PartitionValueCalculator > > ,
96+ calculator : Arc < PartitionValueCalculator > ,
9297}
9398
9499impl PartitionExpr {
95100 fn new ( calculator : PartitionValueCalculator ) -> Self {
96101 Self {
97- calculator : Arc :: new ( Mutex :: new ( calculator) ) ,
102+ calculator : Arc :: new ( calculator) ,
98103 }
99104 }
100105}
@@ -115,23 +120,15 @@ impl PhysicalExpr for PartitionExpr {
115120 }
116121
117122 fn data_type ( & self , _input_schema : & ArrowSchema ) -> DFResult < DataType > {
118- let calculator = self
119- . calculator
120- . lock ( )
121- . map_err ( |e| DataFusionError :: Internal ( format ! ( "Failed to lock calculator: {}" , e) ) ) ?;
122- Ok ( calculator. partition_type . clone ( ) )
123+ Ok ( self . calculator . partition_type . clone ( ) )
123124 }
124125
125126 fn nullable ( & self , _input_schema : & ArrowSchema ) -> DFResult < bool > {
126127 Ok ( false )
127128 }
128129
129130 fn evaluate ( & self , batch : & RecordBatch ) -> DFResult < ColumnarValue > {
130- let calculator = self
131- . calculator
132- . lock ( )
133- . map_err ( |e| DataFusionError :: Internal ( format ! ( "Failed to lock calculator: {}" , e) ) ) ?;
134- let array = calculator. calculate ( batch) ?;
131+ let array = self . calculator . calculate ( batch) ?;
135132 Ok ( ColumnarValue :: Array ( array) )
136133 }
137134
@@ -147,39 +144,34 @@ impl PhysicalExpr for PartitionExpr {
147144 }
148145
149146 fn fmt_sql ( & self , f : & mut std:: fmt:: Formatter < ' _ > ) -> std:: fmt:: Result {
150- if let Ok ( calculator) = self . calculator . lock ( ) {
151- let field_names: Vec < String > = calculator
152- . partition_spec
153- . fields ( )
154- . iter ( )
155- . map ( |pf| format ! ( "{}({})" , pf. transform, pf. name) )
156- . collect ( ) ;
157- write ! ( f, "iceberg_partition_values[{}]" , field_names. join( ", " ) )
158- } else {
159- write ! ( f, "iceberg_partition_values" )
160- }
147+ let field_names: Vec < String > = self
148+ . calculator
149+ . partition_spec
150+ . fields ( )
151+ . iter ( )
152+ . map ( |pf| format ! ( "{}({})" , pf. transform, pf. name) )
153+ . collect ( ) ;
154+ write ! ( f, "iceberg_partition_values[{}]" , field_names. join( ", " ) )
161155 }
162156}
163157
164158impl std:: fmt:: Display for PartitionExpr {
165159 fn fmt ( & self , f : & mut std:: fmt:: Formatter < ' _ > ) -> std:: fmt:: Result {
166- if let Ok ( calculator) = self . calculator . lock ( ) {
167- let field_names: Vec < & str > = calculator
168- . partition_spec
169- . fields ( )
170- . iter ( )
171- . map ( |pf| pf. name . as_str ( ) )
172- . collect ( ) ;
173- write ! ( f, "iceberg_partition_values({})" , field_names. join( ", " ) )
174- } else {
175- write ! ( f, "iceberg_partition_values" )
176- }
160+ let field_names: Vec < & str > = self
161+ . calculator
162+ . partition_spec
163+ . fields ( )
164+ . iter ( )
165+ . map ( |pf| pf. name . as_str ( ) )
166+ . collect ( ) ;
167+ write ! ( f, "iceberg_partition_values({})" , field_names. join( ", " ) )
177168 }
178169}
179170
180171impl std:: hash:: Hash for PartitionExpr {
181172 fn hash < H : std:: hash:: Hasher > ( & self , state : & mut H ) {
182- std:: any:: TypeId :: of :: < Self > ( ) . hash ( state) ;
173+ // Two PartitionExpr are equal if they share the same calculator Arc
174+ Arc :: as_ptr ( & self . calculator ) . hash ( state) ;
183175 }
184176}
185177
@@ -264,6 +256,46 @@ impl PartitionValueCalculator {
264256 }
265257}
266258
259+ /// Validates that the input Arrow schema is compatible with the Iceberg table schema.
260+ ///
261+ /// This ensures that:
262+ /// - All fields in the input schema have matching names in the table schema
263+ /// - The Arrow data types are compatible with the corresponding Iceberg types
264+ fn validate_schema_compatibility (
265+ arrow_schema : & ArrowSchema ,
266+ table_schema : & Schema ,
267+ ) -> DFResult < ( ) > {
268+ // Convert Iceberg schema to Arrow schema for comparison
269+ let expected_arrow_schema =
270+ schema_to_arrow_schema ( table_schema) . map_err ( to_datafusion_error) ?;
271+
272+ // Check that all fields in the input schema exist in the table schema with compatible types
273+ for arrow_field in arrow_schema. fields ( ) {
274+ let field_name = arrow_field. name ( ) ;
275+
276+ let expected_field = expected_arrow_schema
277+ . field_with_name ( field_name)
278+ . map_err ( |_| {
279+ DataFusionError :: Internal ( format ! (
280+ "Input schema field '{}' not found in Iceberg table schema" ,
281+ field_name
282+ ) )
283+ } ) ?;
284+
285+ // Compare data types (metadata like field_id can differ)
286+ if arrow_field. data_type ( ) != expected_field. data_type ( ) {
287+ return Err ( DataFusionError :: Internal ( format ! (
288+ "Input schema field '{}' has type {:?}, but Iceberg table schema expects {:?}" ,
289+ field_name,
290+ arrow_field. data_type( ) ,
291+ expected_field. data_type( )
292+ ) ) ) ;
293+ }
294+ }
295+
296+ Ok ( ( ) )
297+ }
298+
267299fn build_partition_type (
268300 partition_spec : & PartitionSpec ,
269301 table_schema : & Schema ,
@@ -496,4 +528,30 @@ mod tests {
496528 assert_eq ! ( city_partition. value( 0 ) , "New York" ) ;
497529 assert_eq ! ( city_partition. value( 1 ) , "Los Angeles" ) ;
498530 }
531+
532+ #[ test]
533+ fn test_validate_schema_compatibility ( ) {
534+ let table_schema = Schema :: builder ( )
535+ . with_schema_id ( 0 )
536+ . with_fields ( vec ! [
537+ NestedField :: required( 1 , "id" , Type :: Primitive ( PrimitiveType :: Int ) ) . into( ) ,
538+ NestedField :: required( 2 , "name" , Type :: Primitive ( PrimitiveType :: String ) ) . into( ) ,
539+ ] )
540+ . build ( )
541+ . unwrap ( ) ;
542+
543+ let valid_arrow_schema = ArrowSchema :: new ( vec ! [
544+ Field :: new( "id" , DataType :: Int32 , false ) ,
545+ Field :: new( "name" , DataType :: Utf8 , false ) ,
546+ ] ) ;
547+ assert ! ( super :: validate_schema_compatibility( & valid_arrow_schema, & table_schema) . is_ok( ) ) ;
548+
549+ let invalid_arrow_schema = ArrowSchema :: new ( vec ! [
550+ Field :: new( "id" , DataType :: Int32 , false ) ,
551+ Field :: new( "unknown_field" , DataType :: Int32 , false ) ,
552+ ] ) ;
553+ assert ! (
554+ super :: validate_schema_compatibility( & invalid_arrow_schema, & table_schema) . is_err( )
555+ ) ;
556+ }
499557}
0 commit comments