[SPARK-43648][CONNECT][TESTS] Move interrupt all related test to a new test file to pass Maven test
#41487
Conversation
https://github.com/apache/spark/pull/41483/files has implemented the fix of using spark-connect-client-jvm.jar as an artifact.
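For context, a minimal sketch of that alternative approach with the Connect Scala client; this is not the actual change in #41483 (which lives in the test utilities), and the jar path and connection string below are placeholders:

```scala
// Sketch only: upload the client jar to the Connect server as an artifact so that
// classes such as org.apache.spark.sql.connect.client.SparkResult can be resolved
// during UDF deserialization on the server side. Paths and URLs are placeholders.
import java.nio.file.Paths
import org.apache.spark.sql.SparkSession

object AddClientJarSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().remote("sc://localhost").getOrCreate()
    // The built jar name and location vary by Scala/Spark version; adjust as needed.
    val clientJar = Paths.get("connector/connect/client/jvm/target/spark-connect-client-jvm.jar")
    if (clientJar.toFile.exists()) { // only add it if it exists, as discussed in this thread
      spark.addArtifact(clientJar.toString)
    }
  }
}
```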
```scala
import org.apache.spark.sql.connect.client.util.RemoteSparkSession
import org.apache.spark.util.ThreadUtils

class SparkSessionE2ESuite extends RemoteSparkSession {
```
Thank you for the investigation and suggestion, @LuciferYang.
If this is supposed to be a clean room unlike ClientE2ETestSuite, could you officially add a test class description here? For example, something like "Do not import org.apache.spark.sql.connect.client.SparkResult", or "Do not import any class from spark-connect-client-jvm.jar"?
import some classes from spark-connect-client-jvm.jar, such as org.apache.spark.sql.connect.client.SparkResult
Without an explicit written warning, this test suite could be broken again when someone adds a new test case, and it will be a headache to re-analyze.
Thanks @dongjoon-hyun ~
b9e7e4a adds warning comments to SparkSessionE2ESuite.
At the same time, I am preparing a PR to add Maven tests of the connect-related modules to GitHub Actions (#41253), as Maven testing has often failed before. If this PR can be merged, we can start testing the client module first.
Non-blocking: I wonder if we can also add something to the class name to make it more indicative that it's a "clean room" suite? Maybe something like SparkSessionCleanRoomE2ESuite or some variation.
Oh, sorry, but let's not use the CleanRoom keyword here, @vicennial. :)
revert
vicennial
left a comment
Thanks for the investigation and solution proposals @LuciferYang!
LGTM
Non-blocking: I wonder if we can also add something to the class name to make it more indicative that it's a "clean room" suite? Maybe something like SparkSessionCleanRoomE2ESuite or some variation.
@vicennial ca3e6b1 renames it.
```scala
import session.implicits._
implicit val ec: ExecutionContextExecutor = ExecutionContext.global
val q1 = Future {
  spark.range(10).map(n => { Thread.sleep(30000); n }).collect()
```
So just because ClientE2ETestSuite imported org.apache.spark.sql.connect.client.SparkResult, the serialization of this closure, which in no way uses SparkResult, was referencing it and causing an issue on the server during deserialization?
Wouldn't this pop up as a serious and hard-to-understand issue for users of Spark Connect? In their client code they create a simple UDF closure in a class that imports/uses something totally unrelated to the UDF... and then the UDF somehow pulls it in, causing inexplicable deserialization problems on the server?
Also, this import of SparkResult was only added yesterday in 62338ed#diff-7fa161b193c8792c8c0d8dd4bcae3e683ab8553edafa2ae5c13df42b26f612b0R41, while this test was already failing before, complaining about the lack of the SparkResult class during deserialization? So it must have pulled it in somehow even before it was imported in the test suite class?
I am more inclined towards implicit imports caused by the use of df.withResult:
spark/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala, lines 3181 to 3186 in 64855fa:

```scala
private[sql] def withResult[E](f: SparkResult[T] => E): E = {
  val result = collectResult()
  try f(result)
  finally {
    result.close()
  }
```
I have tried deleting the cases related to withResult from ClientE2ETestSuite, and then these two cases pass as well.
Wouldn't this pop up as a serious and hard to understand issue for users of Spark Connect - when they have their client code, they create a simple UDF closure in a class that imports/uses something totally unrelated to the UDF... and then UDF somehow pulls it in causing inexplicable deserialization problems on the server?
@vicennial do we have a way to clean up unused imports during the UDF serialization process? I haven't investigated this yet.
I'm not an expert on this, but it looks to me that Spark has a ClosureCleaner that is meant for that purpose.
I think ClosureCleaner solves a similar problem between the Spark driver and executors - it makes irrelevant code not get tied up into closures that are sent from the driver to executors for execution?
Maybe just doing ClosureCleaner.clean on UdfPacket.function will fix this?
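To make that suggestion concrete, a rough sketch of what cleaning the deserialized UDF function could look like; the helper and its wiring are hypothetical, and since ClosureCleaner is private[spark] it would have to live under an org.apache.spark package:

```scala
package org.apache.spark.sql.connect.example

import org.apache.spark.util.ClosureCleaner

// Hypothetical helper, not actual Spark code: run ClosureCleaner over the function
// extracted from UdfPacket before the server uses it. checkSerializable is disabled
// because the closure already arrived in serialized form over the wire.
object CleanUdfSketch {
  def cleanUdfFunction(function: AnyRef): AnyRef = {
    ClosureCleaner.clean(function, checkSerializable = false, cleanTransitively = true)
    function
  }
}
```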
No, I've already tried it before opening this PR; at least the existing ClosureCleaner cannot achieve the goal.
In non-connect mode, we should add the necessary jars when submitting a Spark App. I think similar operations should also be performed in connect mode, so in https://github.com/apache/spark/pull/41483/files, using spark-connect-client-jvm.jar as an Artifact (if it exists) can also solve this issue.
@rednaxelafx could I ask for your help?
You're the last person who wrote a lot of code to ClosureCleaner (in 2020).
Do you understand better what might be going on, and why the serialization of this closure in UdfPacket.function would pull in unrelated stuff that ClosureCleaner wouldn't clean?
In fact, in traditional mode, all dependencies are on the classpath, so I think there is no need to clean up imports. Perhaps my understanding is incorrect?
I checked that in this case ClosureCleaner doesn't do anything, because in

```scala
// only need to clean when there is an enclosing "this" captured by the closure, and it
// should be something cleanable, i.e. a Scala REPL line object
val needsCleaning = isClosureDeclaredInScalaRepl &&
  outerThisOpt.isDefined && outerThisOpt.get.getClass.getName == capturingClassName
logError(s"needsCleaning: $needsCleaning")
logError(s"outerThisOpt: $outerThisOpt")
logError(s"capturingClassName: $capturingClassName")
```

there is

```
23/06/07 14:01:41.888 pool-1-thread-1-ScalaTest-running-ClientE2ETestSuite ERROR ClosureCleaner: Cleaning indylambda closure: $anonfun$mapFuncToMapPartitionsAdaptor$1
23/06/07 14:01:41.890 pool-1-thread-1-ScalaTest-running-ClientE2ETestSuite ERROR ClosureCleaner: needsCleaning: false
23/06/07 14:01:41.890 pool-1-thread-1-ScalaTest-running-ClientE2ETestSuite ERROR ClosureCleaner: outerThisOpt: Some(org.apache.spark.sql.ClientE2ETestSuite$$Lambda$1137/224191215@39257c6f)
23/06/07 14:01:41.890 pool-1-thread-1-ScalaTest-running-ClientE2ETestSuite ERROR ClosureCleaner: capturingClassName: org.apache.spark.sql.connect.common.UdfUtils$
23/06/07 14:01:41.890 pool-1-thread-1-ScalaTest-running-ClientE2ETestSuite ERROR ClosureCleaner: +++ indylambda closure ($anonfun$mapFuncToMapPartitionsAdaptor$1) is now cleaned +++
```
but I don't really understand it.
If I just force needsCleaning=true, then my JVM just crashes :-).
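As a self-contained illustration of where that capturingClassName comes from, the metadata of a serializable Scala lambda can be inspected via its SerializedLambda proxy (a reflective trick similar to what ClosureCleaner uses internally); the names printed will of course differ from the Spark log above:

```scala
import java.lang.invoke.SerializedLambda

// Minimal sketch: peek at the SerializedLambda behind a Scala closure to see which
// "capturing class" a remote JVM would have to resolve when deserializing it.
object InspectClosure {
  def describe(closure: AnyRef): String = {
    // Serializable lambdas carry a synthetic writeReplace method returning their proxy.
    val writeReplace = closure.getClass.getDeclaredMethod("writeReplace")
    writeReplace.setAccessible(true)
    val proxy = writeReplace.invoke(closure).asInstanceOf[SerializedLambda]
    s"capturingClass=${proxy.getCapturingClass}, " +
      s"implMethod=${proxy.getImplClass}.${proxy.getImplMethodName}"
  }

  def main(args: Array[String]): Unit = {
    val f: Long => Long = n => { Thread.sleep(10); n } // an "innocent" closure
    println(describe(f)) // the capturing class is the declaring class, not the lambda body
  }
}
```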
Did it crash, or did it throw NoSuchFieldException or other exceptions? In my impression, ClosureCleaner focuses on the REPL, and there are some reflection calls based on convention rules in the code.
```scala
/**
 * Warning: SPARK-43648 moves two test cases related to `interrupt all` from `ClientE2ETestSuite`
 * to the current test class to avoid the maven test issue of missing
 * `org.apache.spark.sql.connect.client.SparkResult` during udf deserialization in server side. So
 * please don't import classes that only exist in `spark-connect-client-jvm.jar` into the this
 * class, as it will trigger similar maven test failures again.
 */
```
nit: for readers in the future, this exact history is irrelevant.
The comment should just say that this suite should not import any classes from spark-connect-client-jvm.jar because that could trigger a serialization error.
It's enough to refer to SPARK-43648 for anyone wanting to read more context.
ok
```scala
 * Warning(SPARK-43648): Please don't import classes that only exist in
 * `spark-connect-client-jvm.jar` into the this class, as it will trigger udf deserialization
 * error during Maven testing.
```
I am more inclined towards implicit imports caused by the use of df.withResult:
If it happens through implicit imports, then such a rule may not help...
Yes, I didn't add this originally because I found it difficult to describe
@dongjoon-hyun @vicennial @juliuszsompolski Do you have any other suggestions for this PR? Can we merge this one first? This would make the Maven test pass first, and if there is a better solution, we can add a follow-up.
HyukjinKwon
left a comment
Seems fine.
…ql/SparkSessionE2ESuite.scala Co-authored-by: Hyukjin Kwon <[email protected]>
Thanks @HyukjinKwon ~
dongjoon-hyun
left a comment
+1, LGTM.
LGTM, thanks!
Merged to master.
…new test file to pass Maven test

### What changes were proposed in this pull request?

SPARK-43331 added two new test cases in `ClientE2ETestSuite`:

- interrupt all - background queries, foreground interrupt
- interrupt all - foreground queries, background interrupt

The udf in these two new test cases requires more than `spark-connect-client-jvm-tests.jar` when deserialized on the server side, because `ClientE2ETestSuite` implicitly imports some classes from `spark-connect-client-jvm.jar`, such as `org.apache.spark.sql.connect.client.SparkResult`. So when running the Maven test as follows:

```
build/mvn clean install -DskipTests
build/mvn test -pl connector/connect/client/jvm -Dtest=none -DwildcardSuites=org.apache.spark.sql.ClientE2ETestSuite
```

the test fails on the server side as follows:

```
23/05/22 15:44:11 ERROR SparkConnectService: Error during: execute. UserId: . SessionId: 0f4013ca-3af9-443b-a0e5-e339a827e0cf.
java.lang.NoClassDefFoundError: org/apache/spark/sql/connect/client/SparkResult
  at java.lang.Class.getDeclaredMethods0(Native Method)
  at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
  at java.lang.Class.getDeclaredMethod(Class.java:2128)
  at java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass.java:1643)
  at java.io.ObjectStreamClass.access$1700(ObjectStreamClass.java:79)
  at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:520)
  at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494)
  at java.security.AccessController.doPrivileged(Native Method)
  at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:494)
  at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391)
  at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:681)
  at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2005)
  at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1852)
  at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1815)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1640)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
  at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2119)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1657)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
  at org.apache.spark.util.Utils$.deserialize(Utils.scala:148)
  at org.apache.spark.sql.connect.planner.SparkConnectPlanner.org$apache$spark$sql$connect$planner$SparkConnectPlanner$$unpackUdf(SparkConnectPlanner.scala:1353)
  at org.apache.spark.sql.connect.planner.SparkConnectPlanner$TypedScalaUdf$.apply(SparkConnectPlanner.scala:761)
  at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformTypedMapPartitions(SparkConnectPlanner.scala:531)
  at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformMapPartitions(SparkConnectPlanner.scala:495)
  at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformRelation(SparkConnectPlanner.scala:143)
  at org.apache.spark.sql.connect.service.SparkConnectStreamHandler.handlePlan(SparkConnectStreamHandler.scala:100)
  at org.apache.spark.sql.connect.service.SparkConnectStreamHandler.$anonfun$handle$2(SparkConnectStreamHandler.scala:87)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:825)
  at org.apache.spark.sql.connect.service.SparkConnectStreamHandler.$anonfun$handle$1(SparkConnectStreamHandler.scala:53)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:209)
  at org.apache.spark.sql.connect.artifact.SparkConnectArtifactManager$.withArtifactClassLoader(SparkConnectArtifactManager.scala:178)
  at org.apache.spark.sql.connect.service.SparkConnectStreamHandler.handle(SparkConnectStreamHandler.scala:48)
  at org.apache.spark.sql.connect.service.SparkConnectService.executePlan(SparkConnectService.scala:166)
  at org.apache.spark.connect.proto.SparkConnectServiceGrpc$MethodHandlers.invoke(SparkConnectServiceGrpc.java:611)
  at org.sparkproject.connect.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
  at org.sparkproject.connect.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:352)
  at org.sparkproject.connect.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866)
  at org.sparkproject.connect.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
  at org.sparkproject.connect.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.connect.client.SparkResult
  at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
  ... 56 more
```

We can choose to add `spark-connect-client-jvm.jar` as an `Artifact` to the server during testing, but this is relatively complex and `spark-connect-client-jvm.jar` does not always exist during Maven testing. So this PR chooses to move these two test cases to a separate file to pass the Maven test.

The sbt test does not fail because the classpath of the sbt-submitted `SimpleSparkConnectService` includes the connect client module; this is different from Maven, so GitHub Actions can pass.

### Why are the changes needed?

Make `interrupt all - background queries, foreground interrupt` and `interrupt all - foreground queries, background interrupt` pass when tested with Maven.

### Does this PR introduce _any_ user-facing change?

No, just for test.

### How was this patch tested?

- Pass GitHub Actions
- Manual check

```
build/mvn clean install -DskipTests
build/mvn test -pl connector/connect/client/jvm
```

The two test cases in the PR description can be tested successfully.

Closes apache#41487 from LuciferYang/SPARK-43648-2.

Lead-authored-by: yangjie01 <[email protected]>
Co-authored-by: YangJie <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
This is fine as a temporary workaround, but it is actually showing a problem that can heavily impact usability. We are apparently capturing classes that are unrelated to the code at hand. @LuciferYang is anyone looking into why this is causing problems with Maven? I am not saying that you should, I just want to make sure we are not duplicating efforts.
I reopened and edited the description of https://issues.apache.org/jira/browse/SPARK-43744, which I originally raised for this issue and then closed as a duplicate of SPARK-43648; it can be used to track this.
When we do the Maven test, the content of the connect client jvm module was not in the classpath of SimpleSparkConnectService. Previously, we only placed the dependencies listed at lines 130 to 140 in 7107742 there.
@hvanhovell At present, I have not continued to investigate because I think this behavior is reasonable, although it may be difficult to use (maybe I am wrong). At the same time, within my knowledge, I have not found an effective way to clean up these unnecessary captures.
It's pulling this piece of code, which is not related to the UDF closure, into the serialization of the closure, and that causes a failure on the server because this code is not on the server.
I think what you said is reasonable, and I also hope there can be a way to remove unnecessary parts from the function (besides unused imports, I think we should also clean up the outer class). In addition, do you think we should also remove lines 130 to 140 in 7107742? This was added to avoid similar issues.
What changes were proposed in this pull request?

SPARK-43331 added two new test cases in ClientE2ETestSuite:

- interrupt all - background queries, foreground interrupt
- interrupt all - foreground queries, background interrupt

The UDFs in these two new test cases require more than `spark-connect-client-jvm-tests.jar` when deserialized on the server side, because ClientE2ETestSuite implicitly imports some classes from `spark-connect-client-jvm.jar`, such as `org.apache.spark.sql.connect.client.SparkResult`. So when running the Maven tests as follows:

```
build/mvn clean install -DskipTests
build/mvn test -pl connector/connect/client/jvm -Dtest=none -DwildcardSuites=org.apache.spark.sql.ClientE2ETestSuite
```

the tests fail on the server side with `java.lang.NoClassDefFoundError: org/apache/spark/sql/connect/client/SparkResult` during UDF deserialization (the full stack trace is in the merged commit message above).

We can choose to add `spark-connect-client-jvm.jar` as an Artifact to the server during testing, but this is relatively complex, and `spark-connect-client-jvm.jar` does not always exist during Maven testing. So this PR chooses to move these two test cases to a separate file to pass the Maven test.

The sbt test does not fail because the classpath of the sbt-submitted SimpleSparkConnectService includes the connect client module; this is different from Maven, so GitHub Actions can pass.

Why are the changes needed?

Make `interrupt all - background queries, foreground interrupt` and `interrupt all - foreground queries, background interrupt` pass when tested with Maven.

Does this PR introduce any user-facing change?

No, just for tests.

How was this patch tested?

- Pass GitHub Actions
- Manual check:

```
build/mvn clean install -DskipTests
build/mvn test -pl connector/connect/client/jvm
```

The two test cases listed above can be tested successfully.