Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2,587 changes: 0 additions & 2,587 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs

This file was deleted.

5 changes: 0 additions & 5 deletions datafusion/core/src/physical_optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,9 @@
//! [`ExecutionPlan`]: crate::physical_plan::ExecutionPlan

pub mod enforce_distribution;
pub mod enforce_sorting;
pub mod optimizer;
pub mod projection_pushdown;
pub mod replace_with_order_preserving_variants;
#[cfg(test)]
pub mod test_utils;

mod sort_pushdown;
mod utils;

pub use datafusion_physical_optimizer::*;

This file was deleted.

54 changes: 0 additions & 54 deletions datafusion/core/src/physical_optimizer/test_utils.rs

This file was deleted.

25 changes: 0 additions & 25 deletions datafusion/core/src/physical_optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,10 @@ use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::union::UnionExec;
use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
use crate::physical_plan::{ExecutionPlan, ExecutionPlanProperties};

use datafusion_physical_expr::LexRequirement;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion_physical_plan::tree_node::PlanContext;

/// This utility function adds a `SortExec` above an operator according to the
Expand Down Expand Up @@ -72,23 +69,6 @@ pub fn add_sort_above_with_check<T: Clone + Default>(
}
}

/// Checks whether the given operator is a limit;
/// i.e. either a [`LocalLimitExec`] or a [`GlobalLimitExec`].
pub fn is_limit(plan: &Arc<dyn ExecutionPlan>) -> bool {
plan.as_any().is::<GlobalLimitExec>() || plan.as_any().is::<LocalLimitExec>()
}

/// Checks whether the given operator is a window;
/// i.e. either a [`WindowAggExec`] or a [`BoundedWindowAggExec`].
pub fn is_window(plan: &Arc<dyn ExecutionPlan>) -> bool {
plan.as_any().is::<WindowAggExec>() || plan.as_any().is::<BoundedWindowAggExec>()
}

/// Checks whether the given operator is a [`SortExec`].
pub fn is_sort(plan: &Arc<dyn ExecutionPlan>) -> bool {
plan.as_any().is::<SortExec>()
}

/// Checks whether the given operator is a [`SortPreservingMergeExec`].
pub fn is_sort_preserving_merge(plan: &Arc<dyn ExecutionPlan>) -> bool {
plan.as_any().is::<SortPreservingMergeExec>()
Expand All @@ -99,11 +79,6 @@ pub fn is_coalesce_partitions(plan: &Arc<dyn ExecutionPlan>) -> bool {
plan.as_any().is::<CoalescePartitionsExec>()
}

/// Checks whether the given operator is a [`UnionExec`].
pub fn is_union(plan: &Arc<dyn ExecutionPlan>) -> bool {
plan.as_any().is::<UnionExec>()
}

/// Checks whether the given operator is a [`RepartitionExec`].
pub fn is_repartition(plan: &Arc<dyn ExecutionPlan>) -> bool {
plan.as_any().is::<RepartitionExec>()
Expand Down
95 changes: 0 additions & 95 deletions datafusion/core/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,13 @@ use crate::datasource::physical_plan::{CsvExec, FileScanConfig};
use crate::datasource::{MemTable, TableProvider};
use crate::error::Result;
use crate::logical_expr::LogicalPlan;
use crate::physical_plan::ExecutionPlan;
use crate::test::object_store::local_unpartitioned_file;
use crate::test_util::{aggr_test_schema, arrow_test_data};

use arrow::array::{self, Array, ArrayRef, Decimal128Builder, Int32Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::DataFusionError;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::PhysicalSortExpr;
use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};

#[cfg(feature = "compression")]
use bzip2::write::BzEncoder;
Expand Down Expand Up @@ -290,96 +286,5 @@ fn make_decimal() -> RecordBatch {
RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap()
}

/// Created a sorted Csv exec
pub fn csv_exec_sorted(
schema: &SchemaRef,
sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
) -> Arc<dyn ExecutionPlan> {
let sort_exprs = sort_exprs.into_iter().collect();

Arc::new(
CsvExec::builder(
FileScanConfig::new(
ObjectStoreUrl::parse("test:///").unwrap(),
schema.clone(),
)
.with_file(PartitionedFile::new("x".to_string(), 100))
.with_output_ordering(vec![sort_exprs]),
)
.with_has_header(false)
.with_delimeter(0)
.with_quote(0)
.with_escape(None)
.with_comment(None)
.with_newlines_in_values(false)
.with_file_compression_type(FileCompressionType::UNCOMPRESSED)
.build(),
)
}

// construct a stream partition for test purposes
#[derive(Debug)]
pub(crate) struct TestStreamPartition {
pub schema: SchemaRef,
}

impl PartitionStream for TestStreamPartition {
fn schema(&self) -> &SchemaRef {
&self.schema
}
fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
unreachable!()
}
}

/// Create an unbounded stream exec
pub fn stream_exec_ordered(
schema: &SchemaRef,
sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
) -> Arc<dyn ExecutionPlan> {
let sort_exprs = sort_exprs.into_iter().collect();

Arc::new(
StreamingTableExec::try_new(
schema.clone(),
vec![Arc::new(TestStreamPartition {
schema: schema.clone(),
}) as _],
None,
vec![sort_exprs],
true,
None,
)
.unwrap(),
)
}

/// Create a csv exec for tests
pub fn csv_exec_ordered(
schema: &SchemaRef,
sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
) -> Arc<dyn ExecutionPlan> {
let sort_exprs = sort_exprs.into_iter().collect();

Arc::new(
CsvExec::builder(
FileScanConfig::new(
ObjectStoreUrl::parse("test:///").unwrap(),
schema.clone(),
)
.with_file(PartitionedFile::new("file_path".to_string(), 100))
.with_output_ordering(vec![sort_exprs]),
)
.with_has_header(true)
.with_delimeter(0)
.with_quote(b'"')
.with_escape(None)
.with_comment(None)
.with_newlines_in_values(false)
.with_file_compression_type(FileCompressionType::UNCOMPRESSED)
.build(),
)
}

pub mod object_store;
pub mod variable;
Loading
Loading