@@ -112,20 +112,23 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
throw new AnalysisException("Saving data into a view is not allowed.")
}

if (DDLUtils.isHiveTable(existingTable)) {
throw new AnalysisException(s"Saving data in the Hive serde table $tableName is " +
"not supported yet. Please use the insertInto() API as an alternative.")
val (isProviderMatch, existingProvider, specifiedProvider) =
DDLUtils.isHiveTable(existingTable) match {
case false =>
val existing = DataSource.lookupDataSource(existingTable.provider.get)
val specified = DataSource.lookupDataSource(tableDesc.provider.get)
(existing == specified, existing.getSimpleName, specified.getSimpleName)
Contributor: why remove this line?

+          case true =>
Contributor: We have HiveFileFormat, and we can make it implement DataSourceRegister, then DataSource.lookupDataSource("hive") can work.
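A rough sketch of that suggestion (not code from this PR; the class name `HiveRegister` is illustrative):

```scala
import org.apache.spark.sql.sources.DataSourceRegister

// If a hive source registers the short name "hive" (and is listed in
// META-INF/services/org.apache.spark.sql.sources.DataSourceRegister),
// DataSource.lookupDataSource("hive") can resolve it by that name.
class HiveRegister extends DataSourceRegister {
  override def shortName(): String = "hive"
}
```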

+            val existing = existingTable.provider.get
+            val specified = tableDesc.provider.get
+            (existing == specified, existing, specified)
+        }

-      // Check if the specified data source match the data source of the existing table.
-      val existingProvider = DataSource.lookupDataSource(existingTable.provider.get)
-      val specifiedProvider = DataSource.lookupDataSource(tableDesc.provider.get)
       // TODO: Check that options from the resolved relation match the relation that we are
       // inserting into (i.e. using the same compression).
-      if (existingProvider != specifiedProvider) {
+      if (!isProviderMatch) {
         throw new AnalysisException(s"The format of the existing table $tableName is " +
-          s"`${existingProvider.getSimpleName}`. It doesn't match the specified format " +
-          s"`${specifiedProvider.getSimpleName}`.")
+          s"`$existingProvider`. It doesn't match the specified format `$specifiedProvider`.")
       }
 
       if (analyzedQuery.schema.length != existingTable.schema.length) {
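For reference, a hedged sketch of the user-facing behavior this hunk implements (run in a spark-shell session; the table name `src` and the exact message rendering are illustrative):

```scala
// Appending with the same provider as the existing table is allowed.
spark.range(3).write.format("parquet").saveAsTable("src")
spark.range(3).write.format("parquet").mode("append").saveAsTable("src")

// Appending with a different provider fails the isProviderMatch check above with
// an AnalysisException along the lines of: The format of the existing table ... is
// `ParquetFileFormat`. It doesn't match the specified format `JsonFileFormat`.
spark.range(3).write.format("json").mode("append").saveAsTable("src")
```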

@@ -93,18 +93,11 @@ class HiveAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
       InsertIntoHiveTable(table, partSpec, query, overwrite, ifNotExists)
 
     case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
-      // Currently `DataFrameWriter.saveAsTable` doesn't support the Append mode of hive serde
-      // tables yet.
-      if (mode == SaveMode.Append) {
-        throw new AnalysisException(
-          "CTAS for hive serde tables does not support append semantics.")
-      }
-
       val dbName = tableDesc.identifier.database.getOrElse(session.catalog.currentDatabase)
       CreateHiveTableAsSelectCommand(
         tableDesc.copy(identifier = tableDesc.identifier.copy(database = Some(dbName))),
         query,
-        mode == SaveMode.Ignore)
+        mode)
   }
 
   /**
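In user terms, this lifts the earlier Append restriction on hive-serde CTAS. A hedged sketch (it mirrors the new HiveDDLSuite test below; assumes `import spark.implicits._`; the table name `ctas_t` is illustrative):

```scala
// First call creates the hive serde table via CTAS; second call appends to it,
// which previously threw "CTAS for hive serde tables does not support append semantics."
Seq(1 -> "a").toDF("i", "j").write.format("hive").saveAsTable("ctas_t")
Seq(2 -> "b").toDF("i", "j").write.format("hive").mode("append").saveAsTable("ctas_t")
```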

@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.execution

 import scala.util.control.NonFatal
 
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SimpleCatalogRelation}
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.execution.command.RunnableCommand
@@ -31,13 +31,12 @@ import org.apache.spark.sql.hive.MetastoreRelation
  *
  * @param tableDesc the Table Describe, which may contains serde, storage handler etc.
  * @param query the query whose result will be insert into the new relation
- * @param ignoreIfExists allow continue working if it's already exists, otherwise
- *                       raise exception
+ * @param mode SaveMode
  */
 case class CreateHiveTableAsSelectCommand(
     tableDesc: CatalogTable,
     query: LogicalPlan,
-    ignoreIfExists: Boolean)
+    mode: SaveMode)
   extends RunnableCommand {
 
   private val tableIdentifier = tableDesc.identifier

@@ -69,7 +68,7 @@ case class CreateHiveTableAsSelectCommand(
       withFormat
     }
 
-    sparkSession.sessionState.catalog.createTable(withSchema, ignoreIfExists = false)
+    sparkSession.sessionState.catalog.createTable(withSchema, ignoreIfExists = true)
Contributor: looks like we don't need to build withSchema anymore, the schema will be set in AnalyzeCreateTable

     // Get the Metastore Relation
     sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
@@ -82,11 +81,18 @@
     // add the relation into catalog, just in case of failure occurs while data
     // processing.
     if (sparkSession.sessionState.catalog.tableExists(tableIdentifier)) {
-      if (ignoreIfExists) {
-        // table already exists, will do nothing, to keep consistent with Hive
-      } else {
+      assert(mode != SaveMode.Overwrite,
+        s"Expect the table $tableIdentifier has been dropped when the save mode is Overwrite")
+
+      if (mode == SaveMode.ErrorIfExists) {
         throw new AnalysisException(s"$tableIdentifier already exists.")
       }
+      if (mode == SaveMode.Ignore) {
+        // Since the table already exists and the save mode is Ignore, we will just return.
+        return Seq.empty
+      }
+      sparkSession.sessionState.executePlan(InsertIntoTable(
+        metastoreRelation, Map(), query, overwrite = false, ifNotExists = false)).toRdd
     } else {
       try {
         sparkSession.sessionState.executePlan(InsertIntoTable(
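The mode dispatch for a pre-existing target table, restated as a standalone sketch (not the PR's code; `insert` stands in for the InsertIntoTable plan above):

```scala
import org.apache.spark.sql.SaveMode

def onExistingTable(mode: SaveMode)(insert: () => Unit): Unit = mode match {
  case SaveMode.Overwrite =>
    sys.error("unreachable: an Overwrite target was dropped before this point")
  case SaveMode.ErrorIfExists =>
    throw new IllegalStateException("table already exists") // AnalysisException in the real code
  case SaveMode.Ignore => () // keep the existing data, skip the write
  case SaveMode.Append => insert() // append via InsertIntoTable with overwrite = false
}
```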

@@ -419,12 +419,6 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
sql(s"CREATE TABLE $tableName STORED AS SEQUENCEFILE AS SELECT 1 AS key, 'abc' AS value")

val df = sql(s"SELECT key, value FROM $tableName")
val e = intercept[AnalysisException] {
df.write.mode(SaveMode.Append).saveAsTable(tableName)
}.getMessage
assert(e.contains("Saving data in the Hive serde table default.tab1 is not supported " +
"yet. Please use the insertInto() API as an alternative."))

df.write.insertInto(tableName)
checkAnswer(
sql(s"SELECT * FROM $tableName"),

@@ -1353,6 +1353,15 @@ class HiveDDLSuite
sql("INSERT INTO t SELECT 2, 'b'")
checkAnswer(spark.table("t"), Row(9, "x") :: Row(2, "b") :: Nil)

Seq(10 -> "y").toDF("i", "j")
Contributor: please add a new test, to append to a hive table, also test append to a data source table with hive provider and check the error message
.write.format("hive").mode("append").saveAsTable("t")
checkAnswer(spark.table("t"), Row(9, "x") :: Row(2, "b") :: Row(10, "y") :: Nil)

Seq("y" -> 10).toDF("i", "j")
.write.format("hive").mode("append").saveAsTable("t")
checkAnswer(spark.table("t"), Row(9, "x") :: Row(2, "b")
:: Row(10, "y") :: Row(null, "10") :: Nil)

val e = intercept[AnalysisException] {
Seq(1 -> "a").toDF("i", "j").write.format("hive").partitionBy("i").saveAsTable("t2")
}
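Note on the second append above: the incoming frame's column types are (String, Int) while the table has schema (Int, String), so the values are coerced to the table schema by position. "y" cannot be cast to int and becomes null, while 10 is cast to the string "10", hence the expected Row(null, "10").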