Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,330 changes: 668 additions & 662 deletions go/vt/proto/query/query.pb.go

Large diffs are not rendered by default.

43 changes: 43 additions & 0 deletions go/vt/proto/query/query_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions go/vt/proto/topodata/topodata.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 13 additions & 16 deletions go/vt/proto/vtrpc/vtrpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions go/vt/sqlparser/comments.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ const (
DirectiveIgnoreMaxMemoryRows = "IGNORE_MAX_MEMORY_ROWS"
// DirectiveAllowScatter lets scatter plans pass through even when they are turned off by `no-scatter`.
DirectiveAllowScatter = "ALLOW_SCATTER"
// DirectiveWorkloadName specifies the name of the client application workload issuing the query.
DirectiveWorkloadName = "WORKLOAD_NAME"
)

func isNonSpace(r rune) bool {
Expand Down Expand Up @@ -405,3 +407,23 @@ func AllowScatterDirective(stmt Statement) bool {
}
return directives.IsSet(DirectiveAllowScatter)
}

// GetWorkloadNameFromStatement gets the workload name from the provided Statement, using workloadLabel as the name of
// the query directive that specifies it.
func GetWorkloadNameFromStatement(statement Statement) string {
var directives CommentDirectives
switch stmt := statement.(type) {
case *Select:
directives = ExtractCommentDirectives(stmt.Comments)
case *Insert:
directives = ExtractCommentDirectives(stmt.Comments)
case *Update:
directives = ExtractCommentDirectives(stmt.Comments)
case *Delete:
directives = ExtractCommentDirectives(stmt.Comments)
default:
return ""
}

return directives.GetString(DirectiveWorkloadName, "")
}
4 changes: 4 additions & 0 deletions go/vt/vtgate/engine/fake_vcursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ func (t *noopVCursor) SetWorkload(querypb.ExecuteOptions_Workload) {
panic("implement me")
}

func (t *noopVCursor) SetWorkloadName(string) {
panic("implement me")
}

func (t *noopVCursor) SetPlannerVersion(querypb.ExecuteOptions_PlannerVersion) {
panic("implement me")
}
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/engine/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ type (
SetTransactionMode(vtgatepb.TransactionMode)
SetWorkload(querypb.ExecuteOptions_Workload)
SetPlannerVersion(querypb.ExecuteOptions_PlannerVersion)
SetWorkloadName(string)
SetFoundRows(uint64)

SetDDLStrategy(string)
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1352,6 +1352,7 @@ func (e *Executor) getPlan(vcursor *vcursorImpl, sql string, comments sqlparser.
}
ignoreMaxMemoryRows := sqlparser.IgnoreMaxMaxMemoryRowsDirective(stmt)
vcursor.SetIgnoreMaxMemoryRows(ignoreMaxMemoryRows)
vcursor.SetWorkloadName(sqlparser.GetWorkloadNameFromStatement(stmt))

// Normalize if possible and retry.
if (e.normalize && sqlparser.CanNormalize(stmt)) || sqlparser.MustRewriteAST(stmt, qo.getSelectLimit() > 0) {
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vtgate/vcursor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,12 @@ func (vc *vcursorImpl) SetPlannerVersion(v planbuilder.PlannerVersion) {
vc.safeSession.GetOrCreateOptions().PlannerVersion = v
}

func (vc *vcursorImpl) SetWorkloadName(workloadName string) {
if workloadName != "" {
vc.safeSession.GetOrCreateOptions().WorkloadName = workloadName
}
}

// SetFoundRows implements the SessionActions interface
func (vc *vcursorImpl) SetFoundRows(foundRows uint64) {
vc.safeSession.FoundRows = foundRows
Expand Down
35 changes: 25 additions & 10 deletions go/vt/vttablet/tabletserver/query_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ type QueryEngine struct {
// stats
queryCounts, queryTimes, queryRowCounts, queryErrorCounts *stats.CountersWithMultiLabels

// stats flags
enablePerWorkloadTableMetrics bool

// Loggers
accessCheckerLogger *logutil.ThrottledLogger
}
Expand All @@ -168,11 +171,12 @@ func NewQueryEngine(env tabletenv.Env, se *schema.Engine) *QueryEngine {
}

qe := &QueryEngine{
env: env,
se: se,
tables: make(map[string]*schema.Table),
plans: cache.NewDefaultCacheImpl(cacheCfg),
queryRuleSources: rules.NewMap(),
env: env,
se: se,
tables: make(map[string]*schema.Table),
plans: cache.NewDefaultCacheImpl(cacheCfg),
queryRuleSources: rules.NewMap(),
enablePerWorkloadTableMetrics: config.EnablePerWorkloadTableMetrics,
}

qe.conns = connpool.NewPool(env, "ConnPool", config.OltpReadPool)
Expand Down Expand Up @@ -222,10 +226,16 @@ func NewQueryEngine(env tabletenv.Env, se *schema.Engine) *QueryEngine {
env.Exporter().NewGaugeFunc("QueryCacheSize", "Query engine query cache size", qe.plans.UsedCapacity)
env.Exporter().NewGaugeFunc("QueryCacheCapacity", "Query engine query cache capacity", qe.plans.MaxCapacity)
env.Exporter().NewCounterFunc("QueryCacheEvictions", "Query engine query cache evictions", qe.plans.Evictions)
qe.queryCounts = env.Exporter().NewCountersWithMultiLabels("QueryCounts", "query counts", []string{"Table", "Plan"})
qe.queryTimes = env.Exporter().NewCountersWithMultiLabels("QueryTimesNs", "query times in ns", []string{"Table", "Plan"})
qe.queryRowCounts = env.Exporter().NewCountersWithMultiLabels("QueryRowCounts", "query row counts", []string{"Table", "Plan"})
qe.queryErrorCounts = env.Exporter().NewCountersWithMultiLabels("QueryErrorCounts", "query error counts", []string{"Table", "Plan"})

labels := []string{"Table", "Plan"}
if config.EnablePerWorkloadTableMetrics {
labels = []string{"Table", "Plan", "Workload"}
}

qe.queryCounts = env.Exporter().NewCountersWithMultiLabels("QueryCounts", "query counts", labels)
qe.queryTimes = env.Exporter().NewCountersWithMultiLabels("QueryTimesNs", "query times in ns", labels)
qe.queryRowCounts = env.Exporter().NewCountersWithMultiLabels("QueryRowCounts", "query row counts", labels)
qe.queryErrorCounts = env.Exporter().NewCountersWithMultiLabels("QueryErrorCounts", "query error counts", labels)

env.Exporter().HandleFunc("/debug/hotrows", qe.txSerializer.ServeHTTP)
env.Exporter().HandleFunc("/debug/tablet_plans", qe.handleHTTPQueryPlans)
Expand Down Expand Up @@ -424,9 +434,14 @@ func (qe *QueryEngine) QueryPlanCacheLen() int {
}

// AddStats adds the given stats for the planName.tableName
func (qe *QueryEngine) AddStats(planName, tableName string, queryCount int64, duration, mysqlTime time.Duration, rowCount, errorCount int64) {
func (qe *QueryEngine) AddStats(planName, tableName, workload string, queryCount int64, duration, mysqlTime time.Duration, rowCount, errorCount int64) {
// table names can contain "." characters, replace them!
keys := []string{tableName, planName}
// Only use the workload as a label if that's enabled in the configuration.
if qe.enablePerWorkloadTableMetrics {
keys = append(keys, workload)
}

qe.queryCounts.Add(keys, queryCount)
qe.queryTimes.Add(keys, int64(duration))
qe.queryRowCounts.Add(keys, rowCount)
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,11 @@ func (qre *QueryExecutor) Execute() (reply *sqltypes.Result, err error) {
}

if reply == nil {
qre.tsv.qe.AddStats(planName, tableName, 1, duration, mysqlTime, 0, 1)
qre.tsv.qe.AddStats(planName, tableName, qre.options.GetWorkloadName(), 1, duration, mysqlTime, 0, 1)
qre.plan.AddStats(1, duration, mysqlTime, 0, 0, 1)
return
}
qre.tsv.qe.AddStats(planName, tableName, 1, duration, mysqlTime, int64(reply.RowsAffected), 0)
qre.tsv.qe.AddStats(planName, tableName, qre.options.GetWorkloadName(), 1, duration, mysqlTime, int64(reply.RowsAffected), 0)
qre.plan.AddStats(1, duration, mysqlTime, reply.RowsAffected, uint64(len(reply.Rows)), 0)
qre.logStats.RowsAffected = int(reply.RowsAffected)
qre.logStats.Rows = reply.Rows
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vttablet/tabletserver/tabletenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ func init() {

flag.BoolVar(&enableReplicationReporter, "enable_replication_reporter", false, "Use polling to track replication lag.")
flag.BoolVar(&currentConfig.EnableOnlineDDL, "queryserver_enable_online_ddl", true, "Enable online DDL.")

flag.BoolVar(&currentConfig.EnablePerWorkloadTableMetrics, "enable-per-workload-table-metrics", defaultConfig.EnablePerWorkloadTableMetrics, "If true, query counts and query error metrics include a label that identifies the workload")
}

// Init must be called after flag.Parse, and before doing any other operations.
Expand Down Expand Up @@ -286,6 +288,8 @@ type TabletConfig struct {

EnforceStrictTransTables bool `json:"-"`
EnableOnlineDDL bool `json:"-"`

EnablePerWorkloadTableMetrics bool `json:"-"`
}

// ConnPoolConfig contains the config for a conn pool.
Expand Down Expand Up @@ -486,6 +490,8 @@ var defaultConfig = TabletConfig{

EnforceStrictTransTables: true,
EnableOnlineDDL: true,

EnablePerWorkloadTableMetrics: false,
}

// defaultTxThrottlerConfig formats the default throttlerdata.Configuration
Expand Down
1 change: 1 addition & 0 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1239,6 +1239,7 @@ func (tsv *TabletServer) execRequest(
span, ctx := trace.NewSpan(ctx, "TabletServer."+requestName)
if options != nil {
span.Annotate("isolation-level", options.TransactionIsolation)
span.Annotate("workload_name", options.WorkloadName)
}
trace.AnnotateSQL(span, sql)
if target != nil {
Expand Down
4 changes: 4 additions & 0 deletions proto/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,10 @@ message ExecuteOptions {
// if the user has created temp tables, Vitess will not reuse plans created for this session in other sessions.
// The current session can still use other sessions cached plans.
bool has_created_temp_tables = 12;

// WorkloadName specifies the name of the workload as indicated in query directives. This is used for instrumentation
// in metrics and tracing spans. We use 15 here so that it matches what is upstreamed in v16+
string WorkloadName = 15;
}

// Field describes a single column returned by a query
Expand Down