Skip to content

Commit

Permalink
Add fetch info to the statistics
Browse files Browse the repository at this point in the history
  • Loading branch information
mustafasrepo committed Aug 8, 2024
1 parent 6b87c4c commit 8dd7e0a
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 8 deletions.
8 changes: 7 additions & 1 deletion datafusion/physical-plan/src/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use arrow::record_batch::RecordBatch;
use arrow_array::{Array, ArrayRef};
use futures::stream::{Stream, StreamExt};

use datafusion_common::stats::Precision;
use datafusion_common::Result;
use datafusion_execution::TaskContext;

Expand Down Expand Up @@ -212,7 +213,12 @@ impl ExecutionPlan for CoalesceBatchesExec {
}

/// Returns the input's statistics, with the row count capped by `fetch`
/// when a fetch limit is set.
///
/// A fetch limit only bounds the output from above: if the input yields
/// fewer than `fetch` rows, the output has that smaller number of rows.
/// The row count is therefore reported as `Exact` only when the input's own
/// row count is exact; otherwise it is an estimate.
fn statistics(&self) -> Result<Statistics> {
    let mut statistics = self.input.statistics()?;
    if let Some(fetch) = self.fetch {
        statistics.num_rows = match statistics.num_rows {
            // Input size is known exactly, so the output is the smaller of
            // the two. NOTE(review): assumes `fetch` bounds the total output
            // rather than each partition separately - confirm.
            Precision::Exact(rows) => Precision::Exact(rows.min(fetch)),
            // Input size is only an estimate; cap the estimate at `fetch`
            // but do not promote it to an exact value.
            Precision::Inexact(rows) => Precision::Inexact(rows.min(fetch)),
            // Nothing is known about the input. `fetch` is an upper bound,
            // not a guarantee: the input may hold fewer rows.
            Precision::Absent => Precision::Inexact(fetch),
        };
        // NOTE(review): the remaining fields are passed through unchanged
        // from the input; once rows are cut off, any exact byte-size or
        // per-column values presumably no longer describe the output
        // exactly - confirm and relax them if so.
    }
    Ok(statistics)
}

fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
Expand Down
8 changes: 7 additions & 1 deletion datafusion/physical-plan/src/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use arrow::record_batch::RecordBatch;
use arrow::row::{RowConverter, SortField};
use arrow_array::{Array, RecordBatchOptions, UInt32Array};
use arrow_schema::DataType;
use datafusion_common::stats::Precision;
use datafusion_common::{internal_err, Result};
use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
Expand Down Expand Up @@ -921,7 +922,12 @@ impl ExecutionPlan for SortExec {
}

/// Returns the input's statistics, with the row count capped by `fetch`
/// when a fetch limit is set.
///
/// A fetch limit only bounds the output from above: if the input yields
/// fewer than `fetch` rows, the output has that smaller number of rows.
/// The row count is therefore reported as `Exact` only when the input's own
/// row count is exact; otherwise it is an estimate.
fn statistics(&self) -> Result<Statistics> {
    let mut statistics = self.input.statistics()?;
    if let Some(fetch) = self.fetch {
        statistics.num_rows = match statistics.num_rows {
            // Input size is known exactly, so the output is the smaller of
            // the two. NOTE(review): assumes `fetch` bounds the total output
            // rather than each partition separately - confirm for the
            // partition-preserving case.
            Precision::Exact(rows) => Precision::Exact(rows.min(fetch)),
            // Input size is only an estimate; cap the estimate at `fetch`
            // but do not promote it to an exact value.
            Precision::Inexact(rows) => Precision::Inexact(rows.min(fetch)),
            // Nothing is known about the input. `fetch` is an upper bound,
            // not a guarantee: the input may hold fewer rows.
            Precision::Absent => Precision::Inexact(fetch),
        };
        // NOTE(review): the remaining fields are passed through unchanged
        // from the input; once rows are cut off, any exact byte-size or
        // per-column values presumably no longer describe the output
        // exactly - confirm and relax them if so.
    }
    Ok(statistics)
}

fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
Expand Down
17 changes: 17 additions & 0 deletions datafusion/sqllogictest/test_files/order.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1148,3 +1148,20 @@ SELECT (SELECT c from ordered_table ORDER BY c LIMIT 1) UNION ALL (SELECT 23 as
----
0
23

# Do not increase the number of partitions after a fetch of 1, since repartitioning a single row is unnecessary.
query TT
EXPLAIN SELECT a + b as sum1 FROM (SELECT a, b
FROM ordered_table
ORDER BY a ASC LIMIT 1
);
----
logical_plan
01)Projection: ordered_table.a + ordered_table.b AS sum1
02)--Limit: skip=0, fetch=1
03)----Sort: ordered_table.a ASC NULLS LAST, fetch=1
04)------TableScan: ordered_table projection=[a, b]
physical_plan
01)ProjectionExec: expr=[a@0 + b@1 as sum1]
02)--SortExec: TopK(fetch=1), expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false]
03)----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], has_header=true
9 changes: 3 additions & 6 deletions datafusion/sqllogictest/test_files/window.slt
Original file line number Diff line number Diff line change
Expand Up @@ -2048,12 +2048,9 @@ logical_plan
05)--------TableScan: aggregate_test_100 projection=[c13]
physical_plan
01)ProjectionExec: expr=[array_agg(aggregate_test_100.c13)@0 as array_agg1]
02)--AggregateExec: mode=Final, gby=[], aggr=[array_agg(aggregate_test_100.c13)]
03)----CoalescePartitionsExec
04)------AggregateExec: mode=Partial, gby=[], aggr=[array_agg(aggregate_test_100.c13)]
05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
06)----------SortExec: TopK(fetch=1), expr=[c13@0 ASC NULLS LAST], preserve_partitioning=[false]
07)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c13], has_header=true
02)--AggregateExec: mode=Single, gby=[], aggr=[array_agg(aggregate_test_100.c13)]
03)----SortExec: TopK(fetch=1), expr=[c13@0 ASC NULLS LAST], preserve_partitioning=[false]
04)------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c13], has_header=true


query ?
Expand Down

0 comments on commit 8dd7e0a

Please sign in to comment.