diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go index c58422a8f91..71be14a5f7f 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -40,6 +40,7 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/config" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/mysql" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" + "vitess.io/vitess/go/vt/vttablet/tmclient" ) const ( @@ -57,7 +58,7 @@ const ( dormantPeriod = time.Minute defaultThrottleTTLMinutes = 60 - defaultThrottleRatio = 1.0 + DefaultThrottleRatio = 1.0 shardStoreName = "shard" selfStoreName = "self" @@ -108,6 +109,14 @@ func init() { rand.Seed(time.Now().UnixNano()) } +// throttlerTopoService represents the functionality we expect from a TopoServer, abstracted so that +// it can be mocked in unit tests +type throttlerTopoService interface { + GetTablet(ctx context.Context, alias *topodatapb.TabletAlias) (*topo.TabletInfo, error) + FindAllTabletAliasesInShard(ctx context.Context, keyspace, shard string) ([]*topodatapb.TabletAlias, error) + GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topodatapb.SrvKeyspace, error) +} + // Throttler is the main entity in the throttling mechanism. This service runs, probes, collects data, // aggregates, reads inventory, provides information, etc. type Throttler struct { @@ -123,7 +132,7 @@ type Throttler struct { env tabletenv.Env pool *connpool.Pool tabletTypeFunc func() topodatapb.TabletType - ts *topo.Server + ts throttlerTopoService srvTopoServer srvtopo.Server heartbeatWriter heartbeat.HeartbeatWriter @@ -446,7 +455,7 @@ func (throttler *Throttler) Open() error { throttler.pool.Open(throttler.env.Config().DB.AppWithDB(), throttler.env.Config().DB.DbaWithDB(), throttler.env.Config().DB.AppDebugWithDB()) atomic.StoreInt64(&throttler.isOpen, 1) - throttler.ThrottleApp("always-throttled-app", time.Now().Add(time.Hour*24*365*10), defaultThrottleRatio) + throttler.ThrottleApp("always-throttled-app", time.Now().Add(time.Hour*24*365*10), DefaultThrottleRatio) if throttlerConfigViaTopo { log.Infof("Throttler: throttler-config-via-topo detected") @@ -606,8 +615,11 @@ func (throttler *Throttler) Operate(ctx context.Context) { throttledAppsTicker := addTicker(throttledAppsSnapshotInterval) recentCheckTicker := addTicker(time.Second) + tmClient := tmclient.NewTabletManagerClient() + go func() { defer log.Infof("Throttler: Operate terminated, tickers stopped") + defer tmClient.Close() for _, t := range tickers { defer t.Stop() // since we just started the tickers now, speed up the ticks by forcing an immediate tick @@ -765,8 +777,10 @@ func (throttler *Throttler) collectMySQLMetrics(ctx context.Context) error { var throttleMetricFunc func() *mysql.MySQLThrottleMetric if clusterName == selfStoreName { + // Throttler is probing its own tablet's metrics: throttleMetricFunc = throttler.generateSelfMySQLThrottleMetricFunc(ctx, probe) } else { + // Throttler probing other tablets: throttleMetricFunc = throttler.generateTabletHTTPProbeFunction(ctx, clusterName, probe) } throttleMetrics := mysql.ReadThrottleMetric(probe, clusterName, throttleMetricFunc) @@ -780,7 +794,6 @@ func (throttler *Throttler) collectMySQLMetrics(ctx context.Context) error { // refreshMySQLInventory will re-structure the inventory based on reading config settings func (throttler *Throttler) refreshMySQLInventory(ctx context.Context) error { - // distribute the query/threshold from the throttler down to the cluster settings and from there to the probes metricsQuery := throttler.GetMetricsQuery() metricsThreshold := throttler.MetricsThreshold.Load() @@ -822,13 +835,20 @@ func (throttler *Throttler) refreshMySQLInventory(ctx context.Context) error { } if clusterName == selfStoreName { - // special case: just looking at this tablet's MySQL server + // special case: just looking at this tablet's MySQL server. // We will probe this "cluster" (of one server) is a special way. addInstanceKey("", 0, mysql.SelfInstanceKey, clusterName, clusterSettings, clusterProbes.InstanceProbes) throttler.mysqlClusterProbesChan <- clusterProbes return } if atomic.LoadInt64(&throttler.isLeader) == 0 { + // This tablet may have used to be the primary, but it isn't now. It may have a recollection + // of previous clusters it used to probe. It may have recollection of specific probes for such clusters. + // This now ensures any existing cluster probes are overrridden with an empty list of probes. + // `clusterProbes` was created above as empty, and identificable via `clusterName`. This will in turn + // be used to overwrite throttler.mysqlInventory.ClustersProbes[clusterProbes.ClusterName] in + // updateMySQLClusterProbes(). + throttler.mysqlClusterProbesChan <- clusterProbes // not the leader (primary tablet)? Then no more work for us. return } @@ -934,7 +954,7 @@ func (throttler *Throttler) ThrottleApp(appName string, expireAt time.Time, rati expireAt = now.Add(defaultThrottleTTLMinutes * time.Minute) } if ratio < 0 { - ratio = defaultThrottleRatio + ratio = DefaultThrottleRatio } appThrottle = base.NewAppThrottle(appName, expireAt, ratio) } diff --git a/go/vt/vttablet/tabletserver/throttle/throttler_test.go b/go/vt/vttablet/tabletserver/throttle/throttler_test.go new file mode 100644 index 00000000000..2c4f3545153 --- /dev/null +++ b/go/vt/vttablet/tabletserver/throttle/throttler_test.go @@ -0,0 +1,192 @@ +/* + Copyright 2017 GitHub Inc. + + Licensed under MIT License. See https://github.com/github/freno/blob/master/LICENSE +*/ + +package throttle + +import ( + "context" + "fmt" + "sync/atomic" + "testing" + "time" + + "github.com/patrickmn/go-cache" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/config" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/mysql" + + topodatapb "vitess.io/vitess/go/vt/proto/topodata" +) + +const ( + waitForProbesTimeout = 30 * time.Second +) + +type FakeTopoServer struct { +} + +func (ts *FakeTopoServer) GetTablet(ctx context.Context, alias *topodatapb.TabletAlias) (*topo.TabletInfo, error) { + tablet := &topo.TabletInfo{ + Tablet: &topodatapb.Tablet{ + Alias: alias, + Hostname: "127.0.0.1", + MysqlHostname: "127.0.0.1", + MysqlPort: 3306, + PortMap: map[string]int32{"vt": 5000}, + Type: topodatapb.TabletType_REPLICA, + }, + } + return tablet, nil +} + +func (ts *FakeTopoServer) FindAllTabletAliasesInShard(ctx context.Context, keyspace, shard string) ([]*topodatapb.TabletAlias, error) { + aliases := []*topodatapb.TabletAlias{ + {Cell: "zone1", Uid: 100}, + {Cell: "zone2", Uid: 101}, + } + return aliases, nil +} + +func (ts *FakeTopoServer) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topodatapb.SrvKeyspace, error) { + ks := &topodatapb.SrvKeyspace{} + return ks, nil +} + +type FakeHeartbeatWriter struct { +} + +func (w FakeHeartbeatWriter) RequestHeartbeats() { +} + +func TestIsAppThrottled(t *testing.T) { + throttler := Throttler{ + throttledApps: cache.New(cache.NoExpiration, 0), + heartbeatWriter: FakeHeartbeatWriter{}, + } + assert.False(t, throttler.IsAppThrottled("app1")) + assert.False(t, throttler.IsAppThrottled("app2")) + assert.False(t, throttler.IsAppThrottled("app3")) + assert.False(t, throttler.IsAppThrottled("app4")) + // + throttler.ThrottleApp("app1", time.Now().Add(time.Hour), DefaultThrottleRatio) + throttler.ThrottleApp("app2", time.Now(), DefaultThrottleRatio) + throttler.ThrottleApp("app3", time.Now().Add(time.Hour), DefaultThrottleRatio) + throttler.ThrottleApp("app4", time.Now().Add(time.Hour), 0) + assert.False(t, throttler.IsAppThrottled("app2")) // expired + assert.True(t, throttler.IsAppThrottled("app3")) + assert.False(t, throttler.IsAppThrottled("app4")) // ratio is zero + // + throttler.UnthrottleApp("app1") + throttler.UnthrottleApp("app2") + throttler.UnthrottleApp("app3") + throttler.UnthrottleApp("app4") + assert.False(t, throttler.IsAppThrottled("app1")) + assert.False(t, throttler.IsAppThrottled("app2")) + assert.False(t, throttler.IsAppThrottled("app3")) + assert.False(t, throttler.IsAppThrottled("app4")) +} + +// TestRefreshMySQLInventory tests the behavior of the throttler's RefreshMySQLInventory() function, which +// is called periodically in actual throttler. For a given cluster name, it generates a list of probes +// the throttler will use to check metrics. +// On a "self" cluster, that list is expect to probe the tablet itself. +// On any other cluster, the list is expected to be empty if non-leader (only leader throttler, on a +// `PRIMARY` tablet, probes other tablets). On the leader, the list is expected to be non-empty. +func TestRefreshMySQLInventory(t *testing.T) { + metricsQuery := "select 1" + config.Settings().Stores.MySQL.Clusters = map[string]*config.MySQLClusterConfigurationSettings{ + selfStoreName: {}, + "ks1": {}, + "ks2": {}, + } + clusters := config.Settings().Stores.MySQL.Clusters + for _, s := range clusters { + s.MetricQuery = metricsQuery + s.ThrottleThreshold = &atomic.Uint64{} + s.ThrottleThreshold.Store(1) + } + + throttler := &Throttler{ + mysqlClusterProbesChan: make(chan *mysql.ClusterProbes), + mysqlClusterThresholds: cache.New(cache.NoExpiration, 0), + ts: &FakeTopoServer{}, + mysqlInventory: mysql.NewInventory(), + } + throttler.metricsQuery.Store(metricsQuery) + throttler.initThrottleTabletTypes() + + validateClusterProbes := func(t *testing.T, ctx context.Context) { + testName := fmt.Sprintf("leader=%v", atomic.LoadInt64(&throttler.isLeader)) + t.Run(testName, func(t *testing.T) { + // validateProbesCount expectes number of probes according to cluster name and throttler's leadership status + validateProbesCount := func(t *testing.T, clusterName string, probes *mysql.Probes) { + if clusterName == selfStoreName { + assert.Equal(t, 1, len(*probes)) + } else if atomic.LoadInt64(&throttler.isLeader) > 0 { + assert.NotZero(t, len(*probes)) + } else { + assert.Empty(t, *probes) + } + } + t.Run("waiting for probes", func(t *testing.T) { + ctx, cancel := context.WithTimeout(ctx, waitForProbesTimeout) + defer cancel() + numClusterProbesResults := 0 + for { + select { + case probes := <-throttler.mysqlClusterProbesChan: + // Worth noting that in this unit test, the throttler is _closed_. Its own Operate() function does + // not run, and therefore there is none but us to both populate `mysqlClusterProbesChan` as well as + // read from it. We do not compete here with any other goroutine. + assert.NotNil(t, probes) + + throttler.updateMySQLClusterProbes(ctx, probes) + + numClusterProbesResults++ + validateProbesCount(t, probes.ClusterName, probes.InstanceProbes) + + if numClusterProbesResults == len(clusters) { + // Achieved our goal + return + } + case <-ctx.Done(): + assert.FailNowf(t, ctx.Err().Error(), "waiting for %d cluster probes", len(clusters)) + } + } + }) + t.Run("validating probes", func(t *testing.T) { + for clusterName := range clusters { + probes, ok := throttler.mysqlInventory.ClustersProbes[clusterName] + require.True(t, ok) + validateProbesCount(t, clusterName, probes) + } + }) + }) + } + // + ctx := context.Background() + + t.Run("initial, not leader", func(t *testing.T) { + atomic.StoreInt64(&throttler.isLeader, 0) + throttler.refreshMySQLInventory(ctx) + validateClusterProbes(t, ctx) + }) + + t.Run("promote", func(t *testing.T) { + atomic.StoreInt64(&throttler.isLeader, 1) + throttler.refreshMySQLInventory(ctx) + validateClusterProbes(t, ctx) + }) + + t.Run("demote, expect cleanup", func(t *testing.T) { + atomic.StoreInt64(&throttler.isLeader, 0) + throttler.refreshMySQLInventory(ctx) + validateClusterProbes(t, ctx) + }) +}