Skip to content

Conversation

@AngersZhuuuu
Copy link
Contributor

@AngersZhuuuu AngersZhuuuu commented Feb 10, 2022

What changes were proposed in this pull request?

In current HiveexternalCatalog.listpartitions, it use

  final def getPartitions(
      db: String,
      table: String,
      partialSpec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = {
    getPartitions(getTable(db, table), partialSpec)
  }

It call geTables to get a raw HiveTable then convert it to a CatalogTable, in getPartitions it re-convert it to a HiveTable.
This cause a conflicts since in HiveTable we store schema as lowercase but for bucket cols and sort cols it didn't convert it to lowercase.

In this pr, we directly pass raw HiveTable to HiveClient's request to avoid unnecessary convert and potential conflicts, also respect case sensitivity.

Why are the changes needed?

When user create a hive bucket table with upper case schema, the table schema will be stored as lower cases while bucket column info will stay the same with user input.

if we try to insert into this table, an HiveException reports bucket column is not in table schema.

here is a simple repro

spark.sql("""
  CREATE TABLE TEST1(
    V1 BIGINT,
    S1 INT)
  PARTITIONED BY (PK BIGINT)
  CLUSTERED BY (V1)
  SORTED BY (S1)
  INTO 200 BUCKETS
  STORED AS PARQUET """).show

spark.sql("INSERT INTO TEST1 SELECT * FROM VALUES(1,1,1)").show

Error message:

scala> spark.sql("INSERT INTO TEST1 SELECT * FROM VALUES(1,1,1)").show
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: Bucket columns V1 is not part of the table columns ([FieldSchema(name:v1, type:bigint, comment:null), FieldSchema(name:s1, type:int, comment:null)]
  at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:112)
  at org.apache.spark.sql.hive.HiveExternalCatalog.listPartitions(HiveExternalCatalog.scala:1242)
  at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.listPartitions(ExternalCatalogWithListener.scala:254)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.listPartitions(SessionCatalog.scala:1166)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:103)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:120)
  at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:228)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:228)
  at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
  at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:615)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:610)
  ... 47 elided
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Bucket columns V1 is not part of the table columns ([FieldSchema(name:v1, type:bigint, comment:null), FieldSchema(name:s1, type:int, comment:null)]
  at org.apache.hadoop.hive.ql.metadata.Table.setBucketCols(Table.java:552)
  at org.apache.spark.sql.hive.client.HiveClientImpl$.toHiveTable(HiveClientImpl.scala:1082)
  at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$getPartitions$1(HiveClientImpl.scala:732)
  at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:291)
  at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:224)
  at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:223)
  at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:273)
  at org.apache.spark.sql.hive.client.HiveClientImpl.getPartitions(HiveClientImpl.scala:731)
  at org.apache.spark.sql.hive.client.HiveClient.getPartitions(HiveClient.scala:222)
  at org.apache.spark.sql.hive.client.HiveClient.getPartitions$(HiveClient.scala:218)
  at org.apache.spark.sql.hive.client.HiveClientImpl.getPartitions(HiveClientImpl.scala:91)
  at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$listPartitions$1(HiveExternalCatalog.scala:1245)
  at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:102)
  ... 69 more

Does this PR introduce any user-facing change?

No

How was this patch tested?

UT

…etPartitions to avoid unnecessary convert from HiveTable -> CatalogTable -> HiveTable

In current `HiveexternalCatalog.listpartitions`, it use
```
  final def getPartitions(
      db: String,
      table: String,
      partialSpec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = {
    getPartitions(getTable(db, table), partialSpec)
  }
```
It call `geTables` to get a raw HiveTable then convert it to a CatalogTable, in `getPartitions` it re-convert it to a HiveTable.
This cause a conflicts since in HiveTable we store schema as lowercase but for bucket cols and sort cols it didn't convert it to lowercase.

In this pr, we directly pass raw HiveTable to HiveClient's request to avoid unnecessary convert and potential conflicts, also  respect case sensitivity.

When user create a hive bucket table with upper case schema, the table schema will be stored as lower cases while bucket column info will stay the same with user input.

if we try to insert into this table, an HiveException reports bucket column is not in table schema.

here is a simple repro
```
spark.sql("""
  CREATE TABLE TEST1(
    V1 BIGINT,
    S1 INT)
  PARTITIONED BY (PK BIGINT)
  CLUSTERED BY (V1)
  SORTED BY (S1)
  INTO 200 BUCKETS
  STORED AS PARQUET """).show

spark.sql("INSERT INTO TEST1 SELECT * FROM VALUES(1,1,1)").show
```
Error message:
```
scala> spark.sql("INSERT INTO TEST1 SELECT * FROM VALUES(1,1,1)").show
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: Bucket columns V1 is not part of the table columns ([FieldSchema(name:v1, type:bigint, comment:null), FieldSchema(name:s1, type:int, comment:null)]
  at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:112)
  at org.apache.spark.sql.hive.HiveExternalCatalog.listPartitions(HiveExternalCatalog.scala:1242)
  at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.listPartitions(ExternalCatalogWithListener.scala:254)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.listPartitions(SessionCatalog.scala:1166)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:103)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:120)
  at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:228)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:228)
  at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
  at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:615)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:610)
  ... 47 elided
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Bucket columns V1 is not part of the table columns ([FieldSchema(name:v1, type:bigint, comment:null), FieldSchema(name:s1, type:int, comment:null)]
  at org.apache.hadoop.hive.ql.metadata.Table.setBucketCols(Table.java:552)
  at org.apache.spark.sql.hive.client.HiveClientImpl$.toHiveTable(HiveClientImpl.scala:1082)
  at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$getPartitions$1(HiveClientImpl.scala:732)
  at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:291)
  at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:224)
  at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:223)
  at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:273)
  at org.apache.spark.sql.hive.client.HiveClientImpl.getPartitions(HiveClientImpl.scala:731)
  at org.apache.spark.sql.hive.client.HiveClient.getPartitions(HiveClient.scala:222)
  at org.apache.spark.sql.hive.client.HiveClient.getPartitions$(HiveClient.scala:218)
  at org.apache.spark.sql.hive.client.HiveClientImpl.getPartitions(HiveClientImpl.scala:91)
  at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$listPartitions$1(HiveExternalCatalog.scala:1245)
  at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:102)
  ... 69 more
```

No

UT

Closes apache#34218 from AngersZhuuuu/SPARK-35531.

Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
@github-actions github-actions bot added the SQL label Feb 10, 2022
@AngersZhuuuu AngersZhuuuu changed the title [SPARK-35531][SQL] Directly pass hive Table to HiveClient when call g… [SPARK-35531][SQL][3.1] Directly pass hive Table to HiveClient when call g… Feb 10, 2022
@AngersZhuuuu
Copy link
Contributor Author

ping @cloud-fan

@AngersZhuuuu AngersZhuuuu changed the title [SPARK-35531][SQL][3.1] Directly pass hive Table to HiveClient when call g… [SPARK-35531][SQL][3.1] Directly pass hive Table to HiveClient when call getPartitions to avoid unnecessary convert from HiveTable -> CatalogTable -> HiveTable Feb 10, 2022
@cloud-fan
Copy link
Contributor

thanks, merging to 3.1!

cloud-fan pushed a commit that referenced this pull request Feb 10, 2022
…all getPartitions to avoid unnecessary convert from HiveTable -> CatalogTable -> HiveTable

### What changes were proposed in this pull request?
In current `HiveexternalCatalog.listpartitions`, it use
```
  final def getPartitions(
      db: String,
      table: String,
      partialSpec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = {
    getPartitions(getTable(db, table), partialSpec)
  }
```
It call `geTables` to get a raw HiveTable then convert it to a CatalogTable, in `getPartitions` it re-convert it to a HiveTable.
This cause a conflicts since in HiveTable we store schema as lowercase but for bucket cols and sort cols it didn't convert it to lowercase.

In this pr, we directly pass raw HiveTable to HiveClient's request to avoid unnecessary convert and potential conflicts, also  respect case sensitivity.

### Why are the changes needed?
When user create a hive bucket table with upper case schema, the table schema will be stored as lower cases while bucket column info will stay the same with user input.

if we try to insert into this table, an HiveException reports bucket column is not in table schema.

here is a simple repro
```
spark.sql("""
  CREATE TABLE TEST1(
    V1 BIGINT,
    S1 INT)
  PARTITIONED BY (PK BIGINT)
  CLUSTERED BY (V1)
  SORTED BY (S1)
  INTO 200 BUCKETS
  STORED AS PARQUET """).show

spark.sql("INSERT INTO TEST1 SELECT * FROM VALUES(1,1,1)").show
```
Error message:
```
scala> spark.sql("INSERT INTO TEST1 SELECT * FROM VALUES(1,1,1)").show
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: Bucket columns V1 is not part of the table columns ([FieldSchema(name:v1, type:bigint, comment:null), FieldSchema(name:s1, type:int, comment:null)]
  at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:112)
  at org.apache.spark.sql.hive.HiveExternalCatalog.listPartitions(HiveExternalCatalog.scala:1242)
  at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.listPartitions(ExternalCatalogWithListener.scala:254)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.listPartitions(SessionCatalog.scala:1166)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:103)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:120)
  at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:228)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:228)
  at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
  at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:615)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:610)
  ... 47 elided
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Bucket columns V1 is not part of the table columns ([FieldSchema(name:v1, type:bigint, comment:null), FieldSchema(name:s1, type:int, comment:null)]
  at org.apache.hadoop.hive.ql.metadata.Table.setBucketCols(Table.java:552)
  at org.apache.spark.sql.hive.client.HiveClientImpl$.toHiveTable(HiveClientImpl.scala:1082)
  at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$getPartitions$1(HiveClientImpl.scala:732)
  at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:291)
  at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:224)
  at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:223)
  at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:273)
  at org.apache.spark.sql.hive.client.HiveClientImpl.getPartitions(HiveClientImpl.scala:731)
  at org.apache.spark.sql.hive.client.HiveClient.getPartitions(HiveClient.scala:222)
  at org.apache.spark.sql.hive.client.HiveClient.getPartitions$(HiveClient.scala:218)
  at org.apache.spark.sql.hive.client.HiveClientImpl.getPartitions(HiveClientImpl.scala:91)
  at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$listPartitions$1(HiveExternalCatalog.scala:1245)
  at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:102)
  ... 69 more
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UT

Closes #35475 from AngersZhuuuu/SPARK-35531-3.1.

Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
@cloud-fan cloud-fan closed this Feb 10, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants