Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HSDEV-10004] - GCC to use a single bucket manager for bitrate and backoff #14

Merged
merged 16 commits into from
Jul 3, 2024
5 changes: 3 additions & 2 deletions pkg/gcc/delay_based_bwe.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type DelayStats struct {
TargetBitrate int
ReceivedBitrate int
LatestRTT time.Duration
BucketStatus string
}

type now func() time.Time
Expand Down Expand Up @@ -53,7 +54,7 @@ type delayControllerConfig struct {
rateControllerOptions *RateControllerBucketsOptions
}

func newDelayController(c delayControllerConfig) *delayController {
func newDelayController(c delayControllerConfig, bitrateControlBucketsManager *Manager) *delayController {
ackPipe := make(chan []cc.Acknowledgment)
ackRatePipe := make(chan []cc.Acknowledgment)

Expand All @@ -67,7 +68,7 @@ func newDelayController(c delayControllerConfig) *delayController {
log: logging.NewDefaultLoggerFactory().NewLogger("gcc_delay_controller"),
}

rateController := newRateControllerBuckets(c.nowFn, c.initialBitrate, c.minBitrate, c.maxBitrate, c.rateControllerOptions, func(ds DelayStats) {
rateController := newRateControllerBuckets(c.nowFn, c.initialBitrate, c.minBitrate, c.maxBitrate, c.rateControllerOptions, bitrateControlBucketsManager, func(ds DelayStats) {
delayController.log.Infof("delaystats: %v", ds)
if delayController.onUpdateCallback != nil {
delayController.onUpdateCallback(ds)
Expand Down
98 changes: 64 additions & 34 deletions pkg/gcc/loss_based_bwe.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,56 +16,63 @@ import (
type LossStats struct {
TargetBitrate int
AverageLoss float64
BucketStatus string
}

type lossBasedBandwidthEstimator struct {
lock sync.Mutex
maxBitrate int
minBitrate int
bitrate int
averageLoss float64
lastLossUpdate time.Time
lastIncrease time.Time
lastDecrease time.Time
options LossBasedBandwidthEstimatorOptions
log logging.LeveledLogger
lock sync.Mutex
maxBitrate int
minBitrate int
bitrate int
currentBucketStatus string
lastBucketUpdateBitrate uint64
averageLoss float64
lastLossUpdate time.Time
lastIncrease time.Time
lastDecrease time.Time
options LossBasedBandwidthEstimatorOptions
bitrateControlBucketsManager *Manager
log logging.LeveledLogger
}

type LossBasedBandwidthEstimatorOptions struct {
IncreaseLossThreshold float64
IncreaseTimeThreshold time.Duration
IncreaseBitrateChange int
IncreaseFactor float64
DecreaseLossThreshold float64
DecreaseTimeThreshold time.Duration
DecreaseBitrateChange int
DecreaseFactor float64
}

func newLossBasedBWE(initialBitrate int, minBitrate int, maxBitrate int, options *LossBasedBandwidthEstimatorOptions) *lossBasedBandwidthEstimator {
func newLossBasedBWE(initialBitrate int, minBitrate int, maxBitrate int, options *LossBasedBandwidthEstimatorOptions, bitrateControlBucketsManager *Manager) *lossBasedBandwidthEstimator {
if options == nil {
// constants from
// https://datatracker.ietf.org/doc/html/draft-ietf-rmcat-gcc-02#section-6
defaultOptions := LossBasedBandwidthEstimatorOptions{
IncreaseLossThreshold: 0.02,
IncreaseTimeThreshold: 200 * time.Millisecond,
IncreaseBitrateChange: 250000,
IncreaseFactor: 1.05,
DecreaseLossThreshold: 0.1,
DecreaseTimeThreshold: 200 * time.Millisecond,
DecreaseBitrateChange: 250000,
DecreaseFactor: 1.0,
}
options = &defaultOptions
}

return &lossBasedBandwidthEstimator{
lock: sync.Mutex{},
maxBitrate: maxBitrate,
minBitrate: minBitrate,
bitrate: initialBitrate,
averageLoss: 0,
lastLossUpdate: time.Time{},
lastIncrease: time.Time{},
lastDecrease: time.Time{},
options: *options,
log: logging.NewDefaultLoggerFactory().NewLogger("gcc_loss_controller"),
lock: sync.Mutex{},
maxBitrate: maxBitrate,
minBitrate: minBitrate,
bitrate: initialBitrate,
currentBucketStatus: "",
lastBucketUpdateBitrate: uint64(initialBitrate),
averageLoss: 0,
lastLossUpdate: time.Time{},
lastIncrease: time.Time{},
lastDecrease: time.Time{},
options: *options,
bitrateControlBucketsManager: bitrateControlBucketsManager,
log: logging.NewDefaultLoggerFactory().NewLogger("gcc_loss_controller"),
}
}

Expand All @@ -76,11 +83,15 @@ func (e *lossBasedBandwidthEstimator) getEstimate(wantedRate int) LossStats {
if e.bitrate <= 0 {
e.bitrate = clampInt(wantedRate, e.minBitrate, e.maxBitrate)
}

e.bitrate = minInt(wantedRate, e.bitrate)

latestBitrate, _ := e.bitrateControlBucketsManager.getBucket(uint64(e.bitrate))

return LossStats{
TargetBitrate: e.bitrate,
TargetBitrate: int(latestBitrate),
AverageLoss: e.averageLoss,
BucketStatus: e.currentBucketStatus,
}
}

Expand All @@ -106,14 +117,33 @@ func (e *lossBasedBandwidthEstimator) updateLossEstimate(results []cc.Acknowledg
increaseLoss := math.Max(e.averageLoss, lossRatio)
decreaseLoss := math.Min(e.averageLoss, lossRatio)

if increaseLoss < e.options.IncreaseLossThreshold && time.Since(e.lastIncrease) > e.options.IncreaseTimeThreshold {
e.log.Infof("loss controller increasing; averageLoss: %v, decreaseLoss: %v, increaseLoss: %v", e.averageLoss, decreaseLoss, increaseLoss)
e.lastIncrease = time.Now()
e.bitrate = clampInt(int(e.bitrate+e.options.IncreaseBitrateChange), e.minBitrate, e.maxBitrate)
} else if decreaseLoss > e.options.DecreaseLossThreshold && time.Since(e.lastDecrease) > e.options.DecreaseTimeThreshold {
e.log.Infof("loss controller decreasing; averageLoss: %v, decreaseLoss: %v, increaseLoss: %v", e.averageLoss, decreaseLoss, increaseLoss)
e.lastDecrease = time.Now()
e.bitrate = clampInt(int(e.bitrate-e.options.DecreaseBitrateChange), e.minBitrate, e.maxBitrate)
e.currentBucketStatus = ""

if increaseLoss < e.options.IncreaseLossThreshold {
if time.Since(e.lastIncrease) > e.options.IncreaseTimeThreshold {
e.log.Infof("loss controller increasing; averageLoss: %v, decreaseLoss: %v, increaseLoss: %v, currentBitrate: %v", e.averageLoss, decreaseLoss, increaseLoss, e.bitrate)
suggestedTarget := clampInt(int(e.options.IncreaseFactor*float64(e.bitrate)), e.minBitrate, e.maxBitrate)
currentBitrateBucket, _ := e.bitrateControlBucketsManager.getBucket(uint64(e.bitrate))
newBitrateBucket, _ := e.bitrateControlBucketsManager.getBucket(uint64(suggestedTarget))
if currentBitrateBucket != newBitrateBucket {
err := e.bitrateControlBucketsManager.CanIncreaseToBitrate(currentBitrateBucket, newBitrateBucket)
if err == nil {
e.lastIncrease = time.Now()
e.bitrate = suggestedTarget
} else {
e.currentBucketStatus = err.Error()
}
} else {
e.lastIncrease = time.Now()
e.bitrate = suggestedTarget
}
}
} else if decreaseLoss > e.options.DecreaseLossThreshold {
if time.Since(e.lastDecrease) > e.options.DecreaseTimeThreshold {
e.log.Infof("loss controller decreasing; averageLoss: %v, decreaseLoss: %v, increaseLoss: %v, currentBitrate: %v", e.averageLoss, decreaseLoss, increaseLoss, e.bitrate)
e.lastDecrease = time.Now()
e.bitrate = clampInt(int(float64(e.bitrate)*(1.0-e.options.DecreaseFactor*decreaseLoss)), e.minBitrate, e.maxBitrate)
}
}
}

Expand Down
74 changes: 45 additions & 29 deletions pkg/gcc/rate_controller_buckets.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ type RateControllerBucketsOptions struct {
DecreaseTimeThreshold time.Duration
IncreaseBitrateChange int
DecreaseBitrateChange int
BitrateControlBuckets *BitrateControlBucketsConfig
}

type rateControllerBuckets struct {
Expand All @@ -38,28 +37,21 @@ type rateControllerBuckets struct {
rateControllerOptions *RateControllerBucketsOptions

bitrateControlBucketsManager *Manager
currentBucketStatus string
lastBucketUpdateBitrate uint64
}

func newRateControllerBuckets(now now, initialTargetBitrate, minBitrate, maxBitrate int, rateControllerOptions *RateControllerBucketsOptions, dsw func(DelayStats)) *rateControllerBuckets {
func newRateControllerBuckets(now now, initialTargetBitrate, minBitrate, maxBitrate int, rateControllerOptions *RateControllerBucketsOptions, bitrateControlBucketsManager *Manager, dsw func(DelayStats)) *rateControllerBuckets {
if rateControllerOptions == nil {
defaultOptions := RateControllerBucketsOptions{
IncreaseTimeThreshold: 100 * time.Millisecond,
DecreaseTimeThreshold: 100 * time.Millisecond,
IncreaseBitrateChange: 250000,
DecreaseBitrateChange: 250000,
BitrateControlBuckets: &BitrateControlBucketsConfig{
BitrateStableThreshold: 5 * 25,
HandleUnstableBitrateGracePeriodSec: 2,
BitrateBucketIncrement: 250000,
BackoffDurationsSec: []float64{0, 0, 15, 30, 60},
},
}
rateControllerOptions = &defaultOptions
}

manager := NewManager(rateControllerOptions.BitrateControlBuckets)
manager.InitializeBuckets(uint64(maxBitrate))

return &rateControllerBuckets{
now: now,
initialTargetBitrate: initialTargetBitrate,
Expand All @@ -73,7 +65,9 @@ func newRateControllerBuckets(now now, initialTargetBitrate, minBitrate, maxBitr
lastState: stateIncrease,
latestRTT: 0,
latestReceivedRate: 0,
bitrateControlBucketsManager: manager,
bitrateControlBucketsManager: bitrateControlBucketsManager,
currentBucketStatus: "",
lastBucketUpdateBitrate: uint64(initialTargetBitrate),

rateControllerOptions: rateControllerOptions,
lastIncrease: time.Time{},
Expand All @@ -93,6 +87,13 @@ func (c *rateControllerBuckets) updateRTT(rtt time.Duration) {
c.latestRTT = rtt
}

func (c *rateControllerBuckets) updateBitrate(bitrate int) {
c.lock.Lock()
defer c.lock.Unlock()

c.target = minInt(bitrate, c.target)
}

func (c *rateControllerBuckets) onDelayStats(ds DelayStats) {
now := time.Now()

Expand All @@ -105,24 +106,40 @@ func (c *rateControllerBuckets) onDelayStats(ds DelayStats) {
c.delayStats = ds
c.delayStats.State = c.delayStats.State.transition(ds.Usage)

if c.delayStats.State == stateHold {
c.bitrateControlBucketsManager.HandleBitrateNormal(uint64(c.target))
return
}

var next DelayStats

c.lock.Lock()

c.currentBucketStatus = ""

switch c.delayStats.State {
case stateHold:
// should never occur due to check above, but makes the linter happy
case stateIncrease:
c.bitrateControlBucketsManager.HandleBitrateNormal(uint64(c.target))
next = DelayStats{
Measurement: c.delayStats.Measurement,
Estimate: c.delayStats.Estimate,
Threshold: c.delayStats.Threshold,
LastReceiveDelta: c.delayStats.LastReceiveDelta,
Usage: c.delayStats.Usage,
State: c.delayStats.State,
TargetBitrate: c.target,
ReceivedBitrate: c.latestReceivedRate,
LatestRTT: c.latestRTT,
BucketStatus: c.currentBucketStatus,
}

case stateIncrease:
suggestedTarget := clampInt(c.increase(now), c.minBitrate, c.maxBitrate)
err := c.bitrateControlBucketsManager.CanIncreaseToBitrate(uint64(c.target), uint64(suggestedTarget))
if err == nil {
currentBitrateBucket, _ := c.bitrateControlBucketsManager.getBucket(uint64(c.target))
newBitrateBucket, _ := c.bitrateControlBucketsManager.getBucket(uint64(suggestedTarget))
if currentBitrateBucket != newBitrateBucket {
err := c.bitrateControlBucketsManager.CanIncreaseToBitrate(uint64(c.target), uint64(suggestedTarget))
if err == nil {
c.target = suggestedTarget
currentBitrateBucket = newBitrateBucket
} else {
c.currentBucketStatus = err.Error()
}
} else {
c.target = suggestedTarget
}

Expand All @@ -133,17 +150,15 @@ func (c *rateControllerBuckets) onDelayStats(ds DelayStats) {
LastReceiveDelta: c.delayStats.LastReceiveDelta,
Usage: c.delayStats.Usage,
State: c.delayStats.State,
TargetBitrate: c.target,
TargetBitrate: int(currentBitrateBucket),
ReceivedBitrate: c.latestReceivedRate,
LatestRTT: c.latestRTT,
BucketStatus: c.currentBucketStatus,
}

case stateDecrease:
suggestedTarget := clampInt(c.decrease(now), c.minBitrate, c.maxBitrate)
if suggestedTarget != c.target {
c.bitrateControlBucketsManager.HandleBitrateDecrease(uint64(c.target))
c.target = suggestedTarget
}
c.target = clampInt(c.decrease(now), c.minBitrate, c.maxBitrate)
latestBitrate, _ := c.bitrateControlBucketsManager.getBucket(uint64(c.target))

next = DelayStats{
Measurement: c.delayStats.Measurement,
Expand All @@ -152,9 +167,10 @@ func (c *rateControllerBuckets) onDelayStats(ds DelayStats) {
LastReceiveDelta: c.delayStats.LastReceiveDelta,
Usage: c.delayStats.Usage,
State: c.delayStats.State,
TargetBitrate: c.target,
TargetBitrate: int(latestBitrate),
ReceivedBitrate: c.latestReceivedRate,
LatestRTT: c.latestRTT,
BucketStatus: c.currentBucketStatus,
}
}

Expand Down
10 changes: 9 additions & 1 deletion pkg/gcc/rate_controller_buckets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,19 @@
t0 = t0.Add(100 * time.Millisecond)
return t0
}
manager := NewManager(&BitrateControlBucketsConfig{
BitrateStableThreshold: 5 * 25,
HandleUnstableBitrateGracePeriodSec: 2,
BitrateBucketIncrement: 250000,
BackoffDurationsSec: []float64{0, 0, 15, 30, 60},
})
manager.InitializeBuckets(uint64(maxBitrate))

Check failure on line 54 in pkg/gcc/rate_controller_buckets_test.go

View workflow job for this annotation

GitHub Actions / lint / Go

File is not `gci`-ed with --skip-generated -s standard -s default (gci)
for _, tc := range cases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
out := make(chan DelayStats)
dc := newRateControllerBuckets(mockNoFn, 100_000, 1_000, 50_000_000, nil, func(ds DelayStats) {
dc := newRateControllerBuckets(mockNoFn, 100_000, 1_000, 50_000_000, nil, manager, func(ds DelayStats) {
out <- ds
})
in := make(chan DelayStats)
Expand Down
Loading
Loading