
@sarutak

@sarutak sarutak commented Jul 23, 2025

What changes were proposed in this pull request?

This PR fixes an issue that happens when operations are interrupted, which is related to SPARK-50748 and SPARK-50889.

Regarding SPARK-50889, the issue happens if an execution thread for an operation id cleans up the corresponding `ExecuteHolder` as a result of interruption here before a response sender thread consumes a response here.
In this case, the cleanup finally calls `ExecuteResponseObserver.removeAll()` and all the responses are discarded, so the response sender thread can't escape this loop because neither `gotResponse` nor `streamFinished` ever becomes true.

The solution this PR proposes is to change the definition of `streamFinished` in `ExecuteGrpcResponseSender` so that a stream is also regarded as finished when the `ExecuteResponseObserver` has been cleaned up and all of its responses discarded.
`ExecuteResponseObserver.removeAll` is called only when the corresponding `ExecuteHolder` is closed or cleaned up by interruption, so this is a reasonable condition.
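The interaction between cleanup and the sender's exit conditions can be sketched with a hypothetical, simplified observer (this is illustrative only, not the real `ExecuteResponseObserver`; the names `ObserverSketch` and `Observer` are made up for this sketch):

```scala
object ObserverSketch {
  // Hypothetical, simplified stand-in for ExecuteResponseObserver.
  final class Observer {
    private var responses = Map.empty[Long, String]
    private var lastIndex: Option[Long] = None
    private var cleaned = false

    def onNext(i: Long, r: String): Unit = synchronized { responses += i -> r }
    def onCompleted(last: Long): Unit = synchronized { lastIndex = Some(last) }
    // Called when the holder is closed or cleaned up after an interrupt:
    // every buffered response is dropped and the observer is marked cleaned.
    def removeAll(): Unit = synchronized { responses = Map.empty; cleaned = true }

    def getResponse(i: Long): Option[String] = synchronized(responses.get(i))
    def getLastResponseIndex(): Option[Long] = synchronized(lastIndex)
    def isCleaned(): Boolean = synchronized(cleaned)
  }

  // The fixed exit condition: the stream also counts as finished once the
  // observer has been cleaned, so the sender stops waiting for responses
  // that will never arrive.
  def streamFinished(obs: Observer, nextIndex: Long): Boolean =
    obs.getLastResponseIndex().exists(nextIndex > _) || obs.isCleaned()
}
```

Before `removeAll()` neither clause holds; afterwards `isCleaned()` makes `streamFinished` true even though no last response index was ever recorded.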

Why are the changes needed?

To fix a potential issue.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Tested manually.
You can easily reproduce this issue without this change by inserting a sleep into the test as follows.

```
--- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/SparkSessionE2ESuite.scala
+++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/SparkSessionE2ESuite.scala
@@ -331,6 +331,7 @@ class SparkSessionE2ESuite extends ConnectFunSuite with RemoteSparkSession {
         // cancel
         val operationId = result.operationId
         val canceledId = spark.interruptOperation(operationId)
+        Thread.sleep(1000)
         assert(canceledId == Seq(operationId))
         // and check that it got canceled
         val e = intercept[SparkException] {
```

After this change is applied, the test above no longer hangs.

Was this patch authored or co-authored using generative AI tooling?

No.

@sarutak

sarutak commented Jul 24, 2025

The GA failure seems related to this change.
I'll investigate the cause.

@HyukjinKwon

Merged to master.

@sarutak

sarutak commented Jul 25, 2025

I noticed that this change fixes SPARK-50748 too.
A reproducible scenario is:

  1. Run `SparkSessionE2ESuite`."interrupt all - background queries, foreground interrupt".
  2. The execution thread produces a response which includes the schema to `ExecuteResponseObserver`.
  3. A response sender thread consumes the schema response from the observer, and a response which includes `execution_progress` is produced.
  4. The response sender sends the schema response and finishes this gRPC stream, but the `execution_progress` response has not been consumed yet.
  5. The client calls `interruptAll` and the `execution_progress` response is discarded.
  6. The client tries to get the response and a response sender thread spawns, but all the responses, including `execution_progress`, have already been discarded, so this thread can't escape from this loop.
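The dead end in step 6 can be sketched with a toy model (the names `SenderExitSketch`, `ObserverState`, and `canExit` are hypothetical, not the actual `ExecuteGrpcResponseSender` code):

```scala
object SenderExitSketch {
  // Toy snapshot of observer state after interruptAll discarded everything.
  final case class ObserverState(
      responses: Map[Long, String],
      lastResponseIndex: Option[Long],
      cleaned: Boolean)

  // Whether the sender loop has any exit condition it can satisfy.
  // withFix = true adds the isCleaned() clause this PR introduces.
  def canExit(obs: ObserverState, nextIndex: Long, withFix: Boolean): Boolean = {
    val gotResponse = obs.responses.contains(nextIndex)
    val streamFinished =
      obs.lastResponseIndex.exists(nextIndex > _) || (withFix && obs.cleaned)
    gotResponse || streamFinished
  }
}
```

With everything discarded (`responses` empty, no last index recorded, `cleaned = true`), the pre-fix loop has no satisfiable exit condition and waits forever; with the fix it can terminate.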

I reproduced the issue reported in SPARK-50748 by reverting this change and inserting sleeps as follows.

```
--- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/SparkSessionE2ESuite.scala
+++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/SparkSessionE2ESuite.scala
@@ -66,6 +66,7 @@ class SparkSessionE2ESuite extends ConnectFunSuite with RemoteSparkSession {
      case Failure(t) =>
        error = Some("unexpected failure in q2: " + t.toString)
    }
+    Thread.sleep(1000)
    // 20 seconds is < 30 seconds the queries should be running,
    // because it should be interrupted sooner
    val interrupted = mutable.ListBuffer[String]()
```

```
--- a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
+++ b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
@@ -232,7 +232,9 @@ class ExecutePlanResponseReattachableIterator(
       if (iter.isEmpty) {
         iter = Some(rawBlockingStub.reattachExecute(createReattachExecuteRequest()))
       }
-      iterFun(iter.get)
+      val result = iterFun(iter.get)
+      Thread.sleep(1000)
+      result
     } catch {
       case ex: StatusRuntimeException
           if Option(StatusProto.fromThrowable(ex))
```

```
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
@@ -232,8 +232,7 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
       // 2. has a response to send
       def gotResponse = response.nonEmpty
       // 3. sent everything from the stream and the stream is finished
-      def streamFinished = executionObserver.getLastResponseIndex().exists(nextIndex > _) ||
-        executionObserver.isCleaned()
+      def streamFinished = executionObserver.getLastResponseIndex().exists(nextIndex > _)
       // 4. time deadline or size limit reached
       def deadlineLimitReached =
         sentResponsesSize > maximumResponseSize || deadlineTimeNs < System.nanoTime()
```

@HyukjinKwon @dongjoon-hyun What do you think?

@dongjoon-hyun

Thank you so much, @sarutak . You can resolve that too.

@dongjoon-hyun

I resolved SPARK-50748 with this PR and assigned it to you.

@dongjoon-hyun

Oh, @sarutak. I realized that this is a main code change.

  1. We should not use [TESTS] in the PR title when we change the main code.
  2. Could you revise the PR title to properly describe the main code change? The JIRA issue is about reporting the bug, while the PR title is about the code change; we use proper titles for each instead of the same one.

```
       // 3. sent everything from the stream and the stream is finished
-      def streamFinished = executionObserver.getLastResponseIndex().exists(nextIndex > _)
+      def streamFinished = executionObserver.getLastResponseIndex().exists(nextIndex > _) ||
+        executionObserver.isCleaned()
```

In addition, this is a bug fix, isn't it?

I guess we need to backport this, @sarutak .

cc @grundprinzip , @hvanhovell , @peter-toth

@sarutak

sarutak commented Jul 25, 2025

@dongjoon-hyun
Sorry for the improper title for this PR. I'll set a proper one.
Should we also file a new issue in JIRA as a bug related to SPARK-50889 and SPARK-50748?

Also, I'll check whether this bug affects branch-3.5 and branch-4.0, and if so, I'll open a PR for backporting.

@dongjoon-hyun

You can reuse the Jira issue. You don't need to file a new one.

@sarutak

sarutak commented Jul 25, 2025

OK, I'll just change the title of this PR.

@sarutak sarutak changed the title [SPARK-50889][CONNECT][TESTS] Fix Flaky Test: SparkSessionE2ESuite.interrupt operation (Hang) [SPARK-50748][SPARK-50889][CONNECT] Fix a race condition issue which happens when operations are interrupted Jul 25, 2025
@sarutak

sarutak commented Jul 25, 2025

@dongjoon-hyun
I don't think this bug affects branch-3.5 because its interruption handling code is different from master's and doesn't clean `ExecuteResponseObserver` on interruption.
But it does affect branch-4.0, and I reproduced the same issues there, so I'll open a backport PR for branch-4.0.

@dongjoon-hyun

Thank you so much for verifying that.

dongjoon-hyun pushed a commit that referenced this pull request Jul 27, 2025
…hich happens when operations are interrupted

### What changes were proposed in this pull request?
This PR backports #51638 to `branch-4.0`.
This PR fixes an issue that happens when operations are interrupted, which is related to SPARK-50748 and SPARK-50889.

Regarding SPARK-50889, the issue happens if an execution thread for an operation id cleans up the corresponding `ExecuteHolder` as a result of interruption [here](https://github.com/apache/spark/blob/a81d79256027708830bf714105f343d085a2f20c/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala#L175) before a response sender thread consumes a response [here](https://github.com/apache/spark/blob/a81d79256027708830bf714105f343d085a2f20c/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala#L183).
In this case, the cleanup finally calls `ExecuteResponseObserver.removeAll()` and all the responses are discarded, so the response sender thread can't escape [this loop](https://github.com/apache/spark/blob/a81d79256027708830bf714105f343d085a2f20c/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala#L245) because neither `gotResponse` nor `streamFinished` ever becomes true.

The solution this PR proposes is to change the definition of `streamFinished` in `ExecuteGrpcResponseSender` so that a stream is also regarded as finished when the `ExecuteResponseObserver` has been cleaned up and all of its responses discarded.
`ExecuteResponseObserver.removeAll` is called only when the corresponding `ExecuteHolder` is closed or cleaned up by interruption, so this is a reasonable condition.

### Why are the changes needed?
To fix a potential issue.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Tested manually.
You can easily reproduce this issue without this change by inserting a sleep into the test as follows.
```
--- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/SparkSessionE2ESuite.scala
+++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/SparkSessionE2ESuite.scala
@@ -331,6 +331,7 @@ class SparkSessionE2ESuite extends ConnectFunSuite with RemoteSparkSession {
         // cancel
         val operationId = result.operationId
         val canceledId = spark.interruptOperation(operationId)
+        Thread.sleep(1000)
         assert(canceledId == Seq(operationId))
         // and check that it got canceled
         val e = intercept[SparkException] {
```

After this change is applied, the test above no longer hangs.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #51671 from sarutak/connect-race-condition.

Authored-by: Kousuke Saruta <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
zifeif2 pushed a commit to zifeif2/spark that referenced this pull request Nov 14, 2025
