diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index fc2068cac5ab..956c1789a655 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -21,7 +21,7 @@ import javax.annotation.Nullable
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, SortDirection}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
 
@@ -63,6 +63,13 @@ case class CatalogColumn(
     comment: Option[String] = None)
 
+/**
+ * Sort ordering of a table.
+ */
+case class CatalogSortOrder(
+    column: CatalogColumn,
+    direction: SortDirection)
+
 
 /**
  * A partition (Hive style) defined in the catalog.
  *
@@ -86,7 +93,7 @@ case class CatalogTable(
     storage: CatalogStorageFormat,
     schema: Seq[CatalogColumn],
     partitionColumnNames: Seq[String] = Seq.empty,
-    sortColumnNames: Seq[String] = Seq.empty,
+    sortColumns: Seq[CatalogSortOrder] = Seq.empty,
     bucketColumnNames: Seq[String] = Seq.empty,
     numBuckets: Int = -1,
     owner: String = "",
@@ -104,7 +111,7 @@ case class CatalogTable(
       s"must be a subset of schema (${colNames.mkString(", ")}) in table '$identifier'")
   }
   requireSubsetOfSchema(partitionColumnNames, "partition")
-  requireSubsetOfSchema(sortColumnNames, "sort")
+  requireSubsetOfSchema(sortColumns.map(_.column.name), "sort")
   requireSubsetOfSchema(bucketColumnNames, "bucket")
 
   /** Columns this table is partitioned by. */
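The interface.scala change above replaces the plain sortColumnNames: Seq[String] with structured CatalogSortOrder values. A minimal sketch of how downstream code could consume the new field is shown below; the sortSpec helper and its output format are illustrative and not part of the patch, and only CatalogTable.sortColumns, CatalogSortOrder and the catalyst Ascending object come from this change.

  import org.apache.spark.sql.catalyst.catalog.CatalogTable
  import org.apache.spark.sql.catalyst.expressions.Ascending

  // Illustrative helper: render a table's sort specification as
  // Hive-style text, e.g. "ts DESC, id ASC".
  def sortSpec(table: CatalogTable): String =
    table.sortColumns.map { so =>
      val dir = if (so.direction == Ascending) "ASC" else "DESC"
      s"${so.column.name} $dir"
    }.mkString(", ")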
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 0f90715a90e1..b3110d57bdc3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -392,7 +392,7 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
     } else {
       append(buffer, "Num Buckets:", metadata.numBuckets.toString, "")
       append(buffer, "Bucket Columns:", metadata.bucketColumnNames.mkString("[", ", ", "]"), "")
-      append(buffer, "Sort Columns:", metadata.sortColumnNames.mkString("[", ", ", "]"), "")
+      append(buffer, "Sort Columns:", metadata.sortColumns.mkString("[", ", ", "]"), "")
     }
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index 1671228fd9b4..ac4c28d91bc0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -22,14 +22,15 @@ import scala.collection.JavaConverters._
 import com.google.common.base.Objects
 import org.apache.hadoop.hive.common.StatsSetupConst
 import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
-import org.apache.hadoop.hive.metastore.api.FieldSchema
+import org.apache.hadoop.hive.metastore.api.{FieldSchema, Order}
 import org.apache.hadoop.hive.ql.metadata.{Partition, Table => HiveTable}
+import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer._
 import org.apache.hadoop.hive.ql.plan.TableDesc
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.execution.FileRelation
@@ -64,6 +65,14 @@ private[hive] case class MetastoreRelation(
     new FieldSchema(c.name, c.dataType, c.comment.orNull)
   }
 
+  private def toHiveOrder(so: CatalogSortOrder): Order = {
+    val order = so.direction match {
+      case Ascending => HIVE_COLUMN_ORDER_ASC
+      case Descending => HIVE_COLUMN_ORDER_DESC
+    }
+    new Order(so.column.name, order)
+  }
+
   // TODO: merge this with HiveClientImpl#toHiveTable
   @transient val hiveQlTable: HiveTable = {
     // We start by constructing an API table as Hive performs several important transformations
@@ -91,6 +100,16 @@
       catalogTable.partitionColumnNames.contains(c.getName)
     }
     sd.setCols(schema.asJava)
+
+    if (catalogTable.bucketColumnNames.nonEmpty) {
+      sd.setBucketCols(catalogTable.bucketColumnNames.toList.asJava)
+      sd.setNumBuckets(catalogTable.numBuckets)
+
+      if (catalogTable.sortColumns.nonEmpty) {
+        sd.setSortCols(catalogTable.sortColumns.map(toHiveOrder).asJava)
+      }
+    }
+
     tTable.setPartitionKeys(partCols.asJava)
 
     catalogTable.storage.locationUri.foreach(sd.setLocation)
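The MetastoreRelation.scala change above writes the catalog's sort orders into the Hive StorageDescriptor via BaseSemanticAnalyzer's column-order constants. A standalone sketch of that mapping in both directions follows, assuming only the Hive Order API and the BaseSemanticAnalyzer constants the patch already imports; the helper names toOrder and toDirection are illustrative, not part of the patch.

  import org.apache.hadoop.hive.metastore.api.Order
  import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.{HIVE_COLUMN_ORDER_ASC, HIVE_COLUMN_ORDER_DESC}
  import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending, SortDirection}

  // Catalyst direction -> Hive Order entry for one column (the write path above).
  def toOrder(columnName: String, direction: SortDirection): Order = {
    val code = direction match {
      case Ascending => HIVE_COLUMN_ORDER_ASC
      case Descending => HIVE_COLUMN_ORDER_DESC
    }
    new Order(columnName, code)
  }

  // Hive Order entry -> catalyst direction (the read path used by HiveClientImpl below).
  def toDirection(order: Order): SortDirection =
    if (order.getOrder == HIVE_COLUMN_ORDER_ASC) Ascending else Descending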
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index bb324592028b..cc1ae8655974 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.metastore.api.{NoSuchObjectException, PrincipalTyp
 import org.apache.hadoop.hive.metastore.api.{ResourceType, ResourceUri}
 import org.apache.hadoop.hive.ql.Driver
 import org.apache.hadoop.hive.ql.metadata.{Hive, Partition => HivePartition, Table => HiveTable}
+import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer._
 import org.apache.hadoop.hive.ql.plan.AddPartitionDesc
 import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
@@ -44,7 +45,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.util.{CausedBy, CircularBuffer, Utils}
 
@@ -336,8 +337,22 @@
     Option(client.getTable(dbName, tableName, false)).map { h =>
       // Note: Hive separates partition columns and the schema, but for us the
       // partition columns are part of the schema
+      val cols = h.getCols.asScala.map(fromHiveColumn)
       val partCols = h.getPartCols.asScala.map(fromHiveColumn)
-      val schema = h.getCols.asScala.map(fromHiveColumn) ++ partCols
+      val schema = cols ++ partCols
+
+      val sortColumns = h.getSortCols.asScala.map(o => {
+        val column = cols.find(_.name.equalsIgnoreCase(o.getCol))
+        if (column.isEmpty) {
+          throw new AnalysisException(s"No match found for sort column name = ${o.getCol} " +
+            s"in dbName = $dbName, tableName = $tableName. " +
+            s"Known table columns are ${cols.map(_.name).mkString("[", ", ", "]")}")
+        }
+
+        val sortOrder = if (o.getOrder == HIVE_COLUMN_ORDER_ASC) Ascending else Descending
+        CatalogSortOrder(column.get, sortOrder)
+      })
+
       CatalogTable(
         identifier = TableIdentifier(h.getTableName, Option(h.getDbName)),
         tableType = h.getTableType match {
@@ -348,7 +363,7 @@
         },
         schema = schema,
         partitionColumnNames = partCols.map(_.name),
-        sortColumnNames = Seq(), // TODO: populate this
+        sortColumns = sortColumns,
         bucketColumnNames = h.getBucketCols.asScala,
         numBuckets = h.getNumBuckets,
         owner = h.getOwner,
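With the HiveClientImpl change in place, a table that Hive reports as bucketed and sorted should round-trip into CatalogTable.sortColumns instead of the old empty Seq. A hypothetical check is sketched below; it assumes an initialized HiveClient (client) and a pre-existing Hive table created with something like CLUSTERED BY (id) SORTED BY (ts DESC) INTO 8 BUCKETS, so the names default, events, id and ts are made up for illustration.

  import org.apache.spark.sql.catalyst.expressions.Descending

  // Read the table back through the metastore client added above.
  val table = client.getTableOption("default", "events")
    .getOrElse(sys.error("table not found"))

  assert(table.bucketColumnNames == Seq("id"))
  assert(table.numBuckets == 8)
  // The sort order is now populated from Hive's sortCols.
  assert(table.sortColumns.map(o => (o.column.name, o.direction)) == Seq(("ts", Descending)))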