Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions go/vt/vttablet/tabletmanager/vdiff/table_differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,12 +398,18 @@ func (td *tableDiffer) restartTargetVReplicationStreams(ctx context.Context) err
func (td *tableDiffer) streamOneShard(ctx context.Context, participant *shardStreamer, query string, lastPK *querypb.QueryResult, gtidch chan string) {
log.Infof("streamOneShard Start on %s using query: %s", participant.tablet.Alias.String(), query)
td.wgShardStreamers.Add(1)

defer func() {
log.Infof("streamOneShard End on %s", participant.tablet.Alias.String())
close(participant.result)
close(gtidch)
select {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Defensive code to handle a panic seen in a prod cluster where the channel was closed prematurely. Unable to reproduce it using local tests though.

Copy link
Copy Markdown
Member

@frouioui frouioui Mar 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the context is done, we do not close the channels. Are they getting closed somewhere else?

case <-ctx.Done():
default:
close(participant.result)
close(gtidch)
}
td.wgShardStreamers.Done()
}()

participant.err = func() error {
conn, err := tabletconn.GetDialer()(ctx, participant.tablet, false)
if err != nil {
Expand Down
104 changes: 77 additions & 27 deletions go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,18 @@ func newWorkflowDiffer(ct *controller, opts *tabletmanagerdatapb.VDiffOptions, c
return wd, nil
}

// If the only difference is the order in which the rows were returned
// by MySQL on each side then we'll have the same number of extras on
// both sides. If that's the case, then let's see if the extra rows on
// both sides are actually different.
// reconcileExtraRows reconciles the extra rows recorded in the diff report. It first
// gives reconcileReferenceTables a chance to adjust the report and then hands off to
// doReconcileExtraRows, which drops extra rows that are present on both the source and
// the target. The number of extra rows compared is capped by the vdiff option
// maxExtraRowsToCompare. The first error encountered is returned.
func (wd *workflowDiffer) reconcileExtraRows(dr *DiffReport, maxExtraRowsToCompare int64, maxReportSampleRows int64) error {
	if err := wd.reconcileReferenceTables(dr); err != nil {
		return err
	}
	return wd.doReconcileExtraRows(dr, maxExtraRowsToCompare, maxReportSampleRows)
}

func (wd *workflowDiffer) reconcileReferenceTables(dr *DiffReport) error {
if dr.MismatchedRows == 0 {
// Get the VSchema on the target and source keyspaces. We can then use this
// for handling additional edge cases, such as adjusting results for reference
Expand Down Expand Up @@ -105,41 +112,84 @@ func (wd *workflowDiffer) reconcileExtraRows(dr *DiffReport, maxExtraRowsToCompa
dr.ExtraRowsTargetDiffs = nil
}
}
return nil
}

if (dr.ExtraRowsSource == dr.ExtraRowsTarget) && (dr.ExtraRowsSource <= maxExtraRowsToCompare) {
for i := 0; i < len(dr.ExtraRowsSourceDiffs); i++ {
foundMatch := false
for j := 0; j < len(dr.ExtraRowsTargetDiffs); j++ {
if reflect.DeepEqual(dr.ExtraRowsSourceDiffs[i], dr.ExtraRowsTargetDiffs[j]) {
dr.ExtraRowsSourceDiffs = append(dr.ExtraRowsSourceDiffs[:i], dr.ExtraRowsSourceDiffs[i+1:]...)
dr.ExtraRowsTargetDiffs = append(dr.ExtraRowsTargetDiffs[:j], dr.ExtraRowsTargetDiffs[j+1:]...)
dr.ExtraRowsSource--
dr.ExtraRowsTarget--
dr.ProcessedRows--
dr.MatchingRows++
// We've removed an element from both slices at the current index
// so we need to shift the counters back as well to process the
// new elements at the index and avoid using an index out of range.
i--
j--
foundMatch = true
break
}
func (wd *workflowDiffer) doReconcileExtraRows(dr *DiffReport, maxExtraRowsToCompare int64, maxReportSampleRows int64) error {
if dr.ExtraRowsSource == 0 || dr.ExtraRowsTarget == 0 {
return nil
}
matchedSourceDiffs := make([]bool, int(dr.ExtraRowsSource))
matchedTargetDiffs := make([]bool, int(dr.ExtraRowsTarget))
matchedDiffs := int64(0)

maxRows := int(dr.ExtraRowsSource)
if maxRows > int(maxExtraRowsToCompare) {
maxRows = int(maxExtraRowsToCompare)
}
log.Infof("Reconciling extra rows for table %s in vdiff %s, extra source rows %d, extra target rows %d, max rows %d",
dr.TableName, wd.ct.uuid, dr.ExtraRowsSource, dr.ExtraRowsTarget, maxRows)

// Find the matching extra rows
for i := 0; i < maxRows; i++ {
for j := 0; j < int(dr.ExtraRowsTarget); j++ {
if matchedTargetDiffs[j] {
// previously matched
continue
}
// If we didn't find a match then the tables are in fact different and we can short circuit the second pass
if !foundMatch {
if reflect.DeepEqual(dr.ExtraRowsSourceDiffs[i], dr.ExtraRowsTargetDiffs[j]) {
matchedSourceDiffs[i] = true
matchedTargetDiffs[j] = true
matchedDiffs++
break
}
}
}
// We can now trim the extra rows diffs on both sides to the maxReportSampleRows value

if matchedDiffs == 0 {
log.Infof("No matching extra rows found for table %s in vdiff %s, checked %d rows",
dr.TableName, maxRows, wd.ct.uuid)
} else {
// Now remove the matching extra rows
newExtraRowsSourceDiffs := make([]*RowDiff, 0, dr.ExtraRowsSource-matchedDiffs)
newExtraRowsTargetDiffs := make([]*RowDiff, 0, dr.ExtraRowsTarget-matchedDiffs)
for i := 0; i < int(dr.ExtraRowsSource); i++ {
if !matchedSourceDiffs[i] {
newExtraRowsSourceDiffs = append(newExtraRowsSourceDiffs, dr.ExtraRowsSourceDiffs[i])
}
if len(newExtraRowsSourceDiffs) >= maxRows {
break
}
}
for i := 0; i < int(dr.ExtraRowsTarget); i++ {
if !matchedTargetDiffs[i] {
newExtraRowsTargetDiffs = append(newExtraRowsTargetDiffs, dr.ExtraRowsTargetDiffs[i])
}
if len(newExtraRowsTargetDiffs) >= maxRows {
break
}
}
dr.ExtraRowsSourceDiffs = newExtraRowsSourceDiffs
dr.ExtraRowsTargetDiffs = newExtraRowsTargetDiffs

// Update the counts
dr.ExtraRowsSource = int64(len(dr.ExtraRowsSourceDiffs))
dr.ExtraRowsTarget = int64(len(dr.ExtraRowsTargetDiffs))
dr.MatchingRows += matchedDiffs
dr.MismatchedRows -= matchedDiffs
dr.ProcessedRows += matchedDiffs
log.Infof("Reconciled extra rows for table %s in vdiff %s, matching rows %d, extra source rows %d, extra target rows %d. Max compared rows %d",
dr.TableName, wd.ct.uuid, matchedDiffs, dr.ExtraRowsSource, dr.ExtraRowsTarget, maxRows)
}

// Trim the extra rows diffs to the maxReportSampleRows value. Note we need to do this after updating
// the slices and counts above, since maxExtraRowsToCompare can be greater than maxVDiffReportSampleRows.
if int64(len(dr.ExtraRowsSourceDiffs)) > maxReportSampleRows && maxReportSampleRows > 0 {
dr.ExtraRowsSourceDiffs = dr.ExtraRowsSourceDiffs[:maxReportSampleRows-1]
}
if int64(len(dr.ExtraRowsTargetDiffs)) > maxReportSampleRows && maxReportSampleRows > 0 {
dr.ExtraRowsTargetDiffs = dr.ExtraRowsTargetDiffs[:maxReportSampleRows-1]
}

return nil
}

Expand Down
170 changes: 170 additions & 0 deletions go/vt/vttablet/tabletmanager/vdiff/workflow_differ_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,176 @@ import (
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
)

// TestReconcileExtraRows exercises doReconcileExtraRows() with different combinations of
// extra source and target rows and validates that rows present on both sides are
// identified and removed, and that the report counters are updated to match.
func TestReconcileExtraRows(t *testing.T) {
	vdenv := newTestVDiffEnv(t)
	defer vdenv.close()
	UUID := uuid.New()
	controllerQR := sqltypes.MakeTestResult(sqltypes.MakeTestFields(
		vdiffTestCols,
		vdiffTestColTypes,
	),
		fmt.Sprintf("1|%s|%s|%s|%s|%s|%s|%s|", UUID, vdiffenv.workflow, tstenv.KeyspaceName, tstenv.ShardName, vdiffDBName, PendingState, optionsJS),
	)

	vdiffenv.dbClient.ExpectRequest("select * from _vt.vdiff where id = 1", noResults, nil)
	ct := vdenv.newController(t, controllerQR)
	wd, err := newWorkflowDiffer(ct, vdiffenv.opts, collations.MySQL8())
	require.NoError(t, err)

	type testCase struct {
		name string
		// maxExtras overrides the default limit of 10 when non-zero.
		maxExtras        int64
		extraDiffsSource []*RowDiff
		extraDiffsTarget []*RowDiff

		wantExtraSource []*RowDiff
		wantExtraTarget []*RowDiff
	}

	testCases := []testCase{
		{
			name: "no extra rows, same order",
			extraDiffsSource: []*RowDiff{
				{Row: map[string]string{"1": "c1"}},
				{Row: map[string]string{"2": "c2"}},
			},
			extraDiffsTarget: []*RowDiff{
				{Row: map[string]string{"1": "c1"}},
				{Row: map[string]string{"2": "c2"}},
			},
			wantExtraSource: []*RowDiff{},
			wantExtraTarget: []*RowDiff{},
		},
		{
			name: "no extra rows, different order",
			extraDiffsSource: []*RowDiff{
				{Row: map[string]string{"1": "c1"}},
				{Row: map[string]string{"2": "c2"}},
			},
			extraDiffsTarget: []*RowDiff{
				{Row: map[string]string{"2": "c2"}},
				{Row: map[string]string{"1": "c1"}},
			},
			wantExtraSource: []*RowDiff{},
			wantExtraTarget: []*RowDiff{},
		},
		{
			name: "extra rows, same count of extras on both",
			extraDiffsSource: []*RowDiff{
				{Row: map[string]string{"1": "c1"}},
				{Row: map[string]string{"3a": "c3a"}},
				{Row: map[string]string{"2": "c2"}},
				{Row: map[string]string{"3b": "c3b"}},
			},
			extraDiffsTarget: []*RowDiff{
				{Row: map[string]string{"2": "c2"}},
				{Row: map[string]string{"4a": "c4a"}},
				{Row: map[string]string{"4b": "c4b"}},
				{Row: map[string]string{"1": "c1"}},
			},
			wantExtraSource: []*RowDiff{
				{Row: map[string]string{"3a": "c3a"}},
				{Row: map[string]string{"3b": "c3b"}},
			},
			wantExtraTarget: []*RowDiff{
				{Row: map[string]string{"4a": "c4a"}},
				{Row: map[string]string{"4b": "c4b"}},
			},
		},
		{
			name: "extra rows, less extras on target",
			extraDiffsSource: []*RowDiff{
				{Row: map[string]string{"3a": "c3a"}},
				{Row: map[string]string{"1": "c1"}},
				{Row: map[string]string{"2": "c2"}},
				{Row: map[string]string{"3b": "c3b"}},
			},
			extraDiffsTarget: []*RowDiff{
				{Row: map[string]string{"4a": "c4a"}},
				{Row: map[string]string{"2": "c2"}},
				{Row: map[string]string{"1": "c1"}},
			},
			wantExtraSource: []*RowDiff{
				{Row: map[string]string{"3a": "c3a"}},
				{Row: map[string]string{"3b": "c3b"}},
			},
			wantExtraTarget: []*RowDiff{
				{Row: map[string]string{"4a": "c4a"}},
			},
		},
		{
			name: "extra rows, no matching rows",
			extraDiffsSource: []*RowDiff{
				{Row: map[string]string{"1": "c1"}},
				{Row: map[string]string{"2": "c2"}},
				{Row: map[string]string{"3a": "c3a"}},
				{Row: map[string]string{"3b": "c3b"}},
			},
			extraDiffsTarget: []*RowDiff{
				{Row: map[string]string{"4a": "c4a"}},
				{Row: map[string]string{"5": "c5"}},
				{Row: map[string]string{"6": "c6"}},
			},
			wantExtraSource: []*RowDiff{
				{Row: map[string]string{"1": "c1"}},
				{Row: map[string]string{"2": "c2"}},
				{Row: map[string]string{"3a": "c3a"}},
				{Row: map[string]string{"3b": "c3b"}},
			},
			wantExtraTarget: []*RowDiff{
				{Row: map[string]string{"4a": "c4a"}},
				{Row: map[string]string{"5": "c5"}},
				{Row: map[string]string{"6": "c6"}},
			},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			maxExtras := int64(10)
			if tc.maxExtras != 0 {
				maxExtras = tc.maxExtras
			}

			// Build a fresh report for every case so that no state leaks between subtests.
			origExtraRowsSource := int64(len(tc.extraDiffsSource))
			dr := &DiffReport{
				TableName:            "t1",
				ExtraRowsSourceDiffs: tc.extraDiffsSource,
				ExtraRowsTargetDiffs: tc.extraDiffsTarget,
				ExtraRowsSource:      origExtraRowsSource,
				ExtraRowsTarget:      int64(len(tc.extraDiffsTarget)),
				MatchingRows:         0,
				MismatchedRows:       origExtraRowsSource,
				ProcessedRows:        0,
			}

			require.NoError(t, wd.doReconcileExtraRows(dr, maxExtras, maxExtras))

			// check counts (expected value first, actual value second)
			require.Equal(t, origExtraRowsSource-dr.ExtraRowsSource, dr.MatchingRows)
			require.Equal(t, dr.MatchingRows, dr.ProcessedRows)
			require.Equal(t, origExtraRowsSource-dr.MatchingRows, dr.MismatchedRows)
			require.Equal(t, int64(len(tc.wantExtraSource)), dr.ExtraRowsSource)
			require.Equal(t, int64(len(tc.wantExtraTarget)), dr.ExtraRowsTarget)

			// check actual extra rows
			require.EqualValues(t, tc.wantExtraSource, dr.ExtraRowsSourceDiffs)
			require.EqualValues(t, tc.wantExtraTarget, dr.ExtraRowsTargetDiffs)
		})
	}
}

func TestBuildPlanSuccess(t *testing.T) {
vdenv := newTestVDiffEnv(t)
defer vdenv.close()
Expand Down
Loading