diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 8abb4262d735..ce9cc562b220 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -363,6 +363,14 @@ object SQLConf {
     .checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo"))
     .createWithDefault("snappy")
 
+  val ORC_IMPLEMENTATION = buildConf("spark.sql.orc.impl")
+    .doc("When native, use the native version of ORC support instead of the ORC library in Hive " +
+      "1.2.1. It is 'hive' by default prior to Spark 2.3.")
+    .internal()
+    .stringConf
+    .checkValues(Set("hive", "native"))
+    .createWithDefault("native")
+
   val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown")
     .doc("When true, enable filter pushdown for ORC files.")
     .booleanConf
diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index 0c5f3f22e31e..6cdfe2fae564 100644
--- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -1,6 +1,7 @@
 org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
 org.apache.spark.sql.execution.datasources.json.JsonFileFormat
+org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 org.apache.spark.sql.execution.datasources.text.TextFileFormat
 org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 17966eecfc05..ea1cf6677523 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -182,7 +182,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
         "read files of Hive data source directly.")
     }
 
-    val cls = DataSource.lookupDataSource(source)
+    val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
     if (classOf[DataSourceV2].isAssignableFrom(cls)) {
       val options = new DataSourceV2Options(extraOptions.asJava)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 35abeccfd514..59a01e61124f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -234,7 +234,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
 
     assertNotBucketed("save")
 
-    val cls = DataSource.lookupDataSource(source)
+    val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
     if (classOf[DataSourceV2].isAssignableFrom(cls)) {
       cls.newInstance() match {
         case ds: WriteSupport =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index c42e6c3257fa..e400975f1970 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -40,6 +40,7 @@ import org.apache.spark.sql.execution.datasources.{DataSource, PartitioningUtils
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
 import org.apache.spark.util.Utils
@@ -190,7 +191,7 @@ case class AlterTableAddColumnsCommand(
     colsToAdd: Seq[StructField]) extends RunnableCommand {
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    val catalogTable = verifyAlterTableAddColumn(catalog, table)
+    val catalogTable = verifyAlterTableAddColumn(sparkSession.sessionState.conf, catalog, table)
 
     try {
       sparkSession.catalog.uncacheTable(table.quotedString)
@@ -216,6 +217,7 @@ case class AlterTableAddColumnsCommand(
    * For datasource table, it currently only supports parquet, json, csv.
    */
   private def verifyAlterTableAddColumn(
+      conf: SQLConf,
       catalog: SessionCatalog,
       table: TableIdentifier): CatalogTable = {
     val catalogTable = catalog.getTempViewOrPermanentTableMetadata(table)
@@ -229,7 +231,7 @@ case class AlterTableAddColumnsCommand(
     }
 
     if (DDLUtils.isDatasourceTable(catalogTable)) {
-      DataSource.lookupDataSource(catalogTable.provider.get).newInstance() match {
+      DataSource.lookupDataSource(catalogTable.provider.get, conf).newInstance() match {
         // For datasource table, this command can only support the following File format.
         // TextFileFormat only default to one column "value"
         // Hive type is already considered as hive serde table, so the logic will not
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index b43d282bd434..5f12d5f93a35 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -36,8 +36,10 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
+import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
@@ -85,7 +87,8 @@ case class DataSource(
 
   case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String])
 
-  lazy val providingClass: Class[_] = DataSource.lookupDataSource(className)
+  lazy val providingClass: Class[_] =
+    DataSource.lookupDataSource(className, sparkSession.sessionState.conf)
   lazy val sourceInfo: SourceInfo = sourceSchema()
   private val caseInsensitiveOptions = CaseInsensitiveMap(options)
   private val equality = sparkSession.sessionState.conf.resolver
@@ -537,6 +540,7 @@ object DataSource extends Logging {
     val csv = classOf[CSVFileFormat].getCanonicalName
     val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat"
     val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat"
+    val nativeOrc = classOf[OrcFileFormat].getCanonicalName
 
     Map(
       "org.apache.spark.sql.jdbc" -> jdbc,
@@ -553,6 +557,8 @@ object DataSource extends Logging {
       "org.apache.spark.sql.execution.datasources.parquet.DefaultSource" -> parquet,
       "org.apache.spark.sql.hive.orc.DefaultSource" -> orc,
       "org.apache.spark.sql.hive.orc" -> orc,
+      "org.apache.spark.sql.execution.datasources.orc.DefaultSource" -> nativeOrc,
+      "org.apache.spark.sql.execution.datasources.orc" -> nativeOrc,
       "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
       "org.apache.spark.ml.source.libsvm" -> libsvm,
       "com.databricks.spark.csv" -> csv
@@ -568,8 +574,16 @@ object DataSource extends Logging {
     "org.apache.spark.Logging")
 
   /** Given a provider name, look up the data source class definition. */
-  def lookupDataSource(provider: String): Class[_] = {
-    val provider1 = backwardCompatibilityMap.getOrElse(provider, provider)
+  def lookupDataSource(provider: String, conf: SQLConf): Class[_] = {
+    val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) match {
+      case name if name.equalsIgnoreCase("orc") &&
+          conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native" =>
+        classOf[OrcFileFormat].getCanonicalName
+      case name if name.equalsIgnoreCase("orc") &&
+          conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" =>
+        "org.apache.spark.sql.hive.orc.OrcFileFormat"
+      case name => name
+    }
     val provider2 = s"$provider1.DefaultSource"
     val loader = Utils.getContextOrSparkClassLoader
     val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
@@ -587,7 +601,8 @@ object DataSource extends Logging {
             if (provider1.toLowerCase(Locale.ROOT) == "orc" ||
               provider1.startsWith("org.apache.spark.sql.hive.orc")) {
               throw new AnalysisException(
-                "The ORC data source must be used with Hive support enabled")
+                "Hive-based ORC data source must be used with Hive support enabled. " +
+                "Please use native ORC data source instead")
             } else if (provider1.toLowerCase(Locale.ROOT) == "avro" ||
               provider1 == "com.databricks.spark.avro") {
               throw new AnalysisException(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 60c430bcfece..6e08df75b8a4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -108,8 +108,9 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
       }
 
       // Check if the specified data source match the data source of the existing table.
-      val existingProvider = DataSource.lookupDataSource(existingTable.provider.get)
-      val specifiedProvider = DataSource.lookupDataSource(tableDesc.provider.get)
+      val conf = sparkSession.sessionState.conf
+      val existingProvider = DataSource.lookupDataSource(existingTable.provider.get, conf)
+      val specifiedProvider = DataSource.lookupDataSource(tableDesc.provider.get, conf)
       // TODO: Check that options from the resolved relation match the relation that we are
       // inserting into (i.e. using the same compression).
       if (existingProvider != specifiedProvider) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 31d9b909ad46..86bd9b95bca6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -28,6 +28,8 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.catalyst.util.StringUtils
 import org.apache.spark.sql.execution.aggregate
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -1664,7 +1666,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     e = intercept[AnalysisException] {
       sql(s"select id from `org.apache.spark.sql.hive.orc`.`file_path`")
     }
-    assert(e.message.contains("The ORC data source must be used with Hive support enabled"))
+    assert(e.message.contains("Hive-based ORC data source must be used with Hive support"))
 
     e = intercept[AnalysisException] {
       sql(s"select id from `com.databricks.spark.avro`.`file_path`")
@@ -2782,4 +2784,23 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       assert(spark.read.format(orc).load(path).collect().length == 2)
     }
   }
+
+  test("SPARK-20728 Make ORCFileFormat configurable between sql/hive and sql/core") {
+    withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> "hive") {
+      val e = intercept[AnalysisException] {
+        sql("CREATE TABLE spark_20728(a INT) USING ORC")
+      }
+      assert(e.message.contains("Hive-based ORC data source must be used with Hive support"))
+    }
+
+    withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> "native") {
+      withTable("spark_20728") {
+        sql("CREATE TABLE spark_20728(a INT) USING ORC")
+        val fileFormat = sql("SELECT * FROM spark_20728").queryExecution.analyzed.collectFirst {
+          case l: LogicalRelation => l.relation.asInstanceOf[HadoopFsRelation].fileFormat.getClass
+        }
+        assert(fileFormat == Some(classOf[OrcFileFormat]))
+      }
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala
index 3ce6ae3c5292..f22d843bfabd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala
@@ -53,13 +53,6 @@ class DDLSourceLoadSuite extends DataSourceTest with SharedSQLContext {
     assert(spark.read.format("org.apache.spark.sql.sources.FakeSourceOne")
       .load().schema == StructType(Seq(StructField("stringType", StringType, nullable = false))))
   }
-
-  test("should fail to load ORC without Hive Support") {
-    val e = intercept[AnalysisException] {
-      spark.read.format("orc").load()
-    }
-    assert(e.message.contains("The ORC data source must be used with Hive support enabled"))
-  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index a5d7e6257a6d..8c9bb7d56a35 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -155,7 +155,6 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
     }
   }
 
-
   test("resolve default source") {
     spark.read
       .format("org.apache.spark.sql.test")
@@ -478,42 +477,56 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
       spark.read.schema(userSchema).parquet(Seq(dir, dir): _*), expData ++ expData, userSchema)
   }
 
-  /**
-   * This only tests whether API compiles, but does not run it as orc()
-   * cannot be run without Hive classes.
-   */
-  ignore("orc - API") {
-    // Reader, with user specified schema
-    // Refer to csv-specific test suites for behavior without user specified schema
-    spark.read.schema(userSchema).orc()
-    spark.read.schema(userSchema).orc(dir)
-    spark.read.schema(userSchema).orc(dir, dir, dir)
-    spark.read.schema(userSchema).orc(Seq(dir, dir): _*)
-    Option(dir).map(spark.read.schema(userSchema).orc)
+  test("orc - API and behavior regarding schema") {
+    withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> "native") {
+      // Writer
+      spark.createDataset(data).toDF("str").write.mode(SaveMode.Overwrite).orc(dir)
+      val df = spark.read.orc(dir)
+      checkAnswer(df, spark.createDataset(data).toDF())
+      val schema = df.schema
 
-    // Writer
-    spark.range(10).write.orc(dir)
+      // Reader, without user specified schema
+      intercept[AnalysisException] {
+        testRead(spark.read.orc(), Seq.empty, schema)
+      }
+      testRead(spark.read.orc(dir), data, schema)
+      testRead(spark.read.orc(dir, dir), data ++ data, schema)
+      testRead(spark.read.orc(Seq(dir, dir): _*), data ++ data, schema)
+      // Test explicit calls to single arg method - SPARK-16009
+      testRead(Option(dir).map(spark.read.orc).get, data, schema)
+
+      // Reader, with user specified schema, data should be nulls as schema in file different
+      // from user schema
+      val expData = Seq[String](null, null, null)
+      testRead(spark.read.schema(userSchema).orc(), Seq.empty, userSchema)
+      testRead(spark.read.schema(userSchema).orc(dir), expData, userSchema)
+      testRead(spark.read.schema(userSchema).orc(dir, dir), expData ++ expData, userSchema)
+      testRead(
+        spark.read.schema(userSchema).orc(Seq(dir, dir): _*), expData ++ expData, userSchema)
+    }
   }
 
   test("column nullability and comment - write and then read") {
-    Seq("json", "parquet", "csv").foreach { format =>
-      val schema = StructType(
-        StructField("cl1", IntegerType, nullable = false).withComment("test") ::
-        StructField("cl2", IntegerType, nullable = true) ::
-        StructField("cl3", IntegerType, nullable = true) :: Nil)
-      val row = Row(3, null, 4)
-      val df = spark.createDataFrame(sparkContext.parallelize(row :: Nil), schema)
-
-      val tableName = "tab"
-      withTable(tableName) {
-        df.write.format(format).mode("overwrite").saveAsTable(tableName)
-        // Verify the DDL command result: DESCRIBE TABLE
-        checkAnswer(
-          sql(s"desc $tableName").select("col_name", "comment").where($"comment" === "test"),
-          Row("cl1", "test") :: Nil)
-        // Verify the schema
-        val expectedFields = schema.fields.map(f => f.copy(nullable = true))
-        assert(spark.table(tableName).schema == schema.copy(fields = expectedFields))
+    withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> "native") {
+      Seq("json", "orc", "parquet", "csv").foreach { format =>
+        val schema = StructType(
+          StructField("cl1", IntegerType, nullable = false).withComment("test") ::
+          StructField("cl2", IntegerType, nullable = true) ::
+          StructField("cl3", IntegerType, nullable = true) :: Nil)
+        val row = Row(3, null, 4)
+        val df = spark.createDataFrame(sparkContext.parallelize(row :: Nil), schema)
+
+        val tableName = "tab"
+        withTable(tableName) {
+          df.write.format(format).mode("overwrite").saveAsTable(tableName)
+          // Verify the DDL command result: DESCRIBE TABLE
+          checkAnswer(
+            sql(s"desc $tableName").select("col_name", "comment").where($"comment" === "test"),
+            Row("cl1", "test") :: Nil)
+          // Verify the schema
+          val expectedFields = schema.fields.map(f => f.copy(nullable = true))
+          assert(spark.table(tableName).schema == schema.copy(fields = expectedFields))
+        }
       }
     }
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index ee1f6ee17306..3018c0642f06 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -34,7 +34,6 @@ import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
 import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
 import org.apache.spark.sql.hive.execution._
-import org.apache.spark.sql.hive.orc.OrcFileFormat
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
@@ -195,8 +194,19 @@ case class RelationConversions(
         .convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet")
     } else {
      val options = relation.tableMeta.storage.properties
-      sessionCatalog.metastoreCatalog
-        .convertToLogicalRelation(relation, options, classOf[OrcFileFormat], "orc")
+      if (conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") {
+        sessionCatalog.metastoreCatalog.convertToLogicalRelation(
+          relation,
+          options,
+          classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat],
+          "orc")
+      } else {
+        sessionCatalog.metastoreCatalog.convertToLogicalRelation(
+          relation,
+          options,
+          classOf[org.apache.spark.sql.hive.orc.OrcFileFormat],
+          "orc")
+      }
     }
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 1fa9091f967a..1ffaf3031103 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -28,7 +28,7 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
-import org.apache.spark.sql.execution.datasources.{LogicalRelation, RecordReaderIterator}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, RecordReaderIterator}
 import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
@@ -621,4 +621,21 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
     makeOrcFile((1 to 10).map(Tuple1.apply), path2)
     assertResult(20)(read.orc(path1.getCanonicalPath, path2.getCanonicalPath).count())
   }
+
+  test("SPARK-20728 Make ORCFileFormat configurable between sql/hive and sql/core") {
+    Seq(
+      ("native", classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat]),
+      ("hive", classOf[org.apache.spark.sql.hive.orc.OrcFileFormat])).foreach { case (i, format) =>
+
+      withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> i) {
+        withTable("spark_20728") {
+          sql("CREATE TABLE spark_20728(a INT) USING ORC")
+          val fileFormat = sql("SELECT * FROM spark_20728").queryExecution.analyzed.collectFirst {
+            case l: LogicalRelation => l.relation.asInstanceOf[HadoopFsRelation].fileFormat.getClass
+          }
+          assert(fileFormat == Some(format))
+        }
+      }
+    }
+  }
 }
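
Usage sketch (illustrative only, not part of the patch): with this change the ORC implementation can be selected per session through the new spark.sql.orc.impl flag, which lookupDataSource consults when resolving the "orc" provider. The object name, app name, and output path below are placeholders.

import org.apache.spark.sql.SparkSession

object OrcImplDemo {
  def main(args: Array[String]): Unit = {
    // A plain (non-Hive) session; "native" is the default after this patch.
    val spark = SparkSession.builder().master("local[*]").appName("orc-impl-demo").getOrCreate()

    // Use the sql/core ORC reader/writer (org.apache.spark.sql.execution.datasources.orc).
    spark.conf.set("spark.sql.orc.impl", "native")
    spark.range(10).write.mode("overwrite").orc("/tmp/orc_impl_demo")
    spark.read.orc("/tmp/orc_impl_demo").show()

    // With "hive", lookupDataSource resolves "orc" to org.apache.spark.sql.hive.orc.OrcFileFormat,
    // so a session built without Hive support fails with the new AnalysisException message.
    spark.stop()
  }
}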