Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions rust/datafusion/src/physical_plan/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ impl ExecutionPlan for CoalesceBatchesExec {
target_batch_size: self.target_batch_size,
buffer: Vec::new(),
buffered_rows: 0,
is_closed: false,
}))
}
}
Expand All @@ -126,6 +127,8 @@ struct CoalesceBatchesStream {
buffer: Vec<RecordBatch>,
/// Buffered row count
buffered_rows: usize,
/// Whether the stream has finished returning all of its data or not
is_closed: bool,
}

impl Stream for CoalesceBatchesStream {
Expand All @@ -135,6 +138,9 @@ impl Stream for CoalesceBatchesStream {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
if self.is_closed {
return Poll::Ready(None);
}
loop {
let input_batch = self.input.poll_next_unpin(cx);
match input_batch {
Expand Down Expand Up @@ -167,6 +173,7 @@ impl Stream for CoalesceBatchesStream {
}
}
None => {
self.is_closed = true;
// we have reached the end of the input stream but there could still
// be buffered batches
if self.buffer.is_empty() {
Expand Down Expand Up @@ -234,7 +241,7 @@ pub fn concat_batches(
#[cfg(test)]
mod tests {
use super::*;
use crate::physical_plan::memory::MemoryExec;
use crate::physical_plan::{memory::MemoryExec, repartition::RepartitionExec};
use arrow::array::UInt32Array;
use arrow::datatypes::{DataType, Field, Schema};

Expand All @@ -244,7 +251,7 @@ mod tests {
let partition = create_vec_batches(&schema, 10);
let partitions = vec![partition];

let output_partitions = coalesce_batches(&schema, partitions, 20).await?;
let output_partitions = coalesce_batches(&schema, partitions, 21).await?;
assert_eq!(1, output_partitions.len());

// input is 10 batches x 8 rows (80 rows)
Expand Down Expand Up @@ -287,6 +294,8 @@ mod tests {
) -> Result<Vec<Vec<RecordBatch>>> {
// create physical plan
let exec = MemoryExec::try_new(&input_partitions, schema.clone(), None)?;
let exec =
RepartitionExec::try_new(Arc::new(exec), Partitioning::RoundRobinBatch(1))?;
let exec: Arc<dyn ExecutionPlan> =
Arc::new(CoalesceBatchesExec::new(Arc::new(exec), target_batch_size));

Expand Down