Tablet throttler: throttled app configuration via vtctl UpdateThrottlerConfig (#13351)
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits:
@@ -19,7 +19,6 @@ package vrepl
 import (
 	"flag"
 	"fmt"
-	"io"
 	"os"
 	"path"
 	"strings"

@@ -150,6 +149,12 @@ var (
 `
 )

+const (
+	customThreshold         = 5
+	throttlerEnabledTimeout = 60 * time.Second
+	noCustomQuery           = ""
+)
+
 func TestMain(m *testing.M) {
 	defer cluster.PanicHandler(nil)
 	flag.Parse()

@@ -192,7 +197,6 @@ func TestMain(m *testing.M) {
 	if err := clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 1, false); err != nil {
 		return 1, err
 	}
-
 	vtgateInstance := clusterInstance.NewVtgateInstance()
 	// Start vtgate
 	if err := vtgateInstance.Setup(); err != nil {

@@ -216,29 +220,6 @@
 }

-// direct per-tablet throttler API instruction
-func throttleResponse(tablet *cluster.Vttablet, path string) (respBody string, err error) {
-	apiURL := fmt.Sprintf("http://%s:%d/%s", tablet.VttabletProcess.TabletHostname, tablet.HTTPPort, path)
-	resp, err := httpClient.Get(apiURL)
-	if err != nil {
-		return "", err
-	}
-	defer resp.Body.Close()
-	b, err := io.ReadAll(resp.Body)
-	respBody = string(b)
-	return respBody, err
-}
-
-// direct per-tablet throttler API instruction
-func throttleApp(tablet *cluster.Vttablet, throttlerApp throttlerapp.Name) (string, error) {
-	return throttleResponse(tablet, fmt.Sprintf("throttler/throttle-app?app=%s&duration=1h", throttlerApp))
-}
-
-// direct per-tablet throttler API instruction
-func unthrottleApp(tablet *cluster.Vttablet, throttlerApp throttlerapp.Name) (string, error) {
-	return throttleResponse(tablet, fmt.Sprintf("throttler/unthrottle-app?app=%s", throttlerApp))
-}
-
 func TestSchemaChange(t *testing.T) {
 	defer cluster.PanicHandler(t)

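The removed per-tablet HTTP helpers are superseded by cluster-level helpers such as `throttler.ThrottleAppAndWaitUntilTabletsConfirm` used later in this diff, which apply the configuration once (via the topo) and then wait for every tablet to pick it up. As a rough sketch of what such a confirm loop involves — not the helper's actual implementation; the endpoint path and JSON field name are illustrative assumptions:

```go
// Hypothetical sketch: poll each tablet's throttler endpoint until the app
// shows up as throttled, or a deadline passes. The "/throttler/throttled-apps"
// path and the AppName field are assumptions for illustration.
package throttlersketch

import (
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

type throttledApp struct {
	AppName string `json:"AppName"` // assumed field name
}

// waitForTabletsToConfirm returns nil once every tablet base URL lists the
// app as throttled; otherwise it errors when the deadline passes.
func waitForTabletsToConfirm(tabletBaseURLs []string, app string, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for _, base := range tabletBaseURLs {
		for !tabletListsApp(base, app) {
			if time.Now().After(deadline) {
				return fmt.Errorf("tablet %s did not confirm throttled app %q", base, app)
			}
			time.Sleep(time.Second)
		}
	}
	return nil
}

// tabletListsApp fetches the tablet's throttled-apps list and reports
// whether the given app appears in it.
func tabletListsApp(base, app string) bool {
	resp, err := http.Get(base + "/throttler/throttled-apps")
	if err != nil {
		return false
	}
	defer resp.Body.Close()
	var apps []throttledApp
	if err := json.NewDecoder(resp.Body).Decode(&apps); err != nil {
		return false
	}
	for _, a := range apps {
		if a.AppName == app {
			return true
		}
	}
	return false
}
```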
@@ -257,16 +238,34 @@
 	err := clusterInstance.WaitForTabletsToHealthyInVtgate()
 	require.NoError(t, err)

-	_, err = throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, 0, "")
-	require.NoError(t, err)
+	t.Run("WaitForSrvKeyspace", func(t *testing.T) {
+		for _, ks := range clusterInstance.Keyspaces {
+			t.Run(ks.Name, func(t *testing.T) {
+				err := throttler.WaitForSrvKeyspace(clusterInstance, cell, ks.Name)
+				require.NoError(t, err)
+			})
+		}
+	})
+	t.Run("updating throttler config", func(t *testing.T) {
+		_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, customThreshold, noCustomQuery)
+		require.NoError(t, err)
+	})

-	for _, ks := range clusterInstance.Keyspaces {
-		for _, shard := range ks.Shards {
-			for _, tablet := range shard.Vttablets {
-				throttler.WaitForThrottlerStatusEnabled(t, tablet, true, nil, extendedMigrationWait)
-			}
-		}
-	}
+	t.Run("checking throttler config", func(t *testing.T) {
+		for _, ks := range clusterInstance.Keyspaces {
+			t.Run(ks.Name, func(t *testing.T) {
+				for _, shard := range ks.Shards {
+					t.Run(shard.Name, func(t *testing.T) {
+						for _, tablet := range shard.Vttablets {
+							t.Run(tablet.Alias, func(t *testing.T) {
+								throttler.WaitForThrottlerStatusEnabled(t, tablet, true, &throttler.Config{Query: throttler.DefaultQuery, Threshold: customThreshold}, throttlerEnabledTimeout)
+							})
+						}
+					})
+				}
+			})
+		}
+	})

 	testWithInitialSchema(t)
 	t.Run("alter non_online", func(t *testing.T) {

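Beyond swapping in the custom threshold, this hunk restructures the flat loops into nested `t.Run` subtests, so every keyspace/shard/tablet check gets its own addressable name. A minimal, self-contained illustration of the naming behavior (the topology literals here are hypothetical):

```go
// Illustration of nested t.Run: each level appends a path segment to the
// subtest name, e.g. TestNestedSubtests/ks/-80/tablet-101.
package throttlersketch

import "testing"

func TestNestedSubtests(t *testing.T) {
	keyspaces := map[string][]string{"ks": {"-80", "80-"}} // hypothetical topology
	for ksName, shards := range keyspaces {
		t.Run(ksName, func(t *testing.T) {
			for _, shard := range shards {
				t.Run(shard, func(t *testing.T) {
					t.Run("tablet-101", func(t *testing.T) {
						// A failure here is reported under the full subtest path.
						if shard == "" {
							t.Fatal("empty shard name")
						}
					})
				})
			}
		})
	}
}
```

A failure in one tablet's check then reads as, e.g., `TestNestedSubtests/ks/-80/tablet-101`, and that one path can be re-run in isolation with `go test -run 'TestNestedSubtests/ks/-80'`.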
@@ -412,18 +411,9 @@
 	var uuid string

-	func() {
-		for _, shard := range shards {
-			// technically we only need to throttle on a REPLICA, because that's the
-			// vstreamer source; but it's OK to be on the safe side and throttle on all tablets. Doesn't
-			// change the essence of this test.
-			for _, tablet := range shard.Vttablets {
-				body, err := throttleApp(tablet, throttlerapp.VStreamerName)
-				defer unthrottleApp(tablet, throttlerapp.VStreamerName)
-
-				assert.NoError(t, err)
-				assert.Contains(t, body, throttlerapp.VStreamerName)
-			}
-		}
-	}()
+	_, err := throttler.ThrottleAppAndWaitUntilTabletsConfirm(t, clusterInstance, throttlerapp.VStreamerName)
+	defer throttler.UnthrottleAppAndWaitUntilTabletsConfirm(t, clusterInstance, throttlerapp.VStreamerName)
+	require.NoError(t, err)

 	uuid = testOnlineDDLStatement(t, alterTableTrivialStatement, "vitess", providedUUID, providedMigrationContext, "vtgate", "test_val", "", true)
 	_ = onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalMigrationWait, schema.OnlineDDLStatusRunning)

@@ -520,24 +510,10 @@
 	t.Run(fmt.Sprintf("PlannedReparentShard via throttling %d/2", (currentPrimaryTabletIndex+1)), func(t *testing.T) {

 		insertRows(t, 2)
-		for i := range shards {
-			var body string
-			var err error
-			switch i {
-			case 0:
-				// this is the shard where we run PRS
-				// Use per-tablet throttling API
-				body, err = throttleApp(currentPrimaryTablet, throttlerapp.OnlineDDLName)
-				defer unthrottleApp(currentPrimaryTablet, throttlerapp.OnlineDDLName)
-			case 1:
-				// no PRS on this shard
-				// Use per-tablet throttling API
-				body, err = throttleApp(shards[i].Vttablets[0], throttlerapp.OnlineDDLName)
-				defer unthrottleApp(shards[i].Vttablets[0], throttlerapp.OnlineDDLName)
-			}
-			assert.NoError(t, err)
-			assert.Contains(t, body, throttlerapp.OnlineDDLName)
-		}
+		_, err = throttler.ThrottleAppAndWaitUntilTabletsConfirm(t, clusterInstance, throttlerapp.OnlineDDLName)
+		assert.NoError(t, err)
+		defer throttler.UnthrottleAppAndWaitUntilTabletsConfirm(t, clusterInstance, throttlerapp.OnlineDDLName)

 		uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, "vitess", providedUUID, providedMigrationContext, "vtgate", "test_val", "", true)

 		t.Run("wait for migration to run", func(t *testing.T) {

@@ -585,22 +561,8 @@
 			onlineddl.PrintQueryResult(os.Stdout, rs)
 		})
 		t.Run("unthrottle", func(t *testing.T) {
-			for i := range shards {
-				var body string
-				var err error
-				switch i {
-				case 0:
-					// this is the shard where we run PRS
-					// Use per-tablet throttling API
-					body, err = unthrottleApp(currentPrimaryTablet, throttlerapp.OnlineDDLName)
-				case 1:
-					// no PRS on this shard
-					// Use per-tablet throttling API
-					body, err = unthrottleApp(shards[i].Vttablets[0], throttlerapp.OnlineDDLName)
-				}
-				assert.NoError(t, err)
-				assert.Contains(t, body, throttlerapp.OnlineDDLName)
-			}
+			_, err = throttler.UnthrottleAppAndWaitUntilTabletsConfirm(t, clusterInstance, throttlerapp.OnlineDDLName)
+			assert.NoError(t, err)
 		})
 		t.Run("expect completion", func(t *testing.T) {
 			_ = onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, extendedMigrationWait, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)

@@ -818,36 +780,28 @@
 	// - two shards as opposed to one
 	// - tablet throttling
 	t.Run("Revert a migration completed on one shard and cancelled on another", func(t *testing.T) {
-		// shard 0 will run normally, shard 1 will be throttled
-		defer unthrottleApp(shards[1].Vttablets[0], throttlerapp.OnlineDDLName)
-		t.Run("throttle shard 1", func(t *testing.T) {
-			body, err := throttleApp(shards[1].Vttablets[0], throttlerapp.OnlineDDLName)
-			assert.NoError(t, err)
-			assert.Contains(t, body, throttlerapp.OnlineDDLName)
-		})
+		// shard 0 will run normally, shard 1 will be postponed

 		var uuid string
-		t.Run("run migrations, expect 1st to complete, 2nd to be running", func(t *testing.T) {
-			uuid = testOnlineDDLStatement(t, alterTableTrivialStatement, "vitess", providedUUID, providedMigrationContext, "vtgate", "test_val", "", true)
+		t.Run("run migrations, expect running on both shards", func(t *testing.T) {
+			uuid = testOnlineDDLStatement(t, alterTableTrivialStatement, "vitess --postpone-launch", providedUUID, providedMigrationContext, "vtgate", "test_val", "", true)
+			onlineddl.CheckLaunchMigration(t, &vtParams, shards[0:1], uuid, "-80", true)
 			{
 				status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards[:1], uuid, normalMigrationWait, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
 				fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
 				onlineddl.CheckMigrationStatus(t, &vtParams, shards[:1], uuid, schema.OnlineDDLStatusComplete)
 			}
 			{
-				// shard 1 is throttled
-				status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards[1:], uuid, normalMigrationWait, schema.OnlineDDLStatusRunning)
+				status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards[1:], uuid, normalMigrationWait, schema.OnlineDDLStatusQueued)
 				fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
-				onlineddl.CheckMigrationStatus(t, &vtParams, shards[1:], uuid, schema.OnlineDDLStatusRunning)
+				onlineddl.CheckMigrationStatus(t, &vtParams, shards[1:], uuid, schema.OnlineDDLStatusQueued)
 			}
 		})
 		t.Run("check cancel migration", func(t *testing.T) {
 			onlineddl.CheckCancelAllMigrations(t, &vtParams, 1)
 		})
-		t.Run("unthrottle shard 1", func(t *testing.T) {
-			body, err := unthrottleApp(shards[1].Vttablets[0], throttlerapp.OnlineDDLName)
-			assert.NoError(t, err)
-			assert.Contains(t, body, throttlerapp.OnlineDDLName)
+		t.Run("launch-all", func(t *testing.T) {
+			onlineddl.CheckLaunchAllMigrations(t, &vtParams, 0)
 		})
 		var revertUUID string
 		t.Run("issue revert migration", func(t *testing.T) {

Member:
I guess that most of the changes in this file helped to deflake the tests? Maybe it's another cherry-pick?

Contributor (author):
With the new design, it's impossible to tell Vitess "throttle one shard but not this other shard". Previously, it was possible, since we hit a shard's tablet throttler HTTP API directly. So the new design means we need to change the tests. And we don't need to test something that we don't support...
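For context on the replacement mechanism: per this PR's title, app throttling is now configured cluster-wide through `UpdateThrottlerConfig`. Assuming the flags this PR introduces (verify against your Vitess version), the vtctldclient equivalent of the removed per-tablet call would look roughly like `vtctldclient UpdateThrottlerConfig --throttle-app "online-ddl" --throttle-app-duration "1h" <keyspace>`.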
@@ -859,12 +813,12 @@
 			revertUUID = row.AsString("uuid", "")
 			assert.NotEmpty(t, revertUUID)
 		})
-		t.Run("expect one revert successful, another failed", func(t *testing.T) {
+		t.Run("migrations were cancelled, revert should be impossible", func(t *testing.T) {
 			{
-				// shard 0 migration was complete. Revert should be successful
 				status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards[:1], revertUUID, normalMigrationWait, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
 				fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
-				onlineddl.CheckMigrationStatus(t, &vtParams, shards[:1], revertUUID, schema.OnlineDDLStatusComplete)
+				onlineddl.CheckMigrationStatus(t, &vtParams, shards[:1], revertUUID, schema.OnlineDDLStatusFailed)
 			}
 			{
 				// shard 1 migration was cancelled. Revert should not be possible

Reviewer:
Curious why this isn't also part of `throttledAppRule`?

Author:
`ThrottledAppRule` has an `ExpiresAt`, which is a declarative, absolute value for when the rule expires. However, for the user it's more convenient to speak in terms of duration. If I'm having a problem right now in production and I want to throttle an app, I want to throttle it for, say, the next 4 hours, so I specify `4h`. Vitess then computes and stores the absolute `ExpiresAt`.

Does that make sense?
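A minimal sketch of that duration-to-`ExpiresAt` computation; the struct here is illustrative rather than the actual Vitess type (the real `ThrottledAppRule` lives in the topo protobufs), but the conversion is just "now plus duration":

```go
// Sketch of converting a user-supplied duration into the absolute
// ExpiresAt described above. Field names are simplified for illustration.
package throttlersketch

import "time"

type throttledAppRule struct {
	Name      string
	Ratio     float64
	ExpiresAt time.Time // declarative, absolute expiry stored with the rule
}

// newRuleForDuration lets callers speak in durations ("throttle for 4h");
// the stored rule keeps the computed absolute expiry.
func newRuleForDuration(name string, ratio float64, d time.Duration) throttledAppRule {
	return throttledAppRule{
		Name:      name,
		Ratio:     ratio,
		ExpiresAt: time.Now().Add(d),
	}
}
```

For example, `newRuleForDuration("online-ddl", 1.0, 4*time.Hour)` yields a rule whose `ExpiresAt` is four hours from now.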