
Commit ce084d3

cloud-fan authored and gatorsmile committed
[SPARK-24990][SQL] merge ReadSupport and ReadSupportWithSchema
## What changes were proposed in this pull request?

Regarding a user-specified schema, data sources may have 3 different behaviors:
1. must have a user-specified schema
2. can't have a user-specified schema
3. can accept the user-specified schema if it's given, or infer the schema

I added `ReadSupportWithSchema` to support these behaviors, following data source v1. But it turns out we don't need this extra interface. We can just add a `createReader(schema, options)` to `ReadSupport`; its default implementation throws `UnsupportedOperationException`, so only sources that override it accept a user-specified schema.

TODO: also fix the streaming API in follow-up PRs.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <[email protected]>

Closes #21946 from cloud-fan/ds-schema.
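For illustration, a minimal sketch of behavior 3 against the merged interface (assuming the DataSource V2 read APIs exactly as they appear in this diff; `FlexibleSchemaDataSource` and its hard-coded "inferred" schema are hypothetical placeholders, not part of this change):

import java.util.{Collections, List => JList}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition}
import org.apache.spark.sql.types.StructType

// Hypothetical source: uses the user-specified schema when one is given,
// otherwise falls back to its own (here trivially hard-coded) inferred schema.
class FlexibleSchemaDataSource extends DataSourceV2 with ReadSupport {

  // Stand-in for real schema inference.
  private def inferSchema(options: DataSourceOptions): StructType =
    new StructType().add("value", "string")

  // No user-specified schema: infer one and reuse the schema-based path.
  override def createReader(options: DataSourceOptions): DataSourceReader =
    createReader(inferSchema(options), options)

  // User-specified schema: build the reader around it.
  override def createReader(schema: StructType, options: DataSourceOptions): DataSourceReader =
    new DataSourceReader {
      override def readSchema(): StructType = schema
      override def planInputPartitions(): JList[InputPartition[InternalRow]] =
        Collections.emptyList()
    }
}

With the old split interfaces, a source like this would have had to implement both `ReadSupport` and `ReadSupportWithSchema`; with the merged interface it simply overrides both `createReader` variants.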
1 parent 9f55860 commit ce084d3

File tree

7 files changed: +47 -79 lines changed


sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java

Lines changed: 25 additions & 0 deletions
@@ -18,7 +18,9 @@
 package org.apache.spark.sql.sources.v2;
 
 import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.DataSourceRegister;
 import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
+import org.apache.spark.sql.types.StructType;
 
 /**
  * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
@@ -27,6 +29,29 @@
 @InterfaceStability.Evolving
 public interface ReadSupport extends DataSourceV2 {
 
+  /**
+   * Creates a {@link DataSourceReader} to scan the data from this data source.
+   *
+   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
+   * submitted.
+   *
+   * @param schema the user specified schema.
+   * @param options the options for the returned data source reader, which is an immutable
+   *                case-insensitive string-to-string map.
+   *
+   * By default this method throws {@link UnsupportedOperationException}, implementations should
+   * override this method to handle user specified schema.
+   */
+  default DataSourceReader createReader(StructType schema, DataSourceOptions options) {
+    String name;
+    if (this instanceof DataSourceRegister) {
+      name = ((DataSourceRegister) this).shortName();
+    } else {
+      name = this.getClass().getName();
+    }
+    throw new UnsupportedOperationException(name + " does not support user specified schema");
+  }
+
   /**
    * Creates a {@link DataSourceReader} to scan the data from this data source.
    *
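To make the new default concrete, here is a hedged sketch (the `NoSchemaSupportSource` class and its short name are hypothetical, for illustration only): a source that overrides only `createReader(options)` inherits the default above, so handing it a user-specified schema fails with `UnsupportedOperationException`, and the message uses the `DataSourceRegister` short name when the source provides one.

import java.util.{Collections, List => JList}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition}
import org.apache.spark.sql.types.StructType

// Hypothetical source that never accepts a user-specified schema: it overrides
// only the options-only variant and inherits the throwing default shown above.
class NoSchemaSupportSource extends DataSourceV2 with ReadSupport with DataSourceRegister {

  // Short name that ends up in the UnsupportedOperationException message.
  override def shortName(): String = "no-schema-support"

  override def createReader(options: DataSourceOptions): DataSourceReader =
    new DataSourceReader {
      override def readSchema(): StructType = new StructType().add("value", "string")
      override def planInputPartitions(): JList[InputPartition[InternalRow]] =
        Collections.emptyList()
    }
}

// Calling the inherited default directly shows the failure mode:
//   new NoSchemaSupportSource().createReader(
//     new StructType().add("i", "int"), new DataSourceOptions(Collections.emptyMap[String, String]()))
// throws UnsupportedOperationException:
//   "no-schema-support does not support user specified schema".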

sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java

Lines changed: 0 additions & 49 deletions
This file was deleted.

sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java

Lines changed: 1 addition & 2 deletions
@@ -23,13 +23,12 @@
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.ReadSupport;
-import org.apache.spark.sql.sources.v2.ReadSupportWithSchema;
 import org.apache.spark.sql.types.StructType;
 
 /**
  * A data source reader that is returned by
  * {@link ReadSupport#createReader(DataSourceOptions)} or
- * {@link ReadSupportWithSchema#createReader(StructType, DataSourceOptions)}.
+ * {@link ReadSupport#createReader(StructType, DataSourceOptions)}.
  * It can mix in various query optimization interfaces to speed up the data scan. The actual scan
  * logic is delegated to {@link InputPartition}s, which are returned by
  * {@link #planInputPartitions()}.

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 2 additions & 2 deletions
@@ -37,7 +37,7 @@ import org.apache.spark.sql.execution.datasources.jdbc._
 import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -194,7 +194,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
     if (classOf[DataSourceV2].isAssignableFrom(cls)) {
       val ds = cls.newInstance().asInstanceOf[DataSourceV2]
-      if (ds.isInstanceOf[ReadSupport] || ds.isInstanceOf[ReadSupportWithSchema]) {
+      if (ds.isInstanceOf[ReadSupport]) {
         val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
           ds = ds, conf = sparkSession.sessionState.conf)
         val pathsOption = {

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala

Lines changed: 2 additions & 18 deletions
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.sources.DataSourceRegister
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
 import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsReportStatistics}
 import org.apache.spark.sql.types.StructType
 
@@ -110,22 +110,6 @@ object DataSourceV2Relation {
       source match {
         case support: ReadSupport =>
           support
-        case _: ReadSupportWithSchema =>
-          // this method is only called if there is no user-supplied schema. if there is no
-          // user-supplied schema and ReadSupport was not implemented, throw a helpful exception.
-          throw new AnalysisException(s"Data source requires a user-supplied schema: $name")
-        case _ =>
-          throw new AnalysisException(s"Data source is not readable: $name")
-      }
-    }
-
-    def asReadSupportWithSchema: ReadSupportWithSchema = {
-      source match {
-        case support: ReadSupportWithSchema =>
-          support
-        case _: ReadSupport =>
-          throw new AnalysisException(
-            s"Data source does not support user-supplied schema: $name")
         case _ =>
           throw new AnalysisException(s"Data source is not readable: $name")
       }
@@ -146,7 +130,7 @@ object DataSourceV2Relation {
     val v2Options = new DataSourceOptions(options.asJava)
     userSpecifiedSchema match {
       case Some(s) =>
-        asReadSupportWithSchema.createReader(s, v2Options)
+        asReadSupport.createReader(s, v2Options)
       case _ =>
         asReadSupport.createReader(v2Options)
     }
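From the user side, the dispatch above means both read paths now go through the single `ReadSupport` interface. A brief usage sketch in spark-shell style (the format name `com.example.MyDataSourceV2` is a hypothetical placeholder for any v2 source):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

val spark = SparkSession.builder().master("local").appName("ds-v2-schema").getOrCreate()

val schema = new StructType().add("i", "int").add("s", "string")

// With a user-specified schema, DataSourceV2Relation now calls
// asReadSupport.createReader(schema, options); sources that keep the default
// implementation fail here with UnsupportedOperationException.
val withUserSchema = spark.read
  .format("com.example.MyDataSourceV2") // hypothetical v2 source class
  .schema(schema)
  .load()

// Without a user-specified schema, the existing createReader(options) path is unchanged.
val inferred = spark.read
  .format("com.example.MyDataSourceV2")
  .load()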

sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java

Lines changed: 7 additions & 2 deletions
@@ -22,12 +22,12 @@
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.DataSourceV2;
-import org.apache.spark.sql.sources.v2.ReadSupportWithSchema;
+import org.apache.spark.sql.sources.v2.ReadSupport;
 import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
 import org.apache.spark.sql.sources.v2.reader.InputPartition;
 import org.apache.spark.sql.types.StructType;
 
-public class JavaSchemaRequiredDataSource implements DataSourceV2, ReadSupportWithSchema {
+public class JavaSchemaRequiredDataSource implements DataSourceV2, ReadSupport {
 
   class Reader implements DataSourceReader {
     private final StructType schema;
@@ -47,6 +47,11 @@ public List<InputPartition<InternalRow>> planInputPartitions() {
     }
   }
 
+  @Override
+  public DataSourceReader createReader(DataSourceOptions options) {
+    throw new IllegalArgumentException("requires a user-supplied schema");
+  }
+
   @Override
   public DataSourceReader createReader(StructType schema, DataSourceOptions options) {
     return new Reader(schema);

sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala

Lines changed: 10 additions & 6 deletions
@@ -22,9 +22,8 @@ import java.util.{ArrayList, List => JList}
 import test.org.apache.spark.sql.sources.v2._
 
 import org.apache.spark.SparkException
-import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanExec}
 import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
@@ -135,8 +134,8 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
   test("schema required data source") {
     Seq(classOf[SchemaRequiredDataSource], classOf[JavaSchemaRequiredDataSource]).foreach { cls =>
       withClue(cls.getName) {
-        val e = intercept[AnalysisException](spark.read.format(cls.getName).load())
-        assert(e.message.contains("requires a user-supplied schema"))
+        val e = intercept[IllegalArgumentException](spark.read.format(cls.getName).load())
+        assert(e.getMessage.contains("requires a user-supplied schema"))
 
         val schema = new StructType().add("i", "int").add("s", "string")
         val df = spark.read.format(cls.getName).schema(schema).load()
@@ -455,15 +454,20 @@ class AdvancedInputPartition(start: Int, end: Int, requiredSchema: StructType)
 }
 
 
-class SchemaRequiredDataSource extends DataSourceV2 with ReadSupportWithSchema {
+class SchemaRequiredDataSource extends DataSourceV2 with ReadSupport {
 
   class Reader(val readSchema: StructType) extends DataSourceReader {
     override def planInputPartitions(): JList[InputPartition[InternalRow]] =
       java.util.Collections.emptyList()
   }
 
-  override def createReader(schema: StructType, options: DataSourceOptions): DataSourceReader =
+  override def createReader(options: DataSourceOptions): DataSourceReader = {
+    throw new IllegalArgumentException("requires a user-supplied schema")
+  }
+
+  override def createReader(schema: StructType, options: DataSourceOptions): DataSourceReader = {
     new Reader(schema)
+  }
 }
 
 class BatchDataSourceV2 extends DataSourceV2 with ReadSupport {
