Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Coerce BinaryView/Utf8View to LargeBinary/LargeUtf8 on output. #12271

Merged
merged 7 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,11 @@ config_namespace! {

/// When set to true, the optimizer will not attempt to convert Union to Interleave
pub prefer_existing_union: bool, default = false

/// When set to true, if the returned type is a view type
/// then the output will be coerced to a non-view.
/// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
pub expand_views_at_output: bool, default = false
}
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1287,7 +1287,7 @@ impl LogicalPlan {
/// referenced expressions into columns.
///
/// See also: [`crate::utils::columnize_expr`]
pub(crate) fn columnized_output_exprs(&self) -> Result<Vec<(&Expr, Column)>> {
pub fn columnized_output_exprs(&self) -> Result<Vec<(&Expr, Column)>> {
match self {
LogicalPlan::Aggregate(aggregate) => Ok(aggregate
.output_expressions()?
Expand Down
231 changes: 225 additions & 6 deletions datafusion/optimizer/src/analyzer/type_coercion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ use std::sync::Arc;

use itertools::izip;

use arrow::datatypes::{DataType, Field, IntervalUnit};
use arrow::datatypes::{DataType, Field, IntervalUnit, Schema};

use crate::analyzer::AnalyzerRule;
use crate::utils::NamePreserver;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
use datafusion_common::{
exec_err, internal_err, not_impl_err, plan_datafusion_err, plan_err, Column,
DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue,
DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue, TableReference,
};
use datafusion_expr::expr::{
self, Alias, Between, BinaryExpr, Case, Exists, InList, InSubquery, Like,
Expand Down Expand Up @@ -66,19 +66,39 @@ impl TypeCoercion {
}
}

/// Coerce output schema based upon optimizer config.
fn coerce_output(plan: LogicalPlan, config: &ConfigOptions) -> Result<LogicalPlan> {
if !config.optimizer.expand_views_at_output {
return Ok(plan);
}

let outer_refs = plan.expressions();
if outer_refs.is_empty() {
return Ok(plan);
}

if let Some(dfschema) = transform_schema_to_nonview(plan.schema()) {
coerce_plan_expr_for_schema(plan, &dfschema?)
} else {
Ok(plan)
}
}

impl AnalyzerRule for TypeCoercion {
fn name(&self) -> &str {
"type_coercion"
}

fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) -> Result<LogicalPlan> {
fn analyze(&self, plan: LogicalPlan, config: &ConfigOptions) -> Result<LogicalPlan> {
let empty_schema = DFSchema::empty();

// recurse
let transformed_plan = plan
.transform_up_with_subqueries(|plan| analyze_internal(&empty_schema, plan))?
.data;

Ok(transformed_plan)
// finish
coerce_output(transformed_plan, config)
}
}

Expand Down Expand Up @@ -514,6 +534,55 @@ impl<'a> TreeNodeRewriter for TypeCoercionRewriter<'a> {
}
}

/// Transform a schema to use non-view types for Utf8View and BinaryView
fn transform_schema_to_nonview(dfschema: &DFSchemaRef) -> Option<Result<DFSchema>> {
let metadata = dfschema.as_arrow().metadata.clone();
let mut transformed = false;

let (qualifiers, transformed_fields): (Vec<Option<TableReference>>, Vec<Arc<Field>>) =
dfschema
.iter()
.map(|(qualifier, field)| match field.data_type() {
DataType::Utf8View => {
transformed = true;
(
qualifier.cloned() as Option<TableReference>,
Arc::new(Field::new(
field.name(),
DataType::LargeUtf8,
field.is_nullable(),
)),
)
}
DataType::BinaryView => {
transformed = true;
(
qualifier.cloned() as Option<TableReference>,
Arc::new(Field::new(
field.name(),
DataType::LargeBinary,
field.is_nullable(),
)),
)
}
_ => (
qualifier.cloned() as Option<TableReference>,
Arc::clone(field),
),
})
.unzip();

if !transformed {
return None;
}

let schema = Schema::new_with_metadata(transformed_fields, metadata);
Some(DFSchema::from_field_specific_qualified_schema(
qualifiers,
&Arc::new(schema),
))
}

/// Casts the given `value` to `target_type`. Note that this function
/// only considers `Null` or `Utf8` values.
fn coerce_scalar(target_type: &DataType, value: &ScalarValue) -> Result<ScalarValue> {
Expand Down Expand Up @@ -935,10 +1004,11 @@ mod test {
use arrow::datatypes::DataType::Utf8;
use arrow::datatypes::{DataType, Field, TimeUnit};

use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{TransformedResult, TreeNode};
use datafusion_common::{DFSchema, DFSchemaRef, Result, ScalarValue};
use datafusion_expr::expr::{self, InSubquery, Like, ScalarFunction};
use datafusion_expr::logical_plan::{EmptyRelation, Projection};
use datafusion_expr::logical_plan::{EmptyRelation, Projection, Sort};
use datafusion_expr::test::function_stub::avg_udaf;
use datafusion_expr::{
cast, col, create_udaf, is_true, lit, AccumulatorFactoryFunction, AggregateUDF,
Expand All @@ -951,7 +1021,7 @@ mod test {
use crate::analyzer::type_coercion::{
coerce_case_expression, TypeCoercion, TypeCoercionRewriter,
};
use crate::test::assert_analyzed_plan_eq;
use crate::test::{assert_analyzed_plan_eq, assert_analyzed_plan_with_config_eq};

fn empty() -> Arc<LogicalPlan> {
Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
Expand Down Expand Up @@ -982,6 +1052,155 @@ mod test {
assert_analyzed_plan_eq(Arc::new(TypeCoercion::new()), plan, expected)
}

fn coerce_on_output_if_viewtype(plan: LogicalPlan, expected: &str) -> Result<()> {
let mut options = ConfigOptions::default();
options.optimizer.expand_views_at_output = true;

assert_analyzed_plan_with_config_eq(
options,
Arc::new(TypeCoercion::new()),
plan.clone(),
expected,
)
}

fn do_not_coerce_on_output(plan: LogicalPlan, expected: &str) -> Result<()> {
assert_analyzed_plan_with_config_eq(
ConfigOptions::default(),
Arc::new(TypeCoercion::new()),
plan.clone(),
expected,
)
}

#[test]
fn coerce_utf8view_output() -> Result<()> {
// Plan A
// scenario: outermost utf8view projection
let expr = col("a");
let empty = empty_with_type(DataType::Utf8View);
let plan = LogicalPlan::Projection(Projection::try_new(
vec![expr.clone()],
Arc::clone(&empty),
)?);
// Plan A: no coerce
let if_not_coerced = "Projection: a\n EmptyRelation";
do_not_coerce_on_output(plan.clone(), if_not_coerced)?;
// Plan A: coerce requested: Utf8View => LargeUtf8
let if_coerced = "Projection: CAST(a AS LargeUtf8)\n EmptyRelation";
coerce_on_output_if_viewtype(plan.clone(), if_coerced)?;

// Plan B
// scenario: outermost bool projection
let bool_expr = col("a").lt(lit("foo"));
let bool_plan = LogicalPlan::Projection(Projection::try_new(
vec![bool_expr],
Arc::clone(&empty),
)?);
// Plan B: no coerce
let if_not_coerced =
"Projection: a < CAST(Utf8(\"foo\") AS Utf8View)\n EmptyRelation";
do_not_coerce_on_output(bool_plan.clone(), if_not_coerced)?;
// Plan B: coerce requested: no coercion applied
let if_coerced = if_not_coerced;
coerce_on_output_if_viewtype(bool_plan, if_coerced)?;

// Plan C
// scenario: with a non-projection root logical plan node
let sort_expr = expr.sort(true, true);
let sort_plan = LogicalPlan::Sort(Sort {
expr: vec![sort_expr],
input: Arc::new(plan),
fetch: None,
});
// Plan C: no coerce
let if_not_coerced =
"Sort: a ASC NULLS FIRST\n Projection: a\n EmptyRelation";
do_not_coerce_on_output(sort_plan.clone(), if_not_coerced)?;
// Plan C: coerce requested: Utf8View => LargeUtf8
let if_coerced = "Projection: CAST(a AS LargeUtf8)\n Sort: a ASC NULLS FIRST\n Projection: a\n EmptyRelation";
coerce_on_output_if_viewtype(sort_plan.clone(), if_coerced)?;

// Plan D
// scenario: two layers of projections with view types
let plan = LogicalPlan::Projection(Projection::try_new(
vec![col("a")],
Arc::new(sort_plan),
)?);
// Plan D: no coerce
let if_not_coerced = "Projection: a\n Sort: a ASC NULLS FIRST\n Projection: a\n EmptyRelation";
do_not_coerce_on_output(plan.clone(), if_not_coerced)?;
// Plan B: coerce requested: Utf8View => LargeUtf8 only on outermost
let if_coerced = "Projection: CAST(a AS LargeUtf8)\n Sort: a ASC NULLS FIRST\n Projection: a\n EmptyRelation";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

coerce_on_output_if_viewtype(plan.clone(), if_coerced)?;

Ok(())
}

#[test]
fn coerce_binaryview_output() -> Result<()> {
// Plan A
// scenario: outermost binaryview projection
let expr = col("a");
let empty = empty_with_type(DataType::BinaryView);
let plan = LogicalPlan::Projection(Projection::try_new(
vec![expr.clone()],
Arc::clone(&empty),
)?);
// Plan A: no coerce
let if_not_coerced = "Projection: a\n EmptyRelation";
do_not_coerce_on_output(plan.clone(), if_not_coerced)?;
// Plan A: coerce requested: BinaryView => LargeBinary
let if_coerced = "Projection: CAST(a AS LargeBinary)\n EmptyRelation";
coerce_on_output_if_viewtype(plan.clone(), if_coerced)?;

// Plan B
// scenario: outermost bool projection
let bool_expr = col("a").lt(lit(vec![8, 1, 8, 1]));
let bool_plan = LogicalPlan::Projection(Projection::try_new(
vec![bool_expr],
Arc::clone(&empty),
)?);
// Plan B: no coerce
let if_not_coerced =
"Projection: a < CAST(Binary(\"8,1,8,1\") AS BinaryView)\n EmptyRelation";
do_not_coerce_on_output(bool_plan.clone(), if_not_coerced)?;
// Plan B: coerce requested: no coercion applied
let if_coerced = if_not_coerced;
coerce_on_output_if_viewtype(bool_plan, if_coerced)?;

// Plan C
// scenario: with a non-projection root logical plan node
let sort_expr = expr.sort(true, true);
let sort_plan = LogicalPlan::Sort(Sort {
expr: vec![sort_expr],
input: Arc::new(plan),
fetch: None,
});
// Plan C: no coerce
let if_not_coerced =
"Sort: a ASC NULLS FIRST\n Projection: a\n EmptyRelation";
do_not_coerce_on_output(sort_plan.clone(), if_not_coerced)?;
// Plan C: coerce requested: BinaryView => LargeBinary
let if_coerced = "Projection: CAST(a AS LargeBinary)\n Sort: a ASC NULLS FIRST\n Projection: a\n EmptyRelation";
coerce_on_output_if_viewtype(sort_plan.clone(), if_coerced)?;

// Plan D
// scenario: two layers of projections with view types
let plan = LogicalPlan::Projection(Projection::try_new(
vec![col("a")],
Arc::new(sort_plan),
)?);
// Plan D: no coerce
let if_not_coerced = "Projection: a\n Sort: a ASC NULLS FIRST\n Projection: a\n EmptyRelation";
do_not_coerce_on_output(plan.clone(), if_not_coerced)?;
// Plan B: coerce requested: BinaryView => LargeBinary only on outermost
let if_coerced = "Projection: CAST(a AS LargeBinary)\n Sort: a ASC NULLS FIRST\n Projection: a\n EmptyRelation";
coerce_on_output_if_viewtype(plan.clone(), if_coerced)?;

Ok(())
}

#[test]
fn nested_case() -> Result<()> {
let expr = col("a").lt(lit(2_u32));
Expand Down
11 changes: 11 additions & 0 deletions datafusion/optimizer/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,17 @@ pub fn assert_analyzed_plan_eq(
expected: &str,
) -> Result<()> {
let options = ConfigOptions::default();
assert_analyzed_plan_with_config_eq(options, rule, plan, expected)?;

Ok(())
}

pub fn assert_analyzed_plan_with_config_eq(
options: ConfigOptions,
rule: Arc<dyn AnalyzerRule + Send + Sync>,
plan: LogicalPlan,
expected: &str,
) -> Result<()> {
let analyzed_plan =
Analyzer::with_rules(vec![rule]).execute_and_check(plan, &options, |_, _| {})?;
let formatted_plan = format!("{analyzed_plan}");
Expand Down
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ datafusion.optimizer.default_filter_selectivity 20
datafusion.optimizer.enable_distinct_aggregation_soft_limit true
datafusion.optimizer.enable_round_robin_repartition true
datafusion.optimizer.enable_topk_aggregation true
datafusion.optimizer.expand_views_at_output false
datafusion.optimizer.filter_null_join_keys false
datafusion.optimizer.hash_join_single_partition_threshold 1048576
datafusion.optimizer.hash_join_single_partition_threshold_rows 131072
Expand Down Expand Up @@ -314,6 +315,7 @@ datafusion.optimizer.default_filter_selectivity 20 The default filter selectivit
datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read.
datafusion.optimizer.enable_round_robin_repartition true When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores
datafusion.optimizer.enable_topk_aggregation true When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible
datafusion.optimizer.expand_views_at_output false When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
datafusion.optimizer.filter_null_join_keys false When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down.
datafusion.optimizer.hash_join_single_partition_threshold 1048576 The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition
datafusion.optimizer.hash_join_single_partition_threshold_rows 131072 The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition
Expand Down
1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
| datafusion.optimizer.hash_join_single_partition_threshold_rows | 131072 | The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition |
| datafusion.optimizer.default_filter_selectivity | 20 | The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). |
| datafusion.optimizer.prefer_existing_union | false | When set to true, the optimizer will not attempt to convert Union to Interleave |
| datafusion.optimizer.expand_views_at_output | false | When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. |
| datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans |
| datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans |
| datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans |
Expand Down