diff --git a/go/vt/binlog/binlogplayer/binlog_player.go b/go/vt/binlog/binlogplayer/binlog_player.go index d8f6f51c286..df04691b215 100644 --- a/go/vt/binlog/binlogplayer/binlog_player.go +++ b/go/vt/binlog/binlogplayer/binlog_player.go @@ -89,6 +89,7 @@ type Stats struct { QueryCount *stats.CountersWithSingleLabel CopyRowCount *stats.Counter CopyLoopCount *stats.Counter + ErrorCounts *stats.CountersWithMultiLabels } // SetLastPosition sets the last replication position. @@ -129,7 +130,7 @@ func NewStats() *Stats { bps.QueryCount = stats.NewCountersWithSingleLabel("", "", "Phase", "") bps.CopyRowCount = stats.NewCounter("", "") bps.CopyLoopCount = stats.NewCounter("", "") - + bps.ErrorCounts = stats.NewCountersWithMultiLabels("", "", []string{"type"}) return bps } diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index cb0e722c299..512eca415a8 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -150,6 +150,7 @@ func (ct *controller) run(ctx context.Context) { default: } log.Errorf("stream %v: %v, retrying after %v", ct.id, err, *retryDelay) + ct.blpStats.ErrorCounts.Add([]string{"Stream Error"}, 1) timer := time.NewTimer(*retryDelay) select { case <-ctx.Done(): @@ -192,6 +193,7 @@ func (ct *controller) runBlp(ctx context.Context) (err error) { log.Infof("trying to find a tablet eligible for vreplication. stream id: %v", ct.id) tablet, err = ct.tabletPicker.PickForStreaming(ctx) if err != nil { + ct.blpStats.ErrorCounts.Add([]string{"No Source Tablet Found"}, 1) return err } log.Infof("found a tablet eligible for vreplication. stream id: %v tablet: %s", ct.id, tablet.Alias.String()) @@ -203,6 +205,7 @@ func (ct *controller) runBlp(ctx context.Context) (err error) { // Table names can have search patterns. Resolve them against the schema. tables, err := mysqlctl.ResolveTables(ctx, ct.mysqld, dbClient.DBName(), ct.source.Tables) if err != nil { + ct.blpStats.ErrorCounts.Add([]string{"Invalid Source"}, 1) return vterrors.Wrap(err, "failed to resolve table names") } @@ -241,6 +244,7 @@ func (ct *controller) runBlp(ctx context.Context) (err error) { vr := newVReplicator(ct.id, &ct.source, vsClient, ct.blpStats, dbClient, ct.mysqld, ct.vre) return vr.Replicate(ctx) } + ct.blpStats.ErrorCounts.Add([]string{"Invalid Source"}, 1) return fmt.Errorf("missing source") } diff --git a/go/vt/vttablet/tabletmanager/vreplication/stats.go b/go/vt/vttablet/tabletmanager/vreplication/stats.go index 90e32c7684b..2a218d98a91 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/stats.go +++ b/go/vt/vttablet/tabletmanager/vreplication/stats.go @@ -252,6 +252,21 @@ func (st *vrStats) register() { } return result }) + stats.NewCountersFuncWithMultiLabels( + "VReplicationErrors", + "Errors during vreplication", + []string{"workflow", "type"}, + func() map[string]int64 { + st.mu.Lock() + defer st.mu.Unlock() + result := make(map[string]int64) + for _, ct := range st.controllers { + for key, val := range ct.blpStats.ErrorCounts.Counts() { + result[fmt.Sprintf("%d_%s", ct.id, key)] = val + } + } + return result + }) } func (st *vrStats) numControllers() int64 { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index ad3ef6f388c..cdfb62ed718 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -103,6 +103,7 @@ func (vp *vplayer) play(ctx context.Context) error { plan, err := buildReplicatorPlan(vp.vr.source.Filter, vp.vr.pkInfoMap, vp.copyState) if err != nil { + vp.vr.stats.ErrorCounts.Add([]string{"Plan"}, 1) return err } vp.replicatorPlan = plan @@ -366,6 +367,8 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error { } } if err := vp.applyEvent(ctx, event, mustSave); err != nil { + vp.vr.stats.ErrorCounts.Add([]string{"Apply"}, 1) + log.Errorf("Error applying event: %s", err.Error()) return err } } diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go index 105492a4790..48135c99109 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go @@ -121,6 +121,7 @@ func newVReplicator(id uint32, source *binlogdatapb.BinlogSource, sourceVStreame func (vr *vreplicator) Replicate(ctx context.Context) error { err := vr.replicate(ctx) if err != nil { + log.Errorf("Replicate error: %s", err.Error()) if err := vr.setMessage(err.Error()); err != nil { log.Errorf("Failed to set error state: %v", err) } @@ -165,10 +166,12 @@ func (vr *vreplicator) replicate(ctx context.Context) error { return err } if err := newVCopier(vr).copyNext(ctx, settings); err != nil { + vr.stats.ErrorCounts.Add([]string{"Copy"}, 1) return err } case settings.StartPos.IsZero(): if err := newVCopier(vr).initTablesForCopy(ctx); err != nil { + vr.stats.ErrorCounts.Add([]string{"Copy"}, 1) return err } default: @@ -180,6 +183,7 @@ func (vr *vreplicator) replicate(ctx context.Context) error { return vr.setState(binlogplayer.BlpStopped, "Stopped after copy.") } if err := vr.setState(binlogplayer.BlpRunning, ""); err != nil { + vr.stats.ErrorCounts.Add([]string{"Replicate"}, 1) return err } return newVPlayer(vr, settings, nil, mysql.Position{}, "replicate").play(ctx) diff --git a/go/vt/vttablet/tabletserver/vstreamer/copy.go b/go/vt/vttablet/tabletserver/vstreamer/copy.go index 8cc08f7a329..a02d273b87b 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/copy.go +++ b/go/vt/vttablet/tabletserver/vstreamer/copy.go @@ -37,6 +37,7 @@ func (uvs *uvstreamer) copy(ctx context.Context) error { tableName := uvs.tablesToCopy[0] log.V(2).Infof("Copystate not empty starting catchupAndCopy on table %s", tableName) if err := uvs.catchupAndCopy(ctx, tableName); err != nil { + uvs.vse.errorCounts.Add("Copy", 1) return err } } @@ -50,6 +51,7 @@ func (uvs *uvstreamer) catchupAndCopy(ctx context.Context, tableName string) err if !uvs.pos.IsZero() { if err := uvs.catchup(ctx); err != nil { log.Infof("catchupAndCopy: catchup returned %v", err) + uvs.vse.errorCounts.Add("Catchup", 1) return err } } @@ -265,6 +267,7 @@ func (uvs *uvstreamer) copyTable(ctx context.Context, tableName string) error { return nil }) if err != nil { + uvs.vse.errorCounts.Add("StreamRows", 1) return err } diff --git a/go/vt/vttablet/tabletserver/vstreamer/engine.go b/go/vt/vttablet/tabletserver/vstreamer/engine.go index 532a2e48ef0..5ddc4ecaeaf 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/engine.go +++ b/go/vt/vttablet/tabletserver/vstreamer/engine.go @@ -73,14 +73,17 @@ type Engine struct { vschemaUpdates *stats.Counter // vstreamer metrics - vstreamerPhaseTimings *servenv.TimingsWrapper - vstreamerEventsStreamed *stats.Counter - vstreamerPacketSize *stats.GaugeFunc - vstreamerNumPackets *stats.Counter - resultStreamerNumRows *stats.Counter - resultStreamerNumPackets *stats.Counter - rowStreamerNumRows *stats.Counter - rowStreamerNumPackets *stats.Counter + vstreamerPhaseTimings *servenv.TimingsWrapper + vstreamerEventsStreamed *stats.Counter + vstreamerPacketSize *stats.GaugeFunc + vstreamerNumPackets *stats.Counter + resultStreamerNumRows *stats.Counter + resultStreamerNumPackets *stats.Counter + rowStreamerNumRows *stats.Counter + rowStreamerNumPackets *stats.Counter + errorCounts *stats.CountersWithSingleLabel + vstreamersCreated *stats.Counter + vstreamersEndedWithErrors *stats.Counter } // NewEngine creates a new Engine. @@ -102,14 +105,17 @@ func NewEngine(env tabletenv.Env, ts srvtopo.Server, se *schema.Engine, cell str vschemaErrors: env.Exporter().NewCounter("VSchemaErrors", "Count of VSchema errors"), vschemaUpdates: env.Exporter().NewCounter("VSchemaUpdates", "Count of VSchema updates. Does not include errors"), - vstreamerPhaseTimings: env.Exporter().NewTimings("VStreamerPhaseTiming", "Time taken for different phases during vstream copy", "phase-timing"), - vstreamerEventsStreamed: env.Exporter().NewCounter("VStreamerEventsStreamed", "Count of events streamed in VStream API"), - vstreamerPacketSize: env.Exporter().NewGaugeFunc("VStreamPacketSize", "Max packet size for sending vstreamer events", getPacketSize), - vstreamerNumPackets: env.Exporter().NewCounter("VStreamerNumPackets", "Number of packets in vstreamer"), - resultStreamerNumPackets: env.Exporter().NewCounter("ResultStreamerNumPackets", "Number of packets in result streamer"), - resultStreamerNumRows: env.Exporter().NewCounter("ResultStreamerNumRows", "Number of rows sent in result streamer"), - rowStreamerNumPackets: env.Exporter().NewCounter("RowStreamerNumPackets", "Number of packets in row streamer"), - rowStreamerNumRows: env.Exporter().NewCounter("RowStreamerNumRows", "Number of rows sent in row streamer"), + vstreamerPhaseTimings: env.Exporter().NewTimings("VStreamerPhaseTiming", "Time taken for different phases during vstream copy", "phase-timing"), + vstreamerEventsStreamed: env.Exporter().NewCounter("VStreamerEventsStreamed", "Count of events streamed in VStream API"), + vstreamerPacketSize: env.Exporter().NewGaugeFunc("VStreamPacketSize", "Max packet size for sending vstreamer events", getPacketSize), + vstreamerNumPackets: env.Exporter().NewCounter("VStreamerNumPackets", "Number of packets in vstreamer"), + resultStreamerNumPackets: env.Exporter().NewCounter("ResultStreamerNumPackets", "Number of packets in result streamer"), + resultStreamerNumRows: env.Exporter().NewCounter("ResultStreamerNumRows", "Number of rows sent in result streamer"), + rowStreamerNumPackets: env.Exporter().NewCounter("RowStreamerNumPackets", "Number of packets in row streamer"), + rowStreamerNumRows: env.Exporter().NewCounter("RowStreamerNumRows", "Number of rows sent in row streamer"), + vstreamersCreated: env.Exporter().NewCounter("VStreamersCreated", "Count of vstreamers created"), + vstreamersEndedWithErrors: env.Exporter().NewCounter("VStreamersEndedWithErrors", "Count of vstreamers that ended with errors"), + errorCounts: env.Exporter().NewCountersWithSingleLabel("VStreamerErrors", "Tracks errors in vstreamer", "type", "Catchup", "Copy", "Send", "TablePlan"), } env.Exporter().HandleFunc("/debug/tablet_vschema", vse.ServeHTTP) return vse diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go index 6ed3db5bf29..b3caa498fab 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go @@ -286,6 +286,9 @@ func (uvs *uvstreamer) send2(evs []*binlogdatapb.VEvent) error { } } } + if err != nil { + uvs.vse.errorCounts.Add("Send", 1) + } return err } @@ -298,7 +301,7 @@ func (uvs *uvstreamer) sendEventsForCurrentPos() error { Type: binlogdatapb.VEventType_OTHER, }} if err := uvs.send(evs); err != nil { - return wrapError(err, uvs.pos) + return wrapError(err, uvs.pos, uvs.vse) } return nil } @@ -320,6 +323,7 @@ func (uvs *uvstreamer) setStreamStartPosition() error { return vterrors.Wrap(err, "could not decode position") } if !curPos.AtLeast(pos) { + uvs.vse.errorCounts.Add("GTIDSet Mismatch", 1) return fmt.Errorf("GTIDSet Mismatch: requested source position:%v, current target vrep position: %v", mysql.EncodePosition(pos), mysql.EncodePosition(curPos)) } uvs.pos = pos @@ -362,6 +366,7 @@ func (uvs *uvstreamer) Stream() error { log.Info("TablePKs is not nil: starting vs.copy()") if err := uvs.copy(uvs.ctx); err != nil { log.Infof("uvstreamer.Stream() copy returned with err %s", err) + uvs.vse.errorCounts.Add("Copy", 1) return err } uvs.sendTestEvent("Copy Done") diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_test.go index c9cf44a6c55..1d3f3e7e044 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_test.go @@ -308,8 +308,6 @@ func resetMetrics(t *testing.T) { engine.resultStreamerNumRows.Reset() engine.rowStreamerNumRows.Reset() engine.vstreamerPhaseTimings.Reset() - engine.vstreamerPhaseTimings.Reset() - engine.vstreamerPhaseTimings.Reset() } func validateMetrics(t *testing.T) { diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index cfe453298ca..6d387beee85 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -147,9 +147,12 @@ func (vs *vstreamer) Stream() error { //defer vs.cancel() ctx := context.Background() defer ctx.Done() + vs.vse.vstreamersCreated.Add(1) log.Infof("Starting Stream() with startPos %s", vs.startPos) pos, err := mysql.DecodePosition(vs.startPos) if err != nil { + vs.vse.errorCounts.Add("StreamRows", 1) + vs.vse.vstreamersEndedWithErrors.Add(1) return err } vs.pos = pos @@ -161,21 +164,21 @@ func (vs *vstreamer) replicate(ctx context.Context) error { // Ensure se is Open. If vttablet came up in a non_serving role, // the schema engine may not have been initialized. if err := vs.se.Open(); err != nil { - return wrapError(err, vs.pos) + return wrapError(err, vs.pos, vs.vse) } conn, err := binlog.NewBinlogConnection(vs.cp) if err != nil { - return wrapError(err, vs.pos) + return wrapError(err, vs.pos, vs.vse) } defer conn.Close() events, err := conn.StartBinlogDumpFromPosition(vs.ctx, vs.pos) if err != nil { - return wrapError(err, vs.pos) + return wrapError(err, vs.pos, vs.vse) } err = vs.parseEvents(vs.ctx, events) - return wrapError(err, vs.pos) + return wrapError(err, vs.pos, vs.vse) } // parseEvents parses and sends events. @@ -252,6 +255,7 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog case binlogdatapb.VEventType_SAVEPOINT: bufferedEvents = append(bufferedEvents, vevent) default: + vs.vse.errorCounts.Add("BufferAndTransmit", 1) return fmt.Errorf("unexpected event: %v", vevent) } return nil @@ -280,6 +284,7 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog } vevents, err := vs.parseEvent(ev) if err != nil { + vs.vse.errorCounts.Add("ParseEvent", 1) return err } for _, vevent := range vevents { @@ -287,6 +292,7 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog if err == io.EOF { return nil } + vs.vse.errorCounts.Add("BufferAndTransmit", 1) return fmt.Errorf("error sending event: %v", err) } } @@ -308,6 +314,7 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog if err == io.EOF { return nil } + vs.vse.errorCounts.Add("Send", 1) return fmt.Errorf("error sending event: %v", err) } } @@ -503,6 +510,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e vevent, err := vs.buildTablePlan(id, tm) if err != nil { + vs.vse.errorCounts.Add("TablePlan", 1) return nil, err } if vevent != nil { @@ -538,7 +546,6 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e if err != nil { return nil, err } - } for _, vevent := range vevents { vevent.Timestamp = int64(ev.Timestamp()) @@ -807,8 +814,10 @@ func (vs *vstreamer) extractRowAndFilter(plan *streamerPlan, data []byte, dataCo return plan.filter(values) } -func wrapError(err error, stopPos mysql.Position) error { +func wrapError(err error, stopPos mysql.Position, vse *Engine) error { if err != nil { + vse.vstreamersEndedWithErrors.Add(1) + vse.errorCounts.Add("StreamEnded", 1) err = fmt.Errorf("stream (at source tablet) error @ %v: %v", stopPos, err) log.Error(err) return err