diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 0ee7fcadfb05..d4d0dbdfe4f3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -310,8 +310,8 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
 
   test("ShuffledHashJoin metrics") {
     withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40",
-      SQLConf.SHUFFLE_PARTITIONS.key -> "2",
-      SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
+        SQLConf.SHUFFLE_PARTITIONS.key -> "2",
+        SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
       val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
       val df2 = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key", "value")
       // Assume the execution plan is
@@ -325,30 +325,49 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
       // +- LocalTableScan(nodeId = 7)
       Seq((1L, 2L, 5L, false), (2L, 3L, 7L, true)).foreach {
         case (nodeId1, nodeId2, nodeId3, enableWholeStage) =>
-        val df = df1.join(df2, "key")
+          val df = df1.join(df2, "key")
+          testSparkPlanMetrics(df, 1, Map(
+            nodeId1 -> (("ShuffledHashJoin", Map(
+              "number of output rows" -> 2L))),
+            nodeId2 -> (("Exchange", Map(
+              "shuffle records written" -> 2L,
+              "records read" -> 2L))),
+            nodeId3 -> (("Exchange", Map(
+              "shuffle records written" -> 10L,
+              "records read" -> 10L)))),
+            enableWholeStage
+          )
+      }
+    }
+  }
+
+  test("ShuffledHashJoin(left, outer) metrics") {
+    val leftDf = Seq((1, "1"), (2, "2")).toDF("key", "value")
+    val rightDf = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key2", "value")
+    Seq((0L, "right_outer", leftDf, rightDf, 10L, false),
+      (0L, "left_outer", rightDf, leftDf, 10L, false),
+      (0L, "right_outer", leftDf, rightDf, 10L, true),
+      (0L, "left_outer", rightDf, leftDf, 10L, true),
+      (2L, "left_anti", rightDf, leftDf, 8L, true),
+      (2L, "left_semi", rightDf, leftDf, 2L, true),
+      (1L, "left_anti", rightDf, leftDf, 8L, false),
+      (1L, "left_semi", rightDf, leftDf, 2L, false))
+      .foreach { case (nodeId, joinType, leftDf, rightDf, rows, enableWholeStage) =>
+        val df = leftDf.hint("shuffle_hash").join(
+          rightDf.hint("shuffle_hash"), $"key" === $"key2", joinType)
         testSparkPlanMetrics(df, 1, Map(
-          nodeId1 -> (("ShuffledHashJoin", Map(
-            "number of output rows" -> 2L))),
-          nodeId2 -> (("Exchange", Map(
-            "shuffle records written" -> 2L,
-            "records read" -> 2L))),
-          nodeId3 -> (("Exchange", Map(
-            "shuffle records written" -> 10L,
-            "records read" -> 10L)))),
+          nodeId -> (("ShuffledHashJoin", Map(
+            "number of output rows" -> rows)))),
           enableWholeStage
         )
       }
-    }
   }
 
   test("BroadcastHashJoin(outer) metrics") {
     val df1 = Seq((1, "a"), (1, "b"), (4, "c")).toDF("key", "value")
     val df2 = Seq((1, "a"), (1, "b"), (2, "c"), (3, "d")).toDF("key2", "value")
-    // Assume the execution plan is
-    // ... -> BroadcastHashJoin(nodeId = 0)
-    Seq(("left_outer", 0L, 5L, false), ("right_outer", 0L, 6L, false),
-      ("left_outer", 1L, 5L, true), ("right_outer", 1L, 6L, true)).foreach {
-      case (joinType, nodeId, numRows, enableWholeStage) =>
+    Seq(("left_outer", 0L, 5L, false), ("right_outer", 0L, 6L, false), ("left_outer", 1L, 5L, true),
+      ("right_outer", 1L, 6L, true)).foreach { case (joinType, nodeId, numRows, enableWholeStage) =>
       val df = df1.join(broadcast(df2), $"key" === $"key2", joinType)
       testSparkPlanMetrics(df, 2, Map(
         nodeId -> (("BroadcastHashJoin", Map(
@@ -365,9 +384,12 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
     withTempView("testDataForJoin") {
       // Assume the execution plan is
       // ... -> BroadcastNestedLoopJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
-      val query = "SELECT * FROM testData2 left JOIN testDataForJoin ON " +
+      val leftQuery = "SELECT * FROM testData2 LEFT JOIN testDataForJoin ON " +
+        "testData2.a * testDataForJoin.a != testData2.a + testDataForJoin.a"
+      val rightQuery = "SELECT * FROM testData2 RIGHT JOIN testDataForJoin ON " +
         "testData2.a * testDataForJoin.a != testData2.a + testDataForJoin.a"
-      Seq(false, true).foreach { enableWholeStage =>
+      Seq((leftQuery, false), (rightQuery, false), (leftQuery, true), (rightQuery, true))
+        .foreach { case (query, enableWholeStage) =>
         val df = spark.sql(query)
         testSparkPlanMetrics(df, 2, Map(
           0L -> (("BroadcastNestedLoopJoin", Map(
@@ -394,6 +416,19 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
     }
   }
 
+  test("BroadcastLeftAntiJoinHash metrics") {
+    val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
+    val df2 = Seq((1, "1"), (2, "2"), (3, "3"), (4, "4")).toDF("key2", "value")
+    Seq((1L, false), (2L, true)).foreach { case (nodeId, enableWholeStage) =>
+      val df = df2.join(broadcast(df1), $"key" === $"key2", "left_anti")
+      testSparkPlanMetrics(df, 2, Map(
+        nodeId -> (("BroadcastHashJoin", Map(
+          "number of output rows" -> 2L)))),
+        enableWholeStage
+      )
+    }
+  }
+
   test("CartesianProduct metrics") {
     withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
       val testDataForJoin = testData2.filter('a < 2) // TestData2(1, 1) :: TestData2(1, 2)