diff --git a/go/mysql/constants.go b/go/mysql/constants.go index bcbfd6ae931..03edba8d907 100644 --- a/go/mysql/constants.go +++ b/go/mysql/constants.go @@ -368,6 +368,7 @@ const ( ERServerShutdown = 1053 // not found + ERDbDropExists = 1008 ERCantFindFile = 1017 ERFormNotFound = 1029 ERKeyNotFound = 1032 @@ -379,7 +380,6 @@ const ( ERNoSuchTable = 1146 ERNonExistingTableGrant = 1147 ERKeyDoesNotExist = 1176 - ERDbDropExists = 1008 // permissions ERDBAccessDenied = 1044 @@ -407,6 +407,7 @@ const ( ERRowIsReferenced = 1217 ERCantUpdateWithReadLock = 1223 ERNoDefault = 1230 + ERMasterFatalReadingBinlog = 1236 EROperandColumns = 1241 ERSubqueryNo1Row = 1242 ERWarnDataOutOfRange = 1264 @@ -415,19 +416,18 @@ const ( EROptionPreventsStatement = 1290 ERDuplicatedValueInType = 1291 ERSPDoesNotExist = 1305 + ERNoDefaultForField = 1364 + ErSPNotVarArg = 1414 ERRowIsReferenced2 = 1451 ErNoReferencedRow2 = 1452 - ErSPNotVarArg = 1414 ERInnodbReadOnly = 1874 - ERMasterFatalReadingBinlog = 1236 - ERNoDefaultForField = 1364 // already exists + ERDbCreateExists = 1007 ERTableExists = 1050 ERDupEntry = 1062 ERFileExists = 1086 ERUDFExists = 1125 - ERDbCreateExists = 1007 // aborted ERGotSignal = 1078 @@ -513,7 +513,11 @@ const ( ERWrongFKDef = 1239 ERKeyRefDoNotMatchTableRef = 1240 ERCyclicReference = 1245 + ERIllegalReference = 1247 + ERDerivedMustHaveAlias = 1248 + ERTableNameNotAllowedHere = 1250 ERCollationCharsetMismatch = 1253 + ERWarnDataTruncated = 1265 ERCantAggregate2Collations = 1267 ERCantAggregate3Collations = 1270 ERCantAggregateNCollations = 1271 @@ -527,16 +531,13 @@ const ( ERInvalidOnUpdate = 1294 ERUnknownTimeZone = 1298 ERInvalidCharacterString = 1300 - ERIllegalReference = 1247 - ERDerivedMustHaveAlias = 1248 - ERTableNameNotAllowedHere = 1250 ERQueryInterrupted = 1317 ERTruncatedWrongValueForField = 1366 ERIllegalValueForType = 1367 ERDataTooLong = 1406 ErrWrongValueForType = 1411 - ERWarnDataTruncated = 1265 ERForbidSchemaChange = 1450 + ERWrongValue = 1525 
ERDataOutOfRange = 1690 ERInvalidJSONText = 3140 ERInvalidJSONTextInParams = 3141 @@ -545,7 +546,6 @@ const ( ERInvalidCastToJSON = 3147 ERJSONValueTooBig = 3150 ERJSONDocumentTooDeep = 3157 - ERWrongValue = 1525 // max execution time exceeded ERQueryTimeout = 3024 diff --git a/go/test/endtoend/vreplication/vreplication_test_env.go b/go/test/endtoend/vreplication/vreplication_test_env.go index adfc52ef70b..24f4f30e9e4 100644 --- a/go/test/endtoend/vreplication/vreplication_test_env.go +++ b/go/test/endtoend/vreplication/vreplication_test_env.go @@ -21,7 +21,7 @@ var dryRunResultsSwitchWritesCustomerShard = []string{ "Lock keyspace customer", "Stop writes on keyspace product, tables [Lead,Lead-1,customer,db_order_test]:", "/ Keyspace product, Shard 0 at Position", - "Wait for VReplication on stopped streams to catchup for upto 30s", + "Wait for VReplication on stopped streams to catchup for up to 30s", "Create reverse replication workflow p2c_reverse", "Create journal entries on source databases", "Enable writes on keyspace customer tables [Lead,Lead-1,customer,db_order_test]", @@ -60,7 +60,7 @@ var dryRunResultsSwitchWritesM2m3 = []string{ "Stop writes on keyspace merchant-type, tables [/.*]:", "/ Keyspace merchant-type, Shard -80 at Position", "/ Keyspace merchant-type, Shard 80- at Position", - "Wait for VReplication on stopped streams to catchup for upto 30s", + "Wait for VReplication on stopped streams to catchup for up to 30s", "Create reverse replication workflow m2m3_reverse", "Create journal entries on source databases", "Enable writes on keyspace merchant-type tables [/.*]", diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index 3c9d4319f00..0ea23fafbca 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -2180,22 +2180,28 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *pfl if err != nil { return err } - s += fmt.Sprintf("Following vreplication streams are running for workflow %s.%s:\n\n", target, 
workflowName) + s += fmt.Sprintf("The following vreplication streams exist for workflow %s.%s:\n\n", target, workflowName) for ksShard := range res.ShardStatuses { statuses := res.ShardStatuses[ksShard].PrimaryReplicationStatuses for _, st := range statuses { - now := time.Now().Nanosecond() msg := "" - updateLag := int64(now) - st.TimeUpdated - if updateLag > 0*1e9 { - msg += " Vstream may not be running." - } - txLag := int64(now) - st.TransactionTimestamp - msg += fmt.Sprintf(" VStream Lag: %ds.", txLag/1e9) - if st.TransactionTimestamp > 0 { // if no events occur after copy phase, TransactionTimeStamp can be 0 - msg += fmt.Sprintf(" Tx time: %s.", time.Unix(st.TransactionTimestamp, 0).Format(time.ANSIC)) + if st.State == "Error" { + msg += fmt.Sprintf(": %s.", st.Message) + } else if st.Pos == "" { + msg += ". VStream has not started." + } else { + now := time.Now().Nanosecond() + updateLag := int64(now) - st.TimeUpdated + if updateLag > 0*1e9 { + msg += ". VStream may not be running" + } + txLag := int64(now) - st.TransactionTimestamp + msg += fmt.Sprintf(". 
VStream Lag: %ds.", txLag/1e9) + if st.TransactionTimestamp > 0 { // if no events occur after copy phase, TransactionTimeStamp can be 0 + msg += fmt.Sprintf(" Tx time: %s.", time.Unix(st.TransactionTimestamp, 0).Format(time.ANSIC)) + } } - s += fmt.Sprintf("id=%d on %s: Status: %s.%s\n", st.ID, ksShard, st.State, msg) + s += fmt.Sprintf("id=%d on %s: Status: %s%s\n", st.ID, ksShard, st.State, msg) } } wr.Logger().Printf("\n%s\n", s) @@ -2319,9 +2325,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *pfl if err != nil { return err } - if copyProgress == nil { - wr.Logger().Printf("\nCopy Completed.\n") - } else { + if copyProgress != nil { wr.Logger().Printf("\nCopy Progress (approx):\n") var tables []string for table := range *copyProgress { @@ -2346,7 +2350,6 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *pfl wr.Logger().Printf("\n%s\n", s) } return printDetails() - } if *dryRun { @@ -2390,7 +2393,6 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *pfl for { select { case <-ctx.Done(): - errCh <- fmt.Errorf("workflow did not start within %s", (*timeout).String()) return case <-ticker.C: totalStreams, startedStreams, workflowErrors, err := wf.GetStreamCount() @@ -2419,9 +2421,13 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *pfl return nil } wr.Logger().Printf("%d%% ... ", 100*progress.started/progress.total) + case <-timedCtx.Done(): + wr.Logger().Printf("\nThe workflow did not start within %s. 
The workflow may simply be slow to start or there may be an issue.\n", + (*timeout).String()) + wr.Logger().Printf("Check the status using the 'Workflow %s show' client command for details.\n", ksWorkflow) + return fmt.Errorf("timed out waiting for workflow to start") case err := <-errCh: wr.Logger().Error(err) - cancelTimedCtx() return err case wfErrs := <-wfErrCh: wr.Logger().Printf("Found problems with the streams created for this workflow:\n") diff --git a/go/vt/vtctl/vtctl_env_test.go b/go/vt/vtctl/vtctl_env_test.go new file mode 100644 index 00000000000..570088b9d13 --- /dev/null +++ b/go/vt/vtctl/vtctl_env_test.go @@ -0,0 +1,219 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package vtctl + +import ( + "context" + "fmt" + "sync" + + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/grpcclient" + "vitess.io/vitess/go/vt/logutil" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/memorytopo" + "vitess.io/vitess/go/vt/vttablet/queryservice" + "vitess.io/vitess/go/vt/vttablet/queryservice/fakes" + "vitess.io/vitess/go/vt/vttablet/tabletconn" + "vitess.io/vitess/go/vt/vttablet/tabletconntest" + "vitess.io/vitess/go/vt/vttablet/tmclient" + "vitess.io/vitess/go/vt/wrangler" + + querypb "vitess.io/vitess/go/vt/proto/query" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vschemapb "vitess.io/vitess/go/vt/proto/vschema" +) + +type testVTCtlEnv struct { + wr *wrangler.Wrangler + topoServ *topo.Server + cell string + tabletType topodatapb.TabletType + tmc *testVTCtlTMClient + cmdlog *logutil.MemoryLogger + + mu sync.Mutex + tablets map[int]*testVTCtlTablet +} + +// vtctlEnv has to be a global for RegisterDialer to work. 
+var vtctlEnv *testVTCtlEnv + +func init() { + tabletconn.RegisterDialer("VTCtlTest", func(tablet *topodatapb.Tablet, failFast grpcclient.FailFast) (queryservice.QueryService, error) { + vtctlEnv.mu.Lock() + defer vtctlEnv.mu.Unlock() + if qs, ok := vtctlEnv.tablets[int(tablet.Alias.Uid)]; ok { + return qs, nil + } + return nil, fmt.Errorf("tablet %d not found", tablet.Alias.Uid) + }) +} + +//---------------------------------------------- +// testVTCtlEnv + +func newTestVTCtlEnv() *testVTCtlEnv { + tabletconntest.SetProtocol("go.vt.vtctl.vtctl_env_test", "VTCtlTest") + cellName := "cell1" + env := &testVTCtlEnv{ + tablets: make(map[int]*testVTCtlTablet), + topoServ: memorytopo.NewServer(cellName), + cell: cellName, + tabletType: topodatapb.TabletType_REPLICA, + tmc: newTestVTCtlTMClient(), + cmdlog: logutil.NewMemoryLogger(), + } + env.wr = wrangler.NewTestWrangler(env.cmdlog, env.topoServ, env.tmc) + return env +} + +func (env *testVTCtlEnv) close() { + env.mu.Lock() + defer env.mu.Unlock() + for _, t := range env.tablets { + env.topoServ.DeleteTablet(context.Background(), t.tablet.Alias) + } + env.tablets = nil + env.cmdlog.Clear() + env.tmc.clearResults() + env.topoServ.Close() + env.wr = nil +} + +func (env *testVTCtlEnv) addTablet(id int, keyspace, shard string, keyRange *topodatapb.KeyRange, tabletType topodatapb.TabletType) *testVTCtlTablet { + env.mu.Lock() + defer env.mu.Unlock() + ctx := context.Background() + tablet := &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: env.cell, + Uid: uint32(id), + }, + Keyspace: keyspace, + Shard: shard, + KeyRange: keyRange, + Type: tabletType, + PortMap: map[string]int32{ + "test": int32(id), + }, + } + env.tablets[id] = newTestVTCtlTablet(tablet) + if err := env.topoServ.InitTablet(ctx, tablet, false /* allowPrimaryOverride */, true /* createShardAndKeyspace */, false /* allowUpdate */); err != nil { + panic(err) + } + if tabletType == topodatapb.TabletType_PRIMARY { + _, err := 
env.topoServ.UpdateShardFields(ctx, keyspace, shard, func(si *topo.ShardInfo) error { + si.PrimaryAlias = tablet.Alias + return nil + }) + if err != nil { + panic(err) + } + emptySrvVSchema := &vschemapb.SrvVSchema{ + RoutingRules: &vschemapb.RoutingRules{}, + ShardRoutingRules: &vschemapb.ShardRoutingRules{}, + } + if err = env.topoServ.UpdateSrvVSchema(ctx, env.cell, emptySrvVSchema); err != nil { + panic(err) + } + } + return env.tablets[id] +} + +//---------------------------------------------- +// testVTCtlTablet + +type testVTCtlTablet struct { + queryservice.QueryService + tablet *topodatapb.Tablet +} + +func newTestVTCtlTablet(tablet *topodatapb.Tablet) *testVTCtlTablet { + return &testVTCtlTablet{ + QueryService: fakes.ErrorQueryService, + tablet: tablet, + } +} + +func (tvt *testVTCtlTablet) StreamHealth(ctx context.Context, callback func(*querypb.StreamHealthResponse) error) error { + return callback(&querypb.StreamHealthResponse{ + Serving: true, + Target: &querypb.Target{ + Keyspace: tvt.tablet.Keyspace, + Shard: tvt.tablet.Shard, + TabletType: tvt.tablet.Type, + }, + RealtimeStats: &querypb.RealtimeStats{}, + }) +} + +//---------------------------------------------- +// testVTCtlTMClient + +type testVTCtlTMClient struct { + tmclient.TabletManagerClient + vrQueries map[int]map[string]*querypb.QueryResult + dbaQueries map[int]map[string]*querypb.QueryResult +} + +func newTestVTCtlTMClient() *testVTCtlTMClient { + return &testVTCtlTMClient{ + vrQueries: make(map[int]map[string]*querypb.QueryResult), + dbaQueries: make(map[int]map[string]*querypb.QueryResult), + } +} + +func (tmc *testVTCtlTMClient) setVRResults(tablet *topodatapb.Tablet, query string, result *sqltypes.Result) { + queries, ok := tmc.vrQueries[int(tablet.Alias.Uid)] + if !ok { + queries = make(map[string]*querypb.QueryResult) + tmc.vrQueries[int(tablet.Alias.Uid)] = queries + } + queries[query] = sqltypes.ResultToProto3(result) +} + +func (tmc *testVTCtlTMClient) VReplicationExec(ctx 
context.Context, tablet *topodatapb.Tablet, query string) (*querypb.QueryResult, error) { + result, ok := tmc.vrQueries[int(tablet.Alias.Uid)][query] + if !ok { + return nil, fmt.Errorf("query %q not found for VReplicationExec() on tablet %d", query, tablet.Alias.Uid) + } + return result, nil +} + +func (tmc *testVTCtlTMClient) setDBAResults(tablet *topodatapb.Tablet, query string, result *sqltypes.Result) { + queries, ok := tmc.dbaQueries[int(tablet.Alias.Uid)] + if !ok { + queries = make(map[string]*querypb.QueryResult) + tmc.dbaQueries[int(tablet.Alias.Uid)] = queries + } + queries[query] = sqltypes.ResultToProto3(result) +} + +func (tmc *testVTCtlTMClient) ExecuteFetchAsDba(ctx context.Context, tablet *topodatapb.Tablet, usePool bool, req *tabletmanagerdatapb.ExecuteFetchAsDbaRequest) (*querypb.QueryResult, error) { + result, ok := tmc.dbaQueries[int(tablet.Alias.Uid)][string(req.Query)] + if !ok { + return nil, fmt.Errorf("query %q not found for ExecuteFetchAsDba() on tablet %d", req.Query, tablet.Alias.Uid) + } + return result, nil +} + +func (tmc *testVTCtlTMClient) clearResults() { + tmc.vrQueries = make(map[int]map[string]*querypb.QueryResult) + tmc.dbaQueries = make(map[int]map[string]*querypb.QueryResult) +} diff --git a/go/vt/vtctl/vtctl_test.go b/go/vt/vtctl/vtctl_test.go new file mode 100644 index 00000000000..9b085cac5e3 --- /dev/null +++ b/go/vt/vtctl/vtctl_test.go @@ -0,0 +1,257 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package vtctl + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "github.com/spf13/pflag" + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/sqltypes" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/wrangler" +) + +// TestMoveTables tests the MoveTables client command +// via the commandVRWorkflow() cmd handler. +// This currently only tests the Progress action (which is +// a parent of the Show action) but it can be used to test +// other actions as well. +func TestMoveTables(t *testing.T) { + vrID := 1 + shard := "0" + sourceKs := "sourceks" + targetKs := "targetks" + table := "customer" + wf := "testwf" + ksWf := fmt.Sprintf("%s.%s", targetKs, wf) + minTableSize := 16384 // a single 16KiB InnoDB page + ctx := context.Background() + env := newTestVTCtlEnv() + defer env.close() + source := env.addTablet(100, sourceKs, shard, &topodatapb.KeyRange{}, topodatapb.TabletType_PRIMARY) + target := env.addTablet(200, targetKs, shard, &topodatapb.KeyRange{}, topodatapb.TabletType_PRIMARY) + sourceCol := fmt.Sprintf(`keyspace:"%s" shard:"%s" filter:{rules:{match:"%s" filter:"select * from %s"}}`, + sourceKs, shard, table, table) + bls := &binlogdatapb.BinlogSource{ + Keyspace: sourceKs, + Shard: shard, + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: table, + Filter: fmt.Sprintf("select * from %s", table), + }}, + }, + } + now := time.Now().UTC().Unix() + expectGlobalResults := func() { + env.tmc.setVRResults( + target.tablet, + fmt.Sprintf("select id, source, message, cell, tablet_types, workflow_type, workflow_sub_type from _vt.vreplication where workflow='%s' and db_name='vt_%s'", + wf, targetKs), + sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "id|source|message|cell|tablet_types|workflow_type|workflow_sub_type", + "int64|varchar|varchar|varchar|varchar|int64|int64"), + fmt.Sprintf("%d|%s||%s|primary|%d|%d", + 
vrID, sourceCol, env.cell, binlogdatapb.VReplicationWorkflowType_MoveTables, binlogdatapb.VReplicationWorkflowSubType_None), + ), + ) + } + + tests := []struct { + name string + workflowType wrangler.VReplicationWorkflowType + args []string + expectResults func() + want string + }{ + { + name: "NotStarted", + workflowType: wrangler.MoveTablesWorkflow, + args: []string{"Progress", ksWf}, + expectResults: func() { + env.tmc.setVRResults( + target.tablet, + fmt.Sprintf("select table_name, lastpk from _vt.copy_state where vrepl_id = %d and id in (select max(id) from _vt.copy_state where vrepl_id = %d group by vrepl_id, table_name)", + vrID, vrID), + sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "table_name|lastpk", + "varchar|varbinary"), + fmt.Sprintf("%s|", table), + ), + ) + env.tmc.setDBAResults( + target.tablet, + fmt.Sprintf("select distinct table_name from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = %d", + vrID), + sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "table_name", + "varchar"), + table, + ), + ) + env.tmc.setVRResults( + target.tablet, + fmt.Sprintf("select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, time_heartbeat, time_throttled, component_throttled, message, tags, workflow_type, workflow_sub_type from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'", + targetKs, wf), + sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|time_heartbeat|time_throttled|component_throttled|message|tags|workflow_type|workflow_sub_type", + "int64|varchar|varchar|varchar|int64|varchar|varchar|int64|int64|int64|int64|int64|varchar|varchar|varchar|int64|int64"), + fmt.Sprintf("%d|%s|||0|Running|vt_%s|0|0|0|0||||%d|%d", + vrID, bls, sourceKs, binlogdatapb.VReplicationWorkflowType_MoveTables, binlogdatapb.VReplicationWorkflowSubType_None), + ), + ) + env.tmc.setDBAResults( + 
target.tablet, + fmt.Sprintf("select table_name, table_rows, data_length from information_schema.tables where table_schema = 'vt_%s' and table_name in ('%s')", + targetKs, table), + sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "table_name|table_rows|data_length", + "varchar|int64|int64"), + fmt.Sprintf("%s|0|%d", table, minTableSize), + ), + ) + env.tmc.setDBAResults( + source.tablet, + fmt.Sprintf("select table_name, table_rows, data_length from information_schema.tables where table_schema = 'vt_%s' and table_name in ('%s')", + sourceKs, table), + sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "table_name|table_rows|data_length", + "varchar|int64|int64"), + fmt.Sprintf("%s|10|%d", table, minTableSize), + ), + ) + }, + want: fmt.Sprintf("\nCopy Progress (approx):\n\n\ncustomer: rows copied 0/10 (0%%), size copied 16384/16384 (100%%)\n\n\n\nThe following vreplication streams exist for workflow %s:\n\nid=%d on %s/%s-0000000%d: Status: Copying. VStream has not started.\n\n\n", + ksWf, vrID, shard, env.cell, target.tablet.Alias.Uid), + }, + { + name: "Error", + workflowType: wrangler.MoveTablesWorkflow, + args: []string{"Progress", ksWf}, + expectResults: func() { + env.tmc.setVRResults( + target.tablet, + fmt.Sprintf("select table_name, lastpk from _vt.copy_state where vrepl_id = %d and id in (select max(id) from _vt.copy_state where vrepl_id = %d group by vrepl_id, table_name)", + vrID, vrID), + sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "table_name|lastpk", + "varchar|varbinary"), + fmt.Sprintf("%s|", table), + ), + ) + env.tmc.setDBAResults( + target.tablet, + fmt.Sprintf("select distinct table_name from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = %d", + vrID), + sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "table_name", + "varchar"), + table, + ), + ) + env.tmc.setVRResults( + target.tablet, + fmt.Sprintf("select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, 
transaction_timestamp, time_heartbeat, time_throttled, component_throttled, message, tags, workflow_type, workflow_sub_type from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'", + targetKs, wf), + sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|time_heartbeat|time_throttled|component_throttled|message|tags|workflow_type|workflow_sub_type", + "int64|varchar|varchar|varchar|int64|varchar|varchar|int64|int64|int64|int64|int64|varchar|varchar|varchar|int64|int64"), + fmt.Sprintf("%d|%s|||0|Error|vt_%s|0|0|0|0||Duplicate entry '6' for key 'customer.PRIMARY' (errno 1062) (sqlstate 23000) during query: insert into customer(customer_id,email) values (6,'mlord@planetscale.com')||%d|%d", + vrID, bls, sourceKs, binlogdatapb.VReplicationWorkflowType_MoveTables, binlogdatapb.VReplicationWorkflowSubType_None), + ), + ) + env.tmc.setDBAResults( + target.tablet, + fmt.Sprintf("select table_name, table_rows, data_length from information_schema.tables where table_schema = 'vt_%s' and table_name in ('%s')", + targetKs, table), + sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "table_name|table_rows|data_length", + "varchar|int64|int64"), + fmt.Sprintf("%s|5|%d", table, minTableSize), + ), + ) + env.tmc.setDBAResults( + source.tablet, + fmt.Sprintf("select table_name, table_rows, data_length from information_schema.tables where table_schema = 'vt_%s' and table_name in ('%s')", + sourceKs, table), + sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "table_name|table_rows|data_length", + "varchar|int64|int64"), + fmt.Sprintf("%s|10|%d", table, minTableSize), + ), + ) + }, + want: fmt.Sprintf("\nCopy Progress (approx):\n\n\ncustomer: rows copied 5/10 (50%%), size copied 16384/16384 (100%%)\n\n\n\nThe following vreplication streams exist for workflow %s:\n\nid=%d on %s/%s-0000000%d: Status: Error: Duplicate entry '6' for key 'customer.PRIMARY' (errno 1062) (sqlstate 23000) 
during query: insert into customer(customer_id,email) values (6,'mlord@planetscale.com').\n\n\n", + ksWf, vrID, shard, env.cell, target.tablet.Alias.Uid), + }, + { + name: "Running", + workflowType: wrangler.MoveTablesWorkflow, + args: []string{"Progress", ksWf}, + expectResults: func() { + env.tmc.setVRResults( + target.tablet, + fmt.Sprintf("select table_name, lastpk from _vt.copy_state where vrepl_id = %d and id in (select max(id) from _vt.copy_state where vrepl_id = %d group by vrepl_id, table_name)", + vrID, vrID), + &sqltypes.Result{}, + ) + env.tmc.setDBAResults( + target.tablet, + fmt.Sprintf("select distinct table_name from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = %d", + vrID), + &sqltypes.Result{}, + ) + env.tmc.setVRResults( + target.tablet, + fmt.Sprintf("select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, time_heartbeat, time_throttled, component_throttled, message, tags, workflow_type, workflow_sub_type from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'", + targetKs, wf), + sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|time_heartbeat|time_throttled|component_throttled|message|tags|workflow_type|workflow_sub_type", + "int64|varchar|varchar|varchar|int64|varchar|varchar|int64|int64|int64|int64|int64|varchar|varchar|varchar|int64|int64"), + fmt.Sprintf("%d|%s|MySQL56/4ec30b1e-8ee2-11ed-a1eb-0242ac120002:1-15||0|Running|vt_%s|%d|%d|%d|0||||%d|%d", + vrID, bls, sourceKs, now, now, now, binlogdatapb.VReplicationWorkflowType_MoveTables, binlogdatapb.VReplicationWorkflowSubType_None), + ), + ) + }, + want: fmt.Sprintf("/\nThe following vreplication streams exist for workflow %s:\n\nid=%d on %s/%s-0000000%d: Status: Running. 
VStream Lag: .* Tx time: .*", + ksWf, vrID, shard, env.cell, target.tablet.Alias.Uid), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + subFlags := pflag.NewFlagSet("test", pflag.ContinueOnError) + expectGlobalResults() + tt.expectResults() + err := commandVRWorkflow(ctx, env.wr, subFlags, tt.args, tt.workflowType) + require.NoError(t, err) + if strings.HasPrefix(tt.want, "/") { + require.Regexp(t, tt.want[1:], env.cmdlog.String()) + } else { + require.Equal(t, tt.want, env.cmdlog.String()) + } + env.cmdlog.Clear() + env.tmc.clearResults() + }) + } +} diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index 7425427ac53..29caca5c3e3 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -40,6 +40,13 @@ import ( topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) +const ( + // How many times to retry tablet selection before we + // give up and return an error message that the user + // can see and act upon if needed. + tabletPickerRetries = 5 +) + // controller is created by Engine. Members are initialized upfront. // There is no mutex within a controller becaust its members are // either read-only or self-synchronized. @@ -206,7 +213,9 @@ func (ct *controller) runBlp(ctx context.Context) (err error) { var tablet *topodatapb.Tablet if ct.source.GetExternalMysql() == "" { log.Infof("trying to find a tablet eligible for vreplication. 
stream id: %v", ct.id) - tablet, err = ct.tabletPicker.PickForStreaming(ctx) + tpCtx, tpCancel := context.WithTimeout(ctx, discovery.GetTabletPickerRetryDelay()*tabletPickerRetries) + defer tpCancel() + tablet, err = ct.tabletPicker.PickForStreaming(tpCtx) if err != nil { select { case <-ctx.Done(): diff --git a/go/vt/vttablet/tabletmanager/vreplication/utils.go b/go/vt/vttablet/tabletmanager/vreplication/utils.go index 2a689c9b2ec..0eaa6ea8b50 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/utils.go +++ b/go/vt/vttablet/tabletmanager/vreplication/utils.go @@ -141,28 +141,63 @@ func isUnrecoverableError(err error) bool { } switch sqlErr.Num { case - mysql.ERWarnDataOutOfRange, + // in case-insensitive alphabetical order + mysql.ERAccessDeniedError, + mysql.ERBadFieldError, + mysql.ERBadNullError, + mysql.ERCantDropFieldOrKey, + mysql.ERDataOutOfRange, mysql.ERDataTooLong, - mysql.ERWarnDataTruncated, - mysql.ERTruncatedWrongValue, - mysql.ERTruncatedWrongValueForField, + mysql.ERDBAccessDenied, + mysql.ERDupEntry, + mysql.ERDupFieldName, + mysql.ERDupKeyName, + mysql.ERDupUnique, + mysql.ERFeatureDisabled, + mysql.ERFunctionNotDefined, mysql.ERIllegalValueForType, - mysql.ErrWrongValueForType, + mysql.ERInvalidCastToJSON, + mysql.ERInvalidJSONBinaryData, + mysql.ERInvalidJSONCharset, + mysql.ERInvalidJSONText, + mysql.ERInvalidJSONTextInParams, + mysql.ERJSONDocumentTooDeep, + mysql.ERJSONValueTooBig, + mysql.ERNoDefault, + mysql.ERNoDefaultForField, + mysql.ERNonUniq, + mysql.ERNonUpdateableTable, + mysql.ERNoSuchTable, + mysql.ERNotAllowedCommand, + mysql.ERNotSupportedYet, + mysql.EROptionPreventsStatement, + mysql.ERParseError, + mysql.ERPrimaryCantHaveNull, mysql.ErrCantCreateGeometryObject, mysql.ErrGISDataWrongEndianess, + mysql.ErrNonPositiveRadius, mysql.ErrNotImplementedForCartesianSRS, mysql.ErrNotImplementedForProjectedSRS, - mysql.ErrNonPositiveRadius, - mysql.ERBadNullError, - mysql.ERDupEntry, - mysql.ERNoDefaultForField, - 
mysql.ERInvalidJSONText, - mysql.ERInvalidJSONTextInParams, - mysql.ERInvalidJSONBinaryData, - mysql.ERInvalidJSONCharset, - mysql.ERInvalidCastToJSON, - mysql.ERJSONValueTooBig, - mysql.ERJSONDocumentTooDeep: + mysql.ErrWrongValueForType, + mysql.ERSPDoesNotExist, + mysql.ERSpecifiedAccessDenied, + mysql.ERSyntaxError, + mysql.ERTooBigRowSize, + mysql.ERTooBigSet, + mysql.ERTruncatedWrongValue, + mysql.ERTruncatedWrongValueForField, + mysql.ERUnknownCollation, + mysql.ERUnknownProcedure, + mysql.ERUnknownTable, + mysql.ERWarnDataOutOfRange, + mysql.ERWarnDataTruncated, + mysql.ERWrongFKDef, + mysql.ERWrongFieldSpec, + mysql.ERWrongParamCountToProcedure, + mysql.ERWrongParametersToProcedure, + mysql.ERWrongUsage, + mysql.ERWrongValue, + mysql.ERWrongValueCountOnRow: log.Errorf("Got unrecoverable error: %v", sqlErr) return true } diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go index e55341d8b65..7027bc391b5 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go @@ -1653,6 +1653,7 @@ func TestPlayerDDL(t *testing.T) { expectDBClientQueries(t, qh.Expect( "alter table t1 add column val2 varchar(128)", "/update _vt.vreplication set message='Duplicate", + "/update _vt.vreplication set state='Error', message='Duplicate", )) cancel() diff --git a/go/vt/wrangler/switcher_dry_run.go b/go/vt/wrangler/switcher_dry_run.go index 48033aa791d..4ef0ea1d026 100644 --- a/go/vt/wrangler/switcher_dry_run.go +++ b/go/vt/wrangler/switcher_dry_run.go @@ -189,7 +189,7 @@ func (dr *switcherDryRun) migrateStreams(ctx context.Context, sm *workflow.Strea } func (dr *switcherDryRun) waitForCatchup(ctx context.Context, filteredReplicationWaitTime time.Duration) error { - dr.drLog.Log(fmt.Sprintf("Wait for VReplication on stopped streams to catchup for upto %v", filteredReplicationWaitTime)) + 
dr.drLog.Log(fmt.Sprintf("Wait for VReplication on stopped streams to catchup for up to %v", filteredReplicationWaitTime)) return nil } diff --git a/go/vt/wrangler/traffic_switcher_test.go b/go/vt/wrangler/traffic_switcher_test.go index 3d7257a6cf9..97ae25d9c98 100644 --- a/go/vt/wrangler/traffic_switcher_test.go +++ b/go/vt/wrangler/traffic_switcher_test.go @@ -992,7 +992,7 @@ func TestTableMigrateOneToManyDryRun(t *testing.T) { "Lock keyspace ks2", "Stop writes on keyspace ks1, tables [t1,t2]:", "\tKeyspace ks1, Shard 0 at Position MariaDB/5-456-892", - "Wait for VReplication on stopped streams to catchup for upto 1s", + "Wait for VReplication on stopped streams to catchup for up to 1s", "Create reverse replication workflow test_reverse", "Create journal entries on source databases", "Enable writes on keyspace ks2 tables [t1,t2]",