-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-19152][SQL] DataFrameWriter.saveAsTable supports hive append #16552
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 16 commits
25b39fa
b463ac7
29e1ee2
429a0ab
21c5e3f
2bf67c7
6b8f625
1145e52
0b9dc3a
2f542ff
cb7a1be
7186938
b167b51
d3c81d4
6c09477
98ec55a
f34ab6d
722ad76
7bf5b50
59db8e4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1 +1,2 @@ | ||
| org.apache.spark.sql.hive.orc.OrcFileFormat | ||
| org.apache.spark.sql.hive.execution.HiveFileFormat | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.execution | |
|
|
||
| import scala.util.control.NonFatal | ||
|
|
||
| import org.apache.spark.sql.{AnalysisException, Row, SparkSession} | ||
| import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession} | ||
| import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SimpleCatalogRelation} | ||
| import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, SubqueryAlias} | ||
| import org.apache.spark.sql.execution.command.RunnableCommand | ||
|
|
@@ -31,13 +31,12 @@ import org.apache.spark.sql.hive.MetastoreRelation | |
| * | ||
| * @param tableDesc the table description, which may contain serde, storage handler etc. | ||
| * @param query the query whose result will be inserted into the new relation | ||
| * @param ignoreIfExists allow continuing if it already exists, otherwise | ||
| * raise exception | ||
| * @param mode SaveMode | ||
| */ | ||
| case class CreateHiveTableAsSelectCommand( | ||
| tableDesc: CatalogTable, | ||
| query: LogicalPlan, | ||
| ignoreIfExists: Boolean) | ||
| mode: SaveMode) | ||
| extends RunnableCommand { | ||
|
|
||
| private val tableIdentifier = tableDesc.identifier | ||
|
|
@@ -67,7 +66,7 @@ case class CreateHiveTableAsSelectCommand( | |
| withFormat | ||
| } | ||
|
|
||
| sparkSession.sessionState.catalog.createTable(withSchema, ignoreIfExists = false) | ||
| sparkSession.sessionState.catalog.createTable(withSchema, ignoreIfExists = true) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. looks like we don't need to build |
||
|
|
||
| // Get the Metastore Relation | ||
| sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match { | ||
|
|
@@ -80,11 +79,18 @@ case class CreateHiveTableAsSelectCommand( | |
| // add the relation into catalog, just in case of failure occurs while data | ||
| // processing. | ||
| if (sparkSession.sessionState.catalog.tableExists(tableIdentifier)) { | ||
| if (ignoreIfExists) { | ||
| // table already exists, will do nothing, to keep consistent with Hive | ||
| } else { | ||
| assert(mode != SaveMode.Overwrite, | ||
| s"Expect the table $tableIdentifier has been dropped when the save mode is Overwrite") | ||
|
|
||
| if (mode == SaveMode.ErrorIfExists) { | ||
| throw new AnalysisException(s"$tableIdentifier already exists.") | ||
| } | ||
| if (mode == SaveMode.Ignore) { | ||
| // Since the table already exists and the save mode is Ignore, we will just return. | ||
| return Seq.empty | ||
| } | ||
| sparkSession.sessionState.executePlan(InsertIntoTable( | ||
| metastoreRelation, Map(), query, overwrite = false, ifNotExists = false)).toRdd | ||
| } else { | ||
| try { | ||
| sparkSession.sessionState.executePlan(InsertIntoTable( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -419,12 +419,6 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv | |
| sql(s"CREATE TABLE $tableName STORED AS SEQUENCEFILE AS SELECT 1 AS key, 'abc' AS value") | ||
|
|
||
| val df = sql(s"SELECT key, value FROM $tableName") | ||
| val e = intercept[AnalysisException] { | ||
| df.write.mode(SaveMode.Append).saveAsTable(tableName) | ||
| }.getMessage | ||
| assert(e.contains("Saving data in the Hive serde table default.tab1 is not supported " + | ||
| "yet. Please use the insertInto() API as an alternative.")) | ||
|
|
||
| df.write.insertInto(tableName) | ||
| checkAnswer( | ||
| sql(s"SELECT * FROM $tableName"), | ||
|
|
@@ -1167,16 +1161,15 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv | |
|
|
||
| test("create a temp view using hive") { | ||
| val tableName = "tab1" | ||
| withTable (tableName) { | ||
| val e = intercept[ClassNotFoundException] { | ||
| sql( | ||
| s""" | ||
| |CREATE TEMPORARY VIEW $tableName | ||
| |(col1 int) | ||
| |USING hive | ||
| """.stripMargin) | ||
| }.getMessage | ||
| assert(e.contains("Failed to find data source: hive")) | ||
| withTable(tableName) { | ||
| sql( | ||
| s""" | ||
| |CREATE TEMPORARY VIEW $tableName | ||
| |(col1 int) | ||
| |USING hive | ||
| """.stripMargin) | ||
| val tempView = spark.sessionState.catalog.getTempView(tableName) | ||
| assert(tempView.isDefined, "create a temp view using hive should success") | ||
|
||
| } | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this auto-generated?