diff --git a/core/circuitbreaker/circuit_breaker.go b/core/circuitbreaker/circuit_breaker.go index ae82aada3..1276b2510 100644 --- a/core/circuitbreaker/circuit_breaker.go +++ b/core/circuitbreaker/circuit_breaker.go @@ -49,11 +49,13 @@ func (s *State) get() State { statePtr := (*int32)(unsafe.Pointer(s)) return State(atomic.LoadInt32(statePtr)) } + func (s *State) set(update State) { statePtr := (*int32)(unsafe.Pointer(s)) newState := int32(update) atomic.StoreInt32(statePtr, newState) } + func (s *State) casState(expect State, update State) bool { statePtr := (*int32)(unsafe.Pointer(s)) oldState := int32(expect) @@ -62,24 +64,29 @@ func (s *State) casState(expect State, update State) bool { } type StateChangeListener interface { - OnChangeToClosed(prev State, rule Rule) + // OnTransformToClosed is triggered when circuit breaker state transformed to Closed. + OnTransformToClosed(prev State, rule Rule) - OnChangeToOpen(prev State, rule Rule, snapshot interface{}) + // OnTransformToOpen is triggered when circuit breaker state transformed to Open. + // The "snapshot" indicates the triggered value when the transformation occurs. + OnTransformToOpen(prev State, rule Rule, snapshot interface{}) - OnChangeToHalfOpen(prev State, rule Rule) + // OnTransformToHalfOpen is triggered when circuit breaker state transformed to HalfOpen. + OnTransformToHalfOpen(prev State, rule Rule) } type CircuitBreaker interface { + // BoundRule returns the associated circuit breaking rule. BoundRule() Rule - + // BoundStat returns the associated statistic data structure. BoundStat() interface{} - + // TryPass acquires permission of an invocation only if it is available at the time of invocation. TryPass(ctx *base.EntryContext) bool - + // CurrentState returns current state of the circuit breaker. CurrentState() State - // HandleCompleted handle the entry completed, Will not call HandleCompleted if request is blocked. - // rt: the response time this entry cost. - HandleCompleted(rt uint64, err error) + // OnRequestComplete record a completed request with the given response time as well as error (if present), + // and handle state transformation of the circuit breaker. + OnRequestComplete(rtt uint64, err error) } //================================= circuitBreakerBase ==================================== @@ -106,54 +113,50 @@ func (b *circuitBreakerBase) updateNextRetryTimestamp() { atomic.StoreUint64(&b.nextRetryTimestamp, util.CurrentTimeMillis()+uint64(b.retryTimeoutMs)) } -// fromClosedToOpen update circuit breaker status machine from closed to open -// Used for opening circuit breaker from closed when checking circuit breaker -// return true if succeed to update +// fromClosedToOpen updates circuit breaker state machine from closed to open. +// Return true only if current goroutine successfully accomplished the transformation. func (b *circuitBreakerBase) fromClosedToOpen(snapshot interface{}) bool { if b.status.casState(Closed, Open) { b.updateNextRetryTimestamp() - for _, listener := range statusSwitchListeners { - listener.OnChangeToOpen(Closed, b.rule, snapshot) + for _, listener := range stateChangeListeners { + listener.OnTransformToOpen(Closed, b.rule, snapshot) } return true } return false } -// fromOpenToHalfOpen update circuit breaker status machine from open to half-open -// Used for probing -// return true if succeed to update +// fromOpenToHalfOpen updates circuit breaker state machine from open to half-open. +// Return true only if current goroutine successfully accomplished the transformation. func (b *circuitBreakerBase) fromOpenToHalfOpen() bool { if b.status.casState(Open, HalfOpen) { - for _, listener := range statusSwitchListeners { - listener.OnChangeToHalfOpen(Open, b.rule) + for _, listener := range stateChangeListeners { + listener.OnTransformToHalfOpen(Open, b.rule) } return true } return false } -// fromHalfOpenToOpen update circuit breaker status machine from half-open to open -// Used for failing to probe -// return true if succeed to update +// fromHalfOpenToOpen updates circuit breaker state machine from half-open to open. +// Return true only if current goroutine successfully accomplished the transformation. func (b *circuitBreakerBase) fromHalfOpenToOpen(snapshot interface{}) bool { if b.status.casState(HalfOpen, Open) { b.updateNextRetryTimestamp() - for _, listener := range statusSwitchListeners { - listener.OnChangeToOpen(HalfOpen, b.rule, snapshot) + for _, listener := range stateChangeListeners { + listener.OnTransformToOpen(HalfOpen, b.rule, snapshot) } return true } return false } -// fromHalfOpenToOpen update circuit breaker status machine from half-open to closed -// Used for succeeding to probe -// return true if succeed to update +// fromHalfOpenToOpen updates circuit breaker state machine from half-open to closed +// Return true only if current goroutine successfully accomplished the transformation. func (b *circuitBreakerBase) fromHalfOpenToClosed() bool { if b.status.casState(HalfOpen, Closed) { - for _, listener := range statusSwitchListeners { - listener.OnChangeToClosed(HalfOpen, b.rule) + for _, listener := range stateChangeListeners { + listener.OnTransformToClosed(HalfOpen, b.rule) } return true } @@ -198,7 +201,7 @@ func (b *slowRtCircuitBreaker) BoundStat() interface{} { return b.stat } -// TryPass check circuit breaker based on status machine of circuit breaker. +// TryPass check circuit breaker based on state machine of circuit breaker. func (b *slowRtCircuitBreaker) TryPass(_ *base.EntryContext) bool { curStatus := b.CurrentState() if curStatus == Closed { @@ -212,7 +215,7 @@ func (b *slowRtCircuitBreaker) TryPass(_ *base.EntryContext) bool { return false } -func (b *slowRtCircuitBreaker) HandleCompleted(rt uint64, err error) { +func (b *slowRtCircuitBreaker) OnRequestComplete(rt uint64, err error) { // add slow and add total metricStat := b.stat counter := metricStat.currentCounter() @@ -393,7 +396,7 @@ func (b *errorRatioCircuitBreaker) TryPass(_ *base.EntryContext) bool { return false } -func (b *errorRatioCircuitBreaker) HandleCompleted(rt uint64, err error) { +func (b *errorRatioCircuitBreaker) OnRequestComplete(rt uint64, err error) { metricStat := b.stat counter := metricStat.currentCounter() if err != nil { @@ -570,7 +573,7 @@ func (b *errorCountCircuitBreaker) TryPass(_ *base.EntryContext) bool { return false } -func (b *errorCountCircuitBreaker) HandleCompleted(rt uint64, err error) { +func (b *errorCountCircuitBreaker) OnRequestComplete(rt uint64, err error) { metricStat := b.stat counter := metricStat.currentCounter() if err != nil { diff --git a/core/circuitbreaker/circuit_breaker_test.go b/core/circuitbreaker/circuit_breaker_test.go index 0932e0de4..a75d9c57f 100644 --- a/core/circuitbreaker/circuit_breaker_test.go +++ b/core/circuitbreaker/circuit_breaker_test.go @@ -4,10 +4,8 @@ import ( "testing" "github.com/alibaba/sentinel-golang/core/base" - - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" ) type CircuitBreakerMock struct { @@ -19,6 +17,11 @@ func (m *CircuitBreakerMock) BoundRule() Rule { return args.Get(0).(Rule) } +func (m *CircuitBreakerMock) BoundStat() interface{} { + args := m.Called() + return args.Get(0) +} + func (m *CircuitBreakerMock) TryPass(ctx *base.EntryContext) bool { args := m.Called(ctx) return args.Bool(0) @@ -29,9 +32,7 @@ func (m *CircuitBreakerMock) CurrentState() State { return args.Get(0).(State) } -// HandleCompleted handle the entry completed -// rt: the response time this entry cost. -func (m *CircuitBreakerMock) HandleCompleted(rt uint64, err error) { +func (m *CircuitBreakerMock) OnRequestComplete(rt uint64, err error) { m.Called(rt, err) return } diff --git a/core/circuitbreaker/rule.go b/core/circuitbreaker/rule.go index 6990aaea8..5236e71b3 100644 --- a/core/circuitbreaker/rule.go +++ b/core/circuitbreaker/rule.go @@ -3,27 +3,26 @@ package circuitbreaker import ( "fmt" - "go.uber.org/multierr" - "github.com/alibaba/sentinel-golang/core/base" "github.com/alibaba/sentinel-golang/util" "github.com/pkg/errors" + "go.uber.org/multierr" ) -// The strategy of circuit breaker -// Each strategy represent one rule type +// The strategy of circuit breaker. +// Each strategy represents one rule type. type Strategy int8 const ( - SlowRt Strategy = iota + SlowRequestRatio Strategy = iota ErrorRatio ErrorCount ) func (s Strategy) String() string { switch s { - case SlowRt: - return "SlowRt" + case SlowRequestRatio: + return "SlowRequestRatio" case ErrorRatio: return "ErrorRatio" case ErrorCount: @@ -33,32 +32,36 @@ func (s Strategy) String() string { } } -// The base interface of circuit breaker rule +// Rule represents the base interface of the circuit breaker rule. type Rule interface { base.SentinelRule - // return the strategy type + // BreakerStrategy returns the strategy. BreakerStrategy() Strategy - // check whether the rule is valid and could be converted to corresponding circuit breaker + // IsApplicable checks whether the rule is valid and could be converted to a corresponding circuit breaker. IsApplicable() error - // return circuit breaker stat interval + // BreakerStatIntervalMs returns the statistic interval of circuit breaker (in milliseconds). BreakerStatIntervalMs() uint32 - // check whether is consistent with new rule + // IsEqualsTo checks whether current rule is consistent with the given rule. IsEqualsTo(r Rule) bool - // check whether the statistic of old circuit breaker is reuseable for new circuit breaker + // IsStatReusable checks whether current rule is "statistically" equal to the given rule. IsStatReusable(r Rule) bool } -// The common fields of circuit breaker rule +// RuleBase encompasses common fields of circuit breaking rule. type RuleBase struct { // unique id Id string // resource name Resource string Strategy Strategy - // auto recover timeout in ms, all requests would be broken before auto recover - RetryTimeoutMs uint32 + // RetryTimeoutMs represents recovery timeout (in seconds) before the circuit breaker opens. + // During the open period, no requests are permitted until the timeout has elapsed. + // After that, the circuit breaker will transform to half-open state for trying a few "trial" requests. + RetryTimeoutMs uint32 + // MinRequestAmount represents the minimum number of requests (in an active statistic time span) + // that can trigger circuit breaking. MinRequestAmount uint64 - + // StatIntervalMs represents statistic time interval of the internal circuit breaker (in ms). StatIntervalMs uint32 } @@ -92,11 +95,13 @@ func (b *RuleBase) ResourceName() string { return b.Resource } -// SlowRt circuit breaker rule +// SlowRequestRatio circuit breaker rule type slowRtRule struct { RuleBase - // max allowed rt in ms - MaxAllowedRt uint64 + // MaxAllowedRt indicates that any invocation whose response time exceeds this value + // will be recorded as a slow request. + MaxAllowedRt uint64 + // MaxSlowRequestRatio represents the threshold of slow request ratio. MaxSlowRequestRatio float64 } @@ -105,7 +110,7 @@ func NewSlowRtRule(resource string, intervalMs uint32, retryTimeoutMs uint32, ma RuleBase: RuleBase{ Id: util.NewUuid(), Resource: resource, - Strategy: SlowRt, + Strategy: SlowRequestRatio, RetryTimeoutMs: retryTimeoutMs, MinRequestAmount: minRequestAmount, StatIntervalMs: intervalMs, diff --git a/core/circuitbreaker/rule_manager.go b/core/circuitbreaker/rule_manager.go index 5608a1767..5c1090b2d 100644 --- a/core/circuitbreaker/rule_manager.go +++ b/core/circuitbreaker/rule_manager.go @@ -1,8 +1,8 @@ package circuitbreaker import ( + "encoding/json" "fmt" - "strings" "sync" "github.com/alibaba/sentinel-golang/logging" @@ -19,11 +19,11 @@ var ( breakers = make(map[string][]CircuitBreaker) updateMux = &sync.RWMutex{} - statusSwitchListeners = make([]StateChangeListener, 0) + stateChangeListeners = make([]StateChangeListener, 0) ) func init() { - cbGenFuncMap[SlowRt] = func(r Rule, reuseStat interface{}) CircuitBreaker { + cbGenFuncMap[SlowRequestRatio] = func(r Rule, reuseStat interface{}) CircuitBreaker { rtRule, ok := r.(*slowRtRule) if !ok || rtRule == nil { return nil @@ -82,9 +82,10 @@ func GetResRules(resource string) []Rule { return ret } -// Load the newer rules to manager. -// rules: the newer rules, if len of rules is 0, will clear all rules of manager. +// LoadRules replaces old rules with the given circuit breaking rules. +// // return value: +// // bool: was designed to indicate whether the internal map has been changed // error: was designed to indicate whether occurs the error. func LoadRules(rules []Rule) (bool, error) { @@ -123,7 +124,7 @@ func onRuleUpdate(rules []Rule) (err error) { continue } if err := rule.IsApplicable(); err != nil { - logger.Warnf("Ignoring invalid breaker rule when loading new rules,rule: %+v, err: %+v.", rule, err) + logger.Warnf("Ignoring invalid circuit breaking rule when loading new rules, rule: %+v, err: %+v", rule, err) continue } @@ -144,6 +145,7 @@ func onRuleUpdate(rules []Rule) (err error) { for res, resRules := range newBreakerRules { emptyCircuitBreakerList := make([]CircuitBreaker, 0, 0) for _, r := range resRules { + // TODO: rearrange the code here. oldResCbs := breakers[res] if oldResCbs == nil { oldResCbs = emptyCircuitBreakerList @@ -239,33 +241,32 @@ func rulesFrom(rm map[string][]Rule) []Rule { } func logRuleUpdate(rules map[string][]Rule) { - sb := strings.Builder{} - sb.WriteString("[CircuitBreakerRuleManager] succeed to load circuit breakers:\n") - for _, rule := range rulesFrom(rules) { - sb.WriteString(rule.String()) - sb.WriteString("\n") + s, err := json.Marshal(rules) + if err != nil { + logger.Info("Circuit breaking rules loaded") + } else { + logger.Infof("Circuit breaking rules loaded: %s", s) } - logger.Info(sb.String()) } -func RegisterStatusSwitchListeners(listeners ...StateChangeListener) { +func RegisterStateChangeListeners(listeners ...StateChangeListener) { if len(listeners) == 0 { return } updateMux.Lock() defer updateMux.Unlock() - statusSwitchListeners = append(statusSwitchListeners, listeners...) + stateChangeListeners = append(stateChangeListeners, listeners...) } -// SetTrafficShapingGenerator sets the traffic controller generator for the given control behavior. -// Note that modifying the generator of default control behaviors is not allowed. -func SetTrafficShapingGenerator(s Strategy, generator CircuitBreakerGenFunc) error { +// SetCircuitBreakerGenerator sets the circuit breaker generator for the given strategy. +// Note that modifying the generator of default strategies is not allowed. +func SetCircuitBreakerGenerator(s Strategy, generator CircuitBreakerGenFunc) error { if generator == nil { return errors.New("nil generator") } - if s >= SlowRt && s <= ErrorCount { - return errors.New("not allowed to replace the generator for default control behaviors") + if s >= SlowRequestRatio && s <= ErrorCount { + return errors.New("not allowed to replace the generator for default circuit breaking strategies") } updateMux.Lock() defer updateMux.Unlock() @@ -274,9 +275,9 @@ func SetTrafficShapingGenerator(s Strategy, generator CircuitBreakerGenFunc) err return nil } -func RemoveTrafficShapingGenerator(s Strategy) error { - if s >= SlowRt && s <= ErrorCount { - return errors.New("not allowed to replace the generator for default control behaviors") +func RemoveCircuitBreakerGenerator(s Strategy) error { + if s >= SlowRequestRatio && s <= ErrorCount { + return errors.New("not allowed to replace the generator for default circuit breaking strategies") } updateMux.Lock() defer updateMux.Unlock() diff --git a/core/circuitbreaker/stat_slot.go b/core/circuitbreaker/stat_slot.go index 6d1dc41b2..2f2581e88 100644 --- a/core/circuitbreaker/stat_slot.go +++ b/core/circuitbreaker/stat_slot.go @@ -4,8 +4,7 @@ import ( "github.com/alibaba/sentinel-golang/core/base" ) -// MetricStatSlot add statistic metric for circuit breaker -// statistic is based on completed. +// MetricStatSlot records metrics for circuit breaker on invocation completed. type MetricStatSlot struct { } @@ -20,6 +19,7 @@ func (c *MetricStatSlot) OnEntryBlocked(_ *base.EntryContext, _ *base.BlockError } func (c *MetricStatSlot) OnCompleted(ctx *base.EntryContext) { + // The slot will ignore blocked requests. if ctx.RuleCheckResult == nil || ctx.RuleCheckResult.IsBlocked() { return } @@ -27,6 +27,6 @@ func (c *MetricStatSlot) OnCompleted(ctx *base.EntryContext) { err := ctx.Err() rt := ctx.Rt() for _, cb := range getResBreakers(res) { - cb.HandleCompleted(rt, err) + cb.OnRequestComplete(rt, err) } } diff --git a/example/circuitbreaker/circuit_breaker_example.go b/example/circuitbreaker/circuit_breaker_example.go index 305c38be5..f01dce00e 100644 --- a/example/circuitbreaker/circuit_breaker_example.go +++ b/example/circuitbreaker/circuit_breaker_example.go @@ -15,15 +15,15 @@ import ( type stateChangeTestListener struct { } -func (s *stateChangeTestListener) OnChangeToClosed(prev circuitbreaker.State, rule circuitbreaker.Rule) { +func (s *stateChangeTestListener) OnTransformToClosed(prev circuitbreaker.State, rule circuitbreaker.Rule) { fmt.Printf("rule.steategy: %+v, From %s to Closed, time: %d\n", rule.BreakerStrategy(), prev.String(), util.CurrentTimeMillis()) } -func (s *stateChangeTestListener) OnChangeToOpen(prev circuitbreaker.State, rule circuitbreaker.Rule, snapshot interface{}) { +func (s *stateChangeTestListener) OnTransformToOpen(prev circuitbreaker.State, rule circuitbreaker.Rule, snapshot interface{}) { fmt.Printf("rule.steategy: %+v, From %s to Open, snapshot: %.2f, time: %d\n", rule.BreakerStrategy(), prev.String(), snapshot, util.CurrentTimeMillis()) } -func (s *stateChangeTestListener) OnChangeToHalfOpen(prev circuitbreaker.State, rule circuitbreaker.Rule) { +func (s *stateChangeTestListener) OnTransformToHalfOpen(prev circuitbreaker.State, rule circuitbreaker.Rule) { fmt.Printf("rule.steategy: %+v, From %s to Half-Open, time: %d\n", rule.BreakerStrategy(), prev.String(), util.CurrentTimeMillis()) } @@ -33,7 +33,7 @@ func main() { log.Fatal(err) } ch := make(chan struct{}) - circuitbreaker.RegisterStatusSwitchListeners(&stateChangeTestListener{}) + circuitbreaker.RegisterStateChangeListeners(&stateChangeTestListener{}) _, err = circuitbreaker.LoadRules([]circuitbreaker.Rule{ circuitbreaker.NewSlowRtRule("abc", 10000, 3000, 50, 10, 0.5),