From 49a23502d60e1b9416d0f87484af1a72d60646a4 Mon Sep 17 00:00:00 2001 From: SaurabhChawla Date: Thu, 9 Jul 2020 01:08:37 +0530 Subject: [PATCH 01/18] add the code change to fix the sql query --- .../spark/sql/execution/datasources/orc/OrcFileFormat.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index fd791ce7c5e19..07451cf95da69 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -160,11 +160,13 @@ class OrcFileFormat } val resultSchema = StructType(requiredSchema.fields ++ partitionSchema.fields) + val actualSchema = StructType(dataSchema.fields ++ partitionSchema.fields) val sqlConf = sparkSession.sessionState.conf val enableVectorizedReader = supportBatch(sparkSession, resultSchema) val capacity = sqlConf.orcVectorizedReaderBatchSize val resultSchemaString = OrcUtils.orcTypeDescriptionString(resultSchema) + val actualSchemaString = OrcUtils.orcTypeDescriptionString(actualSchema) OrcConf.MAPRED_INPUT_SCHEMA.setString(hadoopConf, resultSchemaString) OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(hadoopConf, sqlConf.caseSensitiveAnalysis) @@ -209,7 +211,7 @@ class OrcFileFormat Array.fill(requiredSchema.length)(-1) ++ Range(0, partitionSchema.length) batchReader.initialize(fileSplit, taskAttemptContext) batchReader.initBatch( - TypeDescription.fromString(resultSchemaString), + TypeDescription.fromString(actualSchemaString), resultSchema.fields, requestedDataColIds, requestedPartitionColIds, From 195bc232a51b9a8caefcc3569f69271553296da8 Mon Sep 17 00:00:00 2001 From: SaurabhChawla Date: Thu, 9 Jul 2020 21:56:06 +0530 Subject: [PATCH 02/18] handle the scenarios for orc file created by hive data --- .../datasources/orc/OrcFileFormat.scala | 17 ++++++-- .../orc/OrcColumnarBatchReaderSuite.scala | 39 +++++++++++++++++++ 2 files changed, 52 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index 07451cf95da69..7c4fda035fd8c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -20,6 +20,8 @@ package org.apache.spark.sql.execution.datasources.orc import java.io._ import java.net.URI +import scala.collection.JavaConverters._ + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.mapred.JobConf @@ -165,9 +167,7 @@ class OrcFileFormat val enableVectorizedReader = supportBatch(sparkSession, resultSchema) val capacity = sqlConf.orcVectorizedReaderBatchSize - val resultSchemaString = OrcUtils.orcTypeDescriptionString(resultSchema) - val actualSchemaString = OrcUtils.orcTypeDescriptionString(actualSchema) - OrcConf.MAPRED_INPUT_SCHEMA.setString(hadoopConf, resultSchemaString) + var resultSchemaString = OrcUtils.orcTypeDescriptionString(resultSchema) OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(hadoopConf, sqlConf.caseSensitiveAnalysis) val broadcastedConf = @@ -183,10 +183,19 @@ class OrcFileFormat val readerOptions =
OrcFile.readerOptions(conf).filesystem(fs) val requestedColIdsOrEmptyFile = Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader => + // for ORC file written by Hive, no field names + // in the physical schema, there is a need to send the + // entire dataSchema instead of required schema + val orcFieldNames = reader.getSchema.getFieldNames.asScala + if (orcFieldNames.forall(_.startsWith("_col"))) { + resultSchemaString = OrcUtils.orcTypeDescriptionString(actualSchema) + } OrcUtils.requestedColumnIds( isCaseSensitive, dataSchema, requiredSchema, reader, conf) } + OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString) + if (requestedColIdsOrEmptyFile.isEmpty) { Iterator.empty } else { @@ -211,7 +220,7 @@ class OrcFileFormat Array.fill(requiredSchema.length)(-1) ++ Range(0, partitionSchema.length) batchReader.initialize(fileSplit, taskAttemptContext) batchReader.initBatch( - TypeDescription.fromString(actualSchemaString), + TypeDescription.fromString(resultSchemaString), resultSchema.fields, requestedDataColIds, requestedPartitionColIds, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala index 719bf91e1786b..357022ad92a75 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala @@ -77,4 +77,43 @@ class OrcColumnarBatchReaderSuite extends QueryTest with SharedSparkSession { assert(p1.getUTF8String(0) === partitionValues.getUTF8String(0)) } } + + test("orc data created by the hive tables having _col fields name") { + var error: Throwable = null + val table = """CREATE TABLE `test_date_hive_orc` + | (`_col1` INT,`_col2` STRING,`_col3` INT) + | USING orc""".stripMargin + spark.sql(table).collect + spark.sql("insert into test_date_hive_orc values(9, '12', 2020)").collect + val df = spark.sql("select _col2 from test_date_hive_orc") + try { + val data = df.collect() + assert(data.length == 1) + } catch { + case e: Throwable => + error = e + } + assert(error == null) + spark.sql(s"DROP TABLE IF EXISTS test_date_hive_orc") + } + + test("orc data created by the spark having proper fields name") { + var error: Throwable = null + val table = """CREATE TABLE `test_date_spark_orc` + | (`d_date_sk` INT,`d_date_id` STRING,`d_year` INT) + | USING orc""".stripMargin + spark.sql(table).collect + spark.sql("insert into test_date_spark_orc values(9, '12', 2020)").collect + val df = spark.sql("select d_date_id from test_date_spark_orc") + try { + val data = df.collect() + assert(data.length == 1) + } catch { + case e: Throwable => + error = e + } + assert(error == null) + spark.sql(s"DROP TABLE IF EXISTS test_date_spark_orc") + } + } From 9b51d67778c9b7805ed247e752d45364e910b6a8 Mon Sep 17 00:00:00 2001 From: SaurabhChawla Date: Fri, 10 Jul 2020 12:12:43 +0530 Subject: [PATCH 03/18] Refactor the unit test as per the review comments --- .../orc/OrcColumnarBatchReaderSuite.scala | 85 ++++++++++++------- 1 file changed, 53 insertions(+), 32 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala index 357022ad92a75..74e148ed9cd82 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.orc import org.apache.orc.TypeDescription -import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector} import org.apache.spark.sql.test.SharedSparkSession @@ -78,42 +78,63 @@ class OrcColumnarBatchReaderSuite extends QueryTest with SharedSparkSession { } } - test("orc data created by the hive tables having _col fields name") { + test("SPARK-32234: orc data created by the hive tables having _col fields name") { var error: Throwable = null - val table = """CREATE TABLE `test_date_hive_orc` - | (`_col1` INT,`_col2` STRING,`_col3` INT) - | USING orc""".stripMargin - spark.sql(table).collect - spark.sql("insert into test_date_hive_orc values(9, '12', 2020)").collect - val df = spark.sql("select _col2 from test_date_hive_orc") - try { - val data = df.collect() - assert(data.length == 1) - } catch { - case e: Throwable => - error = e + withTable("test_date_hive_orc") { + spark.sql( + """ + |CREATE TABLE `test_date_hive_orc` + | (`_col1` INT,`_col2` STRING,`_col3` INT) + | USING orc + """.stripMargin) + spark.sql( + """insert into + | test_date_hive_orc + | values(9, '12', 2020) + """.stripMargin) + try { + val df = spark.sql("select _col2 from test_date_hive_orc") + checkAnswer(df, Row("12")) + } catch { + case e: Throwable => + error = e + } + assert(error == null) + spark.sql( + s""" + |DROP TABLE IF + | EXISTS test_date_hive_orc + """.stripMargin) } - assert(error == null) - spark.sql(s"DROP TABLE IF EXISTS test_date_hive_orc") } - test("orc data created by the spark having proper fields name") { + test("SPARK-32234: orc data created by the spark having proper fields name") { var error: Throwable = null - val table = """CREATE TABLE `test_date_spark_orc` - | (`d_date_sk` INT,`d_date_id` STRING,`d_year` INT) - | USING orc""".stripMargin - spark.sql(table).collect - spark.sql("insert into test_date_spark_orc values(9, '12', 2020)").collect - val df = spark.sql("select d_date_id from test_date_spark_orc") - try { - val data = df.collect() - assert(data.length == 1) - } catch { - case e: Throwable => - error = e + withTable("test_date_spark_orc") { + spark.sql( + """ + |CREATE TABLE `test_date_spark_orc` + | (`d_date_sk` INT,`d_date_id` STRING,`d_year` INT) + | USING orc + """.stripMargin) + spark.sql( + """insert into + | test_date_spark_orc + | values(9, '12', 2020) + """.stripMargin) + try { + val df = spark.sql("select d_date_id from test_date_spark_orc") + checkAnswer(df, Row("12")) + } catch { + case e: Throwable => + error = e + } + assert(error == null) + spark.sql( + s""" + |DROP TABLE IF + | EXISTS test_date_spark_orc + """.stripMargin) } - assert(error == null) - spark.sql(s"DROP TABLE IF EXISTS test_date_spark_orc") } - } From b9282d029d3cbd7324ee889743774848383d0779 Mon Sep 17 00:00:00 2001 From: SaurabhChawla Date: Sat, 11 Jul 2020 12:18:17 +0530 Subject: [PATCH 04/18] Remove drop table from the test as its already handled in withTable --- .../orc/OrcColumnarBatchReaderSuite.scala | 33 ++++--------------- 1 file changed, 6 insertions(+), 27 deletions(-) diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala index 74e148ed9cd82..f72599a1388ea 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala @@ -92,24 +92,13 @@ class OrcColumnarBatchReaderSuite extends QueryTest with SharedSparkSession { | test_date_hive_orc | values(9, '12', 2020) """.stripMargin) - try { - val df = spark.sql("select _col2 from test_date_hive_orc") - checkAnswer(df, Row("12")) - } catch { - case e: Throwable => - error = e - } - assert(error == null) - spark.sql( - s""" - |DROP TABLE IF - | EXISTS test_date_hive_orc - """.stripMargin) + + val df = spark.sql("select _col2 from test_date_hive_orc") + checkAnswer(df, Row("12")) } } test("SPARK-32234: orc data created by the spark having proper fields name") { - var error: Throwable = null withTable("test_date_spark_orc") { spark.sql( """ @@ -122,19 +111,9 @@ class OrcColumnarBatchReaderSuite extends QueryTest with SharedSparkSession { | test_date_spark_orc | values(9, '12', 2020) """.stripMargin) - try { - val df = spark.sql("select d_date_id from test_date_spark_orc") - checkAnswer(df, Row("12")) - } catch { - case e: Throwable => - error = e - } - assert(error == null) - spark.sql( - s""" - |DROP TABLE IF - | EXISTS test_date_spark_orc - """.stripMargin) + + val df = spark.sql("select d_date_id from test_date_spark_orc") + checkAnswer(df, Row("12")) } } } From ff938a562a0605e0ad76e36e8faa1098957fb127 Mon Sep 17 00:00:00 2001 From: SaurabhChawla Date: Sat, 11 Jul 2020 13:50:24 +0530 Subject: [PATCH 05/18] Refactor the code change as per the review comments --- .../orc/OrcColumnarBatchReaderSuite.scala | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala index f72599a1388ea..f335c2a7a1233 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala @@ -79,21 +79,21 @@ class OrcColumnarBatchReaderSuite extends QueryTest with SharedSparkSession { } test("SPARK-32234: orc data created by the hive tables having _col fields name") { - var error: Throwable = null withTable("test_date_hive_orc") { spark.sql( """ - |CREATE TABLE `test_date_hive_orc` - | (`_col1` INT,`_col2` STRING,`_col3` INT) + | CREATE TABLE test_date_hive_orc + | (_col1 INT, _col2 STRING, _col3 INT) | USING orc """.stripMargin) spark.sql( - """insert into + """ + | INSERT INTO | test_date_hive_orc - | values(9, '12', 2020) + | VALUES(9, '12', 2020) """.stripMargin) - val df = spark.sql("select _col2 from test_date_hive_orc") + val df = spark.sql("SELECT _col2 from test_date_hive_orc") checkAnswer(df, Row("12")) } } @@ -102,17 +102,18 @@ class OrcColumnarBatchReaderSuite extends QueryTest with SharedSparkSession { withTable("test_date_spark_orc") { spark.sql( """ - |CREATE TABLE `test_date_spark_orc` - | (`d_date_sk` INT,`d_date_id` STRING,`d_year` INT) - | USING orc + | CREATE TABLE test_date_spark_orc + | (d_date_sk INT, 
d_date_id STRING, d_year INT) + | USING orc """.stripMargin) spark.sql( - """insert into + """ + | INSERT INTO | test_date_spark_orc - | values(9, '12', 2020) + | VALUES(9, '12', 2020) """.stripMargin) - val df = spark.sql("select d_date_id from test_date_spark_orc") + val df = spark.sql("SELECT d_date_id FROM test_date_spark_orc") checkAnswer(df, Row("12")) } } From 45be04884e90249e6813d02023258c2f8d1df9fe Mon Sep 17 00:00:00 2001 From: SaurabhChawla Date: Sat, 11 Jul 2020 20:56:16 +0530 Subject: [PATCH 06/18] add the unit test for ORC_VECTORIZED_READER_ENABLED --- .../orc/OrcColumnarBatchReaderSuite.scala | 58 +++++++------------ .../sql/hive/orc/HiveOrcQuerySuite.scala | 27 +++++++++ 2 files changed, 49 insertions(+), 36 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala index f335c2a7a1233..afb703898cf89 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala @@ -22,6 +22,7 @@ import org.apache.orc.TypeDescription import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.unsafe.types.UTF8String.fromString @@ -78,43 +79,28 @@ class OrcColumnarBatchReaderSuite extends QueryTest with SharedSparkSession { } } - test("SPARK-32234: orc data created by the hive tables having _col fields name") { - withTable("test_date_hive_orc") { - spark.sql( - """ - | CREATE TABLE test_date_hive_orc - | (_col1 INT, _col2 STRING, _col3 INT) - | USING orc - """.stripMargin) - spark.sql( - """ - | INSERT INTO - | test_date_hive_orc - | VALUES(9, '12', 2020) - """.stripMargin) + test("SPARK-32234: orc data created by the hive tables having _col fields" + + " name for vectorized reader") { + Seq(false, true).foreach { vectorized => + withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> vectorized.toString) { + withTable("test_hive_orc_vect_read") { + spark.sql( + """ + | CREATE TABLE test_hive_orc_vect_read + | (_col1 INT, _col2 STRING, _col3 INT) + | USING orc + """.stripMargin) + spark.sql( + """ + | INSERT INTO + | test_hive_orc_vect_read + | VALUES(9, '12', 2020) + """.stripMargin) - val df = spark.sql("SELECT _col2 from test_date_hive_orc") - checkAnswer(df, Row("12")) - } - } - - test("SPARK-32234: orc data created by the spark having proper fields name") { - withTable("test_date_spark_orc") { - spark.sql( - """ - | CREATE TABLE test_date_spark_orc - | (d_date_sk INT, d_date_id STRING, d_year INT) - | USING orc - """.stripMargin) - spark.sql( - """ - | INSERT INTO - | test_date_spark_orc - | VALUES(9, '12', 2020) - """.stripMargin) - - val df = spark.sql("SELECT d_date_id FROM test_date_spark_orc") - checkAnswer(df, Row("12")) + val df = spark.sql("SELECT _col2 FROM test_hive_orc_vect_read") + checkAnswer(df, Row("12")) + } + } } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala index 990d9425fb7fc..15c50aea714af 100644 
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala @@ -288,4 +288,31 @@ class HiveOrcQuerySuite extends OrcQueryTest with TestHiveSingleton { } } } + + test("SPARK-32234: orc data created by the hive tables having _col fields name" + + " for ORC_IMPLEMENTATION") { + Seq("native", "hive").foreach { orcImpl => + withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> orcImpl) { + withTempPath { dir => + withTable("test_hive_orc_impl") { + spark.sql( + s""" + | CREATE TABLE test_hive_orc_impl + | (_col1 INT, _col2 STRING, _col3 INT) + | STORED AS ORC LOCATION '$dir' + """.stripMargin) + spark.sql( + """ + | INSERT INTO + | test_hive_orc_impl + | VALUES(9, '12', 2020) + """.stripMargin) + + val df = spark.sql("SELECT _col2 FROM test_hive_orc_impl") + checkAnswer(df, Row("12")) + } + } + } + } + } } From c2ca484902905e87be95f43b2643eb21fccd061b Mon Sep 17 00:00:00 2001 From: SaurabhChawla Date: Sun, 12 Jul 2020 11:42:56 +0530 Subject: [PATCH 07/18] add the code change to handle the schema sent through requestedColumnIds --- .../datasources/orc/OrcFileFormat.scala | 14 ++++------ .../execution/datasources/orc/OrcUtils.scala | 26 ++++++++++------- .../v2/orc/OrcPartitionReaderFactory.scala | 9 +++--- .../orc/OrcColumnarBatchReaderSuite.scala | 28 +------------------ .../sql/hive/orc/HiveOrcQuerySuite.scala | 25 +++++++++++++++++ 5 files changed, 52 insertions(+), 50 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index 7c4fda035fd8c..8a6f43bd34c0b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -183,23 +183,19 @@ class OrcFileFormat val readerOptions = OrcFile.readerOptions(conf).filesystem(fs) val requestedColIdsOrEmptyFile = Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader => - // for ORC file written by Hive, no field names - // in the physical schema, there is a need to send the - // entire dataSchema instead of required schema - val orcFieldNames = reader.getSchema.getFieldNames.asScala - if (orcFieldNames.forall(_.startsWith("_col"))) { - resultSchemaString = OrcUtils.orcTypeDescriptionString(actualSchema) - } OrcUtils.requestedColumnIds( isCaseSensitive, dataSchema, requiredSchema, reader, conf) } + if (requestedColIdsOrEmptyFile._2) { + resultSchemaString = OrcUtils.orcTypeDescriptionString(actualSchema) + } OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString) - if (requestedColIdsOrEmptyFile.isEmpty) { + if (requestedColIdsOrEmptyFile._1.isEmpty) { Iterator.empty } else { - val requestedColIds = requestedColIdsOrEmptyFile.get + val requestedColIds = requestedColIdsOrEmptyFile._1.get assert(requestedColIds.length == requiredSchema.length, "[BUG] requested column IDs do not match required schema") val taskConf = new Configuration(conf) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala index d274bcd0edd2c..868033481dd5b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala @@ -116,19 +116,21 @@ object OrcUtils extends Logging { } /** - * Returns the requested column ids from the given ORC file. Column id can be -1, which means the - * requested column doesn't exist in the ORC file. Returns None if the given ORC file is empty. + * Returns the requested column ids from the given ORC file and Boolean flag to use actual + * schema or result scehma. Column id can be -1, which means the requested column doesn't + * exist in the ORC file. Returns None if the given ORC file is empty. */ def requestedColumnIds( isCaseSensitive: Boolean, dataSchema: StructType, requiredSchema: StructType, reader: Reader, - conf: Configuration): Option[Array[Int]] = { + conf: Configuration): (Option[Array[Int]], Boolean) = { + var sendActualSchema = false val orcFieldNames = reader.getSchema.getFieldNames.asScala if (orcFieldNames.isEmpty) { // SPARK-8501: Some old empty ORC files always have an empty schema stored in their footer. - None + (None, sendActualSchema) } else { if (orcFieldNames.forall(_.startsWith("_col"))) { // This is a ORC file written by Hive, no field names in the physical schema, assume the @@ -136,27 +138,31 @@ object OrcUtils extends Logging { assert(orcFieldNames.length <= dataSchema.length, "The given data schema " + s"${dataSchema.catalogString} has less fields than the actual ORC physical schema, " + "no idea which columns were dropped, fail to read.") - Some(requiredSchema.fieldNames.map { name => + (Some(requiredSchema.fieldNames.map { name => val index = dataSchema.fieldIndex(name) if (index < orcFieldNames.length) { + // for ORC file written by Hive, no field names + // in the physical schema, there is a need to send the + // entire dataSchema instead of required schema + sendActualSchema = true index } else { -1 } - }) + }), sendActualSchema) } else { if (isCaseSensitive) { - Some(requiredSchema.fieldNames.zipWithIndex.map { case (name, idx) => + (Some(requiredSchema.fieldNames.zipWithIndex.map { case (name, idx) => if (orcFieldNames.indexWhere(caseSensitiveResolution(_, name)) != -1) { idx } else { -1 } - }) + }), sendActualSchema) } else { // Do case-insensitive resolution only if in case-insensitive mode val caseInsensitiveOrcFieldMap = orcFieldNames.groupBy(_.toLowerCase(Locale.ROOT)) - Some(requiredSchema.fieldNames.zipWithIndex.map { case (requiredFieldName, idx) => + (Some(requiredSchema.fieldNames.zipWithIndex.map { case (requiredFieldName, idx) => caseInsensitiveOrcFieldMap .get(requiredFieldName.toLowerCase(Locale.ROOT)) .map { matchedOrcFields => @@ -170,7 +176,7 @@ object OrcUtils extends Logging { idx } }.getOrElse(-1) - }) + }), sendActualSchema) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala index 03d58fdcb7f67..11943600ecdda 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala @@ -80,10 +80,10 @@ case class OrcPartitionReaderFactory( isCaseSensitive, dataSchema, readDataSchema, reader, conf) } - if (requestedColIdsOrEmptyFile.isEmpty) { + if (requestedColIdsOrEmptyFile._1.isEmpty) { new EmptyPartitionReader[InternalRow] } else { - val requestedColIds = requestedColIdsOrEmptyFile.get + val 
requestedColIds = requestedColIdsOrEmptyFile._1.get assert(requestedColIds.length == readDataSchema.length, "[BUG] requested column IDs do not match required schema") @@ -126,10 +126,11 @@ case class OrcPartitionReaderFactory( isCaseSensitive, dataSchema, readDataSchema, reader, conf) } - if (requestedColIdsOrEmptyFile.isEmpty) { + if (requestedColIdsOrEmptyFile._1.isEmpty) { new EmptyPartitionReader } else { - val requestedColIds = requestedColIdsOrEmptyFile.get ++ Array.fill(partitionSchema.length)(-1) + val requestedColIds = + requestedColIdsOrEmptyFile._1.get ++ Array.fill(partitionSchema.length)(-1) assert(requestedColIds.length == resultSchema.length, "[BUG] requested column IDs do not match required schema") val taskConf = new Configuration(conf) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala index afb703898cf89..719bf91e1786b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala @@ -19,10 +19,9 @@ package org.apache.spark.sql.execution.datasources.orc import org.apache.orc.TypeDescription -import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.QueryTest import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector} -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.unsafe.types.UTF8String.fromString @@ -78,29 +77,4 @@ class OrcColumnarBatchReaderSuite extends QueryTest with SharedSparkSession { assert(p1.getUTF8String(0) === partitionValues.getUTF8String(0)) } } - - test("SPARK-32234: orc data created by the hive tables having _col fields" + - " name for vectorized reader") { - Seq(false, true).foreach { vectorized => - withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> vectorized.toString) { - withTable("test_hive_orc_vect_read") { - spark.sql( - """ - | CREATE TABLE test_hive_orc_vect_read - | (_col1 INT, _col2 STRING, _col3 INT) - | USING orc - """.stripMargin) - spark.sql( - """ - | INSERT INTO - | test_hive_orc_vect_read - | VALUES(9, '12', 2020) - """.stripMargin) - - val df = spark.sql("SELECT _col2 FROM test_hive_orc_vect_read") - checkAnswer(df, Row("12")) - } - } - } - } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala index 15c50aea714af..e00b2daf76e0a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala @@ -289,6 +289,31 @@ class HiveOrcQuerySuite extends OrcQueryTest with TestHiveSingleton { } } + test("SPARK-32234: orc data created by the hive tables having _col fields" + + " name for vectorized reader") { + Seq(false, true).foreach { vectorized => + withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> vectorized.toString) { + withTable("test_hive_orc_vect_read") { + spark.sql( + """ + | CREATE TABLE test_hive_orc_vect_read + | (_col1 INT, _col2 STRING, _col3 INT) + | USING orc + """.stripMargin) + spark.sql( + """ + | INSERT INTO + | 
test_hive_orc_vect_read + | VALUES(9, '12', 2020) + """.stripMargin) + + val df = spark.sql("SELECT _col2 FROM test_hive_orc_vect_read") + checkAnswer(df, Row("12")) + } + } + } + } + test("SPARK-32234: orc data created by the hive tables having _col fields name" + " for ORC_IMPLEMENTATION") { Seq("native", "hive").foreach { orcImpl => From f8f7aff7d95cf33f14ee066d199f7f7a7418bbe3 Mon Sep 17 00:00:00 2001 From: SaurabhChawla Date: Sun, 12 Jul 2020 13:03:42 +0530 Subject: [PATCH 08/18] removed the extra import from OrcFileFormat.scala --- .../spark/sql/execution/datasources/orc/OrcFileFormat.scala | 2 -- .../apache/spark/sql/execution/datasources/orc/OrcUtils.scala | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index 8a6f43bd34c0b..29635aec6d367 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -20,8 +20,6 @@ package org.apache.spark.sql.execution.datasources.orc import java.io._ import java.net.URI -import scala.collection.JavaConverters._ - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.mapred.JobConf diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala index 868033481dd5b..496c7e57ff4af 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala @@ -117,7 +117,7 @@ object OrcUtils extends Logging { /** * Returns the requested column ids from the given ORC file and Boolean flag to use actual - * schema or result scehma. Column id can be -1, which means the requested column doesn't + * schema or result schema. Column id can be -1, which means the requested column doesn't * exist in the ORC file. Returns None if the given ORC file is empty. 
*/ def requestedColumnIds( From c2d7a217d407436b36efa7bbf67c3a0595fbc8c9 Mon Sep 17 00:00:00 2001 From: SaurabhChawla Date: Mon, 13 Jul 2020 10:01:19 +0530 Subject: [PATCH 09/18] refactor the code using separate variable for sendActualSchema --- .../execution/datasources/orc/OrcFileFormat.scala | 8 ++++---- .../sql/execution/datasources/orc/OrcUtils.scala | 2 +- .../v2/orc/OrcPartitionReaderFactory.scala | 12 ++++++------ 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index 29635aec6d367..18fcf57633cad 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -179,21 +179,21 @@ class OrcFileFormat val fs = filePath.getFileSystem(conf) val readerOptions = OrcFile.readerOptions(conf).filesystem(fs) - val requestedColIdsOrEmptyFile = + val (requestedColIdsOrEmptyFile, sendActualSchema) = Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader => OrcUtils.requestedColumnIds( isCaseSensitive, dataSchema, requiredSchema, reader, conf) } - if (requestedColIdsOrEmptyFile._2) { + if (sendActualSchema) { resultSchemaString = OrcUtils.orcTypeDescriptionString(actualSchema) } OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString) - if (requestedColIdsOrEmptyFile._1.isEmpty) { + if (requestedColIdsOrEmptyFile.isEmpty) { Iterator.empty } else { - val requestedColIds = requestedColIdsOrEmptyFile._1.get + val requestedColIds = requestedColIdsOrEmptyFile.get assert(requestedColIds.length == requiredSchema.length, "[BUG] requested column IDs do not match required schema") val taskConf = new Configuration(conf) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala index 496c7e57ff4af..66db2c4316d04 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala @@ -116,7 +116,7 @@ object OrcUtils extends Logging { } /** - * Returns the requested column ids from the given ORC file and Boolean flag to use actual + * @return Returns the requested column ids from the given ORC file and Boolean flag to use actual * schema or result schema. Column id can be -1, which means the requested column doesn't * exist in the ORC file. Returns None if the given ORC file is empty.
*/ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala index 11943600ecdda..9f522aeb9fb7c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala @@ -74,16 +74,16 @@ case class OrcPartitionReaderFactory( val fs = filePath.getFileSystem(conf) val readerOptions = OrcFile.readerOptions(conf).filesystem(fs) - val requestedColIdsOrEmptyFile = + val (requestedColIdsOrEmptyFile, sendActualSchema) = Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader => OrcUtils.requestedColumnIds( isCaseSensitive, dataSchema, readDataSchema, reader, conf) } - if (requestedColIdsOrEmptyFile._1.isEmpty) { + if (requestedColIdsOrEmptyFile.isEmpty) { new EmptyPartitionReader[InternalRow] } else { - val requestedColIds = requestedColIdsOrEmptyFile._1.get + val requestedColIds = requestedColIdsOrEmptyFile.get assert(requestedColIds.length == readDataSchema.length, "[BUG] requested column IDs do not match required schema") @@ -120,17 +120,17 @@ case class OrcPartitionReaderFactory( val fs = filePath.getFileSystem(conf) val readerOptions = OrcFile.readerOptions(conf).filesystem(fs) - val requestedColIdsOrEmptyFile = + val (requestedColIdsOrEmptyFile, sendActualSchema) = Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader => OrcUtils.requestedColumnIds( isCaseSensitive, dataSchema, readDataSchema, reader, conf) } - if (requestedColIdsOrEmptyFile._1.isEmpty) { + if (requestedColIdsOrEmptyFile.isEmpty) { new EmptyPartitionReader } else { val requestedColIds = - requestedColIdsOrEmptyFile._1.get ++ Array.fill(partitionSchema.length)(-1) + requestedColIdsOrEmptyFile.get ++ Array.fill(partitionSchema.length)(-1) assert(requestedColIds.length == resultSchema.length, "[BUG] requested column IDs do not match required schema") val taskConf = new Configuration(conf) From 4469e6d1c5970bc5de3d4a4dc728e9775e43e141 Mon Sep 17 00:00:00 2001 From: SaurabhChawla Date: Mon, 13 Jul 2020 10:23:37 +0530 Subject: [PATCH 10/18] Merge the orcimpl and vectorised test for complete code coverage --- .../v2/orc/OrcPartitionReaderFactory.scala | 7 +-- .../sql/hive/orc/HiveOrcQuerySuite.scala | 63 +++++++------------ 2 files changed, 24 insertions(+), 46 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala index 9f522aeb9fb7c..1f3b2f7712df3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala @@ -74,7 +74,7 @@ case class OrcPartitionReaderFactory( val fs = filePath.getFileSystem(conf) val readerOptions = OrcFile.readerOptions(conf).filesystem(fs) - val (requestedColIdsOrEmptyFile, sendActualSchema) = + val (requestedColIdsOrEmptyFile, _) = Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader => OrcUtils.requestedColumnIds( isCaseSensitive, dataSchema, readDataSchema, reader, conf) @@ -120,7 +120,7 @@ case class OrcPartitionReaderFactory( val fs = filePath.getFileSystem(conf) 
val readerOptions = OrcFile.readerOptions(conf).filesystem(fs) - val (requestedColIdsOrEmptyFile, sendActualSchema) = + val (requestedColIdsOrEmptyFile, _) = Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader => OrcUtils.requestedColumnIds( isCaseSensitive, dataSchema, readDataSchema, reader, conf) @@ -129,8 +129,7 @@ case class OrcPartitionReaderFactory( if (requestedColIdsOrEmptyFile.isEmpty) { new EmptyPartitionReader } else { - val requestedColIds = - requestedColIdsOrEmptyFile.get ++ Array.fill(partitionSchema.length)(-1) + val requestedColIds = requestedColIdsOrEmptyFile.get ++ Array.fill(partitionSchema.length)(-1) assert(requestedColIds.length == resultSchema.length, "[BUG] requested column IDs do not match required schema") val taskConf = new Configuration(conf) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala index e00b2daf76e0a..b8f1e77b142c1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala @@ -289,52 +289,31 @@ class HiveOrcQuerySuite extends OrcQueryTest with TestHiveSingleton { } } - test("SPARK-32234: orc data created by the hive tables having _col fields" + - " name for vectorized reader") { - Seq(false, true).foreach { vectorized => - withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> vectorized.toString) { - withTable("test_hive_orc_vect_read") { - spark.sql( - """ - | CREATE TABLE test_hive_orc_vect_read - | (_col1 INT, _col2 STRING, _col3 INT) - | USING orc - """.stripMargin) - spark.sql( - """ - | INSERT INTO - | test_hive_orc_vect_read - | VALUES(9, '12', 2020) - """.stripMargin) - - val df = spark.sql("SELECT _col2 FROM test_hive_orc_vect_read") - checkAnswer(df, Row("12")) - } - } - } - } - test("SPARK-32234: orc data created by the hive tables having _col fields name" + " for ORC_IMPLEMENTATION") { Seq("native", "hive").foreach { orcImpl => - withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> orcImpl) { - withTempPath { dir => - withTable("test_hive_orc_impl") { - spark.sql( - s""" - | CREATE TABLE test_hive_orc_impl - | (_col1 INT, _col2 STRING, _col3 INT) - | STORED AS ORC LOCATION '$dir' - """.stripMargin) - spark.sql( - """ - | INSERT INTO - | test_hive_orc_impl - | VALUES(9, '12', 2020) + Seq("false", "true").foreach { vectorized => + withSQLConf( + SQLConf.ORC_IMPLEMENTATION.key -> orcImpl, + SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> vectorized) { + withTempPath { dir => + withTable("test_hive_orc_impl") { + spark.sql( + s""" + | CREATE TABLE test_hive_orc_impl + | (_col1 INT, _col2 STRING, _col3 INT) + | STORED AS ORC LOCATION '$dir' """.stripMargin) - - val df = spark.sql("SELECT _col2 FROM test_hive_orc_impl") - checkAnswer(df, Row("12")) + spark.sql( + """ + | INSERT INTO + | test_hive_orc_impl + | VALUES(9, '12', 2020) + """.stripMargin) + + val df = spark.sql("SELECT _col2 FROM test_hive_orc_impl") + checkAnswer(df, Row("12")) + } } } } From 9de35164528dc67a23b5137d0a5e81dd9371d6e4 Mon Sep 17 00:00:00 2001 From: SaurabhChawla Date: Tue, 14 Jul 2020 23:48:19 +0530 Subject: [PATCH 11/18] reafactor the code and add similar logic in file source v2 code path --- .../datasources/orc/OrcFileFormat.scala | 4 ++-- .../execution/datasources/orc/OrcUtils.scala | 12 +++++----- .../v2/orc/OrcPartitionReaderFactory.scala | 22 ++++++++++++++----- 3 files changed, 24 insertions(+), 14 
deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index 18fcf57633cad..1f804d7d0dbf9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -179,13 +179,13 @@ class OrcFileFormat val fs = filePath.getFileSystem(conf) val readerOptions = OrcFile.readerOptions(conf).filesystem(fs) - val (requestedColIdsOrEmptyFile, sendActualSchema) = + val (requestedColIdsOrEmptyFile, canPruneCols) = Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader => OrcUtils.requestedColumnIds( isCaseSensitive, dataSchema, requiredSchema, reader, conf) } - if (sendActualSchema) { + if (canPruneCols) { resultSchemaString = OrcUtils.orcTypeDescriptionString(actualSchema) } OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala index 66db2c4316d04..fa7a992d04980 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala @@ -126,11 +126,11 @@ object OrcUtils extends Logging { requiredSchema: StructType, reader: Reader, conf: Configuration): (Option[Array[Int]], Boolean) = { - var sendActualSchema = false + var canPruneCols = false val orcFieldNames = reader.getSchema.getFieldNames.asScala if (orcFieldNames.isEmpty) { // SPARK-8501: Some old empty ORC files always have an empty schema stored in their footer. 
- (None, sendActualSchema) + (None, canPruneCols) } else { if (orcFieldNames.forall(_.startsWith("_col"))) { // This is a ORC file written by Hive, no field names in the physical schema, assume the @@ -144,12 +144,12 @@ object OrcUtils extends Logging { // for ORC file written by Hive, no field names // in the physical schema, there is a need to send the // entire dataSchema instead of required schema - sendActualSchema = true + canPruneCols = true index } else { -1 } - }), sendActualSchema) + }), canPruneCols) } else { if (isCaseSensitive) { (Some(requiredSchema.fieldNames.zipWithIndex.map { case (name, idx) => @@ -158,7 +158,7 @@ object OrcUtils extends Logging { } else { -1 } - }), sendActualSchema) + }), canPruneCols) } else { // Do case-insensitive resolution only if in case-insensitive mode val caseInsensitiveOrcFieldMap = orcFieldNames.groupBy(_.toLowerCase(Locale.ROOT)) @@ -176,7 +176,7 @@ object OrcUtils extends Logging { idx } }.getOrElse(-1) - }), sendActualSchema) + }), canPruneCols) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala index 1f3b2f7712df3..7e62af70d6043 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala @@ -54,6 +54,7 @@ case class OrcPartitionReaderFactory( readDataSchema: StructType, partitionSchema: StructType) extends FilePartitionReaderFactory { private val resultSchema = StructType(readDataSchema.fields ++ partitionSchema.fields) + private val actualSchema = StructType(dataSchema.fields ++ partitionSchema.fields) private val isCaseSensitive = sqlConf.caseSensitiveAnalysis private val capacity = sqlConf.orcVectorizedReaderBatchSize @@ -66,20 +67,24 @@ case class OrcPartitionReaderFactory( override def buildReader(file: PartitionedFile): PartitionReader[InternalRow] = { val conf = broadcastedConf.value.value - val resultSchemaString = OrcUtils.orcTypeDescriptionString(resultSchema) - OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString) + var resultSchemaString = OrcUtils.orcTypeDescriptionString(resultSchema) OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(conf, isCaseSensitive) val filePath = new Path(new URI(file.filePath)) val fs = filePath.getFileSystem(conf) val readerOptions = OrcFile.readerOptions(conf).filesystem(fs) - val (requestedColIdsOrEmptyFile, _) = + val (requestedColIdsOrEmptyFile, canPruneCols) = Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader => OrcUtils.requestedColumnIds( isCaseSensitive, dataSchema, readDataSchema, reader, conf) } + if (canPruneCols) { + resultSchemaString = OrcUtils.orcTypeDescriptionString(actualSchema) + } + OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString) + if (requestedColIdsOrEmptyFile.isEmpty) { new EmptyPartitionReader[InternalRow] } else { @@ -112,20 +117,25 @@ case class OrcPartitionReaderFactory( override def buildColumnarReader(file: PartitionedFile): PartitionReader[ColumnarBatch] = { val conf = broadcastedConf.value.value - val resultSchemaString = OrcUtils.orcTypeDescriptionString(resultSchema) - OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString) + var resultSchemaString = OrcUtils.orcTypeDescriptionString(resultSchema) 
OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(conf, isCaseSensitive) val filePath = new Path(new URI(file.filePath)) val fs = filePath.getFileSystem(conf) val readerOptions = OrcFile.readerOptions(conf).filesystem(fs) - val (requestedColIdsOrEmptyFile, _) = + val (requestedColIdsOrEmptyFile, canPruneCols) = Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader => OrcUtils.requestedColumnIds( isCaseSensitive, dataSchema, readDataSchema, reader, conf) } + if (canPruneCols) { + resultSchemaString = OrcUtils.orcTypeDescriptionString(actualSchema) + } + OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString) + + if (requestedColIdsOrEmptyFile.isEmpty) { new EmptyPartitionReader } else { From f8ece1fe1eeb9e2cdf2c591dc7e2cfc9af002340 Mon Sep 17 00:00:00 2001 From: SaurabhChawla Date: Wed, 15 Jul 2020 00:12:39 +0530 Subject: [PATCH 12/18] refactor the code --- .../sql/execution/datasources/orc/OrcFileFormat.scala | 2 +- .../spark/sql/execution/datasources/orc/OrcUtils.scala | 10 +++++----- .../datasources/v2/orc/OrcPartitionReaderFactory.scala | 5 ++--- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index 1f804d7d0dbf9..322a1b8138c07 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -185,7 +185,7 @@ class OrcFileFormat isCaseSensitive, dataSchema, requiredSchema, reader, conf) } - if (canPruneCols) { + if (!canPruneCols) { resultSchemaString = OrcUtils.orcTypeDescriptionString(actualSchema) } OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala index fa7a992d04980..3381935d4c779 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala @@ -126,7 +126,7 @@ object OrcUtils extends Logging { requiredSchema: StructType, reader: Reader, conf: Configuration): (Option[Array[Int]], Boolean) = { - var canPruneCols = false + var canPruneCols = true val orcFieldNames = reader.getSchema.getFieldNames.asScala if (orcFieldNames.isEmpty) { // SPARK-8501: Some old empty ORC files always have an empty schema stored in their footer. 
@@ -138,13 +138,13 @@ object OrcUtils extends Logging { assert(orcFieldNames.length <= dataSchema.length, "The given data schema " + s"${dataSchema.catalogString} has less fields than the actual ORC physical schema, " + "no idea which columns were dropped, fail to read.") + // for ORC file written by Hive, no field names + // in the physical schema, there is a need to send the + // entire dataSchema instead of required schema + canPruneCols = false (Some(requiredSchema.fieldNames.map { name => val index = dataSchema.fieldIndex(name) if (index < orcFieldNames.length) { - // for ORC file written by Hive, no field names - // in the physical schema, there is a need to send the - // entire dataSchema instead of required schema - canPruneCols = true index } else { -1 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala index 7e62af70d6043..6858a80b848dd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala @@ -80,7 +80,7 @@ case class OrcPartitionReaderFactory( isCaseSensitive, dataSchema, readDataSchema, reader, conf) } - if (canPruneCols) { + if (!canPruneCols) { resultSchemaString = OrcUtils.orcTypeDescriptionString(actualSchema) } OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString) @@ -130,12 +130,11 @@ case class OrcPartitionReaderFactory( isCaseSensitive, dataSchema, readDataSchema, reader, conf) } - if (canPruneCols) { + if (!canPruneCols) { resultSchemaString = OrcUtils.orcTypeDescriptionString(actualSchema) } OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString) - if (requestedColIdsOrEmptyFile.isEmpty) { new EmptyPartitionReader } else { From 5b8715e8862c5f0d8c3b0f0d2f8a930fb46db7bd Mon Sep 17 00:00:00 2001 From: SaurabhChawla Date: Wed, 15 Jul 2020 13:18:29 +0530 Subject: [PATCH 13/18] Refactor some method return type and use it the code --- .../datasources/orc/OrcFileFormat.scala | 19 ++++----- .../execution/datasources/orc/OrcUtils.scala | 21 +++++----- .../v2/orc/OrcPartitionReaderFactory.scala | 42 +++++++++---------- 3 files changed, 40 insertions(+), 42 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index 322a1b8138c07..b66d4f1b3c2dd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -160,12 +160,10 @@ class OrcFileFormat } val resultSchema = StructType(requiredSchema.fields ++ partitionSchema.fields) - val actualSchema = StructType(dataSchema.fields ++ partitionSchema.fields) val sqlConf = sparkSession.sessionState.conf val enableVectorizedReader = supportBatch(sparkSession, resultSchema) val capacity = sqlConf.orcVectorizedReaderBatchSize - var resultSchemaString = OrcUtils.orcTypeDescriptionString(resultSchema) OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(hadoopConf, sqlConf.caseSensitiveAnalysis) val broadcastedConf = @@ -179,21 +177,22 @@ class OrcFileFormat val fs = filePath.getFileSystem(conf) val readerOptions = OrcFile.readerOptions(conf).filesystem(fs) - val 
(requestedColIdsOrEmptyFile, canPruneCols) = + val resultedColPruneInfo = Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader => OrcUtils.requestedColumnIds( isCaseSensitive, dataSchema, requiredSchema, reader, conf) } - if (!canPruneCols) { - resultSchemaString = OrcUtils.orcTypeDescriptionString(actualSchema) - } - OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString) - - if (requestedColIdsOrEmptyFile.isEmpty) { + if (resultedColPruneInfo.isEmpty) { Iterator.empty } else { - val requestedColIds = requestedColIdsOrEmptyFile.get + val (requestedColIds, canPruneCols) = resultedColPruneInfo.get + val resultSchemaString = if (canPruneCols) { + OrcUtils.orcTypeDescriptionString(resultSchema) + } else { + OrcUtils.orcTypeDescriptionString(StructType(dataSchema.fields ++ partitionSchema.fields)) + } + OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString) assert(requestedColIds.length == requiredSchema.length, "[BUG] requested column IDs do not match required schema") val taskConf = new Configuration(conf) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala index 3381935d4c779..93f1bad4d68c7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala @@ -125,12 +125,11 @@ object OrcUtils extends Logging { dataSchema: StructType, requiredSchema: StructType, reader: Reader, - conf: Configuration): (Option[Array[Int]], Boolean) = { - var canPruneCols = true + conf: Configuration): Option[(Array[Int], Boolean)] = { val orcFieldNames = reader.getSchema.getFieldNames.asScala if (orcFieldNames.isEmpty) { // SPARK-8501: Some old empty ORC files always have an empty schema stored in their footer. - (None, canPruneCols) + None } else { if (orcFieldNames.forall(_.startsWith("_col"))) { // This is a ORC file written by Hive, no field names in the physical schema, assume the @@ -140,29 +139,29 @@ object OrcUtils extends Logging { "no idea which columns were dropped, fail to read.") // for ORC file written by Hive, no field names // in the physical schema, there is a need to send the - // entire dataSchema instead of required schema - canPruneCols = false - (Some(requiredSchema.fieldNames.map { name => + // entire dataSchema instead of required schema. 
+ // So pruneCols is not done in this case + Some(requiredSchema.fieldNames.map { name => val index = dataSchema.fieldIndex(name) if (index < orcFieldNames.length) { index } else { -1 } - }), canPruneCols) + }, false) } else { if (isCaseSensitive) { - (Some(requiredSchema.fieldNames.zipWithIndex.map { case (name, idx) => + Some(requiredSchema.fieldNames.zipWithIndex.map { case (name, idx) => if (orcFieldNames.indexWhere(caseSensitiveResolution(_, name)) != -1) { idx } else { -1 } - }), canPruneCols) + }, true) } else { // Do case-insensitive resolution only if in case-insensitive mode val caseInsensitiveOrcFieldMap = orcFieldNames.groupBy(_.toLowerCase(Locale.ROOT)) - (Some(requiredSchema.fieldNames.zipWithIndex.map { case (requiredFieldName, idx) => + Some(requiredSchema.fieldNames.zipWithIndex.map { case (requiredFieldName, idx) => caseInsensitiveOrcFieldMap .get(requiredFieldName.toLowerCase(Locale.ROOT)) .map { matchedOrcFields => @@ -176,7 +175,7 @@ object OrcUtils extends Logging { idx } }.getOrElse(-1) - }), canPruneCols) + }, true) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala index 6858a80b848dd..aee107a8dd475 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala @@ -54,7 +54,6 @@ case class OrcPartitionReaderFactory( readDataSchema: StructType, partitionSchema: StructType) extends FilePartitionReaderFactory { private val resultSchema = StructType(readDataSchema.fields ++ partitionSchema.fields) - private val actualSchema = StructType(dataSchema.fields ++ partitionSchema.fields) private val isCaseSensitive = sqlConf.caseSensitiveAnalysis private val capacity = sqlConf.orcVectorizedReaderBatchSize @@ -67,28 +66,28 @@ case class OrcPartitionReaderFactory( override def buildReader(file: PartitionedFile): PartitionReader[InternalRow] = { val conf = broadcastedConf.value.value - var resultSchemaString = OrcUtils.orcTypeDescriptionString(resultSchema) OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(conf, isCaseSensitive) val filePath = new Path(new URI(file.filePath)) val fs = filePath.getFileSystem(conf) val readerOptions = OrcFile.readerOptions(conf).filesystem(fs) - val (requestedColIdsOrEmptyFile, canPruneCols) = + val resultedColPruneInfo = Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader => OrcUtils.requestedColumnIds( isCaseSensitive, dataSchema, readDataSchema, reader, conf) } - if (!canPruneCols) { - resultSchemaString = OrcUtils.orcTypeDescriptionString(actualSchema) - } - OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString) - - if (requestedColIdsOrEmptyFile.isEmpty) { + if (resultedColPruneInfo.isEmpty) { new EmptyPartitionReader[InternalRow] } else { - val requestedColIds = requestedColIdsOrEmptyFile.get + val (requestedColIds, canPruneCols) = resultedColPruneInfo.get + val resultSchemaString = if (canPruneCols) { + OrcUtils.orcTypeDescriptionString(resultSchema) + } else { + OrcUtils.orcTypeDescriptionString(StructType(dataSchema.fields ++ partitionSchema.fields)) + } + OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString) assert(requestedColIds.length == readDataSchema.length, "[BUG] requested column IDs do not match required schema") @@ -117,29 +116,30 @@ 
case class OrcPartitionReaderFactory( override def buildColumnarReader(file: PartitionedFile): PartitionReader[ColumnarBatch] = { val conf = broadcastedConf.value.value - var resultSchemaString = OrcUtils.orcTypeDescriptionString(resultSchema) OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(conf, isCaseSensitive) val filePath = new Path(new URI(file.filePath)) val fs = filePath.getFileSystem(conf) val readerOptions = OrcFile.readerOptions(conf).filesystem(fs) - val (requestedColIdsOrEmptyFile, canPruneCols) = + val resultedColPruneInfo = Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader => OrcUtils.requestedColumnIds( isCaseSensitive, dataSchema, readDataSchema, reader, conf) } - if (!canPruneCols) { - resultSchemaString = OrcUtils.orcTypeDescriptionString(actualSchema) - } - OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString) - - if (requestedColIdsOrEmptyFile.isEmpty) { + if (resultedColPruneInfo.isEmpty) { new EmptyPartitionReader } else { - val requestedColIds = requestedColIdsOrEmptyFile.get ++ Array.fill(partitionSchema.length)(-1) - assert(requestedColIds.length == resultSchema.length, + val (requestedColIds, canPruneCols) = resultedColPruneInfo.get + val resultSchemaString = if (canPruneCols) { + OrcUtils.orcTypeDescriptionString(resultSchema) + } else { + OrcUtils.orcTypeDescriptionString(StructType(dataSchema.fields ++ partitionSchema.fields)) + } + OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString) + val requestedDataColIds = requestedColIds ++ Array.fill(partitionSchema.length)(-1) + assert(requestedDataColIds.length == resultSchema.length, "[BUG] requested column IDs do not match required schema") val taskConf = new Configuration(conf) @@ -155,7 +155,7 @@ case class OrcPartitionReaderFactory( batchReader.initBatch( TypeDescription.fromString(resultSchemaString), resultSchema.fields, - requestedColIds, + requestedDataColIds, requestedPartitionColIds, file.partitionValues) new PartitionRecordReader(batchReader) From d0f6b9b5daacd72fb36ef42117a6651492685bf9 Mon Sep 17 00:00:00 2001 From: SaurabhChawla Date: Wed, 15 Jul 2020 14:30:02 +0530 Subject: [PATCH 14/18] Removed the code of location for create table in unit test --- .../sql/hive/orc/HiveOrcQuerySuite.scala | 32 +++++++++---------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala index b8f1e77b142c1..7790f2a3eff39 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala @@ -296,24 +296,22 @@ class HiveOrcQuerySuite extends OrcQueryTest with TestHiveSingleton { withSQLConf( SQLConf.ORC_IMPLEMENTATION.key -> orcImpl, SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> vectorized) { - withTempPath { dir => - withTable("test_hive_orc_impl") { - spark.sql( - s""" - | CREATE TABLE test_hive_orc_impl - | (_col1 INT, _col2 STRING, _col3 INT) - | STORED AS ORC LOCATION '$dir' + withTable("test_hive_orc_impl") { + spark.sql( + """ + | CREATE TABLE test_hive_orc_impl + | (_col1 INT, _col2 STRING, _col3 INT) + | STORED AS ORC """.stripMargin) - spark.sql( - """ - | INSERT INTO - | test_hive_orc_impl - | VALUES(9, '12', 2020) - """.stripMargin) - - val df = spark.sql("SELECT _col2 FROM test_hive_orc_impl") - checkAnswer(df, Row("12")) - } + spark.sql( + """ + | INSERT INTO + | test_hive_orc_impl + | 
VALUES(9, '12', 2020) + """.stripMargin) + + val df = spark.sql("SELECT _col2 FROM test_hive_orc_impl") + checkAnswer(df, Row("12")) } } } From c79794f43ee7cc5431463a667799c4ff2fb37cc2 Mon Sep 17 00:00:00 2001 From: SaurabhChawla Date: Wed, 15 Jul 2020 16:18:21 +0530 Subject: [PATCH 15/18] fix the scala style in the unit test --- .../spark/sql/hive/orc/HiveOrcQuerySuite.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala index 7790f2a3eff39..6173009909324 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala @@ -298,17 +298,17 @@ class HiveOrcQuerySuite extends OrcQueryTest with TestHiveSingleton { SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> vectorized) { withTable("test_hive_orc_impl") { spark.sql( - """ + s""" | CREATE TABLE test_hive_orc_impl | (_col1 INT, _col2 STRING, _col3 INT) | STORED AS ORC - """.stripMargin) + """.stripMargin) spark.sql( - """ - | INSERT INTO - | test_hive_orc_impl - | VALUES(9, '12', 2020) - """.stripMargin) + s""" + | INSERT INTO + | test_hive_orc_impl + | VALUES(9, '12', 2020) + """.stripMargin) val df = spark.sql("SELECT _col2 FROM test_hive_orc_impl") checkAnswer(df, Row("12")) From cf6872989fdcb5396357c0e4cd3b3529e1334e6a Mon Sep 17 00:00:00 2001 From: SaurabhChawla Date: Wed, 15 Jul 2020 19:53:50 +0530 Subject: [PATCH 16/18] created new method for duplicate code --- .../datasources/orc/OrcFileFormat.scala | 8 ++--- .../execution/datasources/orc/OrcUtils.scala | 30 ++++++++++++++++--- .../v2/orc/OrcPartitionReaderFactory.scala | 16 +++------- .../sql/hive/orc/HiveOrcQuerySuite.scala | 3 +- 4 files changed, 33 insertions(+), 24 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index b66d4f1b3c2dd..4dff1ec7ebfb9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -187,12 +187,8 @@ class OrcFileFormat Iterator.empty } else { val (requestedColIds, canPruneCols) = resultedColPruneInfo.get - val resultSchemaString = if (canPruneCols) { - OrcUtils.orcTypeDescriptionString(resultSchema) - } else { - OrcUtils.orcTypeDescriptionString(StructType(dataSchema.fields ++ partitionSchema.fields)) - } - OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString) + val resultSchemaString = OrcUtils.orcResultSchemaString(canPruneCols, + dataSchema, resultSchema, partitionSchema, conf) assert(requestedColIds.length == requiredSchema.length, "[BUG] requested column IDs do not match required schema") val taskConf = new Configuration(conf) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala index 93f1bad4d68c7..e10253989788b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala @@ -24,7 +24,7 @@ import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration import 
org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.orc.{OrcFile, Reader, TypeDescription, Writer} +import org.apache.orc.{OrcConf, OrcFile, Reader, TypeDescription, Writer} import org.apache.spark.{SPARK_VERSION_SHORT, SparkException} import org.apache.spark.deploy.SparkHadoopUtil @@ -116,9 +116,10 @@ object OrcUtils extends Logging { } /** - * @return Returns the requested column ids from the given ORC file and Boolean flag to use actual - * schema or result schema. Column id can be -1, which means the requested column doesn't - * exist in the ORC file. Returns None if the given ORC file is empty. + * @return Returns the combination of requested column ids from the given ORC file and + * boolean flag to find if the pruneCols is allowed or not. Requested Column id can be + * -1, which means the requested column doesn't exist in the ORC file. Returns None + * if the given ORC file is empty. */ def requestedColumnIds( isCaseSensitive: Boolean, @@ -204,4 +205,25 @@ object OrcUtils extends Logging { s"map<${orcTypeDescriptionString(m.keyType)},${orcTypeDescriptionString(m.valueType)}>" case _ => dt.catalogString } + + /** + * @return Returns the result schema string based on the canPruneCols flag. + * resultSchemaString will be created using resultsSchema in case of + * canPruneCols is true and for canPruneCols as false value + * resultSchemaString will be created using the actual dataSchema. + */ + def orcResultSchemaString( + canPruneCols: Boolean, + dataSchema: StructType, + resultSchema: StructType, + partitionSchema: StructType, + conf: Configuration): String = { + val resultSchemaString = if (canPruneCols) { + OrcUtils.orcTypeDescriptionString(resultSchema) + } else { + OrcUtils.orcTypeDescriptionString(StructType(dataSchema.fields ++ partitionSchema.fields)) + } + OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString) + resultSchemaString + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala index aee107a8dd475..bbcdbd795f5c7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala @@ -82,12 +82,8 @@ case class OrcPartitionReaderFactory( new EmptyPartitionReader[InternalRow] } else { val (requestedColIds, canPruneCols) = resultedColPruneInfo.get - val resultSchemaString = if (canPruneCols) { - OrcUtils.orcTypeDescriptionString(resultSchema) - } else { - OrcUtils.orcTypeDescriptionString(StructType(dataSchema.fields ++ partitionSchema.fields)) - } - OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString) + val resultSchemaString = OrcUtils.orcResultSchemaString(canPruneCols, + dataSchema, resultSchema, partitionSchema, conf) assert(requestedColIds.length == readDataSchema.length, "[BUG] requested column IDs do not match required schema") @@ -132,12 +128,8 @@ case class OrcPartitionReaderFactory( new EmptyPartitionReader } else { val (requestedColIds, canPruneCols) = resultedColPruneInfo.get - val resultSchemaString = if (canPruneCols) { - OrcUtils.orcTypeDescriptionString(resultSchema) - } else { - OrcUtils.orcTypeDescriptionString(StructType(dataSchema.fields ++ partitionSchema.fields)) - } - OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString) + val resultSchemaString = 
OrcUtils.orcResultSchemaString(canPruneCols, + dataSchema, resultSchema, partitionSchema, conf) val requestedDataColIds = requestedColIds ++ Array.fill(partitionSchema.length)(-1) assert(requestedDataColIds.length == resultSchema.length, "[BUG] requested column IDs do not match required schema") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala index 6173009909324..fcb7b3373ac78 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala @@ -289,8 +289,7 @@ class HiveOrcQuerySuite extends OrcQueryTest with TestHiveSingleton { } } - test("SPARK-32234: orc data created by the hive tables having _col fields name" + - " for ORC_IMPLEMENTATION") { + test("SPARK-32234: read ORC table with column names all starting with '_col'") { Seq("native", "hive").foreach { orcImpl => Seq("false", "true").foreach { vectorized => withSQLConf( From 6150b08470384aa2280c5ffd141971aa6ff52410 Mon Sep 17 00:00:00 2001 From: SaurabhChawla Date: Wed, 15 Jul 2020 22:41:30 +0530 Subject: [PATCH 17/18] switch the name of requestedColIds and requestedDataColIds --- .../datasources/v2/orc/OrcPartitionReaderFactory.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala index bbcdbd795f5c7..7f25f7bd135f0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala @@ -127,11 +127,11 @@ case class OrcPartitionReaderFactory( if (resultedColPruneInfo.isEmpty) { new EmptyPartitionReader } else { - val (requestedColIds, canPruneCols) = resultedColPruneInfo.get + val (requestedDataColIds, canPruneCols) = resultedColPruneInfo.get val resultSchemaString = OrcUtils.orcResultSchemaString(canPruneCols, dataSchema, resultSchema, partitionSchema, conf) - val requestedDataColIds = requestedColIds ++ Array.fill(partitionSchema.length)(-1) - assert(requestedDataColIds.length == resultSchema.length, + val requestedColIds = requestedDataColIds ++ Array.fill(partitionSchema.length)(-1) + assert(requestedColIds.length == resultSchema.length, "[BUG] requested column IDs do not match required schema") val taskConf = new Configuration(conf) @@ -147,7 +147,7 @@ case class OrcPartitionReaderFactory( batchReader.initBatch( TypeDescription.fromString(resultSchemaString), resultSchema.fields, - requestedDataColIds, + requestedColIds, requestedPartitionColIds, file.partitionValues) new PartitionRecordReader(batchReader) From c0f62095f272aed48f0055afe8e7777a2f90fc54 Mon Sep 17 00:00:00 2001 From: SaurabhChawla Date: Thu, 16 Jul 2020 10:36:55 +0530 Subject: [PATCH 18/18] fix the name of the unit test --- .../scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala index fcb7b3373ac78..12ee5bea7c2f9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala +++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala @@ -289,7 +289,7 @@ class HiveOrcQuerySuite extends OrcQueryTest with TestHiveSingleton { } } - test("SPARK-32234: read ORC table with column names all starting with '_col'") { + test("SPARK-32234 read ORC table with column names all starting with '_col'") { Seq("native", "hive").foreach { orcImpl => Seq("false", "true").foreach { vectorized => withSQLConf(