diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala index f7a20326f6e4a..bb76a30b38810 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.kafka010 import java.{util => ju} -import java.util.{Collections, Locale, UUID} +import java.util.{Locale, UUID} import scala.collection.JavaConverters._ @@ -29,9 +29,10 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe import org.apache.spark.internal.Logging import org.apache.spark.kafka010.KafkaConfigUpdater import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext} -import org.apache.spark.sql.execution.streaming.{Sink, Source} +import org.apache.spark.sql.execution.streaming.{BaseStreamingSink, Sink, Source} import org.apache.spark.sql.sources._ import org.apache.spark.sql.sources.v2._ +import org.apache.spark.sql.sources.v2.TableCapability._ import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder} import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream} import org.apache.spark.sql.sources.v2.writer.WriteBuilder @@ -353,13 +354,15 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister } class KafkaTable(strategy: => ConsumerStrategy) extends Table - with SupportsMicroBatchRead with SupportsContinuousRead with SupportsStreamingWrite { + with SupportsRead with SupportsWrite with BaseStreamingSink { override def name(): String = s"Kafka $strategy" override def schema(): StructType = KafkaOffsetReader.kafkaSchema - override def capabilities(): ju.Set[TableCapability] = Collections.emptySet() + override def capabilities(): ju.Set[TableCapability] = { + Set(MICRO_BATCH_READ, CONTINUOUS_READ, STREAMING_WRITE).asJava + } override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = () => new KafkaScan(options) diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsContinuousRead.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsContinuousRead.java deleted file mode 100644 index 5cc9848d9da89..0000000000000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsContinuousRead.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.sources.v2; - -import org.apache.spark.annotation.Evolving; -import org.apache.spark.sql.sources.v2.reader.Scan; -import org.apache.spark.sql.sources.v2.reader.ScanBuilder; -import org.apache.spark.sql.util.CaseInsensitiveStringMap; - -/** - * An empty mix-in interface for {@link Table}, to indicate this table supports streaming scan with - * continuous mode. - *
- * If a {@link Table} implements this interface, the - * {@link SupportsRead#newScanBuilder(CaseInsensitiveStringMap)} must return a {@link ScanBuilder} - * that builds {@link Scan} with {@link Scan#toContinuousStream(String)} implemented. - *
- */ -@Evolving -public interface SupportsContinuousRead extends SupportsRead { } diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsMicroBatchRead.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsMicroBatchRead.java deleted file mode 100644 index c98f3f1aa5cba..0000000000000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsMicroBatchRead.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2; - -import org.apache.spark.annotation.Evolving; -import org.apache.spark.sql.sources.v2.reader.Scan; -import org.apache.spark.sql.sources.v2.reader.ScanBuilder; -import org.apache.spark.sql.util.CaseInsensitiveStringMap; - -/** - * An empty mix-in interface for {@link Table}, to indicate this table supports streaming scan with - * micro-batch mode. - *- * If a {@link Table} implements this interface, the - * {@link SupportsRead#newScanBuilder(CaseInsensitiveStringMap)} must return a {@link ScanBuilder} - * that builds {@link Scan} with {@link Scan#toMicroBatchStream(String)} implemented. - *
- */ -@Evolving -public interface SupportsMicroBatchRead extends SupportsRead { } diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsStreamingWrite.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsStreamingWrite.java deleted file mode 100644 index ac11e483c18c4..0000000000000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsStreamingWrite.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2; - -import org.apache.spark.annotation.Evolving; -import org.apache.spark.sql.execution.streaming.BaseStreamingSink; -import org.apache.spark.sql.sources.v2.writer.WriteBuilder; -import org.apache.spark.sql.util.CaseInsensitiveStringMap; - -/** - * An empty mix-in interface for {@link Table}, to indicate this table supports streaming write. - *- * If a {@link Table} implements this interface, the - * {@link SupportsWrite#newWriteBuilder(CaseInsensitiveStringMap)} must return a - * {@link WriteBuilder} with {@link WriteBuilder#buildForStreaming()} implemented. - *
- */ -@Evolving -public interface SupportsStreamingWrite extends SupportsWrite, BaseStreamingSink { } diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableCapability.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableCapability.java index 8d3fdcd694e2c..4640c61faeb7c 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableCapability.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableCapability.java @@ -33,6 +33,16 @@ public enum TableCapability { */ BATCH_READ, + /** + * Signals that the table supports reads in micro-batch streaming execution mode. + */ + MICRO_BATCH_READ, + + /** + * Signals that the table supports reads in continuous streaming execution mode. + */ + CONTINUOUS_READ, + /** * Signals that the table supports append writes in batch execution mode. *@@ -42,6 +52,15 @@ public enum TableCapability { */ BATCH_WRITE, + /** + * Signals that the table supports append writes in streaming execution mode. + *
+ * Tables that return this capability must support appending data and may also support additional + * write modes, like {@link #TRUNCATE}, {@link #OVERWRITE_BY_FILTER}, and + * {@link #OVERWRITE_DYNAMIC}. + */ + STREAMING_WRITE, + /** * Signals that the table can be truncated in a write operation. *
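For context on the new enum values: a minimal sketch of how a connector table would advertise streaming support through capabilities() instead of mixing in the removed marker interfaces. The SimpleStreamingTable class below is hypothetical and not part of this patch.

import java.util.{Set => JSet}

import scala.collection.JavaConverters._

import org.apache.spark.sql.sources.v2.{SupportsRead, Table, TableCapability}
import org.apache.spark.sql.sources.v2.reader.ScanBuilder
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical connector table: before this patch it would have mixed in
// SupportsMicroBatchRead/SupportsContinuousRead; now it declares capabilities instead.
class SimpleStreamingTable extends Table with SupportsRead {
  override def name(): String = "simple_streaming_table"

  override def schema(): StructType = new StructType().add("value", "string")

  // The planner consults this set to decide which streaming execution modes are legal.
  override def capabilities(): JSet[TableCapability] = {
    Set(TableCapability.MICRO_BATCH_READ, TableCapability.CONTINUOUS_READ).asJava
  }

  // A real implementation returns a ScanBuilder whose Scan overrides toMicroBatchStream
  // and/or toContinuousStream, matching the capabilities declared above.
  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
    throw new UnsupportedOperationException("illustrative sketch only")
}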
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java index f6085b933c656..c3964e2176d4f 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java @@ -21,8 +21,6 @@ import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousStream; import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchStream; import org.apache.spark.sql.types.StructType; -import org.apache.spark.sql.sources.v2.SupportsContinuousRead; -import org.apache.spark.sql.sources.v2.SupportsMicroBatchRead; import org.apache.spark.sql.sources.v2.Table; import org.apache.spark.sql.sources.v2.TableCapability; @@ -74,8 +72,8 @@ default Batch toBatch() { /** * Returns the physical representation of this scan for streaming query with micro-batch mode. By * default this method throws exception, data sources must overwrite this method to provide an - * implementation, if the {@link Table} that creates this scan implements - * {@link SupportsMicroBatchRead}. + * implementation, if the {@link Table} that creates this scan returns + * {@link TableCapability#MICRO_BATCH_READ} support in its {@link Table#capabilities()}. * * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure * recovery. Data streams for the same logical source in the same query @@ -90,8 +88,8 @@ default MicroBatchStream toMicroBatchStream(String checkpointLocation) { /** * Returns the physical representation of this scan for streaming query with continuous mode. By * default this method throws exception, data sources must overwrite this method to provide an - * implementation, if the {@link Table} that creates this scan implements - * {@link SupportsContinuousRead}. + * implementation, if the {@link Table} that creates this scan returns + * {@link TableCapability#CONTINUOUS_READ} support in its {@link Table#capabilities()}. * * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure * recovery. 
Data streams for the same logical source in the same query diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala index 96a78d3a0da20..c8b2f65ca62e5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala @@ -23,6 +23,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.SaveMode import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.streaming.BaseStreamingSink import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.sources.v2.writer._ @@ -39,11 +40,13 @@ class NoopDataSource extends TableProvider with DataSourceRegister { override def getTable(options: CaseInsensitiveStringMap): Table = NoopTable } -private[noop] object NoopTable extends Table with SupportsWrite with SupportsStreamingWrite { +private[noop] object NoopTable extends Table with SupportsWrite with BaseStreamingSink { override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = NoopWriteBuilder override def name(): String = "noop-table" override def schema(): StructType = new StructType() - override def capabilities(): util.Set[TableCapability] = Set(TableCapability.BATCH_WRITE).asJava + override def capabilities(): util.Set[TableCapability] = { + Set(TableCapability.BATCH_WRITE, TableCapability.STREAMING_WRITE).asJava + } } private[noop] object NoopWriteBuilder extends WriteBuilder diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2StreamingScanSupportCheck.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2StreamingScanSupportCheck.scala new file mode 100644 index 0000000000000..c029acc0bb2df --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2StreamingScanSupportCheck.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.streaming.{StreamingRelation, StreamingRelationV2} +import org.apache.spark.sql.sources.v2.TableCapability.{CONTINUOUS_READ, MICRO_BATCH_READ} + +/** + * This rule adds some basic table capability checks for streaming scans, without knowing the actual + * streaming execution mode.
+ */ +object V2StreamingScanSupportCheck extends (LogicalPlan => Unit) { + import DataSourceV2Implicits._ + + override def apply(plan: LogicalPlan): Unit = { + plan.foreach { + case r: StreamingRelationV2 if !r.table.supportsAny(MICRO_BATCH_READ, CONTINUOUS_READ) => + throw new AnalysisException( + s"Table ${r.table.name()} does not support either micro-batch or continuous scan.") + case _ => + } + + val streamingSources = plan.collect { + case r: StreamingRelationV2 => r.table + } + val v1StreamingRelations = plan.collect { + case r: StreamingRelation => r + } + + if (streamingSources.length + v1StreamingRelations.length > 1) { + val allSupportsMicroBatch = streamingSources.forall(_.supports(MICRO_BATCH_READ)) + // v1 streaming data source only supports micro-batch. + val allSupportsContinuous = streamingSources.forall(_.supports(CONTINUOUS_READ)) && + v1StreamingRelations.isEmpty + if (!allSupportsMicroBatch && !allSupportsContinuous) { + val microBatchSources = + streamingSources.filter(_.supports(MICRO_BATCH_READ)).map(_.name()) ++ + v1StreamingRelations.map(_.sourceName) + val continuousSources = streamingSources.filter(_.supports(CONTINUOUS_READ)).map(_.name()) + throw new AnalysisException( + "The streaming sources in a query do not have a common supported execution mode.\n" + + "Sources support micro-batch: " + microBatchSources.mkString(", ") + "\n" + + "Sources support continuous: " + continuousSources.mkString(", ")) + } + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index 59a4afbfe0939..c591a9625abc9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.execution.streaming -import scala.collection.JavaConverters._ import scala.collection.mutable.{Map => MutableMap} import org.apache.spark.sql.{Dataset, SparkSession} @@ -78,6 +77,7 @@ class MicroBatchExecution( val disabledSources = sparkSession.sqlContext.conf.disabledV2StreamingMicroBatchReaders.split(",") + import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._ val _logicalPlan = analyzedPlan.transform { case streamingRelation@StreamingRelation(dataSourceV1, sourceName, output) => toExecutionRelationMap.getOrElseUpdate(streamingRelation, { @@ -88,31 +88,33 @@ class MicroBatchExecution( logInfo(s"Using Source [$source] from DataSourceV1 named '$sourceName' [$dataSourceV1]") StreamingExecutionRelation(source, output)(sparkSession) }) - case s @ StreamingRelationV2(ds, dsName, table: SupportsMicroBatchRead, options, output, _) - if !disabledSources.contains(ds.getClass.getCanonicalName) => - v2ToRelationMap.getOrElseUpdate(s, { - // Materialize source to avoid creating it in every batch - val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId" - nextSourceId += 1 - logInfo(s"Reading table [$table] from DataSourceV2 named '$dsName' [$ds]") - // TODO: operator pushdown. 
- val scan = table.newScanBuilder(options).build() - val stream = scan.toMicroBatchStream(metadataPath) - StreamingDataSourceV2Relation(output, scan, stream) - }) - case s @ StreamingRelationV2(ds, dsName, _, _, output, v1Relation) => - v2ToExecutionRelationMap.getOrElseUpdate(s, { - // Materialize source to avoid creating it in every batch - val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId" - if (v1Relation.isEmpty) { - throw new UnsupportedOperationException( - s"Data source $dsName does not support microbatch processing.") - } - val source = v1Relation.get.dataSource.createSource(metadataPath) - nextSourceId += 1 - logInfo(s"Using Source [$source] from DataSourceV2 named '$dsName' [$ds]") - StreamingExecutionRelation(source, output)(sparkSession) - }) + + case s @ StreamingRelationV2(src, srcName, table: SupportsRead, options, output, v1) => + val v2Disabled = disabledSources.contains(src.getClass.getCanonicalName) + if (!v2Disabled && table.supports(TableCapability.MICRO_BATCH_READ)) { + v2ToRelationMap.getOrElseUpdate(s, { + // Materialize source to avoid creating it in every batch + val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId" + nextSourceId += 1 + logInfo(s"Reading table [$table] from DataSourceV2 named '$srcName' [$src]") + // TODO: operator pushdown. + val scan = table.newScanBuilder(options).build() + val stream = scan.toMicroBatchStream(metadataPath) + StreamingDataSourceV2Relation(output, scan, stream) + }) + } else if (v1.isEmpty) { + throw new UnsupportedOperationException( + s"Data source $srcName does not support microbatch processing.") + } else { + v2ToExecutionRelationMap.getOrElseUpdate(s, { + // Materialize source to avoid creating it in every batch + val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId" + val source = v1.get.dataSource.createSource(metadataPath) + nextSourceId += 1 + logInfo(s"Using Source [$source] from DataSourceV2 named '$srcName' [$src]") + StreamingExecutionRelation(source, output)(sparkSession) + }) + } } sources = _logicalPlan.collect { // v1 source @@ -122,8 +124,9 @@ class MicroBatchExecution( } uniqueSources = sources.distinct + // TODO (SPARK-27484): we should add the writing node before the plan is analyzed. sink match { - case s: SupportsStreamingWrite => + case s: SupportsWrite => val streamingWrite = createStreamingWrite(s, extraOptions, _logicalPlan) WriteToMicroBatchDataSource(streamingWrite, _logicalPlan) @@ -519,7 +522,7 @@ class MicroBatchExecution( val triggerLogicalPlan = sink match { case _: Sink => newAttributePlan - case _: SupportsStreamingWrite => + case _: SupportsWrite => newAttributePlan.asInstanceOf[WriteToMicroBatchDataSource].createPlan(currentBatchId) case _ => throw new IllegalArgumentException(s"unknown sink type for $sink") } @@ -550,7 +553,7 @@ class MicroBatchExecution( SQLExecution.withNewExecutionId(sparkSessionToRunBatch, lastExecution) { sink match { case s: Sink => s.addBatch(currentBatchId, nextBatch) - case _: SupportsStreamingWrite => + case _: SupportsWrite => // This doesn't accumulate any data - it just forces execution of the microbatch writer. 
nextBatch.collect() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index cc441937ce70c..fd959619650e4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.command.StreamingExplainCommand import org.apache.spark.sql.execution.datasources.v2.StreamWriterCommitProgress import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.sources.v2.SupportsStreamingWrite +import org.apache.spark.sql.sources.v2.SupportsWrite import org.apache.spark.sql.sources.v2.writer.SupportsTruncate import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite import org.apache.spark.sql.streaming._ @@ -582,7 +582,7 @@ abstract class StreamExecution( } protected def createStreamingWrite( - table: SupportsStreamingWrite, + table: SupportsWrite, options: Map[String, String], inputPlan: LogicalPlan): StreamingWrite = { val writeBuilder = table.newWriteBuilder(new CaseInsensitiveStringMap(options.asJava)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala index 884b92ae9421c..c7161d311c028 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala @@ -18,7 +18,8 @@ package org.apache.spark.sql.execution.streaming import java.util -import java.util.Collections + +import scala.collection.JavaConverters._ import org.apache.spark.sql._ import org.apache.spark.sql.execution.streaming.sources.ConsoleWrite @@ -60,13 +61,15 @@ class ConsoleSinkProvider extends TableProvider def shortName(): String = "console" } -object ConsoleTable extends Table with SupportsStreamingWrite { +object ConsoleTable extends Table with SupportsWrite with BaseStreamingSink { override def name(): String = "console" override def schema(): StructType = StructType(Nil) - override def capabilities(): util.Set[TableCapability] = Collections.emptySet() + override def capabilities(): util.Set[TableCapability] = { + Set(TableCapability.STREAMING_WRITE).asJava + } override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = { new WriteBuilder with SupportsTruncate { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala index c8fb53df52598..ef0c942e959ea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation import org.apache.spark.sql.execution.streaming.{StreamingRelationV2, _} import org.apache.spark.sql.sources.v2 -import org.apache.spark.sql.sources.v2.{SupportsContinuousRead, SupportsStreamingWrite} +import org.apache.spark.sql.sources.v2.{SupportsRead, SupportsWrite, TableCapability} import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset} import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger} import org.apache.spark.util.Clock @@ -42,14 +42,14 @@ class ContinuousExecution( name: String, checkpointRoot: String, analyzedPlan: LogicalPlan, - sink: SupportsStreamingWrite, + sink: SupportsWrite, trigger: Trigger, triggerClock: Clock, outputMode: OutputMode, extraOptions: Map[String, String], deleteCheckpointOnStop: Boolean) extends StreamExecution( - sparkSession, name, checkpointRoot, analyzedPlan, sink, + sparkSession, name, checkpointRoot, analyzedPlan, sink.asInstanceOf[BaseStreamingSink], trigger, triggerClock, outputMode, deleteCheckpointOnStop) { @volatile protected var sources: Seq[ContinuousStream] = Seq() @@ -63,22 +63,23 @@ class ContinuousExecution( override val logicalPlan: WriteToContinuousDataSource = { val v2ToRelationMap = MutableMap[StreamingRelationV2, StreamingDataSourceV2Relation]() var nextSourceId = 0 + import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._ val _logicalPlan = analyzedPlan.transform { - case s @ StreamingRelationV2( - ds, dsName, table: SupportsContinuousRead, options, output, _) => + case s @ StreamingRelationV2(ds, sourceName, table: SupportsRead, options, output, _) => + if (!table.supports(TableCapability.CONTINUOUS_READ)) { + throw new UnsupportedOperationException( + s"Data source $sourceName does not support continuous processing.") + } + v2ToRelationMap.getOrElseUpdate(s, { val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId" nextSourceId += 1 - logInfo(s"Reading table [$table] from DataSourceV2 named '$dsName' [$ds]") + logInfo(s"Reading table [$table] from DataSourceV2 named '$sourceName' [$ds]") // TODO: operator pushdown. val scan = table.newScanBuilder(options).build() val stream = scan.toContinuousStream(metadataPath) StreamingDataSourceV2Relation(output, scan, stream) }) - - case StreamingRelationV2(_, sourceName, _, _, _, _) => - throw new UnsupportedOperationException( - s"Data source $sourceName does not support continuous processing.") } sources = _logicalPlan.collect { @@ -86,6 +87,7 @@ class ContinuousExecution( } uniqueSources = sources.distinct + // TODO (SPARK-27484): we should add the writing node before the plan is analyzed. 
WriteToContinuousDataSource( createStreamingWrite(sink, extraOptions, _logicalPlan), _logicalPlan) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index bfa9c09985503..0dcbdd3a1fd21 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -18,10 +18,10 @@ package org.apache.spark.sql.execution.streaming import java.util -import java.util.Collections import java.util.concurrent.atomic.AtomicInteger import javax.annotation.concurrent.GuardedBy +import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, ListBuffer} import scala.util.control.NonFatal @@ -92,14 +92,15 @@ object MemoryStreamTableProvider extends TableProvider { } } -class MemoryStreamTable(val stream: MemoryStreamBase[_]) extends Table - with SupportsMicroBatchRead with SupportsContinuousRead { +class MemoryStreamTable(val stream: MemoryStreamBase[_]) extends Table with SupportsRead { override def name(): String = "MemoryStreamDataSource" override def schema(): StructType = stream.fullSchema() - override def capabilities(): util.Set[TableCapability] = Collections.emptySet() + override def capabilities(): util.Set[TableCapability] = { + Set(TableCapability.MICRO_BATCH_READ, TableCapability.CONTINUOUS_READ).asJava + } override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { new MemoryStreamScanBuilder(stream) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala index 807e0b12c6278..838ede6c563f2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala @@ -18,14 +18,16 @@ package org.apache.spark.sql.execution.streaming.sources import java.util -import java.util.Collections + +import scala.collection.JavaConverters._ import org.apache.spark.sql.{ForeachWriter, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.execution.python.PythonForeachWriter -import org.apache.spark.sql.sources.v2.{SupportsStreamingWrite, Table, TableCapability} +import org.apache.spark.sql.execution.streaming.BaseStreamingSink +import org.apache.spark.sql.sources.v2.{SupportsWrite, Table, TableCapability} import org.apache.spark.sql.sources.v2.writer.{DataWriter, SupportsTruncate, WriteBuilder, WriterCommitMessage} import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite} import org.apache.spark.sql.types.StructType @@ -42,13 +44,15 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap case class ForeachWriterTable[T]( writer: ForeachWriter[T], converter: Either[ExpressionEncoder[T], InternalRow => T]) - extends Table with SupportsStreamingWrite { + extends Table with SupportsWrite with BaseStreamingSink { override def name(): String = "ForeachSink" override def schema(): StructType = StructType(Nil) - override def capabilities(): util.Set[TableCapability] = Collections.emptySet() + override def capabilities(): util.Set[TableCapability] = { + 
Set(TableCapability.STREAMING_WRITE).asJava + } override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = { new WriteBuilder with SupportsTruncate { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala index a652eeb8d5f8c..f61e9dbecd4ea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala @@ -18,7 +18,8 @@ package org.apache.spark.sql.execution.streaming.sources import java.util -import java.util.Collections + +import scala.collection.JavaConverters._ import org.apache.spark.network.util.JavaUtils import org.apache.spark.sql.SparkSession @@ -78,7 +79,7 @@ class RateStreamTable( rowsPerSecond: Long, rampUpTimeSeconds: Long, numPartitions: Int) - extends Table with SupportsMicroBatchRead with SupportsContinuousRead { + extends Table with SupportsRead { override def name(): String = { s"RateStream(rowsPerSecond=$rowsPerSecond, rampUpTimeSeconds=$rampUpTimeSeconds, " + @@ -87,7 +88,9 @@ class RateStreamTable( override def schema(): StructType = RateStreamProvider.SCHEMA - override def capabilities(): util.Set[TableCapability] = Collections.emptySet() + override def capabilities(): util.Set[TableCapability] = { + Set(TableCapability.MICRO_BATCH_READ, TableCapability.CONTINUOUS_READ).asJava + } override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = () => new Scan { override def readSchema(): StructType = RateStreamProvider.SCHEMA diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala index a0452cf844d35..0f807e235661a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala @@ -19,8 +19,9 @@ package org.apache.spark.sql.execution.streaming.sources import java.text.SimpleDateFormat import java.util -import java.util.{Collections, Locale} +import java.util.Locale +import scala.collection.JavaConverters._ import scala.util.{Failure, Success, Try} import org.apache.spark.internal.Logging @@ -67,7 +68,7 @@ class TextSocketSourceProvider extends TableProvider with DataSourceRegister wit } class TextSocketTable(host: String, port: Int, numPartitions: Int, includeTimestamp: Boolean) - extends Table with SupportsMicroBatchRead with SupportsContinuousRead { + extends Table with SupportsRead { override def name(): String = s"Socket[$host:$port]" @@ -79,7 +80,9 @@ class TextSocketTable(host: String, port: Int, numPartitions: Int, includeTimest } } - override def capabilities(): util.Set[TableCapability] = Collections.emptySet() + override def capabilities(): util.Set[TableCapability] = { + Set(TableCapability.MICRO_BATCH_READ, TableCapability.CONTINUOUS_READ).asJava + } override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = () => new Scan { override def readSchema(): StructType = schema() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala index 
8eb5de0f640a4..219e25c1407b7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala @@ -18,9 +18,9 @@ package org.apache.spark.sql.execution.streaming.sources import java.util -import java.util.Collections import javax.annotation.concurrent.GuardedBy +import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal @@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils import org.apache.spark.sql.execution.streaming.{MemorySinkBase, Sink} -import org.apache.spark.sql.sources.v2.{SupportsStreamingWrite, TableCapability} +import org.apache.spark.sql.sources.v2.{SupportsWrite, Table, TableCapability} import org.apache.spark.sql.sources.v2.writer._ import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite} import org.apache.spark.sql.types.StructType @@ -43,13 +43,15 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap * A sink that stores the results in memory. This [[Sink]] is primarily intended for use in unit * tests and does not provide durability. */ -class MemorySinkV2 extends SupportsStreamingWrite with MemorySinkBase with Logging { +class MemorySinkV2 extends Table with SupportsWrite with MemorySinkBase with Logging { override def name(): String = "MemorySinkV2" override def schema(): StructType = StructType(Nil) - override def capabilities(): util.Set[TableCapability] = Collections.emptySet() + override def capabilities(): util.Set[TableCapability] = { + Set(TableCapability.STREAMING_WRITE).asJava + } override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = { new WriteBuilder with SupportsTruncate { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index 8f2f8e80e126b..732c4cb196fac 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{QueryExecution, SparkOptimizer, SparkPlanner, SparkSqlParser} import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.execution.datasources.v2.V2WriteSupportCheck +import org.apache.spark.sql.execution.datasources.v2.{V2StreamingScanSupportCheck, V2WriteSupportCheck} import org.apache.spark.sql.streaming.StreamingQueryManager import org.apache.spark.sql.util.ExecutionListenerManager @@ -177,6 +177,7 @@ abstract class BaseSessionStateBuilder( PreReadCheck +: HiveOnlyCheck +: V2WriteSupportCheck +: + V2StreamingScanSupportCheck +: customCheckRules } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index 01f29cdeddc2d..01083a994e8a1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -30,6 
+30,7 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils import org.apache.spark.sql.execution.streaming.{StreamingRelation, StreamingRelationV2} import org.apache.spark.sql.sources.StreamSourceProvider import org.apache.spark.sql.sources.v2._ +import org.apache.spark.sql.sources.v2.TableCapability._ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -181,8 +182,9 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo case Some(schema) => provider.getTable(dsOptions, schema) case _ => provider.getTable(dsOptions) } + import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._ table match { - case _: SupportsMicroBatchRead | _: SupportsContinuousRead => + case _: SupportsRead if table.supportsAny(MICRO_BATCH_READ, CONTINUOUS_READ) => Dataset.ofRows( sparkSession, StreamingRelationV2( @@ -190,6 +192,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo sparkSession)) // fallback to v1 + // TODO (SPARK-27483): we should move this fallback logic to an analyzer rule. case _ => Dataset.ofRows(sparkSession, StreamingRelation(v1DataSource)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index 33d032eb78c2b..d2df3a5349dd5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -31,7 +31,8 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger import org.apache.spark.sql.execution.streaming.sources._ -import org.apache.spark.sql.sources.v2.{SupportsStreamingWrite, TableProvider} +import org.apache.spark.sql.sources.v2.{SupportsWrite, TableProvider} +import org.apache.spark.sql.sources.v2.TableCapability._ import org.apache.spark.sql.util.CaseInsensitiveStringMap /** @@ -315,8 +316,10 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { source = provider, conf = df.sparkSession.sessionState.conf) val options = sessionOptions ++ extraOptions val dsOptions = new CaseInsensitiveStringMap(options.asJava) + import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._ provider.getTable(dsOptions) match { - case s: SupportsStreamingWrite => s + case table: SupportsWrite if table.supports(STREAMING_WRITE) => + table.asInstanceOf[BaseStreamingSink] case _ => createV1Sink() } } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index 2e019ba614fc2..5a08049ab55c8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.streaming.continuous.{ContinuousExecution, import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.StaticSQLConf.STREAMING_QUERY_LISTENERS -import org.apache.spark.sql.sources.v2.SupportsStreamingWrite +import org.apache.spark.sql.sources.v2.SupportsWrite import org.apache.spark.util.{Clock, SystemClock, Utils} 
/** @@ -261,7 +261,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo } (sink, trigger) match { - case (v2Sink: SupportsStreamingWrite, trigger: ContinuousTrigger) => + case (table: SupportsWrite, trigger: ContinuousTrigger) => if (operationCheckEnabled) { UnsupportedOperationChecker.checkForContinuous(analyzedPlan, outputMode) } @@ -270,7 +270,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo userSpecifiedName.orNull, checkpointLocation, analyzedPlan, - v2Sink, + table, trigger, triggerClock, outputMode, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2StreamingScanSupportCheckSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2StreamingScanSupportCheckSuite.scala new file mode 100644 index 0000000000000..8a0450fce76a1 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2StreamingScanSupportCheckSuite.scala @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.v2 + +import java.util + +import scala.collection.JavaConverters._ + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext} +import org.apache.spark.sql.catalyst.plans.logical.Union +import org.apache.spark.sql.execution.datasources.DataSource +import org.apache.spark.sql.execution.streaming.{Offset, Source, StreamingRelation, StreamingRelationV2} +import org.apache.spark.sql.sources.StreamSourceProvider +import org.apache.spark.sql.sources.v2.{Table, TableCapability, TableProvider} +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class V2StreamingScanSupportCheckSuite extends SparkFunSuite with SharedSparkSession { + import TableCapability._ + + private def createStreamingRelation(table: Table, v1Relation: Option[StreamingRelation]) = { + StreamingRelationV2(FakeTableProvider, "fake", table, CaseInsensitiveStringMap.empty(), + FakeTableProvider.schema.toAttributes, v1Relation)(spark) + } + + private def createStreamingRelationV1() = { + StreamingRelation(DataSource(spark, classOf[FakeStreamSourceProvider].getName)) + } + + test("check correct plan") { + val plan1 = createStreamingRelation(CapabilityTable(MICRO_BATCH_READ), None) + val plan2 = createStreamingRelation(CapabilityTable(CONTINUOUS_READ), None) + val plan3 = createStreamingRelation(CapabilityTable(MICRO_BATCH_READ, CONTINUOUS_READ), None) + val plan4 = createStreamingRelationV1() + + V2StreamingScanSupportCheck(Union(plan1, plan1)) + V2StreamingScanSupportCheck(Union(plan2, plan2)) + V2StreamingScanSupportCheck(Union(plan1, plan3)) + V2StreamingScanSupportCheck(Union(plan2, plan3)) + V2StreamingScanSupportCheck(Union(plan1, plan4)) + V2StreamingScanSupportCheck(Union(plan3, plan4)) + } + + test("table without scan capability") { + val e = intercept[AnalysisException] { + V2StreamingScanSupportCheck(createStreamingRelation(CapabilityTable(), None)) + } + assert(e.message.contains("does not support either micro-batch or continuous scan")) + } + + test("mix micro-batch only and continuous only") { + val plan1 = createStreamingRelation(CapabilityTable(MICRO_BATCH_READ), None) + val plan2 = createStreamingRelation(CapabilityTable(CONTINUOUS_READ), None) + + val e = intercept[AnalysisException] { + V2StreamingScanSupportCheck(Union(plan1, plan2)) + } + assert(e.message.contains( + "The streaming sources in a query do not have a common supported execution mode")) + } + + test("mix continuous only and v1 relation") { + val plan1 = createStreamingRelation(CapabilityTable(CONTINUOUS_READ), None) + val plan2 = createStreamingRelationV1() + val e = intercept[AnalysisException] { + V2StreamingScanSupportCheck(Union(plan1, plan2)) + } + assert(e.message.contains( + "The streaming sources in a query do not have a common supported execution mode")) + } +} + +private object FakeTableProvider extends TableProvider { + val schema = new StructType().add("i", "int") + + override def getTable(options: CaseInsensitiveStringMap): Table = { + throw new UnsupportedOperationException + } +} + +private case class CapabilityTable(_capabilities: TableCapability*) extends Table { + override def name(): String = "capability_test_table" + override def schema(): StructType = FakeTableProvider.schema + override def capabilities(): util.Set[TableCapability] = _capabilities.toSet.asJava +} + +private class FakeStreamSourceProvider extends 
StreamSourceProvider { + override def sourceSchema( + sqlContext: SQLContext, + schema: Option[StructType], + providerName: String, + parameters: Map[String, String]): (String, StructType) = { + "fake" -> FakeTableProvider.schema + } + + override def createSource( + sqlContext: SQLContext, + metadataPath: String, + schema: Option[StructType], + providerName: String, + parameters: Map[String, String]): Source = { + new Source { + override def schema: StructType = FakeTableProvider.schema + override def getOffset: Option[Offset] = { + throw new UnsupportedOperationException + } + override def getBatch(start: Option[Offset], end: Offset): DataFrame = { + throw new UnsupportedOperationException + } + override def stop(): Unit = {} + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala index f022edea275e0..25a68e4f9a57c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala @@ -20,13 +20,16 @@ package org.apache.spark.sql.streaming.sources import java.util import java.util.Collections -import org.apache.spark.sql.{DataFrame, SQLContext} +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext} import org.apache.spark.sql.execution.datasources.DataSource -import org.apache.spark.sql.execution.streaming.{RateStreamOffset, Sink, StreamingQueryWrapper} +import org.apache.spark.sql.execution.streaming.{BaseStreamingSink, RateStreamOffset, Sink, StreamingQueryWrapper} import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider} import org.apache.spark.sql.sources.v2._ +import org.apache.spark.sql.sources.v2.TableCapability._ import org.apache.spark.sql.sources.v2.reader._ import org.apache.spark.sql.sources.v2.reader.streaming._ import org.apache.spark.sql.sources.v2.writer.{WriteBuilder, WriterCommitMessage} @@ -77,24 +80,12 @@ class FakeWriteBuilder extends WriteBuilder with StreamingWrite { } } -trait FakeMicroBatchReadTable extends Table with SupportsMicroBatchRead { - override def name(): String = "fake" - override def schema(): StructType = StructType(Seq()) - override def capabilities(): util.Set[TableCapability] = Collections.emptySet() - override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new FakeScanBuilder -} - -trait FakeContinuousReadTable extends Table with SupportsContinuousRead { - override def name(): String = "fake" - override def schema(): StructType = StructType(Seq()) - override def capabilities(): util.Set[TableCapability] = Collections.emptySet() - override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new FakeScanBuilder -} - -trait FakeStreamingWriteTable extends Table with SupportsStreamingWrite { +trait FakeStreamingWriteTable extends Table with SupportsWrite with BaseStreamingSink { override def name(): String = "fake" override def schema(): StructType = StructType(Seq()) - override def capabilities(): util.Set[TableCapability] = Collections.emptySet() + override def capabilities(): util.Set[TableCapability] = { + Set(STREAMING_WRITE).asJava + } override def newWriteBuilder(options: CaseInsensitiveStringMap): 
WriteBuilder = { + new FakeWriteBuilder } @@ -110,7 +101,16 @@ class FakeReadMicroBatchOnly override def getTable(options: CaseInsensitiveStringMap): Table = { LastReadOptions.options = options - new FakeMicroBatchReadTable {} + new Table with SupportsRead { + override def name(): String = "fake" + override def schema(): StructType = StructType(Seq()) + override def capabilities(): util.Set[TableCapability] = { + Set(MICRO_BATCH_READ).asJava + } + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { + new FakeScanBuilder + } + } } } @@ -124,7 +124,16 @@ class FakeReadContinuousOnly override def getTable(options: CaseInsensitiveStringMap): Table = { LastReadOptions.options = options - new FakeContinuousReadTable {} + new Table with SupportsRead { + override def name(): String = "fake" + override def schema(): StructType = StructType(Seq()) + override def capabilities(): util.Set[TableCapability] = { + Set(CONTINUOUS_READ).asJava + } + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { + new FakeScanBuilder + } + } } } @@ -132,7 +141,16 @@ class FakeReadBothModes extends DataSourceRegister with TableProvider { override def shortName(): String = "fake-read-microbatch-continuous" override def getTable(options: CaseInsensitiveStringMap): Table = { - new Table with FakeMicroBatchReadTable with FakeContinuousReadTable {} + new Table with SupportsRead { + override def name(): String = "fake" + override def schema(): StructType = StructType(Seq()) + override def capabilities(): util.Set[TableCapability] = { + Set(MICRO_BATCH_READ, CONTINUOUS_READ).asJava + } + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { + new FakeScanBuilder + } + } } } @@ -365,39 +383,37 @@ class StreamingDataSourceV2Suite extends StreamTest { val sinkTable = DataSource.lookupDataSource(write, spark.sqlContext.conf).getConstructor() .newInstance().asInstanceOf[TableProvider].getTable(CaseInsensitiveStringMap.empty()) - (sourceTable, sinkTable, trigger) match { - // Valid microbatch queries. - case (_: SupportsMicroBatchRead, _: SupportsStreamingWrite, t) - if !t.isInstanceOf[ContinuousTrigger] => - testPositiveCase(read, write, trigger) - - // Valid continuous queries. - case (_: SupportsContinuousRead, _: SupportsStreamingWrite, - _: ContinuousTrigger) => - testPositiveCase(read, write, trigger) - + import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._ + trigger match { // Invalid - can't read at all - case (r, _, _) if !r.isInstanceOf[SupportsMicroBatchRead] && - !r.isInstanceOf[SupportsContinuousRead] => + case _ if !sourceTable.supportsAny(MICRO_BATCH_READ, CONTINUOUS_READ) => testNegativeCase(read, write, trigger, s"Data source $read does not support streamed reading") // Invalid - can't write - case (_, w, _) if !w.isInstanceOf[SupportsStreamingWrite] => + case _ if !sinkTable.supports(STREAMING_WRITE) => testNegativeCase(read, write, trigger, s"Data source $write does not support streamed writing") - // Invalid - trigger is continuous but reader is not - case (r, _: SupportsStreamingWrite, _: ContinuousTrigger) - if !r.isInstanceOf[SupportsContinuousRead] => - testNegativeCase(read, write, trigger, - s"Data source $read does not support continuous processing") + case _: ContinuousTrigger => + if (sourceTable.supports(CONTINUOUS_READ)) { + // Valid continuous queries. 
+          testPositiveCase(read, write, trigger) + } else { + // Invalid - trigger is continuous but reader is not + testNegativeCase( + read, write, trigger, s"Data source $read does not support continuous processing") + } - // Invalid - trigger is microbatch but reader is not - case (r, _, t) if !r.isInstanceOf[SupportsMicroBatchRead] && - !t.isInstanceOf[ContinuousTrigger] => - testPostCreationNegativeCase(read, write, trigger, - s"Data source $read does not support microbatch processing") + case microBatchTrigger => + if (sourceTable.supports(MICRO_BATCH_READ)) { + // Valid microbatch queries. + testPositiveCase(read, write, trigger) + } else { + // Invalid - trigger is microbatch but reader is not + testPostCreationNegativeCase(read, write, trigger, + s"Data source $read does not support microbatch processing") + } } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala index 3bca77094badb..70364e563bff7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.SparkPlanner import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.execution.datasources.v2.V2WriteSupportCheck +import org.apache.spark.sql.execution.datasources.v2.{V2StreamingScanSupportCheck, V2WriteSupportCheck} import org.apache.spark.sql.hive.client.HiveClient import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionResourceLoader, SessionState} @@ -89,6 +89,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session PreWriteCheck +: PreReadCheck +: V2WriteSupportCheck +: + V2StreamingScanSupportCheck +: customCheckRules }
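Usage note: the checks added throughout this patch all go through the supports/supportsAny helpers brought in by DataSourceV2Implicits. A condensed sketch of the capability gating spread across V2StreamingScanSupportCheck, MicroBatchExecution, and ContinuousExecution follows; the object, the validateStreamingRead method, and the continuousTrigger flag are illustrative only, and the v1 fallback path of MicroBatchExecution is ignored.

package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.sources.v2.Table
import org.apache.spark.sql.sources.v2.TableCapability.{CONTINUOUS_READ, MICRO_BATCH_READ}

// Hypothetical helper, placed in this package only so the internal implicits resolve.
object StreamingModeSketch {
  import DataSourceV2Implicits._

  // Validate that `table` can serve the streaming query for the chosen trigger,
  // based solely on its declared capabilities.
  def validateStreamingRead(table: Table, continuousTrigger: Boolean): Unit = {
    // V2StreamingScanSupportCheck rejects tables with neither capability at analysis time.
    if (!table.supportsAny(MICRO_BATCH_READ, CONTINUOUS_READ)) {
      throw new UnsupportedOperationException(
        s"Table ${table.name()} does not support either micro-batch or continuous scan.")
    }
    // ContinuousExecution then requires CONTINUOUS_READ; MicroBatchExecution requires
    // MICRO_BATCH_READ (or falls back to a v1 source, which this sketch omits).
    if (continuousTrigger && !table.supports(CONTINUOUS_READ)) {
      throw new UnsupportedOperationException(
        s"Table ${table.name()} does not support continuous processing.")
    } else if (!continuousTrigger && !table.supports(MICRO_BATCH_READ)) {
      throw new UnsupportedOperationException(
        s"Table ${table.name()} does not support micro-batch processing.")
    }
  }
}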