Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
}

var next = 0
val foundMasterAndApplicationIdMessage = Promise.apply[Unit]()
val foundAllExpectedAnswers = Promise.apply[Unit]()
val buffer = new ArrayBuffer[String]()
val lock = new Object
Expand All @@ -143,6 +144,10 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
log.info(newLine)
buffer += newLine

if (line.startsWith("Spark master: ") && line.contains("Application Id: ")) {
foundMasterAndApplicationIdMessage.trySuccess(())
}

// If we haven't found all expected answers and another expected answer comes up...
if (next < expectedAnswers.size && line.contains(expectedAnswers(next))) {
log.info(s"$source> found expected output line $next: '${expectedAnswers(next)}'")
Expand Down Expand Up @@ -172,7 +177,18 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
new ProcessOutputCapturer(process.getErrorStream, captureOutput("stderr")).start()

try {
ThreadUtils.awaitResult(foundAllExpectedAnswers.future, timeout)
val timeoutForQuery = if (!extraArgs.contains("-e")) {
// Wait for for cli driver to boot, up to two minutes
ThreadUtils.awaitResult(foundMasterAndApplicationIdMessage.future, 2.minutes)
log.info("Cli driver is booted. Waiting for expected answers.")
// Given timeout is applied after the cli driver is ready
timeout
} else {
// There's no boot message if -e option is provided, just extend timeout long enough
// so that the bootup duration is counted on the timeout
2.minutes + timeout
}
ThreadUtils.awaitResult(foundAllExpectedAnswers.future, timeoutForQuery)
log.info("Found all expected output.")
} catch { case cause: Throwable =>
val message =
Expand All @@ -194,7 +210,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
} finally {
if (!process.waitFor(1, MINUTES)) {
try {
fail("spark-sql did not exit gracefully.")
log.warn("spark-sql did not exit gracefully.")
} finally {
process.destroy()
}
Expand Down Expand Up @@ -447,7 +463,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
val jarFile = new File("../../sql/hive/src/test/resources/SPARK-21101-1.0.jar").getCanonicalPath
val hiveContribJar = HiveTestJars.getHiveContribJar().getCanonicalPath
runCliWithin(
1.minute,
2.minutes,
Seq("--jars", s"$jarFile", "--conf",
s"spark.hadoop.${ConfVars.HIVEAUXJARS}=$hiveContribJar"))(
"CREATE TEMPORARY FUNCTION testjar AS" +
Expand Down