From 083f6ec70b2f03d2efc36ccfb00e67af6d39b9d8 Mon Sep 17 00:00:00 2001 From: "wangguangxin.cn" Date: Sun, 10 Mar 2019 11:25:34 +0800 Subject: [PATCH 01/15] orc merge schema --- .../apache/spark/sql/internal/SQLConf.scala | 8 ++ .../datasources/orc/OrcFileFormat.scala | 2 +- .../datasources/orc/OrcOptions.scala | 11 +++ .../execution/datasources/orc/OrcUtils.scala | 86 ++++++++++++++++++- .../datasources/v2/orc/OrcTable.scala | 4 +- .../datasources/orc/OrcUtilsSuite.scala | 65 ++++++++++++++ .../spark/sql/hive/orc/OrcFileFormat.scala | 21 +++-- .../spark/sql/hive/orc/OrcFileOperator.scala | 13 +++ .../sql/hive/orc/HiveOrcFileFormatSuite.scala | 69 +++++++++++++++ 9 files changed, 270 insertions(+), 9 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtilsSuite.scala create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFileFormatSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 71c830207701..d6fcead43547 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -552,6 +552,12 @@ object SQLConf { .booleanConf .createWithDefault(true) + val ORC_SCHEMA_MERGING_ENABLED = buildConf("spark.sql.orc.mergeSchema") + .doc("When true, the Orc data source merges schemas collected from all data files, " + + "otherwise the schema is picked from a random data file.") + .booleanConf + .createWithDefault(false) + val HIVE_VERIFY_PARTITION_PATH = buildConf("spark.sql.hive.verifyPartitionPath") .doc("When true, check all the partition paths under the table\'s root directory " + "when reading data stored in HDFS. This configuration will be deprecated in the future " + @@ -1907,6 +1913,8 @@ class SQLConf extends Serializable with Logging { def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED) + def isOrcSchemaMergingEnabled: Boolean = getConf(ORC_SCHEMA_MERGING_ENABLED) + def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH) def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index 01f8ce7911d4..f7c12598da20 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -94,7 +94,7 @@ class OrcFileFormat sparkSession: SparkSession, options: Map[String, String], files: Seq[FileStatus]): Option[StructType] = { - OrcUtils.readSchema(sparkSession, files) + OrcUtils.inferSchema(sparkSession, files, options) } override def prepareWrite( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala index 0ad3862f6cf0..25f022bcdde8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala @@ -57,9 +57,20 @@ class OrcOptions( } shortOrcCompressionCodecNames(codecName) } + + /** + * Whether it merges schemas or not. 
When the given Orc files have different schemas, + * the schemas can be merged. By default use the value specified in SQLConf. + */ + val mergeSchema: Boolean = parameters + .get(MERGE_SCHEMA) + .map(_.toBoolean) + .getOrElse(sqlConf.isOrcSchemaMergingEnabled) } object OrcOptions { + val MERGE_SCHEMA = "mergeSchema" + // The ORC compression short names private val shortOrcCompressionCodecNames = Map( "none" -> "NONE", diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala index fb9f87ccdddd..f1052eab276e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala @@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.util.quoteIdentifier import org.apache.spark.sql.types._ +import org.apache.spark.util.{SerializableConfiguration, ThreadUtils} object OrcUtils extends Logging { @@ -82,7 +83,6 @@ object OrcUtils extends Logging { : Option[StructType] = { val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles val conf = sparkSession.sessionState.newHadoopConf() - // TODO: We need to support merge schema. Please see SPARK-11412. files.toIterator.map(file => readSchema(file.getPath, conf, ignoreCorruptFiles)).collectFirst { case Some(schema) => logDebug(s"Reading schema from file $files, got Hive schema string: $schema") @@ -90,6 +90,90 @@ object OrcUtils extends Logging { } } + /** + * Read single ORC file schema using native version of ORC + */ + def singleFileSchemaReader(file: String, conf: Configuration, ignoreCorruptFiles: Boolean) + : Option[StructType] = { + OrcUtils.readSchema(new Path(file), conf, ignoreCorruptFiles) + .map(s => CatalystSqlParser.parseDataType(s.toString).asInstanceOf[StructType]) + } + + /** + * Figures out a merged ORC schema with a distributed Spark job. + * This function is used by both `org.apache.spark.sql.hive.orc.OrcFileFormat` + * and `org.apache.spark.sql.execution.datasources.orc.OrcFileFormat`. + */ + def mergeSchemasInParallel( + sparkSession: SparkSession, + files: Seq[FileStatus], + singleFileSchemaReader: (String, Configuration, Boolean) => Option[StructType]) + : Option[StructType] = { + val serializedConf = new SerializableConfiguration(sparkSession.sessionState.newHadoopConf()) + + val filePaths = files.map(_.getPath.toString) + + // Set the number of partitions to prevent following schema reads from generating many tasks + // in case of a small number of orc files. + val numParallelism = Math.min(Math.max(filePaths.size, 1), + sparkSession.sparkContext.defaultParallelism) + + val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles + + // Issues a Spark job to read ORC schema in parallel. + val partiallyMergedSchemas = + sparkSession + .sparkContext + .parallelize(filePaths, numParallelism) + .mapPartitions { iterator => + // Reads Orc schema in multi-threaded manner. 
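+          // Each task fans its files out to a small thread pool ("readingOrcSchemas", up to
+          // 8 threads), so footer reads overlap I/O; readers that return None (e.g. corrupt
+          // files when ignoreCorruptFiles is enabled) are dropped by the flatten below.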
+ val partFiles = iterator.toSeq + val schemas = ThreadUtils.parmap(partFiles, "readingOrcSchemas", 8) { currentFile => + singleFileSchemaReader(currentFile, serializedConf.value, ignoreCorruptFiles) + }.flatten + + if (schemas.isEmpty) { + Iterator.empty + } else { + var mergedSchema = schemas.head + schemas.tail.foreach { schema => + try { + mergedSchema = mergedSchema.merge(schema) + } catch { case cause: SparkException => + throw new SparkException( + s"Failed merging schema:\n${schema.treeString}", cause) + } + } + Iterator.single(mergedSchema) + } + }.collect() + + if (partiallyMergedSchemas.isEmpty) { + None + } else { + var finalSchema = partiallyMergedSchemas.head + partiallyMergedSchemas.tail.foreach { schema => + try { + finalSchema = finalSchema.merge(schema) + } catch { case cause: SparkException => + throw new SparkException( + s"Failed merging schema:\n${schema.treeString}", cause) + } + } + Some(finalSchema) + } + } + + def inferSchema(sparkSession: SparkSession, files: Seq[FileStatus], options: Map[String, String]) + : Option[StructType] = { + val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf) + if (orcOptions.mergeSchema) { + OrcUtils.mergeSchemasInParallel(sparkSession, files, OrcUtils.singleFileSchemaReader) + } else { + OrcUtils.readSchema(sparkSession, files) + } + } + /** * Returns the requested column ids from the given ORC file. Column id can be -1, which means the * requested column doesn't exist in the ORC file. Returns None if the given ORC file is empty. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala index 1cc6e61c845c..ab2db49d0e09 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala @@ -16,6 +16,8 @@ */ package org.apache.spark.sql.execution.datasources.v2.orc +import scala.collection.JavaConverters._ + import org.apache.hadoop.fs.FileStatus import org.apache.spark.sql.SparkSession @@ -39,7 +41,7 @@ case class OrcTable( new OrcScanBuilder(sparkSession, fileIndex, schema, dataSchema, options) override def inferSchema(files: Seq[FileStatus]): Option[StructType] = - OrcUtils.readSchema(sparkSession, files) + OrcUtils.inferSchema(sparkSession, files, options.asMap().asScala.toMap) override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = new OrcWriteBuilder(options, paths, formatName, supportsDataType) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtilsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtilsSuite.scala new file mode 100644 index 000000000000..b023ea632a57 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtilsSuite.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.orc + +import org.apache.hadoop.fs.{FileSystem, Path} + +import org.apache.spark.SparkException +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types.{LongType, StructField, StructType} + +class OrcUtilsSuite extends OrcTest with SharedSQLContext { + + test("read and merge orc schemas in parallel") { + def testMergeSchemasInParallel(ignoreCorruptFiles: Boolean): Unit = { + withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> ignoreCorruptFiles.toString) { + withTempDir { dir => + val fs = FileSystem.get(spark.sessionState.newHadoopConf()) + val basePath = dir.getCanonicalPath + + val path1 = new Path(basePath, "first") + val path2 = new Path(basePath, "second") + val path3 = new Path(basePath, "third") + + spark.range(1).toDF("a").coalesce(1).write.orc(path1.toString) + spark.range(1, 2).toDF("b").coalesce(1).write.orc(path2.toString) + spark.range(2, 3).toDF("a").coalesce(1).write.json(path3.toString) + + val fileStatuses = + Seq(fs.listStatus(path1), fs.listStatus(path2), fs.listStatus(path3)).flatten + + val schema = OrcUtils.mergeSchemasInParallel( + spark, + fileStatuses, + OrcUtils.singleFileSchemaReader) + + assert(schema.isDefined == true) + assert(schema.get == StructType(Seq( + StructField("a", LongType, true), + StructField("b", LongType, true)))) + } + } + } + + testMergeSchemasInParallel(true) + val exception = intercept[SparkException] { + testMergeSchemasInParallel(false) + }.getCause + assert(exception.getCause.getMessage.contains("Could not read footer for file")) + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index 9ac3e98f5f0b..333aa5fd66c9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -48,6 +48,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.orc.OrcOptions +import org.apache.spark.sql.execution.datasources.orc.OrcUtils import org.apache.spark.sql.hive.{HiveInspectors, HiveShim} import org.apache.spark.sql.sources.{Filter, _} import org.apache.spark.sql.types._ @@ -67,12 +68,20 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable sparkSession: SparkSession, options: Map[String, String], files: Seq[FileStatus]): Option[StructType] = { - val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles - OrcFileOperator.readSchema( - files.map(_.getPath.toString), - Some(sparkSession.sessionState.newHadoopConf()), - ignoreCorruptFiles - ) + val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf) + if (orcOptions.mergeSchema) { + OrcUtils.mergeSchemasInParallel( + sparkSession, + files, + OrcFileOperator.singleFileSchemaReader) + } else { + val ignoreCorruptFiles = 
sparkSession.sessionState.conf.ignoreCorruptFiles + OrcFileOperator.readSchema( + files.map(_.getPath.toString), + Some(sparkSession.sessionState.newHadoopConf()), + ignoreCorruptFiles + ) + } } override def prepareWrite( diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala index 713b70f252b6..90e4e700bb74 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala @@ -101,6 +101,19 @@ private[hive] object OrcFileOperator extends Logging { } } + /** + * Read single ORC file schema using Hive ORC library + */ + def singleFileSchemaReader(file: String, conf: Configuration, ignoreCorruptFiles: Boolean) + : Option[StructType] = { + getFileReader(file, Some(conf), ignoreCorruptFiles).map(reader => { + val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector] + val schema = readerInspector.getTypeName + logDebug(s"Reading schema from file $file, got Hive schema string: $schema") + CatalystSqlParser.parseDataType(schema).asInstanceOf[StructType] + }) + } + def getObjectInspector( path: String, conf: Option[Configuration]): Option[StructObjectInspector] = { getFileReader(path, conf).map(_.getObjectInspector.asInstanceOf[StructObjectInspector]) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFileFormatSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFileFormatSuite.scala new file mode 100644 index 000000000000..1dd0f9b89942 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFileFormatSuite.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.orc + +import org.apache.hadoop.fs.{FileSystem, Path} + +import org.apache.spark.SparkException +import org.apache.spark.sql.execution.datasources.orc.{OrcTest, OrcUtils} +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{LongType, StructField, StructType} + +class HiveOrcFileFormatSuite extends OrcTest with TestHiveSingleton { + + test("read and merge orc schemas in parallel using hive orc lib") { + def testMergeSchemasInParallelUsingHiveOrcLib(ignoreCorruptFiles: Boolean): Unit = { + withSQLConf( + SQLConf.IGNORE_CORRUPT_FILES.key -> ignoreCorruptFiles.toString, + SQLConf.ORC_IMPLEMENTATION.key -> "hive") { + withTempDir { dir => + val fs = FileSystem.get(spark.sessionState.newHadoopConf()) + val basePath = dir.getCanonicalPath + + val path1 = new Path(basePath, "first") + val path2 = new Path(basePath, "second") + val path3 = new Path(basePath, "third") + + spark.range(1).toDF("a").coalesce(1).write.orc(path1.toString) + spark.range(1, 2).toDF("b").coalesce(1).write.orc(path2.toString) + spark.range(2, 3).toDF("a").coalesce(1).write.json(path3.toString) + + val fileStatuses = + Seq(fs.listStatus(path1), fs.listStatus(path2), fs.listStatus(path3)).flatten + + val schema = OrcUtils.mergeSchemasInParallel( + spark, + fileStatuses, + OrcFileOperator.singleFileSchemaReader) + + assert(schema.isDefined == true) + assert(schema.get == StructType(Seq( + StructField("a", LongType, true), + StructField("b", LongType, true)))) + } + } + } + + testMergeSchemasInParallelUsingHiveOrcLib(true) + val exception = intercept[SparkException] { + testMergeSchemasInParallelUsingHiveOrcLib(false) + }.getCause + assert(exception.getCause.getMessage.contains("Could not read footer for file")) + } +} From 7d833b0d37c3cb646810d723651e9ceaa96da1fb Mon Sep 17 00:00:00 2001 From: "wangguangxin.cn" Date: Tue, 21 May 2019 23:03:17 +0800 Subject: [PATCH 02/15] update test suites --- .../execution/datasources/orc/OrcUtils.scala | 2 - .../datasources/v2/orc/OrcTable.scala | 2 +- .../datasources/orc/OrcSourceSuite.scala | 52 +++++++++++++- .../datasources/orc/OrcUtilsSuite.scala | 65 ----------------- .../sql/hive/orc/HiveOrcFileFormatSuite.scala | 69 ------------------- .../sql/hive/orc/HiveOrcSourceSuite.scala | 4 ++ 6 files changed, 55 insertions(+), 139 deletions(-) delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtilsSuite.scala delete mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFileFormatSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala index f1052eab276e..8ffc8df288d5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala @@ -101,8 +101,6 @@ object OrcUtils extends Logging { /** * Figures out a merged ORC schema with a distributed Spark job. - * This function is used by both `org.apache.spark.sql.hive.orc.OrcFileFormat` - * and `org.apache.spark.sql.execution.datasources.orc.OrcFileFormat`. 
*/ def mergeSchemasInParallel( sparkSession: SparkSession, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala index ab2db49d0e09..3fe433861a3c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala @@ -41,7 +41,7 @@ case class OrcTable( new OrcScanBuilder(sparkSession, fileIndex, schema, dataSchema, options) override def inferSchema(files: Seq[FileStatus]): Option[StructType] = - OrcUtils.inferSchema(sparkSession, files, options.asMap().asScala.toMap) + OrcUtils.inferSchema(sparkSession, files, options.asScala.toMap) override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = new OrcWriteBuilder(options, paths, formatName, supportsDataType) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala index 8f9cc629880e..18f3cf92c51b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala @@ -23,7 +23,7 @@ import java.sql.Timestamp import java.util.Locale import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.orc.OrcConf.COMPRESS import org.apache.orc.OrcFile import org.apache.orc.OrcProto.ColumnEncoding.Kind.{DICTIONARY_V2, DIRECT, DIRECT_V2} @@ -31,10 +31,11 @@ import org.apache.orc.OrcProto.Stream.Kind import org.apache.orc.impl.RecordReaderImpl import org.scalatest.BeforeAndAfterAll -import org.apache.spark.SPARK_VERSION_SHORT +import org.apache.spark.{SPARK_VERSION_SHORT, SparkException} import org.apache.spark.sql.{Row, SPARK_VERSION_METADATA_KEY} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types.{LongType, StructField, StructType} import org.apache.spark.util.Utils case class OrcData(intField: Int, stringField: String) @@ -188,6 +189,49 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll { } } + protected def testMergeSchemasInParallel( + ignoreCorruptFiles: Boolean, + singleFileSchemaReader: (String, Configuration, Boolean) => Option[StructType]): Unit = { + withSQLConf( + SQLConf.IGNORE_CORRUPT_FILES.key -> ignoreCorruptFiles.toString, + SQLConf.ORC_IMPLEMENTATION.key -> orcImp) { + withTempDir { dir => + val fs = FileSystem.get(spark.sessionState.newHadoopConf()) + val basePath = dir.getCanonicalPath + + val path1 = new Path(basePath, "first") + val path2 = new Path(basePath, "second") + val path3 = new Path(basePath, "third") + + spark.range(1).toDF("a").coalesce(1).write.orc(path1.toString) + spark.range(1, 2).toDF("b").coalesce(1).write.orc(path2.toString) + spark.range(2, 3).toDF("a").coalesce(1).write.json(path3.toString) + + val fileStatuses = + Seq(fs.listStatus(path1), fs.listStatus(path2), fs.listStatus(path3)).flatten + + val schema = OrcUtils.mergeSchemasInParallel( + spark, + fileStatuses, + singleFileSchemaReader) + + assert(schema.isDefined == true) + assert(schema.get == StructType(Seq( + StructField("a", LongType, true), + StructField("b", LongType, true)))) + } + } + } + + protected def 
testMergeSchemasInParallel( + singleFileSchemaReader: (String, Configuration, Boolean) => Option[StructType]): Unit = { + testMergeSchemasInParallel(true, singleFileSchemaReader) + val exception = intercept[SparkException] { + testMergeSchemasInParallel(false, singleFileSchemaReader) + }.getCause + assert(exception.getCause.getMessage.contains("Could not read footer for file")) + } + test("create temporary orc table") { checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(10)) @@ -377,4 +421,8 @@ class OrcSourceSuite extends OrcSuite with SharedSQLContext { test("Enforce direct encoding column-wise selectively") { testSelectiveDictionaryEncoding(isSelective = true) } + + test("SPARK-11412 read and merge orc schemas in parallel") { + testMergeSchemasInParallel(OrcUtils.singleFileSchemaReader) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtilsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtilsSuite.scala deleted file mode 100644 index b023ea632a57..000000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtilsSuite.scala +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.spark.sql.execution.datasources.orc - -import org.apache.hadoop.fs.{FileSystem, Path} - -import org.apache.spark.SparkException -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.sql.types.{LongType, StructField, StructType} - -class OrcUtilsSuite extends OrcTest with SharedSQLContext { - - test("read and merge orc schemas in parallel") { - def testMergeSchemasInParallel(ignoreCorruptFiles: Boolean): Unit = { - withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> ignoreCorruptFiles.toString) { - withTempDir { dir => - val fs = FileSystem.get(spark.sessionState.newHadoopConf()) - val basePath = dir.getCanonicalPath - - val path1 = new Path(basePath, "first") - val path2 = new Path(basePath, "second") - val path3 = new Path(basePath, "third") - - spark.range(1).toDF("a").coalesce(1).write.orc(path1.toString) - spark.range(1, 2).toDF("b").coalesce(1).write.orc(path2.toString) - spark.range(2, 3).toDF("a").coalesce(1).write.json(path3.toString) - - val fileStatuses = - Seq(fs.listStatus(path1), fs.listStatus(path2), fs.listStatus(path3)).flatten - - val schema = OrcUtils.mergeSchemasInParallel( - spark, - fileStatuses, - OrcUtils.singleFileSchemaReader) - - assert(schema.isDefined == true) - assert(schema.get == StructType(Seq( - StructField("a", LongType, true), - StructField("b", LongType, true)))) - } - } - } - - testMergeSchemasInParallel(true) - val exception = intercept[SparkException] { - testMergeSchemasInParallel(false) - }.getCause - assert(exception.getCause.getMessage.contains("Could not read footer for file")) - } -} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFileFormatSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFileFormatSuite.scala deleted file mode 100644 index 1dd0f9b89942..000000000000 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFileFormatSuite.scala +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.hive.orc - -import org.apache.hadoop.fs.{FileSystem, Path} - -import org.apache.spark.SparkException -import org.apache.spark.sql.execution.datasources.orc.{OrcTest, OrcUtils} -import org.apache.spark.sql.hive.test.TestHiveSingleton -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{LongType, StructField, StructType} - -class HiveOrcFileFormatSuite extends OrcTest with TestHiveSingleton { - - test("read and merge orc schemas in parallel using hive orc lib") { - def testMergeSchemasInParallelUsingHiveOrcLib(ignoreCorruptFiles: Boolean): Unit = { - withSQLConf( - SQLConf.IGNORE_CORRUPT_FILES.key -> ignoreCorruptFiles.toString, - SQLConf.ORC_IMPLEMENTATION.key -> "hive") { - withTempDir { dir => - val fs = FileSystem.get(spark.sessionState.newHadoopConf()) - val basePath = dir.getCanonicalPath - - val path1 = new Path(basePath, "first") - val path2 = new Path(basePath, "second") - val path3 = new Path(basePath, "third") - - spark.range(1).toDF("a").coalesce(1).write.orc(path1.toString) - spark.range(1, 2).toDF("b").coalesce(1).write.orc(path2.toString) - spark.range(2, 3).toDF("a").coalesce(1).write.json(path3.toString) - - val fileStatuses = - Seq(fs.listStatus(path1), fs.listStatus(path2), fs.listStatus(path3)).flatten - - val schema = OrcUtils.mergeSchemasInParallel( - spark, - fileStatuses, - OrcFileOperator.singleFileSchemaReader) - - assert(schema.isDefined == true) - assert(schema.get == StructType(Seq( - StructField("a", LongType, true), - StructField("b", LongType, true)))) - } - } - } - - testMergeSchemasInParallelUsingHiveOrcLib(true) - val exception = intercept[SparkException] { - testMergeSchemasInParallelUsingHiveOrcLib(false) - }.getCause - assert(exception.getCause.getMessage.contains("Could not read footer for file")) - } -} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala index 6bcb2225e66d..f6f3d7ca9104 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala @@ -166,4 +166,8 @@ class HiveOrcSourceSuite extends OrcSuite with TestHiveSingleton { } } } + + test("SPARK-11412 read and merge orc schemas in parallel") { + testMergeSchemasInParallel(OrcFileOperator.singleFileSchemaReader) + } } From d9a0ff24e0bf552ce6468653f6ca0bba13b3de76 Mon Sep 17 00:00:00 2001 From: "wangguangxin.cn" Date: Sun, 9 Jun 2019 16:12:49 +0800 Subject: [PATCH 03/15] update based on comments --- .../execution/datasources/orc/OrcUtils.scala | 5 ++-- .../datasources/ReadSchemaSuite.scala | 26 +++++++++++++++++++ .../spark/sql/hive/orc/OrcFileOperator.scala | 2 +- 3 files changed, 30 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala index 8ffc8df288d5..9e40a78d93d4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala @@ -92,9 +92,10 @@ object OrcUtils extends Logging { /** * Read single ORC file schema using native version of ORC + * This is visible for testing. 
*/ def singleFileSchemaReader(file: String, conf: Configuration, ignoreCorruptFiles: Boolean) - : Option[StructType] = { + : Option[StructType] = { OrcUtils.readSchema(new Path(file), conf, ignoreCorruptFiles) .map(s => CatalystSqlParser.parseDataType(s.toString).asInstanceOf[StructType]) } @@ -106,7 +107,7 @@ object OrcUtils extends Logging { sparkSession: SparkSession, files: Seq[FileStatus], singleFileSchemaReader: (String, Configuration, Boolean) => Option[StructType]) - : Option[StructType] = { + : Option[StructType] = { val serializedConf = new SerializableConfiguration(sparkSession.sessionState.newHadoopConf()) val filePaths = files.map(_.getPath.toString) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala index 8c95349ef3be..af99e5bc6634 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.internal.SQLConf * * -> OrcReadSchemaSuite * -> VectorizedOrcReadSchemaSuite + * -> MergedOrcReadSchemaSuite * * -> ParquetReadSchemaSuite * -> VectorizedParquetReadSchemaSuite @@ -134,6 +135,31 @@ class VectorizedOrcReadSchemaSuite } } +class MergedOrcReadSchemaSuite + extends ReadSchemaSuite + with AddColumnIntoTheMiddleTest + with HideColumnInTheMiddleTest + with AddNestedColumnTest + with HideNestedColumnTest + with ChangePositionTest + with BooleanTypeTest + with IntegralTypeTest + with ToDoubleTypeTest { + + override val format: String = "orc" + + override def beforeAll() { + super.beforeAll() + originalConf = spark.conf.get(SQLConf.ORC_SCHEMA_MERGING_ENABLED) + spark.conf.set(SQLConf.ORC_SCHEMA_MERGING_ENABLED.key, "true") + } + + override def afterAll() { + spark.conf.set(SQLConf.ORC_SCHEMA_MERGING_ENABLED.key, originalConf) + super.afterAll() + } +} + class ParquetReadSchemaSuite extends ReadSchemaSuite with AddColumnIntoTheMiddleTest diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala index 90e4e700bb74..de0d1ebd9a2e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala @@ -105,7 +105,7 @@ private[hive] object OrcFileOperator extends Logging { * Read single ORC file schema using Hive ORC library */ def singleFileSchemaReader(file: String, conf: Configuration, ignoreCorruptFiles: Boolean) - : Option[StructType] = { + : Option[StructType] = { getFileReader(file, Some(conf), ignoreCorruptFiles).map(reader => { val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector] val schema = readerInspector.getTypeName From 70bc31b0fca6d35863779dfd91e1340e6ab464fd Mon Sep 17 00:00:00 2001 From: "wangguangxin.cn" Date: Sun, 9 Jun 2019 19:17:29 +0800 Subject: [PATCH 04/15] refactor with ParquetFileFormat's mergeSchema --- .../datasources/SchemaMergeUtils.scala | 105 ++++++++++++++++++ .../execution/datasources/orc/OrcUtils.scala | 79 ++----------- .../parquet/ParquetFileFormat.scala | 79 ++----------- .../datasources/orc/OrcSourceSuite.scala | 17 +-- .../spark/sql/hive/orc/OrcFileFormat.scala | 5 +- .../spark/sql/hive/orc/OrcFileOperator.scala | 26 +++-- .../sql/hive/orc/HiveOrcSourceSuite.scala | 2 +- 7 files changed, 152 
insertions(+), 161 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala new file mode 100644 index 000000000000..eef997471733 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} + +import org.apache.spark.SparkException +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.SerializableConfiguration + +object SchemaMergeUtils extends Logging { + /** + * Figures out a merged Parquet/ORC schema with a distributed Spark job. + */ + def mergeSchemasInParallel( + sparkSession: SparkSession, + files: Seq[FileStatus], + parallelSchemaReader: (Seq[FileStatus], Configuration, Boolean) => Seq[StructType]) + : Option[StructType] = { + val serializedConf = new SerializableConfiguration(sparkSession.sessionState.newHadoopConf()) + + // !! HACK ALERT !! + // + // Parquet requires `FileStatus`es to read footers. Here we try to send cached `FileStatus`es + // to executor side to avoid fetching them again. However, `FileStatus` is not `Serializable` + // but only `Writable`. What makes it worse, for some reason, `FileStatus` doesn't play well + // with `SerializableWritable[T]` and always causes a weird `IllegalStateException`. These + // facts virtually prevents us to serialize `FileStatus`es. + // + // Since Parquet only relies on path and length information of those `FileStatus`es to read + // footers, here we just extract them (which can be easily serialized), send them to executor + // side, and resemble fake `FileStatus`es there. + val partialFileStatusInfo = files.map(f => (f.getPath.toString, f.getLen)) + + // Set the number of partitions to prevent following schema reads from generating many tasks + // in case of a small number of orc files. + val numParallelism = Math.min(Math.max(partialFileStatusInfo.size, 1), + sparkSession.sparkContext.defaultParallelism) + + val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles + + // Issues a Spark job to read Parquet/ORC schema in parallel. 
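+    // Each partition merges the schemas of its own files on the executor side and emits at
+    // most one partially merged schema; the driver then performs the final merge over the
+    // collected partial results.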
+ val partiallyMergedSchemas = + sparkSession + .sparkContext + .parallelize(partialFileStatusInfo, numParallelism) + .mapPartitions { iterator => + // Resembles fake `FileStatus`es with serialized path and length information. + val fakeFileStatuses = iterator.map { case (path, length) => + new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(path)) + }.toSeq + + // Reads schemas in multi-threaded manner within each task + val schemas = parallelSchemaReader(fakeFileStatuses, serializedConf.value, ignoreCorruptFiles) + + if (schemas.isEmpty) { + Iterator.empty + } else { + var mergedSchema = schemas.head + schemas.tail.foreach { schema => + try { + mergedSchema = mergedSchema.merge(schema) + } catch { case cause: SparkException => + throw new SparkException( + s"Failed merging schema:\n${schema.treeString}", cause) + } + } + Iterator.single(mergedSchema) + } + }.collect() + + if (partiallyMergedSchemas.isEmpty) { + None + } else { + var finalSchema = partiallyMergedSchemas.head + partiallyMergedSchemas.tail.foreach { schema => + try { + finalSchema = finalSchema.merge(schema) + } catch { case cause: SparkException => + throw new SparkException( + s"Failed merging schema:\n${schema.treeString}", cause) + } + } + Some(finalSchema) + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala index 9e40a78d93d4..3e880c070351 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.{SPARK_VERSION_METADATA_KEY, SparkSession} import org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.util.quoteIdentifier +import org.apache.spark.sql.execution.datasources.SchemaMergeUtils import org.apache.spark.sql.types._ import org.apache.spark.util.{SerializableConfiguration, ThreadUtils} @@ -91,83 +92,23 @@ object OrcUtils extends Logging { } /** - * Read single ORC file schema using native version of ORC + * Reads ORC file schemas in multi-threaded manner, using native version of ORC. * This is visible for testing. */ - def singleFileSchemaReader(file: String, conf: Configuration, ignoreCorruptFiles: Boolean) - : Option[StructType] = { - OrcUtils.readSchema(new Path(file), conf, ignoreCorruptFiles) - .map(s => CatalystSqlParser.parseDataType(s.toString).asInstanceOf[StructType]) - } - - /** - * Figures out a merged ORC schema with a distributed Spark job. - */ - def mergeSchemasInParallel( - sparkSession: SparkSession, - files: Seq[FileStatus], - singleFileSchemaReader: (String, Configuration, Boolean) => Option[StructType]) - : Option[StructType] = { - val serializedConf = new SerializableConfiguration(sparkSession.sessionState.newHadoopConf()) - - val filePaths = files.map(_.getPath.toString) - - // Set the number of partitions to prevent following schema reads from generating many tasks - // in case of a small number of orc files. - val numParallelism = Math.min(Math.max(filePaths.size, 1), - sparkSession.sparkContext.defaultParallelism) - - val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles - - // Issues a Spark job to read ORC schema in parallel. 
- val partiallyMergedSchemas = - sparkSession - .sparkContext - .parallelize(filePaths, numParallelism) - .mapPartitions { iterator => - // Reads Orc schema in multi-threaded manner. - val partFiles = iterator.toSeq - val schemas = ThreadUtils.parmap(partFiles, "readingOrcSchemas", 8) { currentFile => - singleFileSchemaReader(currentFile, serializedConf.value, ignoreCorruptFiles) - }.flatten - - if (schemas.isEmpty) { - Iterator.empty - } else { - var mergedSchema = schemas.head - schemas.tail.foreach { schema => - try { - mergedSchema = mergedSchema.merge(schema) - } catch { case cause: SparkException => - throw new SparkException( - s"Failed merging schema:\n${schema.treeString}", cause) - } - } - Iterator.single(mergedSchema) - } - }.collect() - - if (partiallyMergedSchemas.isEmpty) { - None - } else { - var finalSchema = partiallyMergedSchemas.head - partiallyMergedSchemas.tail.foreach { schema => - try { - finalSchema = finalSchema.merge(schema) - } catch { case cause: SparkException => - throw new SparkException( - s"Failed merging schema:\n${schema.treeString}", cause) - } - } - Some(finalSchema) - } + def readOrcSchemasInParallel( + files: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean): Seq[StructType] = { + ThreadUtils.parmap(files, "readingOrcSchemas", 8) { currentFile => + OrcUtils.readSchema(currentFile.getPath, conf, ignoreCorruptFiles) + .map(s => CatalystSqlParser.parseDataType(s.toString).asInstanceOf[StructType]) + }.flatten } def inferSchema(sparkSession: SparkSession, files: Seq[FileStatus], options: Map[String, String]) : Option[StructType] = { val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf) if (orcOptions.mergeSchema) { - OrcUtils.mergeSchemasInParallel(sparkSession, files, OrcUtils.singleFileSchemaReader) + SchemaMergeUtils.mergeSchemasInParallel( + sparkSession, files, OrcUtils.readOrcSchemasInParallel) } else { OrcUtils.readSchema(sparkSession, files) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index f2da159c5c95..ec78f9582711 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -574,79 +574,18 @@ object ParquetFileFormat extends Logging { sparkSession: SparkSession): Option[StructType] = { val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString val assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp - val serializedConf = new SerializableConfiguration(sparkSession.sessionState.newHadoopConf()) - // !! HACK ALERT !! - // - // Parquet requires `FileStatus`es to read footers. Here we try to send cached `FileStatus`es - // to executor side to avoid fetching them again. However, `FileStatus` is not `Serializable` - // but only `Writable`. What makes it worse, for some reason, `FileStatus` doesn't play well - // with `SerializableWritable[T]` and always causes a weird `IllegalStateException`. These - // facts virtually prevents us to serialize `FileStatus`es. - // - // Since Parquet only relies on path and length information of those `FileStatus`es to read - // footers, here we just extract them (which can be easily serialized), send them to executor - // side, and resemble fake `FileStatus`es there. 
- val partialFileStatusInfo = filesToTouch.map(f => (f.getPath.toString, f.getLen)) - - // Set the number of partitions to prevent following schema reads from generating many tasks - // in case of a small number of parquet files. - val numParallelism = Math.min(Math.max(partialFileStatusInfo.size, 1), - sparkSession.sparkContext.defaultParallelism) - - val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles - - // Issues a Spark job to read Parquet schema in parallel. - val partiallyMergedSchemas = - sparkSession - .sparkContext - .parallelize(partialFileStatusInfo, numParallelism) - .mapPartitions { iterator => - // Resembles fake `FileStatus`es with serialized path and length information. - val fakeFileStatuses = iterator.map { case (path, length) => - new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(path)) - }.toSeq - - // Reads footers in multi-threaded manner within each task - val footers = - ParquetFileFormat.readParquetFootersInParallel( - serializedConf.value, fakeFileStatuses, ignoreCorruptFiles) - - // Converter used to convert Parquet `MessageType` to Spark SQL `StructType` - val converter = new ParquetToSparkSchemaConverter( - assumeBinaryIsString = assumeBinaryIsString, - assumeInt96IsTimestamp = assumeInt96IsTimestamp) - if (footers.isEmpty) { - Iterator.empty - } else { - var mergedSchema = ParquetFileFormat.readSchemaFromFooter(footers.head, converter) - footers.tail.foreach { footer => - val schema = ParquetFileFormat.readSchemaFromFooter(footer, converter) - try { - mergedSchema = mergedSchema.merge(schema) - } catch { case cause: SparkException => - throw new SparkException( - s"Failed merging schema of file ${footer.getFile}:\n${schema.treeString}", cause) - } - } - Iterator.single(mergedSchema) - } - }.collect() + val reader = (files: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean) => { + // Converter used to convert Parquet `MessageType` to Spark SQL `StructType` + val converter = new ParquetToSparkSchemaConverter( + assumeBinaryIsString = assumeBinaryIsString, + assumeInt96IsTimestamp = assumeInt96IsTimestamp) - if (partiallyMergedSchemas.isEmpty) { - None - } else { - var finalSchema = partiallyMergedSchemas.head - partiallyMergedSchemas.tail.foreach { schema => - try { - finalSchema = finalSchema.merge(schema) - } catch { case cause: SparkException => - throw new SparkException( - s"Failed merging schema:\n${schema.treeString}", cause) - } - } - Some(finalSchema) + readParquetFootersInParallel(conf, files, ignoreCorruptFiles) + .map(ParquetFileFormat.readSchemaFromFooter(_, converter)) } + + SchemaMergeUtils.mergeSchemasInParallel(sparkSession, filesToTouch, reader) } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala index 18f3cf92c51b..fa04779c1f92 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala @@ -23,7 +23,7 @@ import java.sql.Timestamp import java.util.Locale import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} import org.apache.orc.OrcConf.COMPRESS import org.apache.orc.OrcFile import org.apache.orc.OrcProto.ColumnEncoding.Kind.{DICTIONARY_V2, DIRECT, DIRECT_V2} @@ -33,6 +33,7 @@ import 
org.scalatest.BeforeAndAfterAll import org.apache.spark.{SPARK_VERSION_SHORT, SparkException} import org.apache.spark.sql.{Row, SPARK_VERSION_METADATA_KEY} +import org.apache.spark.sql.execution.datasources.SchemaMergeUtils import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{LongType, StructField, StructType} @@ -191,7 +192,7 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll { protected def testMergeSchemasInParallel( ignoreCorruptFiles: Boolean, - singleFileSchemaReader: (String, Configuration, Boolean) => Option[StructType]): Unit = { + parallelSchemaReader: (Seq[FileStatus], Configuration, Boolean) => Seq[StructType]): Unit = { withSQLConf( SQLConf.IGNORE_CORRUPT_FILES.key -> ignoreCorruptFiles.toString, SQLConf.ORC_IMPLEMENTATION.key -> orcImp) { @@ -210,10 +211,10 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll { val fileStatuses = Seq(fs.listStatus(path1), fs.listStatus(path2), fs.listStatus(path3)).flatten - val schema = OrcUtils.mergeSchemasInParallel( + val schema = SchemaMergeUtils.mergeSchemasInParallel( spark, fileStatuses, - singleFileSchemaReader) + parallelSchemaReader) assert(schema.isDefined == true) assert(schema.get == StructType(Seq( @@ -224,10 +225,10 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll { } protected def testMergeSchemasInParallel( - singleFileSchemaReader: (String, Configuration, Boolean) => Option[StructType]): Unit = { - testMergeSchemasInParallel(true, singleFileSchemaReader) + parallelSchemaReader: (Seq[FileStatus], Configuration, Boolean) => Seq[StructType]): Unit = { + testMergeSchemasInParallel(true, parallelSchemaReader) val exception = intercept[SparkException] { - testMergeSchemasInParallel(false, singleFileSchemaReader) + testMergeSchemasInParallel(false, parallelSchemaReader) }.getCause assert(exception.getCause.getMessage.contains("Could not read footer for file")) } @@ -423,6 +424,6 @@ class OrcSourceSuite extends OrcSuite with SharedSQLContext { } test("SPARK-11412 read and merge orc schemas in parallel") { - testMergeSchemasInParallel(OrcUtils.singleFileSchemaReader) + testMergeSchemasInParallel(OrcUtils.readOrcSchemasInParallel) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index 333aa5fd66c9..7f2eb14956dc 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -48,7 +48,6 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.orc.OrcOptions -import org.apache.spark.sql.execution.datasources.orc.OrcUtils import org.apache.spark.sql.hive.{HiveInspectors, HiveShim} import org.apache.spark.sql.sources.{Filter, _} import org.apache.spark.sql.types._ @@ -70,10 +69,10 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable files: Seq[FileStatus]): Option[StructType] = { val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf) if (orcOptions.mergeSchema) { - OrcUtils.mergeSchemasInParallel( + SchemaMergeUtils.mergeSchemasInParallel( sparkSession, files, - OrcFileOperator.singleFileSchemaReader) + OrcFileOperator.readOrcSchemasInParallel) } else { val ignoreCorruptFiles = 
sparkSession.sessionState.conf.ignoreCorruptFiles OrcFileOperator.readSchema( diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala index de0d1ebd9a2e..1496352e9762 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.hive.orc import java.io.IOException import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.hive.ql.io.orc.{OrcFile, Reader} import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector @@ -29,6 +29,7 @@ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.types.StructType +import org.apache.spark.util.ThreadUtils private[hive] object OrcFileOperator extends Logging { /** @@ -102,16 +103,21 @@ private[hive] object OrcFileOperator extends Logging { } /** - * Read single ORC file schema using Hive ORC library + * Reads ORC file schemas in multi-threaded manner, using Hive ORC library. + * This is visible for testing. */ - def singleFileSchemaReader(file: String, conf: Configuration, ignoreCorruptFiles: Boolean) - : Option[StructType] = { - getFileReader(file, Some(conf), ignoreCorruptFiles).map(reader => { - val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector] - val schema = readerInspector.getTypeName - logDebug(s"Reading schema from file $file, got Hive schema string: $schema") - CatalystSqlParser.parseDataType(schema).asInstanceOf[StructType] - }) + def readOrcSchemasInParallel( + partFiles: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean) + : Seq[StructType] = { + ThreadUtils.parmap(partFiles, "readingOrcSchemas", 8) { currentFile => + val file = currentFile.getPath.toString + getFileReader(file, Some(conf), ignoreCorruptFiles).map(reader => { + val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector] + val schema = readerInspector.getTypeName + logDebug(s"Reading schema from file $file., got Hive schema string: $schema") + CatalystSqlParser.parseDataType(schema).asInstanceOf[StructType] + }) + }.flatten } def getObjectInspector( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala index f6f3d7ca9104..3104fb4d8173 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala @@ -168,6 +168,6 @@ class HiveOrcSourceSuite extends OrcSuite with TestHiveSingleton { } test("SPARK-11412 read and merge orc schemas in parallel") { - testMergeSchemasInParallel(OrcFileOperator.singleFileSchemaReader) + testMergeSchemasInParallel(OrcFileOperator.readOrcSchemasInParallel) } } From 1d1035056dacea19d8ff6841c59c79596a48cd68 Mon Sep 17 00:00:00 2001 From: "wangguangxin.cn" Date: Sun, 9 Jun 2019 22:56:02 +0800 Subject: [PATCH 05/15] remove redundant spaces --- .../apache/spark/sql/execution/datasources/orc/OrcUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala index 3e880c070351..12d4244e1981 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala @@ -104,7 +104,7 @@ object OrcUtils extends Logging { } def inferSchema(sparkSession: SparkSession, files: Seq[FileStatus], options: Map[String, String]) - : Option[StructType] = { + : Option[StructType] = { val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf) if (orcOptions.mergeSchema) { SchemaMergeUtils.mergeSchemasInParallel( From da35351236e7bc09075532daaf2ee40fc685b509 Mon Sep 17 00:00:00 2001 From: "wangguangxin.cn" Date: Mon, 10 Jun 2019 11:24:00 +0800 Subject: [PATCH 06/15] fix scala style issue --- .../spark/sql/execution/datasources/SchemaMergeUtils.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala index eef997471733..c5a01ae1b309 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala @@ -69,7 +69,8 @@ object SchemaMergeUtils extends Logging { }.toSeq // Reads schemas in multi-threaded manner within each task - val schemas = parallelSchemaReader(fakeFileStatuses, serializedConf.value, ignoreCorruptFiles) + val schemas = parallelSchemaReader( + fakeFileStatuses, serializedConf.value, ignoreCorruptFiles) if (schemas.isEmpty) { Iterator.empty From e527c194f6334ff697b1df33e30b91ace6c34e57 Mon Sep 17 00:00:00 2001 From: "wangguangxin.cn" Date: Thu, 13 Jun 2019 00:24:43 +0800 Subject: [PATCH 07/15] add test case and fix code style --- .../datasources/SchemaMergeUtils.scala | 19 +++++----- .../datasources/ReadSchemaSuite.scala | 15 +++----- .../datasources/orc/OrcSourceSuite.scala | 36 +++++++++++++++---- .../spark/sql/hive/orc/OrcFileOperator.scala | 4 +-- 4 files changed, 46 insertions(+), 28 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala index c5a01ae1b309..9c92066b0a1f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala @@ -31,21 +31,22 @@ object SchemaMergeUtils extends Logging { * Figures out a merged Parquet/ORC schema with a distributed Spark job. */ def mergeSchemasInParallel( - sparkSession: SparkSession, - files: Seq[FileStatus], - parallelSchemaReader: (Seq[FileStatus], Configuration, Boolean) => Seq[StructType]) - : Option[StructType] = { + sparkSession: SparkSession, + files: Seq[FileStatus], + schemaReader: (Seq[FileStatus], Configuration, Boolean) => Seq[StructType]) + : Option[StructType] = { val serializedConf = new SerializableConfiguration(sparkSession.sessionState.newHadoopConf()) // !! HACK ALERT !! // - // Parquet requires `FileStatus`es to read footers. Here we try to send cached `FileStatus`es - // to executor side to avoid fetching them again. However, `FileStatus` is not `Serializable` + // Parquet/Orc requires `FileStatus`es to read footers. 
+ // Here we try to send cached `FileStatus`es to executor side to avoid fetching them again. + // However, `FileStatus` is not `Serializable` // but only `Writable`. What makes it worse, for some reason, `FileStatus` doesn't play well // with `SerializableWritable[T]` and always causes a weird `IllegalStateException`. These // facts virtually prevents us to serialize `FileStatus`es. // - // Since Parquet only relies on path and length information of those `FileStatus`es to read + // Since Parquet/Orc only relies on path and length information of those `FileStatus`es to read // footers, here we just extract them (which can be easily serialized), send them to executor // side, and resemble fake `FileStatus`es there. val partialFileStatusInfo = files.map(f => (f.getPath.toString, f.getLen)) @@ -68,9 +69,7 @@ object SchemaMergeUtils extends Logging { new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(path)) }.toSeq - // Reads schemas in multi-threaded manner within each task - val schemas = parallelSchemaReader( - fakeFileStatuses, serializedConf.value, ignoreCorruptFiles) + val schemas = schemaReader(fakeFileStatuses, serializedConf.value, ignoreCorruptFiles) if (schemas.isEmpty) { Iterator.empty diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala index af99e5bc6634..d5502ba5737c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.datasources +import org.apache.spark.SparkConf import org.apache.spark.sql.internal.SQLConf /** @@ -148,16 +149,10 @@ class MergedOrcReadSchemaSuite override val format: String = "orc" - override def beforeAll() { - super.beforeAll() - originalConf = spark.conf.get(SQLConf.ORC_SCHEMA_MERGING_ENABLED) - spark.conf.set(SQLConf.ORC_SCHEMA_MERGING_ENABLED.key, "true") - } - - override def afterAll() { - spark.conf.set(SQLConf.ORC_SCHEMA_MERGING_ENABLED.key, originalConf) - super.afterAll() - } + override protected def sparkConf: SparkConf = + super + .sparkConf + .set(SQLConf.ORC_SCHEMA_MERGING_ENABLED.key, "true") } class ParquetReadSchemaSuite diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala index fa04779c1f92..effe492395ba 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala @@ -192,7 +192,7 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll { protected def testMergeSchemasInParallel( ignoreCorruptFiles: Boolean, - parallelSchemaReader: (Seq[FileStatus], Configuration, Boolean) => Seq[StructType]): Unit = { + schemaReader: (Seq[FileStatus], Configuration, Boolean) => Seq[StructType]): Unit = { withSQLConf( SQLConf.IGNORE_CORRUPT_FILES.key -> ignoreCorruptFiles.toString, SQLConf.ORC_IMPLEMENTATION.key -> orcImp) { @@ -214,9 +214,9 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll { val schema = SchemaMergeUtils.mergeSchemasInParallel( spark, fileStatuses, - parallelSchemaReader) + schemaReader) - assert(schema.isDefined == true) + assert(schema.isDefined) assert(schema.get == 
StructType(Seq( StructField("a", LongType, true), StructField("b", LongType, true)))) @@ -225,10 +225,10 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll { } protected def testMergeSchemasInParallel( - parallelSchemaReader: (Seq[FileStatus], Configuration, Boolean) => Seq[StructType]): Unit = { - testMergeSchemasInParallel(true, parallelSchemaReader) + schemaReader: (Seq[FileStatus], Configuration, Boolean) => Seq[StructType]): Unit = { + testMergeSchemasInParallel(true, schemaReader) val exception = intercept[SparkException] { - testMergeSchemasInParallel(false, parallelSchemaReader) + testMergeSchemasInParallel(false, schemaReader) }.getCause assert(exception.getCause.getMessage.contains("Could not read footer for file")) } @@ -377,6 +377,30 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll { assert(version === SPARK_VERSION_SHORT) } } + + test("SPARK-11412 test orc merge schema option") { + val conf = spark.sessionState.conf + // Test if the default of spark.sql.orc.mergeSchema is false + assert(new OrcOptions(Map.empty[String, String], conf).mergeSchema == false) + + // OrcOptions's parameters have a higher priority than SQL configuration. + // `mergeSchema` -> `spark.sql.orc.mergeSchema` + withSQLConf(SQLConf.ORC_SCHEMA_MERGING_ENABLED.key -> "true") { + assert(new OrcOptions(Map.empty[String, String], conf).mergeSchema == true) + val map1 = Map(OrcOptions.MERGE_SCHEMA -> "true") + val map2 = Map(OrcOptions.MERGE_SCHEMA -> "false") + assert(new OrcOptions(map1, conf).mergeSchema == true) + assert(new OrcOptions(map2, conf).mergeSchema == false) + } + + withSQLConf(SQLConf.ORC_SCHEMA_MERGING_ENABLED.key -> "false") { + assert(new OrcOptions(Map.empty[String, String], conf).mergeSchema == true) + val map1 = Map(OrcOptions.MERGE_SCHEMA -> "true") + val map2 = Map(OrcOptions.MERGE_SCHEMA -> "false") + assert(new OrcOptions(map1, conf).mergeSchema == true) + assert(new OrcOptions(map2, conf).mergeSchema == false) + } + } } class OrcSourceSuite extends OrcSuite with SharedSQLContext { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala index 1496352e9762..1a5f47bf5aa7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala @@ -107,8 +107,8 @@ private[hive] object OrcFileOperator extends Logging { * This is visible for testing. 
*/ def readOrcSchemasInParallel( - partFiles: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean) - : Seq[StructType] = { + partFiles: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean) + : Seq[StructType] = { ThreadUtils.parmap(partFiles, "readingOrcSchemas", 8) { currentFile => val file = currentFile.getPath.toString getFileReader(file, Some(conf), ignoreCorruptFiles).map(reader => { From ac11880c9c3e1754f60559bdb17745ac3acd2117 Mon Sep 17 00:00:00 2001 From: "wangguangxin.cn" Date: Thu, 13 Jun 2019 00:43:05 +0800 Subject: [PATCH 08/15] update --- .../spark/sql/execution/datasources/SchemaMergeUtils.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala index 9c92066b0a1f..20ea9bfd918d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala @@ -39,14 +39,14 @@ object SchemaMergeUtils extends Logging { // !! HACK ALERT !! // - // Parquet/Orc requires `FileStatus`es to read footers. + // Parquet/ORC requires `FileStatus`es to read footers. // Here we try to send cached `FileStatus`es to executor side to avoid fetching them again. // However, `FileStatus` is not `Serializable` // but only `Writable`. What makes it worse, for some reason, `FileStatus` doesn't play well // with `SerializableWritable[T]` and always causes a weird `IllegalStateException`. These // facts virtually prevents us to serialize `FileStatus`es. // - // Since Parquet/Orc only relies on path and length information of those `FileStatus`es to read + // Since Parquet/ORC only relies on path and length information of those `FileStatus`es to read // footers, here we just extract them (which can be easily serialized), send them to executor // side, and resemble fake `FileStatus`es there. val partialFileStatusInfo = files.map(f => (f.getPath.toString, f.getLen)) From b8d216fc8fb6954579f08762b60737abb8d39d29 Mon Sep 17 00:00:00 2001 From: "wangguangxin.cn" Date: Thu, 13 Jun 2019 00:52:13 +0800 Subject: [PATCH 09/15] update --- .../spark/sql/execution/datasources/orc/OrcSourceSuite.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala index effe492395ba..589fafd1bf38 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala @@ -386,7 +386,6 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll { // OrcOptions's parameters have a higher priority than SQL configuration. 
// `mergeSchema` -> `spark.sql.orc.mergeSchema` withSQLConf(SQLConf.ORC_SCHEMA_MERGING_ENABLED.key -> "true") { - assert(new OrcOptions(Map.empty[String, String], conf).mergeSchema == true) val map1 = Map(OrcOptions.MERGE_SCHEMA -> "true") val map2 = Map(OrcOptions.MERGE_SCHEMA -> "false") assert(new OrcOptions(map1, conf).mergeSchema == true) @@ -394,7 +393,6 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll { } withSQLConf(SQLConf.ORC_SCHEMA_MERGING_ENABLED.key -> "false") { - assert(new OrcOptions(Map.empty[String, String], conf).mergeSchema == true) val map1 = Map(OrcOptions.MERGE_SCHEMA -> "true") val map2 = Map(OrcOptions.MERGE_SCHEMA -> "false") assert(new OrcOptions(map1, conf).mergeSchema == true) From 93532141438815bf1f3de0bde3a18ce8b931a97e Mon Sep 17 00:00:00 2001 From: "wangguangxin.cn" Date: Mon, 17 Jun 2019 11:49:57 +0800 Subject: [PATCH 10/15] add test case --- .../datasources/orc/OrcSourceSuite.scala | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala index 589fafd1bf38..7c713a935b1e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala @@ -399,6 +399,31 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll { assert(new OrcOptions(map2, conf).mergeSchema == false) } } + + test("SPARK-11412 test enabling/disabling schema merging") { + def testSchemaMerging(expectedColumnNumber: Int): Unit = { + withTempDir { dir => + val basePath = dir.getCanonicalPath + spark.range(0, 10).toDF("a").write.orc(new Path(basePath, "foo=1").toString) + spark.range(0, 10).toDF("b").write.orc(new Path(basePath, "foo=2").toString) + assert(spark.read.orc(basePath).columns.length === expectedColumnNumber) + + // OrcOptions.MERGE_SCHEMA has higher priority + assert(spark.read.option(OrcOptions.MERGE_SCHEMA, true) + .orc(basePath).columns.length === 3) + assert(spark.read.option(OrcOptions.MERGE_SCHEMA, false) + .orc(basePath).columns.length === 2) + } + } + + withSQLConf(SQLConf.ORC_SCHEMA_MERGING_ENABLED.key -> "true") { + testSchemaMerging(3) + } + + withSQLConf(SQLConf.ORC_SCHEMA_MERGING_ENABLED.key -> "false") { + testSchemaMerging(2) + } + } } class OrcSourceSuite extends OrcSuite with SharedSQLContext { From 43f7b58a8ef18ff4ce45dec749346acac1a5f75f Mon Sep 17 00:00:00 2001 From: "wangguangxin.cn" Date: Mon, 17 Jun 2019 16:27:15 +0800 Subject: [PATCH 11/15] update comment --- .../spark/sql/execution/datasources/SchemaMergeUtils.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala index 20ea9bfd918d..99882b0f7c7b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala @@ -38,15 +38,16 @@ object SchemaMergeUtils extends Logging { val serializedConf = new SerializableConfiguration(sparkSession.sessionState.newHadoopConf()) // !! HACK ALERT !! + // Here is a hack for Parquet, but it can be used by Orc as well. // - // Parquet/ORC requires `FileStatus`es to read footers. 
+ // Parquet requires `FileStatus`es to read footers. // Here we try to send cached `FileStatus`es to executor side to avoid fetching them again. // However, `FileStatus` is not `Serializable` // but only `Writable`. What makes it worse, for some reason, `FileStatus` doesn't play well // with `SerializableWritable[T]` and always causes a weird `IllegalStateException`. These // facts virtually prevents us to serialize `FileStatus`es. // - // Since Parquet/ORC only relies on path and length information of those `FileStatus`es to read + // Since Parquet only relies on path and length information of those `FileStatus`es to read // footers, here we just extract them (which can be easily serialized), send them to executor // side, and resemble fake `FileStatus`es there. val partialFileStatusInfo = files.map(f => (f.getPath.toString, f.getLen)) From bc4618f8b368bf0ce58e0bcc1a1feb6bea6048fc Mon Sep 17 00:00:00 2001 From: "wangguangxin.cn" Date: Tue, 25 Jun 2019 21:23:57 +0800 Subject: [PATCH 12/15] add negative test cases --- .../datasources/orc/OrcSourceSuite.scala | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala index 7c713a935b1e..58f6a779f53d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala @@ -424,6 +424,55 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll { testSchemaMerging(2) } } + + test("SPARK-11412 test enabling/disabling schema merging with data type conflicts") { + def testSchemaMergingWithDataTypeConflicts(expectedColumnNumber: Int): Unit = { + withTempDir { dir => + val basePath = dir.getCanonicalPath + spark.range(0, 10).toDF("a").write.orc(new Path(basePath, "foo=1").toString) + spark.range(0, 10).map(s => s"value_$s").toDF("a") + .write.orc(new Path(basePath, "foo=2").toString) + assert(spark.read.orc(basePath).columns.length === expectedColumnNumber) + } + } + + withSQLConf(SQLConf.ORC_SCHEMA_MERGING_ENABLED.key -> "true") { + val exception = intercept[SparkException] { + testSchemaMergingWithDataTypeConflicts(3) + }.getCause + assert(exception.getMessage.contains( + "Failed to merge incompatible data types bigint and string")) + } + + withSQLConf(SQLConf.ORC_SCHEMA_MERGING_ENABLED.key -> "false") { + testSchemaMergingWithDataTypeConflicts(2) + } + } + + test("SPARK-11412 test schema merging with corrupt files") { + def testSchemaMerging(ignoreCorruptFiles: Boolean, expectedColumnNumber: Int): Unit = { + withSQLConf( + SQLConf.ORC_SCHEMA_MERGING_ENABLED.key -> "true", + SQLConf.IGNORE_CORRUPT_FILES.key -> ignoreCorruptFiles.toString) { + withTempDir { dir => + val basePath = dir.getCanonicalPath + spark.range(0, 10).toDF("a").write.orc(new Path(basePath, "foo=1").toString) + spark.range(0, 10).toDF("b").write.orc(new Path(basePath, "foo=2").toString) + spark.range(0, 10).toDF("c").write.json(new Path(basePath, "foo=3").toString) + assert(spark.read.orc(basePath).columns.length === expectedColumnNumber) + } + } + } + + // ignore corrupt files + testSchemaMerging(true, 3) + + // don't ignore corrupt files + val exception = intercept[SparkException] { + testSchemaMerging(false, 3) + }.getCause + assert(exception.getCause.getMessage.contains("Could not read footer for file")) + } } class OrcSourceSuite extends 
OrcSuite with SharedSQLContext { From 2ea9eb333e7f78f99ad229df25c6c2f38d2a6abc Mon Sep 17 00:00:00 2001 From: "wangguangxin.cn" Date: Wed, 26 Jun 2019 10:53:12 +0800 Subject: [PATCH 13/15] update test case --- .../execution/datasources/orc/OrcSourceSuite.scala | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala index 58f6a779f53d..946b465589d8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala @@ -440,8 +440,15 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll { val exception = intercept[SparkException] { testSchemaMergingWithDataTypeConflicts(3) }.getCause - assert(exception.getMessage.contains( - "Failed to merge incompatible data types bigint and string")) + + val innerMessage = orcImp match { + case "native" => exception.getMessage + case "hive" => exception.getCause.getMessage + case impl => + throw new UnsupportedOperationException(s"Unknown ORC implementation: $impl") + } + + assert(innerMessage.contains("Failed to merge incompatible data types bigint and string")) } withSQLConf(SQLConf.ORC_SCHEMA_MERGING_ENABLED.key -> "false") { From 50c3906519020c00e4ca3b6bec567aec98692456 Mon Sep 17 00:00:00 2001 From: "wangguangxin.cn" Date: Wed, 26 Jun 2019 12:49:43 +0800 Subject: [PATCH 14/15] update test case --- .../spark/sql/execution/datasources/orc/OrcSourceSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala index 946b465589d8..6967754f031a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala @@ -448,7 +448,7 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll { throw new UnsupportedOperationException(s"Unknown ORC implementation: $impl") } - assert(innerMessage.contains("Failed to merge incompatible data types bigint and string")) + assert(innerMessage.contains("Failed to merge incompatible data types")) } withSQLConf(SQLConf.ORC_SCHEMA_MERGING_ENABLED.key -> "false") { From a6fc2d0d3b542c402e426ff125ff42822ddb4b7c Mon Sep 17 00:00:00 2001 From: "wangguangxin.cn" Date: Wed, 26 Jun 2019 14:52:35 +0800 Subject: [PATCH 15/15] update test cases --- .../datasources/orc/OrcSourceSuite.scala | 84 +++++++++---------- 1 file changed, 41 insertions(+), 43 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala index 6967754f031a..c9f5d9cb23e6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala @@ -426,59 +426,57 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll { } test("SPARK-11412 test enabling/disabling schema merging with data type conflicts") { - def testSchemaMergingWithDataTypeConflicts(expectedColumnNumber: Int): Unit = { - 
withTempDir { dir => - val basePath = dir.getCanonicalPath - spark.range(0, 10).toDF("a").write.orc(new Path(basePath, "foo=1").toString) - spark.range(0, 10).map(s => s"value_$s").toDF("a") - .write.orc(new Path(basePath, "foo=2").toString) - assert(spark.read.orc(basePath).columns.length === expectedColumnNumber) - } - } + withTempDir { dir => + val basePath = dir.getCanonicalPath + spark.range(0, 10).toDF("a").write.orc(new Path(basePath, "foo=1").toString) + spark.range(0, 10).map(s => s"value_$s").toDF("a") + .write.orc(new Path(basePath, "foo=2").toString) + + // with schema merging, an exception should be thrown + withSQLConf(SQLConf.ORC_SCHEMA_MERGING_ENABLED.key -> "true") { + val exception = intercept[SparkException] { + spark.read.orc(basePath).columns.length + }.getCause + + val innerMessage = orcImp match { + case "native" => exception.getMessage + case "hive" => exception.getCause.getMessage + case impl => + throw new UnsupportedOperationException(s"Unknown ORC implementation: $impl") + } - withSQLConf(SQLConf.ORC_SCHEMA_MERGING_ENABLED.key -> "true") { - val exception = intercept[SparkException] { - testSchemaMergingWithDataTypeConflicts(3) - }.getCause - - val innerMessage = orcImp match { - case "native" => exception.getMessage - case "hive" => exception.getCause.getMessage - case impl => - throw new UnsupportedOperationException(s"Unknown ORC implementation: $impl") + assert(innerMessage.contains("Failed to merge incompatible data types")) } - assert(innerMessage.contains("Failed to merge incompatible data types")) - } - - withSQLConf(SQLConf.ORC_SCHEMA_MERGING_ENABLED.key -> "false") { - testSchemaMergingWithDataTypeConflicts(2) + // it is ok when schema merging is disabled + withSQLConf(SQLConf.ORC_SCHEMA_MERGING_ENABLED.key -> "false") { + assert(spark.read.orc(basePath).columns.length === 2) + } } } test("SPARK-11412 test schema merging with corrupt files") { - def testSchemaMerging(ignoreCorruptFiles: Boolean, expectedColumnNumber: Int): Unit = { - withSQLConf( - SQLConf.ORC_SCHEMA_MERGING_ENABLED.key -> "true", - SQLConf.IGNORE_CORRUPT_FILES.key -> ignoreCorruptFiles.toString) { - withTempDir { dir => - val basePath = dir.getCanonicalPath - spark.range(0, 10).toDF("a").write.orc(new Path(basePath, "foo=1").toString) - spark.range(0, 10).toDF("b").write.orc(new Path(basePath, "foo=2").toString) - spark.range(0, 10).toDF("c").write.json(new Path(basePath, "foo=3").toString) - assert(spark.read.orc(basePath).columns.length === expectedColumnNumber) + withSQLConf(SQLConf.ORC_SCHEMA_MERGING_ENABLED.key -> "true") { + withTempDir { dir => + val basePath = dir.getCanonicalPath + spark.range(0, 10).toDF("a").write.orc(new Path(basePath, "foo=1").toString) + spark.range(0, 10).toDF("b").write.orc(new Path(basePath, "foo=2").toString) + spark.range(0, 10).toDF("c").write.json(new Path(basePath, "foo=3").toString) + + // ignore corrupt files + withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") { + assert(spark.read.orc(basePath).columns.length === 3) + } + + // don't ignore corrupt files + withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") { + val exception = intercept[SparkException] { + spark.read.orc(basePath).columns.length + }.getCause + assert(exception.getCause.getMessage.contains("Could not read footer for file")) } }
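For reviewers who want to exercise the new behavior end to end, here is a minimal usage sketch of the option added by this series (illustrative only, not part of any patch above; the /tmp/orc_merge paths are hypothetical):

    // Write two ORC partition directories whose file schemas differ.
    spark.range(0, 10).toDF("a").write.orc("/tmp/orc_merge/foo=1")
    spark.range(0, 10).toDF("b").write.orc("/tmp/orc_merge/foo=2")

    // The per-read mergeSchema option takes precedence over the session configuration.
    val merged = spark.read.option("mergeSchema", "true").orc("/tmp/orc_merge")
    merged.printSchema()   // expected columns: a, b, and the partition column foo

    // Alternatively, enable merging session-wide via spark.sql.orc.mergeSchema.
    spark.conf.set("spark.sql.orc.mergeSchema", "true")
    spark.read.orc("/tmp/orc_merge").printSchema()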