diff --git a/go/test/endtoend/preparestmt/stmt_methods_test.go b/go/test/endtoend/preparestmt/stmt_methods_test.go index 38ee96b9272..3b62cc9e061 100644 --- a/go/test/endtoend/preparestmt/stmt_methods_test.go +++ b/go/test/endtoend/preparestmt/stmt_methods_test.go @@ -450,6 +450,11 @@ func TestSpecializedPlan(t *testing.T) { dbo := Connect(t, "interpolateParams=false") defer dbo.Close() + oMap := getVarValue[map[string]any](t, "OptimizedQueryExecutions", clusterInstance.VtgateProcess.GetVars) + initExecCount := getVarValue[float64](t, "Passthrough", func() map[string]any { + return oMap + }) + queries := []struct { query string args []any @@ -475,6 +480,11 @@ func TestSpecializedPlan(t *testing.T) { } require.NoError(t, stmt.Close()) } + oMap = getVarValue[map[string]any](t, "OptimizedQueryExecutions", clusterInstance.VtgateProcess.GetVars) + finalExecCount := getVarValue[float64](t, "Passthrough", func() map[string]any { + return oMap + }) + require.EqualValues(t, 15, finalExecCount-initExecCount) // Validate specialized plan. p := getPlanWhenReady(t, queries[0].query, 100*time.Millisecond, clusterInstance.VtgateProcess.ReadQueryPlans) @@ -519,3 +529,20 @@ func getPlanWhenReady(t *testing.T, sql string, timeout time.Duration, plansFunc } } } + +func getVarValue[T any](t *testing.T, key string, varFunc func() map[string]any) T { + t.Helper() + + vars := varFunc() + require.NotNil(t, vars) + + value, exists := vars[key] + if !exists { + return *new(T) + } + castValue, ok := value.(T) + if !ok { + t.Errorf("unexpected type, want: %T, got %T", new(T), value) + } + return castValue +} diff --git a/go/test/vschemawrapper/vschema_wrapper.go b/go/test/vschemawrapper/vschema_wrapper.go index 9727b598bfe..4a526b434a6 100644 --- a/go/test/vschemawrapper/vschema_wrapper.go +++ b/go/test/vschemawrapper/vschema_wrapper.go @@ -69,7 +69,7 @@ func NewVschemaWrapper( Collation: env.CollationEnv().DefaultConnectionCharset(), DefaultTabletType: topodatapb.TabletType_PRIMARY, SetVarEnabled: true, - }) + }, nil) if err != nil { return nil, err } diff --git a/go/vt/vtexplain/vtexplain_vtgate.go b/go/vt/vtexplain/vtexplain_vtgate.go index 7d25758a30e..7c838da4330 100644 --- a/go/vt/vtexplain/vtexplain_vtgate.go +++ b/go/vt/vtexplain/vtexplain_vtgate.go @@ -33,6 +33,10 @@ import ( "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" + querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vschemapb "vitess.io/vitess/go/vt/proto/vschema" + vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" "vitess.io/vitess/go/vt/srvtopo" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vterrors" @@ -42,11 +46,6 @@ import ( "vitess.io/vitess/go/vt/vtgate/logstats" "vitess.io/vitess/go/vt/vtgate/vindexes" "vitess.io/vitess/go/vt/vttablet/queryservice" - - querypb "vitess.io/vitess/go/vt/proto/query" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" - vschemapb "vitess.io/vitess/go/vt/proto/vschema" - vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" ) func (vte *VTExplain) initVtgateExecutor(ctx context.Context, ts *topo.Server, vSchemaStr, ksShardMapStr string, opts *Options, srvTopoCounts *stats.CountersWithSingleLabel) error { @@ -75,6 +74,7 @@ func (vte *VTExplain) initVtgateExecutor(ctx context.Context, ts *topo.Server, v queryLogBufferSize := 10 plans := theine.NewStore[vtgate.PlanCacheKey, *engine.Plan](4*1024*1024, false) eConfig := vtgate.ExecutorConfig{ + Name: "TestExecutor", Normalize: opts.Normalize, StreamSize: streamSize, AllowScatter: true, diff --git a/go/vt/vtgate/engine/fake_vcursor_test.go b/go/vt/vtgate/engine/fake_vcursor_test.go index df7093a773f..5ff6b5f04c8 100644 --- a/go/vt/vtgate/engine/fake_vcursor_test.go +++ b/go/vt/vtgate/engine/fake_vcursor_test.go @@ -61,6 +61,10 @@ type noopVCursor struct { inTx bool } +func (t *noopVCursor) GetExecutionMetrics() *Metrics { + panic("implement me") +} + func (t *noopVCursor) SetExecQueryTimeout(timeout *int) { panic("implement me") } @@ -467,6 +471,12 @@ type loggingVCursor struct { onExecuteMultiShardFn func(context.Context, Primitive, []*srvtopo.ResolvedShard, []*querypb.BoundQuery, bool, bool) onStreamExecuteMultiFn func(context.Context, Primitive, string, []*srvtopo.ResolvedShard, []map[string]*querypb.BindVariable, bool, bool, func(*sqltypes.Result) error) onRecordMirrorStatsFn func(time.Duration, time.Duration, error) + + metrics *Metrics +} + +func (f *loggingVCursor) GetExecutionMetrics() *Metrics { + return f.metrics } func (f *loggingVCursor) HasCreatedTempTable() { diff --git a/go/vt/vtgate/engine/metrics.go b/go/vt/vtgate/engine/metrics.go new file mode 100644 index 00000000000..57dfce41926 --- /dev/null +++ b/go/vt/vtgate/engine/metrics.go @@ -0,0 +1,32 @@ +/* +Copyright 2025 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package engine + +import ( + "vitess.io/vitess/go/stats" + "vitess.io/vitess/go/vt/servenv" +) + +type Metrics struct { + optimizedQueryExec *stats.CountersWithSingleLabel +} + +func InitMetrics(exporter *servenv.Exporter) *Metrics { + return &Metrics{ + optimizedQueryExec: exporter.NewCountersWithSingleLabel("OptimizedQueryExecutions", "Counts optimized queries executed at VTGate by plan type.", "Plan"), + } +} diff --git a/go/vt/vtgate/engine/specialized.go b/go/vt/vtgate/engine/plan_switcher.go similarity index 94% rename from go/vt/vtgate/engine/specialized.go rename to go/vt/vtgate/engine/plan_switcher.go index 6c89b39bca1..e9e10425869 100644 --- a/go/vt/vtgate/engine/specialized.go +++ b/go/vt/vtgate/engine/plan_switcher.go @@ -22,7 +22,6 @@ import ( "strings" "vitess.io/vitess/go/slice" - "vitess.io/vitess/go/sqltypes" querypb "vitess.io/vitess/go/vt/proto/query" ) @@ -83,6 +82,7 @@ func (s *PlanSwitcher) TryExecute( wantfields bool, ) (*sqltypes.Result, error) { if s.metCondition(bindVars) { + s.addOptimizedExecStats(vcursor) return s.Optimized.TryExecute(ctx, vcursor, bindVars, wantfields) } if s.Baseline == nil { @@ -99,6 +99,7 @@ func (s *PlanSwitcher) TryStreamExecute( callback func(*sqltypes.Result) error, ) error { if s.metCondition(bindVars) { + s.addOptimizedExecStats(vcursor) return s.Optimized.TryStreamExecute(ctx, vcursor, bindVars, wantfields, callback) } if s.Baseline == nil { @@ -152,4 +153,9 @@ func (s *PlanSwitcher) metCondition(bindVars map[string]*querypb.BindVariable) b return true } +func (s *PlanSwitcher) addOptimizedExecStats(vcursor VCursor) { + planType := getPlanType(s.Optimized) + vcursor.GetExecutionMetrics().optimizedQueryExec.Add(planType.String(), 1) +} + var _ Primitive = (*PlanSwitcher)(nil) diff --git a/go/vt/vtgate/engine/plan_switcher_test.go b/go/vt/vtgate/engine/plan_switcher_test.go new file mode 100644 index 00000000000..10c23464bab --- /dev/null +++ b/go/vt/vtgate/engine/plan_switcher_test.go @@ -0,0 +1,42 @@ +/* +Copyright 2025 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package engine + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/vt/servenv" +) + +// TestPlanSwitcherMetrics tests that the PlanSwitcher increments the correct metric +func TestPlanSwitcherMetrics(t *testing.T) { + p := &PlanSwitcher{ + Optimized: &TransactionStatus{}, + } + + vc := &loggingVCursor{ + metrics: InitMetrics(servenv.NewExporter("PlanTest", "")), + } + initial := vc.metrics.optimizedQueryExec.Counts() + _, err := p.TryExecute(context.Background(), vc, nil, false) + require.NoError(t, err) + after := vc.metrics.optimizedQueryExec.Counts() + require.EqualValues(t, 1, after["MultiShard"]-initial["MultiShard"]) +} diff --git a/go/vt/vtgate/engine/primitive.go b/go/vt/vtgate/engine/primitive.go index be65bcde831..a394d391625 100644 --- a/go/vt/vtgate/engine/primitive.go +++ b/go/vt/vtgate/engine/primitive.go @@ -149,6 +149,8 @@ type ( RecordMirrorStats(time.Duration, time.Duration, error) SetLastInsertID(uint64) + + GetExecutionMetrics() *Metrics } // SessionActions gives primitives ability to interact with the session state diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index caa7fa9c6e5..f6674b07677 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -111,6 +111,7 @@ func init() { // the abilities of the underlying vttablets. type ( ExecutorConfig struct { + Name string Normalize bool StreamSize int // AllowScatter will fail planning if set to false and a plan contains any scatter queries @@ -120,7 +121,8 @@ type ( } Executor struct { - config ExecutorConfig + config ExecutorConfig + exporter *servenv.Exporter env *vtenv.Environment serv srvtopo.Server @@ -128,6 +130,7 @@ type ( resolver *Resolver scatterConn *ScatterConn txConn *TxConn + metrics econtext.Metrics mu sync.Mutex vschema *vindexes.VSchema @@ -147,6 +150,10 @@ type ( vConfig econtext.VCursorConfig ddlConfig dynamicconfig.DDL } + + Metrics struct { + engineMetrics *engine.Metrics + } ) var executorOnce sync.Once @@ -180,6 +187,7 @@ func NewExecutor( ) *Executor { e := &Executor{ config: eConfig, + exporter: servenv.NewExporter(eConfig.Name, ""), env: env, serv: serv, cell: cell, @@ -194,6 +202,9 @@ func NewExecutor( } // setting the vcursor config. e.initVConfig(warnOnShardedOnly, pv) + e.metrics = &Metrics{ + engineMetrics: engine.InitMetrics(e.exporter), + } // we subscribe to update from the VSchemaManager e.vm = &VSchemaManager{ @@ -1117,7 +1128,7 @@ func (e *Executor) fetchOrCreatePlan( } query, comments := sqlparser.SplitMarginComments(queryString) - vcursor, _ = econtext.NewVCursorImpl(safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver, e.serv, nullResultsObserver{}, e.vConfig) + vcursor, _ = e.newVCursor(safeSession, comments, logStats) var setVarComment string if e.vConfig.SetVarEnabled { @@ -1163,6 +1174,10 @@ func (e *Executor) fetchOrCreatePlan( return plan, vcursor, stmt, nil } +func (e *Executor) newVCursor(safeSession *econtext.SafeSession, comments sqlparser.MarginComments, logStats *logstats.LogStats) (*econtext.VCursorImpl, error) { + return econtext.NewVCursorImpl(safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver, e.serv, nullResultsObserver{}, e.vConfig, e.metrics) +} + func (e *Executor) tryOptimizedPlan( ctx context.Context, vcursor *econtext.VCursorImpl, @@ -1801,3 +1816,7 @@ func fkMode(foreignkey string) vschemapb.Keyspace_ForeignKeyMode { } return vschemapb.Keyspace_unspecified } + +func (m *Metrics) GetExecutionMetrics() *engine.Metrics { + return m.engineMetrics +} diff --git a/go/vt/vtgate/executor_framework_test.go b/go/vt/vtgate/executor_framework_test.go index 88455e8331f..e65a5264841 100644 --- a/go/vt/vtgate/executor_framework_test.go +++ b/go/vt/vtgate/executor_framework_test.go @@ -28,8 +28,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - econtext "vitess.io/vitess/go/vt/vtgate/executorcontext" - "vitess.io/vitess/go/cache/theine" "vitess.io/vitess/go/constants/sidecar" "vitess.io/vitess/go/sqltypes" @@ -45,6 +43,7 @@ import ( "vitess.io/vitess/go/vt/srvtopo" "vitess.io/vitess/go/vt/vtenv" "vitess.io/vitess/go/vt/vtgate/engine" + econtext "vitess.io/vitess/go/vt/vtgate/executorcontext" "vitess.io/vitess/go/vt/vtgate/logstats" "vitess.io/vitess/go/vt/vtgate/vindexes" "vitess.io/vitess/go/vt/vttablet/sandboxconn" @@ -245,6 +244,7 @@ func createCustomExecutor(t testing.TB, vschema string, mysqlVersion string) (ex func createExecutorConfig() ExecutorConfig { return ExecutorConfig{ + Name: "TestExecutor", StreamSize: 10, AllowScatter: true, } @@ -252,6 +252,7 @@ func createExecutorConfig() ExecutorConfig { func createExecutorConfigWithNormalizer() ExecutorConfig { return ExecutorConfig{ + Name: "TestExecutor", StreamSize: 10, AllowScatter: true, Normalize: true, diff --git a/go/vt/vtgate/executor_plan_test.go b/go/vt/vtgate/executor_plan_test.go index d04c6d64c35..744cf7ef4d8 100644 --- a/go/vt/vtgate/executor_plan_test.go +++ b/go/vt/vtgate/executor_plan_test.go @@ -119,9 +119,7 @@ func TestDeferredOptimization(t *testing.T) { resolver := newTestResolver(ctx, nil, nil, "") executor := Executor{ - config: ExecutorConfig{ - AllowScatter: true, - }, + config: createExecutorConfig(), env: env, resolver: resolver, vschema: vindexes.BuildVSchema(result, parser), diff --git a/go/vt/vtgate/executor_test.go b/go/vt/vtgate/executor_test.go index e30e8ff8055..0ed67cb8a27 100644 --- a/go/vt/vtgate/executor_test.go +++ b/go/vt/vtgate/executor_test.go @@ -132,7 +132,7 @@ func TestPlanKey(t *testing.T) { t.Run(fmt.Sprintf("%d#%s", i, tc.targetString), func(t *testing.T) { ss := econtext.NewSafeSession(&vtgatepb.Session{TargetString: tc.targetString}) resolver := &fakeResolver{resolveShards: tc.resolvedShard} - vc, _ := econtext.NewVCursorImpl(ss, makeComments(""), e, nil, e.vm, e.VSchema(), resolver, nil, nullResultsObserver{}, cfg) + vc, _ := econtext.NewVCursorImpl(ss, makeComments(""), e, nil, e.vm, e.VSchema(), resolver, nil, nullResultsObserver{}, cfg, nil) key := buildPlanKey(ctx, vc, "SELECT 1", tc.setVarComment) require.Equal(t, tc.expectedPlanPrefixKey, key.DebugString(), "test case %d", i) }) @@ -1616,8 +1616,9 @@ var pv = querypb.ExecuteOptions_Gen4 func TestGetPlanUnnormalized(t *testing.T) { r, _, _, _, ctx := createExecutorEnv(t) - emptyvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, nullResultsObserver{}, r.vConfig) - unshardedvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, nullResultsObserver{}, r.vConfig) + + emptyvc, _ := r.newVCursor(econtext.NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), nil) + unshardedvc, _ := r.newVCursor(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), nil) query1 := "select * from music_user_map where id = 1" plan1, logStats := getPlanCached(t, ctx, r, emptyvc.SafeSession, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false) @@ -1683,7 +1684,7 @@ func TestGetPlanCacheUnnormalized(t *testing.T) { t.Run("Cache", func(t *testing.T) { r, _, _, _, ctx := createExecutorEnv(t) - emptyvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, nullResultsObserver{}, r.vConfig) + emptyvc, _ := r.newVCursor(econtext.NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), nil) query1 := "select * from music_user_map where id = 1" _, logStats1 := getPlanCached(t, ctx, r, emptyvc.SafeSession, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, true) @@ -1702,7 +1703,7 @@ func TestGetPlanCacheUnnormalized(t *testing.T) { t.Run("Skip Cache", func(t *testing.T) { r, _, _, _, ctx := createExecutorEnv(t) - unshardedvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, nullResultsObserver{}, r.vConfig) + unshardedvc, _ := r.newVCursor(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), nil) // Skip cache using directive query1 := "insert /*vt+ SKIP_QUERY_PLAN_CACHE=1 */ into user(id) values (1), (2)" @@ -1715,12 +1716,12 @@ func TestGetPlanCacheUnnormalized(t *testing.T) { assertCacheSize(t, r.plans, 1) // the target string will be resolved and become part of the plan cache key, which adds a new entry - ksIDVc1, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[deadbeef]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, nullResultsObserver{}, r.vConfig) + ksIDVc1, _ := r.newVCursor(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[deadbeef]"}), makeComments(""), nil) getPlanCached(t, ctx, r, ksIDVc1.SafeSession, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false) assertCacheSize(t, r.plans, 2) // the target string will be resolved and become part of the plan cache key, as it's an unsharded ks, it will be the same entry as above - ksIDVc2, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[beefdead]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, nullResultsObserver{}, r.vConfig) + ksIDVc2, _ := r.newVCursor(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[beefdead]"}), makeComments(""), nil) getPlanCached(t, ctx, r, ksIDVc2.SafeSession, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false) assertCacheSize(t, r.plans, 2) }) @@ -1729,7 +1730,7 @@ func TestGetPlanCacheUnnormalized(t *testing.T) { func TestGetPlanCacheNormalized(t *testing.T) { t.Run("Cache", func(t *testing.T) { r, _, _, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) - emptyvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, nullResultsObserver{}, r.vConfig) + emptyvc, _ := r.newVCursor(econtext.NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), nil) query1 := "select * from music_user_map where id = 1" _, logStats1 := getPlanCached(t, ctx, r, emptyvc.SafeSession, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, true /* skipQueryPlanCache */) @@ -1745,7 +1746,7 @@ func TestGetPlanCacheNormalized(t *testing.T) { t.Run("Skip Cache", func(t *testing.T) { // Skip cache using directive r, _, _, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) - unshardedvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, nullResultsObserver{}, r.vConfig) + unshardedvc, _ := r.newVCursor(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), nil) query1 := "insert /*vt+ SKIP_QUERY_PLAN_CACHE=1 */ into user(id) values (1), (2)" getPlanCached(t, ctx, r, unshardedvc.SafeSession, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false) @@ -1756,12 +1757,12 @@ func TestGetPlanCacheNormalized(t *testing.T) { assertCacheSize(t, r.plans, 1) // the target string will be resolved and become part of the plan cache key, which adds a new entry - ksIDVc1, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[deadbeef]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, nullResultsObserver{}, r.vConfig) + ksIDVc1, _ := r.newVCursor(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[deadbeef]"}), makeComments(""), nil) getPlanCached(t, ctx, r, ksIDVc1.SafeSession, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false) assertCacheSize(t, r.plans, 2) // the target string will be resolved and become part of the plan cache key, as it's an unsharded ks, it will be the same entry as above - ksIDVc2, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[beefdead]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, nullResultsObserver{}, r.vConfig) + ksIDVc2, _ := r.newVCursor(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[beefdead]"}), makeComments(""), nil) getPlanCached(t, ctx, r, ksIDVc2.SafeSession, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false) assertCacheSize(t, r.plans, 2) }) @@ -1770,8 +1771,8 @@ func TestGetPlanCacheNormalized(t *testing.T) { func TestGetPlanNormalized(t *testing.T) { r, _, _, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) - emptyvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, nullResultsObserver{}, r.vConfig) - unshardedvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, nullResultsObserver{}, r.vConfig) + emptyvc, _ := r.newVCursor(econtext.NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), nil) + unshardedvc, _ := r.newVCursor(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), nil) query1 := "select * from music_user_map where id = 1" // 163 -- 80 query2 := "select * from music_user_map where id = 2" diff --git a/go/vt/vtgate/executorcontext/vcursor_impl.go b/go/vt/vtgate/executorcontext/vcursor_impl.go index 0b3ab09b8cc..99b6036a05a 100644 --- a/go/vt/vtgate/executorcontext/vcursor_impl.go +++ b/go/vt/vtgate/executorcontext/vcursor_impl.go @@ -28,8 +28,6 @@ import ( "github.com/google/uuid" "golang.org/x/exp/maps" - "vitess.io/vitess/go/vt/sysvars" - "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/mysql/config" "vitess.io/vitess/go/mysql/sqlerror" @@ -47,6 +45,7 @@ import ( vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/srvtopo" + "vitess.io/vitess/go/vt/sysvars" "vitess.io/vitess/go/vt/topo" topoprotopb "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/topotools" @@ -145,6 +144,10 @@ type ( ) ([]*srvtopo.ResolvedShard, [][][]sqltypes.Value, error) } + Metrics interface { + GetExecutionMetrics() *engine.Metrics + } + // VCursorImpl implements the VCursor functionality used by dependent // packages to call back into VTGate. VCursorImpl struct { @@ -158,6 +161,7 @@ type ( resolver Resolver topoServer *topo.Server logStats *logstats.LogStats + metrics Metrics // fkChecksState stores the state of foreign key checks variable. // This state is meant to be the final fk checks state after consulting the @@ -201,6 +205,7 @@ func NewVCursorImpl( serv srvtopo.Server, observer ResultsObserver, cfg VCursorConfig, + metrics Metrics, ) (*VCursorImpl, error) { keyspace, tabletType, destination, err := ParseDestinationTarget(safeSession.TargetString, cfg.DefaultTabletType, vschema) if err != nil { @@ -226,12 +231,13 @@ func NewVCursorImpl( marginComments: marginComments, executor: executor, logStats: logStats, - resolver: resolver, - vschema: vschema, - vm: vm, - topoServer: ts, + metrics: metrics, - observer: observer, + resolver: resolver, + vschema: vschema, + vm: vm, + topoServer: ts, + observer: observer, }, nil } @@ -262,16 +268,18 @@ func (vc *VCursorImpl) CloneForMirroring(ctx context.Context) engine.VCursor { clonedCtx := callerid.NewContext(ctx, callerId, immediateCallerId) v := &VCursorImpl{ - config: vc.config, - SafeSession: NewAutocommitSession(vc.SafeSession.Session), - keyspace: vc.keyspace, - tabletType: vc.tabletType, - destination: vc.destination, - marginComments: vc.marginComments, - executor: vc.executor, - resolver: vc.resolver, - topoServer: vc.topoServer, - logStats: &logstats.LogStats{Ctx: clonedCtx}, + config: vc.config, + SafeSession: NewAutocommitSession(vc.SafeSession.Session), + keyspace: vc.keyspace, + tabletType: vc.tabletType, + destination: vc.destination, + marginComments: vc.marginComments, + executor: vc.executor, + resolver: vc.resolver, + topoServer: vc.topoServer, + logStats: &logstats.LogStats{Ctx: clonedCtx}, + metrics: vc.metrics, + ignoreMaxMemoryRows: vc.ignoreMaxMemoryRows, vschema: vc.vschema, vm: vc.vm, @@ -303,6 +311,7 @@ func (vc *VCursorImpl) CloneForReplicaWarming(ctx context.Context) engine.VCurso resolver: vc.resolver, topoServer: vc.topoServer, logStats: &logstats.LogStats{Ctx: clonedCtx}, + metrics: vc.metrics, ignoreMaxMemoryRows: vc.ignoreMaxMemoryRows, vschema: vc.vschema, @@ -328,14 +337,21 @@ func (vc *VCursorImpl) cloneWithAutocommitSession() *VCursorImpl { marginComments: vc.marginComments, executor: vc.executor, logStats: vc.logStats, - resolver: vc.resolver, - vschema: vc.vschema, - vm: vc.vm, - topoServer: vc.topoServer, - observer: vc.observer, + metrics: vc.metrics, + + resolver: vc.resolver, + vschema: vc.vschema, + vm: vc.vm, + topoServer: vc.topoServer, + observer: vc.observer, } } +// GetExecutionMetrics provides the execution metrics object. +func (vc *VCursorImpl) GetExecutionMetrics() *engine.Metrics { + return vc.metrics.GetExecutionMetrics() +} + // HasSystemVariables returns whether the session has set system variables or not func (vc *VCursorImpl) HasSystemVariables() bool { return vc.SafeSession.HasSystemVariables() diff --git a/go/vt/vtgate/executorcontext/vcursor_impl_test.go b/go/vt/vtgate/executorcontext/vcursor_impl_test.go index 57069afdb45..ed7654a58d4 100644 --- a/go/vt/vtgate/executorcontext/vcursor_impl_test.go +++ b/go/vt/vtgate/executorcontext/vcursor_impl_test.go @@ -174,7 +174,7 @@ func TestDestinationKeyspace(t *testing.T) { &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, nil, nil, fakeObserver{}, VCursorConfig{ DefaultTabletType: topodatapb.TabletType_PRIMARY, - }) + }, nil) impl.vschema = tc.vschema dest, keyspace, tabletType, err := impl.TargetDestination(tc.qualifier) if tc.expectedError == "" { @@ -232,7 +232,7 @@ func TestSetTarget(t *testing.T) { for i, tc := range tests { t.Run(fmt.Sprintf("%d#%s", i, tc.targetString), func(t *testing.T) { cfg := VCursorConfig{DefaultTabletType: topodatapb.TabletType_PRIMARY} - vc, _ := NewVCursorImpl(NewSafeSession(&vtgatepb.Session{InTransaction: true}), sqlparser.MarginComments{}, nil, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, nil, nil, fakeObserver{}, cfg) + vc, _ := NewVCursorImpl(NewSafeSession(&vtgatepb.Session{InTransaction: true}), sqlparser.MarginComments{}, nil, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, nil, nil, fakeObserver{}, cfg, nil) vc.vschema = tc.vschema err := vc.SetTarget(tc.targetString) if tc.expectedError == "" { @@ -257,7 +257,7 @@ func TestFirstSortedKeyspace(t *testing.T) { }, } - vc, err := NewVCursorImpl(NewSafeSession(nil), sqlparser.MarginComments{}, nil, nil, &fakeVSchemaOperator{vschema: vschemaWith2KS}, vschemaWith2KS, srvtopo.NewResolver(&FakeTopoServer{}, nil, ""), nil, fakeObserver{}, VCursorConfig{}) + vc, err := NewVCursorImpl(NewSafeSession(nil), sqlparser.MarginComments{}, nil, nil, &fakeVSchemaOperator{vschema: vschemaWith2KS}, vschemaWith2KS, srvtopo.NewResolver(&FakeTopoServer{}, nil, ""), nil, fakeObserver{}, VCursorConfig{}, nil) require.NoError(t, err) ks, err := vc.FirstSortedKeyspace() require.NoError(t, err) @@ -271,7 +271,7 @@ func TestSetExecQueryTimeout(t *testing.T) { vc, err := NewVCursorImpl(safeSession, sqlparser.MarginComments{}, nil, nil, nil, &vindexes.VSchema{}, nil, nil, fakeObserver{}, VCursorConfig{ // flag timeout QueryTimeout: 20, - }) + }, nil) require.NoError(t, err) vc.SetExecQueryTimeout(nil) @@ -312,7 +312,7 @@ func TestSetExecQueryTimeout(t *testing.T) { func TestRecordMirrorStats(t *testing.T) { safeSession := NewSafeSession(nil) logStats := logstats.NewLogStats(context.Background(), t.Name(), "select 1", "", nil, streamlog.NewQueryLogConfigForTest()) - vc, err := NewVCursorImpl(safeSession, sqlparser.MarginComments{}, nil, logStats, nil, &vindexes.VSchema{}, nil, nil, fakeObserver{}, VCursorConfig{}) + vc, err := NewVCursorImpl(safeSession, sqlparser.MarginComments{}, nil, logStats, nil, &vindexes.VSchema{}, nil, nil, fakeObserver{}, VCursorConfig{}, nil) require.NoError(t, err) require.Zero(t, logStats.MirrorSourceExecuteTime)