diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index fd791ce7c5e19..4dff1ec7ebfb9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -164,8 +164,6 @@ class OrcFileFormat
     val enableVectorizedReader = supportBatch(sparkSession, resultSchema)
     val capacity = sqlConf.orcVectorizedReaderBatchSize
 
-    val resultSchemaString = OrcUtils.orcTypeDescriptionString(resultSchema)
-    OrcConf.MAPRED_INPUT_SCHEMA.setString(hadoopConf, resultSchemaString)
     OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(hadoopConf, sqlConf.caseSensitiveAnalysis)
 
     val broadcastedConf =
@@ -179,16 +177,18 @@ class OrcFileFormat
 
       val fs = filePath.getFileSystem(conf)
       val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
-      val requestedColIdsOrEmptyFile =
+      val resultedColPruneInfo =
         Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader =>
           OrcUtils.requestedColumnIds(
             isCaseSensitive, dataSchema, requiredSchema, reader, conf)
         }
 
-      if (requestedColIdsOrEmptyFile.isEmpty) {
+      if (resultedColPruneInfo.isEmpty) {
         Iterator.empty
       } else {
-        val requestedColIds = requestedColIdsOrEmptyFile.get
+        val (requestedColIds, canPruneCols) = resultedColPruneInfo.get
+        val resultSchemaString = OrcUtils.orcResultSchemaString(canPruneCols,
+          dataSchema, resultSchema, partitionSchema, conf)
         assert(requestedColIds.length == requiredSchema.length,
           "[BUG] requested column IDs do not match required schema")
         val taskConf = new Configuration(conf)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
index d274bcd0edd2c..e10253989788b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.orc.{OrcFile, Reader, TypeDescription, Writer}
+import org.apache.orc.{OrcConf, OrcFile, Reader, TypeDescription, Writer}
 
 import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -116,15 +116,17 @@ object OrcUtils extends Logging {
   }
 
   /**
-   * Returns the requested column ids from the given ORC file. Column id can be -1, which means the
-   * requested column doesn't exist in the ORC file. Returns None if the given ORC file is empty.
+   * @return the requested column ids from the given ORC file, together with a boolean
+   *         flag indicating whether column pruning (pruneCols) is allowed. A requested
+   *         column id can be -1, which means the requested column doesn't exist in the
+   *         ORC file. Returns None if the given ORC file is empty.
    */
   def requestedColumnIds(
       isCaseSensitive: Boolean,
       dataSchema: StructType,
       requiredSchema: StructType,
       reader: Reader,
-      conf: Configuration): Option[Array[Int]] = {
+      conf: Configuration): Option[(Array[Int], Boolean)] = {
     val orcFieldNames = reader.getSchema.getFieldNames.asScala
     if (orcFieldNames.isEmpty) {
       // SPARK-8501: Some old empty ORC files always have an empty schema stored in their footer.
@@ -136,6 +138,10 @@ object OrcUtils extends Logging {
       assert(orcFieldNames.length <= dataSchema.length, "The given data schema " +
         s"${dataSchema.catalogString} has less fields than the actual ORC physical schema, " +
         "no idea which columns were dropped, fail to read.")
+      // For an ORC file written by Hive, there are no field names in the physical
+      // schema, so the entire dataSchema (rather than the required schema) has to be
+      // sent to the ORC reader. Column pruning (pruneCols) is therefore not done in
+      // this case.
       Some(requiredSchema.fieldNames.map { name =>
         val index = dataSchema.fieldIndex(name)
         if (index < orcFieldNames.length) {
@@ -143,7 +149,7 @@ object OrcUtils extends Logging {
         } else {
           -1
         }
-      })
+      }, false)
     } else {
       if (isCaseSensitive) {
         Some(requiredSchema.fieldNames.zipWithIndex.map { case (name, idx) =>
@@ -152,7 +158,7 @@ object OrcUtils extends Logging {
           } else {
             -1
           }
-        })
+        }, true)
       } else {
         // Do case-insensitive resolution only if in case-insensitive mode
         val caseInsensitiveOrcFieldMap = orcFieldNames.groupBy(_.toLowerCase(Locale.ROOT))
@@ -170,7 +176,7 @@ object OrcUtils extends Logging {
                 idx
               }
             }.getOrElse(-1)
-        })
+        }, true)
       }
     }
   }
@@ -199,4 +205,25 @@ object OrcUtils extends Logging {
       s"map<${orcTypeDescriptionString(m.keyType)},${orcTypeDescriptionString(m.valueType)}>"
     case _ => dt.catalogString
   }
+
+  /**
+   * @return the ORC result schema string, chosen based on the canPruneCols flag: when
+   *         canPruneCols is true it is built from resultSchema, otherwise from the full
+   *         dataSchema plus partitionSchema. The string is also set as the ORC
+   *         MAPRED_INPUT_SCHEMA on the given configuration.
+   */
+  def orcResultSchemaString(
+      canPruneCols: Boolean,
+      dataSchema: StructType,
+      resultSchema: StructType,
+      partitionSchema: StructType,
+      conf: Configuration): String = {
+    val resultSchemaString = if (canPruneCols) {
+      OrcUtils.orcTypeDescriptionString(resultSchema)
+    } else {
+      OrcUtils.orcTypeDescriptionString(StructType(dataSchema.fields ++ partitionSchema.fields))
+    }
+    OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString)
+    resultSchemaString
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
index 03d58fdcb7f67..7f25f7bd135f0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
@@ -66,24 +66,24 @@ case class OrcPartitionReaderFactory(
   override def buildReader(file: PartitionedFile): PartitionReader[InternalRow] = {
     val conf = broadcastedConf.value.value
 
-    val resultSchemaString = OrcUtils.orcTypeDescriptionString(resultSchema)
-    OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString)
     OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(conf, isCaseSensitive)
 
     val filePath = new Path(new URI(file.filePath))
 
     val fs = filePath.getFileSystem(conf)
     val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
-    val requestedColIdsOrEmptyFile =
+    val resultedColPruneInfo =
       Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader =>
         OrcUtils.requestedColumnIds(
           isCaseSensitive, dataSchema, readDataSchema, reader, conf)
       }
 
-    if (requestedColIdsOrEmptyFile.isEmpty) {
+    if (resultedColPruneInfo.isEmpty) {
       new EmptyPartitionReader[InternalRow]
     } else {
-      val requestedColIds = requestedColIdsOrEmptyFile.get
+      val (requestedColIds, canPruneCols) = resultedColPruneInfo.get
+      val resultSchemaString = OrcUtils.orcResultSchemaString(canPruneCols,
+        dataSchema, resultSchema, partitionSchema, conf)
       assert(requestedColIds.length == readDataSchema.length,
         "[BUG] requested column IDs do not match required schema")
 
@@ -112,24 +112,25 @@ case class OrcPartitionReaderFactory(
   override def buildColumnarReader(file: PartitionedFile): PartitionReader[ColumnarBatch] = {
     val conf = broadcastedConf.value.value
 
-    val resultSchemaString = OrcUtils.orcTypeDescriptionString(resultSchema)
-    OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString)
     OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(conf, isCaseSensitive)
 
     val filePath = new Path(new URI(file.filePath))
 
     val fs = filePath.getFileSystem(conf)
     val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
-    val requestedColIdsOrEmptyFile =
+    val resultedColPruneInfo =
       Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader =>
         OrcUtils.requestedColumnIds(
           isCaseSensitive, dataSchema, readDataSchema, reader, conf)
       }
 
-    if (requestedColIdsOrEmptyFile.isEmpty) {
+    if (resultedColPruneInfo.isEmpty) {
       new EmptyPartitionReader
     } else {
-      val requestedColIds = requestedColIdsOrEmptyFile.get ++ Array.fill(partitionSchema.length)(-1)
+      val (requestedDataColIds, canPruneCols) = resultedColPruneInfo.get
+      val resultSchemaString = OrcUtils.orcResultSchemaString(canPruneCols,
+        dataSchema, resultSchema, partitionSchema, conf)
+      val requestedColIds = requestedDataColIds ++ Array.fill(partitionSchema.length)(-1)
       assert(requestedColIds.length == resultSchema.length,
         "[BUG] requested column IDs do not match required schema")
       val taskConf = new Configuration(conf)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala
index 990d9425fb7fc..12ee5bea7c2f9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala
@@ -288,4 +288,32 @@ class HiveOrcQuerySuite extends OrcQueryTest with TestHiveSingleton {
       }
     }
   }
+
+  test("SPARK-32234 read ORC table with column names all starting with '_col'") {
+    Seq("native", "hive").foreach { orcImpl =>
+      Seq("false", "true").foreach { vectorized =>
+        withSQLConf(
+          SQLConf.ORC_IMPLEMENTATION.key -> orcImpl,
+          SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> vectorized) {
+          withTable("test_hive_orc_impl") {
+            spark.sql(
+              s"""
+                | CREATE TABLE test_hive_orc_impl
+                | (_col1 INT, _col2 STRING, _col3 INT)
+                | STORED AS ORC
+              """.stripMargin)
+            spark.sql(
+              s"""
+                | INSERT INTO
+                | test_hive_orc_impl
+                | VALUES(9, '12', 2020)
+              """.stripMargin)
+
+            val df = spark.sql("SELECT _col2 FROM test_hive_orc_impl")
+            checkAnswer(df, Row("12"))
+          }
+        }
+      }
+    }
+  }
 }
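
For context, the scenario exercised by the new test can also be reproduced outside the test suite. The sketch below is illustrative only: it assumes a local Spark build with Hive support on the classpath, the object name OrcHiveColRepro is made up for this example, and the table name and statements mirror the test above.

import org.apache.spark.sql.SparkSession

object OrcHiveColRepro {
  def main(args: Array[String]): Unit = {
    // Assumes the spark-hive module is available so a Hive-backed table can be created.
    val spark = SparkSession.builder()
      .master("local[1]")
      .enableHiveSupport()
      .getOrCreate()

    // Every column name starts with "_col", so the ORC files look like Hive-written
    // files with no real field names. Per the patch, column pruning is skipped for
    // such files and the full data schema is set as MAPRED_INPUT_SCHEMA.
    spark.sql("CREATE TABLE test_hive_orc_impl (_col1 INT, _col2 STRING, _col3 INT) STORED AS ORC")
    spark.sql("INSERT INTO test_hive_orc_impl VALUES (9, '12', 2020)")

    // Before this fix, selecting a subset of columns from such a table could fail;
    // with the fix the query returns a single row containing "12".
    spark.sql("SELECT _col2 FROM test_hive_orc_impl").show()

    spark.stop()
  }
}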