@@ -47,27 +47,37 @@ private[sql] case class ParquetRelation(
path: String,
@transient conf: Option[Configuration],
@transient sqlContext: SQLContext,
partitioningAttributes: Seq[Attribute] = Nil)
partitioningAttributes: Seq[Attribute] = Nil,
metastoreAttributes: Option[Seq[Attribute]] = None)
extends LeafNode with MultiInstanceRelation {

self: Product =>

/** Schema derived from ParquetFile */
def parquetSchema: MessageType =
ParquetTypesConverter
.readMetaData(new Path(path), conf)
.readMetaData(new Path(path.split(",").head), conf)
.getFileMetaData
.getSchema

/** Attributes */
override val output =
partitioningAttributes ++
ParquetTypesConverter.readSchemaFromFile(
new Path(path.split(",").head),
conf,
sqlContext.isParquetBinaryAsString)
override val output = {
// All non-partitioning attributes from Parquet metadata
val parquetAttributes = ParquetTypesConverter
.readSchemaFromFile(new Path(path.split(",").head), conf, sqlContext.isParquetBinaryAsString)
.filter(a => partitioningAttributes.find(_.name == a.name).isEmpty)
// Parquet is case sensitive while Hive is not, have to restore case information for non-
// partitioning keys here.
val attributes = metastoreAttributes.map { attrs =>
(attrs, parquetAttributes.map(_.name)).zipped.map(_ withName _)
}.getOrElse {
parquetAttributes
}
partitioningAttributes ++ attributes
}

override def newInstance() = ParquetRelation(path, conf, sqlContext).asInstanceOf[this.type]
override def newInstance() = ParquetRelation(
path, conf, sqlContext, partitioningAttributes, metastoreAttributes).asInstanceOf[this.type]

// Equals must also take into account the output attributes so that we can distinguish between
// different instances of the same relation,
@@ -89,7 +99,7 @@ private[sql] object ParquetRelation {
// checks first to see if there's any handlers already set
// and if not it creates them. If this method executes prior
// to that class being loaded then:
// 1) there's no handlers installed so there's none to
// 1) there's no handlers installed so there's none to
// remove. But when it IS finally loaded the desired effect
// of removing them is circumvented.
// 2) The parquet.Log static initializer calls setUseParentHanders(false)
@@ -345,7 +345,6 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
TakeOrdered,
ParquetOperations,
InMemoryScans,
ParquetConversion, // Must be before HiveTableScans
HiveTableScans,
DataSinks,
Scripts,
@@ -22,16 +22,14 @@ import java.util.{List => JList}

import scala.util.parsing.combinator.RegexParsers

import org.apache.hadoop.util.ReflectionUtils

import org.apache.hadoop.hive.metastore.TableType
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition}
import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table, HiveException}
import org.apache.hadoop.hive.metastore.api.{FieldSchema, Partition => TPartition, Table => TTable}
import org.apache.hadoop.hive.ql.metadata.{Hive, HiveException, Partition, Table}
import org.apache.hadoop.hive.ql.plan.CreateTableDesc
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.{Deserializer, SerDeException}
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.hadoop.hive.serde2.{Deserializer, SerDeException}
import org.apache.hadoop.util.ReflectionUtils

import org.apache.spark.Logging
import org.apache.spark.annotation.DeveloperApi
@@ -42,6 +40,7 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.util.Utils

/* Implicit conversions */
@@ -81,9 +80,27 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
}

// Since HiveQL is case insensitive for table names we make them all lowercase.
MetastoreRelation(
val relation = MetastoreRelation(
databaseName, tblName, alias)(
table.getTTable, partitions.map(part => part.getTPartition))(hive)

if (hive.convertMetastoreParquet &&
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet")) {
Contributor: Indent two spaces?

val path = if (relation.hiveQlTable.isPartitioned) {
partitions.map(_.getLocation).mkString(",")
Contributor: Wait, where are we doing partition pruning based on predicates now?

Contributor Author: Yeah, forgot that in the case of MetastoreRelation, partition pruning is done within HiveTableScan... I'll add a WIP tag to this PR and add partition pruning back.
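
A minimal sketch of how that pruning could be restored at this point, assuming the partition-key predicates (called pruningPredicates here) are made available where the conversion happens; the logic mirrors the ParquetConversion strategy removed from HiveStrategies.scala below, using only the interpreted (non-codegen) predicate path for brevity:

// Sketch: filter the Hive partitions with the partition-key predicates before
// building the comma-separated Parquet path handed to ParquetRelation.
val partitionKeys = relation.partitionKeys
val rawPredicate = pruningPredicates.reduceOption(And).getOrElse(Literal(true))
// Partition values come back as strings; cast them to the partition keys' data types.
val castedPredicate = rawPredicate transform {
  case a: AttributeReference =>
    val idx = partitionKeys.indexWhere(a.exprId == _.exprId)
    Cast(BoundReference(idx, StringType, nullable = true), partitionKeys(idx).dataType)
}
val pruningCondition = InterpretedPredicate(castedPredicate)
val inputData = new GenericMutableRow(partitionKeys.size)
val prunedPartitions = relation.hiveQlPartitions.filter { part =>
  val partitionValues = part.getValues
  var i = 0
  while (i < partitionValues.size()) {
    inputData(i) = partitionValues(i)
    i += 1
  }
  pruningCondition(inputData)
}
val path = prunedPartitions.map(_.getLocation).mkString(",")

Pruning before constructing the path keeps the file list passed to ParquetRelation small, which is what the comma-separated location string relies on.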

} else {
relation.hiveQlTable.getDataLocation.toString
}

ParquetRelation(
path,
Some(hive.sparkContext.hadoopConfiguration),
hive,
relation.partitionKeys,
Some(relation.attributes))
} else {
relation
}
}
}

@@ -145,9 +162,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
logInfo(s"Default to LazySimpleSerDe for table $dbName.$tblName")
tbl.setSerializationLib(classOf[LazySimpleSerDe].getName())

import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.TextInputFormat

tbl.setInputFormatClass(classOf[TextInputFormat])
tbl.setOutputFormatClass(classOf[HiveIgnoreKeyTextOutputFormat[Text, Text]])
@@ -497,7 +514,7 @@ private[hive] case class MetastoreRelation
val output = attributes ++ partitionKeys

/** An attribute map that can be used to lookup original attributes based on expression id. */
val attributeMap = AttributeMap(output.map(o => (o,o)))
val attributeMap = AttributeMap(output.map(o => (o, o)))

/** An attribute map for determining the ordinal for non-partition columns. */
val columnOrdinals = AttributeMap(attributes.zipWithIndex)
127 changes: 2 additions & 125 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -19,143 +19,20 @@ package org.apache.spark.sql.hive

import org.apache.hadoop.hive.ql.parse.ASTNode

import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.types.StringType
import org.apache.spark.sql.execution.{DescribeCommand, OutputFaker, SparkPlan}
import org.apache.spark.sql.hive
import org.apache.spark.sql.execution.{DescribeCommand, SparkPlan}
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.sql.{SQLContext, SchemaRDD, Strategy}

import scala.collection.JavaConversions._
import org.apache.spark.sql.{SQLContext, Strategy, hive}

private[hive] trait HiveStrategies {
// Possibly being too clever with types here... or not clever enough.
self: SQLContext#SparkPlanner =>

val hiveContext: HiveContext

/**
* :: Experimental ::
* Finds table scans that would use the Hive SerDe and replaces them with our own native parquet
* table scan operator.
*
* TODO: Much of this logic is duplicated in HiveTableScan. Ideally we would do some refactoring
* but since this is after the code freeze for 1.1 all logic is here to minimize disruption.
*
* Other issues:
* - Much of this logic assumes case insensitive resolution.
*/
@Experimental
object ParquetConversion extends Strategy {
implicit class LogicalPlanHacks(s: SchemaRDD) {
def lowerCase =
new SchemaRDD(s.sqlContext, s.logicalPlan)

def addPartitioningAttributes(attrs: Seq[Attribute]) = {
// Don't add the partitioning key if its already present in the data.
if (attrs.map(_.name).toSet.subsetOf(s.logicalPlan.output.map(_.name).toSet)) {
s
} else {
new SchemaRDD(
s.sqlContext,
s.logicalPlan transform {
case p: ParquetRelation => p.copy(partitioningAttributes = attrs)
})
}
}
}

implicit class PhysicalPlanHacks(originalPlan: SparkPlan) {
def fakeOutput(newOutput: Seq[Attribute]) =
OutputFaker(
originalPlan.output.map(a =>
newOutput.find(a.name.toLowerCase == _.name.toLowerCase)
.getOrElse(
sys.error(s"Can't find attribute $a to fake in set ${newOutput.mkString(",")}"))),
originalPlan)
}

def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case PhysicalOperation(projectList, predicates, relation: MetastoreRelation)
if relation.tableDesc.getSerdeClassName.contains("Parquet") &&
hiveContext.convertMetastoreParquet =>

// Filter out all predicates that only deal with partition keys
val partitionsKeys = AttributeSet(relation.partitionKeys)
val (pruningPredicates, otherPredicates) = predicates.partition {
_.references.subsetOf(partitionsKeys)
}

// We are going to throw the predicates and projection back at the whole optimization
// sequence so lets unresolve all the attributes, allowing them to be rebound to the
// matching parquet attributes.
val unresolvedOtherPredicates = otherPredicates.map(_ transform {
case a: AttributeReference => UnresolvedAttribute(a.name)
}).reduceOption(And).getOrElse(Literal(true))

val unresolvedProjection = projectList.map(_ transform {
case a: AttributeReference => UnresolvedAttribute(a.name)
})

if (relation.hiveQlTable.isPartitioned) {
val rawPredicate = pruningPredicates.reduceOption(And).getOrElse(Literal(true))
// Translate the predicate so that it automatically casts the input values to the correct
// data types during evaluation
val castedPredicate = rawPredicate transform {
case a: AttributeReference =>
val idx = relation.partitionKeys.indexWhere(a.exprId == _.exprId)
val key = relation.partitionKeys(idx)
Cast(BoundReference(idx, StringType, nullable = true), key.dataType)
}

val inputData = new GenericMutableRow(relation.partitionKeys.size)
val pruningCondition =
if(codegenEnabled) {
GeneratePredicate(castedPredicate)
} else {
InterpretedPredicate(castedPredicate)
}

val partitions = relation.hiveQlPartitions.filter { part =>
val partitionValues = part.getValues
var i = 0
while (i < partitionValues.size()) {
inputData(i) = partitionValues(i)
i += 1
}
pruningCondition(inputData)
}

hiveContext
.parquetFile(partitions.map(_.getLocation).mkString(","))
.addPartitioningAttributes(relation.partitionKeys)
.lowerCase
.where(unresolvedOtherPredicates)
.select(unresolvedProjection:_*)
.queryExecution
.executedPlan
.fakeOutput(projectList.map(_.toAttribute)):: Nil
} else {
hiveContext
.parquetFile(relation.hiveQlTable.getDataLocation.toString)
.lowerCase
.where(unresolvedOtherPredicates)
.select(unresolvedProjection:_*)
.queryExecution
.executedPlan
.fakeOutput(projectList.map(_.toAttribute)) :: Nil
}
case _ => Nil
}
}

object Scripts extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.ScriptTransformation(input, script, output, child) =>