From cc833cd8dd8fdc3e1fd1c3e64b65c414f9fff840 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 6 Sep 2023 17:59:56 +0300 Subject: [PATCH 1/7] Tablet throttler: empty list of probes on non-leader Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/tabletserver/throttle/throttler.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go index 2b9570a9b6e..dfda62a00cd 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -602,10 +602,10 @@ func (throttler *Throttler) Operate(ctx context.Context) { recentCheckTicker := addTicker(time.Second) tmClient := tmclient.NewTabletManagerClient() - defer tmClient.Close() go func() { defer log.Infof("Throttler: Operate terminated, tickers stopped") + defer tmClient.Close() for _, t := range tickers { defer t.Stop() // since we just started the tickers now, speed up the ticks by forcing an immediate tick @@ -786,8 +786,10 @@ func (throttler *Throttler) collectMySQLMetrics(ctx context.Context, tmClient tm var throttleMetricFunc func() *mysql.MySQLThrottleMetric if clusterName == selfStoreName { + // Throttler probing its own tablet's metrics throttleMetricFunc = throttler.generateSelfMySQLThrottleMetricFunc(ctx, probe) } else { + // Throttler probing other tablets throttleMetricFunc = throttler.generateTabletHTTPProbeFunction(ctx, tmClient, clusterName, probe) } throttleMetrics := mysql.ReadThrottleMetric(probe, clusterName, throttleMetricFunc) @@ -801,7 +803,6 @@ func (throttler *Throttler) collectMySQLMetrics(ctx context.Context, tmClient tm // refreshMySQLInventory will re-structure the inventory based on reading config settings func (throttler *Throttler) refreshMySQLInventory(ctx context.Context) error { - // distribute the query/threshold from 
the throttler down to the cluster settings and from there to the probes metricsQuery := throttler.GetMetricsQuery() metricsThreshold := throttler.MetricsThreshold.Load() @@ -851,6 +852,9 @@ func (throttler *Throttler) refreshMySQLInventory(ctx context.Context) error { return } if !throttler.isLeader.Load() { + // This tablet may have used to being the primary. Ensure the probes for this cluster are + // overwritten with an empty list + throttler.mysqlClusterProbesChan <- clusterProbes // not the leader (primary tablet)? Then no more work for us. return } From e9ad8c1b731f47918f3548e8d635c5264d25d366 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 7 Sep 2023 08:35:30 +0300 Subject: [PATCH 2/7] unit tests Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../tabletserver/throttle/throttler.go | 10 +- .../tabletserver/throttle/throttler_test.go | 131 ++++++++++++++++++ 2 files changed, 140 insertions(+), 1 deletion(-) diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go index dfda62a00cd..6b55f742065 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -110,6 +110,14 @@ const ( ThrottleCheckSelf ) +// throttlerTopoService represents the functionality we expect from a TopoServer, abstracted so that +// it can be mocked in unit tests +type throttlerTopoService interface { + GetTablet(ctx context.Context, alias *topodatapb.TabletAlias) (*topo.TabletInfo, error) + FindAllTabletAliasesInShard(ctx context.Context, keyspace, shard string) ([]*topodatapb.TabletAlias, error) + GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topodatapb.SrvKeyspace, error) +} + // Throttler is the main entity in the throttling mechanism. This service runs, probes, collects data, // aggregates, reads inventory, provides information, etc. 
type Throttler struct { @@ -125,7 +133,7 @@ type Throttler struct { env tabletenv.Env pool *connpool.Pool tabletTypeFunc func() topodatapb.TabletType - ts *topo.Server + ts throttlerTopoService srvTopoServer srvtopo.Server heartbeatWriter heartbeat.HeartbeatWriter diff --git a/go/vt/vttablet/tabletserver/throttle/throttler_test.go b/go/vt/vttablet/tabletserver/throttle/throttler_test.go index 2995e03b654..2997c98ed6a 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler_test.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler_test.go @@ -7,13 +7,53 @@ package throttle import ( + "context" + "fmt" + "sync/atomic" "testing" "time" "github.com/patrickmn/go-cache" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/config" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/mysql" + + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) +type FakeTopoServer struct { +} + +func (ts *FakeTopoServer) GetTablet(ctx context.Context, alias *topodatapb.TabletAlias) (*topo.TabletInfo, error) { + tablet := &topo.TabletInfo{ + Tablet: &topodatapb.Tablet{ + Alias: alias, + Hostname: "127.0.0.1", + MysqlHostname: "127.0.0.1", + MysqlPort: 3306, + PortMap: map[string]int32{"vt": 5000}, + Type: topodatapb.TabletType_REPLICA, + }, + } + return tablet, nil +} + +func (ts *FakeTopoServer) FindAllTabletAliasesInShard(ctx context.Context, keyspace, shard string) ([]*topodatapb.TabletAlias, error) { + aliases := []*topodatapb.TabletAlias{ + {Cell: "zone1", Uid: 100}, + {Cell: "zone2", Uid: 101}, + } + return aliases, nil +} + +func (ts *FakeTopoServer) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topodatapb.SrvKeyspace, error) { + ks := &topodatapb.SrvKeyspace{} + return ks, nil +} + type FakeHeartbeatWriter struct { } @@ -50,6 +90,7 @@ func TestIsAppThrottled(t *testing.T) { } func TestIsAppExempted(t *testing.T) { + throttler := Throttler{ 
throttledApps: cache.New(cache.NoExpiration, 0), heartbeatWriter: FakeHeartbeatWriter{}, @@ -75,3 +116,93 @@ func TestIsAppExempted(t *testing.T) { throttler.UnthrottleApp("schema-tracker") // meaningless. App is statically exempted assert.True(t, throttler.IsAppExempted("schema-tracker")) } + +func TestRefreshMySQLInventory(t *testing.T) { + metricsQuery := "select 1" + config.Settings().Stores.MySQL.Clusters = map[string]*config.MySQLClusterConfigurationSettings{ + selfStoreName: {}, + "ks1": {}, + "ks2": {}, + } + clusters := config.Settings().Stores.MySQL.Clusters + for _, s := range clusters { + s.MetricQuery = metricsQuery + s.ThrottleThreshold = &atomic.Uint64{} + s.ThrottleThreshold.Store(1) + } + + throttler := &Throttler{ + mysqlClusterProbesChan: make(chan *mysql.ClusterProbes), + mysqlClusterThresholds: cache.New(cache.NoExpiration, 0), + ts: &FakeTopoServer{}, + mysqlInventory: mysql.NewInventory(), + } + throttler.metricsQuery.Store(metricsQuery) + throttler.initThrottleTabletTypes() + + validateClusterProbes := func(t *testing.T, ctx context.Context) { + testName := fmt.Sprintf("leader=%t", throttler.isLeader.Load()) + t.Run(testName, func(t *testing.T) { + t.Run("waiting for probes", func(t *testing.T) { + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + numClusterProbesResults := 0 + for { + select { + case probes := <-throttler.mysqlClusterProbesChan: + assert.NotNil(t, probes) + + throttler.updateMySQLClusterProbes(ctx, probes) + + numClusterProbesResults++ + if probes.ClusterName == selfStoreName { + assert.Equal(t, 1, len(*probes.InstanceProbes)) + } else if throttler.isLeader.Load() { + assert.NotZero(t, len(*probes.InstanceProbes)) + } else { + assert.Empty(t, *probes.InstanceProbes) + } + if numClusterProbesResults == len(clusters) { + return + } + case <-ctx.Done(): + assert.FailNowf(t, ctx.Err().Error(), "waiting for %d cluster probes", len(clusters)) + } + } + }) + t.Run("validating probes", func(t *testing.T) { 
+ for clusterName := range clusters { + probes, ok := throttler.mysqlInventory.ClustersProbes[clusterName] + require.True(t, ok) + if clusterName == selfStoreName { + assert.Equal(t, 1, len(*probes)) + } else if throttler.isLeader.Load() { + assert.NotZero(t, len(*probes)) + } else { + assert.Empty(t, *probes) + } + } + }) + }) + } + // + ctx := context.Background() + + t.Run("initial, not leader", func(t *testing.T) { + throttler.isLeader.Store(false) + throttler.refreshMySQLInventory(ctx) + validateClusterProbes(t, ctx) + }) + + t.Run("promote", func(t *testing.T) { + throttler.isLeader.Store(true) + throttler.refreshMySQLInventory(ctx) + validateClusterProbes(t, ctx) + }) + + t.Run("demote, expect cleanup", func(t *testing.T) { + throttler.isLeader.Store(false) + throttler.refreshMySQLInventory(ctx) + validateClusterProbes(t, ctx) + }) +} From 247134cb266dd41ebb1cf1d0554e644ee2d4622c Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 7 Sep 2023 08:37:22 +0300 Subject: [PATCH 3/7] punctuation Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/tabletserver/throttle/throttler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go index 6b55f742065..bbb91978bcc 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -794,10 +794,10 @@ func (throttler *Throttler) collectMySQLMetrics(ctx context.Context, tmClient tm var throttleMetricFunc func() *mysql.MySQLThrottleMetric if clusterName == selfStoreName { - // Throttler probing its own tablet's metrics + // Throttler is probing its own tablet's metrics: throttleMetricFunc = throttler.generateSelfMySQLThrottleMetricFunc(ctx, probe) } else { - // Throttler probing other tablets + // Throttler probing other tablets: throttleMetricFunc = 
throttler.generateTabletHTTPProbeFunction(ctx, tmClient, clusterName, probe) } throttleMetrics := mysql.ReadThrottleMetric(probe, clusterName, throttleMetricFunc) From 03ece2193abc4396f39389496965f353bf3acbdf Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 7 Sep 2023 08:41:50 +0300 Subject: [PATCH 4/7] clarification Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/tabletserver/throttle/throttler.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go index bbb91978bcc..4d8ebfbf37a 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -853,15 +853,19 @@ func (throttler *Throttler) refreshMySQLInventory(ctx context.Context) error { } if clusterName == selfStoreName { - // special case: just looking at this tablet's MySQL server + // special case: just looking at this tablet's MySQL server. // We will probe this "cluster" (of one server) is a special way. addInstanceKey(nil, "", 0, mysql.SelfInstanceKey, clusterName, clusterSettings, clusterProbes.InstanceProbes) throttler.mysqlClusterProbesChan <- clusterProbes return } if !throttler.isLeader.Load() { - // This tablet may have used to being the primary. Ensure the probes for this cluster are - // overwritten with an empty list + // This tablet may have used to being the primary, but it isn't now. It may have a recollection + // of previous clusters it used to probe. It may have recollection of cpecific probes for such clusters. + // This now ensures any existing cluster probes are overrridden with an empty list of probes. + // `clusterProbes` was created above as empty, and identificable via `clusterName`. 
This will in turn + // be used to overwrite throttler.mysqlInventory.ClustersProbes[clusterProbes.ClusterName] in + // updateMySQLClusterProbes(). throttler.mysqlClusterProbesChan <- clusterProbes // not the leader (primary tablet)? Then no more work for us. return From c76ff362804d6a0771e7176a859987f49c269865 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 7 Sep 2023 18:44:44 +0300 Subject: [PATCH 5/7] Update go/vt/vttablet/tabletserver/throttle/throttler.go Co-authored-by: Matt Lord Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/tabletserver/throttle/throttler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go index 4d8ebfbf37a..42c403a2295 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -860,8 +860,8 @@ func (throttler *Throttler) refreshMySQLInventory(ctx context.Context) error { return } if !throttler.isLeader.Load() { - // This tablet may have used to being the primary, but it isn't now. It may have a recollection - // of previous clusters it used to probe. It may have recollection of cpecific probes for such clusters. + // This tablet may have used to be the primary, but it isn't now. It may have a recollection + // of previous clusters it used to probe. It may have recollection of specific probes for such clusters. // This now ensures any existing cluster probes are overrridden with an empty list of probes. // `clusterProbes` was created above as empty, and identificable via `clusterName`. 
This will in turn // be used to overwrite throttler.mysqlInventory.ClustersProbes[clusterProbes.ClusterName] in From dd5ed790610d4160199d1009cd82c74b80068233 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Mon, 11 Sep 2023 09:59:01 +0300 Subject: [PATCH 6/7] small refactor; adding comments; increasing timeout Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../tabletserver/throttle/throttler_test.go | 40 ++++++++++++------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/go/vt/vttablet/tabletserver/throttle/throttler_test.go b/go/vt/vttablet/tabletserver/throttle/throttler_test.go index 2997c98ed6a..8005509ded0 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler_test.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler_test.go @@ -24,6 +24,10 @@ import ( topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) +const ( + waitForProbesTimeout = 30 * time.Second +) + type FakeTopoServer struct { } @@ -117,6 +121,12 @@ func TestIsAppExempted(t *testing.T) { assert.True(t, throttler.IsAppExempted("schema-tracker")) } +// TestRefreshMySQLInventory tests the behavior of the throttler's refreshMySQLInventory() function, which +// is called periodically in actual throttler. For a given cluster name, it generates a list of probes +// the throttler will use to check metrics. +// On a "self" cluster, that list is expected to probe the tablet itself. +// On any other cluster, the list is expected to be empty if non-leader (only leader throttler, on a +// `PRIMARY` tablet, probes other tablets). On the leader, the list is expected to be non-empty.
func TestRefreshMySQLInventory(t *testing.T) { metricsQuery := "select 1" config.Settings().Stores.MySQL.Clusters = map[string]*config.MySQLClusterConfigurationSettings{ @@ -143,8 +153,18 @@ func TestRefreshMySQLInventory(t *testing.T) { validateClusterProbes := func(t *testing.T, ctx context.Context) { testName := fmt.Sprintf("leader=%t", throttler.isLeader.Load()) t.Run(testName, func(t *testing.T) { + // validateProbesCount expects number of probes according to cluster name and throttler's leadership status + validateProbesCount := func(t *testing.T, clusterName string, probes *mysql.Probes) { + if clusterName == selfStoreName { + assert.Equal(t, 1, len(*probes)) + } else if throttler.isLeader.Load() { + assert.NotZero(t, len(*probes)) + } else { + assert.Empty(t, *probes) + } + } t.Run("waiting for probes", func(t *testing.T) { - ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + ctx, cancel := context.WithTimeout(ctx, waitForProbesTimeout) defer cancel() numClusterProbesResults := 0 for { @@ -155,14 +175,10 @@ func TestRefreshMySQLInventory(t *testing.T) { throttler.updateMySQLClusterProbes(ctx, probes) numClusterProbesResults++ - if probes.ClusterName == selfStoreName { - assert.Equal(t, 1, len(*probes.InstanceProbes)) - } else if throttler.isLeader.Load() { - assert.NotZero(t, len(*probes.InstanceProbes)) - } else { - assert.Empty(t, *probes.InstanceProbes) - } + validateProbesCount(t, probes.ClusterName, probes.InstanceProbes) + if numClusterProbesResults == len(clusters) { + // Achieved our goal return } case <-ctx.Done(): @@ -174,13 +190,7 @@ func TestRefreshMySQLInventory(t *testing.T) { for clusterName := range clusters { probes, ok := throttler.mysqlInventory.ClustersProbes[clusterName] require.True(t, ok) - if clusterName == selfStoreName { - assert.Equal(t, 1, len(*probes)) - } else if throttler.isLeader.Load() { - assert.NotZero(t, len(*probes)) - } else { - assert.Empty(t, *probes) - } + validateProbesCount(t, clusterName, probes) } }) })
From 6336ad6a7598648f85e247bf282262c2cbe7a78a Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Mon, 11 Sep 2023 10:02:50 +0300 Subject: [PATCH 7/7] clarification Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/tabletserver/throttle/throttler_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/go/vt/vttablet/tabletserver/throttle/throttler_test.go b/go/vt/vttablet/tabletserver/throttle/throttler_test.go index 8005509ded0..c47466df522 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler_test.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler_test.go @@ -170,6 +170,9 @@ func TestRefreshMySQLInventory(t *testing.T) { for { select { case probes := <-throttler.mysqlClusterProbesChan: + // Worth noting that in this unit test, the throttler is _closed_. Its own Operate() function does + // not run, and therefore there is none but us to both populate `mysqlClusterProbesChan` as well as + // read from it. We do not compete here with any other goroutine. assert.NotNil(t, probes) throttler.updateMySQLClusterProbes(ctx, probes)