-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-19152][SQL]DataFrameWriter.saveAsTable support hive append #16552
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 17 commits
25b39fa
b463ac7
29e1ee2
429a0ab
21c5e3f
2bf67c7
6b8f625
1145e52
0b9dc3a
2f542ff
cb7a1be
7186938
b167b51
d3c81d4
6c09477
98ec55a
f34ab6d
722ad76
7bf5b50
59db8e4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1 +1,2 @@ | ||
| org.apache.spark.sql.hive.orc.OrcFileFormat | ||
| org.apache.spark.sql.hive.execution.HiveFileFormat | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is this auto-generated? |
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.execution | |
|
|
||
| import scala.util.control.NonFatal | ||
|
|
||
| import org.apache.spark.sql.{AnalysisException, Row, SparkSession} | ||
| import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession} | ||
| import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SimpleCatalogRelation} | ||
| import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, SubqueryAlias} | ||
| import org.apache.spark.sql.execution.command.RunnableCommand | ||
|
|
@@ -31,13 +31,12 @@ import org.apache.spark.sql.hive.MetastoreRelation | |
| * | ||
| * @param tableDesc the table description, which may contain serde, storage handler etc. | ||
| * @param query the query whose result will be inserted into the new relation | ||
| * @param ignoreIfExists allow continuing if the table already exists, otherwise | ||
| * raise an exception | ||
| * @param mode SaveMode | ||
| */ | ||
| case class CreateHiveTableAsSelectCommand( | ||
| tableDesc: CatalogTable, | ||
| query: LogicalPlan, | ||
| ignoreIfExists: Boolean) | ||
| mode: SaveMode) | ||
| extends RunnableCommand { | ||
|
|
||
| private val tableIdentifier = tableDesc.identifier | ||
|
|
@@ -67,7 +66,7 @@ case class CreateHiveTableAsSelectCommand( | |
| withFormat | ||
| } | ||
|
|
||
| sparkSession.sessionState.catalog.createTable(withSchema, ignoreIfExists = false) | ||
| sparkSession.sessionState.catalog.createTable(withSchema, ignoreIfExists = true) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. looks like we don't need to build |
||
|
|
||
| // Get the Metastore Relation | ||
| sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match { | ||
|
|
@@ -80,11 +79,18 @@ case class CreateHiveTableAsSelectCommand( | |
| // add the relation into catalog, just in case of failure occurs while data | ||
| // processing. | ||
| if (sparkSession.sessionState.catalog.tableExists(tableIdentifier)) { | ||
| if (ignoreIfExists) { | ||
| // table already exists, will do nothing, to keep consistent with Hive | ||
| } else { | ||
| assert(mode != SaveMode.Overwrite, | ||
| s"Expect the table $tableIdentifier has been dropped when the save mode is Overwrite") | ||
|
|
||
| if (mode == SaveMode.ErrorIfExists) { | ||
| throw new AnalysisException(s"$tableIdentifier already exists.") | ||
| } | ||
| if (mode == SaveMode.Ignore) { | ||
| // Since the table already exists and the save mode is Ignore, we will just return. | ||
| return Seq.empty | ||
| } | ||
| sparkSession.sessionState.executePlan(InsertIntoTable( | ||
| metastoreRelation, Map(), query, overwrite = false, ifNotExists = false)).toRdd | ||
| } else { | ||
| try { | ||
| sparkSession.sessionState.executePlan(InsertIntoTable( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1461,6 +1461,25 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { | |
| }) | ||
| } | ||
|
|
||
| test("run sql directly on files - hive") { | ||
| withTable("t") { | ||
|
||
| val df = spark.range(100).toDF("id") | ||
| df.write.format("hive").saveAsTable("t") | ||
| val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) | ||
|
|
||
| var e = intercept[AnalysisException] { | ||
| sql(s"select id from hive.`${table.location}`") | ||
| } | ||
| assert(e.message.contains("Unsupported data source type for direct query on files: hive")) | ||
|
|
||
| // data source type is case insensitive | ||
| e = intercept[AnalysisException] { | ||
| sql(s"select id from HIVE.`${table.location}`") | ||
| } | ||
| assert(e.message.contains("Unsupported data source type for direct query on files: HIVE")) | ||
| } | ||
| } | ||
|
|
||
| test("SPARK-8976 Wrong Result for Rollup #1") { | ||
| checkAnswer(sql( | ||
| "SELECT count(*) AS cnt, key % 5, grouping_id() FROM src GROUP BY key%5 WITH ROLLUP"), | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hive data source can only be used with tables; you cannot use it with CREATE TEMP VIEW USING.