diff --git a/datafusion/src/physical_plan/repartition.rs b/datafusion/src/physical_plan/repartition.rs index 3cc7d542a0dcd..a3a5b0618a9e2 100644 --- a/datafusion/src/physical_plan/repartition.rs +++ b/datafusion/src/physical_plan/repartition.rs @@ -348,6 +348,9 @@ impl RepartitionExec { for (num_output_partition, partition_indices) in indices.into_iter().enumerate() { + if partition_indices.is_empty() { + continue; + } let timer = r_metrics.repart_time.timer(); let indices = partition_indices.into(); // Produce batches based on indices @@ -952,4 +955,32 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn hash_repartition_avoid_empty_batch() -> Result<()> { + let batch = RecordBatch::try_from_iter(vec![( + "a", + Arc::new(StringArray::from(vec!["foo"])) as ArrayRef, + )]) + .unwrap(); + let partitioning = Partitioning::Hash( + vec![Arc::new(crate::physical_plan::expressions::Column::new( + "a", 0, + ))], + 2, + ); + let schema = batch.schema(); + let input = MockExec::new(vec![Ok(batch)], schema); + let exec = RepartitionExec::try_new(Arc::new(input), partitioning).unwrap(); + let output_stream0 = exec.execute(0).await.unwrap(); + let batch0 = crate::physical_plan::common::collect(output_stream0) + .await + .unwrap(); + let output_stream1 = exec.execute(1).await.unwrap(); + let batch1 = crate::physical_plan::common::collect(output_stream1) + .await + .unwrap(); + assert!(batch0.is_empty() || batch1.is_empty()); + Ok(()) + } }