Skip to content

Commit f1981c7

Browse files
authored
Abstract concurrency-related operations as ConcurrencyStat interface and polish StatNode (#283)
1 parent ce00014 commit f1981c7

File tree

8 files changed

+30
-39
lines changed

8 files changed

+30
-39
lines changed

core/base/stat.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,18 +38,19 @@ type WriteStat interface {
3838
AddCount(event MetricEvent, count int64)
3939
}
4040

41+
type ConcurrencyStat interface {
42+
CurrentConcurrency() int32
43+
IncreaseConcurrency()
44+
DecreaseConcurrency()
45+
}
46+
4147
// StatNode holds real-time statistics for resources.
4248
type StatNode interface {
4349
MetricItemRetriever
4450

4551
ReadStat
4652
WriteStat
47-
48-
CurrentGoroutineNum() int32
49-
IncreaseGoroutineNum()
50-
DecreaseGoroutineNum()
51-
52-
Reset()
53+
ConcurrencyStat
5354

5455
// GenerateReadStat generates the readonly metric statistic based on resource level global statistic
5556
// If parameters, sampleCount and intervalInMs, are not suitable for resource level global statistic, return (nil, error)

core/base/stat_test.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,22 +50,17 @@ func (m *StatNodeMock) MinRT() float64 {
5050
return float64(args.Int(0))
5151
}
5252

53-
func (m *StatNodeMock) CurrentGoroutineNum() int32 {
53+
func (m *StatNodeMock) CurrentConcurrency() int32 {
5454
args := m.Called()
5555
return int32(args.Int(0))
5656
}
5757

58-
func (m *StatNodeMock) IncreaseGoroutineNum() {
58+
func (m *StatNodeMock) IncreaseConcurrency() {
5959
m.Called()
6060
return
6161
}
6262

63-
func (m *StatNodeMock) DecreaseGoroutineNum() {
64-
m.Called()
65-
return
66-
}
67-
68-
func (m *StatNodeMock) Reset() {
63+
func (m *StatNodeMock) DecreaseConcurrency() {
6964
m.Called()
7065
return
7166
}

core/isolation/slot.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func checkPass(ctx *base.EntryContext) (bool, *Rule, uint32) {
4444
for _, rule := range getRulesOfResource(ctx.Resource.Name()) {
4545
threshold := rule.Threshold
4646
if rule.MetricType == Concurrency {
47-
if cur := statNode.CurrentGoroutineNum(); cur >= 0 {
47+
if cur := statNode.CurrentConcurrency(); cur >= 0 {
4848
curCount = uint32(cur)
4949
} else {
5050
curCount = 0

core/stat/base_node.go

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ type BaseStatNode struct {
1212
sampleCount uint32
1313
intervalMs uint32
1414

15-
goroutineNum int32
15+
concurrency int32
1616

1717
arr *sbase.BucketLeapArray
1818
metric *sbase.SlidingWindowMetric
@@ -22,11 +22,11 @@ func NewBaseStatNode(sampleCount uint32, intervalInMs uint32) *BaseStatNode {
2222
la := sbase.NewBucketLeapArray(config.GlobalStatisticSampleCountTotal(), config.GlobalStatisticIntervalMsTotal())
2323
metric, _ := sbase.NewSlidingWindowMetric(sampleCount, intervalInMs, la)
2424
return &BaseStatNode{
25-
goroutineNum: 0,
26-
sampleCount: sampleCount,
27-
intervalMs: intervalInMs,
28-
arr: la,
29-
metric: metric,
25+
concurrency: 0,
26+
sampleCount: sampleCount,
27+
intervalMs: intervalInMs,
28+
arr: la,
29+
metric: metric,
3030
}
3131
}
3232

@@ -66,21 +66,16 @@ func (n *BaseStatNode) MinRT() float64 {
6666
return float64(n.metric.MinRT())
6767
}
6868

69-
func (n *BaseStatNode) CurrentGoroutineNum() int32 {
70-
return atomic.LoadInt32(&(n.goroutineNum))
69+
func (n *BaseStatNode) CurrentConcurrency() int32 {
70+
return atomic.LoadInt32(&(n.concurrency))
7171
}
7272

73-
func (n *BaseStatNode) IncreaseGoroutineNum() {
74-
atomic.AddInt32(&(n.goroutineNum), 1)
73+
func (n *BaseStatNode) IncreaseConcurrency() {
74+
atomic.AddInt32(&(n.concurrency), 1)
7575
}
7676

77-
func (n *BaseStatNode) DecreaseGoroutineNum() {
78-
atomic.AddInt32(&(n.goroutineNum), -1)
79-
}
80-
81-
func (n *BaseStatNode) Reset() {
82-
// TODO: this should be thread-safe, or error may occur
83-
panic("to be implemented")
77+
func (n *BaseStatNode) DecreaseConcurrency() {
78+
atomic.AddInt32(&(n.concurrency), -1)
8479
}
8580

8681
func (n *BaseStatNode) GenerateReadStat(sampleCount uint32, intervalInMs uint32) (base.ReadStat, error) {

core/stat/stat_slot.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func (s *Slot) recordPassFor(sn base.StatNode, count uint32) {
4747
if sn == nil {
4848
return
4949
}
50-
sn.IncreaseGoroutineNum()
50+
sn.IncreaseConcurrency()
5151
sn.AddCount(base.MetricEventPass, int64(count))
5252
}
5353

@@ -67,5 +67,5 @@ func (s *Slot) recordCompleteFor(sn base.StatNode, count uint32, rt uint64, err
6767
}
6868
sn.AddCount(base.MetricEventRt, int64(rt))
6969
sn.AddCount(base.MetricEventComplete, int64(count))
70-
sn.DecreaseGoroutineNum()
70+
sn.DecreaseConcurrency()
7171
}

core/system/slot.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func (s *AdaptiveSlot) doCheckRule(rule *Rule) (bool, float64) {
4949
res := qps < threshold
5050
return res, qps
5151
case Concurrency:
52-
n := float64(stat.InboundNode().CurrentGoroutineNum())
52+
n := float64(stat.InboundNode().CurrentConcurrency())
5353
res := n < threshold
5454
return res, n
5555
case AvgRT:
@@ -78,7 +78,7 @@ func (s *AdaptiveSlot) doCheckRule(rule *Rule) (bool, float64) {
7878
}
7979

8080
func checkBbrSimple() bool {
81-
concurrency := stat.InboundNode().CurrentGoroutineNum()
81+
concurrency := stat.InboundNode().CurrentConcurrency()
8282
minRt := stat.InboundNode().MinRT()
8383
maxComplete := stat.InboundNode().GetMaxAvg(base.MetricEventComplete)
8484
if concurrency > 1 && float64(concurrency) > maxComplete*minRt/1000.0 {

core/system/slot_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,11 @@ func TestDoCheckRuleConcurrency(t *testing.T) {
5151
})
5252

5353
t.Run("FalseConcurrency", func(t *testing.T) {
54-
stat.InboundNode().IncreaseGoroutineNum()
54+
stat.InboundNode().IncreaseConcurrency()
5555
isOK, v := sas.doCheckRule(rule)
5656
assert.True(t, util.Float64Equals(float64(1.0), v))
5757
assert.Equal(t, false, isOK)
58-
stat.InboundNode().DecreaseGoroutineNum()
58+
stat.InboundNode().DecreaseConcurrency()
5959
})
6060
}
6161

example/hotspot_param_flow/concurrency/hotspot_params_concurrency_example.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func main() {
4141
go func() {
4242
node := stat.GetOrCreateResourceNode("abc", base.ResTypeCommon)
4343
for {
44-
logging.Info(fmt.Sprintf("current concurrency:%d", node.CurrentGoroutineNum()))
44+
logging.Info(fmt.Sprintf("current concurrency:%d", node.CurrentConcurrency()))
4545
time.Sleep(time.Duration(100) * time.Millisecond)
4646
}
4747
}()

0 commit comments

Comments
 (0)