diff --git a/go/vt/vttablet/tabletserver/vstreamer/engine.go b/go/vt/vttablet/tabletserver/vstreamer/engine.go index c4782b467c4..c7fa2ddb254 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/engine.go +++ b/go/vt/vttablet/tabletserver/vstreamer/engine.go @@ -80,6 +80,7 @@ type Engine struct { // vstreamer metrics vstreamerPhaseTimings *servenv.TimingsWrapper + vstreamerCount *stats.Gauge vstreamerEventsStreamed *stats.Counter vstreamerPacketSize *stats.GaugeFunc vstreamerNumPackets *stats.Counter @@ -115,6 +116,7 @@ func NewEngine(env tabletenv.Env, ts srvtopo.Server, se *schema.Engine, lagThrot vschemaUpdates: env.Exporter().NewCounter("VSchemaUpdates", "Count of VSchema updates. Does not include errors"), vstreamerPhaseTimings: env.Exporter().NewTimings("VStreamerPhaseTiming", "Time taken for different phases during vstream copy", "phase-timing"), + vstreamerCount: env.Exporter().NewGauge("VStreamerCount", "Current number of vstreamers"), vstreamerEventsStreamed: env.Exporter().NewCounter("VStreamerEventsStreamed", "Count of events streamed in VStream API"), vstreamerPacketSize: env.Exporter().NewGaugeFunc("VStreamPacketSize", "Max packet size for sending vstreamer events", getPacketSize), vstreamerNumPackets: env.Exporter().NewCounter("VStreamerNumPackets", "Number of packets in vstreamer"), @@ -126,7 +128,6 @@ func NewEngine(env tabletenv.Env, ts srvtopo.Server, se *schema.Engine, lagThrot vstreamersEndedWithErrors: env.Exporter().NewCounter("VStreamersEndedWithErrors", "Count of vstreamers that ended with errors"), errorCounts: env.Exporter().NewCountersWithSingleLabel("VStreamerErrors", "Tracks errors in vstreamer", "type", "Catchup", "Copy", "Send", "TablePlan"), } - env.Exporter().NewGaugeFunc("VStreamerCount", "Current number of vstreamers", vse.getVStreamerCount) env.Exporter().HandleFunc("/debug/tablet_vschema", vse.ServeHTTP) return vse } @@ -376,12 +377,6 @@ func (vse *Engine) setWatch() { }) } -func (vse *Engine) getVStreamerCount() int64 { - vse.mu.Lock() - defer vse.mu.Unlock() - return int64(len(vse.streamers)) -} - func getPacketSize() int64 { return int64(*defaultPacketSize) } diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index c5cbb18883d..f5fc349df4b 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -144,7 +144,11 @@ func (vs *vstreamer) Cancel() { func (vs *vstreamer) Stream() error { //defer vs.cancel() ctx := context.Background() - defer ctx.Done() + vs.vse.vstreamerCount.Add(1) + defer func() { + ctx.Done() + vs.vse.vstreamerCount.Add(-1) + }() vs.vse.vstreamersCreated.Add(1) log.Infof("Starting Stream() with startPos %s", vs.startPos) pos, err := mysql.DecodePosition(vs.startPos)