Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2003,10 +2003,7 @@ class Dataset[T] private[sql](
if (groupColExprIds.contains(attr.exprId)) {
attr
} else {
// Removing duplicate rows should not change output attributes. We should keep
// the original exprId of the attribute. Otherwise, to select a column in original
// dataset will cause analysis exception due to unresolved attribute.
Alias(new First(attr).toAggregateExpression(), attr.name)(exprId = attr.exprId)
Alias(new First(attr).toAggregateExpression(), attr.name)()
}
}
Aggregate(groupCols, aggCols, logicalPlan)
Expand Down
15 changes: 10 additions & 5 deletions sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput}
import java.sql.{Date, Timestamp}

import org.apache.spark.sql.catalyst.encoders.{OuterScopes, RowEncoder}
import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.catalyst.util.sideBySide
import org.apache.spark.sql.execution.{LogicalRDD, RDDScanExec, SortExec}
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchange}
Expand Down Expand Up @@ -898,11 +899,15 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
(1, 2), (1, 1), (2, 1), (2, 2))
}

test("dropDuplicates should not change child plan output") {
val ds = Seq(("a", 1), ("a", 2), ("b", 1), ("a", 1)).toDS()
checkDataset(
ds.dropDuplicates("_1").select(ds("_1").as[String], ds("_2").as[Int]),
("a", 1), ("b", 1))
test("SPARK-19065 dropDuplicates should not create expressions using the same id") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you have an end-to-end test to show that using same id when alias will cause troubles?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's in my fist commit: 13f54a9

I removed it because it's not a Structured Streaming issue.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may introduce other unknown issues because I saw right now SQL rules that replace attributes don't deal with Aliass.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about we remove this test and add a new test to show the behavior change more obvious?

val df = ...
val df2 = df.dropDuplicates("i")
intercept[AnalysisException] { df2.select(df("i")) }

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test doesn't work. The test needs to trigger an attribute replacement to expose this bug.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh I didn't mean to test this bug, but the behavior change, i.e. what you discussed with Michael: #16564 (comment)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems weird to me that adding a test to verify that we don't support some feature, so I just added my previous regression test back in order to have a test to catch this issue.

val ds = Seq(("a", 1), ("a", 2), ("b", 1), ("a", 1)).toDS().dropDuplicates("_1")
var exprs = Set.empty[NamedExpression]
ds.logicalPlan.transformAllExpressions { case e: NamedExpression =>
exprs += e
e
}
val duplicatedExprs = exprs.groupBy(expr => expr.exprId).filter(_._2.size > 1).values
assert(duplicatedExprs.isEmpty)
}

test("SPARK-16097: Encoders.tuple should handle null object correctly") {
Expand Down