diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala index bbb060356f730..a37e4886c82c6 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala @@ -50,7 +50,7 @@ private[kafka010] object KafkaWriter extends Logging { topic: Option[String] = None): Unit = { schema.find(_.name == TOPIC_ATTRIBUTE_NAME).getOrElse( if (topic.isEmpty) { - throw new AnalysisException(s"topic option required when no " + + throw new IllegalArgumentException(s"topic option required when no " + s"'$TOPIC_ATTRIBUTE_NAME' attribute is present. Use the " + s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a topic.") } else { @@ -59,22 +59,22 @@ private[kafka010] object KafkaWriter extends Logging { ).dataType match { case StringType => // good case _ => - throw new AnalysisException(s"Topic type must be a ${StringType.catalogString}") + throw new IllegalArgumentException(s"Topic type must be a ${StringType.catalogString}") } schema.find(_.name == KEY_ATTRIBUTE_NAME).getOrElse( Literal(null, StringType) ).dataType match { case StringType | BinaryType => // good case _ => - throw new AnalysisException(s"$KEY_ATTRIBUTE_NAME attribute type " + + throw new IllegalArgumentException(s"$KEY_ATTRIBUTE_NAME attribute type " + s"must be a ${StringType.catalogString} or ${BinaryType.catalogString}") } schema.find(_.name == VALUE_ATTRIBUTE_NAME).getOrElse( - throw new AnalysisException(s"Required attribute '$VALUE_ATTRIBUTE_NAME' not found") + throw new IllegalArgumentException(s"Required attribute '$VALUE_ATTRIBUTE_NAME' not found") ).dataType match { case StringType | BinaryType => // good case _ => - throw new AnalysisException(s"$VALUE_ATTRIBUTE_NAME attribute type " + + throw new IllegalArgumentException(s"$VALUE_ATTRIBUTE_NAME attribute type " + s"must be a ${StringType.catalogString} or ${BinaryType.catalogString}") } schema.find(_.name == HEADERS_ATTRIBUTE_NAME).getOrElse( @@ -83,7 +83,7 @@ private[kafka010] object KafkaWriter extends Logging { ).dataType match { case KafkaRecordToRowConverter.headersType => // good case _ => - throw new AnalysisException(s"$HEADERS_ATTRIBUTE_NAME attribute type " + + throw new IllegalArgumentException(s"$HEADERS_ATTRIBUTE_NAME attribute type " + s"must be a ${KafkaRecordToRowConverter.headersType.catalogString}") } } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala index 65adbd6b9887c..3fa7aae40d488 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala @@ -25,6 +25,8 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark.sql.{AnalysisException, DataFrame, Row} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SpecificInternalRow, UnsafeProjection} +import org.apache.spark.sql.execution.streaming.MemoryStream +import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream import org.apache.spark.sql.streaming._ import org.apache.spark.sql.types.{BinaryType, DataType} import org.apache.spark.util.Utils @@ -215,6 +217,7 @@ class 
KafkaContinuousSinkSuite extends KafkaContinuousTest { test("streaming - write data with bad schema") { val inputTopic = newTopic() testUtils.createTopic(inputTopic, partitions = 1) + testUtils.sendMessages(inputTopic, Array("0")) val input = spark .readStream @@ -226,21 +229,21 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest { val topic = newTopic() testUtils.createTopic(topic) - val ex = intercept[AnalysisException] { + val ex = intercept[StreamingQueryException] { /* No topic field or topic option */ createKafkaWriter(input.toDF())( withSelectExpr = "value as key", "value" - ) + ).processAllAvailable() } assert(ex.getMessage .toLowerCase(Locale.ROOT) .contains("topic option required when no 'topic' attribute is present")) - val ex2 = intercept[AnalysisException] { + val ex2 = intercept[StreamingQueryException] { /* No value field */ createKafkaWriter(input.toDF())( withSelectExpr = s"'$topic' as topic", "value as key" - ) + ).processAllAvailable() } assert(ex2.getMessage.toLowerCase(Locale.ROOT).contains( "required attribute 'value' not found")) @@ -249,6 +252,7 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest { test("streaming - write data with valid schema but wrong types") { val inputTopic = newTopic() testUtils.createTopic(inputTopic, partitions = 1) + testUtils.sendMessages(inputTopic, Array("0")) val input = spark .readStream @@ -261,28 +265,28 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest { val topic = newTopic() testUtils.createTopic(topic) - val ex = intercept[AnalysisException] { + val ex = intercept[StreamingQueryException] { /* topic field wrong type */ createKafkaWriter(input.toDF())( withSelectExpr = s"CAST('1' as INT) as topic", "value" - ) + ).processAllAvailable() } assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("topic type must be a string")) - val ex2 = intercept[AnalysisException] { + val ex2 = intercept[StreamingQueryException] { /* value field wrong type */ createKafkaWriter(input.toDF())( withSelectExpr = s"'$topic' as topic", "CAST(value as INT) as value" - ) + ).processAllAvailable() } assert(ex2.getMessage.toLowerCase(Locale.ROOT).contains( "value attribute type must be a string or binary")) - val ex3 = intercept[AnalysisException] { + val ex3 = intercept[StreamingQueryException] { /* key field wrong type */ createKafkaWriter(input.toDF())( withSelectExpr = s"'$topic' as topic", "CAST(value as INT) as key", "value" - ) + ).processAllAvailable() } assert(ex3.getMessage.toLowerCase(Locale.ROOT).contains( "key attribute type must be a string or binary")) @@ -330,18 +334,18 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest { .option("subscribe", inputTopic) .load() - val ex = intercept[IllegalArgumentException] { + val ex = intercept[StreamingQueryException] { createKafkaWriter( input.toDF(), - withOptions = Map("kafka.key.serializer" -> "foo"))() + withOptions = Map("kafka.key.serializer" -> "foo"))().processAllAvailable() } assert(ex.getMessage.toLowerCase(Locale.ROOT).contains( "kafka option 'key.serializer' is not supported")) - val ex2 = intercept[IllegalArgumentException] { + val ex2 = intercept[StreamingQueryException] { createKafkaWriter( input.toDF(), - withOptions = Map("kafka.value.serializer" -> "foo"))() + withOptions = Map("kafka.value.serializer" -> "foo"))().processAllAvailable() } assert(ex2.getMessage.toLowerCase(Locale.ROOT).contains( "kafka option 'value.serializer' is not supported")) diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index c8d29520bcfce..c6de16a6f4bb6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownF import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream} import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan} import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.execution.streaming.{WriteMicroBatch, WriteMicroBatchExec} import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec} import org.apache.spark.sql.sources import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -176,9 +177,6 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { withProjection :: Nil - case WriteToDataSourceV2(writer, query) => - WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil - case CreateV2Table(catalog, ident, schema, parts, props, ifNotExists) => CreateTableExec(catalog, ident, schema, parts, props, ifNotExists) :: Nil @@ -265,8 +263,13 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { }).toArray DeleteFromTableExec(r.table.asDeletable, filters) :: Nil - case WriteToContinuousDataSource(writer, query) => - WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil + case WriteMicroBatch(table, query, queryId, querySchema, outputMode, options, epochId) => + WriteMicroBatchExec( + table, planLater(query), queryId, querySchema, outputMode, options, epochId) :: Nil + + case WriteToContinuousDataSource(table, query, queryId, querySchema, outputMode, options) => + WriteToContinuousDataSourceExec( + table, planLater(query), queryId, querySchema, outputMode, options) :: Nil case Repartition(1, false, child) => val isContinuous = child.find { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index 9f4392da6ab4d..cfdf934329267 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -38,17 +38,6 @@ import org.apache.spark.sql.sources.{AlwaysTrue, Filter} import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.{LongAccumulator, Utils} -/** - * Deprecated logical plan for writing data into data source v2. This is being replaced by more - * specific logical plans, like [[org.apache.spark.sql.catalyst.plans.logical.AppendData]]. - */ -@deprecated("Use specific logical plans like AppendData instead", "2.4.0") -case class WriteToDataSourceV2(batchWrite: BatchWrite, query: LogicalPlan) - extends LogicalPlan { - override def children: Seq[LogicalPlan] = Seq(query) - override def output: Seq[Attribute] = Nil -} - /** * Physical plan node for v2 create table as select when the catalog does not support staging * the table creation. 
@@ -315,17 +304,6 @@ case class OverwritePartitionsDynamicExec( } } -case class WriteToDataSourceV2Exec( - batchWrite: BatchWrite, - query: SparkPlan) extends V2TableWriteExec { - - def writeOptions: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty() - - override protected def doExecute(): RDD[InternalRow] = { - writeWithV2(batchWrite) - } -} - /** * Helper for physical plans that build batch writes. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BaseStreamingWriteExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BaseStreamingWriteExec.scala new file mode 100644 index 0000000000000..1fd32109b0cfc --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BaseStreamingWriteExec.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, Update} +import org.apache.spark.sql.connector.catalog.SupportsWrite +import org.apache.spark.sql.connector.write.SupportsTruncate +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} +import org.apache.spark.sql.streaming.OutputMode +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap +import org.apache.spark.util.Utils + +trait BaseStreamingWriteExec extends UnaryExecNode { + def table: SupportsWrite + def query: SparkPlan + def queryId: String + def querySchema: StructType + def outputMode: OutputMode + def options: Map[String, String] + + override def child: SparkPlan = query + override def output: Seq[Attribute] = Nil + + protected lazy val inputRDD = query.execute() + lazy val streamWrite = { + val writeBuilder = table.newWriteBuilder(new CaseInsensitiveStringMap(options.asJava)) + .withQueryId(queryId) + .withInputDataSchema(querySchema) + outputMode match { + case Append => + writeBuilder.buildForStreaming() + + case Complete => + // TODO: we should do this check earlier when we have capability API. + require(writeBuilder.isInstanceOf[SupportsTruncate], + table.name + " does not support Complete mode.") + writeBuilder.asInstanceOf[SupportsTruncate].truncate().buildForStreaming() + + case Update => + // Although no v2 sinks really support Update mode now, but during tests we do want them + // to pretend to support Update mode, and treat Update mode same as Append mode. 
+ if (Utils.isTesting) { + writeBuilder.buildForStreaming() + } else { + throw new IllegalArgumentException( + "Data source v2 streaming sinks does not support Update mode.") + } + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index 5fe1f92e396c9..bff6b552be36a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -27,8 +27,8 @@ import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability} import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset => OffsetV2, SparkDataStream} import org.apache.spark.sql.execution.SQLExecution -import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, StreamWriterCommitProgress, WriteToDataSourceV2Exec} -import org.apache.spark.sql.execution.streaming.sources.{RateControlMicroBatchStream, WriteToMicroBatchDataSource} +import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, StreamWriterCommitProgress} +import org.apache.spark.sql.execution.streaming.sources.RateControlMicroBatchStream import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.{OutputMode, Trigger} import org.apache.spark.util.Clock @@ -127,8 +127,8 @@ class MicroBatchExecution( // TODO (SPARK-27484): we should add the writing node before the plan is analyzed. sink match { case s: SupportsWrite => - val streamingWrite = createStreamingWrite(s, extraOptions, _logicalPlan) - WriteToMicroBatchDataSource(streamingWrite, _logicalPlan) + WriteToMicroBatchDataSource( + s, _logicalPlan, id.toString, _logicalPlan.schema, outputMode, extraOptions) case _ => _logicalPlan } @@ -557,7 +557,7 @@ class MicroBatchExecution( nextBatch.collect() } lastExecution.executedPlan match { - case w: WriteToDataSourceV2Exec => w.commitProgress + case w: WriteMicroBatchExec => w.commitProgress case _ => None } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index f470ad3f9c690..04754a22f80c0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -584,35 +584,6 @@ abstract class StreamExecution( |batch = $batchDescription""".stripMargin } - protected def createStreamingWrite( - table: SupportsWrite, - options: Map[String, String], - inputPlan: LogicalPlan): StreamingWrite = { - val writeBuilder = table.newWriteBuilder(new CaseInsensitiveStringMap(options.asJava)) - .withQueryId(id.toString) - .withInputDataSchema(inputPlan.schema) - outputMode match { - case Append => - writeBuilder.buildForStreaming() - - case Complete => - // TODO: we should do this check earlier when we have capability API. 
- require(writeBuilder.isInstanceOf[SupportsTruncate], - table.name + " does not support Complete mode.") - writeBuilder.asInstanceOf[SupportsTruncate].truncate().buildForStreaming() - - case Update => - // Although no v2 sinks really support Update mode now, but during tests we do want them - // to pretend to support Update mode, and treat Update mode same as Append mode. - if (Utils.isTesting) { - writeBuilder.buildForStreaming() - } else { - throw new IllegalArgumentException( - "Data source v2 streaming sinks does not support Update mode.") - } - } - } - protected def purge(threshold: Long): Unit = { logDebug(s"Purging metadata at threshold=$threshold") offsetLog.purge(threshold) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WriteToMicroBatchDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WriteToMicroBatchDataSource.scala new file mode 100644 index 0000000000000..590e4f015e09b --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WriteToMicroBatchDataSource.scala @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode} +import org.apache.spark.sql.connector.catalog.SupportsWrite +import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, WriterCommitMessage} +import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite} +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec +import org.apache.spark.sql.streaming.OutputMode +import org.apache.spark.sql.types.StructType + +/** + * The logical plan for writing data to a micro-batch stream. + * + * Note that this logical plan does not have a corresponding physical plan, as it will be converted + * to [[WriteMicroBatch]] with epoch id for each micro-batch. 
+ */ +case class WriteToMicroBatchDataSource( + table: SupportsWrite, + query: LogicalPlan, + queryId: String, + querySchema: StructType, + outputMode: OutputMode, + options: Map[String, String]) + extends LogicalPlan { + override def children: Seq[LogicalPlan] = Seq(query) + override def output: Seq[Attribute] = Nil + + def createPlan(epochId: Long): WriteMicroBatch = { + WriteMicroBatch(table, query, queryId, querySchema, outputMode, options, epochId) + } +} + +case class WriteMicroBatch( + table: SupportsWrite, + query: LogicalPlan, + queryId: String, + querySchema: StructType, + outputMode: OutputMode, + options: Map[String, String], + epochId: Long) extends UnaryNode { + override def child: LogicalPlan = query + override def output: Seq[Attribute] = Nil +} + +case class WriteMicroBatchExec( + table: SupportsWrite, + query: SparkPlan, + queryId: String, + querySchema: StructType, + outputMode: OutputMode, + options: Map[String, String], + epochId: Long) extends BaseStreamingWriteExec with V2TableWriteExec { + + override protected def doExecute(): RDD[InternalRow] = { + val batchWrite = new MicroBatchWrite(epochId, streamWrite) + writeWithV2(batchWrite) + } +} + +/** + * A [[BatchWrite]] used to hook V2 stream writers into a microbatch plan. It implements + * the non-streaming interface, forwarding the epoch ID determined at construction to a wrapped + * streaming write support. + */ +class MicroBatchWrite(epochId: Long, streamWrite: StreamingWrite) extends BatchWrite { + + override def commit(messages: Array[WriterCommitMessage]): Unit = { + streamWrite.commit(epochId, messages) + } + + override def abort(messages: Array[WriterCommitMessage]): Unit = { + streamWrite.abort(epochId, messages) + } + + override def createBatchWriterFactory(): DataWriterFactory = { + new MicroBatchWriterFactory(epochId, streamWrite.createStreamingWriterFactory()) + } +} + +class MicroBatchWriterFactory(epochId: Long, streamingWriterFactory: StreamingDataWriterFactory) + extends DataWriterFactory { + + override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] = { + streamingWriterFactory.createWriter(partitionId, taskId, epochId) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala index 8c7371e75b539..b06ef78bd9d1c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala @@ -88,7 +88,7 @@ class ContinuousExecution( // TODO (SPARK-27484): we should add the writing node before the plan is analyzed. WriteToContinuousDataSource( - createStreamingWrite(sink, extraOptions, _logicalPlan), _logicalPlan) + sink, _logicalPlan, id.toString, _logicalPlan.schema, outputMode, extraOptions) } private val triggerExecutor = trigger match { @@ -178,7 +178,7 @@ class ContinuousExecution( "CurrentTimestamp and CurrentDate not yet supported for continuous processing") } - reportTimeTaken("queryPlanning") { + val write = reportTimeTaken("queryPlanning") { lastExecution = new IncrementalExecution( sparkSessionForQuery, withNewSources, @@ -188,7 +188,8 @@ class ContinuousExecution( runId, currentBatchId, offsetSeqMetadata) - lastExecution.executedPlan // Force the lazy generation of execution plan + // Force the lazy generation of execution plan and get the `StreamWrite`. 
+ lastExecution.executedPlan.asInstanceOf[WriteToContinuousDataSourceExec].streamWrite } val stream = withNewSources.collect { @@ -212,7 +213,7 @@ class ContinuousExecution( // Use the parent Spark session for the endpoint since it's where this query ID is registered. val epochEndpoint = EpochCoordinatorRef.create( - logicalPlan.write, + write, stream, this, epochCoordinatorId, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSource.scala index cecb2843fc3b0..cfe27ab9ab4c9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSource.scala @@ -19,13 +19,20 @@ package org.apache.spark.sql.execution.streaming.continuous import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.connector.write.streaming.StreamingWrite +import org.apache.spark.sql.connector.catalog.SupportsWrite +import org.apache.spark.sql.streaming.OutputMode +import org.apache.spark.sql.types.StructType /** * The logical plan for writing data in a continuous stream. */ -case class WriteToContinuousDataSource(write: StreamingWrite, query: LogicalPlan) - extends LogicalPlan { +case class WriteToContinuousDataSource( + table: SupportsWrite, + query: LogicalPlan, + queryId: String, + querySchema: StructType, + outputMode: OutputMode, + options: Map[String, String]) extends LogicalPlan { override def children: Seq[LogicalPlan] = Seq(query) override def output: Seq[Attribute] = Nil } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala index d4e522562e914..143021e1c943f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala @@ -23,35 +23,37 @@ import org.apache.spark.SparkException import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.catalog.SupportsWrite import org.apache.spark.sql.connector.write.streaming.StreamingWrite -import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} -import org.apache.spark.sql.execution.streaming.StreamExecution +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.streaming.{BaseStreamingWriteExec, StreamExecution} +import org.apache.spark.sql.streaming.OutputMode +import org.apache.spark.sql.types.StructType /** * The physical plan for writing data into a continuous processing [[StreamingWrite]]. 
*/ -case class WriteToContinuousDataSourceExec(write: StreamingWrite, query: SparkPlan) - extends UnaryExecNode with Logging { - - override def child: SparkPlan = query - override def output: Seq[Attribute] = Nil +case class WriteToContinuousDataSourceExec( + table: SupportsWrite, + query: SparkPlan, + queryId: String, + querySchema: StructType, + outputMode: OutputMode, + options: Map[String, String]) extends BaseStreamingWriteExec with Logging { override protected def doExecute(): RDD[InternalRow] = { - val writerFactory = write.createStreamingWriterFactory() - val rdd = new ContinuousWriteRDD(query.execute(), writerFactory) - - logInfo(s"Start processing data source write support: $write. " + - s"The input RDD has ${rdd.partitions.length} partitions.") + logInfo(s"Start processing data source StreamWrite: $streamWrite. " + + s"The input RDD has ${inputRDD.partitions.length} partitions.") EpochCoordinatorRef.get( sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), - sparkContext.env) - .askSync[Unit](SetWriterPartitions(rdd.getNumPartitions)) + sparkContext.env + ).askSync[Unit](SetWriterPartitions(inputRDD.getNumPartitions)) + val writerFactory = streamWrite.createStreamingWriterFactory() try { // Force the RDD to run so continuous processing starts; no data is actually being collected // to the driver, as ContinuousWriteRDD outputs nothing. - rdd.collect() + new ContinuousWriteRDD(inputRDD, writerFactory).collect() } catch { case _: InterruptedException => // Interruption is how continuous queries are ended, so accept and ignore the exception. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWrite.scala deleted file mode 100644 index 5f12832cd2550..0000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWrite.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.streaming.sources - -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, WriterCommitMessage} -import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite} - -/** - * A [[BatchWrite]] used to hook V2 stream writers into a microbatch plan. It implements - * the non-streaming interface, forwarding the epoch ID determined at construction to a wrapped - * streaming write support. 
- */ -class MicroBatchWrite(eppchId: Long, val writeSupport: StreamingWrite) extends BatchWrite { - - override def commit(messages: Array[WriterCommitMessage]): Unit = { - writeSupport.commit(eppchId, messages) - } - - override def abort(messages: Array[WriterCommitMessage]): Unit = { - writeSupport.abort(eppchId, messages) - } - - override def createBatchWriterFactory(): DataWriterFactory = { - new MicroBatchWriterFactory(eppchId, writeSupport.createStreamingWriterFactory()) - } -} - -class MicroBatchWriterFactory(epochId: Long, streamingWriterFactory: StreamingDataWriterFactory) - extends DataWriterFactory { - - override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] = { - streamingWriterFactory.createWriter(partitionId, taskId, epochId) - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/WriteToMicroBatchDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/WriteToMicroBatchDataSource.scala deleted file mode 100644 index ef1115e6d9e01..0000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/WriteToMicroBatchDataSource.scala +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.streaming.sources - -import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.connector.write.streaming.StreamingWrite -import org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2 - -/** - * The logical plan for writing data to a micro-batch stream. - * - * Note that this logical plan does not have a corresponding physical plan, as it will be converted - * to [[WriteToDataSourceV2]] with [[MicroBatchWrite]] before execution. - */ -case class WriteToMicroBatchDataSource(write: StreamingWrite, query: LogicalPlan) - extends LogicalPlan { - override def children: Seq[LogicalPlan] = Seq(query) - override def output: Seq[Attribute] = Nil - - def createPlan(batchId: Long): WriteToDataSourceV2 = { - WriteToDataSourceV2(new MicroBatchWrite(batchId, write), query) - } -}
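The new BaseStreamingWriteExec centralizes the output-mode handling that previously lived in StreamExecution.createStreamingWrite: Append builds the streaming write directly, Complete requires the sink's WriteBuilder to implement SupportsTruncate (truncate() is called before buildForStreaming()), and Update is only tolerated in tests. A minimal sketch of the sink-side contract this implies, not part of the patch; InMemoryTruncatingWriteBuilder and its newStreamingWrite factory parameter are hypothetical names used only for illustration.

import org.apache.spark.sql.connector.write.{SupportsTruncate, WriteBuilder}
import org.apache.spark.sql.connector.write.streaming.StreamingWrite

// Hypothetical builder showing the shape BaseStreamingWriteExec expects for Complete mode:
// the builder must be a SupportsTruncate, and truncate() is invoked before buildForStreaming().
class InMemoryTruncatingWriteBuilder(newStreamingWrite: Boolean => StreamingWrite)
  extends SupportsTruncate {

  private var truncateFirst = false

  // Called for Complete mode; the builder should then produce a write that replaces
  // the sink's existing contents rather than appending to them.
  override def truncate(): WriteBuilder = {
    truncateFirst = true
    this
  }

  override def buildForStreaming(): StreamingWrite = newStreamingWrite(truncateFirst)
}

A builder that does not implement SupportsTruncate fails the require() in BaseStreamingWriteExec with "<table name> does not support Complete mode.", and Update mode outside of tests still throws IllegalArgumentException, exactly as in the code removed from StreamExecution.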
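The MicroBatchWrite adapter relocated into WriteToMicroBatchDataSource.scala is what lets WriteMicroBatchExec reuse the batch write path: it binds one epoch ID at construction and forwards plain BatchWrite calls to the wrapped StreamingWrite. A sketch of that forwarding, not part of the patch and only assuming the classes added above compile as shown; RecordingStreamingWrite and MicroBatchWriteSketch are hypothetical stubs.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage}
import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite}
import org.apache.spark.sql.execution.streaming.MicroBatchWrite

// Hypothetical stub sink that only records which epoch it was asked to commit.
class RecordingStreamingWrite extends StreamingWrite {
  @volatile var committedEpoch: Option[Long] = None

  override def createStreamingWriterFactory(): StreamingDataWriterFactory =
    new StreamingDataWriterFactory {
      override def createWriter(
          partitionId: Int, taskId: Long, epochId: Long): DataWriter[InternalRow] =
        throw new UnsupportedOperationException("writers are not exercised in this sketch")
    }

  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit =
    committedEpoch = Some(epochId)

  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = ()
}

object MicroBatchWriteSketch {
  def main(args: Array[String]): Unit = {
    val streamingWrite = new RecordingStreamingWrite
    // WriteMicroBatchExec builds one MicroBatchWrite per epoch, binding the epoch ID up front...
    val batchWrite = new MicroBatchWrite(epochId = 42L, streamingWrite)
    // ...so a plain BatchWrite commit is forwarded as a streaming commit for that epoch.
    batchWrite.commit(Array.empty[WriterCommitMessage])
    assert(streamingWrite.committedEpoch.contains(42L))
  }
}

Per micro-batch, MicroBatchExecution is expected to call WriteToMicroBatchDataSource.createPlan(batchId) so that the resulting WriteMicroBatch node, and hence the WriteMicroBatchExec planned by DataSourceV2Strategy, carries the epoch ID that ends up bound here.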