diff --git a/rust/datafusion/src/physical_plan/coalesce_batches.rs b/rust/datafusion/src/physical_plan/coalesce_batches.rs index 9f36fd8f794..b91e0b672eb 100644 --- a/rust/datafusion/src/physical_plan/coalesce_batches.rs +++ b/rust/datafusion/src/physical_plan/coalesce_batches.rs @@ -111,6 +111,7 @@ impl ExecutionPlan for CoalesceBatchesExec { target_batch_size: self.target_batch_size, buffer: Vec::new(), buffered_rows: 0, + is_closed: false, })) } } @@ -126,6 +127,8 @@ struct CoalesceBatchesStream { buffer: Vec, /// Buffered row count buffered_rows: usize, + /// Whether the stream has finished returning all of its data or not + is_closed: bool, } impl Stream for CoalesceBatchesStream { @@ -135,6 +138,9 @@ impl Stream for CoalesceBatchesStream { mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { + if self.is_closed { + return Poll::Ready(None); + } loop { let input_batch = self.input.poll_next_unpin(cx); match input_batch { @@ -167,6 +173,7 @@ impl Stream for CoalesceBatchesStream { } } None => { + self.is_closed = true; // we have reached the end of the input stream but there could still // be buffered batches if self.buffer.is_empty() { @@ -234,7 +241,7 @@ pub fn concat_batches( #[cfg(test)] mod tests { use super::*; - use crate::physical_plan::memory::MemoryExec; + use crate::physical_plan::{memory::MemoryExec, repartition::RepartitionExec}; use arrow::array::UInt32Array; use arrow::datatypes::{DataType, Field, Schema}; @@ -244,7 +251,7 @@ mod tests { let partition = create_vec_batches(&schema, 10); let partitions = vec![partition]; - let output_partitions = coalesce_batches(&schema, partitions, 20).await?; + let output_partitions = coalesce_batches(&schema, partitions, 21).await?; assert_eq!(1, output_partitions.len()); // input is 10 batches x 8 rows (80 rows) @@ -287,6 +294,8 @@ mod tests { ) -> Result>> { // create physical plan let exec = MemoryExec::try_new(&input_partitions, schema.clone(), None)?; + let exec = + RepartitionExec::try_new(Arc::new(exec), Partitioning::RoundRobinBatch(1))?; let exec: Arc = Arc::new(CoalesceBatchesExec::new(Arc::new(exec), target_batch_size));