Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func (uvs *uvstreamer) setStreamStartPosition() error {
return vterrors.Wrap(err, "could not decode position")
}
if !curPos.AtLeast(pos) {
return fmt.Errorf("requested position %v is ahead of current position %v", mysql.EncodePosition(pos), mysql.EncodePosition(curPos))
return fmt.Errorf("GTIDSet Mismatch: requested source position:%v, current target vrep position: %v", mysql.EncodePosition(pos), mysql.EncodePosition(curPos))
}
uvs.pos = pos
return nil
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1513,7 +1513,7 @@ func TestNoFutureGTID(t *testing.T) {
}()
defer close(ch)
err = vstream(ctx, t, future, nil, nil, ch)
want := "is ahead of current position"
want := "GTIDSet Mismatch"
if err == nil || !strings.Contains(err.Error(), want) {
t.Errorf("err: %v, must contain %s", err, want)
}
Expand Down
13 changes: 12 additions & 1 deletion go/vt/wrangler/vdiff.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ type vdiff struct {
// The source and target keyspaces are pulled from ts.
sources map[string]*shardStreamer
targets map[string]*shardStreamer

workflow string
targetKeyspace string
}

// tableDiffer performs a diff for one table in the workflow.
Expand Down Expand Up @@ -153,6 +156,8 @@ func (wr *Wrangler) VDiff(ctx context.Context, targetKeyspace, workflow, sourceC
tabletTypesStr: tabletTypesStr,
sources: make(map[string]*shardStreamer),
targets: make(map[string]*shardStreamer),
workflow: workflow,
targetKeyspace: targetKeyspace,
}
for shard, source := range ts.sources {
df.sources[shard] = &shardStreamer{
Expand Down Expand Up @@ -545,7 +550,13 @@ func (df *vdiff) startQueryStreams(ctx context.Context, keyspace string, partici
defer cancel()
return df.forAll(participants, func(shard string, participant *shardStreamer) error {
// Iteration for each participant.
if participant.position.IsZero() {
return fmt.Errorf("workflow %s.%s: stream has not started on tablet %s", df.targetKeyspace, df.workflow, participant.master.Alias.String())
}
if err := df.ts.wr.tmc.WaitForPosition(waitCtx, participant.tablet, mysql.EncodePosition(participant.position)); err != nil {
if err.Error() == "context deadline exceeded" {
return fmt.Errorf("VDiff timed out for tablet %v: you may want to increase it with the flag -filtered_replication_wait_time=<timeoutSeconds>", topoproto.TabletAliasString(participant.tablet.Alias))
}
return vterrors.Wrapf(err, "WaitForPosition for tablet %v", topoproto.TabletAliasString(participant.tablet.Alias))
}
participant.result = make(chan *sqltypes.Result, 1)
Expand Down Expand Up @@ -795,7 +806,7 @@ func (td *tableDiffer) diff(ctx context.Context, wr *Wrangler) (*DiffReport, err
if targetRow == nil {
// no more rows from the target
// we know we have rows from source, drain, update count
wr.Logger().Errorf("Draining extra row(s) found on the source starting with: %v", sourceRow)
wr.Logger().Warningf("Draining extra row(s) found on the source starting with: %v", sourceRow)
count, err := sourceExecutor.drain(ctx)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion go/vt/wrangler/vdiff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -891,7 +891,7 @@ func TestVDiffReplicationWait(t *testing.T) {
env.tablets[201].setResults("select c1, c2 from t1 order by c1 asc", vdiffTargetMasterPosition, target)

_, err := env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, env.cell, "replica", 0*time.Second, "")
require.EqualError(t, err, "startQueryStreams(sources): WaitForPosition for tablet cell-0000000101: context deadline exceeded")
require.EqualError(t, err, "startQueryStreams(sources): VDiff timed out for tablet cell-0000000101: you may want to increase it with the flag -filtered_replication_wait_time=<timeoutSeconds>")
}

func TestVDiffFindPKs(t *testing.T) {
Expand Down