@@ -19,12 +19,13 @@ package org.apache.spark.sql.execution.command

import org.apache.hadoop.conf.Configuration

import org.apache.spark.SparkContext
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
-import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.util.SerializableConfiguration

/**
@@ -73,4 +74,26 @@ object DataWritingCommand {
attr.withName(outputName)
}
}

/**
 * When executing a CTAS operator, Spark uses [[InsertIntoHadoopFsRelationCommand]] or
 * [[InsertIntoHiveTable]] to write the data. Both commands inherit their metrics from
 * [[DataWritingCommand]], but after either of them runs, only that command's own metrics
 * are updated, through [[BasicWriteJobStatsTracker]]. The metric values therefore also need
 * to be propagated to the command that actually invoked [[InsertIntoHadoopFsRelationCommand]]
 * or [[InsertIntoHiveTable]].
 *
 * @param sparkContext Current SparkContext.
 * @param command The command that actually wrote the data.
 * @param metrics The metrics of the invoking command, to be updated with the writer's values.
 */
def propogateMetrics(
sparkContext: SparkContext,
command: DataWritingCommand,
metrics: Map[String, SQLMetric]): Unit = {
command.metrics.foreach { case (key, metric) => metrics(key).set(metric.value) }
SQLMetrics.postDriverMetricUpdates(sparkContext,
sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY),
metrics.values.toSeq)
}
}
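A minimal sketch (not part of the diff above) of the call pattern this helper enables, assuming a CTAS-style wrapping command that delegates the actual write to an inner DataWritingCommand; the names runCtasLikeCommand, innerWrite, and outerMetrics are illustrative only:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.DataWritingCommand
import org.apache.spark.sql.execution.metric.SQLMetric

def runCtasLikeCommand(
    sparkSession: SparkSession,
    child: SparkPlan,
    innerWrite: DataWritingCommand,       // e.g. InsertIntoHadoopFsRelationCommand or InsertIntoHiveTable
    outerMetrics: Map[String, SQLMetric]  // the wrapping command's own `metrics` map
  ): Seq[Row] = {
  // The inner command updates only its own metrics (via BasicWriteJobStatsTracker) while writing.
  innerWrite.run(sparkSession, child)
  // Copy those values into the wrapping command's metrics and post them to the driver, so the
  // CTAS node in the SQL UI reports numFiles, numOutputBytes, numOutputRows, etc.
  DataWritingCommand.propogateMetrics(sparkSession.sparkContext, innerWrite, outerMetrics)
  Seq.empty[Row]
}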
@@ -220,7 +220,7 @@ case class CreateDataSourceTableAsSelectCommand(
catalogTable = if (tableExists) Some(table) else None)

try {
-dataSource.writeAndRead(mode, query, outputColumnNames, physicalPlan)
+dataSource.writeAndRead(mode, query, outputColumnNames, physicalPlan, metrics)
} catch {
case ex: AnalysisException =>
logError(s"Failed to write to table ${table.identifier.unquotedString}", ex)
@@ -43,6 +43,7 @@ import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.sources.{RateStreamProvider, TextSocketSourceProvider}
import org.apache.spark.sql.internal.SQLConf
@@ -518,7 +519,8 @@ case class DataSource(
mode: SaveMode,
data: LogicalPlan,
outputColumnNames: Seq[String],
-physicalPlan: SparkPlan): BaseRelation = {
+physicalPlan: SparkPlan,
+metrics: Map[String, SQLMetric]): BaseRelation = {
val outputColumns = DataWritingCommand.logicalPlanOutputWithNames(data, outputColumnNames)
if (outputColumns.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
throw new AnalysisException("Cannot save interval data type into external storage.")
@@ -546,6 +548,7 @@
partitionColumns = resolvedPartCols,
outputColumnNames = outputColumnNames)
resolved.run(sparkSession, physicalPlan)
DataWritingCommand.propogateMetrics(sparkSession.sparkContext, resolved, metrics)
// Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring
copy(userSpecifiedSchema = Some(outputColumns.toStructType.asNullable)).resolveRelation()
case _ =>
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.execution.{FilterExec, RangeExec, SparkPlan, WholeStageCodegenExec}
import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.command.DataWritingCommandExec
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
import org.apache.spark.sql.functions._
@@ -782,4 +783,20 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
}
}
}

test("SPARK-34567: Add metrics for CTAS operator") {
withTable("t") {
val df = sql("CREATE TABLE t USING PARQUET AS SELECT 1 as a")
val dataWritingCommandExec =
df.queryExecution.executedPlan.asInstanceOf[DataWritingCommandExec]
dataWritingCommandExec.executeCollect()
val createTableAsSelect = dataWritingCommandExec.cmd
assert(createTableAsSelect.metrics.contains("numFiles"))
assert(createTableAsSelect.metrics("numFiles").value == 1)
assert(createTableAsSelect.metrics.contains("numOutputBytes"))
assert(createTableAsSelect.metrics("numOutputBytes").value > 0)
assert(createTableAsSelect.metrics.contains("numOutputRows"))
assert(createTableAsSelect.metrics("numOutputRows").value == 1)
}
}
}
@@ -55,6 +55,7 @@ trait CreateHiveTableAsSelectBase extends DataWritingCommand {

val command = getWritingCommand(catalog, tableDesc, tableExists = true)
command.run(sparkSession, child)
DataWritingCommand.propogateMetrics(sparkSession.sparkContext, command, metrics)
} else {
// TODO ideally, we should get the output data ready first and then
// add the relation into catalog, just in case of failure occurs while data
@@ -69,6 +70,7 @@ trait CreateHiveTableAsSelectBase extends DataWritingCommand {
val createdTableMeta = catalog.getTableMetadata(tableDesc.identifier)
val command = getWritingCommand(catalog, createdTableMeta, tableExists = false)
command.run(sparkSession, child)
DataWritingCommand.propogateMetrics(sparkSession.sparkContext, command, metrics)
} catch {
case NonFatal(e) =>
// drop the created table.
@@ -18,7 +18,9 @@
package org.apache.spark.sql.hive.execution

import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
import org.apache.spark.sql.execution.command.DataWritingCommandExec
import org.apache.spark.sql.execution.metric.SQLMetricsTestUtils
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.test.TestHiveSingleton

// Disable AQE because metric info is different with AQE on/off
@@ -34,4 +36,29 @@ class SQLMetricsSuite extends SQLMetricsTestUtils with TestHiveSingleton
testMetricsDynamicPartition("hive", "hive", "t1")
}
}

test("SPARK-34567: Add metrics for CTAS operator") {
Seq(false, true).foreach { canOptimized =>
withSQLConf(HiveUtils.CONVERT_METASTORE_CTAS.key -> canOptimized.toString) {
withTable("t") {
val df = sql(s"CREATE TABLE t STORED AS PARQUET AS SELECT 1 as a")
val dataWritingCommandExec =
df.queryExecution.executedPlan.asInstanceOf[DataWritingCommandExec]
dataWritingCommandExec.executeCollect()
val createTableAsSelect = dataWritingCommandExec.cmd
if (canOptimized) {
assert(createTableAsSelect.isInstanceOf[OptimizedCreateHiveTableAsSelectCommand])
} else {
assert(createTableAsSelect.isInstanceOf[CreateHiveTableAsSelectCommand])
}
assert(createTableAsSelect.metrics.contains("numFiles"))
assert(createTableAsSelect.metrics("numFiles").value == 1)
assert(createTableAsSelect.metrics.contains("numOutputBytes"))
assert(createTableAsSelect.metrics("numOutputBytes").value > 0)
assert(createTableAsSelect.metrics.contains("numOutputRows"))
assert(createTableAsSelect.metrics("numOutputRows").value == 1)
}
}
}
}
}