From ab2d921329324cfbeaaf77938e6342e4f3fe223f Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sat, 22 Nov 2014 09:36:13 +0800 Subject: [PATCH 1/4] Removes the Parquet hacks --- .../org/apache/spark/sql/SQLContext.scala | 15 +++ .../spark/sql/parquet/ParquetRelation.scala | 19 +-- .../apache/spark/sql/hive/HiveContext.scala | 1 - .../spark/sql/hive/HiveMetastoreCatalog.scala | 26 ++-- .../spark/sql/hive/HiveStrategies.scala | 127 +----------------- 5 files changed, 46 insertions(+), 142 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 31cc4170aa86..ddf5acc5e586 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -164,6 +164,21 @@ class SQLContext(@transient val sparkContext: SparkContext) def parquetFile(path: String): SchemaRDD = new SchemaRDD(this, parquet.ParquetRelation(path, Some(sparkContext.hadoopConfiguration), this)) + /** + * Loads a Parquet file with a given schema, returning the result as a [[SchemaRDD]]. + * + * @group userf + */ + def parquetFile(path: String, schema: StructType): SchemaRDD = + new SchemaRDD( + this, + parquet.ParquetRelation( + path, + Some(sparkContext.hadoopConfiguration), + this, + Nil, + Some(schema.toAttributes))) + /** * Loads a JSON file (one object per line), returning the result as a [[SchemaRDD]]. * It goes through the entire dataset once to determine the schema. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala index b237a07c72d0..6a4d29074dc5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala @@ -28,8 +28,9 @@ import parquet.schema.MessageType import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedException} -import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Attribute} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.catalyst.types.StructType /** * Relation that consists of data stored in a Parquet columnar format. @@ -47,7 +48,8 @@ private[sql] case class ParquetRelation( path: String, @transient conf: Option[Configuration], @transient sqlContext: SQLContext, - partitioningAttributes: Seq[Attribute] = Nil) + partitioningAttributes: Seq[Attribute] = Nil, + inheritedOutput: Option[Seq[Attribute]] = None) extends LeafNode with MultiInstanceRelation { self: Product => @@ -60,12 +62,13 @@ private[sql] case class ParquetRelation( .getSchema /** Attributes */ - override val output = + override val output = inheritedOutput.getOrElse { partitioningAttributes ++ - ParquetTypesConverter.readSchemaFromFile( - new Path(path.split(",").head), - conf, - sqlContext.isParquetBinaryAsString) + ParquetTypesConverter.readSchemaFromFile( + new Path(path.split(",").head), + conf, + sqlContext.isParquetBinaryAsString) + } override def newInstance() = ParquetRelation(path, conf, sqlContext).asInstanceOf[this.type] @@ -89,7 +92,7 @@ private[sql] object ParquetRelation { // checks first to see if there's any handlers already set // and if not it creates them. 
If this method executes prior // to that class being loaded then: - // 1) there's no handlers installed so there's none to + // 1) there's no handlers installed so there's none to // remove. But when it IS finally loaded the desired affect // of removing them is circumvented. // 2) The parquet.Log static initializer calls setUseParentHanders(false) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 304b9a73ee91..b8926a66797e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -345,7 +345,6 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { TakeOrdered, ParquetOperations, InMemoryScans, - ParquetConversion, // Must be before HiveTableScans HiveTableScans, DataSinks, Scripts, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 91a157785d5b..c042a70a4f17 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -22,16 +22,14 @@ import java.util.{List => JList} import scala.util.parsing.combinator.RegexParsers -import org.apache.hadoop.util.ReflectionUtils - import org.apache.hadoop.hive.metastore.TableType -import org.apache.hadoop.hive.metastore.api.FieldSchema -import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition} -import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table, HiveException} +import org.apache.hadoop.hive.metastore.api.{FieldSchema, Partition => TPartition, Table => TTable} +import org.apache.hadoop.hive.ql.metadata.{Hive, HiveException, Partition, Table} import org.apache.hadoop.hive.ql.plan.CreateTableDesc import org.apache.hadoop.hive.serde.serdeConstants -import org.apache.hadoop.hive.serde2.{Deserializer, SerDeException} import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe +import org.apache.hadoop.hive.serde2.{Deserializer, SerDeException} +import org.apache.hadoop.util.ReflectionUtils import org.apache.spark.Logging import org.apache.spark.annotation.DeveloperApi @@ -81,9 +79,21 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with } // Since HiveQL is case insensitive for table names we make them all lowercase. 
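// Illustrative sketch (annotation, not part of the patch): the conversion introduced in
// this hunk is gated on the table's SerDe class name, matched case-insensitively, and it
// reads either the comma-joined partition locations or the table's single data location.
// The object and method names below are made up; plain strings stand in for the Hive
// metadata objects.
object MetastoreParquetConversionSketch {
  // True when the table is stored with a Parquet SerDe, e.g.
  // "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe".
  def usesParquetSerDe(serdeClassName: String): Boolean =
    serdeClassName.toLowerCase.contains("parquet")

  // The path handed to the Parquet relation: every partition location for partitioned
  // tables, otherwise the table's data location.
  def parquetPath(
      isPartitioned: Boolean,
      partitionLocations: Seq[String],
      tableLocation: String): String =
    if (isPartitioned) partitionLocations.mkString(",") else tableLocation
}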
- MetastoreRelation( + val relation = MetastoreRelation( databaseName, tblName, alias)( table.getTTable, partitions.map(part => part.getTPartition))(hive) + + val schema = StructType.fromAttributes(relation.output) + + if (relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet")) { + if (relation.hiveQlTable.isPartitioned) { + hive.parquetFile(partitions.map(_.getLocation).mkString(","), schema).logicalPlan + } else { + hive.parquetFile(relation.hiveQlTable.getDataLocation.toString, schema).logicalPlan + } + } else { + relation + } } } @@ -145,9 +155,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with logInfo(s"Default to LazySimpleSerDe for table $dbName.$tblName") tbl.setSerializationLib(classOf[LazySimpleSerDe].getName()) - import org.apache.hadoop.mapred.TextInputFormat import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat import org.apache.hadoop.io.Text + import org.apache.hadoop.mapred.TextInputFormat tbl.setInputFormatClass(classOf[TextInputFormat]) tbl.setOutputFormatClass(classOf[HiveIgnoreKeyTextOutputFormat[Text, Text]]) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 56fc85239e1c..732f166a7d0e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -19,21 +19,13 @@ package org.apache.spark.sql.hive import org.apache.hadoop.hive.ql.parse.ASTNode -import org.apache.spark.annotation.Experimental -import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.catalyst.types.StringType -import org.apache.spark.sql.execution.{DescribeCommand, OutputFaker, SparkPlan} -import org.apache.spark.sql.hive +import org.apache.spark.sql.execution.{DescribeCommand, SparkPlan} import org.apache.spark.sql.hive.execution._ -import org.apache.spark.sql.parquet.ParquetRelation -import org.apache.spark.sql.{SQLContext, SchemaRDD, Strategy} - -import scala.collection.JavaConversions._ +import org.apache.spark.sql.{SQLContext, Strategy, hive} private[hive] trait HiveStrategies { // Possibly being too clever with types here... or not clever enough. @@ -41,121 +33,6 @@ private[hive] trait HiveStrategies { val hiveContext: HiveContext - /** - * :: Experimental :: - * Finds table scans that would use the Hive SerDe and replaces them with our own native parquet - * table scan operator. - * - * TODO: Much of this logic is duplicated in HiveTableScan. Ideally we would do some refactoring - * but since this is after the code freeze for 1.1 all logic is here to minimize disruption. - * - * Other issues: - * - Much of this logic assumes case insensitive resolution. - */ - @Experimental - object ParquetConversion extends Strategy { - implicit class LogicalPlanHacks(s: SchemaRDD) { - def lowerCase = - new SchemaRDD(s.sqlContext, s.logicalPlan) - - def addPartitioningAttributes(attrs: Seq[Attribute]) = { - // Don't add the partitioning key if its already present in the data. 
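// Illustrative sketch (annotation, not part of the patch): the guard in the helper being
// removed here compared attribute names only, and skipped re-adding the partitioning keys
// whenever every key name already appeared in the relation's output. A self-contained
// rendering with plain strings in place of Catalyst attributes; the names are made up.
object PartitioningKeySketch {
  def alreadyPresent(partitionKeyNames: Seq[String], outputNames: Seq[String]): Boolean =
    partitionKeyNames.toSet.subsetOf(outputNames.toSet)
}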
- if (attrs.map(_.name).toSet.subsetOf(s.logicalPlan.output.map(_.name).toSet)) { - s - } else { - new SchemaRDD( - s.sqlContext, - s.logicalPlan transform { - case p: ParquetRelation => p.copy(partitioningAttributes = attrs) - }) - } - } - } - - implicit class PhysicalPlanHacks(originalPlan: SparkPlan) { - def fakeOutput(newOutput: Seq[Attribute]) = - OutputFaker( - originalPlan.output.map(a => - newOutput.find(a.name.toLowerCase == _.name.toLowerCase) - .getOrElse( - sys.error(s"Can't find attribute $a to fake in set ${newOutput.mkString(",")}"))), - originalPlan) - } - - def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case PhysicalOperation(projectList, predicates, relation: MetastoreRelation) - if relation.tableDesc.getSerdeClassName.contains("Parquet") && - hiveContext.convertMetastoreParquet => - - // Filter out all predicates that only deal with partition keys - val partitionsKeys = AttributeSet(relation.partitionKeys) - val (pruningPredicates, otherPredicates) = predicates.partition { - _.references.subsetOf(partitionsKeys) - } - - // We are going to throw the predicates and projection back at the whole optimization - // sequence so lets unresolve all the attributes, allowing them to be rebound to the - // matching parquet attributes. - val unresolvedOtherPredicates = otherPredicates.map(_ transform { - case a: AttributeReference => UnresolvedAttribute(a.name) - }).reduceOption(And).getOrElse(Literal(true)) - - val unresolvedProjection = projectList.map(_ transform { - case a: AttributeReference => UnresolvedAttribute(a.name) - }) - - if (relation.hiveQlTable.isPartitioned) { - val rawPredicate = pruningPredicates.reduceOption(And).getOrElse(Literal(true)) - // Translate the predicate so that it automatically casts the input values to the correct - // data types during evaluation - val castedPredicate = rawPredicate transform { - case a: AttributeReference => - val idx = relation.partitionKeys.indexWhere(a.exprId == _.exprId) - val key = relation.partitionKeys(idx) - Cast(BoundReference(idx, StringType, nullable = true), key.dataType) - } - - val inputData = new GenericMutableRow(relation.partitionKeys.size) - val pruningCondition = - if(codegenEnabled) { - GeneratePredicate(castedPredicate) - } else { - InterpretedPredicate(castedPredicate) - } - - val partitions = relation.hiveQlPartitions.filter { part => - val partitionValues = part.getValues - var i = 0 - while (i < partitionValues.size()) { - inputData(i) = partitionValues(i) - i += 1 - } - pruningCondition(inputData) - } - - hiveContext - .parquetFile(partitions.map(_.getLocation).mkString(",")) - .addPartitioningAttributes(relation.partitionKeys) - .lowerCase - .where(unresolvedOtherPredicates) - .select(unresolvedProjection:_*) - .queryExecution - .executedPlan - .fakeOutput(projectList.map(_.toAttribute)):: Nil - } else { - hiveContext - .parquetFile(relation.hiveQlTable.getDataLocation.toString) - .lowerCase - .where(unresolvedOtherPredicates) - .select(unresolvedProjection:_*) - .queryExecution - .executedPlan - .fakeOutput(projectList.map(_.toAttribute)) :: Nil - } - case _ => Nil - } - } - object Scripts extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case logical.ScriptTransformation(input, script, output, child) => From fe31d517fdebce51d4d2f7319b8fa9efde3bed7f Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Tue, 25 Nov 2014 23:59:31 +0800 Subject: [PATCH 2/4] Fixes Parquet partitioning --- .../spark/sql/parquet/ParquetRelation.scala | 3 +- 
.../spark/sql/hive/HiveMetastoreCatalog.scala | 36 +++++++++++++++---- 2 files changed, 31 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala index 6a4d29074dc5..c404cb58d78c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala @@ -70,7 +70,8 @@ private[sql] case class ParquetRelation( sqlContext.isParquetBinaryAsString) } - override def newInstance() = ParquetRelation(path, conf, sqlContext).asInstanceOf[this.type] + override def newInstance() = ParquetRelation( + path, conf, sqlContext, partitioningAttributes, inheritedOutput).asInstanceOf[this.type] // Equals must also take into account the output attributes so that we can distinguish between // different instances of the same relation, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index c042a70a4f17..332fc91b6fbe 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -40,6 +40,7 @@ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.catalyst.types._ +import org.apache.spark.sql.parquet.ParquetRelation import org.apache.spark.util.Utils /* Implicit conversions */ @@ -83,14 +84,35 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with databaseName, tblName, alias)( table.getTTable, partitions.map(part => part.getTPartition))(hive) - val schema = StructType.fromAttributes(relation.output) - - if (relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet")) { - if (relation.hiveQlTable.isPartitioned) { - hive.parquetFile(partitions.map(_.getLocation).mkString(","), schema).logicalPlan + if (hive.convertMetastoreParquet && + relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet")) { + + val (path, partitionAttributes) = if (relation.hiveQlTable.isPartitioned) { + val partLocs = partitions.map(_.getLocation).mkString(",") + val partAttrs = table.getPartCols.map { f => + val hivePartKey = relation + .partitionKeys + .find(_.name.toLowerCase == f.getName.toLowerCase) + .get + + AttributeReference( + f.getName, + HiveMetastoreTypes.toDataType(f.getType), + nullable = true)( + exprId = hivePartKey.exprId, + qualifiers = Seq(alias.getOrElse(tableName))) + } + (partLocs, partAttrs) } else { - hive.parquetFile(relation.hiveQlTable.getDataLocation.toString, schema).logicalPlan + (relation.hiveQlTable.getDataLocation.toString, Nil) } + + ParquetRelation( + path, + Some(hive.sparkContext.hadoopConfiguration), + hive, + partitionAttributes, + Some(relation.output)) } else { relation } @@ -507,7 +529,7 @@ private[hive] case class MetastoreRelation val output = attributes ++ partitionKeys /** An attribute map that can be used to lookup original attributes based on expression id. */ - val attributeMap = AttributeMap(output.map(o => (o,o))) + val attributeMap = AttributeMap(output.map(o => (o, o))) /** An attribute map for determining the ordinal for non-partition columns. 
*/ val columnOrdinals = AttributeMap(attributes.zipWithIndex) From a578f1b1b240530443d19f045bade1b2b0c8efc4 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Mon, 1 Dec 2014 18:39:49 +0800 Subject: [PATCH 3/4] Fixed ParquetMetastoreSuite --- .../spark/sql/parquet/ParquetRelation.scala | 33 ++++++++++++------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala index c404cb58d78c..83aeca88b84b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala @@ -28,9 +28,9 @@ import parquet.schema.MessageType import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedException} -import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Attribute} +import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} -import org.apache.spark.sql.catalyst.types.StructType +import scala.collection.JavaConversions._ /** * Relation that consists of data stored in a Parquet columnar format. @@ -49,7 +49,7 @@ private[sql] case class ParquetRelation( @transient conf: Option[Configuration], @transient sqlContext: SQLContext, partitioningAttributes: Seq[Attribute] = Nil, - inheritedOutput: Option[Seq[Attribute]] = None) + inheritedAttributes: Option[Seq[Attribute]] = None) extends LeafNode with MultiInstanceRelation { self: Product => @@ -57,21 +57,32 @@ private[sql] case class ParquetRelation( /** Schema derived from ParquetFile */ def parquetSchema: MessageType = ParquetTypesConverter - .readMetaData(new Path(path), conf) + .readMetaData(new Path(path.split(",").head), conf) .getFileMetaData .getSchema /** Attributes */ - override val output = inheritedOutput.getOrElse { - partitioningAttributes ++ - ParquetTypesConverter.readSchemaFromFile( - new Path(path.split(",").head), - conf, - sqlContext.isParquetBinaryAsString) + override val output = { + inheritedAttributes.map { hiveAttributes => + // Hive is case insensitive, have to restore case information here. 
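// Illustrative sketch (annotation, not part of the patch): the name-based restoration used
// here matches each Hive-supplied (lower-cased) column name against the Parquet field names
// case-insensitively and keeps Parquet's spelling, falling back to the Hive name when there
// is no match. A self-contained rendering with plain strings in place of attributes:
object CaseRestorationSketch {
  def restoreCase(hiveNames: Seq[String], parquetFieldNames: Set[String]): Seq[String] =
    hiveNames.map { name =>
      parquetFieldNames.find(_.toLowerCase == name.toLowerCase).getOrElse(name)
    }
}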
+ val parquetFieldNames = parquetSchema.getFields.map(_.getName).toSet + hiveAttributes.map { a => + parquetFieldNames + .find(_.toLowerCase == a.name.toLowerCase) + .map(a.withName) + .getOrElse(a) + } + }.getOrElse { + partitioningAttributes ++ + ParquetTypesConverter.readSchemaFromFile( + new Path(path.split(",").head), + conf, + sqlContext.isParquetBinaryAsString) + } } override def newInstance() = ParquetRelation( - path, conf, sqlContext, partitioningAttributes, inheritedOutput).asInstanceOf[this.type] + path, conf, sqlContext, partitioningAttributes, inheritedAttributes).asInstanceOf[this.type] // Equals must also take into account the output attributes so that we can distinguish between // different instances of the same relation, From f6a587fa35eb00eb1cb9b5c66a83fc326e50f243 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Mon, 1 Dec 2014 23:58:56 +0800 Subject: [PATCH 4/4] Code cleanup --- .../org/apache/spark/sql/SQLContext.scala | 15 ---------- .../spark/sql/parquet/ParquetRelation.scala | 29 ++++++++----------- .../spark/sql/hive/HiveMetastoreCatalog.scala | 25 ++++------------ 3 files changed, 17 insertions(+), 52 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index ddf5acc5e586..31cc4170aa86 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -164,21 +164,6 @@ class SQLContext(@transient val sparkContext: SparkContext) def parquetFile(path: String): SchemaRDD = new SchemaRDD(this, parquet.ParquetRelation(path, Some(sparkContext.hadoopConfiguration), this)) - /** - * Loads a Parquet file with a given schema, returning the result as a [[SchemaRDD]]. - * - * @group userf - */ - def parquetFile(path: String, schema: StructType): SchemaRDD = - new SchemaRDD( - this, - parquet.ParquetRelation( - path, - Some(sparkContext.hadoopConfiguration), - this, - Nil, - Some(schema.toAttributes))) - /** * Loads a JSON file (one object per line), returning the result as a [[SchemaRDD]]. * It goes through the entire dataset once to determine the schema. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala index 83aeca88b84b..b67fd846c7b6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala @@ -30,7 +30,6 @@ import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedException} import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} -import scala.collection.JavaConversions._ /** * Relation that consists of data stored in a Parquet columnar format. @@ -49,7 +48,7 @@ private[sql] case class ParquetRelation( @transient conf: Option[Configuration], @transient sqlContext: SQLContext, partitioningAttributes: Seq[Attribute] = Nil, - inheritedAttributes: Option[Seq[Attribute]] = None) + metastoreAttributes: Option[Seq[Attribute]] = None) extends LeafNode with MultiInstanceRelation { self: Product => @@ -63,26 +62,22 @@ private[sql] case class ParquetRelation( /** Attributes */ override val output = { - inheritedAttributes.map { hiveAttributes => - // Hive is case insensitive, have to restore case information here. 
- val parquetFieldNames = parquetSchema.getFields.map(_.getName).toSet - hiveAttributes.map { a => - parquetFieldNames - .find(_.toLowerCase == a.name.toLowerCase) - .map(a.withName) - .getOrElse(a) - } + // All non-partitioning attributes from Parquet metadata + val parquetAttributes = ParquetTypesConverter + .readSchemaFromFile(new Path(path.split(",").head), conf, sqlContext.isParquetBinaryAsString) + .filter(a => partitioningAttributes.find(_.name == a.name).isEmpty) + // Parquet is case sensitive while Hive is not, have to restore case information for non- + // partitioning keys here. + val attributes = metastoreAttributes.map { attrs => + (attrs, parquetAttributes.map(_.name)).zipped.map(_ withName _) }.getOrElse { - partitioningAttributes ++ - ParquetTypesConverter.readSchemaFromFile( - new Path(path.split(",").head), - conf, - sqlContext.isParquetBinaryAsString) + parquetAttributes } + partitioningAttributes ++ attributes } override def newInstance() = ParquetRelation( - path, conf, sqlContext, partitioningAttributes, inheritedAttributes).asInstanceOf[this.type] + path, conf, sqlContext, partitioningAttributes, metastoreAttributes).asInstanceOf[this.type] // Equals must also take into account the output attributes so that we can distinguish between // different instances of the same relation, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 332fc91b6fbe..7839a567e82b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -86,33 +86,18 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with if (hive.convertMetastoreParquet && relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet")) { - - val (path, partitionAttributes) = if (relation.hiveQlTable.isPartitioned) { - val partLocs = partitions.map(_.getLocation).mkString(",") - val partAttrs = table.getPartCols.map { f => - val hivePartKey = relation - .partitionKeys - .find(_.name.toLowerCase == f.getName.toLowerCase) - .get - - AttributeReference( - f.getName, - HiveMetastoreTypes.toDataType(f.getType), - nullable = true)( - exprId = hivePartKey.exprId, - qualifiers = Seq(alias.getOrElse(tableName))) - } - (partLocs, partAttrs) + val path = if (relation.hiveQlTable.isPartitioned) { + partitions.map(_.getLocation).mkString(",") } else { - (relation.hiveQlTable.getDataLocation.toString, Nil) + relation.hiveQlTable.getDataLocation.toString } ParquetRelation( path, Some(hive.sparkContext.hadoopConfiguration), hive, - partitionAttributes, - Some(relation.output)) + relation.partitionKeys, + Some(relation.attributes)) } else { relation }