Skip to content

Conversation

@WweiL
Copy link
Contributor

@WweiL WweiL commented May 8, 2023

What changes were proposed in this pull request?

Add streaming query listener for python client. In this version, the code runs on the driver.

Why are the changes needed?

Continuation development of streaming connect

Does this PR introduce any user-facing change?

Yes

How was this patch tested?

For now manually:
client:

Using Python version 3.9.16 (main, Dec  7 2022 01:11:58)
Client connected to the Spark Connect server at localhost
SparkSession available as 'spark'.
>>> from pyspark.sql.connect.streaming.listener import QueryStartedEvent;from pyspark.sql.connect.streaming.listener import StreamingQueryListener;from pyspark.sql.streaming.listener import (QueryProgressEvent, QueryTerminatedEvent, QueryIdleEvent)
>>> class MyListener(StreamingQueryListener):
...     def onQueryStarted(self, event: QueryStartedEvent) -> None:
...             print("hi, event query id is: " +  event.id)
...     def onQueryProgress(self, event: QueryProgressEvent) -> None:
...             pass
...     def onQueryTerminated(self, event: QueryTerminatedEvent) -> None:
...             pass
...     def onQueryIdle(self, event: QueryIdleEvent) -> None:
...             pass
... 
>>> spark.streams.addListener(MyListener())
>>> q = spark.readStream.format("rate").load().writeStream.format("console").start()
>>> q.stop()

server(driver):

23/05/08 21:27:49 INFO MicroBatchExecution: Starting [id = 6042353a-dc77-436b-9ee5-d4d0653ec0a2, runId = 269288ae-3463-408c-bbb3-a72934a4bc1d]. Use file:/tmp/temporary-9ec9765b-b6d8-427e-bfe9-fa758b9f87b7 to store the query checkpoint.
##### Python out for query start event is: out=###### Start running onQueryStarted ######
hi, event query id is: 6042353a-dc77-436b-9ee5-d4d0653ec0a2

##### Python error for query start event is: error=
##### End processing query start event exitCode=0
from pyspark.sql.streaming.listener import StreamingQueryListener;from pyspark.sql.streaming.listener import (QueryStartedEvent, QueryProgressEvent, QueryTerminatedEvent, QueryIdleEvent)
>>> class MyListener(StreamingQueryListener):
...     def onQueryStarted(self, event: QueryStartedEvent) -> None: print("hi, event query id is: " +  str(event.id))
...     def onQueryProgress(self, event: QueryProgressEvent) -> None: pass
...     def onQueryTerminated(self, event: QueryTerminatedEvent) -> None: pass
...     def onQueryIdle(self, event: QueryIdleEvent) -> None: pass

@WweiL
Copy link
Contributor Author

WweiL commented May 8, 2023

@rangadi @pengzhon-db


// TODO(Wei): serialize and deserialize events

private def toJSON(event: StreamingQueryListener.QueryStartedEvent): String =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here

HyukjinKwon pushed a commit that referenced this pull request Jun 21, 2023
…in JSON format

### What changes were proposed in this pull request?

Following the discussion of `foreachBatch` implementation, we decide to implement connect StreamingQueryListener in a way that the server runs the listener code, rather than the client.

Following this POC: #41096, this is going to be done in a way such that
1. Client sends serialized python code to server
2. Server initializes a Scala `StreamingQueryListener`, which initialize the python progress and run the python code. (Details of this step still depends on `foreachBatch` implementation.
3. When a new StreamingQuery Event comes in, the jvm serialize it to JSON and send it to the python progress to process.

This PR focus on step 3, the serialization and deserialization of the events.

Also finishes a TODO to check exception in QueryTerminatedEvent

### Why are the changes needed?

For implementing Connect StreamingQueryListener

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

New unit tests

Closes #41540 from WweiL/SPARK-42941-listener-python-new-1.

Authored-by: Wei Liu <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
@WweiL WweiL closed this Jul 24, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant