From 03f41ab2017f8e09af2526a26322e3c9291031ce Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Tue, 16 Jun 2020 11:40:39 +0800 Subject: [PATCH 1/7] Revert "Revert "[SPARK-31926][SQL][TEST-HIVE1.2][TEST-MAVEN] Fix concurrency issue for ThriftCLIService to getPortNumber"" This reverts commit 75afd889044c789b259b9e609aea34b542490f53. --- project/SparkBuild.scala | 1 - .../src/test/resources/log4j.properties | 65 +++++++++++++++++++ .../thriftserver/SharedThriftServer.scala | 50 ++++++++++---- .../ThriftServerQueryTestSuite.scala | 3 + .../ThriftServerWithSparkContextSuite.scala | 11 +++- .../cli/thrift/ThriftBinaryCLIService.java | 11 +++- .../service/cli/thrift/ThriftCLIService.java | 3 + .../cli/thrift/ThriftHttpCLIService.java | 21 ++++-- .../cli/thrift/ThriftBinaryCLIService.java | 11 +++- .../service/cli/thrift/ThriftCLIService.java | 3 + .../cli/thrift/ThriftHttpCLIService.java | 21 ++++-- 11 files changed, 170 insertions(+), 30 deletions(-) create mode 100644 sql/hive-thriftserver/src/test/resources/log4j.properties diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 0035f1d95a90d..04a3fc4b63050 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -480,7 +480,6 @@ object SparkParallelTestGrouping { "org.apache.spark.sql.hive.thriftserver.SparkSQLEnvSuite", "org.apache.spark.sql.hive.thriftserver.ui.ThriftServerPageSuite", "org.apache.spark.sql.hive.thriftserver.ui.HiveThriftServer2ListenerSuite", - "org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextSuite", "org.apache.spark.sql.kafka010.KafkaDelegationTokenSuite" ) diff --git a/sql/hive-thriftserver/src/test/resources/log4j.properties b/sql/hive-thriftserver/src/test/resources/log4j.properties new file mode 100644 index 0000000000000..21975ba818142 --- /dev/null +++ b/sql/hive-thriftserver/src/test/resources/log4j.properties @@ -0,0 +1,65 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file hive-thriftserver/target/unit-tests.log +log4j.rootLogger=DEBUG, CA, FA + +#Console Appender +log4j.appender.CA=org.apache.log4j.ConsoleAppender +log4j.appender.CA.layout=org.apache.log4j.PatternLayout +log4j.appender.CA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %p %c: %m%n +log4j.appender.CA.Threshold = WARN + + +#File Appender +log4j.appender.FA=org.apache.log4j.FileAppender +log4j.appender.FA.append=false +log4j.appender.FA.file=target/unit-tests.log +log4j.appender.FA.layout=org.apache.log4j.PatternLayout +log4j.appender.FA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %t %p %c{1}: %m%n + +# Set the logger level of File Appender to DEBUG +log4j.appender.FA.Threshold = DEBUG + +# Some packages are noisy for no good reason. 
+log4j.additivity.org.apache.hadoop.hive.serde2.lazy.LazyStruct=false +log4j.logger.org.apache.hadoop.hive.serde2.lazy.LazyStruct=OFF + +log4j.additivity.org.apache.hadoop.hive.metastore.RetryingHMSHandler=false +log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=OFF + +log4j.additivity.hive.log=false +log4j.logger.hive.log=OFF + +log4j.additivity.parquet.hadoop.ParquetRecordReader=false +log4j.logger.parquet.hadoop.ParquetRecordReader=OFF + +log4j.additivity.org.apache.parquet.hadoop.ParquetRecordReader=false +log4j.logger.org.apache.parquet.hadoop.ParquetRecordReader=OFF + +log4j.additivity.org.apache.parquet.hadoop.ParquetOutputCommitter=false +log4j.logger.org.apache.parquet.hadoop.ParquetOutputCommitter=OFF + +log4j.additivity.hive.ql.metadata.Hive=false +log4j.logger.hive.ql.metadata.Hive=OFF + +log4j.additivity.org.apache.hadoop.hive.ql.io.RCFile=false +log4j.logger.org.apache.hadoop.hive.ql.io.RCFile=ERROR + +# Parquet related logging +log4j.logger.org.apache.parquet.CorruptStatistics=ERROR +log4j.logger.parquet.CorruptStatistics=ERROR diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala index e002bc0117c8b..1c33abff0780c 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala @@ -24,6 +24,7 @@ import scala.concurrent.duration._ import scala.util.Try import org.apache.hadoop.hive.conf.HiveConf.ConfVars +import org.apache.hadoop.hive.ql.session.SessionState import org.apache.hive.service.cli.thrift.ThriftCLIService import org.apache.spark.sql.test.SharedSparkSession @@ -33,6 +34,8 @@ trait SharedThriftServer extends SharedSparkSession { private var hiveServer2: HiveThriftServer2 = _ private var serverPort: Int = 0 + def 
mode: ServerMode.Value + override def beforeAll(): Unit = { super.beforeAll() // Retries up to 3 times with different port numbers if the server fails to start @@ -50,14 +53,21 @@ trait SharedThriftServer extends SharedSparkSession { hiveServer2.stop() } finally { super.afterAll() + SessionState.detachSession() } } + protected def jdbcUri: String = if (mode == ServerMode.http) { + s"jdbc:hive2://localhost:$serverPort/default;transportMode=http;httpPath=cliservice" + } else { + s"jdbc:hive2://localhost:$serverPort/" + } + protected def withJdbcStatement(fs: (Statement => Unit)*): Unit = { val user = System.getProperty("user.name") require(serverPort != 0, "Failed to bind an actual port for HiveThriftServer2") val connections = - fs.map { _ => DriverManager.getConnection(s"jdbc:hive2://localhost:$serverPort", user, "") } + fs.map { _ => DriverManager.getConnection(jdbcUri, user, "") } val statements = connections.map(_.createStatement()) try { @@ -69,23 +79,35 @@ trait SharedThriftServer extends SharedSparkSession { } private def startThriftServer(attempt: Int): Unit = { - logInfo(s"Trying to start HiveThriftServer2:, attempt=$attempt") + logInfo(s"Trying to start HiveThriftServer2: mode=$mode, attempt=$attempt") val sqlContext = spark.newSession().sqlContext - // Set the HIVE_SERVER2_THRIFT_PORT to 0, so it could randomly pick any free port to use. + // Set the HIVE_SERVER2_THRIFT_PORT and HIVE_SERVER2_THRIFT_HTTP_PORT to 0, so it could + // randomly pick any free port to use. 
// It's much more robust than set a random port generated by ourselves ahead sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, "0") - hiveServer2 = HiveThriftServer2.startWithContext(sqlContext) - hiveServer2.getServices.asScala.foreach { - case t: ThriftCLIService if t.getPortNumber != 0 => - serverPort = t.getPortNumber - logInfo(s"Started HiveThriftServer2: port=$serverPort, attempt=$attempt") - case _ => - } + sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT.varname, "0") + sqlContext.setConf(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname, mode.toString) + + try { + hiveServer2 = HiveThriftServer2.startWithContext(sqlContext) + hiveServer2.getServices.asScala.foreach { + case t: ThriftCLIService => + serverPort = t.getPortNumber + logInfo(s"Started HiveThriftServer2: mode=$mode, port=$serverPort, attempt=$attempt") + case _ => + } - // Wait for thrift server to be ready to serve the query, via executing simple query - // till the query succeeds. See SPARK-30345 for more details. - eventually(timeout(30.seconds), interval(1.seconds)) { - withJdbcStatement { _.execute("SELECT 1") } + // Wait for thrift server to be ready to serve the query, via executing simple query + // till the query succeeds. See SPARK-30345 for more details. 
+ eventually(timeout(30.seconds), interval(1.seconds)) { + withJdbcStatement { _.execute("SELECT 1") } + } + } catch { + case e: Exception => + logError("Error start hive server with Context ", e) + if (hiveServer2 != null) { + hiveServer2.stop() + } } } } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala index 15cc3109da3f7..553f10a275bce 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala @@ -54,6 +54,9 @@ import org.apache.spark.sql.types._ */ class ThriftServerQueryTestSuite extends SQLQueryTestSuite with SharedThriftServer { + + override def mode: ServerMode.Value = ServerMode.binary + override protected def testFile(fileName: String): String = { val url = Thread.currentThread().getContextClassLoader.getResource(fileName) // Copy to avoid URISyntaxException during accessing the resources in `sql/core` diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala index 3e1fce78ae71c..d6420dee41adb 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.hive.thriftserver -class ThriftServerWithSparkContextSuite extends SharedThriftServer { +trait ThriftServerWithSparkContextSuite extends SharedThriftServer { test("SPARK-29911: Uncache cached tables when session closed") { val 
cacheManager = spark.sharedState.cacheManager @@ -42,3 +42,12 @@ class ThriftServerWithSparkContextSuite extends SharedThriftServer { } } } + + +class ThriftServerWithSparkContextInBinarySuite extends ThriftServerWithSparkContextSuite { + override def mode: ServerMode.Value = ServerMode.binary +} + +class ThriftServerWithSparkContextInHttpSuite extends ThriftServerWithSparkContextSuite { + override def mode: ServerMode.Value = ServerMode.http +} diff --git a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java index e1ee503b81209..00bdf7e19126e 100644 --- a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java +++ b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hive.service.ServiceException; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.cli.CLIService; import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup; @@ -45,7 +46,7 @@ public ThriftBinaryCLIService(CLIService cliService) { } @Override - public void run() { + protected void initializeServer() { try { // Server thread pool String threadPoolName = "HiveServer2-Handler-Pool"; @@ -100,6 +101,14 @@ public void run() { String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName() + " on port " + serverSocket.getServerSocket().getLocalPort() + " with " + minWorkerThreads + "..." 
+ maxWorkerThreads + " worker threads"; LOG.info(msg); + } catch (Exception t) { + throw new ServiceException("Error initializing " + getName(), t); + } + } + + @Override + public void run() { + try { server.serve(); } catch (Throwable t) { LOG.fatal( diff --git a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index 8fce9d9383438..783e5795aca76 100644 --- a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -175,6 +175,7 @@ public synchronized void init(HiveConf hiveConf) { public synchronized void start() { super.start(); if (!isStarted && !isEmbedded) { + initializeServer(); new Thread(this).start(); isStarted = true; } @@ -633,6 +634,8 @@ public TFetchResultsResp FetchResults(TFetchResultsReq req) throws TException { return resp; } + protected abstract void initializeServer(); + @Override public abstract void run(); diff --git a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java index 1099a00b67eb7..bd64c777c1d76 100644 --- a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java +++ b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Shell; +import org.apache.hive.service.ServiceException; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.cli.CLIService; import org.apache.hive.service.cli.thrift.TCLIService.Iface; @@ -53,13 +54,8 
@@ public ThriftHttpCLIService(CLIService cliService) { super(cliService, ThriftHttpCLIService.class.getSimpleName()); } - /** - * Configure Jetty to serve http requests. Example of a client connection URL: - * http://localhost:10000/servlets/thrifths2/ A gateway may cause actual target URL to differ, - * e.g. http://gateway:port/hive2/servlets/thrifths2/ - */ @Override - public void run() { + protected void initializeServer() { try { // Server thread pool // Start with minWorkerThreads, expand till maxWorkerThreads and reject subsequent requests @@ -150,6 +146,19 @@ public void run() { + " mode on port " + connector.getLocalPort()+ " path=" + httpPath + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads"; LOG.info(msg); + } catch (Exception t) { + throw new ServiceException("Error initializing " + getName(), t); + } + } + + /** + * Configure Jetty to serve http requests. Example of a client connection URL: + * http://localhost:10000/servlets/thrifths2/ A gateway may cause actual target URL to differ, + * e.g. 
http://gateway:port/hive2/servlets/thrifths2/ + */ + @Override + public void run() { + try { httpServer.join(); } catch (Throwable t) { LOG.fatal( diff --git a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java index a7de9c0f3d0d2..ce79e3c8228a6 100644 --- a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java +++ b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hive.service.ServiceException; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.cli.CLIService; import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup; @@ -46,7 +47,7 @@ public ThriftBinaryCLIService(CLIService cliService) { } @Override - public void run() { + protected void initializeServer() { try { // Server thread pool String threadPoolName = "HiveServer2-Handler-Pool"; @@ -101,6 +102,14 @@ public void run() { String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName() + " on port " + portNum + " with " + minWorkerThreads + "..." 
+ maxWorkerThreads + " worker threads"; LOG.info(msg); + } catch (Exception t) { + throw new ServiceException("Error initializing " + getName(), t); + } + } + + @Override + public void run() { + try { server.serve(); } catch (Throwable t) { LOG.error( diff --git a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index d41c3b493bb47..e46799a1c427d 100644 --- a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -176,6 +176,7 @@ public synchronized void init(HiveConf hiveConf) { public synchronized void start() { super.start(); if (!isStarted && !isEmbedded) { + initializeServer(); new Thread(this).start(); isStarted = true; } @@ -670,6 +671,8 @@ public TGetCrossReferenceResp GetCrossReference(TGetCrossReferenceReq req) return resp; } + protected abstract void initializeServer(); + @Override public abstract void run(); diff --git a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java index 73d5f84476af0..ab9ed5b1f371e 100644 --- a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java +++ b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Shell; +import org.apache.hive.service.ServiceException; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.cli.CLIService; import org.apache.hive.service.rpc.thrift.TCLIService; @@ -54,13 +55,8 @@ public 
ThriftHttpCLIService(CLIService cliService) { super(cliService, ThriftHttpCLIService.class.getSimpleName()); } - /** - * Configure Jetty to serve http requests. Example of a client connection URL: - * http://localhost:10000/servlets/thrifths2/ A gateway may cause actual target URL to differ, - * e.g. http://gateway:port/hive2/servlets/thrifths2/ - */ @Override - public void run() { + protected void initializeServer() { try { // Server thread pool // Start with minWorkerThreads, expand till maxWorkerThreads and reject subsequent requests @@ -151,6 +147,19 @@ public void run() { + " mode on port " + portNum + " path=" + httpPath + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads"; LOG.info(msg); + } catch (Exception t) { + throw new ServiceException("Error initializing " + getName(), t); + } + } + + /** + * Configure Jetty to serve http requests. Example of a client connection URL: + * http://localhost:10000/servlets/thrifths2/ A gateway may cause actual target URL to differ, + * e.g. 
http://gateway:port/hive2/servlets/thrifths2/ + */ + @Override + public void run() { + try { httpServer.join(); } catch (Throwable t) { LOG.error( From 9c6ac83afbe26f1277a72c04ac3b1ec40a8c7141 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Tue, 16 Jun 2020 11:42:38 +0800 Subject: [PATCH 2/7] rebase and classloader update --- .../hive/thriftserver/HiveThriftServer2.scala | 1 + .../thriftserver/SharedThriftServer.scala | 19 ++++++++++++++++++- 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index f9f2ceeed8a75..c9597746fde94 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -57,6 +57,7 @@ object HiveThriftServer2 extends Logging { val executionHive = HiveUtils.newClientForExecution( sqlContext.sparkContext.conf, sqlContext.sessionState.newHadoopConf()) + executionHive.conf.setClassLoader(sqlContext.sharedState.jarClassLoader) server.init(executionHive.conf) server.start() diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala index 1c33abff0780c..f79818a30145b 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala @@ -24,15 +24,22 @@ import scala.concurrent.duration._ import scala.util.Try import org.apache.hadoop.hive.conf.HiveConf.ConfVars +import org.apache.hadoop.hive.ql.metadata.Hive import org.apache.hadoop.hive.ql.session.SessionState import 
org.apache.hive.service.cli.thrift.ThriftCLIService import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.util.Utils trait SharedThriftServer extends SharedSparkSession { private var hiveServer2: HiveThriftServer2 = _ private var serverPort: Int = 0 + private val metastorePath = { + val dir = Utils.createTempDir() + dir.delete() + dir + } def mode: ServerMode.Value @@ -50,10 +57,13 @@ trait SharedThriftServer extends SharedSparkSession { override def afterAll(): Unit = { try { - hiveServer2.stop() + if (hiveServer2 != null) { + hiveServer2.stop() + } } finally { super.afterAll() SessionState.detachSession() + Hive.closeCurrent() } } @@ -81,6 +91,9 @@ trait SharedThriftServer extends SharedSparkSession { private def startThriftServer(attempt: Int): Unit = { logInfo(s"Trying to start HiveThriftServer2: mode=$mode, attempt=$attempt") val sqlContext = spark.newSession().sqlContext + sqlContext.setConf(ConfVars.METASTORECONNECTURLKEY.varname, + s"jdbc:derby:;databaseName=$metastorePath;create=true") + sqlContext.setConf(ConfVars.METASTOREURIS.varname, "") // Set the HIVE_SERVER2_THRIFT_PORT and HIVE_SERVER2_THRIFT_HTTP_PORT to 0, so it could // randomly pick any free port to use. 
// It's much more robust than set a random port generated by ourselves ahead @@ -107,7 +120,11 @@ trait SharedThriftServer extends SharedSparkSession { logError("Error start hive server with Context ", e) if (hiveServer2 != null) { hiveServer2.stop() + hiveServer2 = null } + SessionState.detachSession() + Hive.closeCurrent() + throw e } } } From 5e79ad98264a263b5d6d9829dc81258f3eff1497 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Tue, 16 Jun 2020 16:35:32 +0800 Subject: [PATCH 3/7] override start method --- .../thriftserver/SparkSQLCLIService.scala | 48 +++++++++++++++++-- 1 file changed, 43 insertions(+), 5 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala index 1644ecb2453be..1b1eff11b6c63 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala @@ -22,13 +22,14 @@ import java.util.{List => JList} import javax.security.auth.login.LoginException import scala.collection.JavaConverters._ +import scala.util.control.NonFatal import org.apache.commons.logging.Log import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hadoop.hive.shims.Utils import org.apache.hadoop.security.{SecurityUtil, UserGroupInformation} -import org.apache.hive.service.{AbstractService, Service, ServiceException} +import org.apache.hive.service.{AbstractService, CompositeService, Service, ServiceException} import org.apache.hive.service.Service.STATE import org.apache.hive.service.auth.HiveAuthFactory import org.apache.hive.service.cli._ @@ -94,6 +95,12 @@ private[hive] class SparkSQLCLIService(hiveServer: HiveServer2, sqlContext: SQLC initCompositeService(hiveConf) } + /** + * the super class 
[[CLIService#start]] starts a useless dummy metastore client, skip it and call + * the ancestor [[CompositeService#start]] directly. + */ + override def start(): Unit = startCompositeService() + override def getInfo(sessionHandle: SessionHandle, getInfoType: GetInfoType): GetInfoValue = { getInfoType match { case GetInfoType.CLI_SERVER_NAME => new GetInfoValue("Spark SQL") @@ -105,6 +112,19 @@ private[hive] class SparkSQLCLIService(hiveServer: HiveServer2, sqlContext: SQLC } private[thriftserver] trait ReflectedCompositeService { this: AbstractService => + + private val logInfo = (msg: String) => if (HiveUtils.isHive23) { + getAncestorField[Logger](this, 3, "LOG").info(msg) + } else { + getAncestorField[Log](this, 3, "LOG").info(msg) + } + + private val logError = (msg: String, e: Throwable) => if (HiveUtils.isHive23) { + getAncestorField[Logger](this, 3, "LOG").error(msg, e) + } else { + getAncestorField[Log](this, 3, "LOG").error(msg, e) + } + def initCompositeService(hiveConf: HiveConf): Unit = { // Emulating `CompositeService.init(hiveConf)` val serviceList = getAncestorField[JList[Service]](this, 2, "serviceList") @@ -114,10 +134,28 @@ private[thriftserver] trait ReflectedCompositeService { this: AbstractService => invoke(classOf[AbstractService], this, "ensureCurrentState", classOf[STATE] -> STATE.NOTINITED) setAncestorField(this, 3, "hiveConf", hiveConf) invoke(classOf[AbstractService], this, "changeState", classOf[STATE] -> STATE.INITED) - if (HiveUtils.isHive23) { - getAncestorField[Logger](this, 3, "LOG").info(s"Service: $getName is inited.") - } else { - getAncestorField[Log](this, 3, "LOG").info(s"Service: $getName is inited.") + logInfo(s"Service: $getName is inited.") + } + + def startCompositeService(): Unit = { + // Emulating `CompositeService.init(hiveConf)` + val serviceList = getAncestorField[JList[Service]](this, 2, "serviceList") + var serviceStartCount = 0 + try { + serviceList.asScala.foreach { service => + service.start() + serviceStartCount 
+= 1 + } + } catch { + case NonFatal(e) => + logError(s"Error starting services $getName", e) + invoke(classOf[CompositeService], this, "stop", + classOf[Int] -> new Integer(serviceStartCount)) } + + // Emulating `AbstractService.start` + invoke(classOf[AbstractService], this, "ensureCurrentState", classOf[STATE] -> STATE.INITED) + invoke(classOf[AbstractService], this, "changeState", classOf[STATE] -> STATE.STARTED) + logInfo(s"Service: $getName is started.") } } From 9cdd7fac27148308a78bd59d39954b8f86984493 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Tue, 16 Jun 2020 18:10:46 +0800 Subject: [PATCH 4/7] won't reach class loader part anymore --- .../apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index c9597746fde94..f9f2ceeed8a75 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -57,7 +57,6 @@ object HiveThriftServer2 extends Logging { val executionHive = HiveUtils.newClientForExecution( sqlContext.sparkContext.conf, sqlContext.sessionState.newHadoopConf()) - executionHive.conf.setClassLoader(sqlContext.sharedState.jarClassLoader) server.init(executionHive.conf) server.start() From 5ce343aef6e17eaedb258f19f21823015e8066ce Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Tue, 16 Jun 2020 20:46:46 +0800 Subject: [PATCH 5/7] throw --- .../apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala index 1b1eff11b6c63..d45eeeb9c593e 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala @@ -151,6 +151,7 @@ private[thriftserver] trait ReflectedCompositeService { this: AbstractService => logError(s"Error starting services $getName", e) invoke(classOf[CompositeService], this, "stop", classOf[Int] -> new Integer(serviceStartCount)) + throw new ServiceException("Failed to Start " + getName, e) } // Emulating `AbstractService.start` From d5341ea8b3d46c44b35c4a0eb293da4cda40bcc3 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 18 Jun 2020 18:49:10 +0800 Subject: [PATCH 6/7] start time --- .../spark/sql/hive/thriftserver/SparkSQLCLIService.scala | 4 +++- .../spark/sql/hive/thriftserver/SharedThriftServer.scala | 9 --------- 2 files changed, 3 insertions(+), 10 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala index d45eeeb9c593e..6ed2df7e3d006 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala @@ -138,7 +138,7 @@ private[thriftserver] trait ReflectedCompositeService { this: AbstractService => } def startCompositeService(): Unit = { - // Emulating `CompositeService.init(hiveConf)` + // Emulating `CompositeService.start` val serviceList = getAncestorField[JList[Service]](this, 2, "serviceList") var serviceStartCount = 0 try { @@ -155,6 +155,8 @@ private[thriftserver] trait ReflectedCompositeService { this: AbstractService => } // Emulating 
`AbstractService.start` + val startTime = new java.lang.Long(System.currentTimeMillis()) + setAncestorField(this, 3, "startTime", startTime) invoke(classOf[AbstractService], this, "ensureCurrentState", classOf[STATE] -> STATE.INITED) invoke(classOf[AbstractService], this, "changeState", classOf[STATE] -> STATE.STARTED) logInfo(s"Service: $getName is started.") diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala index f79818a30145b..df86cdef3a337 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala @@ -29,17 +29,11 @@ import org.apache.hadoop.hive.ql.session.SessionState import org.apache.hive.service.cli.thrift.ThriftCLIService import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.util.Utils trait SharedThriftServer extends SharedSparkSession { private var hiveServer2: HiveThriftServer2 = _ private var serverPort: Int = 0 - private val metastorePath = { - val dir = Utils.createTempDir() - dir.delete() - dir - } def mode: ServerMode.Value @@ -91,9 +85,6 @@ trait SharedThriftServer extends SharedSparkSession { private def startThriftServer(attempt: Int): Unit = { logInfo(s"Trying to start HiveThriftServer2: mode=$mode, attempt=$attempt") val sqlContext = spark.newSession().sqlContext - sqlContext.setConf(ConfVars.METASTORECONNECTURLKEY.varname, - s"jdbc:derby:;databaseName=$metastorePath;create=true") - sqlContext.setConf(ConfVars.METASTOREURIS.varname, "") // Set the HIVE_SERVER2_THRIFT_PORT and HIVE_SERVER2_THRIFT_HTTP_PORT to 0, so it could // randomly pick any free port to use. 
// It's much more robust than set a random port generated by ourselves ahead From e409f9ac1dd899e1d81f1e45000e4799ce3e3a2c Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 18 Jun 2020 22:30:27 +0800 Subject: [PATCH 7/7] move service.start into try-catch --- .../sql/hive/thriftserver/SparkSQLCLIService.scala | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala index 6ed2df7e3d006..984625c76e057 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala @@ -146,6 +146,12 @@ private[thriftserver] trait ReflectedCompositeService { this: AbstractService => service.start() serviceStartCount += 1 } + // Emulating `AbstractService.start` + val startTime = new java.lang.Long(System.currentTimeMillis()) + setAncestorField(this, 3, "startTime", startTime) + invoke(classOf[AbstractService], this, "ensureCurrentState", classOf[STATE] -> STATE.INITED) + invoke(classOf[AbstractService], this, "changeState", classOf[STATE] -> STATE.STARTED) + logInfo(s"Service: $getName is started.") } catch { case NonFatal(e) => logError(s"Error starting services $getName", e) @@ -153,12 +159,5 @@ private[thriftserver] trait ReflectedCompositeService { this: AbstractService => classOf[Int] -> new Integer(serviceStartCount)) throw new ServiceException("Failed to Start " + getName, e) } - - // Emulating `AbstractService.start` - val startTime = new java.lang.Long(System.currentTimeMillis()) - setAncestorField(this, 3, "startTime", startTime) - invoke(classOf[AbstractService], this, "ensureCurrentState", classOf[STATE] -> STATE.INITED) - invoke(classOf[AbstractService], this, "changeState", classOf[STATE] -> 
STATE.STARTED) - logInfo(s"Service: $getName is started.") } }