@@ -44,12 +44,13 @@ import org.apache.spark.unsafe.types.CalendarInterval
import org.apache.spark.util.{Utils => SparkUtils}

private[hive] class SparkExecuteStatementOperation(
val sqlContext: SQLContext,
parentSession: HiveSession,
statement: String,
confOverlay: JMap[String, String],
runInBackground: Boolean = true)
(sqlContext: SQLContext, sessionToActivePool: JMap[SessionHandle, String])
extends ExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground)
with SparkOperation
with Logging {

private var result: DataFrame = _
@@ -62,7 +63,6 @@ private[hive] class SparkExecuteStatementOperation(
private var previousFetchStartOffset: Long = 0
private var iter: Iterator[SparkRow] = _
private var dataTypes: Array[DataType] = _
private var statementId: String = _

private lazy val resultSchema: TableSchema = {
if (result == null || result.schema.isEmpty) {
@@ -73,13 +73,6 @@
}
}

override def close(): Unit = {
// RDDs will be cleaned automatically upon garbage collection.
logInfo(s"Close statement with $statementId")
cleanup(OperationState.CLOSED)
HiveThriftServer2.eventManager.onOperationClosed(statementId)
}

def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int): Unit = {
dataTypes(ordinal) match {
case StringType =>
@@ -100,12 +93,15 @@
to += from.getByte(ordinal)
case ShortType =>
to += from.getShort(ordinal)
case DateType =>
to += from.getAs[Date](ordinal)
case TimestampType =>
to += from.getAs[Timestamp](ordinal)
case BinaryType =>
to += from.getAs[Array[Byte]](ordinal)
// SPARK-31859, SPARK-31861: Date and Timestamp need to be turned to String here to:
// - respect spark.sql.session.timeZone
// - work with spark.sql.datetime.java8API.enabled
// These types have always been sent over the wire as string, converted later.
case _: DateType | _: TimestampType =>
val hiveString = HiveResult.toHiveString((from.get(ordinal), dataTypes(ordinal)))
to += hiveString
case CalendarIntervalType =>
to += HiveResult.toHiveString((from.getAs[CalendarInterval](ordinal), CalendarIntervalType))
case _: ArrayType | _: StructType | _: MapType | _: UserDefinedType[_] =>
@@ -114,7 +110,7 @@
}
}
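
The hunk above replaces the direct Date/Timestamp passthrough with HiveResult.toHiveString, so the value returned to the client is rendered as a string under spark.sql.session.timeZone and no longer depends on whether spark.sql.datetime.java8API.enabled makes the row hold java.time values (with that flag on, from.get(ordinal) yields java.time.Instant or java.time.LocalDate, so the old getAs[Timestamp]/getAs[Date] casts would fail). A minimal, hypothetical illustration of that call shape; the example object and literal values are not part of the change:

import java.sql.Timestamp

import org.apache.spark.sql.execution.HiveResult
import org.apache.spark.sql.types.TimestampType

// Hypothetical standalone example (not part of the patch): render a timestamp the
// way the operation now does, as a Hive-style string instead of a raw object.
object ToHiveStringExample {
  def main(args: Array[String]): Unit = {
    val ts = Timestamp.valueOf("2020-06-01 10:15:30")
    // Same (value, DataType) pair shape as the call in the diff above.
    val rendered = HiveResult.toHiveString((ts, TimestampType))
    println(rendered)  // e.g. "2020-06-01 10:15:30", formatted using the session time zone
  }
}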

def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = withSchedulerPool {
def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = withLocalProperties {
log.info(s"Received getNextRowSet request order=${order} and maxRowsL=${maxRowsL} " +
s"with ${statementId}")
validateDefaultFetchOrientation(order)
@@ -193,7 +189,6 @@

override def runInternal(): Unit = {
setState(OperationState.PENDING)
statementId = UUID.randomUUID().toString
logInfo(s"Submitting query '$statement' with $statementId")
HiveThriftServer2.eventManager.onStatementStart(
statementId,
@@ -217,7 +212,9 @@
override def run(): Unit = {
registerCurrentOperationLog()
try {
execute()
withLocalProperties {
execute()
}
} catch {
case e: HiveSQLException =>
setOperationException(e)
@@ -259,7 +256,7 @@
}
}

private def execute(): Unit = withSchedulerPool {
private def execute(): Unit = {
try {
synchronized {
if (getStatus.getState.isTerminal) {
@@ -282,13 +279,6 @@
sqlContext.sparkContext.setJobGroup(statementId, statement)
result = sqlContext.sql(statement)
logDebug(result.queryExecution.toString())
result.queryExecution.logical match {
case SetCommand(Some((SQLConf.THRIFTSERVER_POOL.key, Some(value)))) =>
sessionToActivePool.put(parentSession.getSessionHandle, value)
logInfo(s"Setting ${SparkContext.SPARK_SCHEDULER_POOL}=$value for future statements " +
"in this session.")
case _ =>
}
HiveThriftServer2.eventManager.onStatementParsed(statementId,
result.queryExecution.toString())
iter = {
@@ -346,38 +336,25 @@
synchronized {
if (!getStatus.getState.isTerminal) {
logInfo(s"Cancel query with $statementId")
cleanup(OperationState.CANCELED)
cleanup()
setState(OperationState.CANCELED)
HiveThriftServer2.eventManager.onStatementCanceled(statementId)
}
}
}

private def cleanup(state: OperationState): Unit = {
setState(state)
override protected def cleanup(): Unit = {
if (runInBackground) {
val backgroundHandle = getBackgroundHandle()
if (backgroundHandle != null) {
backgroundHandle.cancel(true)
}
}
// RDDs will be cleaned automatically upon garbage collection.
if (statementId != null) {
sqlContext.sparkContext.cancelJobGroup(statementId)
}
}

private def withSchedulerPool[T](body: => T): T = {
val pool = sessionToActivePool.get(parentSession.getSessionHandle)
if (pool != null) {
sqlContext.sparkContext.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, pool)
}
try {
body
} finally {
if (pool != null) {
sqlContext.sparkContext.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, null)
}
}
}
}

object SparkExecuteStatementOperation {
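
The repeated edits in this and the following files drop each class's private statementId field, its close() override, and the UUID generation in runInternal(), and turn sqlContext into a constructor val, moving that boilerplate into a shared SparkOperation mixin. The trait itself is not included in this excerpt; the sketch below is inferred from the call sites visible above (withLocalProperties, an overridable cleanup(), the onOperationClosed event), so its names and details are assumptions rather than the actual Spark source. It also assumes the scheduler-pool handling that used to live in the removed SetCommand/sessionToActivePool code is now done by reading SQLConf.THRIFTSERVER_POOL inside withLocalProperties.

package org.apache.spark.sql.hive.thriftserver

import java.util.UUID

import org.apache.hive.service.cli.operation.Operation

import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.internal.SQLConf

// Rough sketch of the shared mixin implied by the call sites in this diff; the real
// SparkOperation trait in Spark may differ in names and behaviour.
private[hive] trait SparkOperation extends Operation with Logging {

  // Each operation exposes its SQLContext, which is why the subclasses above now
  // declare `val sqlContext: SQLContext` in their constructors.
  protected def sqlContext: SQLContext

  // Shared statement id, replacing the per-class `private var statementId` fields
  // and the UUID generation removed from each runInternal().
  protected var statementId: String = UUID.randomUUID().toString

  // Hook for subclasses; SparkExecuteStatementOperation overrides it to cancel the
  // background handle and the running job group.
  protected def cleanup(): Unit = ()

  abstract override def close(): Unit = {
    super.close()
    cleanup()
    logInfo(s"Close statement with $statementId")
    HiveThriftServer2.eventManager.onOperationClosed(statementId)
  }

  // Replacement for the removed per-class withSchedulerPool: run `body` with the
  // session's scheduler pool applied as a SparkContext local property, then unset it.
  protected def withLocalProperties[T](body: => T): T = {
    val pool = sqlContext.getConf(SQLConf.THRIFTSERVER_POOL.key, null)
    if (pool != null) {
      sqlContext.sparkContext.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, pool)
    }
    try {
      body
    } finally {
      if (pool != null) {
        sqlContext.sparkContext.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, null)
      }
    }
  }
}

Under that reading, SparkExecuteStatementOperation only has to override cleanup() (as in the @@ -346 hunk above) and wrap its work in withLocalProperties, instead of duplicating the close()/statementId logic in every operation class.
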
@@ -36,19 +36,13 @@ import org.apache.spark.util.{Utils => SparkUtils}
* @param parentSession a HiveSession from SessionManager
*/
private[hive] class SparkGetCatalogsOperation(
sqlContext: SQLContext,
val sqlContext: SQLContext,
parentSession: HiveSession)
extends GetCatalogsOperation(parentSession) with Logging {

private var statementId: String = _

override def close(): Unit = {
super.close()
HiveThriftServer2.eventManager.onOperationClosed(statementId)
}
extends GetCatalogsOperation(parentSession)
with SparkOperation
with Logging {

override def runInternal(): Unit = {
statementId = UUID.randomUUID().toString
val logMsg = "Listing catalogs"
logInfo(s"$logMsg with $statementId")
setState(OperationState.RUNNING)
@@ -48,26 +48,19 @@ import org.apache.spark.util.{Utils => SparkUtils}
* @param columnName column name
*/
private[hive] class SparkGetColumnsOperation(
sqlContext: SQLContext,
val sqlContext: SQLContext,
parentSession: HiveSession,
catalogName: String,
schemaName: String,
tableName: String,
columnName: String)
extends GetColumnsOperation(parentSession, catalogName, schemaName, tableName, columnName)
with Logging {
with SparkOperation
with Logging {

val catalog: SessionCatalog = sqlContext.sessionState.catalog

private var statementId: String = _

override def close(): Unit = {
super.close()
HiveThriftServer2.eventManager.onOperationClosed(statementId)
}

override def runInternal(): Unit = {
statementId = UUID.randomUUID().toString
// Do not change cmdStr. It's used for Hive auditing and authorization.
val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName, tablePattern : $tableName"
val logMsg = s"Listing columns '$cmdStr, columnName : $columnName'"
@@ -43,22 +43,16 @@ import org.apache.spark.util.{Utils => SparkUtils}
* @param functionName function name pattern
*/
private[hive] class SparkGetFunctionsOperation(
sqlContext: SQLContext,
val sqlContext: SQLContext,
parentSession: HiveSession,
catalogName: String,
schemaName: String,
functionName: String)
extends GetFunctionsOperation(parentSession, catalogName, schemaName, functionName) with Logging {

private var statementId: String = _

override def close(): Unit = {
super.close()
HiveThriftServer2.eventManager.onOperationClosed(statementId)
}
extends GetFunctionsOperation(parentSession, catalogName, schemaName, functionName)
with SparkOperation
with Logging {

override def runInternal(): Unit = {
statementId = UUID.randomUUID().toString
// Do not change cmdStr. It's used for Hive auditing and authorization.
val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName"
val logMsg = s"Listing functions '$cmdStr, functionName : $functionName'"
@@ -40,21 +40,15 @@ import org.apache.spark.util.{Utils => SparkUtils}
* @param schemaName database name, null or a concrete database name
*/
private[hive] class SparkGetSchemasOperation(
sqlContext: SQLContext,
val sqlContext: SQLContext,
parentSession: HiveSession,
catalogName: String,
schemaName: String)
extends GetSchemasOperation(parentSession, catalogName, schemaName) with Logging {

private var statementId: String = _

override def close(): Unit = {
super.close()
HiveThriftServer2.eventManager.onOperationClosed(statementId)
}
extends GetSchemasOperation(parentSession, catalogName, schemaName)
with SparkOperation
with Logging {

override def runInternal(): Unit = {
statementId = UUID.randomUUID().toString
// Do not change cmdStr. It's used for Hive auditing and authorization.
val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName"
val logMsg = s"Listing databases '$cmdStr'"
@@ -37,16 +37,11 @@ import org.apache.spark.util.{Utils => SparkUtils}
* @param parentSession a HiveSession from SessionManager
*/
private[hive] class SparkGetTableTypesOperation(
sqlContext: SQLContext,
val sqlContext: SQLContext,
parentSession: HiveSession)
extends GetTableTypesOperation(parentSession) with SparkMetadataOperationUtils with Logging {

private var statementId: String = _

override def close(): Unit = {
super.close()
HiveThriftServer2.eventManager.onOperationClosed(statementId)
}
extends GetTableTypesOperation(parentSession)
with SparkOperation
with Logging {

override def runInternal(): Unit = {
statementId = UUID.randomUUID().toString
@@ -46,24 +46,17 @@ import org.apache.spark.util.{Utils => SparkUtils}
* @param tableTypes list of allowed table types, e.g. "TABLE", "VIEW"
*/
private[hive] class SparkGetTablesOperation(
sqlContext: SQLContext,
val sqlContext: SQLContext,
parentSession: HiveSession,
catalogName: String,
schemaName: String,
tableName: String,
tableTypes: JList[String])
extends GetTablesOperation(parentSession, catalogName, schemaName, tableName, tableTypes)
with SparkMetadataOperationUtils with Logging {

private var statementId: String = _

override def close(): Unit = {
super.close()
HiveThriftServer2.eventManager.onOperationClosed(statementId)
}
with SparkOperation
with Logging {

override def runInternal(): Unit = {
statementId = UUID.randomUUID().toString
// Do not change cmdStr. It's used for Hive auditing and authorization.
val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName"
val tableTypesStr = if (tableTypes == null) "null" else tableTypes.asScala.mkString(",")
@@ -36,16 +36,11 @@ import org.apache.spark.util.{Utils => SparkUtils}
* @param parentSession a HiveSession from SessionManager
*/
private[hive] class SparkGetTypeInfoOperation(
sqlContext: SQLContext,
val sqlContext: SQLContext,
parentSession: HiveSession)
extends GetTypeInfoOperation(parentSession) with Logging {

private var statementId: String = _

override def close(): Unit = {
super.close()
HiveThriftServer2.eventManager.onOperationClosed(statementId)
}
extends GetTypeInfoOperation(parentSession)
with SparkOperation
with Logging {

override def runInternal(): Unit = {
statementId = UUID.randomUUID().toString

This file was deleted.
