Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Coerce BinaryView/Utf8View to LargeBinary/LargeUtf8 on output. #12271

Merged
merged 7 commits into from
Sep 9, 2024
Merged
5 changes: 5 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,11 @@ config_namespace! {

/// When set to true, the optimizer will not attempt to convert Union to Interleave
pub prefer_existing_union: bool, default = false

/// When set to true, if the returned type is a view type
/// then the output will be coerced to a non-view.
/// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
pub expand_views_at_output: bool, default = false
}
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1287,7 +1287,7 @@ impl LogicalPlan {
/// referenced expressions into columns.
///
/// See also: [`crate::utils::columnize_expr`]
pub(crate) fn columnized_output_exprs(&self) -> Result<Vec<(&Expr, Column)>> {
pub fn columnized_output_exprs(&self) -> Result<Vec<(&Expr, Column)>> {
match self {
LogicalPlan::Aggregate(aggregate) => Ok(aggregate
.output_expressions()?
Expand Down
206 changes: 201 additions & 5 deletions datafusion/optimizer/src/analyzer/type_coercion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::sync::Arc;

use itertools::izip;

use arrow::datatypes::{DataType, Field, IntervalUnit};
use arrow::datatypes::{DataType, Field, IntervalUnit, Schema};

use crate::analyzer::AnalyzerRule;
use crate::utils::NamePreserver;
Expand Down Expand Up @@ -66,19 +66,39 @@ impl TypeCoercion {
}
}

/// Coerce output schema based upon optimizer config.
fn coerce_output(plan: LogicalPlan, config: &ConfigOptions) -> Result<LogicalPlan> {
if !config.optimizer.expand_views_at_output {
return Ok(plan);
}

let outer_refs = plan.expressions();
if outer_refs.is_empty() {
return Ok(plan);
}

if let Some(dfschema) = transform_schema_to_nonview(plan.schema()) {
coerce_plan_expr_for_schema(plan, &dfschema?)
} else {
Ok(plan)
}
}

impl AnalyzerRule for TypeCoercion {
fn name(&self) -> &str {
"type_coercion"
}

fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) -> Result<LogicalPlan> {
fn analyze(&self, plan: LogicalPlan, config: &ConfigOptions) -> Result<LogicalPlan> {
let empty_schema = DFSchema::empty();

// recurse
let transformed_plan = plan
.transform_up_with_subqueries(|plan| analyze_internal(&empty_schema, plan))?
.data;

Ok(transformed_plan)
// finish
coerce_output(transformed_plan, config)
}
}

Expand Down Expand Up @@ -514,6 +534,59 @@ impl<'a> TreeNodeRewriter for TypeCoercionRewriter<'a> {
}
}

/// Transform a schema to use non-view types for Utf8View and BinaryView
fn transform_schema_to_nonview(dfschema: &DFSchemaRef) -> Option<Result<DFSchema>> {
let metadata = dfschema.as_arrow().metadata.clone();
let len = dfschema.fields().len();
let mut transformed = false;

let (qualifiers, transformed_fields) = dfschema
.iter()
.map(|(qualifier, field)| match field.data_type() {
DataType::Utf8View => {
transformed = true;
(
qualifier,
Arc::new(Field::new(
field.name(),
DataType::LargeUtf8,
field.is_nullable(),
)),
)
}
DataType::BinaryView => {
transformed = true;
(
qualifier,
Arc::new(Field::new(
field.name(),
DataType::LargeBinary,
field.is_nullable(),
)),
)
}
_ => (qualifier, Arc::clone(field)),
})
.fold(
(Vec::with_capacity(len), Vec::with_capacity(len)),
|(mut qs, mut fs), (q, f)| {
qs.push(q.cloned());
fs.push(f);
(qs, fs)
},
);
wiedld marked this conversation as resolved.
Show resolved Hide resolved

if !transformed {
return None;
}

let schema = Schema::new_with_metadata(transformed_fields, metadata);
Some(DFSchema::from_field_specific_qualified_schema(
qualifiers,
&Arc::new(schema),
))
}

/// Casts the given `value` to `target_type`. Note that this function
/// only considers `Null` or `Utf8` values.
fn coerce_scalar(target_type: &DataType, value: &ScalarValue) -> Result<ScalarValue> {
Expand Down Expand Up @@ -935,10 +1008,11 @@ mod test {
use arrow::datatypes::DataType::Utf8;
use arrow::datatypes::{DataType, Field, TimeUnit};

use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{TransformedResult, TreeNode};
use datafusion_common::{DFSchema, DFSchemaRef, Result, ScalarValue};
use datafusion_expr::expr::{self, InSubquery, Like, ScalarFunction};
use datafusion_expr::logical_plan::{EmptyRelation, Projection};
use datafusion_expr::logical_plan::{EmptyRelation, Projection, Sort};
use datafusion_expr::test::function_stub::avg_udaf;
use datafusion_expr::{
cast, col, create_udaf, is_true, lit, AccumulatorFactoryFunction, AggregateUDF,
Expand All @@ -951,7 +1025,7 @@ mod test {
use crate::analyzer::type_coercion::{
coerce_case_expression, TypeCoercion, TypeCoercionRewriter,
};
use crate::test::assert_analyzed_plan_eq;
use crate::test::{assert_analyzed_plan_eq, assert_analyzed_plan_with_config_eq};

fn empty() -> Arc<LogicalPlan> {
Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
Expand Down Expand Up @@ -982,6 +1056,128 @@ mod test {
assert_analyzed_plan_eq(Arc::new(TypeCoercion::new()), plan, expected)
}

#[test]
fn coerce_utf8view_output() -> Result<()> {
let expr = col("a");
let empty = empty_with_type(DataType::Utf8View);
let plan = LogicalPlan::Projection(Projection::try_new(
vec![expr.clone()],
Arc::clone(&empty),
)?);

// baseline
let expected = "Projection: a\n EmptyRelation";
wiedld marked this conversation as resolved.
Show resolved Hide resolved
assert_analyzed_plan_eq(Arc::new(TypeCoercion::new()), plan.clone(), expected)?;

// coerce: Utf8View => LargeUtf8
let expect_cast = "Projection: CAST(a AS LargeUtf8)\n EmptyRelation";
let mut options = ConfigOptions::default();
options.optimizer.expand_views_at_output = true;
assert_analyzed_plan_with_config_eq(
options,
Arc::new(TypeCoercion::new()),
plan.clone(),
expect_cast,
)?;

// request coerce --> but output in bool, not Utf8View
let bool_expr = col("a").lt(lit("foo"));
let plan_no_cast = LogicalPlan::Projection(Projection::try_new(
vec![bool_expr],
Arc::clone(&empty),
)?);
let expect_no_cast =
wiedld marked this conversation as resolved.
Show resolved Hide resolved
"Projection: a < CAST(Utf8(\"foo\") AS Utf8View)\n EmptyRelation";
let mut options = ConfigOptions::default();
options.optimizer.expand_views_at_output = true;
assert_analyzed_plan_with_config_eq(
options,
Arc::new(TypeCoercion::new()),
plan_no_cast.clone(),
expect_no_cast,
)?;

// coerce start with a non-projection root logical plan node
let sort_expr = expr.sort(true, true);
let sort_plan = LogicalPlan::Sort(Sort {
expr: vec![sort_expr],
input: Arc::new(plan),
fetch: None,
});
let expect_cast = "Projection: CAST(a AS LargeUtf8)\n Sort: a ASC NULLS FIRST\n Projection: a\n EmptyRelation";
let mut options = ConfigOptions::default();
options.optimizer.expand_views_at_output = true;
assert_analyzed_plan_with_config_eq(
wiedld marked this conversation as resolved.
Show resolved Hide resolved
options,
Arc::new(TypeCoercion::new()),
sort_plan.clone(),
expect_cast,
)?;

Ok(())
}

#[test]
fn coerce_binaryview_output() -> Result<()> {
let expr = col("a");
let empty = empty_with_type(DataType::BinaryView);
let plan = LogicalPlan::Projection(Projection::try_new(
vec![expr.clone()],
Arc::clone(&empty),
)?);

// baseline
let expected = "Projection: a\n EmptyRelation";
assert_analyzed_plan_eq(Arc::new(TypeCoercion::new()), plan.clone(), expected)?;

// coerce: BinaryView => LargeBinary
let expect_cast = "Projection: CAST(a AS LargeBinary)\n EmptyRelation";
let mut options = ConfigOptions::default();
options.optimizer.expand_views_at_output = true;
assert_analyzed_plan_with_config_eq(
options,
Arc::new(TypeCoercion::new()),
plan.clone(),
expect_cast,
)?;

// request coerce --> but output in bool, not BinaryView
let bool_expr = col("a").lt(lit(vec![8, 1, 8, 1]));
let plan_no_cast = LogicalPlan::Projection(Projection::try_new(
vec![bool_expr],
Arc::clone(&empty),
)?);
let expect_no_cast =
"Projection: a < CAST(Binary(\"8,1,8,1\") AS BinaryView)\n EmptyRelation";
let mut options = ConfigOptions::default();
options.optimizer.expand_views_at_output = true;
assert_analyzed_plan_with_config_eq(
options,
Arc::new(TypeCoercion::new()),
plan_no_cast.clone(),
expect_no_cast,
)?;

// coerce start with a non-projection root logical plan node
let sort_expr = expr.sort(true, true);
let sort_plan = LogicalPlan::Sort(Sort {
expr: vec![sort_expr],
input: Arc::new(plan),
fetch: None,
});
let expect_cast = "Projection: CAST(a AS LargeBinary)\n Sort: a ASC NULLS FIRST\n Projection: a\n EmptyRelation";
let mut options = ConfigOptions::default();
options.optimizer.expand_views_at_output = true;
assert_analyzed_plan_with_config_eq(
options,
Arc::new(TypeCoercion::new()),
sort_plan.clone(),
expect_cast,
)?;

Ok(())
}

#[test]
fn nested_case() -> Result<()> {
let expr = col("a").lt(lit(2_u32));
Expand Down
11 changes: 11 additions & 0 deletions datafusion/optimizer/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,17 @@ pub fn assert_analyzed_plan_eq(
expected: &str,
) -> Result<()> {
let options = ConfigOptions::default();
assert_analyzed_plan_with_config_eq(options, rule, plan, expected)?;

Ok(())
}

pub fn assert_analyzed_plan_with_config_eq(
options: ConfigOptions,
rule: Arc<dyn AnalyzerRule + Send + Sync>,
plan: LogicalPlan,
expected: &str,
) -> Result<()> {
let analyzed_plan =
Analyzer::with_rules(vec![rule]).execute_and_check(plan, &options, |_, _| {})?;
let formatted_plan = format!("{analyzed_plan}");
Expand Down
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ datafusion.optimizer.default_filter_selectivity 20
datafusion.optimizer.enable_distinct_aggregation_soft_limit true
datafusion.optimizer.enable_round_robin_repartition true
datafusion.optimizer.enable_topk_aggregation true
datafusion.optimizer.expand_views_at_output false
datafusion.optimizer.filter_null_join_keys false
datafusion.optimizer.hash_join_single_partition_threshold 1048576
datafusion.optimizer.hash_join_single_partition_threshold_rows 131072
Expand Down Expand Up @@ -314,6 +315,7 @@ datafusion.optimizer.default_filter_selectivity 20 The default filter selectivit
datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read.
datafusion.optimizer.enable_round_robin_repartition true When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores
datafusion.optimizer.enable_topk_aggregation true When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible
datafusion.optimizer.expand_views_at_output false When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
datafusion.optimizer.filter_null_join_keys false When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down.
datafusion.optimizer.hash_join_single_partition_threshold 1048576 The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition
datafusion.optimizer.hash_join_single_partition_threshold_rows 131072 The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition
Expand Down
1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
| datafusion.optimizer.hash_join_single_partition_threshold_rows | 131072 | The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition |
| datafusion.optimizer.default_filter_selectivity | 20 | The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). |
| datafusion.optimizer.prefer_existing_union | false | When set to true, the optimizer will not attempt to convert Union to Interleave |
| datafusion.optimizer.expand_views_at_output | false | When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. |
| datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans |
| datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans |
| datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans |
Expand Down