From 2af93ea0620fad961ac31fc0e7d0838965a030a4 Mon Sep 17 00:00:00 2001 From: rishi Date: Fri, 24 Apr 2020 12:24:53 -0700 Subject: [PATCH 1/7] [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite --- .../execution/metric/SQLMetricsSuite.scala | 78 ++++++++++++++++++- 1 file changed, 76 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index 0ee7fcadfb05..3c3352acb3be 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -341,6 +341,62 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils } } + test("ShuffledHashJoin(outer) metrics") { + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40", + SQLConf.SHUFFLE_PARTITIONS.key -> "2", + SQLConf.PREFER_SORTMERGEJOIN.key -> "false") { + val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value") + val df2 = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key2", "value") + + Seq(("right_outer", 0L, df1, df2, false), ("left_outer", 0L, df2, df1, false), + ("right_outer", 0L, df1, df2, true), ("left_outer", 0L, df2, df1, true)) + .foreach { case (joinType, nodeId, df1, df2, enableWholeStage) => + val df = df1.join(df2, $"key" === $"key2", joinType) + testSparkPlanMetrics(df, 1, Map( + nodeId -> (("ShuffledHashJoin", Map( + "number of output rows" -> 10L)))), + enableWholeStage + ) + } + } + } + + test("ShuffledHashJoin(left-anti) metrics") { + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.SHUFFLE_PARTITIONS.key -> "2", + SQLConf.PREFER_SORTMERGEJOIN.key -> "false") { + val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value") + val df2 = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key2", "value") + + Seq((2L, true), (1L, false)).foreach { case (nodeId, enableWholeStage) => + val df = df2.join(df1.hint("shuffle_hash"), $"key" === $"key2", "left_anti") + testSparkPlanMetrics(df, 1, Map( + nodeId -> (("ShuffledHashJoin", Map( + "number of output rows" -> 8L)))), + enableWholeStage + ) + } + } + } + + test("ShuffledHashJoin(left-semi) metrics") { + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.SHUFFLE_PARTITIONS.key -> "2", + SQLConf.PREFER_SORTMERGEJOIN.key -> "false") { + val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value") + val df2 = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key2", "value") + + Seq((1L, false), (2L, true)).foreach { case (nodeId, enableWholeStage) => + val df = df2.join(df1.hint("shuffle_hash"), $"key" === $"key2", "left_semi") + testSparkPlanMetrics(df, 1, Map( + nodeId -> (("ShuffledHashJoin", Map( + "number of output rows" -> 2L)))), + enableWholeStage + ) + } + } + } + test("BroadcastHashJoin(outer) metrics") { val df1 = Seq((1, "a"), (1, "b"), (4, "c")).toDF("key", "value") val df2 = Seq((1, "a"), (1, "b"), (2, "c"), (3, "d")).toDF("key2", "value") @@ -365,9 +421,12 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils withTempView("testDataForJoin") { // Assume the execution plan is // ... 
-> BroadcastNestedLoopJoin(nodeId = 1) -> TungstenProject(nodeId = 0) - val query = "SELECT * FROM testData2 left JOIN testDataForJoin ON " + + val leftQuery = "SELECT * FROM testData2 LEFT JOIN testDataForJoin ON " + + "testData2.a * testDataForJoin.a != testData2.a + testDataForJoin.a" + val rightQuery = "SELECT * FROM testData2 RIGHT JOIN testDataForJoin ON " + "testData2.a * testDataForJoin.a != testData2.a + testDataForJoin.a" - Seq(false, true).foreach { enableWholeStage => + Seq((leftQuery, false), (rightQuery, false), (leftQuery, true), (rightQuery, true)) + .foreach { case (query, enableWholeStage) => val df = spark.sql(query) testSparkPlanMetrics(df, 2, Map( 0L -> (("BroadcastNestedLoopJoin", Map( @@ -394,6 +453,21 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils } } + test("BroadcastLeftAntiJoinHash metrics") { + val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value") + val df2 = Seq((1, "1"), (2, "2"), (3, "3"), (4, "4")).toDF("key2", "value") + // Assume the execution plan is + // ... -> BroadcastHashJoin(nodeId = 1) + Seq((1L, false), (2L, true)).foreach { case (nodeId, enableWholeStage) => + val df = df2.join(broadcast(df1), $"key" === $"key2", "left_anti") + testSparkPlanMetrics(df, 2, Map( + nodeId -> (("BroadcastHashJoin", Map( + "number of output rows" -> 2L)))), + enableWholeStage + ) + } + } + test("CartesianProduct metrics") { withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") { val testDataForJoin = testData2.filter('a < 2) // TestData2(1, 1) :: TestData2(1, 2) From 0d16bdf48b9f33df8956c6e6c37ab48e3050244e Mon Sep 17 00:00:00 2001 From: rishi Date: Sun, 26 Apr 2020 10:51:18 -0700 Subject: [PATCH 2/7] incorporating code review comments --- .../execution/metric/SQLMetricsSuite.scala | 128 ++++++++---------- 1 file changed, 58 insertions(+), 70 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index 3c3352acb3be..8c331ee1f1f3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -310,8 +310,8 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils test("ShuffledHashJoin metrics") { withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40", - SQLConf.SHUFFLE_PARTITIONS.key -> "2", - SQLConf.PREFER_SORTMERGEJOIN.key -> "false") { + SQLConf.SHUFFLE_PARTITIONS.key -> "2", + SQLConf.PREFER_SORTMERGEJOIN.key -> "false") { val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value") val df2 = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key", "value") // Assume the execution plan is @@ -325,18 +325,18 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils // +- LocalTableScan(nodeId = 7) Seq((1L, 2L, 5L, false), (2L, 3L, 7L, true)).foreach { case (nodeId1, nodeId2, nodeId3, enableWholeStage) => - val df = df1.join(df2, "key") - testSparkPlanMetrics(df, 1, Map( - nodeId1 -> (("ShuffledHashJoin", Map( - "number of output rows" -> 2L))), - nodeId2 -> (("Exchange", Map( - "shuffle records written" -> 2L, - "records read" -> 2L))), - nodeId3 -> (("Exchange", Map( - "shuffle records written" -> 10L, - "records read" -> 10L)))), - enableWholeStage - ) + val df = df1.join(df2, "key") + testSparkPlanMetrics(df, 1, Map( + nodeId1 -> (("ShuffledHashJoin", Map( + "number of output rows" -> 2L))), + nodeId2 -> (("Exchange", Map( + "shuffle 
records written" -> 2L, + "records read" -> 2L))), + nodeId3 -> (("Exchange", Map( + "shuffle records written" -> 10L, + "records read" -> 10L)))), + enableWholeStage + ) } } } @@ -345,55 +345,43 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40", SQLConf.SHUFFLE_PARTITIONS.key -> "2", SQLConf.PREFER_SORTMERGEJOIN.key -> "false") { - val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value") - val df2 = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key2", "value") - - Seq(("right_outer", 0L, df1, df2, false), ("left_outer", 0L, df2, df1, false), - ("right_outer", 0L, df1, df2, true), ("left_outer", 0L, df2, df1, true)) - .foreach { case (joinType, nodeId, df1, df2, enableWholeStage) => - val df = df1.join(df2, $"key" === $"key2", joinType) + val leftDf = Seq((1, "1"), (2, "2")).toDF("key", "value") + val rightDf = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key2", "value") + + Seq((0L, "right_outer", leftDf, rightDf, 10L, false), + (0L, "left_outer", rightDf, leftDf, 10L, false), + (0L, "right_outer", leftDf, rightDf, 10L, true), + (0L, "left_outer", rightDf, leftDf, 10L, true)) + .foreach { case (nodeId, joinType, leftDf, rightDf, rows, enableWholeStage) => + val df = leftDf.join(rightDf, $"key" === $"key2", joinType) testSparkPlanMetrics(df, 1, Map( nodeId -> (("ShuffledHashJoin", Map( - "number of output rows" -> 10L)))), + "number of output rows" -> rows)))), enableWholeStage ) } } } - test("ShuffledHashJoin(left-anti) metrics") { - withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", - SQLConf.SHUFFLE_PARTITIONS.key -> "2", - SQLConf.PREFER_SORTMERGEJOIN.key -> "false") { - val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value") - val df2 = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key2", "value") - - Seq((2L, true), (1L, false)).foreach { case (nodeId, enableWholeStage) => - val df = df2.join(df1.hint("shuffle_hash"), $"key" === $"key2", "left_anti") - testSparkPlanMetrics(df, 1, Map( - nodeId -> (("ShuffledHashJoin", Map( - "number of output rows" -> 8L)))), - enableWholeStage - ) - } - } - } - - test("ShuffledHashJoin(left-semi) metrics") { + test("ShuffledHashJoin(left) metrics") { withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", SQLConf.SHUFFLE_PARTITIONS.key -> "2", SQLConf.PREFER_SORTMERGEJOIN.key -> "false") { - val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value") - val df2 = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key2", "value") - - Seq((1L, false), (2L, true)).foreach { case (nodeId, enableWholeStage) => - val df = df2.join(df1.hint("shuffle_hash"), $"key" === $"key2", "left_semi") - testSparkPlanMetrics(df, 1, Map( - nodeId -> (("ShuffledHashJoin", Map( - "number of output rows" -> 2L)))), - enableWholeStage - ) - } + val leftDf = Seq((1, "1"), (2, "2")).toDF("key", "value") + val rightDf = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key2", "value") + + Seq((2L, "left_anti", leftDf, rightDf, 8L, true), + (2L, "left_semi", leftDf, rightDf, 2L, true), + (1L, "left_anti", leftDf, rightDf, 8L, false), + (1L, "left_semi", leftDf, rightDf, 2L, false)) + .foreach { case (nodeId, joinType, leftDf, rightDf, rows, enableWholeStage) => + val df = rightDf.join(leftDf.hint("shuffle_hash"), $"key" === $"key2", joinType) + testSparkPlanMetrics(df, 1, Map( + nodeId -> (("ShuffledHashJoin", Map( + "number of output rows" -> rows)))), + enableWholeStage + ) + } } } @@ -402,15 +390,15 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils val 
df2 = Seq((1, "a"), (1, "b"), (2, "c"), (3, "d")).toDF("key2", "value") // Assume the execution plan is // ... -> BroadcastHashJoin(nodeId = 0) - Seq(("left_outer", 0L, 5L, false), ("right_outer", 0L, 6L, false), - ("left_outer", 1L, 5L, true), ("right_outer", 1L, 6L, true)).foreach { + Seq(("left_outer", 0L, 5L, false), ("right_outer", 0L, 6L, false), ("left_outer", 1L, 5L, true), + ("right_outer", 1L, 6L, true)).foreach { case (joinType, nodeId, numRows, enableWholeStage) => - val df = df1.join(broadcast(df2), $"key" === $"key2", joinType) - testSparkPlanMetrics(df, 2, Map( - nodeId -> (("BroadcastHashJoin", Map( - "number of output rows" -> numRows)))), - enableWholeStage - ) + val df = df1.join(broadcast(df2), $"key" === $"key2", joinType) + testSparkPlanMetrics(df, 2, Map( + nodeId -> (("BroadcastHashJoin", Map( + "number of output rows" -> numRows)))), + enableWholeStage + ) } } @@ -427,13 +415,13 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils "testData2.a * testDataForJoin.a != testData2.a + testDataForJoin.a" Seq((leftQuery, false), (rightQuery, false), (leftQuery, true), (rightQuery, true)) .foreach { case (query, enableWholeStage) => - val df = spark.sql(query) - testSparkPlanMetrics(df, 2, Map( - 0L -> (("BroadcastNestedLoopJoin", Map( - "number of output rows" -> 12L)))), - enableWholeStage - ) - } + val df = spark.sql(query) + testSparkPlanMetrics(df, 2, Map( + 0L -> (("BroadcastNestedLoopJoin", Map( + "number of output rows" -> 12L)))), + enableWholeStage + ) + } } } } @@ -621,9 +609,9 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils test("SPARK-25602: SparkPlan.getByteArrayRdd should not consume the input when not necessary") { def checkFilterAndRangeMetrics( - df: DataFrame, - filterNumOutputs: Int, - rangeNumOutputs: Int): Unit = { + df: DataFrame, + filterNumOutputs: Int, + rangeNumOutputs: Int): Unit = { val plan = df.queryExecution.executedPlan val filters = collectNodeWithinWholeStage[FilterExec](plan) From 7565906da5f28c4e6dde25ed7fedb20c9f85e327 Mon Sep 17 00:00:00 2001 From: rishi Date: Sun, 26 Apr 2020 11:19:52 -0700 Subject: [PATCH 3/7] incorporating code review comments --- .../execution/metric/SQLMetricsSuite.scala | 33 ++++--------------- 1 file changed, 7 insertions(+), 26 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index 8c331ee1f1f3..88c6d50471cf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -341,41 +341,22 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils } } - test("ShuffledHashJoin(outer) metrics") { + test("ShuffledHashJoin(left,outer) metrics") { withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40", SQLConf.SHUFFLE_PARTITIONS.key -> "2", SQLConf.PREFER_SORTMERGEJOIN.key -> "false") { val leftDf = Seq((1, "1"), (2, "2")).toDF("key", "value") val rightDf = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key2", "value") - Seq((0L, "right_outer", leftDf, rightDf, 10L, false), (0L, "left_outer", rightDf, leftDf, 10L, false), (0L, "right_outer", leftDf, rightDf, 10L, true), - (0L, "left_outer", rightDf, leftDf, 10L, true)) - .foreach { case (nodeId, joinType, leftDf, rightDf, rows, enableWholeStage) => - val df = leftDf.join(rightDf, $"key" === $"key2", joinType) - 
testSparkPlanMetrics(df, 1, Map( - nodeId -> (("ShuffledHashJoin", Map( - "number of output rows" -> rows)))), - enableWholeStage - ) - } - } - } - - test("ShuffledHashJoin(left) metrics") { - withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", - SQLConf.SHUFFLE_PARTITIONS.key -> "2", - SQLConf.PREFER_SORTMERGEJOIN.key -> "false") { - val leftDf = Seq((1, "1"), (2, "2")).toDF("key", "value") - val rightDf = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key2", "value") - - Seq((2L, "left_anti", leftDf, rightDf, 8L, true), - (2L, "left_semi", leftDf, rightDf, 2L, true), - (1L, "left_anti", leftDf, rightDf, 8L, false), - (1L, "left_semi", leftDf, rightDf, 2L, false)) + (0L, "left_outer", rightDf, leftDf, 10L, true), + (2L, "left_anti", rightDf, leftDf, 8L, true), + (2L, "left_semi", rightDf, leftDf, 2L, true), + (1L, "left_anti", rightDf, leftDf, 8L, false), + (1L, "left_semi", rightDf, leftDf, 2L, false)) .foreach { case (nodeId, joinType, leftDf, rightDf, rows, enableWholeStage) => - val df = rightDf.join(leftDf.hint("shuffle_hash"), $"key" === $"key2", joinType) + val df = leftDf.join(rightDf.hint("shuffle_hash"), $"key" === $"key2", joinType) testSparkPlanMetrics(df, 1, Map( nodeId -> (("ShuffledHashJoin", Map( "number of output rows" -> rows)))), From 882ba724505201f940497cb48d8c2ce66573e135 Mon Sep 17 00:00:00 2001 From: rishi Date: Mon, 27 Apr 2020 07:28:04 -0700 Subject: [PATCH 4/7] incorporating code review comments --- .../execution/metric/SQLMetricsSuite.scala | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index 88c6d50471cf..9d8729af8439 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -372,14 +372,12 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils // Assume the execution plan is // ... 
-> BroadcastHashJoin(nodeId = 0) Seq(("left_outer", 0L, 5L, false), ("right_outer", 0L, 6L, false), ("left_outer", 1L, 5L, true), - ("right_outer", 1L, 6L, true)).foreach { - case (joinType, nodeId, numRows, enableWholeStage) => - val df = df1.join(broadcast(df2), $"key" === $"key2", joinType) - testSparkPlanMetrics(df, 2, Map( - nodeId -> (("BroadcastHashJoin", Map( - "number of output rows" -> numRows)))), - enableWholeStage - ) + ("right_outer", 1L, 6L, true)).foreach { case (joinType, nodeId, numRows, enableWholeStage) => + val df = df1.join(broadcast(df2), $"key" === $"key2", joinType) + testSparkPlanMetrics(df, 2, Map( + nodeId -> (("BroadcastHashJoin", Map("number of output rows" -> numRows)))), + enableWholeStage + ) } } @@ -590,9 +588,9 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils test("SPARK-25602: SparkPlan.getByteArrayRdd should not consume the input when not necessary") { def checkFilterAndRangeMetrics( - df: DataFrame, - filterNumOutputs: Int, - rangeNumOutputs: Int): Unit = { + df: DataFrame, + filterNumOutputs: Int, + rangeNumOutputs: Int): Unit = { val plan = df.queryExecution.executedPlan val filters = collectNodeWithinWholeStage[FilterExec](plan) From 42d7ec49ee534940a37f4acbc8c1758e466446f1 Mon Sep 17 00:00:00 2001 From: rishi Date: Mon, 27 Apr 2020 07:53:16 -0700 Subject: [PATCH 5/7] incorporating code review comments --- .../apache/spark/sql/execution/metric/SQLMetricsSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index 9d8729af8439..93e25c9a7be1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -342,8 +342,7 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils } test("ShuffledHashJoin(left,outer) metrics") { - withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40", - SQLConf.SHUFFLE_PARTITIONS.key -> "2", + withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "2", SQLConf.PREFER_SORTMERGEJOIN.key -> "false") { val leftDf = Seq((1, "1"), (2, "2")).toDF("key", "value") val rightDf = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key2", "value") @@ -356,7 +355,8 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils (1L, "left_anti", rightDf, leftDf, 8L, false), (1L, "left_semi", rightDf, leftDf, 2L, false)) .foreach { case (nodeId, joinType, leftDf, rightDf, rows, enableWholeStage) => - val df = leftDf.join(rightDf.hint("shuffle_hash"), $"key" === $"key2", joinType) + val df = leftDf.hint("shuffle_hash").join( + rightDf.hint("shuffle_hash"), $"key" === $"key2", joinType) testSparkPlanMetrics(df, 1, Map( nodeId -> (("ShuffledHashJoin", Map( "number of output rows" -> rows)))), From 9f7f98e4528881797ca1c57ca1944b63fb427d87 Mon Sep 17 00:00:00 2001 From: rishi Date: Tue, 28 Apr 2020 10:47:12 -0700 Subject: [PATCH 6/7] incorporating code review comments --- .../execution/metric/SQLMetricsSuite.scala | 50 ++++++++----------- 1 file changed, 22 insertions(+), 28 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index 93e25c9a7be1..d163e5aed7a3 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -341,41 +341,37 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils } } - test("ShuffledHashJoin(left,outer) metrics") { - withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "2", - SQLConf.PREFER_SORTMERGEJOIN.key -> "false") { - val leftDf = Seq((1, "1"), (2, "2")).toDF("key", "value") - val rightDf = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key2", "value") - Seq((0L, "right_outer", leftDf, rightDf, 10L, false), - (0L, "left_outer", rightDf, leftDf, 10L, false), - (0L, "right_outer", leftDf, rightDf, 10L, true), - (0L, "left_outer", rightDf, leftDf, 10L, true), - (2L, "left_anti", rightDf, leftDf, 8L, true), - (2L, "left_semi", rightDf, leftDf, 2L, true), - (1L, "left_anti", rightDf, leftDf, 8L, false), - (1L, "left_semi", rightDf, leftDf, 2L, false)) - .foreach { case (nodeId, joinType, leftDf, rightDf, rows, enableWholeStage) => - val df = leftDf.hint("shuffle_hash").join( - rightDf.hint("shuffle_hash"), $"key" === $"key2", joinType) - testSparkPlanMetrics(df, 1, Map( - nodeId -> (("ShuffledHashJoin", Map( - "number of output rows" -> rows)))), - enableWholeStage - ) - } - } + test("ShuffledHashJoin(left, outer) metrics") { + val leftDf = Seq((1, "1"), (2, "2")).toDF("key", "value") + val rightDf = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key2", "value") + Seq((0L, "right_outer", leftDf, rightDf, 10L, false), + (0L, "left_outer", rightDf, leftDf, 10L, false), + (0L, "right_outer", leftDf, rightDf, 10L, true), + (0L, "left_outer", rightDf, leftDf, 10L, true), + (2L, "left_anti", rightDf, leftDf, 8L, true), + (2L, "left_semi", rightDf, leftDf, 2L, true), + (1L, "left_anti", rightDf, leftDf, 8L, false), + (1L, "left_semi", rightDf, leftDf, 2L, false)) + .foreach { case (nodeId, joinType, leftDf, rightDf, rows, enableWholeStage) => + val df = leftDf.hint("shuffle_hash").join( + rightDf.hint("shuffle_hash"), $"key" === $"key2", joinType) + testSparkPlanMetrics(df, 1, Map( + nodeId -> (("ShuffledHashJoin", Map( + "number of output rows" -> rows)))), + enableWholeStage + ) + } } test("BroadcastHashJoin(outer) metrics") { val df1 = Seq((1, "a"), (1, "b"), (4, "c")).toDF("key", "value") val df2 = Seq((1, "a"), (1, "b"), (2, "c"), (3, "d")).toDF("key2", "value") - // Assume the execution plan is - // ... -> BroadcastHashJoin(nodeId = 0) Seq(("left_outer", 0L, 5L, false), ("right_outer", 0L, 6L, false), ("left_outer", 1L, 5L, true), ("right_outer", 1L, 6L, true)).foreach { case (joinType, nodeId, numRows, enableWholeStage) => val df = df1.join(broadcast(df2), $"key" === $"key2", joinType) testSparkPlanMetrics(df, 2, Map( - nodeId -> (("BroadcastHashJoin", Map("number of output rows" -> numRows)))), + nodeId -> (("BroadcastHashJoin", Map( + "number of output rows" -> numRows)))), enableWholeStage ) } @@ -423,8 +419,6 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils test("BroadcastLeftAntiJoinHash metrics") { val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value") val df2 = Seq((1, "1"), (2, "2"), (3, "3"), (4, "4")).toDF("key2", "value") - // Assume the execution plan is - // ... 
-> BroadcastHashJoin(nodeId = 1) Seq((1L, false), (2L, true)).foreach { case (nodeId, enableWholeStage) => val df = df2.join(broadcast(df1), $"key" === $"key2", "left_anti") testSparkPlanMetrics(df, 2, Map( From 830dfbf7d5b612a1cdb61854419ddadad4829c31 Mon Sep 17 00:00:00 2001 From: rishi Date: Wed, 29 Apr 2020 12:46:58 -0700 Subject: [PATCH 7/7] incorporating code review comments --- .../sql/execution/metric/SQLMetricsSuite.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index d163e5aed7a3..d4d0dbdfe4f3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -390,13 +390,13 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils "testData2.a * testDataForJoin.a != testData2.a + testDataForJoin.a" Seq((leftQuery, false), (rightQuery, false), (leftQuery, true), (rightQuery, true)) .foreach { case (query, enableWholeStage) => - val df = spark.sql(query) - testSparkPlanMetrics(df, 2, Map( - 0L -> (("BroadcastNestedLoopJoin", Map( - "number of output rows" -> 12L)))), - enableWholeStage - ) - } + val df = spark.sql(query) + testSparkPlanMetrics(df, 2, Map( + 0L -> (("BroadcastNestedLoopJoin", Map( + "number of output rows" -> 12L)))), + enableWholeStage + ) + } } } }
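
Note on the pattern this series converges on: each test builds the join, then hands testSparkPlanMetrics a map from plan-node id to the expected value of the "number of output rows" metric, once with whole-stage codegen disabled and once enabled (hence the differing node ids per tuple). As a rough, self-contained sketch of what that assertion boils down to for the shuffled-hash left-anti case, assuming Spark 3.0+ join hints and a local session; the object name and the direct metric lookup below are illustrative only and are not part of this patch or of SQLMetricsTestUtils:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec

object ShuffledHashJoinMetricSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .config("spark.sql.shuffle.partitions", "2")
      .config("spark.sql.join.preferSortMergeJoin", "false")
      .getOrCreate()
    import spark.implicits._

    // Same inputs as the test: 2 keys on one side, keys 1..10 on the other.
    val leftDf = Seq((1, "1"), (2, "2")).toDF("key", "value")
    val rightDf = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key2", "value")

    // Build the left_anti join with a shuffle_hash hint and force execution
    // so the SQL metrics get populated.
    val df = rightDf.join(leftDf.hint("shuffle_hash"), $"key" === $"key2", "left_anti")
    df.collect()

    // Locate the ShuffledHashJoin node in the executed plan and read its
    // "number of output rows" metric directly.
    val joinNode = df.queryExecution.executedPlan.collectFirst {
      case j: ShuffledHashJoinExec => j
    }
    joinNode.foreach { j =>
      println(s"number of output rows = ${j.metrics("numOutputRows").value}")
    }

    spark.stop()
  }
}

With these inputs the anti join keeps the eight right-side keys (3 through 10) that have no match, so the printed metric should be 8, in line with the 8L the left_anti tuples in the test expect.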