
Commit 138a1a5

fenzhu authored and committed (via GitHub Enterprise)
[CARMEL-7299][CARMEL-4058] Workload Circuit Breaker (apache#104)
1 parent 68cc8f5 commit 138a1a5

File tree

22 files changed (+518, -24 lines)


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 15 additions & 0 deletions

@@ -243,6 +243,7 @@ class SparkContext(config: SparkConf) extends Logging {
   private var _kafkaStore: Option[KafkaStore[String, String]] = _
   private var _userResourceManager: Option[UserResourceManager] = _
   private val _queryFeatureMetrics = new QueryFeatureMetrics
+  private var _workloadCircuitBreaker: Option[WorkloadCircuitBreaker] = _

   /* ------------------------------------------------------------------------------------- *
    | Accessors and public fields. These provide access to the internal state of the        |
@@ -666,6 +667,12 @@ class SparkContext(config: SparkConf) extends Logging {
        None
      }

+    _workloadCircuitBreaker =
+      if (conf.getBoolean(SparkContext.SPARK_WORKLOAD_CIRCUIT_BREAKER_ENABLED, false)) {
+        Some(new WorkloadCircuitBreaker(this,
+          conf.get(SparkContext.SPARK_WORKLOAD_CIRCUIT_BREAKER_UNBLOCK_SESSION_TYPE, "")))
+      } else None
+
     _cleaner =
       if (_conf.get(CLEANER_REFERENCE_TRACKING)) {
         Some(new ContextCleaner(this, _shuffleDriverComponents))
@@ -849,8 +856,12 @@ class SparkContext(config: SparkConf) extends Logging {
   private[spark] def userResourceManager: Option[UserResourceManager] =
     _userResourceManager

+
   private[spark] def queryFeatureMetrics: QueryFeatureMetrics = _queryFeatureMetrics

+  private[spark] def workloadCircuitBreaker: Option[WorkloadCircuitBreaker] =
+    _workloadCircuitBreaker
+
   /**
    * Assigns a group ID to all the jobs started by this thread until the group ID is set to a
    * different value or cleared.
@@ -3112,6 +3123,10 @@ object SparkContext extends Logging {
     "spark.carmel.user.resource.management.enabled"
   private[spark] val USER_RESOURCE_CONSUMER_ID = "spark.user.resource.consumerId"
   private[spark] val USER_RESOURCE_CONSUMER_PROFILE = "spark.user.resource.consumerProfile"
+  private[spark] val SPARK_WORKLOAD_CIRCUIT_BREAKER_ENABLED =
+    "spark.workload.circuit.breaker.enabled"
+  private[spark] val SPARK_WORKLOAD_CIRCUIT_BREAKER_UNBLOCK_SESSION_TYPE =
+    "spark.workload.circuit.breaker.unblock.session.types"

   /**
    * Executor id for the driver. In earlier versions of Spark, this was `<driver>`, but this was
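For reference, a minimal sketch (not part of the commit) of how the two settings defined above might be supplied when building a context; the master, application name, and the "batch" session-type value are illustrative assumptions:

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative only: the key names come from the constants added above;
// "local[*]", the app name, and the "batch" session type are assumptions.
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("workload-circuit-breaker-demo")
  .set("spark.workload.circuit.breaker.enabled", "true")
  .set("spark.workload.circuit.breaker.unblock.session.types", "batch")
val sc = new SparkContext(conf)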

core/src/main/scala/org/apache/spark/scheduler/AnalyticsTaskSchedulerImpl.scala

Lines changed: 32 additions & 5 deletions

@@ -29,6 +29,8 @@ import scala.collection.JavaConverters._
 import scala.collection.concurrent.TrieMap
 import scala.collection.mutable.{ArrayBuffer, Buffer}

+import com.google.common.cache.{Cache, CacheBuilder}
+
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.executor.ExecutorMetrics
@@ -87,6 +89,11 @@ private[spark] class AnalyticsTaskSchedulerImpl(

   val ZOMBIE_CHECK_INTERVAL_MS = conf.getTimeAsMs("spark.zombie.interval", "180s")
   val ZOMBIE_DURATION_MS = conf.getTimeAsMs("spark.zombie.duration", "900s")
+  val WORKLOAD_CIRCUIT_BREAKER_USER_DURATION_S = conf.
+    getTimeAsSeconds("spark.workload.circuit.breaker.user.duration", "2h")
+  // store all block user queries and blockTime map
+  private val blockUserQueries: Cache[String, java.lang.Long] = CacheBuilder.newBuilder()
+    .maximumSize(10000).build[String, java.lang.Long]()
   private val taskSetHealthChecker =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("taskset-health-checker")

@@ -934,13 +941,33 @@ private[spark] class AnalyticsTaskSchedulerImpl(
         }
       }

-      if (totalTaskTime > conf.getLong("spark.stage.maxTotalTaskTime", 10000 * 35 * 60 * 1000L) &&
+      if (!taskSet.isZombie &&
+        totalTaskTime > conf.getLong("spark.stage.maxTotalTaskTime", 10000 * 35 * 60 * 1000L) &&
         finishedRate < conf.getDouble("spark.stage.cancel.minFinishedRate", 0.95)) {
-        logInfo(s"Cancel stage ${taskSet.stageId}-${taskSet.taskSet.stageAttemptId}, since the " +
-          s"total task time $totalTaskTime exceeds the threshold, " +
+        val groupId = taskSet.taskSet.properties.getProperty("spark.jobGroup.id", "")
+        val sessionId = taskSet.taskSet.properties.getProperty("spark.hive.session.id", "")
+        val sessionType = taskSet.taskSet.properties.getProperty("spark.sql.session.type", "")
+        val isZombie = taskSet.isZombie
+        logInfo(s"Cancel stage ${taskSet.stageId}-${taskSet.taskSet.stageAttemptId} of " +
+          s"group $groupId, since the total task time $totalTaskTime exceeds the threshold, " +
           s"and only $finishedRate rate finished!")
-        dagScheduler.cancelStage(taskSet.stageId, Some(s"Total task time $totalTaskTime in " +
-          s"stage ${taskSet.stageId}-${taskSet.taskSet.stageAttemptId} exceeded the max value!"))
+
+        val reason = s"Total task time $totalTaskTime in " +
+          s"stage ${taskSet.stageId}-${taskSet.taskSet.stageAttemptId} of group $groupId " +
+          s"exceeded the max value!"
+        dagScheduler.cancelStage(taskSet.stageId, Some(reason))
+
+        if (taskSet.userInfo.isDefined && !blockUserQueries.asMap.containsKey(groupId)) {
+          val message = Some(s"Heavy query detected from session[${sessionId}], " +
+            s"query[$groupId], check more details: " +
+            s"http://viewpoint.hermes-prod.svc.25.tess.io/?session=${sessionId}, " +
+            s"root cause: ")
+          dagScheduler.sc.workloadCircuitBreaker.
+            foreach(_.blockUser(sessionType, taskSet.userInfo().get.user,
+              WORKLOAD_CIRCUIT_BREAKER_USER_DURATION_S, Some(message + reason)))
+          // make sure that same query won't block user multiple times
+          blockUserQueries.put(groupId, System.currentTimeMillis())
+        }
       }
     }
   }
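As an aside, a self-contained sketch of the de-duplication pattern used above: a bounded Guava cache keyed by job-group id ensures a runaway query triggers at most one blockUser call. The object and method names here are hypothetical.

import com.google.common.cache.{Cache, CacheBuilder}

object BlockOnceSketch {
  // Bounded cache of job-group ids that have already triggered a block.
  private val blockedGroups: Cache[String, java.lang.Long] =
    CacheBuilder.newBuilder().maximumSize(10000).build[String, java.lang.Long]()

  // Runs notifyBreaker (e.g. workloadCircuitBreaker.foreach(_.blockUser(...)))
  // only the first time a given groupId is seen.
  def maybeBlock(groupId: String)(notifyBreaker: => Unit): Unit = {
    if (!blockedGroups.asMap.containsKey(groupId)) {
      notifyBreaker
      blockedGroups.put(groupId, System.currentTimeMillis())
    }
  }
}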

core/src/main/scala/org/apache/spark/scheduler/AnalyticsTaskSetManager.scala

Lines changed: 1 addition & 1 deletion

@@ -98,7 +98,7 @@ private[spark] class AnalyticsTaskSetManager(

   private val userResourceManager = sched.sc.userResourceManager

-  def userInfo(): Option[UserInfo] = _userInfo
+  override def userInfo(): Option[UserInfo] = _userInfo

   // Add all our tasks to the pending lists. We do this in reverse order
   // of task index so that tasks with low indices get launched first.

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 2 additions & 0 deletions

@@ -239,6 +239,8 @@ private[spark] class TaskSetManager(
     t.epoch = epoch
   }

+  def userInfo(): Option[UserInfo] = None
+
   // Add all our tasks to the pending lists. We do this in reverse order
   // of task index so that tasks with low indices get launched first.
   addPendingTasks()
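A reduced sketch (types simplified to Option[String], class names hypothetical) of the override relationship this default sets up: the base manager reports no user, the analytics manager supplies one, and the scheduler only acts on it when it is defined.

class BaseTaskSetManagerSketch {
  // Mirrors TaskSetManager.userInfo(): no user attribution by default.
  def userInfo(): Option[String] = None
}

class AnalyticsTaskSetManagerSketch(user: String) extends BaseTaskSetManagerSketch {
  // Mirrors AnalyticsTaskSetManager.userInfo(): attribution supplied by the subclass.
  override def userInfo(): Option[String] = Some(user)
}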

core/src/main/scala/org/apache/spark/scheduler/UserResourceManager.scala

Lines changed: 12 additions & 13 deletions

@@ -22,7 +22,7 @@ import java.util.function.BiFunction

 import scala.collection.mutable.{HashMap, HashSet}

-import org.apache.spark.{CarmelSparkException, SparkContext}
+import org.apache.spark.{CarmelSparkException, SparkContext, SparkException}
 import org.apache.spark.internal.Logging

 /**
@@ -225,23 +225,22 @@ class UserResourceManager(sc: SparkContext) extends Logging {
 //    }
   }

-  // TODO: Uncomment this after workload circuit breaker merged
   def requestQuery(userInfo: UserInfo): Unit = {
     throwIfUserBlocked(userInfo)
   }

   private def throwIfUserBlocked(userInfo: UserInfo): Unit = {
-    // val res = if (sc.workloadCircuitBreaker.isDefined) {
-    //   sc.workloadCircuitBreaker.get.isUserBlocked(userInfo.user)
-    // } else {
-    //   UserBlockResult(false, userInfo.user, "0 s", None)
-    // }
-    //
-    // if (res.isBlocked) {
-    //   throw new SparkException(s"Workload circuit breaker is applied for user " +
-    //     s"[${userInfo.user}] due to [${res.reason.getOrElse("Unknown reason")}], " +
-    //     s"the block will last ${res.leftDuration}")
-    // }
+    val res = if (sc.workloadCircuitBreaker.isDefined) {
+      sc.workloadCircuitBreaker.get.isUserBlocked(userInfo.user)
+    } else {
+      UserBlockResult(false, userInfo.user, "0 s", None)
+    }
+
+    if (res.isBlocked) {
+      throw new SparkException(s"Workload circuit breaker is applied for user " +
+        s"[${userInfo.user}] due to [${res.reason.getOrElse("Unknown reason")}], " +
+        s"the block will last ${res.leftDuration}")
+    }
   }
 }
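For illustration, a hedged sketch of how the restored guard surfaces to callers: requestQuery now throws a SparkException carrying the block reason and remaining duration. The helper name admitQuery and the import path used for UserInfo/UserResourceManager are assumptions.

import org.apache.spark.SparkException
import org.apache.spark.scheduler.{UserInfo, UserResourceManager}

// Hypothetical driver-side helper showing the failure mode of the restored check.
def admitQuery(manager: UserResourceManager, user: UserInfo): Boolean = {
  try {
    manager.requestQuery(user)
    true
  } catch {
    case e: SparkException =>
      // Message reads like: "Workload circuit breaker is applied for user [...]
      // due to [...], the block will last 1.5 h"
      println(s"Query rejected: ${e.getMessage}")
      false
  }
}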

core/src/main/scala/org/apache/spark/scheduler/WorkloadCircuitBreaker.scala (new file)

Lines changed: 117 additions & 0 deletions
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+
+class WorkloadCircuitBreaker(sc: SparkContext, unblockSessionTypes: String) extends Logging {
+  private val unblockSessionTypeList = util.Arrays.asList(StringUtils.split(
+    StringUtils.trimToEmpty(unblockSessionTypes), ","))
+
+  private val users = new ConcurrentHashMap[String, BlockedUser]()
+
+  def blockUser(sessionType: String, user: String, durationInSeconds: Long,
+      reason: Option[String] = None): UserBlockResult = {
+    if (sessionType != null && unblockSessionTypeList.contains(sessionType)) {
+      logInfo(s"Skip blocking session with type: $sessionType in $unblockSessionTypeList, " +
+        s"source reason: $reason")
+    } else {
+      users.putIfAbsent(user, new BlockedUser(user))
+      users.computeIfPresent(user,
+        (user: String, blockedUser: BlockedUser) => {
+          blockedUser.blockUser(durationInSeconds, reason)
+          logInfo(s"User [$user] is blocked with session type [$sessionType] due to $reason.")
+          blockedUser
+        })
+    }
+    isUserBlocked(user)
+  }
+
+  def isUserBlocked(user: String): UserBlockResult = {
+    if (users.containsKey(user)) {
+      users.get(user).isBlocked
+    } else {
+      UserBlockResult(isBlocked = false, user, "0 s", None)
+    }
+  }
+
+  def unBlockUser(user: String): Unit = {
+    users.remove(user)
+  }
+
+  def getBlockedUsers: Seq[UserBlockResult] = {
+    import scala.collection.JavaConverters._
+    users.keys().asScala.map(isUserBlocked).toSeq
+  }
+}
+
+case class UserBlockResult(isBlocked: Boolean, user: String,
+    leftDuration: String, reason: Option[String] = None)
+
+case class BlockDuration(startTime: Long, endTime: Long, reason: Option[String] = None)
+
+private trait BlockedItem
+
+private class BlockedUser(val user: String) extends BlockedItem {
+  private var blockDuration: Option[BlockDuration] = None
+
+  def blockUser(durationInSeconds: Long, reason: Option[String] = None): Unit = {
+    val startTime = System.currentTimeMillis()
+    val endTime = startTime + durationInSeconds * 1000
+    if (blockDuration.isEmpty || blockDuration.get.endTime < endTime) {
+      blockDuration = Some(BlockDuration(startTime, endTime, reason))
+    }
+  }
+
+  def isBlocked: UserBlockResult = {
+    val cur = System.currentTimeMillis()
+    if (blockDuration.isDefined &&
+        blockDuration.get.startTime <= cur &&
+        blockDuration.get.endTime > cur) {
+      UserBlockResult(isBlocked = true, user,
+        formatDuration(blockDuration.get.endTime - cur), blockDuration.get.reason)
+    } else {
+      UserBlockResult(isBlocked = false, user, "0 s", None)
+    }
+  }
+
+  private def formatDuration(secs: Long): String = {
+    val seconds = secs.toDouble / 1000
+    if (seconds < 1) {
+      "%.1f s".format(seconds)
+    } else if (seconds < 60) {
+      "%.0f s".format(seconds)
+    } else {
+      val minutes = seconds / 60
+      if (minutes < 10) {
+        "%.1f min".format(minutes)
+      } else if (minutes < 60) {
+        "%.0f min".format(minutes)
+      } else {
+        val hours = minutes / 60
+        "%.1f h".format(hours)
+      }
+    }
+  }
+}
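A brief usage sketch of the new class (not part of the commit). The breaker is reached through the private[spark] accessor on SparkContext, so code like this only compiles inside the org.apache.spark namespace; the session type, user name, and duration are illustrative.

// sc is an active SparkContext; the accessor is Some(...) only when
// spark.workload.circuit.breaker.enabled is true.
sc.workloadCircuitBreaker.foreach { breaker =>
  val result = breaker.blockUser(
    sessionType = "adhoc",             // hypothetical session type
    user = "some_user",                // hypothetical user name
    durationInSeconds = 2 * 60 * 60,   // mirrors the 2h default user duration
    reason = Some("Heavy query detected"))
  // leftDuration is the human-readable remaining time, e.g. "2.0 h"
  assert(breaker.isUserBlocked("some_user").isBlocked == result.isBlocked)
  breaker.unBlockUser("some_user")     // lifts the block immediately
}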

docs/sql-ref-ansi-compliance.md

Lines changed: 4 additions & 0 deletions

@@ -370,6 +370,7 @@ Below is a list of all the keywords in Spark SQL.
 |BETWEEN|non-reserved|non-reserved|reserved|
 |BIGINT|non-reserved|non-reserved|reserved|
 |BINARY|non-reserved|non-reserved|reserved|
+|BLOCK|non-reserved|non-reserved|non-reserved|
 |BOOLEAN|non-reserved|non-reserved|reserved|
 |BOTH|reserved|non-reserved|reserved|
 |BUCKET|non-reserved|non-reserved|non-reserved|
@@ -438,6 +439,7 @@ Below is a list of all the keywords in Spark SQL.
 |DISTRIBUTE|non-reserved|non-reserved|non-reserved|
 |DIV|non-reserved|non-reserved|not a keyword|
 |DOUBLE|non-reserved|non-reserved|reserved|
+|DURATION|non-reserved|non-reserved|non-reserved|
 |DROP|non-reserved|non-reserved|reserved|
 |ELSE|reserved|non-reserved|reserved|
 |END|reserved|non-reserved|reserved|
@@ -574,6 +576,7 @@ Below is a list of all the keywords in Spark SQL.
 |QUERY|non-reserved|non-reserved|non-reserved|
 |RANGE|non-reserved|non-reserved|reserved|
 |REAL|non-reserved|non-reserved|reserved|
+|REASON|non-reserved|non-reserved|non-reserved|
 |RECORDREADER|non-reserved|non-reserved|non-reserved|
 |RECORDWRITER|non-reserved|non-reserved|non-reserved|
 |RECOVER|non-reserved|non-reserved|non-reserved|
@@ -658,6 +661,7 @@ Below is a list of all the keywords in Spark SQL.
 |TRY_CAST|non-reserved|non-reserved|non-reserved|
 |TYPE|non-reserved|non-reserved|non-reserved|
 |UNARCHIVE|non-reserved|non-reserved|non-reserved|
+|UNBLOCK|non-reserved|non-reserved|non-reserved|
 |UNBOUNDED|non-reserved|non-reserved|non-reserved|
 |UNCACHE|non-reserved|non-reserved|non-reserved|
 |UNION|reserved|strict-non-reserved|reserved|

sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4

Lines changed: 4 additions & 0 deletions

@@ -108,6 +108,7 @@ AUTHORIZATION: 'AUTHORIZATION';
 BETWEEN: 'BETWEEN';
 BIGINT: 'BIGINT';
 BINARY: 'BINARY';
+BLOCK: 'BLOCK';
 BOOLEAN: 'BOOLEAN';
 BOTH: 'BOTH';
 BUCKET: 'BUCKET';
@@ -177,6 +178,7 @@ DISTRIBUTE: 'DISTRIBUTE';
 DIV: 'DIV';
 DOUBLE: 'DOUBLE';
 DROP: 'DROP';
+DURATION: 'DURATION';
 ELSE: 'ELSE';
 END: 'END';
 ESCAPE: 'ESCAPE';
@@ -312,6 +314,7 @@ QUARTER: 'QUARTER';
 QUERY: 'QUERY';
 RANGE: 'RANGE';
 REAL: 'REAL';
+REASON: 'REASON';
 RECORDREADER: 'RECORDREADER';
 RECORDWRITER: 'RECORDWRITER';
 RECOVER: 'RECOVER';
@@ -395,6 +398,7 @@ TRUNCATE: 'TRUNCATE';
 TRY_CAST: 'TRY_CAST';
 TYPE: 'TYPE';
 UNARCHIVE: 'UNARCHIVE';
+UNBLOCK: 'UNBLOCK';
 UNBOUNDED: 'UNBOUNDED';
 UNCACHE: 'UNCACHE';
 UNION: 'UNION';
