@@ -22,17 +22,13 @@ import java.util.Properties

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

import io.grpc.StatusRuntimeException
import org.apache.commons.io.FileUtils
import org.apache.commons.io.output.TeeOutputStream
import org.apache.commons.lang3.{JavaVersion, SystemUtils}
import org.scalactic.TolerantNumerics
import org.scalatest.PrivateMethodTester
import org.scalatest.concurrent.Eventually._

import org.apache.spark.{SPARK_VERSION, SparkException}
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
@@ -45,7 +41,6 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.ThreadUtils

class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateMethodTester {

@@ -950,71 +945,6 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateM
assert(!df.filter(df("_2").endsWith(suffix)).isEmpty)
}
}

test("interrupt all - background queries, foreground interrupt") {
val session = spark
import session.implicits._
implicit val ec = ExecutionContext.global
val q1 = Future {
spark.range(10).map(n => { Thread.sleep(30000); n }).collect()
}
val q2 = Future {
spark.range(10).map(n => { Thread.sleep(30000); n }).collect()
}
var q1Interrupted = false
var q2Interrupted = false
var error: Option[String] = None
q1.onComplete {
case Success(_) =>
error = Some("q1 shouldn't have finished!")
case Failure(t) if t.getMessage.contains("cancelled") =>
q1Interrupted = true
case Failure(t) =>
error = Some("unexpected failure in q1: " + t.toString)
}
q2.onComplete {
case Success(_) =>
error = Some("q2 shouldn't have finished!")
case Failure(t) if t.getMessage.contains("cancelled") =>
q2Interrupted = true
case Failure(t) =>
error = Some("unexpected failure in q2: " + t.toString)
}
// 20 seconds is < 30 seconds the queries should be running,
// because it should be interrupted sooner
eventually(timeout(20.seconds), interval(1.seconds)) {
// keep interrupting every second, until both queries get interrupted.
spark.interruptAll()
assert(error.isEmpty, s"Error not empty: $error")
assert(q1Interrupted)
assert(q2Interrupted)
}
}

test("interrupt all - foreground queries, background interrupt") {
val session = spark
import session.implicits._
implicit val ec = ExecutionContext.global

@volatile var finished = false
val interruptor = Future {
eventually(timeout(20.seconds), interval(1.seconds)) {
spark.interruptAll()
assert(finished)
}
finished
}
val e1 = intercept[io.grpc.StatusRuntimeException] {
spark.range(10).map(n => { Thread.sleep(30.seconds.toMillis); n }).collect()
}
assert(e1.getMessage.contains("cancelled"), s"Unexpected exception: $e1")
val e2 = intercept[io.grpc.StatusRuntimeException] {
spark.range(10).map(n => { Thread.sleep(30.seconds.toMillis); n }).collect()
}
assert(e2.getMessage.contains("cancelled"), s"Unexpected exception: $e2")
finished = true
assert(ThreadUtils.awaitResult(interruptor, 10.seconds) == true)
}
}

private[sql] case class MyType(id: Long, a: Double, b: Double)
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql

import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

import org.scalatest.concurrent.Eventually._

import org.apache.spark.sql.connect.client.util.RemoteSparkSession
import org.apache.spark.util.ThreadUtils

/**
* Warning(SPARK-43648): Please don't import classes that only exist in
* `spark-connect-client-jvm.jar` into this class, as doing so will trigger a UDF
* deserialization error during Maven testing.
Contributor:

> I am more inclined towards implicit imports caused by the use of df.withResult:

If it happens through implicit imports, then such a rule maybe won't help...

Contributor Author:

Yes, I didn't add this originally because I found it difficult to describe

*/
class SparkSessionCleanRoomE2ESuite extends RemoteSparkSession {

test("interrupt all - background queries, foreground interrupt") {
val session = spark
import session.implicits._
implicit val ec: ExecutionContextExecutor = ExecutionContext.global
val q1 = Future {
spark.range(10).map(n => { Thread.sleep(30000); n }).collect()
Contributor:

So just because ClientE2ETestSuite imported org.apache.spark.sql.connect.client.SparkResult, the serialization of this closure, which in no way uses SparkResult, ended up referencing it and causing an issue on the server during deserialization?
Wouldn't this pop up as a serious and hard-to-understand issue for users of Spark Connect - they write client code with a simple UDF closure in a class that imports/uses something totally unrelated to the UDF... and then the UDF somehow pulls it in, causing inexplicable deserialization problems on the server?

Also, this import of SparkResult was only added yesterday in 62338ed#diff-7fa161b193c8792c8c0d8dd4bcae3e683ab8553edafa2ae5c13df42b26f612b0R41, while this test was already failing before, complaining about the missing SparkResult class during deserialization. So it must have pulled it in somehow even before it was imported in the test suite class?
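
The capture mechanism behind this question can be reproduced outside Spark. The stand-alone sketch below (all class and method names are made up for illustration; it is not code from this PR) shows how a lambda that merely calls an enclosing instance method captures the whole enclosing object, so its serialized form references types the lambda body never touches. In this toy the capture surfaces as a client-side NotSerializableException, whereas in this PR it surfaced as a class missing during deserialization on the server.

import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a test suite class; it is deliberately not Serializable.
class OuterSuite {
  // A field whose type could, in the Connect case, exist only on the client.
  val clientOnlyHelper = new StringBuilder("never used by the UDF")

  // Calls an instance method, so the compiled lambda captures `this` (the whole OuterSuite).
  def capturingUdf: Long => Long = n => shift(n)
  private def shift(n: Long): Long = n + 1

  // Pure lambda: captures nothing from the enclosing instance.
  def cleanUdf: Long => Long = n => n + 1
}

object ClosureCaptureDemo {
  private def trySerialize(label: String, closure: AnyRef): Unit = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(closure)
      println(s"$label: serialized fine")
    } catch {
      // The capturing lambda fails because serializing it drags in OuterSuite and,
      // transitively, everything OuterSuite references.
      case e: NotSerializableException => println(s"$label: $e")
    } finally {
      out.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val suite = new OuterSuite
    trySerialize("capturingUdf", suite.capturingUdf)
    trySerialize("cleanUdf", suite.cleanUdf)
  }
}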

Contributor Author (@LuciferYang, Jun 7, 2023):

I am more inclined towards implicit imports caused by the use of df.withResult:

private[sql] def withResult[E](f: SparkResult[T] => E): E = {
  val result = collectResult()
  try f(result)
  finally {
    result.close()
  }
}

I have tried deleting the cases related to withResult from ClientE2ETestSuite, and then these two cases pass as well.

Contributor Author:

> Wouldn't this pop up as a serious and hard to understand issue for users of Spark Connect - when they have their client code, they create a simple UDF closure in a class that imports/uses something totally unrelated to the UDF... and then UDF somehow pulls it in causing inexplicable deserialization problems on the server?

@vicennial do we have a way to clean up unused imports during the UDF serialization process? I haven't investigated this yet.

Contributor:

I'm not an expert on this, but it looks to me like Spark has a ClosureCleaner that looks like something used for that purpose.
I think ClosureCleaner solves a similar problem between the Spark driver and executors - it keeps irrelevant code from being tied up in the closures that are sent from the driver to the executors for execution.
Maybe just running ClosureCleaner.clean on UdfPacket.function would fix this?
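
For context, a rough sketch of what that suggestion could look like is below. It is only an illustration under assumptions: ClosureCleaner is private[spark], so such a call would have to live under an org.apache.spark package; the helper name and its placement are hypothetical; and, as the rest of this thread shows, the existing ClosureCleaner does not actually fix this case.

package org.apache.spark.sql.connect.common

import org.apache.spark.util.ClosureCleaner

// Hypothetical helper, not part of this PR: run ClosureCleaner over a UDF closure
// before it is serialized and shipped to the Connect server.
object UdfClosureCleaning {
  def cleanedForTransfer(function: AnyRef): AnyRef = {
    // Attempts to null out captured outer references the closure does not actually
    // use, and verifies that the result is serializable.
    ClosureCleaner.clean(function, checkSerializable = true, cleanTransitively = true)
    function
  }
}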

Contributor Author (@LuciferYang, Jun 7, 2023):

No, I already tried that before opening this PR; at least the existing ClosureCleaner cannot achieve the goal.

Contributor Author (@LuciferYang, Jun 7, 2023):

In non-connect mode, we should add the necessary jars when submitting a Spark app, and I think a similar operation should also be performed in Connect mode. So in https://github.com/apache/spark/pull/41483/files, using spark-connect-client-jvm.jar as an artifact (if it exists) can also solve this issue.
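
As a client-side illustration of that idea (a hedged sketch: the connection string and jar path are placeholders, and it assumes the Connect Scala client's SparkSession.addArtifact API rather than the test-framework change in the linked PR), registering the client jar as a session artifact lets the server resolve client-only classes referenced by a serialized UDF closure:

import org.apache.spark.sql.SparkSession

object AddClientJarAsArtifact {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().remote("sc://localhost:15002").getOrCreate()
    val session = spark
    import session.implicits._

    // Placeholder path: upload the client jar so the server's artifact class loader
    // can find classes such as SparkResult during UDF deserialization.
    spark.addArtifact("/path/to/spark-connect-client-jvm.jar")

    // A UDF whose closure happens to reference client-only classes can now run.
    spark.range(5).map(n => n + 1).collect().foreach(println)
    spark.close()
  }
}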

Contributor:

@rednaxelafx could I ask for your help?
You're the last person who wrote a lot of the code in ClosureCleaner (in 2020).
Do you have a better understanding of what might be going on, and why serializing this closure in UdfPacket.function would pull in unrelated stuff that ClosureCleaner wouldn't clean?

Contributor Author (@LuciferYang, Jun 7, 2023):

In fact, in traditional mode all dependencies are on the classpath, so I think there is no need to clean up imports. Perhaps my understanding is incorrect?

Contributor:

I checked that in this case ClosureCleaner doesn't do anything. With this debug logging added around the needsCleaning check:

      // only need to clean when there is an enclosing "this" captured by the closure, and it
      // should be something cleanable, i.e. a Scala REPL line object
      val needsCleaning = isClosureDeclaredInScalaRepl &&
        outerThisOpt.isDefined && outerThisOpt.get.getClass.getName == capturingClassName

      logError(s"needsCleaning: $needsCleaning")
      logError(s"outerThisOpt: $outerThisOpt")
      logError(s"capturingClassName: $capturingClassName")

the output is:

23/06/07 14:01:41.888 pool-1-thread-1-ScalaTest-running-ClientE2ETestSuite ERROR ClosureCleaner: Cleaning indylambda closure: $anonfun$mapFuncToMapPartitionsAdaptor$1
23/06/07 14:01:41.890 pool-1-thread-1-ScalaTest-running-ClientE2ETestSuite ERROR ClosureCleaner: needsCleaning: false
23/06/07 14:01:41.890 pool-1-thread-1-ScalaTest-running-ClientE2ETestSuite ERROR ClosureCleaner: outerThisOpt: Some(org.apache.spark.sql.ClientE2ETestSuite$$Lambda$1137/224191215@39257c6f)
23/06/07 14:01:41.890 pool-1-thread-1-ScalaTest-running-ClientE2ETestSuite ERROR ClosureCleaner: capturingClassName: org.apache.spark.sql.connect.common.UdfUtils$
23/06/07 14:01:41.890 pool-1-thread-1-ScalaTest-running-ClientE2ETestSuite ERROR ClosureCleaner:  +++ indylambda closure ($anonfun$mapFuncToMapPartitionsAdaptor$1) is now cleaned +++

but I don't really understand it.
If I just force needsCleaning=true, then my JVM just crashes :-).

Contributor Author:

A hard crash, or did it throw NoSuchFieldException or some other exception? In my impression, ClosureCleaner focuses on the REPL, and there are some reflection calls based on convention rules in the code.

}
val q2 = Future {
spark.range(10).map(n => { Thread.sleep(30000); n }).collect()
}
var q1Interrupted = false
var q2Interrupted = false
var error: Option[String] = None
q1.onComplete {
case Success(_) =>
error = Some("q1 shouldn't have finished!")
case Failure(t) if t.getMessage.contains("cancelled") =>
q1Interrupted = true
case Failure(t) =>
error = Some("unexpected failure in q1: " + t.toString)
}
q2.onComplete {
case Success(_) =>
error = Some("q2 shouldn't have finished!")
case Failure(t) if t.getMessage.contains("cancelled") =>
q2Interrupted = true
case Failure(t) =>
error = Some("unexpected failure in q2: " + t.toString)
}
// 20 seconds is < 30 seconds the queries should be running,
// because it should be interrupted sooner
eventually(timeout(20.seconds), interval(1.seconds)) {
// keep interrupting every second, until both queries get interrupted.
spark.interruptAll()
assert(error.isEmpty, s"Error not empty: $error")
assert(q1Interrupted)
assert(q2Interrupted)
}
}

test("interrupt all - foreground queries, background interrupt") {
val session = spark
import session.implicits._
implicit val ec: ExecutionContextExecutor = ExecutionContext.global

@volatile var finished = false
val interruptor = Future {
eventually(timeout(20.seconds), interval(1.seconds)) {
spark.interruptAll()
assert(finished)
}
finished
}
val e1 = intercept[io.grpc.StatusRuntimeException] {
spark.range(10).map(n => { Thread.sleep(30.seconds.toMillis); n }).collect()
}
assert(e1.getMessage.contains("cancelled"), s"Unexpected exception: $e1")
val e2 = intercept[io.grpc.StatusRuntimeException] {
spark.range(10).map(n => { Thread.sleep(30.seconds.toMillis); n }).collect()
}
assert(e2.getMessage.contains("cancelled"), s"Unexpected exception: $e2")
finished = true
assert(ThreadUtils.awaitResult(interruptor, 10.seconds))
}
}