HiveClient.scala (trait `HiveClient`, package `org.apache.spark.sql.hive.client`):

```diff
@@ -19,6 +19,8 @@ package org.apache.spark.sql.hive.client
 
 import java.io.PrintStream
 
+import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable}
+
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -215,19 +217,17 @@ private[hive] trait HiveClient {
    * Returns the partitions for the given table that match the supplied partition spec.
    * If no partition spec is specified, all partitions are returned.
    */
-  final def getPartitions(
+  def getPartitions(
       db: String,
       table: String,
-      partialSpec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = {
-    getPartitions(getTable(db, table), partialSpec)
-  }
+      partialSpec: Option[TablePartitionSpec]): Seq[CatalogTablePartition]
 
   /**
    * Returns the partitions for the given table that match the supplied partition spec.
    * If no partition spec is specified, all partitions are returned.
    */
   def getPartitions(
-      catalogTable: CatalogTable,
+      hiveTable: HiveTable,
       partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition]
 
   /** Returns partitions filtered by predicates for the given table. */
```
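Both `getPartitions` overloads are now abstract on the trait: one keyed by database and table name, one taking a raw Hive `Table`. A minimal caller sketch of the name-based overload (illustrative only: it assumes an already-constructed `HiveClient` and the `default.src_part` test table used elsewhere in this PR):

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.hive.client.HiveClient

// Sketch only: `client` would come from a test fixture or IsolatedClientLoader.
def listPartitions(client: HiveClient): Seq[CatalogTablePartition] =
  // With partialSpec = None, all partitions of the table are returned.
  client.getPartitions("default", "src_part", None)

def listPk1(client: HiveClient): Seq[CatalogTablePartition] = {
  // TablePartitionSpec is a Map[String, String] of partition column -> value.
  val spec: TablePartitionSpec = Map("pk" -> "1")
  client.getPartitions("default", "src_part", Some(spec))
}
```

Because the name-based overload is now resolved inside the client, callers no longer need to fetch a `CatalogTable` first just to list partitions.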
HiveClientImpl.scala (class `HiveClientImpl`, same package):

```diff
@@ -48,7 +48,7 @@ import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException, NoSuchPartitionsException, PartitionAlreadyExistsException, PartitionsAlreadyExistException}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException, NoSuchPartitionsException, NoSuchTableException, PartitionAlreadyExistsException, PartitionsAlreadyExistException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Expression
@@ -733,14 +733,22 @@ private[hive] class HiveClientImpl(
     Option(hivePartition).map(fromHivePartition)
   }
 
+  override def getPartitions(
+      db: String,
+      table: String,
+      partialSpec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = withHiveState {
+    getPartitions(
+      getRawTableOption(db, table).getOrElse(throw new NoSuchTableException(db, table)),
+      partialSpec)
+  }
+
   /**
    * Returns the partitions for the given table that match the supplied partition spec.
    * If no partition spec is specified, all partitions are returned.
    */
   override def getPartitions(
-      table: CatalogTable,
+      hiveTable: HiveTable,
       spec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = withHiveState {
-    val hiveTable = toHiveTable(table, Some(userName))
     val partSpec = spec match {
       case None => CatalogTypes.emptyTablePartitionSpec
       case Some(s) =>
```
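The substantive change is that the implementation now reuses the raw Hive `Table` fetched via `getRawTableOption` instead of rebuilding one from a `CatalogTable` with `toHiveTable`. Based on the failure mode SPARK-35531 describes, that round trip could leave bucket columns and schema columns with inconsistent casing, which Hive rejects because it compares the two case-sensitively. A toy illustration of that kind of check (plain Scala, not Hive's actual validation code; the column names are borrowed from the test below):

```scala
// Illustrative only: a case-sensitive membership check like the one that can
// reject mixed-case bucket columns after a CatalogTable -> HiveTable round trip.
val schemaCols = Seq("v1", "s1") // column names as one layer records them
val bucketCols = Seq("V1")       // bucket column as another layer records it

val caseSensitiveOk = bucketCols.forall(schemaCols.contains)
val caseInsensitiveOk = bucketCols.forall(c => schemaCols.exists(_.equalsIgnoreCase(c)))

assert(!caseSensitiveOk)  // the strict comparison fails on case alone
assert(caseInsensitiveOk) // the columns do match, modulo case
```

Passing the metastore's own `Table` object through sidesteps that re-validation entirely.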
InsertSuite.scala (class `InsertSuite`, package `org.apache.spark.sql.hive`):

```diff
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive
 
 import java.io.File
+import java.util.Locale
 
 import com.google.common.io.Files
 import org.apache.hadoop.fs.Path
@@ -854,4 +855,64 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
       assert(e.contains("Partition spec is invalid"))
     }
   }
+
+  test("SPARK-35531: Insert data with different cases of bucket column") {
+    withTable("TEST1") {
+      val createHive =
+        """
+          |CREATE TABLE TEST1(
+          |v1 BIGINT,
+          |s1 INT)
+          |PARTITIONED BY (pk BIGINT)
+          |CLUSTERED BY (v1)
+          |SORTED BY (s1)
+          |INTO 200 BUCKETS
+          |STORED AS PARQUET
+        """.stripMargin
+
+      val insertString =
+        """
+          |INSERT INTO test1
+          |SELECT * FROM VALUES(1,1,1)
+        """.stripMargin
+
+      val dropString = "DROP TABLE IF EXISTS test1"
+
+      spark.sql(dropString)
+      spark.sql(createHive.toLowerCase(Locale.ROOT))
+
+      spark.sql(insertString.toLowerCase(Locale.ROOT))
+      spark.sql(insertString.toUpperCase(Locale.ROOT))
+
+      spark.sql(dropString)
+      spark.sql(createHive.toUpperCase(Locale.ROOT))
+
+      spark.sql(insertString.toLowerCase(Locale.ROOT))
+      spark.sql(insertString.toUpperCase(Locale.ROOT))
+
+      val createSpark =
+        """
+          |CREATE TABLE TEST1(
+          |v1 BIGINT,
+          |s1 INT)
+          |USING PARQUET
+          |PARTITIONED BY (pk BIGINT)
+          |CLUSTERED BY (v1)
+          |SORTED BY (s1)
+          |INTO 200 BUCKETS
+        """.stripMargin
+
+      spark.sql(dropString)
+      spark.sql(createSpark.toLowerCase(Locale.ROOT))
+
+      spark.sql(insertString.toLowerCase(Locale.ROOT))
+      spark.sql(insertString.toUpperCase(Locale.ROOT))
+
+      spark.sql(dropString)
+      spark.sql(createSpark.toUpperCase(Locale.ROOT))
+
+      spark.sql(insertString.toLowerCase(Locale.ROOT))
+      spark.sql(insertString.toUpperCase(Locale.ROOT))
+    }
+  }
 }
```

One inline review comment was left on the first `spark.sql(dropString)` call:

> **Contributor:** doesn't `sql(dropString)` work?
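A note on the test's pervasive `toLowerCase(Locale.ROOT)` / `toUpperCase(Locale.ROOT)`: passing `Locale.ROOT` makes the case mapping locale-neutral, which matters when rewriting SQL text because the JVM's default locale can change letters, not just case. A small self-contained demonstration (the Turkish locale is the classic counterexample):

```scala
import java.util.Locale

// In a Turkish locale, uppercase 'I' lowercases to dotless 'ı', so a
// locale-sensitive mapping would silently corrupt keywords and identifiers.
val turkish = new Locale("tr")
assert("INSERT".toLowerCase(Locale.ROOT) == "insert")
assert("INSERT".toLowerCase(turkish) == "ınsert") // not "insert"
```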
VersionsSuite.scala (class `VersionsSuite`):

```diff
@@ -482,7 +482,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
 
   test(s"$version: getPartitions(catalogTable)") {
     assert(testPartitionCount ==
-      client.getPartitions(client.getTable("default", "src_part")).size)
+      client.getPartitions("default", "src_part", None).size)
   }
 
   test(s"$version: getPartitionsByFilter") {
```