13 changes: 10 additions & 3 deletions python/pyspark/sql/streaming.py
@@ -1498,8 +1498,7 @@ def toTable(self, tableName, format=None, outputMode=None, partitionBy=None, que
Starts the execution of the streaming query, which will continually output results to the
given table as new data arrives.

A new table will be created if the table not exists. The returned
:class:`StreamingQuery` object can be used to interact with the stream.
The returned :class:`StreamingQuery` object can be used to interact with the stream.

.. versionadded:: 3.1.0

@@ -1531,6 +1530,15 @@ def toTable(self, tableName, format=None, outputMode=None, partitionBy=None, que
-----
This API is evolving.

For v1 tables, the partitioning columns provided by `partitionBy` will be respected whether
or not the table already exists. A new table will be created if the table does not exist.

For v2 tables, `partitionBy` will be ignored if the table already exists; it is respected
only when the v2 table is being created. In addition, a v2 table created by this API lacks
some functionality (e.g., custom properties, options, and serde info). If you need these,
create the v2 table manually before starting the query, so that the table is not created
with incomplete information.

Examples
--------
>>> sdf.writeStream.format('parquet').queryName('query').toTable('output_table')
@@ -1543,7 +1551,6 @@ def toTable(self, tableName, format=None, outputMode=None, partitionBy=None, que
... format='parquet',
... checkpointLocation='/tmp/checkpoint') # doctest: +SKIP
"""
# TODO(SPARK-33659): document the current behavior for DataStreamWriter.toTable API
self.options(**options)
if outputMode is not None:
self.outputMode(outputMode)
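The v2 guidance added above (and mirrored in the Scala scaladoc below) is easiest to see in a short PySpark sketch. This is an illustration, not part of the patch: the catalog name `catalog`, the table `catalog.db.output_table`, and the checkpoint path are hypothetical, and a v2 catalog is assumed to be configured on the session.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("totable-demo").getOrCreate()

# Pre-create the v2 table so that custom properties, options, and serde
# info are in place up front; toTable would otherwise create the table
# with defaults only (hypothetical catalog and table names).
spark.sql("""
    CREATE TABLE IF NOT EXISTS catalog.db.output_table
        (ts TIMESTAMP, value BIGINT)
    PARTITIONED BY (value)
    TBLPROPERTIES ('owner' = 'streaming-team')
""")

sdf = spark.readStream.format("rate").load().withColumnRenamed("timestamp", "ts")

# The table already exists, so for this v2 table any partitionBy() set here
# would be ignored; the partitioning declared in CREATE TABLE wins.
query = (sdf.writeStream
            .option("checkpointLocation", "/tmp/checkpoint")  # hypothetical path
            .toTable("catalog.db.output_table"))

For a v1 table the same call would respect `partitionBy` whether or not the table exists, as the docstring now states.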
@@ -302,11 +302,21 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {

/**
* Starts the execution of the streaming query, which will continually output results to the given
* table as new data arrives. A new table will be created if the table not exists. The returned
* [[StreamingQuery]] object can be used to interact with the stream.
* table as new data arrives. The returned [[StreamingQuery]] object can be used to interact with
* the stream.
*
* For v1 tables, the partitioning columns provided by `partitionBy` will be respected whether or
* not the table already exists. A new table will be created if the table does not exist.
*
* For v2 tables, `partitionBy` will be ignored if the table already exists; it is respected only
* when the v2 table is being created. In addition, a v2 table created by this API lacks some
* functionality (e.g., custom properties, options, and serde info). If you need these, create the
* v2 table manually before starting the query, so that the table is not created with incomplete
* information.
*
* @since 3.1.0
*/
@Evolving
@throws[TimeoutException]
def toTable(tableName: String): StreamingQuery = {
this.tableName = tableName
@@ -275,7 +275,7 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
val tableName = "stream_test"
withTable(tableName) {
// The file written by batch will not be seen after the table was written by a streaming
// query. This is because we loads files from the metadata log instead of listing them
// query. This is because we load files from the metadata log instead of listing them
// using HDFS API.
Seq(4, 5, 6).toDF("value").write.format("parquet")
.option("path", dir.getCanonicalPath).saveAsTable(tableName)
@@ -289,7 +289,7 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
val tableName = "stream_test"
withTable(tableName) {
// The file written by batch will not be seen after the table was written by a streaming
// query. This is because we loads files from the metadata log instead of listing them
// query. This is because we load files from the metadata log instead of listing them
// using HDFS API.
Seq(4, 5, 6).toDF("value").write.format("parquet").saveAsTable(tableName)

@@ -302,7 +302,7 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
val tableName = "stream_test"
withTable(tableName) {
// The file written by batch will not be seen after the table was written by a streaming
// query. This is because we loads files from the metadata log instead of listing them
// query. This is because we load files from the metadata log instead of listing them
// using HDFS API.
Seq(4, 5, 6).toDF("value").write
.mode("append").format("parquet").save(dir.getCanonicalPath)
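The comment corrected in these three hunks points at real behavior of the file sink: once a streaming query has written to a location, Spark reads the output files from the sink's `_spark_metadata` log instead of listing the directory, so files written directly by a batch job are not picked up. Below is a minimal, hedged sketch of that effect; the paths are hypothetical and the code mirrors the suite's intent rather than its exact assertions.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("metadata-log-demo").getOrCreate()
path = "/tmp/stream_test_dir"          # hypothetical table directory
checkpoint = "/tmp/stream_test_chk"    # hypothetical checkpoint location

# A batch job drops Parquet files directly into the directory.
spark.range(4, 7).toDF("value").write \
    .mode("append").format("parquet").save(path)

# A streaming query writing to the same path records its own output files
# in a _spark_metadata log next to the data.
stream = spark.readStream.format("rate").load().select("value")
query = (stream.writeStream.format("parquet")
               .option("checkpointLocation", checkpoint)
               .start(path))
query.processAllAvailable()
query.stop()

# Because _spark_metadata now exists, this read trusts the log exclusively:
# the batch rows 4, 5, 6 written outside the log do not show up.
spark.read.format("parquet").load(path).show()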