Add LimitPushdown optimization rule and CoalesceBatchesExec fetch #27

Closed
wants to merge 51 commits
Changes from 34 commits
Commits (51)
d868d60
Add LimitPushdown skeleton
alihandroid Jul 10, 2024
36f0ba9
Transform StreamTableExec into fetching version when skip is 0
alihandroid Jul 11, 2024
532fe69
Transform StreamTableExec into fetching version when skip is non-zero
alihandroid Jul 11, 2024
8dcd86b
Fix non-zero skip test
alihandroid Jul 11, 2024
26636f1
Add fetch field to CoalesceBatchesExec
alihandroid Jul 13, 2024
145da2d
Tag ProjectionExec, CoalescePartitionsExec and SortPreservingMergeExe…
alihandroid Jul 13, 2024
1d0d2c8
Add `with_fetch` to SortExec
alihandroid Jul 13, 2024
d4eab68
Push limit down through supporting ExecutionPlans
alihandroid Jul 13, 2024
ff1609f
Reorder LimitPushdown optimization to before SanityCheckPlan
alihandroid Jul 13, 2024
3a8989f
Refactor LimitPushdown tests
alihandroid Jul 15, 2024
10d6436
Refactor LimitPushdown tests
alihandroid Jul 15, 2024
26c907a
Add more LimitPushdown tests
alihandroid Jul 15, 2024
1f3299a
Add fetch support to CoalesceBatchesExec
alihandroid Jul 15, 2024
49efeb7
Fix tests that were affected
alihandroid Jul 15, 2024
68a315c
Refactor LimitPushdown push_down_limits
alihandroid Jul 15, 2024
7960c0a
Remove unnecessary parameter from coalesce_batches_exec
alihandroid Jul 15, 2024
5294cd2
Format files
alihandroid Jul 17, 2024
2bb7385
Apply clippy fixes
alihandroid Jul 17, 2024
54d8713
Make CoalesceBatchesExec display consistent
alihandroid Jul 17, 2024
db48495
Fix slt tests according to LimitPushdown rules
alihandroid Jul 17, 2024
f57326c
Resolve linter errors
mustafasrepo Jul 19, 2024
faaa4b5
Minor changes
mustafasrepo Jul 19, 2024
12a8d44
Merge branch 'apache_main' into alihan_apache_main
mustafasrepo Jul 19, 2024
c9c0845
Minor changes
mustafasrepo Jul 19, 2024
0fb546f
Fix GlobalLimitExec sometimes replacing LocalLimitExec
alihandroid Jul 21, 2024
79b0ba9
Fix unnecessary LocalLimitExec for ProjectionExec
alihandroid Jul 21, 2024
c2984d1
Rename GlobalOrLocal into LimitExec
alihandroid Jul 21, 2024
62c1f10
Clarify pushdown recursion
alihandroid Jul 21, 2024
5509bcf
Minor changes
mustafasrepo Jul 22, 2024
082d25e
Minor
berkaysynnada Jul 22, 2024
8b514fa
Do not display when fetch is None
berkaysynnada Jul 22, 2024
fb625c9
.rs removal
berkaysynnada Jul 22, 2024
bf75757
Clean-up tpch plans
berkaysynnada Jul 22, 2024
cb9806f
Clean-up comments
berkaysynnada Jul 22, 2024
887e75c
Update datafusion/core/src/physical_optimizer/optimizer.rs
ozankabak Jul 22, 2024
24282aa
Update datafusion/physical-plan/src/coalesce_batches.rs
ozankabak Jul 22, 2024
eb1b4c8
Update datafusion/physical-plan/src/coalesce_batches.rs
ozankabak Jul 22, 2024
33f9ece
Update datafusion/physical-plan/src/coalesce_batches.rs
ozankabak Jul 22, 2024
46621d5
Update datafusion/core/src/physical_optimizer/limit_pushdown.rs
ozankabak Jul 22, 2024
26a8d3c
Update datafusion/core/src/physical_optimizer/limit_pushdown.rs
ozankabak Jul 22, 2024
d57a3a6
Update datafusion/physical-plan/src/lib.rs
ozankabak Jul 22, 2024
f888826
Implement with_fetch() for other source execs
berkaysynnada Jul 22, 2024
a58d7c2
Merge branch 'tmp' into apache_main
berkaysynnada Jul 22, 2024
a096ea4
Minor
berkaysynnada Jul 22, 2024
0f7aa8d
Merge all Global/Local-LimitExec combinations in LimitPushdown
alihandroid Jul 25, 2024
dd9ad7a
Merge remote-tracking branch 'upstream/main' into apache_main
alihandroid Jul 25, 2024
f9becb6
Fix compile errors after merge
alihandroid Jul 25, 2024
0d01e76
Merge remote-tracking branch 'upstream/main' into apache_main
alihandroid Jul 26, 2024
a4a7794
Update datafusion/core/src/physical_optimizer/limit_pushdown.rs
ozankabak Jul 26, 2024
c4c7276
Avoid code duplication
ozankabak Jul 26, 2024
dcd69b6
Incorporate review feedback
ozankabak Jul 27, 2024
566 changes: 566 additions & 0 deletions datafusion/core/src/physical_optimizer/limit_pushdown.rs

Large diffs are not rendered by default.
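The 566-line rule itself is not shown in this view. Purely as orientation — a hypothetical sketch, not the PR's actual code — the per-node decision such a rule makes can be expressed with the `with_fetch` and `supports_limit_pushdown` hooks introduced later in this diff; the function name and the single-step (non-recursive) structure are assumptions:

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion::physical_plan::ExecutionPlan;

/// Hypothetical sketch: push the fetch of a `GlobalLimitExec` one step into
/// its child. The real rule applies this kind of logic recursively over the
/// whole plan tree.
fn push_limit_one_level(
    limit: &GlobalLimitExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
    let Some(fetch) = limit.fetch() else {
        return Ok(None); // nothing to push down
    };
    let skip = limit.skip();
    let child = Arc::clone(limit.input());

    // Case 1: the child can absorb the limit itself (e.g. SortExec, or
    // CoalesceBatchesExec after this PR): swap in a fetching version and
    // keep the limit on top only while a non-zero skip remains.
    if let Some(fetching_child) = child.with_fetch(Some(skip + fetch)) {
        let new_plan: Arc<dyn ExecutionPlan> = if skip == 0 {
            fetching_child
        } else {
            Arc::new(GlobalLimitExec::new(fetching_child, skip, Some(fetch)))
        };
        return Ok(Some(new_plan));
    }

    // Case 2: the child is transparent to limits (e.g. ProjectionExec or
    // CoalescePartitionsExec): insert a LocalLimitExec below it so upstream
    // operators stop producing rows early (assumes a single-child node).
    if child.supports_limit_pushdown() {
        let grandchild = child.children()[0].clone();
        let limited: Arc<dyn ExecutionPlan> =
            Arc::new(LocalLimitExec::new(grandchild, skip + fetch));
        let new_child = child.with_new_children(vec![limited])?;
        return Ok(Some(Arc::new(GlobalLimitExec::new(
            new_child,
            skip,
            Some(fetch),
        ))));
    }

    Ok(None) // the limit stays where it is
}
```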

1 change: 1 addition & 0 deletions datafusion/core/src/physical_optimizer/mod.rs
@@ -27,6 +27,7 @@ pub mod combine_partial_final_agg;
pub mod enforce_distribution;
pub mod enforce_sorting;
pub mod join_selection;
pub mod limit_pushdown;
pub mod limited_distinct_aggregation;
pub mod optimizer;
pub mod output_requirements;
5 changes: 5 additions & 0 deletions datafusion/core/src/physical_optimizer/optimizer.rs
@@ -28,6 +28,7 @@ use crate::physical_optimizer::combine_partial_final_agg::CombinePartialFinalAgg
use crate::physical_optimizer::enforce_distribution::EnforceDistribution;
use crate::physical_optimizer::enforce_sorting::EnforceSorting;
use crate::physical_optimizer::join_selection::JoinSelection;
use crate::physical_optimizer::limit_pushdown::LimitPushdown;
use crate::physical_optimizer::limited_distinct_aggregation::LimitedDistinctAggregation;
use crate::physical_optimizer::output_requirements::OutputRequirements;
use crate::physical_optimizer::sanity_checker::SanityCheckPlan;
@@ -98,6 +99,10 @@ impl PhysicalOptimizer {
// are not present, the load of executors such as join or union will be
// reduced by narrowing their input tables.
Arc::new(ProjectionPushdown::new()),
// The LimitPushdown rule tries to push limits down as far as possible,
// replacing ExecutionPlans with fetching versions or adding limits
// past ExecutionPlans that support limit pushdown.
Arc::new(LimitPushdown::new()),
// The SanityCheckPlan rule checks whether the order and
// distribution requirements of each node in the plan
// is satisfied. It will also reject non-runnable query
23 changes: 12 additions & 11 deletions datafusion/core/tests/memory_limit/mod.rs
@@ -222,17 +222,18 @@ async fn sort_preserving_merge() {
// SortPreservingMergeExec (not a Sort which would compete
// with the SortPreservingMergeExec for memory)
&[
"+---------------+-------------------------------------------------------------------------------------------------------------+",
"| plan_type | plan |",
"+---------------+-------------------------------------------------------------------------------------------------------------+",
"| logical_plan | Limit: skip=0, fetch=10 |",
"| | Sort: t.a ASC NULLS LAST, t.b ASC NULLS LAST, fetch=10 |",
"| | TableScan: t projection=[a, b] |",
"| physical_plan | GlobalLimitExec: skip=0, fetch=10 |",
"| | SortPreservingMergeExec: [a@0 ASC NULLS LAST,b@1 ASC NULLS LAST], fetch=10 |",
"| | MemoryExec: partitions=2, partition_sizes=[5, 5], output_ordering=a@0 ASC NULLS LAST,b@1 ASC NULLS LAST |",
"| | |",
"+---------------+-------------------------------------------------------------------------------------------------------------+",
"+---------------+---------------------------------------------------------------------------------------------------------------+",
"| plan_type | plan |",
"+---------------+---------------------------------------------------------------------------------------------------------------+",
"| logical_plan | Limit: skip=0, fetch=10 |",
"| | Sort: t.a ASC NULLS LAST, t.b ASC NULLS LAST, fetch=10 |",
"| | TableScan: t projection=[a, b] |",
"| physical_plan | GlobalLimitExec: skip=0, fetch=10 |",
"| | SortPreservingMergeExec: [a@0 ASC NULLS LAST,b@1 ASC NULLS LAST], fetch=10 |",
"| | LocalLimitExec: fetch=10 |",
Review comment: Another way to get this plan would be to support limit pushdown into MemoryExec (rather than applying the limit with a new LocalLimitExec).

"| | MemoryExec: partitions=2, partition_sizes=[5, 5], output_ordering=a@0 ASC NULLS LAST,b@1 ASC NULLS LAST |",
"| | |",
"+---------------+---------------------------------------------------------------------------------------------------------------+",
]
)
.run()
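The review comment above suggests letting `MemoryExec` absorb the limit instead of adding a `LocalLimitExec`. As an illustration only (not part of this PR; a real implementation would be a `with_fetch` method on `MemoryExec`, and `memory_exec_with_fetch` is an invented name), the per-partition trimming involved might look like this, using only existing `MemoryExec` and `RecordBatch` APIs:

```rust
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::physical_plan::memory::MemoryExec;

/// Illustrative helper: build a `MemoryExec` whose partitions are already
/// trimmed to at most `fetch` rows each, which is roughly the work a
/// `with_fetch` implementation on `MemoryExec` would do.
fn memory_exec_with_fetch(
    partitions: &[Vec<RecordBatch>],
    schema: SchemaRef,
    fetch: usize,
) -> Result<MemoryExec> {
    let limited: Vec<Vec<RecordBatch>> = partitions
        .iter()
        .map(|batches| {
            let mut remaining = fetch;
            let mut out = Vec::new();
            for batch in batches {
                if remaining == 0 {
                    break;
                }
                // keep only as many rows of this batch as the limit allows
                let take = remaining.min(batch.num_rows());
                out.push(batch.slice(0, take));
                remaining -= take;
            }
            out
        })
        .collect();
    MemoryExec::try_new(&limited, schema, None)
}
```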
164 changes: 155 additions & 9 deletions datafusion/physical-plan/src/coalesce_batches.rs
@@ -46,6 +46,9 @@ pub struct CoalesceBatchesExec {
input: Arc<dyn ExecutionPlan>,
/// Minimum number of rows for coalesces batches
target_batch_size: usize,
/// Maximum number of rows to fetch,
/// `None` means fetching all rows
fetch: Option<usize>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
cache: PlanProperties,
@@ -58,11 +61,18 @@ impl CoalesceBatchesExec {
Self {
input,
target_batch_size,
fetch: None,
metrics: ExecutionPlanMetricsSet::new(),
cache,
}
}

/// Update fetch with the argument
pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
self.fetch = fetch;
self
}

/// The input plan
pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
&self.input
Expand Down Expand Up @@ -96,8 +106,13 @@ impl DisplayAs for CoalesceBatchesExec {
write!(
f,
"CoalesceBatchesExec: target_batch_size={}",
self.target_batch_size
)
self.target_batch_size,
)?;
if let Some(fetch) = self.fetch {
write!(f, ", fetch={fetch}")?;
};

Ok(())
}
}
}
Expand Down Expand Up @@ -133,10 +148,10 @@ impl ExecutionPlan for CoalesceBatchesExec {
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(CoalesceBatchesExec::new(
Arc::clone(&children[0]),
self.target_batch_size,
)))
Ok(Arc::new(
CoalesceBatchesExec::new(Arc::clone(&children[0]), self.target_batch_size)
.with_fetch(self.fetch),
))
}

fn execute(
@@ -148,8 +163,10 @@ impl ExecutionPlan for CoalesceBatchesExec {
input: self.input.execute(partition, context)?,
schema: self.input.schema(),
target_batch_size: self.target_batch_size,
fetch: self.fetch,
buffer: Vec::new(),
buffered_rows: 0,
total_rows: 0,
is_closed: false,
baseline_metrics: BaselineMetrics::new(&self.metrics, partition),
}))
@@ -162,6 +179,16 @@ impl ExecutionPlan for CoalesceBatchesExec {
fn statistics(&self) -> Result<Statistics> {
self.input.statistics()
}

fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
Some(Arc::new(CoalesceBatchesExec {
input: Arc::clone(&self.input),
target_batch_size: self.target_batch_size,
fetch: limit,
metrics: self.metrics.clone(),
cache: self.cache.clone(),
}))
}
}

struct CoalesceBatchesStream {
@@ -171,10 +198,14 @@ struct CoalesceBatchesStream {
schema: SchemaRef,
/// Minimum number of rows for coalesces batches
target_batch_size: usize,
/// Maximum number of rows to fetch,
/// `None` means fetching all rows
fetch: Option<usize>,
/// Buffered batches
buffer: Vec<RecordBatch>,
/// Buffered row count
buffered_rows: usize,
/// Total number of rows returned
total_rows: usize,
/// Whether the stream has finished returning all of its data or not
is_closed: bool,
/// Execution metrics
@@ -216,6 +247,39 @@
match input_batch {
Poll::Ready(x) => match x {
Some(Ok(batch)) => {
// handle fetch limit
if let Some(fetch) = self.fetch {
if self.total_rows + batch.num_rows() >= fetch {
// we have reached the fetch limit
let remaining_rows = fetch - self.total_rows;
// Shouldn't be empty
debug_assert!(remaining_rows > 0);

self.is_closed = true;
self.total_rows = fetch;
// trim the batch
let batch = batch.slice(0, remaining_rows);
// add to the buffered batches
self.buffered_rows += batch.num_rows();
self.buffer.push(batch);
// combine the batches and return
let batch = concat_batches(
&self.schema,
&self.buffer,
self.buffered_rows,
)?;
// reset buffer state
self.buffer.clear();
self.buffered_rows = 0;
// return batch
return Poll::Ready(Some(Ok(batch)));
} else {
self.total_rows += batch.num_rows();
}
} else {
self.total_rows += batch.num_rows();
}

if batch.num_rows() >= self.target_batch_size
&& self.buffer.is_empty()
{
Expand Down Expand Up @@ -304,7 +368,7 @@ mod tests {
let partition = create_vec_batches(&schema, 10);
let partitions = vec![partition];

let output_partitions = coalesce_batches(&schema, partitions, 21).await?;
let output_partitions = coalesce_batches(&schema, partitions, 21, None).await?;
assert_eq!(1, output_partitions.len());

// input is 10 batches x 8 rows (80 rows)
@@ -319,6 +383,86 @@ mod tests {
Ok(())
}

#[tokio::test]
Review comment: 👍

async fn test_concat_batches_with_fetch_larger_than_input_size() -> Result<()> {
let schema = test_schema();
let partition = create_vec_batches(&schema, 10);
let partitions = vec![partition];

let output_partitions =
coalesce_batches(&schema, partitions, 21, Some(100)).await?;
assert_eq!(1, output_partitions.len());

// input is 10 batches x 8 rows (80 rows) with fetch limit of 100
// expected to behave the same as `test_concat_batches`
let batches = &output_partitions[0];
assert_eq!(4, batches.len());
assert_eq!(24, batches[0].num_rows());
assert_eq!(24, batches[1].num_rows());
assert_eq!(24, batches[2].num_rows());
assert_eq!(8, batches[3].num_rows());

Ok(())
}

#[tokio::test]
async fn test_concat_batches_with_fetch_less_than_input_size() -> Result<()> {
let schema = test_schema();
let partition = create_vec_batches(&schema, 10);
let partitions = vec![partition];

let output_partitions =
coalesce_batches(&schema, partitions, 21, Some(50)).await?;
assert_eq!(1, output_partitions.len());

// input is 10 batches x 8 rows (80 rows) with fetch limit of 50
let batches = &output_partitions[0];
assert_eq!(3, batches.len());
assert_eq!(24, batches[0].num_rows());
assert_eq!(24, batches[1].num_rows());
assert_eq!(2, batches[2].num_rows());

Ok(())
}

#[tokio::test]
async fn test_concat_batches_with_fetch_less_than_target_and_no_remaining_rows(
) -> Result<()> {
let schema = test_schema();
let partition = create_vec_batches(&schema, 10);
let partitions = vec![partition];

let output_partitions =
coalesce_batches(&schema, partitions, 21, Some(48)).await?;
assert_eq!(1, output_partitions.len());

// input is 10 batches x 8 rows (80 rows) with fetch limit of 48
let batches = &output_partitions[0];
assert_eq!(2, batches.len());
assert_eq!(24, batches[0].num_rows());
assert_eq!(24, batches[1].num_rows());

Ok(())
}

#[tokio::test]
async fn test_concat_batches_with_fetch_less_target_batch_size() -> Result<()> {
let schema = test_schema();
let partition = create_vec_batches(&schema, 10);
let partitions = vec![partition];

let output_partitions =
coalesce_batches(&schema, partitions, 21, Some(10)).await?;
assert_eq!(1, output_partitions.len());

// input is 10 batches x 8 rows (80 rows) with fetch limit of 10
let batches = &output_partitions[0];
assert_eq!(1, batches.len());
assert_eq!(10, batches[0].num_rows());

Ok(())
}

Review comment (Collaborator):
We have 4 quantities with variable-length batch sizes:
1) Total input row count
2) Single batch input row count
3) Target batch size
4) Fetch count
We have tested four different cases by ordering them differently. Can you extend these cases?

Reply (Author):
Can you rephrase your comment? I only changed the fetch count for the tests, do you mean that I should parametrize the other inputs as well?

Reply (Collaborator):
Yes, please. For example, your tests cover:

  1. Total input row count > target batch size > fetch count > single batch row count
  2. Total input row count > fetch count > target batch size > single batch row count
  3. Fetch count > total input row count > target batch size > single batch row count

Can you extend these with the other possible scenarios?
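One ordering not yet covered is a fetch smaller than a single input batch (single batch row count > fetch count). A hypothetical extra test in the spirit of the request, reusing the helpers already in this file, could look like the following; the expected counts follow the slicing logic above but have not been verified here:

```rust
#[tokio::test]
async fn test_concat_batches_with_fetch_less_than_single_batch_size() -> Result<()> {
    let schema = test_schema();
    let partition = create_vec_batches(&schema, 10);
    let partitions = vec![partition];

    // fetch (5) is smaller than one 8-row input batch, so the very first
    // input batch should be sliced and returned on its own
    let output_partitions = coalesce_batches(&schema, partitions, 21, Some(5)).await?;
    assert_eq!(1, output_partitions.len());

    let batches = &output_partitions[0];
    assert_eq!(1, batches.len());
    assert_eq!(5, batches[0].num_rows());

    Ok(())
}
```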

fn test_schema() -> Arc<Schema> {
Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]))
}
@@ -327,13 +471,15 @@ mod tests {
schema: &SchemaRef,
input_partitions: Vec<Vec<RecordBatch>>,
target_batch_size: usize,
fetch: Option<usize>,
) -> Result<Vec<Vec<RecordBatch>>> {
// create physical plan
let exec = MemoryExec::try_new(&input_partitions, Arc::clone(schema), None)?;
let exec =
RepartitionExec::try_new(Arc::new(exec), Partitioning::RoundRobinBatch(1))?;
let exec: Arc<dyn ExecutionPlan> =
Arc::new(CoalesceBatchesExec::new(Arc::new(exec), target_batch_size));
let exec: Arc<dyn ExecutionPlan> = Arc::new(
CoalesceBatchesExec::new(Arc::new(exec), target_batch_size).with_fetch(fetch),
);

// execute and collect results
let output_partition_count = exec.output_partitioning().partition_count();
4 changes: 4 additions & 0 deletions datafusion/physical-plan/src/coalesce_partitions.rs
@@ -174,6 +174,10 @@ impl ExecutionPlan for CoalescePartitionsExec {
fn statistics(&self) -> Result<Statistics> {
self.input.statistics()
}

fn supports_limit_pushdown(&self) -> bool {
true
}
}

#[cfg(test)]
12 changes: 12 additions & 0 deletions datafusion/physical-plan/src/lib.rs
@@ -428,6 +428,18 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
fn statistics(&self) -> Result<Statistics> {
Ok(Statistics::new_unknown(&self.schema()))
}

/// Returns `true` if a limit can be safely pushed down through this
/// `ExecutionPlan` node.
fn supports_limit_pushdown(&self) -> bool {
false
}

/// Returns a fetching version of this `ExecutionPlan` node, if it supports
/// fetch limits. Returns `None` otherwise.
fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
None
}
}

/// Extension trait provides an easy API to fetch various properties of
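Seen from a caller, the two new hooks compose naturally: let a node absorb the limit if it can, otherwise wrap it. A minimal hypothetical helper (`apply_fetch` is an illustrative name, not part of this PR) showing the pattern:

```rust
use std::sync::Arc;

use datafusion::physical_plan::limit::LocalLimitExec;
use datafusion::physical_plan::ExecutionPlan;

/// Hypothetical helper: prefer the node's own fetching variant via the new
/// `with_fetch` hook, and fall back to wrapping it in a `LocalLimitExec`.
fn apply_fetch(plan: Arc<dyn ExecutionPlan>, fetch: usize) -> Arc<dyn ExecutionPlan> {
    match plan.with_fetch(Some(fetch)) {
        // e.g. SortExec, or CoalesceBatchesExec after this PR
        Some(fetching_plan) => fetching_plan,
        // the node has no way to stop early by itself
        None => Arc::new(LocalLimitExec::new(plan, fetch)),
    }
}
```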
4 changes: 4 additions & 0 deletions datafusion/physical-plan/src/projection.rs
@@ -245,6 +245,10 @@ impl ExecutionPlan for ProjectionExec {
Arc::clone(&self.schema),
))
}

fn supports_limit_pushdown(&self) -> bool {
true
}
}

/// If e is a direct column reference, returns the field level
11 changes: 11 additions & 0 deletions datafusion/physical-plan/src/sorts/sort.rs
@@ -926,6 +926,17 @@ impl ExecutionPlan for SortExec {
fn statistics(&self) -> Result<Statistics> {
self.input.statistics()
}

fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
Some(Arc::new(SortExec {
input: Arc::clone(&self.input),
expr: self.expr.clone(),
metrics_set: self.metrics_set.clone(),
preserve_partitioning: self.preserve_partitioning,
fetch: limit,
cache: self.cache.clone(),
}))
}
}

#[cfg(test)]
4 changes: 4 additions & 0 deletions datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
@@ -264,6 +264,10 @@ impl ExecutionPlan for SortPreservingMergeExec {
fn statistics(&self) -> Result<Statistics> {
self.input.statistics()
}

fn supports_limit_pushdown(&self) -> bool {
true
}
}

#[cfg(test)]