diff --git a/go/vt/vttablet/tabletserver/vstreamer/engine.go b/go/vt/vttablet/tabletserver/vstreamer/engine.go index 0dad013e307..1cde5626f7d 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/engine.go +++ b/go/vt/vttablet/tabletserver/vstreamer/engine.go @@ -88,6 +88,7 @@ type Engine struct { // vstreamer metrics vstreamerPhaseTimings *servenv.TimingsWrapper + vstreamerCount *stats.Gauge vstreamerEventsStreamed *stats.Counter vstreamerPacketSize *stats.GaugeFunc vstreamerNumPackets *stats.Counter @@ -125,6 +126,7 @@ func NewEngine(env tabletenv.Env, ts srvtopo.Server, se *schema.Engine, lagThrot vschemaUpdates: env.Exporter().NewCounter("VSchemaUpdates", "Count of VSchema updates. Does not include errors"), vstreamerPhaseTimings: env.Exporter().NewTimings("VStreamerPhaseTiming", "Time taken for different phases during vstream copy", "phase-timing"), + vstreamerCount: env.Exporter().NewGauge("VStreamerCount", "Current number of vstreamers"), vstreamerEventsStreamed: env.Exporter().NewCounter("VStreamerEventsStreamed", "Count of events streamed in VStream API"), vstreamerPacketSize: env.Exporter().NewGaugeFunc("VStreamPacketSize", "Max packet size for sending vstreamer events", getPacketSize), vstreamerNumPackets: env.Exporter().NewCounter("VStreamerNumPackets", "Number of packets in vstreamer"), diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index da61163a6ca..549298a441a 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -163,7 +163,11 @@ func (vs *vstreamer) Cancel() { func (vs *vstreamer) Stream() error { //defer vs.cancel() ctx := context.Background() - defer ctx.Done() + vs.vse.vstreamerCount.Add(1) + defer func() { + ctx.Done() + vs.vse.vstreamerCount.Add(-1) + }() vs.vse.vstreamersCreated.Add(1) log.Infof("Starting Stream() with startPos %s", vs.startPos) pos, err := mysql.DecodePosition(vs.startPos)