diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 9e3599712fde..01e626e5436a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -22,12 +22,16 @@ import java.util.concurrent.TimeoutException
 
 import scala.collection.JavaConverters._
 
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.annotation.Evolving
 import org.apache.spark.api.java.function.VoidFunction2
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.plans.logical.CreateTableStatement
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
-import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableProvider}
+import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableProvider, V1Table, V2TableWithV1Fallback}
 import org.apache.spark.sql.connector.catalog.TableCapability._
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.DataSource
@@ -298,52 +302,85 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
 
   /**
    * Starts the execution of the streaming query, which will continually output results to the given
-   * table as new data arrives. The returned [[StreamingQuery]] object can be used to interact with
-   * the stream.
+   * table as new data arrives. A new table will be created if the table does not exist. The returned
+   * [[StreamingQuery]] object can be used to interact with the stream.
    *
    * @since 3.1.0
    */
   @throws[TimeoutException]
   def toTable(tableName: String): StreamingQuery = {
-    this.source = SOURCE_NAME_TABLE
     this.tableName = tableName
-    startInternal(None)
-  }
 
-  private def startInternal(path: Option[String]): StreamingQuery = {
-    if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
-      throw new AnalysisException("Hive data source can only be used with tables, you can not " +
-        "write files of Hive data source directly.")
-    }
+    import df.sparkSession.sessionState.analyzer.CatalogAndIdentifier
 
-    if (source == SOURCE_NAME_TABLE) {
-      assertNotPartitioned(SOURCE_NAME_TABLE)
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+    val originalMultipartIdentifier = df.sparkSession.sessionState.sqlParser
+      .parseMultipartIdentifier(tableName)
+    val CatalogAndIdentifier(catalog, identifier) = originalMultipartIdentifier
 
-      import df.sparkSession.sessionState.analyzer.CatalogAndIdentifier
+    // Currently we don't create a logical streaming writer node in logical plan, so cannot rely
+    // on analyzer to resolve it. Directly lookup only for temp view to provide clearer message.
+    // TODO (SPARK-27484): we should add the writing node before the plan is analyzed.
+    if (df.sparkSession.sessionState.catalog.isTempView(originalMultipartIdentifier)) {
+      throw new AnalysisException(s"Temporary view $tableName doesn't support streaming write")
+    }
 
+    if (!catalog.asTableCatalog.tableExists(identifier)) {
       import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
-      val originalMultipartIdentifier = df.sparkSession.sessionState.sqlParser
-        .parseMultipartIdentifier(tableName)
-      val CatalogAndIdentifier(catalog, identifier) = originalMultipartIdentifier
-
-      // Currently we don't create a logical streaming writer node in logical plan, so cannot rely
-      // on analyzer to resolve it. Directly lookup only for temp view to provide clearer message.
-      // TODO (SPARK-27484): we should add the writing node before the plan is analyzed.
-      if (df.sparkSession.sessionState.catalog.isTempView(originalMultipartIdentifier)) {
-        throw new AnalysisException(s"Temporary view $tableName doesn't support streaming write")
-      }
 
+      /**
+       * Note: currently the new table creation by this API doesn't fully cover V2 tables.
+       * TODO (SPARK-33638): Full support of v2 table creation
+       */
+      val cmd = CreateTableStatement(
+        originalMultipartIdentifier,
+        df.schema.asNullable,
+        partitioningColumns.getOrElse(Nil).asTransforms.toSeq,
+        None,
+        Map.empty[String, String],
+        Some(source),
+        Map.empty[String, String],
+        extraOptions.get("path"),
+        None,
+        None,
+        external = false,
+        ifNotExists = false)
+      Dataset.ofRows(df.sparkSession, cmd)
+    }
 
-      val tableInstance = catalog.asTableCatalog.loadTable(identifier)
+    val tableInstance = catalog.asTableCatalog.loadTable(identifier)
 
-      import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
-      val sink = tableInstance match {
-        case t: SupportsWrite if t.supports(STREAMING_WRITE) => t
-        case t => throw new AnalysisException(s"Table $tableName doesn't support streaming " +
-          s"write - $t")
+    def writeToV1Table(table: CatalogTable): StreamingQuery = {
+      if (table.tableType == CatalogTableType.VIEW) {
+        throw new AnalysisException(s"Streaming into views $tableName is not supported.")
+      }
+      require(table.provider.isDefined)
+      if (source != table.provider.get) {
+        throw new AnalysisException(s"The input source($source) is different from the table " +
+          s"$tableName's data source provider(${table.provider.get}).")
       }
+      format(table.provider.get)
+        .option("path", new Path(table.location).toString).start()
+    }
 
-      startQuery(sink, extraOptions)
-    } else if (source == SOURCE_NAME_MEMORY) {
+    import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
+    tableInstance match {
+      case t: SupportsWrite if t.supports(STREAMING_WRITE) => startQuery(t, extraOptions)
+      case t: V2TableWithV1Fallback =>
+        writeToV1Table(t.v1Table)
+      case t: V1Table =>
+        writeToV1Table(t.v1Table)
+      case t => throw new AnalysisException(s"Table $tableName doesn't support streaming " +
+        s"write - $t")
+    }
+  }
+
+  private def startInternal(path: Option[String]): StreamingQuery = {
+    if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
+      throw new AnalysisException("Hive data source can only be used with tables, you can not " +
+        "write files of Hive data source directly.")
+    }
+
+    if (source == SOURCE_NAME_MEMORY) {
       assertNotPartitioned(SOURCE_NAME_MEMORY)
       if (extraOptions.get("queryName").isEmpty) {
         throw new AnalysisException("queryName must be specified for memory sink")
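For context, here is a minimal usage sketch of what this change enables from the user side. It is not part of the patch; the SparkSession setup, the "rate" test source, the table name, and the checkpoint path are illustrative assumptions.

// Hypothetical usage sketch (not part of this patch): illustrates the new
// DataStreamWriter.toTable behavior. Assumes an active SparkSession and the
// built-in "rate" source; the table name and checkpoint path are made up.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("toTableExample").getOrCreate()
import spark.implicits._

val input = spark.readStream
  .format("rate")                     // emits (timestamp, value) rows for testing
  .option("rowsPerSecond", "1")
  .load()

// If `target_table` does not exist, it is created with the stream's schema and the
// writer's format as its provider; if it exists as a V1 table, its provider must
// match the writer's format, otherwise an AnalysisException is thrown.
val query = input.select($"value")
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "/tmp/to_table_checkpoint")   // made-up path
  .toTable("target_table")

query.awaitTermination(5000)          // let a few micro-batches run
query.stop()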
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
index bf850432d5c0..0296366f3578 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
@@ -26,7 +26,7 @@ import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
 import org.apache.spark.sql.connector.{FakeV2Provider, InMemoryTableCatalog, InMemoryTableSessionCatalog}
@@ -39,6 +39,7 @@ import org.apache.spark.sql.streaming.StreamTest
 import org.apache.spark.sql.streaming.sources.FakeScanBuilder
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.util.Utils
 
 class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
   import testImplicits._
@@ -175,21 +176,24 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
 
   test("write: write to table with custom catalog & no namespace") {
     val tableIdentifier = "testcat.table_name"
-    spark.sql(s"CREATE TABLE $tableIdentifier (id bigint, data string) USING foo")
-    checkAnswer(spark.table(tableIdentifier), Seq.empty)
+    withTable(tableIdentifier) {
+      spark.sql(s"CREATE TABLE $tableIdentifier (id bigint, data string) USING foo")
+      checkAnswer(spark.table(tableIdentifier), Seq.empty)
 
-    runTestWithStreamAppend(tableIdentifier)
+      runTestWithStreamAppend(tableIdentifier)
+    }
   }
 
   test("write: write to table with custom catalog & namespace") {
     spark.sql("CREATE NAMESPACE testcat.ns")
-
     val tableIdentifier = "testcat.ns.table_name"
 
-    spark.sql(s"CREATE TABLE $tableIdentifier (id bigint, data string) USING foo")
-    checkAnswer(spark.table(tableIdentifier), Seq.empty)
+    withTable(tableIdentifier) {
+      spark.sql(s"CREATE TABLE $tableIdentifier (id bigint, data string) USING foo")
+      checkAnswer(spark.table(tableIdentifier), Seq.empty)
 
-    runTestWithStreamAppend(tableIdentifier)
+      runTestWithStreamAppend(tableIdentifier)
+    }
   }
 
   test("write: write to table with default session catalog") {
@@ -200,35 +204,19 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
     spark.sql("CREATE NAMESPACE ns")
 
     val tableIdentifier = "ns.table_name"
-    spark.sql(s"CREATE TABLE $tableIdentifier (id bigint, data string) USING $v2Source")
-    checkAnswer(spark.table(tableIdentifier), Seq.empty)
+    withTable(tableIdentifier) {
+      spark.sql(s"CREATE TABLE $tableIdentifier (id bigint, data string) USING $v2Source")
+      checkAnswer(spark.table(tableIdentifier), Seq.empty)
 
-    runTestWithStreamAppend(tableIdentifier)
+      runTestWithStreamAppend(tableIdentifier)
+    }
   }
 
   test("write: write to non-exist table with custom catalog") {
     val tableIdentifier = "testcat.nonexisttable"
-    spark.sql("CREATE NAMESPACE testcat.ns")
-
-    withTempDir { checkpointDir =>
-      val exc = intercept[NoSuchTableException] {
-        runStreamQueryAppendMode(tableIdentifier, checkpointDir, Seq.empty, Seq.empty)
-      }
-      assert(exc.getMessage.contains("nonexisttable"))
-    }
-  }
-
-  test("write: write to file provider based table isn't allowed yet") {
-    val tableIdentifier = "table_name"
-
-    spark.sql(s"CREATE TABLE $tableIdentifier (id bigint, data string) USING parquet")
-    checkAnswer(spark.table(tableIdentifier), Seq.empty)
 
-    withTempDir { checkpointDir =>
-      val exc = intercept[AnalysisException] {
-        runStreamQueryAppendMode(tableIdentifier, checkpointDir, Seq.empty, Seq.empty)
-      }
-      assert(exc.getMessage.contains("doesn't support streaming write"))
+    withTable(tableIdentifier) {
+      runTestWithStreamAppend(tableIdentifier)
     }
   }
 
@@ -262,8 +250,107 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
       val exc = intercept[AnalysisException] {
         runStreamQueryAppendMode(viewIdentifier, checkpointDir, Seq.empty, Seq.empty)
       }
-      assert(exc.getMessage.contains("doesn't support streaming write"))
+      assert(exc.getMessage.contains(s"Streaming into views $viewIdentifier is not supported"))
+    }
+  }
+
+  test("write: write to an external table") {
+    withTempDir { dir =>
+      val tableName = "stream_test"
+      withTable(tableName) {
+        checkForStreamTable(Some(dir), tableName)
+      }
+    }
+  }
+
+  test("write: write to a managed table") {
+    val tableName = "stream_test"
+    withTable(tableName) {
+      checkForStreamTable(None, tableName)
+    }
+  }
+
+  test("write: write to an external table with existing path") {
+    withTempDir { dir =>
+      val tableName = "stream_test"
+      withTable(tableName) {
+        // The file written by batch will not be seen after the table was written by a streaming
+        // query. This is because we load files from the metadata log instead of listing them
+        // using the HDFS API.
+        Seq(4, 5, 6).toDF("value").write.format("parquet")
+          .option("path", dir.getCanonicalPath).saveAsTable(tableName)
+
+        checkForStreamTable(Some(dir), tableName)
+      }
+    }
+  }
+
+  test("write: write to a managed table with existing path") {
+    val tableName = "stream_test"
+    withTable(tableName) {
+      // The file written by batch will not be seen after the table was written by a streaming
+      // query. This is because we load files from the metadata log instead of listing them
+      // using the HDFS API.
+      Seq(4, 5, 6).toDF("value").write.format("parquet").saveAsTable(tableName)
+
+      checkForStreamTable(None, tableName)
+    }
+  }
+
+  test("write: write to an external path and create table") {
+    withTempDir { dir =>
+      val tableName = "stream_test"
+      withTable(tableName) {
+        // The file written by batch will not be seen after the table was written by a streaming
+        // query. This is because we load files from the metadata log instead of listing them
+        // using the HDFS API.
+        Seq(4, 5, 6).toDF("value").write
+          .mode("append").format("parquet").save(dir.getCanonicalPath)
+
+        checkForStreamTable(Some(dir), tableName)
+      }
+    }
+  }
+
+  test("write: write to table with different format shouldn't be allowed") {
+    val tableName = "stream_test"
+
+    spark.sql(s"CREATE TABLE $tableName (id bigint, data string) USING json")
+    checkAnswer(spark.table(tableName), Seq.empty)
+
+    withTempDir { checkpointDir =>
+      val exc = intercept[AnalysisException] {
+        runStreamQueryAppendMode(tableName, checkpointDir, Seq.empty, Seq.empty)
+      }
+      assert(exc.getMessage.contains("The input source(parquet) is different from the table " +
+        s"$tableName's data source provider(json)"))
+    }
+  }
+
+  private def checkForStreamTable(dir: Option[File], tableName: String): Unit = {
+    val memory = MemoryStream[Int]
+    val dsw = memory.toDS().writeStream.format("parquet")
+    dir.foreach { output =>
+      dsw.option("path", output.getCanonicalPath)
+    }
+    val sq = dsw
+      .option("checkpointLocation", Utils.createTempDir().getCanonicalPath)
+      .toTable(tableName)
+    memory.addData(1, 2, 3)
+    sq.processAllAvailable()
+
+    checkDataset(
+      spark.table(tableName).as[Int],
+      1, 2, 3)
+    val catalogTable = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+    val path = if (dir.nonEmpty) {
+      dir.get
+    } else {
+      new File(catalogTable.location)
     }
+    checkDataset(
+      spark.read.format("parquet").load(path.getCanonicalPath).as[Int],
+      1, 2, 3)
   }
 
   private def runTestWithStreamAppend(tableIdentifier: String) = {
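A small sketch of the metadata-log behavior the new test comments describe, matching what the tests assert: once a streaming query has written to the table location, reads go through the file sink's metadata log (_spark_metadata) rather than a directory listing, so files written directly to the same path by a batch job are not returned. It is not part of the patch; the SparkSession setup, table name, and directories are assumptions.

// Hypothetical illustration (not part of this patch) of the metadata-log
// visibility described in the test comments above. Assumes an active
// SparkSession; the table name and directories are made up.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("metadataLogSketch").getOrCreate()
import spark.implicits._

val dataDir = "/tmp/stream_test_data"        // made-up table location
val checkpointDir = "/tmp/stream_test_chk"   // made-up checkpoint location

// A batch job writes files straight into the future table location.
Seq(4L, 5L, 6L).toDF("value").write.mode("append").parquet(dataDir)

// A streaming query then writes to the same location through toTable, which
// creates the external table at that path if it does not exist.
val query = spark.readStream
  .format("rate").option("rowsPerSecond", "1").load()
  .select($"value")
  .writeStream
  .format("parquet")
  .option("path", dataDir)
  .option("checkpointLocation", checkpointDir)
  .toTable("stream_test")

query.awaitTermination(5000)
query.stop()

// Both reads below consult the sink's metadata log (_spark_metadata) rather than
// listing the directory, so the batch rows 4, 5, 6 written above do not appear.
spark.table("stream_test").show()
spark.read.parquet(dataDir).show()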