@@ -32,19 +32,11 @@ import org.apache.spark.util.Utils

 /**
  * An util class to start a local spark connect server in a different process for local E2E tests.
- * Pre-running the tests, the spark connect artifact needs to be built using e.g. `build/sbt
- * package`. It is designed to start the server once but shared by all tests. It is equivalent to
- * use the following command to start the connect server via command line:
+ * It calls the script `connector/connect/bin/spark-connect` to start the server.
  *
- * {{{
- * bin/spark-shell \
- *   --jars `ls connector/connect/server/target/**/spark-connect*SNAPSHOT.jar | paste -sd ',' -` \
- *   --conf spark.plugins=org.apache.spark.sql.connect.SparkConnectPlugin
- * }}}
- *
  * Set system property `spark.test.home` or env variable `SPARK_HOME` if the test is not executed
  * from the Spark project top folder. Set system property `spark.debug.sc.jvm.client=true` to
  * print the server process output in the console to debug server start stop problems.
+ * When starting the server for the first time after a clean build (`build/sbt clean`), it may
+ * take a few minutes to compile the jars the server needs. Subsequent runs take around one
+ * minute to start the server.
  */
 object SparkConnectServerUtils {
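For context on how this utility is consumed: an E2E suite mixes in RemoteSparkSession and uses the shared `spark` handle it wires up. A minimal sketch, assuming `ConnectFunSuite` exposes a ScalaTest-style `test`; the suite name and assertion are illustrative, not part of this PR:

    // Illustrative E2E suite shape; RemoteSparkSession lazily starts the
    // shared server and connects `spark` to it before the tests run.
    class ExampleE2ESuite extends RemoteSparkSession {
      test("round trip over Spark Connect") {
        assert(spark.range(10).count() == 10)
      }
    }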

@@ -58,24 +50,15 @@ object SparkConnectServerUtils {

   private lazy val sparkConnect: Process = {
     debug("Starting the Spark Connect Server...")
-    val jar = findJar(
-      "connector/connect/server",
-      "spark-connect-assembly",
-      "spark-connect").getCanonicalPath
     val builder = Process(
       Seq(
-        "bin/spark-submit",
-        "--driver-class-path",
-        jar,
+        "connector/connect/bin/spark-connect",
         "--conf",
         s"spark.connect.grpc.binding.port=$port",
         "--conf",
         "spark.sql.catalog.testcat=org.apache.spark.sql.connect.catalog.InMemoryTableCatalog",
         "--conf",
-        "spark.sql.catalogImplementation=hive",
-        "--class",
-        "org.apache.spark.sql.connect.SimpleSparkConnectService",
-        jar),
+        "spark.sql.catalogImplementation=hive"),
       new File(sparkHome))

     val io = new ProcessIO(
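Background on the `scala.sys.process` API this code relies on: `Process(command, cwd)` builds the child process with a working directory, and a `ProcessLogger` can stream its output for debugging. A self-contained sketch; the port and fallback directory are illustrative, not this file's exact wiring:

    import java.io.File
    import scala.sys.process.{Process, ProcessLogger}

    // Launch the connect script relative to SPARK_HOME and echo everything
    // the server prints; ProcessLogger(fn) routes both stdout and stderr.
    val sparkHome = sys.env.getOrElse("SPARK_HOME", ".")
    val server = Process(
      Seq(
        "connector/connect/bin/spark-connect",
        "--conf", "spark.connect.grpc.binding.port=15002"),
      new File(sparkHome))
      .run(ProcessLogger(line => println(s"[server] $line")))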
@@ -123,8 +106,8 @@ trait RemoteSparkSession extends ConnectFunSuite with BeforeAndAfterAll {
     spark = SparkSession.builder().client(SparkConnectClient.builder().port(port).build()).build()

     // Retry and wait for the server to start
-    val stop = System.nanoTime() + TimeUnit.MINUTES.toNanos(1) // ~1 min
-    var sleepInternalMs = TimeUnit.SECONDS.toMillis(1) // 1s with * 2 backoff
+    val stop = System.nanoTime() + TimeUnit.MINUTES.toNanos(5) // ~5 min
+    val sleepInternalMs = TimeUnit.SECONDS.toMillis(5) // every 5s
     var success = false
     val error = new RuntimeException(s"Failed to start the test server on port $port.")
@@ -142,7 +125,6 @@ trait RemoteSparkSession extends ConnectFunSuite with BeforeAndAfterAll {
         case e: Throwable =>
           error.addSuppressed(e)
           Thread.sleep(sleepInternalMs)
-          sleepInternalMs *= 2
       }
     }
project/SparkBuild.scala: 11 changes (0 additions, 11 deletions)
@@ -852,17 +852,6 @@ object SparkConnectClient {
       )
     },

-    buildTestDeps := {
-      (LocalProject("assembly") / Compile / Keys.`package`).value
-    },
-
-    // SPARK-42538: Make sure the `${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars` is available for testing.
-    // At the same time, the build of `connect`, `connect-client-jvm` and `sql` will be triggered by `assembly` build,
-    // so no additional configuration is required.
-    test := ((Test / test) dependsOn (buildTestDeps)).value,
-
-    testOnly := ((Test / testOnly) dependsOn (buildTestDeps)).evaluated,
-
     (assembly / test) := { },

     (assembly / logLevel) := Level.Info,
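For readers unfamiliar with the sbt hook being deleted: SPARK-42538 forced the assembly to build before client tests by declaring a task and chaining it with `dependsOn`. The generic pattern, reconstructed from the removed lines for reference (the connect script now resolves its own classpath, so the hook is obsolete):

    // Generic sbt pattern: run a prerequisite task before `test`.
    lazy val buildTestDeps = taskKey[Unit]("Build artifacts the tests rely on")

    buildTestDeps := (LocalProject("assembly") / Compile / Keys.`package`).value

    test := ((Test / test) dependsOn buildTestDeps).value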