diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index a5e5d01152db8..ede58bd26ce34 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -33,10 +33,10 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability, TableProvider}
 import org.apache.spark.sql.connector.read.{Batch, Scan, ScanBuilder}
 import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
-import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, WriteBuilder}
+import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, SupportsTruncate, WriteBuilder}
 import org.apache.spark.sql.connector.write.streaming.StreamingWrite
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
-import org.apache.spark.sql.internal.connector.SimpleTableProvider
+import org.apache.spark.sql.internal.connector.{SimpleTableProvider, SupportsStreamingUpdate}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
@@ -394,7 +394,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
       () => new KafkaScan(options)
 
     override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
-      new WriteBuilder {
+      new WriteBuilder with SupportsTruncate with SupportsStreamingUpdate {
         private val options = info.options
         private val inputSchema: StructType = info.schema()
         private val topic = Option(options.get(TOPIC_OPTION_KEY)).map(_.trim)
@@ -410,6 +410,9 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
           assert(inputSchema != null)
           new KafkaStreamingWrite(topic, producerParams, inputSchema)
         }
+
+        override def truncate(): WriteBuilder = this
+        override def update(): WriteBuilder = this
       }
     }
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SupportsStreamingUpdate.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SupportsStreamingUpdate.scala
new file mode 100644
index 0000000000000..32be74a345c5a
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SupportsStreamingUpdate.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal.connector
+
+import org.apache.spark.sql.connector.write.WriteBuilder
+
+// An internal `WriteBuilder` mixin to support UPDATE streaming output mode.
+// TODO: design an official API for streaming output mode UPDATE.
+trait SupportsStreamingUpdate extends WriteBuilder {
+  def update(): WriteBuilder
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
index 851cc51466a91..8a6c4dce75f30 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability}
 import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, LogicalWriteInfo, PhysicalWriteInfo, SupportsTruncate, WriteBuilder, WriterCommitMessage}
 import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite}
-import org.apache.spark.sql.internal.connector.SimpleTableProvider
+import org.apache.spark.sql.internal.connector.{SimpleTableProvider, SupportsStreamingUpdate}
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -52,8 +52,10 @@ private[noop] object NoopTable extends Table with SupportsWrite {
   }
 }
 
-private[noop] object NoopWriteBuilder extends WriteBuilder with SupportsTruncate {
+private[noop] object NoopWriteBuilder extends WriteBuilder
+  with SupportsTruncate with SupportsStreamingUpdate {
   override def truncate(): WriteBuilder = this
+  override def update(): WriteBuilder = this
   override def buildForBatch(): BatchWrite = NoopBatchWrite
   override def buildForStreaming(): StreamingWrite = NoopStreamingWrite
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 9b1951a834d9a..18fe38caa5e65 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -44,6 +44,7 @@ import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.command.StreamingExplainCommand
 import org.apache.spark.sql.execution.datasources.v2.StreamWriterCommitProgress
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.connector.SupportsStreamingUpdate
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
@@ -629,14 +630,9 @@ abstract class StreamExecution(
         writeBuilder.asInstanceOf[SupportsTruncate].truncate().buildForStreaming()
 
       case Update =>
-        // Although no v2 sinks really support Update mode now, but during tests we do want them
-        // to pretend to support Update mode, and treat Update mode same as Append mode.
-        if (Utils.isTesting) {
-          writeBuilder.buildForStreaming()
-        } else {
-          throw new IllegalArgumentException(
-            "Data source v2 streaming sinks does not support Update mode.")
-        }
+        require(writeBuilder.isInstanceOf[SupportsStreamingUpdate],
+          table.name + " does not support Update mode.")
+        writeBuilder.asInstanceOf[SupportsStreamingUpdate].update().buildForStreaming()
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
index e471e6c601d16..1e64021c8105e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapabi
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, SupportsTruncate, WriteBuilder}
 import org.apache.spark.sql.connector.write.streaming.StreamingWrite
 import org.apache.spark.sql.execution.streaming.sources.ConsoleWrite
-import org.apache.spark.sql.internal.connector.SimpleTableProvider
+import org.apache.spark.sql.internal.connector.{SimpleTableProvider, SupportsStreamingUpdate}
 import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -73,11 +73,12 @@ object ConsoleTable extends Table with SupportsWrite {
   }
 
   override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
-    new WriteBuilder with SupportsTruncate {
+    new WriteBuilder with SupportsTruncate with SupportsStreamingUpdate {
       private val inputSchema: StructType = info.schema()
 
-      // Do nothing for truncate. Console sink is special that it just prints all the records.
+      // Do nothing for truncate/update. Console sink is special and it just prints all the records.
       override def truncate(): WriteBuilder = this
+      override def update(): WriteBuilder = this
 
       override def buildForStreaming(): StreamingWrite = {
         assert(inputSchema != null)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
index ba54c85d07303..57a73c740310e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapabi
 import org.apache.spark.sql.connector.write.{DataWriter, LogicalWriteInfo, PhysicalWriteInfo, SupportsTruncate, WriteBuilder, WriterCommitMessage}
 import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite}
 import org.apache.spark.sql.execution.python.PythonForeachWriter
+import org.apache.spark.sql.internal.connector.SupportsStreamingUpdate
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -54,12 +55,13 @@ case class ForeachWriterTable[T](
   }
 
   override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
-    new WriteBuilder with SupportsTruncate {
+    new WriteBuilder with SupportsTruncate with SupportsStreamingUpdate {
       private var inputSchema: StructType = info.schema()
 
-      // Do nothing for truncate. Foreach sink is special that it just forwards all the records to
-      // ForeachWriter.
+      // Do nothing for truncate/update. Foreach sink is special and it just forwards all the
+      // records to ForeachWriter.
       override def truncate(): WriteBuilder = this
+      override def update(): WriteBuilder = this
 
       override def buildForStreaming(): StreamingWrite = {
         new StreamingWrite {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala
index deab42bea36ad..03ebbb9f1b376 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapabi
 import org.apache.spark.sql.connector.write.{DataWriter, DataWriterFactory, LogicalWriteInfo, PhysicalWriteInfo, SupportsTruncate, WriteBuilder, WriterCommitMessage}
 import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite}
 import org.apache.spark.sql.execution.streaming.Sink
+import org.apache.spark.sql.internal.connector.SupportsStreamingUpdate
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -53,7 +54,7 @@ class MemorySink extends Table with SupportsWrite with Logging {
   }
 
   override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
-    new WriteBuilder with SupportsTruncate {
+    new WriteBuilder with SupportsTruncate with SupportsStreamingUpdate {
      private var needTruncate: Boolean = false
      private val inputSchema: StructType = info.schema()
 
@@ -62,6 +63,9 @@ class MemorySink extends Table with SupportsWrite with Logging {
         this
       }
 
+      // The in-memory sink treats update as append.
+      override def update(): WriteBuilder = this
+
       override def buildForStreaming(): StreamingWrite = {
         new MemoryStreamingWrite(MemorySink.this, inputSchema, needTruncate)
       }
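
Note on the behavior change (commentary, not part of the patch): Update output mode becomes an explicit opt-in for DSv2 streaming sinks. StreamExecution now rejects Update unless the sink's WriteBuilder mixes in the internal SupportsStreamingUpdate trait, instead of silently treating Update as Append whenever Utils.isTesting is set. The sketch below shows how a sink wires this up, mirroring the built-in sinks patched above; the class name ExampleWriteBuilder and its no-op writer are invented for illustration, and because SupportsStreamingUpdate lives in an internal package this is a sketch of the pattern rather than a public extension point.

// Hypothetical example, not part of the patch: a WriteBuilder that accepts
// Append, Complete (via truncate) and Update (via the new internal mixin).
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.write.{DataWriter, PhysicalWriteInfo, SupportsTruncate, WriteBuilder, WriterCommitMessage}
import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite}
import org.apache.spark.sql.internal.connector.SupportsStreamingUpdate

class ExampleWriteBuilder extends WriteBuilder
  with SupportsTruncate with SupportsStreamingUpdate {

  // Complete mode: StreamExecution calls truncate() before buildForStreaming().
  override def truncate(): WriteBuilder = this

  // Update mode: after this patch, StreamExecution requires SupportsStreamingUpdate
  // and calls update() before buildForStreaming().
  override def update(): WriteBuilder = this

  // A do-nothing StreamingWrite so the example is self-contained; a real sink
  // would persist each epoch's data in commit().
  override def buildForStreaming(): StreamingWrite = new StreamingWrite {
    override def createStreamingWriterFactory(info: PhysicalWriteInfo): StreamingDataWriterFactory =
      new StreamingDataWriterFactory {
        override def createWriter(partitionId: Int, taskId: Long, epochId: Long): DataWriter[InternalRow] =
          new DataWriter[InternalRow] {
            override def write(record: InternalRow): Unit = {}  // discard rows
            override def commit(): WriterCommitMessage = null
            override def abort(): Unit = {}
            override def close(): Unit = {}
          }
      }
    override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
    override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
  }
}

With such a builder, a query started with DataStreamWriter.outputMode("update") passes the new require check in StreamExecution, while a v2 sink whose builder lacks the mixin now fails fast with "<table name> does not support Update mode." instead of being accepted only under Utils.isTesting.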