diff --git a/benchmarks/src/bin/topk.rs b/benchmarks/src/bin/topk.rs
new file mode 100644
index 0000000000000..ef8545a603f21
--- /dev/null
+++ b/benchmarks/src/bin/topk.rs
@@ -0,0 +1,258 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::compute::concat_batches;
+use arrow::util::pretty::pretty_format_batches;
+use datafusion::catalog::TableProvider;
+use datafusion::datasource::file_format::parquet::ParquetFormat;
+use datafusion::datasource::listing::{
+    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
+};
+use datafusion::error::Result;
+use datafusion::physical_plan;
+use datafusion::physical_plan::{displayable, ExecutionPlan};
+use datafusion::prelude::*;
+use datafusion_benchmarks::tpch::TPCH_TABLES;
+use datafusion_common::utils::get_available_parallelism;
+use datafusion_common::DEFAULT_PARQUET_EXTENSION;
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+use std::{fs, io};
+use datafusion_common::instant::Instant;
+
+fn create_external_table_query(year: i32, scratch_dir: &str) -> String {
+    format!(
+        r#"
+    CREATE EXTERNAL TABLE lineitem_ship_{}(
+        l_shipdate date,
+        l_commitdate date,
+        l_shipmode varchar,
+        l_quantity int
+    )
+    STORED AS parquet
+    LOCATION '{}{}/'
+    WITH ORDER(l_shipdate)
+    "#,
+        year, scratch_dir, year
+    )
+}
+
+fn create_insert_query(year: i32) -> String {
+    format!(
+        r#"
+    INSERT INTO lineitem_ship_{}
+    SELECT l_shipdate, l_commitdate, l_shipmode, l_quantity
+    FROM lineitem
+    WHERE EXTRACT(YEAR FROM l_shipdate) = {}
+    ORDER BY l_shipdate
+"#,
+        year, year
+    )
+}
+
+const TOP_K_QUERY: &str = r#"
+    SELECT l_shipdate, l_commitdate, l_quantity
+    FROM lineitem_ship_1992 WHERE l_shipmode IN ('MAIL', 'AIR')
+    ORDER BY l_shipdate, l_commitdate, l_quantity
+    LIMIT 10
+"#;
+
+const TOP_K_UNION_QUERY: &str = r#"
+    SELECT l_shipdate, l_commitdate, l_quantity
+    FROM (
+        SELECT l_shipdate, l_commitdate, l_quantity FROM lineitem_ship_1992 WHERE l_shipmode IN ('MAIL', 'AIR')
+        UNION ALL
+        SELECT l_shipdate, l_commitdate, l_quantity FROM lineitem_ship_1993 WHERE l_shipmode IN ('MAIL', 'AIR')
+        UNION ALL
+        SELECT l_shipdate, l_commitdate, l_quantity FROM lineitem_ship_1994 WHERE l_shipmode IN ('MAIL', 'AIR')
+        UNION ALL
+        SELECT l_shipdate, l_commitdate, l_quantity FROM lineitem_ship_1995 WHERE l_shipmode IN ('MAIL', 'AIR')
+        UNION ALL
+        SELECT l_shipdate, l_commitdate, l_quantity FROM lineitem_ship_1996 WHERE l_shipmode IN ('MAIL', 'AIR')
+        UNION ALL
+        SELECT l_shipdate, l_commitdate, l_quantity FROM lineitem_ship_1997 WHERE l_shipmode IN ('KOALA')
+    )
+    ORDER BY l_shipdate, l_commitdate, l_quantity
+    LIMIT 10
+"#;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let mut session_config = SessionConfig::new();
+    session_config
+        .options_mut()
+        .execution
+        .parquet
+        .pushdown_filters = true;
+    let ctx = &SessionContext::new_with_config(session_config);
+
+    init(ctx).await?;
+
+    let start_time = Instant::now();
+    run_query(ctx, TOP_K_QUERY).await?;
+    println!("TopK query execution time: {:?}", start_time.elapsed());
+
+    Ok(())
+}
+
+async fn init(ctx: &SessionContext) -> Result<()> {
+    register_tpch_tables(ctx).await?;
+
+    // Retrieve the "scratch/topk" directory path.
+    let scratch_dir = scratch_dir();
+    let scratch_path = Path::new(scratch_dir.as_str());
+
+    let scratch_parent = scratch_path
+        .parent()
+        .expect("Failed to get parent directory of scratch dir");
+
+    // Create the parent "scratch" directory if it doesn't exist.
+    if !scratch_parent.exists() {
+        fs::create_dir(scratch_parent)
+            .expect("Failed to create parent scratch directory");
+    }
+
+    for year in 1992..=1997 {
+        let year_path = scratch_path.join(format!("{}", year));
+        if !year_path.as_path().exists() {
+            fs::create_dir(year_path.as_path())
+                .expect("Failed to create per-year scratch directory");
+        }
+        let create_external_query =
+            create_external_table_query(year, scratch_dir.as_str());
+        run_query(ctx, create_external_query.as_str()).await?;
+        if is_directory_empty(year_path.as_path()).unwrap_or(true) {
+            let insert_query = create_insert_query(year);
+            run_query(ctx, insert_query.as_str()).await?;
+        }
+    }
+
+    Ok(())
+}
+
+/// Returns true if the given directory does not exist or is empty.
+fn is_directory_empty(path: &Path) -> io::Result<bool> {
+    if !path.exists() {
+        return Ok(true);
+    }
+    let mut entries = fs::read_dir(path)?;
+    Ok(entries.next().is_none())
+}
+
+async fn run_query(ctx: &SessionContext, sql: &str) -> Result<()> {
+    println!("--------------------------------------------------------");
+    println!("Query:\n{}\n", sql.trim());
+
+    let df = ctx.sql(sql).await?;
+    println!("Logical plan:\n{}\n", df.logical_plan().display_indent());
+
+    let physical_plan = df.create_physical_plan().await?;
+    println!(
+        "Physical plan:\n{}\n",
+        displayable(physical_plan.as_ref()).indent(true)
+    );
+
+    let mut root_node = physical_plan.clone();
+    loop {
+        let children = root_node.children();
+        if children.is_empty() {
+            break;
+        }
+        root_node = children[0].clone();
+    }
+    println!("Root Node:\n{:?}\n", root_node);
+
+    let results =
+        physical_plan::collect(physical_plan.clone(), ctx.state().task_ctx()).await?;
+    let results = if results.len() <= 1 {
+        results
+    } else {
+        vec![concat_batches(&results[0].schema(), &results)?]
+    };
+    println!("Results:\n{}\n", pretty_format_batches(&results)?);
+
+    // Walk the physical plan and dump execution metrics for each node.
+    dump_execution_metrics(&physical_plan, 0);
+
+    Ok(())
+}
+
+/// Recursively walks the physical plan and prints each node's debug info and metrics.
+fn dump_execution_metrics(plan: &Arc<dyn ExecutionPlan>, indent: usize) {
+    let indent_str = " ".repeat(indent);
+
+    // Print any available metrics for this node.
+    if let Some(metrics) = plan.metrics() {
+        let metrics = metrics.aggregate_by_name().sorted_for_display();
+        println!("{}{} Metrics: {}", indent_str, plan.name(), metrics);
+    } else {
+        println!("{}{} No metrics available", indent_str, plan.name());
+    }
+
+    // Recursively dump metrics for each child node.
+    for child in plan.children() {
+        dump_execution_metrics(child, indent + 2);
+    }
+}
+
+async fn register_tpch_tables(ctx: &SessionContext) -> Result<()> {
+    for table in TPCH_TABLES {
+        let table_provider = { get_table(ctx, table).await? };
+        ctx.register_table(*table, table_provider)?;
+    }
+    Ok(())
+}
+
+async fn get_table(ctx: &SessionContext, table: &str) -> Result<Arc<dyn TableProvider>> {
+    let target_partitions = get_available_parallelism();
+
+    let format = Arc::new(
+        ParquetFormat::default()
+            .with_options(ctx.state().table_options().parquet.clone()),
+    );
+
+    let path = format!("{}/{}", tpch_data(), table);
+
+    let state = ctx.state();
+
+    let options = ListingOptions::new(format)
+        .with_file_extension(DEFAULT_PARQUET_EXTENSION)
+        .with_target_partitions(target_partitions)
+        .with_collect_stat(state.config().collect_statistics());
+
+    let table_path = ListingTableUrl::parse(path)?;
+    let config = ListingTableConfig::new(table_path)
+        .with_listing_options(options)
+        .infer_schema(&state)
+        .await?;
+
+    Ok(Arc::new(ListingTable::try_new(config)?))
+}
+
+pub fn tpch_data() -> String {
+    PathBuf::from(env!("CARGO_MANIFEST_DIR"))
+        .join("./data/tpch_sf10")
+        .display()
+        .to_string()
+}
+
+pub fn scratch_dir() -> String {
+    PathBuf::from(env!("CARGO_MANIFEST_DIR"))
+        .join("./scratch/topk/")
+        .display()
+        .to_string()
+}
diff --git a/datafusion/physical-expr/src/equivalence/properties/mod.rs b/datafusion/physical-expr/src/equivalence/properties/mod.rs
index c7c33ba5b2ba5..21e550abed814 100644
--- a/datafusion/physical-expr/src/equivalence/properties/mod.rs
+++ b/datafusion/physical-expr/src/equivalence/properties/mod.rs
@@ -581,6 +581,59 @@ impl EquivalenceProperties {
         true
     }
 
+    /// Determines the longest prefix of `reqs` that is satisfied by the
+    /// existing ordering. Returns that prefix as a new `LexRequirement`.
+    ///
+    /// For example, if `reqs = [a ASC, b ASC, c ASC]` and the plan properties
+    /// only satisfy `[a ASC, b ASC]` but not `c ASC`, then this function will
+    /// return `[a ASC, b ASC]`. If no entries are satisfied, it returns an
+    /// empty `LexRequirement`.
+    ///
+    /// Internally, this proceeds in lexicographic order. For each requirement
+    /// `reqs[i]`, we check:
+    /// 1. Does the current plan ordering satisfy `reqs[i]` given what was
+    ///    already satisfied on the left? (if yes, add `reqs[i]` to the prefix
+    ///    and mark that expression as a “constant” for future comparisons)
+    /// 2. Otherwise, stop.
+    ///
+    /// If the plan happens to satisfy all of `reqs`, you get back `reqs.clone()`.
+    pub fn longest_satisfied_prefix(&self, reqs: &LexRequirement) -> LexRequirement {
+        let mut eq_properties = self.clone();
+        // First, standardize the given requirement:
+        let normalized_reqs = eq_properties.normalize_sort_requirements(reqs);
+
+        // Check whether given ordering is satisfied by constraints first
+        if self.satisfied_by_constraints(&normalized_reqs) {
+            return normalized_reqs;
+        }
+
+        // This will hold the longest prefix that is satisfied so far
+        let mut satisfied_prefix = Vec::new();
+
+        for normalized_req in normalized_reqs {
+            // Check whether given ordering is satisfied
+            if !eq_properties.ordering_satisfy_single(&normalized_req) {
+                break;
+            }
+            satisfied_prefix.push(normalized_req.clone());
+            // Treat satisfied keys as constants in subsequent iterations. We
+            // can do this because the "next" key only matters in a lexicographical
+            // ordering when the keys to its left have the same values.
+            //
+            // Note that these expressions are not properly "constants". This is just
+            // an implementation strategy confined to this function.
+            //
+            // For example, assume that the requirement is `[a ASC, (b + c) ASC]`,
+            // and existing equivalent orderings are `[a ASC, b ASC]` and `[c ASC]`.
+            // From the analysis above, we know that `[a ASC]` is satisfied. Then,
+            // we add column `a` as constant to the algorithm state. This enables us
+            // to deduce that `(b + c) ASC` is satisfied, given `a` is constant.
+            eq_properties = eq_properties
+                .with_constants(std::iter::once(ConstExpr::from(normalized_req.expr)));
+        }
+        LexRequirement::new(satisfied_prefix)
+    }
+
     /// Checks if the sort requirements are satisfied by any of the table constraints (primary key or unique).
     /// Returns true if any constraint fully satisfies the requirements.
     fn satisfied_by_constraints(
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index 1072e9abf437e..4e364ac2786b0 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -966,6 +966,8 @@ pub struct SortExec {
     preserve_partitioning: bool,
     /// Fetch highest/lowest n results
     fetch: Option<usize>,
+    /// Common sort prefix between the input and the sort expressions (only used with fetch)
+    sort_prefix: LexOrdering,
     /// Cache holding plan properties like equivalences, output partitioning etc.
     cache: PlanProperties,
 }
@@ -975,13 +977,15 @@ impl SortExec {
     /// sorted output partition.
     pub fn new(expr: LexOrdering, input: Arc<dyn ExecutionPlan>) -> Self {
         let preserve_partitioning = false;
-        let cache = Self::compute_properties(&input, expr.clone(), preserve_partitioning);
+        let (cache, sort_prefix) =
+            Self::compute_properties(&input, expr.clone(), preserve_partitioning);
         Self {
             expr,
             input,
             metrics_set: ExecutionPlanMetricsSet::new(),
             preserve_partitioning,
             fetch: None,
+            sort_prefix,
             cache,
         }
     }
@@ -1033,6 +1037,7 @@ impl SortExec {
             expr: self.expr.clone(),
             metrics_set: self.metrics_set.clone(),
             preserve_partitioning: self.preserve_partitioning,
+            sort_prefix: self.sort_prefix.clone(),
             fetch,
             cache,
         }
@@ -1066,22 +1071,38 @@ impl SortExec {
     }
 
     /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
+    /// It also returns the common sort prefix between the input and the sort expressions.
     fn compute_properties(
         input: &Arc<dyn ExecutionPlan>,
         sort_exprs: LexOrdering,
         preserve_partitioning: bool,
-    ) -> PlanProperties {
+    ) -> (PlanProperties, LexOrdering) {
         // Determine execution mode:
         let requirement = LexRequirement::from(sort_exprs);
         let sort_satisfied = input
             .equivalence_properties()
             .ordering_satisfy_requirement(&requirement);
 
+        let sort_prefix = if sort_satisfied {
+            LexRequirement::default()
+        } else {
+            input
+                .equivalence_properties()
+                .longest_satisfied_prefix(&requirement)
+        };
+        let sort_partially_satisfied = sort_satisfied || !sort_prefix.is_empty();
+
         // The emission type depends on whether the input is already sorted:
         // - If already sorted, we can emit results in the same way as the input
         // - If not sorted, we must wait until all data is processed to emit results (Final)
         let emission_type = if sort_satisfied {
             input.pipeline_behavior()
+        } else if sort_partially_satisfied {
+            if input.pipeline_behavior() == EmissionType::Incremental {
+                EmissionType::Both
+            } else {
+                input.pipeline_behavior()
+            }
         } else {
             EmissionType::Final
         };
@@ -1114,11 +1135,14 @@ impl SortExec {
         let output_partitioning =
             Self::output_partitioning_helper(input, preserve_partitioning);
 
-        PlanProperties::new(
-            eq_properties,
-            output_partitioning,
-            emission_type,
-            boundedness,
+        (
+            PlanProperties::new(
+                eq_properties,
+                output_partitioning,
+                emission_type,
+                boundedness,
+            ),
+            LexOrdering::from(sort_prefix),
         )
     }
 }
@@ -1130,7 +1154,12 @@ impl DisplayAs for SortExec {
                 let preserve_partitioning = self.preserve_partitioning;
                 match self.fetch {
                     Some(fetch) => {
-                        write!(f, "SortExec: TopK(fetch={fetch}), expr=[{}], preserve_partitioning=[{preserve_partitioning}]", self.expr)
+                        write!(f, "SortExec: TopK(fetch={fetch}), expr=[{}], preserve_partitioning=[{preserve_partitioning}]", self.expr)?;
+                        if !self.sort_prefix.is_empty() {
+                            write!(f, ", sort_prefix=[{}]", self.sort_prefix)
+                        } else {
+                            Ok(())
+                        }
                     }
                     None => write!(f, "SortExec: expr=[{}], preserve_partitioning=[{preserve_partitioning}]", self.expr),
                 }
@@ -1203,10 +1232,12 @@ impl ExecutionPlan for SortExec {
         trace!("End SortExec's input.execute for partition: {}", partition);
 
+        let requirement = &LexRequirement::from(self.expr.clone());
+
         let sort_satisfied = self
             .input
             .equivalence_properties()
-            .ordering_satisfy_requirement(&LexRequirement::from(self.expr.clone()));
+            .ordering_satisfy_requirement(requirement);
 
         match (sort_satisfied, self.fetch.as_ref()) {
             (true, Some(fetch)) => Ok(Box::pin(LimitStream::new(
@@ -1220,6 +1251,7 @@ impl ExecutionPlan for SortExec {
                 let mut topk = TopK::try_new(
                     partition,
                     input.schema(),
+                    self.sort_prefix.clone(),
                     self.expr.clone(),
                     *fetch,
                     context.session_config().batch_size(),
@@ -1232,6 +1264,9 @@ impl ExecutionPlan for SortExec {
                     while let Some(batch) = input.next().await {
                         let batch = batch?;
                         topk.insert_batch(batch)?;
+                        if topk.finished {
+                            break;
+                        }
                     }
                     topk.emit()
                 })
diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs
index 85de1eefce2e4..ea307da5d2db5 100644
--- a/datafusion/physical-plan/src/topk/mod.rs
+++ b/datafusion/physical-plan/src/topk/mod.rs
@@ -83,23 +83,31 @@ pub struct TopK {
     /// The target number of rows for output batches
     batch_size: usize,
     /// sort expressions
-    expr: Arc<[PhysicalSortExpr]>,
+    output_ordering: Arc<[PhysicalSortExpr]>,
     /// row converter, for sort keys
     row_converter: RowConverter,
     /// scratch space for converting rows
     scratch_rows: Rows,
     /// stores the top k values and their sort key values, in order
     heap: TopKHeap,
+    /// row converter, for common keys between the sort keys and the input ordering
+    prefix_row_converter: Option<RowConverter>,
+    /// guaranteed ordering of input batches to allow early exit optimization
+    input_ordering: Arc<[PhysicalSortExpr]>,
+    /// If true, indicates that all rows of subsequent batches are guaranteed to be worse than the top K
+    pub(crate) finished: bool,
 }
 
 impl TopK {
     /// Create a new [`TopK`] that stores the top `k` values, as
     /// defined by the sort expressions in `expr`.
     // TODO: make a builder or some other nicer API
+    #[allow(clippy::too_many_arguments)]
     pub fn try_new(
         partition_id: usize,
         schema: SchemaRef,
-        expr: LexOrdering,
+        input_ordering: LexOrdering,
+        output_ordering: LexOrdering,
         k: usize,
         batch_size: usize,
         runtime: Arc<RuntimeEnv>,
@@ -108,9 +116,21 @@ impl TopK {
         let reservation = MemoryConsumer::new(format!("TopK[{partition_id}]"))
             .register(&runtime.memory_pool);
 
-        let expr: Arc<[PhysicalSortExpr]> = expr.into();
+        let output_ordering: Arc<[PhysicalSortExpr]> = output_ordering.into();
 
-        let sort_fields: Vec<_> = expr
+        let output_sort_fields: Vec<_> = output_ordering
+            .iter()
+            .map(|e| {
+                Ok(SortField::new_with_options(
+                    e.expr.data_type(&schema)?,
+                    e.options,
+                ))
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        let input_ordering: Arc<[PhysicalSortExpr]> = input_ordering.into();
+
+        let input_sort_fields: Vec<_> = input_ordering
             .iter()
             .map(|e| {
                 Ok(SortField::new_with_options(
@@ -122,21 +142,30 @@ impl TopK {
 
         // TODO there is potential to add special cases for single column sort fields
         // to improve performance
-        let row_converter = RowConverter::new(sort_fields)?;
+        let row_converter = RowConverter::new(output_sort_fields)?;
         let scratch_rows = row_converter.empty_rows(
             batch_size,
             20 * batch_size, // guesstimate 20 bytes per row
         );
 
+        let prefix_row_converter = if input_ordering.is_empty() {
+            None
+        } else {
+            Some(RowConverter::new(input_sort_fields)?)
+        };
+
         Ok(Self {
             schema: Arc::clone(&schema),
             metrics: TopKMetrics::new(metrics, partition_id),
             reservation,
             batch_size,
-            expr,
+            output_ordering,
             row_converter,
             scratch_rows,
             heap: TopKHeap::new(k, batch_size, schema),
+            prefix_row_converter,
+            input_ordering,
+            finished: false,
         })
     }
 
@@ -147,7 +176,7 @@ impl TopK {
         let _timer = self.metrics.baseline.elapsed_compute().timer();
 
         let sort_keys: Vec<ArrayRef> = self
-            .expr
+            .output_ordering
            .iter()
             .map(|expr| {
                 let value = expr.expr.evaluate(&batch)?;
@@ -163,7 +192,7 @@ impl TopK {
         // TODO make this algorithmically better?:
         // Idea: filter out rows >= self.heap.max() early (before passing to `RowConverter`)
         //       this avoids some work and also might be better vectorizable.
-        let mut batch_entry = self.heap.register_batch(batch);
+        let mut batch_entry = self.heap.register_batch(batch.clone());
         for (index, row) in rows.iter().enumerate() {
             match self.heap.max() {
                 // heap has k items, and the new row is greater than the
@@ -183,6 +212,65 @@ impl TopK {
 
         // update memory reservation
         self.reservation.try_resize(self.size())?;
+
+        // Only evaluate the prefix columns for a *single* row in both the batch
+        // (the "last" row) and the "worst" row in the heap
+        if !self.input_ordering.is_empty()
+            && self.heap.k > 0
+            && self.heap.inner.len() >= self.heap.k
+            && batch.num_rows() > 0
+        {
+            let prefix_converter = self.prefix_row_converter.as_mut().unwrap();
+
+            // 1) Evaluate the prefix expressions for *only* the last row of the batch
+            let last_index = batch.num_rows() - 1;
+            let single_row_arrays_for_last: Vec<ArrayRef> = self
+                .input_ordering
+                .iter()
+                .map(|expr| {
+                    // Evaluate the single row at `last_index`
+                    expr.expr
+                        .evaluate(&batch.slice(last_index, 1))?
+                        .into_array(1)
+                })
+                .collect::<Result<Vec<_>>>()?;
+
+            // Append to `prefix_row`, which now has data for exactly 1 row
+            let mut prefix_row = prefix_converter.empty_rows(1, 20);
+            prefix_converter.append(&mut prefix_row, &single_row_arrays_for_last)?;
+
+            // Extract that single Row from `prefix_row`
+            let last_prefix = prefix_row.row(0);
+
+            // 2) Evaluate the prefix expressions for the "worst" row in the heap
+            let worst_topk = self.heap.max().unwrap();
+            let store_entry = self.heap.store.get(worst_topk.batch_id).unwrap();
+            let batch_worst = &store_entry.batch;
+
+            let worst_index = worst_topk.index;
+
+            let single_row_arrays_for_worst: Vec<ArrayRef> = self
+                .input_ordering
+                .iter()
+                .map(|expr| {
+                    // Evaluate the single row at `worst_index`
+                    expr.expr
+                        .evaluate(&batch_worst.slice(worst_index, 1))?
+                        .into_array(1)
+                })
+                .collect::<Result<Vec<_>>>()?;
+
+            let mut worst_scratch_row = prefix_converter.empty_rows(1, 20);
+            prefix_converter
+                .append(&mut worst_scratch_row, &single_row_arrays_for_worst)?;
+            let worst_prefix = worst_scratch_row.row(0);
+
+            // 3) If last row prefix is strictly greater => we can mark finished
+            if last_prefix.as_ref() > worst_prefix.as_ref() {
+                self.finished = true;
+            }
+        }
+
         Ok(())
     }
 
@@ -193,10 +281,13 @@ impl TopK {
             metrics,
             reservation: _,
             batch_size,
-            expr: _,
+            output_ordering: _,
             row_converter: _,
             scratch_rows: _,
             mut heap,
+            prefix_row_converter: _,
+            input_ordering: _,
+            finished: _,
         } = self;
         let _timer = metrics.baseline.elapsed_compute().timer(); // time updated on drop
diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt
index 76e3751e4b8e4..936f4c0892666 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -2356,7 +2356,7 @@ logical_plan
 03)----WindowAggr: windowExpr=[[row_number() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
 04)------TableScan: aggregate_test_100 projection=[c9]
 physical_plan
-01)SortExec: TopK(fetch=5), expr=[rn1@1 ASC NULLS LAST, c9@0 ASC NULLS LAST], preserve_partitioning=[false]
+01)SortExec: TopK(fetch=5), expr=[rn1@1 ASC NULLS LAST, c9@0 ASC NULLS LAST], preserve_partitioning=[false], sort_prefix=[rn1@1 ASC NULLS LAST]
 02)--ProjectionExec: expr=[c9@0 as c9, row_number() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as rn1]
 03)----BoundedWindowAggExec: wdw=[row_number() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "row_number() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]
 04)------SortExec: expr=[c9@0 DESC], preserve_partitioning=[false]
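The rule the `TopK` change above relies on can be stated independently of the plan machinery: once the heap already holds `k` rows and the input's sort prefix has advanced strictly past the prefix of the current "worst" kept row, no later row can enter the top k, so the scan may stop. A minimal standalone sketch of that rule, using a simplified row type and a plain `BinaryHeap` rather than the actual DataFusion types and APIs:

```rust
use std::collections::BinaryHeap;

/// A row whose sort key is `(prefix, rest)`. The input is assumed to be
/// sorted by `prefix` only (ascending); `rest` is unordered within a prefix.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct Row {
    prefix: u32,
    rest: u32,
}

/// Returns the `k` smallest rows by `(prefix, rest)`, stopping the scan early
/// once no remaining row can possibly enter the result.
fn top_k_with_prefix_early_exit(input: &[Row], k: usize) -> Vec<Row> {
    if k == 0 {
        return Vec::new();
    }
    // Max-heap of the best `k` rows seen so far; `peek()` is the "worst" of them.
    let mut heap: BinaryHeap<Row> = BinaryHeap::with_capacity(k + 1);

    for row in input {
        if heap.len() < k {
            heap.push(row.clone());
            continue;
        }
        let worst = heap.peek().expect("heap holds k rows").clone();
        if *row < worst {
            heap.pop();
            heap.push(row.clone());
        }
        // Early exit: the input is sorted by `prefix`, so once this row's prefix
        // is strictly greater than the worst kept row's prefix, every remaining
        // row is at least as large on the prefix and can never beat the top k.
        let worst_prefix = heap.peek().expect("heap holds k rows").prefix;
        if row.prefix > worst_prefix {
            break;
        }
    }

    let mut result = heap.into_sorted_vec();
    result.truncate(k);
    result
}

fn main() {
    // Sorted by `prefix`; the scan stops at (3, 1) because the worst kept
    // prefix is already 2 and later prefixes only grow.
    let input = vec![
        Row { prefix: 1, rest: 9 },
        Row { prefix: 1, rest: 3 },
        Row { prefix: 2, rest: 7 },
        Row { prefix: 3, rest: 1 },
        Row { prefix: 4, rest: 0 },
    ];
    assert_eq!(
        top_k_with_prefix_early_exit(&input, 3),
        vec![
            Row { prefix: 1, rest: 3 },
            Row { prefix: 1, rest: 9 },
            Row { prefix: 2, rest: 7 },
        ]
    );
    println!("top 3: {:?}", top_k_with_prefix_early_exit(&input, 3));
}
```

Here the per-row check stands in for the cheaper batch-level check in `TopK::insert_batch`, which only compares the prefix key of the last row of each incoming batch against the heap's current worst entry.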