Changes from 7 commits
@@ -363,6 +363,14 @@ object SQLConf {
.checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo"))
.createWithDefault("snappy")

val ORC_IMPLEMENTATION = buildConf("spark.sql.orc.impl")
.doc("When native, use the native version of ORC support instead of the ORC library in Hive " +
"1.2.1. It is 'hive' by default prior to Spark 2.3.")
.internal()
.stringConf
.checkValues(Set("hive", "native"))
.createWithDefault("native")

val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown")
.doc("When true, enable filter pushdown for ORC files.")
.booleanConf
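A minimal usage sketch of the new spark.sql.orc.impl flag (not part of this diff; assumes a running SparkSession named spark, and the path is illustrative):

// Switch the ORC implementation for the current session. "native" selects
// sql/core's org.apache.spark.sql.execution.datasources.orc.OrcFileFormat.
spark.conf.set("spark.sql.orc.impl", "native")
spark.range(10).write.mode("overwrite").orc("/tmp/orc_demo")
spark.read.orc("/tmp/orc_demo").count()

// "hive" falls back to the Hive 1.2.1 based reader in sql/hive and therefore
// requires a Hive-enabled build.
spark.conf.set("spark.sql.orc.impl", "hive")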
@@ -182,7 +182,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
"read files of Hive data source directly.")
}

val cls = DataSource.lookupDataSource(source)
val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
if (classOf[DataSourceV2].isAssignableFrom(cls)) {
val options = new DataSourceV2Options(extraOptions.asJava)

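Because the lookup now reads sparkSession.sessionState.conf instead of a global setting, "orc" is resolved per session. A rough sketch of what that allows (paths illustrative; the hive-backed session assumes a Hive-enabled build):

// Two sessions in the same application can use different ORC implementations.
val nativeSession = spark.newSession()
nativeSession.conf.set("spark.sql.orc.impl", "native")

val hiveSession = spark.newSession()
hiveSession.conf.set("spark.sql.orc.impl", "hive")

nativeSession.read.format("orc").load("/tmp/orc_demo")  // sql/core OrcFileFormat
hiveSession.read.format("orc").load("/tmp/orc_demo")    // sql/hive OrcFileFormat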
@@ -234,7 +234,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {

assertNotBucketed("save")

val cls = DataSource.lookupDataSource(source)
val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
if (classOf[DataSourceV2].isAssignableFrom(cls)) {
cls.newInstance() match {
case ds: WriteSupport =>
@@ -40,6 +40,7 @@ import org.apache.spark.sql.execution.datasources.{DataSource, PartitioningUtils
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.SchemaUtils
import org.apache.spark.util.Utils
@@ -190,7 +191,7 @@ case class AlterTableAddColumnsCommand(
colsToAdd: Seq[StructField]) extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val catalogTable = verifyAlterTableAddColumn(catalog, table)
val catalogTable = verifyAlterTableAddColumn(sparkSession.sessionState.conf, catalog, table)

try {
sparkSession.catalog.uncacheTable(table.quotedString)
@@ -216,6 +217,7 @@ case class AlterTableAddColumnsCommand(
* For datasource table, it currently only supports parquet, json, csv.
*/
private def verifyAlterTableAddColumn(
conf: SQLConf,
catalog: SessionCatalog,
table: TableIdentifier): CatalogTable = {
val catalogTable = catalog.getTempViewOrPermanentTableMetadata(table)
@@ -229,7 +231,7 @@ }
}

if (DDLUtils.isDatasourceTable(catalogTable)) {
DataSource.lookupDataSource(catalogTable.provider.get).newInstance() match {
DataSource.lookupDataSource(catalogTable.provider.get, conf).newInstance() match {
// For datasource table, this command can only support the following File format.
// TextFileFormat only default to one column "value"
// Hive type is already considered as hive serde table, so the logic will not
@@ -36,8 +36,10 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
@@ -85,7 +87,8 @@ case class DataSource(

case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String])

lazy val providingClass: Class[_] = DataSource.lookupDataSource(className)
lazy val providingClass: Class[_] =
DataSource.lookupDataSource(className, sparkSession.sessionState.conf)
lazy val sourceInfo: SourceInfo = sourceSchema()
private val caseInsensitiveOptions = CaseInsensitiveMap(options)
private val equality = sparkSession.sessionState.conf.resolver
@@ -537,6 +540,7 @@ object DataSource extends Logging {
val csv = classOf[CSVFileFormat].getCanonicalName
val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat"
val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat"
val nativeOrc = classOf[OrcFileFormat].getCanonicalName

Map(
"org.apache.spark.sql.jdbc" -> jdbc,
@@ -553,6 +557,8 @@
"org.apache.spark.sql.execution.datasources.parquet.DefaultSource" -> parquet,
"org.apache.spark.sql.hive.orc.DefaultSource" -> orc,
"org.apache.spark.sql.hive.orc" -> orc,
"org.apache.spark.sql.execution.datasources.orc.DefaultSource" -> nativeOrc,
"org.apache.spark.sql.execution.datasources.orc" -> nativeOrc,
Member: This map is for backward compatibility in case we move data sources around. I think this datasources.orc is newly added. Why do we need to add them here?

Contributor: Ah, good catch! Sounds like we don't need a compatibility rule for the new ORC.

Member Author: Like USING org.apache.spark.sql.hive.orc, we want to be able to use USING org.apache.spark.sql.execution.datasources.orc, don't we?

Member Author: When I received the advice, I thought it was for consistency.

Member: This is for safety. We also do it for parquet.

Member: For parquet, this is for historical reasons; see #13311.

Previously you could use parquet via org.apache.spark.sql.execution.datasources.parquet and org.apache.spark.sql.execution.datasources.parquet.DefaultSource, so those entries are also for backward compatibility.

For this new ORC, it is not the same case.

Member: I think we should rename the variable and/or fix the comments there when we touch the code around there, to prevent confusion next time.

Member (@HyukjinKwon, Dec 6, 2017): The org.apache.spark.sql.execution.* path is meant to be private too, but I think it's okay to leave it for practical use cases, with some comments.

Member: These changes are pretty safe. In case we move ORC to another location, these mappings will still refer to the right place.

"org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
"org.apache.spark.ml.source.libsvm" -> libsvm,
"com.databricks.spark.csv" -> csv
@@ -568,8 +574,13 @@ object DataSource extends Logging {
"org.apache.spark.Logging")

/** Given a provider name, look up the data source class definition. */
def lookupDataSource(provider: String): Class[_] = {
val provider1 = backwardCompatibilityMap.getOrElse(provider, provider)
def lookupDataSource(provider: String, conf: SQLConf): Class[_] = {
val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) match {
case name if name.equalsIgnoreCase("orc") &&
conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native" =>
classOf[OrcFileFormat].getCanonicalName
case name => name
Contributor: If ORC_IMPLEMENTATION is hive, we leave the provider as it was, which may be orc. Then we will hit the "Multiple sources found" issue, won't we? Both the old and the new ORC have the same short name, orc.

Member (@HyukjinKwon, Dec 5, 2017): I was looking at the exact same path. It seems not, because the new source is not registered with the ServiceLoader (src/main/resources/org.apache.spark.sql.sources.DataSourceRegister). So the short name for the newer ORC source would not be used here.

Member Author: @cloud-fan, to avoid that issue, the new OrcFileFormat is intentionally not registered. @HyukjinKwon's comment is correct.

Contributor: This sounds counter-intuitive; I think we should register the new ORC instead of the old one.

Contributor: And also add comments here.

Member: +1 for ^

Member Author: I agree with both of you.

Just for explanation: the original design completely preserves the previous behavior. Without the SQLConf.ORC_IMPLEMENTATION option, Spark doesn't know about OrcFileFormat, so without Hive support, creating a data source with "orc" fails with an unknown data source error.

Anyway, I'm happy to update according to your advice. :)

Member Author (@dongjoon-hyun, Dec 5, 2017): So there is no more "The ORC data source must be used with Hive support enabled". If the hive implementation is requested in sql/core, it will show a more appropriate message.

Contributor: sounds good

Member Author: And here I added the following to prevent the "Multiple sources found" error. Last time I missed this approach; my bad.

+      case name if name.equalsIgnoreCase("orc") &&
+        conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" =>
+        "org.apache.spark.sql.hive.orc.OrcFileFormat"

}
val provider2 = s"$provider1.DefaultSource"
val loader = Utils.getContextOrSparkClassLoader
val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
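On the "Multiple sources found" question discussed above, a rough standalone sketch of how the short-name matching behaves (illustrative only; the real resolution is the lookupDataSource code in the hunk above):

import java.util.ServiceLoader
import scala.collection.JavaConverters._
import org.apache.spark.sql.sources.DataSourceRegister

// Only classes listed in META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
// take part in short-name matching. Because the new OrcFileFormat is intentionally not
// registered there, at most one registered class answers to "orc", so the lookup cannot
// report "Multiple sources found" for that name.
val loader = Thread.currentThread().getContextClassLoader
val orcRegistrations = ServiceLoader.load(classOf[DataSourceRegister], loader)
  .asScala
  .filter(_.shortName().equalsIgnoreCase("orc"))
  .toList
println(orcRegistrations.map(_.getClass.getName))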
@@ -108,8 +108,9 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
}

// Check if the specified data source match the data source of the existing table.
val existingProvider = DataSource.lookupDataSource(existingTable.provider.get)
val specifiedProvider = DataSource.lookupDataSource(tableDesc.provider.get)
val conf = sparkSession.sessionState.conf
val existingProvider = DataSource.lookupDataSource(existingTable.provider.get, conf)
val specifiedProvider = DataSource.lookupDataSource(tableDesc.provider.get, conf)
// TODO: Check that options from the resolved relation match the relation that we are
// inserting into (i.e. using the same compression).
if (existingProvider != specifiedProvider) {
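The check above compares resolved provider classes, which is why both lookups must use the same session conf. A rough sketch of the case it guards (table name illustrative; error text paraphrased):

// Appending to an existing table with a different provider is rejected at analysis
// time because the two lookupDataSource results are different classes.
spark.sql("CREATE TABLE prov_demo (a INT) USING orc")
spark.range(1).selectExpr("cast(id as int) as a")
  .write.format("parquet").mode("append").saveAsTable("prov_demo")
// => AnalysisException: the format of the existing table does not match the specified format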
sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala (21 additions, 0 deletions)
@@ -28,6 +28,8 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.execution.aggregate
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
@@ -2782,4 +2784,23 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
assert(spark.read.format(orc).load(path).collect().length == 2)
}
}

test("SPARK-20728 Make ORCFileFormat configurable between sql/hive and sql/core") {
withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> "hive") {
val e = intercept[AnalysisException] {
sql("CREATE TABLE spark_20728(a INT) USING ORC")
}
assert(e.message.contains("The ORC data source must be used with Hive support enabled"))
}

withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> "native") {
withTable("spark_20728") {
sql("CREATE TABLE spark_20728(a INT) USING ORC")
val fileFormat = sql("SELECT * FROM spark_20728").queryExecution.analyzed.collectFirst {
case l: LogicalRelation => l.relation.asInstanceOf[HadoopFsRelation].fileFormat.getClass
}
assert(fileFormat == Some(classOf[OrcFileFormat]))
}
}
}
}
@@ -53,13 +53,6 @@ class DDLSourceLoadSuite extends DataSourceTest with SharedSQLContext {
assert(spark.read.format("org.apache.spark.sql.sources.FakeSourceOne")
.load().schema == StructType(Seq(StructField("stringType", StringType, nullable = false))))
}

test("should fail to load ORC without Hive Support") {
val e = intercept[AnalysisException] {
spark.read.format("orc").load()
}
assert(e.message.contains("The ORC data source must be used with Hive support enabled"))
}
}


@@ -155,7 +155,6 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
}
}


test("resolve default source") {
spark.read
.format("org.apache.spark.sql.test")
@@ -478,42 +477,56 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
spark.read.schema(userSchema).parquet(Seq(dir, dir): _*), expData ++ expData, userSchema)
}

/**
* This only tests whether API compiles, but does not run it as orc()
* cannot be run without Hive classes.
*/
ignore("orc - API") {
// Reader, with user specified schema
// Refer to csv-specific test suites for behavior without user specified schema
spark.read.schema(userSchema).orc()
spark.read.schema(userSchema).orc(dir)
spark.read.schema(userSchema).orc(dir, dir, dir)
spark.read.schema(userSchema).orc(Seq(dir, dir): _*)
Option(dir).map(spark.read.schema(userSchema).orc)
test("orc - API and behavior regarding schema") {
withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> "native") {
// Writer
spark.createDataset(data).toDF("str").write.mode(SaveMode.Overwrite).orc(dir)
val df = spark.read.orc(dir)
checkAnswer(df, spark.createDataset(data).toDF())
val schema = df.schema

// Writer
spark.range(10).write.orc(dir)
// Reader, without user specified schema
intercept[AnalysisException] {
testRead(spark.read.orc(), Seq.empty, schema)
}
testRead(spark.read.orc(dir), data, schema)
testRead(spark.read.orc(dir, dir), data ++ data, schema)
testRead(spark.read.orc(Seq(dir, dir): _*), data ++ data, schema)
// Test explicit calls to single arg method - SPARK-16009
testRead(Option(dir).map(spark.read.orc).get, data, schema)

// Reader, with user specified schema, data should be nulls as schema in file different
// from user schema
val expData = Seq[String](null, null, null)
testRead(spark.read.schema(userSchema).orc(), Seq.empty, userSchema)
testRead(spark.read.schema(userSchema).orc(dir), expData, userSchema)
testRead(spark.read.schema(userSchema).orc(dir, dir), expData ++ expData, userSchema)
testRead(
spark.read.schema(userSchema).orc(Seq(dir, dir): _*), expData ++ expData, userSchema)
}
}

test("column nullability and comment - write and then read") {
Seq("json", "parquet", "csv").foreach { format =>
val schema = StructType(
StructField("cl1", IntegerType, nullable = false).withComment("test") ::
StructField("cl2", IntegerType, nullable = true) ::
StructField("cl3", IntegerType, nullable = true) :: Nil)
val row = Row(3, null, 4)
val df = spark.createDataFrame(sparkContext.parallelize(row :: Nil), schema)

val tableName = "tab"
withTable(tableName) {
df.write.format(format).mode("overwrite").saveAsTable(tableName)
// Verify the DDL command result: DESCRIBE TABLE
checkAnswer(
sql(s"desc $tableName").select("col_name", "comment").where($"comment" === "test"),
Row("cl1", "test") :: Nil)
// Verify the schema
val expectedFields = schema.fields.map(f => f.copy(nullable = true))
assert(spark.table(tableName).schema == schema.copy(fields = expectedFields))
withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> "native") {
Seq("json", "orc", "parquet", "csv").foreach { format =>
val schema = StructType(
StructField("cl1", IntegerType, nullable = false).withComment("test") ::
StructField("cl2", IntegerType, nullable = true) ::
StructField("cl3", IntegerType, nullable = true) :: Nil)
val row = Row(3, null, 4)
val df = spark.createDataFrame(sparkContext.parallelize(row :: Nil), schema)

val tableName = "tab"
withTable(tableName) {
df.write.format(format).mode("overwrite").saveAsTable(tableName)
// Verify the DDL command result: DESCRIBE TABLE
checkAnswer(
sql(s"desc $tableName").select("col_name", "comment").where($"comment" === "test"),
Row("cl1", "test") :: Nil)
// Verify the schema
val expectedFields = schema.fields.map(f => f.copy(nullable = true))
assert(spark.table(tableName).schema == schema.copy(fields = expectedFields))
}
}
}
}
@@ -34,7 +34,6 @@ import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.hive.orc.OrcFileFormat
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}


@@ -195,8 +194,19 @@ case class RelationConversions(
.convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet")
} else {
val options = relation.tableMeta.storage.properties
sessionCatalog.metastoreCatalog
.convertToLogicalRelation(relation, options, classOf[OrcFileFormat], "orc")
if (conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") {
sessionCatalog.metastoreCatalog.convertToLogicalRelation(
relation,
options,
classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat],
"orc")
} else {
sessionCatalog.metastoreCatalog.convertToLogicalRelation(
relation,
options,
classOf[org.apache.spark.sql.hive.orc.OrcFileFormat],
"orc")
}
}
}

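A rough end-to-end sketch of this conversion path (table name illustrative; assumes a Hive-enabled session and that spark.sql.hive.convertMetastoreOrc is enabled so RelationConversions applies):

// With conversion on, a Hive ORC table is scanned through the FileFormat chosen by
// spark.sql.orc.impl instead of through the Hive SerDe path.
spark.sql("SET spark.sql.hive.convertMetastoreOrc=true")
spark.sql("SET spark.sql.orc.impl=native")
spark.sql("CREATE TABLE hive_orc_demo (a INT) STORED AS ORC")
spark.sql("INSERT INTO hive_orc_demo VALUES (1)")
spark.table("hive_orc_demo").show()  // read via sql/core's OrcFileFormat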
@@ -28,7 +28,7 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.execution.datasources.{LogicalRelation, RecordReaderIterator}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, RecordReaderIterator}
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
@@ -621,4 +621,21 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
makeOrcFile((1 to 10).map(Tuple1.apply), path2)
assertResult(20)(read.orc(path1.getCanonicalPath, path2.getCanonicalPath).count())
}

test("SPARK-20728 Make ORCFileFormat configurable between sql/hive and sql/core") {
Seq(
("native", classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat]),
("hive", classOf[org.apache.spark.sql.hive.orc.OrcFileFormat])).foreach { case (i, format) =>
Member: nit: i => orcImpl

Member Author: For this one, I will update #19882. I updated it locally and am running some tests.


withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> i) {
withTable("spark_20728") {
sql("CREATE TABLE spark_20728(a INT) USING ORC")
val fileFormat = sql("SELECT * FROM spark_20728").queryExecution.analyzed.collectFirst {
case l: LogicalRelation => l.relation.asInstanceOf[HadoopFsRelation].fileFormat.getClass
}
assert(fileFormat == Some(format))
}
}
}
}
}