@@ -310,8 +310,8 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils

test("ShuffledHashJoin metrics") {
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40",
SQLConf.SHUFFLE_PARTITIONS.key -> "2",
SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
SQLConf.SHUFFLE_PARTITIONS.key -> "2",
SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
[Review comment — Member]
nit: please avoid unnecessary changes where possible.

       val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
       val df2 = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key", "value")
       // Assume the execution plan is
@@ -325,30 +325,49 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
       // +- LocalTableScan(nodeId = 7)
       Seq((1L, 2L, 5L, false), (2L, 3L, 7L, true)).foreach {
         case (nodeId1, nodeId2, nodeId3, enableWholeStage) =>
-        val df = df1.join(df2, "key")
+          val df = df1.join(df2, "key")
           testSparkPlanMetrics(df, 1, Map(
             nodeId1 -> (("ShuffledHashJoin", Map(
               "number of output rows" -> 2L))),
             nodeId2 -> (("Exchange", Map(
               "shuffle records written" -> 2L,
               "records read" -> 2L))),
             nodeId3 -> (("Exchange", Map(
               "shuffle records written" -> 10L,
               "records read" -> 10L)))),
             enableWholeStage
[Review comment — Member]
unnecessary changes?

[Reply — Contributor, author]
I think this indentation is correct. I know it is just a small cosmetic change and probably doesn't need to be included in this PR. I will remove it if you think this should not be there.

           )
       }
     }
   }

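Note: the expected values above follow from the fixtures: df1 ships 2 rows through its Exchange, df2 ships 10, and the inner join matches only keys 1 and 2. A minimal sketch of that arithmetic, assuming a spark-shell session (illustrative only, not part of the diff):

import spark.implicits._

val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
val df2 = (1 to 10).map(i => (i, i.toString)).toDF("key", "value")
// Only keys 1 and 2 exist on both sides, so the inner join emits 2 rows,
// matching the "number of output rows" -> 2L expectation.
assert(df1.join(df2, "key").count() == 2)
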
test("ShuffledHashJoin(left, outer) metrics") {
val leftDf = Seq((1, "1"), (2, "2")).toDF("key", "value")
val rightDf = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key2", "value")
Seq((0L, "right_outer", leftDf, rightDf, 10L, false),
(0L, "left_outer", rightDf, leftDf, 10L, false),
(0L, "right_outer", leftDf, rightDf, 10L, true),
(0L, "left_outer", rightDf, leftDf, 10L, true),
(2L, "left_anti", rightDf, leftDf, 8L, true),
(2L, "left_semi", rightDf, leftDf, 2L, true),
(1L, "left_anti", rightDf, leftDf, 8L, false),
(1L, "left_semi", rightDf, leftDf, 2L, false))
.foreach { case (nodeId, joinType, leftDf, rightDf, rows, enableWholeStage) =>
val df = leftDf.hint("shuffle_hash").join(
rightDf.hint("shuffle_hash"), $"key" === $"key2", joinType)
+        testSparkPlanMetrics(df, 1, Map(
+          nodeId -> (("ShuffledHashJoin", Map(
+            "number of output rows" -> rows)))),
+          enableWholeStage
+        )
+    }
+  }

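Note: the row-count expectations in the new test are plain join semantics for a 2-row side against a 10-row side matched on keys 1 and 2: 10 for either outer join (2 matched + 8 unmatched), 8 for left_anti, 2 for left_semi. A sketch of one case, assuming a spark-shell session (illustrative only):

import spark.implicits._

val leftDf = Seq((1, "1"), (2, "2")).toDF("key", "value")
val rightDf = (1 to 10).map(i => (i, i.toString)).toDF("key2", "value")
// The shuffle_hash hint steers the planner toward ShuffledHashJoin even
// where sort-merge join would otherwise be preferred.
val joined = leftDf.hint("shuffle_hash")
  .join(rightDf.hint("shuffle_hash"), $"key" === $"key2", "right_outer")
assert(joined.count() == 10) // 2 matched rows + 8 unmatched right-side rows
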
test("BroadcastHashJoin(outer) metrics") {
val df1 = Seq((1, "a"), (1, "b"), (4, "c")).toDF("key", "value")
val df2 = Seq((1, "a"), (1, "b"), (2, "c"), (3, "d")).toDF("key2", "value")
// Assume the execution plan is
// ... -> BroadcastHashJoin(nodeId = 0)
Seq(("left_outer", 0L, 5L, false), ("right_outer", 0L, 6L, false),
("left_outer", 1L, 5L, true), ("right_outer", 1L, 6L, true)).foreach {
case (joinType, nodeId, numRows, enableWholeStage) =>
Seq(("left_outer", 0L, 5L, false), ("right_outer", 0L, 6L, false), ("left_outer", 1L, 5L, true),
("right_outer", 1L, 6L, true)).foreach { case (joinType, nodeId, numRows, enableWholeStage) =>
         val df = df1.join(broadcast(df2), $"key" === $"key2", joinType)
         testSparkPlanMetrics(df, 2, Map(
           nodeId -> (("BroadcastHashJoin", Map(
@@ -365,9 +384,12 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
     withTempView("testDataForJoin") {
       // Assume the execution plan is
       // ... -> BroadcastNestedLoopJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
-      val query = "SELECT * FROM testData2 left JOIN testDataForJoin ON " +
+      val leftQuery = "SELECT * FROM testData2 LEFT JOIN testDataForJoin ON " +
         "testData2.a * testDataForJoin.a != testData2.a + testDataForJoin.a"
+      val rightQuery = "SELECT * FROM testData2 RIGHT JOIN testDataForJoin ON " +
+        "testData2.a * testDataForJoin.a != testData2.a + testDataForJoin.a"
-      Seq(false, true).foreach { enableWholeStage =>
+      Seq((leftQuery, false), (rightQuery, false), (leftQuery, true), (rightQuery, true))
+        .foreach { case (query, enableWholeStage) =>
         val df = spark.sql(query)
         testSparkPlanMetrics(df, 2, Map(
           0L -> (("BroadcastNestedLoopJoin", Map(
@@ -394,6 +416,19 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
     }
   }

test("BroadcastLeftAntiJoinHash metrics") {
val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
val df2 = Seq((1, "1"), (2, "2"), (3, "3"), (4, "4")).toDF("key2", "value")
Seq((1L, false), (2L, true)).foreach { case (nodeId, enableWholeStage) =>
val df = df2.join(broadcast(df1), $"key" === $"key2", "left_anti")
testSparkPlanMetrics(df, 2, Map(
nodeId -> (("BroadcastHashJoin", Map(
"number of output rows" -> 2L)))),
enableWholeStage
)
}
}

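Note: the 2L expectation is anti-join semantics: df2 keeps only the rows whose key2 (3 and 4) has no match in df1. A sketch, assuming a spark-shell session (illustrative only):

import org.apache.spark.sql.functions.broadcast
import spark.implicits._

val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
val df2 = Seq((1, "1"), (2, "2"), (3, "3"), (4, "4")).toDF("key2", "value")
// left_anti keeps df2 rows with no matching key in df1: keys 3 and 4.
assert(df2.join(broadcast(df1), $"key" === $"key2", "left_anti").count() == 2)
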
test("CartesianProduct metrics") {
withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
val testDataForJoin = testData2.filter('a < 2) // TestData2(1, 1) :: TestData2(1, 2)