Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 36 additions & 33 deletions core/circuitbreaker/circuit_breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,13 @@ func (s *State) get() State {
statePtr := (*int32)(unsafe.Pointer(s))
return State(atomic.LoadInt32(statePtr))
}

func (s *State) set(update State) {
statePtr := (*int32)(unsafe.Pointer(s))
newState := int32(update)
atomic.StoreInt32(statePtr, newState)
}

func (s *State) casState(expect State, update State) bool {
statePtr := (*int32)(unsafe.Pointer(s))
oldState := int32(expect)
Expand All @@ -62,24 +64,29 @@ func (s *State) casState(expect State, update State) bool {
}

type StateChangeListener interface {
OnChangeToClosed(prev State, rule Rule)
// OnTransformToClosed is triggered when circuit breaker state transformed to Closed.
OnTransformToClosed(prev State, rule Rule)

OnChangeToOpen(prev State, rule Rule, snapshot interface{})
// OnTransformToOpen is triggered when circuit breaker state transformed to Open.
// The "snapshot" indicates the triggered value when the transformation occurs.
OnTransformToOpen(prev State, rule Rule, snapshot interface{})

OnChangeToHalfOpen(prev State, rule Rule)
// OnTransformToHalfOpen is triggered when circuit breaker state transformed to HalfOpen.
OnTransformToHalfOpen(prev State, rule Rule)
}

type CircuitBreaker interface {
// BoundRule returns the associated circuit breaking rule.
BoundRule() Rule

// BoundStat returns the associated statistic data structure.
BoundStat() interface{}

// TryPass acquires permission of an invocation only if it is available at the time of invocation.
TryPass(ctx *base.EntryContext) bool

// CurrentState returns current state of the circuit breaker.
CurrentState() State
// HandleCompleted handle the entry completed, Will not call HandleCompleted if request is blocked.
// rt: the response time this entry cost.
HandleCompleted(rt uint64, err error)
// OnRequestComplete record a completed request with the given response time as well as error (if present),
// and handle state transformation of the circuit breaker.
OnRequestComplete(rtt uint64, err error)
}

//================================= circuitBreakerBase ====================================
Expand All @@ -106,54 +113,50 @@ func (b *circuitBreakerBase) updateNextRetryTimestamp() {
atomic.StoreUint64(&b.nextRetryTimestamp, util.CurrentTimeMillis()+uint64(b.retryTimeoutMs))
}

// fromClosedToOpen update circuit breaker status machine from closed to open
// Used for opening circuit breaker from closed when checking circuit breaker
// return true if succeed to update
// fromClosedToOpen updates circuit breaker state machine from closed to open.
// Return true only if current goroutine successfully accomplished the transformation.
func (b *circuitBreakerBase) fromClosedToOpen(snapshot interface{}) bool {
if b.status.casState(Closed, Open) {
b.updateNextRetryTimestamp()
for _, listener := range statusSwitchListeners {
listener.OnChangeToOpen(Closed, b.rule, snapshot)
for _, listener := range stateChangeListeners {
listener.OnTransformToOpen(Closed, b.rule, snapshot)
}
return true
}
return false
}

// fromOpenToHalfOpen update circuit breaker status machine from open to half-open
// Used for probing
// return true if succeed to update
// fromOpenToHalfOpen updates circuit breaker state machine from open to half-open.
// Return true only if current goroutine successfully accomplished the transformation.
func (b *circuitBreakerBase) fromOpenToHalfOpen() bool {
if b.status.casState(Open, HalfOpen) {
for _, listener := range statusSwitchListeners {
listener.OnChangeToHalfOpen(Open, b.rule)
for _, listener := range stateChangeListeners {
listener.OnTransformToHalfOpen(Open, b.rule)
}
return true
}
return false
}

// fromHalfOpenToOpen update circuit breaker status machine from half-open to open
// Used for failing to probe
// return true if succeed to update
// fromHalfOpenToOpen updates circuit breaker state machine from half-open to open.
// Return true only if current goroutine successfully accomplished the transformation.
func (b *circuitBreakerBase) fromHalfOpenToOpen(snapshot interface{}) bool {
if b.status.casState(HalfOpen, Open) {
b.updateNextRetryTimestamp()
for _, listener := range statusSwitchListeners {
listener.OnChangeToOpen(HalfOpen, b.rule, snapshot)
for _, listener := range stateChangeListeners {
listener.OnTransformToOpen(HalfOpen, b.rule, snapshot)
}
return true
}
return false
}

// fromHalfOpenToOpen update circuit breaker status machine from half-open to closed
// Used for succeeding to probe
// return true if succeed to update
// fromHalfOpenToOpen updates circuit breaker state machine from half-open to closed
// Return true only if current goroutine successfully accomplished the transformation.
func (b *circuitBreakerBase) fromHalfOpenToClosed() bool {
if b.status.casState(HalfOpen, Closed) {
for _, listener := range statusSwitchListeners {
listener.OnChangeToClosed(HalfOpen, b.rule)
for _, listener := range stateChangeListeners {
listener.OnTransformToClosed(HalfOpen, b.rule)
}
return true
}
Expand Down Expand Up @@ -198,7 +201,7 @@ func (b *slowRtCircuitBreaker) BoundStat() interface{} {
return b.stat
}

// TryPass check circuit breaker based on status machine of circuit breaker.
// TryPass check circuit breaker based on state machine of circuit breaker.
func (b *slowRtCircuitBreaker) TryPass(_ *base.EntryContext) bool {
curStatus := b.CurrentState()
if curStatus == Closed {
Expand All @@ -212,7 +215,7 @@ func (b *slowRtCircuitBreaker) TryPass(_ *base.EntryContext) bool {
return false
}

func (b *slowRtCircuitBreaker) HandleCompleted(rt uint64, err error) {
func (b *slowRtCircuitBreaker) OnRequestComplete(rt uint64, err error) {
// add slow and add total
metricStat := b.stat
counter := metricStat.currentCounter()
Expand Down Expand Up @@ -393,7 +396,7 @@ func (b *errorRatioCircuitBreaker) TryPass(_ *base.EntryContext) bool {
return false
}

func (b *errorRatioCircuitBreaker) HandleCompleted(rt uint64, err error) {
func (b *errorRatioCircuitBreaker) OnRequestComplete(rt uint64, err error) {
metricStat := b.stat
counter := metricStat.currentCounter()
if err != nil {
Expand Down Expand Up @@ -570,7 +573,7 @@ func (b *errorCountCircuitBreaker) TryPass(_ *base.EntryContext) bool {
return false
}

func (b *errorCountCircuitBreaker) HandleCompleted(rt uint64, err error) {
func (b *errorCountCircuitBreaker) OnRequestComplete(rt uint64, err error) {
metricStat := b.stat
counter := metricStat.currentCounter()
if err != nil {
Expand Down
13 changes: 7 additions & 6 deletions core/circuitbreaker/circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ import (
"testing"

"github.com/alibaba/sentinel-golang/core/base"

"github.com/stretchr/testify/mock"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

type CircuitBreakerMock struct {
Expand All @@ -19,6 +17,11 @@ func (m *CircuitBreakerMock) BoundRule() Rule {
return args.Get(0).(Rule)
}

func (m *CircuitBreakerMock) BoundStat() interface{} {
args := m.Called()
return args.Get(0)
}

func (m *CircuitBreakerMock) TryPass(ctx *base.EntryContext) bool {
args := m.Called(ctx)
return args.Bool(0)
Expand All @@ -29,9 +32,7 @@ func (m *CircuitBreakerMock) CurrentState() State {
return args.Get(0).(State)
}

// HandleCompleted handle the entry completed
// rt: the response time this entry cost.
func (m *CircuitBreakerMock) HandleCompleted(rt uint64, err error) {
func (m *CircuitBreakerMock) OnRequestComplete(rt uint64, err error) {
m.Called(rt, err)
return
}
Expand Down
47 changes: 26 additions & 21 deletions core/circuitbreaker/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,26 @@ package circuitbreaker
import (
"fmt"

"go.uber.org/multierr"

"github.com/alibaba/sentinel-golang/core/base"
"github.com/alibaba/sentinel-golang/util"
"github.com/pkg/errors"
"go.uber.org/multierr"
)

// The strategy of circuit breaker
// Each strategy represent one rule type
// The strategy of circuit breaker.
// Each strategy represents one rule type.
type Strategy int8

const (
SlowRt Strategy = iota
SlowRequestRatio Strategy = iota
ErrorRatio
ErrorCount
)

func (s Strategy) String() string {
switch s {
case SlowRt:
return "SlowRt"
case SlowRequestRatio:
return "SlowRequestRatio"
case ErrorRatio:
return "ErrorRatio"
case ErrorCount:
Expand All @@ -33,32 +32,36 @@ func (s Strategy) String() string {
}
}

// The base interface of circuit breaker rule
// Rule represents the base interface of the circuit breaker rule.
type Rule interface {
base.SentinelRule
// return the strategy type
// BreakerStrategy returns the strategy.
BreakerStrategy() Strategy
// check whether the rule is valid and could be converted to corresponding circuit breaker
// IsApplicable checks whether the rule is valid and could be converted to a corresponding circuit breaker.
IsApplicable() error
// return circuit breaker stat interval
// BreakerStatIntervalMs returns the statistic interval of circuit breaker (in milliseconds).
BreakerStatIntervalMs() uint32
// check whether is consistent with new rule
// IsEqualsTo checks whether current rule is consistent with the given rule.
IsEqualsTo(r Rule) bool
// check whether the statistic of old circuit breaker is reuseable for new circuit breaker
// IsStatReusable checks whether current rule is "statistically" equal to the given rule.
IsStatReusable(r Rule) bool
}

// The common fields of circuit breaker rule
// RuleBase encompasses common fields of circuit breaking rule.
type RuleBase struct {
// unique id
Id string
// resource name
Resource string
Strategy Strategy
// auto recover timeout in ms, all requests would be broken before auto recover
RetryTimeoutMs uint32
// RetryTimeoutMs represents recovery timeout (in seconds) before the circuit breaker opens.
// During the open period, no requests are permitted until the timeout has elapsed.
// After that, the circuit breaker will transform to half-open state for trying a few "trial" requests.
RetryTimeoutMs uint32
// MinRequestAmount represents the minimum number of requests (in an active statistic time span)
// that can trigger circuit breaking.
MinRequestAmount uint64

// StatIntervalMs represents statistic time interval of the internal circuit breaker (in ms).
StatIntervalMs uint32
}

Expand Down Expand Up @@ -92,11 +95,13 @@ func (b *RuleBase) ResourceName() string {
return b.Resource
}

// SlowRt circuit breaker rule
// SlowRequestRatio circuit breaker rule
type slowRtRule struct {
RuleBase
// max allowed rt in ms
MaxAllowedRt uint64
// MaxAllowedRt indicates that any invocation whose response time exceeds this value
// will be recorded as a slow request.
MaxAllowedRt uint64
// MaxSlowRequestRatio represents the threshold of slow request ratio.
MaxSlowRequestRatio float64
}

Expand All @@ -105,7 +110,7 @@ func NewSlowRtRule(resource string, intervalMs uint32, retryTimeoutMs uint32, ma
RuleBase: RuleBase{
Id: util.NewUuid(),
Resource: resource,
Strategy: SlowRt,
Strategy: SlowRequestRatio,
RetryTimeoutMs: retryTimeoutMs,
MinRequestAmount: minRequestAmount,
StatIntervalMs: intervalMs,
Expand Down
Loading