diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go index 1904a8e97a5..9f73c771025 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go @@ -398,12 +398,18 @@ func (td *tableDiffer) restartTargetVReplicationStreams(ctx context.Context) err func (td *tableDiffer) streamOneShard(ctx context.Context, participant *shardStreamer, query string, lastPK *querypb.QueryResult, gtidch chan string) { log.Infof("streamOneShard Start on %s using query: %s", participant.tablet.Alias.String(), query) td.wgShardStreamers.Add(1) + defer func() { log.Infof("streamOneShard End on %s", participant.tablet.Alias.String()) - close(participant.result) - close(gtidch) + select { + case <-ctx.Done(): + default: + close(participant.result) + close(gtidch) + } td.wgShardStreamers.Done() }() + participant.err = func() error { conn, err := tabletconn.GetDialer()(ctx, participant.tablet, false) if err != nil { diff --git a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go index 775ae1d9179..ab78f961dc1 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go @@ -70,11 +70,18 @@ func newWorkflowDiffer(ct *controller, opts *tabletmanagerdatapb.VDiffOptions, c return wd, nil } -// If the only difference is the order in which the rows were returned -// by MySQL on each side then we'll have the same number of extras on -// both sides. If that's the case, then let's see if the extra rows on -// both sides are actually different. +// reconcileExtraRows compares the extra rows in the source and target tables. If there are any matching rows, they are +// removed from the extra rows. The number of extra rows to compare is limited by vdiff option maxExtraRowsToCompare. func (wd *workflowDiffer) reconcileExtraRows(dr *DiffReport, maxExtraRowsToCompare int64, maxReportSampleRows int64) error { + err := wd.reconcileReferenceTables(dr) + if err != nil { + return err + } + + return wd.doReconcileExtraRows(dr, maxExtraRowsToCompare, maxReportSampleRows) +} + +func (wd *workflowDiffer) reconcileReferenceTables(dr *DiffReport) error { if dr.MismatchedRows == 0 { // Get the VSchema on the target and source keyspaces. We can then use this // for handling additional edge cases, such as adjusting results for reference @@ -105,41 +112,84 @@ func (wd *workflowDiffer) reconcileExtraRows(dr *DiffReport, maxExtraRowsToCompa dr.ExtraRowsTargetDiffs = nil } } + return nil +} - if (dr.ExtraRowsSource == dr.ExtraRowsTarget) && (dr.ExtraRowsSource <= maxExtraRowsToCompare) { - for i := 0; i < len(dr.ExtraRowsSourceDiffs); i++ { - foundMatch := false - for j := 0; j < len(dr.ExtraRowsTargetDiffs); j++ { - if reflect.DeepEqual(dr.ExtraRowsSourceDiffs[i], dr.ExtraRowsTargetDiffs[j]) { - dr.ExtraRowsSourceDiffs = append(dr.ExtraRowsSourceDiffs[:i], dr.ExtraRowsSourceDiffs[i+1:]...) - dr.ExtraRowsTargetDiffs = append(dr.ExtraRowsTargetDiffs[:j], dr.ExtraRowsTargetDiffs[j+1:]...) - dr.ExtraRowsSource-- - dr.ExtraRowsTarget-- - dr.ProcessedRows-- - dr.MatchingRows++ - // We've removed an element from both slices at the current index - // so we need to shift the counters back as well to process the - // new elements at the index and avoid using an index out of range. - i-- - j-- - foundMatch = true - break - } +func (wd *workflowDiffer) doReconcileExtraRows(dr *DiffReport, maxExtraRowsToCompare int64, maxReportSampleRows int64) error { + if dr.ExtraRowsSource == 0 || dr.ExtraRowsTarget == 0 { + return nil + } + matchedSourceDiffs := make([]bool, int(dr.ExtraRowsSource)) + matchedTargetDiffs := make([]bool, int(dr.ExtraRowsTarget)) + matchedDiffs := int64(0) + + maxRows := int(dr.ExtraRowsSource) + if maxRows > int(maxExtraRowsToCompare) { + maxRows = int(maxExtraRowsToCompare) + } + log.Infof("Reconciling extra rows for table %s in vdiff %s, extra source rows %d, extra target rows %d, max rows %d", + dr.TableName, wd.ct.uuid, dr.ExtraRowsSource, dr.ExtraRowsTarget, maxRows) + + // Find the matching extra rows + for i := 0; i < maxRows; i++ { + for j := 0; j < int(dr.ExtraRowsTarget); j++ { + if matchedTargetDiffs[j] { + // previously matched + continue } - // If we didn't find a match then the tables are in fact different and we can short circuit the second pass - if !foundMatch { + if reflect.DeepEqual(dr.ExtraRowsSourceDiffs[i], dr.ExtraRowsTargetDiffs[j]) { + matchedSourceDiffs[i] = true + matchedTargetDiffs[j] = true + matchedDiffs++ break } } } - // We can now trim the extra rows diffs on both sides to the maxReportSampleRows value + + if matchedDiffs == 0 { + log.Infof("No matching extra rows found for table %s in vdiff %s, checked %d rows", + dr.TableName, maxRows, wd.ct.uuid) + } else { + // Now remove the matching extra rows + newExtraRowsSourceDiffs := make([]*RowDiff, 0, dr.ExtraRowsSource-matchedDiffs) + newExtraRowsTargetDiffs := make([]*RowDiff, 0, dr.ExtraRowsTarget-matchedDiffs) + for i := 0; i < int(dr.ExtraRowsSource); i++ { + if !matchedSourceDiffs[i] { + newExtraRowsSourceDiffs = append(newExtraRowsSourceDiffs, dr.ExtraRowsSourceDiffs[i]) + } + if len(newExtraRowsSourceDiffs) >= maxRows { + break + } + } + for i := 0; i < int(dr.ExtraRowsTarget); i++ { + if !matchedTargetDiffs[i] { + newExtraRowsTargetDiffs = append(newExtraRowsTargetDiffs, dr.ExtraRowsTargetDiffs[i]) + } + if len(newExtraRowsTargetDiffs) >= maxRows { + break + } + } + dr.ExtraRowsSourceDiffs = newExtraRowsSourceDiffs + dr.ExtraRowsTargetDiffs = newExtraRowsTargetDiffs + + // Update the counts + dr.ExtraRowsSource = int64(len(dr.ExtraRowsSourceDiffs)) + dr.ExtraRowsTarget = int64(len(dr.ExtraRowsTargetDiffs)) + dr.MatchingRows += matchedDiffs + dr.MismatchedRows -= matchedDiffs + dr.ProcessedRows += matchedDiffs + log.Infof("Reconciled extra rows for table %s in vdiff %s, matching rows %d, extra source rows %d, extra target rows %d. Max compared rows %d", + dr.TableName, wd.ct.uuid, matchedDiffs, dr.ExtraRowsSource, dr.ExtraRowsTarget, maxRows) + } + + // Trim the extra rows diffs to the maxReportSampleRows value. Note we need to do this after updating + // the slices and counts above, since maxExtraRowsToCompare can be greater than maxVDiffReportSampleRows. if int64(len(dr.ExtraRowsSourceDiffs)) > maxReportSampleRows && maxReportSampleRows > 0 { dr.ExtraRowsSourceDiffs = dr.ExtraRowsSourceDiffs[:maxReportSampleRows-1] } if int64(len(dr.ExtraRowsTargetDiffs)) > maxReportSampleRows && maxReportSampleRows > 0 { dr.ExtraRowsTargetDiffs = dr.ExtraRowsTargetDiffs[:maxReportSampleRows-1] } - return nil } diff --git a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ_test.go b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ_test.go index 9d6f4050a64..97dd4406b3b 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ_test.go @@ -36,6 +36,176 @@ import ( tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" ) +// TestReconcileExtraRows tests reconcileExtraRows() by providing different types of source and target slices and validating +// that the matching rows are correctly identified and removed. +func TestReconcileExtraRows(t *testing.T) { + vdenv := newTestVDiffEnv(t) + defer vdenv.close() + UUID := uuid.New() + controllerQR := sqltypes.MakeTestResult(sqltypes.MakeTestFields( + vdiffTestCols, + vdiffTestColTypes, + ), + fmt.Sprintf("1|%s|%s|%s|%s|%s|%s|%s|", UUID, vdiffenv.workflow, tstenv.KeyspaceName, tstenv.ShardName, vdiffDBName, PendingState, optionsJS), + ) + + vdiffenv.dbClient.ExpectRequest("select * from _vt.vdiff where id = 1", noResults, nil) + ct := vdenv.newController(t, controllerQR) + wd, err := newWorkflowDiffer(ct, vdiffenv.opts, collations.MySQL8()) + require.NoError(t, err) + + dr := &DiffReport{ + TableName: "t1", + ExtraRowsSourceDiffs: []*RowDiff{}, + ExtraRowsTargetDiffs: []*RowDiff{}, + MismatchedRowsDiffs: nil, + } + + type testCase struct { + name string + maxExtras int64 + extraDiffsSource []*RowDiff + extraDiffsTarget []*RowDiff + + wantExtraSource []*RowDiff + wantExtraTarget []*RowDiff + + wantProcessedCount int64 + wantMatchingCount int64 + wantMismatchedCount int64 + } + + testCases := []testCase{ + { + name: "no extra rows, same order", + extraDiffsSource: []*RowDiff{ + {Row: map[string]string{"1": "c1"}}, + {Row: map[string]string{"2": "c2"}}, + }, + extraDiffsTarget: []*RowDiff{ + {Row: map[string]string{"1": "c1"}}, + {Row: map[string]string{"2": "c2"}}, + }, + wantExtraSource: []*RowDiff{}, + wantExtraTarget: []*RowDiff{}, + }, + { + name: "no extra rows, different order", + extraDiffsSource: []*RowDiff{ + {Row: map[string]string{"1": "c1"}}, + {Row: map[string]string{"2": "c2"}}, + }, + extraDiffsTarget: []*RowDiff{ + {Row: map[string]string{"2": "c2"}}, + {Row: map[string]string{"1": "c1"}}, + }, + wantExtraSource: []*RowDiff{}, + wantExtraTarget: []*RowDiff{}, + }, + { + name: "extra rows, same count of extras on both", + extraDiffsSource: []*RowDiff{ + {Row: map[string]string{"1": "c1"}}, + {Row: map[string]string{"3a": "c3a"}}, + {Row: map[string]string{"2": "c2"}}, + {Row: map[string]string{"3b": "c3b"}}, + }, + extraDiffsTarget: []*RowDiff{ + {Row: map[string]string{"2": "c2"}}, + {Row: map[string]string{"4a": "c4a"}}, + {Row: map[string]string{"4b": "c4b"}}, + {Row: map[string]string{"1": "c1"}}, + }, + wantExtraSource: []*RowDiff{ + {Row: map[string]string{"3a": "c3a"}}, + {Row: map[string]string{"3b": "c3b"}}, + }, + wantExtraTarget: []*RowDiff{ + {Row: map[string]string{"4a": "c4a"}}, + {Row: map[string]string{"4b": "c4b"}}, + }, + }, + { + name: "extra rows, less extras on target", + extraDiffsSource: []*RowDiff{ + {Row: map[string]string{"3a": "c3a"}}, + {Row: map[string]string{"1": "c1"}}, + {Row: map[string]string{"2": "c2"}}, + {Row: map[string]string{"3b": "c3b"}}, + }, + extraDiffsTarget: []*RowDiff{ + {Row: map[string]string{"4a": "c4a"}}, + {Row: map[string]string{"2": "c2"}}, + {Row: map[string]string{"1": "c1"}}, + }, + wantExtraSource: []*RowDiff{ + {Row: map[string]string{"3a": "c3a"}}, + {Row: map[string]string{"3b": "c3b"}}, + }, + wantExtraTarget: []*RowDiff{ + {Row: map[string]string{"4a": "c4a"}}, + }, + }, + { + name: "extra rows, no matching rows", + extraDiffsSource: []*RowDiff{ + {Row: map[string]string{"1": "c1"}}, + {Row: map[string]string{"2": "c2"}}, + {Row: map[string]string{"3a": "c3a"}}, + {Row: map[string]string{"3b": "c3b"}}, + }, + extraDiffsTarget: []*RowDiff{ + {Row: map[string]string{"4a": "c4a"}}, + {Row: map[string]string{"5": "c5"}}, + {Row: map[string]string{"6": "c6"}}, + }, + wantExtraSource: []*RowDiff{ + {Row: map[string]string{"1": "c1"}}, + {Row: map[string]string{"2": "c2"}}, + {Row: map[string]string{"3a": "c3a"}}, + {Row: map[string]string{"3b": "c3b"}}, + }, + wantExtraTarget: []*RowDiff{ + {Row: map[string]string{"4a": "c4a"}}, + {Row: map[string]string{"5": "c5"}}, + {Row: map[string]string{"6": "c6"}}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + maxExtras := int64(10) + if tc.maxExtras != 0 { + maxExtras = tc.maxExtras + } + + dr.ExtraRowsSourceDiffs = tc.extraDiffsSource + dr.ExtraRowsTargetDiffs = tc.extraDiffsTarget + dr.ExtraRowsSource = int64(len(tc.extraDiffsSource)) + dr.ExtraRowsTarget = int64(len(tc.extraDiffsTarget)) + origExtraRowsSource := dr.ExtraRowsSource + + dr.MatchingRows = 0 + dr.MismatchedRows = dr.ExtraRowsSource + dr.ProcessedRows = 0 + + require.NoError(t, wd.doReconcileExtraRows(dr, maxExtras, maxExtras)) + + // check counts + require.Equal(t, dr.MatchingRows, origExtraRowsSource-dr.ExtraRowsSource) + require.Equal(t, dr.ProcessedRows, dr.MatchingRows) + require.Equal(t, dr.MismatchedRows, origExtraRowsSource-dr.MatchingRows) + require.Equal(t, dr.ExtraRowsSource, int64(len(tc.wantExtraSource))) + require.Equal(t, dr.ExtraRowsTarget, int64(len(tc.wantExtraTarget))) + + // check actual extra rows + require.EqualValues(t, dr.ExtraRowsSourceDiffs, tc.wantExtraSource) + require.EqualValues(t, dr.ExtraRowsTargetDiffs, tc.wantExtraTarget) + }) + } +} + func TestBuildPlanSuccess(t *testing.T) { vdenv := newTestVDiffEnv(t) defer vdenv.close()