Commit

[SPARK] Add SQL metrics for ConvertToDelta (delta-io#3841)
#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Add the metric "numConvertedFiles" to the ConvertToDelta command node
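
For context, here is a minimal sketch of the driver-side metric pattern this change follows, using only public Spark APIs (the object and method names below are illustrative, not the Delta internals): a command declares an `SQLMetric`, sets its value on the driver, and posts the update under the current SQL execution id so it attaches to the command's plan node.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

object ConvertMetricsSketch {
  // Declare the metric against the SparkContext so the UI can track it.
  def buildMetrics(spark: SparkSession): Map[String, SQLMetric] = Map(
    "numConvertedFiles" ->
      SQLMetrics.createMetric(spark.sparkContext, "number of files converted")
  )

  // Set the value on the driver and post it under the current SQL execution id;
  // this is what makes the metric appear on the command's node in the Spark UI.
  def reportConvertedFiles(
      spark: SparkSession,
      metrics: Map[String, SQLMetric],
      numFiles: Long): Unit = {
    metrics("numConvertedFiles").set(numFiles)
    val executionId = spark.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
    SQLMetrics.postDriverMetricUpdates(spark.sparkContext, executionId, metrics.values.toSeq)
  }
}
```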

## How was this patch tested?

Added a new test validating the reported metric values

## Does this PR introduce _any_ user-facing changes?

The metric becomes visible in the plan (e.g. in the Spark UI).
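
As a quick illustration (hypothetical path and partition column), a user would see the metric after running the conversion and opening the query's node in the Spark UI SQL / DataFrame tab:

```scala
// Hypothetical Parquet location; CONVERT TO DELTA is the documented Delta SQL command.
spark.sql("CONVERT TO DELTA parquet.`/tmp/example_events` PARTITIONED BY (part LONG)")
// The ConvertToDeltaCommand node for this execution now shows
// "number of files converted" in the Spark UI.
```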
olaky authored Nov 15, 2024
1 parent 6ff1cbf commit 3a98b8a
Showing 2 changed files with 29 additions and 2 deletions.
@@ -34,13 +34,15 @@ import org.apache.spark.sql.delta.sources.{DeltaSourceUtils, DeltaSQLConf}
import org.apache.spark.sql.delta.util._
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.SparkContext
import org.apache.spark.internal.MDC
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{Analyzer, NoSuchTableException}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, SessionCatalog}
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog, V1Table}
import org.apache.spark.sql.execution.command.LeafRunnableCommand
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.types.StructType

/**
@@ -76,6 +78,11 @@ abstract class ConvertToDeltaCommandBase(
protected lazy val icebergEnabled: Boolean =
conf.getConf(DeltaSQLConf.DELTA_CONVERT_ICEBERG_ENABLED)

override lazy val metrics: Map[String, SQLMetric] = Map (
"numConvertedFiles" ->
SQLMetrics.createMetric(SparkContext.getOrCreate(), "number of files converted")
)

protected def isParquetPathProvider(provider: String): Boolean =
provider.equalsIgnoreCase("parquet")

@@ -380,16 +387,18 @@ abstract class ConvertToDeltaCommandBase(

val numFiles = targetTable.numFiles
val addFilesIter = createDeltaActions(spark, manifest, partitionFields, txn, fs)
val metrics = Map[String, String](
val transactionMetrics = Map[String, String](
"numConvertedFiles" -> numFiles.toString
)
metrics("numConvertedFiles") += numFiles
sendDriverMetrics(spark, metrics)
val (committedVersion, postCommitSnapshot) = txn.commitLarge(
spark,
addFilesIter,
Some(txn.protocol),
getOperation(numFiles, convertProperties, targetTable.format),
getContext,
metrics)
transactionMetrics)
} finally {
manifest.close()
}
@@ -19,6 +19,7 @@ package org.apache.spark.sql.delta
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest

import org.apache.spark.sql.execution.command.ExecutedCommandExec
import org.apache.spark.sql.functions.{col, from_json}

trait ConvertToDeltaSQLSuiteBase extends ConvertToDeltaSuiteBaseCommons
@@ -58,6 +59,23 @@ trait ConvertToDeltaSQLSuiteBase extends ConvertToDeltaSuiteBaseCommons
}
}

for (numFiles <- Seq(1, 7)) {
test(s"numConvertedFiles metric ($numFiles files)") {
val testTableName = "test_table"
withTable(testTableName) {
spark.range(end = numFiles).toDF("part").withColumn("data", col("part"))
.write.partitionBy("part").mode("overwrite").format("parquet").saveAsTable(testTableName)

val plans = DeltaTestUtils.withPhysicalPlansCaptured(spark) {
convertToDelta(testTableName, Some("part long"))
}

// Validate that the command node has the correct metrics.
val commandNode = plans.collect { case exe: ExecutedCommandExec => exe.cmd }.head
assert(commandNode.metrics("numConvertedFiles").value === numFiles)
}
}
}
}

class ConvertToDeltaSQLSuite extends ConvertToDeltaSQLSuiteBase