Conversation

@opensky142857 commented May 26, 2021

### What changes were proposed in this pull request?

When converting to a HiveTable, respect the case of the table schema's column names.

### Why are the changes needed?

When a user creates a Hive bucketed table with an upper-case schema, the table schema is stored in lower case while the bucket column info keeps the user's original casing.

If we then try to insert into this table, a HiveException reports that the bucket column is not part of the table schema.

Here is a simple repro:

spark.sql("""
  CREATE TABLE TEST1(
    V1 BIGINT,
    S1 INT)
  PARTITIONED BY (PK BIGINT)
  CLUSTERED BY (V1)
  SORTED BY (S1)
  INTO 200 BUCKETS
  STORED AS PARQUET """).show

spark.sql("INSERT INTO TEST1 SELECT * FROM VALUES(1,1,1)").show

Error message:

```
scala> spark.sql("INSERT INTO TEST1 SELECT * FROM VALUES(1,1,1)").show
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: Bucket columns V1 is not part of the table columns ([FieldSchema(name:v1, type:bigint, comment:null), FieldSchema(name:s1, type:int, comment:null)]
  at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:112)
  at org.apache.spark.sql.hive.HiveExternalCatalog.listPartitions(HiveExternalCatalog.scala:1242)
  at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.listPartitions(ExternalCatalogWithListener.scala:254)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.listPartitions(SessionCatalog.scala:1166)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:103)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:120)
  at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:228)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:228)
  at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
  at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:615)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:610)
  ... 47 elided
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Bucket columns V1 is not part of the table columns ([FieldSchema(name:v1, type:bigint, comment:null), FieldSchema(name:s1, type:int, comment:null)]
  at org.apache.hadoop.hive.ql.metadata.Table.setBucketCols(Table.java:552)
  at org.apache.spark.sql.hive.client.HiveClientImpl$.toHiveTable(HiveClientImpl.scala:1082)
  at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$getPartitions$1(HiveClientImpl.scala:732)
  at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:291)
  at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:224)
  at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:223)
  at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:273)
  at org.apache.spark.sql.hive.client.HiveClientImpl.getPartitions(HiveClientImpl.scala:731)
  at org.apache.spark.sql.hive.client.HiveClient.getPartitions(HiveClient.scala:222)
  at org.apache.spark.sql.hive.client.HiveClient.getPartitions$(HiveClient.scala:218)
  at org.apache.spark.sql.hive.client.HiveClientImpl.getPartitions(HiveClientImpl.scala:91)
  at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$listPartitions$1(HiveExternalCatalog.scala:1245)
  at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:102)
  ... 69 more
```

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

UT

@opensky142857 opensky142857 changed the title [SPARK-35531]Can not insert into hive bucket table if create table wi… [SPARK-35531][SQL]Can not insert into hive bucket table if create table wi… May 26, 2021
@HyukjinKwon HyukjinKwon changed the title [SPARK-35531][SQL]Can not insert into hive bucket table if create table wi… [SPARK-35531][SQL] Can not insert into hive bucket table if create table with upper case schema May 26, 2021
@AmplabJenkins

Can one of the admins verify this patch?

@github-actions github-actions bot added the SQL label May 26, 2021
@wangyum (Member) commented May 28, 2021

@cloud-fan (Contributor):

Can you post the full stacktrace? I'm a bit curious about how/where the error happens.

@wangyum (Member) commented May 31, 2021

@cloud-fan I have added the stacktrace to the PR description.

Review thread on `HiveClientImpl.toHiveTable`:

```scala
table.bucketSpec match {
  case Some(bucketSpec) if !HiveExternalCatalog.isDatasourceTable(table) =>
    hiveTable.setNumBuckets(bucketSpec.numBuckets)
    hiveTable.setBucketCols(bucketSpec.bucketColumnNames.toList.asJava)
```
Contributor:

To clarify: hiveTable.setFields lower-cases the column names, but hiveTable.setBucketCols does not. And this causes the exception?

Author:

Yes.
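
To make the mismatch concrete, here is a minimal sketch against Hive's `org.apache.hadoop.hive.ql.metadata.Table` API (hedged: the lower-cased field names below reflect what the raw metastore table carries in this scenario, not Spark's exact call sequence):

```scala
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.ql.metadata.Table

val hiveTable = new Table("default", "test1")
// The raw metastore table carries lower-cased column names.
hiveTable.setFields(java.util.Arrays.asList(
  new FieldSchema("v1", "bigint", null),
  new FieldSchema("s1", "int", null)))
// The bucket spec still has the user's original casing, so this throws
// HiveException: Bucket columns V1 is not part of the table columns ...
hiveTable.setBucketCols(java.util.Arrays.asList("V1"))
```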

@cloud-fan (Contributor):

> the table schema is stored in lower case while the bucket column info keeps the user's original casing.

I'm not sure this is true. Table schema and bucketed columns are both stored in the Spark-specific table properties, which are case-preserving.

@opensky142857 (Author):

> > the table schema is stored in lower case while the bucket column info keeps the user's original casing.
>
> I'm not sure this is true. Table schema and bucketed columns are both stored in the Spark-specific table properties, which are case-preserving.

The schema here comes from the metastore when we call methods like `getRawTableOption`, and the Spark schema info in the table properties does not overwrite the schema field in those cases.
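
The PR's fix (see the review hunk just below) routes the bucket/sort column names through a helper that restores their casing from the table schema. A hedged sketch of what `restoreHiveBucketSpecColNames` might look like, assuming a case-insensitive lookup; the actual implementation in the diff may differ:

```scala
import org.apache.spark.sql.types.StructType

// Sketch only: map each bucket/sort column to the matching schema field's
// name, compared case-insensitively, falling back to the original name.
def restoreHiveBucketSpecColNames(
    schema: StructType,
    colNames: Seq[String]): Seq[String] = {
  colNames.map { col =>
    schema.fields
      .find(_.name.equalsIgnoreCase(col))
      .map(_.name)
      .getOrElse(col)
  }
}
```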

Review thread on `HiveClientImpl.toHiveTable`:

```diff
 if (bucketSpec.sortColumnNames.nonEmpty) {
   hiveTable.setSortCols(
-    bucketSpec.sortColumnNames
+    restoreHiveBucketSpecColNames(table.schema, bucketSpec.sortColumnNames)
```
Contributor:

Sorry, I still can't understand how the bug happens.

In this toHiveTable method, the input CatalogTable should guarantee sanity: the partition/bucket column names should match the schema.

Author:

```scala
// org/apache/spark/sql/hive/client/HiveClient.scala
final def getPartitions(
    db: String,
    table: String,
    partialSpec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = {
  getPartitions(getTable(db, table), partialSpec)
}
```

In this method we get the table from the metastore and pass it into getPartitions, which then calls toHiveTable to convert the resulting CatalogTable back into a HiveTable.

@cloud-fan (Contributor) commented Jun 11, 2021:

So there is an unnecessary HiveTable -> CatalogTable -> HiveTable conversion?

Author:

This conversion is necessary, since the Hive client interface requires a CatalogTable.

Contributor:

Can we change the Hive client interface? For example:

```scala
trait HiveClient {

  def getRawTableOption(dbName: String, tableName: String): Option[HiveTable]

  final def getTableOption(dbName: String, tableName: String): Option[CatalogTable] = {
    getRawTableOption(dbName, tableName).map(convertHiveTableToCatalogTable)
  }
  ...
  def getPartitionOption(
      table: HiveTable,
      spec: TablePartitionSpec): Option[CatalogTablePartition] = {
    getPartitions(getRawTable(db, table), partialSpec)
  }
  ...
}
```
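
The point of the sketch above is that partition listing could then stay on the raw Hive table end to end. A hedged usage sketch (both `getRawTable` and a raw-table `getPartitions` overload are assumptions extrapolated from the proposal, not existing Spark API):

```scala
// Hypothetical: stay on the raw HiveTable end to end, skipping the lossy
// raw -> CatalogTable -> raw round trip that lower-cases the schema.
def listPartitionsRaw(
    client: HiveClient,
    db: String,
    table: String,
    partialSpec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = {
  client.getPartitions(client.getRawTable(db, table), partialSpec)
}
```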

Author:

This is not the only place that has this issue. If we set spark.sql.statistics.size.autoUpdate.enabled=true, the same issue shows up as well: for ALTER TABLE we have to do a CatalogTable -> HiveTable conversion, as sketched below.
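
A minimal sketch of that second trigger, reusing the TEST1 table from the repro above (hedged: with size auto-update on, a successful write is followed by a stats update that performs the same CatalogTable -> HiveTable conversion):

```scala
// After a write, Spark updates the table's size statistics, which goes
// through HiveExternalCatalog.alterTableStats and re-converts the
// CatalogTable into a HiveTable, hitting the same bucket-column mismatch.
spark.conf.set("spark.sql.statistics.size.autoUpdate.enabled", "true")
spark.sql("INSERT INTO TEST1 SELECT * FROM VALUES(1, 1, 1)")
```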

Contributor:

CatalogTable -> HiveTable is fine, as long as the CatalogTable is correctly initialized. The problem I see here is that we get the CatalogTable via HiveClient.getTable, which doesn't go through the initialization logic in HiveExternalCatalog.

Contributor:

@AngersZhuuuu can you take this over?

Contributor:

> @AngersZhuuuu can you take this over?

Sure.

github-actions bot commented Oct 3, 2021

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Oct 3, 2021
@github-actions github-actions bot closed this Oct 4, 2021
cloud-fan pushed a commit that referenced this pull request Nov 22, 2022
### What changes were proposed in this pull request?

Update Hive table stats without converting HiveTable -> CatalogTable -> HiveTable.
`HiveExternalCatalog.alterTableStats()` converts the raw HiveTable to a CatalogTable, which stores the schema in lower case but keeps the bucket columns as they are.
`HiveClientImpl.alterTable()` then throws a `Bucket columns V1 is not part of the table columns` exception when re-converting that CatalogTable to a HiveTable.

### Why are the changes needed?

Bug fix, refer to #32675 (comment)

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Update existing UT

Closes #38495 from wankunde/write_stats_directly.

Authored-by: Kun Wan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
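
A hedged sketch of the idea behind that follow-up (illustrative only; the helper name below is hypothetical, not the actual #38495 API): write the new stats properties onto the raw `org.apache.hadoop.hive.ql.metadata.Table` and persist that object directly, so the schema and bucket columns never round-trip through a CatalogTable.

```scala
import org.apache.hadoop.hive.ql.metadata.Table

// Hypothetical helper: apply updated stats parameters to the raw Hive table.
// The caller persists `rawTable` via the Hive client's alterTable, so the
// lower-cased schema and original-case bucket columns are left untouched.
def applyStats(rawTable: Table, newStats: Map[String, String]): Table = {
  newStats.foreach { case (k, v) => rawTable.setProperty(k, v) }
  rawTable
}
```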
SandishKumarHN pushed a commit to SandishKumarHN/spark that referenced this pull request Dec 12, 2022
beliefer pushed a commit to beliefer/spark that referenced this pull request Dec 15, 2022
beliefer pushed a commit to beliefer/spark that referenced this pull request Dec 18, 2022