Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 33 additions & 27 deletions core/circuitbreaker/circuit_breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,13 @@ func (s *State) get() State {
statePtr := (*int32)(unsafe.Pointer(s))
return State(atomic.LoadInt32(statePtr))
}

func (s *State) set(update State) {
statePtr := (*int32)(unsafe.Pointer(s))
newState := int32(update)
atomic.StoreInt32(statePtr, newState)
}

func (s *State) casState(expect State, update State) bool {
statePtr := (*int32)(unsafe.Pointer(s))
oldState := int32(expect)
Expand All @@ -62,24 +64,29 @@ func (s *State) casState(expect State, update State) bool {
}

type StateChangeListener interface {
OnChangeToClosed(prev State, rule Rule)
// OnTransformToClosed is triggered when when circuit breaker state transformed to Closed.
OnTransformToClosed(prev State, rule Rule)

OnChangeToOpen(prev State, rule Rule, snapshot interface{})
// OnTransformToOpen is triggered when when circuit breaker state transformed to Open.
// The "snapshot" indicates the triggered value when the transformation occurs.
OnTransformToOpen(prev State, rule Rule, snapshot interface{})

OnChangeToHalfOpen(prev State, rule Rule)
// OnTransformToOpen is triggered when when circuit breaker state transformed to HalfOpen.
OnTransformToHalfOpen(prev State, rule Rule)
}

type CircuitBreaker interface {
// BoundRule returns the associated circuit breaking rule.
BoundRule() Rule

// BoundStat returns the associated statistic data structure.
BoundStat() interface{}

// TryPass acquires permission of an invocation only if it is available at the time of invocation.
TryPass(ctx *base.EntryContext) bool

// CurrentState returns current state of the circuit breaker.
CurrentState() State
// HandleCompleted handle the entry completed, Will not call HandleCompleted if request is blocked.
// rt: the response time this entry cost.
HandleCompleted(rt uint64, err error)
// OnRequestComplete record a completed request with the given response time as well as error (if present),
// and handle state transformation of the circuit breaker.
OnRequestComplete(rtt uint64, err error)
}

//================================= circuitBreakerBase ====================================
Expand All @@ -106,54 +113,53 @@ func (b *circuitBreakerBase) updateNextRetryTimestamp() {
atomic.StoreUint64(&b.nextRetryTimestamp, util.CurrentTimeMillis()+uint64(b.retryTimeoutMs))
}

// fromClosedToOpen update circuit breaker status machine from closed to open
// fromClosedToOpen update circuit breaker state machine from closed to open
// Used for opening circuit breaker from closed when checking circuit breaker
// return true if succeed to update
func (b *circuitBreakerBase) fromClosedToOpen(snapshot interface{}) bool {
if b.status.casState(Closed, Open) {
b.updateNextRetryTimestamp()
for _, listener := range statusSwitchListeners {
listener.OnChangeToOpen(Closed, b.rule, snapshot)
for _, listener := range stateChangeListeners {
listener.OnTransformToOpen(Closed, b.rule, snapshot)
}
return true
}
return false
}

// fromOpenToHalfOpen update circuit breaker status machine from open to half-open
// fromOpenToHalfOpen update circuit breaker state machine from open to half-open
// Used for probing
// return true if succeed to update
func (b *circuitBreakerBase) fromOpenToHalfOpen() bool {
if b.status.casState(Open, HalfOpen) {
for _, listener := range statusSwitchListeners {
listener.OnChangeToHalfOpen(Open, b.rule)
for _, listener := range stateChangeListeners {
listener.OnTransformToHalfOpen(Open, b.rule)
}
return true
}
return false
}

// fromHalfOpenToOpen update circuit breaker status machine from half-open to open
// fromHalfOpenToOpen update circuit breaker state machine from half-open to open
// Used for failing to probe
// return true if succeed to update
func (b *circuitBreakerBase) fromHalfOpenToOpen(snapshot interface{}) bool {
if b.status.casState(HalfOpen, Open) {
b.updateNextRetryTimestamp()
for _, listener := range statusSwitchListeners {
listener.OnChangeToOpen(HalfOpen, b.rule, snapshot)
for _, listener := range stateChangeListeners {
listener.OnTransformToOpen(HalfOpen, b.rule, snapshot)
}
return true
}
return false
}

// fromHalfOpenToOpen update circuit breaker status machine from half-open to closed
// Used for succeeding to probe
// return true if succeed to update
// fromHalfOpenToOpen update circuit breaker state machine from half-open to closed
// Return true only if current goroutine successfully accomplished the transformation.
func (b *circuitBreakerBase) fromHalfOpenToClosed() bool {
if b.status.casState(HalfOpen, Closed) {
for _, listener := range statusSwitchListeners {
listener.OnChangeToClosed(HalfOpen, b.rule)
for _, listener := range stateChangeListeners {
listener.OnTransformToClosed(HalfOpen, b.rule)
}
return true
}
Expand Down Expand Up @@ -198,7 +204,7 @@ func (b *slowRtCircuitBreaker) BoundStat() interface{} {
return b.stat
}

// TryPass check circuit breaker based on status machine of circuit breaker.
// TryPass check circuit breaker based on state machine of circuit breaker.
func (b *slowRtCircuitBreaker) TryPass(_ *base.EntryContext) bool {
curStatus := b.CurrentState()
if curStatus == Closed {
Expand All @@ -212,7 +218,7 @@ func (b *slowRtCircuitBreaker) TryPass(_ *base.EntryContext) bool {
return false
}

func (b *slowRtCircuitBreaker) HandleCompleted(rt uint64, err error) {
func (b *slowRtCircuitBreaker) OnRequestComplete(rt uint64, err error) {
// add slow and add total
metricStat := b.stat
counter := metricStat.currentCounter()
Expand Down Expand Up @@ -393,7 +399,7 @@ func (b *errorRatioCircuitBreaker) TryPass(_ *base.EntryContext) bool {
return false
}

func (b *errorRatioCircuitBreaker) HandleCompleted(rt uint64, err error) {
func (b *errorRatioCircuitBreaker) OnRequestComplete(rt uint64, err error) {
metricStat := b.stat
counter := metricStat.currentCounter()
if err != nil {
Expand Down Expand Up @@ -570,7 +576,7 @@ func (b *errorCountCircuitBreaker) TryPass(_ *base.EntryContext) bool {
return false
}

func (b *errorCountCircuitBreaker) HandleCompleted(rt uint64, err error) {
func (b *errorCountCircuitBreaker) OnRequestComplete(rt uint64, err error) {
metricStat := b.stat
counter := metricStat.currentCounter()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion core/circuitbreaker/circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (m *CircuitBreakerMock) CurrentState() State {
return args.Get(0).(State)
}

// HandleCompleted handle the entry completed
// OnRequestComplete handle the entry completed
// rt: the response time this entry cost.
func (m *CircuitBreakerMock) HandleCompleted(rt uint64, err error) {
m.Called(rt, err)
Expand Down
44 changes: 25 additions & 19 deletions core/circuitbreaker/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,20 @@ import (
"github.com/pkg/errors"
)

// The strategy of circuit breaker
// Each strategy represent one rule type
// The strategy of circuit breaker.
// Each strategy represents one rule type.
type Strategy int8

const (
SlowRt Strategy = iota
SlowRequestRatio Strategy = iota
ErrorRatio
ErrorCount
)

func (s Strategy) String() string {
switch s {
case SlowRt:
return "SlowRt"
case SlowRequestRatio:
return "SlowRequestRatio"
case ErrorRatio:
return "ErrorRatio"
case ErrorCount:
Expand All @@ -33,32 +33,36 @@ func (s Strategy) String() string {
}
}

// The base interface of circuit breaker rule
// Rule represents the base interface of the circuit breaker rule.
type Rule interface {
base.SentinelRule
// return the strategy type
// BreakerStrategy returns the strategy.
BreakerStrategy() Strategy
// check whether the rule is valid and could be converted to corresponding circuit breaker
// IsApplicable checks whether the rule is valid and could be converted to a corresponding circuit breaker.
IsApplicable() error
// return circuit breaker stat interval
// BreakerStatIntervalMs returns the statistic interval of circuit breaker (in milliseconds).
BreakerStatIntervalMs() uint32
// check whether is consistent with new rule
// IsEqualsTo checks whether current rule is consistent with the given rule.
IsEqualsTo(r Rule) bool
// check whether the statistic of old circuit breaker is reuseable for new circuit breaker
// IsStatReusable checks whether current rule is "statistically" equal to the given rule.
IsStatReusable(r Rule) bool
}

// The common fields of circuit breaker rule
// RuleBase encompasses common fields of circuit breaking rule.
type RuleBase struct {
// unique id
Id string
// resource name
Resource string
Strategy Strategy
// auto recover timeout in ms, all requests would be broken before auto recover
RetryTimeoutMs uint32
// RetryTimeoutMs represents recovery timeout (in seconds) before the circuit breaker opens.
// During the open period, no requests are permitted until the timeout has elapsed.
// After that, the circuit breaker will transform to half-open state for trying a few "trial" requests.
RetryTimeoutMs uint32
// MinRequestAmount represents the minimum number of requests (in an active statistic time span)
// that can trigger circuit breaking.
MinRequestAmount uint64

// StatIntervalMs represents statistic time interval of the internal circuit breaker (in ms).
StatIntervalMs uint32
}

Expand Down Expand Up @@ -92,11 +96,13 @@ func (b *RuleBase) ResourceName() string {
return b.Resource
}

// SlowRt circuit breaker rule
// SlowRequestRatio circuit breaker rule
type slowRtRule struct {
RuleBase
// max allowed rt in ms
MaxAllowedRt uint64
// MaxAllowedRt indicates that any invocation whose response time exceeds this value
// will be recorded as a slow request.
MaxAllowedRt uint64
// MaxSlowRequestRatio represents the threshold of slow request ratio.
MaxSlowRequestRatio float64
}

Expand All @@ -105,7 +111,7 @@ func NewSlowRtRule(resource string, intervalMs uint32, retryTimeoutMs uint32, ma
RuleBase: RuleBase{
Id: util.NewUuid(),
Resource: resource,
Strategy: SlowRt,
Strategy: SlowRequestRatio,
RetryTimeoutMs: retryTimeoutMs,
MinRequestAmount: minRequestAmount,
StatIntervalMs: intervalMs,
Expand Down
45 changes: 23 additions & 22 deletions core/circuitbreaker/rule_manager.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package circuitbreaker

import (
"encoding/json"
"fmt"
"strings"
"sync"

"github.com/alibaba/sentinel-golang/logging"
Expand All @@ -19,11 +19,11 @@ var (
breakers = make(map[string][]CircuitBreaker)
updateMux = &sync.RWMutex{}

statusSwitchListeners = make([]StateChangeListener, 0)
stateChangeListeners = make([]StateChangeListener, 0)
)

func init() {
cbGenFuncMap[SlowRt] = func(r Rule, reuseStat interface{}) CircuitBreaker {
cbGenFuncMap[SlowRequestRatio] = func(r Rule, reuseStat interface{}) CircuitBreaker {
rtRule, ok := r.(*slowRtRule)
if !ok || rtRule == nil {
return nil
Expand Down Expand Up @@ -82,9 +82,10 @@ func GetResRules(resource string) []Rule {
return ret
}

// Load the newer rules to manager.
// rules: the newer rules, if len of rules is 0, will clear all rules of manager.
// LoadRules replaces old rules with the given circuit breaking rules.
//
// return value:
//
// bool: was designed to indicate whether the internal map has been changed
// error: was designed to indicate whether occurs the error.
func LoadRules(rules []Rule) (bool, error) {
Expand Down Expand Up @@ -123,7 +124,7 @@ func onRuleUpdate(rules []Rule) (err error) {
continue
}
if err := rule.IsApplicable(); err != nil {
logger.Warnf("Ignoring invalid breaker rule when loading new rules,rule: %+v, err: %+v.", rule, err)
logger.Warnf("Ignoring invalid circuit breaking rule when loading new rules, rule: %+v, err: %+v", rule, err)
continue
}

Expand All @@ -144,6 +145,7 @@ func onRuleUpdate(rules []Rule) (err error) {
for res, resRules := range newBreakerRules {
emptyCircuitBreakerList := make([]CircuitBreaker, 0, 0)
for _, r := range resRules {
// TODO: rearrange the code here.
oldResCbs := breakers[res]
if oldResCbs == nil {
oldResCbs = emptyCircuitBreakerList
Expand Down Expand Up @@ -239,33 +241,32 @@ func rulesFrom(rm map[string][]Rule) []Rule {
}

func logRuleUpdate(rules map[string][]Rule) {
sb := strings.Builder{}
sb.WriteString("[CircuitBreakerRuleManager] succeed to load circuit breakers:\n")
for _, rule := range rulesFrom(rules) {
sb.WriteString(rule.String())
sb.WriteString("\n")
s, err := json.Marshal(rules)
if err != nil {
logger.Info("Circuit breaking rules loaded")
} else {
logger.Infof("Circuit breaking rules loaded: %s", s)
}
logger.Info(sb.String())
}

func RegisterStatusSwitchListeners(listeners ...StateChangeListener) {
func RegisterStateChangeListeners(listeners ...StateChangeListener) {
if len(listeners) == 0 {
return
}
updateMux.Lock()
defer updateMux.Unlock()

statusSwitchListeners = append(statusSwitchListeners, listeners...)
stateChangeListeners = append(stateChangeListeners, listeners...)
}

// SetTrafficShapingGenerator sets the traffic controller generator for the given control behavior.
// Note that modifying the generator of default control behaviors is not allowed.
func SetTrafficShapingGenerator(s Strategy, generator CircuitBreakerGenFunc) error {
// SetCircuitBreakerGenerator sets the circuit breaker generator for the given strategy.
// Note that modifying the generator of default strategies is not allowed.
func SetCircuitBreakerGenerator(s Strategy, generator CircuitBreakerGenFunc) error {
if generator == nil {
return errors.New("nil generator")
}
if s >= SlowRt && s <= ErrorCount {
return errors.New("not allowed to replace the generator for default control behaviors")
if s >= SlowRequestRatio && s <= ErrorCount {
return errors.New("not allowed to replace the generator for default circuit breaking strategies")
}
updateMux.Lock()
defer updateMux.Unlock()
Expand All @@ -274,9 +275,9 @@ func SetTrafficShapingGenerator(s Strategy, generator CircuitBreakerGenFunc) err
return nil
}

func RemoveTrafficShapingGenerator(s Strategy) error {
if s >= SlowRt && s <= ErrorCount {
return errors.New("not allowed to replace the generator for default control behaviors")
func RemoveCircuitBreakerGenerator(s Strategy) error {
if s >= SlowRequestRatio && s <= ErrorCount {
return errors.New("not allowed to replace the generator for default circuit breaking strategies")
}
updateMux.Lock()
defer updateMux.Unlock()
Expand Down
Loading