[SPARK-22977][SQL] fix web UI SQL tab for CTAS #20521
```diff
@@ -31,8 +31,10 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogUtils}
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
```
```diff
@@ -435,10 +437,11 @@ case class DataSource(
   }
 
   /**
-   * Writes the given [[LogicalPlan]] out in this [[FileFormat]].
+   * Creates a command node to write the given [[LogicalPlan]] out to the given [[FileFormat]].
+   * The returned command is unresolved and need to be analyzed.
    */
   private def planForWritingFileFormat(
-      format: FileFormat, mode: SaveMode, data: LogicalPlan): LogicalPlan = {
+      format: FileFormat, mode: SaveMode, data: LogicalPlan): InsertIntoHadoopFsRelationCommand = {
     // Don't glob path for the write path. The contracts here are:
     //  1. Only one output path can be specified on the write path;
     //  2. Output path must be a legal HDFS style file system path;
```
```diff
@@ -482,9 +485,24 @@ case class DataSource(
   /**
    * Writes the given [[LogicalPlan]] out to this [[DataSource]] and returns a [[BaseRelation]] for
    * the following reading.
+   *
+   * @param mode The save mode for this writing.
+   * @param data The input query plan that produces the data to be written. Note that this plan
+   *             is analyzed and optimized.
+   * @param outputColumns The original output columns of the input query plan. The optimizer may not
+   *                      preserve the output column's names' case, so we need this parameter
+   *                      instead of `data.output`.
+   * @param physicalPlan The physical plan of the input query plan. We should run the writing
+   *                     command with this physical plan instead of creating a new physical plan,
+   *                     so that the metrics can be correctly linked to the given physical plan and
+   *                     shown in the web UI.
    */
-  def writeAndRead(mode: SaveMode, data: LogicalPlan): BaseRelation = {
-    if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
+  def writeAndRead(
+      mode: SaveMode,
+      data: LogicalPlan,
+      outputColumns: Seq[Attribute],
+      physicalPlan: SparkPlan): BaseRelation = {
+    if (outputColumns.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
       throw new AnalysisException("Cannot save interval data type into external storage.")
     }
```
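The `physicalPlan` parameter is what ties the write metrics to the plan already shown in the SQL tab. Below is a hedged sketch, not code from this patch, of how a CTAS-style caller could forward its analyzed output columns and the already-planned physical `child` into the new signature; the command name and fields are invented for illustration, and it assumes the `DataWritingCommand` trait shape from Spark 2.3.

```scala
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.DataWritingCommand
import org.apache.spark.sql.execution.datasources.DataSource

// Hypothetical caller: a data-source CTAS command modeled as a DataWritingCommand.
case class ExampleCtasCommand(
    dataSource: DataSource,
    query: LogicalPlan,
    outputColumns: Seq[Attribute],
    mode: SaveMode) extends DataWritingCommand {

  override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
    // Pass the physical plan we were handed instead of re-planning `query`,
    // so the write metrics attach to the plan the web UI displays.
    dataSource.writeAndRead(mode, query, outputColumns, child)
    Seq.empty[Row]
  }
}
```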
```diff
@@ -493,9 +511,23 @@ case class DataSource(
         dataSource.createRelation(
           sparkSession.sqlContext, mode, caseInsensitiveOptions, Dataset.ofRows(sparkSession, data))
       case format: FileFormat =>
-        sparkSession.sessionState.executePlan(planForWritingFileFormat(format, mode, data)).toRdd
+        val cmd = planForWritingFileFormat(format, mode, data)
+        val resolvedPartCols = cmd.partitionColumns.map { col =>
+          // The partition columns created in `planForWritingFileFormat` should always be
+          // `UnresolvedAttribute` with a single name part.
+          assert(col.isInstanceOf[UnresolvedAttribute])
+          val unresolved = col.asInstanceOf[UnresolvedAttribute]
+          assert(unresolved.nameParts.length == 1)
+          val name = unresolved.nameParts.head
+          outputColumns.find(a => equality(a.name, name)).getOrElse {
+            throw new AnalysisException(
+              s"Unable to resolve $name given [${data.output.map(_.name).mkString(", ")}]")
+          }
+        }
+        val resolved = cmd.copy(partitionColumns = resolvedPartCols, outputColumns = outputColumns)
+        resolved.run(sparkSession, physicalPlan)
         // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring
-        copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
+        copy(userSpecifiedSchema = Some(outputColumns.toStructType.asNullable)).resolveRelation()
       case _ =>
         sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
     }
```
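The manual resolution above is needed because the command returned by `planForWritingFileFormat` is unresolved and the analyzer is skipped on this path. A toy, self-contained sketch of the same resolve-by-name idea follows; the helper and sample column names are invented, while `caseInsensitiveResolution` is Catalyst's standard case-insensitive resolver.

```scala
import org.apache.spark.sql.catalyst.analysis.{caseInsensitiveResolution, Resolver, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.types.IntegerType

// Resolve a single-part partition column name against the analyzed output attributes.
def resolvePartitionColumn(
    col: UnresolvedAttribute,
    outputColumns: Seq[Attribute],
    equality: Resolver): Attribute = {
  require(col.nameParts.length == 1, "partition columns are expected to be single-part names")
  val name = col.nameParts.head
  outputColumns.find(a => equality(a.name, name))
    .getOrElse(throw new IllegalArgumentException(s"Unable to resolve $name"))
}

// "P" matches the analyzed attribute "p" when the resolution is case-insensitive.
val analyzedOutput: Seq[Attribute] = Seq(AttributeReference("p", IntegerType)())
resolvePartitionColumn(UnresolvedAttribute("P"), analyzedOutput, caseInsensitiveResolution)
```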
```diff
@@ -20,10 +20,11 @@ package org.apache.spark.sql.hive.execution
 import scala.util.control.NonFatal
 
 import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
-import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.command.DataWritingCommand
 
 
 /**
```
```diff
@@ -36,15 +37,15 @@ import org.apache.spark.sql.execution.command.RunnableCommand
 case class CreateHiveTableAsSelectCommand(
     tableDesc: CatalogTable,
     query: LogicalPlan,
+    outputColumns: Seq[Attribute],
     mode: SaveMode)
-  extends RunnableCommand {
+  extends DataWritingCommand {
 
   private val tableIdentifier = tableDesc.identifier
 
-  override def innerChildren: Seq[LogicalPlan] = Seq(query)
-
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    if (sparkSession.sessionState.catalog.tableExists(tableIdentifier)) {
+  override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
+    if (catalog.tableExists(tableIdentifier)) {
       assert(mode != SaveMode.Overwrite,
         s"Expect the table $tableIdentifier has been dropped when the save mode is Overwrite")
 
```
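For context, here is a simplified paraphrase (not the actual Spark source) of the contract difference behind this switch: a `RunnableCommand` only receives the session and must plan any query it wants to run by itself, while a `DataWritingCommand` is handed the already-planned physical `child` along with the analyzed `outputColumns`.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

// Paraphrased shapes of the two command traits, reduced to the members that matter here.
trait RunnableCommandShape {
  def run(sparkSession: SparkSession): Seq[Row]
}

trait DataWritingCommandShape {
  def query: LogicalPlan            // analyzed input query, kept as an inner child
  def outputColumns: Seq[Attribute] // analyzed output, with the original name casing
  def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row]
}
```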
```diff
@@ -56,34 +57,36 @@ case class CreateHiveTableAsSelectCommand(
       return Seq.empty
     }
 
-      sparkSession.sessionState.executePlan(
-        InsertIntoTable(
-          UnresolvedRelation(tableIdentifier),
-          Map(),
-          query,
-          overwrite = false,
-          ifPartitionNotExists = false)).toRdd
+      InsertIntoHiveTable(
```
**Member:** @cloud-fan this change from

**Member:** Will it affect web UI SQL tab?

**Author (Contributor):** ah good catch! I don't think we can revert here, as we need to execute the physical plan given as a parameter. I think we should improve the hive table conversion optimizer rule, and handle CTAS as well.

**Member:** Ok. I see. Thanks.
```diff
+        tableDesc,
+        Map.empty,
+        query,
+        overwrite = false,
+        ifPartitionNotExists = false,
+        outputColumns = outputColumns).run(sparkSession, child)
     } else {
       // TODO ideally, we should get the output data ready first and then
       // add the relation into catalog, just in case of failure occurs while data
       // processing.
       assert(tableDesc.schema.isEmpty)
-      sparkSession.sessionState.catalog.createTable(
-        tableDesc.copy(schema = query.schema), ignoreIfExists = false)
+      catalog.createTable(tableDesc.copy(schema = query.schema), ignoreIfExists = false)
 
       try {
-        sparkSession.sessionState.executePlan(
-          InsertIntoTable(
-            UnresolvedRelation(tableIdentifier),
-            Map(),
-            query,
-            overwrite = true,
-            ifPartitionNotExists = false)).toRdd
+        // Read back the metadata of the table which was created just now.
+        val createdTableMeta = catalog.getTableMetadata(tableDesc.identifier)
+        // For CTAS, there is no static partition values to insert.
+        val partition = createdTableMeta.partitionColumnNames.map(_ -> None).toMap
+        InsertIntoHiveTable(
+          createdTableMeta,
+          partition,
+          query,
+          overwrite = true,
+          ifPartitionNotExists = false,
+          outputColumns = outputColumns).run(sparkSession, child)
       } catch {
         case NonFatal(e) =>
           // drop the created table.
-          sparkSession.sessionState.catalog.dropTable(tableIdentifier, ignoreIfNotExists = true,
-            purge = false)
+          catalog.dropTable(tableIdentifier, ignoreIfNotExists = true, purge = false)
           throw e
       }
     }
```
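A small illustration, with made-up column names, of the partition spec built by the `partitionColumnNames.map(_ -> None).toMap` line above: every partition column maps to `None`, i.e. a fully dynamic insert with no static partition values.

```scala
// Hypothetical partitioned table with two partition columns.
val partitionColumnNames = Seq("year", "month")

// Same expression as in the command above.
val partition: Map[String, Option[String]] = partitionColumnNames.map(_ -> None).toMap
// partition == Map("year" -> None, "month" -> None)
```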
```diff
@@ -128,32 +128,6 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleton
       "src")
   }
 
-  test("SPARK-17409: The EXPLAIN output of CTAS only shows the analyzed plan") {
```
**Author (Contributor):** This is kind of a "bad" test. The bug was that we optimized the CTAS input query twice, but here we are testing whether the EXPLAIN result of CTAS contains only the analyzed query, which is specific to how we fixed that bug at the time.
```diff
-    withTempView("jt") {
-      val ds = (1 to 10).map(i => s"""{"a":$i, "b":"str$i"}""").toDS()
-      spark.read.json(ds).createOrReplaceTempView("jt")
-      val outputs = sql(
-        s"""
-           |EXPLAIN EXTENDED
-           |CREATE TABLE t1
-           |AS
-           |SELECT * FROM jt
-         """.stripMargin).collect().map(_.mkString).mkString
-
-      val shouldContain =
-        "== Parsed Logical Plan ==" :: "== Analyzed Logical Plan ==" :: "Subquery" ::
-        "== Optimized Logical Plan ==" :: "== Physical Plan ==" ::
-        "CreateHiveTableAsSelect" :: "InsertIntoHiveTable" :: "jt" :: Nil
-      for (key <- shouldContain) {
-        assert(outputs.contains(key), s"$key doesn't exist in result")
-      }
-
-      val physicalIndex = outputs.indexOf("== Physical Plan ==")
-      assert(outputs.substring(physicalIndex).contains("Subquery"),
-        "Physical Plan should contain SubqueryAlias since the query should not be optimized")
-    }
-  }
-
   test("explain output of physical plan should contain proper codegen stage ID") {
     checkKeywordsExist(sql(
       """
```
Generally I think it's hacky to analyze/optimize/plan/execute a query during the execution of another query. Not only CTAS; other commands like `CreateView`, `CacheTable`, etc. also have this issue. This is a surgical fix for Spark 2.3, so I didn't change this part and leave it for 2.4.
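To make the "query inside a query" point concrete, here is a hedged sketch, not actual Spark source, contrasting the nested execution criticized above with the reuse of the given physical plan that this patch applies to CTAS. The method and object names are invented, and the code is assumed to live inside Spark's `org.apache.spark.sql.execution.command` package, where `sessionState` and `DataWritingCommand` are visible.

```scala
package org.apache.spark.sql.execution.command

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

object NestedExecutionSketch {
  // The hacky pattern: kick off a second, nested query execution from inside a command.
  // The nested plan is analyzed/optimized/planned from scratch, so its SQL metrics are not
  // attached to the command's own plan shown in the web UI SQL tab.
  def runNested(spark: SparkSession, insertPlan: LogicalPlan): Unit = {
    spark.sessionState.executePlan(insertPlan).toRdd
  }

  // What this patch does for CTAS: the command is a DataWritingCommand, so it is handed the
  // already-planned physical `child` and runs the write against it directly.
  def runWithChild(
      spark: SparkSession,
      write: DataWritingCommand,
      child: SparkPlan): Seq[Row] = {
    write.run(spark, child)
  }
}
```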