diff --git a/go/vt/sidecardb/schema/vdiff/vdiff_log.sql b/go/vt/sidecardb/schema/vdiff/vdiff_log.sql index bedb7824295..7912abe823a 100644 --- a/go/vt/sidecardb/schema/vdiff/vdiff_log.sql +++ b/go/vt/sidecardb/schema/vdiff/vdiff_log.sql @@ -16,9 +16,10 @@ limitations under the License. CREATE TABLE IF NOT EXISTS vdiff_log ( - `id` int(11) NOT NULL AUTO_INCREMENT, - `vdiff_id` int(11) NOT NULL, - `created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, - `message` text NOT NULL, - PRIMARY KEY (`id`) + `id` bigint(20) NOT NULL AUTO_INCREMENT, + `vdiff_id` bigint(20) NOT NULL, + `created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + `message` text NOT NULL, + PRIMARY KEY (`id`), + KEY `vdiff_id_idx` (`vdiff_id`) ) ENGINE = InnoDB CHARSET = utf8mb4 diff --git a/go/vt/sidecardb/schema/vdiff/vdiff_table.sql b/go/vt/sidecardb/schema/vdiff/vdiff_table.sql index 2296398e430..20fda26b81e 100644 --- a/go/vt/sidecardb/schema/vdiff/vdiff_table.sql +++ b/go/vt/sidecardb/schema/vdiff/vdiff_table.sql @@ -16,7 +16,7 @@ limitations under the License. CREATE TABLE IF NOT EXISTS vdiff_table ( - `vdiff_id` varchar(64) NOT NULL, + `vdiff_id` bigint(20) NOT NULL, `table_name` varbinary(128) NOT NULL, `state` varbinary(64) DEFAULT NULL, `lastpk` varbinary(2000) DEFAULT NULL, diff --git a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go index ab78f961dc1..ea52a972d1c 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go @@ -119,8 +119,8 @@ func (wd *workflowDiffer) doReconcileExtraRows(dr *DiffReport, maxExtraRowsToCom if dr.ExtraRowsSource == 0 || dr.ExtraRowsTarget == 0 { return nil } - matchedSourceDiffs := make([]bool, int(dr.ExtraRowsSource)) - matchedTargetDiffs := make([]bool, int(dr.ExtraRowsTarget)) + matchedSourceDiffs := make([]bool, len(dr.ExtraRowsSourceDiffs)) + matchedTargetDiffs := make([]bool, len(dr.ExtraRowsTargetDiffs)) matchedDiffs := int64(0) maxRows := int(dr.ExtraRowsSource) @@ -131,8 +131,8 @@ func (wd *workflowDiffer) doReconcileExtraRows(dr *DiffReport, maxExtraRowsToCom dr.TableName, wd.ct.uuid, dr.ExtraRowsSource, dr.ExtraRowsTarget, maxRows) // Find the matching extra rows - for i := 0; i < maxRows; i++ { - for j := 0; j < int(dr.ExtraRowsTarget); j++ { + for i := 0; i < len(dr.ExtraRowsSourceDiffs); i++ { + for j := 0; j < len(dr.ExtraRowsTargetDiffs); j++ { if matchedTargetDiffs[j] { // previously matched continue @@ -151,9 +151,8 @@ func (wd *workflowDiffer) doReconcileExtraRows(dr *DiffReport, maxExtraRowsToCom dr.TableName, maxRows, wd.ct.uuid) } else { // Now remove the matching extra rows - newExtraRowsSourceDiffs := make([]*RowDiff, 0, dr.ExtraRowsSource-matchedDiffs) - newExtraRowsTargetDiffs := make([]*RowDiff, 0, dr.ExtraRowsTarget-matchedDiffs) - for i := 0; i < int(dr.ExtraRowsSource); i++ { + newExtraRowsSourceDiffs := make([]*RowDiff, 0, int64(len(dr.ExtraRowsSourceDiffs))-matchedDiffs) + for i := 0; i < len(dr.ExtraRowsSourceDiffs); i++ { if !matchedSourceDiffs[i] { newExtraRowsSourceDiffs = append(newExtraRowsSourceDiffs, dr.ExtraRowsSourceDiffs[i]) } @@ -161,7 +160,9 @@ func (wd *workflowDiffer) doReconcileExtraRows(dr *DiffReport, maxExtraRowsToCom break } } - for i := 0; i < int(dr.ExtraRowsTarget); i++ { + + newExtraRowsTargetDiffs := make([]*RowDiff, 0, int64(len(dr.ExtraRowsTargetDiffs))-matchedDiffs) + for i := 0; i < len(dr.ExtraRowsTargetDiffs); i++ { if !matchedTargetDiffs[i] { newExtraRowsTargetDiffs = append(newExtraRowsTargetDiffs, dr.ExtraRowsTargetDiffs[i]) } @@ -173,11 +174,13 @@ func (wd *workflowDiffer) doReconcileExtraRows(dr *DiffReport, maxExtraRowsToCom dr.ExtraRowsTargetDiffs = newExtraRowsTargetDiffs // Update the counts - dr.ExtraRowsSource = int64(len(dr.ExtraRowsSourceDiffs)) - dr.ExtraRowsTarget = int64(len(dr.ExtraRowsTargetDiffs)) + dr.ExtraRowsSource -= matchedDiffs + dr.ExtraRowsTarget -= matchedDiffs dr.MatchingRows += matchedDiffs - dr.MismatchedRows -= matchedDiffs - dr.ProcessedRows += matchedDiffs + + // We do not update `ProcessedRows` here, because any extra target or source rows are already included in it. + // We do not update `MismatchedRows`, because extra target or source rows are not counted as mismatches. + log.Infof("Reconciled extra rows for table %s in vdiff %s, matching rows %d, extra source rows %d, extra target rows %d. Max compared rows %d", dr.TableName, wd.ct.uuid, matchedDiffs, dr.ExtraRowsSource, dr.ExtraRowsTarget, maxRows) } diff --git a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ_test.go b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ_test.go index 97dd4406b3b..a5fd5d6035d 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ_test.go @@ -36,6 +36,13 @@ import ( tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" ) +func max(a, b int64) int64 { + if a > b { + return a + } + return b +} + // TestReconcileExtraRows tests reconcileExtraRows() by providing different types of source and target slices and validating // that the matching rows are correctly identified and removed. func TestReconcileExtraRows(t *testing.T) { @@ -54,13 +61,6 @@ func TestReconcileExtraRows(t *testing.T) { wd, err := newWorkflowDiffer(ct, vdiffenv.opts, collations.MySQL8()) require.NoError(t, err) - dr := &DiffReport{ - TableName: "t1", - ExtraRowsSourceDiffs: []*RowDiff{}, - ExtraRowsTargetDiffs: []*RowDiff{}, - MismatchedRowsDiffs: nil, - } - type testCase struct { name string maxExtras int64 @@ -175,35 +175,131 @@ func TestReconcileExtraRows(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { + dr := &DiffReport{ + TableName: "t1", + + ProcessedRows: 10 + max(int64(len(tc.extraDiffsSource)), int64(len(tc.extraDiffsTarget))), + + MatchingRows: 10, + + MismatchedRows: 0, + MismatchedRowsDiffs: nil, + + ExtraRowsSource: int64(len(tc.extraDiffsSource)), + ExtraRowsSourceDiffs: tc.extraDiffsSource, + + ExtraRowsTarget: int64(len(tc.extraDiffsTarget)), + ExtraRowsTargetDiffs: tc.extraDiffsTarget, + } + maxExtras := int64(10) if tc.maxExtras != 0 { maxExtras = tc.maxExtras } - dr.ExtraRowsSourceDiffs = tc.extraDiffsSource - dr.ExtraRowsTargetDiffs = tc.extraDiffsTarget - dr.ExtraRowsSource = int64(len(tc.extraDiffsSource)) - dr.ExtraRowsTarget = int64(len(tc.extraDiffsTarget)) origExtraRowsSource := dr.ExtraRowsSource - dr.MatchingRows = 0 - dr.MismatchedRows = dr.ExtraRowsSource - dr.ProcessedRows = 0 - require.NoError(t, wd.doReconcileExtraRows(dr, maxExtras, maxExtras)) - // check counts - require.Equal(t, dr.MatchingRows, origExtraRowsSource-dr.ExtraRowsSource) - require.Equal(t, dr.ProcessedRows, dr.MatchingRows) - require.Equal(t, dr.MismatchedRows, origExtraRowsSource-dr.MatchingRows) - require.Equal(t, dr.ExtraRowsSource, int64(len(tc.wantExtraSource))) - require.Equal(t, dr.ExtraRowsTarget, int64(len(tc.wantExtraTarget))) + // Matching rows should increase by the number of rows that we could reconcile + require.Equal(t, 10+origExtraRowsSource-dr.ExtraRowsSource, dr.MatchingRows) + + // Processed rows should not change from the original value + require.Equal(t, 10+max(int64(len(tc.extraDiffsSource)), int64(len(tc.extraDiffsTarget))), dr.ProcessedRows) + + // Mismatched rows should remain the same + require.Equal(t, int64(0), dr.MismatchedRows) + + // Check other counts + require.Equal(t, int64(len(tc.wantExtraSource)), dr.ExtraRowsSource) + require.Equal(t, int64(len(tc.wantExtraTarget)), dr.ExtraRowsTarget) // check actual extra rows require.EqualValues(t, dr.ExtraRowsSourceDiffs, tc.wantExtraSource) require.EqualValues(t, dr.ExtraRowsTargetDiffs, tc.wantExtraTarget) }) } + + t.Run("with `ExtraRowsSource` larger than `extraDiffsSource`", func(t *testing.T) { + dr := &DiffReport{ + TableName: "t1", + + // The max number of rows loaded on the source or the target + ProcessedRows: 6, + + MismatchedRows: 0, + MismatchedRowsDiffs: nil, + + // Simulate having hit `maxExtraRowsToCompare` / having found more rows on the source + ExtraRowsSource: 6, + ExtraRowsSourceDiffs: []*RowDiff{ + {Row: map[string]string{"1": "c1"}}, + {Row: map[string]string{"3a": "c3a"}}, + {Row: map[string]string{"2": "c2"}}, + {Row: map[string]string{"3b": "c3b"}}, + }, + + ExtraRowsTarget: 4, + ExtraRowsTargetDiffs: []*RowDiff{ + {Row: map[string]string{"2": "c2"}}, + {Row: map[string]string{"4a": "c4a"}}, + {Row: map[string]string{"4b": "c4b"}}, + {Row: map[string]string{"1": "c1"}}, + }, + } + + maxExtras := int64(4) + require.NoError(t, wd.doReconcileExtraRows(dr, maxExtras, maxExtras)) + + // Verify that reconciliation does not change the number of processed or mismatched rows + require.Equal(t, int64(6), dr.ProcessedRows) + require.Equal(t, int64(0), dr.MismatchedRows) + + require.Equal(t, int64(4), dr.ExtraRowsSource) + require.Equal(t, int64(2), dr.ExtraRowsTarget) + + require.Equal(t, int64(2), dr.MatchingRows) + }) + t.Run("with `ExtraRowsTarget` larger than `extraDiffsTarget`", func(t *testing.T) { + dr := &DiffReport{ + TableName: "t1", + + // The max number of rows loaded on the source or the target + ProcessedRows: 6, + + MismatchedRows: 0, + MismatchedRowsDiffs: nil, + + ExtraRowsSource: 4, + ExtraRowsSourceDiffs: []*RowDiff{ + {Row: map[string]string{"1": "c1"}}, + {Row: map[string]string{"3a": "c3a"}}, + {Row: map[string]string{"2": "c2"}}, + {Row: map[string]string{"3b": "c3b"}}, + }, + + // Simulate having hit `maxExtraRowsToCompare` / having found more rows on the target + ExtraRowsTarget: 6, + ExtraRowsTargetDiffs: []*RowDiff{ + {Row: map[string]string{"2": "c2"}}, + {Row: map[string]string{"4a": "c4a"}}, + {Row: map[string]string{"4b": "c4b"}}, + {Row: map[string]string{"1": "c1"}}, + }, + } + + maxExtras := int64(4) + require.NoError(t, wd.doReconcileExtraRows(dr, maxExtras, maxExtras)) + + // Verify that reconciliation does not change the number of processed or mismatched rows + require.Equal(t, int64(6), dr.ProcessedRows) + require.Equal(t, int64(0), dr.MismatchedRows) + + require.Equal(t, int64(2), dr.ExtraRowsSource) + require.Equal(t, int64(4), dr.ExtraRowsTarget) + + require.Equal(t, int64(2), dr.MatchingRows) + }) } func TestBuildPlanSuccess(t *testing.T) {