diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala similarity index 84% rename from external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala rename to external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala index f328567c95d2..0e6171724402 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala @@ -30,10 +30,9 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE} import org.apache.spark.sql.sources.v2.reader._ import org.apache.spark.sql.sources.v2.reader.streaming._ -import org.apache.spark.sql.types.StructType /** - * A [[ContinuousReadSupport]] for data from kafka. + * A [[ContinuousStream]] for data from kafka. * * @param offsetReader a reader used to get kafka offsets. Note that the actual data will be * read by per-task consumers generated later. @@ -46,17 +45,23 @@ import org.apache.spark.sql.types.StructType * scenarios, where some offsets after the specified initial ones can't be * properly read. */ -class KafkaContinuousReadSupport( +class KafkaContinuousStream( offsetReader: KafkaOffsetReader, kafkaParams: ju.Map[String, Object], sourceOptions: Map[String, String], metadataPath: String, initialOffsets: KafkaOffsetRangeLimit, failOnDataLoss: Boolean) - extends ContinuousReadSupport with Logging { + extends ContinuousStream with Logging { private val pollTimeoutMs = sourceOptions.getOrElse("kafkaConsumer.pollTimeoutMs", "512").toLong + // Initialized when creating reader factories. If this diverges from the partitions at the latest + // offsets, we need to reconfigure. + // Exposed outside this object only for unit tests. 
+ @volatile private[sql] var knownPartitions: Set[TopicPartition] = _ + + override def initialOffset(): Offset = { val offsets = initialOffsets match { case EarliestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchEarliestOffsets()) @@ -67,18 +72,32 @@ class KafkaContinuousReadSupport( offsets } - override def fullSchema(): StructType = KafkaOffsetReader.kafkaSchema - - override def newScanConfigBuilder(start: Offset): ScanConfigBuilder = { - new KafkaContinuousScanConfigBuilder(fullSchema(), start, offsetReader, reportDataLoss) - } - override def deserializeOffset(json: String): Offset = { KafkaSourceOffset(JsonUtils.partitionOffsets(json)) } - override def planInputPartitions(config: ScanConfig): Array[InputPartition] = { - val startOffsets = config.asInstanceOf[KafkaContinuousScanConfig].startOffsets + override def planInputPartitions(start: Offset): Array[InputPartition] = { + val oldStartPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(start) + + val currentPartitionSet = offsetReader.fetchEarliestOffsets().keySet + val newPartitions = currentPartitionSet.diff(oldStartPartitionOffsets.keySet) + val newPartitionOffsets = offsetReader.fetchEarliestOffsets(newPartitions.toSeq) + + val deletedPartitions = oldStartPartitionOffsets.keySet.diff(currentPartitionSet) + if (deletedPartitions.nonEmpty) { + val message = if ( + offsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) { + s"$deletedPartitions are gone. ${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}" + } else { + s"$deletedPartitions are gone. Some data may have been missed." + } + reportDataLoss(message) + } + + val startOffsets = newPartitionOffsets ++ + oldStartPartitionOffsets.filterKeys(!deletedPartitions.contains(_)) + knownPartitions = startOffsets.keySet + startOffsets.toSeq.map { case (topicPartition, start) => KafkaContinuousInputPartition( @@ -86,8 +105,7 @@ class KafkaContinuousReadSupport( }.toArray } - override def createContinuousReaderFactory( - config: ScanConfig): ContinuousPartitionReaderFactory = { + override def createContinuousReaderFactory(): ContinuousPartitionReaderFactory = { KafkaContinuousReaderFactory } @@ -105,8 +123,7 @@ class KafkaContinuousReadSupport( KafkaSourceOffset(mergedMap) } - override def needsReconfiguration(config: ScanConfig): Boolean = { - val knownPartitions = config.asInstanceOf[KafkaContinuousScanConfig].knownPartitions + override def needsReconfiguration(): Boolean = { offsetReader.fetchLatestOffsets(None).keySet != knownPartitions } @@ -151,47 +168,6 @@ object KafkaContinuousReaderFactory extends ContinuousPartitionReaderFactory { } } -class KafkaContinuousScanConfigBuilder( - schema: StructType, - startOffset: Offset, - offsetReader: KafkaOffsetReader, - reportDataLoss: String => Unit) - extends ScanConfigBuilder { - - override def build(): ScanConfig = { - val oldStartPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(startOffset) - - val currentPartitionSet = offsetReader.fetchEarliestOffsets().keySet - val newPartitions = currentPartitionSet.diff(oldStartPartitionOffsets.keySet) - val newPartitionOffsets = offsetReader.fetchEarliestOffsets(newPartitions.toSeq) - - val deletedPartitions = oldStartPartitionOffsets.keySet.diff(currentPartitionSet) - if (deletedPartitions.nonEmpty) { - val message = if ( - offsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) { - s"$deletedPartitions are gone. ${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}" - } else { - s"$deletedPartitions are gone. 
Some data may have been missed." - } - reportDataLoss(message) - } - - val startOffsets = newPartitionOffsets ++ - oldStartPartitionOffsets.filterKeys(!deletedPartitions.contains(_)) - KafkaContinuousScanConfig(schema, startOffsets) - } -} - -case class KafkaContinuousScanConfig( - readSchema: StructType, - startOffsets: Map[TopicPartition, Long]) - extends ScanConfig { - - // Created when building the scan config builder. If this diverges from the partitions at the - // latest offsets, we need to reconfigure the kafka read support. - def knownPartitions: Set[TopicPartition] = startOffsets.keySet -} - /** * A per-task data reader for continuous Kafka processing. * diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala index 58c90b897091..9238899b0c00 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.streaming.{Sink, Source} import org.apache.spark.sql.sources._ import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder} -import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchStream +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream} import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType @@ -48,7 +48,6 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister with RelationProvider with CreatableRelationProvider with StreamingWriteSupportProvider - with ContinuousReadSupportProvider with TableProvider with Logging { import KafkaSourceProvider._ @@ -107,46 +106,6 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister new KafkaTable(strategy(options.asMap().asScala.toMap)) } - /** - * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport]] to read - * Kafka data in a continuous streaming query. - */ - override def createContinuousReadSupport( - metadataPath: String, - options: DataSourceOptions): KafkaContinuousReadSupport = { - val parameters = options.asMap().asScala.toMap - validateStreamOptions(parameters) - // Each running query should use its own group id. Otherwise, the query may be only assigned - // partial data since Kafka will assign partitions to multiple consumers having the same group - // id. Hence, we should generate a unique id for each query. 
- val uniqueGroupId = streamingUniqueGroupId(parameters, metadataPath) - - val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) } - val specifiedKafkaParams = - parameters - .keySet - .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka.")) - .map { k => k.drop(6).toString -> parameters(k) } - .toMap - - val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams, - STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) - - val kafkaOffsetReader = new KafkaOffsetReader( - strategy(caseInsensitiveParams), - kafkaParamsForDriver(specifiedKafkaParams), - parameters, - driverGroupIdPrefix = s"$uniqueGroupId-driver") - - new KafkaContinuousReadSupport( - kafkaOffsetReader, - kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId), - parameters, - metadataPath, - startingStreamOffsets, - failOnDataLoss(caseInsensitiveParams)) - } - /** * Returns a new base relation with the given parameters. * @@ -406,7 +365,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister } class KafkaTable(strategy: => ConsumerStrategy) extends Table - with SupportsMicroBatchRead { + with SupportsMicroBatchRead with SupportsContinuousRead { override def name(): String = s"Kafka $strategy" @@ -449,6 +408,40 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister startingStreamOffsets, failOnDataLoss(caseInsensitiveParams)) } + + override def toContinuousStream(checkpointLocation: String): ContinuousStream = { + val parameters = options.asMap().asScala.toMap + validateStreamOptions(parameters) + // Each running query should use its own group id. Otherwise, the query may be only assigned + // partial data since Kafka will assign partitions to multiple consumers having the same group + // id. Hence, we should generate a unique id for each query. 
+ val uniqueGroupId = streamingUniqueGroupId(parameters, checkpointLocation) + + val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) } + val specifiedKafkaParams = + parameters + .keySet + .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka.")) + .map { k => k.drop(6).toString -> parameters(k) } + .toMap + + val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit( + caseInsensitiveParams, STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) + + val kafkaOffsetReader = new KafkaOffsetReader( + strategy(caseInsensitiveParams), + kafkaParamsForDriver(specifiedKafkaParams), + parameters, + driverGroupIdPrefix = s"$uniqueGroupId-driver") + + new KafkaContinuousStream( + kafkaOffsetReader, + kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId), + parameters, + checkpointLocation, + startingStreamOffsets, + failOnDataLoss(caseInsensitiveParams)) + } } } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala index 2f7fd7f7d47b..be0cea212f50 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala @@ -209,11 +209,11 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest { assert( query.lastExecution.executedPlan.collectFirst { case scan: ContinuousScanExec - if scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] => - scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig] - }.exists { config => + if scan.stream.isInstanceOf[KafkaContinuousStream] => + scan.stream.asInstanceOf[KafkaContinuousStream] + }.exists { stream => // Ensure the new topic is present and the old topic is gone. 
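For context (not part of this patch), a query shaped roughly like the following is what drives the new code path: with a continuous trigger, Spark resolves the Kafka table, builds a Scan through newScanBuilder, and calls Scan.toContinuousStream(checkpointLocation) instead of the removed createContinuousReadSupport entry point. This is a hedged sketch; the topic, server, and checkpoint path values are placeholders, not taken from this patch.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("kafka-continuous-sketch").getOrCreate()

// Reading from Kafka with a continuous trigger exercises KafkaTable (SupportsContinuousRead),
// the Kafka Scan's toContinuousStream, and ultimately KafkaContinuousStream.planInputPartitions.
val query = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")                   // placeholder
  .option("subscribe", "topic1")                                     // placeholder
  .load()
  .writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/kafka-continuous-checkpoint")  // placeholder
  .trigger(Trigger.Continuous("1 second"))
  .start()
```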
- config.knownPartitions.exists(_.topic == topic2) + stream.knownPartitions.exists(_.topic == topic2) }, s"query never reconfigured to new topic $topic2") } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala index fa3b623586aa..ad1c2c59d9c8 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala @@ -48,8 +48,8 @@ trait KafkaContinuousTest extends KafkaSourceTest { assert( query.lastExecution.executedPlan.collectFirst { case scan: ContinuousScanExec - if scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] => - scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig] + if scan.stream.isInstanceOf[KafkaContinuousStream] => + scan.stream.asInstanceOf[KafkaContinuousStream] }.exists(_.knownPartitions.size == newCount), s"query never reconfigured to $newCount partitions") } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index 90b501573a95..aa7baac2e9e2 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -28,14 +28,13 @@ import scala.collection.JavaConverters._ import scala.io.Source import scala.util.Random -import org.apache.kafka.clients.admin.{AdminClient, ConsumerGroupListing} -import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata} +import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata} import org.apache.kafka.common.TopicPartition import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.time.SpanSugar._ import org.apache.spark.sql.{Dataset, ForeachWriter, SparkSession} -import org.apache.spark.sql.execution.datasources.v2.{OldStreamingDataSourceV2Relation, StreamingDataSourceV2Relation} +import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution @@ -118,17 +117,10 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with Kaf val sources: Seq[BaseStreamingSource] = { query.get.logicalPlan.collect { case StreamingExecutionRelation(source: KafkaSource, _) => source - case r: StreamingDataSourceV2Relation - if r.stream.isInstanceOf[KafkaMicroBatchStream] => - r.stream.asInstanceOf[KafkaMicroBatchStream] - } ++ (query.get.lastExecution match { - case null => Seq() - case e => e.logical.collect { - case r: OldStreamingDataSourceV2Relation - if r.readSupport.isInstanceOf[KafkaContinuousReadSupport] => - r.readSupport.asInstanceOf[KafkaContinuousReadSupport] - } - }) + case r: StreamingDataSourceV2Relation if r.stream.isInstanceOf[KafkaMicroBatchStream] || + r.stream.isInstanceOf[KafkaContinuousStream] => + r.stream + } }.distinct if (sources.isEmpty) { diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchReadSupportProvider.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchReadSupportProvider.java 
deleted file mode 100644 index 2a4933d75e8d..000000000000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchReadSupportProvider.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2; - -import org.apache.spark.annotation.Evolving; -import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils; -import org.apache.spark.sql.sources.v2.reader.BatchReadSupport; -import org.apache.spark.sql.types.StructType; - -/** - * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to - * provide data reading ability for batch processing. - * - * This interface is used to create {@link BatchReadSupport} instances when end users run - * {@code SparkSession.read.format(...).option(...).load()}. - */ -@Evolving -public interface BatchReadSupportProvider extends DataSourceV2 { - - /** - * Creates a {@link BatchReadSupport} instance to load the data from this data source with a user - * specified schema, which is called by Spark at the beginning of each batch query. - * - * Spark will call this method at the beginning of each batch query to create a - * {@link BatchReadSupport} instance. - * - * By default this method throws {@link UnsupportedOperationException}, implementations should - * override this method to handle user specified schema. - * - * @param schema the user specified schema. - * @param options the options for the returned data source reader, which is an immutable - * case-insensitive string-to-string map. - */ - default BatchReadSupport createBatchReadSupport(StructType schema, DataSourceOptions options) { - return DataSourceV2Utils.failForUserSpecifiedSchema(this); - } - - /** - * Creates a {@link BatchReadSupport} instance to scan the data from this data source, which is - * called by Spark at the beginning of each batch query. - * - * @param options the options for the returned data source reader, which is an immutable - * case-insensitive string-to-string map. - */ - BatchReadSupport createBatchReadSupport(DataSourceOptions options); -} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupportProvider.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupportProvider.java deleted file mode 100644 index b4f2eb34a156..000000000000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupportProvider.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2; - -import org.apache.spark.annotation.Evolving; -import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils; -import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport; -import org.apache.spark.sql.types.StructType; - -/** - * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to - * provide data reading ability for continuous stream processing. - * - * This interface is used to create {@link ContinuousReadSupport} instances when end users run - * {@code SparkSession.readStream.format(...).option(...).load()} with a continuous trigger. - */ -@Evolving -public interface ContinuousReadSupportProvider extends DataSourceV2 { - - /** - * Creates a {@link ContinuousReadSupport} instance to scan the data from this streaming data - * source with a user specified schema, which is called by Spark at the beginning of each - * continuous streaming query. - * - * By default this method throws {@link UnsupportedOperationException}, implementations should - * override this method to handle user specified schema. - * - * @param schema the user provided schema. - * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure - * recovery. Readers for the same logical source in the same query - * will be given the same checkpointLocation. - * @param options the options for the returned data source reader, which is an immutable - * case-insensitive string-to-string map. - */ - default ContinuousReadSupport createContinuousReadSupport( - StructType schema, - String checkpointLocation, - DataSourceOptions options) { - return DataSourceV2Utils.failForUserSpecifiedSchema(this); - } - - /** - * Creates a {@link ContinuousReadSupport} instance to scan the data from this streaming data - * source, which is called by Spark at the beginning of each continuous streaming query. - * - * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure - * recovery. Readers for the same logical source in the same query - * will be given the same checkpointLocation. - * @param options the options for the returned data source reader, which is an immutable - * case-insensitive string-to-string map. - */ - ContinuousReadSupport createContinuousReadSupport( - String checkpointLocation, - DataSourceOptions options); -} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java index 4aaa57dd4db9..43bdcca70cb0 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java @@ -20,7 +20,7 @@ import org.apache.spark.annotation.Evolving; /** - * TODO: remove it when we finish the API refactor for streaming side. + * TODO: remove it when we finish the API refactor for streaming write side. 
*/ @Evolving public interface DataSourceV2 {} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfigBuilder.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsContinuousRead.java similarity index 59% rename from sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfigBuilder.java rename to sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsContinuousRead.java index 4922962f7065..b7fa3f24a238 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfigBuilder.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsContinuousRead.java @@ -15,16 +15,20 @@ * limitations under the License. */ -package org.apache.spark.sql.sources.v2.reader; +package org.apache.spark.sql.sources.v2; import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.sources.v2.reader.Scan; +import org.apache.spark.sql.sources.v2.reader.ScanBuilder; /** - * An interface for building the {@link ScanConfig}. Implementations can mixin those - * SupportsPushDownXYZ interfaces to do operator pushdown, and keep the operator pushdown result in - * the returned {@link ScanConfig}. + * An empty mix-in interface for {@link Table}, to indicate this table supports streaming scan with + * continuous mode. + *
+ * If a {@link Table} implements this interface, the + * {@link SupportsRead#newScanBuilder(DataSourceOptions)} must return a {@link ScanBuilder} that + * builds {@link Scan} with {@link Scan#toContinuousStream(String)} implemented. + *
*/ @Evolving -public interface ScanConfigBuilder { - ScanConfig build(); -} +public interface SupportsContinuousRead extends SupportsRead { } diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Batch.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Batch.java index bcfa1983abb8..28d80b7a5bc3 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Batch.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Batch.java @@ -42,7 +42,7 @@ public interface Batch { InputPartition[] planInputPartitions(); /** - * Returns a factory, which produces one {@link PartitionReader} for one {@link InputPartition}. + * Returns a factory to create a {@link PartitionReader} for each {@link InputPartition}. */ PartitionReaderFactory createReaderFactory(); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/BatchReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/BatchReadSupport.java deleted file mode 100644 index 518a8b03a2c6..000000000000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/BatchReadSupport.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2.reader; - -import org.apache.spark.annotation.Evolving; - -/** - * An interface that defines how to load the data from data source for batch processing. - * - * The execution engine will get an instance of this interface from a data source provider - * (e.g. {@link org.apache.spark.sql.sources.v2.BatchReadSupportProvider}) at the start of a batch - * query, then call {@link #newScanConfigBuilder()} and create an instance of {@link ScanConfig}. - * The {@link ScanConfigBuilder} can apply operator pushdown and keep the pushdown result in - * {@link ScanConfig}. The {@link ScanConfig} will be used to create input partitions and reader - * factory to scan data from the data source with a Spark job. - */ -@Evolving -public interface BatchReadSupport extends ReadSupport { - - /** - * Returns a builder of {@link ScanConfig}. Spark will call this method and create a - * {@link ScanConfig} for each data scanning job. - * - * The builder can take some query specific information to do operators pushdown, and keep these - * information in the created {@link ScanConfig}. - * - * This is the first step of the data scan. All other methods in {@link BatchReadSupport} needs - * to take {@link ScanConfig} as an input. - */ - ScanConfigBuilder newScanConfigBuilder(); - - /** - * Returns a factory, which produces one {@link PartitionReader} for one {@link InputPartition}. 
- */ - PartitionReaderFactory createReaderFactory(ScanConfig config); -} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java index 5f5248084bad..413349782efa 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java @@ -23,7 +23,7 @@ /** * A serializable representation of an input partition returned by - * {@link ReadSupport#planInputPartitions(ScanConfig)}. + * {@link Batch#planInputPartitions()} and the corresponding ones in streaming . * * Note that {@link InputPartition} will be serialized and sent to executors, then * {@link PartitionReader} will be created by diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/OldSupportsReportPartitioning.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/OldSupportsReportPartitioning.java deleted file mode 100644 index 347a465905ac..000000000000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/OldSupportsReportPartitioning.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2.reader; - -import org.apache.spark.annotation.Evolving; -import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning; - -/** - * A mix in interface for {@link BatchReadSupport}. Data sources can implement this interface to - * report data partitioning and try to avoid shuffle at Spark side. - * - * Note that, when a {@link ReadSupport} implementation creates exactly one {@link InputPartition}, - * Spark may avoid adding a shuffle even if the reader does not implement this interface. - */ -@Evolving -// TODO: remove it, after we finish the API refactor completely. -public interface OldSupportsReportPartitioning extends ReadSupport { - - /** - * Returns the output data partitioning that this reader guarantees. - */ - Partitioning outputPartitioning(ScanConfig config); -} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/OldSupportsReportStatistics.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/OldSupportsReportStatistics.java deleted file mode 100644 index 0d3ec17107c1..000000000000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/OldSupportsReportStatistics.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2.reader; - -import org.apache.spark.annotation.Evolving; - -/** - * A mix in interface for {@link BatchReadSupport}. Data sources can implement this interface to - * report statistics to Spark. - * - * As of Spark 2.4, statistics are reported to the optimizer before any operator is pushed to the - * data source. Implementations that return more accurate statistics based on pushed operators will - * not improve query performance until the planner can push operators before getting stats. - */ -@Evolving -// TODO: remove it, after we finish the API refactor completely. -public interface OldSupportsReportStatistics extends ReadSupport { - - /** - * Returns the estimated statistics of this data source scan. - */ - Statistics estimateStatistics(ScanConfig config); -} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadSupport.java deleted file mode 100644 index b1f610a82e8a..000000000000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadSupport.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2.reader; - -import org.apache.spark.annotation.Evolving; -import org.apache.spark.sql.types.StructType; - -/** - * The base interface for all the batch and streaming read supports. Data sources should implement - * concrete read support interfaces like {@link BatchReadSupport}. - * - * If Spark fails to execute any methods in the implementations of this interface (by throwing an - * exception), the read action will fail and no Spark job will be submitted. - */ -@Evolving -public interface ReadSupport { - - /** - * Returns the full schema of this data source, which is usually the physical schema of the - * underlying storage. This full schema should not be affected by column pruning or other - * optimizations. - */ - StructType fullSchema(); - - /** - * Returns a list of {@link InputPartition input partitions}. Each {@link InputPartition} - * represents a data split that can be processed by one Spark task. 
The number of input - * partitions returned here is the same as the number of RDD partitions this scan outputs. - * - * Note that, this may not be a full scan if the data source supports optimization like filter - * push-down. Implementations should check the input {@link ScanConfig} and adjust the resulting - * {@link InputPartition input partitions}. - */ - InputPartition[] planInputPartitions(ScanConfig config); -} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java index c60fb2ba0b0b..25ab06eee42e 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java @@ -18,9 +18,11 @@ package org.apache.spark.sql.sources.v2.reader; import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousStream; import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchStream; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.sources.v2.SupportsBatchRead; +import org.apache.spark.sql.sources.v2.SupportsContinuousRead; import org.apache.spark.sql.sources.v2.SupportsMicroBatchRead; import org.apache.spark.sql.sources.v2.Table; @@ -65,7 +67,7 @@ default String description() { * @throws UnsupportedOperationException */ default Batch toBatch() { - throw new UnsupportedOperationException("Batch scans are not supported"); + throw new UnsupportedOperationException(description() + ": Batch scan are not supported"); } /** @@ -81,6 +83,22 @@ default Batch toBatch() { * @throws UnsupportedOperationException */ default MicroBatchStream toMicroBatchStream(String checkpointLocation) { - throw new UnsupportedOperationException("Micro-batch scans are not supported"); + throw new UnsupportedOperationException(description() + ": Micro-batch scan are not supported"); + } + + /** + * Returns the physical representation of this scan for streaming query with continuous mode. By + * default this method throws exception, data sources must overwrite this method to provide an + * implementation, if the {@link Table} that creates this scan implements + * {@link SupportsContinuousRead}. + * + * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure + * recovery. Data streams for the same logical source in the same query + * will be given the same checkpointLocation. + * + * @throws UnsupportedOperationException + */ + default ContinuousStream toContinuousStream(String checkpointLocation) { + throw new UnsupportedOperationException(description() + ": Continuous scan are not supported"); } } diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfig.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfig.java deleted file mode 100644 index c8cff68c2ef7..000000000000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfig.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2.reader; - -import org.apache.spark.annotation.Evolving; -import org.apache.spark.sql.types.StructType; - -/** - * An interface that carries query specific information for the data scanning job, like operator - * pushdown information and streaming query offsets. This is defined as an empty interface, and data - * sources should define their own {@link ScanConfig} classes. - * - * For APIs that take a {@link ScanConfig} as input, like - * {@link ReadSupport#planInputPartitions(ScanConfig)}, - * {@link BatchReadSupport#createReaderFactory(ScanConfig)} and - * {@link OldSupportsReportStatistics#estimateStatistics(ScanConfig)}, implementations mostly need - * to cast the input {@link ScanConfig} to the concrete {@link ScanConfig} class of the data source. - */ -@Evolving -public interface ScanConfig { - - /** - * Returns the actual schema of this data source reader, which may be different from the physical - * schema of the underlying storage, as column pruning or other optimizations may happen. - * - * If this method fails (by throwing an exception), the action will fail and no Spark job will be - * submitted. - */ - StructType readSchema(); -} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java index 60e71c5dd008..862bd14bffed 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java @@ -35,7 +35,7 @@ public interface SupportsPushDownRequiredColumns extends ScanBuilder { * also OK to do the pruning partially, e.g., a data source may not be able to prune nested * fields, and only prune top-level columns. * - * Note that, {@link ScanConfig#readSchema()} implementation should take care of the column + * Note that, {@link Scan#readSchema()} implementation should take care of the column * pruning applied here. */ void pruneColumns(StructType requiredSchema); diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java index ba175812a88d..4ce97bc5e76b 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java @@ -21,14 +21,14 @@ import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning; /** - * A mix in interface for {@link Batch}. Data sources can implement this interface to + * A mix in interface for {@link Scan}. Data sources can implement this interface to * report data partitioning and try to avoid shuffle at Spark side. 
* - * Note that, when a {@link Batch} implementation creates exactly one {@link InputPartition}, + * Note that, when a {@link Scan} implementation creates exactly one {@link InputPartition}, * Spark may avoid adding a shuffle even if the reader does not implement this interface. */ @Evolving -public interface SupportsReportPartitioning extends Batch { +public interface SupportsReportPartitioning extends Scan { /** * Returns the output data partitioning that this reader guarantees. diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java index d9f5fb64083a..d7364af69e89 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java @@ -20,7 +20,7 @@ import org.apache.spark.annotation.Evolving; /** - * A mix in interface for {@link Batch}. Data sources can implement this interface to + * A mix in interface for {@link Scan}. Data sources can implement this interface to * report statistics to Spark. * * As of Spark 2.4, statistics are reported to the optimizer before any operator is pushed to the @@ -28,7 +28,7 @@ * not improve query performance until the planner can push operators before getting stats. */ @Evolving -public interface SupportsReportStatistics extends Batch { +public interface SupportsReportStatistics extends Scan { /** * Returns the estimated statistics of this data source scan. diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java deleted file mode 100644 index 2b784ac0e9f3..000000000000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2.reader.streaming; - -import org.apache.spark.annotation.Evolving; -import org.apache.spark.sql.execution.streaming.BaseStreamingSource; -import org.apache.spark.sql.sources.v2.reader.InputPartition; -import org.apache.spark.sql.sources.v2.reader.ScanConfig; -import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder; - -/** - * An interface that defines how to load the data from data source for continuous streaming - * processing. - * - * The execution engine will get an instance of this interface from a data source provider - * (e.g. 
{@link org.apache.spark.sql.sources.v2.ContinuousReadSupportProvider}) at the start of a - * streaming query, then call {@link #newScanConfigBuilder(Offset)} and create an instance of - * {@link ScanConfig} for the duration of the streaming query or until - * {@link #needsReconfiguration(ScanConfig)} is true. The {@link ScanConfig} will be used to create - * input partitions and reader factory to scan data with a Spark job for its duration. At the end - * {@link #stop()} will be called when the streaming execution is completed. Note that a single - * query may have multiple executions due to restart or failure recovery. - */ -@Evolving -public interface ContinuousReadSupport extends StreamingReadSupport, BaseStreamingSource { - - /** - * Returns a builder of {@link ScanConfig}. Spark will call this method and create a - * {@link ScanConfig} for each data scanning job. - * - * The builder can take some query specific information to do operators pushdown, store streaming - * offsets, etc., and keep these information in the created {@link ScanConfig}. - * - * This is the first step of the data scan. All other methods in {@link ContinuousReadSupport} - * needs to take {@link ScanConfig} as an input. - */ - ScanConfigBuilder newScanConfigBuilder(Offset start); - - /** - * Returns a factory, which produces one {@link ContinuousPartitionReader} for one - * {@link InputPartition}. - */ - ContinuousPartitionReaderFactory createContinuousReaderFactory(ScanConfig config); - - /** - * Merge partitioned offsets coming from {@link ContinuousPartitionReader} instances - * for each partition to a single global offset. - */ - Offset mergeOffsets(PartitionOffset[] offsets); - - /** - * The execution engine will call this method in every epoch to determine if new input - * partitions need to be generated, which may be required if for example the underlying - * source system has had partitions added or removed. - * - * If true, the query will be shut down and restarted with a new {@link ContinuousReadSupport} - * instance. - */ - default boolean needsReconfiguration(ScanConfig config) { - return false; - } -} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousStream.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousStream.java new file mode 100644 index 000000000000..fff5b95a4de1 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousStream.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.sources.v2.reader.InputPartition; +import org.apache.spark.sql.sources.v2.reader.Scan; + +/** + * A {@link SparkDataStream} for streaming queries with continuous mode. + */ +@Evolving +public interface ContinuousStream extends SparkDataStream { + + /** + * Returns a list of {@link InputPartition input partitions} given the start offset. Each + * {@link InputPartition} represents a data split that can be processed by one Spark task. The + * number of input partitions returned here is the same as the number of RDD partitions this scan + * outputs. + *
+ * If the {@link Scan} supports filter pushdown, this stream is likely configured with a filter + * and is responsible for creating splits for that filter, which is not a full scan. + *
+ *
+ * This method will be called to launch one Spark job for reading the data stream. It will be + * called more than once if {@link #needsReconfiguration()} returns true and Spark needs to + * launch a new job. + *
+ */ + InputPartition[] planInputPartitions(Offset start); + + /** + * Returns a factory to create a {@link ContinuousPartitionReader} for each + * {@link InputPartition}. + */ + ContinuousPartitionReaderFactory createContinuousReaderFactory(); + + /** + * Merge partitioned offsets coming from {@link ContinuousPartitionReader} instances + * for each partition to a single global offset. + */ + Offset mergeOffsets(PartitionOffset[] offsets); + + /** + * The execution engine will call this method in every epoch to determine if new input + * partitions need to be generated, which may be required if for example the underlying + * source system has had partitions added or removed. + * + * If true, the Spark job to scan this continuous data stream will be interrupted and Spark will + * launch it again with a new list of {@link InputPartition input partitions}. + */ + default boolean needsReconfiguration() { + return false; + } +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchStream.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchStream.java index 2fb3957293df..330f07ba4f2f 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchStream.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchStream.java @@ -51,7 +51,7 @@ public interface MicroBatchStream extends SparkDataStream { InputPartition[] planInputPartitions(Offset start, Offset end); /** - * Returns a factory, which produces one {@link PartitionReader} for one {@link InputPartition}. + * Returns a factory to create a {@link PartitionReader} for each {@link InputPartition}. */ PartitionReaderFactory createReaderFactory(); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java index 67bff0c27e8a..a06671383ac5 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java @@ -21,7 +21,7 @@ /** * An abstract representation of progress through a {@link MicroBatchStream} or - * {@link ContinuousReadSupport}. + * {@link ContinuousStream}. * During execution, offsets provided by the data source implementation will be logged and used as * restart checkpoints. Each source should provide an offset implementation which the source can use * to reconstruct a position in the stream up to which data has been seen/processed. diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SparkDataStream.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SparkDataStream.java index 8ea34be8d839..30f38ce37c40 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SparkDataStream.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SparkDataStream.java @@ -24,7 +24,8 @@ * The base interface representing a readable data stream in a Spark streaming query. It's * responsible to manage the offsets of the streaming source in the streaming query. * - * Data sources should implement concrete data stream interfaces: {@link MicroBatchStream}. + * Data sources should implement concrete data stream interfaces: + * {@link MicroBatchStream} and {@link ContinuousStream}. 
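To make the new contract concrete, here is a minimal, hypothetical sketch (not part of this patch) of a ContinuousStream implementation: a single-partition stream of increasing longs. All names (CounterOffset, CounterContinuousStream, ...) are illustrative only, and the reader busy-loops instead of blocking for real data.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.InputPartition
import org.apache.spark.sql.sources.v2.reader.streaming._

case class CounterOffset(value: Long) extends Offset {
  override def json(): String = value.toString
}
case class CounterPartitionOffset(value: Long) extends PartitionOffset
case class CounterInputPartition(start: Long) extends InputPartition

class CounterContinuousStream extends ContinuousStream {
  override def initialOffset(): Offset = CounterOffset(0L)
  override def deserializeOffset(json: String): Offset = CounterOffset(json.toLong)

  // One partition, resuming from the given global offset.
  override def planInputPartitions(start: Offset): Array[InputPartition] =
    Array(CounterInputPartition(start.asInstanceOf[CounterOffset].value))

  override def createContinuousReaderFactory(): ContinuousPartitionReaderFactory =
    new ContinuousPartitionReaderFactory {
      override def createReader(p: InputPartition): ContinuousPartitionReader[InternalRow] =
        new ContinuousPartitionReader[InternalRow] {
          private var current = p.asInstanceOf[CounterInputPartition].start
          override def next(): Boolean = { current += 1; true } // a real reader would block for data
          override def get(): InternalRow = InternalRow(current)
          override def getOffset(): PartitionOffset = CounterPartitionOffset(current)
          override def close(): Unit = {}
        }
    }

  // With a single partition, its partition offset is already the global offset.
  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset =
    CounterOffset(offsets.map(_.asInstanceOf[CounterPartitionOffset].value).min)

  // needsReconfiguration() keeps its default of false: the partition set never changes.
  override def commit(end: Offset): Unit = {}
  override def stop(): Unit = {}
}
```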
*/ @Evolving public interface SparkDataStream extends BaseStreamingSource { diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java deleted file mode 100644 index 9a8c1bdd23be..000000000000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2.reader.streaming; - -import com.google.common.annotations.VisibleForTesting; - -import org.apache.spark.sql.sources.v2.reader.ReadSupport; - -/** - * A base interface for streaming read support. Data sources should implement concrete streaming - * read support interfaces: {@link ContinuousReadSupport}. - * This is exposed for a testing purpose. - */ -@VisibleForTesting -public interface StreamingReadSupport extends ReadSupport { - - /** - * Returns the initial offset for a streaming query to start reading from. Note that the - * streaming data source should not assume that it will start reading from its initial offset: - * if Spark is restarting an existing query, it will restart from the check-pointed offset rather - * than the initial one. - */ - Offset initialOffset(); - - /** - * Deserialize a JSON string into an Offset of the implementation-defined offset type. - * - * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader - */ - Offset deserializeOffset(String json); - - /** - * Informs the source that Spark has completed processing all data for offsets less than or - * equal to `end` and will only request offsets greater than `end` in the future. - */ - void commit(Offset end); -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala new file mode 100644 index 000000000000..c7fcc6723e45 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.sources.v2.reader._ + +/** + * Physical plan node for scanning a batch of data from a data source v2. + */ +case class BatchScanExec( + output: Seq[AttributeReference], + @transient scan: Scan) extends DataSourceV2ScanExecBase { + + @transient lazy val batch = scan.toBatch + + // TODO: unify the equal/hashCode implementation for all data source v2 query plans. + override def equals(other: Any): Boolean = other match { + case other: BatchScanExec => this.batch == other.batch + case _ => false + } + + override def hashCode(): Int = batch.hashCode() + + override lazy val partitions: Seq[InputPartition] = batch.planInputPartitions() + + override lazy val readerFactory: PartitionReaderFactory = batch.createReaderFactory() + + override lazy val inputRDD: RDD[InternalRow] = { + new DataSourceRDD(sparkContext, partitions, readerFactory, supportsBatch) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala index c735b0ef68a0..f54ff608a53e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala @@ -20,99 +20,44 @@ package org.apache.spark.sql.execution.datasources.v2 import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.physical -import org.apache.spark.sql.catalyst.plans.physical.SinglePartition -import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec} import org.apache.spark.sql.execution.streaming.continuous._ -import org.apache.spark.sql.sources.v2.DataSourceV2 import org.apache.spark.sql.sources.v2.reader._ -import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReaderFactory, ContinuousReadSupport} +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReaderFactory, ContinuousStream, Offset} /** * Physical plan node for scanning data from a streaming data source with continuous mode. */ -// TODO: merge it and `MicroBatchScanExec`. 
case class ContinuousScanExec( - output: Seq[AttributeReference], - @transient source: DataSourceV2, - @transient options: Map[String, String], - @transient pushedFilters: Seq[Expression], - @transient readSupport: ReadSupport, - @transient scanConfig: ScanConfig) - extends LeafExecNode with DataSourceV2StringFormat with ColumnarBatchScan { - - override def simpleString(maxFields: Int): String = "ScanV2 " + metadataString(maxFields) + output: Seq[Attribute], + @transient scan: Scan, + @transient stream: ContinuousStream, + @transient start: Offset) extends DataSourceV2ScanExecBase { // TODO: unify the equal/hashCode implementation for all data source v2 query plans. override def equals(other: Any): Boolean = other match { - case other: ContinuousScanExec => - output == other.output && readSupport.getClass == other.readSupport.getClass && - options == other.options + case other: ContinuousScanExec => this.stream == other.stream case _ => false } - override def hashCode(): Int = { - Seq(output, source, options).hashCode() - } - - override def outputPartitioning: physical.Partitioning = readSupport match { - case _ if partitions.length == 1 => - SinglePartition + override def hashCode(): Int = stream.hashCode() - case s: OldSupportsReportPartitioning => - new DataSourcePartitioning( - s.outputPartitioning(scanConfig), AttributeMap(output.map(a => a -> a.name))) + override lazy val partitions: Seq[InputPartition] = stream.planInputPartitions(start) - case _ => super.outputPartitioning + override lazy val readerFactory: ContinuousPartitionReaderFactory = { + stream.createContinuousReaderFactory() } - private lazy val partitions: Seq[InputPartition] = readSupport.planInputPartitions(scanConfig) - - private lazy val readerFactory = readSupport match { - case r: ContinuousReadSupport => r.createContinuousReaderFactory(scanConfig) - case _ => throw new IllegalStateException("unknown read support: " + readSupport) - } - - override val supportsBatch: Boolean = { - require(partitions.forall(readerFactory.supportColumnarReads) || - !partitions.exists(readerFactory.supportColumnarReads), - "Cannot mix row-based and columnar input partitions.") - - partitions.exists(readerFactory.supportColumnarReads) - } - - private lazy val inputRDD: RDD[InternalRow] = readSupport match { - case _: ContinuousReadSupport => - assert(!supportsBatch, - "continuous stream reader does not support columnar read yet.") - EpochCoordinatorRef.get( - sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), - sparkContext.env) - .askSync[Unit](SetReaderPartitions(partitions.size)) - new ContinuousDataSourceRDD( - sparkContext, - sqlContext.conf.continuousStreamingExecutorQueueSize, - sqlContext.conf.continuousStreamingExecutorPollIntervalMs, - partitions, - schema, - readerFactory.asInstanceOf[ContinuousPartitionReaderFactory]) - - case _ => - new DataSourceRDD( - sparkContext, partitions, readerFactory.asInstanceOf[PartitionReaderFactory], supportsBatch) - } - - override def inputRDDs(): Seq[RDD[InternalRow]] = Seq(inputRDD) - - override protected def doExecute(): RDD[InternalRow] = { - if (supportsBatch) { - WholeStageCodegenExec(this)(codegenStageId = 0).execute() - } else { - val numOutputRows = longMetric("numOutputRows") - inputRDD.map { r => - numOutputRows += 1 - r - } - } + override lazy val inputRDD: RDD[InternalRow] = { + EpochCoordinatorRef.get( + sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), + sparkContext.env) + .askSync[Unit](SetReaderPartitions(partitions.size)) + 
new ContinuousDataSourceRDD( + sparkContext, + sqlContext.conf.continuousStreamingExecutorQueueSize, + sqlContext.conf.continuousStreamingExecutorPollIntervalMs, + partitions, + schema, + readerFactory.asInstanceOf[ContinuousPartitionReaderFactory]) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index 63e97e67dc64..47cf26dc9481 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -23,7 +23,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation} -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.sources.v2._ @@ -58,8 +58,6 @@ case class DataSourceV2Relation( case _ => throw new AnalysisException(s"Table is not readable: ${table.name()}") } - - def newWriteBuilder(schema: StructType): WriteBuilder = table match { case s: SupportsBatchWrite => val dsOptions = new DataSourceOptions(options.asJava) @@ -94,7 +92,7 @@ case class DataSourceV2Relation( */ case class StreamingDataSourceV2Relation( output: Seq[Attribute], - scanDesc: String, + scan: Scan, stream: SparkDataStream, startOffset: Option[Offset] = None, endOffset: Option[Offset] = None) @@ -104,7 +102,7 @@ case class StreamingDataSourceV2Relation( override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance())) - override def computeStats(): Statistics = stream match { + override def computeStats(): Statistics = scan match { case r: SupportsReportStatistics => val statistics = r.estimateStatistics() Statistics(sizeInBytes = statistics.sizeInBytes().orElse(conf.defaultSizeInBytes)) @@ -113,46 +111,6 @@ case class StreamingDataSourceV2Relation( } } -// TODO: remove it after finish API refactor for continuous streaming. -case class OldStreamingDataSourceV2Relation( - output: Seq[AttributeReference], - source: DataSourceV2, - options: Map[String, String], - readSupport: ReadSupport, - scanConfigBuilder: ScanConfigBuilder) - extends LeafNode with MultiInstanceRelation with DataSourceV2StringFormat { - - override def isStreaming: Boolean = true - - override def simpleString(maxFields: Int): String = { - "Streaming RelationV2 " + metadataString(maxFields) - } - - override def pushedFilters: Seq[Expression] = Nil - - override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance())) - - // TODO: unify the equal/hashCode implementation for all data source v2 query plans. 
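// ---------------------------------------------------------------------------
// Illustrative sketch, not part of this patch: with the change to computeStats
// above, statistics now come from the Scan rather than from the removed
// read-support/scan-config pair, so a source advertises its size by mixing
// SupportsReportStatistics into the Scan it builds. FixedSizeScan and its
// fields are hypothetical; Statistics.numRows() is assumed to exist alongside
// the sizeInBytes() used above.
import java.util.OptionalLong

import org.apache.spark.sql.sources.v2.reader.{Scan, Statistics, SupportsReportStatistics}
import org.apache.spark.sql.types.StructType

class FixedSizeScan(schema: StructType, estimatedBytes: Long)
  extends Scan with SupportsReportStatistics {

  override def readSchema(): StructType = schema

  // Picked up by StreamingDataSourceV2Relation.computeStats above.
  override def estimateStatistics(): Statistics = new Statistics {
    override def sizeInBytes(): OptionalLong = OptionalLong.of(estimatedBytes)
    override def numRows(): OptionalLong = OptionalLong.empty()
  }
}
// End of sketch.
// ---------------------------------------------------------------------------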
- override def equals(other: Any): Boolean = other match { - case other: OldStreamingDataSourceV2Relation => - output == other.output && readSupport.getClass == other.readSupport.getClass && - options == other.options - case _ => false - } - - override def hashCode(): Int = { - Seq(output, source, options).hashCode() - } - - override def computeStats(): Statistics = readSupport match { - case r: OldSupportsReportStatistics => - val statistics = r.estimateStatistics(scanConfigBuilder.build()) - Statistics(sizeInBytes = statistics.sizeInBytes().orElse(conf.defaultSizeInBytes)) - case _ => - Statistics(sizeInBytes = conf.defaultSizeInBytes) - } -} - object DataSourceV2Relation { def create(table: Table, options: Map[String, String]): DataSourceV2Relation = { val output = table.schema().toAttributes diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala similarity index 66% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala index 53e4e77c65e2..da71e7873b52 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala @@ -19,39 +19,26 @@ package org.apache.spark.sql.execution.datasources.v2 import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.AttributeMap import org.apache.spark.sql.catalyst.plans.physical import org.apache.spark.sql.catalyst.plans.physical.SinglePartition import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReaderFactory, Scan, SupportsReportPartitioning} -/** - * Physical plan node for scanning a batch of data from a data source. - */ -case class DataSourceV2ScanExec( - output: Seq[AttributeReference], - scanDesc: String, - @transient batch: Batch) - extends LeafExecNode with ColumnarBatchScan { - - override def simpleString(maxFields: Int): String = { - s"ScanV2${truncatedString(output, "[", ", ", "]", maxFields)} $scanDesc" - } +trait DataSourceV2ScanExecBase extends LeafExecNode with ColumnarBatchScan { - // TODO: unify the equal/hashCode implementation for all data source v2 query plans. 
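// ---------------------------------------------------------------------------
// Illustrative sketch, not part of this patch: the driver-side sequence that
// the new ContinuousScanExec above and, later in this patch, ContinuousExecution
// run against a ContinuousStream. `stream` and `checkpointedJson` are assumed
// inputs; the method names are the ones introduced by this refactor.
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, Offset}

object ContinuousDriverSketch {
  def plan(stream: ContinuousStream, checkpointedJson: Option[String]): Unit = {
    // Restart from the logged offset when there is one, otherwise ask the stream.
    val start: Offset = checkpointedJson
      .map(stream.deserializeOffset)
      .getOrElse(stream.initialOffset())

    // The scan node now asks the stream directly; no ScanConfig in between.
    val partitions = stream.planInputPartitions(start)
    val readerFactory = stream.createContinuousReaderFactory()

    // The epoch-update thread polls this flag and restarts the query with a
    // fresh set of partitions when it flips.
    val mustReplan = stream.needsReconfiguration()

    println(s"${partitions.length} partitions, factory=$readerFactory, replan=$mustReplan")
  }
}
// End of sketch.
// ---------------------------------------------------------------------------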
- override def equals(other: Any): Boolean = other match { - case other: DataSourceV2ScanExec => this.batch == other.batch - case _ => false - } + def scan: Scan - override def hashCode(): Int = batch.hashCode() + def partitions: Seq[InputPartition] - private lazy val partitions = batch.planInputPartitions() + def readerFactory: PartitionReaderFactory - private lazy val readerFactory = batch.createReaderFactory() + override def simpleString(maxFields: Int): String = { + s"$nodeName${truncatedString(output, "[", ", ", "]", maxFields)} ${scan.description()}" + } - override def outputPartitioning: physical.Partitioning = batch match { + override def outputPartitioning: physical.Partitioning = scan match { case _ if partitions.length == 1 => SinglePartition @@ -70,13 +57,11 @@ case class DataSourceV2ScanExec( partitions.exists(readerFactory.supportColumnarReads) } - private lazy val inputRDD: RDD[InternalRow] = { - new DataSourceRDD(sparkContext, partitions, readerFactory, supportsBatch) - } + def inputRDD: RDD[InternalRow] override def inputRDDs(): Seq[RDD[InternalRow]] = Seq(inputRDD) - override protected def doExecute(): RDD[InternalRow] = { + override def doExecute(): RDD[InternalRow] = { if (supportsBatch) { WholeStageCodegenExec(this)(codegenStageId = 0).execute() } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index b4c547104c4b..40ac5cf40298 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan} import org.apache.spark.sql.execution.datasources.DataSourceStrategy import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec} import org.apache.spark.sql.sources.v2.reader._ -import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, MicroBatchStream} +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream} import org.apache.spark.sql.sources.v2.writer.SupportsSaveMode object DataSourceV2Strategy extends Strategy { @@ -117,7 +117,7 @@ object DataSourceV2Strategy extends Strategy { |Output: ${output.mkString(", ")} """.stripMargin) - val plan = DataSourceV2ScanExec(output, scan.description(), scan.toBatch) + val plan = BatchScanExec(output, scan) val filterCondition = postScanFilters.reduceLeftOption(And) val withFilter = filterCondition.map(FilterExec(_, plan)).getOrElse(plan) @@ -130,15 +130,14 @@ object DataSourceV2Strategy extends Strategy { // ensure there is a projection, which will produce unsafe rows required by some operators ProjectExec(r.output, MicroBatchScanExec( - r.output, r.scanDesc, microBatchStream, r.startOffset.get, r.endOffset.get)) :: Nil + r.output, r.scan, microBatchStream, r.startOffset.get, r.endOffset.get)) :: Nil - case r: OldStreamingDataSourceV2Relation => - // TODO: support operator pushdown for streaming data sources. 
- val scanConfig = r.scanConfigBuilder.build() + case r: StreamingDataSourceV2Relation if r.startOffset.isDefined && r.endOffset.isEmpty => + val continuousStream = r.stream.asInstanceOf[ContinuousStream] // ensure there is a projection, which will produce unsafe rows required by some operators ProjectExec(r.output, ContinuousScanExec( - r.output, r.source, r.options, r.pushedFilters, r.readSupport, scanConfig)) :: Nil + r.output, r.scan, continuousStream, r.startOffset.get)) :: Nil case WriteToDataSourceV2(writer, query) => WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil @@ -158,8 +157,7 @@ object DataSourceV2Strategy extends Strategy { case Repartition(1, false, child) => val isContinuous = child.find { - case s: OldStreamingDataSourceV2Relation => - s.readSupport.isInstanceOf[ContinuousReadSupport] + case r: StreamingDataSourceV2Relation => r.stream.isInstanceOf[ContinuousStream] case _ => false }.isDefined diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala index feea8bcb80c8..d2e33d4fa77c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala @@ -19,12 +19,8 @@ package org.apache.spark.sql.execution.datasources.v2 import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap} -import org.apache.spark.sql.catalyst.plans.physical -import org.apache.spark.sql.catalyst.plans.physical.SinglePartition -import org.apache.spark.sql.catalyst.util.truncatedString -import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec} -import org.apache.spark.sql.sources.v2.reader.SupportsReportPartitioning +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReaderFactory, Scan} import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset} /** @@ -32,14 +28,10 @@ import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offse */ case class MicroBatchScanExec( output: Seq[Attribute], - scanDesc: String, + @transient scan: Scan, @transient stream: MicroBatchStream, @transient start: Offset, - @transient end: Offset) extends LeafExecNode with ColumnarBatchScan { - - override def simpleString(maxFields: Int): String = { - s"ScanV2${truncatedString(output, "[", ", ", "]", maxFields)} $scanDesc" - } + @transient end: Offset) extends DataSourceV2ScanExecBase { // TODO: unify the equal/hashCode implementation for all data source v2 query plans. 
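// ---------------------------------------------------------------------------
// Illustrative sketch, not part of this patch: how the MicroBatchStream API
// that the slimmed-down MicroBatchScanExec above delegates to can be exercised
// on its own for a single [start, end] range. Draining the readers inline like
// this is only for illustration; in the real node the readers run inside a
// DataSourceRDD on the executors. `stream`, `start` and `end` are assumed inputs.
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset}

object MicroBatchSketch {
  def collectRange(stream: MicroBatchStream, start: Offset, end: Offset): Seq[InternalRow] = {
    val partitions = stream.planInputPartitions(start, end)  // one entry per split of the range
    val factory = stream.createReaderFactory()
    partitions.toSeq.flatMap { partition =>
      val reader = factory.createReader(partition)
      try {
        // Drain the reader; copy() because sources may reuse the row instance.
        Iterator.continually(reader).takeWhile(_.next()).map(_.get().copy()).toVector
      } finally {
        reader.close()
      }
    }
  }
}
// End of sketch.
// ---------------------------------------------------------------------------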
override def equals(other: Any): Boolean = other match { @@ -49,44 +41,11 @@ case class MicroBatchScanExec( override def hashCode(): Int = stream.hashCode() - private lazy val partitions = stream.planInputPartitions(start, end) - - private lazy val readerFactory = stream.createReaderFactory() - - override def outputPartitioning: physical.Partitioning = stream match { - case _ if partitions.length == 1 => - SinglePartition - - case s: SupportsReportPartitioning => - new DataSourcePartitioning( - s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name))) - - case _ => super.outputPartitioning - } - - override def supportsBatch: Boolean = { - require(partitions.forall(readerFactory.supportColumnarReads) || - !partitions.exists(readerFactory.supportColumnarReads), - "Cannot mix row-based and columnar input partitions.") + override lazy val partitions: Seq[InputPartition] = stream.planInputPartitions(start, end) - partitions.exists(readerFactory.supportColumnarReads) - } + override lazy val readerFactory: PartitionReaderFactory = stream.createReaderFactory() - private lazy val inputRDD: RDD[InternalRow] = { + override lazy val inputRDD: RDD[InternalRow] = { new DataSourceRDD(sparkContext, partitions, readerFactory, supportsBatch) } - - override def inputRDDs(): Seq[RDD[InternalRow]] = Seq(inputRDD) - - override protected def doExecute(): RDD[InternalRow] = { - if (supportsBatch) { - WholeStageCodegenExec(this)(codegenStageId = 0).execute() - } else { - val numOutputRows = longMetric("numOutputRows") - inputRDD.map { r => - numOutputRows += 1 - r - } - } - } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index 64270e1f44a2..c34dce2b24e4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -99,7 +99,7 @@ class MicroBatchExecution( // TODO: operator pushdown. val scan = table.newScanBuilder(dsOptions).build() val stream = scan.toMicroBatchStream(metadataPath) - StreamingDataSourceV2Relation(output, scan.description(), stream) + StreamingDataSourceV2Relation(output, scan, stream) }) case s @ StreamingRelationV2(ds, dsName, _, _, output, v1Relation) => v2ToExecutionRelationMap.getOrElseUpdate(s, { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/SimpleStreamingScanConfigBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/SimpleStreamingScanConfigBuilder.scala deleted file mode 100644 index 1be071614d92..000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/SimpleStreamingScanConfigBuilder.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.streaming - -import org.apache.spark.sql.sources.v2.reader.{ScanConfig, ScanConfigBuilder} -import org.apache.spark.sql.types.StructType - -/** - * A very simple [[ScanConfigBuilder]] implementation that creates a simple [[ScanConfig]] to - * carry schema and offsets for streaming data sources. - */ -class SimpleStreamingScanConfigBuilder( - schema: StructType, - start: Offset, - end: Option[Offset] = None) - extends ScanConfigBuilder { - - override def build(): ScanConfig = SimpleStreamingScanConfig(schema, start, end) -} - -case class SimpleStreamingScanConfig( - readSchema: StructType, - start: Offset, - end: Option[Offset]) - extends ScanConfig diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala index 535fa1c70b3f..83d38dcade7e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} import org.apache.spark.sql.execution.LeafExecNode import org.apache.spark.sql.execution.datasources.DataSource -import org.apache.spark.sql.sources.v2.{ContinuousReadSupportProvider, DataSourceV2, Table} +import org.apache.spark.sql.sources.v2.{DataSourceV2, Table} object StreamingRelation { def apply(dataSource: DataSource): StreamingRelation = { @@ -110,30 +110,6 @@ case class StreamingRelationV2( override def newInstance(): LogicalPlan = this.copy(output = output.map(_.newInstance()))(session) } -/** - * Used to link a [[DataSourceV2]] into a continuous processing execution. - */ -case class ContinuousExecutionRelation( - source: ContinuousReadSupportProvider, - extraOptions: Map[String, String], - output: Seq[Attribute])(session: SparkSession) - extends LeafNode with MultiInstanceRelation { - - override def otherCopyArgs: Seq[AnyRef] = session :: Nil - override def isStreaming: Boolean = true - override def toString: String = source.toString - - // There's no sensible value here. On the execution path, this relation will be - // swapped out with microbatches. But some dataframe operations (in particular explain) do lead - // to this node surviving analysis. So we satisfy the LeafNode contract with the session default - // value. 
- override def computeStats(): Statistics = Statistics( - sizeInBytes = BigInt(session.sessionState.conf.defaultSizeInBytes) - ) - - override def newInstance(): LogicalPlan = this.copy(output = output.map(_.newInstance()))(session) -} - /** * A dummy physical plan for [[StreamingRelation]] to support * [[org.apache.spark.sql.Dataset.explain]] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala index c74fa141372d..b22795d20776 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala @@ -22,20 +22,18 @@ import java.util.concurrent.TimeUnit import java.util.function.UnaryOperator import scala.collection.JavaConverters._ -import scala.collection.mutable.{ArrayBuffer, Map => MutableMap} +import scala.collection.mutable.{Map => MutableMap} import org.apache.spark.SparkEnv import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentDate, CurrentTimestamp} +import org.apache.spark.sql.catalyst.expressions.{CurrentDate, CurrentTimestamp} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution.SQLExecution -import org.apache.spark.sql.execution.datasources.v2.{ContinuousScanExec, OldStreamingDataSourceV2Relation} -import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _} -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation +import org.apache.spark.sql.execution.streaming.{StreamingRelationV2, _} import org.apache.spark.sql.sources.v2 -import org.apache.spark.sql.sources.v2.{ContinuousReadSupportProvider, DataSourceOptions, StreamingWriteSupportProvider} -import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, PartitionOffset} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamingWriteSupportProvider, SupportsContinuousRead} +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset} import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger} import org.apache.spark.util.Clock @@ -54,25 +52,39 @@ class ContinuousExecution( sparkSession, name, checkpointRoot, analyzedPlan, sink, trigger, triggerClock, outputMode, deleteCheckpointOnStop) { - @volatile protected var continuousSources: Seq[ContinuousReadSupport] = Seq() - override protected def sources: Seq[BaseStreamingSource] = continuousSources + @volatile protected var sources: Seq[ContinuousStream] = Seq() // For use only in test harnesses. private[sql] var currentEpochCoordinatorId: String = _ override val logicalPlan: LogicalPlan = { - val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]() - analyzedPlan.transform { - case r @ StreamingRelationV2( - source: ContinuousReadSupportProvider, _, _, extraReaderOptions, output, _) => - // TODO: shall we create `ContinuousReadSupport` here instead of each reconfiguration? 
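// ---------------------------------------------------------------------------
// Illustrative sketch, not part of this patch: the user-facing query that ends
// up in the code path rewritten below. After this refactor, ContinuousExecution
// builds the ScanBuilder and calls toContinuousStream() itself, so a source only
// needs a Table mixing in SupportsContinuousRead (the rate source gains that
// further down in this patch). The sink, trigger interval, paths and rate values
// here are arbitrary example choices.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object ContinuousQuerySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[4]").appName("continuous-sketch").getOrCreate()

    val query = spark.readStream
      .format("rate")                           // RateStreamTable: micro-batch and continuous capable
      .option("rowsPerSecond", "10")
      .load()
      .writeStream
      .format("console")
      .trigger(Trigger.Continuous("1 second"))  // selects ContinuousExecution instead of MicroBatchExecution
      .option("checkpointLocation", "/tmp/continuous-sketch-checkpoint")
      .start()

    query.awaitTermination(30000)
    query.stop()
    spark.stop()
  }
}
// End of sketch.
// ---------------------------------------------------------------------------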
- toExecutionRelationMap.getOrElseUpdate(r, { - ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession) + val v2ToRelationMap = MutableMap[StreamingRelationV2, StreamingDataSourceV2Relation]() + var nextSourceId = 0 + val _logicalPlan = analyzedPlan.transform { + case s @ StreamingRelationV2( + ds, dsName, table: SupportsContinuousRead, options, output, _) => + v2ToRelationMap.getOrElseUpdate(s, { + val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId" + nextSourceId += 1 + logInfo(s"Reading table [$table] from DataSourceV2 named '$dsName' [$ds]") + val dsOptions = new DataSourceOptions(options.asJava) + // TODO: operator pushdown. + val scan = table.newScanBuilder(dsOptions).build() + val stream = scan.toContinuousStream(metadataPath) + StreamingDataSourceV2Relation(output, scan, stream) }) + case StreamingRelationV2(_, sourceName, _, _, _, _) => throw new UnsupportedOperationException( s"Data source $sourceName does not support continuous processing.") } + + sources = _logicalPlan.collect { + case r: StreamingDataSourceV2Relation => r.stream.asInstanceOf[ContinuousStream] + } + uniqueSources = sources.distinct + + _logicalPlan } private val triggerExecutor = trigger match { @@ -92,6 +104,8 @@ class ContinuousExecution( do { runContinuous(sparkSessionForStream) } while (state.updateAndGet(stateUpdate) == ACTIVE) + + stopSources() } /** @@ -135,7 +149,7 @@ class ContinuousExecution( updateStatusMessage("Starting new streaming query") logInfo(s"Starting new streaming query.") currentBatchId = 0 - OffsetSeq.fill(continuousSources.map(_ => null): _*) + OffsetSeq.fill(sources.map(_ => null): _*) } } @@ -144,47 +158,17 @@ class ContinuousExecution( * @param sparkSessionForQuery Isolated [[SparkSession]] to run the continuous query with. */ private def runContinuous(sparkSessionForQuery: SparkSession): Unit = { - // A list of attributes that will need to be updated. - val replacements = new ArrayBuffer[(Attribute, Attribute)] - // Translate from continuous relation to the underlying data source. 
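// ---------------------------------------------------------------------------
// Illustrative sketch, not part of this patch: the round-trip contract that the
// restart path below relies on. The offset log stores only the JSON form, so a
// ContinuousStream's deserializeOffset() must reconstruct an offset equal to the
// one whose json() was logged. Shown with the rate source reworked later in this
// patch; the option values are arbitrary.
import org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousStream
import org.apache.spark.sql.sources.v2.DataSourceOptions

object OffsetRoundTripSketch {
  def main(args: Array[String]): Unit = {
    val stream = new RateStreamContinuousStream(
      rowsPerSecond = 10, numPartitions = 2, options = DataSourceOptions.empty())

    val original = stream.initialOffset
    val reloaded = stream.deserializeOffset(original.json())

    // A correct implementation reconstructs an offset equal to the logged one.
    assert(reloaded == original, "offset did not survive the JSON round trip")
  }
}
// End of sketch.
// ---------------------------------------------------------------------------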
- var nextSourceId = 0 - continuousSources = logicalPlan.collect { - case ContinuousExecutionRelation(dataSource, extraReaderOptions, output) => - val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId" - nextSourceId += 1 - - dataSource.createContinuousReadSupport( - metadataPath, - new DataSourceOptions(extraReaderOptions.asJava)) - } - uniqueSources = continuousSources.distinct - val offsets = getStartOffsets(sparkSessionForQuery) - var insertedSourceId = 0 - val withNewSources = logicalPlan transform { - case ContinuousExecutionRelation(source, options, output) => - val readSupport = continuousSources(insertedSourceId) - insertedSourceId += 1 - val newOutput = readSupport.fullSchema().toAttributes - val maxFields = SQLConf.get.maxToStringFields - assert(output.size == newOutput.size, - s"Invalid reader: ${truncatedString(output, ",", maxFields)} != " + - s"${truncatedString(newOutput, ",", maxFields)}") - replacements ++= output.zip(newOutput) - + val withNewSources: LogicalPlan = logicalPlan transform { + case relation: StreamingDataSourceV2Relation => val loggedOffset = offsets.offsets(0) - val realOffset = loggedOffset.map(off => readSupport.deserializeOffset(off.json)) - val startOffset = realOffset.getOrElse(readSupport.initialOffset) - val scanConfigBuilder = readSupport.newScanConfigBuilder(startOffset) - OldStreamingDataSourceV2Relation(newOutput, source, options, readSupport, scanConfigBuilder) + val realOffset = loggedOffset.map(off => relation.stream.deserializeOffset(off.json)) + val startOffset = realOffset.getOrElse(relation.stream.initialOffset) + relation.copy(startOffset = Some(startOffset)) } - // Rewire the plan to use the new attributes that were returned by the source. - val replacementMap = AttributeMap(replacements) - val triggerLogicalPlan = withNewSources transformAllExpressions { - case a: Attribute if replacementMap.contains(a) => - replacementMap(a).withMetadata(a.metadata) + withNewSources.transformAllExpressions { case (_: CurrentTimestamp | _: CurrentDate) => throw new IllegalStateException( "CurrentTimestamp and CurrentDate not yet supported for continuous processing") @@ -192,15 +176,15 @@ class ContinuousExecution( val writer = sink.createStreamingWriteSupport( s"$runId", - triggerLogicalPlan.schema, + withNewSources.schema, outputMode, new DataSourceOptions(extraOptions.asJava)) - val withSink = WriteToContinuousDataSource(writer, triggerLogicalPlan) + val planWithSink = WriteToContinuousDataSource(writer, withNewSources) reportTimeTaken("queryPlanning") { lastExecution = new IncrementalExecution( sparkSessionForQuery, - withSink, + planWithSink, outputMode, checkpointFile("state"), id, @@ -210,10 +194,9 @@ class ContinuousExecution( lastExecution.executedPlan // Force the lazy generation of execution plan } - val (readSupport, scanConfig) = lastExecution.executedPlan.collect { - case scan: ContinuousScanExec - if scan.readSupport.isInstanceOf[ContinuousReadSupport] => - scan.readSupport.asInstanceOf[ContinuousReadSupport] -> scan.scanConfig + val stream = planWithSink.collect { + case relation: StreamingDataSourceV2Relation => + relation.stream.asInstanceOf[ContinuousStream] }.head sparkSessionForQuery.sparkContext.setLocalProperty( @@ -233,16 +216,14 @@ class ContinuousExecution( // Use the parent Spark session for the endpoint since it's where this query ID is registered. 
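// ---------------------------------------------------------------------------
// Illustrative sketch, not part of this patch: what addOffset() below does with
// the per-partition offsets that executors report for an epoch -- the stream
// folds them into one global offset via mergeOffsets() before it is written to
// the offset log. Shown with the rate source's offset types; the values are
// arbitrary.
import org.apache.spark.sql.execution.streaming.continuous.{RateStreamContinuousStream, RateStreamPartitionOffset}
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset

object MergeOffsetsSketch {
  def main(args: Array[String]): Unit = {
    val stream = new RateStreamContinuousStream(
      rowsPerSecond = 10, numPartitions = 2, options = DataSourceOptions.empty())

    // One PartitionOffset per reader partition, as reported to the EpochCoordinator.
    val reported = Array[PartitionOffset](
      RateStreamPartitionOffset(partition = 0, currentValue = 40, currentTimeMs = 1000L),
      RateStreamPartitionOffset(partition = 1, currentValue = 41, currentTimeMs = 1000L))

    // A single Offset suitable for OffsetSeq.fill(...) and the offset log.
    val global = stream.mergeOffsets(reported)
    println(global.json())
  }
}
// End of sketch.
// ---------------------------------------------------------------------------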
val epochEndpoint = EpochCoordinatorRef.create( - writer, readSupport, this, epochCoordinatorId, currentBatchId, sparkSession, SparkEnv.get) + writer, stream, this, epochCoordinatorId, currentBatchId, sparkSession, SparkEnv.get) val epochUpdateThread = new Thread(new Runnable { override def run: Unit = { try { triggerExecutor.execute(() => { startTrigger() - val shouldReconfigure = readSupport.needsReconfiguration(scanConfig) && - state.compareAndSet(ACTIVE, RECONFIGURING) - if (shouldReconfigure) { + if (stream.needsReconfiguration && state.compareAndSet(ACTIVE, RECONFIGURING)) { if (queryExecutionThread.isAlive) { queryExecutionThread.interrupt() } @@ -289,7 +270,6 @@ class ContinuousExecution( epochUpdateThread.interrupt() epochUpdateThread.join() - stopSources() sparkSession.sparkContext.cancelJobGroup(runId.toString) } } @@ -299,11 +279,11 @@ class ContinuousExecution( */ def addOffset( epoch: Long, - readSupport: ContinuousReadSupport, + stream: ContinuousStream, partitionOffsets: Seq[PartitionOffset]): Unit = { - assert(continuousSources.length == 1, "only one continuous source supported currently") + assert(sources.length == 1, "only one continuous source supported currently") - val globalOffset = readSupport.mergeOffsets(partitionOffsets.toArray) + val globalOffset = stream.mergeOffsets(partitionOffsets.toArray) val oldOffset = synchronized { offsetLog.add(epoch, OffsetSeq.fill(globalOffset)) offsetLog.get(epoch - 1) @@ -329,7 +309,7 @@ class ContinuousExecution( def commit(epoch: Long): Unit = { updateStatusMessage(s"Committing epoch $epoch") - assert(continuousSources.length == 1, "only one continuous source supported currently") + assert(sources.length == 1, "only one continuous source supported currently") assert(offsetLog.get(epoch).isDefined, s"offset for epoch $epoch not reported before commit") synchronized { @@ -338,9 +318,9 @@ class ContinuousExecution( if (queryExecutionThread.isAlive) { commitLog.add(epoch, CommitMetadata()) val offset = - continuousSources(0).deserializeOffset(offsetLog.get(epoch).get.offsets(0).get.json) - committedOffsets ++= Seq(continuousSources(0) -> offset) - continuousSources(0).commit(offset.asInstanceOf[v2.reader.streaming.Offset]) + sources(0).deserializeOffset(offsetLog.get(epoch).get.offsets(0).get.json) + committedOffsets ++= Seq(sources(0) -> offset) + sources(0).commit(offset.asInstanceOf[v2.reader.streaming.Offset]) } else { return } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala index a6cde2b8a710..48ff70f9c9d0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala @@ -22,23 +22,22 @@ import org.json4s.jackson.Serialization import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.execution.streaming.{RateStreamOffset, SimpleStreamingScanConfig, SimpleStreamingScanConfigBuilder, ValueRunTimeMsPair} -import org.apache.spark.sql.execution.streaming.sources.RateStreamProvider +import org.apache.spark.sql.execution.streaming.{RateStreamOffset, ValueRunTimeMsPair} import org.apache.spark.sql.sources.v2.DataSourceOptions import org.apache.spark.sql.sources.v2.reader._ import 
org.apache.spark.sql.sources.v2.reader.streaming._ -import org.apache.spark.sql.types.StructType case class RateStreamPartitionOffset( partition: Int, currentValue: Long, currentTimeMs: Long) extends PartitionOffset -class RateStreamContinuousReadSupport(options: DataSourceOptions) extends ContinuousReadSupport { +class RateStreamContinuousStream( + rowsPerSecond: Long, + numPartitions: Int, + options: DataSourceOptions) extends ContinuousStream { implicit val defaultFormats: DefaultFormats = DefaultFormats val creationTime = System.currentTimeMillis() - val numPartitions = options.get(RateStreamProvider.NUM_PARTITIONS).orElse("5").toInt - val rowsPerSecond = options.get(RateStreamProvider.ROWS_PER_SECOND).orElse("6").toLong val perPartitionRate = rowsPerSecond.toDouble / numPartitions.toDouble override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = { @@ -54,18 +53,10 @@ class RateStreamContinuousReadSupport(options: DataSourceOptions) extends Contin RateStreamOffset(Serialization.read[Map[Int, ValueRunTimeMsPair]](json)) } - override def fullSchema(): StructType = RateStreamProvider.SCHEMA - - override def newScanConfigBuilder(start: Offset): ScanConfigBuilder = { - new SimpleStreamingScanConfigBuilder(fullSchema(), start) - } - override def initialOffset: Offset = createInitialOffset(numPartitions, creationTime) - override def planInputPartitions(config: ScanConfig): Array[InputPartition] = { - val startOffset = config.asInstanceOf[SimpleStreamingScanConfig].start - - val partitionStartMap = startOffset match { + override def planInputPartitions(start: Offset): Array[InputPartition] = { + val partitionStartMap = start match { case off: RateStreamOffset => off.partitionToValueAndRunTimeMs case off => throw new IllegalArgumentException( @@ -91,8 +82,7 @@ class RateStreamContinuousReadSupport(options: DataSourceOptions) extends Contin }.toArray } - override def createContinuousReaderFactory( - config: ScanConfig): ContinuousPartitionReaderFactory = { + override def createContinuousReaderFactory(): ContinuousPartitionReaderFactory = { RateStreamContinuousReaderFactory } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala index 28ab2448a663..e7bc71394061 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala @@ -31,37 +31,29 @@ import org.json4s.jackson.Serialization import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging import org.apache.spark.rpc.RpcEndpointRef -import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.streaming.{Offset => _, _} import org.apache.spark.sql.execution.streaming.sources.TextSocketReader import org.apache.spark.sql.sources.v2.DataSourceOptions import org.apache.spark.sql.sources.v2.reader._ import org.apache.spark.sql.sources.v2.reader.streaming._ -import org.apache.spark.sql.types.StructType import org.apache.spark.util.RpcUtils /** - * A ContinuousReadSupport that reads text lines through a TCP socket, designed only for tutorials - * and debugging. 
This ContinuousReadSupport will *not* work in production applications due to + * A [[ContinuousStream]] that reads text lines through a TCP socket, designed only for tutorials + * and debugging. This ContinuousStream will *not* work in production applications due to * multiple reasons, including no support for fault recovery. * * The driver maintains a socket connection to the host-port, keeps the received messages in * buckets and serves the messages to the executors via a RPC endpoint. */ -class TextSocketContinuousReadSupport(options: DataSourceOptions) - extends ContinuousReadSupport with Logging { +class TextSocketContinuousStream( + host: String, port: Int, numPartitions: Int, options: DataSourceOptions) + extends ContinuousStream with Logging { implicit val defaultFormats: DefaultFormats = DefaultFormats - private val host: String = options.get("host").get() - private val port: Int = options.get("port").get().toInt - - assert(SparkSession.getActiveSession.isDefined) - private val spark = SparkSession.getActiveSession.get - private val numPartitions = spark.sparkContext.defaultParallelism - @GuardedBy("this") private var socket: Socket = _ @@ -101,21 +93,9 @@ class TextSocketContinuousReadSupport(options: DataSourceOptions) startOffset } - override def newScanConfigBuilder(start: Offset): ScanConfigBuilder = { - new SimpleStreamingScanConfigBuilder(fullSchema(), start) - } - - override def fullSchema(): StructType = { - if (includeTimestamp) { - TextSocketReader.SCHEMA_TIMESTAMP - } else { - TextSocketReader.SCHEMA_REGULAR - } - } - override def planInputPartitions(config: ScanConfig): Array[InputPartition] = { - val startOffset = config.asInstanceOf[SimpleStreamingScanConfig] - .start.asInstanceOf[TextSocketOffset] + override def planInputPartitions(start: Offset): Array[InputPartition] = { + val startOffset = start.asInstanceOf[TextSocketOffset] recordEndpoint.setStartOffsets(startOffset.offsets) val endpointName = s"TextSocketContinuousReaderEndpoint-${java.util.UUID.randomUUID()}" endpointRef = recordEndpoint.rpcEnv.setupEndpoint(endpointName, recordEndpoint) @@ -140,8 +120,7 @@ class TextSocketContinuousReadSupport(options: DataSourceOptions) }.toArray } - override def createContinuousReaderFactory( - config: ScanConfig): ContinuousPartitionReaderFactory = { + override def createContinuousReaderFactory(): ContinuousPartitionReaderFactory = { TextSocketReaderFactory } @@ -197,7 +176,7 @@ class TextSocketContinuousReadSupport(options: DataSourceOptions) logWarning(s"Stream closed by $host:$port") return } - TextSocketContinuousReadSupport.this.synchronized { + TextSocketContinuousStream.this.synchronized { currentOffset += 1 val newData = (line, Timestamp.valueOf( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala index 2238ce26e7b4..d1bda79f4b6e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala @@ -23,7 +23,7 @@ import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint} import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, PartitionOffset} +import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset} import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport import org.apache.spark.util.RpcUtils @@ -83,14 +83,14 @@ private[sql] object EpochCoordinatorRef extends Logging { */ def create( writeSupport: StreamingWriteSupport, - readSupport: ContinuousReadSupport, + stream: ContinuousStream, query: ContinuousExecution, epochCoordinatorId: String, startEpoch: Long, session: SparkSession, env: SparkEnv): RpcEndpointRef = synchronized { val coordinator = new EpochCoordinator( - writeSupport, readSupport, query, startEpoch, session, env.rpcEnv) + writeSupport, stream, query, startEpoch, session, env.rpcEnv) val ref = env.rpcEnv.setupEndpoint(endpointName(epochCoordinatorId), coordinator) logInfo("Registered EpochCoordinator endpoint") ref @@ -116,7 +116,7 @@ private[sql] object EpochCoordinatorRef extends Logging { */ private[continuous] class EpochCoordinator( writeSupport: StreamingWriteSupport, - readSupport: ContinuousReadSupport, + stream: ContinuousStream, query: ContinuousExecution, startEpoch: Long, session: SparkSession, @@ -220,7 +220,7 @@ private[continuous] class EpochCoordinator( partitionOffsets.collect { case ((e, _), o) if e == epoch => o } if (thisEpochOffsets.size == numReaderPartitions) { logDebug(s"Epoch $epoch has offsets reported from all partitions: $thisEpochOffsets") - query.addOffset(epoch, readSupport, thisEpochOffsets.toSeq) + query.addOffset(epoch, stream, thisEpochOffsets.toSeq) resolveCommitsAtEpoch(epoch) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index 5406679630e2..e71f81caeb97 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -33,9 +33,9 @@ import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUti import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._ import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsMicroBatchRead, Table, TableProvider} +import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.sources.v2.reader._ -import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset => OffsetV2} +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream, Offset => OffsetV2} import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType @@ -68,7 +68,15 @@ abstract class MemoryStreamBase[A : Encoder](sqlContext: SQLContext) extends Bas def fullSchema(): StructType = encoder.schema - protected def logicalPlan: LogicalPlan + protected val logicalPlan: LogicalPlan = { + StreamingRelationV2( + MemoryStreamTableProvider, + "memory", + new MemoryStreamTable(this), + Map.empty, + attributes, + None)(sqlContext.sparkSession) + } def addData(data: TraversableOnce[A]): Offset } @@ -81,7 +89,8 @@ object MemoryStreamTableProvider extends TableProvider { } } -class MemoryStreamTable(val stream: MemoryStreamBase[_]) extends Table with SupportsMicroBatchRead { +class MemoryStreamTable(val stream: MemoryStreamBase[_]) extends Table + with SupportsMicroBatchRead with SupportsContinuousRead { override def name(): 
String = "MemoryStreamDataSource" @@ -101,7 +110,11 @@ class MemoryStreamScanBuilder(stream: MemoryStreamBase[_]) extends ScanBuilder w override def readSchema(): StructType = stream.fullSchema() override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = { - stream.asInstanceOf[MemoryStream[_]] + stream.asInstanceOf[MicroBatchStream] + } + + override def toContinuousStream(checkpointLocation: String): ContinuousStream = { + stream.asInstanceOf[ContinuousStream] } } @@ -113,16 +126,6 @@ class MemoryStreamScanBuilder(stream: MemoryStreamBase[_]) extends ScanBuilder w case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) extends MemoryStreamBase[A](sqlContext) with MicroBatchStream with Logging { - protected val logicalPlan: LogicalPlan = { - StreamingRelationV2( - MemoryStreamTableProvider, - "memory", - new MemoryStreamTable(this), - Map.empty, - attributes, - None)(sqlContext.sparkSession) - } - protected val output = logicalPlan.output /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala index 8c5c9eff55ef..41eaf84b7f9e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala @@ -30,8 +30,7 @@ import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.sql.{Encoder, SQLContext} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.streaming.{Offset => _, _} -import org.apache.spark.sql.sources.v2.{ContinuousReadSupportProvider, DataSourceOptions} -import org.apache.spark.sql.sources.v2.reader.{InputPartition, ScanConfig, ScanConfigBuilder} +import org.apache.spark.sql.sources.v2.reader.InputPartition import org.apache.spark.sql.sources.v2.reader.streaming._ import org.apache.spark.util.RpcUtils @@ -44,15 +43,10 @@ import org.apache.spark.util.RpcUtils * the specified offset within the list, or null if that offset doesn't yet have a record. */ class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPartitions: Int = 2) - extends MemoryStreamBase[A](sqlContext) - with ContinuousReadSupportProvider with ContinuousReadSupport { + extends MemoryStreamBase[A](sqlContext) with ContinuousStream { private implicit val formats = Serialization.formats(NoTypeHints) - protected val logicalPlan = - // TODO: don't pass null as table after finish API refactor for continuous stream. 
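// ---------------------------------------------------------------------------
// Illustrative sketch, not part of this patch: with MemoryStreamTable above now
// mixing in both SupportsMicroBatchRead and SupportsContinuousRead, the memory
// stream object itself is what the Scan hands back in its continuous role. This
// assumes MemoryStreamScanBuilder.build() returns the builder as the Scan, as
// its overridden to*Stream methods suggest; the checkpoint path is a dummy value
// that memory streams ignore.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStreamScanBuilder
import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream

object MemoryStreamSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("memory-sketch").getOrCreate()
    import spark.implicits._

    val input = new ContinuousMemoryStream[Int](id = 0, sqlContext = spark.sqlContext)
    input.addData(Seq(1, 2, 3))

    val scan = new MemoryStreamScanBuilder(input).build()
    // The builder simply returns the stream itself in its continuous role.
    assert(scan.toContinuousStream("/tmp/ignored-checkpoint") eq input)

    spark.stop()
  }
}
// End of sketch.
// ---------------------------------------------------------------------------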
- StreamingRelationV2(this, "memory", null, Map(), attributes, None)(sqlContext.sparkSession) - // ContinuousReader implementation @GuardedBy("this") @@ -87,13 +81,9 @@ class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPa ) } - override def newScanConfigBuilder(start: Offset): ScanConfigBuilder = { - new SimpleStreamingScanConfigBuilder(fullSchema(), start) - } - override def planInputPartitions(config: ScanConfig): Array[InputPartition] = { - val startOffset = config.asInstanceOf[SimpleStreamingScanConfig] - .start.asInstanceOf[ContinuousMemoryStreamOffset] + override def planInputPartitions(start: Offset): Array[InputPartition] = { + val startOffset = start.asInstanceOf[ContinuousMemoryStreamOffset] synchronized { val endpointName = s"ContinuousMemoryStreamRecordEndpoint-${java.util.UUID.randomUUID()}-$id" endpointRef = @@ -105,8 +95,7 @@ class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPa } } - override def createContinuousReaderFactory( - config: ScanConfig): ContinuousPartitionReaderFactory = { + override def createContinuousReaderFactory(): ContinuousPartitionReaderFactory = { ContinuousMemoryStreamReaderFactory } @@ -115,12 +104,6 @@ class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPa } override def commit(end: Offset): Unit = {} - - // ContinuousReadSupportProvider implementation - // This is necessary because of how StreamTest finds the source for AddDataMemory steps. - override def createContinuousReadSupport( - checkpointLocation: String, - options: DataSourceOptions): ContinuousReadSupport = this } object ContinuousMemoryStream { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala index 8d334f0afd0f..075c6b9362ba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala @@ -19,11 +19,11 @@ package org.apache.spark.sql.execution.streaming.sources import org.apache.spark.network.util.JavaUtils import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReadSupport +import org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousStream import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder} -import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, MicroBatchStream} +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream} import org.apache.spark.sql.types._ /** @@ -41,7 +41,7 @@ import org.apache.spark.sql.types._ * be resource constrained, and `numPartitions` can be tweaked to help reach the desired speed. 
*/ class RateStreamProvider extends DataSourceV2 - with TableProvider with ContinuousReadSupportProvider with DataSourceRegister { + with TableProvider with DataSourceRegister { import RateStreamProvider._ override def getTable(options: DataSourceOptions): Table = { @@ -68,12 +68,6 @@ class RateStreamProvider extends DataSourceV2 new RateStreamTable(rowsPerSecond, rampUpTimeSeconds, numPartitions) } - override def createContinuousReadSupport( - checkpointLocation: String, - options: DataSourceOptions): ContinuousReadSupport = { - new RateStreamContinuousReadSupport(options) - } - override def shortName(): String = "rate" } @@ -81,7 +75,7 @@ class RateStreamTable( rowsPerSecond: Long, rampUpTimeSeconds: Long, numPartitions: Int) - extends Table with SupportsMicroBatchRead { + extends Table with SupportsMicroBatchRead with SupportsContinuousRead { override def name(): String = { s"RateStream(rowsPerSecond=$rowsPerSecond, rampUpTimeSeconds=$rampUpTimeSeconds, " + @@ -98,6 +92,10 @@ class RateStreamTable( new RateStreamMicroBatchStream( rowsPerSecond, rampUpTimeSeconds, numPartitions, options, checkpointLocation) } + + override def toContinuousStream(checkpointLocation: String): ContinuousStream = { + new RateStreamContinuousStream(rowsPerSecond, numPartitions, options) + } } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala index ddf398b7752e..540131c8de8a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala @@ -26,7 +26,6 @@ import javax.annotation.concurrent.GuardedBy import scala.collection.mutable.ListBuffer import org.apache.spark.internal.Logging -import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.streaming.LongOffset @@ -40,7 +39,8 @@ import org.apache.spark.unsafe.types.UTF8String * and debugging. This MicroBatchReadSupport will *not* work in production applications due to * multiple reasons, including no support for fault recovery. 
*/ -class TextSocketMicroBatchStream(host: String, port: Int, options: DataSourceOptions) +class TextSocketMicroBatchStream( + host: String, port: Int, numPartitions: Int, options: DataSourceOptions) extends MicroBatchStream with Logging { @GuardedBy("this") @@ -124,10 +124,6 @@ class TextSocketMicroBatchStream(host: String, port: Int, options: DataSourceOpt batches.slice(sliceStart, sliceEnd) } - assert(SparkSession.getActiveSession.isDefined) - val spark = SparkSession.getActiveSession.get - val numPartitions = spark.sparkContext.defaultParallelism - val slices = Array.fill(numPartitions)(new ListBuffer[(UTF8String, Long)]) rawList.zipWithIndex.foreach { case (r, idx) => slices(idx % numPartitions).append(r) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala index 35007785b41a..c3b24a8f65dd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala @@ -24,16 +24,15 @@ import scala.util.{Failure, Success, Try} import org.apache.spark.internal.Logging import org.apache.spark.sql._ -import org.apache.spark.sql.execution.streaming.continuous.TextSocketContinuousReadSupport +import org.apache.spark.sql.execution.streaming.continuous.TextSocketContinuousStream import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder} -import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, MicroBatchStream} +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream} import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} class TextSocketSourceProvider extends DataSourceV2 - with TableProvider with ContinuousReadSupportProvider - with DataSourceRegister with Logging { + with TableProvider with DataSourceRegister with Logging { private def checkParameters(params: DataSourceOptions): Unit = { logWarning("The socket source should not be used for production applications! " + @@ -58,22 +57,16 @@ class TextSocketSourceProvider extends DataSourceV2 new TextSocketTable( options.get("host").get, options.getInt("port", -1), + options.getInt("numPartitions", SparkSession.active.sparkContext.defaultParallelism), options.getBoolean("includeTimestamp", false)) } - override def createContinuousReadSupport( - checkpointLocation: String, - options: DataSourceOptions): ContinuousReadSupport = { - checkParameters(options) - new TextSocketContinuousReadSupport(options) - } - /** String that represents the format that this data source provider uses. 
*/ override def shortName(): String = "socket" } -class TextSocketTable(host: String, port: Int, includeTimestamp: Boolean) - extends Table with SupportsMicroBatchRead { +class TextSocketTable(host: String, port: Int, numPartitions: Int, includeTimestamp: Boolean) + extends Table with SupportsMicroBatchRead with SupportsContinuousRead { override def name(): String = s"Socket[$host:$port]" @@ -90,7 +83,11 @@ class TextSocketTable(host: String, port: Int, includeTimestamp: Boolean) override def readSchema(): StructType = schema() override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = { - new TextSocketMicroBatchStream(host, port, options) + new TextSocketMicroBatchStream(host, port, numPartitions, options) + } + + override def toContinuousStream(checkpointLocation: String): ContinuousStream = { + new TextSocketContinuousStream(host, port, numPartitions, options) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index 417dd5584b30..866681838af8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -30,9 +30,7 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils import org.apache.spark.sql.execution.streaming.{StreamingRelation, StreamingRelationV2} import org.apache.spark.sql.sources.StreamSourceProvider import org.apache.spark.sql.sources.v2._ -import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport import org.apache.spark.sql.types.StructType -import org.apache.spark.util.Utils /** * Interface used to load a streaming `Dataset` from external storage systems (e.g. file systems, @@ -183,39 +181,12 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo case _ => provider.getTable(dsOptions) } table match { - case s: SupportsMicroBatchRead => + case _: SupportsMicroBatchRead | _: SupportsContinuousRead => Dataset.ofRows( sparkSession, StreamingRelationV2( - provider, source, s, options, - table.schema.toAttributes, v1Relation)(sparkSession)) - - case _ if ds.isInstanceOf[ContinuousReadSupportProvider] => - val provider = ds.asInstanceOf[ContinuousReadSupportProvider] - var tempReadSupport: ContinuousReadSupport = null - val schema = try { - val tmpCheckpointPath = Utils.createTempDir(namePrefix = s"tempCP").getCanonicalPath - tempReadSupport = if (userSpecifiedSchema.isDefined) { - provider.createContinuousReadSupport( - userSpecifiedSchema.get, tmpCheckpointPath, dsOptions) - } else { - provider.createContinuousReadSupport(tmpCheckpointPath, dsOptions) - } - tempReadSupport.fullSchema() - } finally { - // Stop tempReader to avoid side-effect thing - if (tempReadSupport != null) { - tempReadSupport.stop() - tempReadSupport = null - } - } - Dataset.ofRows( - sparkSession, - // TODO: do not pass null as table after finish the API refactor for continuous - // stream. 
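// ---------------------------------------------------------------------------
// Illustrative sketch, not part of this patch: the reader path above now hands
// any SupportsMicroBatchRead / SupportsContinuousRead table straight to
// StreamingRelationV2, so the socket source works through the same code in both
// modes, and the numPartitions option added to TextSocketTable in this patch can
// be set from the options map instead of being hard-wired to defaultParallelism.
// Host/port values are arbitrary; a listener such as `nc -lk 9999` would have to
// be running for the query to produce data.
import org.apache.spark.sql.SparkSession

object SocketReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[4]").appName("socket-sketch").getOrCreate()

    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", "9999")
      .option("numPartitions", "4")        // new in this patch; defaults to defaultParallelism
      .option("includeTimestamp", "true")
      .load()

    val query = lines.writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/socket-sketch-checkpoint")
      .start()

    query.awaitTermination(30000)
    query.stop()
    spark.stop()
  }
}
// End of sketch.
// ---------------------------------------------------------------------------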
-              StreamingRelationV2(
-                provider, source, table = null, options,
-                schema.toAttributes, v1Relation)(sparkSession))
+                provider, source, table, options, table.schema.toAttributes, v1Relation)(
+                sparkSession))
 
           // fallback to v1
           case _ => Dataset.ofRows(sparkSession, StreamingRelation(v1DataSource))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
index d40a1fdec0bb..d0418f893143 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relati
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous._
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.sources.v2.{ContinuousReadSupportProvider, DataSourceOptions}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.reader.streaming.Offset
 import org.apache.spark.sql.streaming.StreamTest
 import org.apache.spark.util.ManualClock
@@ -308,30 +308,17 @@ class RateStreamProviderSuite extends StreamTest {
         "rate source does not support user-specified schema"))
   }
 
-  test("continuous in registry") {
-    DataSource.lookupDataSource("rate", spark.sqlContext.conf).
-      getConstructor().newInstance() match {
-      case ds: ContinuousReadSupportProvider =>
-        val readSupport = ds.createContinuousReadSupport(
-          "", DataSourceOptions.empty())
-        assert(readSupport.isInstanceOf[RateStreamContinuousReadSupport])
-      case _ =>
-        throw new IllegalStateException("Could not find read support for continuous rate")
-    }
-  }
-
   test("continuous data") {
-    val readSupport = new RateStreamContinuousReadSupport(
-      new DataSourceOptions(Map("numPartitions" -> "2", "rowsPerSecond" -> "20").asJava))
-    val config = readSupport.newScanConfigBuilder(readSupport.initialOffset).build()
-    val tasks = readSupport.planInputPartitions(config)
-    val readerFactory = readSupport.createContinuousReaderFactory(config)
-    assert(tasks.size == 2)
+    val stream = new RateStreamContinuousStream(
+      rowsPerSecond = 20, numPartitions = 2, options = DataSourceOptions.empty())
+    val partitions = stream.planInputPartitions(stream.initialOffset)
+    val readerFactory = stream.createContinuousReaderFactory()
+    assert(partitions.size == 2)
 
     val data = scala.collection.mutable.ListBuffer[InternalRow]()
-    tasks.foreach {
+    partitions.foreach {
       case t: RateStreamContinuousInputPartition =>
-        val startTimeMs = readSupport.initialOffset()
+        val startTimeMs = stream.initialOffset()
          .asInstanceOf[RateStreamOffset]
          .partitionToValueAndRunTimeMs(t.partitionIndex)
          .runTimeMs
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
index cf069d571081..33c65d784fba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
@@ -29,6 +29,7 @@ import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
 import org.apache.spark.sql.execution.streaming._
@@ -294,25 +295,25 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
     serverThread = new ServerThread()
     serverThread.start()
 
-    val readSupport = new TextSocketContinuousReadSupport(
-      new DataSourceOptions(Map("numPartitions" -> "2", "host" -> "localhost",
-        "port" -> serverThread.port.toString).asJava))
-
-    val scanConfig = readSupport.newScanConfigBuilder(readSupport.initialOffset()).build()
-    val tasks = readSupport.planInputPartitions(scanConfig)
-    assert(tasks.size == 2)
+    val stream = new TextSocketContinuousStream(
+      host = "localhost",
+      port = serverThread.port,
+      numPartitions = 2,
+      options = DataSourceOptions.empty())
+    val partitions = stream.planInputPartitions(stream.initialOffset())
+    assert(partitions.length == 2)
 
     val numRecords = 10
     val data = scala.collection.mutable.ListBuffer[Int]()
     val offsets = scala.collection.mutable.ListBuffer[Int]()
-    val readerFactory = readSupport.createContinuousReaderFactory(scanConfig)
+    val readerFactory = stream.createContinuousReaderFactory()
     import org.scalatest.time.SpanSugar._
     failAfter(5 seconds) {
       // inject rows, read and check the data and offsets
       for (i <- 0 until numRecords) {
         serverThread.enqueue(i.toString)
       }
-      tasks.foreach {
+      partitions.foreach {
        case t: TextSocketContinuousInputPartition =>
          val r = readerFactory.createReader(t).asInstanceOf[TextSocketContinuousPartitionReader]
          for (i <- 0 until numRecords / 2) {
@@ -330,15 +331,15 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
           data.clear()
         case _ => throw new IllegalStateException("Unexpected task type")
       }
-      assert(readSupport.startOffset.offsets == List(3, 3))
-      readSupport.commit(TextSocketOffset(List(5, 5)))
-      assert(readSupport.startOffset.offsets == List(5, 5))
+      assert(stream.startOffset.offsets == List(3, 3))
+      stream.commit(TextSocketOffset(List(5, 5)))
+      assert(stream.startOffset.offsets == List(5, 5))
     }
 
     def commitOffset(partition: Int, offset: Int): Unit = {
-      val offsetsToCommit = readSupport.startOffset.offsets.updated(partition, offset)
-      readSupport.commit(TextSocketOffset(offsetsToCommit))
-      assert(readSupport.startOffset.offsets == offsetsToCommit)
+      val offsetsToCommit = stream.startOffset.offsets.updated(partition, offset)
+      stream.commit(TextSocketOffset(offsetsToCommit))
+      assert(stream.startOffset.offsets == offsetsToCommit)
     }
   }
 
@@ -346,13 +347,15 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
     serverThread = new ServerThread()
     serverThread.start()
 
-    val readSupport = new TextSocketContinuousReadSupport(
-      new DataSourceOptions(Map("numPartitions" -> "2", "host" -> "localhost",
-        "port" -> serverThread.port.toString).asJava))
+    val stream = new TextSocketContinuousStream(
+      host = "localhost",
+      port = serverThread.port,
+      numPartitions = 2,
+      options = DataSourceOptions.empty())
 
-    readSupport.startOffset = TextSocketOffset(List(5, 5))
+    stream.startOffset = TextSocketOffset(List(5, 5))
     assertThrows[IllegalStateException] {
-      readSupport.commit(TextSocketOffset(List(6, 6)))
+      stream.commit(TextSocketOffset(List(6, 6)))
     }
   }
 
@@ -360,21 +363,21 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
     serverThread = new ServerThread()
     serverThread.start()
 
-    val readSupport = new TextSocketContinuousReadSupport(
-      new DataSourceOptions(Map("numPartitions" -> "2", "host" -> "localhost",
-        "includeTimestamp" -> "true",
-        "port" -> serverThread.port.toString).asJava))
-    val scanConfig = readSupport.newScanConfigBuilder(readSupport.initialOffset()).build()
-    val tasks = readSupport.planInputPartitions(scanConfig)
-    assert(tasks.size == 2)
+    val stream = new TextSocketContinuousStream(
+      host = "localhost",
+      port = serverThread.port,
+      numPartitions = 2,
+      options = new DataSourceOptions(Map("includeTimestamp" -> "true").asJava))
+    val partitions = stream.planInputPartitions(stream.initialOffset())
+    assert(partitions.size == 2)
 
     val numRecords = 4
     // inject rows, read and check the data and offsets
     for (i <- 0 until numRecords) {
       serverThread.enqueue(i.toString)
     }
-    val readerFactory = readSupport.createContinuousReaderFactory(scanConfig)
-    tasks.foreach {
+    val readerFactory = stream.createContinuousReaderFactory()
+    partitions.foreach {
       case t: TextSocketContinuousInputPartition =>
         val r = readerFactory.createReader(t).asInstanceOf[TextSocketContinuousPartitionReader]
         for (_ <- 0 until numRecords / 2) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index c60ea4a2f9f5..511fdfe5c23a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -24,7 +24,7 @@ import test.org.apache.spark.sql.sources.v2._
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{DataFrame, QueryTest, Row}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanExec}
+import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2Relation}
 import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
 import org.apache.spark.sql.functions._
@@ -40,14 +40,14 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
 
   private def getBatch(query: DataFrame): AdvancedBatch = {
     query.queryExecution.executedPlan.collect {
-      case d: DataSourceV2ScanExec =>
+      case d: BatchScanExec =>
         d.batch.asInstanceOf[AdvancedBatch]
     }.head
   }
 
   private def getJavaBatch(query: DataFrame): JavaAdvancedDataSourceV2.AdvancedBatch = {
     query.queryExecution.executedPlan.collect {
-      case d: DataSourceV2ScanExec =>
+      case d: BatchScanExec =>
         d.batch.asInstanceOf[JavaAdvancedDataSourceV2.AdvancedBatch]
     }.head
   }
@@ -309,7 +309,7 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
       assert(logical.canonicalized.output.length == logicalNumOutput)
 
       val physical = df.queryExecution.executedPlan.collect {
-        case d: DataSourceV2ScanExec => d
+        case d: BatchScanExec => d
       }.head
       assert(physical.canonicalized.output.length == physicalNumOutput)
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 72321c418f9b..b39abc5a8004 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -494,7 +494,7 @@ class StreamSuite extends StreamTest {
       // `extended = false` only displays the physical plan.
assert("StreamingDataSourceV2Relation".r .findAllMatchIn(explainWithoutExtended).size === 0) - assert("ScanV2".r + assert("BatchScan".r .findAllMatchIn(explainWithoutExtended).size === 1) // Use "StateStoreRestore" to verify that it does output a streaming physical plan assert(explainWithoutExtended.contains("StateStoreRestore")) @@ -504,7 +504,7 @@ class StreamSuite extends StreamTest { // plan. assert("StreamingDataSourceV2Relation".r .findAllMatchIn(explainWithExtended).size === 3) - assert("ScanV2".r + assert("BatchScan".r .findAllMatchIn(explainWithExtended).size === 1) // Use "StateStoreRestore" to verify that it does output a streaming physical plan assert(explainWithExtended.contains("StateStoreRestore")) @@ -547,17 +547,17 @@ class StreamSuite extends StreamTest { val explainWithoutExtended = q.explainInternal(false) // `extended = false` only displays the physical plan. - assert("Streaming RelationV2 ContinuousMemoryStream".r + assert("StreamingDataSourceV2Relation".r .findAllMatchIn(explainWithoutExtended).size === 0) - assert("ScanV2 ContinuousMemoryStream".r + assert("ContinuousScan".r .findAllMatchIn(explainWithoutExtended).size === 1) val explainWithExtended = q.explainInternal(true) // `extended = true` displays 3 logical plans (Parsed/Optimized/Optimized) and 1 physical // plan. - assert("Streaming RelationV2 ContinuousMemoryStream".r + assert("StreamingDataSourceV2Relation".r .findAllMatchIn(explainWithExtended).size === 3) - assert("ScanV2 ContinuousMemoryStream".r + assert("ContinuousScan".r .findAllMatchIn(explainWithExtended).size === 1) } finally { q.stop() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index b4bd6f6b2edc..da496837e7a1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -39,12 +39,11 @@ import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder, Ro import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.physical.AllTuples import org.apache.spark.sql.catalyst.util._ -import org.apache.spark.sql.execution.datasources.v2.{OldStreamingDataSourceV2Relation, StreamingDataSourceV2Relation} +import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.continuous.{ContinuousExecution, EpochCoordinatorRef, IncrementAndGetEpoch} import org.apache.spark.sql.execution.streaming.sources.MemorySinkV2 import org.apache.spark.sql.execution.streaming.state.StateStore -import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport import org.apache.spark.sql.streaming.StreamingQueryListener._ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.util.{Clock, SystemClock, Utils} @@ -692,16 +691,10 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be case r: StreamingExecutionRelation => r.source // v2 source case r: StreamingDataSourceV2Relation => r.stream - case r: OldStreamingDataSourceV2Relation => r.readSupport // We can add data to memory stream before starting it. Then the input plan has // not been processed by the streaming engine and contains `StreamingRelationV2`. 
           case r: StreamingRelationV2 if r.sourceName == "memory" =>
-            // TODO: remove this null hack after finish API refactor for continuous stream.
-            if (r.table == null) {
-              r.dataSource.asInstanceOf[ContinuousReadSupport]
-            } else {
-              r.table.asInstanceOf[MemoryStreamTable].stream
-            }
+            r.table.asInstanceOf[MemoryStreamTable].stream
         }
           .zipWithIndex
           .find(_._1 == source)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 62fde98e40dc..dc22e31678fa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.sources.TestForeachWriter
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.reader.{InputPartition, ScanConfig}
+import org.apache.spark.sql.sources.v2.reader.InputPartition
 import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}
 import org.apache.spark.sql.streaming.util.{BlockingSource, MockSourceProvider, StreamManualClock}
 import org.apache.spark.sql.types.StructType
@@ -911,7 +911,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
 
     testStream(df, useV2Sink = true)(
       StartStream(trigger = Trigger.Continuous(100)),
-      AssertOnQuery(_.logicalPlan.toJSON.contains("ContinuousExecutionRelation"))
+      AssertOnQuery(_.logicalPlan.toJSON.contains("StreamingDataSourceV2Relation"))
     )
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
index d6819eacd07c..d3d210c02e90 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.execution.streaming.continuous._
-import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReader, ContinuousReadSupport, PartitionOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReader, ContinuousStream, PartitionOffset}
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
 import org.apache.spark.sql.streaming.StreamTest
 import org.apache.spark.sql.types.{DataType, IntegerType, StructType}
@@ -44,7 +44,7 @@ class ContinuousQueuedDataReaderSuite extends StreamTest with MockitoSugar {
     super.beforeEach()
     epochEndpoint = EpochCoordinatorRef.create(
       mock[StreamingWriteSupport],
-      mock[ContinuousReadSupport],
+      mock[ContinuousStream],
       mock[ContinuousExecution],
       coordinatorId,
       startEpoch,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index f85cae9fa433..344a8aa55f0c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -41,7 +41,7 @@ class ContinuousSuiteBase extends StreamTest {
     case s: ContinuousExecution =>
       assert(numTriggers >= 2, "must wait for at least 2 triggers to ensure query is initialized")
       val reader = s.lastExecution.executedPlan.collectFirst {
-        case ContinuousScanExec(_, _, _, _, r: RateStreamContinuousReadSupport, _) => r
+        case ContinuousScanExec(_, _, r: RateStreamContinuousStream, _) => r
       }.get
 
       val deltaMs = numTriggers * 1000 + 300
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
index e644c16ddfea..a0b56ec17f0b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark._
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.sql.LocalSparkSession
 import org.apache.spark.sql.execution.streaming.continuous._
-import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, PartitionOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset}
 import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
 import org.apache.spark.sql.test.TestSparkSession
@@ -45,7 +45,7 @@ class EpochCoordinatorSuite
   private var orderVerifier: InOrder = _
 
   override def beforeEach(): Unit = {
-    val reader = mock[ContinuousReadSupport]
+    val stream = mock[ContinuousStream]
     writeSupport = mock[StreamingWriteSupport]
     query = mock[ContinuousExecution]
    orderVerifier = inOrder(writeSupport, query)
@@ -53,7 +53,7 @@ class EpochCoordinatorSuite
     spark = new TestSparkSession()
 
     epochCoordinator
-      = EpochCoordinatorRef.create(writeSupport, reader, query, "test", 1, spark, SparkEnv.get)
+      = EpochCoordinatorRef.create(writeSupport, stream, query, "test", 1, spark, SparkEnv.get)
   }
 
   test("single epoch") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index d98cc41de9b0..62f166602941 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -31,33 +31,23 @@ import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, StreamTest, T
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 
-class FakeDataStream extends MicroBatchStream {
+class FakeDataStream extends MicroBatchStream with ContinuousStream {
   override def deserializeOffset(json: String): Offset = RateStreamOffset(Map())
   override def commit(end: Offset): Unit = {}
   override def stop(): Unit = {}
   override def initialOffset(): Offset = RateStreamOffset(Map())
   override def latestOffset(): Offset = RateStreamOffset(Map())
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = RateStreamOffset(Map())
   override def planInputPartitions(start: Offset, end: Offset): Array[InputPartition] = {
     throw new IllegalStateException("fake source - cannot actually read")
   }
-  override def createReaderFactory(): PartitionReaderFactory = {
+  override def planInputPartitions(start: Offset): Array[InputPartition] = {
     throw new IllegalStateException("fake source - cannot actually read")
   }
-}
-
-case class FakeReadSupport() extends ContinuousReadSupport {
-  override def deserializeOffset(json: String): Offset = RateStreamOffset(Map())
-  override def commit(end: Offset): Unit = {}
-  override def stop(): Unit = {}
-  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = RateStreamOffset(Map())
-  override def fullSchema(): StructType = StructType(Seq())
-  override def initialOffset(): Offset = RateStreamOffset(Map())
-  override def newScanConfigBuilder(start: Offset): ScanConfigBuilder = null
-  override def createContinuousReaderFactory(
-      config: ScanConfig): ContinuousPartitionReaderFactory = {
+  override def createReaderFactory(): PartitionReaderFactory = {
     throw new IllegalStateException("fake source - cannot actually read")
   }
-  override def planInputPartitions(config: ScanConfig): Array[InputPartition] = {
+  override def createContinuousReaderFactory(): ContinuousPartitionReaderFactory = {
     throw new IllegalStateException("fake source - cannot actually read")
   }
 }
@@ -66,21 +56,19 @@ class FakeScanBuilder extends ScanBuilder with Scan {
   override def build(): Scan = this
   override def readSchema(): StructType = StructType(Seq())
   override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = new FakeDataStream
+  override def toContinuousStream(checkpointLocation: String): ContinuousStream = new FakeDataStream
 }
 
-class FakeMicroBatchReadTable extends Table with SupportsMicroBatchRead {
+trait FakeMicroBatchReadTable extends Table with SupportsMicroBatchRead {
   override def name(): String = "fake"
   override def schema(): StructType = StructType(Seq())
   override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new FakeScanBuilder
 }
 
-trait FakeContinuousReadSupportProvider extends ContinuousReadSupportProvider {
-  override def createContinuousReadSupport(
-      checkpointLocation: String,
-      options: DataSourceOptions): ContinuousReadSupport = {
-    LastReadOptions.options = options
-    FakeReadSupport()
-  }
+trait FakeContinuousReadTable extends Table with SupportsContinuousRead {
+  override def name(): String = "fake"
+  override def schema(): StructType = StructType(Seq())
+  override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new FakeScanBuilder
 }
 
 trait FakeStreamingWriteSupportProvider extends StreamingWriteSupportProvider {
@@ -111,27 +99,34 @@ class FakeReadMicroBatchOnly
 
 class FakeReadContinuousOnly
     extends DataSourceRegister
     with TableProvider
-    with FakeContinuousReadSupportProvider
     with SessionConfigSupport {
   override def shortName(): String = "fake-read-continuous-only"
 
   override def keyPrefix: String = shortName()
 
-  override def getTable(options: DataSourceOptions): Table = new Table {
-    override def schema(): StructType = StructType(Seq())
-    override def name(): String = "fake"
+  override def getTable(options: DataSourceOptions): Table = {
+    LastReadOptions.options = options
+    new FakeContinuousReadTable {}
   }
 }
 
-class FakeReadBothModes extends DataSourceRegister
-    with TableProvider with FakeContinuousReadSupportProvider {
+class FakeReadBothModes extends DataSourceRegister with TableProvider {
   override def shortName(): String = "fake-read-microbatch-continuous"
 
-  override def getTable(options: DataSourceOptions): Table = new FakeMicroBatchReadTable {}
+  override def getTable(options: DataSourceOptions): Table = {
+    new Table with FakeMicroBatchReadTable with FakeContinuousReadTable {}
+  }
 }
 
-class FakeReadNeitherMode extends DataSourceRegister {
+class FakeReadNeitherMode extends DataSourceRegister with TableProvider {
   override def shortName(): String = "fake-read-neither-mode"
+
+  override def getTable(options: DataSourceOptions): Table = {
+    new Table {
+      override def name(): String = "fake"
+      override def schema(): StructType = StructType(Nil)
+    }
+  }
 }
 
 class FakeWriteSupportProvider
@@ -324,33 +319,25 @@ class StreamingDataSourceV2Suite extends StreamTest {
 
   for ((read, write, trigger) <- cases) {
     testQuietly(s"stream with read format $read, write format $write, trigger $trigger") {
-      val readSource = DataSource.lookupDataSource(read, spark.sqlContext.conf).
-        getConstructor().newInstance()
+      val table = DataSource.lookupDataSource(read, spark.sqlContext.conf).getConstructor()
+        .newInstance().asInstanceOf[TableProvider].getTable(DataSourceOptions.empty())
       val writeSource = DataSource.lookupDataSource(write, spark.sqlContext.conf).
         getConstructor().newInstance()
 
-      def isMicroBatch(ds: Any): Boolean = ds match {
-        case provider: TableProvider =>
-          val table = provider.getTable(DataSourceOptions.empty())
-          table.isInstanceOf[SupportsMicroBatchRead]
-        case _ => false
-      }
-
-      (readSource, writeSource, trigger) match {
+      (table, writeSource, trigger) match {
         // Valid microbatch queries.
-        case (_: TableProvider, _: StreamingWriteSupportProvider, t)
-          if isMicroBatch(readSource) && !t.isInstanceOf[ContinuousTrigger] =>
+        case (_: SupportsMicroBatchRead, _: StreamingWriteSupportProvider, t)
+          if !t.isInstanceOf[ContinuousTrigger] =>
          testPositiveCase(read, write, trigger)
 
         // Valid continuous queries.
-        case (_: ContinuousReadSupportProvider, _: StreamingWriteSupportProvider,
+        case (_: SupportsContinuousRead, _: StreamingWriteSupportProvider,
             _: ContinuousTrigger) =>
          testPositiveCase(read, write, trigger)
 
         // Invalid - can't read at all
-        case (r, _, _)
-            if !r.isInstanceOf[TableProvider]
-              && !r.isInstanceOf[ContinuousReadSupportProvider] =>
+        case (r, _, _) if !r.isInstanceOf[SupportsMicroBatchRead] &&
+          !r.isInstanceOf[SupportsContinuousRead] =>
          testNegativeCase(read, write, trigger,
            s"Data source $read does not support streamed reading")
 
@@ -361,14 +348,13 @@ class StreamingDataSourceV2Suite extends StreamTest {
 
         // Invalid - trigger is continuous but reader is not
         case (r, _: StreamingWriteSupportProvider, _: ContinuousTrigger)
-          if !r.isInstanceOf[ContinuousReadSupportProvider] =>
+          if !r.isInstanceOf[SupportsContinuousRead] =>
          testNegativeCase(read, write, trigger,
            s"Data source $read does not support continuous processing")
 
         // Invalid - trigger is microbatch but reader is not
-        case (r, _, t)
-          if !isMicroBatch(r) &&
-            !t.isInstanceOf[ContinuousTrigger] =>
+        case (r, _, t) if !r.isInstanceOf[SupportsMicroBatchRead] &&
+          !t.isInstanceOf[ContinuousTrigger] =>
          testPostCreationNegativeCase(read, write, trigger,
            s"Data source $read does not support microbatch processing")
       }
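
For orientation, the following is a rough usage sketch (not part of the patch) of how a source migrated to the new Table API, such as the socket source above, could be exercised from user code: the same table serves both execution modes, and the chosen trigger decides whether the engine calls toMicroBatchStream or toContinuousStream. Option names (host, port, numPartitions, includeTimestamp) come from the provider code above; the object name, port number, and trigger interval are illustrative assumptions.

// Hedged usage sketch, assuming a Spark build that contains this patch and a
// local text server (e.g. `nc -lk 9999`) feeding the socket source.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object SocketContinuousSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("socket-dsv2-sketch")
      .getOrCreate()

    // DataStreamReader resolves "socket" to TextSocketSourceProvider, whose table now
    // implements both SupportsMicroBatchRead and SupportsContinuousRead.
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", "9999")
      .option("numPartitions", "2") // option added by this patch; defaults to defaultParallelism
      .load()

    // A continuous trigger routes planning through toContinuousStream(); the default
    // micro-batch trigger would go through toMicroBatchStream() on the same table.
    val query = lines.writeStream
      .format("console")
      .trigger(Trigger.Continuous("1 second"))
      .start()

    query.awaitTermination()
  }
}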