Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,25 @@ package org.apache.spark.sql.hive.thriftserver

import java.sql.{DriverManager, Statement}

import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.util.{Random, Try}
import scala.util.Try

import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hive.service.cli.thrift.ThriftCLIService

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.SharedSparkSession

trait SharedThriftServer extends SharedSparkSession {

private var hiveServer2: HiveThriftServer2 = _
private var serverPort: Int = 0

override def beforeAll(): Unit = {
super.beforeAll()
// Chooses a random port between 10000 and 19999
var listeningPort = 10000 + Random.nextInt(10000)

// Retries up to 3 times with different port numbers if the server fails to start
(1 to 3).foldLeft(Try(startThriftServer(listeningPort, 0))) { case (started, attempt) =>
started.orElse {
listeningPort += 1
Try(startThriftServer(listeningPort, attempt))
}
(1 to 3).foldLeft(Try(startThriftServer(0))) { case (started, attempt) =>
started.orElse(Try(startThriftServer(attempt)))
}.recover {
case cause: Throwable =>
throw cause
Expand All @@ -59,8 +55,7 @@ trait SharedThriftServer extends SharedSparkSession {

protected def withJdbcStatement(fs: (Statement => Unit)*): Unit = {
val user = System.getProperty("user.name")

val serverPort = hiveServer2.getHiveConf.get(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname)
require(serverPort != 0, "Failed to bind an actual port for HiveThriftServer2")
val connections =
fs.map { _ => DriverManager.getConnection(s"jdbc:hive2://localhost:$serverPort", user, "") }
val statements = connections.map(_.createStatement())
Expand All @@ -73,11 +68,19 @@ trait SharedThriftServer extends SharedSparkSession {
}
}

private def startThriftServer(port: Int, attempt: Int): Unit = {
logInfo(s"Trying to start HiveThriftServer2: port=$port, attempt=$attempt")
private def startThriftServer(attempt: Int): Unit = {
logInfo(s"Trying to start HiveThriftServer2:, attempt=$attempt")
val sqlContext = spark.newSession().sqlContext
sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, port.toString)
// Set the HIVE_SERVER2_THRIFT_PORT to 0, so it could randomly pick any free port to use.
// It's much more robust than setting a random port generated by ourselves ahead of time
sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, "0")
hiveServer2 = HiveThriftServer2.startWithContext(sqlContext)
hiveServer2.getServices.asScala.foreach {
case t: ThriftCLIService if t.getPortNumber != 0 =>
serverPort = t.getPortNumber
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HiveThriftServer2.startWithContext -> HiveThriftServer2.start() -> CompositeService.start() -> Service.start()...
ThriftCLIService.start() creates new Thread(this).start();
So ThriftBinaryCLIService / ThriftHttpCLIService run() is launched in a background thread. I think this can race before the actual port gets assigned?

Copy link
Member Author

@yaooqinn yaooqinn Jun 5, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I run the tests with http mode locally with #28738 and

 build/sbt "hive-thriftserver/test-only *HiveThriftHttpServerSuite" -Phive -Phive-thriftserver -Dsbt.override.build.repos=true -Phive-2.3

I check the logs in target/unit-tests.log

20/06/05 06:35:55.237 pool-1-thread-1 INFO AbstractService: Service:ThriftHttpCLIService is started.
20/06/05 06:35:55.237 pool-1-thread-1 INFO AbstractService: Service:HiveServer2 is started.
20/06/05 06:35:55.326 Thread-17 INFO Server: jetty-9.4.18.v20190429; built: 2019-04-29T20:42:08.989Z; git: e1bc35120a6617ee3df052294e433f3a25ce7097; jvm 1.8.0_251-b08
20/06/05 06:35:55.358 Thread-17 INFO session: DefaultSessionIdManager workerName=node0
20/06/05 06:35:55.358 Thread-17 INFO session: No SessionScavenger set, using defaults
20/06/05 06:35:55.359 Thread-17 INFO session: node0 Scavenging every 660000ms
20/06/05 06:35:55.366 Thread-17 INFO ContextHandler: Started o.e.j.s.ServletContextHandler@23f4b16a{/,null,AVAILABLE}
20/06/05 06:35:55.438 Thread-17 INFO AbstractConnector: Started ServerConnector@1b76f67f{HTTP/1.1,[http/1.1]}{0.0.0.0:55923}
20/06/05 06:35:55.438 Thread-17 INFO Server: Started @7043ms
20/06/05 06:35:55.438 Thread-17 INFO ThriftCLIService: Started ThriftHttpCLIService in http mode on port 55923 path=/cliservice/* with 5...500 worker threads
20/06/05 06:35:55.442 pool-1-thread-1 INFO Utils: Supplied authorities: localhost:0
20/06/05 06:35:55.442 pool-1-thread-1 WARN Utils: ***** JDBC param deprecation *****
20/06/05 06:35:55.442 pool-1-thread-1 WARN Utils: The use of hive.server2.transport.mode is deprecated.
20/06/05 06:35:55.442 pool-1-thread-1 WARN Utils: Please use transportMode like so: jdbc:hive2://<host>:<port>/dbName;transportMode=<transport_mode_value>
20/06/05 06:35:55.442 pool-1-thread-1 WARN Utils: ***** JDBC param deprecation *****
20/06/05 06:35:55.442 pool-1-thread-1 WARN Utils: The use of hive.server2.thrift.http.path is deprecated.
20/06/05 06:35:55.442 pool-1-thread-1 WARN Utils: Please use httpPath like so: jdbc:hive2://<host>:<port>/dbName;httpPath=<http_path_value>
20/06/05 06:35:55.442 pool-1-thread-1 INFO Utils: Resolved authority: localhost:0
20/06/05 06:35:55.663 pool-1-thread-1 ERROR HiveConnection: Error opening session
org.apache.thrift.transport.TTransportException: org.apache.http.conn.HttpHostConnectException: Connect to localhost:80 [localhost/127.0.0.1, localhost/0:0:0:0:0:0:0:1] failed: Connection refused (Connection refused)
	at org.apache.thrift.transport.THttpClient.flushUsingHttpClient(THttpClient.java:297)
	at org.apache.thrift.transport.THttpClient.flush(THttpClient.java:316)
	at org.apache.thrift.TServiceClient.sendBase(TServiceClient.java:73)
	at org.apache.thrift.TServiceClient.sendBase(TServiceClient.java:62)
	at org.apache.hive.service.rpc.thrift.TCLIService$Client.send_OpenSession(TCLIService.java:162)
	at org.apache.hive.service.rpc.thrift.TCLIService$Client.OpenSession(TCLIService.java:154)
	at org.apache.hive.jdbc.HiveConnection.openSession(HiveConnection.java:680)
	at org.apache.hive.jdbc.HiveConnection.<init>(HiveConnection.java:200)
	at org.apache.hive.jdbc.HiveDriver.connect(HiveDriver.java:107)
	at java.sql.DriverManager.getConnection(DriverManager.java:664)
	at java.sql.DriverManager.getConnection(DriverManager.java:247)
	at org.apache.spark.sql.hive.thriftserver.SharedThriftServer.$anonfun$withMultipleConnectionJdbcStatement$1(SharedThriftServer.scala:106)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.hive.thriftserver.SharedThriftServer.withMultipleConnectionJdbcStatement(SharedThriftServer.scala:106)
	at org.apache.spark.sql.hive.thriftserver.SharedThriftServer.withMultipleConnectionJdbcStatement$(SharedThriftServer.scala:105)
	at org.apache.spark.sql.hive.thriftserver.HiveThriftHttpServerSuite.withMultipleConnectionJdbcStatement(HiveThriftServer2Suites.scala:280)
	at org.apache.spark.sql.hive.thriftserver.SharedThriftServer.withJdbcStatement(SharedThriftServer.scala:141)
	at org.apache.spark.sql.hive.thriftserver.SharedThriftServer.withJdbcStatement$(SharedThriftServer.scala:140)
	at org.apache.spark.sql.hive.thriftserver.HiveThriftHttpServerSuite.withJdbcStatement(HiveThriftServer2Suites.scala:280)
	at org.apache.spark.sql.hive.thriftserver.SharedThriftServer.$anonfun$startThriftServer$4(SharedThriftServer.scala:166)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.scalatest.concurrent.Eventually.makeAValiantAttempt$1(Eventually.scala:395)
	at org.scalatest.concurrent.Eventually.tryTryAgain$1(Eventually.scala:409)
	at org.scalatest.concurrent.Eventually.eventually(Eventually.scala:439)
	at org.scalatest.concurrent.Eventually.eventually$(Eventually.scala:391)
	at org.apache.spark.sql.hive.thriftserver.HiveThriftHttpServerSuite.eventually(HiveThriftServer2Suites.scala:280)
	at org.scalatest.concurrent.Eventually.eventually(Eventually.scala:308)
	at org.scalatest.concurrent.Eventually.eventually$(Eventually.scala:307)
	at org.apache.spark.sql.hive.thriftserver.HiveThriftHttpServerSuite.eventually(HiveThriftServer2Suites.scala:280)
	at org.apache.spark.sql.hive.thriftserver.SharedThriftServer.startThriftServer(SharedThriftServer.scala:165)
	at org.apache.spark.sql.hive.thriftserver.SharedThriftServer.$anonfun$beforeAll$1(SharedThriftServer.scala:71)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.hive.thriftserver.SharedThriftServer.beforeAll(SharedThriftServer.scala:71)
	at org.apache.spark.sql.hive.thriftserver.SharedThriftServer.beforeAll$(SharedThriftServer.scala:68)
	at org.apache.spark.sql.hive.thriftserver.HiveThriftHttpServerSuite.beforeAll(HiveThriftServer2Suites.scala:280)
	at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:212)
	at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
	at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
	at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:59)
	at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:317)
	at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:510)
	at sbt.ForkMain$Run$2.call(ForkMain.java:296)
	at sbt.ForkMain$Run$2.call(ForkMain.java:286)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.http.conn.HttpHostConnectException: Connect to localhost:80 [localhost/127.0.0.1, localhost/0:0:0:0:0:0:0:1] failed: Connection refused (Connection refused)
	at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:159)
	at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:373)
	at org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:394)
	at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:237)
	at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185)
	at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89)
	at org.apache.http.impl.execchain.ServiceUnavailableRetryExec.execute(ServiceUnavailableRetryExec.java:85)
	at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
	at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:118)
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
	at org.apache.thrift.transport.THttpClient.flushUsingHttpClient(THttpClient.java:251)
	... 53 more
Caused by: java.net.ConnectException: Connection refused (Connection refused)
	at java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:606)
	at org.apache.http.conn.socket.PlainConnectionSocketFactory.connectSocket(PlainConnectionSocketFactory.java:75)
	at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
	... 64 more
20/06/05 06:35:55.665 pool-1-thread-1 WARN HiveConnection: Failed to connect to localhost:0

The starting phase looks OK to me, and the assigned port is logged: `Started ThriftHttpCLIService in http mode on port 55923 path=/cliservice/* with 5...500`

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The log line Started ThriftHttpCLIService in http mode on port 55923 path=/cliservice/* with 5...500 is logged in a background thread that is launched to start the server, so

        serverPort = t.getPortNumber

may be executed before it's assigned, and may still be 0 at this point.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see

logInfo(s"Started HiveThriftServer2: port=$serverPort, attempt=$attempt")
case _ =>
}

// Wait for thrift server to be ready to serve the query, via executing simple query
// till the query succeeds. See SPARK-30345 for more details.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ public void run() {
keyStorePassword, sslVersionBlacklist);
}

// In case HIVE_SERVER2_THRIFT_PORT or hive.server2.thrift.port is configured with 0 which
// represents any free port, we should set it to the actual one
portNum = serverSocket.getServerSocket().getLocalPort();

// Server args
int maxMessageSize = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_MAX_MESSAGE_SIZE);
int requestTimeout = (int) hiveConf.getTimeVar(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ public void run() {
// TODO: check defaults: maxTimeout, keepalive, maxBodySize, bodyRecieveDuration, etc.
// Finally, start the server
httpServer.start();
// In case HIVE_SERVER2_THRIFT_HTTP_PORT or hive.server2.thrift.http.port is configured with
// 0 which represents any free port, we should set it to the actual one
portNum = connector.getLocalPort();
String msg = "Started " + ThriftHttpCLIService.class.getSimpleName() + " in " + schemeName
+ " mode on port " + connector.getLocalPort()+ " path=" + httpPath + " with " + minWorkerThreads + "..."
+ maxWorkerThreads + " worker threads";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ public void run() {
keyStorePassword, sslVersionBlacklist);
}

// In case HIVE_SERVER2_THRIFT_PORT or hive.server2.thrift.port is configured with 0 which
// represents any free port, we should set it to the actual one
portNum = serverSocket.getServerSocket().getLocalPort();

// Server args
int maxMessageSize = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_MAX_MESSAGE_SIZE);
int requestTimeout = (int) hiveConf.getTimeVar(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ public void run() {
// TODO: check defaults: maxTimeout, keepalive, maxBodySize, bodyRecieveDuration, etc.
// Finally, start the server
httpServer.start();
// In case HIVE_SERVER2_THRIFT_HTTP_PORT or hive.server2.thrift.http.port is configured with
// 0 which represents any free port, we should set it to the actual one
portNum = connector.getLocalPort();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yaooqinn did you try to test that? For me setting hive.server2.thrift.http.port to 0 doesn't seem to work when trying to start it in http mode like that.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

 kentyao@hulk  ~/Downloads/spark/spark-3.1.0-SNAPSHOT-bin-20200601  sbin/start-thriftserver.sh --conf spark.hadoop.hive.server2.thrift.http.port=0 --conf spark.hadoop.hive.server2.transport.mode=http
starting org.apache.spark.sql.hive.thriftserver.HiveThriftServer2, logging to /Users/kentyao/Downloads/spark/spark-3.1.0-SNAPSHOT-bin-20200601/logs/spark-kentyao-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-hulk.local.out
 kentyao@hulk  ~/Downloads/spark/spark-3.1.0-SNAPSHOT-bin-20200601 tail -f -2 /Users/kentyao/Downloads/spark/spark-3.1.0-SNAPSHOT-bin-20200601/logs/spark-kentyao-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-hulk.local.out
20/06/05 21:26:20 INFO HiveThriftServer2: HiveThriftServer2 started
20/06/05 21:26:20 INFO ThriftCLIService: Started ThriftHttpCLIService in http mode on port 54379 path=/cliservice/* with 5...500 worker threads

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks... let me debug this more on my end to see why I'm getting 0 back...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I verify this locally, it seems to be ok.

but in #28738, I just run into the same issue.

String msg = "Started " + ThriftHttpCLIService.class.getSimpleName() + " in " + schemeName
+ " mode on port " + portNum + " path=" + httpPath + " with " + minWorkerThreads + "..."
+ maxWorkerThreads + " worker threads";
Expand Down