
Conversation

@WweiL (Contributor) commented Jun 9, 2023

What changes were proposed in this pull request?

Following the discussion of the foreachBatch implementation, we decided to implement the Connect StreamingQueryListener such that the server runs the listener code, rather than the client.

Following this POC, #41096, this is going to be done as follows:

  1. The client sends serialized Python code to the server.
  2. The server initializes a Scala StreamingQueryListener, which initializes the Python process and runs the Python code. (Details of this step still depend on the foreachBatch implementation.)
  3. When a new StreamingQuery event comes in, the JVM serializes it to JSON and sends it to the Python process to handle.

This PR focuses on step 3: the serialization and deserialization of the events.

It also finishes a TODO to check the exception in QueryTerminatedEvent.
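
For step 3, the shape of the change is roughly as follows. This is only a sketch modeled on QueryStartedEvent; the actual field set mirrors the JSON produced by the JVM event:

    import uuid
    from typing import Any, Dict, Optional

    class QueryStartedEvent:
        def __init__(
            self, id: uuid.UUID, runId: uuid.UUID, name: Optional[str], timestamp: str
        ) -> None:
            self._id = id
            self._runId = runId
            self._name = name
            self._timestamp = timestamp

        @classmethod
        def fromJson(cls, j: Dict[str, Any]) -> "QueryStartedEvent":
            # The JVM serializes the event to JSON; UUID fields arrive as
            # strings, and "name" may be None for unnamed queries.
            return cls(
                id=uuid.UUID(j["id"]),
                runId=uuid.UUID(j["runId"]),
                name=j["name"],
                timestamp=j["timestamp"],
            )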

Why are the changes needed?

For implementing the Connect StreamingQueryListener.

Does this PR introduce any user-facing change?

No

How was this patch tested?

New unit tests

@WweiL (Contributor, Author) commented Jun 9, 2023

PS. ChatGPT is especially helpful in doing such boilerplate jobs :P

    inputRowsPerSecond=j["inputRowsPerSecond"],
    processedRowsPerSecond=j["processedRowsPerSecond"],
    observedMetrics={
        k: Row(*row_dict.keys())(*row_dict.values())  # Assume no nested rows
@WweiL (Contributor, Author) commented on this hunk:

Can there be a nested Row for this field?

@WweiL (Contributor, Author) commented Jun 12, 2023

Checking the original PR, #26127: the intended use case of the observe method is to construct this Row by aggregating on some fields. I think we don't need to handle nested rows here, but I'm open to discussion.
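
For illustration, a typical observe call produces a flat Row of aggregate values (the metric and column names below are made up):

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([(1,), (2,)], ["value"])

    # Each metric is an aggregate expression, so the Row that ends up in
    # observedMetrics["my_metrics"] is flat, e.g. Row(cnt=2, max_value=2).
    observed = df.observe(
        "my_metrics",
        F.count(F.lit(1)).alias("cnt"),
        F.max("value").alias("max_value"),
    )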

@WweiL (Contributor, Author) commented Jun 9, 2023

@rangadi @HyukjinKwon Can you take a look? Thanks!

@HyukjinKwon (Member) commented:
cc @HeartSaVioR too

@HyukjinKwon (Member) left a review:

Seems fine from a cursory look.

@rangadi left a review:

Made a few comments.

    )

    @classmethod
    def fromJson(cls, j: Dict[str, Any]) -> "QueryStartedEvent":
@rangadi commented on this hunk:

What is the context where these are used? Thanks for the detailed description; you mentioned this is for 'step 3'. Could you point to any WIP code that uses this?

@WweiL (Contributor, Author):

Sure! Here is the pointer: https://github.com/apache/spark/pull/41096/files#r1230025798
Note that it's draft code, so it's really messy. Basically, since we register a Scala listener but run Python code inside it, we need a way to send the events to the Python process, and JSON is a safe way to do that.
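
As a hedged sketch of that flow on the Python side (the wrapper function is hypothetical; fromJson is what this PR adds):

    import json

    from pyspark.sql.streaming.listener import (
        QueryStartedEvent,
        StreamingQueryListener,
    )

    # Hypothetical dispatch: the server-side Scala listener serializes the
    # event and forwards the JSON string; the Python process rebuilds the
    # typed event and invokes the user's listener callback.
    def handle_query_started(
        listener: StreamingQueryListener, event_json: str
    ) -> None:
        event = QueryStartedEvent.fromJson(json.loads(event_json))
        listener.onQueryStarted(event)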

@rangadi:

I see. Scala does toJson and Python does fromJson.
Should this be under the connect directory, since it is Connect-specific code?

@WweiL (Contributor, Author) commented Jun 14, 2023

I think we still need to let users use import pyspark.sql.streaming.Query<xxx>Event in Connect. If we put the code in the connect folder, they might need to do import pyspark.sql.connect.streaming.Query<xxx>Event instead.

One way I can think of to do this is to refactor the code and make the event classes here abstract:

class QueryStartedEvent(ABC):
    @property
    @abstractmethod
    def id(self) -> uuid.UUID:
        ...
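
A concrete Connect-side subclass could then implement it, for illustration (the class name and placement are hypothetical):

    import uuid

    # Hypothetical: the Connect module subclasses the abstract event, so
    # pyspark.sql.streaming.QueryStartedEvent stays the user-facing type.
    class ConnectQueryStartedEvent(QueryStartedEvent):
        def __init__(self, id: uuid.UUID) -> None:
            self._id = id

        @property
        def id(self) -> uuid.UUID:
            return self._id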

Do you think we should do that?

@rangadi:

> I think we still need to let users use import pyspark.sql.streaming.Query<xxx>Event in Connect.

@HyukjinKwon is this true? I think users only have the Spark Connect Python code.

> import pyspark.sql.connect.streaming.Query<xxx>Event

User code in a StreamingQueryListener in Connect is Spark Connect code. I think our import statements import the right version (connect or legacy, depending on the environment).
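
As a hedged illustration of that dispatch pattern (simplified; the exact mechanism in pyspark may differ):

    import os

    # Simplified sketch of environment-based dispatch, in the spirit of
    # pyspark's remote-session check: pick the Connect implementation when
    # a remote session is configured, the legacy one otherwise.
    def is_remote() -> bool:
        return "SPARK_REMOTE" in os.environ

    if is_remote():
        from pyspark.sql.connect.streaming.query import StreamingQuery
    else:
        from pyspark.sql.streaming.query import StreamingQuery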

@HyukjinKwon (Member) commented Jun 15, 2023

Yeah, actually I didn't want users to directly import the event classes (and that's why I mentioned that the constructor is private). Do we need end users to be able to import them in Spark Connect?

@rangadi:

They would write StreamingQueryListener code just like the rest of their Spark Connect code, i.e. they have access only to those packages that are available in Spark Connect (they don't import the 'connect' version directly).

What that would mean for this class is a bit uncertain to me. If @HyukjinKwon is ok with this, I am ok.

@HyukjinKwon (Member) commented:
Merged to master.
