diff --git a/go/mysql/constants.go b/go/mysql/constants.go index 415af39e761..96ee0fad8e3 100644 --- a/go/mysql/constants.go +++ b/go/mysql/constants.go @@ -672,6 +672,47 @@ func IsConnLostDuringQuery(err error) bool { return false } +// IsEphemeralError returns true if the error is ephemeral and the caller should +// retry if possible. Note: non-SQL errors are always treated as ephemeral. +func IsEphemeralError(err error) bool { + if sqlErr, ok := err.(*SQLError); ok { + en := sqlErr.Number() + switch en { + case + CRConnectionError, + CRConnHostError, + CRMalformedPacket, + CRNamedPipeStateError, + CRServerLost, + CRSSLConnectionError, + ERCantCreateThread, + ERDiskFull, + ERForcingClose, + ERGotSignal, + ERHostIsBlocked, + ERLockTableFull, + ERInnodbReadOnly, + ERInternalError, + ERLockDeadlock, + ERLockWaitTimeout, + EROutOfMemory, + EROutOfResources, + EROutOfSortMemory, + ERQueryInterrupted, + ERServerIsntAvailable, + ERServerShutdown, + ERTooManyUserConnections, + ERUnknownError, + ERUserLimitReached: + return true + default: + return false + } + } + // If it's not an sqlError then we assume it's ephemeral + return true +} + // IsTooManyConnectionsErr returns true if the error is due to too many connections. func IsTooManyConnectionsErr(err error) bool { if sqlErr, ok := err.(*SQLError); ok { diff --git a/go/test/endtoend/reparent/utils/utils.go b/go/test/endtoend/reparent/utils/utils.go index a716183017d..c32d62f1eb1 100644 --- a/go/test/endtoend/reparent/utils/utils.go +++ b/go/test/endtoend/reparent/utils/utils.go @@ -514,10 +514,11 @@ func GetShardReplicationPositions(t *testing.T, clusterInstance *cluster.LocalPr } func WaitForReplicationToStart(t *testing.T, clusterInstance *cluster.LocalProcessCluster, keyspaceName, shardName string, tabletCnt int, doPrint bool) { - tck := time.NewTicker(500 * time.Millisecond) + tkr := time.NewTicker(500 * time.Millisecond) + defer tkr.Stop() for { select { - case <-tck.C: + case <-tkr.C: strArray := GetShardReplicationPositions(t, clusterInstance, KeyspaceName, shardName, true) if len(strArray) == tabletCnt && strings.Contains(strArray[0], "primary") { // primary first return @@ -575,13 +576,13 @@ func CheckReparentFromOutside(t *testing.T, clusterInstance *cluster.LocalProces // WaitForReplicationPosition waits for tablet B to catch up to the replication position of tablet A. func WaitForReplicationPosition(t *testing.T, tabletA *cluster.Vttablet, tabletB *cluster.Vttablet) error { posA, _ := cluster.GetPrimaryPosition(t, *tabletA, Hostname) - timeout := time.Now().Add(5 * time.Second) + timeout := time.Now().Add(replicationWaitTimeout) for time.Now().Before(timeout) { posB, _ := cluster.GetPrimaryPosition(t, *tabletB, Hostname) if positionAtLeast(t, tabletB, posA, posB) { return nil } - time.Sleep(100 * time.Millisecond) + time.Sleep(500 * time.Millisecond) } return fmt.Errorf("failed to catch up on replication position") } diff --git a/go/test/endtoend/vreplication/vdiff2_test.go b/go/test/endtoend/vreplication/vdiff2_test.go index 4618373a60a..691cba85509 100644 --- a/go/test/endtoend/vreplication/vdiff2_test.go +++ b/go/test/endtoend/vreplication/vdiff2_test.go @@ -31,47 +31,66 @@ type testCase struct { tables string workflow string tabletBaseID int - resume bool // test resume functionality with this workflow - resumeInsert string // if testing resume, what new rows should be diff'd - testCLIErrors bool // test CLI errors against this workflow + autoRetryError bool // if true, test auto retry on error against this workflow + // If testing auto retry on error, what new rows should be diff'd. These rows must have a PK > all initial rows. + retryInsert string + resume bool // test resume functionality with this workflow + // If testing resume, what new rows should be diff'd. These rows must have a PK > all initial rows and retry rows. + resumeInsert string + testCLIErrors bool // test CLI errors against this workflow } +const ( + sqlSimulateError = `update _vt.vdiff as vd, _vt.vdiff_table as vdt set vd.state = 'error', vdt.state = 'error', vd.completed_at = NULL, + vd.last_error = 'vttablet: rpc error: code = Unknown desc = (errno 1213) (sqlstate 40001): Deadlock found when trying to get lock; try restarting transaction' + where vd.vdiff_uuid = %s and vd.id = vdt.vdiff_id` + sqlAnalyzeTable = `analyze table %s` +) + var testCases = []*testCase{ { - name: "MoveTables/unsharded to two shards", - workflow: "p1c2", - typ: "MoveTables", - sourceKs: "product", - targetKs: "customer", - sourceShards: "0", - targetShards: "-80,80-", - tabletBaseID: 200, - tables: "customer,Lead,Lead-1", - resume: true, - resumeInsert: `insert into customer(cid, name, typ) values(12345678, 'Testy McTester', 'soho')`, - testCLIErrors: true, // test for errors in the simplest workflow + name: "MoveTables/unsharded to two shards", + workflow: "p1c2", + typ: "MoveTables", + sourceKs: "product", + targetKs: "customer", + sourceShards: "0", + targetShards: "-80,80-", + tabletBaseID: 200, + tables: "customer,Lead,Lead-1", + autoRetryError: true, + retryInsert: `insert into customer(cid, name, typ) values(91234, 'Testy McTester', 'soho')`, + resume: true, + resumeInsert: `insert into customer(cid, name, typ) values(92234, 'Testy McTester (redux)', 'enterprise')`, + testCLIErrors: true, // test for errors in the simplest workflow }, { - name: "Reshard Merge/split 2 to 3", - workflow: "c2c3", - typ: "Reshard", - sourceKs: "customer", - targetKs: "customer", - sourceShards: "-80,80-", - targetShards: "-40,40-a0,a0-", - tabletBaseID: 400, - resume: true, - resumeInsert: `insert into customer(cid, name, typ) values(987654321, 'Testy McTester Jr.', 'enterprise'), (987654322, 'Testy McTester III', 'enterprise')`, + name: "Reshard Merge/split 2 to 3", + workflow: "c2c3", + typ: "Reshard", + sourceKs: "customer", + targetKs: "customer", + sourceShards: "-80,80-", + targetShards: "-40,40-a0,a0-", + tabletBaseID: 400, + autoRetryError: true, + retryInsert: `insert into customer(cid, name, typ) values(93234, 'Testy McTester Jr', 'enterprise'), (94234, 'Testy McTester II', 'enterprise')`, + resume: true, + resumeInsert: `insert into customer(cid, name, typ) values(95234, 'Testy McTester III', 'enterprise')`, }, { - name: "Reshard/merge 3 to 1", - workflow: "c3c1", - typ: "Reshard", - sourceKs: "customer", - targetKs: "customer", - sourceShards: "-40,40-a0,a0-", - targetShards: "0", - tabletBaseID: 700, + name: "Reshard/merge 3 to 1", + workflow: "c3c1", + typ: "Reshard", + sourceKs: "customer", + targetKs: "customer", + sourceShards: "-40,40-a0,a0-", + targetShards: "0", + tabletBaseID: 700, + autoRetryError: true, + retryInsert: `insert into customer(cid, name, typ) values(96234, 'Testy McTester IV', 'enterprise')`, + resume: true, + resumeInsert: `insert into customer(cid, name, typ) values(97234, 'Testy McTester V', 'enterprise'), (98234, 'Testy McTester VI', 'enterprise')`, }, } @@ -105,12 +124,13 @@ func TestVDiff2(t *testing.T) { verifyClusterHealth(t, vc) insertInitialData(t) - // Insert null and empty enum values for testing vdiff comparisons for those values. // If we add this to the initial data list, the counts in several other tests will need to change query := `insert into customer(cid, name, typ, sport) values(1001, null, 'soho','')` execVtgateQuery(t, vtgateConn, fmt.Sprintf("%s:%s", sourceKs, sourceShards[0]), query) + generateMoreCustomers(t, sourceKs, 100) + _, err := vc.AddKeyspace(t, cells, targetKs, strings.Join(targetShards, ","), customerVSchema, customerSchema, 0, 0, 200, targetKsOpts) require.NoError(t, err) for _, shard := range targetShards { @@ -149,16 +169,17 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, cells []*Cell) for _, shard := range arrTargetShards { tab := vc.getPrimaryTablet(t, tc.targetKs, shard) catchup(t, tab, tc.workflow, tc.typ) + updateTableStats(t, tab, tc.tables) // need to do this in order to test progress reports } + vdiff(t, tc.targetKs, tc.workflow, cells[0].Name, true, true, nil) + if tc.autoRetryError { + testAutoRetryError(t, tc, cells[0].Name) + } + if tc.resume { - expectedRows := int64(0) - if tc.resumeInsert != "" { - res := execVtgateQuery(t, vtgateConn, tc.sourceKs, tc.resumeInsert) - expectedRows = int64(res.RowsAffected) - } - vdiff2Resume(t, tc.targetKs, tc.workflow, cells[0].Name, expectedRows) + testResume(t, tc, cells[0].Name) } // This is done here so that we have a valid workflow to test the commands against @@ -224,3 +245,69 @@ func testNoOrphanedData(t *testing.T, keyspace, workflow string, shards []string } }) } + +func testResume(t *testing.T, tc *testCase, cells string) { + t.Run("Resume", func(t *testing.T) { + ksWorkflow := fmt.Sprintf("%s.%s", tc.targetKs, tc.workflow) + + // confirm the last VDiff is in the expected completed state + uuid, output := performVDiff2Action(t, ksWorkflow, cells, "show", "last", false) + jsonOutput := getVDiffInfo(output) + require.Equal(t, "completed", jsonOutput.State) + // save the number of rows compared in previous runs + rowsCompared := jsonOutput.RowsCompared + ogTime := time.Now() // the completed_at should be later than this after resuming + + expectedNewRows := int64(0) + if tc.resumeInsert != "" { + res := execVtgateQuery(t, vtgateConn, tc.sourceKs, tc.resumeInsert) + expectedNewRows = int64(res.RowsAffected) + } + expectedRows := rowsCompared + expectedNewRows + + // confirm that the VDiff was resumed, able to complete, and we compared the + // expected number of rows in total (original run and resume) + uuid, _ = performVDiff2Action(t, ksWorkflow, cells, "resume", uuid, false) + info := waitForVDiff2ToComplete(t, ksWorkflow, cells, uuid, ogTime) + require.False(t, info.HasMismatch) + require.Equal(t, expectedRows, info.RowsCompared) + }) +} + +func testAutoRetryError(t *testing.T, tc *testCase, cells string) { + t.Run("Auto retry on error", func(t *testing.T) { + ksWorkflow := fmt.Sprintf("%s.%s", tc.targetKs, tc.workflow) + + // confirm the last VDiff is in the expected completed state + uuid, output := performVDiff2Action(t, ksWorkflow, cells, "show", "last", false) + jsonOutput := getVDiffInfo(output) + require.Equal(t, "completed", jsonOutput.State) + // save the number of rows compared in the first run + rowsCompared := jsonOutput.RowsCompared + ogTime := time.Now() // the completed_at should be later than this upon retry + + // create new data since original VDiff run -- if requested -- to confirm that the rows + // compared is cumulative + expectedNewRows := int64(0) + if tc.retryInsert != "" { + res := execVtgateQuery(t, vtgateConn, tc.sourceKs, tc.retryInsert) + expectedNewRows = int64(res.RowsAffected) + } + expectedRows := rowsCompared + expectedNewRows + + // update the VDiff to simulate an ephemeral error having occurred + for _, shard := range strings.Split(tc.targetShards, ",") { + tab := vc.getPrimaryTablet(t, tc.targetKs, shard) + res, err := tab.QueryTabletWithDB(fmt.Sprintf(sqlSimulateError, encodeString(uuid)), "vt_"+tc.targetKs) + require.NoError(t, err) + // should have updated the vdiff record and at least one vdiff_table record + require.GreaterOrEqual(t, int(res.RowsAffected), 2) + } + + // confirm that the VDiff was retried, able to complete, and we compared the expected + // number of rows in total (original run and retry) + info := waitForVDiff2ToComplete(t, ksWorkflow, cells, uuid, ogTime) + require.False(t, info.HasMismatch) + require.Equal(t, expectedRows, info.RowsCompared) + }) +} diff --git a/go/test/endtoend/vreplication/vdiff_helper_test.go b/go/test/endtoend/vreplication/vdiff_helper_test.go index 7005e645fc2..ab25f0c6e24 100644 --- a/go/test/endtoend/vreplication/vdiff_helper_test.go +++ b/go/test/endtoend/vreplication/vdiff_helper_test.go @@ -26,14 +26,16 @@ import ( "github.com/buger/jsonparser" "github.com/stretchr/testify/require" + "vitess.io/vitess/go/sqlescape" "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/vt/log" + vdiff2 "vitess.io/vitess/go/vt/vttablet/tabletmanager/vdiff" "vitess.io/vitess/go/vt/wrangler" ) const ( - vdiffTimeout = time.Second * 60 - tsFormat = "2006-01-02 15:04:05" + vdiffTimeout = time.Second * 90 // we can leverage auto retry on error with this longer-than-usual timeout ) var ( @@ -46,7 +48,7 @@ func vdiff(t *testing.T, keyspace, workflow, cells string, v1, v2 bool, wantV2Re doVDiff1(t, ksWorkflow, cells) } if v2 { - vdiff2(t, keyspace, workflow, cells, wantV2Result) + doVdiff2(t, keyspace, workflow, cells, wantV2Result) } } @@ -84,6 +86,8 @@ func doVDiff1(t *testing.T, ksWorkflow, cells string) { func waitForVDiff2ToComplete(t *testing.T, ksWorkflow, cells, uuid string, completedAtMin time.Time) *vdiffInfo { var info *vdiffInfo + first := true + previousProgress := vdiff2.ProgressReport{} ch := make(chan bool) go func() { for { @@ -93,28 +97,36 @@ func waitForVDiff2ToComplete(t *testing.T, ksWorkflow, cells, uuid string, compl if info.State == "completed" { if !completedAtMin.IsZero() { ca := info.CompletedAt - completedAt, _ := time.Parse(tsFormat, ca) + completedAt, _ := time.Parse(vdiff2.TimestampFormat, ca) if !completedAt.After(completedAtMin) { continue } } ch <- true return - } else if info.State == "error" { - ch <- false - return + } else if info.State == "started" { // test the progress report + // The ETA should always be in the future -- when we're able to estimate + // it -- and the progress percentage should only increase. + // The timstamp format allows us to compare them lexicographically. + // We don't test that the ETA always increases as it can decrease based on how + // quickly we're doing work. + if info.Progress.ETA != "" { + require.GreaterOrEqual(t, info.Progress.ETA, time.Now().Format(vdiff2.TimestampFormat)) + } + if !first { + require.GreaterOrEqual(t, info.Progress.Percentage, previousProgress.Percentage) + } + previousProgress.Percentage = info.Progress.Percentage + first = false } } }() select { - case good := <-ch: - if !good { - require.FailNow(t, "VDiff encountered an error") - } + case <-ch: return info case <-time.After(vdiffTimeout): - require.FailNowf(t, "VDiff never completed: %s", uuid) + require.FailNow(t, fmt.Sprintf("VDiff never completed for UUID %s", uuid)) return nil } } @@ -125,16 +137,14 @@ type expectedVDiff2Result struct { hasMismatch bool } -func vdiff2(t *testing.T, keyspace, workflow, cells string, want *expectedVDiff2Result) { +func doVdiff2(t *testing.T, keyspace, workflow, cells string, want *expectedVDiff2Result) { ksWorkflow := fmt.Sprintf("%s.%s", keyspace, workflow) t.Run(fmt.Sprintf("vdiff2 %s", ksWorkflow), func(t *testing.T) { - uuid, _ := performVDiff2Action(t, ksWorkflow, cells, "create", "", false) + uuid, _ := performVDiff2Action(t, ksWorkflow, cells, "create", "", false, "--auto-retry") info := waitForVDiff2ToComplete(t, ksWorkflow, cells, uuid, time.Time{}) require.Equal(t, workflow, info.Workflow) require.Equal(t, keyspace, info.Keyspace) - // I'm not sure if we always have rows in every table - //require.Greater(t, info.RowsCompared, int64(0)) if want != nil { require.Equal(t, want.state, info.State) require.Equal(t, strings.Join(want.shards, ","), info.Shards) @@ -150,19 +160,6 @@ func vdiff2(t *testing.T, keyspace, workflow, cells string, want *expectedVDiff2 }) } -func vdiff2Resume(t *testing.T, keyspace, workflow, cells string, expectedRows int64) { - ksWorkflow := fmt.Sprintf("%s.%s", keyspace, workflow) - startTs := time.Now() - uuid, _ := performVDiff2Action(t, ksWorkflow, cells, "show", "last", false) - uuid, _ = performVDiff2Action(t, ksWorkflow, cells, "resume", uuid, false) - info := waitForVDiff2ToComplete(t, ksWorkflow, cells, uuid, startTs) - require.False(t, info.HasMismatch) - completedTs, err := time.Parse(tsFormat, info.CompletedAt) - require.NoError(t, err) - require.Greater(t, completedTs, startTs) - require.Equal(t, expectedRows, info.RowsCompared) -} - func performVDiff2Action(t *testing.T, ksWorkflow, cells, action, actionArg string, expectError bool, extraFlags ...string) (uuid string, output string) { var err error if len(extraFlags) > 0 { @@ -190,6 +187,7 @@ type vdiffInfo struct { StartedAt string CompletedAt string HasMismatch bool + Progress vdiff2.ProgressReport } func getVDiffInfo(jsonStr string) *vdiffInfo { @@ -203,7 +201,8 @@ func getVDiffInfo(jsonStr string) *vdiffInfo { info.StartedAt, _ = jsonparser.GetString(json, "StartedAt") info.CompletedAt, _ = jsonparser.GetString(json, "CompletedAt") info.HasMismatch, _ = jsonparser.GetBoolean(json, "HasMismatch") - + info.Progress.Percentage, _ = jsonparser.GetFloat(json, "Progress", "Percentage") + info.Progress.ETA, _ = jsonparser.GetString(json, "Progress", "ETA") return &info } @@ -212,3 +211,46 @@ func encodeString(in string) string { sqltypes.NewVarChar(in).EncodeSQL(&buf) return buf.String() } + +// updateTableStats runs ANALYZE TABLE on each table involved in the workflow. +// You should execute this if you leverage table information from e.g. +// information_schema.tables in your test. +func updateTableStats(t *testing.T, tablet *cluster.VttabletProcess, tables string) { + dbName := "vt_" + tablet.Keyspace + tableList := strings.Split(strings.TrimSpace(tables), ",") + if len(tableList) == 0 { + // we need to get all of the tables in the keyspace + res, err := tablet.QueryTabletWithDB("show tables", dbName) + require.NoError(t, err) + for _, row := range res.Rows { + tableList = append(tableList, row[0].String()) + } + } + for _, table := range tableList { + table = strings.TrimSpace(table) + if table != "" { + res, err := tablet.QueryTabletWithDB(fmt.Sprintf(sqlAnalyzeTable, sqlescape.EscapeID(table)), dbName) + require.NoError(t, err) + require.Equal(t, 1, len(res.Rows)) + } + } +} + +// generateMoreCustomers creates additional test data for better tests +// when needed. +func generateMoreCustomers(t *testing.T, keyspace string, numCustomers int64) { + log.Infof("Generating more test data with an additional %d customers", numCustomers) + res := execVtgateQuery(t, vtgateConn, keyspace, "select max(cid) from customer") + startingID, _ := res.Rows[0][0].ToInt64() + insert := strings.Builder{} + insert.WriteString("insert into customer(cid, name, typ) values ") + i := int64(0) + for i < numCustomers { + i++ + insert.WriteString(fmt.Sprintf("(%d, 'Testy (Bot) McTester', 'soho')", startingID+i)) + if i != numCustomers { + insert.WriteString(", ") + } + } + execVtgateQuery(t, vtgateConn, keyspace, insert.String()) +} diff --git a/go/vt/proto/tabletmanagerdata/tabletmanagerdata.pb.go b/go/vt/proto/tabletmanagerdata/tabletmanagerdata.pb.go index 6cd4db8e2b4..6eb493c410b 100644 --- a/go/vt/proto/tabletmanagerdata/tabletmanagerdata.pb.go +++ b/go/vt/proto/tabletmanagerdata/tabletmanagerdata.pb.go @@ -5078,7 +5078,7 @@ type VDiffCoreOptions struct { unknownFields protoimpl.UnknownFields Tables string `protobuf:"bytes,1,opt,name=tables,proto3" json:"tables,omitempty"` - Resumable bool `protobuf:"varint,2,opt,name=resumable,proto3" json:"resumable,omitempty"` + AutoRetry bool `protobuf:"varint,2,opt,name=auto_retry,json=autoRetry,proto3" json:"auto_retry,omitempty"` MaxRows int64 `protobuf:"varint,3,opt,name=max_rows,json=maxRows,proto3" json:"max_rows,omitempty"` Checksum bool `protobuf:"varint,4,opt,name=checksum,proto3" json:"checksum,omitempty"` SamplePct int64 `protobuf:"varint,5,opt,name=sample_pct,json=samplePct,proto3" json:"sample_pct,omitempty"` @@ -5125,9 +5125,9 @@ func (x *VDiffCoreOptions) GetTables() string { return "" } -func (x *VDiffCoreOptions) GetResumable() bool { +func (x *VDiffCoreOptions) GetAutoRetry() bool { if x != nil { - return x.Resumable + return x.AutoRetry } return false } @@ -5736,42 +5736,42 @@ var file_tabletmanagerdata_proto_rawDesc = []byte{ 0x5f, 0x71, 0x75, 0x65, 0x72, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, 0x64, 0x65, 0x62, 0x75, 0x67, 0x51, 0x75, 0x65, 0x72, 0x79, 0x12, 0x16, 0x0a, 0x06, 0x66, 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x66, 0x6f, 0x72, 0x6d, 0x61, 0x74, - 0x22, 0x81, 0x02, 0x0a, 0x10, 0x56, 0x44, 0x69, 0x66, 0x66, 0x43, 0x6f, 0x72, 0x65, 0x4f, 0x70, + 0x22, 0x82, 0x02, 0x0a, 0x10, 0x56, 0x44, 0x69, 0x66, 0x66, 0x43, 0x6f, 0x72, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x12, 0x1c, 0x0a, - 0x09, 0x72, 0x65, 0x73, 0x75, 0x6d, 0x61, 0x62, 0x6c, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, - 0x52, 0x09, 0x72, 0x65, 0x73, 0x75, 0x6d, 0x61, 0x62, 0x6c, 0x65, 0x12, 0x19, 0x0a, 0x08, 0x6d, - 0x61, 0x78, 0x5f, 0x72, 0x6f, 0x77, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x6d, - 0x61, 0x78, 0x52, 0x6f, 0x77, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x63, 0x68, 0x65, 0x63, 0x6b, 0x73, - 0x75, 0x6d, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x63, 0x68, 0x65, 0x63, 0x6b, 0x73, - 0x75, 0x6d, 0x12, 0x1d, 0x0a, 0x0a, 0x73, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x5f, 0x70, 0x63, 0x74, - 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x73, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x50, 0x63, - 0x74, 0x12, 0x27, 0x0a, 0x0f, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x5f, 0x73, 0x65, 0x63, - 0x6f, 0x6e, 0x64, 0x73, 0x18, 0x06, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0e, 0x74, 0x69, 0x6d, 0x65, - 0x6f, 0x75, 0x74, 0x53, 0x65, 0x63, 0x6f, 0x6e, 0x64, 0x73, 0x12, 0x38, 0x0a, 0x19, 0x6d, 0x61, - 0x78, 0x5f, 0x65, 0x78, 0x74, 0x72, 0x61, 0x5f, 0x72, 0x6f, 0x77, 0x73, 0x5f, 0x74, 0x6f, 0x5f, - 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x03, 0x52, 0x15, 0x6d, - 0x61, 0x78, 0x45, 0x78, 0x74, 0x72, 0x61, 0x52, 0x6f, 0x77, 0x73, 0x54, 0x6f, 0x43, 0x6f, 0x6d, - 0x70, 0x61, 0x72, 0x65, 0x22, 0xf2, 0x01, 0x0a, 0x0c, 0x56, 0x44, 0x69, 0x66, 0x66, 0x4f, 0x70, - 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x4c, 0x0a, 0x0e, 0x70, 0x69, 0x63, 0x6b, 0x65, 0x72, 0x5f, - 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x25, 0x2e, - 0x74, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x64, 0x61, 0x74, - 0x61, 0x2e, 0x56, 0x44, 0x69, 0x66, 0x66, 0x50, 0x69, 0x63, 0x6b, 0x65, 0x72, 0x4f, 0x70, 0x74, - 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x0d, 0x70, 0x69, 0x63, 0x6b, 0x65, 0x72, 0x4f, 0x70, 0x74, 0x69, - 0x6f, 0x6e, 0x73, 0x12, 0x46, 0x0a, 0x0c, 0x63, 0x6f, 0x72, 0x65, 0x5f, 0x6f, 0x70, 0x74, 0x69, - 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x23, 0x2e, 0x74, 0x61, 0x62, 0x6c, - 0x65, 0x74, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x56, 0x44, - 0x69, 0x66, 0x66, 0x43, 0x6f, 0x72, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x0b, - 0x63, 0x6f, 0x72, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x4c, 0x0a, 0x0e, 0x72, - 0x65, 0x70, 0x6f, 0x72, 0x74, 0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x03, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x25, 0x2e, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x6d, 0x61, 0x6e, 0x61, - 0x67, 0x65, 0x72, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x56, 0x44, 0x69, 0x66, 0x66, 0x52, 0x65, 0x70, - 0x6f, 0x72, 0x74, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x0d, 0x72, 0x65, 0x70, 0x6f, - 0x72, 0x74, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x42, 0x30, 0x5a, 0x2e, 0x76, 0x69, 0x74, - 0x65, 0x73, 0x73, 0x2e, 0x69, 0x6f, 0x2f, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2f, 0x67, 0x6f, - 0x2f, 0x76, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x74, - 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x64, 0x61, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x33, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x12, 0x1d, 0x0a, + 0x0a, 0x61, 0x75, 0x74, 0x6f, 0x5f, 0x72, 0x65, 0x74, 0x72, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x08, 0x52, 0x09, 0x61, 0x75, 0x74, 0x6f, 0x52, 0x65, 0x74, 0x72, 0x79, 0x12, 0x19, 0x0a, 0x08, + 0x6d, 0x61, 0x78, 0x5f, 0x72, 0x6f, 0x77, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, + 0x6d, 0x61, 0x78, 0x52, 0x6f, 0x77, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x63, 0x68, 0x65, 0x63, 0x6b, + 0x73, 0x75, 0x6d, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x63, 0x68, 0x65, 0x63, 0x6b, + 0x73, 0x75, 0x6d, 0x12, 0x1d, 0x0a, 0x0a, 0x73, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x5f, 0x70, 0x63, + 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x73, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x50, + 0x63, 0x74, 0x12, 0x27, 0x0a, 0x0f, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x5f, 0x73, 0x65, + 0x63, 0x6f, 0x6e, 0x64, 0x73, 0x18, 0x06, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0e, 0x74, 0x69, 0x6d, + 0x65, 0x6f, 0x75, 0x74, 0x53, 0x65, 0x63, 0x6f, 0x6e, 0x64, 0x73, 0x12, 0x38, 0x0a, 0x19, 0x6d, + 0x61, 0x78, 0x5f, 0x65, 0x78, 0x74, 0x72, 0x61, 0x5f, 0x72, 0x6f, 0x77, 0x73, 0x5f, 0x74, 0x6f, + 0x5f, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x03, 0x52, 0x15, + 0x6d, 0x61, 0x78, 0x45, 0x78, 0x74, 0x72, 0x61, 0x52, 0x6f, 0x77, 0x73, 0x54, 0x6f, 0x43, 0x6f, + 0x6d, 0x70, 0x61, 0x72, 0x65, 0x22, 0xf2, 0x01, 0x0a, 0x0c, 0x56, 0x44, 0x69, 0x66, 0x66, 0x4f, + 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x4c, 0x0a, 0x0e, 0x70, 0x69, 0x63, 0x6b, 0x65, 0x72, + 0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x25, + 0x2e, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x64, 0x61, + 0x74, 0x61, 0x2e, 0x56, 0x44, 0x69, 0x66, 0x66, 0x50, 0x69, 0x63, 0x6b, 0x65, 0x72, 0x4f, 0x70, + 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x0d, 0x70, 0x69, 0x63, 0x6b, 0x65, 0x72, 0x4f, 0x70, 0x74, + 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x46, 0x0a, 0x0c, 0x63, 0x6f, 0x72, 0x65, 0x5f, 0x6f, 0x70, 0x74, + 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x23, 0x2e, 0x74, 0x61, 0x62, + 0x6c, 0x65, 0x74, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x56, + 0x44, 0x69, 0x66, 0x66, 0x43, 0x6f, 0x72, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, + 0x0b, 0x63, 0x6f, 0x72, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x4c, 0x0a, 0x0e, + 0x72, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x25, 0x2e, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x6d, 0x61, 0x6e, + 0x61, 0x67, 0x65, 0x72, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x56, 0x44, 0x69, 0x66, 0x66, 0x52, 0x65, + 0x70, 0x6f, 0x72, 0x74, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x0d, 0x72, 0x65, 0x70, + 0x6f, 0x72, 0x74, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x42, 0x30, 0x5a, 0x2e, 0x76, 0x69, + 0x74, 0x65, 0x73, 0x73, 0x2e, 0x69, 0x6f, 0x2f, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2f, 0x67, + 0x6f, 0x2f, 0x76, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x74, 0x61, 0x62, 0x6c, 0x65, + 0x74, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x64, 0x61, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/go/vt/proto/tabletmanagerdata/tabletmanagerdata_vtproto.pb.go b/go/vt/proto/tabletmanagerdata/tabletmanagerdata_vtproto.pb.go index 9ae708f0989..76276759e05 100644 --- a/go/vt/proto/tabletmanagerdata/tabletmanagerdata_vtproto.pb.go +++ b/go/vt/proto/tabletmanagerdata/tabletmanagerdata_vtproto.pb.go @@ -4552,9 +4552,9 @@ func (m *VDiffCoreOptions) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i-- dAtA[i] = 0x18 } - if m.Resumable { + if m.AutoRetry { i-- - if m.Resumable { + if m.AutoRetry { dAtA[i] = 1 } else { dAtA[i] = 0 @@ -6407,7 +6407,7 @@ func (m *VDiffCoreOptions) SizeVT() (n int) { if l > 0 { n += 1 + l + sov(uint64(l)) } - if m.Resumable { + if m.AutoRetry { n += 2 } if m.MaxRows != 0 { @@ -15906,7 +15906,7 @@ func (m *VDiffCoreOptions) UnmarshalVT(dAtA []byte) error { iNdEx = postIndex case 2: if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Resumable", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field AutoRetry", wireType) } var v int for shift := uint(0); ; shift += 7 { @@ -15923,7 +15923,7 @@ func (m *VDiffCoreOptions) UnmarshalVT(dAtA []byte) error { break } } - m.Resumable = bool(v != 0) + m.AutoRetry = bool(v != 0) case 3: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field MaxRows", wireType) diff --git a/go/vt/vtctl/vdiff2.go b/go/vt/vtctl/vdiff2.go index e4fa3b5b25a..cc3ed29a178 100644 --- a/go/vt/vtctl/vdiff2.go +++ b/go/vt/vtctl/vdiff2.go @@ -58,7 +58,7 @@ func commandVDiff2(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.Fl subFlags.StringVar(&format, "format", "text", "Format of report") // "json" or "text" maxExtraRowsToCompare := subFlags.Int64("max_extra_rows_to_compare", 1000, "If there are collation differences between the source and target, you can have rows that are identical but simply returned in a different order from MySQL. We will do a second pass to compare the rows for any actual differences in this case and this flag allows you to control the resources used for this operation.") - resumable := subFlags.Bool("resumable", false, "Should this vdiff retry in case of recoverable errors, not yet implemented") + autoRetry := subFlags.Bool("auto-retry", true, "Should this vdiff automatically retry and continue in case of recoverable errors") checksum := subFlags.Bool("checksum", false, "Use row-level checksums to compare, not yet implemented") samplePct := subFlags.Int64("sample_pct", 100, "How many rows to sample, not yet implemented") verbose := subFlags.Bool("verbose", false, "Show verbose vdiff output in summaries") @@ -106,7 +106,7 @@ func commandVDiff2(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.Fl }, CoreOptions: &tabletmanagerdatapb.VDiffCoreOptions{ Tables: *tables, - Resumable: *resumable, + AutoRetry: *autoRetry, MaxRows: *maxRows, Checksum: *checksum, SamplePct: *samplePct, @@ -210,25 +210,33 @@ type vdiffSummary struct { CompletedAt string `json:"CompletedAt,omitempty"` TableSummaryMap map[string]vdiffTableSummary `json:"TableSummary,omitempty"` Reports map[string]map[string]vdiff.DiffReport `json:"Reports,omitempty"` + Errors map[string]string `json:"Errors,omitempty"` + Progress *vdiff.ProgressReport `json:"Progress,omitempty"` } const ( summaryTextTemplate = ` VDiff Summary for {{.Keyspace}}.{{.Workflow}} ({{.UUID}}) State: {{.State}} +{{if .Errors}} +{{- range $shard, $error := .Errors}} + Error: (shard {{$shard}}) {{$error}} +{{- end}} +{{end}} RowsCompared: {{.RowsCompared}} HasMismatch: {{.HasMismatch}} StartedAt: {{.StartedAt}} -CompletedAt: {{.CompletedAt}} -{{ range $table := .TableSummaryMap}} +{{if (eq .State "started")}}Progress: {{printf "%.2f" .Progress.Percentage}}%%{{if .Progress.ETA}}, ETA: {{.Progress.ETA}}{{end}}{{end}} +{{if .CompletedAt}}CompletedAt: {{.CompletedAt}}{{end}} +{{range $table := .TableSummaryMap}} Table {{$table.TableName}}: State: {{$table.State}} ProcessedRows: {{$table.RowsCompared}} MatchingRows: {{$table.MatchingRows}} -{{ if $table.MismatchedRows}} MismatchedRows: {{$table.MismatchedRows}}{{ end }} -{{ if $table.ExtraRowsSource}} ExtraRowsSource: {{$table.ExtraRowsSource}}{{ end }} -{{ if $table.ExtraRowsTarget}} ExtraRowsTarget: {{$table.ExtraRowsTarget}}{{ end }} -{{ end }} +{{if $table.MismatchedRows}} MismatchedRows: {{$table.MismatchedRows}}{{end}} +{{if $table.ExtraRowsSource}} ExtraRowsSource: {{$table.ExtraRowsSource}}{{end}} +{{if $table.ExtraRowsTarget}} ExtraRowsTarget: {{$table.ExtraRowsTarget}}{{end}} +{{end}} Use "--format=json" for more detailed output. ` @@ -403,13 +411,27 @@ func buildVDiff2SingleSummary(wr *wrangler.Wrangler, keyspace, workflow, uuid st HasMismatch: false, Shards: "", Reports: make(map[string]map[string]vdiff.DiffReport), + Errors: make(map[string]string), + Progress: nil, } var tableSummaryMap map[string]vdiffTableSummary var reports map[string]map[string]vdiff.DiffReport - first := true + // Keep a tally of the states across all tables and shards + tableStateCounts := map[vdiff.VDiffState]int{ + vdiff.UnknownState: 0, + vdiff.PendingState: 0, + vdiff.StartedState: 0, + vdiff.ErrorState: 0, + vdiff.CompletedState: 0, + } + // Keep a tally of the shards that have been marked as completed + completedShards := 0 + // Keep a tally of the approximate total rows to process as we'll use this for our progress report + totalRowsToCompare := int64(0) var shards []string for shard, resp := range output.Responses { + first := true if resp != nil && resp.Output != nil { shards = append(shards, shard) qr := sqltypes.Proto3ToResult(resp.Output) @@ -431,10 +453,20 @@ func buildVDiff2SingleSummary(wr *wrangler.Wrangler, keyspace, workflow, uuid st if ca := row.AsString("completed_at", ""); summary.CompletedAt == "" || ca > summary.CompletedAt { summary.CompletedAt = ca } + // If we had an error on the shard, then let's add that to the summary. + if le := row.AsString("last_error", ""); le != "" { + summary.Errors[shard] = le + } + // Keep track of how many shards are marked as completed. We check this combined + // with the shard.table states to determine the VDiff summary state. + if vdiff.VDiffState(strings.ToLower(row.AsString("vdiff_state", ""))) == vdiff.CompletedState { + completedShards++ + } } { // Global VDiff summary updates that take into account the per table details per shard summary.RowsCompared += row.AsInt64("rows_compared", 0) + totalRowsToCompare += row.AsInt64("table_rows", 0) // If we had a mismatch on any table on any shard then the global VDiff summary does too if mm, _ := row.ToBool("has_mismatch"); mm { @@ -455,6 +487,7 @@ func buildVDiff2SingleSummary(wr *wrangler.Wrangler, keyspace, workflow, uuid st ts := tableSummaryMap[table] // This is the shard level VDiff table state sts := vdiff.VDiffState(strings.ToLower(row.AsString("table_state", ""))) + tableStateCounts[sts]++ // The error state must be sticky, and we should not override any other // known state with completed. @@ -468,7 +501,6 @@ func buildVDiff2SingleSummary(wr *wrangler.Wrangler, keyspace, workflow, uuid st default: if ts.State != vdiff.ErrorState { ts.State = sts - } } @@ -491,21 +523,42 @@ func buildVDiff2SingleSummary(wr *wrangler.Wrangler, keyspace, workflow, uuid st reports[table][shard] = dr tableSummaryMap[table] = ts - - // The global VDiff summary needs to align with the table states across all - // shards. It should progress from pending->started->completed with error at any point - // being sticky. This largely mirrors the global table summary, with the exception - // being the started state, which should also be sticky; i.e. we shouldn't go from - // started->pending->started in the global VDiff summary state. - if summary.State != ts.State && - (summary.State != vdiff.StartedState && ts.State != vdiff.PendingState) { - summary.State = ts.State - } } } } } - sort.Strings(shards) // sort for predictable output, for test purposes + + // The global VDiff summary should progress from pending->started->completed with + // error for any table being sticky for the global summary. We should only consider + // the VDiff to be complete if it's completed for every table on every shard. + if tableStateCounts[vdiff.ErrorState] > 0 { + summary.State = vdiff.ErrorState + } else if tableStateCounts[vdiff.StartedState] > 0 { + summary.State = vdiff.StartedState + } else if tableStateCounts[vdiff.PendingState] > 0 { + summary.State = vdiff.PendingState + } else if tableStateCounts[vdiff.CompletedState] == (len(tableSummaryMap) * len(shards)) { + // When doing shard consolidations/merges, we cannot rely solely on the + // vdiff_table state as there are N sources that we process rows from sequentially + // with each one writing to the shared _vt.vdiff_table record for the target shard. + // So we only mark the vdiff for the shard as completed when we've finished processing + // rows from all of the sources -- which is recorded by marking the vdiff done for the + // shard by setting _vt.vdiff.state = completed. + if completedShards == len(shards) { + summary.State = vdiff.CompletedState + } else { + summary.State = vdiff.StartedState + } + } else { + summary.State = vdiff.UnknownState + } + + // If the vdiff has been started then we can calculate the progress + if summary.State == vdiff.StartedState { + buildProgressReport(summary, totalRowsToCompare) + } + + sort.Strings(shards) // sort for predictable output summary.Shards = strings.Join(shards, ",") summary.TableSummaryMap = tableSummaryMap summary.Reports = reports @@ -554,3 +607,27 @@ func displayVDiff2ActionStatusResponse(wr *wrangler.Wrangler, format string, act wr.Logger().Printf(msg) } } + +func buildProgressReport(summary *vdiffSummary, rowsToCompare int64) { + report := &vdiff.ProgressReport{} + if summary.RowsCompared >= 1 { + // Round to 2 decimal points + report.Percentage = math.Round(math.Min((float64(summary.RowsCompared)/float64(rowsToCompare))*100, 100.00)*100) / 100 + } + if math.IsNaN(report.Percentage) { + report.Percentage = 0 + } + pctToGo := math.Abs(report.Percentage - 100.00) + startTime, _ := time.Parse(vdiff.TimestampFormat, summary.StartedAt) + curTime := time.Now().UTC() + runTime := curTime.Unix() - startTime.Unix() + if report.Percentage >= 1 { + // calculate how long 1% took, on avg, and multiply that by the % left + eta := time.Unix(((int64(runTime)/int64(report.Percentage))*int64(pctToGo))+curTime.Unix(), 1).UTC() + // cap the ETA at 1 year out to prevent providing nonsensical ETAs + if eta.Before(time.Now().UTC().AddDate(1, 0, 0)) { + report.ETA = eta.Format(vdiff.TimestampFormat) + } + } + summary.Progress = report +} diff --git a/go/vt/vtctl/vdiff2_test.go b/go/vt/vtctl/vdiff2_test.go index 3508f2321ca..db4827bd73f 100644 --- a/go/vt/vtctl/vdiff2_test.go +++ b/go/vt/vtctl/vdiff2_test.go @@ -1,9 +1,13 @@ package vtctl import ( + "math" "testing" + "time" "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/vt/vttablet/tabletmanager/vdiff" ) func TestGetStructNames(t *testing.T) { @@ -15,3 +19,111 @@ func TestGetStructNames(t *testing.T) { want := []string{"A", "B"} require.EqualValues(t, want, got) } + +func TestBuildProgressReport(t *testing.T) { + type args struct { + summary *vdiffSummary + rowsToCompare int64 + } + tests := []struct { + name string + args args + want *vdiff.ProgressReport + }{ + { + name: "no progress", + args: args{ + summary: &vdiffSummary{RowsCompared: 0}, + rowsToCompare: 100, + }, + want: &vdiff.ProgressReport{ + Percentage: 0, + ETA: "", // no ETA + }, + }, + { + name: "one third of the way", + args: args{ + summary: &vdiffSummary{ + RowsCompared: 33, + StartedAt: time.Now().Add(-10 * time.Second).UTC().Format(vdiff.TimestampFormat), + }, + rowsToCompare: 100, + }, + want: &vdiff.ProgressReport{ + Percentage: 33, + ETA: time.Now().Add(20 * time.Second).UTC().Format(vdiff.TimestampFormat), + }, + }, + { + name: "half way", + args: args{ + summary: &vdiffSummary{ + RowsCompared: 5000000000, + StartedAt: time.Now().Add(-10 * time.Hour).UTC().Format(vdiff.TimestampFormat), + }, + rowsToCompare: 10000000000, + }, + want: &vdiff.ProgressReport{ + Percentage: 50, + ETA: time.Now().Add(10 * time.Hour).UTC().Format(vdiff.TimestampFormat), + }, + }, + { + name: "full progress", + args: args{ + summary: &vdiffSummary{ + RowsCompared: 100, + CompletedAt: time.Now().UTC().Format(vdiff.TimestampFormat), + }, + rowsToCompare: 100, + }, + want: &vdiff.ProgressReport{ + Percentage: 100, + ETA: time.Now().UTC().Format(vdiff.TimestampFormat), + }, + }, + { + name: "more than in I_S", + args: args{ + summary: &vdiffSummary{ + RowsCompared: 100, + CompletedAt: time.Now().UTC().Format(vdiff.TimestampFormat), + }, + rowsToCompare: 50, + }, + want: &vdiff.ProgressReport{ + Percentage: 100, + ETA: time.Now().UTC().Format(vdiff.TimestampFormat), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + buildProgressReport(tt.args.summary, tt.args.rowsToCompare) + // We always check the percentage + require.Equal(t, tt.want.Percentage, tt.args.summary.Progress.Percentage) + + // We only check the ETA if there is one + if tt.want.ETA != "" { + // Let's check that we're within 1 second to avoid flakes + wantTime, err := time.Parse(vdiff.TimestampFormat, tt.want.ETA) + require.NoError(t, err) + var timeDiff float64 + if tt.want.Percentage == 100 { + completedTime, err := time.Parse(vdiff.TimestampFormat, tt.args.summary.CompletedAt) + require.NoError(t, err) + timeDiff = math.Abs(completedTime.Sub(wantTime).Seconds()) + } else { + startTime, err := time.Parse(vdiff.TimestampFormat, tt.args.summary.StartedAt) + require.NoError(t, err) + completedTimeUnix := float64(time.Now().UTC().Unix()-startTime.UTC().Unix()) * (100 / tt.want.Percentage) + estimatedTime, err := time.Parse(vdiff.TimestampFormat, tt.want.ETA) + require.NoError(t, err) + timeDiff = math.Abs(estimatedTime.Sub(startTime).Seconds() - completedTimeUnix) + } + require.LessOrEqual(t, timeDiff, 1.0) + } + }) + } +} diff --git a/go/vt/vttablet/tabletmanager/vdiff/action.go b/go/vt/vttablet/tabletmanager/vdiff/action.go index 475c3dc5748..5b7d2b99539 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/action.go +++ b/go/vt/vttablet/tabletmanager/vdiff/action.go @@ -25,6 +25,7 @@ import ( "github.com/google/uuid" "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/withddl" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/binlog/binlogplayer" @@ -59,6 +60,11 @@ func (vde *Engine) PerformVDiffAction(ctx context.Context, req *tabletmanagerdat return nil, err } defer dbClient.Close() + + vde.vdiffSchemaCreateOnce.Do(func() { + _, _ = withDDL.Exec(ctx, withddl.QueryToTriggerWithDDL, dbClient.ExecuteFetch, dbClient.ExecuteFetch) + }) + var qr *sqltypes.Result var err error options := req.Options @@ -67,18 +73,21 @@ func (vde *Engine) PerformVDiffAction(ctx context.Context, req *tabletmanagerdat switch action { case CreateAction, ResumeAction: query := fmt.Sprintf(sqlGetVDiffID, encodeString(req.VdiffUuid)) - if qr, err = withDDL.Exec(context.Background(), query, dbClient.ExecuteFetch, dbClient.ExecuteFetch); err != nil { + if qr, err = dbClient.ExecuteFetch(query, 1); err != nil { return nil, err } recordFound := len(qr.Rows) == 1 if recordFound && action == CreateAction { - return nil, fmt.Errorf("vdiff with UUID %s already exists", req.VdiffUuid) + return nil, fmt.Errorf("vdiff with UUID %s already exists on tablet %v", + req.VdiffUuid, vde.thisTablet.Alias) } else if action == ResumeAction { if !recordFound { - return nil, fmt.Errorf("vdiff with UUID %s not found", req.VdiffUuid) + return nil, fmt.Errorf("vdiff with UUID %s not found on tablet %v", + req.VdiffUuid, vde.thisTablet.Alias) } if resp.Id, err = qr.Named().Row().ToInt64("id"); err != nil { - return nil, fmt.Errorf("vdiff found with invalid id: %w", err) + return nil, fmt.Errorf("vdiff found with invalid id on tablet %v: %w", + vde.thisTablet.Alias, err) } } options, err = vde.fixupOptions(options) @@ -93,27 +102,36 @@ func (vde *Engine) PerformVDiffAction(ctx context.Context, req *tabletmanagerdat query := fmt.Sprintf(sqlNewVDiff, encodeString(req.Keyspace), encodeString(req.Workflow), "pending", encodeString(string(optionsJSON)), vde.thisTablet.Shard, topoproto.TabletDbName(vde.thisTablet), req.VdiffUuid) - if qr, err = withDDL.Exec(context.Background(), query, dbClient.ExecuteFetch, dbClient.ExecuteFetch); err != nil { + if qr, err = dbClient.ExecuteFetch(query, 1); err != nil { return nil, err } if qr.InsertID == 0 { - return nil, fmt.Errorf("unable to create vdiff record (%w); statement: %s", err, query) + return nil, fmt.Errorf("unable to create vdiff for UUID %s on tablet %v (%w)", + req.VdiffUuid, vde.thisTablet.Alias, err) } resp.Id = int64(qr.InsertID) } else { query := fmt.Sprintf(sqlResumeVDiff, encodeString(string(optionsJSON)), encodeString(req.VdiffUuid)) - if qr, err = withDDL.Exec(context.Background(), query, dbClient.ExecuteFetch, dbClient.ExecuteFetch); err != nil { + if qr, err = dbClient.ExecuteFetch(query, 1); err != nil { return nil, err } if qr.RowsAffected == 0 { - return nil, fmt.Errorf("unable to update vdiff record (%w); statement: %s", err, query) + msg := fmt.Sprintf("no completed vdiff found for UUID %s on tablet %v", + req.VdiffUuid, vde.thisTablet.Alias) + if err != nil { + msg = fmt.Sprintf("%s (%v)", msg, err) + } + return nil, fmt.Errorf(msg) } } + resp.VdiffUuid = req.VdiffUuid - qr, err := vde.getVDiffByID(ctx, resp.Id) + qr, err := vde.getVDiffByID(ctx, dbClient, resp.Id) if err != nil { return nil, err } + vde.mu.Lock() + defer vde.mu.Unlock() if err := vde.addController(qr.Named().Row(), options); err != nil { return nil, err } @@ -121,7 +139,7 @@ func (vde *Engine) PerformVDiffAction(ctx context.Context, req *tabletmanagerdat vdiffUUID := "" if req.SubCommand == LastActionArg { query := fmt.Sprintf(sqlGetMostRecentVDiff, encodeString(req.Keyspace), encodeString(req.Workflow)) - if qr, err = withDDL.Exec(context.Background(), query, dbClient.ExecuteFetch, dbClient.ExecuteFetch); err != nil { + if qr, err = dbClient.ExecuteFetch(query, 1); err != nil { return nil, err } if len(qr.Rows) == 1 { @@ -136,12 +154,13 @@ func (vde *Engine) PerformVDiffAction(ctx context.Context, req *tabletmanagerdat if vdiffUUID != "" { resp.VdiffUuid = vdiffUUID query := fmt.Sprintf(sqlGetVDiffByKeyspaceWorkflowUUID, encodeString(req.Keyspace), encodeString(req.Workflow), encodeString(vdiffUUID)) - if qr, err = withDDL.Exec(context.Background(), query, dbClient.ExecuteFetch, dbClient.ExecuteFetch); err != nil { + if qr, err = dbClient.ExecuteFetch(query, 1); err != nil { return nil, err } switch len(qr.Rows) { case 0: - return nil, fmt.Errorf("no vdiff found for keyspace %s and workflow %s: %s", req.Keyspace, req.Workflow, query) + return nil, fmt.Errorf("no vdiff found for UUID %s keyspace %s and workflow %s on tablet %v", + vdiffUUID, req.Keyspace, req.Workflow, vde.thisTablet.Alias) case 1: row := qr.Named().Row() vdiffID, _ := row["id"].ToInt64() @@ -151,12 +170,13 @@ func (vde *Engine) PerformVDiffAction(ctx context.Context, req *tabletmanagerdat return nil, err } default: - return nil, fmt.Errorf("too many vdiffs found (%d) for keyspace %s and workflow %s", len(qr.Rows), req.Keyspace, req.Workflow) + return nil, fmt.Errorf("too many vdiffs found (%d) for UUID %s keyspace %s and workflow %s on tablet %v", + len(qr.Rows), vdiffUUID, req.Keyspace, req.Workflow, vde.thisTablet.Alias) } } switch req.SubCommand { case AllActionArg: - if qr, err = withDDL.Exec(context.Background(), sqlGetAllVDiffs, dbClient.ExecuteFetch, dbClient.ExecuteFetch); err != nil { + if qr, err = dbClient.ExecuteFetch(sqlGetAllVDiffs, -1); err != nil { return nil, err } resp.Output = sqltypes.ResultToProto3(qr) @@ -176,9 +196,9 @@ func (vde *Engine) PerformVDiffAction(ctx context.Context, req *tabletmanagerdat if err != nil { return nil, fmt.Errorf("action argument %s not supported", req.SubCommand) } - query = fmt.Sprintf(sqlDeleteVDiffByUUID, encodeString(req.Keyspace), encodeString(req.Workflow), encodeString(uuid.String())) + query = fmt.Sprintf(sqlDeleteVDiffByUUID, encodeString(uuid.String())) } - if _, err = withDDL.Exec(context.Background(), query, dbClient.ExecuteFetch, dbClient.ExecuteFetch); err != nil { + if _, err = dbClient.ExecuteFetch(query, 1); err != nil { return nil, err } default: @@ -192,14 +212,14 @@ func (vde *Engine) getVDiffSummary(vdiffID int64, dbClient binlogplayer.DBClient var err error query := fmt.Sprintf(sqlVDiffSummary, vdiffID) - if qr, err = withDDL.Exec(context.Background(), query, dbClient.ExecuteFetch, dbClient.ExecuteFetch); err != nil { + if qr, err = dbClient.ExecuteFetch(query, -1); err != nil { return nil, err } return sqltypes.ResultToProto3(qr), nil } -// Validate vdiff options. Also setup defaults where applicable +// Validate vdiff options. Also setup defaults where applicable. func (vde *Engine) fixupOptions(options *tabletmanagerdatapb.VDiffOptions) (*tabletmanagerdatapb.VDiffOptions, error) { // Assign defaults to sourceCell and targetCell if not specified. sourceCell := options.PickerOptions.SourceCell diff --git a/go/vt/vttablet/tabletmanager/vdiff/controller.go b/go/vt/vttablet/tabletmanager/vdiff/controller.go index fd663b0a3c1..24c95567567 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/controller.go +++ b/go/vt/vttablet/tabletmanager/vdiff/controller.go @@ -18,12 +18,12 @@ package vdiff import ( "context" + "errors" "fmt" "strings" - "vitess.io/vitess/go/vt/withddl" - "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + "vitess.io/vitess/go/vt/vterrors" "google.golang.org/protobuf/encoding/prototext" @@ -32,6 +32,7 @@ import ( "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/log" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vttablet/tmclient" ) @@ -101,29 +102,20 @@ func (ct *controller) Stop() { func (ct *controller) run(ctx context.Context) { defer func() { - log.Infof("vdiff stopped") + log.Infof("Run finished for vdiff %s", ct.uuid) close(ct.done) }() dbClient := ct.vde.dbClientFactoryFiltered() if err := dbClient.Connect(); err != nil { - log.Errorf("db connect error: %v", err) + log.Errorf("Encountered an error connecting to database for vdiff %s: %v", ct.uuid, err) return } defer dbClient.Close() - ct.vde.vdiffSchemaCreateOnce.Do(func() { - _, _ = withDDL.ExecIgnore(ctx, withddl.QueryToTriggerWithDDL, dbClient.ExecuteFetch) - }) - - query := fmt.Sprintf(sqlGetVDiffByID, ct.id) - qr, err := withDDL.Exec(ctx, query, dbClient.ExecuteFetch, dbClient.ExecuteFetch) + qr, err := ct.vde.getVDiffByID(ctx, dbClient, ct.id) if err != nil { - log.Errorf(fmt.Sprintf("No data for %s", query), err) - return - } - if len(qr.Rows) == 0 { - log.Errorf("Missing vdiff row for %s", query) + log.Errorf("Encountered an error getting vdiff record for %s: %v", ct.uuid, err) return } @@ -131,19 +123,18 @@ func (ct *controller) run(ctx context.Context) { state := VDiffState(strings.ToLower(row["state"].ToString())) switch state { case PendingState: - log.Infof("Starting vdiff") + log.Infof("Starting vdiff %s", ct.uuid) if err := ct.start(ctx, dbClient); err != nil { - log.Errorf("run() failed: %s", err) + log.Errorf("Encountered an error for vdiff %s: %s", ct.uuid, err) insertVDiffLog(ctx, dbClient, ct.id, fmt.Sprintf("Error: %s", err)) - if err := ct.updateState(dbClient, ErrorState); err != nil { - return + if err = ct.updateState(dbClient, ErrorState, err); err != nil { + log.Errorf("Encountered an error marking vdiff %s as errored: %v", ct.uuid, err) } return } default: - log.Infof("run() done, state is %s", state) + log.Infof("VDiff %s was not marked as pending, doing nothing", state) } - log.Infof("run() has ended") } type migrationSource struct { @@ -153,15 +144,21 @@ type migrationSource struct { position mysql.Position } -func (ct *controller) updateState(dbClient binlogplayer.DBClient, state VDiffState) error { +func (ct *controller) updateState(dbClient binlogplayer.DBClient, state VDiffState, err error) error { extraCols := "" - if state == StartedState { + switch state { + case StartedState: extraCols = ", started_at = utc_timestamp()" - } else if state == CompletedState { + case CompletedState: extraCols = ", completed_at = utc_timestamp()" + default: + } + if err == nil { + // Clear out any previous error for the vdiff on this shard + err = errors.New("") } - query := fmt.Sprintf(sqlUpdateVDiffState, encodeString(string(state)), extraCols, ct.id) - if _, err := withDDL.Exec(ct.vde.ctx, query, dbClient.ExecuteFetch, dbClient.ExecuteFetch); err != nil { + query := fmt.Sprintf(sqlUpdateVDiffState, encodeString(string(state)), encodeString(err.Error()), extraCols, ct.id) + if _, err := dbClient.ExecuteFetch(query, 1); err != nil { return err } insertVDiffLog(ct.vde.ctx, dbClient, ct.id, fmt.Sprintf("State changed to: %s", state)) @@ -169,14 +166,24 @@ func (ct *controller) updateState(dbClient binlogplayer.DBClient, state VDiffSta } func (ct *controller) start(ctx context.Context, dbClient binlogplayer.DBClient) error { + select { + case <-ctx.Done(): + return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired") + default: + } ct.workflowFilter = fmt.Sprintf("where workflow = %s and db_name = %s", encodeString(ct.workflow), encodeString(ct.vde.dbName)) query := fmt.Sprintf(sqlGetVReplicationEntry, ct.workflowFilter) - qr, err := withDDL.Exec(ct.vde.ctx, query, dbClient.ExecuteFetch, dbClient.ExecuteFetch) + qr, err := dbClient.ExecuteFetch(query, -1) if err != nil { return err } log.Infof("Found %d vreplication streams for %s", len(qr.Rows), ct.workflow) for i, row := range qr.Named().Rows { + select { + case <-ctx.Done(): + return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired") + default: + } source := newMigrationSource() sourceBytes, err := row["source"].ToBytes() if err != nil { @@ -184,7 +191,7 @@ func (ct *controller) start(ctx context.Context, dbClient binlogplayer.DBClient) } var bls binlogdatapb.BinlogSource if err := prototext.Unmarshal(sourceBytes, &bls); err != nil { - log.Errorf("Failed to unmarshal vdiff binlog source: %v", err) + log.Errorf("Encountered an error unmarshalling vdiff binlog source for %s: %v", ct.uuid, err) return err } source.shard = bls.Shard @@ -205,11 +212,11 @@ func (ct *controller) start(ctx context.Context, dbClient binlogplayer.DBClient) if err != nil { return err } - if err := ct.updateState(dbClient, StartedState); err != nil { + if err := ct.updateState(dbClient, StartedState, nil); err != nil { return err } if err := wd.diff(ctx); err != nil { - log.Infof("wd.diff error %v", err) + log.Errorf("Encountered an error performing workflow diff for vdiff %s: %v", ct.uuid, err) return err } @@ -221,6 +228,6 @@ func newMigrationSource() *migrationSource { } func (ct *controller) validate() error { - // todo: check if vreplication workflow has errors, what else? + // TODO: check if vreplication workflow has errors, what else? return nil } diff --git a/go/vt/vttablet/tabletmanager/vdiff/engine.go b/go/vt/vttablet/tabletmanager/vdiff/engine.go index c1562c2f5e8..a1b4cdacf37 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/engine.go +++ b/go/vt/vttablet/tabletmanager/vdiff/engine.go @@ -19,10 +19,12 @@ package vdiff import ( "context" "encoding/json" + "errors" "fmt" "sync" "time" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/vt/proto/tabletmanagerdata" "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" @@ -59,7 +61,8 @@ type Engine struct { // snapshotMu is used to ensure that only one vdiff snapshot cycle is active at a time, // because we stop/start vreplication workflows during this process - snapshotMu sync.Mutex + snapshotMu sync.Mutex + vdiffSchemaCreateOnce sync.Once } @@ -92,7 +95,8 @@ func (vde *Engine) Open(ctx context.Context, vre *vreplication.Engine) { if vde.ts == nil || vde.isOpen { return } - log.Infof("VDiff Engine: opening") + log.Infof("VDiff Engine: opening...") + if vde.cancelRetry != nil { vde.cancelRetry() vde.cancelRetry = nil @@ -107,16 +111,21 @@ func (vde *Engine) Open(ctx context.Context, vre *vreplication.Engine) { } func (vde *Engine) openLocked(ctx context.Context) error { + // Start any pending VDiffs rows, err := vde.getPendingVDiffs(ctx) if err != nil { return err } - vde.ctx, vde.cancel = context.WithCancel(ctx) - vde.isOpen = true + vde.isOpen = true // now we are open and have things to close if err := vde.initControllers(rows); err != nil { return err } + + // At this point we've fully and succesfully opened so begin + // retrying error'd VDiffs until the engine is closed. + go vde.retryErroredVDiffs() + return nil } @@ -144,7 +153,7 @@ func (vde *Engine) retry(ctx context.Context, err error) { default: } if err := vde.openLocked(ctx); err == nil { - log.Infof("VDiff engine opened successfully") + log.Infof("VDiff engine: opened successfully") // Don't invoke cancelRetry because openLocked // will hold on to this context for later cancelation. vde.cancelRetry = nil @@ -155,10 +164,13 @@ func (vde *Engine) retry(ctx context.Context, err error) { } } +// addController creates a new controller using the given vdiff record and adds it to the engine. +// You must already have the main engine mutex (mu) locked before calling this. func (vde *Engine) addController(row sqltypes.RowNamedValues, options *tabletmanagerdata.VDiffOptions) error { ct, err := newController(vde.ctx, row, vde.dbClientFactoryDba, vde.ts, vde, options) if err != nil { - return fmt.Errorf("controller could not be initialized for stream: %+v", row) + return fmt.Errorf("controller could not be initialized for stream %+v on tablet %v", + row, vde.thisTablet.Alias) } vde.controllers[ct.id] = ct return nil @@ -170,7 +182,7 @@ func (vde *Engine) initControllers(qr *sqltypes.Result) error { } for _, row := range qr.Named().Rows { options := &tabletmanagerdata.VDiffOptions{} - if err := json.Unmarshal([]byte(row.AsString("options", "{}")), options); err != nil { + if err := json.Unmarshal(row.AsBytes("options", []byte("{}")), options); err != nil { return err } if err := vde.addController(row, options); err != nil { @@ -205,6 +217,7 @@ func (vde *Engine) Close() { } vde.cancel() + // We still have to wait for all controllers to stop. for _, ct := range vde.controllers { ct.Stop() @@ -225,12 +238,16 @@ func (vde *Engine) getDBClient(isAdmin bool) binlogplayer.DBClient { } return vde.dbClientFactoryFiltered() } + func (vde *Engine) getPendingVDiffs(ctx context.Context) (*sqltypes.Result, error) { dbClient := vde.dbClientFactoryFiltered() if err := dbClient.Connect(); err != nil { return nil, err } defer dbClient.Close() + + // We have to use ExecIgnore here so as not to block quick tablet state + // transitions from primary to non-primary when starting the engine qr, err := withDDL.ExecIgnore(ctx, sqlGetPendingVDiffs, dbClient.ExecuteFetch) if err != nil { return nil, err @@ -241,18 +258,88 @@ func (vde *Engine) getPendingVDiffs(ctx context.Context) (*sqltypes.Result, erro return qr, nil } -func (vde *Engine) getVDiffByID(ctx context.Context, id int64) (*sqltypes.Result, error) { - dbClient := vde.dbClientFactoryFiltered() - if err := dbClient.Connect(); err != nil { +func (vde *Engine) getVDiffsToRetry(ctx context.Context, dbClient binlogplayer.DBClient) (*sqltypes.Result, error) { + qr, err := withDDL.Exec(ctx, sqlGetVDiffsToRetry, dbClient.ExecuteFetch, dbClient.ExecuteFetch) + if err != nil { return nil, err } - defer dbClient.Close() - qr, err := withDDL.Exec(ctx, fmt.Sprintf(sqlGetVDiffByID, id), dbClient.ExecuteFetch, dbClient.ExecuteFetch) + if len(qr.Rows) == 0 { + return nil, nil + } + return qr, nil +} + +func (vde *Engine) getVDiffByID(ctx context.Context, dbClient binlogplayer.DBClient, id int64) (*sqltypes.Result, error) { + qr, err := dbClient.ExecuteFetch(fmt.Sprintf(sqlGetVDiffByID, id), -1) if err != nil { return nil, err } if len(qr.Rows) != 1 { - return nil, fmt.Errorf("unable to read vdiff table for %d", id) + return nil, fmt.Errorf("no vdiff found for id %d on tablet %v", + id, vde.thisTablet.Alias) } return qr, nil } + +func (vde *Engine) retryVDiffs(ctx context.Context) error { + vde.mu.Lock() + defer vde.mu.Unlock() + dbClient := vde.dbClientFactoryFiltered() + if err := dbClient.Connect(); err != nil { + return err + } + defer dbClient.Close() + + qr, err := vde.getVDiffsToRetry(ctx, dbClient) + if err != nil { + return err + } + if qr == nil || len(qr.Rows) == 0 { + return nil + } + for _, row := range qr.Named().Rows { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + lastError := mysql.NewSQLErrorFromError(errors.New(row.AsString("last_error", ""))) + if !mysql.IsEphemeralError(lastError) { + continue + } + uuid := row.AsString("vdiff_uuid", "") + id, err := row.ToInt64("id") + if err != nil { + return err + } + log.Infof("Retrying vdiff %s that had an ephemeral error of '%v'", uuid, lastError) + if _, err = dbClient.ExecuteFetch(fmt.Sprintf(sqlRetryVDiff, id), 1); err != nil { + return err + } + options := &tabletmanagerdata.VDiffOptions{} + if err := json.Unmarshal(row.AsBytes("options", []byte("{}")), options); err != nil { + return err + } + if err := vde.addController(row, options); err != nil { + return err + } + } + return nil +} + +func (vde *Engine) retryErroredVDiffs() { + tkr := time.NewTicker(time.Second * 30) + defer tkr.Stop() + for { + select { + case <-vde.ctx.Done(): + log.Info("VDiff engine: closing...") + return + case <-tkr.C: + } + + if err := vde.retryVDiffs(vde.ctx); err != nil { + log.Errorf("Error retrying vdiffs: %v", err) + } + } +} diff --git a/go/vt/vttablet/tabletmanager/vdiff/primitive_executor.go b/go/vt/vttablet/tabletmanager/vdiff/primitive_executor.go index 0734ce0f42f..f2be4bae995 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/primitive_executor.go +++ b/go/vt/vttablet/tabletmanager/vdiff/primitive_executor.go @@ -76,7 +76,7 @@ func newPrimitiveExecutor(ctx context.Context, prim vtgateEngine.Primitive, name } // next gets the next row in the stream for this shard, if there's currently no rows to process in the stream then wait on the -// result channel for the shard streamer to produce them +// result channel for the shard streamer to produce them. func (pe *primitiveExecutor) next() ([]sqltypes.Value, error) { for len(pe.rows) == 0 { qr, ok := <-pe.resultch @@ -92,7 +92,7 @@ func (pe *primitiveExecutor) next() ([]sqltypes.Value, error) { } // drain fastforward's a shard to process (and ignore) everything from its results stream and return a count of the -// discarded rows +// discarded rows. func (pe *primitiveExecutor) drain(ctx context.Context) (int64, error) { var count int64 for { diff --git a/go/vt/vttablet/tabletmanager/vdiff/report.go b/go/vt/vttablet/tabletmanager/vdiff/report.go index b32dcd8e157..4f9b264cddd 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/report.go +++ b/go/vt/vttablet/tabletmanager/vdiff/report.go @@ -48,6 +48,11 @@ type DiffReport struct { MismatchedRowsDiffs []*DiffMismatch `json:"MismatchedRowsSample,omitempty"` } +type ProgressReport struct { + Percentage float64 + ETA string `json:"ETA,omitempty"` // a formatted date +} + // DiffMismatch is a sample of row diffs between source and target. type DiffMismatch struct { Source *RowDiff `json:"Source,omitempty"` diff --git a/go/vt/vttablet/tabletmanager/vdiff/schema.go b/go/vt/vttablet/tabletmanager/vdiff/schema.go index 30ef161d631..1960714f897 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/schema.go +++ b/go/vt/vttablet/tabletmanager/vdiff/schema.go @@ -26,13 +26,19 @@ func init() { ddls = append(ddls, sqlCreateSidecarDB, sqlCreateVDiffTable, sqlCreateVDiffTableTable, sqlCreateVDiffLogTable) // Changes to VDiff related schema over time ddls = append(ddls, - "ALTER TABLE _vt.vdiff MODIFY COLUMN id bigint AUTO_INCREMENT", - "ALTER TABLE _vt.vdiff CHANGE started_timestamp started_at timestamp NULL DEFAULT NULL", - "ALTER TABLE _vt.vdiff CHANGE completed_timestamp completed_at timestamp NULL DEFAULT NULL", - "ALTER TABLE _vt.vdiff MODIFY COLUMN state varbinary(64)", - "ALTER TABLE _vt.vdiff_table MODIFY COLUMN table_name varbinary(128)", - "ALTER TABLE _vt.vdiff_table MODIFY COLUMN state varbinary(64)", - "ALTER TABLE _vt.vdiff_table MODIFY COLUMN lastpk varbinary(2000)", + `ALTER TABLE _vt.vdiff MODIFY COLUMN id bigint AUTO_INCREMENT`, + `ALTER TABLE _vt.vdiff CHANGE started_timestamp started_at timestamp NULL DEFAULT NULL`, + `ALTER TABLE _vt.vdiff CHANGE completed_timestamp completed_at timestamp NULL DEFAULT NULL`, + `ALTER TABLE _vt.vdiff MODIFY COLUMN state varbinary(64)`, + `ALTER TABLE _vt.vdiff MODIFY COLUMN keyspace varbinary(256)`, + `ALTER TABLE _vt.vdiff ADD COLUMN last_error varbinary(512)`, + `ALTER TABLE _vt.vdiff ADD INDEX (state)`, + `ALTER TABLE _vt.vdiff ADD INDEX ks_wf_idx (keyspace(64), workflow(64))`, + `ALTER TABLE _vt.vdiff_table MODIFY COLUMN table_name varbinary(128)`, + `ALTER TABLE _vt.vdiff_table MODIFY COLUMN state varbinary(64)`, + `ALTER TABLE _vt.vdiff_table MODIFY COLUMN lastpk varbinary(2000)`, + `ALTER TABLE _vt.vdiff_table MODIFY COLUMN table_rows bigint not null default 0`, + `ALTER TABLE _vt.vdiff_table MODIFY COLUMN rows_compared bigint not null default 0`, ) withDDL = withddl.New(ddls) } @@ -76,8 +82,10 @@ const ( primary key (id)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4` sqlNewVDiff = "insert into _vt.vdiff(keyspace, workflow, state, options, shard, db_name, vdiff_uuid) values(%s, %s, '%s', %s, '%s', '%s', '%s')" - sqlResumeVDiff = `update _vt.vdiff as vd, _vt.vdiff_table as vdt set vd.started_at = NULL, vd.completed_at = NULL, vd.state = 'pending', - vd.options = %s, vdt.state = 'pending', vdt.rows_compared = 0 where vd.vdiff_uuid = %s and vd.id = vdt.vdiff_id` + sqlResumeVDiff = `update _vt.vdiff as vd, _vt.vdiff_table as vdt set vd.options = %s, vd.started_at = NULL, vd.completed_at = NULL, vd.state = 'pending', + vdt.state = 'pending' where vd.vdiff_uuid = %s and vd.id = vdt.vdiff_id and vd.state = 'completed' and vdt.state = 'completed'` + sqlRetryVDiff = `update _vt.vdiff as vd, _vt.vdiff_table as vdt set vd.state = 'pending', vd.last_error = '', vdt.state = 'pending' + where vd.id = %d and vd.id = vdt.vdiff_id and vd.state = 'error' and vdt.state = 'error'` sqlGetVDiffByKeyspaceWorkflowUUID = "select * from _vt.vdiff where keyspace = %s and workflow = %s and vdiff_uuid = %s" sqlGetMostRecentVDiff = "select * from _vt.vdiff where keyspace = %s and workflow = %s order by id desc limit 1" sqlGetVDiffByID = "select * from _vt.vdiff where id = %d" @@ -85,28 +93,33 @@ const ( inner join _vt.vdiff_log as vdl on (vd.id = vdl.vdiff_id) where vd.keyspace = %s and vd.workflow = %s` sqlDeleteVDiffByUUID = `delete from vd, vdt using _vt.vdiff as vd inner join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) - and vd.keyspace = %s and vd.workflow = %s and vd.vdiff_uuid = %s` - sqlVDiffSummary = `select vd.state as vdiff_state, vdt.table_name as table_name, + and vd.vdiff_uuid = %s` + sqlVDiffSummary = `select vd.state as vdiff_state, vd.last_error as last_error, vdt.table_name as table_name, vd.vdiff_uuid as 'uuid', vdt.state as table_state, vdt.table_rows as table_rows, - vd.started_at as started_at, vdt.rows_compared as rows_compared, vd.completed_at as completed_at, - IF(vdt.mismatch = 1, 1, 0) as has_mismatch, vdt.report as report + vd.started_at as started_at, vdt.table_rows as table_rows, vdt.rows_compared as rows_compared, + vd.completed_at as completed_at, IF(vdt.mismatch = 1, 1, 0) as has_mismatch, vdt.report as report from _vt.vdiff as vd inner join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) where vdt.vdiff_id = %d` // sqlUpdateVDiffState has a penultimate placeholder for any additional columns you want to update, e.g. `, foo = 1` - sqlUpdateVDiffState = "update _vt.vdiff set state = %s %s where id = %d" + sqlUpdateVDiffState = "update _vt.vdiff set state = %s, last_error = %s %s where id = %d" sqlGetVReplicationEntry = "select * from _vt.vreplication %s" sqlGetPendingVDiffs = "select * from _vt.vdiff where state = 'pending'" + sqlGetVDiffsToRetry = "select * from _vt.vdiff where state = 'error' and options->>'$.core_options.auto_retry' = 'true'" sqlGetVDiffID = "select id as id from _vt.vdiff where vdiff_uuid = %s" sqlGetAllVDiffs = "select * from _vt.vdiff order by id desc" + sqlGetTableRows = "select table_name as table_name, table_rows as table_rows from INFORMATION_SCHEMA.TABLES where table_schema = %s and table_name = %s" + sqlGetAllTableRows = "select table_name as table_name, table_rows as table_rows from INFORMATION_SCHEMA.TABLES where table_schema = %s and table_name in (%s)" sqlNewVDiffTable = "insert into _vt.vdiff_table(vdiff_id, table_name, state, table_rows) values(%d, %s, 'pending', %d)" - sqlGetVDiffTable = `select vdt.lastpk as lastpk from _vt.vdiff as vd inner join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) + sqlGetVDiffTable = `select vdt.lastpk as lastpk, vdt.mismatch as mismatch, vdt.report as report + from _vt.vdiff as vd inner join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) where vdt.vdiff_id = %d and vdt.table_name = %s` - sqlUpdateTableRows = "update _vt.vdiff_table set table_rows = %d where vdiff_id = %d and table_name = %s" - sqlUpdateTableProgress = "update _vt.vdiff_table set rows_compared = %d, lastpk = %s where vdiff_id = %d and table_name = %s" - sqlUpdateTableNoProgress = "update _vt.vdiff_table set rows_compared = %d where vdiff_id = %d and table_name = %s" - sqlUpdateTableState = "update _vt.vdiff_table set state = %s, report = %s where vdiff_id = %d and table_name = %s" - sqlUpdateTableMismatch = "update _vt.vdiff_table set mismatch = true where vdiff_id = %d and table_name = %s" + sqlUpdateTableRows = "update _vt.vdiff_table set table_rows = %d where vdiff_id = %d and table_name = %s" + sqlUpdateTableProgress = "update _vt.vdiff_table set rows_compared = %d, lastpk = %s, report = %s where vdiff_id = %d and table_name = %s" + sqlUpdateTableNoProgress = "update _vt.vdiff_table set rows_compared = %d, report = %s where vdiff_id = %d and table_name = %s" + sqlUpdateTableState = "update _vt.vdiff_table set state = %s where vdiff_id = %d and table_name = %s" + sqlUpdateTableStateAndReport = "update _vt.vdiff_table set state = %s, rows_compared = %d, report = %s where vdiff_id = %d and table_name = %s" + sqlUpdateTableMismatch = "update _vt.vdiff_table set mismatch = true where vdiff_id = %d and table_name = %s" sqlGetIncompleteTables = "select table_name as table_name from _vt.vdiff_table where vdiff_id = %d and state != 'completed'" ) diff --git a/go/vt/vttablet/tabletmanager/vdiff/shard_streamer.go b/go/vt/vttablet/tabletmanager/vdiff/shard_streamer.go index e700a9b9629..6d016709cce 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/shard_streamer.go +++ b/go/vt/vttablet/tabletmanager/vdiff/shard_streamer.go @@ -38,7 +38,7 @@ type shardStreamer struct { } // StreamExecute implements the StreamExecutor interface of the Primitive executor and -// it simply waits for a result to be available for this shard and sends it to the merge sorter +// it simply waits for a result to be available for this shard and sends it to the merge sorter. func (sm *shardStreamer) StreamExecute(ctx context.Context, vcursor engine.VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error { for result := range sm.result { if err := callback(result); err != nil { diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go index 19ba1a7603f..f6f67f2f03c 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go @@ -24,6 +24,7 @@ import ( "time" "vitess.io/vitess/go/vt/proto/topodata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/topo" "google.golang.org/protobuf/encoding/prototext" @@ -140,7 +141,7 @@ func (td *tableDiffer) stopTargetVReplicationStreams(ctx context.Context, dbClie // update position of all source streams query = fmt.Sprintf("select id, source, pos from _vt.vreplication %s", ct.workflowFilter) - qr, err := withDDL.Exec(ctx, query, dbClient.ExecuteFetch, dbClient.ExecuteFetch) + qr, err := dbClient.ExecuteFetch(query, -1) if err != nil { return err } @@ -152,7 +153,8 @@ func (td *tableDiffer) stopTargetVReplicationStreams(ctx context.Context, dbClie return err } if mpos.IsZero() { - return fmt.Errorf("stream %d has not started", id) + return fmt.Errorf("stream %d has not started on tablet %v", + id, td.wd.ct.vde.thisTablet.Alias) } sourceBytes, err := row["source"].ToBytes() if err != nil { @@ -347,7 +349,8 @@ func (td *tableDiffer) streamOneShard(ctx context.Context, participant *shardStr if len(fields) == 0 { if len(vsr.Fields) == 0 { - return fmt.Errorf("did not received expected fields in response %+v", vsr) + return fmt.Errorf("did not received expected fields in response %+v on tablet %v", + vsr, td.wd.ct.vde.thisTablet.Alias) } fields = vsr.Fields gtidch <- vsr.Gtid @@ -406,18 +409,40 @@ func (td *tableDiffer) diff(ctx context.Context, rowsToCompare *int64, debug, on } defer dbClient.Close() + // We need to continue were we left off when appropriate. This can be an + // auto-retry on error, or a manual retry via the resume command. + // Otherwise the existing state will be empty and we start from scratch. + query := fmt.Sprintf(sqlGetVDiffTable, td.wd.ct.id, encodeString(td.table.Name)) + cs, err := dbClient.ExecuteFetch(query, -1) + if err != nil { + return nil, err + } + if len(cs.Rows) == 0 { + return nil, fmt.Errorf("no state found for vdiff table %s for vdiff_id %d on tablet %v", + td.table.Name, td.wd.ct.id, td.wd.ct.vde.thisTablet.Alias) + } else if len(cs.Rows) > 1 { + return nil, fmt.Errorf("invalid state found for vdiff table %s (multiple records) for vdiff_id %d on tablet %v", + td.table.Name, td.wd.ct.id, td.wd.ct.vde.thisTablet.Alias) + } + curState := cs.Named().Row() + mismatch := curState.AsBool("mismatch", false) + dr := &DiffReport{} + if rpt := curState.AsBytes("report", []byte("{}")); json.Valid(rpt) { + if err = json.Unmarshal(rpt, dr); err != nil { + return nil, err + } + } + dr.TableName = td.table.Name + sourceExecutor := newPrimitiveExecutor(ctx, td.sourcePrimitive, "source") targetExecutor := newPrimitiveExecutor(ctx, td.targetPrimitive, "target") - dr := &DiffReport{TableName: td.table.Name} var sourceRow, lastProcessedRow, targetRow []sqltypes.Value - var err error advanceSource := true advanceTarget := true - mismatch := false // Save our progress when we finish the run defer func() { - if err := td.updateTableProgress(dbClient, dr.ProcessedRows, lastProcessedRow); err != nil { + if err := td.updateTableProgress(dbClient, dr, lastProcessedRow); err != nil { log.Errorf("Failed to update vdiff progress on %s table: %v", td.table.Name, err) } }() @@ -425,6 +450,12 @@ func (td *tableDiffer) diff(ctx context.Context, rowsToCompare *int64, debug, on for { lastProcessedRow = sourceRow + select { + case <-ctx.Done(): + return nil, vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired") + default: + } + if !mismatch && dr.MismatchedRows > 0 { mismatch = true log.Infof("Flagging mismatch for %s: %+v", td.table.Name, dr) @@ -550,7 +581,7 @@ func (td *tableDiffer) diff(ctx context.Context, rowsToCompare *int64, debug, on // approximate progress information but without too much overhead for when it's not // needed or even desired. if dr.ProcessedRows%1e4 == 0 { - if err := td.updateTableProgress(dbClient, dr.ProcessedRows, sourceRow); err != nil { + if err := td.updateTableProgress(dbClient, dr, sourceRow); err != nil { return nil, err } } @@ -583,44 +614,77 @@ func (td *tableDiffer) compare(sourceRow, targetRow []sqltypes.Value, cols []com return 0, nil } -func (td *tableDiffer) updateTableProgress(dbClient binlogplayer.DBClient, numRows int64, lastRow []sqltypes.Value) error { +func (td *tableDiffer) updateTableProgress(dbClient binlogplayer.DBClient, dr *DiffReport, lastRow []sqltypes.Value) error { + if dr == nil { + return fmt.Errorf("cannot update progress with a nil diff report") + } var lastPK []byte var err error var query string + rpt, err := json.Marshal(dr) + if err != nil { + return err + } if lastRow != nil { lastPK, err = td.lastPKFromRow(lastRow) if err != nil { return err } - query = fmt.Sprintf(sqlUpdateTableProgress, numRows, encodeString(string(lastPK)), td.wd.ct.id, encodeString(td.table.Name)) - } else if numRows != 0 { - // This should never happen - return fmt.Errorf("invalid vdiff state detected, %d row(s) were processed but the row data is missing", numRows) + + query = fmt.Sprintf(sqlUpdateTableProgress, dr.ProcessedRows, encodeString(string(lastPK)), encodeString(string(rpt)), td.wd.ct.id, encodeString(td.table.Name)) } else { - // We didn't process any rows this time around so reflect that and keep any - // lastpk from a previous run. This is only relevant for RESUMEd vdiffs. - query = fmt.Sprintf(sqlUpdateTableNoProgress, numRows, td.wd.ct.id, encodeString(td.table.Name)) + query = fmt.Sprintf(sqlUpdateTableNoProgress, dr.ProcessedRows, encodeString(string(rpt)), td.wd.ct.id, encodeString(td.table.Name)) + } + if _, err := dbClient.ExecuteFetch(query, 1); err != nil { + return err + } + return nil +} + +func (td *tableDiffer) updateTableRows(ctx context.Context, dbClient binlogplayer.DBClient) error { + query := fmt.Sprintf(sqlGetTableRows, encodeString(td.wd.ct.vde.dbName), encodeString(td.table.Name)) + qr, err := dbClient.ExecuteFetch(query, 1) + if err != nil { + return err + } + if len(qr.Rows) == 0 { + return fmt.Errorf("no information_schema status found for table %s on tablet %v", + td.table.Name, td.wd.ct.vde.thisTablet.Alias) } + row := qr.Named().Row() + query = fmt.Sprintf(sqlUpdateTableRows, row.AsInt64("table_rows", 0), td.wd.ct.id, encodeString(td.table.Name)) if _, err := dbClient.ExecuteFetch(query, 1); err != nil { return err } return nil } -func (td *tableDiffer) updateTableState(ctx context.Context, dbClient binlogplayer.DBClient, tableName string, state VDiffState, dr *DiffReport) error { - reportJSON := "{}" +func (td *tableDiffer) updateTableState(ctx context.Context, dbClient binlogplayer.DBClient, state VDiffState) error { + query := fmt.Sprintf(sqlUpdateTableState, encodeString(string(state)), td.wd.ct.id, encodeString(td.table.Name)) + if _, err := dbClient.ExecuteFetch(query, 1); err != nil { + return err + } + insertVDiffLog(ctx, dbClient, td.wd.ct.id, fmt.Sprintf("%s: table %s", state, encodeString(td.table.Name))) + + return nil +} + +func (td *tableDiffer) updateTableStateAndReport(ctx context.Context, dbClient binlogplayer.DBClient, state VDiffState, dr *DiffReport) error { + var report string if dr != nil { reportJSONBytes, err := json.Marshal(dr) if err != nil { return err } - reportJSON = string(reportJSONBytes) + report = string(reportJSONBytes) + } else { + report = "{}" } - query := fmt.Sprintf(sqlUpdateTableState, encodeString(string(state)), encodeString(reportJSON), td.wd.ct.id, encodeString(tableName)) - if _, err := withDDL.Exec(ctx, query, dbClient.ExecuteFetch, dbClient.ExecuteFetch); err != nil { + query := fmt.Sprintf(sqlUpdateTableStateAndReport, encodeString(string(state)), dr.ProcessedRows, encodeString(report), td.wd.ct.id, encodeString(td.table.Name)) + if _, err := dbClient.ExecuteFetch(query, 1); err != nil { return err } - insertVDiffLog(ctx, dbClient, td.wd.ct.id, fmt.Sprintf("%s: table %s", state, encodeString(tableName))) + insertVDiffLog(ctx, dbClient, td.wd.ct.id, fmt.Sprintf("%s: table %s", state, encodeString(td.table.Name))) return nil } diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_plan.go b/go/vt/vttablet/tabletmanager/vdiff/table_plan.go index 5cc50181d39..6c97c658a4c 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_plan.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_plan.go @@ -121,7 +121,8 @@ func (td *tableDiffer) buildTablePlan() (*tablePlan, error) { colname := targetSelect.SelectExprs[i].(*sqlparser.AliasedExpr).Expr.(*sqlparser.ColName).Name.Lowered() _, ok := fields[colname] if !ok { - return nil, fmt.Errorf("column %v not found in table %v", colname, tp.table.Name) + return nil, fmt.Errorf("column %v not found in table %v on tablet %v", + colname, tp.table.Name, td.wd.ct.vde.thisTablet.Alias) } tp.compareCols[i].colName = colname @@ -159,7 +160,7 @@ func (td *tableDiffer) buildTablePlan() (*tablePlan, error) { return tp, err } -// findPKs identifies PKs and removes them from the columns to do data comparison +// findPKs identifies PKs and removes them from the columns to do data comparison. func (tp *tablePlan) findPKs(targetSelect *sqlparser.Select) error { var orderby sqlparser.OrderBy for _, pk := range tp.table.PrimaryKeyColumns { diff --git a/go/vt/vttablet/tabletmanager/vdiff/utils.go b/go/vt/vttablet/tabletmanager/vdiff/utils.go index 37ee0832cc8..a6c711bae79 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/utils.go +++ b/go/vt/vttablet/tabletmanager/vdiff/utils.go @@ -30,7 +30,7 @@ import ( "vitess.io/vitess/go/vt/vtgate/engine" ) -// newMergeSorter creates an engine.MergeSort based on the shard streamers and pk columns. +// newMergeSorter creates an engine.MergeSort based on the shard streamers and pk columns func newMergeSorter(participants map[string]*shardStreamer, comparePKs []compareColInfo) *engine.MergeSort { prims := make([]engine.StreamExecutor, 0, len(participants)) for _, participant := range participants { @@ -72,7 +72,7 @@ func pkColsToGroupByParams(pkCols []int) []*engine.GroupByParams { func insertVDiffLog(ctx context.Context, dbClient binlogplayer.DBClient, vdiffID int64, message string) { query := "insert into _vt.vdiff_log(vdiff_id, message) values (%d, %s)" query = fmt.Sprintf(query, vdiffID, encodeString(message)) - if _, err := withDDL.Exec(ctx, query, dbClient.ExecuteFetch, dbClient.ExecuteFetch); err != nil { + if _, err := dbClient.ExecuteFetch(query, 1); err != nil { log.Error("Error inserting into _vt.vdiff_log: %v", err) } } diff --git a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go index 9d694be274d..cfc8e620a93 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go @@ -33,6 +33,7 @@ import ( binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vtctl/schematools" "vitess.io/vitess/go/vt/vterrors" @@ -40,12 +41,11 @@ import ( ) // workflowDiffer has metadata and state for the vdiff of a single workflow on this tablet -// only one vdiff can be running for a workflow at any time +// only one vdiff can be running for a workflow at any time. type workflowDiffer struct { ct *controller tableDiffers map[string]*tableDiffer // key is table name - tableSizes map[string]int64 // approx. size of tables when vdiff started opts *tabletmanagerdatapb.VDiffOptions } @@ -53,7 +53,6 @@ func newWorkflowDiffer(ct *controller, opts *tabletmanagerdatapb.VDiffOptions) ( wd := &workflowDiffer{ ct: ct, opts: opts, - tableSizes: make(map[string]int64), tableDiffers: make(map[string]*tableDiffer, 1), } return wd, nil @@ -95,60 +94,43 @@ func (wd *workflowDiffer) reconcileExtraRows(dr *DiffReport, maxExtraRowsToCompa } func (wd *workflowDiffer) diffTable(ctx context.Context, dbClient binlogplayer.DBClient, td *tableDiffer) error { - tableName := td.table.Name - log.Infof("Starting differ on table %s", tableName) - if err := td.updateTableState(ctx, dbClient, tableName, StartedState, nil); err != nil { + select { + case <-ctx.Done(): + return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired") + default: + } + + log.Infof("Starting differ on table %s for vdiff %s", td.table.Name, wd.ct.uuid) + if err := td.updateTableState(ctx, dbClient, StartedState); err != nil { return err } if err := td.initialize(ctx); err != nil { return err } - log.Infof("initialize done") + log.Infof("Table initialization done on table %s for vdiff %s", td.table.Name, wd.ct.uuid) dr, err := td.diff(ctx, &wd.opts.CoreOptions.MaxRows, wd.opts.ReportOptions.DebugQuery, false, wd.opts.CoreOptions.MaxExtraRowsToCompare) if err != nil { - log.Errorf("td.diff error %s", err.Error()) + log.Errorf("Encountered an error diffing table %s for vdiff %s: %v", td.table.Name, wd.ct.uuid, err) return err } - log.Infof("td.diff done for %s, with dr %+v", tableName, dr) + log.Infof("Table diff done on table %s for vdiff %s with report: %+v", td.table.Name, wd.ct.uuid, dr) if dr.ExtraRowsSource > 0 || dr.ExtraRowsTarget > 0 { wd.reconcileExtraRows(dr, wd.opts.CoreOptions.MaxExtraRowsToCompare) } if dr.MismatchedRows > 0 || dr.ExtraRowsTarget > 0 || dr.ExtraRowsSource > 0 { - if err := updateTableMismatch(dbClient, wd.ct.id, tableName); err != nil { + if err := updateTableMismatch(dbClient, wd.ct.id, td.table.Name); err != nil { return err } } - log.Infof("td.diff after reconciliation for %s, with dr %+v", tableName, dr) - if err := td.updateTableState(ctx, dbClient, tableName, CompletedState, dr); err != nil { + log.Infof("Completed reconciliation on table %s for vdiff %s with updated report: %+v", td.table.Name, wd.ct.uuid, dr) + if err := td.updateTableStateAndReport(ctx, dbClient, CompletedState, dr); err != nil { return err } return nil } -func (wd *workflowDiffer) getTotalRowsEstimate(dbClient binlogplayer.DBClient) error { - query := "select db_name as db_name from _vt.vreplication where workflow = %s limit 1" - query = fmt.Sprintf(query, encodeString(wd.ct.workflow)) - qr, err := dbClient.ExecuteFetch(query, 1) - if err != nil { - return err - } - dbName, _ := qr.Named().Row().ToString("db_name") - query = "select table_name as table_name, table_rows as table_rows from information_schema.tables where table_schema = %s" - query = fmt.Sprintf(query, encodeString(dbName)) - qr, err = dbClient.ExecuteFetch(query, -1) - if err != nil { - return err - } - for _, row := range qr.Named().Rows { - tableName, _ := row.ToString("table_name") - tableRows, _ := row.ToInt64("table_rows") - wd.tableSizes[tableName] = tableRows - } - return nil -} - func (wd *workflowDiffer) diff(ctx context.Context) error { dbClient := wd.ct.dbClientFactory() if err := dbClient.Connect(); err != nil { @@ -156,6 +138,12 @@ func (wd *workflowDiffer) diff(ctx context.Context) error { } defer dbClient.Close() + select { + case <-ctx.Done(): + return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired") + default: + } + filter := wd.ct.filter req := &tabletmanagerdatapb.GetSchemaRequest{} schm, err := schematools.GetSchema(ctx, wd.ct.ts, wd.ct.tmc, wd.ct.vde.thisTablet.Alias, req) @@ -165,39 +153,37 @@ func (wd *workflowDiffer) diff(ctx context.Context) error { if err = wd.buildPlan(dbClient, filter, schm); err != nil { return vterrors.Wrap(err, "buildPlan") } - if err := wd.getTotalRowsEstimate(dbClient); err != nil { + if err := wd.initVDiffTables(dbClient); err != nil { return err } for _, td := range wd.tableDiffers { - tableRows, ok := wd.tableSizes[td.table.Name] - if !ok { - tableRows = 0 + select { + case <-ctx.Done(): + return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired") + default: } - var query string - query = fmt.Sprintf(sqlGetVDiffTable, wd.ct.id, encodeString(td.table.Name)) - qr, err := withDDL.Exec(ctx, query, dbClient.ExecuteFetch, dbClient.ExecuteFetch) + query := fmt.Sprintf(sqlGetVDiffTable, wd.ct.id, encodeString(td.table.Name)) + qr, err := dbClient.ExecuteFetch(query, 1) if err != nil { return err } if len(qr.Rows) == 0 { - query = fmt.Sprintf(sqlNewVDiffTable, wd.ct.id, encodeString(td.table.Name), tableRows) - } else { - // Update the table rows estimate when resuming - query = fmt.Sprintf(sqlUpdateTableRows, tableRows, wd.ct.id, encodeString(td.table.Name)) - } - if _, err := withDDL.Exec(ctx, query, dbClient.ExecuteFetch, dbClient.ExecuteFetch); err != nil { - return err + return fmt.Errorf("no vdiff table found for %s on tablet %v", + td.table.Name, wd.ct.vde.thisTablet.Alias) } - log.Infof("starting table %s", td.table.Name) + log.Infof("Starting diff of table %s for vdiff %s", td.table.Name, wd.ct.uuid) if err := wd.diffTable(ctx, dbClient, td); err != nil { - if err := td.updateTableState(ctx, dbClient, td.table.Name, ErrorState, nil); err != nil { + if err := td.updateTableState(ctx, dbClient, ErrorState); err != nil { return err } insertVDiffLog(ctx, dbClient, wd.ct.id, fmt.Sprintf("Table %s Error: %s", td.table.Name, err)) return err } - log.Infof("done table %s", td.table.Name) + if err := td.updateTableState(ctx, dbClient, CompletedState); err != nil { + return err + } + log.Infof("Completed diff of table %s for vdiff %s", td.table.Name, wd.ct.uuid) } if err := wd.markIfCompleted(ctx, dbClient); err != nil { return err @@ -207,12 +193,15 @@ func (wd *workflowDiffer) diff(ctx context.Context) error { func (wd *workflowDiffer) markIfCompleted(ctx context.Context, dbClient binlogplayer.DBClient) error { query := fmt.Sprintf(sqlGetIncompleteTables, wd.ct.id) - qr, err := withDDL.Exec(ctx, query, dbClient.ExecuteFetch, dbClient.ExecuteFetch) + qr, err := dbClient.ExecuteFetch(query, -1) if err != nil { return err } + + // Double check to be sure all of the individual table diffs completed without error + // before marking the vdiff as completed. if len(qr.Rows) == 0 { - if err := wd.ct.updateState(dbClient, CompletedState); err != nil { + if err := wd.ct.updateState(dbClient, CompletedState, nil); err != nil { return err } } @@ -265,12 +254,13 @@ func (wd *workflowDiffer) buildPlan(dbClient binlogplayer.DBClient, filter *binl } } if len(wd.tableDiffers) == 0 { - return fmt.Errorf("no tables found to diff, %s:%s", optTables, specifiedTables) + return fmt.Errorf("no tables found to diff, %s:%s, on tablet %v", + optTables, specifiedTables, wd.ct.vde.thisTablet.Alias) } return nil } -// getTableLastPK gets the lastPK protobuf message for a given vdiff table +// getTableLastPK gets the lastPK protobuf message for a given vdiff table. func (wd *workflowDiffer) getTableLastPK(dbClient binlogplayer.DBClient, tableName string) (*querypb.QueryResult, error) { query := fmt.Sprintf(sqlGetVDiffTable, wd.ct.id, encodeString(tableName)) qr, err := dbClient.ExecuteFetch(query, 1) @@ -292,3 +282,41 @@ func (wd *workflowDiffer) getTableLastPK(dbClient binlogplayer.DBClient, tableNa } return nil, nil } + +func (wd *workflowDiffer) initVDiffTables(dbClient binlogplayer.DBClient) error { + tableIn := strings.Builder{} + n := 0 + for tableName := range wd.tableDiffers { + tableIn.WriteString(encodeString(tableName)) + if n++; n < len(wd.tableDiffers) { + tableIn.WriteByte(',') + } + } + query := fmt.Sprintf(sqlGetAllTableRows, encodeString(wd.ct.vde.dbName), tableIn.String()) + qr, err := dbClient.ExecuteFetch(query, -1) + if err != nil { + return err + } + for _, row := range qr.Named().Rows { + tableName, _ := row.ToString("table_name") + tableRows, _ := row.ToInt64("table_rows") + + query := fmt.Sprintf(sqlGetVDiffTable, wd.ct.id, encodeString(tableName)) + qr, err := dbClient.ExecuteFetch(query, -1) + if err != nil { + return err + } + if len(qr.Rows) == 0 { + query = fmt.Sprintf(sqlNewVDiffTable, wd.ct.id, encodeString(tableName), tableRows) + } else if len(qr.Rows) == 1 { + query = fmt.Sprintf(sqlUpdateTableRows, tableRows, wd.ct.id, encodeString(tableName)) + } else { + return fmt.Errorf("invalid state found for vdiff table %s for vdiff_id %d on tablet %s", + tableName, wd.ct.id, wd.ct.vde.thisTablet.Alias) + } + if _, err := dbClient.ExecuteFetch(query, 1); err != nil { + return err + } + } + return nil +} diff --git a/proto/tabletmanagerdata.proto b/proto/tabletmanagerdata.proto index 0cac42b42c7..fedf5c98010 100644 --- a/proto/tabletmanagerdata.proto +++ b/proto/tabletmanagerdata.proto @@ -529,7 +529,7 @@ message VDiffReportOptions { message VDiffCoreOptions { string tables = 1; - bool resumable = 2; + bool auto_retry = 2; int64 max_rows = 3; bool checksum = 4; int64 sample_pct = 5; diff --git a/web/vtadmin/src/proto/vtadmin.d.ts b/web/vtadmin/src/proto/vtadmin.d.ts index 8c4e56692c8..cef82cb220b 100644 --- a/web/vtadmin/src/proto/vtadmin.d.ts +++ b/web/vtadmin/src/proto/vtadmin.d.ts @@ -22691,8 +22691,8 @@ export namespace tabletmanagerdata { /** VDiffCoreOptions tables */ tables?: (string|null); - /** VDiffCoreOptions resumable */ - resumable?: (boolean|null); + /** VDiffCoreOptions auto_retry */ + auto_retry?: (boolean|null); /** VDiffCoreOptions max_rows */ max_rows?: (number|Long|null); @@ -22722,8 +22722,8 @@ export namespace tabletmanagerdata { /** VDiffCoreOptions tables. */ public tables: string; - /** VDiffCoreOptions resumable. */ - public resumable: boolean; + /** VDiffCoreOptions auto_retry. */ + public auto_retry: boolean; /** VDiffCoreOptions max_rows. */ public max_rows: (number|Long); diff --git a/web/vtadmin/src/proto/vtadmin.js b/web/vtadmin/src/proto/vtadmin.js index de38cd0f56b..139273787c5 100644 --- a/web/vtadmin/src/proto/vtadmin.js +++ b/web/vtadmin/src/proto/vtadmin.js @@ -51640,7 +51640,7 @@ $root.tabletmanagerdata = (function() { * @memberof tabletmanagerdata * @interface IVDiffCoreOptions * @property {string|null} [tables] VDiffCoreOptions tables - * @property {boolean|null} [resumable] VDiffCoreOptions resumable + * @property {boolean|null} [auto_retry] VDiffCoreOptions auto_retry * @property {number|Long|null} [max_rows] VDiffCoreOptions max_rows * @property {boolean|null} [checksum] VDiffCoreOptions checksum * @property {number|Long|null} [sample_pct] VDiffCoreOptions sample_pct @@ -51672,12 +51672,12 @@ $root.tabletmanagerdata = (function() { VDiffCoreOptions.prototype.tables = ""; /** - * VDiffCoreOptions resumable. - * @member {boolean} resumable + * VDiffCoreOptions auto_retry. + * @member {boolean} auto_retry * @memberof tabletmanagerdata.VDiffCoreOptions * @instance */ - VDiffCoreOptions.prototype.resumable = false; + VDiffCoreOptions.prototype.auto_retry = false; /** * VDiffCoreOptions max_rows. @@ -51745,8 +51745,8 @@ $root.tabletmanagerdata = (function() { writer = $Writer.create(); if (message.tables != null && Object.hasOwnProperty.call(message, "tables")) writer.uint32(/* id 1, wireType 2 =*/10).string(message.tables); - if (message.resumable != null && Object.hasOwnProperty.call(message, "resumable")) - writer.uint32(/* id 2, wireType 0 =*/16).bool(message.resumable); + if (message.auto_retry != null && Object.hasOwnProperty.call(message, "auto_retry")) + writer.uint32(/* id 2, wireType 0 =*/16).bool(message.auto_retry); if (message.max_rows != null && Object.hasOwnProperty.call(message, "max_rows")) writer.uint32(/* id 3, wireType 0 =*/24).int64(message.max_rows); if (message.checksum != null && Object.hasOwnProperty.call(message, "checksum")) @@ -51795,7 +51795,7 @@ $root.tabletmanagerdata = (function() { message.tables = reader.string(); break; case 2: - message.resumable = reader.bool(); + message.auto_retry = reader.bool(); break; case 3: message.max_rows = reader.int64(); @@ -51850,9 +51850,9 @@ $root.tabletmanagerdata = (function() { if (message.tables != null && message.hasOwnProperty("tables")) if (!$util.isString(message.tables)) return "tables: string expected"; - if (message.resumable != null && message.hasOwnProperty("resumable")) - if (typeof message.resumable !== "boolean") - return "resumable: boolean expected"; + if (message.auto_retry != null && message.hasOwnProperty("auto_retry")) + if (typeof message.auto_retry !== "boolean") + return "auto_retry: boolean expected"; if (message.max_rows != null && message.hasOwnProperty("max_rows")) if (!$util.isInteger(message.max_rows) && !(message.max_rows && $util.isInteger(message.max_rows.low) && $util.isInteger(message.max_rows.high))) return "max_rows: integer|Long expected"; @@ -51885,8 +51885,8 @@ $root.tabletmanagerdata = (function() { var message = new $root.tabletmanagerdata.VDiffCoreOptions(); if (object.tables != null) message.tables = String(object.tables); - if (object.resumable != null) - message.resumable = Boolean(object.resumable); + if (object.auto_retry != null) + message.auto_retry = Boolean(object.auto_retry); if (object.max_rows != null) if ($util.Long) (message.max_rows = $util.Long.fromValue(object.max_rows)).unsigned = false; @@ -51943,7 +51943,7 @@ $root.tabletmanagerdata = (function() { var object = {}; if (options.defaults) { object.tables = ""; - object.resumable = false; + object.auto_retry = false; if ($util.Long) { var long = new $util.Long(0, 0, false); object.max_rows = options.longs === String ? long.toString() : options.longs === Number ? long.toNumber() : long; @@ -51968,8 +51968,8 @@ $root.tabletmanagerdata = (function() { } if (message.tables != null && message.hasOwnProperty("tables")) object.tables = message.tables; - if (message.resumable != null && message.hasOwnProperty("resumable")) - object.resumable = message.resumable; + if (message.auto_retry != null && message.hasOwnProperty("auto_retry")) + object.auto_retry = message.auto_retry; if (message.max_rows != null && message.hasOwnProperty("max_rows")) if (typeof message.max_rows === "number") object.max_rows = options.longs === String ? String(message.max_rows) : message.max_rows;