Skip to content

Commit

Permalink
fix: ser/der fetch in CoalesceBatchesExec (#12107)
Browse files Browse the repository at this point in the history
  • Loading branch information
haohuaijin committed Aug 22, 2024
1 parent a50aeef commit 4c3b744
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 4 deletions.
1 change: 1 addition & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1140,6 +1140,7 @@ message NestedLoopJoinExecNode {
// Serialized form of a `CoalesceBatchesExec` physical plan node.
message CoalesceBatchesExecNode {
// Input plan node of the coalesce operator.
PhysicalPlanNode input = 1;
// Target batch size; converted to `usize` when the plan is rebuilt.
uint32 target_batch_size = 2;
// Optional fetch value. On deserialization it is forwarded to
// `CoalesceBatchesExec::with_fetch`; when unset, `None` is forwarded.
optional uint32 fetch = 3;
}

message CoalescePartitionsExecNode {
Expand Down
19 changes: 19 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 8 additions & 4 deletions datafusion/proto/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,10 +259,13 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
runtime,
extension_codec,
)?;
Ok(Arc::new(CoalesceBatchesExec::new(
input,
coalesce_batches.target_batch_size as usize,
)))
Ok(Arc::new(
CoalesceBatchesExec::new(
input,
coalesce_batches.target_batch_size as usize,
)
.with_fetch(coalesce_batches.fetch.map(|f| f as usize)),
))
}
PhysicalPlanType::Merge(merge) => {
let input: Arc<dyn ExecutionPlan> =
Expand Down Expand Up @@ -1536,6 +1539,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
protobuf::CoalesceBatchesExecNode {
input: Some(Box::new(input)),
target_batch_size: coalesce_batches.target_batch_size() as u32,
fetch: coalesce_batches.fetch().map(|n| n as u32),
},
))),
});
Expand Down
18 changes: 18 additions & 0 deletions datafusion/proto/tests/cases/roundtrip_physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::vec;
use arrow::array::RecordBatch;
use arrow::csv::WriterBuilder;
use datafusion::physical_expr_functions_aggregate::aggregate::AggregateExprBuilder;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion_functions_aggregate::approx_percentile_cont::approx_percentile_cont_udaf;
use datafusion_functions_aggregate::array_agg::array_agg_udaf;
use datafusion_functions_aggregate::min_max::max_udaf;
Expand Down Expand Up @@ -629,6 +630,23 @@ fn roundtrip_sort_preserve_partitioning() -> Result<()> {
))
}

/// Passes `CoalesceBatchesExec` through `roundtrip_test` twice: first as
/// built by `new` alone, then with `fetch` set to `Some(10)`.
#[test]
fn roundtrip_coalesce_with_fetch() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Boolean, false),
        Field::new("b", DataType::Int64, false),
    ]));

    // Plan built without calling `with_fetch`.
    let plain_plan =
        CoalesceBatchesExec::new(Arc::new(EmptyExec::new(schema.clone())), 8096);
    roundtrip_test(Arc::new(plain_plan))?;

    // Same plan shape, this time carrying an explicit fetch value.
    let fetch_plan =
        CoalesceBatchesExec::new(Arc::new(EmptyExec::new(schema.clone())), 8096)
            .with_fetch(Some(10));
    roundtrip_test(Arc::new(fetch_plan))
}

#[test]
fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> {
let scan_config = FileScanConfig {
Expand Down

0 comments on commit 4c3b744

Please sign in to comment.