diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go index 21e4cec9ce2..6ed3db5bf29 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go @@ -320,7 +320,7 @@ func (uvs *uvstreamer) setStreamStartPosition() error { return vterrors.Wrap(err, "could not decode position") } if !curPos.AtLeast(pos) { - return fmt.Errorf("requested position %v is ahead of current position %v", mysql.EncodePosition(pos), mysql.EncodePosition(curPos)) + return fmt.Errorf("GTIDSet Mismatch: requested source position:%v, current target vrep position: %v", mysql.EncodePosition(pos), mysql.EncodePosition(curPos)) } uvs.pos = pos return nil diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go index 4e85193473b..d10e757a7cc 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go @@ -1513,7 +1513,7 @@ func TestNoFutureGTID(t *testing.T) { }() defer close(ch) err = vstream(ctx, t, future, nil, nil, ch) - want := "is ahead of current position" + want := "GTIDSet Mismatch" if err == nil || !strings.Contains(err.Error(), want) { t.Errorf("err: %v, must contain %s", err, want) } diff --git a/go/vt/wrangler/vdiff.go b/go/vt/wrangler/vdiff.go index d3f8063f03c..800efef28e8 100644 --- a/go/vt/wrangler/vdiff.go +++ b/go/vt/wrangler/vdiff.go @@ -71,6 +71,9 @@ type vdiff struct { // The source and target keyspaces are pulled from ts. sources map[string]*shardStreamer targets map[string]*shardStreamer + + workflow string + targetKeyspace string } // tableDiffer performs a diff for one table in the workflow. @@ -153,6 +156,8 @@ func (wr *Wrangler) VDiff(ctx context.Context, targetKeyspace, workflow, sourceC tabletTypesStr: tabletTypesStr, sources: make(map[string]*shardStreamer), targets: make(map[string]*shardStreamer), + workflow: workflow, + targetKeyspace: targetKeyspace, } for shard, source := range ts.sources { df.sources[shard] = &shardStreamer{ @@ -545,7 +550,13 @@ func (df *vdiff) startQueryStreams(ctx context.Context, keyspace string, partici defer cancel() return df.forAll(participants, func(shard string, participant *shardStreamer) error { // Iteration for each participant. + if participant.position.IsZero() { + return fmt.Errorf("workflow %s.%s: stream has not started on tablet %s", df.targetKeyspace, df.workflow, participant.master.Alias.String()) + } if err := df.ts.wr.tmc.WaitForPosition(waitCtx, participant.tablet, mysql.EncodePosition(participant.position)); err != nil { + if err.Error() == "context deadline exceeded" { + return fmt.Errorf("VDiff timed out for tablet %v: you may want to increase it with the flag -filtered_replication_wait_time=", topoproto.TabletAliasString(participant.tablet.Alias)) + } return vterrors.Wrapf(err, "WaitForPosition for tablet %v", topoproto.TabletAliasString(participant.tablet.Alias)) } participant.result = make(chan *sqltypes.Result, 1) @@ -795,7 +806,7 @@ func (td *tableDiffer) diff(ctx context.Context, wr *Wrangler) (*DiffReport, err if targetRow == nil { // no more rows from the target // we know we have rows from source, drain, update count - wr.Logger().Errorf("Draining extra row(s) found on the source starting with: %v", sourceRow) + wr.Logger().Warningf("Draining extra row(s) found on the source starting with: %v", sourceRow) count, err := sourceExecutor.drain(ctx) if err != nil { return nil, err diff --git a/go/vt/wrangler/vdiff_test.go b/go/vt/wrangler/vdiff_test.go index b3a0b65c490..dcb8d5d3d1a 100644 --- a/go/vt/wrangler/vdiff_test.go +++ b/go/vt/wrangler/vdiff_test.go @@ -891,7 +891,7 @@ func TestVDiffReplicationWait(t *testing.T) { env.tablets[201].setResults("select c1, c2 from t1 order by c1 asc", vdiffTargetMasterPosition, target) _, err := env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, env.cell, "replica", 0*time.Second, "") - require.EqualError(t, err, "startQueryStreams(sources): WaitForPosition for tablet cell-0000000101: context deadline exceeded") + require.EqualError(t, err, "startQueryStreams(sources): VDiff timed out for tablet cell-0000000101: you may want to increase it with the flag -filtered_replication_wait_time=") } func TestVDiffFindPKs(t *testing.T) {