Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 73 additions & 33 deletions go/vt/vttablet/tabletserver/query_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ type QueryEngine struct {
plans *cache.LRUCache
queryRuleSources *rules.Map

queryStatsMu sync.RWMutex
queryStats map[string]*QueryStats

// Pools
conns *connpool.Pool
streamConns *connpool.Pool
Expand Down Expand Up @@ -182,6 +185,7 @@ func NewQueryEngine(checker connpool.MySQLChecker, se *schema.Engine, config tab
plans: cache.NewLRUCache(int64(config.QueryPlanCacheSize)),
queryRuleSources: rules.NewMap(),
queryPoolWaiterCap: sync2.NewAtomicInt64(int64(config.QueryPoolWaiterCap)),
queryStats: make(map[string]*QueryStats),
}

qe.conns = connpool.New(
Expand Down Expand Up @@ -478,53 +482,88 @@ func (qe *QueryEngine) QueryPlanCacheCap() int {
return int(qe.plans.Capacity())
}

// QueryStats tracks query stats for export per planName/tableName
type QueryStats struct {
mu sync.Mutex
queryCount int64
time time.Duration
mysqlTime time.Duration
rowCount int64
errorCount int64
}

// AddStats adds the given stats for the planName.tableName
func (qe *QueryEngine) AddStats(planName, tableName string, queryCount int64, duration, mysqlTime time.Duration, rowCount, errorCount int64) {
key := tableName + "." + planName

qe.queryStatsMu.RLock()
stats, ok := qe.queryStats[key]
qe.queryStatsMu.RUnlock()

if !ok {
// Check again with the write lock held and
// create a new record only if none exists
qe.queryStatsMu.Lock()
if stats, ok = qe.queryStats[key]; !ok {
stats = &QueryStats{}
qe.queryStats[key] = stats
}
qe.queryStatsMu.Unlock()
}

stats.mu.Lock()
stats.queryCount += queryCount
stats.time += duration
stats.mysqlTime += mysqlTime
stats.rowCount += rowCount
stats.errorCount += errorCount
stats.mu.Unlock()
}

func (qe *QueryEngine) getQueryCount() map[string]int64 {
f := func(plan *TabletPlan) int64 {
queryCount, _, _, _, _ := plan.Stats()
return queryCount
qstats := make(map[string]int64)
qe.queryStatsMu.RLock()
defer qe.queryStatsMu.RUnlock()
for k, qs := range qe.queryStats {
qs.mu.Lock()
qstats[k] = qs.queryCount
qs.mu.Unlock()
}
return qe.getQueryStats(f)
return qstats
}

func (qe *QueryEngine) getQueryTime() map[string]int64 {
f := func(plan *TabletPlan) int64 {
_, time, _, _, _ := plan.Stats()
return int64(time)
qstats := make(map[string]int64)
qe.queryStatsMu.RLock()
defer qe.queryStatsMu.RUnlock()
for k, qs := range qe.queryStats {
qs.mu.Lock()
qstats[k] = int64(qs.time)
qs.mu.Unlock()
}
return qe.getQueryStats(f)
return qstats
}

func (qe *QueryEngine) getQueryRowCount() map[string]int64 {
f := func(plan *TabletPlan) int64 {
_, _, _, rowCount, _ := plan.Stats()
return rowCount
qstats := make(map[string]int64)
qe.queryStatsMu.RLock()
defer qe.queryStatsMu.RUnlock()
for k, qs := range qe.queryStats {
qs.mu.Lock()
qstats[k] = qs.rowCount
qs.mu.Unlock()
}
return qe.getQueryStats(f)
return qstats
}

func (qe *QueryEngine) getQueryErrorCount() map[string]int64 {
f := func(plan *TabletPlan) int64 {
_, _, _, _, errorCount := plan.Stats()
return errorCount
}
return qe.getQueryStats(f)
}

type queryStatsFunc func(*TabletPlan) int64

func (qe *QueryEngine) getQueryStats(f queryStatsFunc) map[string]int64 {
keys := qe.plans.Keys()
qstats := make(map[string]int64)
for _, v := range keys {
if plan := qe.peekQuery(v); plan != nil {
table := plan.TableName()
if table.IsEmpty() {
table = sqlparser.NewTableIdent("Join")
}
planType := plan.PlanID.String()
data := f(plan)
qstats[table.String()+"."+planType] += data
}
qe.queryStatsMu.RLock()
defer qe.queryStatsMu.RUnlock()
for k, qs := range qe.queryStats {
qs.mu.Lock()
qstats[k] = qs.errorCount
qs.mu.Unlock()
}
return qstats
}
Expand Down Expand Up @@ -589,6 +628,7 @@ func (qe *QueryEngine) handleHTTPQueryStats(response http.ResponseWriter, reques
pqstats.Table = plan.TableName().String()
pqstats.Plan = plan.PlanID
pqstats.QueryCount, pqstats.Time, pqstats.MysqlTime, pqstats.RowCount, pqstats.ErrorCount = plan.Stats()

qstats = append(qstats, pqstats)
}
}
Expand Down
12 changes: 10 additions & 2 deletions go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,19 @@ func (qre *QueryExecutor) Execute() (reply *sqltypes.Result, err error) {
tabletenv.QueryStats.Add(planName, duration)
tabletenv.RecordUserQuery(qre.ctx, qre.plan.TableName(), "Execute", int64(duration))

mysqlTime := qre.logStats.MysqlResponseTime
tableName := qre.plan.TableName().String()
if tableName == "" {
tableName = "Join"
}

if reply == nil {
qre.plan.AddStats(1, duration, qre.logStats.MysqlResponseTime, 0, 1)
qre.tsv.qe.AddStats(planName, tableName, 1, duration, mysqlTime, 0, 1)
qre.plan.AddStats(1, duration, mysqlTime, 0, 1)
return
}
qre.plan.AddStats(1, duration, qre.logStats.MysqlResponseTime, int64(reply.RowsAffected), 0)
qre.tsv.qe.AddStats(planName, tableName, 1, duration, mysqlTime, int64(reply.RowsAffected), 0)
qre.plan.AddStats(1, duration, mysqlTime, int64(reply.RowsAffected), 0)
qre.logStats.RowsAffected = int(reply.RowsAffected)
qre.logStats.Rows = reply.Rows
tabletenv.ResultStats.Add(int64(len(reply.Rows)))
Expand Down