Skip to content

Commit bbb62c5

Browse files
committed
Revert "[SPARK-31387] Handle unknown operation/session ID in HiveThriftServer2Listener"
This reverts commit 6994c64.
1 parent 7ce3f76 commit bbb62c5

File tree

5 files changed

+51
-170
lines changed

5 files changed

+51
-170
lines changed

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala

Lines changed: 49 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import scala.collection.mutable.ArrayBuffer
2525
import org.apache.hive.service.server.HiveServer2
2626

2727
import org.apache.spark.{SparkConf, SparkContext}
28-
import org.apache.spark.internal.Logging
2928
import org.apache.spark.internal.config.Status.LIVE_ENTITY_UPDATE_PERIOD
3029
import org.apache.spark.scheduler._
3130
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.ExecutionState
@@ -39,7 +38,7 @@ private[thriftserver] class HiveThriftServer2Listener(
3938
kvstore: ElementTrackingStore,
4039
sparkConf: SparkConf,
4140
server: Option[HiveServer2],
42-
live: Boolean = true) extends SparkListener with Logging {
41+
live: Boolean = true) extends SparkListener {
4342

4443
private val sessionList = new ConcurrentHashMap[String, LiveSessionData]()
4544
private val executionList = new ConcurrentHashMap[String, LiveExecutionData]()
@@ -132,81 +131,60 @@ private[thriftserver] class HiveThriftServer2Listener(
132131
updateLiveStore(session)
133132
}
134133

135-
private def onSessionClosed(e: SparkListenerThriftServerSessionClosed): Unit =
136-
Option(sessionList.get(e.sessionId)) match {
137-
case None => logWarning(s"onSessionClosed called with unknown session id: ${e.sessionId}")
138-
case Some(sessionData) =>
139-
val session = sessionData
140-
session.finishTimestamp = e.finishTime
141-
updateStoreWithTriggerEnabled(session)
142-
sessionList.remove(e.sessionId)
143-
}
134+
private def onSessionClosed(e: SparkListenerThriftServerSessionClosed): Unit = {
135+
val session = sessionList.get(e.sessionId)
136+
session.finishTimestamp = e.finishTime
137+
updateStoreWithTriggerEnabled(session)
138+
sessionList.remove(e.sessionId)
139+
}
144140

145-
private def onOperationStart(e: SparkListenerThriftServerOperationStart): Unit =
146-
Option(sessionList.get(e.sessionId)) match {
147-
case None => logWarning(s"onOperationStart called with unknown session id: ${e.sessionId}")
148-
case Some(sessionData) =>
149-
val info = getOrCreateExecution(
150-
e.id,
151-
e.statement,
152-
e.sessionId,
153-
e.startTime,
154-
e.userName)
155-
156-
info.state = ExecutionState.STARTED
157-
executionList.put(e.id, info)
158-
sessionData.totalExecution += 1
159-
executionList.get(e.id).groupId = e.groupId
160-
updateLiveStore(executionList.get(e.id))
161-
updateLiveStore(sessionData)
162-
}
141+
private def onOperationStart(e: SparkListenerThriftServerOperationStart): Unit = {
142+
val info = getOrCreateExecution(
143+
e.id,
144+
e.statement,
145+
e.sessionId,
146+
e.startTime,
147+
e.userName)
148+
149+
info.state = ExecutionState.STARTED
150+
executionList.put(e.id, info)
151+
sessionList.get(e.sessionId).totalExecution += 1
152+
executionList.get(e.id).groupId = e.groupId
153+
updateLiveStore(executionList.get(e.id))
154+
updateLiveStore(sessionList.get(e.sessionId))
155+
}
163156

164-
private def onOperationParsed(e: SparkListenerThriftServerOperationParsed): Unit =
165-
Option(executionList.get(e.id)) match {
166-
case None => logWarning(s"onOperationParsed called with unknown operation id: ${e.id}")
167-
case Some(executionData) =>
168-
executionData.executePlan = e.executionPlan
169-
executionData.state = ExecutionState.COMPILED
170-
updateLiveStore(executionData)
171-
}
157+
private def onOperationParsed(e: SparkListenerThriftServerOperationParsed): Unit = {
158+
executionList.get(e.id).executePlan = e.executionPlan
159+
executionList.get(e.id).state = ExecutionState.COMPILED
160+
updateLiveStore(executionList.get(e.id))
161+
}
172162

173-
private def onOperationCanceled(e: SparkListenerThriftServerOperationCanceled): Unit =
174-
Option(executionList.get(e.id)) match {
175-
case None => logWarning(s"onOperationCanceled called with unknown operation id: ${e.id}")
176-
case Some(executionData) =>
177-
executionData.finishTimestamp = e.finishTime
178-
executionData.state = ExecutionState.CANCELED
179-
updateLiveStore(executionData)
180-
}
163+
private def onOperationCanceled(e: SparkListenerThriftServerOperationCanceled): Unit = {
164+
executionList.get(e.id).finishTimestamp = e.finishTime
165+
executionList.get(e.id).state = ExecutionState.CANCELED
166+
updateLiveStore(executionList.get(e.id))
167+
}
181168

182-
private def onOperationError(e: SparkListenerThriftServerOperationError): Unit =
183-
Option(executionList.get(e.id)) match {
184-
case None => logWarning(s"onOperationError called with unknown operation id: ${e.id}")
185-
case Some(executionData) =>
186-
executionData.finishTimestamp = e.finishTime
187-
executionData.detail = e.errorMsg
188-
executionData.state = ExecutionState.FAILED
189-
updateLiveStore(executionData)
190-
}
169+
private def onOperationError(e: SparkListenerThriftServerOperationError): Unit = {
170+
executionList.get(e.id).finishTimestamp = e.finishTime
171+
executionList.get(e.id).detail = e.errorMsg
172+
executionList.get(e.id).state = ExecutionState.FAILED
173+
updateLiveStore(executionList.get(e.id))
174+
}
191175

192-
private def onOperationFinished(e: SparkListenerThriftServerOperationFinish): Unit =
193-
Option(executionList.get(e.id)) match {
194-
case None => logWarning(s"onOperationFinished called with unknown operation id: ${e.id}")
195-
case Some(executionData) =>
196-
executionData.finishTimestamp = e.finishTime
197-
executionData.state = ExecutionState.FINISHED
198-
updateLiveStore(executionData)
199-
}
176+
private def onOperationFinished(e: SparkListenerThriftServerOperationFinish): Unit = {
177+
executionList.get(e.id).finishTimestamp = e.finishTime
178+
executionList.get(e.id).state = ExecutionState.FINISHED
179+
updateLiveStore(executionList.get(e.id))
180+
}
200181

201-
private def onOperationClosed(e: SparkListenerThriftServerOperationClosed): Unit =
202-
Option(executionList.get(e.id)) match {
203-
case None => logWarning(s"onOperationClosed called with unknown operation id: ${e.id}")
204-
case Some(executionData) =>
205-
executionData.closeTimestamp = e.closeTime
206-
executionData.state = ExecutionState.CLOSED
207-
updateStoreWithTriggerEnabled(executionData)
208-
executionList.remove(e.id)
209-
}
182+
private def onOperationClosed(e: SparkListenerThriftServerOperationClosed): Unit = {
183+
executionList.get(e.id).closeTimestamp = e.closeTime
184+
executionList.get(e.id).state = ExecutionState.CLOSED
185+
updateStoreWithTriggerEnabled(executionList.get(e.id))
186+
executionList.remove(e.id)
187+
}
210188

211189
// Update both live and history stores. Trigger is enabled by default, hence
212190
// it will cleanup the entity which exceeds the threshold.

sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveSessionImplSuite.scala

Lines changed: 0 additions & 73 deletions
This file was deleted.

sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -140,22 +140,6 @@ class HiveThriftServer2ListenerSuite extends SparkFunSuite with BeforeAndAfter {
140140
assert(listener.noLiveData())
141141
}
142142

143-
test("SPARK-31387 - listener update methods should not throw exception with unknown input") {
144-
val (statusStore: HiveThriftServer2AppStatusStore,
145-
listener: HiveThriftServer2Listener) = createAppStatusStore(true)
146-
val unknownSession = "unknown_session"
147-
val unknownOperation = "unknown_operation"
148-
listener.onOtherEvent(SparkListenerThriftServerSessionClosed(unknownSession, 0))
149-
listener.onOtherEvent(SparkListenerThriftServerOperationStart("id", unknownSession,
150-
"stmt", "groupId", 0))
151-
listener.onOtherEvent(SparkListenerThriftServerOperationParsed(unknownOperation, "query"))
152-
listener.onOtherEvent(SparkListenerThriftServerOperationCanceled(unknownOperation, 0))
153-
listener.onOtherEvent(SparkListenerThriftServerOperationError(unknownOperation,
154-
"msg", "trace", 0))
155-
listener.onOtherEvent(SparkListenerThriftServerOperationFinish(unknownOperation, 0))
156-
listener.onOtherEvent(SparkListenerThriftServerOperationClosed(unknownOperation, 0))
157-
}
158-
159143
private def createProperties: Properties = {
160144
val properties = new Properties()
161145
properties.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "groupId")

sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -636,11 +636,7 @@ public void close() throws HiveSQLException {
636636
acquire(true);
637637
// Iterate through the opHandles and close their operations
638638
for (OperationHandle opHandle : opHandleSet) {
639-
try {
640-
operationManager.closeOperation(opHandle);
641-
} catch (Exception e) {
642-
LOG.warn("Exception is thrown closing operation " + opHandle, e);
643-
}
639+
operationManager.closeOperation(opHandle);
644640
}
645641
opHandleSet.clear();
646642
// Cleanup session log directory.

sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -650,11 +650,7 @@ public void close() throws HiveSQLException {
650650
acquire(true);
651651
// Iterate through the opHandles and close their operations
652652
for (OperationHandle opHandle : opHandleSet) {
653-
try {
654-
operationManager.closeOperation(opHandle);
655-
} catch (Exception e) {
656-
LOG.warn("Exception is thrown closing operation " + opHandle, e);
657-
}
653+
operationManager.closeOperation(opHandle);
658654
}
659655
opHandleSet.clear();
660656
// Cleanup session log directory.

0 commit comments

Comments
 (0)