@@ -310,8 +310,8 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils

test("ShuffledHashJoin metrics") {
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40",
- SQLConf.SHUFFLE_PARTITIONS.key -> "2",
- SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
+ SQLConf.SHUFFLE_PARTITIONS.key -> "2",
+ SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
Member: nit: please avoid unnecessary changes where possible.

val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
val df2 = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key", "value")
// Assume the execution plan is
@@ -325,36 +325,61 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
// +- LocalTableScan(nodeId = 7)
Seq((1L, 2L, 5L, false), (2L, 3L, 7L, true)).foreach {
case (nodeId1, nodeId2, nodeId3, enableWholeStage) =>
- val df = df1.join(df2, "key")
- testSparkPlanMetrics(df, 1, Map(
- nodeId1 -> (("ShuffledHashJoin", Map(
- "number of output rows" -> 2L))),
- nodeId2 -> (("Exchange", Map(
- "shuffle records written" -> 2L,
- "records read" -> 2L))),
- nodeId3 -> (("Exchange", Map(
- "shuffle records written" -> 10L,
- "records read" -> 10L)))),
- enableWholeStage
- )
+ val df = df1.join(df2, "key")
+ testSparkPlanMetrics(df, 1, Map(
+ nodeId1 -> (("ShuffledHashJoin", Map(
+ "number of output rows" -> 2L))),
+ nodeId2 -> (("Exchange", Map(
+ "shuffle records written" -> 2L,
+ "records read" -> 2L))),
+ nodeId3 -> (("Exchange", Map(
+ "shuffle records written" -> 10L,
+ "records read" -> 10L)))),
+ enableWholeStage
Member: unnecessary changes?

Contributor Author: I think this indentation is correct. I know it is just a small cosmetic change and probably doesn't need to be included in this PR. I will remove it if you think it should not be there.

+ )
}
}
}
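For readers tracking the (nodeId1, nodeId2, nodeId3, enableWholeStage) tuples above: lowering AUTO_BROADCASTJOIN_THRESHOLD to 40 bytes and disabling PREFER_SORTMERGEJOIN steers the planner toward ShuffledHashJoin, and enabling whole-stage codegen wraps pipelines in WholeStageCodegen operators, which shifts each operator's numeric position in the plan — hence the different ID triples per flag value. A minimal way to see the shift outside the suite (a sketch, assuming a local SparkSession named spark; not part of this PR):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[2]").getOrCreate()
    import spark.implicits._

    val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
    val df2 = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key", "value")

    // The same logical join planned twice: the extra WholeStageCodegen
    // wrappers in the second plan change where each operator lands in
    // the numbered tree, which the test's nodeId tuples account for.
    spark.conf.set("spark.sql.codegen.wholeStage", "false")
    df1.join(df2, "key").explain()

    spark.conf.set("spark.sql.codegen.wholeStage", "true")
    df1.join(df2, "key").explain()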

test("ShuffledHashJoin(left,outer) metrics") {
Member: nit: (left, outer)

Contributor Author: Fixed in the latest commit.

+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40",
Member: 40 -> -1?

Contributor Author: Setting it to -1 would trigger SortMergeJoin instead of ShuffledHashJoin, based on the rules in Spark's Strategies.

Member (@maropu, Apr 27, 2020): How about using a hint to control join physical plans?

Contributor Author: Fixed in the latest commit.

+ SQLConf.SHUFFLE_PARTITIONS.key -> "2",
Member: Do we need to set this value for the test?

Contributor Author: In my latest commit I combined three different tests into one. Not setting this parameter will not trigger the correct join type, based on the rules in the Spark Strategies file.

Contributor Author: Fixed in the latest commit.

+ SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
+ val leftDf = Seq((1, "1"), (2, "2")).toDF("key", "value")
+ val rightDf = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key2", "value")
+ Seq((0L, "right_outer", leftDf, rightDf, 10L, false),
+ (0L, "left_outer", rightDf, leftDf, 10L, false),
+ (0L, "right_outer", leftDf, rightDf, 10L, true),
+ (0L, "left_outer", rightDf, leftDf, 10L, true),
+ (2L, "left_anti", rightDf, leftDf, 8L, true),
+ (2L, "left_semi", rightDf, leftDf, 2L, true),
+ (1L, "left_anti", rightDf, leftDf, 8L, false),
+ (1L, "left_semi", rightDf, leftDf, 2L, false))
+ .foreach { case (nodeId, joinType, leftDf, rightDf, rows, enableWholeStage) =>
+ val df = leftDf.join(rightDf.hint("shuffle_hash"), $"key" === $"key2", joinType)
+ testSparkPlanMetrics(df, 1, Map(
+ nodeId -> (("ShuffledHashJoin", Map(
+ "number of output rows" -> rows)))),
+ enableWholeStage
+ )
+ }
+ }
+ }
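As background on the shuffle_hash hint settled on in the thread above (a sketch with equivalent inline data, reusing the spark session and implicits from the earlier sketch; this is not the suite's testSparkPlanMetrics helper): the hint pins the physical join strategy regardless of the broadcast threshold, and both the chosen operator and the expected row counts can be checked directly.

    import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec

    // Same shapes as above: leftDf has keys {1, 2}, rightDf keys 1..10.
    val leftDf = Seq((1, "1"), (2, "2")).toDF("key", "value")
    val rightDf = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key2", "value")

    val anti = rightDf.join(leftDf.hint("shuffle_hash"), $"key" === $"key2", "left_anti")

    // The hint should force a shuffled hash join even though leftDf is
    // small enough to broadcast under the default threshold.
    assert(anti.queryExecution.executedPlan.collect {
      case j: ShuffledHashJoinExec => j
    }.nonEmpty)

    assert(anti.count() == 8)  // keys 3..10 have no match in leftDf
    val semi = rightDf.join(leftDf.hint("shuffle_hash"), $"key" === $"key2", "left_semi")
    assert(semi.count() == 2)  // keys 1 and 2 do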

test("BroadcastHashJoin(outer) metrics") {
val df1 = Seq((1, "a"), (1, "b"), (4, "c")).toDF("key", "value")
val df2 = Seq((1, "a"), (1, "b"), (2, "c"), (3, "d")).toDF("key2", "value")
// Assume the execution plan is
// ... -> BroadcastHashJoin(nodeId = 0)
Seq(("left_outer", 0L, 5L, false), ("right_outer", 0L, 6L, false),
("left_outer", 1L, 5L, true), ("right_outer", 1L, 6L, true)).foreach {
Seq(("left_outer", 0L, 5L, false), ("right_outer", 0L, 6L, false), ("left_outer", 1L, 5L, true),
("right_outer", 1L, 6L, true)).foreach {
case (joinType, nodeId, numRows, enableWholeStage) =>
- val df = df1.join(broadcast(df2), $"key" === $"key2", joinType)
- testSparkPlanMetrics(df, 2, Map(
- nodeId -> (("BroadcastHashJoin", Map(
- "number of output rows" -> numRows)))),
- enableWholeStage
- )
+ val df = df1.join(broadcast(df2), $"key" === $"key2", joinType)
+ testSparkPlanMetrics(df, 2, Map(
+ nodeId -> (("BroadcastHashJoin", Map(
+ "number of output rows" -> numRows)))),
+ enableWholeStage
+ )
Member: nit: wrong indents

Contributor Author: Fixed in the latest commit.

}
}
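A quick recount of the expected 5 and 6 rows (reasoning from the data above, not new test code): df1's two key-1 rows each match df2's two key-1 rows, giving 2 × 2 = 4 joined rows; left_outer adds df1's unmatched key 4 (5 total), while right_outer adds df2's unmatched keys 2 and 3 (6 total). A standalone check reusing df1/df2 as defined in the test:

    import org.apache.spark.sql.functions.broadcast

    val leftOuter = df1.join(broadcast(df2), $"key" === $"key2", "left_outer")
    assert(leftOuter.count() == 5L)   // 4 matches + 1 unmatched left row

    val rightOuter = df1.join(broadcast(df2), $"key" === $"key2", "right_outer")
    assert(rightOuter.count() == 6L)  // 4 matches + 2 unmatched right rows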

@@ -365,16 +390,19 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
withTempView("testDataForJoin") {
// Assume the execution plan is
// ... -> BroadcastNestedLoopJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
- val query = "SELECT * FROM testData2 left JOIN testDataForJoin ON " +
+ val leftQuery = "SELECT * FROM testData2 LEFT JOIN testDataForJoin ON " +
"testData2.a * testDataForJoin.a != testData2.a + testDataForJoin.a"
- Seq(false, true).foreach { enableWholeStage =>
- val df = spark.sql(query)
- testSparkPlanMetrics(df, 2, Map(
- 0L -> (("BroadcastNestedLoopJoin", Map(
- "number of output rows" -> 12L)))),
- enableWholeStage
- )
- }
+ val rightQuery = "SELECT * FROM testData2 RIGHT JOIN testDataForJoin ON " +
+ "testData2.a * testDataForJoin.a != testData2.a + testDataForJoin.a"
+ Seq((leftQuery, false), (rightQuery, false), (leftQuery, true), (rightQuery, true))
+ .foreach { case (query, enableWholeStage) =>
+ val df = spark.sql(query)
+ testSparkPlanMetrics(df, 2, Map(
+ 0L -> (("BroadcastNestedLoopJoin", Map(
+ "number of output rows" -> 12L)))),
+ enableWholeStage
+ )
+ }
Member (@maropu, Apr 28, 2020): nit: format (wrong indents)

        Seq((leftQuery, false), (rightQuery, false), (leftQuery, true), (rightQuery, true))
          .foreach { case (query, enableWholeStage) =>
          val df = spark.sql(query)
          testSparkPlanMetrics(df, 2, Map(
            0L -> (("BroadcastNestedLoopJoin", Map(
              "number of output rows" -> 12L)))),
            enableWholeStage
          )
        }

Contributor Author: Fixed in the latest commit.

}
}
}
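Why both queries emit exactly 12 rows (a worked note; the suite's fixtures define testData2 as the six (a, b) pairs with a in {1, 2, 3} and b in {1, 2}, and testDataForJoin keeps the two rows with a = 1): since testDataForJoin.a is always 1, the predicate reduces to a != a + 1, which holds for every pair, so all 6 × 2 combinations survive and neither side has unmatched rows. A standalone sketch with equivalent inline data:

    val t2 = Seq((1, 1), (1, 2), (2, 1), (2, 2), (3, 1), (3, 2)).toDF("a", "b")
    val tj = t2.filter($"a" < 2)  // the two rows with a = 1

    // The non-equi predicate rules out hash and sort-merge joins, so the
    // planner falls back to a nested-loop join; with tj.a always 1 the
    // condition becomes "a != a + 1", true for every pair.
    val joined = t2.as("l").join(tj.as("r"),
      $"l.a" * $"r.a" =!= $"l.a" + $"r.a", "left")
    assert(joined.count() == 12)  // all 6 * 2 pairs, none unmatched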
@@ -394,6 +422,21 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
}
}

test("BroadcastLeftAntiJoinHash metrics") {
val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
val df2 = Seq((1, "1"), (2, "2"), (3, "3"), (4, "4")).toDF("key2", "value")
// Assume the execution plan is
// ... -> BroadcastHashJoin(nodeId = 1)
Member: Need this comment? I think the code below is clear without it.

Contributor Author: Fixed in the latest commit.

+ Seq((1L, false), (2L, true)).foreach { case (nodeId, enableWholeStage) =>
+ val df = df2.join(broadcast(df1), $"key" === $"key2", "left_anti")
+ testSparkPlanMetrics(df, 2, Map(
+ nodeId -> (("BroadcastHashJoin", Map(
+ "number of output rows" -> 2L)))),
+ enableWholeStage
+ )
+ }
+ }
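The expected count of 2 follows the same pattern (a reasoning note, reusing df1/df2 from this test): df2 carries keys 1..4 while the broadcast df1 carries {1, 2}, so left_anti keeps only the rows with keys 3 and 4.

    val bAnti = df2.join(broadcast(df1), $"key" === $"key2", "left_anti")
    assert(bAnti.count() == 2L)  // only keys 3 and 4 survive the anti join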

test("CartesianProduct metrics") {
withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
val testDataForJoin = testData2.filter('a < 2) // TestData2(1, 1) :: TestData2(1, 2)
@@ -547,9 +590,9 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils

test("SPARK-25602: SparkPlan.getByteArrayRdd should not consume the input when not necessary") {
def checkFilterAndRangeMetrics(
- df: DataFrame,
- filterNumOutputs: Int,
- rangeNumOutputs: Int): Unit = {
+ df: DataFrame,
+ filterNumOutputs: Int,
+ rangeNumOutputs: Int): Unit = {
val plan = df.queryExecution.executedPlan

val filters = collectNodeWithinWholeStage[FilterExec](plan)