Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) {
}

val columnEquals = df.sparkSession.sessionState.analyzer.resolver
val projections = df.schema.fields.map { f =>
val fillColumnsInfo = df.schema.fields.filter { f =>
val typeMatches = (targetType, f.dataType) match {
case (NumericType, dt) => dt.isInstanceOf[NumericType]
case (StringType, dt) => dt == StringType
Expand All @@ -497,12 +497,10 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) {
throw new IllegalArgumentException(s"$targetType is not matched at fillValue")
}
// Only fill if the column is part of the cols list.
if (typeMatches && cols.exists(col => columnEquals(f.name, col))) {
fillCol[T](f, value)
} else {
df.col(f.name)
}
typeMatches && cols.exists(col => columnEquals(f.name, col))
}.map { col =>
(col.name, fillCol[T](col, value))
}
df.select(projections : _*)
df.withColumns(fillColumnsInfo.map(_._1), fillColumnsInfo.map(_._2))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When df has a duplicate column name, what is the behavior? Also, we need to add test cases to ensure the behaviors are consistent.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we fill a duplicate column, we still get an AnalysisException: Reference xx is ambiguous. Test cases added in 03305be.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xuanyuanking, BTW, does this keep the order of columns? It seems that previously the output followed the column order of the input DataFrame, but now that order can change.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. In the new approach we only pass in the columns found in the existing fields, and withColumns replaces those existing columns in place, keeping the original order.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can simplify the code.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, thanks for the help.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,41 @@ class DataFrameNaFunctionsSuite extends QueryTest with SharedSparkSession {
}
}

/**
 * Builds a pair of all-string DataFrames whose column names overlap.
 *
 * The first DataFrame has columns "f1", "f2", "f3" (four rows); the second has
 * columns "f1", "f2", "f4" (three rows). Both contain null cells, so joining
 * them on "f1" yields a result with two columns named "f2".
 *
 * @return the two DataFrames, in the order (first, second)
 */
def createDFsWithSameFieldsName(): (DataFrame, DataFrame) = {
  // The element type is spelled out because several cells are null literals.
  val left = Seq[(String, String, String)](
    ("f1-1", "f2", null),
    ("f1-2", null, null),
    ("f1-3", "f2", "f3-1"),
    ("f1-4", "f2", "f3-1")
  ).toDF("f1", "f2", "f3")

  val right = Seq[(String, String, String)](
    ("f1-1", null, null),
    ("f1-2", "f2", null),
    ("f1-3", "f2", "f4-1")
  ).toDF("f1", "f2", "f4")

  (left, right)
}

// Filling "f4", which appears only once in the joined result, must succeed:
// nulls in "f4" become "" while nulls in every other column are left as-is.
test("fill unambiguous field for join operation") {
  val (left, right) = createDFsWithSameFieldsName()
  val joinedDf = left.join(right, Seq("f1"), joinType = "left_outer")

  // Joined column order: f1, f2 (left), f3, f2 (right), f4.
  val expected = Seq(
    Row("f1-1", "f2", null, null, ""),
    Row("f1-2", null, null, "f2", ""),
    Row("f1-3", "f2", "f3-1", "f2", "f4-1"),
    Row("f1-4", "f2", "f3-1", null, "")
  )

  checkAnswer(joinedDf.na.fill("", cols = Seq("f4")), expected)
}

// Filling "f2", which appears twice in the joined result, is expected to be
// rejected with an AnalysisException reporting the ambiguous reference.
test("fill ambiguous field for join operation") {
  val (left, right) = createDFsWithSameFieldsName()
  val joinedDf = left.join(right, Seq("f1"), joinType = "left_outer")

  val error = intercept[AnalysisException] {
    joinedDf.na.fill("", cols = Seq("f2"))
  }
  assert(error.getMessage.contains("Reference 'f2' is ambiguous"))
}

test("replace") {
val input = createDF()

Expand Down