diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index eb12f2f1f6ab7..1360d30fdd575 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -480,7 +480,8 @@ object SparkParallelTestGrouping { "org.apache.spark.sql.hive.thriftserver.SparkSQLEnvSuite", "org.apache.spark.sql.hive.thriftserver.ui.ThriftServerPageSuite", "org.apache.spark.sql.hive.thriftserver.ui.HiveThriftServer2ListenerSuite", - "org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextSuite", + "org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextInHttpSuite", + "org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextInBinarySuite", "org.apache.spark.sql.kafka010.KafkaDelegationTokenSuite" ) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala index e002bc0117c8b..c9e41db52cd50 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala @@ -33,6 +33,8 @@ trait SharedThriftServer extends SharedSparkSession { private var hiveServer2: HiveThriftServer2 = _ private var serverPort: Int = 0 + def mode: ServerMode.Value + override def beforeAll(): Unit = { super.beforeAll() // Retries up to 3 times with different port numbers if the server fails to start @@ -53,11 +55,17 @@ trait SharedThriftServer extends SharedSparkSession { } } + protected def jdbcUri: String = if (mode == ServerMode.http) { + s"jdbc:hive2://localhost:$serverPort/default;transportMode=http;httpPath=cliservice" + } else { + s"jdbc:hive2://localhost:$serverPort" + } + protected def withJdbcStatement(fs: (Statement => Unit)*): Unit = { val user = System.getProperty("user.name") require(serverPort != 0, "Failed to bind an actual port for HiveThriftServer2") val connections = - fs.map { _ => DriverManager.getConnection(s"jdbc:hive2://localhost:$serverPort", user, "") } + fs.map { _ => DriverManager.getConnection(jdbcUri, user, "") } val statements = connections.map(_.createStatement()) try { @@ -71,21 +79,33 @@ trait SharedThriftServer extends SharedSparkSession { private def startThriftServer(attempt: Int): Unit = { logInfo(s"Trying to start HiveThriftServer2:, attempt=$attempt") val sqlContext = spark.newSession().sqlContext - // Set the HIVE_SERVER2_THRIFT_PORT to 0, so it could randomly pick any free port to use. + // Set the HIVE_SERVER2_THRIFT_PORT and HIVE_SERVER2_THRIFT_HTTP_PORT to 0, so it could + // randomly pick any free port to use. // It's much more robust than set a random port generated by ourselves ahead sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, "0") - hiveServer2 = HiveThriftServer2.startWithContext(sqlContext) - hiveServer2.getServices.asScala.foreach { - case t: ThriftCLIService if t.getPortNumber != 0 => - serverPort = t.getPortNumber - logInfo(s"Started HiveThriftServer2: port=$serverPort, attempt=$attempt") - case _ => - } + sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT.varname, "0") + sqlContext.setConf(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname, mode.toString) + + try { + hiveServer2 = HiveThriftServer2.startWithContext(sqlContext) + hiveServer2.getServices.asScala.foreach { + case t: ThriftCLIService => + serverPort = t.getPortNumber + logInfo(s"Started HiveThriftServer2: port=$serverPort, attempt=$attempt") + case _ => + } - // Wait for thrift server to be ready to serve the query, via executing simple query - // till the query succeeds. See SPARK-30345 for more details. - eventually(timeout(30.seconds), interval(1.seconds)) { - withJdbcStatement { _.execute("SELECT 1") } + // Wait for thrift server to be ready to serve the query, via executing simple query + // till the query succeeds. See SPARK-30345 for more details. + eventually(timeout(30.seconds), interval(1.seconds)) { + withJdbcStatement { _.execute("SELECT 1") } + } + } catch { + case e: Exception => + logError("Error start hive server with Context ", e) + if (hiveServer2 != null) { + hiveServer2.stop() + } } } } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala index 15cc3109da3f7..553f10a275bce 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala @@ -54,6 +54,9 @@ import org.apache.spark.sql.types._ */ class ThriftServerQueryTestSuite extends SQLQueryTestSuite with SharedThriftServer { + + override def mode: ServerMode.Value = ServerMode.binary + override protected def testFile(fileName: String): String = { val url = Thread.currentThread().getContextClassLoader.getResource(fileName) // Copy to avoid URISyntaxException during accessing the resources in `sql/core` diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala index 3e1fce78ae71c..d6420dee41adb 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.hive.thriftserver -class ThriftServerWithSparkContextSuite extends SharedThriftServer { +trait ThriftServerWithSparkContextSuite extends SharedThriftServer { test("SPARK-29911: Uncache cached tables when session closed") { val cacheManager = spark.sharedState.cacheManager @@ -42,3 +42,12 @@ class ThriftServerWithSparkContextSuite extends SharedThriftServer { } } } + + +class ThriftServerWithSparkContextInBinarySuite extends ThriftServerWithSparkContextSuite { + override def mode: ServerMode.Value = ServerMode.binary +} + +class ThriftServerWithSparkContextInHttpSuite extends ThriftServerWithSparkContextSuite { + override def mode: ServerMode.Value = ServerMode.http +} diff --git a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java index e1ee503b81209..00bdf7e19126e 100644 --- a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java +++ b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hive.service.ServiceException; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.cli.CLIService; import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup; @@ -45,7 +46,7 @@ public ThriftBinaryCLIService(CLIService cliService) { } @Override - public void run() { + protected void initializeServer() { try { // Server thread pool String threadPoolName = "HiveServer2-Handler-Pool"; @@ -100,6 +101,14 @@ public void run() { String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName() + " on port " + serverSocket.getServerSocket().getLocalPort() + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads"; LOG.info(msg); + } catch (Exception t) { + throw new ServiceException("Error initializing " + getName(), t); + } + } + + @Override + public void run() { + try { server.serve(); } catch (Throwable t) { LOG.fatal( diff --git a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index 8fce9d9383438..783e5795aca76 100644 --- a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -175,6 +175,7 @@ public synchronized void init(HiveConf hiveConf) { public synchronized void start() { super.start(); if (!isStarted && !isEmbedded) { + initializeServer(); new Thread(this).start(); isStarted = true; } @@ -633,6 +634,8 @@ public TFetchResultsResp FetchResults(TFetchResultsReq req) throws TException { return resp; } + protected abstract void initializeServer(); + @Override public abstract void run(); diff --git a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java index 1099a00b67eb7..bd64c777c1d76 100644 --- a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java +++ b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Shell; +import org.apache.hive.service.ServiceException; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.cli.CLIService; import org.apache.hive.service.cli.thrift.TCLIService.Iface; @@ -53,13 +54,8 @@ public ThriftHttpCLIService(CLIService cliService) { super(cliService, ThriftHttpCLIService.class.getSimpleName()); } - /** - * Configure Jetty to serve http requests. Example of a client connection URL: - * http://localhost:10000/servlets/thrifths2/ A gateway may cause actual target URL to differ, - * e.g. http://gateway:port/hive2/servlets/thrifths2/ - */ @Override - public void run() { + protected void initializeServer() { try { // Server thread pool // Start with minWorkerThreads, expand till maxWorkerThreads and reject subsequent requests @@ -150,6 +146,19 @@ public void run() { + " mode on port " + connector.getLocalPort()+ " path=" + httpPath + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads"; LOG.info(msg); + } catch (Exception t) { + throw new ServiceException("Error initializing " + getName(), t); + } + } + + /** + * Configure Jetty to serve http requests. Example of a client connection URL: + * http://localhost:10000/servlets/thrifths2/ A gateway may cause actual target URL to differ, + * e.g. http://gateway:port/hive2/servlets/thrifths2/ + */ + @Override + public void run() { + try { httpServer.join(); } catch (Throwable t) { LOG.fatal( diff --git a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java index a7de9c0f3d0d2..ce79e3c8228a6 100644 --- a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java +++ b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hive.service.ServiceException; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.cli.CLIService; import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup; @@ -46,7 +47,7 @@ public ThriftBinaryCLIService(CLIService cliService) { } @Override - public void run() { + protected void initializeServer() { try { // Server thread pool String threadPoolName = "HiveServer2-Handler-Pool"; @@ -101,6 +102,14 @@ public void run() { String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName() + " on port " + portNum + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads"; LOG.info(msg); + } catch (Exception t) { + throw new ServiceException("Error initializing " + getName(), t); + } + } + + @Override + public void run() { + try { server.serve(); } catch (Throwable t) { LOG.error( diff --git a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index d41c3b493bb47..e46799a1c427d 100644 --- a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -176,6 +176,7 @@ public synchronized void init(HiveConf hiveConf) { public synchronized void start() { super.start(); if (!isStarted && !isEmbedded) { + initializeServer(); new Thread(this).start(); isStarted = true; } @@ -670,6 +671,8 @@ public TGetCrossReferenceResp GetCrossReference(TGetCrossReferenceReq req) return resp; } + protected abstract void initializeServer(); + @Override public abstract void run(); diff --git a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java index 73d5f84476af0..ab9ed5b1f371e 100644 --- a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java +++ b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Shell; +import org.apache.hive.service.ServiceException; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.cli.CLIService; import org.apache.hive.service.rpc.thrift.TCLIService; @@ -54,13 +55,8 @@ public ThriftHttpCLIService(CLIService cliService) { super(cliService, ThriftHttpCLIService.class.getSimpleName()); } - /** - * Configure Jetty to serve http requests. Example of a client connection URL: - * http://localhost:10000/servlets/thrifths2/ A gateway may cause actual target URL to differ, - * e.g. http://gateway:port/hive2/servlets/thrifths2/ - */ @Override - public void run() { + protected void initializeServer() { try { // Server thread pool // Start with minWorkerThreads, expand till maxWorkerThreads and reject subsequent requests @@ -151,6 +147,19 @@ public void run() { + " mode on port " + portNum + " path=" + httpPath + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads"; LOG.info(msg); + } catch (Exception t) { + throw new ServiceException("Error initializing " + getName(), t); + } + } + + /** + * Configure Jetty to serve http requests. Example of a client connection URL: + * http://localhost:10000/servlets/thrifths2/ A gateway may cause actual target URL to differ, + * e.g. http://gateway:port/hive2/servlets/thrifths2/ + */ + @Override + public void run() { + try { httpServer.join(); } catch (Throwable t) { LOG.error(