Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,28 +49,22 @@ object OptimizeUpdateFields extends Rule[LogicalPlan] {
val values = withFields.map(_.valExpr)

val newNames = mutable.ArrayBuffer.empty[String]
val newValues = mutable.ArrayBuffer.empty[Expression]
val newValues = mutable.HashMap.empty[String, Expression]
// Used to remember the casing of the last instance
val nameMap = mutable.HashMap.empty[String, String]

if (caseSensitive) {
names.zip(values).reverse.foreach { case (name, value) =>
if (!newNames.contains(name)) {
newNames += name
newValues += value
}
}
} else {
val nameSet = mutable.HashSet.empty[String]
names.zip(values).reverse.foreach { case (name, value) =>
val lowercaseName = name.toLowerCase(Locale.ROOT)
if (!nameSet.contains(lowercaseName)) {
newNames += name
newValues += value
nameSet += lowercaseName
}
names.zip(values).foreach { case (name, value) =>
val normalizedName = if (caseSensitive) name else name.toLowerCase(Locale.ROOT)
if (nameMap.contains(normalizedName)) {
newValues += normalizedName -> value
} else {
newNames += normalizedName
newValues += normalizedName -> value
}
nameMap += normalizedName -> name
}

val newWithFields = newNames.reverse.zip(newValues.reverse).map(p => WithField(p._1, p._2))
val newWithFields = newNames.map(n => WithField(nameMap(n), newValues(n)))
UpdateFields(structExpr, newWithFields.toSeq)

case UpdateFields(UpdateFields(struct, fieldOps1), fieldOps2) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,25 @@ class OptimizeWithFieldsSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}
}

// SPARK-35213: collapsing duplicate WithField entries must keep the field at
// the position of its first occurrence while taking the value of its last
// occurrence ("a1" stays first, but ends up with Literal(5)).
test("SPARK-35213: ensure optimize WithFields maintains correct WithField ordering") {
  val query = testRelation.select(
    Alias(
      UpdateFields('a, Seq(
        WithField("a1", Literal(3)),
        WithField("b1", Literal(4)),
        WithField("a1", Literal(5)))),
      "out")())

  val expected = testRelation.select(
    Alias(
      UpdateFields('a, Seq(
        WithField("a1", Literal(5)),
        WithField("b1", Literal(4)))),
      "out")())
    .analyze

  comparePlans(Optimize.execute(query.analyze), expected)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you check the output data types, you can see the struct types are not different:

optimized: ArrayBuffer(StructType(StructField(a1,IntegerType,false), StructField(b1,IntegerType,false)))
correctAnswer: ArrayBuffer(StructType(StructField(a1,IntegerType,false), StructField(b1,IntegerType,false)))

By design, UpdateFields will keep the order of fields in struct expression.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But yea, it looks better to keep original WithField order.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was just to sanity-check that the WithField order actually stays the same; the tests in the Column Suite show how it can actually give you an incorrect schema. I don't fully know how a schema is determined (i.e., in what part of the planning phase).

}
Original file line number Diff line number Diff line change
Expand Up @@ -1686,6 +1686,61 @@ class ColumnExpressionSuite extends QueryTest with SharedSparkSession {
StructType(Seq(StructField("a", IntegerType, nullable = true))))
}

test("SPARK-35213: chained withField operations should have correct schema for new columns") {
val df = spark.createDataFrame(
sparkContext.parallelize(Row(null) :: Nil),
StructType(Seq(StructField("data", NullType))))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to just create an empty dataframe with no columns in Scala? I mostly operate in Python, where I can just do spark.createDataFrame([[]])


checkAnswer(
df.withColumn("data", struct()
.withField("a", struct())
.withField("b", struct())
.withField("a.aa", lit("aa1"))
.withField("b.ba", lit("ba1"))
.withField("a.ab", lit("ab1"))),
Row(Row(Row("aa1", "ab1"), Row("ba1"))) :: Nil,
StructType(Seq(
StructField("data", StructType(Seq(
StructField("a", StructType(Seq(
StructField("aa", StringType, nullable = false),
StructField("ab", StringType, nullable = false)
)), nullable = false),
StructField("b", StructType(Seq(
StructField("ba", StringType, nullable = false)
)), nullable = false)
)), nullable = false)
Comment on lines +1702 to +1711
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Using ddl might be more readable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it's kind of verbose, but I feel like for complicated things the objects are easier to understand than the DDL strings, especially with structs. I wasn't sure if there was an easier way to avoid explicitly marking everything as non-nullable, though.

))
)
}

test("SPARK-35213: optimized withField operations should maintain correct nested struct " +
  "ordering") {
  // Start from a single NULL column, build two nested structs, then add one
  // extra field to each nested struct in the opposite order of their creation.
  // The optimizer must append within each nested struct instead of reordering.
  val df = spark.createDataFrame(
    sparkContext.parallelize(Seq(Row(null))),
    StructType(Seq(StructField("data", NullType))))

  val result = df
    .withColumn("data", struct()
      .withField("a", struct().withField("aa", lit("aa1")))
      .withField("b", struct().withField("ba", lit("ba1"))))
    .withColumn("data", col("data").withField("b.bb", lit("bb1")))
    .withColumn("data", col("data").withField("a.ab", lit("ab1")))

  // Expected: "a" before "b", and within each struct the original field first.
  val expectedSchema = StructType(Seq(
    StructField("data", StructType(Seq(
      StructField("a", StructType(Seq(
        StructField("aa", StringType, nullable = false),
        StructField("ab", StringType, nullable = false))), nullable = false),
      StructField("b", StructType(Seq(
        StructField("ba", StringType, nullable = false),
        StructField("bb", StringType, nullable = false))), nullable = false))),
      nullable = false)))

  checkAnswer(
    result,
    Seq(Row(Row(Row("aa1", "ab1"), Row("ba1", "bb1")))),
    expectedSchema)
}

test("dropFields should throw an exception if called on a non-StructType column") {
intercept[AnalysisException] {
Expand Down