ClientE2ETestSuite.scala

@@ -22,17 +22,13 @@ import java.util.Properties

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

import io.grpc.StatusRuntimeException
import org.apache.commons.io.FileUtils
import org.apache.commons.io.output.TeeOutputStream
import org.apache.commons.lang3.{JavaVersion, SystemUtils}
import org.scalactic.TolerantNumerics
import org.scalatest.PrivateMethodTester
import org.scalatest.concurrent.Eventually._

import org.apache.spark.{SPARK_VERSION, SparkException}
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
@@ -45,7 +41,6 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.ThreadUtils

class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateMethodTester {

@@ -952,71 +947,6 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateMethodTester {
}
}
}

test("interrupt all - background queries, foreground interrupt") {
val session = spark
import session.implicits._
implicit val ec = ExecutionContext.global
val q1 = Future {
spark.range(10).map(n => { Thread.sleep(30000); n }).collect()
}
val q2 = Future {
spark.range(10).map(n => { Thread.sleep(30000); n }).collect()
}
var q1Interrupted = false
var q2Interrupted = false
var error: Option[String] = None
q1.onComplete {
case Success(_) =>
error = Some("q1 shouldn't have finished!")
case Failure(t) if t.getMessage.contains("cancelled") =>
q1Interrupted = true
case Failure(t) =>
error = Some("unexpected failure in q1: " + t.toString)
}
q2.onComplete {
case Success(_) =>
error = Some("q2 shouldn't have finished!")
case Failure(t) if t.getMessage.contains("cancelled") =>
q2Interrupted = true
case Failure(t) =>
error = Some("unexpected failure in q2: " + t.toString)
}
// The 20-second timeout is shorter than the 30 seconds the queries would otherwise
// run, because interruptAll() should cancel them sooner.
eventually(timeout(20.seconds), interval(1.seconds)) {
// keep interrupting every second, until both queries get interrupted.
spark.interruptAll()
assert(error.isEmpty, s"Error not empty: $error")
assert(q1Interrupted)
assert(q2Interrupted)
}
}

test("interrupt all - foreground queries, background interrupt") {
val session = spark
import session.implicits._
implicit val ec = ExecutionContext.global

@volatile var finished = false
val interruptor = Future {
eventually(timeout(20.seconds), interval(1.seconds)) {
spark.interruptAll()
assert(finished)
}
finished
}
val e1 = intercept[io.grpc.StatusRuntimeException] {
spark.range(10).map(n => { Thread.sleep(30.seconds.toMillis); n }).collect()
}
assert(e1.getMessage.contains("cancelled"), s"Unexpected exception: $e1")
val e2 = intercept[io.grpc.StatusRuntimeException] {
spark.range(10).map(n => { Thread.sleep(30.seconds.toMillis); n }).collect()
}
assert(e2.getMessage.contains("cancelled"), s"Unexpected exception: $e2")
finished = true
assert(ThreadUtils.awaitResult(interruptor, 10.seconds) == true)
}
}

private[sql] case class MyType(id: Long, a: Double, b: Double)
SparkSessionE2ESuite.scala (new file)

@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql

import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

import org.scalatest.concurrent.Eventually._

import org.apache.spark.sql.connect.client.util.RemoteSparkSession
import org.apache.spark.util.ThreadUtils

/**
* NOTE: Do not import classes that only exist in `spark-connect-client-jvm.jar` into this
* class, whether explicitly or implicitly, as it will trigger a UDF deserialization error during
* Maven build/test.
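* For example (per the review discussion below), avoid imports such as
* `org.apache.spark.sql.connect.client.SparkResult`, which exists only in that jar.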
*/
class SparkSessionE2ESuite extends RemoteSparkSession {
dongjoon-hyun (Member) commented on Jun 7, 2023:

Thank you for the investigation and suggestion, @LuciferYang.

If this is supposed to be a clean room unlike ClientE2ETestSuite, could you add a test class description stating that officially here? For example, something like "Do not import org.apache.spark.sql.connect.client.SparkResult", or "Do not import any class from spark-connect-client-jvm.jar, such as org.apache.spark.sql.connect.client.SparkResult"?

Without an explicit written warning, this test suite could be broken again when someone adds a new test case, and it would be a headache to re-analyze.

LuciferYang (Contributor, author) replied:

Thanks @dongjoon-hyun ~

b9e7e4a adds warning comments to SparkSessionE2ESuite.

At the same time, I am preparing a PR (#41253) to add Maven tests for the Connect-related modules to GitHub Actions, since Maven testing has often failed before. If that PR is merged, we can start testing the client module first.

vicennial (Contributor) commented:

Non-blocking: I wonder if we can also add something to the class name to make it more indicative that it's a "clean room" suite? Maybe something like SparkSessionCleanRoomE2ESuite or some variation.

dongjoon-hyun (Member) replied:
Oh, sorry, but let's not use the CleanRoom keyword here, @vicennial. :)

LuciferYang (Contributor, author) replied:
revert


test("interrupt all - background queries, foreground interrupt") {
val session = spark
import session.implicits._
implicit val ec: ExecutionContextExecutor = ExecutionContext.global
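// Start two long-running queries in background threads; each sleeps inside a
// UDF and will only stop early if the server interrupts it.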
val q1 = Future {
spark.range(10).map(n => { Thread.sleep(30000); n }).collect()
}
val q2 = Future {
spark.range(10).map(n => { Thread.sleep(30000); n }).collect()
}
var q1Interrupted = false
var q2Interrupted = false
var error: Option[String] = None
q1.onComplete {
case Success(_) =>
error = Some("q1 shouldn't have finished!")
case Failure(t) if t.getMessage.contains("cancelled") =>
q1Interrupted = true
case Failure(t) =>
error = Some("unexpected failure in q1: " + t.toString)
}
q2.onComplete {
case Success(_) =>
error = Some("q2 shouldn't have finished!")
case Failure(t) if t.getMessage.contains("cancelled") =>
q2Interrupted = true
case Failure(t) =>
error = Some("unexpected failure in q2: " + t.toString)
}
// The 20-second timeout is shorter than the 30 seconds the queries would otherwise
// run, because interruptAll() should cancel them sooner.
eventually(timeout(20.seconds), interval(1.seconds)) {
// keep interrupting every second, until both queries get interrupted.
spark.interruptAll()
assert(error.isEmpty, s"Error not empty: $error")
assert(q1Interrupted)
assert(q2Interrupted)
}
}

test("interrupt all - foreground queries, background interrupt") {
val session = spark
import session.implicits._
implicit val ec: ExecutionContextExecutor = ExecutionContext.global

@volatile var finished = false
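// Background thread that repeatedly calls interruptAll() until the foreground
// queries below have been cancelled and `finished` has been set.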
val interruptor = Future {
eventually(timeout(20.seconds), interval(1.seconds)) {
spark.interruptAll()
assert(finished)
}
finished
}
val e1 = intercept[io.grpc.StatusRuntimeException] {
spark.range(10).map(n => { Thread.sleep(30.seconds.toMillis); n }).collect()
}
assert(e1.getMessage.contains("cancelled"), s"Unexpected exception: $e1")
val e2 = intercept[io.grpc.StatusRuntimeException] {
spark.range(10).map(n => { Thread.sleep(30.seconds.toMillis); n }).collect()
}
assert(e2.getMessage.contains("cancelled"), s"Unexpected exception: $e2")
finished = true
assert(ThreadUtils.awaitResult(interruptor, 10.seconds))
}
}