Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion go/vt/discovery/tablet_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,11 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table
default:
}
candidates := tp.getMatchingTablets(ctx)

if len(candidates) == 0 {
// if no candidates were found, sleep and try again
log.Infof("No tablet found for streaming, sleeping for %d seconds", int(GetTabletPickerRetryDelay()/1e9))
log.Infof("No tablet found for streaming, shard %s.%s, cells %v, tabletTypes %v, sleeping for %d seconds",
tp.keyspace, tp.shard, tp.cells, tp.tabletTypes, int(GetTabletPickerRetryDelay()/1e9))
time.Sleep(GetTabletPickerRetryDelay())
continue
}
Expand All @@ -131,6 +133,7 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table
}
// OK to use ctx here because it is not actually used by the underlying Close implementation
_ = conn.Close(ctx)
log.Infof("tablet picker found tablet %s", ti.Tablet.String())
return ti.Tablet, nil
}
}
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ import (
"sync"
"time"

"vitess.io/vitess/go/vt/log"

querypb "vitess.io/vitess/go/vt/proto/query"

"github.com/golang/protobuf/jsonpb"
Expand Down Expand Up @@ -1982,6 +1984,7 @@ func commandVDiff(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.Fla
tabletTypes := subFlags.String("tablet_types", "master,replica,rdonly", "Tablet types for source and target")
filteredReplicationWaitTime := subFlags.Duration("filtered_replication_wait_time", 30*time.Second, "Specifies the maximum time to wait, in seconds, for filtered replication to catch up on master migrations. The migration will be aborted on timeout.")
format := subFlags.String("format", "", "Format of report") //"json" or ""

if err := subFlags.Parse(args); err != nil {
return err
}
Expand All @@ -1995,6 +1998,9 @@ func commandVDiff(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.Fla
}

_, err = wr.VDiff(ctx, keyspace, workflow, *sourceCell, *targetCell, *tabletTypes, *filteredReplicationWaitTime, *format)
if err != nil {
log.Errorf("vdiff returning with error: %v", err)
}
return err
}

Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -801,10 +801,10 @@ func (vs *vstreamer) extractRowAndFilter(plan *streamerPlan, data []byte, dataCo

// wrapError reports how a stream finished at stopPos.
//
// A non-nil err is annotated exactly once with the stop position, logged, and
// returned. A nil err only produces an informational log line and nil is
// returned.
func wrapError(err error, stopPos mysql.Position) error {
	if err != nil {
		// Wrap a single time; wrapping twice nests two near-identical prefixes
		// in the message that callers see.
		err = fmt.Errorf("stream (at source tablet) error @ %v: %v", stopPos, err)
		log.Error(err)
		return err
	}
	log.Infof("stream (at source tablet) ended @ %v", stopPos)
	return nil
}
44 changes: 44 additions & 0 deletions go/vt/wrangler/vdiff.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package wrangler
import (
"encoding/json"
"fmt"
"math"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -117,6 +118,8 @@ type shardStreamer struct {
func (wr *Wrangler) VDiff(ctx context.Context, targetKeyspace, workflow, sourceCell, targetCell, tabletTypesStr string,
filteredReplicationWaitTime time.Duration,
format string) (map[string]*DiffReport, error) {
log.Infof("Starting VDiff for %s.%s, sourceCell %s, targetCell %s, tabletTypes %s, timeout %s",
targetKeyspace, workflow, sourceCell, targetCell, tabletTypesStr, filteredReplicationWaitTime.String())
// Assign defaults to sourceCell and targetCell if not specified.
if sourceCell == "" && targetCell == "" {
cells, err := wr.ts.GetCellInfoNames(ctx)
Expand Down Expand Up @@ -553,6 +556,7 @@ func (df *vdiff) startQueryStreams(ctx context.Context, keyspace string, partici
if participant.position.IsZero() {
return fmt.Errorf("workflow %s.%s: stream has not started on tablet %s", df.targetKeyspace, df.workflow, participant.master.Alias.String())
}
log.Infof("WaitForPosition: tablet %s should reach position %s", participant.tablet.Alias.String(), mysql.EncodePosition(participant.position))
if err := df.ts.wr.tmc.WaitForPosition(waitCtx, participant.tablet, mysql.EncodePosition(participant.position)); err != nil {
if err.Error() == "context deadline exceeded" {
return fmt.Errorf("VDiff timed out for tablet %v: you may want to increase it with the flag -filtered_replication_wait_time=<timeoutSeconds>", topoproto.TabletAliasString(participant.tablet.Alias))
Expand Down Expand Up @@ -666,6 +670,7 @@ func (df *vdiff) syncTargets(ctx context.Context, filteredReplicationWaitTime ti
func (df *vdiff) restartTargets(ctx context.Context) error {
return df.forAll(df.targets, func(shard string, target *shardStreamer) error {
query := fmt.Sprintf("update _vt.vreplication set state='Running', message='', stop_pos='' where db_name=%s and workflow=%s", encodeString(target.master.DbName()), encodeString(df.ts.workflow))
log.Infof("restarting target replication with %s", query)
_, err := df.ts.wr.tmc.VReplicationExec(ctx, target.master.Tablet, query)
return err
})
Expand Down Expand Up @@ -760,6 +765,42 @@ func (sm *shardStreamer) StreamExecute(vcursor engine.VCursor, bindVars map[stri
return sm.err
}

// humanInt formats large integers to a value that is easier on the eye:
// 100000=100k 234000000=234m 1e9=1b 5010000000=5.010b ...
//
// Values below 1000 (including negative values) are printed unscaled. Larger
// values are scaled to thousands ("k"), millions ("m") or billions ("b") and
// shown with exactly three decimals, unless those decimals are all zero, in
// which case they are dropped. Extra digits are truncated rather than rounded,
// so a value can never be carried over into the next unit (999999999 is
// "999.999m", not "1000m").
func humanInt(n int64) string {
	var div int64
	var unit string
	switch {
	case n < 1000:
		return fmt.Sprintf("%d", n)
	case n < 1e6:
		div, unit = 1000, "k"
	case n < 1e9:
		div, unit = 1e6, "m"
	default:
		div, unit = 1e9, "b"
	}
	// Integer arithmetic keeps the result exact for the whole int64 range;
	// n%div is below 1e9, so multiplying it by 1000 cannot overflow.
	s := fmt.Sprintf("%d.%03d", n/div, n%div*1000/div)
	s = strings.TrimSuffix(s, ".000")

	return s + unit
}

// logSteps returns a "human" readable value of n (as formatted by humanInt)
// when n is a milestone worth logging, and "" otherwise, so that callers
// counting upwards do not spam the logs.
//
// n is a milestone when it is a multiple of the power of ten at its own
// magnitude (1..9, 10..90, 100..900, ...) or a multiple of one million.
// Zero and negative values are never milestones.
// The go-humanize package doesn't support counts at the moment.
func logSteps(n int64) string {
	// Log10 of a negative number is NaN, and converting NaN to int64 gives an
	// implementation-dependent result (possibly 0, which would make the modulo
	// below panic), so only positive counts are considered.
	if n <= 0 {
		return ""
	}
	magnitude := math.Floor(math.Log10(float64(n)))
	step := int64(math.Floor(math.Pow(10, magnitude)))
	if n%step == 0 || n%1e6 == 0 {
		return humanInt(n)
	}
	return ""
}

//-----------------------------------------------------------------
// tableDiffer

Expand All @@ -772,6 +813,9 @@ func (td *tableDiffer) diff(ctx context.Context, wr *Wrangler) (*DiffReport, err
advanceSource := true
advanceTarget := true
for {
if s := logSteps(int64(dr.ProcessedRows)); s != "" {
log.Infof("VDiff processed %s rows", s)
}
if advanceSource {
sourceRow, err = sourceExecutor.next()
if err != nil {
Expand Down
17 changes: 17 additions & 0 deletions go/vt/wrangler/vdiff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package wrangler

import (
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -962,3 +963,19 @@ func TestVDiffFindPKs(t *testing.T) {
}

}

// TestLogSteps verifies which counts logSteps treats as loggable milestones
// and how those milestones are rendered; an empty expectation means the count
// must not be logged.
func TestLogSteps(t *testing.T) {
	testcases := []struct {
		n   int64
		log string
	}{
		{0, ""},
		{1, "1"}, {2000, "2k"}, {1000000, "1m"}, {330000, ""}, {330001, ""},
		{4000000, "4m"}, {40000000, "40m"}, {41000000, "41m"}, {4110000, ""},
		{5000000000, "5b"}, {5010000000, "5.010b"}, {5011000000, "5.011b"},
	}
	for _, tc := range testcases {
		// FormatInt keeps the full int64 in the subtest name; converting to int
		// first would truncate the larger inputs on 32-bit platforms.
		t.Run(strconv.FormatInt(tc.n, 10), func(t *testing.T) {
			require.Equal(t, tc.log, logSteps(tc.n))
		})
	}
}