Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

validate and adjust Substrait NamedTable schemas (#12223) #12245

Merged
merged 21 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 121 additions & 10 deletions datafusion/substrait/src/logical_plan/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ use crate::variation_const::{
};
use datafusion::arrow::array::{new_empty_array, AsArray};
use datafusion::common::scalar::ScalarStructBuilder;
use datafusion::dataframe::DataFrame;
use datafusion::logical_expr::builder::project;
use datafusion::logical_expr::expr::InList;
use datafusion::logical_expr::{
col, expr, Cast, Extension, GroupingSet, Like, LogicalPlanBuilder, Partitioning,
Expand Down Expand Up @@ -277,6 +279,20 @@ pub fn extract_projection(
);
Ok(LogicalPlan::TableScan(scan))
}
LogicalPlan::Projection(projection) => {
// create another Projection around the Projection to handle the field masking
let fields: Vec<Expr> = column_indices
.into_iter()
.map(|i| {
let (qualifier, field) =
projection.schema.qualified_field(i);
let column =
Column::new(qualifier.cloned(), field.name());
Expr::Column(column)
})
.collect();
project(LogicalPlan::Projection(projection), fields)
}
_ => plan_err!("unexpected plan for table"),
}
}
Expand Down Expand Up @@ -640,6 +656,10 @@ pub async fn from_substrait_rel(
}
Some(RelType::Read(read)) => match &read.as_ref().read_type {
Some(ReadType::NamedTable(nt)) => {
let named_struct = read.base_schema.as_ref().ok_or_else(|| {
substrait_datafusion_err!("No base schema provided for Named Table")
})?;

let table_reference = match nt.names.len() {
0 => {
return plan_err!("No table name found in NamedTable");
Expand All @@ -657,7 +677,13 @@ pub async fn from_substrait_rel(
table: nt.names[2].clone().into(),
},
};
let t = ctx.table(table_reference).await?;

let substrait_schema =
from_substrait_named_struct(named_struct, extensions)?
.replace_qualifier(table_reference.clone());

let t = ctx.table(table_reference.clone()).await?;
let t = ensure_schema_compatability(t, substrait_schema)?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should maybe add this for the local file reads below as well (I didn't have it in my branch yet as I didn't need it immediately). Somewhat annoyingly it'll cause the rest of the TPCH tests to fail...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this worth doing as one big swoop in this PR, or would it make sense to do it as a followup?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fine by me to do it separately!

let t = t.into_optimized_plan()?;
extract_projection(t, &read.projection)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I started wondering if the ensure_schema_compatability can now conflict with extract_projection - and I think it can, either by failing if DF doesn't optimize the select into a projection, or if DF does, then by overriding the select's projection with the Substrait projection...

I guess a fix would be something like in extract_projection, if there is an existing scan.projection, then apply columnIndices on it first

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That did indeed cause problems. It triggered an error of unexpected plan for table in extract_projection.

I added some code for this case in d571eb2 (#12245). Is something like this what you had in mind?

I am noticing that the plans generated look a little weird/bad with a lot of redundant projects

                "Projection: DATA.a, DATA.b\
                \n  Projection: DATA.a, DATA.b\
                \n    Projection: DATA.a, DATA.b, DATA.c\
                \n      TableScan: DATA projection=[b, a, c]"

but they are at least correct for now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is something like this what you had in mind?

What I had in mind was manipulating the scan.projection directly - kinda like it is alreadydone in extract_projection, we could do it that way also for ensure_schema_compatibility. That way there wouldn't be additional Projections, and maybe it'd be a bit more efficient if the current setup doesn't push the column-pruning into the scan level (though I'm a bit surprised they don't get optimized anyways).

But I don't think it's necessary - the way you've done it here seems correct, and we (I?) can do the project-mangling as a followup, unless you want to take a stab at it :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think project unmangling would be better as a follow-up. Possible as part of #12347 because supporting remaps is going to add yet another layer of Projects 😅

}
Expand All @@ -671,7 +697,7 @@ pub async fn from_substrait_rel(
if vt.values.is_empty() {
return Ok(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema,
schema: DFSchemaRef::new(schema),
}));
}

Expand Down Expand Up @@ -704,7 +730,10 @@ pub async fn from_substrait_rel(
})
.collect::<Result<_>>()?;

Ok(LogicalPlan::Values(Values { schema, values }))
Ok(LogicalPlan::Values(Values {
schema: DFSchemaRef::new(schema),
values,
}))
}
Some(ReadType::LocalFiles(lf)) => {
fn extract_filename(name: &str) -> Option<String> {
Expand Down Expand Up @@ -850,6 +879,87 @@ pub async fn from_substrait_rel(
}
}

/// Ensures that the given Substrait schema is compatible with the schema as given by DataFusion
///
/// This means:
/// 1. All fields present in the Substrait schema are present in the DataFusion schema. The
/// DataFusion schema may have MORE fields, but not the other way around.
/// 2. All fields are compatible. See [`ensure_field_compatability`] for details
///
/// This function returns a DataFrame with fields adjusted if necessary in the event that the
/// Substrait schema is a subset of the DataFusion schema.
fn ensure_schema_compatability(
table: DataFrame,
substrait_schema: DFSchema,
) -> Result<DataFrame> {
let df_schema = table.schema().to_owned().strip_qualifiers();
if df_schema.logically_equivalent_names_and_types(&substrait_schema) {
return Ok(table);
}
let selected_columns = substrait_schema
.strip_qualifiers()
.fields()
.iter()
.map(|substrait_field| {
let df_field =
df_schema.field_with_unqualified_name(substrait_field.name())?;
ensure_field_compatability(df_field, substrait_field)?;
Ok(col(format!("\"{}\"", df_field.name())))
})
.collect::<Result<_>>()?;

table.select(selected_columns)
}

/// Ensures that the given Substrait field is compatible with the given DataFusion field
///
/// A field is compatible between Substrait and DataFusion if:
/// 1. They have logically equivalent types.
/// 2. They have the same nullability OR the Substrait field is nullable and the DataFusion fields
/// is not nullable.
///
/// If a Substrait field is not nullable, the Substrait plan may be built around assuming it is not
/// nullable. As such if DataFusion has that field as nullable the plan should be rejected.
fn ensure_field_compatability(
datafusion_field: &Field,
substrait_field: &Field,
) -> Result<()> {
if !DFSchema::datatype_is_logically_equal(
datafusion_field.data_type(),
substrait_field.data_type(),
) {
return substrait_err!(
"Field '{}' in Substrait schema has a different type ({}) than the corresponding field in the table schema ({}).",
substrait_field.name(),
substrait_field.data_type(),
datafusion_field.data_type()
);
}

if !compatible_nullabilities(
datafusion_field.is_nullable(),
substrait_field.is_nullable(),
) {
// TODO: from_substrait_struct_type needs to be updated to set the nullability correctly. It defaults to true for now.
return substrait_err!(
"Field '{}' is nullable in the DataFusion schema but not nullable in the Substrait schema.",
substrait_field.name()
);
}
Ok(())
}

/// Returns true if the DataFusion and Substrait nullabilities are compatible, false otherwise
fn compatible_nullabilities(
datafusion_nullability: bool,
substrait_nullability: bool,
) -> bool {
// DataFusion and Substrait have the same nullability
(datafusion_nullability == substrait_nullability)
// DataFusion is not nullable and Substrait is nullable
|| (!datafusion_nullability && substrait_nullability)
}

/// (Re)qualify the sides of a join if needed, i.e. if the columns from one side would otherwise
/// conflict with the columns from the other.
/// Substrait doesn't currently allow specifying aliases, neither for columns nor for tables. For
Expand Down Expand Up @@ -1586,10 +1696,11 @@ fn next_struct_field_name(
}
}

fn from_substrait_named_struct(
/// Convert Substrait NamedStruct to DataFusion DFSchemaRef
pub fn from_substrait_named_struct(
vbarua marked this conversation as resolved.
Show resolved Hide resolved
base_schema: &NamedStruct,
extensions: &Extensions,
) -> Result<DFSchemaRef> {
) -> Result<DFSchema> {
let mut name_idx = 0;
let fields = from_substrait_struct_type(
base_schema.r#struct.as_ref().ok_or_else(|| {
Expand All @@ -1601,12 +1712,12 @@ fn from_substrait_named_struct(
);
if name_idx != base_schema.names.len() {
return substrait_err!(
"Names list must match exactly to nested schema, but found {} uses for {} names",
name_idx,
base_schema.names.len()
);
"Names list must match exactly to nested schema, but found {} uses for {} names",
name_idx,
base_schema.names.len()
);
}
Ok(DFSchemaRef::new(DFSchema::try_from(Schema::new(fields?))?))
DFSchema::try_from(Schema::new(fields?))
}

fn from_substrait_bound(
Expand Down
16 changes: 5 additions & 11 deletions datafusion/substrait/src/logical_plan/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ use crate::variation_const::{
use datafusion::arrow::array::{Array, GenericListArray, OffsetSizeTrait};
use datafusion::common::{
exec_err, internal_err, not_impl_err, plan_err, substrait_datafusion_err,
substrait_err, DFSchemaRef, ToDFSchema,
};
use datafusion::common::{substrait_err, DFSchemaRef};
#[allow(unused_imports)]
use datafusion::logical_expr::expr::{
Alias, BinaryExpr, Case, Cast, GroupingSet, InList, InSubquery, Sort, WindowFunction,
Expand Down Expand Up @@ -140,19 +140,13 @@ pub fn to_substrait_rel(
maintain_singular_struct: false,
});

let table_schema = scan.source.schema().to_dfschema_ref()?;
let base_schema = to_substrait_named_struct(&table_schema, extensions)?;

Ok(Box::new(Rel {
rel_type: Some(RelType::Read(Box::new(ReadRel {
common: None,
base_schema: Some(NamedStruct {
names: scan
.source
.schema()
.fields()
.iter()
.map(|f| f.name().to_owned())
.collect(),
r#struct: None,
}),
base_schema: Some(base_schema),
vbarua marked this conversation as resolved.
Show resolved Hide resolved
filter: None,
best_effort_filter: None,
projection,
Expand Down
Loading