Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions go/vt/throttler/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,19 +130,31 @@ func NewThrottler(name, unit string, threadCount int, maxRate, maxReplicationLag
return newThrottler(GlobalManager, name, unit, threadCount, maxRate, maxReplicationLag, time.Now)
}

func NewThrottlerFromConfig(name, unit string, threadCount int, maxRateModuleMaxRate int64, maxReplicationLagModuleConfig MaxReplicationLagModuleConfig, nowFunc func() time.Time) (*Throttler, error) {
return newThrottlerFromConfig(GlobalManager, name, unit, threadCount, maxRateModuleMaxRate, maxReplicationLagModuleConfig, nowFunc)
}

func newThrottler(manager *managerImpl, name, unit string, threadCount int, maxRate, maxReplicationLag int64, nowFunc func() time.Time) (*Throttler, error) {
// Verify input parameters.
if maxRate < 0 {
return nil, fmt.Errorf("maxRate must be >= 0: %v", maxRate)
config := NewMaxReplicationLagModuleConfig(maxReplicationLag)
config.MaxReplicationLagSec = maxReplicationLag

return newThrottlerFromConfig(manager, name, unit, threadCount, maxRate, config, nowFunc)

}

func newThrottlerFromConfig(manager *managerImpl, name, unit string, threadCount int, maxRateModuleMaxRate int64, maxReplicationLagModuleConfig MaxReplicationLagModuleConfig, nowFunc func() time.Time) (*Throttler, error) {
err := maxReplicationLagModuleConfig.Verify()
if err != nil {
return nil, fmt.Errorf("invalid max replication lag config: %w", err)
}
if maxReplicationLag < 0 {
return nil, fmt.Errorf("maxReplicationLag must be >= 0: %v", maxReplicationLag)
if maxRateModuleMaxRate < 0 {
return nil, fmt.Errorf("maxRate must be >= 0: %v", maxRateModuleMaxRate)
}
Copy link
Contributor

@shlomi-noach shlomi-noach Mar 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was the maxReplicationLag sanity check removed intentionally? I believe we should keep validating that the value is non-negative?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. this was removed intentionally. Instead, we call MaxReplicationLagModuleConfig.Verify() here, which already imposes stronger constraints on both target and max replication lag.


// Enable the configured modules.
maxRateModule := NewMaxRateModule(maxRate)
maxRateModule := NewMaxRateModule(maxRateModuleMaxRate)
actualRateHistory := newAggregatedIntervalHistory(1024, 1*time.Second, threadCount)
maxReplicationLagModule, err := NewMaxReplicationLagModule(NewMaxReplicationLagModuleConfig(maxReplicationLag), actualRateHistory, nowFunc)
maxReplicationLagModule, err := NewMaxReplicationLagModule(maxReplicationLagModuleConfig, actualRateHistory, nowFunc)
if err != nil {
return nil, err
}
Expand Down
11 changes: 7 additions & 4 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ type txThrottlerState struct {
// in tests to generate mocks.
type healthCheckFactoryFunc func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck
type topologyWatcherFactoryFunc func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface
type throttlerFactoryFunc func(name, unit string, threadCount int, maxRate, maxReplicationLag int64) (ThrottlerInterface, error)
type throttlerFactoryFunc func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error)

var (
healthCheckFactory healthCheckFactoryFunc
Expand All @@ -210,8 +210,8 @@ func resetTxThrottlerFactories() {
topologyWatcherFactory = func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface {
return discovery.NewCellTabletsWatcher(context.Background(), topoServer, hc, discovery.NewFilterByKeyspace([]string{keyspace}), cell, refreshInterval, true, topoReadConcurrency)
}
throttlerFactory = func(name, unit string, threadCount int, maxRate, maxReplicationLag int64) (ThrottlerInterface, error) {
return throttler.NewThrottler(name, unit, threadCount, maxRate, maxReplicationLag)
throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) {
return throttler.NewThrottlerFromConfig(name, unit, threadCount, maxRate, maxReplicationLagConfig, time.Now)
}
}

Expand Down Expand Up @@ -285,12 +285,15 @@ func (t *TxThrottler) Throttle() (result bool) {
}

func newTxThrottlerState(config *txThrottlerConfig, keyspace, shard, cell string) (*txThrottlerState, error) {
maxReplicationLagModuleConfig := throttler.MaxReplicationLagModuleConfig{Configuration: config.throttlerConfig}

t, err := throttlerFactory(
TxThrottlerName,
"TPS", /* unit */
1, /* threadCount */
throttler.MaxRateModuleDisabled, /* maxRate */
config.throttlerConfig.MaxReplicationLagSec /* maxReplicationLag */)
maxReplicationLagModuleConfig,
)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/stretchr/testify/assert"

"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/throttler"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
Expand Down Expand Up @@ -79,7 +80,7 @@ func TestEnabledThrottler(t *testing.T) {
}

mockThrottler := NewMockThrottlerInterface(mockCtrl)
throttlerFactory = func(name, unit string, threadCount int, maxRate, maxReplicationLag int64) (ThrottlerInterface, error) {
throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) {
assert.Equal(t, 1, threadCount)
return mockThrottler, nil
}
Expand Down