diff --git a/cluster/loadbalance/p2c/loadbalance.go b/cluster/loadbalance/p2c/loadbalance.go index f673cbb332..6f3aa379ea 100644 --- a/cluster/loadbalance/p2c/loadbalance.go +++ b/cluster/loadbalance/p2c/loadbalance.go @@ -98,6 +98,7 @@ func (l *p2cLoadBalance) Select(invokers []protocol.Invoker, invocation protocol return nil } + // TODO(justxuewei): It should have a strategy to invalidate the metrics due to timeout. remainingJIface, err := m.GetMethodMetrics(invokers[j].GetURL(), methodName, metrics.HillClimbing) if err != nil { if errors.Is(err, metrics.ErrMetricsNotFound) { diff --git a/filter/adaptivesvc/limiter/hill_climbing.go b/filter/adaptivesvc/limiter/hill_climbing.go index 070a6dc735..296d6e0d4a 100644 --- a/filter/adaptivesvc/limiter/hill_climbing.go +++ b/filter/adaptivesvc/limiter/hill_climbing.go @@ -45,8 +45,9 @@ const ( var ( initialLimitation uint64 = 50 maxLimitation uint64 = 500 - radicalPeriod uint64 = 1000 - stablePeriod uint64 = 32000 + + radicalPeriod = 1000 * time.Millisecond + stablePeriod = 32000 * time.Millisecond ) // HillClimbing is a limiter using HillClimbing algorithm @@ -59,18 +60,18 @@ type HillClimbing struct { mutex *sync.Mutex // nextUpdateTime = lastUpdatedTime + updateInterval - updateInterval *atomic.Uint64 + updateInterval *atomic.Duration lastUpdatedTime *atomic.Time - // indicators of the current round - successCounter *atomic.Uint64 + // metrics of the current round + transactionNum *atomic.Uint64 rttAvg *atomic.Float64 - // indicators of history - bestConcurrency *atomic.Uint64 + // best metrics in the history + bestMaxCapacity *atomic.Float64 bestRTTAvg *atomic.Float64 bestLimitation *atomic.Uint64 - bestSuccessRate *atomic.Uint64 + bestTPS *atomic.Uint64 } func NewHillClimbing() Limiter { @@ -80,14 +81,14 @@ func NewHillClimbing() Limiter { inflight: new(atomic.Uint64), limitation: atomic.NewUint64(initialLimitation), mutex: new(sync.Mutex), - updateInterval: atomic.NewUint64(radicalPeriod), + updateInterval: atomic.NewDuration(radicalPeriod), lastUpdatedTime: atomic.NewTime(time.Now()), - successCounter: new(atomic.Uint64), + transactionNum: new(atomic.Uint64), rttAvg: new(atomic.Float64), - bestConcurrency: new(atomic.Uint64), - bestRTTAvg: new(atomic.Float64), + bestMaxCapacity: new(atomic.Float64), + bestRTTAvg: atomic.NewFloat64(math.MaxFloat64), bestLimitation: new(atomic.Uint64), - bestSuccessRate: new(atomic.Uint64), + bestTPS: new(atomic.Uint64), } return l @@ -139,7 +140,7 @@ func (u *HillClimbingUpdater) DoUpdate() error { }() VerboseDebugf("[HillClimbingUpdater] A request finished, the limiter will be updated, seq: %d.", u.seq) - rtt := uint64(time.Now().Sub(u.startTime)) + rtt := uint64(time.Now().Sub(u.startTime).Milliseconds()) inflight := u.limiter.Inflight() option, err := u.getOption(rtt, inflight) @@ -162,22 +163,26 @@ func (u *HillClimbingUpdater) getOption(rtt, _ uint64) (HillClimbingOption, erro lastUpdatedTime := u.limiter.lastUpdatedTime.Load() updateInterval := u.limiter.updateInterval.Load() rttAvg := u.limiter.rttAvg.Load() - successCounter := u.limiter.successCounter.Load() + transactionNum := u.limiter.transactionNum.Load() limitation := u.limiter.limitation.Load() - if now.Sub(lastUpdatedTime) > time.Duration(updateInterval) || - rttAvg == 0 { - // Current req is at the next round or no rttAvg. + // the current option is expired + if now.Before(lastUpdatedTime) { + return option, nil + } + + if now.Sub(lastUpdatedTime) > updateInterval || rttAvg == 0 { + // the current req is on the next round or no rttAvg. - // FIXME(justxuewei): If all requests in one round - // not receive responses, rttAvg will be 0, and - // concurrency will be 0 as well, the actual - // concurrency, however, is not 0. - concurrency := float64(successCounter) * rttAvg / float64(updateInterval) + // FIXME(justxuewei): If all requests in one round not receive responses, rttAvg will be 0, and maxCapacity will + // be 0 as well, the actual maxCapacity, however, is not 0. + maxCapacity := float64(transactionNum) * float64(updateInterval.Milliseconds()) / rttAvg + VerboseDebugf("[HillClimbingUpdater] maxCapacity: %f, transactionNum: %d, rttAvg: %f, bestRTTAvg: %f, "+ + "updateInterval: %d", + maxCapacity, transactionNum, rttAvg, u.limiter.bestRTTAvg.Load(), updateInterval.Milliseconds()) - // Consider extending limitation if concurrent is - // about to reach the limitation. - if uint64(concurrency*1.5) > limitation { + // Consider extending limitation if concurrent is about to reach the limitation. + if u.limiter.bestRTTAvg.Load() == math.MaxFloat64 || uint64(maxCapacity*1.5) > limitation { if updateInterval == radicalPeriod { option = HillClimbingOptionExtendPlus } else { @@ -185,22 +190,29 @@ func (u *HillClimbingUpdater) getOption(rtt, _ uint64) (HillClimbingOption, erro } } - successRate := uint64(1000.0 * float64(successCounter) / float64(updateInterval)) + tps := uint64(1000.0 * float64(transactionNum) / float64(updateInterval.Milliseconds())) + VerboseDebugf("[HillClimbingUpdater] The TPS is %d, transactionNum: %d, updateInterval: %d.", + tps, transactionNum, updateInterval) - if successRate > u.limiter.bestSuccessRate.Load() { - // successRate is the best in the history, update - // all best-indicators. - u.limiter.bestSuccessRate.Store(successRate) + if tps > u.limiter.bestTPS.Load() { + VerboseDebugf("[HillClimbingUpdater] The best TPS is updated from %d to %d.", + u.limiter.bestTPS.Load(), tps) + // tps is the best in the history, update + // all best metrics. + u.limiter.bestTPS.Store(tps) u.limiter.bestRTTAvg.Store(rttAvg) - u.limiter.bestConcurrency.Store(uint64(concurrency)) + u.limiter.bestMaxCapacity.Store(maxCapacity) u.limiter.bestLimitation.Store(u.limiter.limitation.Load()) - VerboseDebugf("[HillClimbingUpdater] Best-indicators are up-to-date, "+ - "seq: %d, bestSuccessRate: %d, bestRTTAvg: %.4f, bestConcurrency: %d,"+ - " bestLimitation: %d.", u.seq, u.limiter.bestSuccessRate.Load(), - u.limiter.bestRTTAvg.Load(), u.limiter.bestConcurrency.Load(), + VerboseDebugf("[HillClimbingUpdater] Best-metrics are up-to-date, "+ + "seq: %d, bestTPS: %d, bestRTTAvg: %.4f, bestMaxCapacity: %d,"+ + " bestLimitation: %d.", u.seq, u.limiter.bestTPS.Load(), + u.limiter.bestRTTAvg.Load(), u.limiter.bestMaxCapacity.Load(), u.limiter.bestLimitation.Load()) } else { - if u.shouldShrink(successCounter, uint64(concurrency), successRate, rttAvg) { + VerboseDebugf("[HillClimbingUpdater] The best TPS is not updated, best TPS is %d, "+ + "the current TPS is %d", + u.limiter.bestTPS.Load(), tps) + if u.shouldShrink(transactionNum, maxCapacity, tps, rttAvg) { if updateInterval == radicalPeriod { option = HillClimbingOptionShrinkPlus } else { @@ -209,84 +221,98 @@ func (u *HillClimbingUpdater) getOption(rtt, _ uint64) (HillClimbingOption, erro // shrinking limitation means the process of adjusting // limitation goes to stable, so extends the update // interval to avoid adjusting frequently. - u.limiter.updateInterval.Store(minUint64(updateInterval*2, stablePeriod)) + u.limiter.updateInterval.Store(minDuration(updateInterval*2, stablePeriod)) } } - // reset indicators for the new round - u.limiter.successCounter.Store(0) + // reset metrics for the new round + u.limiter.transactionNum.Store(0) u.limiter.rttAvg.Store(float64(rtt)) u.limiter.lastUpdatedTime.Store(time.Now()) - VerboseDebugf("[HillClimbingUpdater] A new round is applied, all indicators are reset.") + VerboseDebugf("[HillClimbingUpdater] A new round is applied, all metrics are reset.") } else { - // still in the current round + // still on the current round - u.limiter.successCounter.Add(1) + u.limiter.transactionNum.Add(1) // ra = (ra * c + r) / (c + 1), where ra denotes rttAvg, - // c denotes successCounter, r denotes rtt. - u.limiter.rttAvg.Store((rttAvg*float64(successCounter) + float64(rtt)) / float64(successCounter+1)) + // c denotes transactionNum, r denotes rtt. + u.limiter.rttAvg.Store((rttAvg*float64(transactionNum) + float64(rtt)) / float64(transactionNum+1)) option = HillClimbingOptionDoNothing } return option, nil } -func (u *HillClimbingUpdater) shouldShrink(counter, concurrency, successRate uint64, rttAvg float64) bool { - bestSuccessRate := u.limiter.bestSuccessRate.Load() +func (u *HillClimbingUpdater) shouldShrink(transactionNum uint64, maxCapacity float64, tps uint64, rttAvg float64) bool { + //bestTPS := u.limiter.bestTPS.Load() + bestMaxCapacity := u.limiter.bestMaxCapacity.Load() bestRTTAvg := u.limiter.bestRTTAvg.Load() - diff := bestSuccessRate - successRate - diffPct := uint64(100.0 * float64(successRate) / float64(bestSuccessRate)) + diff := bestMaxCapacity - maxCapacity + diffPct := uint64(100.0 * diff / bestMaxCapacity) + + VerboseDebugf("[HillClimbingUpdater] shouldShrink maxCapacity diff: %f, diffPct: %d.", diff, diffPct) if diff <= 300 && diffPct <= 10 { // diff is acceptable, shouldn't shrink return false } - if concurrency > bestSuccessRate || rttAvg > bestRTTAvg { - // The unacceptable diff dues to too large - // concurrency or rttAvg. - concDiff := concurrency - bestSuccessRate - concDiffPct := uint64(100.0 * float64(concurrency) / float64(bestSuccessRate)) - rttAvgDiff := rttAvg - bestRTTAvg - rttAvgPctDiff := uint64(100.0 * rttAvg / bestRTTAvg) + if diff > 0 || rttAvg > bestRTTAvg { + // The unacceptable diff dues to too large maxCapacity or rttAvg. + rttAvgDiff := uint64(rttAvg - bestRTTAvg) + rttAvgDiffPct := uint64(100.0 * rttAvg / bestRTTAvg) - // TODO(justxuewei): Hard-coding here is not proper, but - // it should refactor after testing. + // TODO(justxuewei): Hard-coding here is not proper, but it should refactor after testing. var ( rttAvgDiffThreshold uint64 - rttAvgPctDiffThreshold uint64 + rttAvgDiffPctThreshold uint64 ) if bestRTTAvg < 5 { rttAvgDiffThreshold = 3 - rttAvgPctDiffThreshold = 80 + rttAvgDiffPctThreshold = 80 } else if bestRTTAvg < 10 { rttAvgDiffThreshold = 2 - rttAvgPctDiffThreshold = 30 + rttAvgDiffPctThreshold = 30 } else if bestRTTAvg < 50 { rttAvgDiffThreshold = 5 - rttAvgPctDiffThreshold = 20 + rttAvgDiffPctThreshold = 20 } else if bestRTTAvg < 100 { rttAvgDiffThreshold = 10 - rttAvgPctDiffThreshold = 10 + rttAvgDiffPctThreshold = 10 } else { rttAvgDiffThreshold = 20 - rttAvgPctDiffThreshold = 5 + rttAvgDiffPctThreshold = 5 } - return (concDiffPct > 10 && concDiff > 5) && (uint64(rttAvgDiff) > rttAvgDiffThreshold || rttAvgPctDiff >= rttAvgPctDiffThreshold) + VerboseDebugf("[HillClimbingUpdater] shouldShrink bestRTTAvg: %d, rttAvgDiff: %d, rttAvgDiffPct: %d, "+ + "rttAvgDiffThreshold: %d, rttAvgDiffPctThreshold: %d.", bestRTTAvg, rttAvgDiff, rttAvgDiffPct, + rttAvgDiffPctThreshold, rttAvgDiffPctThreshold) + + return (diffPct > 10 && diff > 5) && + (rttAvgDiff > rttAvgDiffThreshold || rttAvgDiffPct >= rttAvgDiffPctThreshold) } return false } func (u *HillClimbingUpdater) adjustLimitation(option HillClimbingOption) error { + if option == HillClimbingOptionDoNothing { + VerboseDebugf("[HillClimbingUpdater] The option is do nothing, the limitation will not be updated.") + return nil + } + limitation := float64(u.limiter.limitation.Load()) oldLimitation := limitation bestLimitation := float64(u.limiter.bestLimitation.Load()) + updateInterval := u.limiter.updateInterval.Load() alpha := 1.5 * math.Log(limitation) beta := 0.8 * math.Log(limitation) - logUpdateInterval := math.Log2(float64(u.limiter.updateInterval.Load()) / 1000.0) + logUpdateInterval := math.Max(1.0, math.Log2(float64(updateInterval.Milliseconds())/1000.0)) + + VerboseDebugf("[HillClimbingUpdater] Before calculating new limitation, option: %d, limitation: %f, "+ + "bestLimitation: %f, alpha: %f, beta: %f, logUpdateInterval: %f, updateInterval: %d", option, limitation, + bestLimitation, alpha, beta, logUpdateInterval, updateInterval.Milliseconds()) switch option { case HillClimbingOptionExtendPlus: diff --git a/filter/adaptivesvc/limiter/limiter.go b/filter/adaptivesvc/limiter/limiter.go index cdc730ed12..82424fc99a 100644 --- a/filter/adaptivesvc/limiter/limiter.go +++ b/filter/adaptivesvc/limiter/limiter.go @@ -34,9 +34,14 @@ const ( type Limiter interface { Inflight() uint64 Remaining() uint64 + // Acquire inspects the current status of the system: + // - if reaches the limitation, reject the request immediately. + // - if not, grant this request and return an Updater defined below. Acquire() (Updater, error) } type Updater interface { + // DoUpdate is called once an invocation is finished, it tells Updater that the invocation is finished, and please + // update the Remaining, Inflight parameters of the Limiter. DoUpdate() error } diff --git a/filter/adaptivesvc/limiter/utils.go b/filter/adaptivesvc/limiter/utils.go index fb446ae26f..e24427e233 100644 --- a/filter/adaptivesvc/limiter/utils.go +++ b/filter/adaptivesvc/limiter/utils.go @@ -17,6 +17,10 @@ package limiter +import ( + "time" +) + import ( "dubbo.apache.org/dubbo-go/v3/common/logger" ) @@ -63,3 +67,10 @@ func minUint64(lhs, rhs uint64) uint64 { } return rhs } + +func minDuration(lhs, rhs time.Duration) time.Duration { + if lhs < rhs { + return lhs + } + return rhs +}