Skip to content

Conversation

@zhengruifeng
Copy link
Contributor

@zhengruifeng zhengruifeng commented Dec 28, 2023

What changes were proposed in this pull request?

the column references in ALSModel.transform maybe ambiguous in some case

Why are the changes needed?

to fix a bug

before this fix, the test fails with:

JVM stacktrace:
org.apache.spark.sql.AnalysisException: [MISSING_ATTRIBUTES.RESOLVED_ATTRIBUTE_APPEAR_IN_OPERATION] Resolved attribute(s) "features", "features" missing from "user", "item", "id", "features", "id", "features" in operator !Project [user#60, item#63, UDF(features#50, features#54) AS prediction#94]. Attribute(s) with the same name appear in the operation: "features", "features".
Please check if the right attribute(s) are used. SQLSTATE: XX000;

and


pyspark.errors.exceptions.captured.AnalysisException: Column features#50, features#46 are ambiguous. It's probably because you joined several Datasets together, and some of these Datasets are the same. This column points to one of the Datasets but Spark is unable to figure out which one. Please alias the Datasets with different names via `Dataset.as` before joining them, and specify the column using qualified name, e.g. `df.as("a").join(df.as("b"), $"a.id" > $"b.id")`. You can also set spark.sql.analyzer.failAmbiguousSelfJoin to false to disable this check.

JVM stacktrace:
org.apache.spark.sql.AnalysisException: Column features#50, features#46 are ambiguous. It's probably because you joined several Datasets together, and some of these Datasets are the same. This column points to one of the Datasets but Spark is unable to figure out which one. Please alias the Datasets with different names via `Dataset.as` before joining them, and specify the column using qualified name, e.g. `df.as("a").join(df.as("b"), $"a.id" > $"b.id")`. You can also set spark.sql.analyzer.failAmbiguousSelfJoin to false to disable this check.

Does this PR introduce any user-facing change?

yes, bug fix

How was this patch tested?

added ut

Was this patch authored or co-authored using generative AI tooling?

no

fix

fix

fix
model.write().overwrite().save(d)
loaded_model = ALSModel().load(d)

with self.sql_conf({"spark.sql.analyzer.failAmbiguousSelfJoin": False}):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

before this PR, fails with [MISSING_ATTRIBUTES.RESOLVED_ATTRIBUTE_APPEAR_IN_OPERATION]

predictions = loaded_model.transform(users.crossJoin(items))
self.assertTrue(predictions.count() > 0)

with self.sql_conf({"spark.sql.analyzer.failAmbiguousSelfJoin": True}):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

before this PR, fails with org.apache.spark.sql.AnalysisException: Column features#50, features#46 are ambiguous

@zhengruifeng zhengruifeng changed the title [SPARK-46538][ML] Fix an ambiguous column reference issue in ALSModel.transform [SPARK-46538][ML] Fix the ambiguous column reference issue in ALSModel.transform Dec 28, 2023
@zhengruifeng
Copy link
Contributor Author

cc @cloud-fan and @WeichenXu123

validatedItems === itemFactors("id"), "left")
.select(dataset("*"),
predict(userFactors("features"), itemFactors("features")).as($(predictionCol)))
.withColumns(Map($(userCol) -> validatedUsers, $(itemCol) -> validatedItems))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we use the Seq version of withColumns? So that the column order is deterministic.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need a withColumns now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It maybe not needed, I want to use withColumns to validate the columns first (while keep the column name) and then reference the validated column by s"${validatedInputAlias}.${$(itemCol)}"

validatedItems === itemFactors("id"), "left")
.select(dataset("*"),
predict(userFactors("features"), itemFactors("features")).as($(predictionCol)))
.withColumns(Seq($(userCol), $(itemCol)), Seq(validatedUsers, validatedItems))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, previously validatedUsers was directly used in the join condition, now we materialize it first and reference only columns in the join condition.

@zhengruifeng
Copy link
Contributor Author

merged to master

@zhengruifeng zhengruifeng deleted the ml_als_reference branch December 29, 2023 01:28
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants