Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"

"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/onlineddl"
Expand Down Expand Up @@ -501,7 +502,7 @@ func testScheduler(t *testing.T) {
testTableSequentialTimes(t, t1uuid, t2uuid)
})

t.Run("ALTER both tables, elligible for concurrenct", func(t *testing.T) {
t.Run("ALTER both tables, elligible for concurrent", func(t *testing.T) {
// ALTER TABLE is allowed to run concurrently when no other ALTER is busy with copy state. Our tables are tiny so we expect to find both migrations running
t1uuid = testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy+" --allow-concurrent --postpone-completion", "vtgate", "", "", true)) // skip wait
t2uuid = testOnlineDDLStatement(t, createParams(trivialAlterT2Statement, ddlStrategy+" --allow-concurrent --postpone-completion", "vtgate", "", "", true)) // skip wait
Expand Down Expand Up @@ -536,9 +537,11 @@ func testScheduler(t *testing.T) {
})
testTableCompletionTimes(t, t2uuid, t1uuid)
})
t.Run("ALTER both tables, elligible for concurrenct, with throttling", func(t *testing.T) {
t.Run("ALTER both tables, elligible for concurrent, with throttling", func(t *testing.T) {
onlineddl.ThrottleAllMigrations(t, &vtParams)
defer onlineddl.UnthrottleAllMigrations(t, &vtParams)
onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.OnlineDDLName, true)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding a "wait-for", because onlineddl.ThrottleAllMigrations is now an async flow.


// ALTER TABLE is allowed to run concurrently when no other ALTER is busy with copy state. Our tables are tiny so we expect to find both migrations running
t1uuid = testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy+" -allow-concurrent -postpone-completion", "vtgate", "", "", true)) // skip wait
t2uuid = testOnlineDDLStatement(t, createParams(trivialAlterT2Statement, ddlStrategy+" -allow-concurrent -postpone-completion", "vtgate", "", "", true)) // skip wait
Expand All @@ -555,6 +558,7 @@ func testScheduler(t *testing.T) {
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t2uuid, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusReady)
})

t.Run("check ready to complete (before)", func(t *testing.T) {
for _, uuid := range []string{t1uuid, t2uuid} {
rs := onlineddl.ReadMigrations(t, &vtParams, uuid)
Expand Down Expand Up @@ -607,6 +611,8 @@ func testScheduler(t *testing.T) {

testTableCompletionTimes(t, t2uuid, t1uuid)
})
onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.OnlineDDLName, false)

t.Run("REVERT both tables concurrent, postponed", func(t *testing.T) {
t1uuid = testRevertMigration(t, createRevertParams(t1uuid, ddlStrategy+" -allow-concurrent -postpone-completion", "vtgate", "", true))
t2uuid = testRevertMigration(t, createRevertParams(t2uuid, ddlStrategy+" -allow-concurrent -postpone-completion", "vtgate", "", true))
Expand Down
65 changes: 30 additions & 35 deletions go/test/endtoend/onlineddl/vtgate_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,14 @@ import (

"vitess.io/vitess/go/test/endtoend/cluster"

"github.com/buger/jsonparser"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

const (
ThrottledAppsTimeout = 60 * time.Second
)

// VtgateExecQuery runs a query on VTGate using given query params
func VtgateExecQuery(t *testing.T, vtParams *mysql.ConnParams, query string, expectError string) *sqltypes.Result {
t.Helper()
Expand Down Expand Up @@ -313,16 +316,35 @@ func UnthrottleAllMigrations(t *testing.T, vtParams *mysql.ConnParams) {

// CheckThrottledApps checks for existence or non-existence of an app in the throttled apps list
func CheckThrottledApps(t *testing.T, vtParams *mysql.ConnParams, throttlerApp throttlerapp.Name, expectFind bool) {
query := "show vitess_throttled_apps"
r := VtgateExecQuery(t, vtParams, query, "")

found := false
for _, row := range r.Named().Rows {
if throttlerApp.Equals(row.AsString("app", "")) {
found = true
ctx, cancel := context.WithTimeout(context.Background(), ThrottledAppsTimeout)
defer cancel()

ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for {
query := "show vitess_throttled_apps"
r := VtgateExecQuery(t, vtParams, query, "")

appFound := false
for _, row := range r.Named().Rows {
if throttlerApp.Equals(row.AsString("app", "")) {
appFound = true
}
}
if appFound == expectFind {
// we're all good
return
}

select {
case <-ctx.Done():
assert.Failf(t, "CheckThrottledApps timed out waiting for %v to be in throttled status '%v'", throttlerApp.String(), expectFind)
return
case <-ticker.C:
}
}
assert.Equal(t, expectFind, found, "check app %v in throttled apps: %v", throttlerApp, found)
}

// WaitForThrottledTimestamp waits for a migration to have a non-empty last_throttled_timestamp
Expand Down Expand Up @@ -350,33 +372,6 @@ func WaitForThrottledTimestamp(t *testing.T, vtParams *mysql.ConnParams, uuid st
return
}

// WaitForThrottlerStatusEnabled waits for a tablet to report its throttler status as enabled.
func WaitForThrottlerStatusEnabled(t *testing.T, tablet *cluster.Vttablet, timeout time.Duration) {
jsonPath := "IsEnabled"
url := fmt.Sprintf("http://localhost:%d/throttler/status", tablet.HTTPPort)

ctx, cancel := context.WithTimeout(context.Background(), throttlerConfigTimeout)
defer cancel()

ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for {
body := getHTTPBody(url)
val, err := jsonparser.GetBoolean([]byte(body), jsonPath)
require.NoError(t, err)
if val {
return
}
select {
case <-ctx.Done():
t.Error("timeout waiting for tablet's throttler status to be enabled")
return
case <-ticker.C:
}
}
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This here is just code I found was unused.

func getHTTPBody(url string) string {
resp, err := http.Get(url)
if err != nil {
Expand Down
26 changes: 12 additions & 14 deletions go/test/endtoend/throttler/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,19 +210,23 @@ func UnthrottleApp(clusterInstance *cluster.LocalProcessCluster, throttlerApp th
return throttleApp(clusterInstance, throttlerApp, false)
}

// ThrottleAppAndWaitUntilTabletsConfirm
func ThrottleAppAndWaitUntilTabletsConfirm(t *testing.T, clusterInstance *cluster.LocalProcessCluster, throttlerApp throttlerapp.Name) (string, error) {
res, err := throttleApp(clusterInstance, throttlerApp, true)
if err != nil {
return res, err
}
func WaitUntilTabletsConfirmThrottledApp(t *testing.T, clusterInstance *cluster.LocalProcessCluster, throttlerApp throttlerapp.Name, expectThrottled bool) {
for _, ks := range clusterInstance.Keyspaces {
for _, shard := range ks.Shards {
for _, tablet := range shard.Vttablets {
WaitForThrottledApp(t, tablet, throttlerApp, true, ConfigTimeout)
WaitForThrottledApp(t, tablet, throttlerApp, expectThrottled, ConfigTimeout)
}
}
}
}

// ThrottleAppAndWaitUntilTabletsConfirm
func ThrottleAppAndWaitUntilTabletsConfirm(t *testing.T, clusterInstance *cluster.LocalProcessCluster, throttlerApp throttlerapp.Name) (string, error) {
res, err := throttleApp(clusterInstance, throttlerApp, true)
if err != nil {
return res, err
}
WaitUntilTabletsConfirmThrottledApp(t, clusterInstance, throttlerApp, true)
return res, nil
}

Expand All @@ -232,13 +236,7 @@ func UnthrottleAppAndWaitUntilTabletsConfirm(t *testing.T, clusterInstance *clus
if err != nil {
return res, err
}
for _, ks := range clusterInstance.Keyspaces {
for _, shard := range ks.Shards {
for _, tablet := range shard.Vttablets {
WaitForThrottledApp(t, tablet, throttlerApp, false, ConfigTimeout)
}
}
}
WaitUntilTabletsConfirmThrottledApp(t, clusterInstance, throttlerApp, false)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes in this file are just a simple refactor into a function. Not really related to the overall changes in the PR.

return res, nil
}

Expand Down
18 changes: 18 additions & 0 deletions go/vt/proto/topodata/cached_size.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 35 additions & 0 deletions go/vt/proto/vttime/cached_size.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions go/vt/vtgate/engine/cached_size.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions go/vt/vtgate/engine/fake_vcursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ func (t *noopVCursor) SetExec(ctx context.Context, name string, value string) er
panic("implement me")
}

func (t *noopVCursor) ThrottleApp(ctx context.Context, throttleAppRule *topodatapb.ThrottledAppRule) error {
panic("implement me")
}

func (t *noopVCursor) ShowExec(ctx context.Context, command sqlparser.ShowCommandType, filter *sqlparser.ShowFilter) (*sqltypes.Result, error) {
panic("implement me")
}
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vtgate/engine/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
)

Expand Down Expand Up @@ -111,6 +112,8 @@ type (
ShowExec(ctx context.Context, command sqlparser.ShowCommandType, filter *sqlparser.ShowFilter) (*sqltypes.Result, error)
// SetExec takes in k,v pair and use executor to set them in topo metadata.
SetExec(ctx context.Context, name string, value string) error
// ThrottleApp sets a ThrottlerappRule in topo
ThrottleApp(ctx context.Context, throttleAppRule *topodatapb.ThrottledAppRule) error

// CanUseSetVar returns true if system_settings can use SET_VAR hint.
CanUseSetVar() bool
Expand Down
89 changes: 89 additions & 0 deletions go/vt/vtgate/engine/throttle_app.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
Copyright 2023 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package engine

import (
"context"

"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/vindexes"
)

var _ Primitive = (*ThrottleApp)(nil)

// ThrottleApp represents the instructions to perform an online schema change via vtctld
type ThrottleApp struct {
Keyspace *vindexes.Keyspace
ThrottledAppRule *topodatapb.ThrottledAppRule

noTxNeeded

noInputs
}

func (v *ThrottleApp) description() PrimitiveDescription {
return PrimitiveDescription{
OperatorType: "ThrottleApp",
Keyspace: v.Keyspace,
Other: map[string]any{
"appName": v.ThrottledAppRule.Name,
"expireAt": v.ThrottledAppRule.ExpiresAt,
"ratio": v.ThrottledAppRule.Ratio,
},
}
}

// RouteType implements the Primitive interface
func (v *ThrottleApp) RouteType() string {
return "ThrottleApp"
}

// GetKeyspaceName implements the Primitive interface
func (v *ThrottleApp) GetKeyspaceName() string {
return v.Keyspace.Name
}

// GetTableName implements the Primitive interface
func (v *ThrottleApp) GetTableName() string {
return ""
}

// TryExecute implements the Primitive interface
func (v *ThrottleApp) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (result *sqltypes.Result, err error) {
if err := vcursor.ThrottleApp(ctx, v.ThrottledAppRule); err != nil {
return nil, err
}
return &sqltypes.Result{}, nil
}

// TryStreamExecute implements the Primitive interface
func (v *ThrottleApp) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
results, err := v.TryExecute(ctx, vcursor, bindVars, wantfields)
if err != nil {
return err
}
return callback(results)
}

// GetFields implements the Primitive interface
func (v *ThrottleApp) GetFields(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] GetFields is not reachable")
}
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func createInstructionFor(ctx context.Context, query string, stmt sqlparser.Stat
case sqlparser.DDLStatement:
return buildGeneralDDLPlan(ctx, query, stmt, reservedVars, vschema, enableOnlineDDL, enableDirectDDL)
case *sqlparser.AlterMigration:
return buildAlterMigrationPlan(query, vschema, enableOnlineDDL)
return buildAlterMigrationPlan(query, stmt, vschema, enableOnlineDDL)
case *sqlparser.RevertMigration:
return buildRevertMigrationPlan(query, stmt, vschema, enableOnlineDDL)
case *sqlparser.ShowMigrationLogs:
Expand Down
Loading