1616// under the License.
1717
1818use std:: collections:: HashSet ;
19+ use std:: num:: NonZeroUsize ;
1920use std:: sync:: Arc ;
2021
2122use datafusion:: error:: Result as DFResult ;
2223use datafusion:: physical_plan:: expressions:: Column ;
2324use datafusion:: physical_plan:: repartition:: RepartitionExec ;
2425use datafusion:: physical_plan:: { ExecutionPlan , Partitioning } ;
25- use iceberg:: spec:: { SchemaRef , TableMetadata , TableMetadataRef , Transform } ;
26+ use iceberg:: spec:: { TableMetadata , TableMetadataRef , Transform } ;
2627
2728/// Creates an Iceberg-aware repartition execution plan that optimizes data distribution
2829/// for parallel processing while respecting Iceberg table partitioning semantics.
@@ -57,8 +58,7 @@ use iceberg::spec::{SchemaRef, TableMetadata, TableMetadataRef, Transform};
5758///
5859/// # Arguments
5960///
60- /// * `input` - The input execution plan providing data to be repartitioned
61- /// * `table_schema` - The Iceberg table schema used to resolve column references
61+ /// * `input` - The input execution plan providing data to be repartitioned (should already be projected to match table schema)
6262/// * `table_metadata` - The Iceberg table metadata containing partition spec and sort order
6363/// * `target_partitions` - Target number of partitions for parallel processing (must be > 0)
6464///
@@ -72,25 +72,17 @@ use iceberg::spec::{SchemaRef, TableMetadata, TableMetadataRef, Transform};
7272/// ```ignore
7373/// let repartitioned_plan = repartition(
7474/// input_plan,
75- /// table.schema_ref(),
7675/// table.metadata_ref(),
7776/// 4, // Explicit partition count
7877/// )?;
7978/// ```
80- pub fn repartition (
79+ pub ( crate ) fn repartition (
8180 input : Arc < dyn ExecutionPlan > ,
82- table_schema : SchemaRef ,
8381 table_metadata : TableMetadataRef ,
84- target_partitions : usize ,
82+ target_partitions : NonZeroUsize ,
8583) -> DFResult < Arc < dyn ExecutionPlan > > {
86- if target_partitions == 0 {
87- return Err ( datafusion:: error:: DataFusionError :: Plan (
88- "repartition requires target_partitions > 0" . to_string ( ) ,
89- ) ) ;
90- }
91-
9284 let partitioning_strategy =
93- determine_partitioning_strategy ( & input, & table_schema , & table_metadata, target_partitions) ?;
85+ determine_partitioning_strategy ( & input, & table_metadata, target_partitions) ?;
9486
9587 if !needs_repartitioning ( & input, & partitioning_strategy) {
9688 return Ok ( input) ;
@@ -166,12 +158,11 @@ fn same_columns(
166158/// falls back to round-robin batch partitioning for even load distribution.
167159fn determine_partitioning_strategy (
168160 input : & Arc < dyn ExecutionPlan > ,
169- table_schema : & SchemaRef ,
170161 table_metadata : & TableMetadata ,
171- target_partitions : usize ,
162+ target_partitions : NonZeroUsize ,
172163) -> DFResult < Partitioning > {
173164 let partition_spec = table_metadata. default_partition_spec ( ) ;
174- let sort_order = table_metadata. default_sort_order ( ) ;
165+ let table_schema = table_metadata. current_schema ( ) ;
175166
176167 let names_iter: Box < dyn Iterator < Item = & str > > = {
177168 // Partition identity columns
@@ -194,40 +185,35 @@ fn determine_partitioning_strategy(
194185 None
195186 }
196187 } ) ;
197- // Bucket columns from sort order
198- let bucket_names_sort = sort_order. fields . iter ( ) . filter_map ( |sf| {
199- if let Transform :: Bucket ( _) = sf. transform {
200- table_schema
201- . field_by_id ( sf. source_id )
202- . map ( |field| field. name . as_str ( ) )
203- } else {
204- None
205- }
206- } ) ;
207- Box :: new ( part_names. chain ( bucket_names_part) . chain ( bucket_names_sort) )
188+ Box :: new ( part_names. chain ( bucket_names_part) )
208189 } ;
209190
210191 // Order: partitions first, then buckets
211192 // Deduplicate while preserving order
212193 let input_schema = input. schema ( ) ;
213- let mut seen: HashSet < & str > = HashSet :: new ( ) ;
194+ let mut seen = HashSet :: new ( ) ;
214195 let hash_exprs: Vec < Arc < dyn datafusion:: physical_expr:: PhysicalExpr > > = names_iter
215196 . filter ( |name| seen. insert ( * name) )
216197 . map ( |name| {
217198 let idx = input_schema
218199 . index_of ( name)
219- . map_err ( |e| datafusion:: error:: DataFusionError :: Plan ( e. to_string ( ) ) ) ?;
200+ . map_err ( |e| {
201+ datafusion:: error:: DataFusionError :: Plan ( format ! (
202+ "Column '{}' not found in input schema. Ensure projection happens before repartitioning. Error: {}" ,
203+ name, e
204+ ) )
205+ } ) ?;
220206 Ok ( Arc :: new ( Column :: new ( name, idx) )
221207 as Arc < dyn datafusion:: physical_expr:: PhysicalExpr > )
222208 } )
223209 . collect :: < DFResult < _ > > ( ) ?;
224210
225211 if !hash_exprs. is_empty ( ) {
226- return Ok ( Partitioning :: Hash ( hash_exprs, target_partitions) ) ;
212+ return Ok ( Partitioning :: Hash ( hash_exprs, target_partitions. get ( ) ) ) ;
227213 }
228214
229215 // Fallback to round-robin for unpartitioned, non-bucketed tables, and range-only partitions
230- Ok ( Partitioning :: RoundRobinBatch ( target_partitions) )
216+ Ok ( Partitioning :: RoundRobinBatch ( target_partitions. get ( ) ) )
231217}
232218
233219#[ cfg( test) ]
0 commit comments