34 commits
2ee4c02
Update FileBatchWrite.scala
AngersZhuuuu Feb 7, 2021
3fdda41
update
AngersZhuuuu Feb 7, 2021
cb5873c
Update FileBatchWrite.scala
AngersZhuuuu Feb 8, 2021
0f567f0
update
AngersZhuuuu Feb 8, 2021
d065bcd
Merge branch 'master' into SPARK-34399
AngersZhuuuu Feb 8, 2021
669be65
Merge branch 'master' into SPARK-34399
AngersZhuuuu Feb 9, 2021
89f8201
Merge branch 'master' into SPARK-34399
AngersZhuuuu Feb 9, 2021
9ddd28c
Merge branch 'master' into SPARK-34399
AngersZhuuuu Feb 20, 2021
e552a48
follow comment
AngersZhuuuu Feb 23, 2021
6bf5a88
Merge branch 'SPARK-34399' of https://github.com/AngersZhuuuu/spark i…
AngersZhuuuu Feb 23, 2021
4f40eae
Update BasicWriteStatsTracker.scala
AngersZhuuuu Feb 23, 2021
e334a4c
follow comment
AngersZhuuuu Feb 23, 2021
2cc84df
Add UT
AngersZhuuuu Feb 24, 2021
dc78903
Merge branch 'master' into SPARK-34399
AngersZhuuuu Mar 4, 2021
8638618
Merge branch 'master' into SPARK-34399
AngersZhuuuu Jul 14, 2021
f10f24a
update
AngersZhuuuu Jul 14, 2021
7c87991
Merge branch 'SPARK-34399' of https://github.com/AngersZhuuuu/spark i…
AngersZhuuuu Jul 15, 2021
160e56d
update
AngersZhuuuu Jul 15, 2021
e69e279
Update BasicWriteStatsTracker.scala
AngersZhuuuu Jul 15, 2021
b92f0e4
Update SQLMetricsSuite.scala
AngersZhuuuu Jul 15, 2021
1cbd31b
Update FileFormatWriter.scala
AngersZhuuuu Jul 15, 2021
559d766
update
AngersZhuuuu Jul 16, 2021
ee2e3cf
update
AngersZhuuuu Jul 17, 2021
5c41947
Update SQLMetricsSuite.scala
AngersZhuuuu Jul 17, 2021
b91b0ae
follow comment
AngersZhuuuu Jul 19, 2021
cc42403
Update BasicWriteStatsTracker.scala
AngersZhuuuu Jul 19, 2021
d3389c6
follow comment
AngersZhuuuu Jul 20, 2021
a4890f2
fix scalatype
AngersZhuuuu Jul 20, 2021
6d91e25
follow comment
AngersZhuuuu Jul 20, 2021
1192f6f
Update BasicWriteStatsTracker.scala
AngersZhuuuu Jul 20, 2021
7b2bb06
Update SQLMetricsSuite.scala
AngersZhuuuu Jul 20, 2021
8107d20
Update SQLMetricsSuite.scala
AngersZhuuuu Jul 21, 2021
9b5aa94
Update SQLMetricsSuite.scala
AngersZhuuuu Jul 21, 2021
b5c9d63
Update BasicWriteStatsTracker.scala
AngersZhuuuu Jul 22, 2021
File: BasicWriteStatsTracker.scala
@@ -29,6 +29,7 @@ import org.apache.spark.{SparkContext, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker._
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.util.SerializableConfiguration

@@ -48,7 +49,9 @@ case class BasicWriteTaskStats(
/**
* Simple [[WriteTaskStatsTracker]] implementation that produces [[BasicWriteTaskStats]].
*/
class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
class BasicWriteTaskStatsTracker(
hadoopConf: Configuration,
taskCommitTimeMetric: Option[SQLMetric] = None)
extends WriteTaskStatsTracker with Logging {

private[this] val partitions: mutable.ArrayBuffer[InternalRow] = mutable.ArrayBuffer.empty
@@ -155,7 +158,7 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
numRows += 1
}

override def getFinalStats(): WriteTaskStats = {
override def getFinalStats(taskCommitTime: Long): WriteTaskStats = {
submittedFiles.foreach(updateFileStats)
submittedFiles.clear()

@@ -170,6 +173,7 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
"This could be due to the output format not writing empty files, " +
"or files being not immediately visible in the filesystem.")
}
taskCommitTimeMetric.foreach(_ += taskCommitTime)
BasicWriteTaskStats(partitions.toSeq, numFiles, numBytes, numRows)
}
}
@@ -183,14 +187,21 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
*/
class BasicWriteJobStatsTracker(
serializableHadoopConf: SerializableConfiguration,
@transient val metrics: Map[String, SQLMetric])
@transient val driverSideMetrics: Map[String, SQLMetric],
taskCommitTimeMetric: SQLMetric)
extends WriteJobStatsTracker {

def this(
serializableHadoopConf: SerializableConfiguration,
metrics: Map[String, SQLMetric]) = {
this(serializableHadoopConf, metrics - TASK_COMMIT_TIME, metrics(TASK_COMMIT_TIME))
}

override def newTaskInstance(): WriteTaskStatsTracker = {
new BasicWriteTaskStatsTracker(serializableHadoopConf.value)
new BasicWriteTaskStatsTracker(serializableHadoopConf.value, Some(taskCommitTimeMetric))
}

override def processStats(stats: Seq[WriteTaskStats]): Unit = {
override def processStats(stats: Seq[WriteTaskStats], jobCommitTime: Long): Unit = {
val sparkContext = SparkContext.getActive.get
var partitionsSet: mutable.Set[InternalRow] = mutable.HashSet.empty
var numFiles: Long = 0L
@@ -206,13 +217,14 @@ class BasicWriteJobStatsTracker(
totalNumOutput += summary.numRows
}

metrics(BasicWriteJobStatsTracker.NUM_FILES_KEY).add(numFiles)
metrics(BasicWriteJobStatsTracker.NUM_OUTPUT_BYTES_KEY).add(totalNumBytes)
metrics(BasicWriteJobStatsTracker.NUM_OUTPUT_ROWS_KEY).add(totalNumOutput)
metrics(BasicWriteJobStatsTracker.NUM_PARTS_KEY).add(partitionsSet.size)
driverSideMetrics(JOB_COMMIT_TIME).add(jobCommitTime)
driverSideMetrics(NUM_FILES_KEY).add(numFiles)
driverSideMetrics(NUM_OUTPUT_BYTES_KEY).add(totalNumBytes)
driverSideMetrics(NUM_OUTPUT_ROWS_KEY).add(totalNumOutput)
driverSideMetrics(NUM_PARTS_KEY).add(partitionsSet.size)

val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toList)
SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, driverSideMetrics.values.toList)
}
}

@@ -221,6 +233,8 @@ object BasicWriteJobStatsTracker {
private val NUM_OUTPUT_BYTES_KEY = "numOutputBytes"
private val NUM_OUTPUT_ROWS_KEY = "numOutputRows"
private val NUM_PARTS_KEY = "numParts"
val TASK_COMMIT_TIME = "taskCommitTime"
val JOB_COMMIT_TIME = "jobCommitTime"
/** XAttr key of the data length header added in HADOOP-17414. */
val FILE_LENGTH_XATTR = "header.x-hadoop-s3a-magic-data-length"

@@ -230,7 +244,9 @@ object BasicWriteJobStatsTracker {
NUM_FILES_KEY -> SQLMetrics.createMetric(sparkContext, "number of written files"),
NUM_OUTPUT_BYTES_KEY -> SQLMetrics.createSizeMetric(sparkContext, "written output"),
NUM_OUTPUT_ROWS_KEY -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
NUM_PARTS_KEY -> SQLMetrics.createMetric(sparkContext, "number of dynamic part")
NUM_PARTS_KEY -> SQLMetrics.createMetric(sparkContext, "number of dynamic part"),
TASK_COMMIT_TIME -> SQLMetrics.createTimingMetric(sparkContext, "task commit time"),
JOB_COMMIT_TIME -> SQLMetrics.createTimingMetric(sparkContext, "job commit time")
)
}
}
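
For context, a minimal sketch (not part of this diff) of how a write command can construct the tracker with the extended metrics map; sparkContext and hadoopConf are assumed to be in scope, and the helper name is hypothetical:

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
import org.apache.spark.util.SerializableConfiguration

// The metrics map now also carries the "taskCommitTime" and "jobCommitTime" timing
// metrics; the new auxiliary constructor splits taskCommitTime out of the
// driver-side map so it can be handed to each task-side tracker separately.
def newStatsTracker(
    sparkContext: SparkContext,
    hadoopConf: Configuration): BasicWriteJobStatsTracker = {
  val metrics = BasicWriteJobStatsTracker.metrics(sparkContext)
  new BasicWriteJobStatsTracker(new SerializableConfiguration(hadoopConf), metrics)
}
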
File: FileFormatDataWriter.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage}
import org.apache.spark.sql.execution.datasources.FileFormatWriter.ConcurrentOutputWriterSpec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StringType
import org.apache.spark.util.SerializableConfiguration
import org.apache.spark.util.{SerializableConfiguration, Utils}

/**
* Abstract class for writing out data in a single Spark task.
@@ -92,10 +92,13 @@ abstract class FileFormatDataWriter(
*/
override def commit(): WriteTaskResult = {
releaseResources()
val (taskCommitMessage, taskCommitTime) = Utils.timeTakenMs {
committer.commitTask(taskAttemptContext)
}
val summary = ExecutedWriteSummary(
updatedPartitions = updatedPartitions.toSet,
stats = statsTrackers.map(_.getFinalStats()))
WriteTaskResult(committer.commitTask(taskAttemptContext), summary)
stats = statsTrackers.map(_.getFinalStats(taskCommitTime)))
WriteTaskResult(taskCommitMessage, summary)
}

def abort(): Unit = {
File: FileFormatWriter.scala
@@ -240,7 +240,7 @@ object FileFormatWriter extends Logging {
val (_, duration) = Utils.timeTakenMs { committer.commitJob(job, commitMsgs) }
logInfo(s"Write Job ${description.uuid} committed. Elapsed time: $duration ms.")

processStats(description.statsTrackers, ret.map(_.summary.stats))
processStats(description.statsTrackers, ret.map(_.summary.stats), duration)
logInfo(s"Finished processing stats for write job ${description.uuid}.")

// return a set of all the partition paths that were updated during this job
@@ -302,13 +302,8 @@ object FileFormatWriter extends Logging {
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
// Execute the task to write rows out and commit the task.
val taskAttemptID = taskAttemptContext.getTaskAttemptID
val (res, timeCost) = Utils.timeTakenMs {
logDebug("$taskAttemptID starts to write and commit.")
dataWriter.writeWithIterator(iterator)
dataWriter.commit()
}
logInfo(s"$taskAttemptID finished to write and commit. Elapsed time: $timeCost ms.")
res
dataWriter.writeWithIterator(iterator)
dataWriter.commit()
})(catchBlock = {
// If there is an error, abort the task
dataWriter.abort()
@@ -334,7 +329,8 @@ object FileFormatWriter extends Logging {
*/
private[datasources] def processStats(
statsTrackers: Seq[WriteJobStatsTracker],
statsPerTask: Seq[Seq[WriteTaskStats]])
statsPerTask: Seq[Seq[WriteTaskStats]],
jobCommitDuration: Long)
: Unit = {

val numStatsTrackers = statsTrackers.length
@@ -351,7 +347,7 @@ object FileFormatWriter extends Logging {
}

statsTrackers.zip(statsPerTracker).foreach {
case (statsTracker, stats) => statsTracker.processStats(stats)
case (statsTracker, stats) => statsTracker.processStats(stats, jobCommitDuration)
}
}
}
File: WriteStatsTracker.scala
@@ -66,10 +66,11 @@ trait WriteTaskStatsTracker {

/**
* Returns the final statistics computed so far.
* @param taskCommitTime Time of committing the task.
* @note This may only be called once. Further use of the object may lead to undefined behavior.
* @return An object of subtype of [[WriteTaskStats]], to be sent to the driver.
*/
def getFinalStats(): WriteTaskStats
def getFinalStats(taskCommitTime: Long): WriteTaskStats
}


@@ -93,6 +94,7 @@ trait WriteJobStatsTracker extends Serializable {
* Process the given collection of stats computed during this job.
* E.g. aggregate them, write them to memory / disk, issue warnings, whatever.
* @param stats One [[WriteTaskStats]] object from each successful write task.
* @param jobCommitTime Time of committing the job.
* @note The type of @param `stats` is too generic. These classes should probably be parametrized:
* WriteTaskStatsTracker[S <: WriteTaskStats]
* WriteJobStatsTracker[S <: WriteTaskStats, T <: WriteTaskStatsTracker[S]]
@@ -103,5 +105,5 @@ trait WriteJobStatsTracker extends Serializable {
* to the expected derived type when implementing this method in a derived class.
* The framework will make sure to call this with the right arguments.
*/
def processStats(stats: Seq[WriteTaskStats]): Unit
def processStats(stats: Seq[WriteTaskStats], jobCommitTime: Long): Unit
}
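
As an illustration of the widened trait, a hedged sketch of a job-side tracker compiled against the new processStats signature; the class is hypothetical (not part of this PR) and reuses the CustomWriteTaskStatsTracker from the test suite further below as its task-side counterpart:

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.datasources.{WriteJobStatsTracker, WriteTaskStats, WriteTaskStatsTracker}

// Hypothetical tracker that only logs the commit durations now flowing through the API.
class LoggingWriteJobStatsTracker extends WriteJobStatsTracker with Logging {
  override def newTaskInstance(): WriteTaskStatsTracker = new CustomWriteTaskStatsTracker
  // jobCommitTime is the committer.commitJob duration measured on the driver, in ms.
  override def processStats(stats: Seq[WriteTaskStats], jobCommitTime: Long): Unit = {
    logInfo(s"Collected stats from ${stats.size} tasks; job commit took $jobCommitTime ms")
  }
}
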
File: FileBatchWrite.scala
@@ -36,7 +36,7 @@ class FileBatchWrite(
val (_, duration) = Utils.timeTakenMs { committer.commitJob(job, results.map(_.commitMsg)) }
logInfo(s"Write Job ${description.uuid} committed. Elapsed time: $duration ms.")

processStats(description.statsTrackers, results.map(_.summary.stats))
processStats(description.statsTrackers, results.map(_.summary.stats), duration)
logInfo(s"Finished processing stats for write job ${description.uuid}.")
}

File: BasicWriteTaskStatsTrackerSuite.scala
@@ -73,7 +73,7 @@ class BasicWriteTaskStatsTrackerSuite extends SparkFunSuite {
}

private def finalStatus(tracker: BasicWriteTaskStatsTracker): BasicWriteTaskStats = {
tracker.getFinalStats().asInstanceOf[BasicWriteTaskStats]
tracker.getFinalStats(0L).asInstanceOf[BasicWriteTaskStats]
}

test("No files in run") {
File: CustomWriteTaskStatsTrackerSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.InternalRow
class CustomWriteTaskStatsTrackerSuite extends SparkFunSuite {

def checkFinalStats(tracker: CustomWriteTaskStatsTracker, result: Map[String, Int]): Unit = {
assert(tracker.getFinalStats().asInstanceOf[CustomWriteTaskStats].numRowsPerFile == result)
assert(tracker.getFinalStats(0L).asInstanceOf[CustomWriteTaskStats].numRowsPerFile == result)
}

test("sequential file writing") {
@@ -64,7 +64,7 @@ class CustomWriteTaskStatsTracker extends WriteTaskStatsTracker {
numRowsPerFile(filePath) += 1
}

override def getFinalStats(): WriteTaskStats = {
override def getFinalStats(taskCommitTime: Long): WriteTaskStats = {
CustomWriteTaskStats(numRowsPerFile.toMap)
}
}
File: SQLMetricsSuite.scala
@@ -22,13 +22,17 @@ import java.io.File
import scala.reflect.{classTag, ClassTag}
import scala.util.Random

import org.apache.hadoop.mapreduce.TaskAttemptContext

import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.aggregate.{Final, Partial}
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.command.DataWritingCommandExec
import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, SQLHadoopMapReduceCommitProtocol}
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
import org.apache.spark.sql.functions._
@@ -788,6 +792,24 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
}
}

test("SPARK-34399: Add job commit duration metrics for DataWritingCommand") {
Review comment from @HyukjinKwon (Member), Dec 3, 2021:
I just happened to see the flakiness of the PR here: https://github.com/apache/spark/runs/4378978842

sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException: 0 was not greater than 0
	at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
	at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
	at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
	at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
	at org.apache.spark.sql.execution.metric.SQLMetricsSuite.$anonfun$new$87(SQLMetricsSuite.scala:810)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.sql.test.SQLTestUtilsBase.withTable(SQLTestUtils.scala:305)
	at org.apache.spark.sql.test.SQLTestUtilsBase.withTable$(SQLTestUtils.scala:303)
	at org.apache.spark.sql.execution.metric.SQLMetricsSuite.withTable(SQLMetricsSuite.scala:44)
	at org.apache.spark.sql.execution.metric.SQLMetricsSuite.$anonfun$new$86(SQLMetricsSuite.scala:800)
	at org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf(SQLHelper.scala:54)
	at org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf$(SQLHelper.scala:38)
	at org.apache.spark.sql.execution.metric.SQLMetricsSuite.org$apache$spark$sql$test$SQLTestUtilsBase$$super$withSQLConf(SQLMetricsSuite.scala:44)
	at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf(SQLTestUtils.scala:246)
	at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf$(SQLTestUtils.scala:244)
	at org.apache.spark.sql.execution.metric.SQLMetricsSuite.withSQLConf(SQLMetricsSuite.scala:44)
	at org.apache.spark.sql.execution.metric.SQLMetricsSuite.$anonfun$new$85(SQLMetricsSuite.scala:800)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite.$anonfun$test$5(AdaptiveTestUtils.scala:65)
	at org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf(SQLHelper.scala:54)
	at org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf$(SQLHelper.scala:38)
	at org.apache.spark.sql.execution.metric.SQLMetricsSuite.org$apache$spark$sql$test$SQLTestUtilsBase$$super$withSQLConf(SQLMetricsSuite.scala:44)
	at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf(SQLTestUtils.scala:246)
	at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf$(SQLTestUtils.scala:244)
	at org.apache.spark.sql.execution.metric.SQLMetricsSuite.withSQLConf(SQLMetricsSuite.scala:44)
	at org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite.$anonfun$test$4(AdaptiveTestUtils.scala:65)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
	at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:190)
	at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
	at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:62)
	at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
	at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
	at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:62)
	at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)

Seems not super flaky though. I am noting it here in case other people see this failure more.

withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
"org.apache.spark.sql.execution.metric.CustomFileCommitProtocol") {
withTable("t", "t2") {
sql("CREATE TABLE t(id STRING) USING PARQUET")
val df = sql("INSERT INTO TABLE t SELECT 'abc'")
val insert = df.queryExecution.executedPlan.collect {
case CommandResultExec(_, dataWriting: DataWritingCommandExec, _) => dataWriting.cmd
}
assert(insert.size == 1)
assert(insert.head.metrics.contains(BasicWriteJobStatsTracker.JOB_COMMIT_TIME))
assert(insert.head.metrics.contains(BasicWriteJobStatsTracker.TASK_COMMIT_TIME))
assert(insert.head.metrics(BasicWriteJobStatsTracker.JOB_COMMIT_TIME).value > 0)
assert(insert.head.metrics(BasicWriteJobStatsTracker.TASK_COMMIT_TIME).value > 0)
}
}
}

test("SPARK-34567: Add metrics for CTAS operator") {
withTable("t") {
val df = sql("CREATE TABLE t USING PARQUET AS SELECT 1 as a")
@@ -805,3 +827,15 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
}
}
}

case class CustomFileCommitProtocol(
jobId: String,
path: String,
dynamicPartitionOverwrite: Boolean = false)
extends SQLHadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite) {
override def commitTask(
taskContext: TaskAttemptContext): FileCommitProtocol.TaskCommitMessage = {
Thread.sleep(Random.nextInt(100))
super.commitTask(taskContext)
}
}
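
Regarding the flakiness noted in the review comment above ("0 was not greater than 0"), one hedged option, sketched here as a hypothetical variant rather than a change in this PR, is to enforce a strictly positive task-commit delay so the timing metric cannot round down to zero:

import scala.util.Random
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol

// Hypothetical variant of the test-only commit protocol above.
case class SlowFileCommitProtocol(
    jobId: String,
    path: String,
    dynamicPartitionOverwrite: Boolean = false)
  extends SQLHadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite) {
  override def commitTask(
      taskContext: TaskAttemptContext): FileCommitProtocol.TaskCommitMessage = {
    Thread.sleep(5 + Random.nextInt(100))  // always sleep at least a few milliseconds
    super.commitTask(taskContext)
  }
}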