diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 3b377b1e316..3fa3976e389 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -569,7 +569,7 @@ func vdiff(t *testing.T, workflow, cells string) { t.Logf("vdiff err: %+v, output: %+v", err, output) require.Nil(t, err) require.NotNil(t, output) - diffReports := make([]*wrangler.DiffReport, 0) + diffReports := make(map[string]*wrangler.DiffReport) err = json.Unmarshal([]byte(output), &diffReports) require.Nil(t, err) if len(diffReports) < 1 { diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index 802eba81058..3df51163c84 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -2335,6 +2335,8 @@ func commandVDiff(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.Fla tabletTypes := subFlags.String("tablet_types", "master,replica,rdonly", "Tablet types for source and target") filteredReplicationWaitTime := subFlags.Duration("filtered_replication_wait_time", 30*time.Second, "Specifies the maximum time to wait, in seconds, for filtered replication to catch up on master migrations. The migration will be cancelled on a timeout.") maxRows := subFlags.Int64("limit", math.MaxInt64, "Max rows to stop comparing after") + debugQuery := subFlags.Bool("debug_query", false, "Adds a mysql query to the report that can be used for further debugging") + onlyPks := subFlags.Bool("only_pks", false, "When reporting missing rows, only show primary keys in the report.") format := subFlags.String("format", "", "Format of report") //"json" or "" tables := subFlags.String("tables", "", "Only run vdiff for these tables in the workflow") if err := subFlags.Parse(args); err != nil { @@ -2352,7 +2354,7 @@ func commandVDiff(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.Fla return fmt.Errorf("maximum number of rows to compare needs to be greater than 0") } _, err = wr. - VDiff(ctx, keyspace, workflow, *sourceCell, *targetCell, *tabletTypes, *filteredReplicationWaitTime, *format, *maxRows, *tables) + VDiff(ctx, keyspace, workflow, *sourceCell, *targetCell, *tabletTypes, *filteredReplicationWaitTime, *format, *maxRows, *tables, *debugQuery, *onlyPks) if err != nil { log.Errorf("vdiff returning with error: %v", err) if strings.Contains(err.Error(), "context deadline exceeded") { diff --git a/go/vt/wrangler/vdiff.go b/go/vt/wrangler/vdiff.go index bfa46b2e5dc..e08a1b6cb01 100644 --- a/go/vt/wrangler/vdiff.go +++ b/go/vt/wrangler/vdiff.go @@ -21,29 +21,29 @@ import ( "context" "encoding/json" "fmt" + "sort" "strings" "sync" "time" - "vitess.io/vitess/go/vt/vtgate/evalengine" - - "vitess.io/vitess/go/vt/binlog/binlogplayer" - "vitess.io/vitess/go/vt/log" - "github.com/golang/protobuf/proto" "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/grpcclient" "vitess.io/vitess/go/vt/key" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vtctl/workflow" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/engine" + "vitess.io/vitess/go/vt/vtgate/evalengine" "vitess.io/vitess/go/vt/vttablet/tabletconn" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" @@ -55,11 +55,26 @@ import ( // DiffReport is the summary of differences for one table. type DiffReport struct { - ProcessedRows int - MatchingRows int - MismatchedRows int - ExtraRowsSource int - ExtraRowsTarget int + ProcessedRows int + MatchingRows int + MismatchedRows int + ExtraRowsSource int + ExtraRowsSourceSample []*RowDiff + ExtraRowsTarget int + ExtraRowsTargetSample []*RowDiff + MismatchedRowsSample []*DiffMismatch +} + +// DiffMismatch is a sample of row diffs between source and target. +type DiffMismatch struct { + Source *RowDiff + Target *RowDiff +} + +// RowDiff is a row that didn't match as part of the comparison. +type RowDiff struct { + Row map[string]sqltypes.Value + Query string } // vdiff contains the metadata for performing vdiff for one workflow. @@ -106,8 +121,10 @@ type tableDiffer struct { // pkCols has the indices of PK cols in the select list pkCols []int - // sourcePrimitive and targetPrimitive are used for streaming - // results from source and target. + // selectPks is the list of pk columns as they appear in the select clause for the diff. + selectPks []int + + // source Primitive and targetPrimitive are used for streaming sourcePrimitive engine.Primitive targetPrimitive engine.Primitive } @@ -130,7 +147,7 @@ type shardStreamer struct { // VDiff reports differences between the sources and targets of a vreplication workflow. func (wr *Wrangler) VDiff(ctx context.Context, targetKeyspace, workflowName, sourceCell, targetCell, tabletTypesStr string, - filteredReplicationWaitTime time.Duration, format string, maxRows int64, tables string) (map[string]*DiffReport, error) { + filteredReplicationWaitTime time.Duration, format string, maxRows int64, tables string, debug, onlyPks bool) (map[string]*DiffReport, error) { log.Infof("Starting VDiff for %s.%s, sourceCell %s, targetCell %s, tabletTypes %s, timeout %s", targetKeyspace, workflowName, sourceCell, targetCell, tabletTypesStr, filteredReplicationWaitTime.String()) // Assign defaults to sourceCell and targetCell if not specified. @@ -229,26 +246,43 @@ func (wr *Wrangler) VDiff(ctx context.Context, targetKeyspace, workflowName, sou return nil, err } // Perform the diff of source and target streams. - dr, err := td.diff(ctx, df.ts.wr, &rowsToCompare) + dr, err := td.diff(ctx, df.ts.wr, &rowsToCompare, debug, onlyPks) if err != nil { return nil, vterrors.Wrap(err, "diff") } - if format == "json" { - json, err := json.MarshalIndent(*dr, "", "") - if err != nil { - wr.Logger().Printf("Error converting report to json: %v", err.Error()) + diffReports[table] = dr + } + if format == "json" { + json, err := json.MarshalIndent(diffReports, "", "") + if err != nil { + wr.Logger().Printf("Error converting report to json: %v", err.Error()) + } + jsonOutput += fmt.Sprintf("%s", json) + wr.logger.Printf("%s", jsonOutput) + } else { + for table, dr := range diffReports { + wr.Logger().Printf("Summary for table %v:\n", table) + wr.Logger().Printf("\tProcessedRows: %v\n", dr.ProcessedRows) + wr.Logger().Printf("\tMatchingRows: %v\n", dr.MatchingRows) + wr.Logger().Printf("\tMismatchedRows: %v\n", dr.MismatchedRows) + wr.Logger().Printf("\tExtraRowsSource: %v\n", dr.ExtraRowsSource) + wr.Logger().Printf("\tExtraRowsTarget: %v\n", dr.ExtraRowsTarget) + for i, rs := range dr.ExtraRowsSourceSample { + wr.Logger().Printf("\tSample extra row in source %v:\n", i) + formatSampleRow(wr.Logger(), rs, debug) } - if jsonOutput != "" { - jsonOutput += "," + for i, rs := range dr.ExtraRowsTargetSample { + wr.Logger().Printf("\tSample extra row in target %v:\n", i) + formatSampleRow(wr.Logger(), rs, debug) + } + for i, rs := range dr.MismatchedRowsSample { + wr.Logger().Printf("\tSample rows with mismatch %v:\n", i) + wr.Logger().Printf("\t\tSource row:\n") + formatSampleRow(wr.Logger(), rs.Source, debug) + wr.Logger().Printf("\t\tTarget row:\n") + formatSampleRow(wr.Logger(), rs.Target, debug) } - jsonOutput += fmt.Sprintf("%s", json) - } else { - wr.Logger().Printf("Summary for %v: %+v\n", td.targetTable, *dr) } - diffReports[table] = dr - } - if format == "json" && jsonOutput != "" { - wr.logger.Printf(`[ %s ]`, jsonOutput) } return diffReports, nil } @@ -358,6 +392,7 @@ func findPKs(table *tabletmanagerdatapb.TableDefinition, targetSelect *sqlparser if strings.EqualFold(pk, colname) { td.compareCols[i].isPK = true td.comparePKs = append(td.comparePKs, td.compareCols[i]) + td.selectPks = append(td.selectPks, i) // We'll be comparing pks separately. So, remove them from compareCols. td.pkCols = append(td.pkCols, i) found = true @@ -862,7 +897,7 @@ func humanInt(n int64) string { //----------------------------------------------------------------- // tableDiffer -func (td *tableDiffer) diff(ctx context.Context, wr *Wrangler, rowsToCompare *int64) (*DiffReport, error) { +func (td *tableDiffer) diff(ctx context.Context, wr *Wrangler, rowsToCompare *int64, debug, onlyPks bool) (*DiffReport, error) { sourceExecutor := newPrimitiveExecutor(ctx, td.sourcePrimitive) targetExecutor := newPrimitiveExecutor(ctx, td.targetPrimitive) dr := &DiffReport{} @@ -900,8 +935,13 @@ func (td *tableDiffer) diff(ctx context.Context, wr *Wrangler, rowsToCompare *in advanceTarget = true if sourceRow == nil { + diffRow, err := td.genRowDiff(td.sourceExpression, targetRow, debug, onlyPks) + if err != nil { + return nil, vterrors.Wrap(err, "unexpected error generating diff") + } + dr.ExtraRowsTargetSample = append(dr.ExtraRowsTargetSample, diffRow) + // drain target, update count - wr.Logger().Errorf("Draining extra row(s) found on the target starting with: %v", targetRow) count, err := targetExecutor.drain(ctx) if err != nil { return nil, err @@ -913,7 +953,12 @@ func (td *tableDiffer) diff(ctx context.Context, wr *Wrangler, rowsToCompare *in if targetRow == nil { // no more rows from the target // we know we have rows from source, drain, update count - wr.Logger().Warningf("Draining extra row(s) found on the source starting with: %v", sourceRow) + diffRow, err := td.genRowDiff(td.sourceExpression, sourceRow, debug, onlyPks) + if err != nil { + return nil, vterrors.Wrap(err, "unexpected error generating diff") + } + dr.ExtraRowsSourceSample = append(dr.ExtraRowsTargetSample, diffRow) + count, err := sourceExecutor.drain(ctx) if err != nil { return nil, err @@ -932,14 +977,22 @@ func (td *tableDiffer) diff(ctx context.Context, wr *Wrangler, rowsToCompare *in return nil, err case c < 0: if dr.ExtraRowsSource < 10 { - wr.Logger().Errorf("[table=%v] Extra row %v on source: %v", td.targetTable, dr.ExtraRowsSource, sourceRow) + diffRow, err := td.genRowDiff(td.sourceExpression, sourceRow, debug, onlyPks) + if err != nil { + return nil, vterrors.Wrap(err, "unexpected error generating diff") + } + dr.ExtraRowsSourceSample = append(dr.ExtraRowsTargetSample, diffRow) } dr.ExtraRowsSource++ advanceTarget = false continue case c > 0: if dr.ExtraRowsTarget < 10 { - wr.Logger().Errorf("[table=%v] Extra row %v on target: %v", td.targetTable, dr.ExtraRowsTarget, targetRow) + diffRow, err := td.genRowDiff(td.targetExpression, targetRow, debug, onlyPks) + if err != nil { + return nil, vterrors.Wrap(err, "unexpected error generating diff") + } + dr.ExtraRowsTargetSample = append(dr.ExtraRowsTargetSample, diffRow) } dr.ExtraRowsTarget++ advanceSource = false @@ -954,7 +1007,15 @@ func (td *tableDiffer) diff(ctx context.Context, wr *Wrangler, rowsToCompare *in return nil, err case c != 0: if dr.MismatchedRows < 10 { - wr.Logger().Errorf("[table=%v] Different content %v in same PK: %v != %v", td.targetTable, dr.MismatchedRows, sourceRow, targetRow) + sourceDiffRow, err := td.genRowDiff(td.targetExpression, sourceRow, debug, onlyPks) + if err != nil { + return nil, vterrors.Wrap(err, "unexpected error generating diff") + } + targetDiffRow, err := td.genRowDiff(td.targetExpression, targetRow, debug, onlyPks) + if err != nil { + return nil, vterrors.Wrap(err, "unexpected error generating diff") + } + dr.MismatchedRowsSample = append(dr.MismatchedRowsSample, &DiffMismatch{Source: sourceDiffRow, Target: targetDiffRow}) } dr.MismatchedRows++ default: @@ -993,6 +1054,72 @@ func (td *tableDiffer) compare(sourceRow, targetRow []sqltypes.Value, cols []com return 0, nil } +func (td *tableDiffer) genRowDiff(queryStmt string, row []sqltypes.Value, debug, onlyPks bool) (*RowDiff, error) { + drp := &RowDiff{} + drp.Row = make(map[string]sqltypes.Value) + statement, err := sqlparser.Parse(queryStmt) + if err != nil { + return nil, err + } + sel, ok := statement.(*sqlparser.Select) + if !ok { + return nil, fmt.Errorf("unexpected: %v", sqlparser.String(statement)) + } + + if debug { + drp.Query = td.genDebugQueryDiff(sel, row, onlyPks) + } + + if onlyPks { + for _, pkI := range td.selectPks { + buf := sqlparser.NewTrackedBuffer(nil) + sel.SelectExprs[pkI].Format(buf) + col := buf.String() + drp.Row[col] = row[pkI] + } + return drp, nil + } + + for i := range sel.SelectExprs { + buf := sqlparser.NewTrackedBuffer(nil) + sel.SelectExprs[i].Format(buf) + col := buf.String() + drp.Row[col] = row[i] + } + + return drp, nil +} + +func (td *tableDiffer) genDebugQueryDiff(sel *sqlparser.Select, row []sqltypes.Value, onlyPks bool) string { + buf := sqlparser.NewTrackedBuffer(nil) + buf.Myprintf("select ") + + if onlyPks { + for i, pkI := range td.selectPks { + pk := sel.SelectExprs[pkI] + pk.Format(buf) + if i != len(td.selectPks)-1 { + buf.Myprintf(", ") + } + } + } else { + sel.SelectExprs.Format(buf) + } + buf.Myprintf(" from ") + sel.From.Format(buf) + buf.Myprintf(" where ") + for i, pkI := range td.selectPks { + sel.SelectExprs[pkI].Format(buf) + buf.Myprintf("=") + row[pkI].EncodeSQL(buf) + if i != len(td.selectPks)-1 { + buf.Myprintf(" AND ") + } + } + buf.Myprintf(";") + return buf.String() +} + //----------------------------------------------------------------- // contextVCursor @@ -1055,3 +1182,35 @@ func wrapWeightString(expr sqlparser.SelectExpr) *sqlparser.AliasedExpr { }, } } + +func formatSampleRow(logger logutil.Logger, rd *RowDiff, debug bool) { + keys := make([]string, 0, len(rd.Row)) + for k := range rd.Row { + keys = append(keys, k) + } + + sort.Strings(keys) + + for _, k := range keys { + logger.Printf("\t\t\t %s: %s\n", k, formatValue(rd.Row[k])) + } + + if debug { + logger.Printf("\t\tDebugQuery: %v\n", rd.Query) + } +} + +func formatValue(val sqltypes.Value) string { + if val.Type() == sqltypes.Null { + return "null (NULL_TYPE)" + } + if val.IsQuoted() || val.Type() == sqltypes.Bit { + if len(val.Raw()) >= 20 { + rawBytes := val.Raw()[:20] + rawBytes = append(rawBytes, []byte("...[TRUNCATED]")...) + return fmt.Sprintf("%q (%v)", rawBytes, val.Type()) + } + return fmt.Sprintf("%q (%v)", val.Raw(), val.Type()) + } + return fmt.Sprintf("%s (%v)", val.Raw(), val.Type()) +} diff --git a/go/vt/wrangler/vdiff_test.go b/go/vt/wrangler/vdiff_test.go index 31df295e468..0c93d02473a 100644 --- a/go/vt/wrangler/vdiff_test.go +++ b/go/vt/wrangler/vdiff_test.go @@ -81,6 +81,7 @@ func TestVDiffPlanSuccess(t *testing.T) { compareCols: []compareColInfo{{0, 0, true}, {1, 0, false}}, comparePKs: []compareColInfo{{0, 0, true}}, pkCols: []int{0}, + selectPks: []int{0}, sourcePrimitive: newMergeSorter(nil, []compareColInfo{{0, 0, true}}), targetPrimitive: newMergeSorter(nil, []compareColInfo{{0, 0, true}}), }, @@ -97,6 +98,7 @@ func TestVDiffPlanSuccess(t *testing.T) { compareCols: []compareColInfo{{0, 0, true}, {1, 0, false}}, comparePKs: []compareColInfo{{0, 0, true}}, pkCols: []int{0}, + selectPks: []int{0}, sourcePrimitive: newMergeSorter(nil, []compareColInfo{{0, 0, true}}), targetPrimitive: newMergeSorter(nil, []compareColInfo{{0, 0, true}}), }, @@ -113,6 +115,7 @@ func TestVDiffPlanSuccess(t *testing.T) { compareCols: []compareColInfo{{0, 0, true}, {1, 0, false}}, comparePKs: []compareColInfo{{0, 0, true}}, pkCols: []int{0}, + selectPks: []int{0}, sourcePrimitive: newMergeSorter(nil, []compareColInfo{{0, 0, true}}), targetPrimitive: newMergeSorter(nil, []compareColInfo{{0, 0, true}}), }, @@ -129,6 +132,7 @@ func TestVDiffPlanSuccess(t *testing.T) { compareCols: []compareColInfo{{0, 0, false}, {1, 0, true}}, comparePKs: []compareColInfo{{1, 0, true}}, pkCols: []int{1}, + selectPks: []int{1}, sourcePrimitive: newMergeSorter(nil, []compareColInfo{{1, 0, true}}), targetPrimitive: newMergeSorter(nil, []compareColInfo{{1, 0, true}}), }, @@ -145,6 +149,7 @@ func TestVDiffPlanSuccess(t *testing.T) { compareCols: []compareColInfo{{0, 0, true}, {1, 0, false}}, comparePKs: []compareColInfo{{0, 0, true}}, pkCols: []int{0}, + selectPks: []int{0}, sourcePrimitive: newMergeSorter(nil, []compareColInfo{{0, 0, true}}), targetPrimitive: newMergeSorter(nil, []compareColInfo{{0, 0, true}}), }, @@ -162,6 +167,7 @@ func TestVDiffPlanSuccess(t *testing.T) { compareCols: []compareColInfo{{0, 0, true}, {1, 2, false}}, comparePKs: []compareColInfo{{0, 0, true}}, pkCols: []int{0}, + selectPks: []int{0}, sourcePrimitive: newMergeSorter(nil, []compareColInfo{{0, 0, true}}), targetPrimitive: newMergeSorter(nil, []compareColInfo{{0, 0, true}}), }, @@ -179,6 +185,7 @@ func TestVDiffPlanSuccess(t *testing.T) { compareCols: []compareColInfo{{0, 2, false}, {1, 0, true}}, comparePKs: []compareColInfo{{1, 0, true}}, pkCols: []int{1}, + selectPks: []int{1}, sourcePrimitive: newMergeSorter(nil, []compareColInfo{{1, 0, true}}), targetPrimitive: newMergeSorter(nil, []compareColInfo{{1, 0, true}}), }, @@ -196,6 +203,7 @@ func TestVDiffPlanSuccess(t *testing.T) { compareCols: []compareColInfo{{0, 2, true}, {1, 0, false}}, comparePKs: []compareColInfo{{0, 2, true}}, pkCols: []int{0}, + selectPks: []int{0}, sourcePrimitive: newMergeSorter(nil, []compareColInfo{{0, 2, true}}), targetPrimitive: newMergeSorter(nil, []compareColInfo{{0, 2, true}}), }, @@ -213,6 +221,7 @@ func TestVDiffPlanSuccess(t *testing.T) { compareCols: []compareColInfo{{0, 0, false}, {1, 2, true}}, comparePKs: []compareColInfo{{1, 2, true}}, pkCols: []int{1}, + selectPks: []int{1}, sourcePrimitive: newMergeSorter(nil, []compareColInfo{{1, 2, true}}), targetPrimitive: newMergeSorter(nil, []compareColInfo{{1, 2, true}}), }, @@ -230,6 +239,7 @@ func TestVDiffPlanSuccess(t *testing.T) { compareCols: []compareColInfo{{0, 0, false}, {1, 2, true}}, comparePKs: []compareColInfo{{1, 2, true}}, pkCols: []int{1}, + selectPks: []int{1}, sourcePrimitive: newMergeSorter(nil, []compareColInfo{{1, 2, true}}), targetPrimitive: newMergeSorter(nil, []compareColInfo{{1, 2, true}}), }, @@ -245,6 +255,7 @@ func TestVDiffPlanSuccess(t *testing.T) { compareCols: []compareColInfo{{0, 0, true}, {1, 0, true}}, comparePKs: []compareColInfo{{0, 0, true}, {1, 0, true}}, pkCols: []int{0, 1}, + selectPks: []int{0, 1}, sourcePrimitive: newMergeSorter(nil, []compareColInfo{{0, 0, true}, {1, 0, true}}), targetPrimitive: newMergeSorter(nil, []compareColInfo{{0, 0, true}, {1, 0, true}}), }, @@ -262,6 +273,7 @@ func TestVDiffPlanSuccess(t *testing.T) { compareCols: []compareColInfo{{0, 0, true}, {1, 0, false}}, comparePKs: []compareColInfo{{0, 0, true}}, pkCols: []int{0}, + selectPks: []int{0}, sourcePrimitive: newMergeSorter(nil, []compareColInfo{{0, 0, true}}), targetPrimitive: newMergeSorter(nil, []compareColInfo{{0, 0, true}}), }, @@ -280,6 +292,7 @@ func TestVDiffPlanSuccess(t *testing.T) { compareCols: []compareColInfo{{0, 0, true}, {1, 0, false}}, comparePKs: []compareColInfo{{0, 0, true}}, pkCols: []int{0}, + selectPks: []int{0}, sourcePrimitive: newMergeSorter(nil, []compareColInfo{{0, 0, true}}), targetPrimitive: newMergeSorter(nil, []compareColInfo{{0, 0, true}}), }, @@ -298,6 +311,7 @@ func TestVDiffPlanSuccess(t *testing.T) { compareCols: []compareColInfo{{0, 0, true}, {1, 0, false}}, comparePKs: []compareColInfo{{0, 0, true}}, pkCols: []int{0}, + selectPks: []int{0}, sourcePrimitive: newMergeSorter(nil, []compareColInfo{{0, 0, true}}), targetPrimitive: newMergeSorter(nil, []compareColInfo{{0, 0, true}}), }, @@ -316,6 +330,7 @@ func TestVDiffPlanSuccess(t *testing.T) { compareCols: []compareColInfo{{0, 0, true}, {1, 0, false}}, comparePKs: []compareColInfo{{0, 0, true}}, pkCols: []int{0}, + selectPks: []int{0}, sourcePrimitive: newMergeSorter(nil, []compareColInfo{{0, 0, true}}), targetPrimitive: newMergeSorter(nil, []compareColInfo{{0, 0, true}}), }, @@ -334,6 +349,7 @@ func TestVDiffPlanSuccess(t *testing.T) { compareCols: []compareColInfo{{0, 0, true}, {1, 0, false}}, comparePKs: []compareColInfo{{0, 0, true}}, pkCols: []int{0}, + selectPks: []int{0}, sourcePrimitive: newMergeSorter(nil, []compareColInfo{{0, 0, true}}), targetPrimitive: newMergeSorter(nil, []compareColInfo{{0, 0, true}}), }, @@ -351,6 +367,7 @@ func TestVDiffPlanSuccess(t *testing.T) { compareCols: []compareColInfo{{0, 0, true}, {1, 0, false}}, comparePKs: []compareColInfo{{0, 0, true}}, pkCols: []int{0}, + selectPks: []int{0}, sourcePrimitive: newMergeSorter(nil, []compareColInfo{{0, 0, true}}), targetPrimitive: newMergeSorter(nil, []compareColInfo{{0, 0, true}}), }, @@ -368,6 +385,7 @@ func TestVDiffPlanSuccess(t *testing.T) { compareCols: []compareColInfo{{0, 0, true}, {1, 0, false}, {2, 0, false}, {3, 0, false}}, comparePKs: []compareColInfo{{0, 0, true}}, pkCols: []int{0}, + selectPks: []int{0}, sourcePrimitive: &engine.OrderedAggregate{ Aggregates: []engine.AggregateParams{{ Opcode: engine.AggregateCount, @@ -467,10 +485,12 @@ func TestVDiffUnsharded(t *testing.T) { ) testcases := []struct { - id string - source []*sqltypes.Result - target []*sqltypes.Result - dr *DiffReport + id string + source []*sqltypes.Result + target []*sqltypes.Result + dr *DiffReport + onlyPks bool + debug bool }{{ id: "1", source: sqltypes.MakeTestStreamingResults(fields, @@ -504,6 +524,15 @@ func TestVDiffUnsharded(t *testing.T) { ProcessedRows: 3, MatchingRows: 1, ExtraRowsTarget: 2, + ExtraRowsTargetSample: []*RowDiff{ + { + Row: map[string]sqltypes.Value{ + "c1": sqltypes.NewInt64(2), + "c2": sqltypes.NewInt64(4), + }, + Query: "", + }, + }, }, }, { id: "3", @@ -520,6 +549,15 @@ func TestVDiffUnsharded(t *testing.T) { ProcessedRows: 3, MatchingRows: 1, ExtraRowsSource: 2, + ExtraRowsSourceSample: []*RowDiff{ + { + Row: map[string]sqltypes.Value{ + "c1": sqltypes.NewInt64(2), + "c2": sqltypes.NewInt64(4), + }, + Query: "", + }, + }, }, }, { id: "4", @@ -538,6 +576,15 @@ func TestVDiffUnsharded(t *testing.T) { ProcessedRows: 3, MatchingRows: 2, ExtraRowsSource: 1, + ExtraRowsSourceSample: []*RowDiff{ + { + Row: map[string]sqltypes.Value{ + "c1": sqltypes.NewInt64(2), + "c2": sqltypes.NewInt64(4), + }, + Query: "", + }, + }, }, }, { id: "5", @@ -556,6 +603,15 @@ func TestVDiffUnsharded(t *testing.T) { ProcessedRows: 3, MatchingRows: 2, ExtraRowsTarget: 1, + ExtraRowsTargetSample: []*RowDiff{ + { + Row: map[string]sqltypes.Value{ + "c1": sqltypes.NewInt64(2), + "c2": sqltypes.NewInt64(4), + }, + Query: "", + }, + }, }, }, { id: "6", @@ -575,6 +631,92 @@ func TestVDiffUnsharded(t *testing.T) { ProcessedRows: 3, MatchingRows: 2, MismatchedRows: 1, + MismatchedRowsSample: []*DiffMismatch{ + { + Source: &RowDiff{Row: map[string]sqltypes.Value{ + "c1": sqltypes.NewInt64(2), + "c2": sqltypes.NewInt64(3), + }, + Query: "", + }, + Target: &RowDiff{Row: map[string]sqltypes.Value{ + "c1": sqltypes.NewInt64(2), + "c2": sqltypes.NewInt64(4), + }, + Query: "", + }, + }, + }, + }, + }, { + id: "7", + onlyPks: true, + source: sqltypes.MakeTestStreamingResults(fields, + "1|3", + "---", + "2|3", + "3|1", + ), + target: sqltypes.MakeTestStreamingResults(fields, + "1|3", + "---", + "2|4", + "3|1", + ), + dr: &DiffReport{ + ProcessedRows: 3, + MatchingRows: 2, + MismatchedRows: 1, + MismatchedRowsSample: []*DiffMismatch{ + { + Source: &RowDiff{Row: map[string]sqltypes.Value{ + "c1": sqltypes.NewInt64(2), + }, + Query: "", + }, + Target: &RowDiff{Row: map[string]sqltypes.Value{ + "c1": sqltypes.NewInt64(2), + }, + Query: "", + }, + }, + }, + }, + }, { + id: "8", + debug: true, + source: sqltypes.MakeTestStreamingResults(fields, + "1|3", + "---", + "2|3", + "3|1", + ), + target: sqltypes.MakeTestStreamingResults(fields, + "1|3", + "---", + "2|4", + "3|1", + ), + dr: &DiffReport{ + ProcessedRows: 3, + MatchingRows: 2, + MismatchedRows: 1, + MismatchedRowsSample: []*DiffMismatch{ + { + Source: &RowDiff{Row: map[string]sqltypes.Value{ + "c1": sqltypes.NewInt64(2), + "c2": sqltypes.NewInt64(3), + }, + Query: "select c1, c2 from t1 where c1=2;", + }, + Target: &RowDiff{Row: map[string]sqltypes.Value{ + "c1": sqltypes.NewInt64(2), + "c2": sqltypes.NewInt64(4), + }, + Query: "select c1, c2 from t1 where c1=2;", + }, + }, + }, }, }} @@ -583,7 +725,7 @@ func TestVDiffUnsharded(t *testing.T) { env.tablets[101].setResults("select c1, c2 from t1 order by c1 asc", vdiffSourceGtid, tcase.source) env.tablets[201].setResults("select c1, c2 from t1 order by c1 asc", vdiffTargetMasterPosition, tcase.target) - dr, err := env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, env.cell, "replica", 30*time.Second, "", 100, "") + dr, err := env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, env.cell, "replica", 30*time.Second, "", 100, "", tcase.debug, tcase.onlyPks) require.NoError(t, err) assert.Equal(t, tcase.dr, dr["t1"], tcase.id) }) @@ -646,7 +788,7 @@ func TestVDiffSharded(t *testing.T) { ), ) - dr, err := env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, env.cell, "replica", 30*time.Second, "", 100, "") + dr, err := env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, env.cell, "replica", 30*time.Second, "", 100, "", false /*debug*/, false /*onlyPks*/) require.NoError(t, err) wantdr := &DiffReport{ ProcessedRows: 3, @@ -712,7 +854,7 @@ func TestVDiffAggregates(t *testing.T) { ), ) - dr, err := env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, env.cell, "replica", 30*time.Second, "", 100, "") + dr, err := env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, env.cell, "replica", 30*time.Second, "", 100, "", false /*debug*/, false /*onlyPks*/) require.NoError(t, err) wantdr := &DiffReport{ ProcessedRows: 5, @@ -776,7 +918,7 @@ func TestVDiffPKWeightString(t *testing.T) { ), ) - dr, err := env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, env.cell, "replica", 30*time.Second, "", 100, "") + dr, err := env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, env.cell, "replica", 30*time.Second, "", 100, "", false /*debug*/, false /*onlyPks*/) require.NoError(t, err) wantdr := &DiffReport{ ProcessedRows: 4, @@ -840,7 +982,7 @@ func TestVDiffNoPKWeightString(t *testing.T) { ), ) - dr, err := env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, env.cell, "replica", 30*time.Second, "", 100, "") + dr, err := env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, env.cell, "replica", 30*time.Second, "", 100, "", false /*debug*/, false /*onlyPks*/) require.NoError(t, err) wantdr := &DiffReport{ ProcessedRows: 4, @@ -878,23 +1020,23 @@ func TestVDiffDefaults(t *testing.T) { env.tablets[101].setResults("select c1, c2 from t1 order by c1 asc", vdiffSourceGtid, source) env.tablets[201].setResults("select c1, c2 from t1 order by c1 asc", vdiffTargetMasterPosition, target) - _, err := env.wr.VDiff(context.Background(), "target", env.workflow, "", "", "replica", 30*time.Second, "", 100, "") + _, err := env.wr.VDiff(context.Background(), "target", env.workflow, "", "", "replica", 30*time.Second, "", 100, "", false /*debug*/, false /*onlyPks*/) require.NoError(t, err) - _, err = env.wr.VDiff(context.Background(), "target", env.workflow, "", env.cell, "replica", 30*time.Second, "", 100, "") + _, err = env.wr.VDiff(context.Background(), "target", env.workflow, "", env.cell, "replica", 30*time.Second, "", 100, "", false /*debug*/, false /*onlyPks*/) require.NoError(t, err) var df map[string]*DiffReport - df, err = env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, "", "replica", 30*time.Second, "", 100, "") + df, err = env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, "", "replica", 30*time.Second, "", 100, "", false /*debug*/, false /*onlyPks*/) require.NoError(t, err) require.Equal(t, df["t1"].ProcessedRows, 3) - df, err = env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, "", "replica", 30*time.Second, "", 1, "") + df, err = env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, "", "replica", 30*time.Second, "", 1, "", false /*debug*/, false /*onlyPks*/) require.NoError(t, err) require.Equal(t, df["t1"].ProcessedRows, 1) - df, err = env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, "", "replica", 30*time.Second, "", 0, "") + df, err = env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, "", "replica", 30*time.Second, "", 0, "", false /*debug*/, false /*onlyPks*/) require.NoError(t, err) require.Equal(t, df["t1"].ProcessedRows, 0) - _, err = env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, "", "replica", 1*time.Nanosecond, "", 100, "") + _, err = env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, "", "replica", 1*time.Nanosecond, "", 100, "", false /*debug*/, false /*onlyPks*/) require.Error(t, err) err = topo.CheckKeyspaceLocked(context.Background(), "target") require.EqualErrorf(t, err, "keyspace target is not locked (no locksInfo)", "") @@ -931,7 +1073,7 @@ func TestVDiffReplicationWait(t *testing.T) { env.tablets[101].setResults("select c1, c2 from t1 order by c1 asc", vdiffSourceGtid, source) env.tablets[201].setResults("select c1, c2 from t1 order by c1 asc", vdiffTargetMasterPosition, target) - _, err := env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, env.cell, "replica", 0*time.Second, "", 100, "") + _, err := env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, env.cell, "replica", 0*time.Second, "", 100, "", false /*debug*/, false /*onlyPks*/) require.Error(t, err) require.True(t, strings.Contains(err.Error(), "context deadline exceeded")) } @@ -969,6 +1111,7 @@ func TestVDiffFindPKs(t *testing.T) { compareCols: []compareColInfo{{0, 0, true}, {1, 0, false}}, comparePKs: []compareColInfo{{0, 0, true}}, pkCols: []int{0}, + selectPks: []int{0}, }, }, { name: "", @@ -995,6 +1138,7 @@ func TestVDiffFindPKs(t *testing.T) { compareCols: []compareColInfo{{0, 0, true}, {1, 0, false}, {2, 0, false}, {3, 0, true}}, comparePKs: []compareColInfo{{0, 0, true}, {3, 0, true}}, pkCols: []int{0, 3}, + selectPks: []int{0, 3}, }, }, } @@ -1115,6 +1259,56 @@ func TestVDiffNullWeightString(t *testing.T) { dr: &DiffReport{ ProcessedRows: 3, MismatchedRows: 3, + MismatchedRowsSample: []*DiffMismatch{ + { + Source: &RowDiff{Row: map[string]sqltypes.Value{ + "c1": sqltypes.NewInt64(1), + "c2": sqltypes.NewVarChar("abd"), + "weight_string(c2)": sqltypes.NULL, + }, + Query: "", + }, + Target: &RowDiff{Row: map[string]sqltypes.Value{ + "c1": sqltypes.NewInt64(1), + "c2": sqltypes.NewVarChar("abc"), + "weight_string(c2)": sqltypes.NULL, + }, + Query: "", + }, + }, + { + Source: &RowDiff{Row: map[string]sqltypes.Value{ + "c1": sqltypes.NewInt64(2), + "c2": sqltypes.NewVarChar("abd"), + "weight_string(c2)": sqltypes.NewVarBinary("abd"), + }, + Query: "", + }, + Target: &RowDiff{Row: map[string]sqltypes.Value{ + "c1": sqltypes.NewInt64(2), + "c2": sqltypes.NewVarChar("abc"), + "weight_string(c2)": sqltypes.NewVarBinary("abc"), + }, + Query: "", + }, + }, + { + Source: &RowDiff{Row: map[string]sqltypes.Value{ + "c1": sqltypes.NewInt64(3), + "c2": sqltypes.NewVarChar("abd"), + "weight_string(c2)": sqltypes.NULL, + }, + Query: "", + }, + Target: &RowDiff{Row: map[string]sqltypes.Value{ + "c1": sqltypes.NewInt64(3), + "c2": sqltypes.NewVarChar("abc"), + "weight_string(c2)": sqltypes.NULL, + }, + Query: "", + }, + }, + }, }, }, { //this explicitly tests for a bug that existed with a small max_allowed_packet setting @@ -1129,6 +1323,24 @@ func TestVDiffNullWeightString(t *testing.T) { dr: &DiffReport{ ProcessedRows: 1, MismatchedRows: 1, + MismatchedRowsSample: []*DiffMismatch{ + { + Source: &RowDiff{Row: map[string]sqltypes.Value{ + "c1": sqltypes.NewInt64(1), + "c2": sqltypes.NewVarChar("abd"), + "weight_string(c2)": sqltypes.NULL, + }, + Query: "", + }, + Target: &RowDiff{Row: map[string]sqltypes.Value{ + "c1": sqltypes.NewInt64(1), + "c2": sqltypes.NewVarChar("abc"), + "weight_string(c2)": sqltypes.NULL, + }, + Query: "", + }, + }, + }, }, }} for _, tcase := range testcases { @@ -1136,7 +1348,7 @@ func TestVDiffNullWeightString(t *testing.T) { env.tablets[101].setResults("select c1, c2, weight_string(c2) from t1 order by c1 asc", vdiffSourceGtid, tcase.source) env.tablets[201].setResults("select c1, c2, weight_string(c2) from t1 order by c1 asc", vdiffTargetMasterPosition, tcase.target) - dr, err := env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, env.cell, "replica", 30*time.Second, "", 100, "") + dr, err := env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, env.cell, "replica", 30*time.Second, "", 100, "", false /*debug*/, false /*onlyPks*/) require.NoError(t, err) require.Equal(t, tcase.dr, dr["t1"], tcase.id) })