
Conversation

@sarutak
Member

@sarutak sarutak commented Aug 20, 2025

What changes were proposed in this pull request?

This PR fixes an issue which occurs when an operation in pending state is interrupted.
Once an operation in pending state is interrupted, that interruption and all subsequent interruptions for the operation never work correctly.
You can easily reproduce this issue by modifying SparkConnectExecutionManager#createExecuteHolderAndAttach as follows.

     val executeHolder = createExecuteHolder(executeKey, request, sessionHolder)
     try {
+      Thread.sleep(1000)
       executeHolder.eventsManager.postStarted()
       executeHolder.start()
     } catch {

Then run the test interrupt all - background queries, foreground interrupt in SparkSessionE2ESuite:

$ build/sbt 'connect-client-jvm/testOnly org.apache.spark.sql.connect.SparkSessionE2ESuite -- -z "interrupt all - background queries, foreground interrupt"'

You will see the following error.

[info] - interrupt all - background queries, foreground interrupt *** FAILED *** (20 seconds, 344 milliseconds)
[info]   The code passed to eventually never returned normally. Attempted 28 times over 20.285258458 seconds. Last failure message: Some("unexpected failure in q2: org.apache.spark.SparkException: java.lang.IllegalStateException: Operation was orphaned because of an internal error.") was not empty Error not empty: Some(unexpected failure in q2: org.apache.spark.SparkException: java.lang.IllegalStateException: Operation was orphaned because of an internal error.). (SparkSessionE2ESuite.scala:72)
[info]   org.scalatest.exceptions.TestFailedDueToTimeoutException:
[info]   at org.scalatest.enablers.Retrying$$anon$4.tryTryAgain$2(Retrying.scala:219)
[info]   at org.scalatest.enablers.Retrying$$anon$4.retry(Retrying.scala:226)
[info]   at org.scalatest.concurrent.Eventually.eventually(Eventually.scala:313)
[info]   at org.scalatest.concurrent.Eventually.eventually$(Eventually.scala:312)
[info]   at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:457)
[info]   at org.apache.spark.sql.connect.SparkSessionE2ESuite.$anonfun$new$1(SparkSessionE2ESuite.scala:72)

If an operation in pending state is interrupted, the interruption is handled in ExecuteHolder#interrupt, which calls ErrorUtils.handleError. In ErrorUtils#handleError, the operation status transitions to Canceled by calling executeEventsManager.postCanceled.
But postCanceled does not expect a transition from pending state, so an exception is thrown and propagated to the caller of ExecuteThreadRunner#interrupt.

The reason all subsequent interruptions for the same operation never work correctly is that ExecuteThreadRunner#state has already been changed to interrupted at the first call of ExecuteThreadRunner#interrupt, so subsequent interruptions don't enter the interruption loop and the method always returns false, meaning the result of the interruption is never correctly recognized.
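
For illustration only, here is a minimal, self-contained sketch of the transition check described above (the names and the simplified logic are assumptions, not the actual Spark source), where Canceled is only reachable from Started:

    sealed abstract class ExecuteStatus(val value: Int)
    object ExecuteStatus {
      case object Pending extends ExecuteStatus(0)
      case object Started extends ExecuteStatus(1)
      case object Canceled extends ExecuteStatus(6)
    }

    class EventsManagerSketch {
      @volatile private var _status: ExecuteStatus = ExecuteStatus.Pending
      def status: ExecuteStatus = _status

      // Canceled is only accepted as a transition from Started, so interrupting
      // an operation that is still Pending throws, and the exception propagates
      // to the caller of interrupt().
      def postCanceled(): Unit = {
        if (_status != ExecuteStatus.Started) {
          throw new IllegalStateException(
            s"operation was ${_status}, expected Started before Canceled")
        }
        _status = ExecuteStatus.Canceled
      }
    }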

The solution in this PR includes:

  • Allow the transition from pending state to canceled state
  • Protect the transitions to started state and canceled state with a lock, because these state changes can happen asynchronously (see the sketch after this list).
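
A minimal sketch of the lock-protected transitions, reusing the ExecuteStatus definitions from the sketch above (the lock granularity and the method bodies are assumptions, not the PR's exact code):

    class ExecuteStateSketch {
      private val lock = new Object
      @volatile private var status: ExecuteStatus = ExecuteStatus.Pending

      // postStarted and postCanceled can race when an interrupt arrives while
      // the operation is being attached, so both transitions take the same lock.
      def postStarted(): Unit = lock.synchronized {
        if (status == ExecuteStatus.Pending) {
          status = ExecuteStatus.Started
        }
      }

      def postCanceled(): Unit = lock.synchronized {
        // Pending -> Canceled is now an allowed transition.
        if (status == ExecuteStatus.Pending || status == ExecuteStatus.Started) {
          status = ExecuteStatus.Canceled
        }
      }
    }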

Why are the changes needed?

Bug fix.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Add new tests.
I also confirmed that the SparkSessionE2ESuite test mentioned above succeeds.

Was this patch authored or co-authored using generative AI tooling?

No.

@sarutak sarutak changed the title [SPARK-53339][CONNECT] Fix a race condition issue which occurs when an operation in pending state is interrupted [SPARK-53339][CONNECT] Fix an issue which occurs when an operation in pending state is interrupted Aug 20, 2025
@sarutak
Member Author

sarutak commented Aug 25, 2025

cc: @peter-toth

@sarutak
Member Author

sarutak commented Aug 26, 2025

cc: @dongjoon-hyun too.
This issue is one of the obstacles blocking SPARK-48139.

@dongjoon-hyun
Member

Thank you for pinging me, @sarutak. I just came back from my vacation today.

  */
  def interrupt(): Boolean = {
    if (eventsManager.status == ExecuteStatus.Pending) {
      return false
Member

  1. According to the function description, false is already occupied for the case where the operation was already interrupted.

  2. For the code change, according to the state transition, can we change the status directly from the current ExecuteStatus.Pending to ExecuteStatus.Canceled, because it's not started yet? In this case, we can return true.

object ExecuteStatus {
  case object Pending extends ExecuteStatus(0)
  case object Started extends ExecuteStatus(1)
  case object Analyzed extends ExecuteStatus(2)
  case object ReadyForExecution extends ExecuteStatus(3)
  case object Finished extends ExecuteStatus(4)
  case object Failed extends ExecuteStatus(5)
  case object Canceled extends ExecuteStatus(6)
  case object Closed extends ExecuteStatus(7)
}

Member Author

> For the code change, according to the state transition, can we change the status directly from the current ExecuteStatus.Pending to ExecuteStatus.Canceled, because it's not started yet? In this case, we can return true.

Actually, that was my first idea to solve this issue. But as I mentioned in the description, I found that didn't work because transitioning from Pending to Canceled causes another issue.

Member Author

> According to the function description, false is already occupied for the case where the operation was already interrupted.

Hmm, if it's OK to ignore an interruption of an operation in pending state, and we need to exactly distinguish "already interrupted" from "interruption failed", how about returning the exact interruption result rather than a boolean?
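
For illustration, a hypothetical sketch of what such a richer return type could look like (the names are illustrative, not from the PR):

    sealed trait InterruptResult
    object InterruptResult {
      // This call performed the interruption.
      case object Interrupted extends InterruptResult
      // A previous call already interrupted the operation.
      case object AlreadyInterrupted extends InterruptResult
      // The operation is still pending, so there is nothing to interrupt yet.
      case object StillPending extends InterruptResult
    }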

Member

Got it. Thank you. As long as the code and description are consistent, I'm okay with both: (1) updating the description by changing the meaning of false, and (2) changing the return type.

Member Author

Thank you for your suggestion. I'll simply update the description.

@dongjoon-hyun
Member

dongjoon-hyun commented Aug 26, 2025

BTW, thank you for the investigation to identify the root cause.

cc @grundprinzip , @hvanhovell , @zhengruifeng to ask if this was the intentional design of state transition or not.

@grundprinzip
Contributor

There are two things at play here: the internal state of the operation itself and the notification on the listener bus.

If this patch simply ignores the interrupt on an operation in a pending state, there is a new edge case where we can never cancel this operation if it's stuck in a pending state for whatever reason. Previously, it seems that while the physical query was cancelled, only the observable operation state on the listener bus was not properly handled.

I understand that there is another race condition when the interrupt happens right between the incoming request and the different posting states. I think the better solution is not to ignore the interruption, but we need to figure out how to avoid double-posting of events.
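
One illustrative way to avoid such double-posting (a sketch under the assumption that the canceled event must be published at most once even when postStarted and an interrupt race; this is not code from the PR) is to guard the post with an atomic compare-and-set:

    import java.util.concurrent.atomic.AtomicBoolean

    class CancelEventGuard {
      private val canceledPosted = new AtomicBoolean(false)

      // Only the first caller flips the flag and publishes the event;
      // concurrent or repeated callers become no-ops.
      def postCanceledOnce(publish: () => Unit): Unit = {
        if (canceledPosted.compareAndSet(false, true)) {
          publish()
        }
      }
    }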

@dongjoon-hyun
Member

Given that this is one of the long-standing Spark Connect interrupt operation issues which frequently happen in Apache Spark CIs on all live release branches, I'd like to suggest documenting this situation as a known issue in Apache Spark 4.0.1 and 3.5.7, independently from this PR. WDYT, @sarutak and @grundprinzip ?

At the same time, we can discuss more in order to figure out the correct steps in the Apache Spark 4.1.0 timeframe, as @grundprinzip suggested above.

@sarutak
Member Author

sarutak commented Aug 27, 2025

@dongjoon-hyun
I'm OK with documenting this as a known issue, but let me confirm whether this issue affects 4.0.1 and 3.5.7 too.

@sarutak
Member Author

sarutak commented Aug 28, 2025

@dongjoon-hyun
I confirmed this issue affects 4.0.1 but doesn't affect 3.5.7.
The implementation of interruption is quite different between 3.5 and 4.0+.
In 3.5, the state transition on interruption is not handled by ExecuteThreadRunner#interrupt but done in ExecutionThread by calling ErrorUtils.handle, and the thread is spawned after the operation has transitioned from pending state to started state. So interruption doesn't affect an operation in pending state.
On the other hand, in 4.0+, interrupting an operation that is in pending state, or in started state before ExecutionThread has started, is handled in ExecuteThreadRunner#interrupt.

So, this issue should be documented only for 4.0.1.
The release note for 4.0.1 is the appropriate place to document this issue, right?

Also, thank you for sharing related PRs. As far as I know, we have only one issue besides this one which blocks SPARK-48139, and I believe that's the last one.
I'll open a PR for the remaining one once this issue is resolved for 4.0+.

@sarutak
Member Author

sarutak commented Sep 3, 2025

@grundprinzip
In the current implementation, postStarted and postCanceled can happen asynchronously, so I protected them with a lock, and updated the PR description as well.
What do you think of the latest change?

@sarutak
Member Author

sarutak commented Sep 11, 2025

@grundprinzip
This is a cause of the long-standing issue, so I think it would be great to fix it for 4.1.
WDYT?

@sarutak
Member Author

sarutak commented Oct 14, 2025

@grundprinzip Gentle ping.

@sarutak
Member Author

sarutak commented Oct 14, 2025

cc: @hvanhovell

@sarutak
Member Author

sarutak commented Oct 22, 2025

@hvanhovell Gentle ping.

@sarutak
Member Author

sarutak commented Nov 14, 2025

cc: @HyukjinKwon too.
