diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 5f122293f4a0..51941a626907 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -1498,8 +1498,7 @@ def toTable(self, tableName, format=None, outputMode=None, partitionBy=None, que
         Starts the execution of the streaming query, which will continually output results to the
         given table as new data arrives.
 
-        A new table will be created if the table not exists. The returned
-        :class:`StreamingQuery` object can be used to interact with the stream.
+        The returned :class:`StreamingQuery` object can be used to interact with the stream.
 
         .. versionadded:: 3.1.0
 
@@ -1531,6 +1530,15 @@ def toTable(self, tableName, format=None, outputMode=None, partitionBy=None, que
         -----
         This API is evolving.
 
+        For a v1 table, partitioning columns provided by `partitionBy` will be respected whether
+        the table exists or not. A new table will be created if the table does not exist.
+
+        For a v2 table, `partitionBy` will be ignored if the table already exists; it will be
+        respected only if the v2 table does not exist. In addition, the v2 table created by this
+        API lacks some functionality (e.g., customized properties, options, and serde info). If
+        you need them, please create the v2 table manually before starting the query to avoid
+        creating a table with incomplete information.
+
         Examples
         --------
         >>> sdf.writeStream.format('parquet').queryName('query').toTable('output_table')
@@ -1543,7 +1551,6 @@ def toTable(self, tableName, format=None, outputMode=None, partitionBy=None, que
         ...     format='parquet',
         ...     checkpointLocation='/tmp/checkpoint') # doctest: +SKIP
         """
-        # TODO(SPARK-33659): document the current behavior for DataStreamWriter.toTable API
         self.options(**options)
         if outputMode is not None:
             self.outputMode(outputMode)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 2703119ce116..1be09e0e5f97 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -302,11 +302,21 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
 
   /**
    * Starts the execution of the streaming query, which will continually output results to the given
-   * table as new data arrives. A new table will be created if the table not exists. The returned
-   * [[StreamingQuery]] object can be used to interact with the stream.
+   * table as new data arrives. The returned [[StreamingQuery]] object can be used to interact with
+   * the stream.
+   *
+   * For a v1 table, partitioning columns provided by `partitionBy` will be respected whether the
+   * table exists or not. A new table will be created if the table does not exist.
+   *
+   * For a v2 table, `partitionBy` will be ignored if the table already exists; it will be
+   * respected only if the v2 table does not exist. In addition, the v2 table created by this API
+   * lacks some functionality (e.g., customized properties, options, and serde info). If you need
+   * them, please create the v2 table manually before starting the query to avoid creating a table
+   * with incomplete information.
    *
    * @since 3.1.0
    */
+  @Evolving
   @throws[TimeoutException]
   def toTable(tableName: String): StreamingQuery = {
     this.tableName = tableName
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
index 9cf649605ed1..4c5c5e63cecb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
@@ -275,7 +275,7 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
     val tableName = "stream_test"
     withTable(tableName) {
       // The file written by batch will not be seen after the table was written by a streaming
-      // query. This is because we loads files from the metadata log instead of listing them
+      // query. This is because we load files from the metadata log instead of listing them
       // using HDFS API.
       Seq(4, 5, 6).toDF("value").write.format("parquet")
         .option("path", dir.getCanonicalPath).saveAsTable(tableName)
@@ -289,7 +289,7 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
     val tableName = "stream_test"
     withTable(tableName) {
       // The file written by batch will not be seen after the table was written by a streaming
-      // query. This is because we loads files from the metadata log instead of listing them
+      // query. This is because we load files from the metadata log instead of listing them
       // using HDFS API.
       Seq(4, 5, 6).toDF("value").write.format("parquet").saveAsTable(tableName)
 
@@ -302,7 +302,7 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
     val tableName = "stream_test"
     withTable(tableName) {
       // The file written by batch will not be seen after the table was written by a streaming
-      // query. This is because we loads files from the metadata log instead of listing them
+      // query. This is because we load files from the metadata log instead of listing them
       // using HDFS API.
       Seq(4, 5, 6).toDF("value").write
         .mode("append").format("parquet").save(dir.getCanonicalPath)
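
To illustrate the behavior documented above, here is a minimal PySpark sketch (not part of this patch). It assumes an active SparkSession named `spark`; the `rate` source, table name, query name, and checkpoint path are placeholders chosen for illustration. For a table that does not yet exist, the `partitionBy` columns are applied when the table is created; for an existing v2 table they would be ignored.

```python
# Hedged usage sketch for DataStreamWriter.toTable; identifiers below are placeholders.
sdf = spark.readStream.format("rate").load()  # built-in source with columns: timestamp, value

query = (sdf.writeStream
    .format("parquet")
    .outputMode("append")
    .partitionBy("value")  # for a v2 table, honored only if the table does not exist yet
    .option("checkpointLocation", "/tmp/checkpoint")
    .queryName("rate_to_table")
    .toTable("output_table"))  # creates the table if it does not exist

# The returned StreamingQuery handle can be used to interact with the stream.
query.awaitTermination(10)  # wait up to 10 seconds
query.stop()
```

In Python, the same settings can also be passed directly to `toTable` as keyword arguments (e.g. `format=`, `outputMode=`, `partitionBy=`, plus extra options such as `checkpointLocation=`), as the doctest added in this patch shows.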