-
Notifications
You must be signed in to change notification settings - Fork 29.3k
[SPARK-34399][SQL] Add commit duration to SQL tab's graph node. #31522
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 33 commits
2ee4c02
3fdda41
cb5873c
0f567f0
d065bcd
669be65
89f8201
9ddd28c
e552a48
6bf5a88
4f40eae
e334a4c
2cc84df
dc78903
8638618
f10f24a
7c87991
160e56d
e69e279
b92f0e4
1cbd31b
559d766
ee2e3cf
5c41947
b91b0ae
cc42403
d3389c6
a4890f2
6d91e25
1192f6f
7b2bb06
8107d20
9b5aa94
b5c9d63
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,6 +29,7 @@ import org.apache.spark.{SparkContext, TaskContext} | |
| import org.apache.spark.internal.Logging | ||
| import org.apache.spark.sql.catalyst.InternalRow | ||
| import org.apache.spark.sql.execution.SQLExecution | ||
| import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker._ | ||
| import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} | ||
| import org.apache.spark.util.SerializableConfiguration | ||
|
|
||
|
|
@@ -48,7 +49,9 @@ case class BasicWriteTaskStats( | |
| /** | ||
| * Simple [[WriteTaskStatsTracker]] implementation that produces [[BasicWriteTaskStats]]. | ||
| */ | ||
| class BasicWriteTaskStatsTracker(hadoopConf: Configuration) | ||
| class BasicWriteTaskStatsTracker( | ||
| hadoopConf: Configuration, | ||
| taskCommitTimeMetric: Option[SQLMetric] = None) | ||
| extends WriteTaskStatsTracker with Logging { | ||
|
|
||
| private[this] val partitions: mutable.ArrayBuffer[InternalRow] = mutable.ArrayBuffer.empty | ||
|
|
@@ -155,7 +158,7 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration) | |
| numRows += 1 | ||
| } | ||
|
|
||
| override def getFinalStats(): WriteTaskStats = { | ||
| override def getFinalStats(taskCommitTime: Long): WriteTaskStats = { | ||
| submittedFiles.foreach(updateFileStats) | ||
| submittedFiles.clear() | ||
|
|
||
|
|
@@ -170,6 +173,7 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration) | |
| "This could be due to the output format not writing empty files, " + | ||
| "or files being not immediately visible in the filesystem.") | ||
| } | ||
| taskCommitTimeMetric.foreach(_ += taskCommitTime) | ||
| BasicWriteTaskStats(partitions.toSeq, numFiles, numBytes, numRows) | ||
| } | ||
| } | ||
|
|
@@ -183,14 +187,21 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration) | |
| */ | ||
| class BasicWriteJobStatsTracker( | ||
| serializableHadoopConf: SerializableConfiguration, | ||
| @transient val metrics: Map[String, SQLMetric]) | ||
| @transient val driverSideMetrics: Map[String, SQLMetric], | ||
| taskCommitTimeMetric: SQLMetric) | ||
| extends WriteJobStatsTracker { | ||
|
|
||
| def this( | ||
| serializableHadoopConf: SerializableConfiguration, | ||
| metrics: Map[String, SQLMetric]) = { | ||
| this(serializableHadoopConf, metrics - TASK_COMMIT_TIME, metrics(TASK_COMMIT_TIME)) | ||
| } | ||
|
|
||
| override def newTaskInstance(): WriteTaskStatsTracker = { | ||
| new BasicWriteTaskStatsTracker(serializableHadoopConf.value) | ||
| new BasicWriteTaskStatsTracker(serializableHadoopConf.value, Some(taskCommitTimeMetric)) | ||
| } | ||
|
|
||
| override def processStats(stats: Seq[WriteTaskStats]): Unit = { | ||
| override def processStats(stats: Seq[WriteTaskStats], jobCommitTime: Long): Unit = { | ||
| val sparkContext = SparkContext.getActive.get | ||
| var partitionsSet: mutable.Set[InternalRow] = mutable.HashSet.empty | ||
| var numFiles: Long = 0L | ||
|
|
@@ -206,13 +217,14 @@ class BasicWriteJobStatsTracker( | |
| totalNumOutput += summary.numRows | ||
| } | ||
|
|
||
| metrics(BasicWriteJobStatsTracker.NUM_FILES_KEY).add(numFiles) | ||
| metrics(BasicWriteJobStatsTracker.NUM_OUTPUT_BYTES_KEY).add(totalNumBytes) | ||
| metrics(BasicWriteJobStatsTracker.NUM_OUTPUT_ROWS_KEY).add(totalNumOutput) | ||
| metrics(BasicWriteJobStatsTracker.NUM_PARTS_KEY).add(partitionsSet.size) | ||
| driverSideMetrics(JOB_COMMIT_TIME).add(jobCommitTime) | ||
| driverSideMetrics(NUM_FILES_KEY).add(numFiles) | ||
| driverSideMetrics(NUM_OUTPUT_BYTES_KEY).add(totalNumBytes) | ||
| driverSideMetrics(NUM_OUTPUT_ROWS_KEY).add(totalNumOutput) | ||
| driverSideMetrics(NUM_PARTS_KEY).add(partitionsSet.size) | ||
|
|
||
| val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) | ||
| SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toList) | ||
| SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, driverSideMetrics.values.toList) | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -221,6 +233,8 @@ object BasicWriteJobStatsTracker { | |
| private val NUM_OUTPUT_BYTES_KEY = "numOutputBytes" | ||
| private val NUM_OUTPUT_ROWS_KEY = "numOutputRows" | ||
| private val NUM_PARTS_KEY = "numParts" | ||
| val TASK_COMMIT_TIME = "taskCommitTime" | ||
| val JOB_COMMIT_TIME = "jobCommitTime" | ||
| /** XAttr key of the data length header added in HADOOP-17414. */ | ||
| val FILE_LENGTH_XATTR = "header.x-hadoop-s3a-magic-data-length" | ||
|
|
||
|
|
@@ -230,7 +244,10 @@ object BasicWriteJobStatsTracker { | |
| NUM_FILES_KEY -> SQLMetrics.createMetric(sparkContext, "number of written files"), | ||
| NUM_OUTPUT_BYTES_KEY -> SQLMetrics.createSizeMetric(sparkContext, "written output"), | ||
| NUM_OUTPUT_ROWS_KEY -> SQLMetrics.createMetric(sparkContext, "number of output rows"), | ||
| NUM_PARTS_KEY -> SQLMetrics.createMetric(sparkContext, "number of dynamic part") | ||
| NUM_PARTS_KEY -> SQLMetrics.createMetric(sparkContext, "number of dynamic part"), | ||
| TASK_COMMIT_TIME -> | ||
| SQLMetrics.createTimingMetric(sparkContext, "time of committing the tasks"), | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's make it
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
| JOB_COMMIT_TIME -> SQLMetrics.createTimingMetric(sparkContext, "time of committing the job") | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's make it
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
| ) | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,13 +22,17 @@ import java.io.File | |
| import scala.reflect.{classTag, ClassTag} | ||
| import scala.util.Random | ||
|
|
||
| import org.apache.hadoop.mapreduce.TaskAttemptContext | ||
|
|
||
| import org.apache.spark.internal.io.FileCommitProtocol | ||
| import org.apache.spark.sql._ | ||
| import org.apache.spark.sql.catalyst.expressions.aggregate.{Final, Partial} | ||
| import org.apache.spark.sql.catalyst.plans.logical.LocalRelation | ||
| import org.apache.spark.sql.execution._ | ||
| import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite | ||
| import org.apache.spark.sql.execution.aggregate.HashAggregateExec | ||
| import org.apache.spark.sql.execution.command.DataWritingCommandExec | ||
| import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, SQLHadoopMapReduceCommitProtocol} | ||
| import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec} | ||
| import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec | ||
| import org.apache.spark.sql.functions._ | ||
|
|
@@ -788,6 +792,24 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils | |
| } | ||
| } | ||
|
|
||
| test("SPARK-34399: Add job commit duration metrics for DataWritingCommand") { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I just happened to see the flakiness of the PR here: https://github.com/apache/spark/runs/4378978842 Seems not super flaky though. I am noting it here in case other people see this failure more. |
||
| withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key -> | ||
| "org.apache.spark.sql.execution.metric.CustomFileCommitProtocol") { | ||
| withTable("t", "t2") { | ||
| sql("CREATE TABLE t(id STRING) USING PARQUET") | ||
| val df = sql("INSERT INTO TABLE t SELECT 'abc'") | ||
| val insert = df.queryExecution.executedPlan.collect { | ||
| case CommandResultExec(_, dataWriting: DataWritingCommandExec, _) => dataWriting.cmd | ||
| } | ||
| assert(insert.size == 1) | ||
| assert(insert.head.metrics.contains(BasicWriteJobStatsTracker.JOB_COMMIT_TIME)) | ||
| assert(insert.head.metrics.contains(BasicWriteJobStatsTracker.TASK_COMMIT_TIME)) | ||
| assert(insert.head.metrics(BasicWriteJobStatsTracker.JOB_COMMIT_TIME).value > 0) | ||
| assert(insert.head.metrics(BasicWriteJobStatsTracker.TASK_COMMIT_TIME).value > 0) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| test("SPARK-34567: Add metrics for CTAS operator") { | ||
| withTable("t") { | ||
| val df = sql("CREATE TABLE t USING PARQUET AS SELECT 1 as a") | ||
|
|
@@ -805,3 +827,15 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils | |
| } | ||
| } | ||
| } | ||
|
|
||
| case class CustomFileCommitProtocol( | ||
| jobId: String, | ||
| path: String, | ||
| dynamicPartitionOverwrite: Boolean = false) | ||
| extends SQLHadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite) { | ||
| override def commitTask( | ||
| taskContext: TaskAttemptContext): FileCommitProtocol.TaskCommitMessage = { | ||
| Thread.sleep(Random.nextInt(100)) | ||
| super.commitTask(taskContext) | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.