@@ -21,7 +21,7 @@ import javax.annotation.Nullable

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, SortDirection}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}

@@ -63,6 +63,13 @@ case class CatalogColumn(
comment: Option[String] = None)


/**
 * Sort ordering on a single column of a table; a table's full sort order is a sequence of these.
 */
case class CatalogSortOrder(
column: CatalogColumn,
direction: SortDirection)
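
For illustration, a composite sort order over two columns could be expressed as below. This is a minimal sketch: the column names are invented, and Ascending/Descending are the SortDirection case objects from org.apache.spark.sql.catalyst.expressions.

val age = CatalogColumn("age", "int")
val name = CatalogColumn("name", "string")
// age DESC first, then name ASC
val tableSortOrder: Seq[CatalogSortOrder] = Seq(
  CatalogSortOrder(age, Descending),
  CatalogSortOrder(name, Ascending))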

/**
* A partition (Hive style) defined in the catalog.
*
@@ -86,7 +93,7 @@ case class CatalogTable(
storage: CatalogStorageFormat,
schema: Seq[CatalogColumn],
partitionColumnNames: Seq[String] = Seq.empty,
sortColumnNames: Seq[String] = Seq.empty,
sortColumns: Seq[CatalogSortOrder] = Seq.empty,
bucketColumnNames: Seq[String] = Seq.empty,
numBuckets: Int = -1,
owner: String = "",
@@ -104,7 +111,7 @@ case class CatalogTable(
s"must be a subset of schema (${colNames.mkString(", ")}) in table '$identifier'")
}
requireSubsetOfSchema(partitionColumnNames, "partition")
requireSubsetOfSchema(sortColumnNames, "sort")
requireSubsetOfSchema(sortColumns.map(_.column.name), "sort")
requireSubsetOfSchema(bucketColumnNames, "bucket")

/** Columns this table is partitioned by. */
@@ -392,7 +392,7 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
} else {
append(buffer, "Num Buckets:", metadata.numBuckets.toString, "")
append(buffer, "Bucket Columns:", metadata.bucketColumnNames.mkString("[", ", ", "]"), "")
append(buffer, "Sort Columns:", metadata.sortColumnNames.mkString("[", ", ", "]"), "")
append(buffer, "Sort Columns:", metadata.sortColumns.mkString("[", ", ", "]"), "")
}
}

@@ -22,14 +22,15 @@ import scala.collection.JavaConverters._
import com.google.common.base.Objects
import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.metastore.api.{FieldSchema, Order}
import org.apache.hadoop.hive.ql.metadata.{Partition, Table => HiveTable}
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer._
import org.apache.hadoop.hive.ql.plan.TableDesc

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference, Expression}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.execution.FileRelation
@@ -64,6 +65,14 @@ private[hive] case class MetastoreRelation(
new FieldSchema(c.name, c.dataType, c.comment.orNull)
}

private def toHiveOrder(so: CatalogSortOrder): Order = {
val order = so.direction match {
case Ascending => HIVE_COLUMN_ORDER_ASC
case Descending => HIVE_COLUMN_ORDER_DESC
}
new Order(so.column.name, order)
}
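
As a rough, illustrative check of this mapping (the columns are hypothetical; HIVE_COLUMN_ORDER_ASC and HIVE_COLUMN_ORDER_DESC come from the BaseSemanticAnalyzer import above):

val asc = toHiveOrder(CatalogSortOrder(CatalogColumn("value", "string"), Ascending))
// asc.getCol == "value" and asc.getOrder == HIVE_COLUMN_ORDER_ASC
val desc = toHiveOrder(CatalogSortOrder(CatalogColumn("key", "int"), Descending))
// desc.getCol == "key" and desc.getOrder == HIVE_COLUMN_ORDER_DESC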

// TODO: merge this with HiveClientImpl#toHiveTable
@transient val hiveQlTable: HiveTable = {
// We start by constructing an API table as Hive performs several important transformations
@@ -91,6 +100,16 @@ private[hive] case class MetastoreRelation(
catalogTable.partitionColumnNames.contains(c.getName)
}
sd.setCols(schema.asJava)

if (catalogTable.bucketColumnNames.nonEmpty) {
sd.setBucketCols(catalogTable.bucketColumnNames.toList.asJava)
sd.setNumBuckets(catalogTable.numBuckets)

if (catalogTable.sortColumns.nonEmpty) {
sd.setSortCols(catalogTable.sortColumns.map(toHiveOrder).asJava)
}
}

tTable.setPartitionKeys(partCols.asJava)

catalogTable.storage.locationUri.foreach(sd.setLocation)
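
Hive only accepts a sort specification on bucketed tables (SORTED BY is part of the CLUSTERED BY ... INTO n BUCKETS clause), which is why setSortCols is nested under the bucket-column check above. As a hypothetical sketch, a catalog table such as the following would exercise that branch:

val bucketedSorted = catalogTable.copy(
  bucketColumnNames = Seq("key"),
  numBuckets = 8,
  sortColumns = Seq(CatalogSortOrder(CatalogColumn("value", "string"), Ascending)))
// For such a table, the block above sets the bucket columns and bucket count on the
// storage descriptor, plus a sort column equivalent to new Order("value", HIVE_COLUMN_ORDER_ASC).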
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.metastore.api.{NoSuchObjectException, PrincipalTyp
import org.apache.hadoop.hive.metastore.api.{ResourceType, ResourceUri}
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.metadata.{Hive, Partition => HivePartition, Table => HiveTable}
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer._
import org.apache.hadoop.hive.ql.plan.AddPartitionDesc
import org.apache.hadoop.hive.ql.processors._
import org.apache.hadoop.hive.ql.session.SessionState
@@ -44,7 +45,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.util.{CausedBy, CircularBuffer, Utils}

@@ -336,8 +337,22 @@ private[hive] class HiveClientImpl(
Option(client.getTable(dbName, tableName, false)).map { h =>
// Note: Hive separates partition columns and the schema, but for us the
// partition columns are part of the schema
val cols = h.getCols.asScala.map(fromHiveColumn)
val partCols = h.getPartCols.asScala.map(fromHiveColumn)
val schema = h.getCols.asScala.map(fromHiveColumn) ++ partCols
val schema = cols ++ partCols

val sortColumns = h.getSortCols.asScala.map { o =>
val column = cols.find(_.name.equalsIgnoreCase(o.getCol))
if (column.isEmpty) {
throw new AnalysisException(s"No match found for sort column name = ${o.getCol} " +
s"in dbName = $dbName, tableName = $tableName. " +
s"Known table columns are ${cols.map(_.name).mkString("[", ", ", "]")}")
}

val sortOrder = if (o.getOrder == HIVE_COLUMN_ORDER_ASC) Ascending else Descending
CatalogSortOrder(column.get, sortOrder)
}

CatalogTable(
identifier = TableIdentifier(h.getTableName, Option(h.getDbName)),
tableType = h.getTableType match {
@@ -348,7 +363,7 @@ },
},
schema = schema,
partitionColumnNames = partCols.map(_.name),
sortColumnNames = Seq(), // TODO: populate this
sortColumns = sortColumns,
bucketColumnNames = h.getBucketCols.asScala,
numBuckets = h.getNumBuckets,
owner = h.getOwner,