Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ func vdiff(t *testing.T, workflow, cells string) {
t.Logf("vdiff err: %+v, output: %+v", err, output)
require.Nil(t, err)
require.NotNil(t, output)
diffReports := make([]*wrangler.DiffReport, 0)
diffReports := make(map[string]*wrangler.DiffReport)
err = json.Unmarshal([]byte(output), &diffReports)
require.Nil(t, err)
if len(diffReports) < 1 {
Expand Down
4 changes: 3 additions & 1 deletion go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2335,6 +2335,8 @@ func commandVDiff(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.Fla
tabletTypes := subFlags.String("tablet_types", "master,replica,rdonly", "Tablet types for source and target")
filteredReplicationWaitTime := subFlags.Duration("filtered_replication_wait_time", 30*time.Second, "Specifies the maximum time to wait, in seconds, for filtered replication to catch up on master migrations. The migration will be cancelled on a timeout.")
maxRows := subFlags.Int64("limit", math.MaxInt64, "Max rows to stop comparing after")
debugQuery := subFlags.Bool("debug_query", false, "Adds a mysql query to the report that can be used for further debugging")
onlyPks := subFlags.Bool("only_pks", false, "When reporting missing rows, only show primary keys in the report.")
format := subFlags.String("format", "", "Format of report") //"json" or ""
tables := subFlags.String("tables", "", "Only run vdiff for these tables in the workflow")
if err := subFlags.Parse(args); err != nil {
Expand All @@ -2352,7 +2354,7 @@ func commandVDiff(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.Fla
return fmt.Errorf("maximum number of rows to compare needs to be greater than 0")
}
_, err = wr.
VDiff(ctx, keyspace, workflow, *sourceCell, *targetCell, *tabletTypes, *filteredReplicationWaitTime, *format, *maxRows, *tables)
VDiff(ctx, keyspace, workflow, *sourceCell, *targetCell, *tabletTypes, *filteredReplicationWaitTime, *format, *maxRows, *tables, *debugQuery, *onlyPks)
if err != nil {
log.Errorf("vdiff returning with error: %v", err)
if strings.Contains(err.Error(), "context deadline exceeded") {
Expand Down
225 changes: 192 additions & 33 deletions go/vt/wrangler/vdiff.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,29 @@ import (
"context"
"encoding/json"
"fmt"
"sort"
"strings"
"sync"
"time"

"vitess.io/vitess/go/vt/vtgate/evalengine"

"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/log"

"github.com/golang/protobuf/proto"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/grpcclient"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vtctl/workflow"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/engine"
"vitess.io/vitess/go/vt/vtgate/evalengine"
"vitess.io/vitess/go/vt/vttablet/tabletconn"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"

Expand All @@ -55,11 +55,26 @@ import (

// DiffReport is the summary of differences for one table.
type DiffReport struct {
ProcessedRows int
MatchingRows int
MismatchedRows int
ExtraRowsSource int
ExtraRowsTarget int
ProcessedRows int
MatchingRows int
MismatchedRows int
ExtraRowsSource int
ExtraRowsSourceSample []*RowDiff
ExtraRowsTarget int
ExtraRowsTargetSample []*RowDiff
MismatchedRowsSample []*DiffMismatch
}

// DiffMismatch is a sample of row diffs between source and target.
type DiffMismatch struct {
Source *RowDiff
Target *RowDiff
}

// RowDiff is a row that didn't match as part of the comparison.
type RowDiff struct {
Row map[string]sqltypes.Value
Query string
}

// vdiff contains the metadata for performing vdiff for one workflow.
Expand Down Expand Up @@ -106,8 +121,10 @@ type tableDiffer struct {
// pkCols has the indices of PK cols in the select list
pkCols []int

// sourcePrimitive and targetPrimitive are used for streaming
// results from source and target.
// selectPks is the list of pk columns as they appear in the select clause for the diff.
selectPks []int

// source Primitive and targetPrimitive are used for streaming
sourcePrimitive engine.Primitive
targetPrimitive engine.Primitive
}
Expand All @@ -130,7 +147,7 @@ type shardStreamer struct {

// VDiff reports differences between the sources and targets of a vreplication workflow.
func (wr *Wrangler) VDiff(ctx context.Context, targetKeyspace, workflowName, sourceCell, targetCell, tabletTypesStr string,
filteredReplicationWaitTime time.Duration, format string, maxRows int64, tables string) (map[string]*DiffReport, error) {
filteredReplicationWaitTime time.Duration, format string, maxRows int64, tables string, debug, onlyPks bool) (map[string]*DiffReport, error) {
log.Infof("Starting VDiff for %s.%s, sourceCell %s, targetCell %s, tabletTypes %s, timeout %s",
targetKeyspace, workflowName, sourceCell, targetCell, tabletTypesStr, filteredReplicationWaitTime.String())
// Assign defaults to sourceCell and targetCell if not specified.
Expand Down Expand Up @@ -229,26 +246,43 @@ func (wr *Wrangler) VDiff(ctx context.Context, targetKeyspace, workflowName, sou
return nil, err
}
// Perform the diff of source and target streams.
dr, err := td.diff(ctx, df.ts.wr, &rowsToCompare)
dr, err := td.diff(ctx, df.ts.wr, &rowsToCompare, debug, onlyPks)
if err != nil {
return nil, vterrors.Wrap(err, "diff")
}
if format == "json" {
json, err := json.MarshalIndent(*dr, "", "")
if err != nil {
wr.Logger().Printf("Error converting report to json: %v", err.Error())
diffReports[table] = dr
}
if format == "json" {
json, err := json.MarshalIndent(diffReports, "", "")
if err != nil {
wr.Logger().Printf("Error converting report to json: %v", err.Error())
}
jsonOutput += fmt.Sprintf("%s", json)
wr.logger.Printf("%s", jsonOutput)
} else {
for table, dr := range diffReports {
wr.Logger().Printf("Summary for table %v:\n", table)
wr.Logger().Printf("\tProcessedRows: %v\n", dr.ProcessedRows)
wr.Logger().Printf("\tMatchingRows: %v\n", dr.MatchingRows)
wr.Logger().Printf("\tMismatchedRows: %v\n", dr.MismatchedRows)
wr.Logger().Printf("\tExtraRowsSource: %v\n", dr.ExtraRowsSource)
wr.Logger().Printf("\tExtraRowsTarget: %v\n", dr.ExtraRowsTarget)
for i, rs := range dr.ExtraRowsSourceSample {
wr.Logger().Printf("\tSample extra row in source %v:\n", i)
formatSampleRow(wr.Logger(), rs, debug)
}
if jsonOutput != "" {
jsonOutput += ","
for i, rs := range dr.ExtraRowsTargetSample {
wr.Logger().Printf("\tSample extra row in target %v:\n", i)
formatSampleRow(wr.Logger(), rs, debug)
}
for i, rs := range dr.MismatchedRowsSample {
wr.Logger().Printf("\tSample rows with mismatch %v:\n", i)
wr.Logger().Printf("\t\tSource row:\n")
formatSampleRow(wr.Logger(), rs.Source, debug)
wr.Logger().Printf("\t\tTarget row:\n")
formatSampleRow(wr.Logger(), rs.Target, debug)
}
jsonOutput += fmt.Sprintf("%s", json)
} else {
wr.Logger().Printf("Summary for %v: %+v\n", td.targetTable, *dr)
}
diffReports[table] = dr
}
if format == "json" && jsonOutput != "" {
wr.logger.Printf(`[ %s ]`, jsonOutput)
}
return diffReports, nil
}
Expand Down Expand Up @@ -358,6 +392,7 @@ func findPKs(table *tabletmanagerdatapb.TableDefinition, targetSelect *sqlparser
if strings.EqualFold(pk, colname) {
td.compareCols[i].isPK = true
td.comparePKs = append(td.comparePKs, td.compareCols[i])
td.selectPks = append(td.selectPks, i)
// We'll be comparing pks separately. So, remove them from compareCols.
td.pkCols = append(td.pkCols, i)
found = true
Expand Down Expand Up @@ -862,7 +897,7 @@ func humanInt(n int64) string {
//-----------------------------------------------------------------
// tableDiffer

func (td *tableDiffer) diff(ctx context.Context, wr *Wrangler, rowsToCompare *int64) (*DiffReport, error) {
func (td *tableDiffer) diff(ctx context.Context, wr *Wrangler, rowsToCompare *int64, debug, onlyPks bool) (*DiffReport, error) {
sourceExecutor := newPrimitiveExecutor(ctx, td.sourcePrimitive)
targetExecutor := newPrimitiveExecutor(ctx, td.targetPrimitive)
dr := &DiffReport{}
Expand Down Expand Up @@ -900,8 +935,13 @@ func (td *tableDiffer) diff(ctx context.Context, wr *Wrangler, rowsToCompare *in
advanceTarget = true

if sourceRow == nil {
diffRow, err := td.genRowDiff(td.sourceExpression, targetRow, debug, onlyPks)
if err != nil {
return nil, vterrors.Wrap(err, "unexpected error generating diff")
}
dr.ExtraRowsTargetSample = append(dr.ExtraRowsTargetSample, diffRow)

// drain target, update count
wr.Logger().Errorf("Draining extra row(s) found on the target starting with: %v", targetRow)
count, err := targetExecutor.drain(ctx)
if err != nil {
return nil, err
Expand All @@ -913,7 +953,12 @@ func (td *tableDiffer) diff(ctx context.Context, wr *Wrangler, rowsToCompare *in
if targetRow == nil {
// no more rows from the target
// we know we have rows from source, drain, update count
wr.Logger().Warningf("Draining extra row(s) found on the source starting with: %v", sourceRow)
diffRow, err := td.genRowDiff(td.sourceExpression, sourceRow, debug, onlyPks)
if err != nil {
return nil, vterrors.Wrap(err, "unexpected error generating diff")
}
dr.ExtraRowsSourceSample = append(dr.ExtraRowsTargetSample, diffRow)

count, err := sourceExecutor.drain(ctx)
if err != nil {
return nil, err
Expand All @@ -932,14 +977,22 @@ func (td *tableDiffer) diff(ctx context.Context, wr *Wrangler, rowsToCompare *in
return nil, err
case c < 0:
if dr.ExtraRowsSource < 10 {
wr.Logger().Errorf("[table=%v] Extra row %v on source: %v", td.targetTable, dr.ExtraRowsSource, sourceRow)
diffRow, err := td.genRowDiff(td.sourceExpression, sourceRow, debug, onlyPks)
if err != nil {
return nil, vterrors.Wrap(err, "unexpected error generating diff")
}
dr.ExtraRowsSourceSample = append(dr.ExtraRowsTargetSample, diffRow)
}
dr.ExtraRowsSource++
advanceTarget = false
continue
case c > 0:
if dr.ExtraRowsTarget < 10 {
wr.Logger().Errorf("[table=%v] Extra row %v on target: %v", td.targetTable, dr.ExtraRowsTarget, targetRow)
diffRow, err := td.genRowDiff(td.targetExpression, targetRow, debug, onlyPks)
if err != nil {
return nil, vterrors.Wrap(err, "unexpected error generating diff")
}
dr.ExtraRowsTargetSample = append(dr.ExtraRowsTargetSample, diffRow)
}
dr.ExtraRowsTarget++
advanceSource = false
Expand All @@ -954,7 +1007,15 @@ func (td *tableDiffer) diff(ctx context.Context, wr *Wrangler, rowsToCompare *in
return nil, err
case c != 0:
if dr.MismatchedRows < 10 {
wr.Logger().Errorf("[table=%v] Different content %v in same PK: %v != %v", td.targetTable, dr.MismatchedRows, sourceRow, targetRow)
sourceDiffRow, err := td.genRowDiff(td.targetExpression, sourceRow, debug, onlyPks)
if err != nil {
return nil, vterrors.Wrap(err, "unexpected error generating diff")
}
targetDiffRow, err := td.genRowDiff(td.targetExpression, targetRow, debug, onlyPks)
if err != nil {
return nil, vterrors.Wrap(err, "unexpected error generating diff")
}
dr.MismatchedRowsSample = append(dr.MismatchedRowsSample, &DiffMismatch{Source: sourceDiffRow, Target: targetDiffRow})
}
dr.MismatchedRows++
default:
Expand Down Expand Up @@ -993,6 +1054,72 @@ func (td *tableDiffer) compare(sourceRow, targetRow []sqltypes.Value, cols []com
return 0, nil
}

func (td *tableDiffer) genRowDiff(queryStmt string, row []sqltypes.Value, debug, onlyPks bool) (*RowDiff, error) {
drp := &RowDiff{}
drp.Row = make(map[string]sqltypes.Value)
statement, err := sqlparser.Parse(queryStmt)
if err != nil {
return nil, err
}
sel, ok := statement.(*sqlparser.Select)
if !ok {
return nil, fmt.Errorf("unexpected: %v", sqlparser.String(statement))
}

if debug {
drp.Query = td.genDebugQueryDiff(sel, row, onlyPks)
}

if onlyPks {
for _, pkI := range td.selectPks {
buf := sqlparser.NewTrackedBuffer(nil)
sel.SelectExprs[pkI].Format(buf)
col := buf.String()
drp.Row[col] = row[pkI]
}
return drp, nil
}

for i := range sel.SelectExprs {
buf := sqlparser.NewTrackedBuffer(nil)
sel.SelectExprs[i].Format(buf)
col := buf.String()
drp.Row[col] = row[i]
}

return drp, nil
}

func (td *tableDiffer) genDebugQueryDiff(sel *sqlparser.Select, row []sqltypes.Value, onlyPks bool) string {
buf := sqlparser.NewTrackedBuffer(nil)
buf.Myprintf("select ")

if onlyPks {
for i, pkI := range td.selectPks {
pk := sel.SelectExprs[pkI]
pk.Format(buf)
if i != len(td.selectPks)-1 {
buf.Myprintf(", ")
}
}
} else {
sel.SelectExprs.Format(buf)
}
buf.Myprintf(" from ")
sel.From.Format(buf)
buf.Myprintf(" where ")
for i, pkI := range td.selectPks {
sel.SelectExprs[pkI].Format(buf)
buf.Myprintf("=")
row[pkI].EncodeSQL(buf)
if i != len(td.selectPks)-1 {
buf.Myprintf(" AND ")
}
}
buf.Myprintf(";")
return buf.String()
}

//-----------------------------------------------------------------
// contextVCursor

Expand Down Expand Up @@ -1055,3 +1182,35 @@ func wrapWeightString(expr sqlparser.SelectExpr) *sqlparser.AliasedExpr {
},
}
}

func formatSampleRow(logger logutil.Logger, rd *RowDiff, debug bool) {
keys := make([]string, 0, len(rd.Row))
for k := range rd.Row {
keys = append(keys, k)
}

sort.Strings(keys)

for _, k := range keys {
logger.Printf("\t\t\t %s: %s\n", k, formatValue(rd.Row[k]))
}

if debug {
logger.Printf("\t\tDebugQuery: %v\n", rd.Query)
}
}

func formatValue(val sqltypes.Value) string {
if val.Type() == sqltypes.Null {
return "null (NULL_TYPE)"
}
if val.IsQuoted() || val.Type() == sqltypes.Bit {
if len(val.Raw()) >= 20 {
rawBytes := val.Raw()[:20]
rawBytes = append(rawBytes, []byte("...[TRUNCATED]")...)
return fmt.Sprintf("%q (%v)", rawBytes, val.Type())
}
return fmt.Sprintf("%q (%v)", val.Raw(), val.Type())
}
return fmt.Sprintf("%s (%v)", val.Raw(), val.Type())
}
Loading