From dc943a445047a21a88ab19566eab672e8921dcc1 Mon Sep 17 00:00:00 2001
From: Rajesh Balamohan
Date: Wed, 3 Aug 2016 07:51:05 +0530
Subject: [PATCH 1/3] [SPARK-14387][SQL] Exceptions thrown when querying ORC tables

---
 .../spark/sql/hive/orc/OrcFileFormat.scala | 43 +++++++++++++++----
 1 file changed, 34 insertions(+), 9 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 1d3c4663c339..b6476694084e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -100,6 +100,36 @@ private[sql] class OrcFileFormat
     true
   }
 
+  def mapRequiredColumns(
+      conf: Configuration,
+      dataSchema: StructType,
+      physicalSchema: StructType,
+      requiredSchema: StructType): StructType = {
+    /**
+     * requiredSchema names might not match with physical schema names.
+     *
+     * This is especially true when data is generated via Hive wherein
+     * orc files would have column names as _col0, _col1 etc. This is
+     * fixed in Hive 2.0, where in physical col names would match that
+     * of metastore.
+     *
+     * To make it backward compatible, it is required to map physical
+     * names to that of requiredSchema.
+     */
+    // for requiredSchema, get the ordinal from dataSchema
+    val ids = requiredSchema.map(a => dataSchema.fieldIndex(a.name): Integer).sorted
+
+    // for ids, get corresponding name from physicalSchema (e.g _col1 in
+    // case of hive. otherwise it would match physical name)
+    val names = ids.map(i => physicalSchema.fieldNames(i))
+    HiveShim.appendReadColumns(conf, ids, names)
+
+    val mappedReqPhysicalSchemaStruct =
+      StructType(physicalSchema.filter(struct => names.contains(struct.name)))
+
+    mappedReqPhysicalSchemaStruct
+  }
+
   override def buildReader(
       sparkSession: SparkSession,
       dataSchema: StructType,
@@ -130,7 +160,9 @@ private[sql] class OrcFileFormat
         Iterator.empty
       } else {
         val physicalSchema = maybePhysicalSchema.get
-        OrcRelation.setRequiredColumns(conf, physicalSchema, requiredSchema)
+        // Get StructType for newly mapped schema
+        val mappedReqPhysicalSchema =
+          mapRequiredColumns(conf, dataSchema, physicalSchema, requiredSchema)
 
         val orcRecordReader = {
           val job = Job.getInstance(conf)
@@ -151,7 +183,7 @@ private[sql] class OrcFileFormat
         // Unwraps `OrcStruct`s to `UnsafeRow`s
         OrcRelation.unwrapOrcStructs(
           conf,
-          requiredSchema,
+          mappedReqPhysicalSchema,
           Some(orcRecordReader.getObjectInspector.asInstanceOf[StructObjectInspector]),
           new RecordReaderIterator[OrcStruct](orcRecordReader))
       }
@@ -306,11 +338,4 @@ private[orc] object OrcRelation extends HiveInspectors {
     maybeStructOI.map(unwrap).getOrElse(Iterator.empty)
   }
-
-  def setRequiredColumns(
-      conf: Configuration, physicalSchema: StructType, requestedSchema: StructType): Unit = {
-    val ids = requestedSchema.map(a => physicalSchema.fieldIndex(a.name): Integer)
-    val (sortedIDs, sortedNames) = ids.zip(requestedSchema.fieldNames).sorted.unzip
-    HiveShim.appendReadColumns(conf, sortedIDs, sortedNames)
-  }
 }
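For reference between patches, here is a minimal, hypothetical sketch of the ordinal-based mapping that the new mapRequiredColumns helper performs. The schemas below are made up to mirror the Hive 1.x case; only the mapping logic follows the patch, and the HiveShim.appendReadColumns update of the Hadoop conf is omitted. It can be pasted into a Scala REPL with Spark's catalyst/sql types on the classpath.

    import org.apache.spark.sql.types._

    // Metastore (logical) schema vs. the physical schema inside a Hive 1.x ORC file,
    // which uses positional names (_col0, _col1) instead of (key, value).
    val dataSchema     = StructType(Seq(StructField("key", IntegerType), StructField("value", StringType)))
    val physicalSchema = StructType(Seq(StructField("_col0", IntegerType), StructField("_col1", StringType)))
    val requiredSchema = StructType(Seq(StructField("value", StringType)))

    // Resolve required names to ordinals against the metastore schema,
    // then pick the physical fields at those ordinals.
    val ids    = requiredSchema.map(f => dataSchema.fieldIndex(f.name)).sorted   // Seq(1)
    val names  = ids.map(i => physicalSchema.fieldNames(i))                      // Seq("_col1")
    val mapped = StructType(physicalSchema.filter(f => names.contains(f.name)))  // struct<_col1:string>

With plain Spark-written ORC files the physical names already match the metastore names, so the mapping degenerates to the previous behaviour; the ordinal lookup only changes the outcome for Hive 1.x-style files.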
From 58162b9a2195b582ce076902891a2b223c77a225 Mon Sep 17 00:00:00 2001
From: Rajesh Balamohan
Date: Wed, 3 Aug 2016 08:18:39 +0530
Subject: [PATCH 2/3] [SPARK-14387][SQL] Exceptions thrown when querying ORC tables

---
 .../spark/sql/hive/orc/OrcFileFormat.scala | 19 ++++++++-----------
 1 file changed, 8 insertions(+), 11 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index b6476694084e..2faf5d0e98a5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -105,17 +105,14 @@ private[sql] class OrcFileFormat
       dataSchema: StructType,
       physicalSchema: StructType,
       requiredSchema: StructType): StructType = {
-    /**
-     * requiredSchema names might not match with physical schema names.
-     *
-     * This is especially true when data is generated via Hive wherein
-     * orc files would have column names as _col0, _col1 etc. This is
-     * fixed in Hive 2.0, where in physical col names would match that
-     * of metastore.
-     *
-     * To make it backward compatible, it is required to map physical
-     * names to that of requiredSchema.
-     */
+
+    // requiredSchema names might not match with physical schema names.
+    // This is especially true when data is generated via Hive wherein
+    // orc files would have column names as _col0, _col1 etc. This is
+    // fixed in Hive 2.0, where in physical col names would match that
+    // of metastore. To make it backward compatible, it is required to
+    // map physical names to that of requiredSchema.
+
     // for requiredSchema, get the ordinal from dataSchema
     val ids = requiredSchema.map(a => dataSchema.fieldIndex(a.name): Integer).sorted
 

From 046e0c433997b71c85635535b5039e0405cabd13 Mon Sep 17 00:00:00 2001
From: Rajesh Balamohan
Date: Wed, 3 Aug 2016 16:35:35 +0530
Subject: [PATCH 3/3] [SPARK-14387][SQL] Enable Hive-1.x ORC compatibility with spark.sql.hive.convertMetastoreOrc

---
 .../data/files/hive_1.x_orc/dummy_orc.orc  | Bin 0 -> 392 bytes
 .../spark/sql/hive/orc/OrcQuerySuite.scala |  26 ++++++++++++++++++
 2 files changed, 26 insertions(+)
 create mode 100644 sql/hive/src/test/resources/data/files/hive_1.x_orc/dummy_orc.orc

diff --git a/sql/hive/src/test/resources/data/files/hive_1.x_orc/dummy_orc.orc b/sql/hive/src/test/resources/data/files/hive_1.x_orc/dummy_orc.orc
new file mode 100644
index 0000000000000000000000000000000000000000..fb35cb0df83ce92d7e46a033697b872e16a3bae5
GIT binary patch
literal 392
zcmeYdau#G@;9?VE;b0A5umv(@xR@Cj7=%PQScUjG?mSz@B=7`CNIYmf#2vuk!@%&^
z=y6gi1EXRrUzlpC9;3U&hmW6%OUp_`nz~skoLyAJ4A=}982A|&4zPf9Gnp|k6qmFx
z)aUElho@_>d$2C}C|euwbnb;8=K|~)SQ!|Y1ge0hCOu9NPMGlE8Pmlcp{|5ko=(CZ
z#!NqBKrB8FW5&!dMKwM)g*2u_O~&JQt}Ht9#HDusl}|rT{8^O4Pyy8c>0#2t1m1)P
zLQfM!A3fjkIY)Bx0bvEjj|)BumwQ3wt!B@kn7OjDtq+tK?+FNYvi30Y
zFkAv^PY|;>rE&D=K^+0Dg{vPdIJipX&_U%RM^305m6|_ielo*Co~bh|W;D*Y+i

+      withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> orcConversion) {
+        withTable("dummy_orc") {
+          // Hive 1.x can have virtual columns as follows in ORC files
+          // Type: struct<_col0:int,_col1:string> in hive_1.x_orc
+          spark.sql(
+            s"""
+               |CREATE EXTERNAL TABLE dummy_orc(key INT, value STRING)
+               |STORED AS ORC
+               |LOCATION '${getHiveFile("data/files/hive_1.x_orc/")}'
+             """.stripMargin)
+
+          val df = spark.sql("SELECT key, value FROM dummy_orc LIMIT 1")
+          checkAnswer(df, singleRowDF)
+        }
+      }
+    }
+  }
+
   test("Verify the ORC conversion parameter: CONVERT_METASTORE_ORC") {
     withTempView("single") {
       val singleRowDF = Seq((0, "foo")).toDF("key", "value")
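End to end, the scenario the series targets can be sketched as follows. This is an illustrative, hypothetical snippet rather than part of the patches: it assumes an ORC file written by Hive 1.x, whose physical schema is struct<_col0:int,_col1:string>, sitting under /tmp/hive_1.x_orc, and a Spark 2.0-style SparkSession with Hive support. The table name, paths, and app name are made up.

    import org.apache.spark.sql.SparkSession

    object Hive1xOrcRead {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("hive-1.x-orc-read")
          .enableHiveSupport()
          .getOrCreate()

        // Hypothetical location of an ORC file produced by Hive 1.x, i.e. whose
        // physical column names are _col0, _col1 rather than key, value.
        spark.sql(
          """CREATE EXTERNAL TABLE IF NOT EXISTS dummy_orc (key INT, value STRING)
            |STORED AS ORC
            |LOCATION '/tmp/hive_1.x_orc'""".stripMargin)

        // With convertMetastoreOrc enabled, Spark reads the files through OrcFileFormat
        // directly, so the required columns (key, value) have to be mapped to the
        // physical names (_col0, _col1); that mapping is what this series adds.
        spark.conf.set("spark.sql.hive.convertMetastoreOrc", "true")
        spark.sql("SELECT key, value FROM dummy_orc LIMIT 1").show()

        spark.stop()
      }
    }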