@@ -110,7 +110,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
* Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport]] to read
* batches of Kafka data in a micro-batch streaming query.
*/
- override def createMicroBatchReadSupport(
+ override def getMicroBatchReadSupport(
metadataPath: String,
options: DataSourceOptions): KafkaMicroBatchReadSupport = {

@@ -151,7 +151,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
* Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport]] to read
* Kafka data in a continuous streaming query.
*/
- override def createContinuousReadSupport(
+ override def getContinuousReadSupport(
metadataPath: String,
options: DataSourceOptions): KafkaContinuousReadSupport = {
val parameters = options.asMap().asScala.toMap
@@ -267,7 +267,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
}
}

- override def createStreamingWriteSupport(
+ override def getStreamingWriteSupport(
queryId: String,
schema: StructType,
mode: OutputMode,
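
For context, a typical end-user query that reaches these Kafka provider entry points looks like the following sketch (broker address and topic name are placeholders; spark is an active SparkSession):

// Kafka streaming read that goes through KafkaSourceProvider.
val kafkaDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")   // placeholder broker list
  .option("subscribe", "events")                      // placeholder topic
  .load()
// With the default micro-batch trigger Spark asks the provider for a
// KafkaMicroBatchReadSupport via getMicroBatchReadSupport; with a continuous
// trigger it calls getContinuousReadSupport instead.
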
@@ -675,7 +675,7 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
"kafka.bootstrap.servers" -> testUtils.brokerAddress,
"subscribe" -> topic
) ++ Option(minPartitions).map { p => "minPartitions" -> p}
- val readSupport = provider.createMicroBatchReadSupport(
+ val readSupport = provider.getMicroBatchReadSupport(
dir.getAbsolutePath, new DataSourceOptions(options.asJava))
val config = readSupport.newScanConfigBuilder(
KafkaSourceOffset(Map(tp -> 0L)),
@@ -26,14 +26,14 @@
* A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
* provide data reading ability for batch processing.

Contributor:
I think this interface (and the continuous and micro-batch equivalents) should note that returning a ReadSupport from options is for sources with no catalog support or to use an implementation directly. Maybe we should add this after #21306 is in though. What do you think?

Contributor Author:
Good point. I'll add some documentation about when this interface will be used (spark.read.format...), i.e. when an implementation is used directly.

*
- * This interface is used when end users want to use a data source implementation directly, e.g.
+ * This interface is used to return {@link BatchReadSupport} instances when end users run
* {@code SparkSession.read.format(...).option(...).load()}.
*/
@InterfaceStability.Evolving
public interface BatchReadSupportProvider extends DataSourceV2 {

/**
- * Creates a {@link BatchReadSupport} to scan the data from this data source with a user
+ * Returns a {@link BatchReadSupport} instance to load the data from this data source with a user
* specified schema.
*
* By default this method throws {@link UnsupportedOperationException}, implementations should
@@ -43,15 +43,15 @@ public interface BatchReadSupportProvider extends DataSourceV2 {
* @param options the options for the returned data source reader, which is an immutable
* case-insensitive string-to-string map.
*/
- default BatchReadSupport createBatchReadSupport(StructType schema, DataSourceOptions options) {
+ default BatchReadSupport getBatchReadSupport(StructType schema, DataSourceOptions options) {
return DataSourceV2Utils.failForUserSpecifiedSchema(this);
}

/**
- * Creates a {@link BatchReadSupport} to scan the data from this data source.
+ * Returns a {@link BatchReadSupport} instance to scan the data from this data source.
*
* @param options the options for the returned data source reader, which is an immutable
* case-insensitive string-to-string map.
*/
- BatchReadSupport createBatchReadSupport(DataSourceOptions options);
+ BatchReadSupport getBatchReadSupport(DataSourceOptions options);
}
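
As a rough illustration of when getBatchReadSupport is called, a batch read of a v2 source looks like the sketch below (the provider class name and option are hypothetical; spark is an active SparkSession):

// "com.example.SimpleV2Source" is a hypothetical class implementing BatchReadSupportProvider.
val df = spark.read
  .format("com.example.SimpleV2Source")
  .option("path", "/tmp/example")   // ends up in the DataSourceOptions passed to the provider
  .load()                           // Spark obtains a BatchReadSupport at this point
df.show()
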
@@ -28,15 +28,16 @@
* A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
* provide data writing ability for batch processing.
*
- * This interface is used when end users want to use a data source implementation directly, e.g.
+ * This interface is used to return {@link BatchWriteSupport} instances when end users run
* {@code Dataset.write.format(...).option(...).save()}.
*/
@InterfaceStability.Evolving
public interface BatchWriteSupportProvider extends DataSourceV2 {

/**
- * Creates an optional {@link BatchWriteSupport} to save the data to this data source. Data
- * sources can return None if there is no writing needed to be done according to the save mode.
+ * Returns an optional {@link BatchWriteSupport} instance to save the data to this data source.
+ * Data sources can return None if there is no writing needed to be done according to the save
+ * mode.
*
* @param queryId A unique string for the writing query. It's possible that there are many
* writing queries running at the same time, and the returned
@@ -48,7 +49,7 @@ public interface BatchWriteSupportProvider extends DataSourceV2 {
* case-insensitive string-to-string map.
* @return a write support to write data to this data source.
*/
- Optional<BatchWriteSupport> createBatchWriteSupport(
+ Optional<BatchWriteSupport> getBatchWriteSupport(
String queryId,
StructType schema,
SaveMode mode,

Contributor Author:
@rdblue I'd like to keep the save mode until we finish the new DDL logical plans and DataFrameWriter APIs. We are migrating the file source, and we still need to support df.write.mode("errorIfExists").parquet(...).

After we finish the new DDL logical plans and DataFrameWriter APIs, we can rename it to LegacyBatchWriteSupportProvider and create a new BatchWriteSupportProvider for them.

Contributor:
I don't think this is a good idea. Why introduce a legacy API into a new API? If we are moving old sources to the new API, then they should fully implement the new API and should not continue to expose the unpredictable v1 behavior.

That said, as long as the TableCatalog makes it in, I don't care what anonymous tables do because I don't intend for any of our sources to use this path.

Contributor Author:
The problem here is that, if we don't take saveMode, the only end-user API to write to a data source is df.write.format(...).mode("append").save(). That makes data source v2 totally unusable until we introduce the new write APIs.

I hope we can get this in before Spark 2.4, so that some data source projects can start migrating and experimenting.

Contributor:
@cloud-fan: this is a bad idea because it enables save modes other than append for DataSourceV2 without using the new logical plans. This leads to undefined behavior and is why we proposed standard logical plans in the first place.

Using a data source implementation directly should only support appending and scanning; anything more complex must require DeleteSupport (see #21308) or TableCatalog (see #21306) and the new logical plans. Otherwise, this will allow sources to expose behavior that we are trying to fix.

I'm -1 on this.

Contributor:
To clarify, your proposal is that we should block the completion of DataSourceV2 until the new logical plans are in place?

Contributor (@rdblue), Aug 13, 2018:
I think that the v2 sources should only use the plans proposed in the SPIP. I also think that the v2 data source API should always tell the data source exactly what to do: for overwrite, what should be deleted and what should be added.

That doesn't block fixing the v2 API here and doesn't prevent anyone from using it. But it would prevent people from relying on undefined behavior that results from passing an ambiguous SaveMode to a source.

The only thing that would not be available by doing this is overwrite support by using the SaveMode, which isn't something anyone should rely on because it doesn't have defined behavior.

I understand that this may seem like it would block migration from the v1 API to the v2 API. But I think it is the right thing to do so that we have a clear and consistent definition for how v2 sources behave.

Contributor Author:
I totally agree that SaveMode is a bad API which leads to undefined behavior. That's why we started a project to design new DDL logical plans and write APIs. However, I believe we had an agreement before that we can't remove existing APIs, so DataFrameWriter and SaveMode will still be there in Spark. If I were a data source developer, even if I had implemented the new write APIs (assuming they were finished), I would still support SaveMode to attract more users. DataFrameWriter is a very widely used API, and end users may need a long time to migrate to the new write APIs. BTW, the file source (without a catalog) does have clearly defined behavior regarding SaveMode; we should make it possible to migrate the file source to data source v2.
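
For reference, the DataFrameWriter path discussed in this thread looks like the sketch below; the provider class is hypothetical, and df stands for any existing DataFrame:

import org.apache.spark.sql.SaveMode

// "com.example.SimpleV2Source" is a hypothetical class implementing BatchWriteSupportProvider.
df.write
  .format("com.example.SimpleV2Source")
  .mode(SaveMode.Append)   // this SaveMode is forwarded to getBatchWriteSupport
  .save()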

@@ -26,15 +26,15 @@
* A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
* provide data reading ability for continuous stream processing.
*
- * This interface is used when end users want to use a data source implementation directly, e.g.
- * {@code SparkSession.readStream.format(...).option(...).load()}.
+ * This interface is used to return {@link ContinuousReadSupport} instances when end users run
+ * {@code SparkSession.readStream.format(...).option(...).load()} with a continuous trigger.
*/
@InterfaceStability.Evolving
public interface ContinuousReadSupportProvider extends DataSourceV2 {

/**
- * Creates a {@link ContinuousReadSupport} to scan the data from this streaming data source with
- * a user specified schema.
+ * Returns a {@link ContinuousReadSupport} instance to scan the data from this streaming data
+ * source with a user specified schema.
*
* By default this method throws {@link UnsupportedOperationException}, implementations should
* override this method to handle user specified schema.
@@ -46,23 +46,24 @@ public interface ContinuousReadSupportProvider extends DataSourceV2 {
* @param options the options for the returned data source reader, which is an immutable
* case-insensitive string-to-string map.
*/
- default ContinuousReadSupport createContinuousReadSupport(
+ default ContinuousReadSupport getContinuousReadSupport(
StructType schema,
String checkpointLocation,
DataSourceOptions options) {
return DataSourceV2Utils.failForUserSpecifiedSchema(this);
}

/**
- * Creates a {@link ContinuousReadSupport} to scan the data from this streaming data source.
+ * Returns a {@link ContinuousReadSupport} instance to scan the data from this streaming data
+ * source.
*
* @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
* recovery. Readers for the same logical source in the same query
* will be given the same checkpointLocation.
* @param options the options for the returned data source reader, which is an immutable
* case-insensitive string-to-string map.
*/
- ContinuousReadSupport createContinuousReadSupport(
+ ContinuousReadSupport getContinuousReadSupport(
String checkpointLocation,
DataSourceOptions options);
}
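
For illustration, a query that exercises this interface with the built-in rate source and console sink (assuming an active SparkSession spark):

import org.apache.spark.sql.streaming.Trigger

// A continuous trigger makes Spark request a ContinuousReadSupport from the source provider.
val continuousQuery = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .load()
  .writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()
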
@@ -26,15 +26,15 @@
* A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
* provide data reading ability for micro-batch stream processing.
*
- * This interface is used when end users want to use a data source implementation directly, e.g.
- * {@code SparkSession.readStream.format(...).option(...).load()}.
+ * This interface is used to return {@link MicroBatchReadSupport} instances when end users run
+ * {@code SparkSession.readStream.format(...).option(...).load()} with a micro-batch trigger.
*/
@InterfaceStability.Evolving
public interface MicroBatchReadSupportProvider extends DataSourceV2 {

/**
- * Creates a {@link MicroBatchReadSupport} to scan the data from this streaming data source with
- * a user specified schema.
+ * Returns a {@link MicroBatchReadSupport} instance to scan the data from this streaming data
+ * source with a user specified schema.
*
* By default this method throws {@link UnsupportedOperationException}, implementations should
* override this method to handle user specified schema.
@@ -46,23 +46,24 @@ public interface MicroBatchReadSupportProvider extends DataSourceV2 {
* @param options the options for the returned data source reader, which is an immutable
* case-insensitive string-to-string map.
*/
- default MicroBatchReadSupport createMicroBatchReadSupport(
+ default MicroBatchReadSupport getMicroBatchReadSupport(
StructType schema,
String checkpointLocation,
DataSourceOptions options) {
return DataSourceV2Utils.failForUserSpecifiedSchema(this);
}

/**
- * Creates a {@link MicroBatchReadSupport} to scan the data from this streaming data source.
+ * Returns a {@link MicroBatchReadSupport} instance to scan the data from this streaming data
+ * source.
*
* @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
* recovery. Readers for the same logical source in the same query
* will be given the same checkpointLocation.
* @param options the options for the returned data source reader, which is an immutable
* case-insensitive string-to-string map.
*/
- MicroBatchReadSupport createMicroBatchReadSupport(
+ MicroBatchReadSupport getMicroBatchReadSupport(
String checkpointLocation,
DataSourceOptions options);
}
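
Similarly, the same query with a micro-batch trigger goes through this interface instead (again assuming an active SparkSession spark):

import org.apache.spark.sql.streaming.Trigger

// A processing-time (micro-batch) trigger makes Spark request a MicroBatchReadSupport.
val microBatchQuery = spark.readStream
  .format("rate")
  .load()
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()
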
@@ -27,14 +27,14 @@
* A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
* provide data writing ability for structured streaming.
*
- * This interface is used when end users want to use a data source implementation directly, e.g.
- * {@code Dataset.writeStream.format(...).option(...).save()}.
+ * This interface is used to return {@link StreamingWriteSupport} instances when end users run
+ * {@code Dataset.writeStream.format(...).option(...).start()}.
*/
@InterfaceStability.Evolving
public interface StreamingWriteSupportProvider extends DataSourceV2, BaseStreamingSink {

/**
- * Creates a {@link StreamingWriteSupport} to save the data to this data source.
+ * Returns a {@link StreamingWriteSupport} instance to save the data to this data source.
*
* @param queryId A unique string for the writing query. It's possible that there are many
* writing queries running at the same time, and the returned
@@ -45,7 +45,7 @@ public interface StreamingWriteSupportProvider extends DataSourceV2, BaseStreamingSink {
* @param options the options for the returned data source writer, which is an immutable
* case-insensitive string-to-string map.
*/
- StreamingWriteSupport createStreamingWriteSupport(
+ StreamingWriteSupport getStreamingWriteSupport(
String queryId,
StructType schema,
OutputMode mode,
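
On the sink side, a sketch using the console sink (whose provider appears later in this diff); streamingDf stands for any streaming DataFrame:

// Spark obtains a StreamingWriteSupport from the sink provider when the query starts.
val sinkQuery = streamingDf.writeStream
  .format("console")   // "console" resolves to ConsoleSinkProvider, which overrides getStreamingWriteSupport
  .outputMode("append")
  .start()
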
@@ -20,13 +20,14 @@
import org.apache.spark.annotation.InterfaceStability;

/**
- * An interface that defines how to scan the data from data source for batch processing.
+ * An interface that defines how to load the data from data source for batch processing.
*
- * The execution engine will create an instance of this interface at the start of a batch query,
- * then call {@link #newScanConfigBuilder()} and create an instance of {@link ScanConfig}. The
+ * The execution engine will get an instance of this interface from a data source provider
+ * (e.g. {@link org.apache.spark.sql.sources.v2.BatchReadSupportProvider}) at the start of a batch
+ * query, then call {@link #newScanConfigBuilder()} to create an instance of {@link ScanConfig}. The
* {@link ScanConfigBuilder} can apply operator pushdown and keep the pushdown result in
* {@link ScanConfig}. The {@link ScanConfig} will be used to create input partitions and reader
- * factory to process data from the data source.
+ * factory to scan data from the data source.
*/
@InterfaceStability.Evolving
public interface BatchReadSupport extends ReadSupport {
@@ -22,7 +22,6 @@
import org.apache.spark.annotation.InterfaceStability;

/**
- *
* A serializable representation of an input partition returned by
* {@link ReadSupport#planInputPartitions(ScanConfig)}.
*
@@ -24,16 +24,17 @@
import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;

/**
- * An interface that defines how to scan the data from data source for continuous streaming
+ * An interface that defines how to load the data from data source for continuous streaming
* processing.
*
- * The execution engine will create an instance of this interface at the start of a streaming query,
- * then call {@link #newScanConfigBuilder(Offset)} and create an instance of {@link ScanConfig} for
- * the duration of the streaming query or until {@link #needsReconfiguration(ScanConfig)} is true.
- * The {@link ScanConfig} will be used to create input partitions and reader factory to process data
- * for its duration. At the end {@link #stop()} will be called when the streaming execution is
- * completed. Note that a single query may have multiple executions due to restart or failure
- * recovery.
+ * The execution engine will get an instance of this interface from a data source provider
+ * (e.g. {@link org.apache.spark.sql.sources.v2.ContinuousReadSupportProvider}) at the start of a
+ * streaming query, then call {@link #newScanConfigBuilder(Offset)} to create an instance of
+ * {@link ScanConfig} for the duration of the streaming query or until
+ * {@link #needsReconfiguration(ScanConfig)} is true. The {@link ScanConfig} will be used to create
+ * input partitions and reader factory to scan data for its duration. At the end {@link #stop()}
+ * will be called when the streaming execution is completed. Note that a single query may have
+ * multiple executions due to restart or failure recovery.

Contributor:
I would also add this documentation on the relevant methods. So getContinuousReadSupport and getMicroBatchReadSupport would say something like "Spark will call this method at the beginning of each streaming query to get a ReadSupport", and newScanConfigBuilder would say something like "Spark will get a ScanConfig once for each data scanning job".

*/
@InterfaceStability.Evolving
public interface ContinuousReadSupport extends StreamingReadSupport, BaseStreamingSource {
@@ -25,10 +25,11 @@
* An interface that defines how to scan the data from data source for micro-batch streaming
* processing.
*
- * The execution engine will create an instance of this interface at the start of a streaming query,
- * then call {@link #newScanConfigBuilder(Offset, Offset)} and create an instance of
+ * The execution engine will get an instance of this interface from a data source provider
+ * (e.g. {@link org.apache.spark.sql.sources.v2.MicroBatchReadSupportProvider}) at the start of a
+ * streaming query, then call {@link #newScanConfigBuilder(Offset, Offset)} to create an instance of
* {@link ScanConfig} for each micro-batch. The {@link ScanConfig} will be used to create input
- * partitions and reader factory to process a micro-batch. At the end {@link #stop()} will be called
+ * partitions and reader factory to scan a micro-batch. At the end {@link #stop()} will be called
* when the streaming execution is completed. Note that a single query may have multiple executions
* due to restart or failure recovery.
*/
@@ -251,7 +251,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
}

} else {
- val writer = provider.createBatchWriteSupport(
+ val writer = provider.getBatchWriteSupport(
UUID.randomUUID().toString,
df.logicalPlan.output.toStructType,
mode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,16 +152,16 @@ object DataSourceV2Relation {
val v2Options = new DataSourceOptions(options.asJava)
userSpecifiedSchema match {
case Some(s) =>
- asReadSupportProvider.createBatchReadSupport(s, v2Options)
+ asReadSupportProvider.getBatchReadSupport(s, v2Options)
case _ =>
- asReadSupportProvider.createBatchReadSupport(v2Options)
+ asReadSupportProvider.getBatchReadSupport(v2Options)
}
}

def createWriteSupport(
options: Map[String, String],
schema: StructType): BatchWriteSupport = {
- asWriteSupportProvider.createBatchWriteSupport(
+ asWriteSupportProvider.getBatchWriteSupport(
UUID.randomUUID().toString,
schema,
SaveMode.Append,
@@ -94,7 +94,7 @@ class MicroBatchExecution(
v2ToExecutionRelationMap.getOrElseUpdate(s, {
// Materialize source to avoid creating it in every batch
val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
- val readSupport = dataSourceV2.createMicroBatchReadSupport(
+ val readSupport = dataSourceV2.getMicroBatchReadSupport(
metadataPath,
new DataSourceOptions(options.asJava))
nextSourceId += 1
@@ -496,7 +496,7 @@ class MicroBatchExecution(
val triggerLogicalPlan = sink match {
case _: Sink => newAttributePlan
case s: StreamingWriteSupportProvider =>
- val writer = s.createStreamingWriteSupport(
+ val writer = s.getStreamingWriteSupport(
s"$runId",
newAttributePlan.schema,
outputMode,
@@ -35,7 +35,7 @@ class ConsoleSinkProvider extends DataSourceV2
with DataSourceRegister
with CreatableRelationProvider {

- override def createStreamingWriteSupport(
+ override def getStreamingWriteSupport(
queryId: String,
schema: StructType,
mode: OutputMode,
@@ -148,7 +148,7 @@ class ContinuousExecution(
val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
nextSourceId += 1

- dataSource.createContinuousReadSupport(
+ dataSource.getContinuousReadSupport(
metadataPath,
new DataSourceOptions(extraReaderOptions.asJava))
}
@@ -185,7 +185,7 @@ class ContinuousExecution(
"CurrentTimestamp and CurrentDate not yet supported for continuous processing")
}

- val writer = sink.createStreamingWriteSupport(
+ val writer = sink.getStreamingWriteSupport(
s"$runId",
triggerLogicalPlan.schema,
outputMode,
@@ -117,7 +117,7 @@ class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPa

// ContinuousReadSupportProvider implementation
// This is necessary because of how StreamTest finds the source for AddDataMemory steps.
- override def createContinuousReadSupport(
+ override def getContinuousReadSupport(
checkpointLocation: String,
options: DataSourceOptions): ContinuousReadSupport = this
}
@@ -42,7 +42,7 @@ case class ForeachWriteSupportProvider[T](
converter: Either[ExpressionEncoder[T], InternalRow => T])
extends StreamingWriteSupportProvider {

- override def createStreamingWriteSupport(
+ override def getStreamingWriteSupport(
queryId: String,
schema: StructType,
mode: OutputMode,