[SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core #47815
(Resolved review comments on sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala)
Could we file a JIRA for the Python API set too? Just to make sure we don't miss it.

@HyukjinKwon @hvanhovell This PR is now ready for review. Could you take a look? Thanks!
(Resolved review comments on sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala)
I feel like what you're doing here is similar to …

Yes exactly. Basically the equivalent of …
```diff
   extensions,
-  Map.empty)
+  Map.empty,
+  managedJobTags.asScala.toMap)
```
qq, does this produce an immutable map (I think it should)? If so, then you don't have to force materialization.
Unfortunately, it is a mutable concurrent map - and is backed by the same map (so changes propagate back to managedJobTags).
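To make the distinction concrete, here is a minimal, Spark-free sketch (a plain `ConcurrentHashMap` stands in for the session's `managedJobTags`; the names are hypothetical): `asScala` alone yields a mutable view backed by the same Java map, while `.toMap` materializes an immutable, detached snapshot.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._

object AsScalaViewDemo {
  // Hypothetical stand-in for the session's managedJobTags map.
  val managedJobTags = new ConcurrentHashMap[String, String]()
  managedJobTags.put("op-1", "tag-a")

  val view = managedJobTags.asScala           // mutable view, backed by the same Java map
  val snapshot = managedJobTags.asScala.toMap // .toMap forces an immutable copy

  // A later mutation is visible through the view but not the snapshot.
  managedJobTags.put("op-2", "tag-b")

  def main(args: Array[String]): Unit = {
    println(s"view: ${view.size} entries, snapshot: ${snapshot.size} entries")
  }
}
```

This is why forcing materialization with `.toMap` matters here: without it, later changes to `managedJobTags` would propagate into the copied session.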
(Resolved review comment on sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala)
hvanhovell left a comment:

LGTM. I left a few minor comments. Let me know if you want to address them now or in a follow-up. Two follow-ups here: we need to add this to PySpark, and we need to homogenize this with the Connect implementation.
```diff
   private def nextExecutionId: Long = _nextExecutionId.getAndIncrement

-  private val executionIdToQueryExecution = new ConcurrentHashMap[Long, QueryExecution]()
+  private[sql] val executionIdToQueryExecution = new ConcurrentHashMap[Long, QueryExecution]()
```
Hm, I like the current implementation/separation, but doing this in SQLExecution might have some corner cases; e.g., df.rdd.collect() won't be cancelled. One way is to explicitly document that cancellation only works with the SQL/DataFrame API.
BTW, does this work for streaming queries too? I am fine with doing it in a follow-up but would like to make sure the Spark Connect and Classic versions behave the same.
Thanks for the comment - I'll add it to the documentation.
I'll check streaming and follow up in a next PR.
> does this work for streaming queries too?

No, it doesn't. I have to change StreamExecution to make it work. Will do as a follow-up.

spark/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala, line 313 in 8023504:

```scala
sparkSessionForStream.withActive {
```
(Resolved review comments on core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala)
I'll address most comments in this PR. I am currently distracted by something else, but will come back very soon.
Merge commit resolving conflicts in sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala.

Merging to master.
Hi, @xupefei, @hvanhovell, @HyukjinKwon, @mridulm.
This PR seems to introduce a new flaky test. I filed a JIRA. Please take a look: the failure appeared within 12 hours of merging, so it might be very flaky.
Ping once more, @xupefei and @hvanhovell. Could you fix the flakiness or disable it (if you are busy), please?

@xupefei mind taking a look please?

On it.

Trying out a fix at #48622.
```scala
  } finally {
    fpool.shutdownNow()
  }
}
```
Hi @xupefei, could you add a test case like the one below? It seems not to work properly in a multithreaded environment.
```scala
test("Tags are isolated in multithreaded environment") {
  // Custom thread pool for multi-threaded testing
  val threadPool = Executors.newFixedThreadPool(2)
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(threadPool)
  val session = SparkSession.builder().master("local").getOrCreate()
  @volatile var output1: Set[String] = null
  @volatile var output2: Set[String] = null
  def tag1(): Unit = {
    session.addTag("tag1")
    output1 = session.getTags()
  }
  def tag2(): Unit = {
    session.addTag("tag2")
    output2 = session.getTags()
  }
  try {
    // Run tasks in separate threads
    val future1 = Future {
      tag1()
    }
    val future2 = Future {
      tag2()
    }
    // Wait for threads to complete
    ThreadUtils.awaitResult(Future.sequence(Seq(future1, future2)), 1.minute)
    // Assert outputs
    assert(output1 != null)
    assert(output1 == Set("tag1"))
    assert(output2 != null)
    assert(output2 == Set("tag2"))
  } finally {
    threadPool.shutdownNow()
  }
}
```
fyi: you might need to import java.util.concurrent.Executors for the test case above
fyi: there is also a corresponding test in the Spark Connect Python client:

spark/python/pyspark/sql/tests/connect/test_session.py, lines 122 to 148 in b61411d:

```python
def test_tags_multithread(self):
    output1 = None
    output2 = None

    def tag1():
        nonlocal output1
        self.spark.addTag("tag1")
        output1 = self.spark.getTags()

    def tag2():
        nonlocal output2
        self.spark.addTag("tag2")
        output2 = self.spark.getTags()

    t1 = threading.Thread(target=tag1)
    t1.start()
    t1.join()
    t2 = threading.Thread(target=tag2)
    t2.start()
    t2.join()

    self.assertIsNotNone(output1)
    self.assertEquals(output1, {"tag1"})
    self.assertIsNotNone(output2)
    self.assertEquals(output2, {"tag2"})
```
This PR isolates tags at the session level, not the thread level.
Two threads using the same Spark session share the same set of tags.
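A small, Spark-free sketch of that semantics (the shared set below is a hypothetical stand-in for one session's tag set): because both threads mutate the same session-level set, each ends up observing both tags, which is why per-thread assertions like the suggested test fail under Classic.

```scala
import java.util.concurrent.ConcurrentHashMap

object SessionLevelTagsDemo {
  // Hypothetical stand-in for one session's tag set (shared, not thread-local).
  val sessionTags = ConcurrentHashMap.newKeySet[String]()

  def main(args: Array[String]): Unit = {
    val t1 = new Thread(() => sessionTags.add("tag1"))
    val t2 = new Thread(() => sessionTags.add("tag2"))
    t1.start(); t1.join()
    t2.start(); t2.join()
    // Both threads shared one "session", so the set now holds both tags.
    println(sessionTags)
  }
}
```

By contrast, thread-level isolation (as in the Connect client) would require something like an inheritable thread-local holding a per-thread tag set.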
But that has different semantics from the existing Spark Connect API. Is it WIP?
PR: #48906

What changes were proposed in this pull request?
This PR adds several new tag-related APIs, previously available only in Connect, back to Spark Core. Following the isolation practice of the original Connect API, the newly introduced APIs also support isolation:

- `interrupt{Tag,All,Operation}` can only cancel jobs created by this Spark session.
- `{add,remove}Tag` and `{get,clear}Tags` only apply to jobs created by this Spark session.

Instead of returning query IDs as in Spark Connect, these methods in Spark SQL return SQL execution root IDs, since "query IDs" exist only in Connect.
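As a hedged usage sketch of the new APIs (assuming a local `SparkSession`; the tag name and the job being run are purely illustrative, and per the description above `interruptTag` returns SQL execution root IDs):

```scala
import org.apache.spark.sql.SparkSession

object TagApiSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").getOrCreate()

    // Tags attached to this session apply only to jobs it starts.
    spark.addTag("nightly-etl")
    assert(spark.getTags() == Set("nightly-etl"))

    spark.range(1000).count() // runs under the "nightly-etl" tag

    // From another thread, this would cancel running executions carrying
    // the tag and return their SQL execution root IDs.
    val rootIds = spark.interruptTag("nightly-etl")
    println(s"Interrupted executions: $rootIds")

    spark.clearTags()
    spark.stop()
  }
}
```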
Why are the changes needed?
To close the API gap between Connect and Core.
Does this PR introduce any user-facing change?
Yes, Core users can use the new tag-related APIs listed above.
How was this patch tested?
New test added.
Was this patch authored or co-authored using generative AI tooling?
No.