Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions go/vt/sidecardb/schema/vdiff/vdiff_log.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ limitations under the License.

CREATE TABLE IF NOT EXISTS vdiff_log
(
`id` int(11) NOT NULL AUTO_INCREMENT,
`vdiff_id` int(11) NOT NULL,
`created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`message` text NOT NULL,
PRIMARY KEY (`id`)
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`vdiff_id` bigint(20) NOT NULL,
`created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`message` text NOT NULL,
PRIMARY KEY (`id`),
KEY `vdiff_id_idx` (`vdiff_id`)
) ENGINE = InnoDB CHARSET = utf8mb4
2 changes: 1 addition & 1 deletion go/vt/sidecardb/schema/vdiff/vdiff_table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ limitations under the License.

CREATE TABLE IF NOT EXISTS vdiff_table
(
`vdiff_id` varchar(64) NOT NULL,
`vdiff_id` bigint(20) NOT NULL,
`table_name` varbinary(128) NOT NULL,
`state` varbinary(64) DEFAULT NULL,
`lastpk` varbinary(2000) DEFAULT NULL,
Expand Down
27 changes: 15 additions & 12 deletions go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ func (wd *workflowDiffer) doReconcileExtraRows(dr *DiffReport, maxExtraRowsToCom
if dr.ExtraRowsSource == 0 || dr.ExtraRowsTarget == 0 {
return nil
}
matchedSourceDiffs := make([]bool, int(dr.ExtraRowsSource))
matchedTargetDiffs := make([]bool, int(dr.ExtraRowsTarget))
matchedSourceDiffs := make([]bool, len(dr.ExtraRowsSourceDiffs))
matchedTargetDiffs := make([]bool, len(dr.ExtraRowsTargetDiffs))
matchedDiffs := int64(0)

maxRows := int(dr.ExtraRowsSource)
Expand All @@ -131,8 +131,8 @@ func (wd *workflowDiffer) doReconcileExtraRows(dr *DiffReport, maxExtraRowsToCom
dr.TableName, wd.ct.uuid, dr.ExtraRowsSource, dr.ExtraRowsTarget, maxRows)

// Find the matching extra rows
for i := 0; i < maxRows; i++ {
for j := 0; j < int(dr.ExtraRowsTarget); j++ {
for i := 0; i < len(dr.ExtraRowsSourceDiffs); i++ {
for j := 0; j < len(dr.ExtraRowsTargetDiffs); j++ {
if matchedTargetDiffs[j] {
// previously matched
continue
Expand All @@ -151,17 +151,18 @@ func (wd *workflowDiffer) doReconcileExtraRows(dr *DiffReport, maxExtraRowsToCom
dr.TableName, maxRows, wd.ct.uuid)
} else {
// Now remove the matching extra rows
newExtraRowsSourceDiffs := make([]*RowDiff, 0, dr.ExtraRowsSource-matchedDiffs)
newExtraRowsTargetDiffs := make([]*RowDiff, 0, dr.ExtraRowsTarget-matchedDiffs)
for i := 0; i < int(dr.ExtraRowsSource); i++ {
newExtraRowsSourceDiffs := make([]*RowDiff, 0, int64(len(dr.ExtraRowsSourceDiffs))-matchedDiffs)
for i := 0; i < len(dr.ExtraRowsSourceDiffs); i++ {
if !matchedSourceDiffs[i] {
newExtraRowsSourceDiffs = append(newExtraRowsSourceDiffs, dr.ExtraRowsSourceDiffs[i])
}
if len(newExtraRowsSourceDiffs) >= maxRows {
break
}
}
for i := 0; i < int(dr.ExtraRowsTarget); i++ {

newExtraRowsTargetDiffs := make([]*RowDiff, 0, int64(len(dr.ExtraRowsTargetDiffs))-matchedDiffs)
for i := 0; i < len(dr.ExtraRowsTargetDiffs); i++ {
if !matchedTargetDiffs[i] {
newExtraRowsTargetDiffs = append(newExtraRowsTargetDiffs, dr.ExtraRowsTargetDiffs[i])
}
Expand All @@ -173,11 +174,13 @@ func (wd *workflowDiffer) doReconcileExtraRows(dr *DiffReport, maxExtraRowsToCom
dr.ExtraRowsTargetDiffs = newExtraRowsTargetDiffs

// Update the counts
dr.ExtraRowsSource = int64(len(dr.ExtraRowsSourceDiffs))
dr.ExtraRowsTarget = int64(len(dr.ExtraRowsTargetDiffs))
dr.ExtraRowsSource -= matchedDiffs
dr.ExtraRowsTarget -= matchedDiffs
dr.MatchingRows += matchedDiffs
dr.MismatchedRows -= matchedDiffs
dr.ProcessedRows += matchedDiffs

// We do not update `ProcessedRows` here, because any extra target or source rows are already included in it.
// We do not update `MismatchedRows`, because extra target or source rows are not counted as mismatches.

log.Infof("Reconciled extra rows for table %s in vdiff %s, matching rows %d, extra source rows %d, extra target rows %d. Max compared rows %d",
dr.TableName, wd.ct.uuid, matchedDiffs, dr.ExtraRowsSource, dr.ExtraRowsTarget, maxRows)
}
Expand Down
138 changes: 117 additions & 21 deletions go/vt/vttablet/tabletmanager/vdiff/workflow_differ_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ import (
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
)

func max(a, b int64) int64 {
if a > b {
return a
}
return b
}

// TestReconcileExtraRows tests reconcileExtraRows() by providing different types of source and target slices and validating
// that the matching rows are correctly identified and removed.
func TestReconcileExtraRows(t *testing.T) {
Expand All @@ -54,13 +61,6 @@ func TestReconcileExtraRows(t *testing.T) {
wd, err := newWorkflowDiffer(ct, vdiffenv.opts, collations.MySQL8())
require.NoError(t, err)

dr := &DiffReport{
TableName: "t1",
ExtraRowsSourceDiffs: []*RowDiff{},
ExtraRowsTargetDiffs: []*RowDiff{},
MismatchedRowsDiffs: nil,
}

type testCase struct {
name string
maxExtras int64
Expand Down Expand Up @@ -175,35 +175,131 @@ func TestReconcileExtraRows(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
dr := &DiffReport{
TableName: "t1",

ProcessedRows: 10 + max(int64(len(tc.extraDiffsSource)), int64(len(tc.extraDiffsTarget))),

MatchingRows: 10,

MismatchedRows: 0,
MismatchedRowsDiffs: nil,

ExtraRowsSource: int64(len(tc.extraDiffsSource)),
ExtraRowsSourceDiffs: tc.extraDiffsSource,

ExtraRowsTarget: int64(len(tc.extraDiffsTarget)),
ExtraRowsTargetDiffs: tc.extraDiffsTarget,
}

maxExtras := int64(10)
if tc.maxExtras != 0 {
maxExtras = tc.maxExtras
}

dr.ExtraRowsSourceDiffs = tc.extraDiffsSource
dr.ExtraRowsTargetDiffs = tc.extraDiffsTarget
dr.ExtraRowsSource = int64(len(tc.extraDiffsSource))
dr.ExtraRowsTarget = int64(len(tc.extraDiffsTarget))
origExtraRowsSource := dr.ExtraRowsSource

dr.MatchingRows = 0
dr.MismatchedRows = dr.ExtraRowsSource
dr.ProcessedRows = 0

require.NoError(t, wd.doReconcileExtraRows(dr, maxExtras, maxExtras))

// check counts
require.Equal(t, dr.MatchingRows, origExtraRowsSource-dr.ExtraRowsSource)
require.Equal(t, dr.ProcessedRows, dr.MatchingRows)
require.Equal(t, dr.MismatchedRows, origExtraRowsSource-dr.MatchingRows)
require.Equal(t, dr.ExtraRowsSource, int64(len(tc.wantExtraSource)))
require.Equal(t, dr.ExtraRowsTarget, int64(len(tc.wantExtraTarget)))
// Matching rows should increase by the number of rows that we could reconcile
require.Equal(t, 10+origExtraRowsSource-dr.ExtraRowsSource, dr.MatchingRows)

// Processed rows should not change from the original value
require.Equal(t, 10+max(int64(len(tc.extraDiffsSource)), int64(len(tc.extraDiffsTarget))), dr.ProcessedRows)

// Mismatched rows should remain the same
require.Equal(t, int64(0), dr.MismatchedRows)

// Check other counts
require.Equal(t, int64(len(tc.wantExtraSource)), dr.ExtraRowsSource)
require.Equal(t, int64(len(tc.wantExtraTarget)), dr.ExtraRowsTarget)

// check actual extra rows
require.EqualValues(t, dr.ExtraRowsSourceDiffs, tc.wantExtraSource)
require.EqualValues(t, dr.ExtraRowsTargetDiffs, tc.wantExtraTarget)
})
}

t.Run("with `ExtraRowsSource` larger than `extraDiffsSource`", func(t *testing.T) {
dr := &DiffReport{
TableName: "t1",

// The max number of rows loaded on the source or the target
ProcessedRows: 6,

MismatchedRows: 0,
MismatchedRowsDiffs: nil,

// Simulate having hit `maxExtraRowsToCompare` / having found more rows on the source
ExtraRowsSource: 6,
ExtraRowsSourceDiffs: []*RowDiff{
{Row: map[string]string{"1": "c1"}},
{Row: map[string]string{"3a": "c3a"}},
{Row: map[string]string{"2": "c2"}},
{Row: map[string]string{"3b": "c3b"}},
},

ExtraRowsTarget: 4,
ExtraRowsTargetDiffs: []*RowDiff{
{Row: map[string]string{"2": "c2"}},
{Row: map[string]string{"4a": "c4a"}},
{Row: map[string]string{"4b": "c4b"}},
{Row: map[string]string{"1": "c1"}},
},
}

maxExtras := int64(4)
require.NoError(t, wd.doReconcileExtraRows(dr, maxExtras, maxExtras))

// Verify that reconciliation does not change the number of processed or mismatched rows
require.Equal(t, int64(6), dr.ProcessedRows)
require.Equal(t, int64(0), dr.MismatchedRows)

require.Equal(t, int64(4), dr.ExtraRowsSource)
require.Equal(t, int64(2), dr.ExtraRowsTarget)

require.Equal(t, int64(2), dr.MatchingRows)
})
t.Run("with `ExtraRowsTarget` larger than `extraDiffsTarget`", func(t *testing.T) {
dr := &DiffReport{
TableName: "t1",

// The max number of rows loaded on the source or the target
ProcessedRows: 6,

MismatchedRows: 0,
MismatchedRowsDiffs: nil,

ExtraRowsSource: 4,
ExtraRowsSourceDiffs: []*RowDiff{
{Row: map[string]string{"1": "c1"}},
{Row: map[string]string{"3a": "c3a"}},
{Row: map[string]string{"2": "c2"}},
{Row: map[string]string{"3b": "c3b"}},
},

// Simulate having hit `maxExtraRowsToCompare` / having found more rows on the target
ExtraRowsTarget: 6,
ExtraRowsTargetDiffs: []*RowDiff{
{Row: map[string]string{"2": "c2"}},
{Row: map[string]string{"4a": "c4a"}},
{Row: map[string]string{"4b": "c4b"}},
{Row: map[string]string{"1": "c1"}},
},
}

maxExtras := int64(4)
require.NoError(t, wd.doReconcileExtraRows(dr, maxExtras, maxExtras))

// Verify that reconciliation does not change the number of processed or mismatched rows
require.Equal(t, int64(6), dr.ProcessedRows)
require.Equal(t, int64(0), dr.MismatchedRows)

require.Equal(t, int64(2), dr.ExtraRowsSource)
require.Equal(t, int64(4), dr.ExtraRowsTarget)

require.Equal(t, int64(2), dr.MatchingRows)
})
}

func TestBuildPlanSuccess(t *testing.T) {
Expand Down
Loading