diff --git a/changelog/23.0/23.0.0/summary.md b/changelog/23.0/23.0.0/summary.md
index 5a50e326d78..3be034ba1fa 100644
--- a/changelog/23.0/23.0.0/summary.md
+++ b/changelog/23.0/23.0.0/summary.md
@@ -1,12 +1,15 @@
## Summary
### Table of Contents
+
- **[Minor Changes](#minor-changes)**
- **[Deletions](#deletions)**
- [Metrics](#deleted-metrics)
- - **[VTTablet](#minor-changes-vttablet)**
- - [CLI Flags](#flags-vttablet)
- - [Managed MySQL configuration defaults to caching-sha2-password](#mysql-caching-sha2-password)
+ - **[New Metrics](#new-metrics)**
+ - [VTGate](#new-vtgate-metrics)
+ - **[VTTablet](#minor-changes-vttablet)**
+ - [CLI Flags](#flags-vttablet)
+ - [Managed MySQL configuration defaults to caching-sha2-password](#mysql-caching-sha2-password)
## Minor Changes
@@ -21,6 +24,14 @@
| `vtgate` | `QueriesProcessedByTable` | `v22.0.0` | [#17727](https://github.com/vitessio/vitess/pull/17727) |
| `vtgate` | `QueriesRoutedByTable` | `v22.0.0` | [#17727](https://github.com/vitessio/vitess/pull/17727) |
+### New Metrics
+
+#### VTGate
+
+| Name | Dimensions | Description | PR |
+|:-----------------------:|:---------------:|:-----------------------------------------------------------------------------------:|:-------------------------------------------------------:|
+| `TransactionsProcessed` | `Shard`, `Type` | Counts transactions processed at VTGate by shard distribution and transaction type. | [#18171](https://github.com/vitessio/vitess/pull/18171) |
+
### VTTablet
#### CLI Flags
diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go
index 27e4040a35e..5803252b6aa 100644
--- a/go/vt/vtgate/executor.go
+++ b/go/vt/vtgate/executor.go
@@ -75,6 +75,7 @@ var (
queryExecutions = stats.NewCountersWithMultiLabels("QueryExecutions", "Counts queries executed at VTGate by query type, plan type, and tablet type.", []string{"Query", "Plan", "Tablet"})
queryRoutes = stats.NewCountersWithMultiLabels("QueryRoutes", "Counts queries routed from VTGate to VTTablet by query type, plan type, and tablet type.", []string{"Query", "Plan", "Tablet"})
queryExecutionsByTable = stats.NewCountersWithMultiLabels("QueryExecutionsByTable", "Counts queries executed at VTGate per table by query type and table.", []string{"Query", "Table"})
+ txProcessed = stats.NewCountersWithMultiLabels("TransactionsProcessed", "Counts transactions processed at VTGate by shard distribution (single or cross), transaction type (read write or read only)", []string{"Shard", "Type"})
// commitMode records the timing of the commit phase of a transaction.
// It also tracks between different transaction mode i.e. Single, Multi and TwoPC
diff --git a/go/vt/vtgate/tx_conn.go b/go/vt/vtgate/tx_conn.go
index 4da63c055ad..a60a0a70cff 100644
--- a/go/vt/vtgate/tx_conn.go
+++ b/go/vt/vtgate/tx_conn.go
@@ -62,6 +62,29 @@ var txAccessModeToEOTxAccessMode = map[sqlparser.TxAccessMode]querypb.ExecuteOpt
sqlparser.ReadOnly: querypb.ExecuteOptions_READ_ONLY,
}
+const (
+ SingleShardTransaction = "Single"
+ CrossShardTransaction = "Cross"
+)
+
+type txType int
+
+const (
+ TXReadOnly txType = iota
+ TXReadWrite
+)
+
+func (tt txType) String() string {
+ switch tt {
+ case TXReadOnly:
+ return "ReadOnly"
+ case TXReadWrite:
+ return "ReadWrite"
+ default:
+ return "Unknown"
+ }
+}
+
type commitPhase int
const (
@@ -126,10 +149,12 @@ func (txc *TxConn) Commit(ctx context.Context, session *econtext.SafeSession) er
return err
}
+ shardDistribution := getShardDistribution(session.ShardSessions)
+ var txnType txType
if twopc {
- err = txc.commit2PC(ctx, session)
+ txnType, err = txc.commit2PC(ctx, session)
} else {
- err = txc.commitNormal(ctx, session)
+ txnType, err = txc.commitNormal(ctx, session)
}
if err != nil {
@@ -146,9 +171,17 @@ func (txc *TxConn) Commit(ctx context.Context, session *econtext.SafeSession) er
_ = txc.Release(ctx, session)
}
}
+ txProcessed.Add([]string{shardDistribution, txnType.String()}, 1)
return nil
}
+func getShardDistribution(sessions []*vtgatepb.Session_ShardSession) string {
+ if len(sessions) > 1 {
+ return CrossShardTransaction
+ }
+ return SingleShardTransaction
+}
+
func recordCommitTime(session *econtext.SafeSession, twopc bool, startTime time.Time) {
switch {
case len(session.ShardSessions) == 0:
@@ -193,9 +226,13 @@ func (txc *TxConn) commitShard(ctx context.Context, s *vtgatepb.Session_ShardSes
return nil
}
-func (txc *TxConn) commitNormal(ctx context.Context, session *econtext.SafeSession) error {
+func (txc *TxConn) commitNormal(ctx context.Context, session *econtext.SafeSession) (txType, error) {
+ txnType := TXReadOnly
// Retain backward compatibility on commit order for the normal session.
for i, shardSession := range session.ShardSessions {
+ if txnType == TXReadOnly && shardSession.RowsAffected {
+ txnType = TXReadWrite
+ }
if err := txc.commitShard(ctx, shardSession, session.GetLogger()); err != nil {
if i > 0 {
nShards := i
@@ -217,14 +254,14 @@ func (txc *TxConn) commitNormal(ctx context.Context, session *econtext.SafeSessi
})
warnings.Add("NonAtomicCommit", 1)
}
- return err
+ return txnType, err
}
}
- return nil
+ return txnType, nil
}
// commit2PC will not used the pinned tablets - to make sure we use the current source, we need to use the gateway's queryservice
-func (txc *TxConn) commit2PC(ctx context.Context, session *econtext.SafeSession) (err error) {
+func (txc *TxConn) commit2PC(ctx context.Context, session *econtext.SafeSession) (txnType txType, err error) {
// If the number of participants is one or less, then it's a normal commit.
if len(session.ShardSessions) <= 1 {
return txc.commitNormal(ctx, session)
@@ -233,8 +270,14 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *econtext.SafeSession)
mmShard := session.ShardSessions[0]
rmShards := session.ShardSessions[1:]
dtid := dtids.New(mmShard)
+ if mmShard.RowsAffected {
+ txnType = TXReadWrite
+ }
participants := make([]*querypb.Target, len(rmShards))
for i, s := range rmShards {
+ if s.RowsAffected {
+ txnType = TXReadWrite
+ }
participants[i] = s.Target
}
@@ -249,12 +292,12 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *econtext.SafeSession)
txPhase = Commit2pcCreateTransaction
if err = txc.tabletGateway.CreateTransaction(ctx, mmShard.Target, dtid, participants); err != nil {
- return err
+ return txnType, err
}
if DebugTwoPc { // Test code to simulate a failure after RM prepare
if terr := checkTestFailure(ctx, "TRCreated_FailNow", nil); terr != nil {
- return terr
+ return txnType, terr
}
}
@@ -268,24 +311,24 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *econtext.SafeSession)
return txc.tabletGateway.Prepare(ctx, s.Target, s.TransactionId, dtid)
}
if err = txc.runSessions(ctx, rmShards, session.GetLogger(), prepareAction); err != nil {
- return err
+ return txnType, err
}
if DebugTwoPc { // Test code to simulate a failure after RM prepare
if terr := checkTestFailure(ctx, "RMPrepared_FailNow", nil); terr != nil {
- return terr
+ return txnType, terr
}
}
txPhase = Commit2pcStartCommit
startCommitState, err = txc.tabletGateway.StartCommit(ctx, mmShard.Target, mmShard.TransactionId, dtid)
if err != nil {
- return err
+ return txnType, err
}
if DebugTwoPc { // Test code to simulate a failure after MM commit
if terr := checkTestFailure(ctx, "MMCommitted_FailNow", nil); terr != nil {
- return terr
+ return txnType, terr
}
}
@@ -299,7 +342,7 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *econtext.SafeSession)
return txc.tabletGateway.CommitPrepared(ctx, s.Target, dtid)
}
if err = txc.runSessions(ctx, rmShards, session.GetLogger(), prepareCommitAction); err != nil {
- return err
+ return txnType, err
}
// At this point, application can continue forward.
@@ -307,7 +350,7 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *econtext.SafeSession)
// This step is to clean up the transaction metadata.
txPhase = Commit2pcConclude
_ = txc.tabletGateway.ConcludeTransaction(ctx, mmShard.Target, dtid)
- return nil
+ return txnType, nil
}
func (txc *TxConn) errActionAndLogWarn(
diff --git a/go/vt/vtgate/tx_conn_test.go b/go/vt/vtgate/tx_conn_test.go
index 6d31aa4e543..ec215b2e521 100644
--- a/go/vt/vtgate/tx_conn_test.go
+++ b/go/vt/vtgate/tx_conn_test.go
@@ -1554,6 +1554,69 @@ func TestTxConnAccessModeReset(t *testing.T) {
}
}
+// TestTxConnMetrics tests the `TransactionProcessed` metrics.
+func TestTxConnMetrics(t *testing.T) {
+ ctx := utils.LeakCheckContext(t)
+
+ sc, _, _, rss0, rss1, _ := newTestTxConnEnv(t, ctx, "TestTxConn")
+ session := &vtgatepb.Session{}
+
+ tcases := []struct {
+ name string
+ queries []*querypb.BoundQuery
+ rss []*srvtopo.ResolvedShard
+ expMetric string
+ expVal int
+ }{{
+ name: "oneReadQuery",
+ queries: []*querypb.BoundQuery{{Sql: "select 1"}},
+ rss: rss0,
+ expMetric: "Single.ReadOnly",
+ expVal: 1,
+ }, {
+ name: "twoReadQuery",
+ queries: []*querypb.BoundQuery{{Sql: "select 2"}, {Sql: "select 3"}},
+ rss: append(rss0, rss1...),
+ expMetric: "Cross.ReadOnly",
+ expVal: 1,
+ }, {
+ name: "oneWriteQuery",
+ queries: []*querypb.BoundQuery{{Sql: "update t set col = 1"}},
+ rss: rss0,
+ expMetric: "Single.ReadWrite",
+ expVal: 1,
+ }, {
+ name: "twoWriteQuery",
+ queries: []*querypb.BoundQuery{{Sql: "update t set col = 2"}, {Sql: "update t set col = 3"}},
+ rss: append(rss0, rss1...),
+ expMetric: "Cross.ReadWrite",
+ expVal: 1,
+ }, {
+ name: "oneReadOneWriteQuery",
+ queries: []*querypb.BoundQuery{{Sql: "select 4"}, {Sql: "update t set col = 4"}},
+ rss: append(rss0, rss1...),
+ expMetric: "Cross.ReadWrite",
+ expVal: 2,
+ }}
+
+ txProcessed.ResetAll()
+ for _, tc := range tcases {
+ t.Run(tc.name, func(t *testing.T) {
+ // begin
+ safeSession := econtext.NewAutocommitSession(session)
+ err := sc.txConn.Begin(ctx, safeSession, nil)
+ require.NoError(t, err)
+ _, errors := sc.ExecuteMultiShard(ctx, nil, tc.rss, tc.queries, safeSession, false, false, nullResultsObserver{}, false)
+ require.Empty(t, errors)
+ require.NoError(t,
+ sc.txConn.Commit(ctx, safeSession))
+ txCountMap := txProcessed.Counts()
+ fmt.Printf("%v", txCountMap)
+ assert.EqualValues(t, tc.expVal, txCountMap[tc.expMetric])
+ })
+ }
+}
+
func newTestTxConnEnv(t *testing.T, ctx context.Context, name string) (sc *ScatterConn, sbc0, sbc1 *sandboxconn.SandboxConn, rss0, rss1, rss01 []*srvtopo.ResolvedShard) {
t.Helper()
createSandbox(name)