diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
index 59ffd1638111..053c58e2e14a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
@@ -17,9 +17,15 @@
 
 package org.apache.spark.sql.execution
 
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, DataWritingCommand, DataWritingCommandExec}
+import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.execution.metric.SQLMetricInfo
+import org.apache.spark.util.Utils
 
 /**
  * :: DeveloperApi ::
@@ -57,12 +63,58 @@ private[execution] object SparkPlanInfo {
       new SQLMetricInfo(metric.name.getOrElse(key), metric.id, metric.metricType)
     }
 
+    def makeOutputMetadata(
+        path: Option[Path],
+        outputColumnNames: Seq[String]): Map[String, String] = {
+      val pathString = path match {
+        case Some(p) if p != null => p.toString
+        case _ => ""
+      }
+      Map("OutputPath" -> pathString,
+        "OutputColumnNames" -> outputColumnNames.mkString("[", ", ", "]")
+      )
+    }
+
+    def reflectTable(write: DataWritingCommand, className: String, field: String): CatalogTable = {
+      val tableField = Utils.classForName(className).getDeclaredField(field)
+      tableField.setAccessible(true)
+      tableField.get(write).asInstanceOf[CatalogTable]
+    }
+
     // dump the file scan metadata (e.g file path) to event log
     val metadata = plan match {
       case fileScan: FileSourceScanExec => fileScan.metadata
+      case DataWritingCommandExec(i: InsertIntoHadoopFsRelationCommand, _) =>
+        makeOutputMetadata(Some(i.outputPath), i.outputColumnNames)
+      case DataWritingCommandExec(d: DataWritingCommand, _) =>
+        d.getClass.getCanonicalName match {
+          case CREATE_DATA_SOURCE_TABLE_AS_SELECT_COMMAND =>
+            val table = reflectTable(d, CREATE_DATA_SOURCE_TABLE_AS_SELECT_COMMAND, "table")
+            makeOutputMetadata(table.storage.locationUri.map(new Path(_)), d.outputColumnNames)
+          case CREATE_HIVE_TABLE_AS_SELECT_COMMAND =>
+            val table = reflectTable(d, CREATE_HIVE_TABLE_AS_SELECT_COMMAND, "tableDesc")
+            makeOutputMetadata(table.storage.locationUri.map(new Path(_)), d.outputColumnNames)
+          case INSERT_INTO_HIVE_DIR_COMMAND =>
+            val table = reflectTable(d, INSERT_INTO_HIVE_DIR_COMMAND, "table")
+            makeOutputMetadata(table.storage.locationUri.map(new Path(_)), d.outputColumnNames)
+          case INSERT_INTO_HIVE_TABLE =>
+            val table = reflectTable(d, INSERT_INTO_HIVE_TABLE, "table")
+            makeOutputMetadata(table.storage.locationUri.map(new Path(_)), d.outputColumnNames)
+          case _ => Map[String, String]()
+        }
       case _ => Map[String, String]()
     }
+
     new SparkPlanInfo(plan.nodeName, plan.simpleString, children.map(fromSparkPlan), metadata,
       metrics)
   }
+
+  private val CREATE_DATA_SOURCE_TABLE_AS_SELECT_COMMAND =
+    classOf[CreateDataSourceTableAsSelectCommand].getCanonicalName
+  private val CREATE_HIVE_TABLE_AS_SELECT_COMMAND =
+    "org.apache.spark.sql.hive.execution.CreateHiveTableAsSelectCommand"
+  private val INSERT_INTO_HIVE_DIR_COMMAND =
+    "org.apache.spark.sql.hive.execution.InsertIntoHiveDirCommand"
+  private val INSERT_INTO_HIVE_TABLE =
+    "org.apache.spark.sql.hive.execution.InsertIntoHiveTable"
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index b2e1f530b532..fba428a5e20f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -21,7 +21,6 @@ import java.net.URI
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources._
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
index 47ff372992b9..4da58e44ee41 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
@@ -58,4 +58,13 @@ class SparkPlanSuite extends QueryTest with SharedSQLContext {
       assert(SparkPlanInfo.fromSparkPlan(f.queryExecution.sparkPlan).metadata.nonEmpty)
     }
   }
+
+  test("SPARK-25421 DataWritingCommandExec should contain 'OutputPath' metadata") {
+    withTable("t") {
+      sql("CREATE TABLE t(col_I int) USING PARQUET")
+      val f = sql("INSERT OVERWRITE TABLE t SELECT 1")
+      assert(SparkPlanInfo.fromSparkPlan(f.queryExecution.sparkPlan).metadata
+        .contains("OutputPath"))
+    }
+  }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index aa573b54a2b6..060dc2916096 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -21,7 +21,6 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.DataWritingCommand
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
index 0c694910b06d..022f349a34ac 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
@@ -30,7 +30,6 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
-import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.hive.client.HiveClientImpl
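
For reference, a minimal usage sketch (not part of the patch) of how the metadata added above can be read back from a physical plan. It mirrors the new SparkPlanSuite test, assumes a running SparkSession named spark, and must sit in the org.apache.spark.sql.execution package because SparkPlanInfo is private[execution]:

    import org.apache.spark.sql.execution.SparkPlanInfo

    // Plan an INSERT so the physical plan contains a DataWritingCommandExec node.
    spark.sql("CREATE TABLE t(col_I int) USING PARQUET")
    val df = spark.sql("INSERT OVERWRITE TABLE t SELECT 1")

    // With this patch applied, fromSparkPlan also records the write target's
    // location and column names alongside the existing file-scan metadata.
    val info = SparkPlanInfo.fromSparkPlan(df.queryExecution.sparkPlan)
    println(info.metadata.get("OutputPath"))
    println(info.metadata.get("OutputColumnNames"))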