1717
1818package org .apache .spark .sql .hive .thriftserver
1919
20- import scala .collection .mutable .ArrayBuffer
21- import scala .concurrent .ExecutionContext .Implicits .global
22- import scala .concurrent .duration ._
23- import scala .concurrent .{Await , Future , Promise }
24- import scala .sys .process .{Process , ProcessLogger }
25-
2620import java .io .File
2721import java .net .ServerSocket
2822import java .sql .{DriverManager , Statement }
2923import java .util .concurrent .TimeoutException
3024
25+ import scala .collection .mutable .ArrayBuffer
26+ import scala .concurrent .duration ._
27+ import scala .concurrent .{Await , Promise }
28+ import scala .sys .process .{Process , ProcessLogger }
29+ import scala .util .Try
30+
3131import org .apache .hadoop .hive .conf .HiveConf .ConfVars
3232import org .apache .hive .jdbc .HiveDriver
3333import org .scalatest .FunSuite
@@ -41,25 +41,28 @@ import org.apache.spark.sql.catalyst.util.getTempFilePath
4141class HiveThriftServer2Suite extends FunSuite with Logging {
4242 Class .forName(classOf [HiveDriver ].getCanonicalName)
4343
44- private val listeningHost = " localhost"
45- private val listeningPort = {
46- // Let the system to choose a random available port to avoid collision with other parallel
47- // builds.
48- val socket = new ServerSocket (0 )
49- val port = socket.getLocalPort
50- socket.close()
51- port
52- }
53-
54- private val warehousePath = getTempFilePath(" warehouse" )
55- private val metastorePath = getTempFilePath(" metastore" )
56- private val metastoreJdbcUri = s " jdbc:derby:;databaseName= $metastorePath;create=true "
57-
58- def startThriftServerWithin (timeout : FiniteDuration = 30 .seconds)(f : Statement => Unit ) {
59- val serverScript = " ../../sbin/start-thriftserver.sh" .split(" /" ).mkString(File .separator)
44+ val verbose = Option (System .getenv(" SPARK_SQL_TEST_VERBOSE" )).isDefined
45+
46+ def startThriftServerWithin (timeout : FiniteDuration = 10 .seconds)(f : Statement => Unit ) {
47+ Thread .sleep(5000 )
48+
49+ val startScript = " ../../sbin/start-thriftserver.sh" .split(" /" ).mkString(File .separator)
50+ val stopScript = " ../../sbin/stop-thriftserver.sh" .split(" /" ).mkString(File .separator)
51+ val warehousePath = getTempFilePath(" warehouse" )
52+ val metastorePath = getTempFilePath(" metastore" )
53+ val metastoreJdbcUri = s " jdbc:derby:;databaseName= $metastorePath;create=true "
54+ val listeningHost = " localhost"
55+ val listeningPort = {
56+ // Let the system to choose a random available port to avoid collision with other parallel
57+ // builds.
58+ val socket = new ServerSocket (0 )
59+ val port = socket.getLocalPort
60+ socket.close()
61+ port
62+ }
6063
6164 val command =
62- s """ $serverScript
65+ s """ $startScript
6366 | --master local
6467 | --hiveconf hive.root.logger=INFO,console
6568 | --hiveconf ${ConfVars .METASTORECONNECTURLKEY }= $metastoreJdbcUri
@@ -68,29 +71,41 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
6871 | --hiveconf ${ConfVars .HIVE_SERVER2_THRIFT_PORT }= $listeningPort
6972 """ .stripMargin.split(" \\ s+" ).toSeq
7073
71- val serverStarted = Promise [Unit ]()
74+ val serverRunning = Promise [Unit ]()
7275 val buffer = new ArrayBuffer [String ]()
73-
74- def captureOutput (source : String )(line : String ) {
75- buffer += s " $source> $line"
76+ val LOGGING_MARK =
77+ s " starting ${HiveThriftServer2 .getClass.getCanonicalName.stripSuffix(" $" )}, logging to "
78+ var logTailingProcess : Process = null
79+ var logFilePath : String = null
80+
81+ def captureLogOutput (line : String ): Unit = {
82+ logInfo(s " server log | $line" )
83+ buffer += line
7684 if (line.contains(" ThriftBinaryCLIService listening on" )) {
77- serverStarted .success(())
85+ serverRunning .success(())
7886 }
7987 }
8088
81- val process = Process (command).run(
82- ProcessLogger (captureOutput(" stdout" ), captureOutput(" stderr" )))
83-
84- Future {
85- val exitValue = process.exitValue()
86- logInfo(s " Spark SQL Thrift server process exit value: $exitValue" )
89+ def captureThriftServerOutput (source : String )(line : String ): Unit = {
90+ logInfo(s " server $source | $line" )
91+ if (line.startsWith(LOGGING_MARK )) {
92+ logFilePath = line.drop(LOGGING_MARK .length).trim
93+ // Ensure that the log file is created so that the `tail' command won't fail
94+ Try (new File (logFilePath).createNewFile())
95+ logTailingProcess = Process (s " /usr/bin/env tail -f $logFilePath" )
96+ .run(ProcessLogger (captureLogOutput, _ => ()))
97+ }
8798 }
8899
100+ Process (command).run(ProcessLogger (
101+ captureThriftServerOutput(" stdout" ),
102+ captureThriftServerOutput(" stderr" )))
103+
89104 val jdbcUri = s " jdbc:hive2:// $listeningHost: $listeningPort/ "
90105 val user = System .getProperty(" user.name" )
91106
92107 try {
93- Await .result(serverStarted .future, timeout)
108+ Await .result(serverRunning .future, timeout)
94109
95110 val connection = DriverManager .getConnection(jdbcUri, user, " " )
96111 val statement = connection.createStatement()
@@ -122,10 +137,15 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
122137 |End HiveThriftServer2Suite failure output
123138 |=========================================
124139 """ .stripMargin, cause)
140+ throw cause
125141 } finally {
126142 warehousePath.delete()
127143 metastorePath.delete()
128- process.destroy()
144+ Process (stopScript).run().exitValue()
145+ // The `spark-daemon.sh' script uses kill, which is not synchronous, have to wait for a while.
146+ Thread .sleep(3 .seconds.toMillis)
147+ Option (logTailingProcess).map(_.destroy())
148+ Option (logFilePath).map(new File (_).delete())
129149 }
130150 }
131151
0 commit comments