diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index d14d70f7d3d83..b193c73563ae0 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -44,12 +44,13 @@ import org.apache.spark.unsafe.types.CalendarInterval
 import org.apache.spark.util.{Utils => SparkUtils}
 
 private[hive] class SparkExecuteStatementOperation(
+    val sqlContext: SQLContext,
     parentSession: HiveSession,
     statement: String,
     confOverlay: JMap[String, String],
     runInBackground: Boolean = true)
-    (sqlContext: SQLContext, sessionToActivePool: JMap[SessionHandle, String])
   extends ExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground)
+  with SparkOperation
   with Logging {
 
   private var result: DataFrame = _
@@ -62,7 +63,6 @@ private[hive] class SparkExecuteStatementOperation(
   private var previousFetchStartOffset: Long = 0
   private var iter: Iterator[SparkRow] = _
   private var dataTypes: Array[DataType] = _
-  private var statementId: String = _
 
   private lazy val resultSchema: TableSchema = {
     if (result == null || result.schema.isEmpty) {
@@ -73,13 +73,6 @@ private[hive] class SparkExecuteStatementOperation(
     }
   }
 
-  override def close(): Unit = {
-    // RDDs will be cleaned automatically upon garbage collection.
-    logInfo(s"Close statement with $statementId")
-    cleanup(OperationState.CLOSED)
-    HiveThriftServer2.eventManager.onOperationClosed(statementId)
-  }
-
   def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int): Unit = {
     dataTypes(ordinal) match {
       case StringType =>
@@ -100,12 +93,15 @@ private[hive] class SparkExecuteStatementOperation(
         to += from.getByte(ordinal)
       case ShortType =>
         to += from.getShort(ordinal)
-      case DateType =>
-        to += from.getAs[Date](ordinal)
-      case TimestampType =>
-        to += from.getAs[Timestamp](ordinal)
       case BinaryType =>
        to += from.getAs[Array[Byte]](ordinal)
+      // SPARK-31859, SPARK-31861: Date and Timestamp need to be converted to String here to:
+      // - respect spark.sql.session.timeZone
+      // - work with spark.sql.datetime.java8API.enabled
+      // These types have always been sent over the wire as strings and converted later, on the client side.
+      case _: DateType | _: TimestampType =>
+        val hiveString = HiveResult.toHiveString((from.get(ordinal), dataTypes(ordinal)))
+        to += hiveString
       case CalendarIntervalType =>
         to += HiveResult.toHiveString((from.getAs[CalendarInterval](ordinal), CalendarIntervalType))
       case _: ArrayType | _: StructType | _: MapType | _: UserDefinedType[_] =>
@@ -114,7 +110,7 @@ private[hive] class SparkExecuteStatementOperation(
     }
   }
 
-  def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = withSchedulerPool {
+  def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = withLocalProperties {
     log.info(s"Received getNextRowSet request order=${order} and maxRowsL=${maxRowsL} " +
       s"with ${statementId}")
     validateDefaultFetchOrientation(order)
@@ -193,7 +189,6 @@ private[hive] class SparkExecuteStatementOperation(
 
   override def runInternal(): Unit = {
     setState(OperationState.PENDING)
-    statementId = UUID.randomUUID().toString
     logInfo(s"Submitting query '$statement' with $statementId")
     HiveThriftServer2.eventManager.onStatementStart(
       statementId,
@@ -217,7 +212,9 @@ private[hive] class SparkExecuteStatementOperation(
         override def run(): Unit = {
           registerCurrentOperationLog()
           try {
-            execute()
+            withLocalProperties {
+              execute()
+            }
           } catch {
             case e: HiveSQLException =>
               setOperationException(e)
@@ -259,7 +256,7 @@ private[hive] class SparkExecuteStatementOperation(
     }
   }
 
-  private def execute(): Unit = withSchedulerPool {
+  private def execute(): Unit = {
     try {
       synchronized {
         if (getStatus.getState.isTerminal) {
@@ -282,13 +279,6 @@ private[hive] class SparkExecuteStatementOperation(
       sqlContext.sparkContext.setJobGroup(statementId, statement)
       result = sqlContext.sql(statement)
       logDebug(result.queryExecution.toString())
-      result.queryExecution.logical match {
-        case SetCommand(Some((SQLConf.THRIFTSERVER_POOL.key, Some(value)))) =>
-          sessionToActivePool.put(parentSession.getSessionHandle, value)
-          logInfo(s"Setting ${SparkContext.SPARK_SCHEDULER_POOL}=$value for future statements " +
-            "in this session.")
-        case _ =>
-      }
       HiveThriftServer2.eventManager.onStatementParsed(statementId,
         result.queryExecution.toString())
       iter = {
@@ -346,38 +336,25 @@ private[hive] class SparkExecuteStatementOperation(
     synchronized {
       if (!getStatus.getState.isTerminal) {
         logInfo(s"Cancel query with $statementId")
-        cleanup(OperationState.CANCELED)
+        cleanup()
+        setState(OperationState.CANCELED)
         HiveThriftServer2.eventManager.onStatementCanceled(statementId)
       }
     }
   }
 
-  private def cleanup(state: OperationState): Unit = {
-    setState(state)
+  override protected def cleanup(): Unit = {
     if (runInBackground) {
       val backgroundHandle = getBackgroundHandle()
       if (backgroundHandle != null) {
         backgroundHandle.cancel(true)
       }
     }
+    // RDDs will be cleaned automatically upon garbage collection.
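Editorial note on the case arm above, which is the heart of SPARK-31859/SPARK-31861: instead of shipping java.sql.Date/Timestamp objects, which ignore spark.sql.session.timeZone and cannot carry the java.time.LocalDate/Instant values produced when spark.sql.datetime.java8API.enabled is set, the operation now renders each cell to a string on the server. A minimal sketch of that rendering path, assuming only a running SparkSession named `spark` (the toHiveString call itself is exactly the one the patch uses):

import org.apache.spark.sql.execution.HiveResult
import org.apache.spark.sql.types.TimestampType

// Formatting honors the session time zone, not the server JVM's default.
spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
val row = spark.sql("SELECT timestamp '2020-05-28 00:00:00'").collect().head
// Same call the new case arm makes; it accepts either a java.sql.Timestamp
// or a java.time.Instant (when java8API.enabled=true) in row.get(0).
val rendered: String = HiveResult.toHiveString((row.get(0), TimestampType))

The rendered string then crosses the Thrift wire unchanged; the ColumnValue hunks at the bottom of this patch merely wrap it in a TStringValue.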
     if (statementId != null) {
       sqlContext.sparkContext.cancelJobGroup(statementId)
     }
   }
-
-  private def withSchedulerPool[T](body: => T): T = {
-    val pool = sessionToActivePool.get(parentSession.getSessionHandle)
-    if (pool != null) {
-      sqlContext.sparkContext.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, pool)
-    }
-    try {
-      body
-    } finally {
-      if (pool != null) {
-        sqlContext.sparkContext.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, null)
-      }
-    }
-  }
 }
 
 object SparkExecuteStatementOperation {
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala
index 2945cfd200e46..55070e035b944 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala
@@ -36,19 +36,13 @@ import org.apache.spark.util.{Utils => SparkUtils}
  * @param parentSession a HiveSession from SessionManager
  */
 private[hive] class SparkGetCatalogsOperation(
-    sqlContext: SQLContext,
+    val sqlContext: SQLContext,
     parentSession: HiveSession)
-  extends GetCatalogsOperation(parentSession) with Logging {
-
-  private var statementId: String = _
-
-  override def close(): Unit = {
-    super.close()
-    HiveThriftServer2.eventManager.onOperationClosed(statementId)
-  }
+  extends GetCatalogsOperation(parentSession)
+  with SparkOperation
+  with Logging {
 
   override def runInternal(): Unit = {
-    statementId = UUID.randomUUID().toString
     val logMsg = "Listing catalogs"
     logInfo(s"$logMsg with $statementId")
     setState(OperationState.RUNNING)
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
index ff7cbfeae13be..ca8ad5e6ad134 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
@@ -48,26 +48,19 @@ import org.apache.spark.util.{Utils => SparkUtils}
  * @param columnName column name
  */
 private[hive] class SparkGetColumnsOperation(
-    sqlContext: SQLContext,
+    val sqlContext: SQLContext,
     parentSession: HiveSession,
     catalogName: String,
     schemaName: String,
     tableName: String,
     columnName: String)
   extends GetColumnsOperation(parentSession, catalogName, schemaName, tableName, columnName)
-  with Logging {
+  with SparkOperation
+  with Logging {
 
   val catalog: SessionCatalog = sqlContext.sessionState.catalog
 
-  private var statementId: String = _
-
-  override def close(): Unit = {
-    super.close()
-    HiveThriftServer2.eventManager.onOperationClosed(statementId)
-  }
-
   override def runInternal(): Unit = {
-    statementId = UUID.randomUUID().toString
     // Do not change cmdStr. It's used for Hive auditing and authorization.
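The two hunks above show the refactoring that repeats through every metadata operation below: the hand-rolled statementId, close(), and onOperationClosed boilerplate is deleted, and the new SparkOperation trait is mixed in instead. The trait leans on Scala's stackable-trait idiom, which is worth a standalone illustration; all names in this self-contained toy are invented for the example:

// Toy demonstration of the `abstract override` mechanics SparkOperation uses:
// a trait can wrap a method it does not itself implement.
trait Op {
  def run(): Unit
}

class ConcreteOp extends Op {
  override def run(): Unit = println("doing work")
}

trait Wrapped extends Op {
  // `super.run()` is bound by linearization when the trait is mixed in; this
  // is how SparkOperation.run wraps every operation in withLocalProperties.
  abstract override def run(): Unit = {
    println("set thread-local state") // stands in for withLocalProperties
    try super.run()
    finally println("restore thread-local state")
  }
}

object StackableDemo extends App {
  val op = new ConcreteOp with Wrapped
  op.run() // set thread-local state / doing work / restore thread-local state
}

Because SparkOperation declares run() and close() as abstract override, each operation class that mixes it in inherits the property propagation and event-manager bookkeeping without restating them.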
     val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName, tablePattern : $tableName"
     val logMsg = s"Listing columns '$cmdStr, columnName : $columnName'"
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala
index d9c12b6ca9e64..f5e647bfd4f38 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala
@@ -43,22 +43,16 @@ import org.apache.spark.util.{Utils => SparkUtils}
  * @param functionName function name pattern
  */
 private[hive] class SparkGetFunctionsOperation(
-    sqlContext: SQLContext,
+    val sqlContext: SQLContext,
     parentSession: HiveSession,
     catalogName: String,
     schemaName: String,
     functionName: String)
-  extends GetFunctionsOperation(parentSession, catalogName, schemaName, functionName) with Logging {
-
-  private var statementId: String = _
-
-  override def close(): Unit = {
-    super.close()
-    HiveThriftServer2.eventManager.onOperationClosed(statementId)
-  }
+  extends GetFunctionsOperation(parentSession, catalogName, schemaName, functionName)
+  with SparkOperation
+  with Logging {
 
   override def runInternal(): Unit = {
-    statementId = UUID.randomUUID().toString
     // Do not change cmdStr. It's used for Hive auditing and authorization.
     val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName"
     val logMsg = s"Listing functions '$cmdStr, functionName : $functionName'"
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala
index db19880d1b99f..74220986fcd34 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala
@@ -40,21 +40,15 @@ import org.apache.spark.util.{Utils => SparkUtils}
  * @param schemaName database name, null or a concrete database name
  */
 private[hive] class SparkGetSchemasOperation(
-    sqlContext: SQLContext,
+    val sqlContext: SQLContext,
     parentSession: HiveSession,
     catalogName: String,
     schemaName: String)
-  extends GetSchemasOperation(parentSession, catalogName, schemaName) with Logging {
-
-  private var statementId: String = _
-
-  override def close(): Unit = {
-    super.close()
-    HiveThriftServer2.eventManager.onOperationClosed(statementId)
-  }
+  extends GetSchemasOperation(parentSession, catalogName, schemaName)
+  with SparkOperation
+  with Logging {
 
   override def runInternal(): Unit = {
-    statementId = UUID.randomUUID().toString
     // Do not change cmdStr. It's used for Hive auditing and authorization.
     val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName"
     val logMsg = s"Listing databases '$cmdStr'"
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala
index b4093e58d3c07..1cf9c3a731af5 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala
@@ -37,16 +37,11 @@ import org.apache.spark.util.{Utils => SparkUtils}
  * @param parentSession a HiveSession from SessionManager
  */
 private[hive] class SparkGetTableTypesOperation(
-    sqlContext: SQLContext,
+    val sqlContext: SQLContext,
     parentSession: HiveSession)
-  extends GetTableTypesOperation(parentSession) with SparkMetadataOperationUtils with Logging {
-
-  private var statementId: String = _
-
-  override def close(): Unit = {
-    super.close()
-    HiveThriftServer2.eventManager.onOperationClosed(statementId)
-  }
+  extends GetTableTypesOperation(parentSession)
+  with SparkOperation
+  with Logging {
 
   override def runInternal(): Unit = {
     statementId = UUID.randomUUID().toString
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala
index 45c6d980aac47..a1d21e2d60c63 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala
@@ -46,24 +46,17 @@ import org.apache.spark.util.{Utils => SparkUtils}
  * @param tableTypes list of allowed table types, e.g. "TABLE", "VIEW"
  */
 private[hive] class SparkGetTablesOperation(
-    sqlContext: SQLContext,
+    val sqlContext: SQLContext,
     parentSession: HiveSession,
     catalogName: String,
     schemaName: String,
     tableName: String,
     tableTypes: JList[String])
   extends GetTablesOperation(parentSession, catalogName, schemaName, tableName, tableTypes)
-  with SparkMetadataOperationUtils with Logging {
-
-  private var statementId: String = _
-
-  override def close(): Unit = {
-    super.close()
-    HiveThriftServer2.eventManager.onOperationClosed(statementId)
-  }
+  with SparkOperation
+  with Logging {
 
   override def runInternal(): Unit = {
-    statementId = UUID.randomUUID().toString
     // Do not change cmdStr. It's used for Hive auditing and authorization.
     val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName"
     val tableTypesStr = if (tableTypes == null) "null" else tableTypes.asScala.mkString(",")
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTypeInfoOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTypeInfoOperation.scala
index dd5668a93f82d..e38139d60df60 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTypeInfoOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTypeInfoOperation.scala
@@ -36,16 +36,11 @@ import org.apache.spark.util.{Utils => SparkUtils}
  * @param parentSession a HiveSession from SessionManager
  */
 private[hive] class SparkGetTypeInfoOperation(
-    sqlContext: SQLContext,
+    val sqlContext: SQLContext,
     parentSession: HiveSession)
-  extends GetTypeInfoOperation(parentSession) with Logging {
-
-  private var statementId: String = _
-
-  override def close(): Unit = {
-    super.close()
-    HiveThriftServer2.eventManager.onOperationClosed(statementId)
-  }
+  extends GetTypeInfoOperation(parentSession)
+  with SparkOperation
+  with Logging {
 
   override def runInternal(): Unit = {
     statementId = UUID.randomUUID().toString
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationUtils.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationUtils.scala
deleted file mode 100644
index f4c4b04bada2a..0000000000000
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationUtils.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.thriftserver
-
-import org.apache.spark.sql.catalyst.catalog.CatalogTableType
-import org.apache.spark.sql.catalyst.catalog.CatalogTableType.{EXTERNAL, MANAGED, VIEW}
-
-/**
- * Utils for metadata operations.
- */
-private[hive] trait SparkMetadataOperationUtils {
-
-  def tableTypeString(tableType: CatalogTableType): String = tableType match {
-    case EXTERNAL | MANAGED => "TABLE"
-    case VIEW => "VIEW"
-    case t =>
-      throw new IllegalArgumentException(s"Unknown table type is found: $t")
-  }
-}
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala
new file mode 100644
index 0000000000000..3da568cfa256e
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver
+
+import org.apache.hive.service.cli.OperationState
+import org.apache.hive.service.cli.operation.Operation
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType.{EXTERNAL, MANAGED, VIEW}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
+
+/**
+ * Utils for Spark operations.
+ */
+private[hive] trait SparkOperation extends Operation with Logging {
+
+  protected def sqlContext: SQLContext
+
+  protected var statementId = getHandle().getHandleIdentifier().getPublicId().toString()
+
+  protected def cleanup(): Unit = () // no-op by default
+
+  abstract override def run(): Unit = {
+    withLocalProperties {
+      super.run()
+    }
+  }
+
+  abstract override def close(): Unit = {
+    cleanup()
+    super.close()
+    logInfo(s"Close statement with $statementId")
+    HiveThriftServer2.eventManager.onOperationClosed(statementId)
+  }
+
+  // Set thread-local properties for the execution of the operation.
+  // This method must be called on every thread that executes any part of the
+  // operation, including all child threads.
+  // The original SparkContext local properties will be restored after the operation.
+  //
+  // It is used to:
+  // - set appropriate SparkSession
+  // - set scheduler pool for the operation
+  def withLocalProperties[T](f: => T): T = {
+    val originalProps = Utils.cloneProperties(sqlContext.sparkContext.getLocalProperties)
+    val originalSession = SparkSession.getActiveSession
+
+    try {
+      // Set active SparkSession
+      SparkSession.setActiveSession(sqlContext.sparkSession)
+
+      // Set scheduler pool
+      sqlContext.sparkSession.conf.getOption(SQLConf.THRIFTSERVER_POOL.key) match {
+        case Some(pool) =>
+          sqlContext.sparkContext.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, pool)
+        case None =>
+      }
+
+      // run the body
+      f
+    } finally {
+      // reset local properties, will also reset SPARK_SCHEDULER_POOL
+      sqlContext.sparkContext.setLocalProperties(originalProps)
+
+      originalSession match {
+        case Some(session) => SparkSession.setActiveSession(session)
+        case None => SparkSession.clearActiveSession()
+      }
+    }
+  }
+
+  def tableTypeString(tableType: CatalogTableType): String = tableType match {
+    case EXTERNAL | MANAGED => "TABLE"
+    case VIEW => "VIEW"
+    case t =>
+      throw new IllegalArgumentException(s"Unknown table type is found: $t")
+  }
+}
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
index b3171897141c2..e10e7ed1a2769 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
@@ -78,7 +78,6 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext:
     val ctx = sparkSqlOperationManager.sessionToContexts.getOrDefault(sessionHandle, sqlContext)
     ctx.sparkSession.sessionState.catalog.getTempViewNames().foreach(ctx.uncacheTable)
     super.closeSession(sessionHandle)
-    sparkSqlOperationManager.sessionToActivePool.remove(sessionHandle)
     sparkSqlOperationManager.sessionToContexts.remove(sessionHandle)
   }
 
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
index 3396560f43502..bc9c13eb0d4f8 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
@@ -38,7 +38,6 @@ private[thriftserver] class SparkSQLOperationManager()
   val handleToOperation = ReflectionUtils
     .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation")
 
-  val sessionToActivePool = new ConcurrentHashMap[SessionHandle, String]()
   val sessionToContexts = new ConcurrentHashMap[SessionHandle, SQLContext]()
 
   override def newExecuteStatementOperation(
@@ -51,8 +50,8 @@ private[thriftserver] class SparkSQLOperationManager()
       s" initialized or had already closed.")
     val conf = sqlContext.sessionState.conf
     val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
-    val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay,
-      runInBackground)(sqlContext, sessionToActivePool)
+    val operation = new SparkExecuteStatementOperation(
+      sqlContext, parentSession, statement, confOverlay, runInBackground)
     handleToOperation.put(operation.getHandle,
       operation)
     logDebug(s"Created Operation for $statement with session=$parentSession, " +
       s"runInBackground=$runInBackground")
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 0cec63460814c..21256ad02c134 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -811,6 +811,61 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
       }
     }
   }
+
+  test("SPARK-31859 Thriftserver works with spark.sql.datetime.java8API.enabled=true") {
+    withJdbcStatement() { statement =>
+      withJdbcStatement() { st =>
+        st.execute("set spark.sql.datetime.java8API.enabled=true")
+        val rs = st.executeQuery("select date '2020-05-28', timestamp '2020-05-28 00:00:00'")
+        rs.next()
+        assert(rs.getDate(1).toString() == "2020-05-28")
+        assert(rs.getTimestamp(2).toString() == "2020-05-28 00:00:00.0")
+      }
+    }
+  }
+
+  test("SPARK-31861 Thriftserver respects spark.sql.session.timeZone") {
+    withJdbcStatement() { statement =>
+      withJdbcStatement() { st =>
+        st.execute("set spark.sql.session.timeZone=+03:15") // different from the Thriftserver's JVM tz
+        val rs = st.executeQuery("select timestamp '2020-05-28 10:00:00'")
+        rs.next()
+        // The timestamp as a string is the same as the literal
+        assert(rs.getString(1) == "2020-05-28 10:00:00.0")
+        // Parsing it to java.sql.Timestamp in the client will always result in a timestamp
+        // in the client's default JVM timezone. The string value of the Timestamp will match
+        // the literal, but if the JDBC application cares about the internal timezone and UTC
+        // offset of the Timestamp object, it should set spark.sql.session.timeZone to match
+        // its client JVM tz.
+        assert(rs.getTimestamp(1).toString() == "2020-05-28 10:00:00.0")
+      }
+    }
+  }
+
+  test("SPARK-31863 Session conf should persist between Thriftserver worker threads") {
+    val iter = 20
+    withJdbcStatement() { statement =>
+      // date 'now' is resolved during parsing and relies on SQLConf.get to
+      // obtain the currently set timezone. We exploit this to run this test:
+      // if the timezones are set correctly, 25 hours apart across threads,
+      // the dates should reflect this.
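A note on the mechanism this new test exercises: SQLConf.get reads thread-local state, while the Thriftserver hands each statement to an arbitrary worker thread, so withLocalProperties (in SparkOperation above) has to install the session's state and then put back whatever the thread held before. A generic sketch of that save/run/restore idiom; plain java.util.Properties stands in for SparkContext's local properties, whose real accessors are private[spark]:

import java.util.Properties

object WithSavedDemo {
  // Snapshot/restore a Properties bag around a block, mirroring what
  // withLocalProperties does with SparkContext's thread-local properties.
  def withSaved[T](props: Properties)(body: => T): T = {
    val saved = props.clone().asInstanceOf[Properties] // like Utils.cloneProperties
    try body // the body may freely mutate props, e.g. set a scheduler pool
    finally {
      props.clear()
      props.putAll(saved) // restore, so later work on this thread is unaffected
    }
  }
}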
+
+      // iterate a few times for the odd chance the same thread is selected
+      for (_ <- 0 until iter) {
+        statement.execute("SET spark.sql.session.timeZone=GMT-12")
+        val firstResult = statement.executeQuery("SELECT date 'now'")
+        firstResult.next()
+        val beyondDateLineWest = firstResult.getDate(1)
+
+        statement.execute("SET spark.sql.session.timeZone=GMT+13")
+        val secondResult = statement.executeQuery("SELECT date 'now'")
+        secondResult.next()
+        val dateLineEast = secondResult.getDate(1)
+        assert(
+          dateLineEast after beyondDateLineWest,
+          "SQLConf changes should persist across execution threads")
+      }
+    }
+  }
 }
 
 class SingleSessionSuite extends HiveThriftJdbcTest {
diff --git a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/ColumnValue.java b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/ColumnValue.java
index a770bea9c2aa6..462b93a0f09fe 100644
--- a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/ColumnValue.java
+++ b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/ColumnValue.java
@@ -123,22 +123,6 @@ private static TColumnValue stringValue(HiveVarchar value) {
     return TColumnValue.stringVal(tStringValue);
   }
 
-  private static TColumnValue dateValue(Date value) {
-    TStringValue tStringValue = new TStringValue();
-    if (value != null) {
-      tStringValue.setValue(value.toString());
-    }
-    return new TColumnValue(TColumnValue.stringVal(tStringValue));
-  }
-
-  private static TColumnValue timestampValue(Timestamp value) {
-    TStringValue tStringValue = new TStringValue();
-    if (value != null) {
-      tStringValue.setValue(value.toString());
-    }
-    return TColumnValue.stringVal(tStringValue);
-  }
-
   private static TColumnValue stringValue(HiveIntervalYearMonth value) {
     TStringValue tStrValue = new TStringValue();
     if (value != null) {
@@ -178,9 +162,9 @@ public static TColumnValue toTColumnValue(Type type, Object value) {
     case VARCHAR_TYPE:
       return stringValue((HiveVarchar)value);
     case DATE_TYPE:
-      return dateValue((Date)value);
     case TIMESTAMP_TYPE:
-      return timestampValue((Timestamp)value);
+      // SPARK-31859, SPARK-31861: converted to string already in SparkExecuteStatementOperation
+      return stringValue((String)value);
     case INTERVAL_YEAR_MONTH_TYPE:
       return stringValue((HiveIntervalYearMonth) value);
     case INTERVAL_DAY_TIME_TYPE:
diff --git a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/operation/Operation.java b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/operation/Operation.java
index 51bb28748d9e2..4b331423948fa 100644
--- a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/operation/Operation.java
+++ b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/operation/Operation.java
@@ -280,7 +280,10 @@ public void cancel() throws HiveSQLException {
     throw new UnsupportedOperationException("SQLOperation.cancel()");
   }
 
-  public abstract void close() throws HiveSQLException;
+  public void close() throws HiveSQLException {
+    setState(OperationState.CLOSED);
+    cleanupOperationLog();
+  }
 
   public abstract TableSchema getResultSetSchema() throws HiveSQLException;
 
diff --git a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/ColumnValue.java b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/ColumnValue.java
index 53f0465a056d8..85adf55df15e0 100644
--- a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/ColumnValue.java
+++ b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/ColumnValue.java
@@ -124,22 +124,6 @@ private static TColumnValue stringValue(HiveVarchar value) {
     return TColumnValue.stringVal(tStringValue);
   }
 
-  private static TColumnValue dateValue(Date value) {
-    TStringValue tStringValue = new TStringValue();
-    if (value != null) {
-      tStringValue.setValue(value.toString());
-    }
-    return new TColumnValue(TColumnValue.stringVal(tStringValue));
-  }
-
-  private static TColumnValue timestampValue(Timestamp value) {
-    TStringValue tStringValue = new TStringValue();
-    if (value != null) {
-      tStringValue.setValue(value.toString());
-    }
-    return TColumnValue.stringVal(tStringValue);
-  }
-
   private static TColumnValue stringValue(HiveIntervalYearMonth value) {
     TStringValue tStrValue = new TStringValue();
     if (value != null) {
@@ -181,9 +165,9 @@ public static TColumnValue toTColumnValue(TypeDescriptor typeDescriptor, Object
     case VARCHAR_TYPE:
       return stringValue((HiveVarchar)value);
     case DATE_TYPE:
-      return dateValue((Date)value);
     case TIMESTAMP_TYPE:
-      return timestampValue((Timestamp)value);
+      // SPARK-31859, SPARK-31861: converted to string already in SparkExecuteStatementOperation
+      return stringValue((String)value);
     case INTERVAL_YEAR_MONTH_TYPE:
       return stringValue((HiveIntervalYearMonth) value);
     case INTERVAL_DAY_TIME_TYPE:
diff --git a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/operation/Operation.java b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/operation/Operation.java
index f26c715add987..558c68f85c16b 100644
--- a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/operation/Operation.java
+++ b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/operation/Operation.java
@@ -298,7 +298,10 @@ public void cancel() throws HiveSQLException {
     throw new UnsupportedOperationException("SQLOperation.cancel()");
   }
 
-  public abstract void close() throws HiveSQLException;
+  public void close() throws HiveSQLException {
+    setState(OperationState.CLOSED);
+    cleanupOperationLog();
+  }
 
   public abstract TableSchema getResultSetSchema() throws HiveSQLException;
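End to end, the patch means a JDBC client observes session-consistent date/time values. A hypothetical client program exercising both fixes — the URL, port, and credentials are placeholders, the Hive JDBC driver must be on the classpath, and the expected string comes straight from the SPARK-31861 test above:

import java.sql.DriverManager

object TimezoneClientDemo extends App {
  // Placeholder connection string for a locally running Thriftserver.
  val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000", "user", "")
  try {
    val st = conn.createStatement()
    // The SET may run on one worker thread and the query on another;
    // after SPARK-31863 the session conf still applies to both.
    st.execute("SET spark.sql.session.timeZone=GMT+13")
    val rs = st.executeQuery("SELECT timestamp '2020-05-28 10:00:00'")
    rs.next()
    // Rendered server-side by HiveResult.toHiveString and shipped as a string.
    assert(rs.getString(1) == "2020-05-28 10:00:00.0")
  } finally {
    conn.close()
  }
}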