Merged
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions datafusion/substrait/Cargo.toml
@@ -42,6 +42,7 @@ prost = { workspace = true }
substrait = { version = "0.58", features = ["serde"] }
url = { workspace = true }
tokio = { workspace = true, features = ["fs"] }
uuid = { version = "1.17.0", features = ["v4"] }

[dev-dependencies]
datafusion = { workspace = true, features = ["nested_expressions"] }
@@ -62,7 +62,20 @@ pub async fn from_project_rel(
// to transform it into a column reference
window_exprs.insert(e.clone());
}
explicit_exprs.push(name_tracker.get_uniquely_named_expr(e)?);
// Substrait plans are ordinal based, so they do not provide names for columns.
// Names for columns are generated by DataFusion during conversion, and for literals
// DataFusion produces names based on the literal value. It is possible to construct
// valid Substrait plans that result in duplicated names if the same literal value is
// used in multiple relations. To avoid this issue, we alias literals with unique names.
// The name tracker ensures that two literals in the same project get unique names,
// but it does not deduplicate against literal columns produced by an earlier
// project (for example, one before a join).
Contributor:
To clarify my understanding of this. The name tracker guarantees name uniqueness in the output names of the relation it is applied to (Projects, Aggregates). In cases of relations that consume multiple inputs (Joins, Unions), if the individual inputs have names that are unique but duplicated between them, we get duplicate name issues in the output schema when we combine the input schemas?

Contributor Author:

Yes, the name tracker makes sure that if you have a project that creates two null string columns, it will create two unique names for those two columns. But say you create one of those null columns before a join and then another in a project immediately after the join: the plan fails with an ambiguous column error, because there is a Utf8(NULL) from, say, the left side and then another Utf8(NULL) from the project after the join, which has no source and is therefore an ambiguous reference.

Contributor Author:

I think you're also correct that if you have a null in the left and the right side of a join then this will also be an issue today

Contributor:

> But, say you create one of those null columns before a join and then another in a project immediately after a join, the plan fails with an ambiguous column error because there is a UTF8(NULL) from say the left and then another UTF8(NULL) from the project after the join which has no source and it's therefore an ambiguous reference.

Interesting, I would have expected

let mut final_exprs: Vec<Expr> = vec![];
for index in 0..original_schema.fields().len() {
    let e = Expr::Column(Column::from(original_schema.qualified_field(index)));
    final_exprs.push(name_tracker.get_uniquely_named_expr(e)?);
}

to generate a unique name in that scenario.

Contributor:

I think the problem is that the NameTracker doesn't ignore qualifiers, but the "ambiguous schema" check does. Thus if the input to the Project has e.g. a "table1.NULL" column and adds a "NULL" column (from lit(NULL)), the NameTracker doesn't rename the newly added column, and then we get both table1.NULL and NULL columns, which fails the ambiguous check.

I think my recommendation would be to make the NameTracker more robust instead, so that it ignores the qualifiers at least when there is also a non-qualified name. While this UUID-aliasing of literals seems like it should work for this specific case, I can imagine there might be some other case where the clash happens with non-literal columns (though I'm not able to come up with an example right now).

(Also hey 👋 @xanderbailey!)

Contributor Author:

I've updated some of the comments made by @vbarua. I'm happy to merge as is to unblock us and then I can have a go at improving the name tracker in a follow-up?

Contributor:

I think 1 is perfectly reasonable. This is still an improvement even if it doesn't solve every case, and we can always iterate on it further.

Contributor:

Fine by me.

FWIW, I looked a bit at what it'd take to fix the tracker. I think a core of the issue is that DF checks name ambiguity in two ways: there's the AmbiguousColumn exception you're running into, and then there is a validate_unique_names() function which gets called on the creation of the Project. The former needs unique non-qualified names, while the latter needs unique schema names (which can be qualified).

An easy fix for the former would be to change name_for_alias() into qualified_name()._1 here:

    match self.get_unique_name(expr.name_for_alias()?) {

However, that then regresses the latter check (including in the test case for this PR), since there will then be a project node with an expr CAST(B.C as Utf8) with a qualified name ([no qualifier], "B.C") and a schema name "B.C", as well as a reference to the original column B.C with a qualified name ("B", "C") and also schema name "B.C". As the qualified name's name parts are different, it wouldn't be renamed (after the change I propose), and then it'd fail the validate_unique_names() check. So maybe for a proper fix, NameTracker would need to track both the schema name and the name part of the qualified name, and rename until both are unique.

(A simple example of the behavior of the CAST and validate_unique_names() is that SELECT data.a, CAST(data.a as string) from data; also fails in datafusion-cli.)

Contributor Author:

@alamb seems like we're okay to merge as it is.

Contributor:

I filed a follow-on ticket to track the improvement idea here:

// See: https://github.com/apache/datafusion/pull/17299
let maybe_apply_alias = match e {
lit @ Expr::Literal(_, _) => lit.alias(uuid::Uuid::new_v4().to_string()),
Contributor:

The one thing that's a bit wonky about this is that the usage of UUIDs injects a little bit of randomness into the conversion from Substrait plan to Datafusion plan. You're already dealing with this in your tests by using your filter for eliding UUID values:

        let mut settings = insta::Settings::clone_current();
        settings.add_filter(
            r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}",
            "[UUID]",
        );

Using UUIDs makes it easy to guarantee that names are unique, but it makes the plans less readable and can complicate testing.

Figuring out a deterministic scheme for this would be nice. We could potentially apply the name tracker to the inputs of multi-input relations (i.e. JoinRel, CrossRel, SetRel). That's not as nice as the UUID solution you have because it would require extra handling in every multi-input relation, but it could potentially improve readability. I'm not wedded to this though. The UUID solutions works well enough and maybe in practice it won't be that much of an issue. If it does, we can always tweak the plan conversion later.

_ => e,
};
explicit_exprs.push(name_tracker.get_uniquely_named_expr(maybe_apply_alias)?);
}

let input = if !window_exprs.is_empty() {
40 changes: 24 additions & 16 deletions datafusion/substrait/tests/cases/consumer_integration.rs
@@ -647,23 +647,31 @@ mod tests {
#[tokio::test]
async fn test_multiple_unions() -> Result<()> {
let plan_str = test_plan_to_string("multiple_unions.json").await?;
assert_snapshot!(
plan_str,
@r#"
Projection: Utf8("people") AS product_category, Utf8("people")__temp__0 AS product_type, product_key
Union
Projection: Utf8("people"), Utf8("people") AS Utf8("people")__temp__0, sales.product_key
Left Join: sales.product_key = food.@food_id
TableScan: sales
TableScan: food
Union
Projection: people.$f3, people.$f5, people.product_key0
Left Join: people.product_key0 = food.@food_id
TableScan: people
TableScan: food
TableScan: more_products
"#

let mut settings = insta::Settings::clone_current();
settings.add_filter(
r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}",
"[UUID]",
);
settings.bind(|| {
assert_snapshot!(
plan_str,
@r#"
Projection: [UUID] AS product_category, [UUID] AS product_type, product_key
Union
Projection: Utf8("people") AS [UUID], Utf8("people") AS [UUID], sales.product_key
Left Join: sales.product_key = food.@food_id
TableScan: sales
TableScan: food
Union
Projection: people.$f3, people.$f5, people.product_key0
Left Join: people.product_key0 = food.@food_id
TableScan: people
TableScan: food
TableScan: more_products
"#
);
});

Ok(())
}
41 changes: 41 additions & 0 deletions datafusion/substrait/tests/cases/logical_plans.rs
@@ -144,6 +144,47 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn null_literal_before_and_after_joins() -> Result<()> {
// Confirms that literals used before and after a join but for different columns
// are correctly handled.

// File generated with substrait-java's Isthmus:
// ./isthmus-cli/build/graal/isthmus --create "create table A (a int); create table B (a int, c int); create table C (a int, d int)" "select t.*, C.d, CAST(NULL AS VARCHAR) as e from (select a, CAST(NULL AS VARCHAR) as c from A UNION ALL select a, c from B) t LEFT JOIN C ON t.a = C.a"
let proto_plan =
read_json("tests/testdata/test_plans/disambiguate_literals_with_same_name.substrait.json");
let ctx = add_plan_schemas_to_ctx(SessionContext::new(), &proto_plan)?;
let plan = from_substrait_plan(&ctx.state(), &proto_plan).await?;

let mut settings = insta::Settings::clone_current();
settings.add_filter(
r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}",
"[UUID]",
);
settings.bind(|| {
assert_snapshot!(
plan,
@r#"
Projection: left.A, left.[UUID] AS C, right.D, Utf8(NULL) AS [UUID] AS E
Left Join: left.A = right.A
SubqueryAlias: left
Union
Projection: A.A, Utf8(NULL) AS [UUID]
TableScan: A
Projection: B.A, CAST(B.C AS Utf8)
TableScan: B
SubqueryAlias: right
TableScan: C
"#
);
});

// Trigger execution to ensure plan validity
DataFrame::new(ctx.state(), plan).show().await?;

Ok(())
}

#[tokio::test]
async fn non_nullable_lists() -> Result<()> {
// DataFusion's Substrait consumer treats all lists as nullable, even if the Substrait plan specifies them as non-nullable.