From 0567514b31be1d78871c5a5c3ffc0d5ac553ee8c Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Thu, 23 Jan 2020 15:01:05 -0800
Subject: [PATCH 1/4] disable all the V2 file sources by default

---
 .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 4471c4d4d1b38..777250effb045 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1728,7 +1728,7 @@ object SQLConf {
       "implementation class names for which Data Source V2 code path is disabled. These data " +
       "sources will fallback to Data Source V1 code path.")
     .stringConf
-    .createWithDefault("kafka")
+    .createWithDefault("kafka,parquet,orc,json,csv,text,avro")
 
   val DISABLED_V2_STREAMING_WRITERS = buildConf("spark.sql.streaming.disabledV2Writers")
     .doc("A comma-separated list of fully qualified data source register class names for which" +

From 412dda1c04f6b46228d04b1adc3119bf5af48815 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Thu, 23 Jan 2020 16:19:44 -0800
Subject: [PATCH 2/4] set the conf value in alphabetical order

---
 .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 777250effb045..7d8dc88cd6b33 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1728,7 +1728,7 @@ object SQLConf {
       "implementation class names for which Data Source V2 code path is disabled. These data " +
       "sources will fallback to Data Source V1 code path.")
     .stringConf
-    .createWithDefault("kafka,parquet,orc,json,csv,text,avro")
+    .createWithDefault("avro,csv,json,kafka,orc,parquet,text")
 
   val DISABLED_V2_STREAMING_WRITERS = buildConf("spark.sql.streaming.disabledV2Writers")
     .doc("A comma-separated list of fully qualified data source register class names for which" +

From 7afda97dcfa24cdb7315bc8cae39cec7bcdfd9b1 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Thu, 23 Jan 2020 17:32:46 -0800
Subject: [PATCH 3/4] update test suites

---
 .../spark/sql/connector/FileDataSourceV2FallBackSuite.scala  | 6 ++++++
 .../datasources/orc/OrcPartitionDiscoverySuite.scala         | 5 +++++
 2 files changed, 11 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
index 21938f301dafb..c9ddab33b7122 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.connector
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.spark.SparkConf
 import org.apache.spark.sql.{AnalysisException, QueryTest}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
@@ -86,6 +87,11 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession {
   private val dummyReadOnlyFileSourceV2 = classOf[DummyReadOnlyFileDataSourceV2].getName
   private val dummyWriteOnlyFileSourceV2 = classOf[DummyWriteOnlyFileDataSourceV2].getName
 
+  override protected def sparkConf: SparkConf =
+    super
+      .sparkConf
+      .set(SQLConf.USE_V1_SOURCE_LIST, "")
+
   test("Fall back to v1 when writing to file with read only FileDataSourceV2") {
     val df = spark.range(10).toDF()
     withTempPath { file =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala
index 5d21ee698f4e6..ba4663a8fb91c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala
@@ -169,6 +169,11 @@ abstract class OrcPartitionDiscoveryTest extends OrcTest {
 }
 
 class OrcPartitionDiscoverySuite extends OrcPartitionDiscoveryTest with SharedSparkSession {
+  override protected def sparkConf: SparkConf =
+    super
+      .sparkConf
+      .set(SQLConf.USE_V1_SOURCE_LIST, "")
+
   test("read partitioned table - partition key included in orc file") {
     withTempDir { base =>
       for {

From 0aaaa5815ba05b00cd13e9b14f0f839c97f62738 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Thu, 23 Jan 2020 20:43:04 -0800
Subject: [PATCH 4/4] update indent

---
 .../spark/sql/connector/FileDataSourceV2FallBackSuite.scala  | 5 +----
 .../datasources/orc/OrcPartitionDiscoverySuite.scala         | 5 +----
 2 files changed, 2 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
index c9ddab33b7122..b0da2eb697f36 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
@@ -87,10 +87,7 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession {
   private val dummyReadOnlyFileSourceV2 = classOf[DummyReadOnlyFileDataSourceV2].getName
   private val dummyWriteOnlyFileSourceV2 = classOf[DummyWriteOnlyFileDataSourceV2].getName
 
-  override protected def sparkConf: SparkConf =
-    super
-      .sparkConf
-      .set(SQLConf.USE_V1_SOURCE_LIST, "")
+  override protected def sparkConf: SparkConf = super.sparkConf.set(SQLConf.USE_V1_SOURCE_LIST, "")
 
   test("Fall back to v1 when writing to file with read only FileDataSourceV2") {
     val df = spark.range(10).toDF()
     withTempPath { file =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala
index ba4663a8fb91c..ea839b8e1ef10 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala
@@ -169,10 +169,7 @@ abstract class OrcPartitionDiscoveryTest extends OrcTest {
 }
 
 class OrcPartitionDiscoverySuite extends OrcPartitionDiscoveryTest with SharedSparkSession {
-  override protected def sparkConf: SparkConf =
-    super
-      .sparkConf
-      .set(SQLConf.USE_V1_SOURCE_LIST, "")
+  override protected def sparkConf: SparkConf = super.sparkConf.set(SQLConf.USE_V1_SOURCE_LIST, "")
 
   test("read partitioned table - partition key included in orc file") {
     withTempDir { base =>
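
Note on the change: patches 1 and 2 grow the default of SQLConf.USE_V1_SOURCE_LIST so that all built-in file sources fall back to the Data Source V1 code path, while patches 3 and 4 clear that list in the affected test suites so they keep exercising the V2 path. An application can opt back into V2 the same way. Below is a minimal sketch, assuming the constant is backed by the key "spark.sql.sources.useV1SourceList" (the key string itself is not visible in these hunks) and a hypothetical local output path:

    import org.apache.spark.sql.SparkSession

    object UseV2FileSourcesDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("UseV2FileSourcesDemo")
          // Empty list = no source falls back to V1, mirroring the suites'
          // .set(SQLConf.USE_V1_SOURCE_LIST, "") override in the patches above.
          .config("spark.sql.sources.useV1SourceList", "")
          .getOrCreate()

        // With the list cleared, this parquet round trip runs through the
        // DataSourceV2 code path; under the new default it would use V1.
        spark.range(10).write.mode("overwrite").parquet("/tmp/v2-demo")
        println(spark.read.parquet("/tmp/v2-demo").count())
        spark.stop()
      }
    }

Setting the conf at session build time, as above, is the safest route, since file source resolution consults it when a query is planned.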