Skip to content

Commit

Permalink
Use LexRequirement alias as much as possible (#12130)
Browse files Browse the repository at this point in the history
  • Loading branch information
lewiszlw committed Aug 24, 2024
1 parent 2b6341c commit 3b90e3e
Show file tree
Hide file tree
Showing 20 changed files with 58 additions and 44 deletions.
5 changes: 3 additions & 2 deletions datafusion-examples/examples/custom_file_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use arrow::{
};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::physical_expr::LexRequirement;
use datafusion::{
datasource::{
file_format::{
Expand All @@ -38,7 +39,7 @@ use datafusion::{
prelude::SessionContext,
};
use datafusion_common::{GetExt, Statistics};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
use datafusion_physical_expr::PhysicalExpr;
use object_store::{ObjectMeta, ObjectStore};
use tempfile::tempdir;

Expand Down Expand Up @@ -123,7 +124,7 @@ impl FileFormat for TSVFileFormat {
input: Arc<dyn ExecutionPlan>,
state: &SessionState,
conf: FileSinkConfig,
order_requirements: Option<Vec<PhysicalSortRequirement>>,
order_requirements: Option<LexRequirement>,
) -> Result<Arc<dyn ExecutionPlan>> {
self.csv_file_format
.create_writer_physical_plan(input, state, conf, order_requirements)
Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/src/datasource/file_format/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,13 @@ use datafusion_common::{
not_impl_err, DataFusionError, GetExt, Statistics, DEFAULT_ARROW_EXTENSION,
};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_plan::insert::{DataSink, DataSinkExec};
use datafusion_physical_plan::metrics::MetricsSet;

use async_trait::async_trait;
use bytes::Bytes;
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use futures::stream::BoxStream;
use futures::StreamExt;
use object_store::{GetResultPayload, ObjectMeta, ObjectStore};
Expand Down Expand Up @@ -178,7 +179,7 @@ impl FileFormat for ArrowFormat {
input: Arc<dyn ExecutionPlan>,
_state: &SessionState,
conf: FileSinkConfig,
order_requirements: Option<Vec<PhysicalSortRequirement>>,
order_requirements: Option<LexRequirement>,
) -> Result<Arc<dyn ExecutionPlan>> {
if conf.overwrite {
return not_impl_err!("Overwrites are not implemented yet for Arrow format");
Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,12 @@ use datafusion_common::{
exec_err, not_impl_err, DataFusionError, GetExt, DEFAULT_CSV_EXTENSION,
};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_plan::metrics::MetricsSet;

use async_trait::async_trait;
use bytes::{Buf, Bytes};
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use futures::stream::BoxStream;
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};
Expand Down Expand Up @@ -371,7 +372,7 @@ impl FileFormat for CsvFormat {
input: Arc<dyn ExecutionPlan>,
state: &SessionState,
conf: FileSinkConfig,
order_requirements: Option<Vec<PhysicalSortRequirement>>,
order_requirements: Option<LexRequirement>,
) -> Result<Arc<dyn ExecutionPlan>> {
if conf.overwrite {
return not_impl_err!("Overwrites are not implemented yet for CSV");
Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/src/datasource/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,13 @@ use datafusion_common::config::{ConfigField, ConfigFileType, JsonOptions};
use datafusion_common::file_options::json_writer::JsonWriterOptions;
use datafusion_common::{not_impl_err, GetExt, DEFAULT_JSON_EXTENSION};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_plan::metrics::MetricsSet;
use datafusion_physical_plan::ExecutionPlan;

use async_trait::async_trait;
use bytes::{Buf, Bytes};
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use object_store::{GetResultPayload, ObjectMeta, ObjectStore};

#[derive(Default)]
Expand Down Expand Up @@ -249,7 +250,7 @@ impl FileFormat for JsonFormat {
input: Arc<dyn ExecutionPlan>,
_state: &SessionState,
conf: FileSinkConfig,
order_requirements: Option<Vec<PhysicalSortRequirement>>,
order_requirements: Option<LexRequirement>,
) -> Result<Arc<dyn ExecutionPlan>> {
if conf.overwrite {
return not_impl_err!("Overwrites are not implemented yet for Json");
Expand Down
6 changes: 4 additions & 2 deletions datafusion/core/src/datasource/file_format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,14 @@ use crate::physical_plan::{ExecutionPlan, Statistics};
use arrow_schema::{DataType, Field, Schema};
use datafusion_common::file_options::file_type::FileType;
use datafusion_common::{internal_err, not_impl_err, GetExt};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
use datafusion_physical_expr::PhysicalExpr;

use async_trait::async_trait;
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use file_compression_type::FileCompressionType;
use object_store::{ObjectMeta, ObjectStore};
use std::fmt::Debug;

/// Factory for creating [`FileFormat`] instances based on session and command level options
///
/// Users can provide their own `FileFormatFactory` to support arbitrary file formats
Expand Down Expand Up @@ -132,7 +134,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
_input: Arc<dyn ExecutionPlan>,
_state: &SessionState,
_conf: FileSinkConfig,
_order_requirements: Option<Vec<PhysicalSortRequirement>>,
_order_requirements: Option<LexRequirement>,
) -> Result<Arc<dyn ExecutionPlan>> {
not_impl_err!("Writer not implemented for this format")
}
Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
use datafusion_execution::TaskContext;
use datafusion_functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_plan::metrics::MetricsSet;

use async_trait::async_trait;
Expand All @@ -76,6 +76,7 @@ use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::task::JoinSet;

use crate::datasource::physical_plan::parquet::ParquetExecBuilder;
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use futures::{StreamExt, TryStreamExt};
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};
Expand Down Expand Up @@ -376,7 +377,7 @@ impl FileFormat for ParquetFormat {
input: Arc<dyn ExecutionPlan>,
_state: &SessionState,
conf: FileSinkConfig,
order_requirements: Option<Vec<PhysicalSortRequirement>>,
order_requirements: Option<LexRequirement>,
) -> Result<Arc<dyn ExecutionPlan>> {
if conf.overwrite {
return not_impl_err!("Overwrites are not implemented yet for Parquet");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1419,6 +1419,7 @@ pub(crate) mod tests {
expressions, expressions::binary, expressions::lit, LexOrdering,
PhysicalSortExpr, PhysicalSortRequirement,
};
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use datafusion_physical_plan::PlanProperties;

/// Models operators like BoundedWindowExec that require an input
Expand Down Expand Up @@ -1489,7 +1490,7 @@ pub(crate) mod tests {
}

// model that it requires the output ordering of its input
fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
if self.expr.is_empty() {
vec![None]
} else {
Expand Down
11 changes: 6 additions & 5 deletions datafusion/core/src/physical_optimizer/sort_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::{
LexRequirementRef, PhysicalSortExpr, PhysicalSortRequirement,
};
use datafusion_physical_expr_common::sort_expr::LexRequirement;

/// This is a "data class" we use within the [`EnforceSorting`] rule to push
/// down [`SortExec`] in the plan. In some cases, we can reduce the total
Expand All @@ -46,7 +47,7 @@ use datafusion_physical_expr::{
/// [`EnforceSorting`]: crate::physical_optimizer::enforce_sorting::EnforceSorting
#[derive(Default, Clone)]
pub struct ParentRequirements {
ordering_requirement: Option<Vec<PhysicalSortRequirement>>,
ordering_requirement: Option<LexRequirement>,
fetch: Option<usize>,
}

Expand Down Expand Up @@ -159,7 +160,7 @@ fn pushdown_sorts_helper(
fn pushdown_requirement_to_children(
plan: &Arc<dyn ExecutionPlan>,
parent_required: LexRequirementRef,
) -> Result<Option<Vec<Option<Vec<PhysicalSortRequirement>>>>> {
) -> Result<Option<Vec<Option<LexRequirement>>>> {
let maintains_input_order = plan.maintains_input_order();
if is_window(plan) {
let required_input_ordering = plan.required_input_ordering();
Expand Down Expand Up @@ -345,7 +346,7 @@ fn try_pushdown_requirements_to_join(
parent_required: LexRequirementRef,
sort_expr: Vec<PhysicalSortExpr>,
push_side: JoinSide,
) -> Result<Option<Vec<Option<Vec<PhysicalSortRequirement>>>>> {
) -> Result<Option<Vec<Option<LexRequirement>>>> {
let left_eq_properties = smj.left().equivalence_properties();
let right_eq_properties = smj.right().equivalence_properties();
let mut smj_required_orderings = smj.required_input_ordering();
Expand Down Expand Up @@ -460,7 +461,7 @@ fn expr_source_side(
fn shift_right_required(
parent_required: LexRequirementRef,
left_columns_len: usize,
) -> Result<Vec<PhysicalSortRequirement>> {
) -> Result<LexRequirement> {
let new_right_required = parent_required
.iter()
.filter_map(|r| {
Expand All @@ -486,7 +487,7 @@ enum RequirementsCompatibility {
/// Requirements satisfy
Satisfy,
/// Requirements compatible
Compatible(Option<Vec<PhysicalSortRequirement>>),
Compatible(Option<LexRequirement>),
/// Requirements not compatible
NonCompatible,
}
6 changes: 4 additions & 2 deletions datafusion/core/src/physical_optimizer/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ use datafusion_physical_plan::{

use async_trait::async_trait;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr_common::sort_expr::PhysicalSortRequirement;
use datafusion_physical_expr_common::sort_expr::{
LexRequirement, PhysicalSortRequirement,
};

async fn register_current_csv(
ctx: &SessionContext,
Expand Down Expand Up @@ -416,7 +418,7 @@ impl ExecutionPlan for RequirementsTestExec {
self.input.properties()
}

fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
let requirement =
PhysicalSortRequirement::from_sort_exprs(&self.required_input_ordering);
vec![Some(requirement)]
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr-common/src/sort_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ impl PhysicalSortRequirement {
/// [`ExecutionPlan::required_input_ordering`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#method.required_input_ordering
pub fn from_sort_exprs<'a>(
ordering: impl IntoIterator<Item = &'a PhysicalSortExpr>,
) -> Vec<PhysicalSortRequirement> {
) -> LexRequirement {
ordering
.into_iter()
.cloned()
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/equivalence/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ mod tests {
// Convert each tuple to PhysicalSortRequirement
pub fn convert_to_sort_reqs(
in_data: &[(&Arc<dyn PhysicalExpr>, Option<SortOptions>)],
) -> Vec<PhysicalSortRequirement> {
) -> LexRequirement {
in_data
.iter()
.map(|(expr, options)| {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-optimizer/src/output_requirements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl ExecutionPlan for OutputRequirementExec {
vec![&self.input]
}

fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
vec![self.order_requirement.clone()]
}

Expand Down
7 changes: 3 additions & 4 deletions datafusion/physical-plan/src/execution_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,8 @@ pub use datafusion_physical_expr::window::WindowExpr;
pub use datafusion_physical_expr::{
expressions, functions, udf, AggregateExpr, Distribution, Partitioning, PhysicalExpr,
};
use datafusion_physical_expr::{
EquivalenceProperties, LexOrdering, PhysicalSortExpr, PhysicalSortRequirement,
};
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalSortExpr};
use datafusion_physical_expr_common::sort_expr::LexRequirement;

use crate::coalesce_partitions::CoalescePartitionsExec;
use crate::display::DisplayableExecutionPlan;
Expand Down Expand Up @@ -125,7 +124,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
/// NOTE that checking `!is_empty()` does **not** check for a
/// required input ordering. Instead, the correct check is that at
/// least one entry must be `Some`
fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
vec![None; self.children().len()]
}

Expand Down
13 changes: 6 additions & 7 deletions datafusion/physical-plan/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,10 @@ use arrow_array::{ArrayRef, UInt64Array};
use arrow_schema::{DataType, Field, Schema};
use datafusion_common::{internal_err, Result};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{
Distribution, EquivalenceProperties, PhysicalSortRequirement,
};
use datafusion_physical_expr::{Distribution, EquivalenceProperties};

use async_trait::async_trait;
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use futures::StreamExt;

/// `DataSink` implements writing streams of [`RecordBatch`]es to
Expand Down Expand Up @@ -90,7 +89,7 @@ pub struct DataSinkExec {
/// Schema describing the structure of the output data.
count_schema: SchemaRef,
/// Optional required sort order for output data.
sort_order: Option<Vec<PhysicalSortRequirement>>,
sort_order: Option<LexRequirement>,
cache: PlanProperties,
}

Expand All @@ -106,7 +105,7 @@ impl DataSinkExec {
input: Arc<dyn ExecutionPlan>,
sink: Arc<dyn DataSink>,
sink_schema: SchemaRef,
sort_order: Option<Vec<PhysicalSortRequirement>>,
sort_order: Option<LexRequirement>,
) -> Self {
let count_schema = make_count_schema();
let cache = Self::create_schema(&input, count_schema);
Expand All @@ -131,7 +130,7 @@ impl DataSinkExec {
}

/// Optional sort order for output data
pub fn sort_order(&self) -> &Option<Vec<PhysicalSortRequirement>> {
pub fn sort_order(&self) -> &Option<LexRequirement> {
&self.sort_order
}

Expand Down Expand Up @@ -189,7 +188,7 @@ impl ExecutionPlan for DataSinkExec {
vec![Distribution::SinglePartition; self.children().len()]
}

fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
// The required input ordering is set externally (e.g. by a `ListingTable`).
// Otherwise, there is no specific requirement (i.e. `sort_expr` is `None`).
vec![self.sort_order.as_ref().cloned()]
Expand Down
3 changes: 2 additions & 1 deletion datafusion/physical-plan/src/joins/sort_merge_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::equivalence::join_equivalence_properties;
use datafusion_physical_expr::{PhysicalExprRef, PhysicalSortRequirement};
use datafusion_physical_expr_common::sort_expr::LexRequirement;

use crate::expressions::PhysicalSortExpr;
use crate::joins::utils::{
Expand Down Expand Up @@ -288,7 +289,7 @@ impl ExecutionPlan for SortMergeJoinExec {
]
}

fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
vec![
Some(PhysicalSortRequirement::from_sort_exprs(
&self.left_sort_exprs,
Expand Down
3 changes: 2 additions & 1 deletion datafusion/physical-plan/src/joins/symmetric_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ use datafusion_physical_expr::intervals::cp_solver::ExprIntervalGraph;
use datafusion_physical_expr::{PhysicalExprRef, PhysicalSortRequirement};

use ahash::RandomState;
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use futures::{ready, Stream, StreamExt};
use hashbrown::HashSet;
use parking_lot::Mutex;
Expand Down Expand Up @@ -410,7 +411,7 @@ impl ExecutionPlan for SymmetricHashJoinExec {
}
}

fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
vec![
self.left_sort_exprs
.as_ref()
Expand Down
3 changes: 2 additions & 1 deletion datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use datafusion_execution::memory_pool::MemoryConsumer;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::PhysicalSortRequirement;

use datafusion_physical_expr_common::sort_expr::LexRequirement;
use log::{debug, trace};

/// Sort preserving merge execution plan
Expand Down Expand Up @@ -187,7 +188,7 @@ impl ExecutionPlan for SortPreservingMergeExec {
vec![false]
}

fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
vec![Some(PhysicalSortRequirement::from_sort_exprs(&self.expr))]
}

Expand Down
Loading

0 comments on commit 3b90e3e

Please sign in to comment.