Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Coerce BinaryView/Utf8View to LargeBinary/LargeUtf8 on output. #12271

Merged
merged 7 commits into from
Sep 9, 2024
245 changes: 134 additions & 111 deletions datafusion/optimizer/src/analyzer/type_coercion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
use datafusion_common::{
exec_err, internal_err, not_impl_err, plan_datafusion_err, plan_err, Column,
DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue,
DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue, TableReference,
};
use datafusion_expr::expr::{
self, Alias, Between, BinaryExpr, Case, Exists, InList, InSubquery, Like,
Expand Down Expand Up @@ -537,44 +537,40 @@ impl<'a> TreeNodeRewriter for TypeCoercionRewriter<'a> {
/// Transform a schema to use non-view types for Utf8View and BinaryView
fn transform_schema_to_nonview(dfschema: &DFSchemaRef) -> Option<Result<DFSchema>> {
let metadata = dfschema.as_arrow().metadata.clone();
let len = dfschema.fields().len();
let mut transformed = false;

let (qualifiers, transformed_fields) = dfschema
.iter()
.map(|(qualifier, field)| match field.data_type() {
DataType::Utf8View => {
transformed = true;
(
qualifier,
Arc::new(Field::new(
field.name(),
DataType::LargeUtf8,
field.is_nullable(),
)),
)
}
DataType::BinaryView => {
transformed = true;
(
qualifier,
Arc::new(Field::new(
field.name(),
DataType::LargeBinary,
field.is_nullable(),
)),
)
}
_ => (qualifier, Arc::clone(field)),
})
.fold(
(Vec::with_capacity(len), Vec::with_capacity(len)),
|(mut qs, mut fs), (q, f)| {
qs.push(q.cloned());
fs.push(f);
(qs, fs)
},
);
let (qualifiers, transformed_fields): (Vec<Option<TableReference>>, Vec<Arc<Field>>) =
dfschema
.iter()
.map(|(qualifier, field)| match field.data_type() {
DataType::Utf8View => {
transformed = true;
(
qualifier.cloned() as Option<TableReference>,
Arc::new(Field::new(
field.name(),
DataType::LargeUtf8,
field.is_nullable(),
)),
)
}
DataType::BinaryView => {
transformed = true;
(
qualifier.cloned() as Option<TableReference>,
Arc::new(Field::new(
field.name(),
DataType::LargeBinary,
field.is_nullable(),
)),
)
}
_ => (
qualifier.cloned() as Option<TableReference>,
Arc::clone(field),
),
})
.unzip();

if !transformed {
return None;
Expand Down Expand Up @@ -1056,124 +1052,151 @@ mod test {
assert_analyzed_plan_eq(Arc::new(TypeCoercion::new()), plan, expected)
}

/// Test helper: analyzes `plan` with the [`TypeCoercion`] rule while
/// `optimizer.expand_views_at_output` is switched on.
///
/// The options, the rule, the plan and `expected` are forwarded to
/// `assert_analyzed_plan_with_config_eq`, whose result is returned as-is.
fn coerce_on_output_if_viewtype(plan: LogicalPlan, expected: &str) -> Result<()> {
    let mut options = ConfigOptions::default();
    options.optimizer.expand_views_at_output = true;

    // `plan` is owned and not used again, so it is moved rather than cloned.
    assert_analyzed_plan_with_config_eq(
        options,
        Arc::new(TypeCoercion::new()),
        plan,
        expected,
    )
}

/// Test helper: analyzes `plan` with the [`TypeCoercion`] rule under
/// `ConfigOptions::default()`.
///
/// NOTE(review): the name implies the default options leave
/// `expand_views_at_output` off; confirm against `ConfigOptions`.
///
/// The options, the rule, the plan and `expected` are forwarded to
/// `assert_analyzed_plan_with_config_eq`, whose result is returned as-is.
fn do_not_coerce_on_output(plan: LogicalPlan, expected: &str) -> Result<()> {
    // `plan` is owned and not used again, so it is moved rather than cloned.
    assert_analyzed_plan_with_config_eq(
        ConfigOptions::default(),
        Arc::new(TypeCoercion::new()),
        plan,
        expected,
    )
}

#[test]
fn coerce_utf8view_output() -> Result<()> {
// Plan A
// scenario: outermost utf8view projection
let expr = col("a");
let empty = empty_with_type(DataType::Utf8View);
let plan = LogicalPlan::Projection(Projection::try_new(
vec![expr.clone()],
Arc::clone(&empty),
)?);

// baseline
let expected = "Projection: a\n EmptyRelation";
assert_analyzed_plan_eq(Arc::new(TypeCoercion::new()), plan.clone(), expected)?;

// coerce: Utf8View => LargeUtf8
let expect_cast = "Projection: CAST(a AS LargeUtf8)\n EmptyRelation";
let mut options = ConfigOptions::default();
options.optimizer.expand_views_at_output = true;
assert_analyzed_plan_with_config_eq(
options,
Arc::new(TypeCoercion::new()),
plan.clone(),
expect_cast,
)?;

// request coerce --> but output in bool, not Utf8View
// Plan A: no coerce
let if_not_coerced = "Projection: a\n EmptyRelation";
do_not_coerce_on_output(plan.clone(), if_not_coerced)?;
// Plan A: coerce requested: Utf8View => LargeUtf8
let if_coerced = "Projection: CAST(a AS LargeUtf8)\n EmptyRelation";
coerce_on_output_if_viewtype(plan.clone(), if_coerced)?;

// Plan B
// scenario: outermost bool projection
let bool_expr = col("a").lt(lit("foo"));
let plan_no_cast = LogicalPlan::Projection(Projection::try_new(
let bool_plan = LogicalPlan::Projection(Projection::try_new(
vec![bool_expr],
Arc::clone(&empty),
)?);
let expect_no_cast =
// Plan B: no coerce
let if_not_coerced =
"Projection: a < CAST(Utf8(\"foo\") AS Utf8View)\n EmptyRelation";
let mut options = ConfigOptions::default();
options.optimizer.expand_views_at_output = true;
assert_analyzed_plan_with_config_eq(
options,
Arc::new(TypeCoercion::new()),
plan_no_cast.clone(),
expect_no_cast,
)?;
do_not_coerce_on_output(bool_plan.clone(), if_not_coerced)?;
// Plan B: coerce requested: no coercion applied
let if_coerced = if_not_coerced;
coerce_on_output_if_viewtype(bool_plan, if_coerced)?;

// coerce start with a non-projection root logical plan node
// Plan C
// scenario: with a non-projection root logical plan node
let sort_expr = expr.sort(true, true);
let sort_plan = LogicalPlan::Sort(Sort {
expr: vec![sort_expr],
input: Arc::new(plan),
fetch: None,
});
let expect_cast = "Projection: CAST(a AS LargeUtf8)\n Sort: a ASC NULLS FIRST\n Projection: a\n EmptyRelation";
let mut options = ConfigOptions::default();
options.optimizer.expand_views_at_output = true;
assert_analyzed_plan_with_config_eq(
options,
Arc::new(TypeCoercion::new()),
sort_plan.clone(),
expect_cast,
)?;
// Plan C: no coerce
let if_not_coerced =
"Sort: a ASC NULLS FIRST\n Projection: a\n EmptyRelation";
do_not_coerce_on_output(sort_plan.clone(), if_not_coerced)?;
// Plan C: coerce requested: Utf8View => LargeUtf8
let if_coerced = "Projection: CAST(a AS LargeUtf8)\n Sort: a ASC NULLS FIRST\n Projection: a\n EmptyRelation";
coerce_on_output_if_viewtype(sort_plan.clone(), if_coerced)?;

// Plan D
// scenario: two layers of projections with view types
let plan = LogicalPlan::Projection(Projection::try_new(
vec![col("a")],
Arc::new(sort_plan),
)?);
// Plan D: no coerce
let if_not_coerced = "Projection: a\n Sort: a ASC NULLS FIRST\n Projection: a\n EmptyRelation";
do_not_coerce_on_output(plan.clone(), if_not_coerced)?;
// Plan D: coerce requested: Utf8View => LargeUtf8 only on outermost
let if_coerced = "Projection: CAST(a AS LargeUtf8)\n Sort: a ASC NULLS FIRST\n Projection: a\n EmptyRelation";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

coerce_on_output_if_viewtype(plan.clone(), if_coerced)?;

Ok(())
}

#[test]
fn coerce_binaryview_output() -> Result<()> {
// Plan A
// scenario: outermost binaryview projection
let expr = col("a");
let empty = empty_with_type(DataType::BinaryView);
let plan = LogicalPlan::Projection(Projection::try_new(
vec![expr.clone()],
Arc::clone(&empty),
)?);

// baseline
let expected = "Projection: a\n EmptyRelation";
assert_analyzed_plan_eq(Arc::new(TypeCoercion::new()), plan.clone(), expected)?;

// coerce: BinaryView => LargeBinary
let expect_cast = "Projection: CAST(a AS LargeBinary)\n EmptyRelation";
let mut options = ConfigOptions::default();
options.optimizer.expand_views_at_output = true;
assert_analyzed_plan_with_config_eq(
options,
Arc::new(TypeCoercion::new()),
plan.clone(),
expect_cast,
)?;

// request coerce --> but output in bool, not BinaryView
// Plan A: no coerce
let if_not_coerced = "Projection: a\n EmptyRelation";
do_not_coerce_on_output(plan.clone(), if_not_coerced)?;
// Plan A: coerce requested: BinaryView => LargeBinary
let if_coerced = "Projection: CAST(a AS LargeBinary)\n EmptyRelation";
coerce_on_output_if_viewtype(plan.clone(), if_coerced)?;

// Plan B
// scenario: outermost bool projection
let bool_expr = col("a").lt(lit(vec![8, 1, 8, 1]));
let plan_no_cast = LogicalPlan::Projection(Projection::try_new(
let bool_plan = LogicalPlan::Projection(Projection::try_new(
vec![bool_expr],
Arc::clone(&empty),
)?);
let expect_no_cast =
// Plan B: no coerce
let if_not_coerced =
"Projection: a < CAST(Binary(\"8,1,8,1\") AS BinaryView)\n EmptyRelation";
let mut options = ConfigOptions::default();
options.optimizer.expand_views_at_output = true;
assert_analyzed_plan_with_config_eq(
options,
Arc::new(TypeCoercion::new()),
plan_no_cast.clone(),
expect_no_cast,
)?;
do_not_coerce_on_output(bool_plan.clone(), if_not_coerced)?;
// Plan B: coerce requested: no coercion applied
let if_coerced = if_not_coerced;
coerce_on_output_if_viewtype(bool_plan, if_coerced)?;

// coerce start with a non-projection root logical plan node
// Plan C
// scenario: with a non-projection root logical plan node
let sort_expr = expr.sort(true, true);
let sort_plan = LogicalPlan::Sort(Sort {
expr: vec![sort_expr],
input: Arc::new(plan),
fetch: None,
});
let expect_cast = "Projection: CAST(a AS LargeBinary)\n Sort: a ASC NULLS FIRST\n Projection: a\n EmptyRelation";
let mut options = ConfigOptions::default();
options.optimizer.expand_views_at_output = true;
assert_analyzed_plan_with_config_eq(
options,
Arc::new(TypeCoercion::new()),
sort_plan.clone(),
expect_cast,
)?;
// Plan C: no coerce
let if_not_coerced =
"Sort: a ASC NULLS FIRST\n Projection: a\n EmptyRelation";
do_not_coerce_on_output(sort_plan.clone(), if_not_coerced)?;
// Plan C: coerce requested: BinaryView => LargeBinary
let if_coerced = "Projection: CAST(a AS LargeBinary)\n Sort: a ASC NULLS FIRST\n Projection: a\n EmptyRelation";
coerce_on_output_if_viewtype(sort_plan.clone(), if_coerced)?;

// Plan D
// scenario: two layers of projections with view types
let plan = LogicalPlan::Projection(Projection::try_new(
vec![col("a")],
Arc::new(sort_plan),
)?);
// Plan D: no coerce
let if_not_coerced = "Projection: a\n Sort: a ASC NULLS FIRST\n Projection: a\n EmptyRelation";
do_not_coerce_on_output(plan.clone(), if_not_coerced)?;
// Plan D: coerce requested: BinaryView => LargeBinary only on outermost
let if_coerced = "Projection: CAST(a AS LargeBinary)\n Sort: a ASC NULLS FIRST\n Projection: a\n EmptyRelation";
coerce_on_output_if_viewtype(plan.clone(), if_coerced)?;

Ok(())
}
Expand Down