Add LimitPushdown optimization rule and CoalesceBatchesExec fetch #27

Closed
wants to merge 51 commits into from
Commits
d868d60
Add LimitPushdown skeleton
alihandroid Jul 10, 2024
36f0ba9
Transform StreamTableExec into fetching version when skip is 0
alihandroid Jul 11, 2024
532fe69
Transform StreamTableExec into fetching version when skip is non-zero
alihandroid Jul 11, 2024
8dcd86b
Fix non-zero skip test
alihandroid Jul 11, 2024
26636f1
Add fetch field to CoalesceBatchesExec
alihandroid Jul 13, 2024
145da2d
Tag ProjectionExec, CoalescePartitionsExec and SortPreservingMergeExe…
alihandroid Jul 13, 2024
1d0d2c8
Add `with_fetch` to SortExec
alihandroid Jul 13, 2024
d4eab68
Push limit down through supporting ExecutionPlans
alihandroid Jul 13, 2024
ff1609f
Reorder LimitPushdown optimization to before SanityCheckPlan
alihandroid Jul 13, 2024
3a8989f
Refactor LimitPushdown tests
alihandroid Jul 15, 2024
10d6436
Refactor LimitPushdown tests
alihandroid Jul 15, 2024
26c907a
Add more LimitPushdown tests
alihandroid Jul 15, 2024
1f3299a
Add fetch support to CoalesceBatchesExec
alihandroid Jul 15, 2024
49efeb7
Fix tests that were affected
alihandroid Jul 15, 2024
68a315c
Refactor LimitPushdown push_down_limits
alihandroid Jul 15, 2024
7960c0a
Remove unnecessary parameter from coalesce_batches_exec
alihandroid Jul 15, 2024
5294cd2
Format files
alihandroid Jul 17, 2024
2bb7385
Apply clippy fixes
alihandroid Jul 17, 2024
54d8713
Make CoalesceBatchesExec display consistent
alihandroid Jul 17, 2024
db48495
Fix slt tests according to LimitPushdown rules
alihandroid Jul 17, 2024
f57326c
Resolve linter errors
mustafasrepo Jul 19, 2024
faaa4b5
Minor changes
mustafasrepo Jul 19, 2024
12a8d44
Merge branch 'apache_main' into alihan_apache_main
mustafasrepo Jul 19, 2024
c9c0845
Minor changes
mustafasrepo Jul 19, 2024
0fb546f
Fix GlobalLimitExec sometimes replacing LocalLimitExec
alihandroid Jul 21, 2024
79b0ba9
Fix unnecessary LocalLimitExec for ProjectionExec
alihandroid Jul 21, 2024
c2984d1
Rename GlobalOrLocal into LimitExec
alihandroid Jul 21, 2024
62c1f10
Clarify pushdown recursion
alihandroid Jul 21, 2024
5509bcf
Minor changes
mustafasrepo Jul 22, 2024
082d25e
Minor
berkaysynnada Jul 22, 2024
8b514fa
Do not display when fetch is None
berkaysynnada Jul 22, 2024
fb625c9
.rs removal
berkaysynnada Jul 22, 2024
bf75757
Clean-up tpch plans
berkaysynnada Jul 22, 2024
cb9806f
Clean-up comments
berkaysynnada Jul 22, 2024
887e75c
Update datafusion/core/src/physical_optimizer/optimizer.rs
ozankabak Jul 22, 2024
24282aa
Update datafusion/physical-plan/src/coalesce_batches.rs
ozankabak Jul 22, 2024
eb1b4c8
Update datafusion/physical-plan/src/coalesce_batches.rs
ozankabak Jul 22, 2024
33f9ece
Update datafusion/physical-plan/src/coalesce_batches.rs
ozankabak Jul 22, 2024
46621d5
Update datafusion/core/src/physical_optimizer/limit_pushdown.rs
ozankabak Jul 22, 2024
26a8d3c
Update datafusion/core/src/physical_optimizer/limit_pushdown.rs
ozankabak Jul 22, 2024
d57a3a6
Update datafusion/physical-plan/src/lib.rs
ozankabak Jul 22, 2024
f888826
Implement with_fetch() for other source execs
berkaysynnada Jul 22, 2024
a58d7c2
Merge branch 'tmp' into apache_main
berkaysynnada Jul 22, 2024
a096ea4
Minor
berkaysynnada Jul 22, 2024
0f7aa8d
Merge all Global/Local-LimitExec combinations in LimitPushdown
alihandroid Jul 25, 2024
dd9ad7a
Merge remote-tracking branch 'upstream/main' into apache_main
alihandroid Jul 25, 2024
f9becb6
Fix compile errors after merge
alihandroid Jul 25, 2024
0d01e76
Merge remote-tracking branch 'upstream/main' into apache_main
alihandroid Jul 26, 2024
a4a7794
Update datafusion/core/src/physical_optimizer/limit_pushdown.rs
ozankabak Jul 26, 2024
c4c7276
Avoid code duplication
ozankabak Jul 26, 2024
dcd69b6
Incorporate review feedback
ozankabak Jul 27, 2024
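
The commit titles above describe two cases for pushing a limit into a source: with a zero skip the limit can be absorbed entirely, and with a non-zero skip the source can only be told to stop early while the limit stays in place to discard the skipped rows. The toy model below is a minimal sketch of that idea, not the code in this PR. The `Plan` enum, `push_fetch`, and `push_down_limit` are hypothetical names, and the `skip + fetch` bound for the non-zero case is an assumption inferred from the commit titles.

```rust
/// Toy stand-in for a physical plan node (hypothetical, not DataFusion's `ExecutionPlan`).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    /// A source that can stop early after `fetch` rows.
    Source { fetch: Option<usize> },
    /// A row-preserving operator, similar in spirit to a projection.
    Projection { input: Box<Plan> },
    /// Discards `skip` rows, then emits at most `fetch` rows.
    Limit { skip: usize, fetch: Option<usize>, input: Box<Plan> },
}

/// Try to make `plan` produce at most `fetch` rows.
/// Returns `None` when the subtree cannot absorb the bound.
fn push_fetch(plan: &Plan, fetch: usize) -> Option<Plan> {
    match plan {
        // Keep the tighter of the existing and the requested bound.
        Plan::Source { fetch: existing } => {
            let merged = existing.map_or(fetch, |e| e.min(fetch));
            Some(Plan::Source { fetch: Some(merged) })
        }
        // A row-preserving operator forwards the bound to its input.
        Plan::Projection { input } => {
            push_fetch(input, fetch).map(|i| Plan::Projection { input: Box::new(i) })
        }
        // This sketch does not merge nested limits.
        Plan::Limit { .. } => None,
    }
}

/// Push a top-level limit toward the source where possible.
fn push_down_limit(plan: Plan) -> Plan {
    match plan {
        Plan::Limit { skip, fetch: Some(fetch), input } => {
            // The subtree must still yield the rows that will be skipped.
            let needed = skip.saturating_add(fetch);
            match push_fetch(&input, needed) {
                // Zero skip: the limit is fully absorbed by the subtree.
                Some(new_input) if skip == 0 => new_input,
                // Non-zero skip: keep the limit to drop the first `skip` rows.
                Some(new_input) => Plan::Limit {
                    skip,
                    fetch: Some(fetch),
                    input: Box::new(new_input),
                },
                // Nothing below supports fetching: leave the plan unchanged.
                None => Plan::Limit { skip, fetch: Some(fetch), input },
            }
        }
        other => other,
    }
}
```

In this sketch a `Limit { skip: 2, fetch: Some(5) }` over a plain source keeps the limit node and gives the source a fetch of 7, while the same limit with `skip: 0` disappears and leaves a source with a fetch of 5.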
13 changes: 13 additions & 0 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -195,6 +195,19 @@ impl ExecutionPlan for ArrowExec {
    fn statistics(&self) -> Result<Statistics> {
        Ok(self.projected_statistics.clone())
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        let new_config = self.base_config.clone().with_limit(limit);

        Some(Arc::new(Self {
            base_config: new_config,
            projected_statistics: self.projected_statistics.clone(),
            projected_schema: self.projected_schema.clone(),
            projected_output_ordering: self.projected_output_ordering.clone(),
            metrics: self.metrics.clone(),
            cache: self.cache.clone(),
        }))
    }
}

pub struct ArrowOpener {
13 changes: 13 additions & 0 deletions datafusion/core/src/datasource/physical_plan/avro.rs
@@ -164,6 +164,19 @@ impl ExecutionPlan for AvroExec {
    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        let new_config = self.base_config.clone().with_limit(limit);

        Some(Arc::new(Self {
            base_config: new_config,
            projected_statistics: self.projected_statistics.clone(),
            projected_schema: self.projected_schema.clone(),
            projected_output_ordering: self.projected_output_ordering.clone(),
            metrics: self.metrics.clone(),
            cache: self.cache.clone(),
        }))
    }
}

#[cfg(feature = "avro")]
18 changes: 18 additions & 0 deletions datafusion/core/src/datasource/physical_plan/csv.rs
@@ -426,6 +426,24 @@ impl ExecutionPlan for CsvExec {
    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        let new_config = self.base_config.clone().with_limit(limit);

        Some(Arc::new(Self {
            base_config: new_config,
            projected_statistics: self.projected_statistics.clone(),
            has_header: self.has_header,
            delimiter: self.delimiter,
            quote: self.quote,
            escape: self.escape,
            comment: self.comment,
            newlines_in_values: self.newlines_in_values,
            metrics: self.metrics.clone(),
            file_compression_type: self.file_compression_type,
            cache: self.cache.clone(),
        }))
    }
}

/// A Config for [`CsvOpener`]
12 changes: 12 additions & 0 deletions datafusion/core/src/datasource/physical_plan/json.rs
@@ -206,6 +206,18 @@ impl ExecutionPlan for NdJsonExec {
    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        let new_config = self.base_config.clone().with_limit(limit);

        Some(Arc::new(Self {
            base_config: new_config,
            projected_statistics: self.projected_statistics.clone(),
            metrics: self.metrics.clone(),
            file_compression_type: self.file_compression_type,
            cache: self.cache.clone(),
        }))
    }
}

/// A [`FileOpener`] that opens a JSON file and yields a [`FileOpenFuture`]
18 changes: 18 additions & 0 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -726,6 +726,24 @@ impl ExecutionPlan for ParquetExec {
    fn statistics(&self) -> Result<Statistics> {
        Ok(self.projected_statistics.clone())
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        let new_config = self.base_config.clone().with_limit(limit);

        Some(Arc::new(Self {
            base_config: new_config,
            projected_statistics: self.projected_statistics.clone(),
            metrics: self.metrics.clone(),
            predicate: self.predicate.clone(),
            pruning_predicate: self.pruning_predicate.clone(),
            page_pruning_predicate: self.page_pruning_predicate.clone(),
            metadata_size_hint: self.metadata_size_hint,
            parquet_file_reader_factory: self.parquet_file_reader_factory.clone(),
            cache: self.cache.clone(),
            table_parquet_options: self.table_parquet_options.clone(),
            schema_adapter_factory: self.schema_adapter_factory.clone(),
        }))
    }
}

fn should_enable_page_index(
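
All five file sources in the diffs above implement `with_fetch` the same way: clone the scan configuration, set the new limit, and return a fresh node that otherwise shares the original's state. The standalone sketch below isolates that pattern. `Node`, `ScanConfig`, `FileScan`, and `Filter` are hypothetical stand-ins rather than DataFusion types, and the default of returning `None` for operators that cannot absorb a limit is an assumption about how the trait is meant to be used.

```rust
use std::sync::Arc;

/// Hypothetical scan configuration; stands in for the `base_config` field in the diffs.
#[derive(Debug, Clone, PartialEq)]
struct ScanConfig {
    path: String,
    limit: Option<usize>,
}

impl ScanConfig {
    /// Builder-style setter, mirroring the `with_limit` call used in the diffs.
    fn with_limit(mut self, limit: Option<usize>) -> Self {
        self.limit = limit;
        self
    }
}

/// Toy plan-node trait (hypothetical, much smaller than `ExecutionPlan`).
trait Node {
    /// The row bound this node currently enforces, if any.
    fn fetch(&self) -> Option<usize> {
        None
    }

    /// Return a copy of this node limited to `limit` rows,
    /// or `None` if the node cannot enforce a limit itself.
    fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn Node>> {
        None
    }
}

/// A source that supports limits by storing them in its config.
struct FileScan {
    config: ScanConfig,
}

impl Node for FileScan {
    fn fetch(&self) -> Option<usize> {
        self.config.limit
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn Node>> {
        // Clone-and-rebuild: the original node is left untouched.
        let new_config = self.config.clone().with_limit(limit);
        Some(Arc::new(Self { config: new_config }))
    }
}

/// An operator that relies on the defaults and so reports no fetch support.
struct Filter;

impl Node for Filter {}
```

Returning a new `Arc` instead of mutating in place keeps nodes immutable, so an optimizer rule can call `with_fetch` speculatively and fall back to the original plan when it gets `None`.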