Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go/vt/binlog/binlogplayer/binlog_player.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ type Stats struct {
QueryCount *stats.CountersWithSingleLabel
CopyRowCount *stats.Counter
CopyLoopCount *stats.Counter
ErrorCounts *stats.CountersWithMultiLabels
}

// SetLastPosition sets the last replication position.
Expand Down Expand Up @@ -129,7 +130,7 @@ func NewStats() *Stats {
bps.QueryCount = stats.NewCountersWithSingleLabel("", "", "Phase", "")
bps.CopyRowCount = stats.NewCounter("", "")
bps.CopyLoopCount = stats.NewCounter("", "")

bps.ErrorCounts = stats.NewCountersWithMultiLabels("", "", []string{"type"})
return bps
}

Expand Down
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func (ct *controller) run(ctx context.Context) {
default:
}
log.Errorf("stream %v: %v, retrying after %v", ct.id, err, *retryDelay)
ct.blpStats.ErrorCounts.Add([]string{"Stream Error"}, 1)
timer := time.NewTimer(*retryDelay)
select {
case <-ctx.Done():
Expand Down Expand Up @@ -192,6 +193,7 @@ func (ct *controller) runBlp(ctx context.Context) (err error) {
log.Infof("trying to find a tablet eligible for vreplication. stream id: %v", ct.id)
tablet, err = ct.tabletPicker.PickForStreaming(ctx)
if err != nil {
ct.blpStats.ErrorCounts.Add([]string{"No Source Tablet Found"}, 1)
return err
}
log.Infof("found a tablet eligible for vreplication. stream id: %v tablet: %s", ct.id, tablet.Alias.String())
Expand All @@ -203,6 +205,7 @@ func (ct *controller) runBlp(ctx context.Context) (err error) {
// Table names can have search patterns. Resolve them against the schema.
tables, err := mysqlctl.ResolveTables(ctx, ct.mysqld, dbClient.DBName(), ct.source.Tables)
if err != nil {
ct.blpStats.ErrorCounts.Add([]string{"Invalid Source"}, 1)
return vterrors.Wrap(err, "failed to resolve table names")
}

Expand Down Expand Up @@ -241,6 +244,7 @@ func (ct *controller) runBlp(ctx context.Context) (err error) {
vr := newVReplicator(ct.id, &ct.source, vsClient, ct.blpStats, dbClient, ct.mysqld, ct.vre)
return vr.Replicate(ctx)
}
ct.blpStats.ErrorCounts.Add([]string{"Invalid Source"}, 1)
return fmt.Errorf("missing source")
}

Expand Down
15 changes: 15 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,21 @@ func (st *vrStats) register() {
}
return result
})
stats.NewCountersFuncWithMultiLabels(
"VReplicationErrors",
"Errors during vreplication",
[]string{"workflow", "type"},
func() map[string]int64 {
st.mu.Lock()
defer st.mu.Unlock()
result := make(map[string]int64)
for _, ct := range st.controllers {
for key, val := range ct.blpStats.ErrorCounts.Counts() {
result[fmt.Sprintf("%d_%s", ct.id, key)] = val
}
}
return result
})
}

func (st *vrStats) numControllers() int64 {
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func (vp *vplayer) play(ctx context.Context) error {

plan, err := buildReplicatorPlan(vp.vr.source.Filter, vp.vr.pkInfoMap, vp.copyState)
if err != nil {
vp.vr.stats.ErrorCounts.Add([]string{"Plan"}, 1)
return err
}
vp.replicatorPlan = plan
Expand Down Expand Up @@ -366,6 +367,8 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
}
}
if err := vp.applyEvent(ctx, event, mustSave); err != nil {
vp.vr.stats.ErrorCounts.Add([]string{"Apply"}, 1)
log.Errorf("Error applying event: %s", err.Error())
return err
}
}
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ func newVReplicator(id uint32, source *binlogdatapb.BinlogSource, sourceVStreame
func (vr *vreplicator) Replicate(ctx context.Context) error {
err := vr.replicate(ctx)
if err != nil {
log.Errorf("Replicate error: %s", err.Error())
if err := vr.setMessage(err.Error()); err != nil {
log.Errorf("Failed to set error state: %v", err)
}
Expand Down Expand Up @@ -165,10 +166,12 @@ func (vr *vreplicator) replicate(ctx context.Context) error {
return err
}
if err := newVCopier(vr).copyNext(ctx, settings); err != nil {
vr.stats.ErrorCounts.Add([]string{"Copy"}, 1)
return err
}
case settings.StartPos.IsZero():
if err := newVCopier(vr).initTablesForCopy(ctx); err != nil {
vr.stats.ErrorCounts.Add([]string{"Copy"}, 1)
return err
}
default:
Expand All @@ -180,6 +183,7 @@ func (vr *vreplicator) replicate(ctx context.Context) error {
return vr.setState(binlogplayer.BlpStopped, "Stopped after copy.")
}
if err := vr.setState(binlogplayer.BlpRunning, ""); err != nil {
vr.stats.ErrorCounts.Add([]string{"Replicate"}, 1)
return err
}
return newVPlayer(vr, settings, nil, mysql.Position{}, "replicate").play(ctx)
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vttablet/tabletserver/vstreamer/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func (uvs *uvstreamer) copy(ctx context.Context) error {
tableName := uvs.tablesToCopy[0]
log.V(2).Infof("Copystate not empty starting catchupAndCopy on table %s", tableName)
if err := uvs.catchupAndCopy(ctx, tableName); err != nil {
uvs.vse.errorCounts.Add("Copy", 1)
return err
}
}
Expand All @@ -50,6 +51,7 @@ func (uvs *uvstreamer) catchupAndCopy(ctx context.Context, tableName string) err
if !uvs.pos.IsZero() {
if err := uvs.catchup(ctx); err != nil {
log.Infof("catchupAndCopy: catchup returned %v", err)
uvs.vse.errorCounts.Add("Catchup", 1)
return err
}
}
Expand Down Expand Up @@ -265,6 +267,7 @@ func (uvs *uvstreamer) copyTable(ctx context.Context, tableName string) error {
return nil
})
if err != nil {
uvs.vse.errorCounts.Add("StreamRows", 1)
return err
}

Expand Down
38 changes: 22 additions & 16 deletions go/vt/vttablet/tabletserver/vstreamer/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,17 @@ type Engine struct {
vschemaUpdates *stats.Counter

// vstreamer metrics
vstreamerPhaseTimings *servenv.TimingsWrapper
vstreamerEventsStreamed *stats.Counter
vstreamerPacketSize *stats.GaugeFunc
vstreamerNumPackets *stats.Counter
resultStreamerNumRows *stats.Counter
resultStreamerNumPackets *stats.Counter
rowStreamerNumRows *stats.Counter
rowStreamerNumPackets *stats.Counter
vstreamerPhaseTimings *servenv.TimingsWrapper
vstreamerEventsStreamed *stats.Counter
vstreamerPacketSize *stats.GaugeFunc
vstreamerNumPackets *stats.Counter
resultStreamerNumRows *stats.Counter
resultStreamerNumPackets *stats.Counter
rowStreamerNumRows *stats.Counter
rowStreamerNumPackets *stats.Counter
errorCounts *stats.CountersWithSingleLabel
vstreamersCreated *stats.Counter
vstreamersEndedWithErrors *stats.Counter
}

// NewEngine creates a new Engine.
Expand All @@ -102,14 +105,17 @@ func NewEngine(env tabletenv.Env, ts srvtopo.Server, se *schema.Engine, cell str
vschemaErrors: env.Exporter().NewCounter("VSchemaErrors", "Count of VSchema errors"),
vschemaUpdates: env.Exporter().NewCounter("VSchemaUpdates", "Count of VSchema updates. Does not include errors"),

vstreamerPhaseTimings: env.Exporter().NewTimings("VStreamerPhaseTiming", "Time taken for different phases during vstream copy", "phase-timing"),
vstreamerEventsStreamed: env.Exporter().NewCounter("VStreamerEventsStreamed", "Count of events streamed in VStream API"),
vstreamerPacketSize: env.Exporter().NewGaugeFunc("VStreamPacketSize", "Max packet size for sending vstreamer events", getPacketSize),
vstreamerNumPackets: env.Exporter().NewCounter("VStreamerNumPackets", "Number of packets in vstreamer"),
resultStreamerNumPackets: env.Exporter().NewCounter("ResultStreamerNumPackets", "Number of packets in result streamer"),
resultStreamerNumRows: env.Exporter().NewCounter("ResultStreamerNumRows", "Number of rows sent in result streamer"),
rowStreamerNumPackets: env.Exporter().NewCounter("RowStreamerNumPackets", "Number of packets in row streamer"),
rowStreamerNumRows: env.Exporter().NewCounter("RowStreamerNumRows", "Number of rows sent in row streamer"),
vstreamerPhaseTimings: env.Exporter().NewTimings("VStreamerPhaseTiming", "Time taken for different phases during vstream copy", "phase-timing"),
vstreamerEventsStreamed: env.Exporter().NewCounter("VStreamerEventsStreamed", "Count of events streamed in VStream API"),
vstreamerPacketSize: env.Exporter().NewGaugeFunc("VStreamPacketSize", "Max packet size for sending vstreamer events", getPacketSize),
vstreamerNumPackets: env.Exporter().NewCounter("VStreamerNumPackets", "Number of packets in vstreamer"),
resultStreamerNumPackets: env.Exporter().NewCounter("ResultStreamerNumPackets", "Number of packets in result streamer"),
resultStreamerNumRows: env.Exporter().NewCounter("ResultStreamerNumRows", "Number of rows sent in result streamer"),
rowStreamerNumPackets: env.Exporter().NewCounter("RowStreamerNumPackets", "Number of packets in row streamer"),
rowStreamerNumRows: env.Exporter().NewCounter("RowStreamerNumRows", "Number of rows sent in row streamer"),
vstreamersCreated: env.Exporter().NewCounter("VStreamersCreated", "Count of vstreamers created"),
vstreamersEndedWithErrors: env.Exporter().NewCounter("VStreamersEndedWithErrors", "Count of vstreamers that ended with errors"),
errorCounts: env.Exporter().NewCountersWithSingleLabel("VStreamerErrors", "Tracks errors in vstreamer", "type", "Catchup", "Copy", "Send", "TablePlan"),
}
env.Exporter().HandleFunc("/debug/tablet_vschema", vse.ServeHTTP)
return vse
Expand Down
7 changes: 6 additions & 1 deletion go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,9 @@ func (uvs *uvstreamer) send2(evs []*binlogdatapb.VEvent) error {
}
}
}
if err != nil {
uvs.vse.errorCounts.Add("Send", 1)
}
return err
}

Expand All @@ -298,7 +301,7 @@ func (uvs *uvstreamer) sendEventsForCurrentPos() error {
Type: binlogdatapb.VEventType_OTHER,
}}
if err := uvs.send(evs); err != nil {
return wrapError(err, uvs.pos)
return wrapError(err, uvs.pos, uvs.vse)
}
return nil
}
Expand All @@ -320,6 +323,7 @@ func (uvs *uvstreamer) setStreamStartPosition() error {
return vterrors.Wrap(err, "could not decode position")
}
if !curPos.AtLeast(pos) {
uvs.vse.errorCounts.Add("GTIDSet Mismatch", 1)
return fmt.Errorf("GTIDSet Mismatch: requested source position:%v, current target vrep position: %v", mysql.EncodePosition(pos), mysql.EncodePosition(curPos))
}
uvs.pos = pos
Expand Down Expand Up @@ -362,6 +366,7 @@ func (uvs *uvstreamer) Stream() error {
log.Info("TablePKs is not nil: starting vs.copy()")
if err := uvs.copy(uvs.ctx); err != nil {
log.Infof("uvstreamer.Stream() copy returned with err %s", err)
uvs.vse.errorCounts.Add("Copy", 1)
return err
}
uvs.sendTestEvent("Copy Done")
Expand Down
2 changes: 0 additions & 2 deletions go/vt/vttablet/tabletserver/vstreamer/uvstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,6 @@ func resetMetrics(t *testing.T) {
engine.resultStreamerNumRows.Reset()
engine.rowStreamerNumRows.Reset()
engine.vstreamerPhaseTimings.Reset()
engine.vstreamerPhaseTimings.Reset()
engine.vstreamerPhaseTimings.Reset()
}

func validateMetrics(t *testing.T) {
Expand Down
21 changes: 15 additions & 6 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,12 @@ func (vs *vstreamer) Stream() error {
//defer vs.cancel()
ctx := context.Background()
defer ctx.Done()
vs.vse.vstreamersCreated.Add(1)
log.Infof("Starting Stream() with startPos %s", vs.startPos)
pos, err := mysql.DecodePosition(vs.startPos)
if err != nil {
vs.vse.errorCounts.Add("StreamRows", 1)
vs.vse.vstreamersEndedWithErrors.Add(1)
return err
}
vs.pos = pos
Expand All @@ -161,21 +164,21 @@ func (vs *vstreamer) replicate(ctx context.Context) error {
// Ensure se is Open. If vttablet came up in a non_serving role,
// the schema engine may not have been initialized.
if err := vs.se.Open(); err != nil {
return wrapError(err, vs.pos)
return wrapError(err, vs.pos, vs.vse)
}

conn, err := binlog.NewBinlogConnection(vs.cp)
if err != nil {
return wrapError(err, vs.pos)
return wrapError(err, vs.pos, vs.vse)
}
defer conn.Close()

events, err := conn.StartBinlogDumpFromPosition(vs.ctx, vs.pos)
if err != nil {
return wrapError(err, vs.pos)
return wrapError(err, vs.pos, vs.vse)
}
err = vs.parseEvents(vs.ctx, events)
return wrapError(err, vs.pos)
return wrapError(err, vs.pos, vs.vse)
}

// parseEvents parses and sends events.
Expand Down Expand Up @@ -252,6 +255,7 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
case binlogdatapb.VEventType_SAVEPOINT:
bufferedEvents = append(bufferedEvents, vevent)
default:
vs.vse.errorCounts.Add("BufferAndTransmit", 1)
return fmt.Errorf("unexpected event: %v", vevent)
}
return nil
Expand Down Expand Up @@ -280,13 +284,15 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
}
vevents, err := vs.parseEvent(ev)
if err != nil {
vs.vse.errorCounts.Add("ParseEvent", 1)
return err
}
for _, vevent := range vevents {
if err := bufferAndTransmit(vevent); err != nil {
if err == io.EOF {
return nil
}
vs.vse.errorCounts.Add("BufferAndTransmit", 1)
return fmt.Errorf("error sending event: %v", err)
}
}
Expand All @@ -308,6 +314,7 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
if err == io.EOF {
return nil
}
vs.vse.errorCounts.Add("Send", 1)
return fmt.Errorf("error sending event: %v", err)
}
}
Expand Down Expand Up @@ -503,6 +510,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e

vevent, err := vs.buildTablePlan(id, tm)
if err != nil {
vs.vse.errorCounts.Add("TablePlan", 1)
return nil, err
}
if vevent != nil {
Expand Down Expand Up @@ -538,7 +546,6 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e
if err != nil {
return nil, err
}

}
for _, vevent := range vevents {
vevent.Timestamp = int64(ev.Timestamp())
Expand Down Expand Up @@ -807,8 +814,10 @@ func (vs *vstreamer) extractRowAndFilter(plan *streamerPlan, data []byte, dataCo
return plan.filter(values)
}

func wrapError(err error, stopPos mysql.Position) error {
func wrapError(err error, stopPos mysql.Position, vse *Engine) error {
if err != nil {
vse.vstreamersEndedWithErrors.Add(1)
vse.errorCounts.Add("StreamEnded", 1)
err = fmt.Errorf("stream (at source tablet) error @ %v: %v", stopPos, err)
log.Error(err)
return err
Expand Down