diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala index ce610098156f3..e002bc0117c8b 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala @@ -19,29 +19,25 @@ package org.apache.spark.sql.hive.thriftserver import java.sql.{DriverManager, Statement} +import scala.collection.JavaConverters._ import scala.concurrent.duration._ -import scala.util.{Random, Try} +import scala.util.Try import org.apache.hadoop.hive.conf.HiveConf.ConfVars +import org.apache.hive.service.cli.thrift.ThriftCLIService -import org.apache.spark.sql.QueryTest import org.apache.spark.sql.test.SharedSparkSession trait SharedThriftServer extends SharedSparkSession { private var hiveServer2: HiveThriftServer2 = _ + private var serverPort: Int = 0 override def beforeAll(): Unit = { super.beforeAll() - // Chooses a random port between 10000 and 19999 - var listeningPort = 10000 + Random.nextInt(10000) - // Retries up to 3 times with different port numbers if the server fails to start - (1 to 3).foldLeft(Try(startThriftServer(listeningPort, 0))) { case (started, attempt) => - started.orElse { - listeningPort += 1 - Try(startThriftServer(listeningPort, attempt)) - } + (1 to 3).foldLeft(Try(startThriftServer(0))) { case (started, attempt) => + started.orElse(Try(startThriftServer(attempt))) }.recover { case cause: Throwable => throw cause @@ -59,8 +55,7 @@ trait SharedThriftServer extends SharedSparkSession { protected def withJdbcStatement(fs: (Statement => Unit)*): Unit = { val user = System.getProperty("user.name") - - val serverPort = hiveServer2.getHiveConf.get(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname) + require(serverPort != 0, "Failed to bind an actual port for HiveThriftServer2") val connections = fs.map { _ => DriverManager.getConnection(s"jdbc:hive2://localhost:$serverPort", user, "") } val statements = connections.map(_.createStatement()) @@ -73,11 +68,19 @@ trait SharedThriftServer extends SharedSparkSession { } } - private def startThriftServer(port: Int, attempt: Int): Unit = { - logInfo(s"Trying to start HiveThriftServer2: port=$port, attempt=$attempt") + private def startThriftServer(attempt: Int): Unit = { + logInfo(s"Trying to start HiveThriftServer2:, attempt=$attempt") val sqlContext = spark.newSession().sqlContext - sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, port.toString) + // Set the HIVE_SERVER2_THRIFT_PORT to 0, so it could randomly pick any free port to use. + // It's much more robust than set a random port generated by ourselves ahead + sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, "0") hiveServer2 = HiveThriftServer2.startWithContext(sqlContext) + hiveServer2.getServices.asScala.foreach { + case t: ThriftCLIService if t.getPortNumber != 0 => + serverPort = t.getPortNumber + logInfo(s"Started HiveThriftServer2: port=$serverPort, attempt=$attempt") + case _ => + } // Wait for thrift server to be ready to serve the query, via executing simple query // till the query succeeds. See SPARK-30345 for more details. diff --git a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java index 21b8bf7de75ce..e1ee503b81209 100644 --- a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java +++ b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java @@ -76,6 +76,10 @@ public void run() { keyStorePassword, sslVersionBlacklist); } + // In case HIVE_SERVER2_THRIFT_PORT or hive.server2.thrift.port is configured with 0 which + // represents any free port, we should set it to the actual one + portNum = serverSocket.getServerSocket().getLocalPort(); + // Server args int maxMessageSize = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_MAX_MESSAGE_SIZE); int requestTimeout = (int) hiveConf.getTimeVar( diff --git a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java index 504e63dbc5e5e..1099a00b67eb7 100644 --- a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java +++ b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java @@ -143,6 +143,9 @@ public void run() { // TODO: check defaults: maxTimeout, keepalive, maxBodySize, bodyRecieveDuration, etc. // Finally, start the server httpServer.start(); + // In case HIVE_SERVER2_THRIFT_HTTP_PORT or hive.server2.thrift.http.port is configured with + // 0 which represents any free port, we should set it to the actual one + portNum = connector.getLocalPort(); String msg = "Started " + ThriftHttpCLIService.class.getSimpleName() + " in " + schemeName + " mode on port " + connector.getLocalPort()+ " path=" + httpPath + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads"; diff --git a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java index fc19c65daaf54..a7de9c0f3d0d2 100644 --- a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java +++ b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java @@ -77,6 +77,10 @@ public void run() { keyStorePassword, sslVersionBlacklist); } + // In case HIVE_SERVER2_THRIFT_PORT or hive.server2.thrift.port is configured with 0 which + // represents any free port, we should set it to the actual one + portNum = serverSocket.getServerSocket().getLocalPort(); + // Server args int maxMessageSize = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_MAX_MESSAGE_SIZE); int requestTimeout = (int) hiveConf.getTimeVar( diff --git a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java index 08626e7eb146d..73d5f84476af0 100644 --- a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java +++ b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java @@ -144,6 +144,9 @@ public void run() { // TODO: check defaults: maxTimeout, keepalive, maxBodySize, bodyRecieveDuration, etc. // Finally, start the server httpServer.start(); + // In case HIVE_SERVER2_THRIFT_HTTP_PORT or hive.server2.thrift.http.port is configured with + // 0 which represents any free port, we should set it to the actual one + portNum = connector.getLocalPort(); String msg = "Started " + ThriftHttpCLIService.class.getSimpleName() + " in " + schemeName + " mode on port " + portNum + " path=" + httpPath + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads";