[SPARK-32629][SQL] Track metrics of BitSet/OpenHashSet in full outer SHJ #29566
Changes from 1 commit
ShuffledHashJoinExec.scala

@@ -19,8 +19,6 @@ package org.apache.spark.sql.execution.joins
 
 import java.util.concurrent.TimeUnit._
 
-import scala.collection.mutable
-
 import org.apache.spark.TaskContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -31,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.{RowIterator, SparkPlan}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
-import org.apache.spark.util.collection.BitSet
+import org.apache.spark.util.collection.{BitSet, OpenHashSet}
 
 /**
  * Performs a hash join of two child relations by first shuffling the data using the join keys.
@@ -136,10 +134,10 @@ case class ShuffledHashJoinExec(
    * Full outer shuffled hash join with unique join keys:
    * 1. Process rows from stream side by looking up hash relation.
    *    Mark the matched rows from build side be looked up.
-   *    A `BitSet` is used to track matched rows with key index.
+   *    A [[BitSet]] is used to track matched rows with key index.
    * 2. Process rows from build side by iterating hash relation.
    *    Filter out rows from build side being matched already,
-   *    by checking key index from `BitSet`.
+   *    by checking key index from [[BitSet]].
    */
   private def fullOuterJoinWithUniqueKey(
       streamIter: Iterator[InternalRow],
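For intuition, the two-phase algorithm in the updated comment can be sketched with plain Scala collections standing in for Spark's HashedRelation. This is a minimal sketch under that assumption; `Row`, the inputs, and the helper name are illustrative, not from this patch:

```scala
import java.util.BitSet

case class Row(key: Int, value: String)

// Phase 1: the stream side drives the join and marks matched build rows by
// index in a bit set. Phase 2: build rows whose index was never marked are
// emitted with a None stream side. Build-side keys are assumed unique.
def fullOuterJoinUniqueKey(
    streamRows: Seq[Row],
    buildRows: IndexedSeq[Row]): Seq[(Option[Row], Option[Row])] = {
  val keyToIndex = buildRows.zipWithIndex.map { case (r, i) => r.key -> i }.toMap
  val matchedKeys = new BitSet(buildRows.size)

  val streamSide = streamRows.map { s =>
    keyToIndex.get(s.key) match {
      case Some(i) =>
        matchedKeys.set(i) // remember that build row i has been matched
        (Some(s), Some(buildRows(i)))
      case None =>
        (Some(s), None) // stream row with no build-side match
    }
  }
  val buildOnly = buildRows.indices.collect {
    case i if !matchedKeys.get(i) => (Option.empty[Row], Some(buildRows(i)))
  }
  streamSide ++ buildOnly
}
```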
@@ -150,9 +148,9 @@ case class ShuffledHashJoinExec(
       streamNullJoinRowWithBuild: => InternalRow => JoinedRow,
       buildNullRow: GenericInternalRow,
       streamNullRow: GenericInternalRow): Iterator[InternalRow] = {
-    // TODO(SPARK-32629):record metrics of extra BitSet/HashSet
-    // in full outer shuffled hash join
     val matchedKeys = new BitSet(hashedRelation.maxNumKeysIndex)
+    val buildDataSize = longMetric("buildDataSize")
+    buildDataSize += matchedKeys.capacity / 8
 
     // Process stream side with looking up hash relation
     val streamResultIter = streamIter.map { srow =>
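The new `buildDataSize += matchedKeys.capacity / 8` charges the metric with the bit set's approximate footprint in bytes: `capacity` counts bits, and Spark's `BitSet` stores them in an array of 64-bit words, so the footprint is about one byte per eight bits. A sketch of the arithmetic (the helper is illustrative):

```scala
// Estimate the in-memory size in bytes of a bit set sized for `numBits` bits.
def bitSetEstimatedBytes(numBits: Int): Long = {
  val numWords = (numBits + 63) / 64 // round up to whole 64-bit words
  numWords.toLong * 8L               // 8 bytes per word, i.e. ~numBits / 8
}
```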
@@ -198,11 +196,11 @@ case class ShuffledHashJoinExec(
    * Full outer shuffled hash join with non-unique join keys:
    * 1. Process rows from stream side by looking up hash relation.
    *    Mark the matched rows from build side be looked up.
-   *    A `HashSet[Long]` is used to track matched rows with
+   *    A [[OpenHashSet]] (Long) is used to track matched rows with
    *    key index (Int) and value index (Int) together.
    * 2. Process rows from build side by iterating hash relation.
    *    Filter out rows from build side being matched already,
-   *    by checking key index and value index from `HashSet`.
+   *    by checking key index and value index from [[OpenHashSet]].
    *
    * The "value index" is defined as the index of the tuple in the chain
    * of tuples having the same key. For example, if certain key is found thrice,
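A toy illustration of the value index described in that comment (not from the patch): rows sharing a join key form a chain, and each row's value index is its position within that chain.

```scala
// The key "a" is found thrice, so its rows get value indices 0, 1 and 2.
val rows = Seq("a" -> 1, "b" -> 2, "a" -> 3, "a" -> 4)
val chains: Map[String, Seq[((String, Int), Int)]] =
  rows.groupBy(_._1).map { case (k, chain) => k -> chain.zipWithIndex }
```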
@@ -218,9 +216,16 @@ case class ShuffledHashJoinExec(
       streamNullJoinRowWithBuild: => InternalRow => JoinedRow,
       buildNullRow: GenericInternalRow,
       streamNullRow: GenericInternalRow): Iterator[InternalRow] = {
-    // TODO(SPARK-32629):record metrics of extra BitSet/HashSet
-    // in full outer shuffled hash join
-    val matchedRows = new mutable.HashSet[Long]
+    val matchedRows = new OpenHashSet[Long]
+    TaskContext.get().addTaskCompletionListener[Unit](_ => {
+      // At the end of the task, update the task's memory usage for this
+      // [[OpenHashSet]] to track matched rows, which has two parts:
+      // [[OpenHashSet._bitset]] and [[OpenHashSet._data]].
+      val buildDataSize = longMetric("buildDataSize")
+      val bitSetEstimatedSize = matchedRows.getBitSet.capacity / 8
+      val dataEstimatedSize = matchedRows.capacity * 8
+      buildDataSize += bitSetEstimatedSize + dataEstimatedSize
+    })
 
     def markRowMatched(keyIndex: Int, valueIndex: Int): Unit = {
       val rowIndex: Long = (keyIndex.toLong << 32) | valueIndex
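Two details of the added code, restated as a hedged sketch: `markRowMatched` packs the two Int indices into a single Long (key index in the high 32 bits, value index in the low 32 bits) so one `OpenHashSet[Long]` can track both, and the completion listener estimates that set's footprint as roughly one bit per slot (`getBitSet.capacity / 8` bytes) plus one 8-byte Long per slot (`capacity * 8` bytes). The unpack helper below is not part of the diff:

```scala
// Pack a (keyIndex, valueIndex) pair into one Long. Both indices are
// assumed non-negative, matching their use as array positions.
def pack(keyIndex: Int, valueIndex: Int): Long =
  (keyIndex.toLong << 32) | valueIndex

// Inverse of pack (illustrative; the join itself never needs to unpack).
def unpack(rowIndex: Long): (Int, Int) =
  ((rowIndex >>> 32).toInt, rowIndex.toInt)

assert(unpack(pack(3, 7)) == (3, 7))
```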
SQLMetricsTestUtils.scala

@@ -25,7 +25,7 @@ import org.apache.spark.TestUtils
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.SparkPlanInfo
+import org.apache.spark.sql.execution.{SparkPlan, SparkPlanInfo}
 import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SQLAppStatusStore}
 import org.apache.spark.sql.internal.SQLConf.WHOLESTAGE_CODEGEN_ENABLED
 import org.apache.spark.sql.test.SQLTestUtils
@@ -254,6 +254,24 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
       }
     }
   }
+
+  /**
+   * Verify if the metrics in `SparkPlan` operator are same as expected metrics.
+   *
+   * @param plan `SparkPlan` operator to check metrics
+   * @param expectedMetrics the expected metrics. The format is `metric name -> metric value`.
+   */
+  protected def testMetricsInSparkPlanOperator(
+      plan: SparkPlan,
+      expectedMetrics: Map[String, Long]): Unit = {
+    expectedMetrics.foreach { case (metricName: String, metricValue: Long) =>
+      assert(plan.metrics.contains(metricName), s"The query plan should have metric $metricName")
+      val actualMetric = plan.metrics(metricName)
+      assert(actualMetric.value == metricValue,
+        s"The query plan metric $metricName did not match, " +
+        s"expected:$metricValue, actual:${actualMetric.value}")
+    }
+  }
 }

Inline review on `testMetricsInSparkPlanOperator`:

Member: We need to put this func here instead of …

Contributor (Author): @maropu - I am following the convention like other method e.g. …
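A hedged usage sketch for the new helper, from a test that mixes in `SQLMetricsTestUtils` (the query, the assumption that the planner picks a `ShuffledHashJoinExec`, and the expected metric value are illustrative, not from this diff):

```scala
import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec

val df = spark.range(100).join(spark.range(100), "id")
df.collect() // run the query so its SQL metrics are populated

// Locate the join operator in the executed plan and check one of its metrics.
val shj = df.queryExecution.executedPlan.collectFirst {
  case j: ShuffledHashJoinExec => j
}.get
testMetricsInSparkPlanOperator(shj, Map("numOutputRows" -> 100L))
```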
Inline review on the `[[BitSet]]` doc wording:

Reviewer: nit: just say `A bit set is ...` to be generic.

Author: @cloud-fan - sure, updated.