[SPARK-35531][SQL] Directly pass hive Table to HiveClient when call getPartitions to avoid unnecessary convert from HiveTable -> CatalogTable -> HiveTable #34218
Conversation
ping @cloud-fan

Kubernetes integration test starting
```
 val partColNameMap = buildLowerCasePartColNameMap(catalogTable)
 val metaStoreSpec = partialSpec.map(toMetaStorePartitionSpec)
-val res = client.getPartitions(db, table, metaStoreSpec)
+val res = client.getPartitions(catalogTable, metaStoreSpec)
```
@AngersZhuuuu what's the diff before/after? I couldn't follow.
Can refer to #32675 (comment)
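For readers following the thread, here is a minimal self-contained sketch of the shape of this change, with stand-in types and placeholder bodies rather than the real Spark signatures: the old overload looks the table up again by name and the client re-converts it, while the new call hands over the table object that `listPartitions` already holds.

```
// Minimal sketch with stand-in types; not the real HiveClient/HiveExternalCatalog code.
object CallPathSketch {
  case class CatalogTable(db: String, name: String)              // stand-in
  case class CatalogTablePartition(spec: Map[String, String])    // stand-in

  // Stand-in for the lookup that, in Spark, converts a metastore HiveTable
  // into a CatalogTable.
  def getTable(db: String, table: String): CatalogTable = CatalogTable(db, table)

  // Old overload: re-fetches the table by name; the client later converts the
  // CatalogTable back into a HiveTable (HiveTable -> CatalogTable -> HiveTable).
  def getPartitions(db: String, table: String,
                    partialSpec: Option[Map[String, String]]): Seq[CatalogTablePartition] =
    getPartitions(getTable(db, table), partialSpec)

  // New call used in the diff: the table object the caller already holds is
  // passed straight through, so the extra lookup and re-conversion disappear.
  def getPartitions(table: CatalogTable,
                    partialSpec: Option[Map[String, String]]): Seq[CatalogTablePartition] =
    Seq.empty // placeholder body

  def main(args: Array[String]): Unit = {
    val catalogTable = getTable("default", "test1")   // already available in listPartitions
    val metaStoreSpec: Option[Map[String, String]] = None
    // before: getPartitions("default", "test1", metaStoreSpec)
    // after:
    println(getPartitions(catalogTable, metaStoreSpec))
  }
}
```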
@AngersZhuuuu, let's also revise the PR title "Fix can not insert into hive bucket table if create table with upper case schema"

Test build #144007 has finished for PR 34218 at commit

Is this a regression, @AngersZhuuuu ? cc @gengliangwang since SPARK-35531 seems to be opened for 3.2.0.

Kubernetes integration test status failure

@dongjoon-hyun I can reproduce the issue on 3.0.0 and 3.1.1. It's a long-standing bug.

Thank you for checking, @gengliangwang !
Kubernetes integration test starting

Test build #144011 has finished for PR 34218 at commit

Kubernetes integration test status failure
How about the current version, @cloud-fan?
```
val dropString = "DROP TABLE IF EXISTS test1"
spark.sql(dropString)
```
doesn't sql(dropString) work?
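For context, the suggestion relies on the `sql` shortcut that Spark's SQL test suites expose, which forwards to the active session. A tiny hypothetical sketch of the simplified form, with a made-up suite and test name rather than the actual test added by this PR:

```
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.SharedSparkSession

// Hypothetical suite illustrating the suggestion; the real test in this PR
// lives elsewhere and may use different fixtures.
class DropTableSketchSuite extends QueryTest with SharedSparkSession {
  test("drop the helper table") {
    // Equivalent to: val dropString = "DROP TABLE IF EXISTS test1"; spark.sql(dropString)
    sql("DROP TABLE IF EXISTS test1")
  }
}
```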
Test build #144025 has finished for PR 34218 at commit

Kubernetes integration test starting

Kubernetes integration test starting

Test build #144044 has finished for PR 34218 at commit

Kubernetes integration test status failure

Kubernetes integration test starting

Test build #144046 has finished for PR 34218 at commit

Kubernetes integration test status failure
@AngersZhuuuu can you fix the PR title? Grammatically it doesn't make sense, and it doesn't really describe what the PR proposes. The PR fixes the Hive client's partition retrieval logic to respect case sensitivity.

How about the current title?
Kubernetes integration test starting

Kubernetes integration test status failure

Test build #144050 has finished for PR 34218 at commit

Kubernetes integration test starting

Kubernetes integration test status failure

Test build #144054 has finished for PR 34218 at commit

retest this please

Kubernetes integration test starting

Kubernetes integration test status failure

Test build #144060 has finished for PR 34218 at commit

Kubernetes integration test starting

Kubernetes integration test status failure

Test build #144067 has finished for PR 34218 at commit
thanks, merging to master!
…etPartitions to avoid unnecessary convert from HiveTable -> CatalogTable -> HiveTable
What changes were proposed in this pull request?
In the current `HiveExternalCatalog.listPartitions`, it uses
```
final def getPartitions(
db: String,
table: String,
partialSpec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = {
getPartitions(getTable(db, table), partialSpec)
}
```
It calls `getTable` to get a raw HiveTable and converts it to a CatalogTable, and then in `getPartitions` it converts it back to a HiveTable.
This causes a conflict, since the HiveTable schema is stored in lowercase, but the bucket columns and sort columns are not converted to lowercase.
In this PR, we directly pass the raw HiveTable to the HiveClient request, to avoid the unnecessary conversion and the potential conflict, and also to respect case sensitivity.
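To make the conflict concrete, here is a small self-contained sketch, using stand-in case classes rather than Spark's or Hive's real types: the schema round-trips in lowercase while the bucket columns keep the user's original casing, so a case-sensitive membership check like the one in Hive's `Table.setBucketCols` rejects them.

```
object BucketColCaseSketch {
  // Stand-in types for illustration; not Spark's or Hive's real classes.
  case class HiveTable(columns: Seq[String], bucketCols: Seq[String])
  case class CatalogTable(schema: Seq[String], bucketCols: Seq[String])

  // HiveTable -> CatalogTable: the schema stays lower-cased, while the bucket
  // columns keep whatever casing the user originally typed.
  def fromHiveTable(t: HiveTable): CatalogTable =
    CatalogTable(t.columns.map(_.toLowerCase), t.bucketCols)

  // CatalogTable -> HiveTable: mimics the case-sensitive check that Hive's
  // Table.setBucketCols performs before accepting the bucket columns.
  def toHiveTable(t: CatalogTable): HiveTable = {
    require(t.bucketCols.forall(t.schema.contains),
      s"Bucket columns ${t.bucketCols.mkString(", ")} is not part of the table columns ${t.schema}")
    HiveTable(t.schema, t.bucketCols)
  }

  def main(args: Array[String]): Unit = {
    // A bucket table created with an upper-case schema: the columns were
    // stored lower-cased, the bucket column kept its original casing.
    val raw = HiveTable(columns = Seq("v1", "s1"), bucketCols = Seq("V1"))

    // Old path: HiveTable -> CatalogTable -> HiveTable fails on the check,
    // like the HiveException in the error message below.
    try toHiveTable(fromHiveTable(raw))
    catch { case e: IllegalArgumentException => println(e.getMessage) }

    // New path: the already-fetched table is handed to the client directly,
    // so the second conversion (and its failing check) never runs.
    println(s"listing partitions directly against $raw")
  }
}
```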
Why are the changes needed?
When a user creates a Hive bucket table with an upper-case schema, the table schema is stored in lowercase while the bucket column info stays the same as the user input.
If we try to insert into such a table, a HiveException reports that the bucket column is not part of the table schema.
Here is a simple repro:
```
spark.sql("""
CREATE TABLE TEST1(
V1 BIGINT,
S1 INT)
PARTITIONED BY (PK BIGINT)
CLUSTERED BY (V1)
SORTED BY (S1)
INTO 200 BUCKETS
STORED AS PARQUET """).show
spark.sql("INSERT INTO TEST1 SELECT * FROM VALUES(1,1,1)").show
```
Error message:
```
scala> spark.sql("INSERT INTO TEST1 SELECT * FROM VALUES(1,1,1)").show
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: Bucket columns V1 is not part of the table columns ([FieldSchema(name:v1, type:bigint, comment:null), FieldSchema(name:s1, type:int, comment:null)]
at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:112)
at org.apache.spark.sql.hive.HiveExternalCatalog.listPartitions(HiveExternalCatalog.scala:1242)
at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.listPartitions(ExternalCatalogWithListener.scala:254)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.listPartitions(SessionCatalog.scala:1166)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:103)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:120)
at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:228)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:228)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:615)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:610)
... 47 elided
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Bucket columns V1 is not part of the table columns ([FieldSchema(name:v1, type:bigint, comment:null), FieldSchema(name:s1, type:int, comment:null)]
at org.apache.hadoop.hive.ql.metadata.Table.setBucketCols(Table.java:552)
at org.apache.spark.sql.hive.client.HiveClientImpl$.toHiveTable(HiveClientImpl.scala:1082)
at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$getPartitions$1(HiveClientImpl.scala:732)
at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:291)
at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:224)
at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:223)
at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:273)
at org.apache.spark.sql.hive.client.HiveClientImpl.getPartitions(HiveClientImpl.scala:731)
at org.apache.spark.sql.hive.client.HiveClient.getPartitions(HiveClient.scala:222)
at org.apache.spark.sql.hive.client.HiveClient.getPartitions$(HiveClient.scala:218)
at org.apache.spark.sql.hive.client.HiveClientImpl.getPartitions(HiveClientImpl.scala:91)
at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$listPartitions$1(HiveExternalCatalog.scala:1245)
at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:102)
... 69 more
```
Does this PR introduce any user-facing change?
No
How was this patch tested?
UT
Closes apache#34218 from AngersZhuuuu/SPARK-35531.
Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>