import logging
import time
from enum import Enum
-from math import ceil
-from typing import Any, Literal, NotRequired, TypedDict, cast, overload
+from typing import Literal, NotRequired, TypedDict, overload

from django.core.cache import cache

logger = logging.getLogger(__name__)

+# How many times stricter to be with the error limit during recovery
+DEFAULT_RECOVERY_STRICTNESS = 10
+

class CircuitBreakerState(Enum):
    CLOSED = "circuit_closed"
@@ -27,41 +29,35 @@ class CircuitBreakerConfig(TypedDict):
    error_limit: int
    # The time period, in seconds, over which we're tracking errors
    error_limit_window: int
-    # The length, in seconds, of each time bucket ("granule") used by the underlying rate limiter -
-    # effectively the resolution of the time window. Will be set automatically based on
-    # `error_limit_window` if not provided.
-    error_limit_window_granularity: NotRequired[int]
    # How long, in seconds, to stay in the broken state (blocking all requests) before entering the
    # recovery phase
    broken_state_duration: int
-    # The number of errors within the given time period necessary to trip the breaker while in recovery
-    recovery_error_limit: int
-    # The time period, in seconds, over which we're tracking errors in recovery
-    recovery_error_limit_window: int
+    # The number of errors within the given time period necessary to trip the breaker while in
+    # recovery. Will be set automatically based on `error_limit` if not provided
+    recovery_error_limit: NotRequired[int]
    # The length, in seconds, of each time bucket ("granule") used by the underlying rate limiter -
    # effectively the resolution of the time window. Will be set automatically based on
-    # `recovery_error_limit_window` if not provided.
-    recovery_error_limit_window_granularity: NotRequired[int]
+    # `error_limit_window` if not provided.
+    error_limit_window_granularity: NotRequired[int]
    # How long, in seconds, to stay in the recovery state (allowing requests but with a stricter
-    # error limit) before returning to normal operation.
-    recovery_duration: int
+    # error limit) before returning to normal operation. Will be set based on `error_limit_window`
+    # if not provided.
+    recovery_duration: NotRequired[int]


-# TODO: These limits were estimated based on EA traffic. (In an average 10 min period, there are
-# roughly 35K events without matching hashes. About 2% of orgs are EA, so for simplicity, assume 2%
-# of those events are from EA orgs. If we're willing to tolerate up to a 95% failure rate, then we
-# need 35K * 0.02 * 0.95 events to fail to trip the breaker. Technically that's 665, not 666, but
-# we're talking about everything going to hell, so the bump to 666 seemed appropriate!)
+# TODO: The default error limit here was estimated based on EA traffic. (In an average 10 min
+# period, there are roughly 35K events without matching hashes. About 2% of orgs are EA, so for
+# simplicity, assume 2% of those events are from EA orgs. If we're willing to tolerate up to a 95%
+# failure rate, then we need 35K * 0.02 * 0.95 events to fail to trip the breaker. Technically
+# that's 665, not 666, but we're talking about everything going to hell, so the bump to 666 seemed
+# appropriate!)
#
# When we GA, we should multiply both the limits by 50 (to remove the 2% part of the current
-# calculation).
+# calculation), and remove this TODO.
CIRCUIT_BREAKER_DEFAULT_CONFIG: CircuitBreakerConfig = {
    "error_limit": 666,
    "error_limit_window": 600,  # 10 min
    "broken_state_duration": 300,  # 5 min
-    "recovery_error_limit": 3,  # In recovery, we're twice as strict as normal limit
-    "recovery_error_limit_window": 60,  # And we bail much more quickly
-    "recovery_duration": 300,  # 5 min
}
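To make the relaxed config type above concrete, here is a minimal sketch of a config that is valid after this change: only the three remaining required keys are supplied, and the optional keys fall back to the derived defaults (see the `__init__` changes later in this diff). The name `MY_SERVICE_CONFIG` is illustrative, not part of the change.

MY_SERVICE_CONFIG: CircuitBreakerConfig = {
    "error_limit": 100,
    "error_limit_window": 600,  # 10 min
    "broken_state_duration": 300,  # 5 min
    # Omitted keys are derived from the values above:
    #   recovery_error_limit           -> max(100 // 10, 1) = 10
    #   error_limit_window_granularity -> max(600 // 10, 5) = 60 sec
    #   recovery_duration              -> 600 * 2 = 1200 sec (20 min)
}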
@@ -108,66 +104,80 @@ def get_top_dogs(payload):
        return format_hof_entries(response)

-    The `breaker.should_allow_request()` check can alternatively be used outside of
-    `get_top_dogs`, to prevent calls to it. In that case, the original `breaker` object can be
-    imported alongside `get_top_dogs` or reinstantiated with the same config - it has no state of
-    its own, instead relying on redis and the cache to track error count and breaker status.
+    The `breaker.should_allow_request()` check can alternatively be used outside of `get_top_dogs`,
+    to prevent calls to it. In that case, the original `breaker` object can be imported alongside
+    `get_top_dogs` or reinstantiated with the same config - it has no state of its own, instead
+    relying on redis-backed rate limiters and the cache to track error count and breaker status.
    """

-    def __init__(self, key: str, config: CircuitBreakerConfig | None = None):
+    def __init__(self, key: str, config: CircuitBreakerConfig = CIRCUIT_BREAKER_DEFAULT_CONFIG):
        self.key = key
        self.broken_state_key = f"{key}.circuit_breaker.broken"
        self.recovery_state_key = f"{key}.circuit_breaker.in_recovery"

-        final_config: CircuitBreakerConfig = {
-            **CIRCUIT_BREAKER_DEFAULT_CONFIG,
-            **(config or cast(Any, {})),
-        }
-        default_window_granularity = self._get_default_window_granularity(
-            final_config["error_limit_window"]
+        self.error_limit = config["error_limit"]
+        self.recovery_error_limit = config.get(
+            "recovery_error_limit", max(self.error_limit // DEFAULT_RECOVERY_STRICTNESS, 1)
        )
-        default_recovery_window_granularity = self._get_default_window_granularity(
-            final_config["recovery_error_limit_window"]
+        self.window = config["error_limit_window"]
+        self.window_granularity = config.get(
+            "error_limit_window_granularity", max(self.window // 10, 5)
        )
+        self.broken_state_duration = config["broken_state_duration"]
+        self.recovery_duration = config.get("recovery_duration", self.window * 2)

        self.limiter = RedisSlidingWindowRateLimiter()
        self.primary_quota = Quota(
-            final_config["error_limit_window"],
-            final_config.get("error_limit_window_granularity", default_window_granularity),
-            final_config["error_limit"],
+            self.window,
+            self.window_granularity,
+            self.error_limit,
            f"{key}.circuit_breaker",
        )
        self.recovery_quota = Quota(
-            final_config["recovery_error_limit_window"],
-            final_config.get(
-                "recovery_error_limit_window_granularity", default_recovery_window_granularity
-            ),
-            final_config["recovery_error_limit"],
+            self.window,
+            self.window_granularity,
+            self.recovery_error_limit,
            f"{key}.circuit_breaker_recovery",
        )

-        self.broken_state_duration = final_config["broken_state_duration"]
-        self.recovery_duration = final_config["recovery_duration"]
-
-        if self.recovery_duration < final_config["recovery_error_limit_window"]:
+        if self.recovery_error_limit >= self.error_limit:
+            self.recovery_error_limit = max(self.error_limit // DEFAULT_RECOVERY_STRICTNESS, 1)
            logger.warning(
-                "Circuit breaker %s has `recovery_duration` < `recovery_error_limit_window`."
-                + " Recovery duration has been reset to match the window.",
+                "Circuit breaker '%s' has a recovery error limit (%d) greater than"
+                + " or equal to its error limit (%d). Using the stricter error-limit-based default"
+                + " (%d) instead.",
                key,
+                config["recovery_error_limit"],
+                self.error_limit,
+                self.recovery_error_limit,
            )
-            self.recovery_duration = final_config["recovery_error_limit_window"]
-        if self.recovery_duration < final_config["recovery_error_limit_window"]:
+
+        # XXX: If we discover we have a config where we want this combo to work, we can consider
+        # using the `MockCircuitBreaker._clear_quota` helper, which is currently only used in tests,
+        # to clear out the main quota when we switch to the broken state. (It will need tests of its
+        # own if so.)
+        if self.broken_state_duration + self.recovery_duration < self.window:
            logger.warning(
-                "Circuit breaker %s has `recovery_duration` < `recovery_error_limit_window`."
-                + " Recovery duration has been reset to match the window.",
+                "Circuit breaker '%s' has broken state and recovery durations (%d and %d sec)"
+                + " which together are less than the main error limit window (%d sec). This"
+                + " can lead to the breaker getting tripped unexpectedly, until the original"
+                + " spike in errors clears the main time window.",
                key,
+                self.broken_state_duration,
+                self.recovery_duration,
+                self.window,
            )
-            self.recovery_duration = final_config["recovery_error_limit_window"]

    def record_error(self) -> None:
+        """
+        Record a single error towards the breaker's quota, and handle the case where that error puts
+        us over the limit.
+        """
        state, seconds_left_in_state = self._get_state_and_remaining_time()

        if state == CircuitBreakerState.BROKEN:
+            assert seconds_left_in_state is not None  # mypy appeasement
+
            # If the circuit is broken, and `should_allow_request` is being used correctly, requests
            # should be blocked and we shouldn't even be here. That said, maybe there was a race
            # condition, so make sure the circuit hasn't just broken before crying foul.
@@ -222,6 +232,10 @@ def record_error(self) -> None:
            cache.set(self.recovery_state_key, recovery_state_expiry, recovery_state_timeout)

    def should_allow_request(self) -> bool:
+        """
+        Determine, based on the current state of the breaker and the number of allowable errors
+        remaining, whether requests should be allowed through.
+        """
        state, _ = self._get_state_and_remaining_time()

        if state == CircuitBreakerState.BROKEN:
@@ -255,8 +269,13 @@ def get_remaining_error_quota(
        self, quota: Quota | None = None, window_end: int | None = None
    ) -> int | None:
        """
-        # TODO: write me
-        returns None when in broken state
+        Get the number of allowable errors remaining in the given quota for the time window ending
+        at the given time.
+
+        If no quota is given, in closed and recovery states, return the current controlling quota's
+        remaining errors. In broken state, return None.
+
+        If no time window end is given, return the current amount of quota remaining.
        """
        if not quota:
            quota = self._get_controlling_quota()
@@ -273,12 +292,6 @@ def get_remaining_error_quota(
        return result[0].granted

-    def _get_default_window_granularity(self, window_duration: int) -> int:
-        # Never more than 10 buckets, and no bucket smaller than 5 seconds. If greater precision is
-        # needed, the `error_limit_window_granularity` and `recovery_error_limit_window_granularity`
-        # config options can be used.
-        return max(ceil(window_duration / 10), 5)
-
    @overload
    def _get_controlling_quota(
        self, state: Literal[CircuitBreakerState.CLOSED, CircuitBreakerState.RECOVERY]
@@ -295,8 +308,8 @@ def _get_controlling_quota(self) -> Quota | None:
    def _get_controlling_quota(self, state: CircuitBreakerState | None = None) -> Quota | None:
        """
-        # TODO: write me
-        returns None when in broken state
+        Return the Quota corresponding to the given breaker state (or the current breaker state, if
+        no state is provided). If the state in question is the broken state, return None.
        """
        controlling_quota_by_state = {
            CircuitBreakerState.CLOSED: self.primary_quota,
@@ -308,10 +321,13 @@ def _get_controlling_quota(self, state: CircuitBreakerState | None = None) -> Qu
        return controlling_quota_by_state[_state]

-    def _get_state_and_remaining_time(self) -> tuple[CircuitBreakerState, int]:
+    def _get_state_and_remaining_time(
+        self,
+    ) -> tuple[CircuitBreakerState, int | None]:
        """
        Return the current state of the breaker (closed, broken, or in recovery), along with the
-        number of seconds until that state expires.
+        number of seconds until that state expires (or `None` when in closed state, as it has no
+        expiry).
        """
        now = int(time.time())
@@ -322,7 +338,4 @@ def _get_state_and_remaining_time(self) -> tuple[CircuitBreakerState, int]:
        if cache.has_key(self.recovery_state_key):
            return (CircuitBreakerState.RECOVERY, cache.get(self.recovery_state_key) - now)

-        # TODO Fix this with overloads?
-        # 0 here is just a placeholder, as "remaining seconds" doesn't really apply to a state we
-        # hope to stay in indefinitely
-        return (CircuitBreakerState.CLOSED, 0)
+        return (CircuitBreakerState.CLOSED, None)
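To round out the docstring's usage notes, here is a minimal sketch of how the breaker is driven after this change. It assumes the enclosing class is named `CircuitBreaker` and that the module path shown in the import is correct (neither is visible in this diff); `call_flaky_service` and `fallback_response` are hypothetical stand-ins for the caller's own code.

from sentry.utils.circuit_breaker2 import CircuitBreaker  # assumed module path

breaker = CircuitBreaker(
    "my_flaky_service",  # illustrative key, used to namespace the redis/cache entries
    {
        "error_limit": 100,
        "error_limit_window": 600,  # 10 min
        "broken_state_duration": 300,  # 5 min
        # Optional keys (`recovery_error_limit`, etc.) are derived from the values above
    },
)

def fetch_with_breaker(payload):
    # Returns False once the breaker has been tripped
    if not breaker.should_allow_request():
        return fallback_response()  # hypothetical fallback

    try:
        return call_flaky_service(payload)  # hypothetical flaky call
    except TimeoutError:
        # Count the error; enough of these within `error_limit_window` trips the breaker
        breaker.record_error()
        return fallback_response()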