@@ -1092,14 +1092,23 @@ private[hive] object HiveClientImpl extends Logging {
hiveTable.setViewExpandedText(t)
}

// Hive may convert the schema column names to lower case, while bucketSpec keeps the original case;
// only restore a name when its case does not match the schema.
def restoreHiveBucketSpecColNames(schema: StructType, names: Seq[String]): Seq[String] = {
names.map { name =>
schema.find(col => SQLConf.get.resolver(col.name, name)).map(_.name).getOrElse(name)
}
}

table.bucketSpec match {
case Some(bucketSpec) if !HiveExternalCatalog.isDatasourceTable(table) =>
hiveTable.setNumBuckets(bucketSpec.numBuckets)
hiveTable.setBucketCols(bucketSpec.bucketColumnNames.toList.asJava)
Contributor
To clarify: hiveTable.setFields lower-cases the column names, but hiveTable.setBucketCols does not. And this causes the exception?

Author
yes
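
To make the mismatch concrete, a minimal illustrative sketch using the Hive Table API named above (the database, table, and column names are made up; this only shows the inconsistent state, it is not part of the patch):

```scala
import java.util.Arrays.asList

import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.ql.metadata.Table

// Hypothetical example of the inconsistent state described in this thread:
// the field names on the Hive table end up lower-cased, while the bucket
// column keeps its original upper-case spelling, so Hive later fails to
// match "V1" against "v1".
val hiveTable = new Table("default", "test1")
hiveTable.setFields(asList(new FieldSchema("v1", "bigint", null)))
hiveTable.setBucketCols(asList("V1"))
```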

hiveTable.setBucketCols(
restoreHiveBucketSpecColNames(table.schema, bucketSpec.bucketColumnNames).toList.asJava)

if (bucketSpec.sortColumnNames.nonEmpty) {
hiveTable.setSortCols(
bucketSpec.sortColumnNames
restoreHiveBucketSpecColNames(table.schema, bucketSpec.sortColumnNames)
Contributor
Sorry I still can't understand how the bug happens.

In this toHiveTable method, the input CatalogTable should guarantee sanity: the partition/bucket column names should match the schema.

Author
org/apache/spark/sql/hive/client/HiveClient.scala
final def getPartitions(
    db: String,
    table: String,
    partialSpec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = {
  getPartitions(getTable(db, table), partialSpec)
}

In this method, we get the table from the metastore and pass it into getPartitions,
which then calls toHiveTable to convert the CatalogTable into a HiveTable.

Contributor
@cloud-fan Jun 11, 2021
So there is an unnecessary HiveTable -> CatalogTable -> HiveTable conversion?

Author
This conversion is not unnecessary, since the Hive client interface requires a CatalogTable.

Contributor
Can we change the hive client interface? For example

trait HiveClient {

  def getRawTableOption(dbName: String, tableName: String): Option[HiveTable]
  final def getTableOption(dbName: String, tableName: String): Option[CatalogTable] = {
    getRawTableOption(dbName, tableName).map(convertHiveTableToCatalogTable)
  }
  ...
  def getPartitionOption(
      table: HiveTable,
      spec: TablePartitionSpec): Option[CatalogTablePartition] = {
    getPartitions(getRawTable(db, table), partialSpec)
  }
  ...
}

Author
This is not the only place that has this issue. If we set spark.sql.statistics.size.autoUpdate.enabled=true, you can see the issue as well: for ALTER TABLE we have to do a CatalogTable -> HiveTable conversion.
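
A hedged repro sketch of that second path (the statements below are only an example, not taken from this PR); it assumes a Hive-enabled SparkSession bound to `spark`:

```scala
// Illustrative sketch only: with size auto-update enabled, the INSERT is
// followed by an alter-table call that refreshes the table size statistics,
// which again converts the CatalogTable fetched from the metastore into a
// HiveTable.
spark.sql("SET spark.sql.statistics.size.autoUpdate.enabled=true")
spark.sql(
  """CREATE TABLE test1 (V1 BIGINT, S1 INT)
    |CLUSTERED BY (V1) SORTED BY (S1) INTO 200 BUCKETS
    |STORED AS PARQUET""".stripMargin)
spark.sql("INSERT INTO test1 VALUES (1, 1)")
```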

Contributor
CatalogTable -> HiveTable is fine, as long as the CatalogTable is correctly initialized. The problem I see here is that we get the CatalogTable via HiveClient.getTable, which doesn't go through the initialization logic in HiveExternalCatalog.

Contributor
@AngersZhuuuu can you take this over?

Contributor
> @AngersZhuuuu can you take this over?

Sure.

.map(col => new Order(col, HIVE_COLUMN_ORDER_ASC))
.toList
.asJava
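
For reference, a small standalone sketch (not part of the patch) of how the restoreHiveBucketSpecColNames helper above behaves under the default case-insensitive resolver; the schema and column names are made up for illustration:

```scala
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType}

// Same logic as the helper in the patch: map each bucket/sort column name back
// to the exact spelling used in the schema; names with no match are kept as-is.
def restoreHiveBucketSpecColNames(schema: StructType, names: Seq[String]): Seq[String] = {
  names.map { name =>
    schema.find(col => SQLConf.get.resolver(col.name, name)).map(_.name).getOrElse(name)
  }
}

val schema = StructType(Seq(StructField("v1", LongType), StructField("s1", IntegerType)))

// With spark.sql.caseSensitive=false (the default), the resolver ignores case.
assert(restoreHiveBucketSpecColNames(schema, Seq("V1", "S1")) == Seq("v1", "s1"))
assert(restoreHiveBucketSpecColNames(schema, Seq("missing")) == Seq("missing"))
```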
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hive

import java.io.File
import java.util.Locale

import com.google.common.io.Files
import org.apache.hadoop.fs.Path
@@ -870,4 +871,68 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
assert(e.contains("Partition spec is invalid"))
}
}

test("SPARK-35531 Insert data with different cases") {
withTable("TEST1") {
val createHive =
"""
|create table TEST1(
|v1 BIGINT,
|s1 INT)
|partitioned by (pk BIGINT)
|clustered by (v1)
|sorted by (s1)
|into 200 buckets
|STORED AS PARQUET
|""".stripMargin

val insertString =
"""
|insert into test1
|select
|* from values(1,1,1)
|""".stripMargin

val dropString = "drop table if exists test1"


spark.sql(dropString)
spark.sql(createHive.toLowerCase(Locale.ROOT))

spark.sql(insertString.toLowerCase(Locale.ROOT))
spark.sql(insertString.toUpperCase(Locale.ROOT))

spark.sql(dropString)
spark.sql(createHive.toUpperCase(Locale.ROOT))

spark.sql(insertString.toLowerCase(Locale.ROOT))
spark.sql(insertString.toUpperCase(Locale.ROOT))

val createSpark =
"""
|create table TEST1(
|v1 BIGINT,
|s1 INT)
|using parquet
|partitioned by (pk BIGINT)
|clustered by (v1)
|sorted by (s1)
|into 200 buckets
|
|""".stripMargin

spark.sql(dropString)
spark.sql(createSpark.toLowerCase(Locale.ROOT))

spark.sql(insertString.toLowerCase(Locale.ROOT))
spark.sql(insertString.toUpperCase(Locale.ROOT))


spark.sql(dropString)
spark.sql(createSpark.toUpperCase(Locale.ROOT))

spark.sql(insertString.toLowerCase(Locale.ROOT))
spark.sql(insertString.toUpperCase(Locale.ROOT))
}
}
}