diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go
index 1a15a330841..00f4cc8bf7e 100644
--- a/go/vt/vttablet/tabletserver/query_executor.go
+++ b/go/vt/vttablet/tabletserver/query_executor.go
@@ -68,7 +68,6 @@ var sequenceFields = []*querypb.Field{
// Execute performs a non-streaming query execution.
func (qre *QueryExecutor) Execute() (reply *sqltypes.Result, err error) {
- qre.logStats.TransactionID = qre.connID
planName := qre.plan.PlanID.String()
qre.logStats.PlanType = planName
defer func(start time.Time) {
diff --git a/go/vt/vttablet/tabletserver/querylogz.go b/go/vt/vttablet/tabletserver/querylogz.go
index a3736c68a3e..92d65840374 100644
--- a/go/vt/vttablet/tabletserver/querylogz.go
+++ b/go/vt/vttablet/tabletserver/querylogz.go
@@ -52,6 +52,7 @@ var (
RowsAffected |
Response Size |
Transaction ID |
+ Reserved ID |
Error |
@@ -80,6 +81,7 @@ var (
{{.RowsAffected}} |
{{.SizeOfResponse}} |
{{.TransactionID}} |
+ {{.ReservedID}} |
{{.ErrorStr}} |
`))
diff --git a/go/vt/vttablet/tabletserver/querylogz_test.go b/go/vt/vttablet/tabletserver/querylogz_test.go
index 249f1962f95..709ad2f8c0e 100644
--- a/go/vt/vttablet/tabletserver/querylogz_test.go
+++ b/go/vt/vttablet/tabletserver/querylogz_test.go
@@ -55,6 +55,7 @@ func TestQuerylogzHandler(t *testing.T) {
logStats.MysqlResponseTime = 1 * time.Millisecond
logStats.WaitingForConnection = 10 * time.Nanosecond
logStats.TransactionID = 131
+ logStats.ReservedID = 313
logStats.Ctx = callerid.NewContext(
context.Background(),
callerid.NewEffectiveCallerID("effective-caller", "component", "subcomponent"),
@@ -80,6 +81,7 @@ func TestQuerylogzHandler(t *testing.T) {
`1000 | `,
`0 | `,
`131 | `,
+ `313 | `,
` | `,
}
logStats.EndTime = logStats.StartTime.Add(1 * time.Millisecond)
@@ -110,6 +112,7 @@ func TestQuerylogzHandler(t *testing.T) {
`1000 | `,
`0 | `,
`131 | `,
+ `313 | `,
` | `,
}
logStats.EndTime = logStats.StartTime.Add(20 * time.Millisecond)
@@ -140,6 +143,7 @@ func TestQuerylogzHandler(t *testing.T) {
`1000 | `,
`0 | `,
`131 | `,
+ `313 | `,
` | `,
}
logStats.EndTime = logStats.StartTime.Add(500 * time.Millisecond)
diff --git a/go/vt/vttablet/tabletserver/stateful_connection.go b/go/vt/vttablet/tabletserver/stateful_connection.go
index e5b4809764c..df4f7f286c6 100644
--- a/go/vt/vttablet/tabletserver/stateful_connection.go
+++ b/go/vt/vttablet/tabletserver/stateful_connection.go
@@ -18,6 +18,13 @@ package tabletserver
import (
"fmt"
+ "time"
+
+ "vitess.io/vitess/go/vt/log"
+
+ "vitess.io/vitess/go/vt/callerid"
+ querypb "vitess.io/vitess/go/vt/proto/query"
+ "vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
@@ -41,10 +48,19 @@ type StatefulConnection struct {
ConnID tx.ConnID
env tabletenv.Env
txProps *tx.Properties
+ reservedProps *Properties
tainted bool
enforceTimeout bool
}
+//Properties contains meta information about the connection
+type Properties struct {
+ EffectiveCaller *vtrpcpb.CallerID
+ ImmediateCaller *querypb.VTGateCallerID
+ StartTime time.Time
+ Stats *servenv.TimingsWrapper
+}
+
// Close closes the underlying connection. When the connection is Unblocked, it will be Released
func (sc *StatefulConnection) Close() {
if sc.dbConn != nil {
@@ -124,6 +140,7 @@ func (sc *StatefulConnection) Releasef(reasonFormat string, a ...interface{}) {
sc.pool.unregister(sc.ConnID, fmt.Sprintf(reasonFormat, a...))
sc.dbConn.Recycle()
sc.dbConn = nil
+ sc.logReservedConn()
}
//Renew the existing connection with new connection id.
@@ -171,15 +188,71 @@ func (sc *StatefulConnection) Stats() *tabletenv.Stats {
}
//Taint taints the existing connection.
-func (sc *StatefulConnection) Taint() {
+func (sc *StatefulConnection) Taint(ctx context.Context, stats *servenv.TimingsWrapper) error {
+ if sc.dbConn == nil {
+ return vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "connection is closed")
+ }
+ if sc.tainted {
+ return vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "connection is already reserved")
+ }
+ immediateCaller := callerid.ImmediateCallerIDFromContext(ctx)
+ effectiveCaller := callerid.EffectiveCallerIDFromContext(ctx)
+
sc.tainted = true
- // if we don't have an active dbConn, we can silently ignore this request
- if sc.dbConn != nil {
- sc.dbConn.Taint()
+ sc.reservedProps = &Properties{
+ EffectiveCaller: effectiveCaller,
+ ImmediateCaller: immediateCaller,
+ StartTime: time.Now(),
+ Stats: stats,
}
+ sc.dbConn.Taint()
+ sc.Stats().UserActiveReservedCount.Add(sc.getUsername(), 1)
+ return nil
}
//IsTainted tells us whether this connection is tainted
func (sc *StatefulConnection) IsTainted() bool {
return sc.tainted
}
+
+//LogTransaction logs transaction related stats
+func (sc *StatefulConnection) LogTransaction(reason tx.ReleaseReason) {
+ if sc.txProps == nil {
+ return //Nothing to log as no transaction exists on this connection.
+ }
+ sc.txProps.Conclusion = reason.Name()
+ sc.txProps.EndTime = time.Now()
+
+ username := callerid.GetPrincipal(sc.txProps.EffectiveCaller)
+ if username == "" {
+ username = callerid.GetUsername(sc.txProps.ImmediateCaller)
+ }
+ duration := sc.txProps.EndTime.Sub(sc.txProps.StartTime)
+ sc.Stats().UserTransactionCount.Add([]string{username, reason.Name()}, 1)
+ sc.Stats().UserTransactionTimesNs.Add([]string{username, reason.Name()}, int64(duration))
+ sc.txProps.Stats.Add(reason.Name(), duration)
+ if sc.txProps.LogToFile {
+ log.Infof("Logged transaction: %s", sc.String())
+ }
+ tabletenv.TxLogger.Send(sc)
+}
+
+//logReservedConn logs reserved connection related stats.
+func (sc *StatefulConnection) logReservedConn() {
+ if sc.reservedProps == nil {
+ return //Nothing to log as this connection is not reserved.
+ }
+ duration := time.Since(sc.reservedProps.StartTime)
+ username := sc.getUsername()
+ sc.Stats().UserActiveReservedCount.Add(username, -1)
+ sc.Stats().UserReservedCount.Add(username, 1)
+ sc.Stats().UserReservedTimesNs.Add(username, int64(duration))
+}
+
+func (sc *StatefulConnection) getUsername() string {
+ username := callerid.GetPrincipal(sc.reservedProps.EffectiveCaller)
+ if username != "" {
+ return username
+ }
+ return callerid.GetUsername(sc.reservedProps.ImmediateCaller)
+}
diff --git a/go/vt/vttablet/tabletserver/tabletenv/logstats.go b/go/vt/vttablet/tabletserver/tabletenv/logstats.go
index 8849b833651..85b65265750 100644
--- a/go/vt/vttablet/tabletserver/tabletenv/logstats.go
+++ b/go/vt/vttablet/tabletserver/tabletenv/logstats.go
@@ -59,6 +59,7 @@ type LogStats struct {
QuerySources byte
Rows [][]sqltypes.Value
TransactionID int64
+ ReservedID int64
Error error
}
diff --git a/go/vt/vttablet/tabletserver/tabletenv/stats.go b/go/vt/vttablet/tabletserver/tabletenv/stats.go
index a483e746a99..111bfbe7c91 100644
--- a/go/vt/vttablet/tabletserver/tabletenv/stats.go
+++ b/go/vt/vttablet/tabletserver/tabletenv/stats.go
@@ -43,6 +43,10 @@ type Stats struct {
TableaclAllowed *stats.CountersWithMultiLabels // Number of allows
TableaclDenied *stats.CountersWithMultiLabels // Number of denials
TableaclPseudoDenied *stats.CountersWithMultiLabels // Number of pseudo denials
+
+ UserActiveReservedCount *stats.CountersWithSingleLabel // Per CallerID active reserved connection counts
+ UserReservedCount *stats.CountersWithSingleLabel // Per CallerID reserved connection counts
+ UserReservedTimesNs *stats.CountersWithSingleLabel // Per CallerID reserved connection duration
}
// NewStats instantiates a new set of stats scoped by exporter.
@@ -51,7 +55,7 @@ func NewStats(exporter *servenv.Exporter) *Stats {
MySQLTimings: exporter.NewTimings("Mysql", "MySQl query time", "operation"),
QueryTimings: exporter.NewTimings("Queries", "MySQL query timings", "plan_type"),
WaitTimings: exporter.NewTimings("Waits", "Wait operations", "type"),
- KillCounters: exporter.NewCountersWithSingleLabel("Kills", "Number of connections being killed", "query_type", "Transactions", "Queries"),
+ KillCounters: exporter.NewCountersWithSingleLabel("Kills", "Number of connections being killed", "query_type", "Transactions", "Queries", "ReservedConnection"),
ErrorCounters: exporter.NewCountersWithSingleLabel(
"Errors",
"Critical errors",
@@ -85,6 +89,10 @@ func NewStats(exporter *servenv.Exporter) *Stats {
TableaclAllowed: exporter.NewCountersWithMultiLabels("TableACLAllowed", "ACL acceptances", []string{"TableName", "TableGroup", "PlanID", "Username"}),
TableaclDenied: exporter.NewCountersWithMultiLabels("TableACLDenied", "ACL denials", []string{"TableName", "TableGroup", "PlanID", "Username"}),
TableaclPseudoDenied: exporter.NewCountersWithMultiLabels("TableACLPseudoDenied", "ACL pseudodenials", []string{"TableName", "TableGroup", "PlanID", "Username"}),
+
+ UserActiveReservedCount: exporter.NewCountersWithSingleLabel("UserActiveReservedCount", "active reserved connection for each CallerID", "CallerID"),
+ UserReservedCount: exporter.NewCountersWithSingleLabel("UserReservedCount", "reserved connection received for each CallerID", "CallerID"),
+ UserReservedTimesNs: exporter.NewCountersWithSingleLabel("UserReservedTimesNs", "Total reserved connection latency for each CallerID", "CallerID"),
}
stats.QPSRates = exporter.NewRates("QPS", stats.QueryTimings, 15*60/5, 5*time.Second)
return stats
diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go
index 4df776c186c..6ebd220acf8 100644
--- a/go/vt/vttablet/tabletserver/tabletserver.go
+++ b/go/vt/vttablet/tabletserver/tabletserver.go
@@ -763,6 +763,7 @@ func (tsv *TabletServer) begin(ctx context.Context, target *querypb.Target, rese
var beginSQL string
transactionID, beginSQL, err = tsv.te.Begin(ctx, reservedID, options)
logStats.TransactionID = transactionID
+ logStats.ReservedID = reservedID
// Record the actual statements that were executed in the logStats.
// If nothing was actually executed, don't count the operation in
@@ -792,6 +793,10 @@ func (tsv *TabletServer) Commit(ctx context.Context, target *querypb.Target, tra
var commitSQL string
newReservedID, commitSQL, err = tsv.te.Commit(ctx, transactionID)
+ if newReservedID > 0 {
+ // commit executed on old reserved id.
+ logStats.ReservedID = transactionID
+ }
// If nothing was actually executed, don't count the operation in
// the tablet metrics, and clear out the logStats Method so that
@@ -817,6 +822,10 @@ func (tsv *TabletServer) Rollback(ctx context.Context, target *querypb.Target, t
defer tsv.stats.QueryTimings.Record("ROLLBACK", time.Now())
logStats.TransactionID = transactionID
newReservedID, err = tsv.te.Rollback(ctx, transactionID)
+ if newReservedID > 0 {
+ // rollback executed on old reserved id.
+ logStats.ReservedID = transactionID
+ }
return err
},
)
@@ -993,7 +1002,8 @@ func (tsv *TabletServer) Execute(ctx context.Context, target *querypb.Target, sq
if transactionID != 0 {
connID = transactionID
}
-
+ logStats.ReservedID = reservedID
+ logStats.TransactionID = transactionID
qre := &QueryExecutor{
query: query,
marginComments: comments,
@@ -1389,12 +1399,13 @@ func (tsv *TabletServer) ReserveBeginExecute(ctx context.Context, target *queryp
"ReserveBegin", "begin", bindVariables,
target, options, false, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
- defer tsv.stats.QueryTimings.Record("ReserveBegin", time.Now())
+ defer tsv.stats.QueryTimings.Record("RESERVE", time.Now())
connID, err = tsv.te.ReserveBegin(ctx, options, preQueries)
if err != nil {
return err
}
logStats.TransactionID = connID
+ logStats.ReservedID = connID
return nil
},
)
@@ -1417,12 +1428,13 @@ func (tsv *TabletServer) ReserveExecute(ctx context.Context, target *querypb.Tar
"Reserve", "", bindVariables,
target, options, false, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
- defer tsv.stats.QueryTimings.Record("Reserve", time.Now())
+ defer tsv.stats.QueryTimings.Record("RESERVE", time.Now())
connID, err = tsv.te.Reserve(ctx, options, transactionID, preQueries)
if err != nil {
return err
}
- logStats.TransactionID = connID
+ logStats.TransactionID = transactionID
+ logStats.ReservedID = connID
return nil
},
)
@@ -1446,13 +1458,13 @@ func (tsv *TabletServer) Release(ctx context.Context, target *querypb.Target, tr
target, nil, true, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
defer tsv.stats.QueryTimings.Record("RELEASE", time.Now())
+ logStats.TransactionID = transactionID
+ logStats.ReservedID = reservedID
if reservedID != 0 {
//Release to close the underlying connection.
- logStats.TransactionID = reservedID
- return tsv.te.Release(ctx, reservedID)
+ return tsv.te.Release(reservedID)
}
// Rollback to cleanup the transaction before returning to the pool.
- logStats.TransactionID = transactionID
_, err := tsv.te.Rollback(ctx, transactionID)
return err
},
diff --git a/go/vt/vttablet/tabletserver/tabletserver_test.go b/go/vt/vttablet/tabletserver/tabletserver_test.go
index 4e506ef1f21..f260135c636 100644
--- a/go/vt/vttablet/tabletserver/tabletserver_test.go
+++ b/go/vt/vttablet/tabletserver/tabletserver_test.go
@@ -30,6 +30,8 @@ import (
"testing"
"time"
+ "vitess.io/vitess/go/vt/callerid"
+
"vitess.io/vitess/go/test/utils"
"github.com/stretchr/testify/assert"
@@ -2524,7 +2526,7 @@ func TestReserveBeginExecute(t *testing.T) {
_, transactionID, reservedID, _, err := tsv.ReserveBeginExecute(ctx, &target, "select 42", []string{"select 43"}, nil, &querypb.ExecuteOptions{})
require.NoError(t, err)
- defer tsv.Release(ctx, &target, transactionID, reservedID)
+
assert.Greater(t, transactionID, int64(0), "transactionID")
assert.Equal(t, reservedID, transactionID, "reservedID should equal transactionID")
expected := []string{
@@ -2534,6 +2536,8 @@ func TestReserveBeginExecute(t *testing.T) {
"select 42 from dual limit 10001",
}
assert.Contains(t, db.QueryLog(), strings.Join(expected, ";"), "expected queries to run")
+ err = tsv.Release(ctx, &target, transactionID, reservedID)
+ require.NoError(t, err)
}
func TestReserveExecute_WithoutTx(t *testing.T) {
@@ -2549,7 +2553,6 @@ func TestReserveExecute_WithoutTx(t *testing.T) {
_, reservedID, _, err := tsv.ReserveExecute(ctx, &target, "select 42", []string{"select 43"}, nil, 0, &querypb.ExecuteOptions{})
require.NoError(t, err)
- defer tsv.Release(ctx, &target, 0, reservedID)
assert.NotEqual(t, int64(0), reservedID, "reservedID should not be zero")
expected := []string{
"select 43",
@@ -2557,6 +2560,8 @@ func TestReserveExecute_WithoutTx(t *testing.T) {
"select 42 from dual limit 10001",
}
assert.Contains(t, db.QueryLog(), strings.Join(expected, ";"), "expected queries to run")
+ err = tsv.Release(ctx, &target, 0, reservedID)
+ require.NoError(t, err)
}
func TestReserveExecute_WithTx(t *testing.T) {
@@ -2585,6 +2590,8 @@ func TestReserveExecute_WithTx(t *testing.T) {
"select 42 from dual limit 10001",
}
assert.Contains(t, db.QueryLog(), strings.Join(expected, ";"), "expected queries to run")
+ err = tsv.Release(ctx, &target, transactionID, reservedID)
+ require.NoError(t, err)
}
func TestRelease(t *testing.T) {
@@ -2660,6 +2667,64 @@ func TestRelease(t *testing.T) {
}
}
+func TestReserveStats(t *testing.T) {
+ db := setUpTabletServerTest(t)
+ defer db.Close()
+ config := tabletenv.NewDefaultConfig()
+ tsv := NewTabletServer("TabletServerTest", config, memorytopo.NewServer(""), topodatapb.TabletAlias{})
+ dbcfgs := newDBConfigs(db)
+ target := querypb.Target{TabletType: topodatapb.TabletType_MASTER}
+ err := tsv.StartService(target, dbcfgs)
+ require.NoError(t, err)
+ defer tsv.StopService()
+
+ callerID := &querypb.VTGateCallerID{
+ Username: "test",
+ }
+ ctx := callerid.NewContext(context.Background(), nil, callerID)
+
+ // Starts reserved connection and transaction
+ _, rbeTxID, rbeRID, _, err := tsv.ReserveBeginExecute(ctx, &target, "select 42", nil, nil, &querypb.ExecuteOptions{})
+ require.NoError(t, err)
+ assert.EqualValues(t, 1, tsv.te.txPool.env.Stats().UserActiveReservedCount.Counts()["test"])
+
+ // Starts reserved connection
+ _, reRID, _, err := tsv.ReserveExecute(ctx, &target, "select 42", nil, nil, 0, &querypb.ExecuteOptions{})
+ require.NoError(t, err)
+ assert.EqualValues(t, 2, tsv.te.txPool.env.Stats().UserActiveReservedCount.Counts()["test"])
+
+ // Use previous reserved connection to start transaction
+ _, reBeTxID, _, err := tsv.BeginExecute(ctx, &target, "select 42", nil, reRID, &querypb.ExecuteOptions{})
+ require.NoError(t, err)
+ assert.EqualValues(t, 2, tsv.te.txPool.env.Stats().UserActiveReservedCount.Counts()["test"])
+
+ // Starts transaction.
+ _, beTxID, _, err := tsv.BeginExecute(ctx, &target, "select 42", nil, 0, &querypb.ExecuteOptions{})
+ require.NoError(t, err)
+ assert.EqualValues(t, 2, tsv.te.txPool.env.Stats().UserActiveReservedCount.Counts()["test"])
+
+ // Reserved the connection on previous transaction
+ _, beReRID, _, err := tsv.ReserveExecute(ctx, &target, "select 42", nil, nil, beTxID, &querypb.ExecuteOptions{})
+ require.NoError(t, err)
+ assert.EqualValues(t, 3, tsv.te.txPool.env.Stats().UserActiveReservedCount.Counts()["test"])
+
+ err = tsv.Release(ctx, &target, rbeTxID, rbeRID)
+ require.NoError(t, err)
+ assert.EqualValues(t, 2, tsv.te.txPool.env.Stats().UserActiveReservedCount.Counts()["test"])
+ assert.EqualValues(t, 1, tsv.te.txPool.env.Stats().UserReservedCount.Counts()["test"])
+
+ err = tsv.Release(ctx, &target, reBeTxID, reRID)
+ require.NoError(t, err)
+ assert.EqualValues(t, 1, tsv.te.txPool.env.Stats().UserActiveReservedCount.Counts()["test"])
+ assert.EqualValues(t, 2, tsv.te.txPool.env.Stats().UserReservedCount.Counts()["test"])
+
+ err = tsv.Release(ctx, &target, beTxID, beReRID)
+ require.NoError(t, err)
+ assert.Zero(t, tsv.te.txPool.env.Stats().UserActiveReservedCount.Counts()["test"])
+ assert.EqualValues(t, 3, tsv.te.txPool.env.Stats().UserReservedCount.Counts()["test"])
+ assert.NotEmpty(t, tsv.te.txPool.env.Stats().UserReservedTimesNs.Counts()["test"])
+}
+
func setUpTabletServerTest(t *testing.T) *fakesqldb.DB {
db := fakesqldb.New(t)
for query, result := range getSupportedQueries() {
diff --git a/go/vt/vttablet/tabletserver/tx_engine.go b/go/vt/vttablet/tabletserver/tx_engine.go
index 4396793fb67..379656a1048 100644
--- a/go/vt/vttablet/tabletserver/tx_engine.go
+++ b/go/vt/vttablet/tabletserver/tx_engine.go
@@ -21,6 +21,8 @@ import (
"sync"
"time"
+ "vitess.io/vitess/go/vt/servenv"
+
"vitess.io/vitess/go/vt/vttablet/tabletserver/tx"
"golang.org/x/net/context"
@@ -92,6 +94,9 @@ type TxEngine struct {
abandonAge time.Duration
ticks *timer.Timer
+ // reservedConnStats keeps statistics about reserved connections
+ reservedConnStats *servenv.TimingsWrapper
+
txPool *TxPool
preparedPool *TxPreparedPool
twoPC *TwoPC
@@ -103,6 +108,7 @@ func NewTxEngine(env tabletenv.Env) *TxEngine {
te := &TxEngine{
env: env,
shutdownGracePeriod: time.Duration(config.ShutdownGracePeriodSeconds * 1e9),
+ reservedConnStats: env.Exporter().NewTimings("ReservedConnections", "Reserved connections stats", "operation"),
}
limiter := txlimiter.New(env)
te.txPool = NewTxPool(env, limiter)
@@ -629,7 +635,10 @@ func (te *TxEngine) reserve(ctx context.Context, options *querypb.ExecuteOptions
}
func (te *TxEngine) taintConn(ctx context.Context, conn *StatefulConnection, preQueries []string) error {
- conn.Taint()
+ err := conn.Taint(ctx, te.reservedConnStats)
+ if err != nil {
+ return err
+ }
for _, query := range preQueries {
_, err := conn.Exec(ctx, query, 0 /*maxrows*/, false /*wantFields*/)
if err != nil {
@@ -641,11 +650,13 @@ func (te *TxEngine) taintConn(ctx context.Context, conn *StatefulConnection, pre
}
//Release closes the underlying connection.
-func (te *TxEngine) Release(ctx context.Context, connID int64) error {
+func (te *TxEngine) Release(connID int64) error {
conn, err := te.txPool.GetAndLock(connID, "for release")
if err != nil {
return err
}
+
conn.Release(tx.ConnRelease)
+
return nil
}
diff --git a/go/vt/vttablet/tabletserver/tx_engine_test.go b/go/vt/vttablet/tabletserver/tx_engine_test.go
index fb1e1c79f9f..6998e3fe457 100644
--- a/go/vt/vttablet/tabletserver/tx_engine_test.go
+++ b/go/vt/vttablet/tabletserver/tx_engine_test.go
@@ -62,9 +62,15 @@ func TestTxEngineClose(t *testing.T) {
require.NoError(t, err)
require.Equal(t, "begin", beginSQL)
c.Unlock()
+ c, beginSQL, err = te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0)
+ require.NoError(t, err)
+ require.Equal(t, "begin", beginSQL)
+ c.Unlock()
start = time.Now()
te.shutdown(false)
assert.Less(t, int64(50*time.Millisecond), int64(time.Since(start)))
+ assert.EqualValues(t, 2, te.txPool.env.Stats().KillCounters.Counts()["Transactions"])
+ te.txPool.env.Stats().KillCounters.ResetAll()
// Immediate close.
te.open()
@@ -121,6 +127,21 @@ func TestTxEngineClose(t *testing.T) {
if diff := time.Since(start); diff < 100*time.Millisecond {
t.Errorf("Close time: %v, must be over 0.1", diff)
}
+
+ // Normal close with Reserved connection timeout wait.
+ te.shutdownGracePeriod = 0 * time.Millisecond
+ te.open()
+ te.AcceptReadWrite()
+ _, err = te.Reserve(ctx, &querypb.ExecuteOptions{}, 0, nil)
+ require.NoError(t, err)
+ _, err = te.ReserveBegin(ctx, &querypb.ExecuteOptions{}, nil)
+ require.NoError(t, err)
+ start = time.Now()
+ te.shutdown(false)
+ assert.Less(t, int64(50*time.Millisecond), int64(time.Since(start)))
+ assert.EqualValues(t, 1, te.txPool.env.Stats().KillCounters.Counts()["Transactions"])
+ assert.EqualValues(t, 2, te.txPool.env.Stats().KillCounters.Counts()["ReservedConnection"])
+
}
func TestTxEngineBegin(t *testing.T) {
diff --git a/go/vt/vttablet/tabletserver/tx_pool.go b/go/vt/vttablet/tabletserver/tx_pool.go
index 840604d3c26..b5a4686ac09 100644
--- a/go/vt/vttablet/tabletserver/tx_pool.go
+++ b/go/vt/vttablet/tabletserver/tx_pool.go
@@ -125,7 +125,12 @@ func (tp *TxPool) transactionKiller() {
defer tp.env.LogError()
for _, conn := range tp.scp.GetOutdated(tp.Timeout(), "for tx killer rollback") {
log.Warningf("killing transaction (exceeded timeout: %v): %s", tp.Timeout(), conn.String())
- tp.env.Stats().KillCounters.Add("Transactions", 1)
+ if conn.IsTainted() {
+ tp.env.Stats().KillCounters.Add("ReservedConnection", 1)
+ }
+ if conn.IsInTransaction() {
+ tp.env.Stats().KillCounters.Add("Transactions", 1)
+ }
conn.Close()
conn.Releasef("exceeded timeout: %v", tp.Timeout())
}
@@ -321,28 +326,7 @@ func (tp *TxPool) SetTimeout(timeout time.Duration) {
}
func (tp *TxPool) txComplete(conn *StatefulConnection, reason tx.ReleaseReason) {
- tp.log(conn, reason)
+ conn.LogTransaction(reason)
tp.limiter.Release(conn.TxProperties().ImmediateCaller, conn.TxProperties().EffectiveCaller)
conn.CleanTxState()
}
-
-func (tp *TxPool) log(txc *StatefulConnection, reason tx.ReleaseReason) {
- if txc.TxProperties() == nil {
- return //Nothing to log as no transaction exists on this connection.
- }
- txc.TxProperties().Conclusion = reason.Name()
- txc.TxProperties().EndTime = time.Now()
-
- username := callerid.GetPrincipal(txc.TxProperties().EffectiveCaller)
- if username == "" {
- username = callerid.GetUsername(txc.TxProperties().ImmediateCaller)
- }
- duration := txc.TxProperties().EndTime.Sub(txc.TxProperties().StartTime)
- txc.Stats().UserTransactionCount.Add([]string{username, reason.Name()}, 1)
- txc.Stats().UserTransactionTimesNs.Add([]string{username, reason.Name()}, int64(duration))
- txc.TxProperties().Stats.Add(reason.Name(), duration)
- if txc.TxProperties().LogToFile {
- log.Infof("Logged transaction: %s", txc.String())
- }
- tabletenv.TxLogger.Send(txc)
-}