 // specific language governing permissions and limitations
 // under the License.
 
-use std::collections::HashSet;
 use std::num::NonZeroUsize;
 use std::sync::Arc;
 
@@ -25,9 +24,10 @@ use datafusion::physical_plan::expressions::Column;
 use datafusion::physical_plan::repartition::RepartitionExec;
 use datafusion::physical_plan::{ExecutionPlan, Partitioning};
 use iceberg::spec::{TableMetadata, TableMetadataRef, Transform};
-use tracing;
 
-use crate::physical_plan::project::PARTITION_VALUES_COLUMN;
+/// Column name for the partition values struct column.
+/// This must match the constant from iceberg::arrow after merging with main.
+const PROJECTED_PARTITION_VALUE_COLUMN: &str = "_partition";
 
 /// Creates an Iceberg-aware repartition execution plan that optimizes data distribution
 /// for parallel processing while respecting Iceberg table partitioning semantics.
@@ -37,40 +37,57 @@ use crate::physical_plan::project::PARTITION_VALUES_COLUMN;
 ///
 /// ## Partitioning Strategies
 ///
-/// - **Unpartitioned tables** – Uses round-robin distribution to balance load across workers.
-/// - **Hash partitioning** – Applied for tables with identity or bucket transforms, ensuring
-///   co-location of related data for efficient file clustering. This is used when the `_partition`
-///   column is present AND the partition spec has hash-friendly transforms (Identity/Bucket),
-///   or when source columns with these transforms are available.
-/// - **Round-robin partitioning** – Used for temporal transforms (Year, Month, Day, Hour),
-///   Truncate, or other transforms that don't provide good hash distribution.
-/// - **Mixed transforms** – Combines multiple transform types, using hash partitioning only
-///   when Identity or Bucket transforms are present, otherwise falling back to round-robin.
+/// - **Partitioned tables with Identity/Bucket transforms** – Uses hash partitioning on the
+///   `_partition` column for optimal data distribution and file clustering. Ensures that rows
+///   with the same partition values are co-located in the same task.
+///
+/// - **Partitioned tables with temporal transforms** – Uses round-robin partitioning for
+///   temporal transforms (Year, Month, Day, Hour) that don't provide uniform hash distribution.
+///
+/// - **Unpartitioned tables** – Uses round-robin distribution to balance load evenly across workers.
+///
+/// ## Requirements
+///
+/// - **For partitioned tables**: The input MUST include the `_partition` column.
+///   Add it by calling [`project_with_partition`](crate::physical_plan::project_with_partition) before [`repartition`].
+/// - **For unpartitioned tables**: No special preparation needed.
+/// - Returns an error if a partitioned table is missing the `_partition` column.
 ///
 /// ## Performance Notes
 ///
-/// - Only repartitions when the input scheme or partition count differs from the target.
+/// - Only adds repartitioning when the input partitioning differs from the target.
 /// - Requires an explicit target partition count for deterministic behavior.
-/// - Preserves column order (partitions first, then buckets) for consistent layout.
 ///
 /// # Arguments
 ///
-/// * `input` – The input execution plan providing data to be repartitioned. For partitioned tables,
-///   the input should include the `_partition` column (added via `project_with_partition`).
-/// * `table_metadata` – Iceberg table metadata containing partition spec.
-/// * `target_partitions` – Target number of partitions for parallel processing (must be > 0).
+/// * `input` - The input [`ExecutionPlan`]. For partitioned tables, must include the `_partition`
+///   column (added via [`project_with_partition`](crate::physical_plan::project_with_partition)).
+/// * `table_metadata` - Iceberg table metadata containing partition spec.
+/// * `target_partitions` - Target number of partitions for parallel processing (must be > 0).
 ///
 /// # Returns
 ///
-/// An execution plan that applies the optimal partitioning strategy, or the original input plan if no repartitioning is needed.
+/// An [`ExecutionPlan`] that applies the optimal partitioning strategy, or the original input plan
+/// if repartitioning is not needed.
+///
+/// # Errors
+///
+/// Returns [`DataFusionError::Plan`] if a partitioned table input is missing the `_partition` column.
 ///
-/// # Example
+/// # Examples
+///
+/// For partitioned tables, first add the `_partition` column:
 ///
 /// ```ignore
+/// use std::num::NonZeroUsize;
+/// use iceberg_datafusion::physical_plan::project_with_partition;
+///
+/// let plan_with_partition = project_with_partition(input_plan, &table)?;
+///
 /// let repartitioned_plan = repartition(
-///     input_plan,
+///     plan_with_partition,
///     table.metadata_ref(),
-///     4, // Explicit partition count
+///     NonZeroUsize::new(4).unwrap(),
 /// )?;
 /// ```
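+///
+/// For unpartitioned tables no projection step is needed; a minimal sketch, assuming the
+/// same `input_plan` and `table` bindings as above (round-robin distribution is selected
+/// automatically):
+///
+/// ```ignore
+/// use std::num::NonZeroUsize;
+///
+/// // Unpartitioned tables skip `project_with_partition` entirely.
+/// let repartitioned_plan = repartition(
+///     input_plan,
+///     table.metadata_ref(),
+///     NonZeroUsize::new(4).unwrap(),
+/// )?;
+/// ```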
 #[allow(dead_code)]
@@ -128,39 +145,26 @@ fn same_columns(a_exprs: &[Arc<dyn PhysicalExpr>], b_exprs: &[Arc<dyn PhysicalEx
 ///
 /// ## Partitioning Strategy
 ///
-/// - **Hash partitioning using `_partition` column**: Used when the input includes a
-///   projected `_partition` column AND the partition spec contains Identity or Bucket transforms.
-///   Ensures data is distributed based on actual partition values with good distribution.
-///
-/// - **Hash partitioning using source columns**: Applied when identity or bucket transforms
-///   provide good distribution:
-///   1. Identity partition columns (e.g., `PARTITIONED BY (user_id, category)`)
-///   2. Bucket columns from partition spec (e.g., `bucket(16, user_id)`)
-///
-///   Ensures co-location within partitions and buckets for optimal file clustering.
-///
-/// - **Round-robin partitioning**: Used for unpartitioned tables, or when partition transforms
-///   don't provide good hash distribution (e.g., Year, Month, Day, Hour, Truncate transforms).
-///   Ensures even load distribution across partitions.
+/// - **Partitioned tables**: Must have the `_partition` column in the input schema (added via
+///   `project_with_partition`). Uses hash partitioning if the partition spec contains Identity
+///   or Bucket transforms for good data distribution. Falls back to round-robin for temporal
+///   transforms (Year, Month, Day, Hour) that don't provide uniform hash distribution.
 ///
-/// ## Column Priority
+/// - **Unpartitioned tables**: Always uses round-robin batch partitioning to ensure even load
+///   distribution across workers.
 ///
-/// Columns are combined in the following order, with duplicates removed:
-/// 1. `_partition` column (highest priority, if present)
-/// 2. Identity partition columns from partition spec
-/// 3. Bucket columns from partition spec
+/// ## Requirements
 ///
-/// ## Fallback
-///
-/// If no suitable hash columns are found, falls back to round-robin batch partitioning
-/// to ensure even load distribution across partitions.
+/// - **For partitioned tables**: The input MUST include the `_partition` column
+///   (added via `project_with_partition()`).
+/// - **For unpartitioned tables**: No special preparation needed.
+/// - Returns an error if a partitioned table is missing the `_partition` column.
 fn determine_partitioning_strategy(
     input: &Arc<dyn ExecutionPlan>,
     table_metadata: &TableMetadata,
     target_partitions: NonZeroUsize,
 ) -> DFResult<Partitioning> {
     let partition_spec = table_metadata.default_partition_spec();
-    let table_schema = table_metadata.current_schema();
     let input_schema = input.schema();
     let target_partition_count = target_partitions.get();
 
@@ -170,90 +174,34 @@ fn determine_partitioning_strategy(
         .iter()
         .any(|pf| matches!(pf.transform, Transform::Identity | Transform::Bucket(_)));
 
-    let partition_col_result = input_schema.index_of(PARTITION_VALUES_COLUMN);
+    let partition_col_result = input_schema.index_of(PROJECTED_PARTITION_VALUE_COLUMN);
     let is_partitioned_table = !partition_spec.is_unpartitioned();
 
     match (is_partitioned_table, partition_col_result) {
         // Case 1: Partitioned table with _partition column present
         (true, Ok(partition_col_idx)) => {
-            let partition_field = input_schema.field(partition_col_idx);
-            if partition_field.name() != PARTITION_VALUES_COLUMN {
-                return Err(DataFusionError::Plan(format!(
-                    "Expected {} column at index {}, but found '{}'",
-                    PARTITION_VALUES_COLUMN,
-                    partition_col_idx,
-                    partition_field.name()
-                )));
-            }
-
-            let partition_expr = Arc::new(Column::new(PARTITION_VALUES_COLUMN, partition_col_idx))
+            let partition_expr = Arc::new(Column::new(PROJECTED_PARTITION_VALUE_COLUMN, partition_col_idx))
                 as Arc<dyn PhysicalExpr>;
 
-            return if has_hash_friendly_transforms {
+            if has_hash_friendly_transforms {
                 Ok(Partitioning::Hash(
                     vec![partition_expr],
                     target_partition_count,
                 ))
             } else {
                 Ok(Partitioning::RoundRobinBatch(target_partition_count))
-            };
-        }
-
-        // Case 2: Partitioned table missing _partition column (warning)
-        (true, Err(_)) => {
-            tracing::warn!(
-                "Partitioned table input missing {} column. \
-                Consider adding partition projection before repartitioning.",
-                PARTITION_VALUES_COLUMN
-            );
-        }
-
-        // Case 3: Unpartitioned table with _partition column
-        (false, Ok(_)) => {
-            tracing::warn!(
-                "Input contains {} column but table is unpartitioned. \
-                This may indicate unnecessary projection.",
-                PARTITION_VALUES_COLUMN
-            );
+            }
         }
 
-        // Case 4: Unpartitioned table without _partition column
-        (false, Err(_)) => {
-            // Nothing to do - fall through to source column analysis
-        }
-    }
+        // Case 2: Partitioned table missing _partition column (normally this should not happen)
+        (true, Err(_)) => Err(DataFusionError::Plan(format!(
+            "Partitioned table input missing {} column. \
+            Ensure projection happens before repartitioning.",
+            PROJECTED_PARTITION_VALUE_COLUMN
+        ))),
 
-    let hash_column_names: Vec<&str> = partition_spec
-        .fields()
-        .iter()
-        .filter(|pf| matches!(pf.transform, Transform::Identity | Transform::Bucket(_)))
-        .filter_map(|pf| {
-            table_schema
-                .field_by_id(pf.source_id)
-                .map(|sf| sf.name.as_str())
-        })
-        .collect();
-
-    let mut seen_columns = HashSet::with_capacity(hash_column_names.len());
-    let hash_exprs: Vec<Arc<dyn PhysicalExpr>> = hash_column_names
-        .into_iter()
-        .filter(|name| seen_columns.insert(*name))
-        .map(|column_name| {
-            let column_idx = input_schema.index_of(column_name).map_err(|e| {
-                DataFusionError::Plan(format!(
-                    "Column '{}' not found in input schema. \
-                    Ensure projection happens before repartitioning. Error: {}",
-                    column_name, e
-                ))
-            })?;
-            Ok(Arc::new(Column::new(column_name, column_idx)) as Arc<dyn PhysicalExpr>)
-        })
-        .collect::<DFResult<_>>()?;
-
-    if !hash_exprs.is_empty() {
-        Ok(Partitioning::Hash(hash_exprs, target_partition_count))
-    } else {
-        Ok(Partitioning::RoundRobinBatch(target_partition_count))
+        // Case 3: Unpartitioned table, always use RoundRobinBatch
+        (false, _) => Ok(Partitioning::RoundRobinBatch(target_partition_count)),
     }
 }
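+
+// Sketch: roughly how `repartition` is expected to consume the strategy above, wrapping
+// the input in DataFusion's `RepartitionExec` when a new distribution is needed; the
+// actual wiring in this module may differ in detail.
+//
+//     let partitioning =
+//         determine_partitioning_strategy(&input, &table_metadata, target_partitions)?;
+//     let plan: Arc<dyn ExecutionPlan> =
+//         Arc::new(RepartitionExec::try_new(input, partitioning)?);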
 
@@ -561,6 +509,11 @@ mod tests {
             ArrowField::new("date", ArrowDataType::Date32, false),
             ArrowField::new("user_id", ArrowDataType::Int64, false),
             ArrowField::new("amount", ArrowDataType::Int64, false),
+            ArrowField::new(
+                PROJECTED_PARTITION_VALUE_COLUMN,
+                ArrowDataType::Struct(Fields::empty()),
+                false,
+            ),
         ]));
         let input = Arc::new(EmptyExec::new(arrow_schema));
         let repartitioned_plan = repartition(
@@ -573,10 +526,11 @@ mod tests {
         let partitioning = repartitioned_plan.properties().output_partitioning();
         match partitioning {
             Partitioning::Hash(exprs, _) => {
-                // With the new logic, we expect at least 1 column
-                assert!(
-                    !exprs.is_empty(),
-                    "Should have at least one column for hash partitioning"
+                // Should use _partition column for hash partitioning
+                assert_eq!(
+                    exprs.len(),
+                    1,
+                    "Should have exactly one hash column (_partition)"
                 );
 
                 let column_names: Vec<String> = exprs
@@ -588,19 +542,13 @@ mod tests {
                     })
                     .collect();
 
-                // Should include either user_id (identity transform) or date (partition field)
-                let has_user_id = column_names.contains(&"user_id".to_string());
-                let has_date = column_names.contains(&"date".to_string());
                 assert!(
-                    has_user_id || has_date,
-                    "Should include either 'user_id' or 'date' column, got: {:?}",
+                    column_names.contains(&PROJECTED_PARTITION_VALUE_COLUMN.to_string()),
+                    "Should use _partition column, got: {:?}",
                     column_names
                 );
             }
-            Partitioning::RoundRobinBatch(_) => {
-                // This could happen if no suitable hash columns are found
-            }
-            _ => panic!("Unexpected partitioning strategy: {:?}", partitioning),
+            _ => panic!("Expected Hash partitioning with Identity transform"),
         }
     }
 
@@ -717,6 +665,11 @@ mod tests {
         let arrow_schema = Arc::new(ArrowSchema::new(vec![
             ArrowField::new("date", ArrowDataType::Date32, false),
             ArrowField::new("amount", ArrowDataType::Int64, false),
+            ArrowField::new(
+                PROJECTED_PARTITION_VALUE_COLUMN,
+                ArrowDataType::Struct(Fields::empty()),
+                false,
+            ),
         ]));
         let input = Arc::new(EmptyExec::new(arrow_schema));
         let repartitioned_plan = repartition(
@@ -729,7 +682,7 @@ mod tests {
         let partitioning = repartitioned_plan.properties().output_partitioning();
         assert!(
             matches!(partitioning, Partitioning::RoundRobinBatch(_)),
-            "Should use round-robin for range-only partitions"
+            "Should use round-robin for temporal transforms (Day) that don't provide good hash distribution"
         );
     }
 
@@ -788,6 +741,11 @@ mod tests {
             ArrowField::new("date", ArrowDataType::Date32, false),
             ArrowField::new("user_id", ArrowDataType::Int64, false),
             ArrowField::new("amount", ArrowDataType::Int64, false),
+            ArrowField::new(
+                PROJECTED_PARTITION_VALUE_COLUMN,
+                ArrowDataType::Struct(Fields::empty()),
+                false,
+            ),
         ]));
         let input = Arc::new(EmptyExec::new(arrow_schema));
         let repartitioned_plan = repartition(
@@ -800,11 +758,7 @@ mod tests {
         let partitioning = repartitioned_plan.properties().output_partitioning();
         match partitioning {
             Partitioning::Hash(exprs, _) => {
-                assert_eq!(
-                    exprs.len(),
-                    1,
-                    "Should have one hash column (user_id identity transform)"
-                );
+                assert_eq!(exprs.len(), 1, "Should have one hash column (_partition)");
                 let column_names: Vec<String> = exprs
                     .iter()
                     .filter_map(|expr| {
@@ -814,8 +768,9 @@ mod tests {
                     })
                     .collect();
                 assert!(
-                    column_names.contains(&"user_id".to_string()),
-                    "Should include identity transform column 'user_id'"
+                    column_names.contains(&PROJECTED_PARTITION_VALUE_COLUMN.to_string()),
+                    "Should use _partition column for mixed transforms with Identity, got: {:?}",
+                    column_names
                 );
             }
             _ => panic!("Expected Hash partitioning for table with identity transforms"),
@@ -874,7 +829,7 @@ mod tests {
             ),
             ArrowField::new("amount", ArrowDataType::Int64, false),
             ArrowField::new(
-                PARTITION_VALUES_COLUMN,
+                PROJECTED_PARTITION_VALUE_COLUMN,
                 ArrowDataType::Struct(Fields::empty()),
                 false,
             ),
@@ -943,7 +898,7 @@ mod tests {
             ArrowField::new("user_id", ArrowDataType::Int64, false),
             ArrowField::new("amount", ArrowDataType::Int64, false),
             ArrowField::new(
-                PARTITION_VALUES_COLUMN,
+                PROJECTED_PARTITION_VALUE_COLUMN,
                 ArrowDataType::Struct(Fields::empty()),
                 false,
             ),