From 3a5a62d0293b7851ea27407ca7cf4d9cb9f63f8d Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Wed, 27 Aug 2025 12:44:53 +0530 Subject: [PATCH] fix: emit QueryExecutionsByTable for success cases only Signed-off-by: Harshit Gangal --- go/test/endtoend/vtgate/misc_test.go | 32 +++++++ .../endtoend/vtgate/unsharded/main_test.go | 67 +++++++++++++++ go/vt/vtgate/executor_stats_test.go | 85 +++++++++++++++++++ go/vt/vtgate/plan_execute.go | 8 +- 4 files changed, 189 insertions(+), 3 deletions(-) create mode 100644 go/vt/vtgate/executor_stats_test.go diff --git a/go/test/endtoend/vtgate/misc_test.go b/go/test/endtoend/vtgate/misc_test.go index df71069e50c..ae737b8db06 100644 --- a/go/test/endtoend/vtgate/misc_test.go +++ b/go/test/endtoend/vtgate/misc_test.go @@ -1008,6 +1008,38 @@ func TestQueryProcessedMetric(t *testing.T) { } } +// TestQueryProcessedMetric verifies that query metrics are correctly published. +func TestMetricForExplain(t *testing.T) { + conn, closer := start(t) + defer closer() + + initialQP := getQPMetric(t, "QueryExecutions") + initialQT := getQPMetric(t, "QueryExecutionsByTable") + t.Run("explain t1", func(t *testing.T) { + utils.Exec(t, conn, "explain t1") + updatedQP := getQPMetric(t, "QueryExecutions") + updatedQT := getQPMetric(t, "QueryExecutionsByTable") + assert.EqualValuesf(t, 1, getValue(updatedQP, "EXPLAIN.Passthrough.PRIMARY")-getValue(initialQP, "EXPLAIN.Passthrough.PRIMARY"), "queryExecutions metric: %s", "explain") + assert.EqualValuesf(t, 1, getValue(updatedQT, "EXPLAIN.ks_t1")-getValue(initialQT, "EXPLAIN.ks_t1"), "queryExecutionsByTable metric: %s", "asdasd") + }) + + t.Run("explain `select id1, id2 from t1`", func(t *testing.T) { + utils.ExecAllowError(t, conn, "explain `select id1, id2 from t1`") + updatedQP := getQPMetric(t, "QueryExecutions") + updatedQT := getQPMetric(t, "QueryExecutionsByTable") + assert.EqualValuesf(t, 1, getValue(updatedQP, "EXPLAIN.Passthrough.PRIMARY")-getValue(initialQP, "EXPLAIN.Passthrough.PRIMARY"), "queryExecutions metric: %s", "explain") + assert.EqualValuesf(t, 1, getValue(updatedQT, "EXPLAIN.ks_t1")-getValue(initialQT, "EXPLAIN.ks_t1"), "queryExecutionsByTable metric: %s", "asdasd") + }) + + t.Run("explain select id1, id2 from t1", func(t *testing.T) { + utils.Exec(t, conn, "explain select id1, id2 from t1") + updatedQP := getQPMetric(t, "QueryExecutions") + updatedQT := getQPMetric(t, "QueryExecutionsByTable") + assert.EqualValuesf(t, 2, getValue(updatedQP, "EXPLAIN.Passthrough.PRIMARY")-getValue(initialQP, "EXPLAIN.Passthrough.PRIMARY"), "queryExecutions metric: %s", "explain") + assert.EqualValuesf(t, 2, getValue(updatedQT, "EXPLAIN.ks_t1")-getValue(initialQT, "EXPLAIN.ks_t1"), "queryExecutionsByTable metric: %s", "asdasd") + }) +} + func getQPMetric(t *testing.T, metric string) map[string]any { t.Helper() diff --git a/go/test/endtoend/vtgate/unsharded/main_test.go b/go/test/endtoend/vtgate/unsharded/main_test.go index feebb3e4065..8d072907873 100644 --- a/go/test/endtoend/vtgate/unsharded/main_test.go +++ b/go/test/endtoend/vtgate/unsharded/main_test.go @@ -459,3 +459,70 @@ func execMulti(t *testing.T, conn *mysql.Conn, query string) []*sqltypes.Result } return res } + +// TestMetricForExplain verifies that query metrics are correctly published for explain queries. +func TestMetricForExplain(t *testing.T) { + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + require.NoError(t, err) + defer conn.Close() + + initialQP := getQPMetric(t, "QueryExecutions") + initialQT := getQPMetric(t, "QueryExecutionsByTable") + + t.Run("explain t1", func(t *testing.T) { + utils.Exec(t, conn, "explain t1") + updatedQP := getQPMetric(t, "QueryExecutions") + updatedQT := getQPMetric(t, "QueryExecutionsByTable") + assert.EqualValues(t, 1, getValue(updatedQP, "EXPLAIN.Passthrough.PRIMARY")-getValue(initialQP, "EXPLAIN.Passthrough.PRIMARY")) + assert.EqualValues(t, 1, getValue(updatedQT, "EXPLAIN.customer_t1")-getValue(initialQT, "EXPLAIN.customer_t1")) + }) + + t.Run("explain `select c1, c2 from t1`", func(t *testing.T) { + utils.ExecAllowError(t, conn, "explain `select c1, c2 from t1`") + updatedQP := getQPMetric(t, "QueryExecutions") + updatedQT := getQPMetric(t, "QueryExecutionsByTable") + assert.EqualValues(t, 2, getValue(updatedQP, "EXPLAIN.Passthrough.PRIMARY")-getValue(initialQP, "EXPLAIN.Passthrough.PRIMARY")) + assert.EqualValues(t, 1, getValue(updatedQT, "EXPLAIN.customer_t1")-getValue(initialQT, "EXPLAIN.customer_t1")) + }) + + t.Run("explain select c1, c2 from t1", func(t *testing.T) { + utils.Exec(t, conn, "explain select c1, c2 from t1") + updatedQP := getQPMetric(t, "QueryExecutions") + updatedQT := getQPMetric(t, "QueryExecutionsByTable") + assert.EqualValues(t, 3, getValue(updatedQP, "EXPLAIN.Passthrough.PRIMARY")-getValue(initialQP, "EXPLAIN.Passthrough.PRIMARY")) + assert.EqualValues(t, 2, getValue(updatedQT, "EXPLAIN.customer_t1")-getValue(initialQT, "EXPLAIN.customer_t1")) + }) +} + +func getQPMetric(t *testing.T, metric string) map[string]any { + t.Helper() + + vars := clusterInstance.VtgateProcess.GetVars() + require.NotNil(t, vars) + + qpVars, exists := vars[metric] + if !exists { + return nil + } + + qpMap, ok := qpVars.(map[string]any) + require.True(t, ok, "query queryMetric vars is not a map") + + return qpMap +} + +func getValue(m map[string]any, key string) float64 { + if m == nil { + return 0 + } + val, exists := m[key] + if !exists { + return 0 + } + f, ok := val.(float64) + if !ok { + return 0 + } + return f +} diff --git a/go/vt/vtgate/executor_stats_test.go b/go/vt/vtgate/executor_stats_test.go new file mode 100644 index 00000000000..060e44832cb --- /dev/null +++ b/go/vt/vtgate/executor_stats_test.go @@ -0,0 +1,85 @@ +/* +Copyright 2025 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vtgate + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "vitess.io/vitess/go/sqltypes" + vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + econtext "vitess.io/vitess/go/vt/vtgate/executorcontext" +) + +// TestQueryExecutionsByTable_OnError verifies that queryExecutionsByTable counters +// are incremented on successful execution but NOT incremented on execution failure. +func TestQueryExecutionsByTable_OnError(t *testing.T) { + executor, sbc1, _, _, ctx := createExecutorEnv(t) + + // Set up successful result first + sbc1.SetResults([]*sqltypes.Result{sqltypes.MakeTestResult(sqltypes.MakeTestFields("id", "int64"), "1")}) + + // Get initial counter values + initialCounts := getCurrentQueryExecutionsByTableCounts() + + // Execute a query successfully first + session := econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestSharded}) + result, err := executorExecSession(ctx, executor, session, "select id from user where id = 1", nil) + + // Verify successful execution + assert.NoError(t, err, "Expected query execution to succeed") + assert.NotNil(t, result, "Expected valid result") + + // Get counter values after successful execution + successCounts := getCurrentQueryExecutionsByTableCounts() + + // Check the specific counter that should be incremented for SELECT on user table + selectUserKey := "SELECT.TestExecutor_user" + initialUserCount := initialCounts[selectUserKey] + successUserCount := successCounts[selectUserKey] + + // Verify counter was incremented on successful execution + assert.Equal(t, initialUserCount+1, successUserCount, + "queryExecutionsByTable counter should be incremented on successful execution") + + // Now set up the sandbox connection to return an error on next execution + sbc1.MustFailCodes[vtrpcpb.Code_INTERNAL] = 1 + + // Execute the same query again, but this time it should fail + _, err = executorExecSession(ctx, executor, session, "select id from user where id = 1", nil) + + // Verify that the execution failed + assert.Error(t, err, "Expected query execution to fail") + + // Get counter values after failed execution + finalCounts := getCurrentQueryExecutionsByTableCounts() + + // Verify that queryExecutionsByTable counter was NOT incremented on execution error + // The counter should remain the same as after the successful execution + finalUserCount := finalCounts[selectUserKey] + assert.Equal(t, successUserCount, finalUserCount, + "queryExecutionsByTable counter should not be incremented on execution error") +} + +// getCurrentQueryExecutionsByTableCounts returns the current values of all queryExecutionsByTable counters +func getCurrentQueryExecutionsByTableCounts() map[string]int64 { + // queryExecutionsByTable is a global variable, so we can use its Counts() method + // to get all counter values. The keys are already formatted as "query.table" + return queryExecutionsByTable.Counts() +} diff --git a/go/vt/vtgate/plan_execute.go b/go/vt/vtgate/plan_execute.go index 82bca2344a6..a3ab0f0c291 100644 --- a/go/vt/vtgate/plan_execute.go +++ b/go/vt/vtgate/plan_execute.go @@ -409,7 +409,6 @@ func (e *Executor) rollbackPartialExec(ctx context.Context, safeSession *econtex func (e *Executor) setLogStats(logStats *logstats.LogStats, plan *engine.Plan, vcursor *econtext.VCursorImpl, execStart time.Time, err error, qr *sqltypes.Result) { logStats.StmtType = plan.QueryType.String() logStats.ActiveKeyspace = vcursor.GetKeyspace() - logStats.TablesUsed = plan.TablesUsed logStats.TabletType = vcursor.TabletType().String() errCount := e.logExecutionEnd(logStats, execStart, plan, vcursor, err, qr) plan.AddStats(1, time.Since(logStats.StartTime), logStats.ShardQueries, logStats.RowsAffected, logStats.RowsReturned, errCount) @@ -418,8 +417,6 @@ func (e *Executor) setLogStats(logStats *logstats.LogStats, plan *engine.Plan, v func (e *Executor) logExecutionEnd(logStats *logstats.LogStats, execStart time.Time, plan *engine.Plan, vcursor *econtext.VCursorImpl, err error, qr *sqltypes.Result) uint64 { logStats.ExecuteTime = time.Since(execStart) - e.updateQueryStats(plan.QueryType.String(), plan.Type.String(), vcursor.TabletType().String(), int64(logStats.ShardQueries), plan.TablesUsed) - var errCount uint64 if err != nil { logStats.Error = err @@ -427,7 +424,12 @@ func (e *Executor) logExecutionEnd(logStats *logstats.LogStats, execStart time.T } else { logStats.RowsAffected = qr.RowsAffected logStats.RowsReturned = uint64(len(qr.Rows)) + // log the tables used in the plan for successful query execution. + logStats.TablesUsed = plan.TablesUsed } + + e.updateQueryStats(plan.QueryType.String(), plan.Type.String(), vcursor.TabletType().String(), int64(logStats.ShardQueries), logStats.TablesUsed) + return errCount }