diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index b237a07c72d0..b67fd846c7b6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -47,7 +47,8 @@ private[sql] case class ParquetRelation(
     path: String,
     @transient conf: Option[Configuration],
     @transient sqlContext: SQLContext,
-    partitioningAttributes: Seq[Attribute] = Nil)
+    partitioningAttributes: Seq[Attribute] = Nil,
+    metastoreAttributes: Option[Seq[Attribute]] = None)
   extends LeafNode with MultiInstanceRelation {
 
   self: Product =>
@@ -55,19 +56,28 @@ private[sql] case class ParquetRelation(
   /** Schema derived from ParquetFile */
   def parquetSchema: MessageType =
     ParquetTypesConverter
-      .readMetaData(new Path(path), conf)
+      .readMetaData(new Path(path.split(",").head), conf)
       .getFileMetaData
       .getSchema
 
   /** Attributes */
-  override val output =
-    partitioningAttributes ++
-    ParquetTypesConverter.readSchemaFromFile(
-      new Path(path.split(",").head),
-      conf,
-      sqlContext.isParquetBinaryAsString)
+  override val output = {
+    // All non-partitioning attributes from Parquet metadata
+    val parquetAttributes = ParquetTypesConverter
+      .readSchemaFromFile(new Path(path.split(",").head), conf, sqlContext.isParquetBinaryAsString)
+      .filter(a => partitioningAttributes.find(_.name == a.name).isEmpty)
+    // Parquet is case sensitive while Hive is not, have to restore case information for non-
+    // partitioning keys here.
+    val attributes = metastoreAttributes.map { attrs =>
+      (attrs, parquetAttributes.map(_.name)).zipped.map(_ withName _)
+    }.getOrElse {
+      parquetAttributes
+    }
+    partitioningAttributes ++ attributes
+  }
 
-  override def newInstance() = ParquetRelation(path, conf, sqlContext).asInstanceOf[this.type]
+  override def newInstance() = ParquetRelation(
+    path, conf, sqlContext, partitioningAttributes, metastoreAttributes).asInstanceOf[this.type]
 
   // Equals must also take into account the output attributes so that we can distinguish between
   // different instances of the same relation,
@@ -89,7 +99,7 @@ private[sql] object ParquetRelation {
     // checks first to see if there's any handlers already set
     // and if not it creates them. If this method executes prior
     // to that class being loaded then:
-    // 1) there's no handlers installed so there's none to 
+    // 1) there's no handlers installed so there's none to
     // remove. But when it IS finally loaded the desired affect
     // of removing them is circumvented.
     // 2) The parquet.Log static initializer calls setUseParentHanders(false)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 304b9a73ee91..b8926a66797e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -345,7 +345,6 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
       TakeOrdered,
       ParquetOperations,
       InMemoryScans,
-      ParquetConversion, // Must be before HiveTableScans
       HiveTableScans,
       DataSinks,
       Scripts,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 91a157785d5b..7839a567e82b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -22,16 +22,14 @@ import java.util.{List => JList}
 
 import scala.util.parsing.combinator.RegexParsers
 
-import org.apache.hadoop.util.ReflectionUtils
-
 import org.apache.hadoop.hive.metastore.TableType
-import org.apache.hadoop.hive.metastore.api.FieldSchema
-import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition}
-import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table, HiveException}
+import org.apache.hadoop.hive.metastore.api.{FieldSchema, Partition => TPartition, Table => TTable}
+import org.apache.hadoop.hive.ql.metadata.{Hive, HiveException, Partition, Table}
 import org.apache.hadoop.hive.ql.plan.CreateTableDesc
 import org.apache.hadoop.hive.serde.serdeConstants
-import org.apache.hadoop.hive.serde2.{Deserializer, SerDeException}
 import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
+import org.apache.hadoop.hive.serde2.{Deserializer, SerDeException}
+import org.apache.hadoop.util.ReflectionUtils
 
 import org.apache.spark.Logging
 import org.apache.spark.annotation.DeveloperApi
@@ -42,6 +40,7 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.parquet.ParquetRelation
 import org.apache.spark.util.Utils
 
 /* Implicit conversions */
@@ -81,9 +80,27 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       }
 
     // Since HiveQL is case insensitive for table names we make them all lowercase.
-    MetastoreRelation(
+    val relation = MetastoreRelation(
       databaseName, tblName, alias)(
       table.getTTable, partitions.map(part => part.getTPartition))(hive)
+
+    if (hive.convertMetastoreParquet &&
+        relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet")) {
+      val path = if (relation.hiveQlTable.isPartitioned) {
+        partitions.map(_.getLocation).mkString(",")
+      } else {
+        relation.hiveQlTable.getDataLocation.toString
+      }
+
+      ParquetRelation(
+        path,
+        Some(hive.sparkContext.hadoopConfiguration),
+        hive,
+        relation.partitionKeys,
+        Some(relation.attributes))
+    } else {
+      relation
+    }
   }
 
 }
@@ -145,9 +162,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       logInfo(s"Default to LazySimpleSerDe for table $dbName.$tblName")
       tbl.setSerializationLib(classOf[LazySimpleSerDe].getName())
 
-      import org.apache.hadoop.mapred.TextInputFormat
       import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
       import org.apache.hadoop.io.Text
+      import org.apache.hadoop.mapred.TextInputFormat
 
       tbl.setInputFormatClass(classOf[TextInputFormat])
       tbl.setOutputFormatClass(classOf[HiveIgnoreKeyTextOutputFormat[Text, Text]])
@@ -497,7 +514,7 @@ private[hive] case class MetastoreRelation
   val output = attributes ++ partitionKeys
 
   /** An attribute map that can be used to lookup original attributes based on expression id. */
-  val attributeMap = AttributeMap(output.map(o => (o,o)))
+  val attributeMap = AttributeMap(output.map(o => (o, o)))
 
   /** An attribute map for determining the ordinal for non-partition columns. */
   val columnOrdinals = AttributeMap(attributes.zipWithIndex)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 56fc85239e1c..732f166a7d0e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -19,21 +19,13 @@ package org.apache.spark.sql.hive
 
 import org.apache.hadoop.hive.ql.parse.ASTNode
 
-import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.types.StringType
-import org.apache.spark.sql.execution.{DescribeCommand, OutputFaker, SparkPlan}
-import org.apache.spark.sql.hive
+import org.apache.spark.sql.execution.{DescribeCommand, SparkPlan}
 import org.apache.spark.sql.hive.execution._
-import org.apache.spark.sql.parquet.ParquetRelation
-import org.apache.spark.sql.{SQLContext, SchemaRDD, Strategy}
-
-import scala.collection.JavaConversions._
+import org.apache.spark.sql.{SQLContext, Strategy, hive}
 
 private[hive] trait HiveStrategies {
   // Possibly being too clever with types here... or not clever enough.
@@ -41,121 +33,6 @@ private[hive] trait HiveStrategies {
 
   val hiveContext: HiveContext
 
-  /**
-   * :: Experimental ::
-   * Finds table scans that would use the Hive SerDe and replaces them with our own native parquet
-   * table scan operator.
-   *
-   * TODO: Much of this logic is duplicated in HiveTableScan. Ideally we would do some refactoring
-   * but since this is after the code freeze for 1.1 all logic is here to minimize disruption.
-   *
-   * Other issues:
-   *  - Much of this logic assumes case insensitive resolution.
-   */
-  @Experimental
-  object ParquetConversion extends Strategy {
-    implicit class LogicalPlanHacks(s: SchemaRDD) {
-      def lowerCase =
-        new SchemaRDD(s.sqlContext, s.logicalPlan)
-
-      def addPartitioningAttributes(attrs: Seq[Attribute]) = {
-        // Don't add the partitioning key if its already present in the data.
-        if (attrs.map(_.name).toSet.subsetOf(s.logicalPlan.output.map(_.name).toSet)) {
-          s
-        } else {
-          new SchemaRDD(
-            s.sqlContext,
-            s.logicalPlan transform {
-              case p: ParquetRelation => p.copy(partitioningAttributes = attrs)
-            })
-        }
-      }
-    }
-
-    implicit class PhysicalPlanHacks(originalPlan: SparkPlan) {
-      def fakeOutput(newOutput: Seq[Attribute]) =
-        OutputFaker(
-          originalPlan.output.map(a =>
-            newOutput.find(a.name.toLowerCase == _.name.toLowerCase)
-              .getOrElse(
-                sys.error(s"Can't find attribute $a to fake in set ${newOutput.mkString(",")}"))),
-          originalPlan)
-    }
-
-    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case PhysicalOperation(projectList, predicates, relation: MetastoreRelation)
-        if relation.tableDesc.getSerdeClassName.contains("Parquet") &&
-           hiveContext.convertMetastoreParquet =>
-
-        // Filter out all predicates that only deal with partition keys
-        val partitionsKeys = AttributeSet(relation.partitionKeys)
-        val (pruningPredicates, otherPredicates) = predicates.partition {
-          _.references.subsetOf(partitionsKeys)
-        }
-
-        // We are going to throw the predicates and projection back at the whole optimization
-        // sequence so lets unresolve all the attributes, allowing them to be rebound to the
-        // matching parquet attributes.
-        val unresolvedOtherPredicates = otherPredicates.map(_ transform {
-          case a: AttributeReference => UnresolvedAttribute(a.name)
-        }).reduceOption(And).getOrElse(Literal(true))
-
-        val unresolvedProjection = projectList.map(_ transform {
-          case a: AttributeReference => UnresolvedAttribute(a.name)
-        })
-
-        if (relation.hiveQlTable.isPartitioned) {
-          val rawPredicate = pruningPredicates.reduceOption(And).getOrElse(Literal(true))
-          // Translate the predicate so that it automatically casts the input values to the correct
-          // data types during evaluation
-          val castedPredicate = rawPredicate transform {
-            case a: AttributeReference =>
-              val idx = relation.partitionKeys.indexWhere(a.exprId == _.exprId)
-              val key = relation.partitionKeys(idx)
-              Cast(BoundReference(idx, StringType, nullable = true), key.dataType)
-          }
-
-          val inputData = new GenericMutableRow(relation.partitionKeys.size)
-          val pruningCondition =
-            if(codegenEnabled) {
-              GeneratePredicate(castedPredicate)
-            } else {
-              InterpretedPredicate(castedPredicate)
-            }
-
-          val partitions = relation.hiveQlPartitions.filter { part =>
-            val partitionValues = part.getValues
-            var i = 0
-            while (i < partitionValues.size()) {
-              inputData(i) = partitionValues(i)
-              i += 1
-            }
-            pruningCondition(inputData)
-          }
-
-          hiveContext
-            .parquetFile(partitions.map(_.getLocation).mkString(","))
-            .addPartitioningAttributes(relation.partitionKeys)
-            .lowerCase
-            .where(unresolvedOtherPredicates)
-            .select(unresolvedProjection:_*)
-            .queryExecution
-            .executedPlan
-            .fakeOutput(projectList.map(_.toAttribute)):: Nil
-        } else {
-          hiveContext
-            .parquetFile(relation.hiveQlTable.getDataLocation.toString)
-            .lowerCase
-            .where(unresolvedOtherPredicates)
-            .select(unresolvedProjection:_*)
-            .queryExecution
-            .executedPlan
-            .fakeOutput(projectList.map(_.toAttribute)) :: Nil
-        }
-      case _ => Nil
-    }
-  }
-
   object Scripts extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.ScriptTransformation(input, script, output, child) =>
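
For reference, a minimal usage sketch of the metastore-to-Parquet conversion path above. The SparkContext setup and the table name `parquet_table` are hypothetical; `spark.sql.hive.convertMetastoreParquet` is the configuration key backing `HiveContext.convertMetastoreParquet`.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// Hypothetical setup; any Hive-enabled deployment is exercised the same way.
val sc = new SparkContext(new SparkConf().setAppName("MetastoreParquetExample"))
val hiveContext = new HiveContext(sc)

// With the flag enabled, lookupRelation returns a ParquetRelation (rather than a
// MetastoreRelation) for metastore tables whose SerDe class name contains "parquet",
// so the scan below is planned by ParquetOperations instead of HiveTableScans.
hiveContext.setConf("spark.sql.hive.convertMetastoreParquet", "true")

// `parquet_table` is a hypothetical Hive table stored as Parquet.
hiveContext.sql("SELECT key, value FROM parquet_table WHERE key > 10").collect()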