diff --git a/api/slot_chain.go b/api/slot_chain.go index 46806e6e5..4649af06e 100644 --- a/api/slot_chain.go +++ b/api/slot_chain.go @@ -2,6 +2,7 @@ package api import ( "github.com/alibaba/sentinel-golang/core/base" + "github.com/alibaba/sentinel-golang/core/circuitbreaker" "github.com/alibaba/sentinel-golang/core/flow" "github.com/alibaba/sentinel-golang/core/log" "github.com/alibaba/sentinel-golang/core/stat" @@ -28,7 +29,9 @@ func BuildDefaultSlotChain() *base.SlotChain { sc.AddStatPrepareSlotLast(&stat.StatNodePrepareSlot{}) sc.AddRuleCheckSlotLast(&system.SystemAdaptiveSlot{}) sc.AddRuleCheckSlotLast(&flow.FlowSlot{}) + sc.AddRuleCheckSlotLast(&circuitbreaker.CircuitBreakerSlot{}) sc.AddStatSlotLast(&stat.StatisticSlot{}) sc.AddStatSlotLast(&log.LogSlot{}) + sc.AddStatSlotLast(&circuitbreaker.MetricStatSlot{}) return sc } diff --git a/core/base/context.go b/core/base/context.go index 8fafb797e..dc597cdbe 100644 --- a/core/base/context.go +++ b/core/base/context.go @@ -1,8 +1,15 @@ package base +import "github.com/alibaba/sentinel-golang/util" + type EntryContext struct { + // internal error when sentinel entry or + // biz error of downstream + err error // Use to calculate RT startTime uint64 + // the rt of this transaction + rt uint64 Resource *ResourceWrapper StatNode StatNode @@ -14,6 +21,14 @@ type EntryContext struct { Data map[interface{}]interface{} } +func (ctx *EntryContext) Err() error { + return ctx.err +} + +func (ctx *EntryContext) SetError(err error) { + ctx.err = err +} + func (ctx *EntryContext) StartTime() uint64 { return ctx.startTime } @@ -25,6 +40,18 @@ func (ctx *EntryContext) IsBlocked() bool { return ctx.RuleCheckResult.IsBlocked() } +func (ctx *EntryContext) PutRt(rt uint64) { + ctx.rt = rt +} + +func (ctx *EntryContext) Rt() uint64 { + if ctx.rt == 0 { + rt := util.CurrentTimeMillis() - ctx.StartTime() + return rt + } + return ctx.rt +} + func NewEmptyEntryContext() *EntryContext { return &EntryContext{} } @@ -50,7 +77,9 @@ func newEmptyInput() *SentinelInput { // Reset init EntryContext, func (ctx *EntryContext) Reset() { // reset all fields of ctx + ctx.err = nil ctx.startTime = 0 + ctx.rt = 0 ctx.Resource = nil ctx.StatNode = nil ctx.Input = nil diff --git a/core/base/entry.go b/core/base/entry.go index d504a74fb..cdbe036c7 100644 --- a/core/base/entry.go +++ b/core/base/entry.go @@ -15,6 +15,20 @@ type SentinelEntry struct { exitCtl sync.Once } +func NewSentinelEntry(ctx *EntryContext, rw *ResourceWrapper, sc *SlotChain) *SentinelEntry { + return &SentinelEntry{ + res: rw, + ctx: ctx, + sc: sc, + } +} + +func (e *SentinelEntry) SetError(err error) { + if e.ctx != nil { + e.ctx.SetError(err) + } +} + func (e *SentinelEntry) Context() *EntryContext { return e.ctx } @@ -23,12 +37,28 @@ func (e *SentinelEntry) Resource() *ResourceWrapper { return e.res } -func NewSentinelEntry(ctx *EntryContext, rw *ResourceWrapper, sc *SlotChain) *SentinelEntry { - return &SentinelEntry{res: rw, ctx: ctx, sc: sc} +type ExitOptions struct { + err error +} +type ExitOption func(*ExitOptions) + +func WithError(err error) ExitOption { + return func(opts *ExitOptions) { + opts.err = err + } } -func (e *SentinelEntry) Exit() { +func (e *SentinelEntry) Exit(exitOps ...ExitOption) { + var options = ExitOptions{ + err: nil, + } + for _, opt := range exitOps { + opt(&options) + } ctx := e.ctx + if options.err != nil { + ctx.SetError(options.err) + } e.exitCtl.Do(func() { if e.sc != nil { e.sc.exit(ctx) diff --git a/core/base/slot_chain.go b/core/base/slot_chain.go index 83d5b657d..f59502eba 100644 --- a/core/base/slot_chain.go +++ b/core/base/slot_chain.go @@ -5,6 +5,7 @@ import ( "github.com/alibaba/sentinel-golang/logging" "github.com/alibaba/sentinel-golang/util" + "github.com/pkg/errors" ) var logger = logging.GetDefaultLogger() @@ -124,6 +125,7 @@ func (sc *SlotChain) Entry(ctx *EntryContext) *TokenResult { defer func() { if err := recover(); err != nil { logger.Panicf("Sentinel internal panic in SlotChain, err: %+v", err) + ctx.SetError(errors.Errorf("%+v", err)) return } }() diff --git a/core/circuit_breaker/circuit_breaker.go b/core/circuit_breaker/circuit_breaker.go deleted file mode 100644 index d5b9448e9..000000000 --- a/core/circuit_breaker/circuit_breaker.go +++ /dev/null @@ -1,253 +0,0 @@ -package circuit_breaker - -import ( - "sync/atomic" - "time" - - "github.com/alibaba/sentinel-golang/core/base" - "github.com/alibaba/sentinel-golang/core/stat" - "github.com/alibaba/sentinel-golang/util" -) - -type CircuitBreaker interface { - getRule() Rule - TryPass(ctx *base.EntryContext) bool -} - -// average rt circuit breaker will cut resource if the rt of resource exceed the threshold of rule. -type averageRtCircuitBreaker struct { - // status of the circuit breaker - cut util.AtomicBool - // the count of request exceed the threshold - passCount int64 - rule *averageRtRule - metric base.ReadStat -} - -func newAverageRtCircuitBreaker(rule *averageRtRule) *averageRtCircuitBreaker { - resNode := stat.GetResourceNode(rule.Resource) - var metric base.ReadStat - // TODO need to optimize, we should to handle the scenario that resNode is nil - if resNode != nil { - metric = resNode.GetOrCreateSlidingWindowMetric(rule.SampleCount, rule.IntervalInMs) - } - return &averageRtCircuitBreaker{ - rule: rule, - metric: metric, - } -} - -// For test -func newAverageRtCircuitBreakerWithMetric(rule *averageRtRule, metric base.ReadStat) *averageRtCircuitBreaker { - return &averageRtCircuitBreaker{ - rule: rule, - metric: metric, - } -} - -func (b averageRtCircuitBreaker) getRule() Rule { - return b.rule -} - -func (b *averageRtCircuitBreaker) TryPass(_ *base.EntryContext) bool { - // currently, the breaker is before auto recover, direct return blocked . - if b.cut.Get() { - return false - } - rule := b.rule - if rule == nil { - return true - } - - // TODO need to optimize here. - // We might create individual stat structures for circuit breakers, rather than use the universal ResourceNode. - if b.metric == nil { - resNode := stat.GetResourceNode(rule.Resource) - if resNode == nil { - logger.Errorf("Resource(%s)'s stat node is nil.", rule.Resource) - return true - } - b.metric = resNode.GetOrCreateSlidingWindowMetric(rule.SampleCount, rule.IntervalInMs) - logger.Errorf("Delayed to initialize the metric of averageRtCircuitBreaker.") - } - - avgRt := b.metric.AvgRT() - if avgRt < rule.Threshold { - atomic.StoreInt64(&b.passCount, 0) - return true - } - if util.IncrementAndGetInt64(&b.passCount) < rule.RtSlowRequestAmount { - return true - } - // trigger circuit breaker - if b.cut.CompareAndSet(false, true) { - go util.RunWithRecover(func() { - // recover after RecoverTimeout seconds - time.Sleep(time.Second * time.Duration(rule.RecoverTimeout)) - atomic.StoreInt64(&b.passCount, 0) - b.cut.Set(false) - }, logger) - } - return false -} - -// error ratio circuit breaker will cut resource if the error ratio of resource exceed the threshold of rule. -type errorRatioCircuitBreaker struct { - // status of the breaker - cut util.AtomicBool - // the count of request exceed the threshold - passCount int64 - rule *errorRatioRule - metric base.ReadStat -} - -func newErrorRatioCircuitBreaker(rule *errorRatioRule) *errorRatioCircuitBreaker { - resNode := stat.GetResourceNode(rule.Resource) - var metric base.ReadStat - // TODO need to optimize, we should to handle the scenario that resNode is nil - if resNode != nil { - metric = resNode.GetOrCreateSlidingWindowMetric(rule.SampleCount, rule.IntervalInMs) - } - return &errorRatioCircuitBreaker{ - rule: rule, - metric: metric, - } -} - -func newErrorRatioCircuitBreakerWithMetric(rule *errorRatioRule, metric base.ReadStat) *errorRatioCircuitBreaker { - return &errorRatioCircuitBreaker{ - rule: rule, - metric: metric, - } -} - -func (b *errorRatioCircuitBreaker) getRule() Rule { - return b.rule -} - -func (b *errorRatioCircuitBreaker) TryPass(_ *base.EntryContext) bool { - if b.cut.Get() { - return false - } - - rule := b.rule - if rule == nil { - return true - } - - // TODO need to optimize here. - // We might create individual stat structures for circuit breakers, rather than use the universal ResourceNode. - if b.metric == nil { - resNode := stat.GetResourceNode(rule.Resource) - if resNode == nil { - logger.Errorf("Resource(%s)'s stat node is nil.", rule.Resource) - return true - } - b.metric = resNode.GetOrCreateSlidingWindowMetric(rule.SampleCount, rule.IntervalInMs) - logger.Errorf("Delayed to initialize the metric of errorRatioCircuitBreaker.") - } - - // biz error total - err := b.metric.GetQPS(base.MetricEventError) - // complete = err + realComplete - complete := b.metric.GetQPS(base.MetricEventComplete) - // total = pass + blocked - total := b.metric.GetQPS(base.MetricEventPass) + b.metric.GetQPS(base.MetricEventBlock) - - // If total amount is less than minRequestAmount, the request will pass. - if total < float64(rule.MinRequestAmount) { - return true - } - - // "success" (aka. completed count) = error count + non-error count (realComplete) - realComplete := complete - err - // error count - if realComplete <= 0 && err < float64(rule.MinRequestAmount) { - return true - } - - // err/complete is error ratio of the biz - if err/complete < rule.Threshold { - return true - } - - if b.cut.CompareAndSet(false, true) { - go util.RunWithRecover(func() { - // recover after RecoverTimeout seconds - time.Sleep(time.Second * time.Duration(rule.RecoverTimeout)) - b.cut.Set(false) - }, logger) - } - return false -} - -// error count circuit breaker will cut resource if the error count of resource exceed the threshold of rule. -type errorCountCircuitBreaker struct { - // status of the breaker - cut util.AtomicBool - // the count of request exceed the threshold - passCount int64 - rule *errorCountRule - metric base.ReadStat -} - -func newErrorCountCircuitBreaker(rule *errorCountRule) *errorCountCircuitBreaker { - resNode := stat.GetResourceNode(rule.Resource) - var metric base.ReadStat - // TODO need to optimize, we should to handle the scenario that resNode is nil - if resNode != nil { - metric = resNode.GetOrCreateSlidingWindowMetric(rule.SampleCount, rule.IntervalInMs) - } - return &errorCountCircuitBreaker{ - rule: rule, - metric: metric, - } -} - -func newErrorCountCircuitBreakerWithMetric(rule *errorCountRule, metric base.ReadStat) *errorCountCircuitBreaker { - return &errorCountCircuitBreaker{ - rule: rule, - metric: metric, - } -} - -func (b *errorCountCircuitBreaker) getRule() Rule { - return b.rule -} - -func (b *errorCountCircuitBreaker) TryPass(_ *base.EntryContext) bool { - if b.cut.Get() { - return false - } - - rule := b.rule - if rule == nil { - return true - } - - // TODO need to optimize here. - // We might create individual stat structures for circuit breakers, rather than use the universal ResourceNode. - if b.metric == nil { - resNode := stat.GetResourceNode(rule.Resource) - if resNode == nil { - logger.Errorf("Resource(%s)'s stat node is nil.", rule.Resource) - return true - } - b.metric = resNode.GetOrCreateSlidingWindowMetric(rule.SampleCount, rule.IntervalInMs) - logger.Errorf("Delayed to initialize the metric of errorCountCircuitBreaker.") - } - - err := b.metric.GetQPS(base.MetricEventError) - if err < float64(rule.Threshold) { - return true - } - - if b.cut.CompareAndSet(false, true) { - go util.RunWithRecover(func() { - // recover after RecoverTimeout seconds - time.Sleep(time.Second * time.Duration(rule.RecoverTimeout)) - b.cut.Set(false) - }, logger) - } - return false -} diff --git a/core/circuit_breaker/circuit_breaker_test.go b/core/circuit_breaker/circuit_breaker_test.go deleted file mode 100644 index c5807d31f..000000000 --- a/core/circuit_breaker/circuit_breaker_test.go +++ /dev/null @@ -1,247 +0,0 @@ -package circuit_breaker - -import ( - "testing" - "time" - - "github.com/alibaba/sentinel-golang/core/base" - "github.com/alibaba/sentinel-golang/util" - "github.com/stretchr/testify/mock" -) - -type ReadStatMock struct { - mock.Mock -} - -func (m *ReadStatMock) GetQPS(event base.MetricEvent) float64 { - args := m.Called(event) - return float64(args.Int(0)) -} - -func (m *ReadStatMock) GetQPSWithTime(now uint64, event base.MetricEvent) float64 { - args := m.Called(now, event) - return float64(args.Int(0)) -} - -func (m *ReadStatMock) GetSum(event base.MetricEvent) int64 { - args := m.Called(event) - return int64(args.Int(0)) -} - -func (m *ReadStatMock) GetSumWithTime(now uint64, event base.MetricEvent) int64 { - args := m.Called(now, event) - return int64(args.Int(0)) -} - -func (m *ReadStatMock) AvgRT() float64 { - args := m.Called() - return float64(args.Int(0)) -} - -func (m *ReadStatMock) MinRT() float64 { - args := m.Called() - return float64(args.Int(0)) -} - -type circuitBreakerMock struct { - mock.Mock -} - -func (m *circuitBreakerMock) getRule() Rule { - args := m.Called() - return args.Get(0).(Rule) -} - -func (m *circuitBreakerMock) TryPass(_ *base.EntryContext) bool { - args := m.Called() - return args.Bool(0) -} - -func Test_AverageRtCircuitBreaker_Check(t *testing.T) { - type args struct { - ctx *base.EntryContext - } - m := &ReadStatMock{} - rule := &averageRtRule{ - ruleBase: ruleBase{ - Id: util.NewUuid(), - Resource: "abc01", - Strategy: AverageRt, - RecoverTimeout: 1, - SampleCount: 2, - IntervalInMs: 1000, - }, - Threshold: 100, - RtSlowRequestAmount: 5, - } - tests := []struct { - name string - breaker *averageRtCircuitBreaker - args args - }{ - { - name: "Test_AverageRtCircuitBreaker_Check", - breaker: newAverageRtCircuitBreakerWithMetric(rule, m), - args: args{ - ctx: nil, - }, - }, - } - - m.On("AvgRT").Return(100) - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - for i := 0; i < 4; i++ { - if got := tt.breaker.TryPass(tt.args.ctx); got != true { - t.Errorf("averageRtCircuitBreaker.TryPass() = %v, want %v", got, true) - } - } - - if got := tt.breaker.TryPass(tt.args.ctx); got != false { - t.Errorf("averageRtCircuitBreaker.TryPass() = %v, want %v", got, false) - } - - // before auto recover - if got := tt.breaker.TryPass(tt.args.ctx); got != false { - t.Errorf("averageRtCircuitBreaker.TryPass() = %v, want %v", got, false) - } - - time.Sleep(2 * time.Second) - if got := tt.breaker.TryPass(tt.args.ctx); got != true { - t.Errorf("averageRtCircuitBreaker.TryPass() = %v, want %v", got, true) - } - }) - } -} - -func Test_ErrorRatioCircuitBreaker_Check(t *testing.T) { - type args struct { - ctx *base.EntryContext - } - m := &ReadStatMock{} - rule := &errorRatioRule{ - ruleBase: ruleBase{ - Id: util.NewUuid(), - Resource: "abc01", - Strategy: ErrorRatio, - RecoverTimeout: 1, - SampleCount: 2, - IntervalInMs: 1000, - }, - Threshold: 0.3, - MinRequestAmount: 5, - } - tests := []struct { - name string - breaker *errorRatioCircuitBreaker - args args - }{ - { - name: "Test_ErrorRatioCircuitBreaker_Check", - breaker: newErrorRatioCircuitBreakerWithMetric(rule, m), - args: args{ - ctx: nil, - }, - }, - } - - // mock data - m.On("GetQPS", base.MetricEventError).Return(100) - m.On("GetQPS", base.MetricEventComplete).Return(400) - m.On("GetQPS", base.MetricEventPass).Return(800) - m.On("GetQPS", base.MetricEventBlock).Return(200) - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - for i := 0; i < 4; i++ { - if got := tt.breaker.TryPass(tt.args.ctx); got != true { - t.Errorf("ErrorRatioCircuitBreaker.TryPass() = %v, want %v", got, true) - } - } - - m2 := &ReadStatMock{} - tt.breaker.metric = m2 - m2.On("GetQPS", base.MetricEventError).Return(200) - m2.On("GetQPS", base.MetricEventComplete).Return(400) - m2.On("GetQPS", base.MetricEventPass).Return(800) - m2.On("GetQPS", base.MetricEventBlock).Return(200) - - if got := tt.breaker.TryPass(tt.args.ctx); got != false { - t.Errorf("ErrorRatioCircuitBreaker.TryPass() = %v, want %v", got, false) - } - time.Sleep(2 * time.Second) - - m3 := &ReadStatMock{} - tt.breaker.metric = m3 - m3.On("GetQPS", base.MetricEventError).Return(0) - m3.On("GetQPS", base.MetricEventComplete).Return(0) - - m3.On("GetQPS", base.MetricEventPass).Return(0) - m3.On("GetQPS", base.MetricEventBlock).Return(0) - m3.On("GetQPS", base.MetricEventPass).Return(0) - m3.On("GetQPS", base.MetricEventBlock).Return(0) - if got := tt.breaker.TryPass(tt.args.ctx); got != true { - t.Errorf("ErrorRatioCircuitBreaker.TryPass() = %v, want %v", got, true) - } - }) - } -} - -func Test_ErrorCountCircuitBreaker_Check(t *testing.T) { - type args struct { - ctx *base.EntryContext - } - m := &ReadStatMock{} - rule := &errorCountRule{ - ruleBase: ruleBase{ - Id: util.NewUuid(), - Resource: "abc01", - Strategy: ErrorCount, - RecoverTimeout: 1, - SampleCount: 2, - IntervalInMs: 1000, - }, - Threshold: 10, - } - - tests := []struct { - name string - breaker *errorCountCircuitBreaker - args args - }{ - { - name: "Test_ErrorCountCircuitBreaker_Check", - breaker: newErrorCountCircuitBreakerWithMetric(rule, m), - args: args{ - ctx: nil, - }, - }, - } - m.On("GetQPS", base.MetricEventError).Return(5) - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - for i := 0; i < 4; i++ { - if got := tt.breaker.TryPass(tt.args.ctx); got != true { - t.Errorf("ErrorCountCircuitBreaker.TryPass() = %v, want %v", got, true) - } - } - - m2 := &ReadStatMock{} - tt.breaker.metric = m2 - m2.On("GetQPS", base.MetricEventError).Return(11) - for i := 0; i < 10; i++ { - if got := tt.breaker.TryPass(tt.args.ctx); got != false { - t.Errorf("ErrorCountCircuitBreaker.TryPass() = %v, want %v", got, true) - } - } - time.Sleep(2 * time.Second) - - m3 := &ReadStatMock{} - tt.breaker.metric = m3 - m3.On("GetQPS", base.MetricEventError).Return(1) - if got := tt.breaker.TryPass(tt.args.ctx); got != true { - t.Errorf("ErrorCountCircuitBreaker.TryPass() = %v, want %v", got, true) - } - }) - } -} diff --git a/core/circuit_breaker/rule.go b/core/circuit_breaker/rule.go deleted file mode 100644 index d98f7227e..000000000 --- a/core/circuit_breaker/rule.go +++ /dev/null @@ -1,219 +0,0 @@ -package circuit_breaker - -import ( - "encoding/json" - "fmt" - - "github.com/alibaba/sentinel-golang/core/base" - "github.com/alibaba/sentinel-golang/util" - "github.com/pkg/errors" -) - -// The strategy of circuit breaker -// Each strategy represent one rule type -type BreakerStrategy int8 - -const ( - AverageRt BreakerStrategy = iota - ErrorRatio - ErrorCount -) - -// The base interface of circuit breaker rule -type Rule interface { - base.SentinelRule - // return the strategy type - BreakerStrategy() BreakerStrategy - // check whether the rule is valid and could be converted to corresponding circuit breaker - isApplicable() bool - // convert circuit breaker rule to circuit breaker - convert2CircuitBreaker() CircuitBreaker -} - -// The common fields of circuit breaker rule -type ruleBase struct { - // unique id - Id string `json:"id,omitempty"` - // resource name - Resource string `json:"resource"` - Strategy BreakerStrategy `json:"strategy"` - // auto recover timeout in second, all requests would be broken before auto recover - RecoverTimeout int64 `json:"recoverTimeout"` - // the base data to describe the statistic metric - SampleCount uint32 `json:"sampleCount"` - IntervalInMs uint32 `json:"intervalInMs"` -} - -func (b *ruleBase) isApplicable() bool { - if !(len(b.Resource) > 0 && b.RecoverTimeout >= 0) { - logger.Warnf("Illegal parameters,Resource=%s,RecoverTimeout=%d.", b.Resource, b.RecoverTimeout) - return false - } - if b.IntervalInMs <= 0 || b.SampleCount <= 0 { - logger.Warnf("Illegal parameters,SampleCount=%d,IntervalInMs=%d.", b.SampleCount, b.IntervalInMs) - return false - } - - if b.IntervalInMs%b.SampleCount != 0 { - logger.Warnf("Invalid parameters, SampleCount=%d,IntervalInMs=%d.", b.SampleCount, b.IntervalInMs) - return false - } - return true -} - -func (b *ruleBase) String() string { - r, err := json.Marshal(b) - if err != nil { - // fallback string - return fmt.Sprintf("ruleBase{id=%s,resource=%s, strategy=%d, RecoverTimeout=%d, SampleCount=%d, IntervalInMs=%d}, err:%+v.", - b.Id, b.Resource, b.Strategy, b.RecoverTimeout, b.SampleCount, b.IntervalInMs, errors.WithStack(err)) - } - return string(r) -} - -func (b *ruleBase) BreakerStrategy() BreakerStrategy { - return b.Strategy -} - -func (b *ruleBase) ResourceName() string { - return b.Resource -} - -// Average Rt circuit breaker rule -type averageRtRule struct { - ruleBase - // the threshold of rt(ms) - Threshold float64 `json:"threshold"` - // if average rt > threshold && the count of request exceed RtSlowRequestAmount, then trigger circuit breaker - RtSlowRequestAmount int64 `json:"rtSlowRequestAmount"` -} - -func NewAverageRtRule(resource string, recoverTimeout int64, sampleCount, intervalInMs uint32, threshold float64, rtSlowRequestAmount int64) *averageRtRule { - return &averageRtRule{ - ruleBase: ruleBase{ - Id: util.NewUuid(), - Resource: resource, - Strategy: AverageRt, - RecoverTimeout: recoverTimeout, - SampleCount: sampleCount, - IntervalInMs: intervalInMs, - }, - Threshold: threshold, - RtSlowRequestAmount: rtSlowRequestAmount, - } -} - -func (r *averageRtRule) isApplicable() bool { - if !r.ruleBase.isApplicable() { - return false - } - if !(r.BreakerStrategy() == AverageRt && r.Threshold >= 0.0 && r.RtSlowRequestAmount >= 0) { - return false - } - return true -} - -func (r *averageRtRule) String() string { - ret, err := json.Marshal(r) - if err != nil { - // feedback string - return fmt.Sprintf("averageRtRule{ruleBase:%s, threshold=%f,rRtSlowRequestAmount=%d}, err:%+v.", - r.ruleBase.String(), r.Threshold, r.RtSlowRequestAmount, errors.WithStack(err)) - } - return string(ret) -} - -func (r *averageRtRule) convert2CircuitBreaker() CircuitBreaker { - return newAverageRtCircuitBreaker(r) -} - -// Error ratio circuit breaker rule -type errorRatioRule struct { - ruleBase - Threshold float64 `json:"threshold"` - // if request count < MinRequestAmount, pass the rule checker directly. - MinRequestAmount int64 `json:"minRequestAmount"` -} - -func NewErrorRatioRule(resource string, recoverTimeout int64, sampleCount, intervalInMs uint32, threshold float64, rtSlowRequestAmount int64) *errorRatioRule { - return &errorRatioRule{ - ruleBase: ruleBase{ - Id: util.NewUuid(), - Resource: resource, - Strategy: ErrorRatio, - RecoverTimeout: recoverTimeout, - SampleCount: sampleCount, - IntervalInMs: intervalInMs, - }, - Threshold: threshold, - MinRequestAmount: rtSlowRequestAmount, - } -} - -func (r *errorRatioRule) String() string { - ret, err := json.Marshal(r) - if err != nil { - // feedback string - return fmt.Sprintf("errorRatioRule{ruleBase:%s, threshold=%f, minRequestAmount=%d}, err:%+v.", - r.ruleBase.String(), r.Threshold, r.MinRequestAmount, errors.WithStack(err)) - } - return string(ret) -} - -func (r *errorRatioRule) isApplicable() bool { - if !r.ruleBase.isApplicable() { - return false - } - if !(r.BreakerStrategy() == ErrorRatio && r.Threshold >= float64(0.0) && r.MinRequestAmount >= 0) { - return false - } - return true -} - -func (r *errorRatioRule) convert2CircuitBreaker() CircuitBreaker { - return newErrorRatioCircuitBreaker(r) -} - -// Error count circuit breaker rule -type errorCountRule struct { - ruleBase - Threshold int64 `json:"threshold"` -} - -func NewErrorCountRule(resource string, recoverTimeout int64, sampleCount, intervalInMs uint32, threshold int64) *errorCountRule { - return &errorCountRule{ - ruleBase: ruleBase{ - Id: util.NewUuid(), - Resource: resource, - Strategy: ErrorCount, - RecoverTimeout: recoverTimeout, - SampleCount: sampleCount, - IntervalInMs: intervalInMs, - }, - Threshold: threshold, - } -} - -func (r *errorCountRule) String() string { - ret, err := json.Marshal(r) - if err != nil { - // feedback string - return fmt.Sprintf("errorCountRule{ruleBase:%s, threshold=%d} err:%+v.", - r.ruleBase.String(), r.Threshold, errors.WithStack(err)) - } - return string(ret) -} - -func (r *errorCountRule) isApplicable() bool { - if !r.ruleBase.isApplicable() { - return false - } - if !(r.BreakerStrategy() == ErrorCount && r.Threshold >= 0) { - return false - } - return true -} - -func (r *errorCountRule) convert2CircuitBreaker() CircuitBreaker { - return newErrorCountCircuitBreaker(r) -} diff --git a/core/circuit_breaker/rule_manager.go b/core/circuit_breaker/rule_manager.go deleted file mode 100644 index 1938f4153..000000000 --- a/core/circuit_breaker/rule_manager.go +++ /dev/null @@ -1,125 +0,0 @@ -package circuit_breaker - -import ( - "fmt" - "strings" - "sync" - - "github.com/alibaba/sentinel-golang/logging" -) - -var ( - logger = logging.GetDefaultLogger() -) - -var ( - breakerRules = make(map[string][]Rule) - breakers = make(map[string][]CircuitBreaker) - ruleMux = &sync.RWMutex{} -) - -func GetResRules(resource string) []Rule { - ruleMux.Lock() - ret, ok := breakerRules[resource] - ruleMux.Unlock() - if !ok { - ret = make([]Rule, 0) - } - return ret -} - -// Load the newer rules to manager. -// parameter: -// rules: the newer rules, if len of rules is 0, will clear all rules of manager. -// return value: -// bool: was designed to indicate whether the internal map has been changed -// error: was designed to indicate whether occurs the error. -func LoadRules(rules []Rule) (bool, error) { - // TODO in order to avoid invalid update, should check consistent with last update rules - err := onRuleUpdate(rules) - return true, err -} - -func getResBreakers(resource string) []CircuitBreaker { - ruleMux.Lock() - ret, ok := breakers[resource] - ruleMux.Unlock() - if !ok { - ret = make([]CircuitBreaker, 0) - } - return ret -} - -// Concurrent safe to update rules -func onRuleUpdate(rules []Rule) (err error) { - defer func() { - if r := recover(); r != nil { - var ok bool - err, ok = r.(error) - if !ok { - err = fmt.Errorf("%+v", r) - } - } - }() - newBreakerRules := make(map[string][]Rule) - - for _, rule := range rules { - if !rule.isApplicable() { - logger.Warnf("Ignoring invalid breaker rule when loading new rules, %+v.", rule) - continue - } - - classification := rule.ResourceName() - ruleSet, ok := newBreakerRules[classification] - if !ok { - ruleSet = make([]Rule, 0, 1) - } - ruleSet = append(ruleSet, rule) - newBreakerRules[classification] = ruleSet - } - - newBreakers := make(map[string][]CircuitBreaker) - for k, v := range newBreakerRules { - cbs := make([]CircuitBreaker, 0, len(v)) - for _, r := range v { - cbs = append(cbs, r.convert2CircuitBreaker()) - } - newBreakers[k] = cbs - } - - ruleMux.Lock() - breakerRules = newBreakerRules - breakers = newBreakers - ruleMux.Unlock() - - logRuleUpdate(newBreakerRules) - return nil -} - -func rulesFrom(rm map[string][]Rule) []Rule { - rules := make([]Rule, 0) - if len(rm) == 0 { - return rules - } - for _, rs := range rm { - if len(rs) == 0 { - continue - } - for _, r := range rs { - if r != nil { - rules = append(rules, r) - } - } - } - return rules -} - -func logRuleUpdate(rules map[string][]Rule) { - sb := strings.Builder{} - sb.WriteString("[CircuitBreakerRuleManager] succeed to load circuit breakers:\n") - for _, rule := range rulesFrom(rules) { - sb.WriteString(rule.String()) - sb.WriteString("\n") - } - logger.Info(sb.String()) -} diff --git a/core/circuit_breaker/rule_manager_test.go b/core/circuit_breaker/rule_manager_test.go deleted file mode 100644 index 023be5184..000000000 --- a/core/circuit_breaker/rule_manager_test.go +++ /dev/null @@ -1,127 +0,0 @@ -package circuit_breaker - -import ( - "reflect" - "testing" - - "github.com/stretchr/testify/assert" -) - -func Test_isApplicableRule(t *testing.T) { - type args struct { - rule Rule - } - tests := []struct { - name string - args args - want bool - }{ - { - name: "averageRtBreakerRule_isApplicable", - args: args{ - rule: NewAverageRtRule("abc01", 1, 2, 1000, 100, 5), - }, - want: true, - }, - { - name: "errorRatioBreakerRule_isApplicable", - args: args{ - rule: NewErrorRatioRule("abc02", 1, 2, 1000, 0.3, 5), - }, - want: true, - }, - { - name: "errorCountBreakerRule_isApplicable", - args: args{ - rule: NewErrorCountRule("abc03", 1, 2, 1000, 10), - }, - want: true, - }, - { - name: "averageRtBreakerRule_isApplicable_false", - args: args{ - rule: NewAverageRtRule("abc01", 1, 2, 1000, -1.0, 5), - }, - want: false, - }, - { - name: "errorRatioBreakerRule_isApplicable_false", - args: args{ - rule: NewErrorRatioRule("abc02", 1, 2, 1000, -0.3, 5), - }, - want: false, - }, - { - name: "errorCountBreakerRule_isApplicable_false", - args: args{ - rule: NewErrorCountRule("abc03", 1, 2, 1000, -10), - }, - want: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := tt.args.rule.isApplicable(); got != tt.want { - t.Errorf("RuleManager.isApplicable() = %v, want %v", got, tt.want) - } - }) - } -} - -func Test_onUpdateRules(t *testing.T) { - type args struct { - rules []Rule - } - tests := []struct { - name string - args args - }{ - { - name: "Test_onUpdateRules", - args: args{ - rules: make([]Rule, 0, 3), - }, - }, - } - - averageRtRule := &ruleMock{} - averageRtBreaker := &circuitBreakerMock{} - averageRtRule.On("isApplicable").Return(true) - averageRtRule.On("BreakerStrategy").Return(AverageRt) - averageRtRule.On("String").Return("averageRtRule") - averageRtRule.On("ResourceName").Return("a") - averageRtRule.On("convert2CircuitBreaker").Return(averageRtBreaker) - averageRtBreaker.On("getRule").Return(averageRtRule) - tests[0].args.rules = append(tests[0].args.rules, averageRtRule) - - errRatioRule := &ruleMock{} - errRatioBreaker := &circuitBreakerMock{} - errRatioRule.On("isApplicable").Return(true) - errRatioRule.On("BreakerStrategy").Return(ErrorRatio) - errRatioRule.On("String").Return("errRatioRule") - errRatioRule.On("ResourceName").Return("a") - errRatioRule.On("convert2CircuitBreaker").Return(errRatioBreaker) - errRatioBreaker.On("getRule").Return(errRatioRule) - tests[0].args.rules = append(tests[0].args.rules, errRatioRule) - - errCountRule := &ruleMock{} - errCountBreaker := &circuitBreakerMock{} - errCountRule.On("isApplicable").Return(true) - errCountRule.On("BreakerStrategy").Return(ErrorCount) - errCountRule.On("String").Return("errCountRule") - errCountRule.On("ResourceName").Return("a") - errCountRule.On("convert2CircuitBreaker").Return(errCountBreaker) - errCountBreaker.On("getRule").Return(errCountRule) - tests[0].args.rules = append(tests[0].args.rules, errCountRule) - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - _ = onRuleUpdate(tt.args.rules) - assert.True(t, len(breakers["a"]) == 3) - assert.True(t, len(breakerRules["a"]) == 3) - for idx, breaker := range breakers["a"] { - reflect.DeepEqual(breaker.getRule(), tests[0].args.rules[idx]) - } - }) - } -} diff --git a/core/circuit_breaker/rule_test.go b/core/circuit_breaker/rule_test.go deleted file mode 100644 index bfa0c09a8..000000000 --- a/core/circuit_breaker/rule_test.go +++ /dev/null @@ -1,32 +0,0 @@ -package circuit_breaker - -import "github.com/stretchr/testify/mock" - -type ruleMock struct { - mock.Mock -} - -func (m *ruleMock) String() string { - args := m.Called() - return args.String(0) -} - -func (m *ruleMock) ResourceName() string { - args := m.Called() - return args.String(0) -} - -func (m *ruleMock) BreakerStrategy() BreakerStrategy { - args := m.Called() - return args.Get(0).(BreakerStrategy) -} - -func (m *ruleMock) isApplicable() bool { - args := m.Called() - return args.Bool(0) -} - -func (m *ruleMock) convert2CircuitBreaker() CircuitBreaker { - args := m.Called() - return args.Get(0).(CircuitBreaker) -} diff --git a/core/circuitbreaker/circuit_breaker.go b/core/circuitbreaker/circuit_breaker.go new file mode 100644 index 000000000..ae82aada3 --- /dev/null +++ b/core/circuitbreaker/circuit_breaker.go @@ -0,0 +1,622 @@ +package circuitbreaker + +import ( + "sync/atomic" + "unsafe" + + "github.com/alibaba/sentinel-golang/core/base" + sbase "github.com/alibaba/sentinel-golang/core/stat/base" + "github.com/alibaba/sentinel-golang/util" +) + +type State int32 + +/** + Circuit Breaker State Machine: + + switch to open based on rule + +-----------------------------------------------------------------------+ + | | + | v ++----------------+ +----------------+ Probe +----------------+ +| | | |<----------------| | +| | Probe succeed | | | | +| Closed |<------------------| HalfOpen | | Open | +| | | | Probe failed | | +| | | +---------------->| | ++----------------+ +----------------+ +----------------+ +*/ +const ( + Closed State = iota + HalfOpen + Open +) + +func (s *State) String() string { + switch s.get() { + case Closed: + return "Closed" + case HalfOpen: + return "HalfOpen" + case Open: + return "Open" + default: + return "Undefined" + } +} + +func (s *State) get() State { + statePtr := (*int32)(unsafe.Pointer(s)) + return State(atomic.LoadInt32(statePtr)) +} +func (s *State) set(update State) { + statePtr := (*int32)(unsafe.Pointer(s)) + newState := int32(update) + atomic.StoreInt32(statePtr, newState) +} +func (s *State) casState(expect State, update State) bool { + statePtr := (*int32)(unsafe.Pointer(s)) + oldState := int32(expect) + newState := int32(update) + return atomic.CompareAndSwapInt32(statePtr, oldState, newState) +} + +type StateChangeListener interface { + OnChangeToClosed(prev State, rule Rule) + + OnChangeToOpen(prev State, rule Rule, snapshot interface{}) + + OnChangeToHalfOpen(prev State, rule Rule) +} + +type CircuitBreaker interface { + BoundRule() Rule + + BoundStat() interface{} + + TryPass(ctx *base.EntryContext) bool + + CurrentState() State + // HandleCompleted handle the entry completed, Will not call HandleCompleted if request is blocked. + // rt: the response time this entry cost. + HandleCompleted(rt uint64, err error) +} + +//================================= circuitBreakerBase ==================================== +type circuitBreakerBase struct { + rule Rule + retryTimeoutMs uint32 + nextRetryTimestamp uint64 + status *State +} + +func (b *circuitBreakerBase) BoundRule() Rule { + return b.rule +} + +func (b *circuitBreakerBase) CurrentState() State { + return b.status.get() +} + +func (b *circuitBreakerBase) retryTimeoutArrived() bool { + return util.CurrentTimeMillis() >= atomic.LoadUint64(&b.nextRetryTimestamp) +} + +func (b *circuitBreakerBase) updateNextRetryTimestamp() { + atomic.StoreUint64(&b.nextRetryTimestamp, util.CurrentTimeMillis()+uint64(b.retryTimeoutMs)) +} + +// fromClosedToOpen update circuit breaker status machine from closed to open +// Used for opening circuit breaker from closed when checking circuit breaker +// return true if succeed to update +func (b *circuitBreakerBase) fromClosedToOpen(snapshot interface{}) bool { + if b.status.casState(Closed, Open) { + b.updateNextRetryTimestamp() + for _, listener := range statusSwitchListeners { + listener.OnChangeToOpen(Closed, b.rule, snapshot) + } + return true + } + return false +} + +// fromOpenToHalfOpen update circuit breaker status machine from open to half-open +// Used for probing +// return true if succeed to update +func (b *circuitBreakerBase) fromOpenToHalfOpen() bool { + if b.status.casState(Open, HalfOpen) { + for _, listener := range statusSwitchListeners { + listener.OnChangeToHalfOpen(Open, b.rule) + } + return true + } + return false +} + +// fromHalfOpenToOpen update circuit breaker status machine from half-open to open +// Used for failing to probe +// return true if succeed to update +func (b *circuitBreakerBase) fromHalfOpenToOpen(snapshot interface{}) bool { + if b.status.casState(HalfOpen, Open) { + b.updateNextRetryTimestamp() + for _, listener := range statusSwitchListeners { + listener.OnChangeToOpen(HalfOpen, b.rule, snapshot) + } + return true + } + return false +} + +// fromHalfOpenToOpen update circuit breaker status machine from half-open to closed +// Used for succeeding to probe +// return true if succeed to update +func (b *circuitBreakerBase) fromHalfOpenToClosed() bool { + if b.status.casState(HalfOpen, Closed) { + for _, listener := range statusSwitchListeners { + listener.OnChangeToClosed(HalfOpen, b.rule) + } + return true + } + return false +} + +//================================= slowRtCircuitBreaker ==================================== +type slowRtCircuitBreaker struct { + circuitBreakerBase + stat *slowRequestLeapArray + maxAllowedRt uint64 + maxSlowRequestRatio float64 + minRequestAmount uint64 +} + +func newSlowRtCircuitBreakerWithStat(r *slowRtRule, stat *slowRequestLeapArray) *slowRtCircuitBreaker { + status := new(State) + status.set(Closed) + return &slowRtCircuitBreaker{ + circuitBreakerBase: circuitBreakerBase{ + rule: r, + retryTimeoutMs: r.RetryTimeoutMs, + nextRetryTimestamp: 0, + status: status, + }, + stat: stat, + maxAllowedRt: r.MaxAllowedRt, + maxSlowRequestRatio: r.MaxSlowRequestRatio, + minRequestAmount: r.MinRequestAmount, + } +} + +func newSlowRtCircuitBreaker(r *slowRtRule) *slowRtCircuitBreaker { + interval := r.StatIntervalMs + stat := &slowRequestLeapArray{} + stat.data = sbase.NewLeapArray(1, interval, stat) + + return newSlowRtCircuitBreakerWithStat(r, stat) +} + +func (b *slowRtCircuitBreaker) BoundStat() interface{} { + return b.stat +} + +// TryPass check circuit breaker based on status machine of circuit breaker. +func (b *slowRtCircuitBreaker) TryPass(_ *base.EntryContext) bool { + curStatus := b.CurrentState() + if curStatus == Closed { + return true + } else if curStatus == Open { + // switch status to half-open to probe if retry timeout + if b.retryTimeoutArrived() && b.fromOpenToHalfOpen() { + return true + } + } + return false +} + +func (b *slowRtCircuitBreaker) HandleCompleted(rt uint64, err error) { + // add slow and add total + metricStat := b.stat + counter := metricStat.currentCounter() + if rt > b.maxAllowedRt { + atomic.AddUint64(&counter.slowCount, 1) + } + atomic.AddUint64(&counter.totalCount, 1) + + slowCount := uint64(0) + totalCount := uint64(0) + counters := metricStat.allCounter() + for _, c := range counters { + slowCount += atomic.LoadUint64(&c.slowCount) + totalCount += atomic.LoadUint64(&c.totalCount) + } + slowRatio := float64(slowCount) / float64(totalCount) + + // handleStateChange + curStatus := b.CurrentState() + if curStatus == Open { + return + } else if curStatus == HalfOpen { + if rt > b.maxAllowedRt { + // fail to probe + b.fromHalfOpenToOpen(1.0) + } else { + // succeed to probe + b.fromHalfOpenToClosed() + b.resetMetric() + } + return + } + + // current state is CLOSED + if totalCount < b.minRequestAmount { + return + } + + if slowRatio > b.maxSlowRequestRatio { + curStatus = b.CurrentState() + switch curStatus { + case Closed: + b.fromClosedToOpen(slowRatio) + case HalfOpen: + b.fromHalfOpenToOpen(slowRatio) + default: + } + } + return +} + +func (b *slowRtCircuitBreaker) resetMetric() { + for _, c := range b.stat.allCounter() { + c.reset() + } +} + +type slowRequestCounter struct { + slowCount uint64 + totalCount uint64 +} + +func (c *slowRequestCounter) reset() { + atomic.StoreUint64(&c.slowCount, 0) + atomic.StoreUint64(&c.totalCount, 0) +} + +type slowRequestLeapArray struct { + data *sbase.LeapArray +} + +func (s *slowRequestLeapArray) NewEmptyBucket() interface{} { + return &slowRequestCounter{ + slowCount: 0, + totalCount: 0, + } +} + +func (s *slowRequestLeapArray) ResetBucketTo(bw *sbase.BucketWrap, startTime uint64) *sbase.BucketWrap { + atomic.StoreUint64(&bw.BucketStart, startTime) + bw.Value.Store(&slowRequestCounter{ + slowCount: 0, + totalCount: 0, + }) + return bw +} + +func (s *slowRequestLeapArray) currentCounter() *slowRequestCounter { + curBucket, err := s.data.CurrentBucket(s) + if err != nil { + logger.Errorf("Failed to get current bucket, current ts=%d, err: %+v.", util.CurrentTimeMillis(), err) + return nil + } + if curBucket == nil { + logger.Error("Current bucket is nil") + return nil + } + mb := curBucket.Value.Load() + if mb == nil { + logger.Error("Current bucket atomic Value is nil") + return nil + } + counter, ok := mb.(*slowRequestCounter) + if !ok { + logger.Error("Bucket data type error") + return nil + } + return counter +} + +func (s *slowRequestLeapArray) allCounter() []*slowRequestCounter { + buckets := s.data.Values() + ret := make([]*slowRequestCounter, 0) + for _, b := range buckets { + mb := b.Value.Load() + if mb == nil { + logger.Error("Current bucket atomic Value is nil") + continue + } + counter, ok := mb.(*slowRequestCounter) + if !ok { + logger.Error("Bucket data type error") + continue + } + ret = append(ret, counter) + } + return ret +} + +//================================= errorRatioCircuitBreaker ==================================== +type errorRatioCircuitBreaker struct { + circuitBreakerBase + minRequestAmount uint64 + errorRatioThreshold float64 + + stat *errorCounterLeapArray +} + +func newErrorRatioCircuitBreakerWithStat(r *errorRatioRule, stat *errorCounterLeapArray) *errorRatioCircuitBreaker { + status := new(State) + status.set(Closed) + + return &errorRatioCircuitBreaker{ + circuitBreakerBase: circuitBreakerBase{ + rule: r, + retryTimeoutMs: r.RetryTimeoutMs, + nextRetryTimestamp: 0, + status: status, + }, + minRequestAmount: r.MinRequestAmount, + errorRatioThreshold: r.Threshold, + stat: stat, + } +} + +func newErrorRatioCircuitBreaker(r *errorRatioRule) *errorRatioCircuitBreaker { + interval := r.StatIntervalMs + stat := &errorCounterLeapArray{} + stat.data = sbase.NewLeapArray(1, interval, stat) + + return newErrorRatioCircuitBreakerWithStat(r, stat) +} + +func (b *errorRatioCircuitBreaker) BoundStat() interface{} { + return b.stat +} + +func (b *errorRatioCircuitBreaker) TryPass(_ *base.EntryContext) bool { + curStatus := b.CurrentState() + if curStatus == Closed { + return true + } else if curStatus == Open { + // switch status to half-open to probe if retry timeout + if b.retryTimeoutArrived() && b.fromOpenToHalfOpen() { + return true + } + } + return false +} + +func (b *errorRatioCircuitBreaker) HandleCompleted(rt uint64, err error) { + metricStat := b.stat + counter := metricStat.currentCounter() + if err != nil { + atomic.AddUint64(&counter.errorCount, 1) + } + atomic.AddUint64(&counter.totalCount, 1) + + errorCount := uint64(0) + totalCount := uint64(0) + counters := metricStat.allCounter() + for _, c := range counters { + errorCount += atomic.LoadUint64(&c.errorCount) + totalCount += atomic.LoadUint64(&c.totalCount) + } + errorRatio := float64(errorCount) / float64(totalCount) + + // handleStateChangeWhenThresholdExceeded + curStatus := b.CurrentState() + if curStatus == Open { + return + } + if curStatus == HalfOpen { + if err == nil { + b.fromHalfOpenToClosed() + b.resetMetric() + } else { + b.fromHalfOpenToOpen(1.0) + } + return + } + + // current state is CLOSED + if totalCount < b.minRequestAmount { + return + } + if errorRatio > b.errorRatioThreshold { + curStatus = b.CurrentState() + switch curStatus { + case Closed: + b.fromClosedToOpen(errorRatio) + case HalfOpen: + b.fromHalfOpenToOpen(errorRatio) + default: + } + } +} + +func (b *errorRatioCircuitBreaker) resetMetric() { + for _, c := range b.stat.allCounter() { + c.reset() + } +} + +type errorCounter struct { + errorCount uint64 + totalCount uint64 +} + +func (c *errorCounter) reset() { + atomic.StoreUint64(&c.errorCount, 0) + atomic.StoreUint64(&c.totalCount, 0) +} + +type errorCounterLeapArray struct { + data *sbase.LeapArray +} + +func (s *errorCounterLeapArray) NewEmptyBucket() interface{} { + return &errorCounter{ + errorCount: 0, + totalCount: 0, + } +} + +func (s *errorCounterLeapArray) ResetBucketTo(bw *sbase.BucketWrap, startTime uint64) *sbase.BucketWrap { + atomic.StoreUint64(&bw.BucketStart, startTime) + bw.Value.Store(&errorCounter{ + errorCount: 0, + totalCount: 0, + }) + return bw +} + +func (s *errorCounterLeapArray) currentCounter() *errorCounter { + curBucket, err := s.data.CurrentBucket(s) + if err != nil { + logger.Errorf("Failed to get current bucket, current ts=%d, err: %+v.", util.CurrentTimeMillis(), err) + return nil + } + if curBucket == nil { + logger.Error("Current bucket is nil") + return nil + } + mb := curBucket.Value.Load() + if mb == nil { + logger.Error("Current bucket atomic Value is nil") + return nil + } + counter, ok := mb.(*errorCounter) + if !ok { + logger.Error("Bucket data type error") + return nil + } + return counter +} + +func (s *errorCounterLeapArray) allCounter() []*errorCounter { + buckets := s.data.Values() + ret := make([]*errorCounter, 0) + for _, b := range buckets { + mb := b.Value.Load() + if mb == nil { + logger.Error("Current bucket atomic Value is nil") + continue + } + counter, ok := mb.(*errorCounter) + if !ok { + logger.Error("Bucket data type error") + continue + } + ret = append(ret, counter) + } + return ret +} + +//================================= errorCountCircuitBreaker ==================================== +type errorCountCircuitBreaker struct { + circuitBreakerBase + minRequestAmount uint64 + errorCountThreshold uint64 + + stat *errorCounterLeapArray +} + +func newErrorCountCircuitBreakerWithStat(r *errorCountRule, stat *errorCounterLeapArray) *errorCountCircuitBreaker { + status := new(State) + status.set(Closed) + + return &errorCountCircuitBreaker{ + circuitBreakerBase: circuitBreakerBase{ + rule: r, + retryTimeoutMs: r.RetryTimeoutMs, + nextRetryTimestamp: 0, + status: status, + }, + minRequestAmount: r.MinRequestAmount, + errorCountThreshold: r.Threshold, + stat: stat, + } +} + +func newErrorCountCircuitBreaker(r *errorCountRule) *errorCountCircuitBreaker { + interval := r.StatIntervalMs + stat := &errorCounterLeapArray{} + stat.data = sbase.NewLeapArray(1, interval, stat) + + return newErrorCountCircuitBreakerWithStat(r, stat) +} + +func (b *errorCountCircuitBreaker) BoundStat() interface{} { + return b.stat +} + +func (b *errorCountCircuitBreaker) TryPass(_ *base.EntryContext) bool { + curStatus := b.CurrentState() + if curStatus == Closed { + return true + } else if curStatus == Open { + // switch status to half-open to probe if retry timeout + if b.retryTimeoutArrived() && b.fromOpenToHalfOpen() { + return true + } + } + return false +} + +func (b *errorCountCircuitBreaker) HandleCompleted(rt uint64, err error) { + metricStat := b.stat + counter := metricStat.currentCounter() + if err != nil { + atomic.AddUint64(&counter.errorCount, 1) + } + atomic.AddUint64(&counter.totalCount, 1) + + errorCount := uint64(0) + totalCount := uint64(0) + counters := metricStat.allCounter() + for _, c := range counters { + errorCount += atomic.LoadUint64(&c.errorCount) + totalCount += atomic.LoadUint64(&c.totalCount) + } + // handleStateChangeWhenThresholdExceeded + curStatus := b.CurrentState() + if curStatus == Open { + return + } + if curStatus == HalfOpen { + if err == nil { + b.fromHalfOpenToClosed() + b.resetMetric() + } else { + b.fromHalfOpenToOpen(1) + } + return + } + // current state is CLOSED + if totalCount < b.minRequestAmount { + return + } + if errorCount > b.errorCountThreshold { + curStatus = b.CurrentState() + switch curStatus { + case Closed: + b.fromClosedToOpen(errorCount) + case HalfOpen: + b.fromHalfOpenToOpen(errorCount) + default: + } + } +} + +func (b *errorCountCircuitBreaker) resetMetric() { + for _, c := range b.stat.allCounter() { + c.reset() + } +} diff --git a/core/circuitbreaker/circuit_breaker_test.go b/core/circuitbreaker/circuit_breaker_test.go new file mode 100644 index 000000000..0932e0de4 --- /dev/null +++ b/core/circuitbreaker/circuit_breaker_test.go @@ -0,0 +1,57 @@ +package circuitbreaker + +import ( + "testing" + + "github.com/alibaba/sentinel-golang/core/base" + + "github.com/stretchr/testify/mock" + + "github.com/stretchr/testify/assert" +) + +type CircuitBreakerMock struct { + mock.Mock +} + +func (m *CircuitBreakerMock) BoundRule() Rule { + args := m.Called() + return args.Get(0).(Rule) +} + +func (m *CircuitBreakerMock) TryPass(ctx *base.EntryContext) bool { + args := m.Called(ctx) + return args.Bool(0) +} + +func (m *CircuitBreakerMock) CurrentState() State { + args := m.Called() + return args.Get(0).(State) +} + +// HandleCompleted handle the entry completed +// rt: the response time this entry cost. +func (m *CircuitBreakerMock) HandleCompleted(rt uint64, err error) { + m.Called(rt, err) + return +} + +func TestStatus(t *testing.T) { + t.Run("get_set", func(t *testing.T) { + status := new(State) + assert.True(t, status.get() == Closed) + + status.set(Open) + assert.True(t, status.get() == Open) + }) + + t.Run("cas", func(t *testing.T) { + status := new(State) + assert.True(t, status.get() == Closed) + + assert.True(t, status.casState(Closed, Open)) + assert.True(t, !status.casState(Closed, Open)) + status.set(HalfOpen) + assert.True(t, status.casState(HalfOpen, Open)) + }) +} diff --git a/core/circuitbreaker/rule.go b/core/circuitbreaker/rule.go new file mode 100644 index 000000000..6990aaea8 --- /dev/null +++ b/core/circuitbreaker/rule.go @@ -0,0 +1,225 @@ +package circuitbreaker + +import ( + "fmt" + + "go.uber.org/multierr" + + "github.com/alibaba/sentinel-golang/core/base" + "github.com/alibaba/sentinel-golang/util" + "github.com/pkg/errors" +) + +// The strategy of circuit breaker +// Each strategy represent one rule type +type Strategy int8 + +const ( + SlowRt Strategy = iota + ErrorRatio + ErrorCount +) + +func (s Strategy) String() string { + switch s { + case SlowRt: + return "SlowRt" + case ErrorRatio: + return "ErrorRatio" + case ErrorCount: + return "ErrorCount" + default: + return "Undefined" + } +} + +// The base interface of circuit breaker rule +type Rule interface { + base.SentinelRule + // return the strategy type + BreakerStrategy() Strategy + // check whether the rule is valid and could be converted to corresponding circuit breaker + IsApplicable() error + // return circuit breaker stat interval + BreakerStatIntervalMs() uint32 + // check whether is consistent with new rule + IsEqualsTo(r Rule) bool + // check whether the statistic of old circuit breaker is reuseable for new circuit breaker + IsStatReusable(r Rule) bool +} + +// The common fields of circuit breaker rule +type RuleBase struct { + // unique id + Id string + // resource name + Resource string + Strategy Strategy + // auto recover timeout in ms, all requests would be broken before auto recover + RetryTimeoutMs uint32 + MinRequestAmount uint64 + + StatIntervalMs uint32 +} + +func (b *RuleBase) BreakerStatIntervalMs() uint32 { + return b.StatIntervalMs +} + +func (b *RuleBase) IsApplicable() error { + if !(len(b.Resource) > 0 && b.RetryTimeoutMs >= 0 && b.MinRequestAmount >= 0 && b.StatIntervalMs >= 0) { + return errors.Errorf("Illegal parameters, Id=%s, Resource=%s, Strategy=%d, RetryTimeoutMs=%d, MinRequestAmount=%d, StatIntervalMs=%d.", + b.Id, b.Resource, b.Strategy, b.RetryTimeoutMs, b.MinRequestAmount, b.StatIntervalMs) + } + return nil +} + +func (b *RuleBase) IsStatReusable(r Rule) bool { + return b.Resource == r.ResourceName() && b.Strategy == r.BreakerStrategy() && b.StatIntervalMs == r.BreakerStatIntervalMs() +} + +func (b *RuleBase) String() string { + // fallback string + return fmt.Sprintf("RuleBase{id=%s,resource=%s, strategy=%+v, RetryTimeoutMs=%d, MinRequestAmount=%d}", + b.Id, b.Resource, b.Strategy, b.RetryTimeoutMs, b.MinRequestAmount) +} + +func (b *RuleBase) BreakerStrategy() Strategy { + return b.Strategy +} + +func (b *RuleBase) ResourceName() string { + return b.Resource +} + +// SlowRt circuit breaker rule +type slowRtRule struct { + RuleBase + // max allowed rt in ms + MaxAllowedRt uint64 + MaxSlowRequestRatio float64 +} + +func NewSlowRtRule(resource string, intervalMs uint32, retryTimeoutMs uint32, maxAllowedRt, minRequestAmount uint64, maxSlowRequestRatio float64) *slowRtRule { + return &slowRtRule{ + RuleBase: RuleBase{ + Id: util.NewUuid(), + Resource: resource, + Strategy: SlowRt, + RetryTimeoutMs: retryTimeoutMs, + MinRequestAmount: minRequestAmount, + StatIntervalMs: intervalMs, + }, + MaxAllowedRt: maxAllowedRt, + MaxSlowRequestRatio: maxSlowRequestRatio, + } +} + +func (r *slowRtRule) IsEqualsTo(newRule Rule) bool { + newSlowRtRule, ok := newRule.(*slowRtRule) + if !ok { + return false + } + return r.Resource == newSlowRtRule.Resource && r.Strategy == newSlowRtRule.Strategy && r.RetryTimeoutMs == newSlowRtRule.RetryTimeoutMs && + r.MinRequestAmount == newSlowRtRule.MinRequestAmount && r.StatIntervalMs == newSlowRtRule.StatIntervalMs && + r.MaxAllowedRt == newSlowRtRule.MaxAllowedRt && r.MaxSlowRequestRatio == newSlowRtRule.MaxSlowRequestRatio +} + +func (r *slowRtRule) IsApplicable() error { + baseApplicableError := r.RuleBase.IsApplicable() + var slowRtError error + if !(r.MaxSlowRequestRatio >= 0.0 && r.MaxAllowedRt >= 0) { + slowRtError = errors.Errorf("Illegal parameters in slowRtRule, MaxSlowRequestRatio: %f, MaxAllowedRt: %d", r.MaxSlowRequestRatio, r.MaxAllowedRt) + } + return multierr.Append(baseApplicableError, slowRtError) +} + +func (r *slowRtRule) String() string { + return fmt.Sprintf("slowRtRule{RuleBase:%s, MaxAllowedRt=%d, MaxSlowRequestRatio=%f}", r.RuleBase.String(), r.MaxAllowedRt, r.MaxSlowRequestRatio) +} + +// Error ratio circuit breaker rule +type errorRatioRule struct { + RuleBase + Threshold float64 +} + +func NewErrorRatioRule(resource string, intervalMs uint32, retryTimeoutMs uint32, minRequestAmount uint64, maxErrorRatio float64) *errorRatioRule { + return &errorRatioRule{ + RuleBase: RuleBase{ + Id: util.NewUuid(), + Resource: resource, + Strategy: ErrorRatio, + RetryTimeoutMs: retryTimeoutMs, + MinRequestAmount: minRequestAmount, + StatIntervalMs: intervalMs, + }, + Threshold: maxErrorRatio, + } +} + +func (r *errorRatioRule) String() string { + return fmt.Sprintf("errorRatioRule{RuleBase:%s, Threshold=%f}", r.RuleBase.String(), r.Threshold) +} + +func (r *errorRatioRule) IsEqualsTo(newRule Rule) bool { + newErrorRatioRule, ok := newRule.(*errorRatioRule) + if !ok { + return false + } + return r.Resource == newErrorRatioRule.Resource && r.Strategy == newErrorRatioRule.Strategy && r.RetryTimeoutMs == newErrorRatioRule.RetryTimeoutMs && + r.MinRequestAmount == newErrorRatioRule.MinRequestAmount && r.StatIntervalMs == newErrorRatioRule.StatIntervalMs && + r.Threshold == newErrorRatioRule.Threshold +} + +func (r *errorRatioRule) IsApplicable() error { + baseApplicableError := r.RuleBase.IsApplicable() + var errorRatioRuleError error + if !(r.Threshold >= 0.0) { + errorRatioRuleError = errors.Errorf("Illegal parameters in errorRatioRule, Threshold: %f.", r.Threshold) + } + return multierr.Append(baseApplicableError, errorRatioRuleError) +} + +// Error count circuit breaker rule +type errorCountRule struct { + RuleBase + Threshold uint64 +} + +func NewErrorCountRule(resource string, intervalMs uint32, retryTimeoutMs uint32, minRequestAmount, maxErrorCount uint64) *errorCountRule { + return &errorCountRule{ + RuleBase: RuleBase{ + Id: util.NewUuid(), + Resource: resource, + Strategy: ErrorCount, + RetryTimeoutMs: retryTimeoutMs, + MinRequestAmount: minRequestAmount, + StatIntervalMs: intervalMs, + }, + Threshold: maxErrorCount, + } +} + +func (r *errorCountRule) String() string { + return fmt.Sprintf("errorCountRule{RuleBase:%s, Threshold=%d}", r.RuleBase.String(), r.Threshold) +} + +func (r *errorCountRule) IsEqualsTo(newRule Rule) bool { + newErrorCountRule, ok := newRule.(*errorCountRule) + if !ok { + return false + } + return r.Resource == newErrorCountRule.Resource && r.Strategy == newErrorCountRule.Strategy && r.RetryTimeoutMs == newErrorCountRule.RetryTimeoutMs && + r.MinRequestAmount == newErrorCountRule.MinRequestAmount && r.StatIntervalMs == newErrorCountRule.StatIntervalMs && + r.Threshold == newErrorCountRule.Threshold +} + +func (r *errorCountRule) IsApplicable() error { + baseApplicableError := r.RuleBase.IsApplicable() + var errorCountRuleError error + if !(r.Threshold >= 0) { + errorCountRuleError = errors.Errorf("Illegal parameters in errorCountRule, Threshold: %d.", r.Threshold) + } + return multierr.Append(baseApplicableError, errorCountRuleError) +} diff --git a/core/circuitbreaker/rule_manager.go b/core/circuitbreaker/rule_manager.go new file mode 100644 index 000000000..5608a1767 --- /dev/null +++ b/core/circuitbreaker/rule_manager.go @@ -0,0 +1,286 @@ +package circuitbreaker + +import ( + "fmt" + "strings" + "sync" + + "github.com/alibaba/sentinel-golang/logging" + "github.com/pkg/errors" +) + +type CircuitBreakerGenFunc func(r Rule, reuseStat interface{}) CircuitBreaker + +var ( + logger = logging.GetDefaultLogger() + cbGenFuncMap = make(map[Strategy]CircuitBreakerGenFunc) + + breakerRules = make(map[string][]Rule) + breakers = make(map[string][]CircuitBreaker) + updateMux = &sync.RWMutex{} + + statusSwitchListeners = make([]StateChangeListener, 0) +) + +func init() { + cbGenFuncMap[SlowRt] = func(r Rule, reuseStat interface{}) CircuitBreaker { + rtRule, ok := r.(*slowRtRule) + if !ok || rtRule == nil { + return nil + } + if reuseStat == nil { + return newSlowRtCircuitBreaker(rtRule) + } + stat, ok := reuseStat.(*slowRequestLeapArray) + if !ok || stat == nil { + logger.Warnf("Expect to generate circuit breaker with reuse statistic, but fail to do type assertion, expect:*slowRequestLeapArray, in fact: %+v", stat) + return newSlowRtCircuitBreaker(rtRule) + } + return newSlowRtCircuitBreakerWithStat(rtRule, stat) + } + + cbGenFuncMap[ErrorRatio] = func(r Rule, reuseStat interface{}) CircuitBreaker { + errRatioRule, ok := r.(*errorRatioRule) + if !ok || errRatioRule == nil { + return nil + } + if reuseStat == nil { + return newErrorRatioCircuitBreaker(errRatioRule) + } + stat, ok := reuseStat.(*errorCounterLeapArray) + if !ok || stat == nil { + logger.Warnf("Expect to generate circuit breaker with reuse statistic, but fail to do type assertion, expect:*errorCounterLeapArray, in fact: %+v", stat) + return newErrorRatioCircuitBreaker(errRatioRule) + } + return newErrorRatioCircuitBreakerWithStat(errRatioRule, stat) + } + + cbGenFuncMap[ErrorCount] = func(r Rule, reuseStat interface{}) CircuitBreaker { + errCountRule, ok := r.(*errorCountRule) + if !ok || errCountRule == nil { + return nil + } + if reuseStat == nil { + return newErrorCountCircuitBreaker(errCountRule) + } + stat, ok := reuseStat.(*errorCounterLeapArray) + if !ok || stat == nil { + logger.Warnf("Expect to generate circuit breaker with reuse statistic, but fail to do type assertion, expect:*errorCounterLeapArray, in fact: %+v", stat) + return newErrorCountCircuitBreaker(errCountRule) + } + return newErrorCountCircuitBreakerWithStat(errCountRule, stat) + } +} + +func GetResRules(resource string) []Rule { + updateMux.RLock() + ret, ok := breakerRules[resource] + updateMux.RUnlock() + if !ok { + ret = make([]Rule, 0) + } + return ret +} + +// Load the newer rules to manager. +// rules: the newer rules, if len of rules is 0, will clear all rules of manager. +// return value: +// bool: was designed to indicate whether the internal map has been changed +// error: was designed to indicate whether occurs the error. +func LoadRules(rules []Rule) (bool, error) { + // TODO in order to avoid invalid update, should check consistent with last update rules + err := onRuleUpdate(rules) + return true, err +} + +func getResBreakers(resource string) []CircuitBreaker { + ret := make([]CircuitBreaker, 0) + updateMux.RLock() + resCBs := breakers[resource] + updateMux.RUnlock() + if len(resCBs) == 0 { + return ret + } + ret = append(ret, resCBs...) + return ret +} + +// Concurrent safe to update rules +func onRuleUpdate(rules []Rule) (err error) { + defer func() { + if r := recover(); r != nil { + var ok bool + err, ok = r.(error) + if !ok { + err = fmt.Errorf("%+v", r) + } + } + }() + newBreakerRules := make(map[string][]Rule) + + for _, rule := range rules { + if rule == nil { + continue + } + if err := rule.IsApplicable(); err != nil { + logger.Warnf("Ignoring invalid breaker rule when loading new rules,rule: %+v, err: %+v.", rule, err) + continue + } + + classification := rule.ResourceName() + ruleSet, ok := newBreakerRules[classification] + if !ok { + ruleSet = make([]Rule, 0, 1) + } + ruleSet = append(ruleSet, rule) + newBreakerRules[classification] = ruleSet + } + + newBreakers := make(map[string][]CircuitBreaker) + + updateMux.Lock() + defer updateMux.Unlock() + + for res, resRules := range newBreakerRules { + emptyCircuitBreakerList := make([]CircuitBreaker, 0, 0) + for _, r := range resRules { + oldResCbs := breakers[res] + if oldResCbs == nil { + oldResCbs = emptyCircuitBreakerList + } + equalsIdx := -1 + reuseStatIdx := -1 + for idx, cb := range oldResCbs { + oldRule := cb.BoundRule() + if oldRule.IsEqualsTo(r) { + equalsIdx = idx + break + } + if !oldRule.IsStatReusable(r) { + continue + } + if reuseStatIdx >= 0 { + // had find reuse rule. + continue + } + reuseStatIdx = idx + } + + // First check equals scenario + if equalsIdx >= 0 { + // reuse the old cb + reuseOldCb := oldResCbs[equalsIdx] + cbsOfRes, ok := newBreakers[res] + if !ok { + cbsOfRes = make([]CircuitBreaker, 0, 1) + newBreakers[res] = append(cbsOfRes, reuseOldCb) + } else { + newBreakers[res] = append(cbsOfRes, reuseOldCb) + } + // remove old cb from oldResCbs + oldResCbs = append(oldResCbs[:equalsIdx], oldResCbs[equalsIdx+1:]...) + breakers[res] = oldResCbs + continue + } + + generator := cbGenFuncMap[r.BreakerStrategy()] + if generator == nil { + logger.Warnf("Circuit Breaker Generator for %+resRules is not existed.", r.BreakerStrategy()) + continue + } + + var cb CircuitBreaker + if reuseStatIdx >= 0 { + cb = generator(r, oldResCbs[reuseStatIdx].BoundStat()) + } else { + cb = generator(r, nil) + } + if cb == nil { + logger.Warnf("Fail to generate Circuit Breaker for rule: %+resRules.", r) + continue + } + + if reuseStatIdx >= 0 { + breakers[res] = append(oldResCbs[:reuseStatIdx], oldResCbs[reuseStatIdx+1:]...) + } + cbsOfRes, ok := newBreakers[res] + if !ok { + cbsOfRes = make([]CircuitBreaker, 0, 1) + newBreakers[res] = append(cbsOfRes, cb) + } else { + newBreakers[res] = append(cbsOfRes, cb) + } + } + } + + breakerRules = newBreakerRules + breakers = newBreakers + + logRuleUpdate(newBreakerRules) + return nil +} + +func rulesFrom(rm map[string][]Rule) []Rule { + rules := make([]Rule, 0) + if len(rm) == 0 { + return rules + } + for _, rs := range rm { + if len(rs) == 0 { + continue + } + for _, r := range rs { + if r != nil { + rules = append(rules, r) + } + } + } + return rules +} + +func logRuleUpdate(rules map[string][]Rule) { + sb := strings.Builder{} + sb.WriteString("[CircuitBreakerRuleManager] succeed to load circuit breakers:\n") + for _, rule := range rulesFrom(rules) { + sb.WriteString(rule.String()) + sb.WriteString("\n") + } + logger.Info(sb.String()) +} + +func RegisterStatusSwitchListeners(listeners ...StateChangeListener) { + if len(listeners) == 0 { + return + } + updateMux.Lock() + defer updateMux.Unlock() + + statusSwitchListeners = append(statusSwitchListeners, listeners...) +} + +// SetTrafficShapingGenerator sets the traffic controller generator for the given control behavior. +// Note that modifying the generator of default control behaviors is not allowed. +func SetTrafficShapingGenerator(s Strategy, generator CircuitBreakerGenFunc) error { + if generator == nil { + return errors.New("nil generator") + } + if s >= SlowRt && s <= ErrorCount { + return errors.New("not allowed to replace the generator for default control behaviors") + } + updateMux.Lock() + defer updateMux.Unlock() + + cbGenFuncMap[s] = generator + return nil +} + +func RemoveTrafficShapingGenerator(s Strategy) error { + if s >= SlowRt && s <= ErrorCount { + return errors.New("not allowed to replace the generator for default control behaviors") + } + updateMux.Lock() + defer updateMux.Unlock() + + delete(cbGenFuncMap, s) + return nil +} diff --git a/core/circuitbreaker/rule_manager_test.go b/core/circuitbreaker/rule_manager_test.go new file mode 100644 index 000000000..aa275ca45 --- /dev/null +++ b/core/circuitbreaker/rule_manager_test.go @@ -0,0 +1,116 @@ +package circuitbreaker + +import ( + "reflect" + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_isApplicableRule_valid(t *testing.T) { + type args struct { + rule Rule + } + tests := []struct { + name string + args args + want error + }{ + { + name: "rtRule_isApplicable", + args: args{ + rule: NewSlowRtRule("abc01", 1000, 1, 20, 5, 0.1), + }, + want: nil, + }, + { + name: "errorRatioRule_isApplicable", + args: args{ + rule: NewErrorRatioRule("abc02", 1000, 1, 5, 0.3), + }, + want: nil, + }, + { + name: "errorCountRule_isApplicable", + args: args{ + rule: NewErrorCountRule("abc03", 1000, 1, 5, 10), + }, + want: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.args.rule.IsApplicable(); got != tt.want { + t.Errorf("RuleManager.IsApplicable() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_isApplicableRule_invalid(t *testing.T) { + t.Run("rtBreakerRule_isApplicable_false", func(t *testing.T) { + rule := NewSlowRtRule("abc01", 1000, 1, 5, 10050, -1.0) + if got := rule.IsApplicable(); got == nil { + t.Errorf("RuleManager.IsApplicable() = %v", got) + } + }) + t.Run("errorRatioRule_isApplicable_false", func(t *testing.T) { + rule := NewErrorRatioRule("abc02", 1000, 1, 5, -0.3) + if got := rule.IsApplicable(); got == nil { + t.Errorf("RuleManager.IsApplicable() = %v", got) + } + }) + t.Run("errorCountRule_isApplicable_false", func(t *testing.T) { + rule := NewErrorCountRule("", 1000, 1, 5, 0) + if got := rule.IsApplicable(); got == nil { + t.Errorf("RuleManager.IsApplicable() = %v", got) + } + }) +} + +func Test_onUpdateRules(t *testing.T) { + t.Run("Test_onUpdateRules", func(t *testing.T) { + rules := make([]Rule, 0) + rules = append(rules, NewSlowRtRule("abc01", 1000, 1, 20, 5, 0.1)) + rules = append(rules, NewErrorRatioRule("abc01", 1000, 1, 5, 0.3)) + rules = append(rules, NewErrorCountRule("abc01", 1000, 1, 5, 10)) + err := onRuleUpdate(rules) + if err != nil { + t.Fatal(err) + } + assert.True(t, len(breakers["abc01"]) == 3) + assert.True(t, len(breakerRules["abc01"]) == 3) + breakers = make(map[string][]CircuitBreaker) + breakerRules = make(map[string][]Rule) + }) +} + +func Test_onRuleUpdate(t *testing.T) { + t.Run("Test_onRuleUpdate", func(t *testing.T) { + r1 := NewSlowRtRule("abc", 1000, 1, 20, 5, 0.1) + r2 := NewErrorRatioRule("abc", 1000, 1, 5, 0.3) + r3 := NewErrorCountRule("abc", 1000, 1, 5, 10) + _, _ = LoadRules([]Rule{r1, r2, r3}) + b2 := breakers["abc"][1] + + assert.True(t, len(breakers) == 1) + assert.True(t, len(breakers["abc"]) == 3) + assert.True(t, reflect.DeepEqual(breakers["abc"][0].BoundRule(), r1)) + assert.True(t, reflect.DeepEqual(breakers["abc"][1].BoundRule(), r2)) + assert.True(t, reflect.DeepEqual(breakers["abc"][2].BoundRule(), r3)) + + r4 := NewSlowRtRule("abc", 1000, 1, 20, 5, 0.1) + r5 := NewErrorRatioRule("abc", 1000, 100, 25, 0.5) + r6 := NewErrorCountRule("abc", 100, 1, 5, 10) + r7 := NewErrorCountRule("abc", 1100, 1, 5, 10) + + _, _ = LoadRules([]Rule{r4, r5, r6, r7}) + assert.True(t, len(breakers) == 1) + newCbs := breakers["abc"] + assert.True(t, len(newCbs) == 4, "Expect:4, in fact:", len(newCbs)) + assert.True(t, reflect.DeepEqual(newCbs[0].BoundRule(), r1)) + assert.True(t, reflect.DeepEqual(newCbs[1].BoundStat(), b2.BoundStat())) + assert.True(t, reflect.DeepEqual(newCbs[2].BoundRule(), r6)) + assert.True(t, reflect.DeepEqual(newCbs[3].BoundRule(), r7)) + }) +} diff --git a/core/circuitbreaker/rule_test.go b/core/circuitbreaker/rule_test.go new file mode 100644 index 000000000..d9c31822d --- /dev/null +++ b/core/circuitbreaker/rule_test.go @@ -0,0 +1,43 @@ +package circuitbreaker + +import ( + "github.com/stretchr/testify/mock" +) + +type RuleMock struct { + mock.Mock +} + +func (m *RuleMock) String() string { + args := m.Called() + return args.String(0) +} + +func (m *RuleMock) ResourceName() string { + args := m.Called() + return args.String(0) +} + +func (m *RuleMock) BreakerStrategy() Strategy { + args := m.Called() + return args.Get(0).(Strategy) +} +func (m *RuleMock) BreakerStatIntervalMs() uint32 { + args := m.Called() + return uint32(args.Int(0)) +} + +func (m *RuleMock) IsEqualsTo(r Rule) bool { + args := m.Called(r) + return args.Bool(0) +} + +func (m *RuleMock) IsStatReusable(r Rule) bool { + args := m.Called(r) + return args.Bool(0) +} + +func (m *RuleMock) IsApplicable() error { + args := m.Called() + return args.Get(0).(error) +} diff --git a/core/circuit_breaker/slot.go b/core/circuitbreaker/slot.go similarity index 96% rename from core/circuit_breaker/slot.go rename to core/circuitbreaker/slot.go index 055bb8ec2..8fef74967 100644 --- a/core/circuit_breaker/slot.go +++ b/core/circuitbreaker/slot.go @@ -1,4 +1,4 @@ -package circuit_breaker +package circuitbreaker import ( "github.com/alibaba/sentinel-golang/core/base" diff --git a/core/circuitbreaker/stat_slot.go b/core/circuitbreaker/stat_slot.go new file mode 100644 index 000000000..6d1dc41b2 --- /dev/null +++ b/core/circuitbreaker/stat_slot.go @@ -0,0 +1,32 @@ +package circuitbreaker + +import ( + "github.com/alibaba/sentinel-golang/core/base" +) + +// MetricStatSlot add statistic metric for circuit breaker +// statistic is based on completed. +type MetricStatSlot struct { +} + +func (c *MetricStatSlot) OnEntryPassed(_ *base.EntryContext) { + // Do nothing + return +} + +func (c *MetricStatSlot) OnEntryBlocked(_ *base.EntryContext, _ *base.BlockError) { + // Do nothing + return +} + +func (c *MetricStatSlot) OnCompleted(ctx *base.EntryContext) { + if ctx.RuleCheckResult == nil || ctx.RuleCheckResult.IsBlocked() { + return + } + res := ctx.Resource.Name() + err := ctx.Err() + rt := ctx.Rt() + for _, cb := range getResBreakers(res) { + cb.HandleCompleted(rt, err) + } +} diff --git a/core/stat/base/atomic_window_wrap_array_test.go b/core/stat/base/atomic_window_wrap_array_test.go index 776f2a1d2..4921ef6c8 100644 --- a/core/stat/base/atomic_window_wrap_array_test.go +++ b/core/stat/base/atomic_window_wrap_array_test.go @@ -15,12 +15,12 @@ func Test_newAtomicBucketWrapArray_normal(t *testing.T) { type args struct { len int bucketLengthInMs uint32 - bg bucketGenerator + bg BucketGenerator } tests := []struct { name string args args - want *atomicBucketWrapArray + want *AtomicBucketWrapArray }{ { name: "Test_newAtomicBucketWrapArray_normal", @@ -35,9 +35,9 @@ func Test_newAtomicBucketWrapArray_normal(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - ret := newAtomicBucketWrapArray(tt.args.len, tt.args.bucketLengthInMs, tt.args.bg) + ret := NewAtomicBucketWrapArray(tt.args.len, tt.args.bucketLengthInMs, tt.args.bg) if ret == nil || uintptr(ret.base) == uintptr(0) || ret.length != tt.args.len || ret.data == nil || len(ret.data) == 0 { - t.Errorf("newAtomicBucketWrapArray() %+v is illegal.\n", ret) + t.Errorf("NewAtomicBucketWrapArray() %+v is illegal.\n", ret) return } dataNil := false @@ -48,7 +48,7 @@ func Test_newAtomicBucketWrapArray_normal(t *testing.T) { } } if dataNil { - t.Error("newAtomicBucketWrapArray exists nil bucketWrap.") + t.Error("NewAtomicBucketWrapArray exists nil BucketWrap.") } }) @@ -59,7 +59,7 @@ func Test_atomicBucketWrapArray_elementOffset(t *testing.T) { type args struct { len int bucketLengthInMs uint32 - bg bucketGenerator + bg BucketGenerator idx int } tests := []struct { @@ -80,9 +80,9 @@ func Test_atomicBucketWrapArray_elementOffset(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - aa := newAtomicBucketWrapArray(tt.args.len, tt.args.bucketLengthInMs, tt.args.bg) + aa := NewAtomicBucketWrapArray(tt.args.len, tt.args.bucketLengthInMs, tt.args.bg) if got := uintptr(aa.elementOffset(tt.args.idx)) - uintptr(aa.base); got != tt.want { - t.Errorf("atomicBucketWrapArray.elementOffset() = %v, want %v \n", got, tt.want) + t.Errorf("AtomicBucketWrapArray.elementOffset() = %v, want %v \n", got, tt.want) } }) } @@ -92,13 +92,13 @@ func Test_atomicBucketWrapArray_get(t *testing.T) { type args struct { len int bucketLengthInMs uint32 - bg bucketGenerator + bg BucketGenerator idx int } tests := []struct { name string args args - want *bucketWrap + want *BucketWrap }{ { name: "Test_atomicBucketWrapArray_get", @@ -113,10 +113,10 @@ func Test_atomicBucketWrapArray_get(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - aa := newAtomicBucketWrapArray(tt.args.len, tt.args.bucketLengthInMs, tt.args.bg) + aa := NewAtomicBucketWrapArray(tt.args.len, tt.args.bucketLengthInMs, tt.args.bg) tt.want = aa.data[9] if got := aa.get(tt.args.idx); !reflect.DeepEqual(got, tt.want) { - t.Errorf("atomicBucketWrapArray.get() = %v, want %v", got, tt.want) + t.Errorf("AtomicBucketWrapArray.get() = %v, want %v", got, tt.want) } }) } @@ -126,7 +126,7 @@ func Test_atomicBucketWrapArray_compareAndSet(t *testing.T) { type args struct { len int bucketLengthInMs uint32 - bg bucketGenerator + bg BucketGenerator idx int } tests := []struct { @@ -147,31 +147,31 @@ func Test_atomicBucketWrapArray_compareAndSet(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - aa := newAtomicBucketWrapArray(tt.args.len, tt.args.bucketLengthInMs, tt.args.bg) - update := &bucketWrap{ - bucketStart: 8888888888888, - value: atomic.Value{}, + aa := NewAtomicBucketWrapArray(tt.args.len, tt.args.bucketLengthInMs, tt.args.bg) + update := &BucketWrap{ + BucketStart: 8888888888888, + Value: atomic.Value{}, } - update.value.Store(int64(666666)) + update.Value.Store(int64(666666)) except := aa.get(9) if got := aa.compareAndSet(tt.args.idx, except, update); got != tt.want { - t.Errorf("atomicBucketWrapArray.compareAndSet() = %v, want %v", got, tt.want) + t.Errorf("AtomicBucketWrapArray.compareAndSet() = %v, want %v", got, tt.want) } if !reflect.DeepEqual(aa.get(9), update) { - t.Errorf("atomicBucketWrapArray.compareAndSet() update fail") + t.Errorf("AtomicBucketWrapArray.compareAndSet() update fail") } }) } } -func taskGet(wg *sync.WaitGroup, at *atomicBucketWrapArray, t *testing.T) { +func taskGet(wg *sync.WaitGroup, at *AtomicBucketWrapArray, t *testing.T) { time.Sleep(time.Millisecond * 3) idx := rand.Int() % 20 wwPtr := at.get(idx) - vInterface := wwPtr.value.Load() + vInterface := wwPtr.Value.Load() vp, ok := vInterface.(*int64) if !ok { - t.Error("bucketWrap value assert fail.\n") + t.Error("BucketWrap Value assert fail.\n") } v := atomic.LoadInt64(vp) newV := v + 1 @@ -183,11 +183,11 @@ func taskGet(wg *sync.WaitGroup, at *atomicBucketWrapArray, t *testing.T) { } func Test_atomicBucketWrapArray_Concurrency_Get(t *testing.T) { - ret := newAtomicBucketWrapArray(int(SampleCount), BucketLengthInMs, &leapArrayMock{}) + ret := NewAtomicBucketWrapArray(int(SampleCount), BucketLengthInMs, &leapArrayMock{}) for _, ww := range ret.data { c := new(int64) *c = 0 - ww.value.Store(c) + ww.Value.Store(c) } const GoroutineNum = 1000 wg1 := &sync.WaitGroup{} @@ -198,7 +198,7 @@ func Test_atomicBucketWrapArray_Concurrency_Get(t *testing.T) { wg1.Wait() sum := int64(0) for _, ww := range ret.data { - val := ww.value.Load() + val := ww.Value.Load() count, ok := val.(*int64) if !ok { t.Error("assert error") @@ -211,7 +211,7 @@ func Test_atomicBucketWrapArray_Concurrency_Get(t *testing.T) { t.Log("all done") } -func taskSet(wg *sync.WaitGroup, at *atomicBucketWrapArray, t *testing.T) { +func taskSet(wg *sync.WaitGroup, at *AtomicBucketWrapArray, t *testing.T) { time.Sleep(time.Millisecond * 3) idx := rand.Int() % 20 ww := at.get(idx) @@ -219,9 +219,9 @@ func taskSet(wg *sync.WaitGroup, at *atomicBucketWrapArray, t *testing.T) { *bucket = 100 val := atomic.Value{} val.Store(bucket) - replace := &bucketWrap{ - bucketStart: util.CurrentTimeMillis(), - value: val, + replace := &BucketWrap{ + BucketStart: util.CurrentTimeMillis(), + Value: val, } for !at.compareAndSet(idx, ww, replace) { ww = at.get(idx) @@ -230,11 +230,11 @@ func taskSet(wg *sync.WaitGroup, at *atomicBucketWrapArray, t *testing.T) { } func Test_atomicBucketWrapArray_Concurrency_Set(t *testing.T) { - ret := newAtomicBucketWrapArray(int(SampleCount), BucketLengthInMs, &leapArrayMock{}) + ret := NewAtomicBucketWrapArray(int(SampleCount), BucketLengthInMs, &leapArrayMock{}) for _, ww := range ret.data { c := new(int64) *c = 0 - ww.value.Store(c) + ww.Value.Store(c) } const GoroutineNum = 1000 wg2 := &sync.WaitGroup{} @@ -245,7 +245,7 @@ func Test_atomicBucketWrapArray_Concurrency_Set(t *testing.T) { } wg2.Wait() for _, ww := range ret.data { - v := ww.value.Load() + v := ww.Value.Load() val, ok := v.(*int64) if !ok || *val != 100 { t.Error("assert error") diff --git a/core/stat/base/bucket_leap_array.go b/core/stat/base/bucket_leap_array.go index 308467a5c..ee75bf683 100644 --- a/core/stat/base/bucket_leap_array.go +++ b/core/stat/base/bucket_leap_array.go @@ -16,10 +16,20 @@ var logger = logging.GetDefaultLogger() // and MetricBucket (as the data type). The MetricBucket is used to record statistic // metrics per minimum time unit (i.e. the bucket time span). type BucketLeapArray struct { - data leapArray + data LeapArray dataType string } +func (bla *BucketLeapArray) NewEmptyBucket() interface{} { + return NewMetricBucket() +} + +func (bla *BucketLeapArray) ResetBucketTo(bw *BucketWrap, startTime uint64) *BucketWrap { + atomic.StoreUint64(&bw.BucketStart, startTime) + bw.Value.Store(NewMetricBucket()) + return bw +} + // sampleCount is the number of slots // intervalInMs is the time length of sliding window func NewBucketLeapArray(sampleCount uint32, intervalInMs uint32) *BucketLeapArray { @@ -28,7 +38,7 @@ func NewBucketLeapArray(sampleCount uint32, intervalInMs uint32) *BucketLeapArra } bucketLengthInMs := intervalInMs / sampleCount ret := &BucketLeapArray{ - data: leapArray{ + data: LeapArray{ bucketLengthInMs: bucketLengthInMs, sampleCount: sampleCount, intervalInMs: intervalInMs, @@ -36,7 +46,7 @@ func NewBucketLeapArray(sampleCount uint32, intervalInMs uint32) *BucketLeapArra }, dataType: "MetricBucket", } - arr := newAtomicBucketWrapArray(int(sampleCount), bucketLengthInMs, ret) + arr := NewAtomicBucketWrapArray(int(sampleCount), bucketLengthInMs, ret) ret.data.array = arr return ret } @@ -61,16 +71,6 @@ func (bla *BucketLeapArray) GetIntervalInSecond() float64 { return float64(bla.IntervalInMs()) / 1000.0 } -func (bla *BucketLeapArray) newEmptyBucket() interface{} { - return NewMetricBucket() -} - -func (bla *BucketLeapArray) resetBucketTo(ww *bucketWrap, startTime uint64) *bucketWrap { - atomic.StoreUint64(&ww.bucketStart, startTime) - ww.value.Store(NewMetricBucket()) - return ww -} - // Write method // It might panic func (bla *BucketLeapArray) AddCount(event base.MetricEvent, count int64) { @@ -80,16 +80,16 @@ func (bla *BucketLeapArray) AddCount(event base.MetricEvent, count int64) { func (bla *BucketLeapArray) addCountWithTime(now uint64, event base.MetricEvent, count int64) { curBucket, err := bla.data.currentBucketOfTime(now, bla) if err != nil { - logger.Errorf("Failed to get current bucket, current ts=%d, err: %+v.", now, errors.WithStack(err)) + logger.Errorf("Failed to get current bucket, current ts=%d, err: %+v.", now, err) return } if curBucket == nil { logger.Error("Failed to add count: current bucket is nil") return } - mb := curBucket.value.Load() + mb := curBucket.Value.Load() if mb == nil { - logger.Error("Failed to add count: current bucket atomic value is nil") + logger.Error("Failed to add count: current bucket atomic Value is nil") return } b, ok := mb.(*MetricBucket) @@ -113,9 +113,9 @@ func (bla *BucketLeapArray) CountWithTime(now uint64, event base.MetricEvent) in } count := int64(0) for _, ww := range bla.data.valuesWithTime(now) { - mb := ww.value.Load() + mb := ww.Value.Load() if mb == nil { - logger.Error("Current bucket's value is nil.") + logger.Error("Current bucket's Value is nil.") continue } b, ok := mb.(*MetricBucket) @@ -128,35 +128,35 @@ func (bla *BucketLeapArray) CountWithTime(now uint64, event base.MetricEvent) in return count } -// Read method, get all bucketWrap. -func (bla *BucketLeapArray) Values(now uint64) []*bucketWrap { +// Read method, get all BucketWrap. +func (bla *BucketLeapArray) Values(now uint64) []*BucketWrap { _, err := bla.data.currentBucketOfTime(now, bla) if err != nil { - logger.Errorf("Fail to get current(%d) bucket, err: %+v.", now, errors.WithStack(err)) + logger.Errorf("Fail to get current(%d) bucket, err: %+v.", now, err) } return bla.data.valuesWithTime(now) } -func (bla *BucketLeapArray) ValuesConditional(now uint64, predicate base.TimePredicate) []*bucketWrap { +func (bla *BucketLeapArray) ValuesConditional(now uint64, predicate base.TimePredicate) []*BucketWrap { _, err := bla.data.currentBucketOfTime(now, bla) if err != nil { - logger.Errorf("Fail to get current(%d) bucket, err: %+v.", now, errors.WithStack(err)) + logger.Errorf("Fail to get current(%d) bucket, err: %+v.", now, err) } return bla.data.ValuesConditional(now, predicate) } func (bla *BucketLeapArray) MinRt() int64 { - _, err := bla.data.currentBucket(bla) + _, err := bla.data.CurrentBucket(bla) if err != nil { - logger.Errorf("Fail to get current bucket, err: %+v.", errors.WithStack(err)) + logger.Errorf("Fail to get current bucket, err: %+v.", err) } ret := base.DefaultStatisticMaxRt - for _, v := range bla.data.values() { - mb := v.value.Load() + for _, v := range bla.data.Values() { + mb := v.Value.Load() if mb == nil { - logger.Error("Current bucket's value is nil.") + logger.Error("Current bucket's Value is nil.") continue } b, ok := mb.(*MetricBucket) diff --git a/core/stat/base/bucket_leap_array_test.go b/core/stat/base/bucket_leap_array_test.go index edb61897e..2fee9e474 100644 --- a/core/stat/base/bucket_leap_array_test.go +++ b/core/stat/base/bucket_leap_array_test.go @@ -16,21 +16,21 @@ func Test_NewBucketLeapArray(t *testing.T) { now := util.CurrentTimeMillis() br, err := slidingWindow.data.currentBucketOfTime(now, slidingWindow) - if br == nil || br.value.Load() == nil { + if br == nil || br.Value.Load() == nil { t.Errorf("Unexcepted error") return } if err != nil { t.Errorf("Unexcepted error") } - if br.bucketStart != (now - now%uint64(BucketLengthInMs)) { + if br.BucketStart != (now - now%uint64(BucketLengthInMs)) { t.Errorf("Unexcepted error, bucket length is not same") } - if br.value.Load() == nil { - t.Errorf("Unexcepted error, value is nil") + if br.Value.Load() == nil { + t.Errorf("Unexcepted error, Value is nil") } if slidingWindow.Count(base.MetricEventPass) != 0 { - t.Errorf("Unexcepted error, pass value is invalid") + t.Errorf("Unexcepted error, pass Value is invalid") } } @@ -82,7 +82,7 @@ func TestBucketLeapArray_resetBucketTo(t *testing.T) { bla := NewBucketLeapArray(SampleCount, IntervalInMs) idx := 6 oldBucketWrap := bla.data.array.get(idx) - oldBucket := oldBucketWrap.value.Load() + oldBucket := oldBucketWrap.Value.Load() if oldBucket == nil { t.Errorf("BucketLeapArray init error.") } @@ -94,8 +94,8 @@ func TestBucketLeapArray_resetBucketTo(t *testing.T) { bucket.Add(base.MetricEventBlock, 100) wantStartTime := util.CurrentTimeMillis() + 1000 - got := bla.resetBucketTo(oldBucketWrap, wantStartTime) - newBucket := got.value.Load() + got := bla.ResetBucketTo(oldBucketWrap, wantStartTime) + newBucket := got.Value.Load() if newBucket == nil { t.Errorf("got bucket is nil.") } @@ -104,9 +104,9 @@ func TestBucketLeapArray_resetBucketTo(t *testing.T) { t.Errorf("Fail to assert bucket to MetricBucket.") } if newRealBucket.Get(base.MetricEventPass) != 0 { - t.Errorf("BucketLeapArray.resetBucketTo() execute fail.") + t.Errorf("BucketLeapArray.ResetBucketTo() execute fail.") } if newRealBucket.Get(base.MetricEventBlock) != 0 { - t.Errorf("BucketLeapArray.resetBucketTo() execute fail.") + t.Errorf("BucketLeapArray.ResetBucketTo() execute fail.") } } diff --git a/core/stat/base/leap_array.go b/core/stat/base/leap_array.go index fb2f4f92a..4b2c3a19b 100644 --- a/core/stat/base/leap_array.go +++ b/core/stat/base/leap_array.go @@ -15,71 +15,71 @@ const ( PtrSize = int(8) ) -// bucketWrap represent a slot to record metrics -// In order to reduce the usage of memory, bucketWrap don't hold length of bucketWrap -// The length of bucketWrap could be seen in leapArray. +// BucketWrap represent a slot to record metrics +// In order to reduce the usage of memory, BucketWrap don't hold length of BucketWrap +// The length of BucketWrap could be seen in LeapArray. // The scope of time is [startTime, startTime+bucketLength) -// The size of bucketWrap is 24(8+16) bytes -type bucketWrap struct { +// The size of BucketWrap is 24(8+16) bytes +type BucketWrap struct { // The start timestamp of this statistic bucket wrapper. - bucketStart uint64 + BucketStart uint64 // The actual data structure to record the metrics (e.g. MetricBucket). - value atomic.Value + Value atomic.Value } -func (ww *bucketWrap) resetTo(startTime uint64) { - ww.bucketStart = startTime +func (ww *BucketWrap) resetTo(startTime uint64) { + ww.BucketStart = startTime } -func (ww *bucketWrap) isTimeInBucket(now uint64, bucketLengthInMs uint32) bool { - return ww.bucketStart <= now && now < ww.bucketStart+uint64(bucketLengthInMs) +func (ww *BucketWrap) isTimeInBucket(now uint64, bucketLengthInMs uint32) bool { + return ww.BucketStart <= now && now < ww.BucketStart+uint64(bucketLengthInMs) } func calculateStartTime(now uint64, bucketLengthInMs uint32) uint64 { return now - (now % uint64(bucketLengthInMs)) } -// atomic bucketWrap array to resolve race condition -// atomicBucketWrapArray can not append or delete element after initializing -type atomicBucketWrapArray struct { +// atomic BucketWrap array to resolve race condition +// AtomicBucketWrapArray can not append or delete element after initializing +type AtomicBucketWrapArray struct { // The base address for real data array base unsafe.Pointer // The length of slice(array), it can not be modified. length int - data []*bucketWrap + data []*BucketWrap } -// New atomicBucketWrapArray with initializing field data -// Default, automatically initialize each bucketWrap +// New AtomicBucketWrapArray with initializing field data +// Default, automatically initialize each BucketWrap // len: length of array -// bucketLengthInMs: bucket length of bucketWrap +// bucketLengthInMs: bucket length of BucketWrap // generator: generator to generate bucket -func newAtomicBucketWrapArray(len int, bucketLengthInMs uint32, generator bucketGenerator) *atomicBucketWrapArray { - ret := &atomicBucketWrapArray{ +func NewAtomicBucketWrapArray(len int, bucketLengthInMs uint32, generator BucketGenerator) *AtomicBucketWrapArray { + ret := &AtomicBucketWrapArray{ length: len, - data: make([]*bucketWrap, len), + data: make([]*BucketWrap, len), } - // automatically initialize each bucketWrap - // tail bucketWrap of data is initialized with current time + // automatically initialize each BucketWrap + // tail BucketWrap of data is initialized with current time startTime := calculateStartTime(util.CurrentTimeMillis(), bucketLengthInMs) for i := len - 1; i >= 0; i-- { - ww := &bucketWrap{ - bucketStart: startTime, - value: atomic.Value{}, + ww := &BucketWrap{ + BucketStart: startTime, + Value: atomic.Value{}, } - ww.value.Store(generator.newEmptyBucket()) + ww.Value.Store(generator.NewEmptyBucket()) ret.data[i] = ww startTime -= uint64(bucketLengthInMs) } // calculate base address for real data array sliHeader := (*util.SliceHeader)(unsafe.Pointer(&ret.data)) - ret.base = unsafe.Pointer((**bucketWrap)(unsafe.Pointer(sliHeader.Data))) + ret.base = unsafe.Pointer((**BucketWrap)(unsafe.Pointer(sliHeader.Data))) return ret } -func (aa *atomicBucketWrapArray) elementOffset(idx int) unsafe.Pointer { +func (aa *AtomicBucketWrapArray) elementOffset(idx int) unsafe.Pointer { if idx >= aa.length && idx < 0 { panic(fmt.Sprintf("The index (%d) is out of bounds, length is %d.", idx, aa.length)) } @@ -87,37 +87,53 @@ func (aa *atomicBucketWrapArray) elementOffset(idx int) unsafe.Pointer { return unsafe.Pointer(uintptr(basePtr) + uintptr(idx*PtrSize)) } -func (aa *atomicBucketWrapArray) get(idx int) *bucketWrap { - // aa.elementOffset(idx) return the secondary pointer of bucketWrap, which is the pointer to the aa.data[idx] +func (aa *AtomicBucketWrapArray) get(idx int) *BucketWrap { + // aa.elementOffset(idx) return the secondary pointer of BucketWrap, which is the pointer to the aa.data[idx] // then convert to (*unsafe.Pointer) - return (*bucketWrap)(atomic.LoadPointer((*unsafe.Pointer)(aa.elementOffset(idx)))) + return (*BucketWrap)(atomic.LoadPointer((*unsafe.Pointer)(aa.elementOffset(idx)))) } -func (aa *atomicBucketWrapArray) compareAndSet(idx int, except, update *bucketWrap) bool { - // aa.elementOffset(idx) return the secondary pointer of bucketWrap, which is the pointer to the aa.data[idx] +func (aa *AtomicBucketWrapArray) compareAndSet(idx int, except, update *BucketWrap) bool { + // aa.elementOffset(idx) return the secondary pointer of BucketWrap, which is the pointer to the aa.data[idx] // then convert to (*unsafe.Pointer) // update secondary pointer return atomic.CompareAndSwapPointer((*unsafe.Pointer)(aa.elementOffset(idx)), unsafe.Pointer(except), unsafe.Pointer(update)) } -// The bucketWrap leap array, -// sampleCount represent the number of bucketWrap -// intervalInMs represent the interval of leapArray. +// The BucketWrap leap array, +// sampleCount represent the number of BucketWrap +// intervalInMs represent the interval of LeapArray. // For example, bucketLengthInMs is 500ms, intervalInMs is 1min, so sampleCount is 120. -type leapArray struct { +type LeapArray struct { bucketLengthInMs uint32 sampleCount uint32 intervalInMs uint32 - array *atomicBucketWrapArray + array *AtomicBucketWrapArray // update lock updateLock mutex } -func (la *leapArray) currentBucket(bg bucketGenerator) (*bucketWrap, error) { +func NewLeapArray(sampleCount uint32, intervalInMs uint32, generator BucketGenerator) *LeapArray { + if intervalInMs%sampleCount != 0 { + panic(fmt.Sprintf("Invalid parameters, intervalInMs is %d, sampleCount is %d.", intervalInMs, sampleCount)) + } + if generator == nil { + panic("Invalid parameters, generator is nil.") + } + bucketLengthInMs := intervalInMs / sampleCount + return &LeapArray{ + bucketLengthInMs: bucketLengthInMs, + sampleCount: sampleCount, + intervalInMs: intervalInMs, + array: NewAtomicBucketWrapArray(int(sampleCount), intervalInMs, generator), + } +} + +func (la *LeapArray) CurrentBucket(bg BucketGenerator) (*BucketWrap, error) { return la.currentBucketOfTime(util.CurrentTimeMillis(), bg) } -func (la *leapArray) currentBucketOfTime(now uint64, bg bucketGenerator) (*bucketWrap, error) { +func (la *LeapArray) currentBucketOfTime(now uint64, bg BucketGenerator) (*BucketWrap, error) { if now < 0 { return nil, errors.New("Current time is less than 0.") } @@ -125,55 +141,55 @@ func (la *leapArray) currentBucketOfTime(now uint64, bg bucketGenerator) (*bucke idx := la.calculateTimeIdx(now) bucketStart := calculateStartTime(now, la.bucketLengthInMs) - for { //spin to get the current bucketWrap + for { //spin to get the current BucketWrap old := la.array.get(idx) if old == nil { // because la.array.data had initiated when new la.array // theoretically, here is not reachable - newWrap := &bucketWrap{ - bucketStart: bucketStart, - value: atomic.Value{}, + newWrap := &BucketWrap{ + BucketStart: bucketStart, + Value: atomic.Value{}, } - newWrap.value.Store(bg.newEmptyBucket()) + newWrap.Value.Store(bg.NewEmptyBucket()) if la.array.compareAndSet(idx, nil, newWrap) { return newWrap, nil } else { runtime.Gosched() } - } else if bucketStart == atomic.LoadUint64(&old.bucketStart) { + } else if bucketStart == atomic.LoadUint64(&old.BucketStart) { return old, nil - } else if bucketStart > atomic.LoadUint64(&old.bucketStart) { - // current time has been next cycle of leapArray and leapArray dont't count in last cycle. - // reset bucketWrap + } else if bucketStart > atomic.LoadUint64(&old.BucketStart) { + // current time has been next cycle of LeapArray and LeapArray dont't count in last cycle. + // reset BucketWrap if la.updateLock.TryLock() { - old = bg.resetBucketTo(old, bucketStart) + old = bg.ResetBucketTo(old, bucketStart) la.updateLock.Unlock() return old, nil } else { runtime.Gosched() } - } else if bucketStart < old.bucketStart { + } else if bucketStart < old.BucketStart { // TODO: reserve for some special case (e.g. when occupying "future" buckets). - return nil, errors.New(fmt.Sprintf("Provided time timeMillis=%d is already behind old.bucketStart=%d.", bucketStart, old.bucketStart)) + return nil, errors.New(fmt.Sprintf("Provided time timeMillis=%d is already behind old.BucketStart=%d.", bucketStart, old.BucketStart)) } } } -func (la *leapArray) calculateTimeIdx(now uint64) int { +func (la *LeapArray) calculateTimeIdx(now uint64) int { timeId := now / uint64(la.bucketLengthInMs) return int(timeId) % la.array.length } -// Get all bucketWrap between [current time -1000ms, current time] -func (la *leapArray) values() []*bucketWrap { +// Get all BucketWrap between [current time -1000ms, current time] +func (la *LeapArray) Values() []*BucketWrap { return la.valuesWithTime(util.CurrentTimeMillis()) } -func (la *leapArray) valuesWithTime(now uint64) []*bucketWrap { +func (la *LeapArray) valuesWithTime(now uint64) []*BucketWrap { if now <= 0 { - return make([]*bucketWrap, 0) + return make([]*BucketWrap, 0) } - ret := make([]*bucketWrap, 0) + ret := make([]*BucketWrap, 0) for i := 0; i < la.array.length; i++ { ww := la.array.get(i) if ww == nil || la.isBucketDeprecated(now, ww) { @@ -184,14 +200,14 @@ func (la *leapArray) valuesWithTime(now uint64) []*bucketWrap { return ret } -func (la *leapArray) ValuesConditional(now uint64, predicate base.TimePredicate) []*bucketWrap { +func (la *LeapArray) ValuesConditional(now uint64, predicate base.TimePredicate) []*BucketWrap { if now <= 0 { - return make([]*bucketWrap, 0) + return make([]*BucketWrap, 0) } - ret := make([]*bucketWrap, 0) + ret := make([]*BucketWrap, 0) for i := 0; i < la.array.length; i++ { ww := la.array.get(i) - if ww == nil || la.isBucketDeprecated(now, ww) || !predicate(atomic.LoadUint64(&ww.bucketStart)) { + if ww == nil || la.isBucketDeprecated(now, ww) || !predicate(atomic.LoadUint64(&ww.BucketStart)) { continue } ret = append(ret, ww) @@ -199,17 +215,17 @@ func (la *leapArray) ValuesConditional(now uint64, predicate base.TimePredicate) return ret } -// Judge whether the bucketWrap is expired -func (la *leapArray) isBucketDeprecated(now uint64, ww *bucketWrap) bool { - ws := atomic.LoadUint64(&ww.bucketStart) +// Judge whether the BucketWrap is expired +func (la *LeapArray) isBucketDeprecated(now uint64, ww *BucketWrap) bool { + ws := atomic.LoadUint64(&ww.BucketStart) return (now - ws) > uint64(la.intervalInMs) } // Generic interface to generate bucket -type bucketGenerator interface { +type BucketGenerator interface { // called when timestamp entry a new slot interval - newEmptyBucket() interface{} + NewEmptyBucket() interface{} - // reset the bucketWrap, clear all data of bucketWrap - resetBucketTo(ww *bucketWrap, startTime uint64) *bucketWrap + // reset the BucketWrap, clear all data of BucketWrap + ResetBucketTo(bw *BucketWrap, startTime uint64) *BucketWrap } diff --git a/core/stat/base/leap_array_test.go b/core/stat/base/leap_array_test.go index c36f0bb1a..71a55e36e 100644 --- a/core/stat/base/leap_array_test.go +++ b/core/stat/base/leap_array_test.go @@ -30,15 +30,15 @@ func Test_bucketWrapper_Size(t *testing.T) { a7 int32 a8 int32 } - ww := &bucketWrap{ - bucketStart: util.CurrentTimeMillis(), - value: atomic.Value{}, + ww := &BucketWrap{ + BucketStart: util.CurrentTimeMillis(), + Value: atomic.Value{}, } if unsafe.Sizeof(*ww) != 24 { - t.Errorf("the size of bucketWrap is not equal 20.\n") + t.Errorf("the size of BucketWrap is not equal 20.\n") } if unsafe.Sizeof(ww) != 8 { - t.Errorf("the size of bucketWrap is not equal 20.\n") + t.Errorf("the size of BucketWrap is not equal 20.\n") } } @@ -46,18 +46,18 @@ func Test_bucketWrapper_Size(t *testing.T) { // mock.Mock //} -// mock ArrayMock and implement bucketGenerator +// mock ArrayMock and implement BucketGenerator type leapArrayMock struct { mock.Mock } -func (bla *leapArrayMock) newEmptyBucket() interface{} { +func (bla *leapArrayMock) NewEmptyBucket() interface{} { return new(int64) } -func (bla *leapArrayMock) resetBucketTo(ww *bucketWrap, startTime uint64) *bucketWrap { - ww.bucketStart = startTime - ww.value.Store(new(int64)) +func (bla *leapArrayMock) ResetBucketTo(ww *BucketWrap, startTime uint64) *BucketWrap { + ww.BucketStart = startTime + ww.Value.Store(new(int64)) return ww } @@ -66,7 +66,7 @@ func Test_leapArray_calculateTimeIdx_normal(t *testing.T) { bucketLengthInMs uint32 sampleCount uint32 intervalInMs uint32 - array *atomicBucketWrapArray + array *AtomicBucketWrapArray mux mutex } type args struct { @@ -84,7 +84,7 @@ func Test_leapArray_calculateTimeIdx_normal(t *testing.T) { bucketLengthInMs: BucketLengthInMs, sampleCount: SampleCount, intervalInMs: IntervalInMs, - array: newAtomicBucketWrapArray(int(SampleCount), BucketLengthInMs, &leapArrayMock{}), + array: NewAtomicBucketWrapArray(int(SampleCount), BucketLengthInMs, &leapArrayMock{}), mux: mutex{}, }, args: args{ @@ -95,7 +95,7 @@ func Test_leapArray_calculateTimeIdx_normal(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - la := &leapArray{ + la := &LeapArray{ bucketLengthInMs: tt.fields.bucketLengthInMs, sampleCount: tt.fields.sampleCount, intervalInMs: tt.fields.intervalInMs, @@ -103,7 +103,7 @@ func Test_leapArray_calculateTimeIdx_normal(t *testing.T) { updateLock: tt.fields.mux, } if got := la.calculateTimeIdx(tt.args.timeMillis); got != tt.want { - t.Errorf("leapArray.calculateTimeIdx() = %v, want %v", got, tt.want) + t.Errorf("LeapArray.calculateTimeIdx() = %v, want %v", got, tt.want) } }) } @@ -135,7 +135,7 @@ func Test_calculateStartTime_normal(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { if got := calculateStartTime(tt.args.timeMillis, tt.args.bucketLengthInMs); got != tt.want { - t.Errorf("leapArray.calculateStartTime() = %v, want %v", got, tt.want) + t.Errorf("LeapArray.calculateStartTime() = %v, want %v", got, tt.want) } }) } @@ -146,11 +146,11 @@ func Test_leapArray_BucketStartCheck_normal(t *testing.T) { BucketLengthInMs uint32 sampleCount uint32 intervalInMs uint32 - array *atomicBucketWrapArray + array *AtomicBucketWrapArray mux mutex } type args struct { - bg bucketGenerator + bg BucketGenerator timeMillis uint64 } tests := []struct { @@ -165,7 +165,7 @@ func Test_leapArray_BucketStartCheck_normal(t *testing.T) { BucketLengthInMs: BucketLengthInMs, sampleCount: SampleCount, intervalInMs: IntervalInMs, - array: newAtomicBucketWrapArray(int(SampleCount), BucketLengthInMs, &leapArrayMock{}), + array: NewAtomicBucketWrapArray(int(SampleCount), BucketLengthInMs, &leapArrayMock{}), mux: mutex{}, }, args: args{ @@ -176,11 +176,11 @@ func Test_leapArray_BucketStartCheck_normal(t *testing.T) { }, } wwPtr := tests[0].fields.array.get(9) - wwPtr.bucketStart = 1576296044500 //start time of cycle 1576296040000 + wwPtr.BucketStart = 1576296044500 //start time of cycle 1576296040000 for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - la := &leapArray{ + la := &LeapArray{ bucketLengthInMs: tt.fields.BucketLengthInMs, sampleCount: tt.fields.sampleCount, intervalInMs: tt.fields.intervalInMs, @@ -189,11 +189,11 @@ func Test_leapArray_BucketStartCheck_normal(t *testing.T) { } got, err := la.currentBucketOfTime(tt.args.timeMillis, tt.args.bg) if err != nil { - t.Errorf("leapArray.currentBucketOfTime() error = %v\n", err) + t.Errorf("LeapArray.currentBucketOfTime() error = %v\n", err) return } - if got.bucketStart != tt.want { - t.Errorf("bucketStart = %v, want %v", got.bucketStart, tt.want) + if got.BucketStart != tt.want { + t.Errorf("BucketStart = %v, want %v", got.BucketStart, tt.want) } }) } @@ -204,18 +204,18 @@ func Test_leapArray_currentBucketWithTime_normal(t *testing.T) { bucketLengthInMs uint32 sampleCount uint32 intervalInMs uint32 - array *atomicBucketWrapArray + array *AtomicBucketWrapArray mux mutex } type args struct { - bg bucketGenerator + bg BucketGenerator timeMillis uint64 } tests := []struct { name string fields fields args args - want *bucketWrap + want *BucketWrap wantErr bool }{ { @@ -224,7 +224,7 @@ func Test_leapArray_currentBucketWithTime_normal(t *testing.T) { bucketLengthInMs: BucketLengthInMs, sampleCount: SampleCount, intervalInMs: IntervalInMs, - array: newAtomicBucketWrapArray(int(SampleCount), BucketLengthInMs, &leapArrayMock{}), + array: NewAtomicBucketWrapArray(int(SampleCount), BucketLengthInMs, &leapArrayMock{}), mux: mutex{}, }, args: args{ @@ -237,12 +237,12 @@ func Test_leapArray_currentBucketWithTime_normal(t *testing.T) { } wwPtr := tests[0].fields.array.get(9) - wwPtr.bucketStart = 1576296044500 //start time of cycle 1576296040000 + wwPtr.BucketStart = 1576296044500 //start time of cycle 1576296040000 tests[0].want = tests[0].fields.array.get(9) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - la := &leapArray{ + la := &LeapArray{ bucketLengthInMs: tt.fields.bucketLengthInMs, sampleCount: tt.fields.sampleCount, intervalInMs: tt.fields.intervalInMs, @@ -251,11 +251,11 @@ func Test_leapArray_currentBucketWithTime_normal(t *testing.T) { } got, err := la.currentBucketOfTime(tt.args.timeMillis, tt.args.bg) if (err != nil) != tt.wantErr { - t.Errorf("leapArray.currentBucketOfTime() error = %v, wantErr %v", err, tt.wantErr) + t.Errorf("LeapArray.currentBucketOfTime() error = %v, wantErr %v", err, tt.wantErr) return } if !reflect.DeepEqual(got, tt.want) { - t.Errorf("leapArray.currentBucketOfTime() = %v, want %v", got, tt.want) + t.Errorf("LeapArray.currentBucketOfTime() = %v, want %v", got, tt.want) } }) } @@ -266,7 +266,7 @@ func Test_leapArray_valuesWithTime_normal(t *testing.T) { bucketLengthInMs uint32 sampleCount uint32 intervalInMs uint32 - array *atomicBucketWrapArray + array *AtomicBucketWrapArray mux mutex } type args struct { @@ -276,7 +276,7 @@ func Test_leapArray_valuesWithTime_normal(t *testing.T) { name string fields fields args args - want *bucketWrap + want *BucketWrap wantErr bool }{ { @@ -285,7 +285,7 @@ func Test_leapArray_valuesWithTime_normal(t *testing.T) { bucketLengthInMs: BucketLengthInMs, sampleCount: SampleCount, intervalInMs: IntervalInMs, - array: newAtomicBucketWrapArray(int(SampleCount), BucketLengthInMs, &leapArrayMock{}), + array: NewAtomicBucketWrapArray(int(SampleCount), BucketLengthInMs, &leapArrayMock{}), mux: mutex{}, }, args: args{ @@ -299,13 +299,13 @@ func Test_leapArray_valuesWithTime_normal(t *testing.T) { start := uint64(1576296040000) for idx := 0; idx < tests[0].fields.array.length; idx++ { ww := tests[0].fields.array.get(idx) - ww.bucketStart = start + ww.BucketStart = start start += 500 } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - la := &leapArray{ + la := &LeapArray{ bucketLengthInMs: tt.fields.bucketLengthInMs, sampleCount: tt.fields.sampleCount, intervalInMs: tt.fields.intervalInMs, @@ -317,13 +317,13 @@ func Test_leapArray_valuesWithTime_normal(t *testing.T) { find := false for i := 0; i < tests[0].fields.array.length; i++ { w := tests[0].fields.array.get(i) - if w.bucketStart == g.bucketStart { + if w.BucketStart == g.BucketStart { find = true break } } if !find { - t.Errorf("leapArray.valuesWithTime() fail") + t.Errorf("LeapArray.valuesWithTime() fail") } } }) @@ -335,12 +335,12 @@ func Test_leapArray_isBucketDeprecated_normal(t *testing.T) { bucketLengthInMs uint32 sampleCount uint32 intervalInMs uint32 - array *atomicBucketWrapArray + array *AtomicBucketWrapArray mux mutex } type args struct { startTime uint64 - ww *bucketWrap + ww *BucketWrap } tests := []struct { name string @@ -354,13 +354,13 @@ func Test_leapArray_isBucketDeprecated_normal(t *testing.T) { bucketLengthInMs: BucketLengthInMs, sampleCount: SampleCount, intervalInMs: IntervalInMs, - array: newAtomicBucketWrapArray(int(SampleCount), BucketLengthInMs, &leapArrayMock{}), + array: NewAtomicBucketWrapArray(int(SampleCount), BucketLengthInMs, &leapArrayMock{}), mux: mutex{}, }, args: args{ startTime: 1576296044907, - ww: &bucketWrap{ - bucketStart: 1576296004907, + ww: &BucketWrap{ + BucketStart: 1576296004907, }, }, want: true, @@ -369,7 +369,7 @@ func Test_leapArray_isBucketDeprecated_normal(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - la := &leapArray{ + la := &LeapArray{ bucketLengthInMs: tt.fields.bucketLengthInMs, sampleCount: tt.fields.sampleCount, intervalInMs: tt.fields.intervalInMs, @@ -377,7 +377,7 @@ func Test_leapArray_isBucketDeprecated_normal(t *testing.T) { updateLock: tt.fields.mux, } if got := la.isBucketDeprecated(tt.args.startTime, tt.args.ww); got != tt.want { - t.Errorf("leapArray.isBucketDeprecated() = %v, want %v", got, tt.want) + t.Errorf("LeapArray.isBucketDeprecated() = %v, want %v", got, tt.want) } }) } diff --git a/core/stat/base/metric_bucket.go b/core/stat/base/metric_bucket.go index b94be8d83..aaacae502 100644 --- a/core/stat/base/metric_bucket.go +++ b/core/stat/base/metric_bucket.go @@ -10,7 +10,7 @@ import ( // MetricBucket represents the entity to record metrics per minimum time unit (i.e. the bucket time span). // Note that all operations of the MetricBucket are required to be thread-safe. type MetricBucket struct { - // value of statistic + // Value of statistic counter [base.MetricEventTotal]int64 minRt int64 } diff --git a/core/stat/base/sliding_window_metric.go b/core/stat/base/sliding_window_metric.go index 1c4947fa5..f8d72b5d2 100644 --- a/core/stat/base/sliding_window_metric.go +++ b/core/stat/base/sliding_window_metric.go @@ -66,17 +66,17 @@ func (m *SlidingWindowMetric) getIntervalInSecond() float64 { return float64(m.intervalInMs) / 1000.0 } -func (m *SlidingWindowMetric) count(event base.MetricEvent, values []*bucketWrap) int64 { +func (m *SlidingWindowMetric) count(event base.MetricEvent, values []*BucketWrap) int64 { ret := int64(0) for _, ww := range values { - mb := ww.value.Load() + mb := ww.Value.Load() if mb == nil { - logger.Error("Illegal state: current bucket value is nil when summing count") + logger.Error("Illegal state: current bucket Value is nil when summing count") continue } counter, ok := mb.(*MetricBucket) if !ok { - logger.Errorf("Fail to cast data value(%+v) to MetricBucket type", mb) + logger.Errorf("Fail to cast data Value(%+v) to MetricBucket type", mb) continue } ret += counter.Get(event) @@ -112,14 +112,14 @@ func (m *SlidingWindowMetric) GetMaxOfSingleBucket(event base.MetricEvent) int64 }) var curMax int64 = 0 for _, w := range satisfiedBuckets { - mb := w.value.Load() + mb := w.Value.Load() if mb == nil { - logger.Error("Illegal state: current bucket value is nil when GetMaxOfSingleBucket") + logger.Error("Illegal state: current bucket Value is nil when GetMaxOfSingleBucket") continue } counter, ok := mb.(*MetricBucket) if !ok { - logger.Errorf("Failed to cast data value(%+v) to MetricBucket type", mb) + logger.Errorf("Failed to cast data Value(%+v) to MetricBucket type", mb) continue } v := counter.Get(event) @@ -138,14 +138,14 @@ func (m *SlidingWindowMetric) MinRT() float64 { }) minRt := base.DefaultStatisticMaxRt for _, w := range satisfiedBuckets { - mb := w.value.Load() + mb := w.Value.Load() if mb == nil { - logger.Error("Illegal state: current bucket value is nil when calculating minRT") + logger.Error("Illegal state: current bucket Value is nil when calculating minRT") continue } counter, ok := mb.(*MetricBucket) if !ok { - logger.Errorf("Failed to cast data value(%+v) to MetricBucket type", mb) + logger.Errorf("Failed to cast data Value(%+v) to MetricBucket type", mb) continue } v := counter.MinRt() @@ -169,14 +169,14 @@ func (m *SlidingWindowMetric) SecondMetricsOnCondition(predicate base.TimePredic ws := m.real.ValuesConditional(util.CurrentTimeMillis(), predicate) // Aggregate second-level MetricItem (only for stable metrics) - wm := make(map[uint64][]*bucketWrap) + wm := make(map[uint64][]*BucketWrap) for _, w := range ws { - bucketStart := atomic.LoadUint64(&w.bucketStart) + bucketStart := atomic.LoadUint64(&w.BucketStart) secStart := bucketStart - bucketStart%1000 if arr, hasData := wm[secStart]; hasData { wm[secStart] = append(arr, w) } else { - wm[secStart] = []*bucketWrap{w} + wm[secStart] = []*BucketWrap{w} } } items := make([]*base.MetricItem, 0) @@ -193,18 +193,18 @@ func (m *SlidingWindowMetric) SecondMetricsOnCondition(predicate base.TimePredic // metricItemFromBuckets aggregates multiple bucket wrappers (based on the same startTime in second) // to the single MetricItem. -func (m *SlidingWindowMetric) metricItemFromBuckets(ts uint64, ws []*bucketWrap) *base.MetricItem { +func (m *SlidingWindowMetric) metricItemFromBuckets(ts uint64, ws []*BucketWrap) *base.MetricItem { item := &base.MetricItem{Timestamp: ts} var allRt int64 = 0 for _, w := range ws { - mi := w.value.Load() + mi := w.Value.Load() if mi == nil { logger.Error("Get nil bucket when generating MetricItem from buckets") return nil } mb, ok := mi.(*MetricBucket) if !ok { - logger.Errorf("Failed to cast to MetricBucket type, bucket startTime: %d", w.bucketStart) + logger.Errorf("Failed to cast to MetricBucket type, bucket startTime: %d", w.BucketStart) return nil } item.PassQps += uint64(mb.Get(base.MetricEventPass)) @@ -221,15 +221,15 @@ func (m *SlidingWindowMetric) metricItemFromBuckets(ts uint64, ws []*bucketWrap) return item } -func (m *SlidingWindowMetric) metricItemFromBucket(w *bucketWrap) *base.MetricItem { - mi := w.value.Load() +func (m *SlidingWindowMetric) metricItemFromBucket(w *BucketWrap) *base.MetricItem { + mi := w.Value.Load() if mi == nil { logger.Error("Get nil bucket when generating MetricItem from buckets") return nil } mb, ok := mi.(*MetricBucket) if !ok { - logger.Errorf("Fail to cast data value to MetricBucket type, bucket startTime: %d", w.bucketStart) + logger.Errorf("Fail to cast data Value to MetricBucket type, bucket startTime: %d", w.BucketStart) return nil } completeQps := mb.Get(base.MetricEventComplete) @@ -238,7 +238,7 @@ func (m *SlidingWindowMetric) metricItemFromBucket(w *bucketWrap) *base.MetricIt BlockQps: uint64(mb.Get(base.MetricEventBlock)), ErrorQps: uint64(mb.Get(base.MetricEventError)), CompleteQps: uint64(completeQps), - Timestamp: w.bucketStart, + Timestamp: w.BucketStart, } if completeQps > 0 { item.AvgRt = uint64(mb.Get(base.MetricEventRt) / completeQps) diff --git a/core/stat/stat_slot.go b/core/stat/stat_slot.go index 97d42bc29..73f231693 100644 --- a/core/stat/stat_slot.go +++ b/core/stat/stat_slot.go @@ -33,9 +33,10 @@ func (s *StatisticSlot) OnCompleted(ctx *base.EntryContext) { return } rt := util.CurrentTimeMillis() - ctx.StartTime() - s.recordCompleteFor(ctx.StatNode, ctx.Input.AcquireCount, rt) + ctx.PutRt(rt) + s.recordCompleteFor(ctx.StatNode, ctx.Input.AcquireCount, rt, ctx.Err()) if ctx.Resource.FlowType() == base.Inbound { - s.recordCompleteFor(InboundNode(), ctx.Input.AcquireCount, rt) + s.recordCompleteFor(InboundNode(), ctx.Input.AcquireCount, rt, ctx.Err()) } } @@ -54,10 +55,13 @@ func (s *StatisticSlot) recordBlockFor(sn base.StatNode, count uint32) { sn.AddMetric(base.MetricEventBlock, uint64(count)) } -func (s *StatisticSlot) recordCompleteFor(sn base.StatNode, count uint32, rt uint64) { +func (s *StatisticSlot) recordCompleteFor(sn base.StatNode, count uint32, rt uint64, err error) { if sn == nil { return } + if err != nil { + sn.AddMetric(base.MetricEventError, uint64(count)) + } sn.AddMetric(base.MetricEventRt, rt) sn.AddMetric(base.MetricEventComplete, uint64(count)) sn.DecreaseGoroutineNum() diff --git a/example/circuitbreaker/circuit_breaker_example.go b/example/circuitbreaker/circuit_breaker_example.go new file mode 100644 index 000000000..305c38be5 --- /dev/null +++ b/example/circuitbreaker/circuit_breaker_example.go @@ -0,0 +1,77 @@ +package main + +import ( + "errors" + "fmt" + "log" + "math/rand" + "time" + + "github.com/alibaba/sentinel-golang/api" + "github.com/alibaba/sentinel-golang/core/circuitbreaker" + "github.com/alibaba/sentinel-golang/util" +) + +type stateChangeTestListener struct { +} + +func (s *stateChangeTestListener) OnChangeToClosed(prev circuitbreaker.State, rule circuitbreaker.Rule) { + fmt.Printf("rule.steategy: %+v, From %s to Closed, time: %d\n", rule.BreakerStrategy(), prev.String(), util.CurrentTimeMillis()) +} + +func (s *stateChangeTestListener) OnChangeToOpen(prev circuitbreaker.State, rule circuitbreaker.Rule, snapshot interface{}) { + fmt.Printf("rule.steategy: %+v, From %s to Open, snapshot: %.2f, time: %d\n", rule.BreakerStrategy(), prev.String(), snapshot, util.CurrentTimeMillis()) +} + +func (s *stateChangeTestListener) OnChangeToHalfOpen(prev circuitbreaker.State, rule circuitbreaker.Rule) { + fmt.Printf("rule.steategy: %+v, From %s to Half-Open, time: %d\n", rule.BreakerStrategy(), prev.String(), util.CurrentTimeMillis()) +} + +func main() { + err := api.InitDefault() + if err != nil { + log.Fatal(err) + } + ch := make(chan struct{}) + circuitbreaker.RegisterStatusSwitchListeners(&stateChangeTestListener{}) + + _, err = circuitbreaker.LoadRules([]circuitbreaker.Rule{ + circuitbreaker.NewSlowRtRule("abc", 10000, 3000, 50, 10, 0.5), + circuitbreaker.NewErrorRatioRule("abc", 10000, 3000, 10, 0.5), + }) + if err != nil { + log.Fatal(err) + } + + go func() { + for { + e, b := api.Entry("abc") + if b != nil { + fmt.Println("g1blocked") + time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond) + } else { + if rand.Uint64()%20 > 9 { + e.SetError(errors.New("biz error")) + } + fmt.Println("g1passed") + time.Sleep(time.Duration(rand.Uint64()%80) * time.Millisecond) + e.Exit() + } + } + }() + go func() { + for { + e, b := api.Entry("abc") + if b != nil { + fmt.Println("g2blocked") + time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond) + } else { + fmt.Println("g2passed") + time.Sleep(time.Duration(rand.Uint64()%80) * time.Millisecond) + e.Exit() + } + } + }() + + <-ch +} diff --git a/go.mod b/go.mod index e292d1600..aab81d8e7 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/shirou/gopsutil v2.19.12+incompatible github.com/stretchr/testify v1.4.0 go.uber.org/multierr v1.5.0 - golang.org/x/tools v0.0.0-20200428021058-7ae4988eb4d9 // indirect + golang.org/x/tools v0.0.0-20200426102838-f3a5411a4c3b // indirect google.golang.org/grpc v1.22.1 gopkg.in/yaml.v2 v2.2.2 ) diff --git a/go.sum b/go.sum index de5ac84bb..22a103ecf 100644 --- a/go.sum +++ b/go.sum @@ -428,11 +428,9 @@ golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5 h1:hKsoRgsbwY1NafxrwTs+k64bikrLBkAgPir1TNCj3Zs= golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.0.0-20200428021058-7ae4988eb4d9 h1:nSSuQTxk9hjYf2koTs9mHZtYm2pu7Yt8WuIeKOrCWNI= -golang.org/x/tools v0.0.0-20200428021058-7ae4988eb4d9/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20200426102838-f3a5411a4c3b/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/api v0.0.0-20180829000535-087779f1d2c9/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= @@ -446,12 +444,10 @@ gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4 gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d/go.mod h1:cuepJuh7vyXfUyUwEgHQXw849cJrilpS5NeIjOWESAw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2/go.mod h1:Xk6kEKp8OKb+X14hQBKWaSkCsqBpgog8nAV2xsGOxlo= -gopkg.in/go-playground/assert.v1 v1.2.1 h1:xoYuJVE7KT85PYWrN730RguIQO0ePzVRfFMXadIrXTM= gopkg.in/go-playground/assert.v1 v1.2.1/go.mod h1:9RXL0bg/zibRAgZUYszZSwO/z8Y/a8bDuhia5mkpMnE= gopkg.in/go-playground/validator.v9 v9.29.1 h1:SvGtYmN60a5CVKTOzMSyfzWDeZRxRuGvRQyEAKbw1xc= gopkg.in/go-playground/validator.v9 v9.29.1/go.mod h1:+c9/zcJMFNgbLvly1L1V+PpxWdVbfP1avr/N00E2vyQ= @@ -469,7 +465,6 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= -honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM= honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= istio.io/gogo-genproto v0.0.0-20190124151557-6d926a6e6feb/go.mod h1:eIDJ6jNk/IeJz6ODSksHl5Aiczy5JUq6vFhJWI5OtiI= k8s.io/api v0.0.0-20180806132203-61b11ee65332/go.mod h1:iuAfoD4hCxJ8Onx9kaTIt30j7jUFS00AXQi6QMi99vA=