Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,13 @@ import org.apache.spark.unsafe.types.CalendarInterval
import org.apache.spark.util.{Utils => SparkUtils}

private[hive] class SparkExecuteStatementOperation(
val sqlContext: SQLContext,
parentSession: HiveSession,
statement: String,
confOverlay: JMap[String, String],
runInBackground: Boolean = true)
(sqlContext: SQLContext, sessionToActivePool: JMap[SessionHandle, String])
extends ExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground)
with SparkOperationUtils
with Logging {

private var result: DataFrame = _
Expand Down Expand Up @@ -100,12 +101,15 @@ private[hive] class SparkExecuteStatementOperation(
to += from.getByte(ordinal)
case ShortType =>
to += from.getShort(ordinal)
case DateType =>
to += from.getAs[Date](ordinal)
case TimestampType =>
to += from.getAs[Timestamp](ordinal)
case BinaryType =>
to += from.getAs[Array[Byte]](ordinal)
// SPARK-31859, SPARK-31861: Date and Timestamp need to be turned to String here to:
// - respect spark.sql.session.timeZone
// - work with spark.sql.datetime.java8API.enabled
// These types have always been sent over the wire as string, converted later.
case _: DateType | _: TimestampType =>
val hiveString = HiveResult.toHiveString((from.get(ordinal), dataTypes(ordinal)))
to += hiveString
case CalendarIntervalType =>
to += HiveResult.toHiveString((from.getAs[CalendarInterval](ordinal), CalendarIntervalType))
case _: ArrayType | _: StructType | _: MapType | _: UserDefinedType[_] =>
Expand All @@ -114,7 +118,7 @@ private[hive] class SparkExecuteStatementOperation(
}
}

def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = withSchedulerPool {
def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = withLocalProperties {
log.info(s"Received getNextRowSet request order=${order} and maxRowsL=${maxRowsL} " +
s"with ${statementId}")
validateDefaultFetchOrientation(order)
Expand Down Expand Up @@ -191,6 +195,12 @@ private[hive] class SparkExecuteStatementOperation(

def getResultSetSchema: TableSchema = resultSchema

override def run(): Unit = {
withLocalProperties {
super.run()
}
}

override def runInternal(): Unit = {
setState(OperationState.PENDING)
statementId = UUID.randomUUID().toString
Expand All @@ -217,7 +227,9 @@ private[hive] class SparkExecuteStatementOperation(
override def run(): Unit = {
registerCurrentOperationLog()
try {
execute()
withLocalProperties {
execute()
}
} catch {
case e: HiveSQLException =>
setOperationException(e)
Expand Down Expand Up @@ -259,7 +271,7 @@ private[hive] class SparkExecuteStatementOperation(
}
}

private def execute(): Unit = withSchedulerPool {
private def execute(): Unit = {
try {
synchronized {
if (getStatus.getState.isTerminal) {
Expand All @@ -282,13 +294,6 @@ private[hive] class SparkExecuteStatementOperation(
sqlContext.sparkContext.setJobGroup(statementId, statement)
result = sqlContext.sql(statement)
logDebug(result.queryExecution.toString())
result.queryExecution.logical match {
case SetCommand(Some((SQLConf.THRIFTSERVER_POOL.key, Some(value)))) =>
sessionToActivePool.put(parentSession.getSessionHandle, value)
logInfo(s"Setting ${SparkContext.SPARK_SCHEDULER_POOL}=$value for future statements " +
"in this session.")
case _ =>
}
HiveThriftServer2.eventManager.onStatementParsed(statementId,
result.queryExecution.toString())
iter = {
Expand Down Expand Up @@ -364,20 +369,6 @@ private[hive] class SparkExecuteStatementOperation(
sqlContext.sparkContext.cancelJobGroup(statementId)
}
}

private def withSchedulerPool[T](body: => T): T = {
val pool = sessionToActivePool.get(parentSession.getSessionHandle)
if (pool != null) {
sqlContext.sparkContext.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, pool)
}
try {
body
} finally {
if (pool != null) {
sqlContext.sparkContext.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, null)
}
}
}
}

object SparkExecuteStatementOperation {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ import org.apache.spark.util.{Utils => SparkUtils}
* @param parentSession a HiveSession from SessionManager
*/
private[hive] class SparkGetCatalogsOperation(
sqlContext: SQLContext,
val sqlContext: SQLContext,
parentSession: HiveSession)
extends GetCatalogsOperation(parentSession) with Logging {
extends GetCatalogsOperation(parentSession) with SparkOperationUtils with Logging {

private var statementId: String = _

Expand All @@ -47,6 +47,12 @@ private[hive] class SparkGetCatalogsOperation(
HiveThriftServer2.eventManager.onOperationClosed(statementId)
}

override def run(): Unit = {
withLocalProperties {
super.run()
}
}

override def runInternal(): Unit = {
statementId = UUID.randomUUID().toString
val logMsg = "Listing catalogs"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,15 @@ import org.apache.spark.util.{Utils => SparkUtils}
* @param columnName column name
*/
private[hive] class SparkGetColumnsOperation(
sqlContext: SQLContext,
val sqlContext: SQLContext,
parentSession: HiveSession,
catalogName: String,
schemaName: String,
tableName: String,
columnName: String)
extends GetColumnsOperation(parentSession, catalogName, schemaName, tableName, columnName)
with Logging {
with SparkOperationUtils
with Logging {

val catalog: SessionCatalog = sqlContext.sessionState.catalog

Expand All @@ -66,6 +67,12 @@ private[hive] class SparkGetColumnsOperation(
HiveThriftServer2.eventManager.onOperationClosed(statementId)
}

override def run(): Unit = {
withLocalProperties {
super.run()
}
}

override def runInternal(): Unit = {
statementId = UUID.randomUUID().toString
// Do not change cmdStr. It's used for Hive auditing and authorization.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,14 @@ import org.apache.spark.util.{Utils => SparkUtils}
* @param functionName function name pattern
*/
private[hive] class SparkGetFunctionsOperation(
sqlContext: SQLContext,
val sqlContext: SQLContext,
parentSession: HiveSession,
catalogName: String,
schemaName: String,
functionName: String)
extends GetFunctionsOperation(parentSession, catalogName, schemaName, functionName) with Logging {
extends GetFunctionsOperation(parentSession, catalogName, schemaName, functionName)
with SparkOperationUtils
with Logging {

private var statementId: String = _

Expand All @@ -57,6 +59,12 @@ private[hive] class SparkGetFunctionsOperation(
HiveThriftServer2.eventManager.onOperationClosed(statementId)
}

override def run(): Unit = {
withLocalProperties {
super.run()
}
}

override def runInternal(): Unit = {
statementId = UUID.randomUUID().toString
// Do not change cmdStr. It's used for Hive auditing and authorization.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@ import org.apache.spark.util.{Utils => SparkUtils}
* @param schemaName database name, null or a concrete database name
*/
private[hive] class SparkGetSchemasOperation(
sqlContext: SQLContext,
val sqlContext: SQLContext,
parentSession: HiveSession,
catalogName: String,
schemaName: String)
extends GetSchemasOperation(parentSession, catalogName, schemaName) with Logging {
extends GetSchemasOperation(parentSession, catalogName, schemaName)
with SparkOperationUtils
with Logging {

private var statementId: String = _

Expand All @@ -53,6 +55,12 @@ private[hive] class SparkGetSchemasOperation(
HiveThriftServer2.eventManager.onOperationClosed(statementId)
}

override def run(): Unit = {
withLocalProperties {
super.run()
}
}

override def runInternal(): Unit = {
statementId = UUID.randomUUID().toString
// Do not change cmdStr. It's used for Hive auditing and authorization.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ import org.apache.spark.util.{Utils => SparkUtils}
* @param parentSession a HiveSession from SessionManager
*/
private[hive] class SparkGetTableTypesOperation(
sqlContext: SQLContext,
val sqlContext: SQLContext,
parentSession: HiveSession)
extends GetTableTypesOperation(parentSession) with SparkMetadataOperationUtils with Logging {
extends GetTableTypesOperation(parentSession) with SparkOperationUtils with Logging {

private var statementId: String = _

Expand All @@ -48,6 +48,12 @@ private[hive] class SparkGetTableTypesOperation(
HiveThriftServer2.eventManager.onOperationClosed(statementId)
}

override def run(): Unit = {
withLocalProperties {
super.run()
}
}

override def runInternal(): Unit = {
statementId = UUID.randomUUID().toString
val logMsg = "Listing table types"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,15 @@ import org.apache.spark.util.{Utils => SparkUtils}
* @param tableTypes list of allowed table types, e.g. "TABLE", "VIEW"
*/
private[hive] class SparkGetTablesOperation(
sqlContext: SQLContext,
val sqlContext: SQLContext,
parentSession: HiveSession,
catalogName: String,
schemaName: String,
tableName: String,
tableTypes: JList[String])
extends GetTablesOperation(parentSession, catalogName, schemaName, tableName, tableTypes)
with SparkMetadataOperationUtils with Logging {
with SparkOperationUtils
with Logging {

private var statementId: String = _

Expand All @@ -62,6 +63,12 @@ private[hive] class SparkGetTablesOperation(
HiveThriftServer2.eventManager.onOperationClosed(statementId)
}

override def run(): Unit = {
withLocalProperties {
super.run()
}
}

override def runInternal(): Unit = {
statementId = UUID.randomUUID().toString
// Do not change cmdStr. It's used for Hive auditing and authorization.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ import org.apache.spark.util.{Utils => SparkUtils}
* @param parentSession a HiveSession from SessionManager
*/
private[hive] class SparkGetTypeInfoOperation(
sqlContext: SQLContext,
val sqlContext: SQLContext,
parentSession: HiveSession)
extends GetTypeInfoOperation(parentSession) with Logging {
extends GetTypeInfoOperation(parentSession)
with SparkOperationUtils
with Logging {

private var statementId: String = _

Expand All @@ -47,6 +49,12 @@ private[hive] class SparkGetTypeInfoOperation(
HiveThriftServer2.eventManager.onOperationClosed(statementId)
}

override def run(): Unit = {
withLocalProperties {
super.run()
}
}

override def runInternal(): Unit = {
statementId = UUID.randomUUID().toString
val logMsg = "Listing type info"
Expand Down

This file was deleted.

Loading