diff --git a/go/vt/vttablet/tabletserver/query_engine.go b/go/vt/vttablet/tabletserver/query_engine.go index dfe4006a5a8..735f95ab2fe 100644 --- a/go/vt/vttablet/tabletserver/query_engine.go +++ b/go/vt/vttablet/tabletserver/query_engine.go @@ -126,6 +126,9 @@ type QueryEngine struct { plans *cache.LRUCache queryRuleSources *rules.Map + queryStatsMu sync.RWMutex + queryStats map[string]*QueryStats + // Pools conns *connpool.Pool streamConns *connpool.Pool @@ -182,6 +185,7 @@ func NewQueryEngine(checker connpool.MySQLChecker, se *schema.Engine, config tab plans: cache.NewLRUCache(int64(config.QueryPlanCacheSize)), queryRuleSources: rules.NewMap(), queryPoolWaiterCap: sync2.NewAtomicInt64(int64(config.QueryPoolWaiterCap)), + queryStats: make(map[string]*QueryStats), } qe.conns = connpool.New( @@ -478,53 +482,88 @@ func (qe *QueryEngine) QueryPlanCacheCap() int { return int(qe.plans.Capacity()) } +// QueryStats tracks query stats for export per planName/tableName +type QueryStats struct { + mu sync.Mutex + queryCount int64 + time time.Duration + mysqlTime time.Duration + rowCount int64 + errorCount int64 +} + +// AddStats adds the given stats for the planName.tableName +func (qe *QueryEngine) AddStats(planName, tableName string, queryCount int64, duration, mysqlTime time.Duration, rowCount, errorCount int64) { + key := tableName + "." + planName + + qe.queryStatsMu.RLock() + stats, ok := qe.queryStats[key] + qe.queryStatsMu.RUnlock() + + if !ok { + // Check again with the write lock held and + // create a new record only if none exists + qe.queryStatsMu.Lock() + if stats, ok = qe.queryStats[key]; !ok { + stats = &QueryStats{} + qe.queryStats[key] = stats + } + qe.queryStatsMu.Unlock() + } + + stats.mu.Lock() + stats.queryCount += queryCount + stats.time += duration + stats.mysqlTime += mysqlTime + stats.rowCount += rowCount + stats.errorCount += errorCount + stats.mu.Unlock() +} + func (qe *QueryEngine) getQueryCount() map[string]int64 { - f := func(plan *TabletPlan) int64 { - queryCount, _, _, _, _ := plan.Stats() - return queryCount + qstats := make(map[string]int64) + qe.queryStatsMu.RLock() + defer qe.queryStatsMu.RUnlock() + for k, qs := range qe.queryStats { + qs.mu.Lock() + qstats[k] = qs.queryCount + qs.mu.Unlock() } - return qe.getQueryStats(f) + return qstats } func (qe *QueryEngine) getQueryTime() map[string]int64 { - f := func(plan *TabletPlan) int64 { - _, time, _, _, _ := plan.Stats() - return int64(time) + qstats := make(map[string]int64) + qe.queryStatsMu.RLock() + defer qe.queryStatsMu.RUnlock() + for k, qs := range qe.queryStats { + qs.mu.Lock() + qstats[k] = int64(qs.time) + qs.mu.Unlock() } - return qe.getQueryStats(f) + return qstats } func (qe *QueryEngine) getQueryRowCount() map[string]int64 { - f := func(plan *TabletPlan) int64 { - _, _, _, rowCount, _ := plan.Stats() - return rowCount + qstats := make(map[string]int64) + qe.queryStatsMu.RLock() + defer qe.queryStatsMu.RUnlock() + for k, qs := range qe.queryStats { + qs.mu.Lock() + qstats[k] = qs.rowCount + qs.mu.Unlock() } - return qe.getQueryStats(f) + return qstats } func (qe *QueryEngine) getQueryErrorCount() map[string]int64 { - f := func(plan *TabletPlan) int64 { - _, _, _, _, errorCount := plan.Stats() - return errorCount - } - return qe.getQueryStats(f) -} - -type queryStatsFunc func(*TabletPlan) int64 - -func (qe *QueryEngine) getQueryStats(f queryStatsFunc) map[string]int64 { - keys := qe.plans.Keys() qstats := make(map[string]int64) - for _, v := range keys { - if plan := qe.peekQuery(v); plan != nil { - table := plan.TableName() - if table.IsEmpty() { - table = sqlparser.NewTableIdent("Join") - } - planType := plan.PlanID.String() - data := f(plan) - qstats[table.String()+"."+planType] += data - } + qe.queryStatsMu.RLock() + defer qe.queryStatsMu.RUnlock() + for k, qs := range qe.queryStats { + qs.mu.Lock() + qstats[k] = qs.errorCount + qs.mu.Unlock() } return qstats } @@ -589,6 +628,7 @@ func (qe *QueryEngine) handleHTTPQueryStats(response http.ResponseWriter, reques pqstats.Table = plan.TableName().String() pqstats.Plan = plan.PlanID pqstats.QueryCount, pqstats.Time, pqstats.MysqlTime, pqstats.RowCount, pqstats.ErrorCount = plan.Stats() + qstats = append(qstats, pqstats) } } diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index 7b754c9bff6..3ba89272379 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -79,11 +79,19 @@ func (qre *QueryExecutor) Execute() (reply *sqltypes.Result, err error) { tabletenv.QueryStats.Add(planName, duration) tabletenv.RecordUserQuery(qre.ctx, qre.plan.TableName(), "Execute", int64(duration)) + mysqlTime := qre.logStats.MysqlResponseTime + tableName := qre.plan.TableName().String() + if tableName == "" { + tableName = "Join" + } + if reply == nil { - qre.plan.AddStats(1, duration, qre.logStats.MysqlResponseTime, 0, 1) + qre.tsv.qe.AddStats(planName, tableName, 1, duration, mysqlTime, 0, 1) + qre.plan.AddStats(1, duration, mysqlTime, 0, 1) return } - qre.plan.AddStats(1, duration, qre.logStats.MysqlResponseTime, int64(reply.RowsAffected), 0) + qre.tsv.qe.AddStats(planName, tableName, 1, duration, mysqlTime, int64(reply.RowsAffected), 0) + qre.plan.AddStats(1, duration, mysqlTime, int64(reply.RowsAffected), 0) qre.logStats.RowsAffected = int(reply.RowsAffected) qre.logStats.Rows = reply.Rows tabletenv.ResultStats.Add(int64(len(reply.Rows)))