
Conversation

@juliuszsompolski (Contributor) commented Jul 14, 2023

### What changes were proposed in this pull request?

Currently, Spark Connect only allows cancelling all operations in a session, via SparkSession.interruptAll().
This PR adds a mechanism to interrupt operations by tag (similar to SparkContext.cancelJobsWithTag), and to interrupt individual operations.

It also adds the new tags to SparkListenerConnectOperationStarted.

### Why are the changes needed?

Better control of query cancellation in Spark Connect.

### Does this PR introduce _any_ user-facing change?

Yes. New APIs in the Spark Connect Scala client:

```
SparkSession.addTag
SparkSession.removeTag
SparkSession.getTags
SparkSession.clearTags
SparkSession.interruptTag
SparkSession.interruptOperation
```

and also `SparkResult.operationId`, to be able to get the id for `SparkSession.interruptOperation`.
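
For illustration, a minimal usage sketch (the tag name and query are made up; `collectResult` is the Scala client call that returns a `SparkResult`):

```scala
// Sketch only: tag name and query are illustrative.
spark.addTag("nightly-etl")            // operations started from now on carry this tag
val result = spark.sql("SELECT * FROM t").collectResult()
val opId = result.operationId          // id usable with interruptOperation

// From another thread sharing the same session:
spark.interruptTag("nightly-etl")      // interrupt everything tagged "nightly-etl"
spark.interruptOperation(opId)         // or interrupt one specific operation

spark.clearTags()                      // stop tagging subsequent operations
```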

Python client APIs will be added in a followup PR.

### How was this patch tested?

Added tests in SparkSessionE2ESuite.

@juliuszsompolski (Contributor Author)

At this point the PR contains only the server-side changes; I am working on the client changes.

@juliuszsompolski juliuszsompolski changed the title from [SPARK-44422] Spark Connect fine grained interrupt to [SPARK-44422][CONNECT] Spark Connect fine grained interrupt on Jul 14, 2023
@juliuszsompolski juliuszsompolski force-pushed the sc-fine-grained-cancel branch from b72d81d to a04e61b on July 17, 2023 15:46
@HyukjinKwon (Member)

cc @hvanhovell @grundprinzip

@grundprinzip (Contributor) left a comment

Nice and clean. The tag-to-Spark-job-tag mapping needs some docs, and I may have missed its usage in this PR.

@juliuszsompolski (Contributor Author)

Thanks @grundprinzip, I'm working on the usage (client side) right now.

Comment on lines 616 to 619

Contributor Author

This has actually been fixed by #41315 (the execution now runs in a different thread, and the interrupt interrupts that thread, not only the Spark jobs).
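
(A rough illustration of the difference; the names below are made up, not Spark internals:)

```scala
// Before #41315 an interrupt could only cancel the Spark jobs; with the
// execution running in its own thread, the thread itself can be interrupted,
// stopping driver-side work between jobs too. Sketch with made-up names:
def runPlan(): Unit = { /* driver-side work plus the Spark jobs it launches */ }

val executionThread = new Thread(() => runPlan())
executionThread.start()

// On interrupt:
// sparkContext.cancelJobsWithTag(jobTag)  // cancels the tagged Spark jobs
executionThread.interrupt()                // and interrupts the thread itself
```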

@juliuszsompolski juliuszsompolski force-pushed the sc-fine-grained-cancel branch from d7f28d1 to d2c8524 on July 19, 2023 20:40
@juliuszsompolski (Contributor Author)

cc @zhenlineo @HyukjinKwon

@grundprinzip (Contributor) left a comment

LGTM

Contributor

Is using Option[String] more idiomatic?

Contributor Author

Yeah, but I adapted to the surrounding style of

```scala
private[this] var opId: String = null
private[this] var numRecords: Int = 0
private[this] var structType: StructType = _
private[this] var arrowSchema: pojo.Schema = _
private[this] var nextResultIndex: Int = 0
```

From

```scala
def schema: StructType = {
  if (structType == null) {
    processResponses(stopOnSchema = true)
  }
  structType
}
```

it looks like I can assume `_` is null, so I could make it `_`, and maybe that's more idiomatic.
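
(For reference, the `_` default initializer on a `var` yields the type's default value, which is `null` for reference types:)

```scala
class Defaults {
  private var s: String = _    // reference type: default-initialized to null
  private var n: Int = _       // numeric value type: 0
  private var b: Boolean = _   // false

  def sIsNull: Boolean = s == null   // true until s is assigned
}
```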

@HyukjinKwon (Member) left a comment

LGTM, I will work on the Python side.

@zhenlineo (Contributor) left a comment

LGTM

Contributor

Is it possible that we block on receiving this operationId from the server? If that happens, can we only interrupt all?

Contributor Author

Currently getting the operationId is indeed a bit awkward, and adding tags with interruptTag is a more convenient API to use.
This will be improved in a followup PR for async and detachable execution. Some preparation is here already: https://github.com/apache/spark/pull/42009/files#diff-3cad257dc0c15b4d091beebdfd42659f803193c23667425d8926b84113a2a312R288 shows that operationId can also be passed in ExecutePlanRequest, so that the client knows it from the start and doesn't need to take it from the first response.
In my followup PR I also plan to add a response that is always sent right at the beginning of the query, so that even if the client did not set the operationId, it can get it right away.
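
(Roughly, a client supplying its own id could look like the sketch below; the `setOperationId` setter is an assumption based on the proto field in the linked diff:)

```scala
import java.util.UUID
// `proto` here stands for the generated org.apache.spark.connect.proto package.

val sessionId = UUID.randomUUID().toString     // normally the client's existing session id
val operationId = UUID.randomUUID().toString   // client-chosen, known before any response

val requestBuilder = proto.ExecutePlanRequest
  .newBuilder()
  .setSessionId(sessionId)
  .setOperationId(operationId)   // assumed setter for the proto field in the linked diff
// ... set the plan and user context, then execute.
// The client can now call spark.interruptOperation(operationId) at any time.
```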

@juliuszsompolski juliuszsompolski force-pushed the sc-fine-grained-cancel branch from fd1fb0d to e4047e3 on July 20, 2023 09:52
@HyukjinKwon (Member) commented Jul 21, 2023

Merged to master, and branch-3.5.

HyukjinKwon pushed a commit that referenced this pull request Jul 21, 2023

Closes #42009 from juliuszsompolski/sc-fine-grained-cancel.

Authored-by: Juliusz Sompolski <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit dda3784)
Signed-off-by: Hyukjin Kwon <[email protected]>
```scala
 * Spark Connect tags are also added as SparkContext job tags, but to make the tag unique, they
 * need to be combined with userId and sessionId.
 */
def tagToSparkJobTag(tag: String): String = {
```


@juliuszsompolski the input tag isn't used in the output, which doesn't look intended.

Contributor Author

Thanks for spotting!
@HyukjinKwon could you maybe piggyback changing it to

```scala
"SparkConnect_Execute_" +
  s"User_${sessionHolder.userId}_Session_${sessionHolder.sessionId}_Tag_${tag}"
```

in #42120?
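
For reference, the corrected method would then presumably read (combining the signature above with the proposed expression):

```scala
/**
 * Spark Connect tags are also added as SparkContext job tags, but to make the tag unique, they
 * need to be combined with userId and sessionId.
 */
def tagToSparkJobTag(tag: String): String = {
  "SparkConnect_Execute_" +
    s"User_${sessionHolder.userId}_Session_${sessionHolder.sessionId}_Tag_${tag}"
}
```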

@HyukjinKwon (Member)

sure

```scala
assert(interrupted.length == 2, s"Interrupted operations: $interrupted.")
}

test("interrupt operation") {
```
@LuciferYang (Contributor)

@juliuszsompolski after this PR, the SparkSessionE2ESuite Maven tests started failing:

```
build/mvn clean install -DskipTests -Phive
build/mvn clean test -pl connector/connect/client/jvm
```

```
SparkSessionE2ESuite:
- interrupt all - background queries, foreground interrupt *** FAILED ***
  The code passed to eventually never returned normally. Attempted 30 times over 20.274941708000004 seconds. Last failure message: Some("unexpected failure in q2: org.apache.spark.SparkException: org/apache/spark/sql/connect/client/SparkResult") was not empty Error not empty: Some(unexpected failure in q2: org.apache.spark.SparkException: org/apache/spark/sql/connect/client/SparkResult). (SparkSessionE2ESuite.scala:69)
- interrupt all - foreground queries, background interrupt *** FAILED ***
  "org/apache/spark/sql/connect/client/SparkResult" did not contain "OPERATION_CANCELED" Unexpected exception: org.apache.spark.SparkException: org/apache/spark/sql/connect/client/SparkResult (SparkSessionE2ESuite.scala:99)
- interrupt tag *** FAILED ***
  The code passed to eventually never returned normally. Attempted 30 times over 20.256318458000003 seconds. Last failure message: ListBuffer() had length 0 instead of expected length 2 Interrupted operations: ListBuffer().. (SparkSessionE2ESuite.scala:197)
- interrupt operation *** FAILED ***
  org.apache.spark.SparkException: org/apache/spark/sql/connect/client/SparkResult
  at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.toSparkThrowable(GrpcExceptionConverter.scala:53)
  at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.convert(GrpcExceptionConverter.scala:30)
  at org.apache.spark.sql.connect.client.GrpcExceptionConverter$$anon$1.hasNext(GrpcExceptionConverter.scala:38)
  at org.apache.spark.sql.connect.client.SparkResult.org$apache$spark$sql$connect$client$SparkResult$$processResponses(SparkResult.scala:82)
  at org.apache.spark.sql.connect.client.SparkResult.operationId(SparkResult.scala:173)
  at org.apache.spark.sql.SparkSessionE2ESuite.$anonfun$new$31(SparkSessionE2ESuite.scala:241)
  at org.apache.spark.sql.connect.client.util.RemoteSparkSession.$anonfun$test$1(RemoteSparkSession.scala:235)
  at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
  at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
  at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
  ...
```

Contributor Author

@LuciferYang

> Unexpected exception: org.apache.spark.SparkException: org/apache/spark/sql/connect/client/SparkResult

This appears to be a recurrence of the ClassNotFound issue that you attempted to fix in #41487 by moving these tests to a separate class.
Like I wrote there, it's a bigger problem with UDF serialization pulling in seemingly unrelated classes. I raised https://issues.apache.org/jira/browse/SPARK-43744 for this, and @zhenlineo is working on it (WIP: #42069).

@LuciferYang (Contributor) commented Jul 24, 2023


OK, thanks @juliuszsompolski

@LuciferYang (Contributor) commented Aug 1, 2023

@juliuszsompolski Sorry for the interruption; #42069 has already been merged, but these 4 test cases still fail in the Maven tests. Do you have any plans or suggestions for a fix?

@zhenlineo (Contributor)

Hi @bogao007, it would be great if you could get a chance to fix https://issues.apache.org/jira/browse/SPARK-44576.
After the fix, we can move the whole test to use addArtifact to fix the 4 Maven tests. I think we need to fix them before the 3.5 release.

@LuciferYang (Contributor)

@zhenlineo So ultimately, we still need to solve this issue by treating client.jar or client-test.jar as an Artifact?

@zhenlineo (Contributor)

@LuciferYang Our current solution only stubs classes from user session-scoped libraries, which covers jars added via session.addArtifact. The bug SPARK-44576 should be fixed, as streaming should not fail when the user adds session-scoped libraries. If the failing tests really bother you, we can also move them into a separate test file for now. The cause is the unnecessary dependencies pulled in by lambdas inside the same class file.

@LuciferYang (Contributor)

@zhenlineo Thank you for your response. As long as we resolve this issue before the release of 3.5, it will be fine.

@LuciferYang (Contributor)

> @LuciferYang Our current solution only stubs classes from user session-scoped libraries, which covers jars added via session.addArtifact. The bug SPARK-44576 should be fixed, as streaming should not fail when the user adds session-scoped libraries. If the failing tests really bother you, we can also move them into a separate test file for now. The cause is the unnecessary dependencies pulled in by lambdas inside the same class file.

I have created https://issues.apache.org/jira/browse/SPARK-44784 and marked it as a Blocker. We should fix this issue before the release of Apache Spark 3.5.0. Also cc @xuanyuanking for awareness.

HyukjinKwon added a commit that referenced this pull request Jul 25, 2023
…Connect Python client

### What changes were proposed in this pull request?

This PR proposes the Python implementation of #42009.

### Why are the changes needed?

For feature parity, and better control of query cancellation in Spark Connect.

### Does this PR introduce _any_ user-facing change?

Yes. New APIs in the Spark Connect Python client:

```
SparkSession.addTag
SparkSession.removeTag
SparkSession.getTags
SparkSession.clearTags
SparkSession.interruptTag
SparkSession.interruptOperation
```

### How was this patch tested?

Unit tests were added, and it was manually tested too.

Closes #42120 from HyukjinKwon/SPARK-44509.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
HyukjinKwon added a commit that referenced this pull request Jul 25, 2023
…Connect Python client

Closes #42120 from HyukjinKwon/SPARK-44509.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit 5ee462d)
Signed-off-by: Hyukjin Kwon <[email protected]>
LuciferYang pushed a commit that referenced this pull request May 6, 2024
…nterrupt tag` test

### What changes were proposed in this pull request?

This PR aims to disable a flaky test, `SparkSessionE2ESuite.interrupt tag`, temporarily.

To re-enable it, SPARK-48139 has been created as a blocker issue for 4.0.0.

### Why are the changes needed?

This test case was added in `Apache Spark 3.5.0` but has unfortunately been unstable until now.
- #42009

We tried to stabilize this test case before `Apache Spark 4.0.0-preview`.
- #45173
- #46374

However, it's still flaky.

- https://github.com/apache/spark/actions/runs/8962353911/job/24611130573 (Master, 2024-05-05)
- https://github.com/apache/spark/actions/runs/8948176536/job/24581022674 (Master, 2024-05-04)

This PR aims to stabilize CI first, and to address this flakiness as a blocker-level issue for `Spark Connect GA` (SPARK-48139) before Apache Spark 4.0.0.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46396 from dongjoon-hyun/SPARK-48138.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
dongjoon-hyun added a commit that referenced this pull request May 8, 2024
…nterrupt tag` test

Closes #46396 from dongjoon-hyun/SPARK-48138.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
(cherry picked from commit 8294c59)
Signed-off-by: Dongjoon Hyun <[email protected]>
@virrrat commented May 19, 2025

Hi @juliuszsompolski, I was going through this PR and see that SparkSession.interruptOperation was added here, but I don't see a way to fetch the operation id of ongoing (or already executed) operations. Am I missing anything here?

@juliuszsompolski (Contributor Author)

@virrrat one interactive possibility is to view them in the Spark Connect tab of the Spark UI. If an interactive user sees a stuck operation there, they can then use interruptOperation with the id copied from there.
While it is not a public API, you can also retrieve the list of executions using SparkConnectService.listActiveExecutions.

@juliuszsompolski (Contributor Author)

You can also implement a SparkListener to capture events like SparkListenerConnectOperationStarted and have logic that captures the ids from there; that would be a public API.
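
(A sketch of that approach; the event's field name `operationId` and its package are assumptions:)

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
// import of SparkListenerConnectOperationStarted from the connect server package assumed

class OperationIdListener extends SparkListener {
  val operationIds = new ConcurrentLinkedQueue[String]()

  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: SparkListenerConnectOperationStarted =>
      operationIds.add(e.operationId)   // assumed field name
    case _ => // ignore other events
  }
}

// Register it on the server-side SparkContext:
// spark.sparkContext.addSparkListener(new OperationIdListener)
```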
