Conversation

Contributor

@SCHJonathan SCHJonathan commented Aug 28, 2025

What changes were proposed in this pull request?

Introduces a mechanism for lazy execution of Declarative Pipelines query functions. A query function is something like the mv1 in this example:

@materialized_view
def mv1():
    return spark.table("upstream_table").filter(some_condition)

Currently, query functions are always executed eagerly: the implementation of the materialized_view decorator immediately invokes the function it decorates and then registers the resulting DataFrame with the server.
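For intuition, here is a minimal sketch of that eager path. The decorator name matches the API above, but _register_flow and the use of df._plan are hypothetical stand-ins for the client internals:

```python
# Minimal sketch of the current eager behavior (helper names are illustrative).
def materialized_view(query_fn):
    df = query_fn()  # the query function runs immediately, at decoration time
    _register_flow(name=query_fn.__name__, relation=df._plan)  # hypothetical registration call
    return query_fn
```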

This PR introduces Spark Connect proto changes that enable executing query functions later on, initiated by the server during graph resolution. After all datasets and flows have been registered with the server, the server can tell the client to execute the query functions for flows that haven't yet successfully been executed. The way this works is that the client initiates an RPC with the server, and then the server streams back responses that indicate to the client when it's time to execute a query function for one of its flows. Relevant changes:

  • New QueryFunctionFailure message
  • New QueryFunctionResult message
  • Replace relation field in DefineFlow with query_function_result field
  • New DefineFlowQueryFunctionResult message
  • New GetQueryFunctionExecutionSignalStream message
  • New PipelineQueryFunctionExecutionSignal message
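
To make that handshake concrete, a rough client-side sketch is below. The message names mirror the new protos, but the stub methods, field names, and helpers are assumptions for illustration, not the actual generated client API:

```python
# Illustrative only: stub methods, field names, and helpers are assumed.
def run_deferred_query_functions(stub, graph_id, pending_flows):
    # Open the signal stream (GetQueryFunctionExecutionSignalStream) for this graph.
    for signal in stub.get_query_function_execution_signals(graph_id):
        # Each PipelineQueryFunctionExecutionSignal tells the client that one of its
        # flows' query functions should (re)run now that dependencies may be resolvable.
        query_fn = pending_flows[signal.flow_name]
        result = execute_query_function(query_fn)  # yields a QueryFunctionResult (relation or failure)
        send_flow_query_function_result(stub, graph_id, signal.flow_name, result)
```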

Why are the changes needed?

There are some situations where we can't resolve the relation immediately at the time we're registering a flow.

E.g. consider this situation:
file 1:

@materialized_view
def mv1():
    data = [("Alice", 10), ("Bob", 15), ("Alice", 5)]
    return spark.createDataFrame(data, ["name", "amount"])

file 2:

@materialized_view
def mv2():
    return spark.table("mv1").groupBy("name").agg(sum("amount").alias("total_amount"))

Unlike some other transformations, which get analyzed lazily, groupBy can trigger an AnalyzePlan Spark Connect request immediately. If the query function for mv2 gets executed before mv1, then it will hit an error, because mv1 doesn't exist yet. groupBy isn't the only example here (df.schema, etc).
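
As a small illustration of the lazy/eager distinction (assuming a live Spark Connect session; the exact error may vary):

```python
df = spark.table("mv1").filter("amount > 5")  # lazy: only builds an unresolved plan, no RPC yet
schema = df.schema  # eager: forces an AnalyzePlan round trip and fails if "mv1" is not defined yet
```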

Other examples of these kinds of situations:

  • The set of columns for a downstream table is determined from the set of columns in an upstream table.
  • When spark.sql is used.

Does this PR introduce any user-facing change?

No

How was this patch tested?

These are proto-only changes. Will follow up with unit tests and E2E tests once we add the implementation.

Was this patch authored or co-authored using generative AI tooling?

No

@SCHJonathan SCHJonathan changed the title from "Jonathan chang data/proto changes" to "[SPARK-52807][SDP] Proto changes to support analysis inside Declarative Pipelines query functions" Aug 28, 2025
@SCHJonathan SCHJonathan force-pushed the jonathan-chang_data/proto-changes branch from a74b139 to d595752 on August 28, 2025 01:20
Contributor Author

cc @hvanhovell for review

message QueryFunctionFailure {
// Identifier for a dataset within the graph that the query function needed to know the schema
// of but which had not yet been analyzed itself.
optional string missing_dependency = 1;
Contributor

Is it possible for us to return all missing dependencies in one go? That should help us be more efficient during initialization.

Contributor Author

This is a very good point. This proto basically describes the case where a flow's query function cannot be lazily analyzed by the Spark Connect client because it triggers some eager analysis (e.g., df.schema / df.isStreaming), and the dependencies represented by the df have not yet been resolved by the dataflow graph.

It's very possible for the df to contain multiple unresolved dependencies (e.g., a multi-table join). Therefore, instead of storing a string identifier for a single missing dependency, we directly pass the entire logical plan to the server and let the server filter out which leaf nodes in the plan have not yet been resolved by the dataflow graph.
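
A hedged sketch of that idea, written in Python for readability (the real logic would live in the Scala backend, and the traversal helpers here are hypothetical):

```python
# Illustrative only: walk the submitted plan's leaf relations and collect table
# references that the dataflow graph has not defined yet.
def missing_dependencies(plan, graph):
    missing = []
    for leaf in iter_leaf_relations(plan):  # hypothetical traversal over leaf relations
        name = named_table_identifier(leaf)  # e.g. the unparsed identifier of a named table, if any
        if name is not None and not graph.defines(name):
            missing.append(name)
    return missing
```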

Contributor Author

wdyt @sryza

Contributor

Makes sense to me. My understanding was that, with the current backend implementation, we only get one unresolved dependency at a time (because we basically find out that it's missing when we hit a failure). However:

  1. I might be wrong about that implementation
  2. We could make a more sophisticated implementation in the future, and it would be good if the protocol supports that

Contributor

Based on further thinking and discussion, it seems like we might be able to just leave this out for now: when the server fails to analyze a plan, it knows what flow that plan was associated with, so it can just do the bookkeeping on its side.

There might be some edge situations (e.g. at the beginning) where this means that we end up needing one more query function invocation than we otherwise would, for query functions that do analysis. But we can bias towards simplicity for now and optimize later if we need to.

Contributor Author

Sounds good Sandy, let me update the proto to reflect this

// An unresolved relation that defines the dataset's flow.
optional spark.connect.Relation relation = 4;
// [Deprecated] An unresolved relation that defines the dataset's flow.
optional spark.connect.Relation relation = 4 [deprecated = true];
Contributor

Since this is unreleased, we can just remove it.

Contributor Author

Since SDP is still using this field extensively right now, wdyt if I mark the proto field as deprecated in this PR and follow up with a PR updating the code to use the new proto? After that, we remove the deprecated field.

message QueryFunctionResult {
oneof flow_function_evaluation_result {
// If the query function executed successfully, the unresolved logical plan produced by it.
spark.connect.Relation analyzed_plan = 1;
Contributor

It won't be analyzed yet, right? It's an unresolved plan that will be analyzed once it gets to the server? Also, I think we should use the word "relation" instead of "plan" – I think this was my bad originally and Dongjoon recently pointed out that relation was more accurate.

Contributor Author

SG let me update it!

Contributor Author
@SCHJonathan SCHJonathan Sep 29, 2025

Let me also create a ticket to rename the plan field to relation for the DefineFlow RPC before SDP is released. Oops, looks like it's already done.

// The logical plan that the query function needed to eagerly analyze in order to know
// the schema / isStreaming / etc of the plan it produced, but could not because it has
// unresolved dependencies.
spark.connect.Relation unresolved_dependency_plan = 1;
Contributor

Vs. passing a list of missing dependencies, which would lead to a simpler backend implementation?

@SCHJonathan SCHJonathan requested a review from sryza September 29, 2025 18:52

// An unresolved relation that defines the dataset's flow.
// An unresolved relation that defines the dataset's flow. Empty if the query function
// that defines the flow cannot be analyzed at the time of flow definition.
Contributor Author

Instead, the server would do the bookkeeping for the unresolved dependencies from the previously triggered AnalyzePlan RPC. The SC client can just send None as the relation when defining the flow, and the server would use the tracked unresolved dependencies to register the flow in the DataflowGraph.
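
A minimal sketch of that client behavior, with assumed helper and field names (the actual generated API may differ):

```python
# Illustrative only: when eager analysis already failed, DefineFlow is sent without a
# relation; the server falls back to the unresolved dependencies it recorded from the
# earlier failed AnalyzePlan request for this flow.
def define_flow_without_relation(client, graph_id, flow_name):
    command = build_define_flow(graph_id=graph_id, flow_name=flow_name, relation=None)
    client.execute_command(command)
```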

Contributor
@sryza sryza left a comment

LGTM!

Contributor

sryza commented Sep 29, 2025

Will give a little more time for @hvanhovell to take a look if he's interested, will then merge.

@sryza sryza closed this in e04fd59 Oct 1, 2025
dongjoon-hyun added a commit to apache/spark-connect-swift that referenced this pull request Oct 27, 2025
…th `4.1.0-preview3` RC1

### What changes were proposed in this pull request?

This PR aims to update Spark Connect-generated Swift source code with Apache Spark `4.1.0-preview3` RC1.

### Why are the changes needed?

There are many changes between Apache Spark 4.1.0-preview2 and preview3.

- apache/spark#52685
- apache/spark#52613
- apache/spark#52553
- apache/spark#52532
- apache/spark#52517
- apache/spark#52514
- apache/spark#52487
- apache/spark#52328
- apache/spark#52200
- apache/spark#52154
- apache/spark#51344

To use the latest bug fixes and new messages to develop for new features of `4.1.0-preview3`.

```
$ git clone -b v4.1.0-preview3 https://github.com/apache/spark.git
$ cd spark/sql/connect/common/src/main/protobuf/
$ protoc --swift_out=. spark/connect/*.proto
$ protoc --grpc-swift_out=. spark/connect/*.proto

// Remove empty GRPC files
$ cd spark/connect
$ grep 'This file contained no services' * | awk -F: '{print $1}' | xargs rm
```

### Does this PR introduce _any_ user-facing change?

Pass the CIs.

### How was this patch tested?

Pass the CIs. I manually tested with `Apache Spark 4.1.0-preview3` (with the two SDP ignored tests).

```
$ swift test --no-parallel
...
✔ Test run with 203 tests in 21 suites passed after 19.088 seconds.
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #252 from dongjoon-hyun/SPARK-54043.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
huangxiaopingRD pushed a commit to huangxiaopingRD/spark that referenced this pull request Nov 25, 2025
Closes apache#52154 from SCHJonathan/jonathan-chang_data/proto-changes.

Authored-by: Yuheng Chang <[email protected]>
Signed-off-by: Sandy Ryza <[email protected]>