diff --git a/doc/releasenotes/15_0_0_summary.md b/doc/releasenotes/15_0_0_summary.md index 75d6a2d5b9e..7054f0eb774 100644 --- a/doc/releasenotes/15_0_0_summary.md +++ b/doc/releasenotes/15_0_0_summary.md @@ -148,6 +148,24 @@ The reason you cannot change all the values together is because the restore proc should be used to process the previous backup. Please make sure you have thought out all possible scenarios for restore before transitioning from one compression engine to another. +#### Independent OLAP and OLTP transactional timeouts + +`--queryserver-config-olap-transaction-timeout` specifies the timeout applied +to a transaction created within an OLAP workload. The default value is `30` +seconds, but this can be raised, lowered, or set to zero to disable the timeout +altogether. + +Until now, while OLAP queries would bypass the query timeout, transactions +created within an OLAP session would be rolled back +`--queryserver-config-transaction-timeout` seconds after the transaction was +started. + +As of now, OLTP and OLAP transaction timeouts can be configured independently of each +other. + +The main use case is to run queries spanning a long period of time which +require transactional guarantees such as consistency or atomicity. + ### Online DDL changes #### Concurrent vitess migrations diff --git a/go/flags/endtoend/vtctld.txt b/go/flags/endtoend/vtctld.txt index 1b8047b98ac..263542a0c11 100644 --- a/go/flags/endtoend/vtctld.txt +++ b/go/flags/endtoend/vtctld.txt @@ -135,6 +135,7 @@ Usage of vtctld: --queryserver-config-idle-timeout float query server idle timeout (in seconds), vttablet manages various mysql connection pools. This config means if a connection has not been used in given idle timeout, this connection will be removed from pool. This effectively manages number of connection objects and optimize the pool performance. (default 1800) --queryserver-config-max-result-size int query server max result size, maximum number of rows allowed to return from vttablet for non-streaming queries. (default 10000) --queryserver-config-message-postpone-cap int query server message postpone cap is the maximum number of messages that can be postponed at any given time. Set this number to substantially lower than transaction cap, so that the transaction pool isn't exhausted by the message subsystem. (default 4) + --queryserver-config-olap-transaction-timeout float query server transaction timeout (in seconds), after which a transaction in an OLAP session will be killed (default 30) --queryserver-config-passthrough-dmls query server pass through all dml statements without rewriting --queryserver-config-pool-prefill-parallelism int (DEPRECATED) query server read pool prefill parallelism, a non-zero value will prefill the pool using the specified parallism. --queryserver-config-pool-size int query server read pool size, connection pool is used by regular queries (non streaming, not in a transaction) (default 16) @@ -156,7 +157,7 @@ Usage of vtctld: --queryserver-config-terse-errors prevent bind vars from escaping in client error messages --queryserver-config-transaction-cap int query server transaction cap is the maximum number of transactions allowed to happen at any given point of a time for a single vttablet. E.g. by setting transaction cap to 100, there are at most 100 transactions will be processed by a vttablet and the 101th transaction will be blocked (and fail if it cannot get connection within specified timeout) (default 20) --queryserver-config-transaction-prefill-parallelism int (DEPRECATED) query server transaction prefill parallelism, a non-zero value will prefill the pool using the specified parallism. - --queryserver-config-transaction-timeout float query server transaction timeout (in seconds), a transaction will be killed if it takes longer than this value (default 30) + --queryserver-config-transaction-timeout float query server transaction timeout (in seconds), after which a transaction in an OLTP session will be killed (default 30) --queryserver-config-txpool-timeout float query server transaction pool timeout, it is how long vttablet waits if tx pool is full (default 1) --queryserver-config-txpool-waiter-cap int query server transaction pool waiter limit, this is the maximum number of transactions that can be queued waiting to get a connection (default 5000) --queryserver-config-warn-result-size int query server result size warning threshold, warn if number of rows returned from vttablet for non-streaming queries exceeds this diff --git a/go/flags/endtoend/vtexplain.txt b/go/flags/endtoend/vtexplain.txt index 8ebb333a3e6..3843c653448 100644 --- a/go/flags/endtoend/vtexplain.txt +++ b/go/flags/endtoend/vtexplain.txt @@ -132,6 +132,7 @@ Usage of vtexplain: --queryserver-config-idle-timeout float query server idle timeout (in seconds), vttablet manages various mysql connection pools. This config means if a connection has not been used in given idle timeout, this connection will be removed from pool. This effectively manages number of connection objects and optimize the pool performance. (default 1800) --queryserver-config-max-result-size int query server max result size, maximum number of rows allowed to return from vttablet for non-streaming queries. (default 10000) --queryserver-config-message-postpone-cap int query server message postpone cap is the maximum number of messages that can be postponed at any given time. Set this number to substantially lower than transaction cap, so that the transaction pool isn't exhausted by the message subsystem. (default 4) + --queryserver-config-olap-transaction-timeout float query server transaction timeout (in seconds), after which a transaction in an OLAP session will be killed (default 30) --queryserver-config-passthrough-dmls query server pass through all dml statements without rewriting --queryserver-config-pool-prefill-parallelism int (DEPRECATED) query server read pool prefill parallelism, a non-zero value will prefill the pool using the specified parallism. --queryserver-config-pool-size int query server read pool size, connection pool is used by regular queries (non streaming, not in a transaction) (default 16) @@ -153,7 +154,7 @@ Usage of vtexplain: --queryserver-config-terse-errors prevent bind vars from escaping in client error messages --queryserver-config-transaction-cap int query server transaction cap is the maximum number of transactions allowed to happen at any given point of a time for a single vttablet. E.g. by setting transaction cap to 100, there are at most 100 transactions will be processed by a vttablet and the 101th transaction will be blocked (and fail if it cannot get connection within specified timeout) (default 20) --queryserver-config-transaction-prefill-parallelism int (DEPRECATED) query server transaction prefill parallelism, a non-zero value will prefill the pool using the specified parallism. - --queryserver-config-transaction-timeout float query server transaction timeout (in seconds), a transaction will be killed if it takes longer than this value (default 30) + --queryserver-config-transaction-timeout float query server transaction timeout (in seconds), after which a transaction in an OLTP session will be killed (default 30) --queryserver-config-txpool-timeout float query server transaction pool timeout, it is how long vttablet waits if tx pool is full (default 1) --queryserver-config-txpool-waiter-cap int query server transaction pool waiter limit, this is the maximum number of transactions that can be queued waiting to get a connection (default 5000) --queryserver-config-warn-result-size int query server result size warning threshold, warn if number of rows returned from vttablet for non-streaming queries exceeds this diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt index 98adc544e6f..f8accd737b4 100644 --- a/go/flags/endtoend/vttablet.txt +++ b/go/flags/endtoend/vttablet.txt @@ -314,6 +314,7 @@ Usage of vttablet: --queryserver-config-idle-timeout float query server idle timeout (in seconds), vttablet manages various mysql connection pools. This config means if a connection has not been used in given idle timeout, this connection will be removed from pool. This effectively manages number of connection objects and optimize the pool performance. (default 1800) --queryserver-config-max-result-size int query server max result size, maximum number of rows allowed to return from vttablet for non-streaming queries. (default 10000) --queryserver-config-message-postpone-cap int query server message postpone cap is the maximum number of messages that can be postponed at any given time. Set this number to substantially lower than transaction cap, so that the transaction pool isn't exhausted by the message subsystem. (default 4) + --queryserver-config-olap-transaction-timeout float query server transaction timeout (in seconds), after which a transaction in an OLAP session will be killed (default 30) --queryserver-config-passthrough-dmls query server pass through all dml statements without rewriting --queryserver-config-pool-prefill-parallelism int (DEPRECATED) query server read pool prefill parallelism, a non-zero value will prefill the pool using the specified parallism. --queryserver-config-pool-size int query server read pool size, connection pool is used by regular queries (non streaming, not in a transaction) (default 16) @@ -335,7 +336,7 @@ Usage of vttablet: --queryserver-config-terse-errors prevent bind vars from escaping in client error messages --queryserver-config-transaction-cap int query server transaction cap is the maximum number of transactions allowed to happen at any given point of a time for a single vttablet. E.g. by setting transaction cap to 100, there are at most 100 transactions will be processed by a vttablet and the 101th transaction will be blocked (and fail if it cannot get connection within specified timeout) (default 20) --queryserver-config-transaction-prefill-parallelism int (DEPRECATED) query server transaction prefill parallelism, a non-zero value will prefill the pool using the specified parallism. - --queryserver-config-transaction-timeout float query server transaction timeout (in seconds), a transaction will be killed if it takes longer than this value (default 30) + --queryserver-config-transaction-timeout float query server transaction timeout (in seconds), after which a transaction in an OLTP session will be killed (default 30) --queryserver-config-txpool-timeout float query server transaction pool timeout, it is how long vttablet waits if tx pool is full (default 1) --queryserver-config-txpool-waiter-cap int query server transaction pool waiter limit, this is the maximum number of transactions that can be queued waiting to get a connection (default 5000) --queryserver-config-warn-result-size int query server result size warning threshold, warn if number of rows returned from vttablet for non-streaming queries exceeds this diff --git a/go/pools/numbered.go b/go/pools/numbered.go index 233fe5149ba..6e1699a5dd8 100644 --- a/go/pools/numbered.go +++ b/go/pools/numbered.go @@ -34,12 +34,9 @@ type Numbered struct { } type numberedWrapper struct { - val any - inUse bool - purpose string - timeCreated time.Time - timeUsed time.Time - enforceTimeout bool + val any + inUse bool + purpose string } type unregistered struct { @@ -62,14 +59,10 @@ func NewNumbered() *Numbered { // Register starts tracking a resource by the supplied id. // It does not lock the object. // It returns an error if the id already exists. -func (nu *Numbered) Register(id int64, val any, enforceTimeout bool) error { +func (nu *Numbered) Register(id int64, val any) error { // Optimistically assume we're not double registering. - now := time.Now() resource := &numberedWrapper{ - val: val, - timeCreated: now, - timeUsed: now, - enforceTimeout: enforceTimeout, + val: val, } nu.mu.Lock() @@ -129,16 +122,15 @@ func (nu *Numbered) Get(id int64, purpose string) (val any, err error) { } // Put unlocks a resource for someone else to use. -func (nu *Numbered) Put(id int64, updateTime bool) { +func (nu *Numbered) Put(id int64) bool { nu.mu.Lock() defer nu.mu.Unlock() if nw, ok := nu.resources[id]; ok { nw.inUse = false nw.purpose = "" - if updateTime { - nw.timeUsed = time.Now() - } + return true } + return false } // GetAll returns the list of all resources in the pool. @@ -157,50 +149,11 @@ func (nu *Numbered) GetAll() (vals []any) { func (nu *Numbered) GetByFilter(purpose string, match func(val any) bool) (vals []any) { nu.mu.Lock() defer nu.mu.Unlock() - for _, nw := range nu.resources { - if nw.inUse || !nw.enforceTimeout { - continue - } - if match(nw.val) { - nw.inUse = true - nw.purpose = purpose - vals = append(vals, nw.val) - } - } - return vals -} - -// GetOutdated returns a list of resources that are older than age, and locks them. -// It does not return any resources that are already locked. -func (nu *Numbered) GetOutdated(age time.Duration, purpose string) (vals []any) { - nu.mu.Lock() - defer nu.mu.Unlock() - now := time.Now() - for _, nw := range nu.resources { - if nw.inUse || !nw.enforceTimeout { - continue - } - if nw.timeUsed.Add(age).Sub(now) <= 0 { - nw.inUse = true - nw.purpose = purpose - vals = append(vals, nw.val) - } - } - return vals -} - -// GetIdle returns a list of resurces that have been idle for longer -// than timeout, and locks them. It does not return any resources that -// are already locked. -func (nu *Numbered) GetIdle(timeout time.Duration, purpose string) (vals []any) { - nu.mu.Lock() - defer nu.mu.Unlock() - now := time.Now() for _, nw := range nu.resources { if nw.inUse { continue } - if nw.timeUsed.Add(timeout).Sub(now) <= 0 { + if match(nw.val) { nw.inUse = true nw.purpose = purpose vals = append(vals, nw.val) diff --git a/go/pools/numbered_test.go b/go/pools/numbered_test.go index e809fe30f64..826af8253b8 100644 --- a/go/pools/numbered_test.go +++ b/go/pools/numbered_test.go @@ -20,7 +20,6 @@ import ( "math/rand" "strings" "testing" - "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -30,10 +29,10 @@ func TestNumberedGeneral(t *testing.T) { id := int64(0) p := NewNumbered() - err := p.Register(id, id, true) + err := p.Register(id, id) require.NoError(t, err) - err = p.Register(id, id, true) + err = p.Register(id, id) assert.Contains(t, "already present", err.Error()) var v any @@ -44,7 +43,7 @@ func TestNumberedGeneral(t *testing.T) { _, err = p.Get(id, "test1") assert.Contains(t, "in use: test", err.Error()) - p.Put(id, true) + p.Put(id) _, err = p.Get(1, "test2") assert.Contains(t, "not found", err.Error()) p.Unregister(1, "test") // Should not fail @@ -55,61 +54,17 @@ func TestNumberedGeneral(t *testing.T) { t.Errorf("want prefix 'ended at' and suffix '(test)', got '%v'", err) } - id = 0 - p.Register(id, id, true) - id = 1 - p.Register(id, id, true) - id = 2 - p.Register(id, id, false) - time.Sleep(300 * time.Millisecond) - id = 3 - p.Register(id, id, true) - time.Sleep(100 * time.Millisecond) - - // p has 0, 1, 2, 3 (0, 1, 2 are aged, but 2 is not enforced) - vals := p.GetOutdated(200*time.Millisecond, "by outdated") - if num := len(vals); num != 2 { - t.Errorf("want 2, got %v", num) + if p.Size() != 0 { + t.Errorf("want 0, got %v", p.Size()) } - if _, err = p.Get(vals[0].(int64), "test1"); err.Error() != "in use: by outdated" { - t.Errorf("want 'in use: by outdated', got '%v'", err) - } - for _, v := range vals { - p.Put(v.(int64), true) - } - p.Put(2, true) // put to 2 to ensure it's not idle - time.Sleep(100 * time.Millisecond) - - // p has 0, 1, 2 (2 is idle) - vals = p.GetIdle(200*time.Millisecond, "by idle") - if len(vals) != 1 { - t.Errorf("want 1, got %v", len(vals)) - } - if _, err = p.Get(vals[0].(int64), "test1"); err.Error() != "in use: by idle" { - t.Errorf("want 'in use: by idle', got '%v'", err) - } - if vals[0].(int64) != 3 { - t.Errorf("want 3, got %v", vals[0]) - } - p.Unregister(vals[0].(int64), "test") - - // p has 0, 1, and 2 - if p.Size() != 3 { - t.Errorf("want 3, got %v", p.Size()) - } - go func() { - p.Unregister(0, "test") - p.Unregister(1, "test") - p.Unregister(2, "test") - }() p.WaitForEmpty() } func TestNumberedGetByFilter(t *testing.T) { p := NewNumbered() - p.Register(1, 1, true) - p.Register(2, 2, true) - p.Register(3, 3, true) + p.Register(1, 1) + p.Register(2, 2) + p.Register(3, 3) p.Get(1, "locked") vals := p.GetByFilter("filtered", func(v any) bool { @@ -133,7 +88,7 @@ func BenchmarkRegisterUnregister(b *testing.B) { id := int64(1) val := "foobarbazdummyval" for i := 0; i < b.N; i++ { - p.Register(id, val, false) + p.Register(id, val) p.Unregister(id, "some reason") } } @@ -145,7 +100,7 @@ func BenchmarkRegisterUnregisterParallel(b *testing.B) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { id := rand.Int63() - p.Register(id, val, false) + p.Register(id, val) p.Unregister(id, "some reason") } }) diff --git a/go/test/endtoend/tabletgateway/buffer/buffer_test_helpers.go b/go/test/endtoend/tabletgateway/buffer/buffer_test_helpers.go index c499f5f9a5e..7383c0b7818 100644 --- a/go/test/endtoend/tabletgateway/buffer/buffer_test_helpers.go +++ b/go/test/endtoend/tabletgateway/buffer/buffer_test_helpers.go @@ -227,7 +227,8 @@ func (bt *BufferingTest) createCluster() (*cluster.LocalProcessCluster, int) { SchemaSQL: sqlSchema, VSchema: bt.VSchema, } - clusterInstance.VtTabletExtraArgs = []string{"--health_check_interval", "1s", + clusterInstance.VtTabletExtraArgs = []string{ + "--health_check_interval", "1s", "--queryserver-config-transaction-timeout", "20", } if err := clusterInstance.StartUnshardedKeyspace(*keyspace, 1, false); err != nil { diff --git a/go/test/endtoend/vtgate/godriver/main_test.go b/go/test/endtoend/vtgate/godriver/main_test.go index bbc5536a3fa..492a68662fc 100644 --- a/go/test/endtoend/vtgate/godriver/main_test.go +++ b/go/test/endtoend/vtgate/godriver/main_test.go @@ -104,7 +104,9 @@ func TestMain(m *testing.M) { SchemaSQL: SchemaSQL, VSchema: VSchema, } - clusterInstance.VtTabletExtraArgs = []string{"--queryserver-config-transaction-timeout", "3"} + clusterInstance.VtTabletExtraArgs = []string{ + "--queryserver-config-transaction-timeout", "3", + } if err := clusterInstance.StartKeyspace(*Keyspace, []string{"-80", "80-"}, 1, false); err != nil { log.Fatal(err.Error()) return 1 diff --git a/go/test/endtoend/vtgate/readafterwrite/raw_test.go b/go/test/endtoend/vtgate/readafterwrite/raw_test.go index 9d9eefff7cb..56f9b3a44cb 100644 --- a/go/test/endtoend/vtgate/readafterwrite/raw_test.go +++ b/go/test/endtoend/vtgate/readafterwrite/raw_test.go @@ -118,7 +118,9 @@ func TestMain(m *testing.M) { SchemaSQL: sqlSchema, VSchema: vSchema, } - clusterInstance.VtTabletExtraArgs = []string{"--queryserver-config-transaction-timeout", "5"} + clusterInstance.VtTabletExtraArgs = []string{ + "--queryserver-config-transaction-timeout", "5", + } if err := clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 1, false); err != nil { return 1 } diff --git a/go/vt/vttablet/endtoend/transaction_test.go b/go/vt/vttablet/endtoend/transaction_test.go index 78e317b5576..d134e051ccc 100644 --- a/go/vt/vttablet/endtoend/transaction_test.go +++ b/go/vt/vttablet/endtoend/transaction_test.go @@ -472,8 +472,11 @@ func TestShutdownGracePeriodWithReserveExecute(t *testing.T) { func TestShortTxTimeout(t *testing.T) { client := framework.NewClient() - defer framework.Server.SetTxTimeout(framework.Server.TxTimeout()) - framework.Server.SetTxTimeout(10 * time.Millisecond) + defer framework.Server.Config().SetTxTimeoutForWorkload( + framework.Server.Config().TxTimeoutForWorkload(querypb.ExecuteOptions_OLTP), + querypb.ExecuteOptions_OLTP, + ) + framework.Server.Config().SetTxTimeoutForWorkload(10*time.Millisecond, querypb.ExecuteOptions_OLTP) err := client.Begin(false) require.NoError(t, err) diff --git a/go/vt/vttablet/tabletserver/stateful_connection.go b/go/vt/vttablet/tabletserver/stateful_connection.go index 5edaebc9100..f3cd40ee6cd 100644 --- a/go/vt/vttablet/tabletserver/stateful_connection.go +++ b/go/vt/vttablet/tabletserver/stateful_connection.go @@ -50,6 +50,8 @@ type StatefulConnection struct { reservedProps *Properties tainted bool enforceTimeout bool + timeout time.Duration + expiryTime time.Time } // Properties contains meta information about the connection @@ -77,6 +79,16 @@ func (sc *StatefulConnection) IsInTransaction() bool { return sc.txProps != nil } +func (sc *StatefulConnection) ElapsedTimeout() bool { + if !sc.enforceTimeout { + return false + } + if sc.timeout <= 0 { + return false + } + return sc.expiryTime.Before(time.Now()) +} + // Exec executes the statement in the dedicated connection func (sc *StatefulConnection) Exec(ctx context.Context, query string, maxrows int, wantfields bool) (*sqltypes.Result, error) { if sc.IsClosed() { @@ -273,6 +285,11 @@ func (sc *StatefulConnection) LogTransaction(reason tx.ReleaseReason) { tabletenv.TxLogger.Send(sc) } +func (sc *StatefulConnection) SetTimeout(timeout time.Duration) { + sc.timeout = timeout + sc.resetExpiryTime() +} + // logReservedConn logs reserved connection related stats. func (sc *StatefulConnection) logReservedConn() { if sc.reservedProps == nil { @@ -299,3 +316,7 @@ func (sc *StatefulConnection) ApplySettings(ctx context.Context, settings []stri } return sc.dbConn.ApplySettings(ctx, settings) } + +func (sc *StatefulConnection) resetExpiryTime() { + sc.expiryTime = time.Now().Add(sc.timeout) +} diff --git a/go/vt/vttablet/tabletserver/stateful_connection_pool.go b/go/vt/vttablet/tabletserver/stateful_connection_pool.go index 63ceb7a53e6..35b092daa2e 100644 --- a/go/vt/vttablet/tabletserver/stateful_connection_pool.go +++ b/go/vt/vttablet/tabletserver/stateful_connection_pool.go @@ -21,6 +21,7 @@ import ( "vitess.io/vitess/go/pools" + querypb "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/vttablet/tabletserver/tx" "context" @@ -28,7 +29,6 @@ import ( "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/log" - querypb "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" ) @@ -87,7 +87,7 @@ func (sf *StatefulConnectionPool) Open(appParams, dbaParams, appDebugParams dbco // Close closes the TxPool. A closed pool can be reopened. func (sf *StatefulConnectionPool) Close() { - for _, v := range sf.active.GetOutdated(time.Duration(0), "for closing") { + for _, v := range sf.active.GetByFilter("for closing", func(_ any) bool { return true }) { conn := v.(*StatefulConnection) thing := "connection" if conn.IsInTransaction() { @@ -135,16 +135,19 @@ func (sf *StatefulConnectionPool) AdjustLastID(id int64) { } } -// GetOutdated returns a list of connections that are older than age. -// It does not return any connections that are in use. +// GetElapsedTimeout returns sessions older than the timeout stored on the +// connection. Does not return any connections that are in use. // TODO(sougou): deprecate. -func (sf *StatefulConnectionPool) GetOutdated(age time.Duration, purpose string) []*StatefulConnection { - return mapToTxConn(sf.active.GetOutdated(age, purpose)) +func (sf *StatefulConnectionPool) GetElapsedTimeout(purpose string) []*StatefulConnection { + return mapToTxConn(sf.active.GetByFilter(purpose, func(val any) bool { + sc := val.(*StatefulConnection) + return sc.ElapsedTimeout() + })) } -func mapToTxConn(outdated []any) []*StatefulConnection { - result := make([]*StatefulConnection, len(outdated)) - for i, el := range outdated { +func mapToTxConn(vals []any) []*StatefulConnection { + result := make([]*StatefulConnection, len(vals)) + for i, el := range vals { result[i] = el.(*StatefulConnection) } return result @@ -169,7 +172,6 @@ func (sf *StatefulConnectionPool) GetAndLock(id int64, reason string) (*Stateful // NewConn creates a new StatefulConnection. It will be created from either the normal pool or // the found_rows pool, depending on the options provided func (sf *StatefulConnectionPool) NewConn(ctx context.Context, options *querypb.ExecuteOptions, settings []string) (*StatefulConnection, error) { - var conn *connpool.DBConn var err error @@ -190,12 +192,10 @@ func (sf *StatefulConnectionPool) NewConn(ctx context.Context, options *querypb. env: sf.env, enforceTimeout: options.GetWorkload() != querypb.ExecuteOptions_DBA, } + // This will set both the timeout and initialize the expiryTime. + sfConn.SetTimeout(sf.env.Config().TxTimeoutForWorkload(options.GetWorkload())) - err = sf.active.Register( - sfConn.ConnID, - sfConn, - sfConn.enforceTimeout, - ) + err = sf.active.Register(sfConn.ConnID, sfConn) if err != nil { sfConn.Release(tx.ConnInitFail) return nil, err @@ -234,7 +234,10 @@ func (sf *StatefulConnectionPool) markAsNotInUse(sc *StatefulConnection, updateT sc.Releasef("kill all") return } - sf.active.Put(sc.ConnID, updateTime) + if updateTime { + sc.resetExpiryTime() + } + sf.active.Put(sc.ConnID) } // Capacity returns the pool capacity. @@ -246,5 +249,6 @@ func (sf *StatefulConnectionPool) Capacity() int { func (sf *StatefulConnectionPool) renewConn(sc *StatefulConnection) error { sf.active.Unregister(sc.ConnID, "renew existing connection") sc.ConnID = sf.lastID.Add(1) - return sf.active.Register(sc.ConnID, sc, sc.enforceTimeout) + sc.resetExpiryTime() + return sf.active.Register(sc.ConnID, sc) } diff --git a/go/vt/vttablet/tabletserver/tabletenv/config.go b/go/vt/vttablet/tabletserver/tabletenv/config.go index 4951bdcc7f2..79aa7d75706 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/config.go +++ b/go/vt/vttablet/tabletserver/tabletenv/config.go @@ -29,6 +29,7 @@ import ( "vitess.io/vitess/go/streamlog" "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/log" + querypb "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/throttler" ) @@ -81,7 +82,8 @@ func init() { flag.IntVar(¤tConfig.TxPool.Size, "queryserver-config-transaction-cap", defaultConfig.TxPool.Size, "query server transaction cap is the maximum number of transactions allowed to happen at any given point of a time for a single vttablet. E.g. by setting transaction cap to 100, there are at most 100 transactions will be processed by a vttablet and the 101th transaction will be blocked (and fail if it cannot get connection within specified timeout)") flag.IntVar(¤tConfig.TxPool.PrefillParallelism, "queryserver-config-transaction-prefill-parallelism", defaultConfig.TxPool.PrefillParallelism, "(DEPRECATED) query server transaction prefill parallelism, a non-zero value will prefill the pool using the specified parallism.") flag.IntVar(¤tConfig.MessagePostponeParallelism, "queryserver-config-message-postpone-cap", defaultConfig.MessagePostponeParallelism, "query server message postpone cap is the maximum number of messages that can be postponed at any given time. Set this number to substantially lower than transaction cap, so that the transaction pool isn't exhausted by the message subsystem.") - SecondsVar(¤tConfig.Oltp.TxTimeoutSeconds, "queryserver-config-transaction-timeout", defaultConfig.Oltp.TxTimeoutSeconds, "query server transaction timeout (in seconds), a transaction will be killed if it takes longer than this value") + SecondsVar(¤tConfig.Olap.TxTimeoutSeconds, "queryserver-config-olap-transaction-timeout", defaultConfig.Olap.TxTimeoutSeconds, "query server transaction timeout (in seconds), after which a transaction in an OLAP session will be killed") + SecondsVar(¤tConfig.Oltp.TxTimeoutSeconds, "queryserver-config-transaction-timeout", defaultConfig.Oltp.TxTimeoutSeconds, "query server transaction timeout (in seconds), after which a transaction in an OLTP session will be killed") SecondsVar(¤tConfig.GracePeriods.ShutdownSeconds, "shutdown_grace_period", defaultConfig.GracePeriods.ShutdownSeconds, "how long to wait (in seconds) for queries and transactions to complete during graceful shutdown.") flag.IntVar(¤tConfig.Oltp.MaxRows, "queryserver-config-max-result-size", defaultConfig.Oltp.MaxRows, "query server max result size, maximum number of rows allowed to return from vttablet for non-streaming queries.") flag.IntVar(¤tConfig.Oltp.WarnRows, "queryserver-config-warn-result-size", defaultConfig.Oltp.WarnRows, "query server result size warning threshold, warn if number of rows returned from vttablet for non-streaming queries exceeds this") @@ -233,6 +235,7 @@ type TabletConfig struct { OlapReadPool ConnPoolConfig `json:"olapReadPool,omitempty"` TxPool ConnPoolConfig `json:"txPool,omitempty"` + Olap OlapConfig `json:"olap,omitempty"` Oltp OltpConfig `json:"oltp,omitempty"` HotRowProtection HotRowProtectionConfig `json:"hotRowProtection,omitempty"` @@ -294,6 +297,11 @@ type ConnPoolConfig struct { MaxWaiters int `json:"maxWaiters,omitempty"` } +// OlapConfig contains the config for olap settings. +type OlapConfig struct { + TxTimeoutSeconds Seconds `json:"txTimeoutSeconds,omitempty"` +} + // OltpConfig contains the config for oltp settings. type OltpConfig struct { QueryTimeoutSeconds Seconds `json:"queryTimeoutSeconds,omitempty"` @@ -371,6 +379,31 @@ func (c *TabletConfig) Clone() *TabletConfig { return &tc } +// SetTxTimeoutForWorkload updates workload transaction timeouts. Used in tests only. +func (c *TabletConfig) SetTxTimeoutForWorkload(val time.Duration, workload querypb.ExecuteOptions_Workload) { + switch workload { + case querypb.ExecuteOptions_OLAP: + c.Olap.TxTimeoutSeconds.Set(val) + case querypb.ExecuteOptions_OLTP: + c.Oltp.TxTimeoutSeconds.Set(val) + default: + panic(fmt.Sprintf("unsupported workload type: %v", workload)) + } +} + +// TxTimeoutForWorkload returns the transaction timeout for the given workload +// type. Defaults to returning OLTP timeout. +func (c *TabletConfig) TxTimeoutForWorkload(workload querypb.ExecuteOptions_Workload) time.Duration { + switch workload { + case querypb.ExecuteOptions_DBA: + return 0 + case querypb.ExecuteOptions_OLAP: + return c.Olap.TxTimeoutSeconds.Get() + default: + return c.Oltp.TxTimeoutSeconds.Get() + } +} + // Verify checks for contradicting flags. func (c *TabletConfig) Verify() error { if err := c.verifyTransactionLimitConfig(); err != nil { @@ -439,6 +472,9 @@ var defaultConfig = TabletConfig{ IdleTimeoutSeconds: 30 * 60, MaxWaiters: 5000, }, + Olap: OlapConfig{ + TxTimeoutSeconds: 30, + }, Oltp: OltpConfig{ QueryTimeoutSeconds: 30, TxTimeoutSeconds: 30, diff --git a/go/vt/vttablet/tabletserver/tabletenv/config_test.go b/go/vt/vttablet/tabletserver/tabletenv/config_test.go index 6606c5fede0..232e955f447 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/config_test.go +++ b/go/vt/vttablet/tabletserver/tabletenv/config_test.go @@ -74,6 +74,7 @@ func TestConfigParse(t *testing.T) { gracePeriods: {} healthcheck: {} hotRowProtection: {} +olap: {} olapReadPool: {} oltp: {} oltpReadPool: @@ -131,6 +132,8 @@ hotRowProtection: maxQueueSize: 20 mode: disable messagePostponeParallelism: 4 +olap: + txTimeoutSeconds: 30 olapReadPool: idleTimeoutSeconds: 1800 size: 200 @@ -202,6 +205,9 @@ func TestFlags(t *testing.T) { TimeoutSeconds: 1, MaxWaiters: 5000, }, + Olap: OlapConfig{ + TxTimeoutSeconds: 30, + }, Oltp: OltpConfig{ QueryTimeoutSeconds: 30, TxTimeoutSeconds: 30, diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 332b0181c84..c99743cf38f 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -97,7 +97,6 @@ type TabletServer struct { config *tabletenv.TabletConfig stats *tabletenv.Stats QueryTimeout sync2.AtomicDuration - txTimeout sync2.AtomicDuration TerseErrors bool enableHotRowProtection bool topoServer *topo.Server @@ -153,7 +152,6 @@ func NewTabletServer(name string, config *tabletenv.TabletConfig, topoServer *to stats: tabletenv.NewStats(exporter), config: config, QueryTimeout: sync2.NewAtomicDuration(config.Oltp.QueryTimeoutSeconds.Get()), - txTimeout: sync2.NewAtomicDuration(config.Oltp.TxTimeoutSeconds.Get()), TerseErrors: config.TerseErrors, enableHotRowProtection: config.HotRowProtection.Mode != tabletenv.Disable, topoServer: topoServer, @@ -724,9 +722,12 @@ func (tsv *TabletServer) execute(ctx context.Context, target *querypb.Target, sq timeout := tsv.QueryTimeout.Get() if transactionID != 0 { allowOnShutdown = true + // Execute calls happen for OLTP only, so we can directly fetch the + // OLTP TX timeout. + txTimeout := tsv.config.TxTimeoutForWorkload(querypb.ExecuteOptions_OLTP) // Use the smaller of the two values (0 means infinity). // TODO(sougou): Assign deadlines to each transaction and set query timeout accordingly. - timeout = smallerTimeout(timeout, tsv.txTimeout.Get()) + timeout = smallerTimeout(timeout, txTimeout) } err = tsv.execRequest( ctx, timeout, @@ -820,8 +821,9 @@ func (tsv *TabletServer) streamExecute(ctx context.Context, target *querypb.Targ var timeout time.Duration if transactionID != 0 { allowOnShutdown = true - // Use the transaction timeout. - timeout = tsv.txTimeout.Get() + // Use the transaction timeout. StreamExecute calls happen for OLAP only, + // so we can directly fetch the OLAP TX timeout. + timeout = tsv.config.TxTimeoutForWorkload(querypb.ExecuteOptions_OLAP) } return tsv.execRequest( @@ -1223,8 +1225,11 @@ func (tsv *TabletServer) ReserveExecute(ctx context.Context, target *querypb.Tar timeout := tsv.QueryTimeout.Get() if transactionID != 0 { allowOnShutdown = true + // ReserveExecute is for OLTP only, so we can directly fetch the OLTP + // TX timeout. + txTimeout := tsv.config.TxTimeoutForWorkload(querypb.ExecuteOptions_OLTP) // Use the smaller of the two values (0 means infinity). - timeout = smallerTimeout(timeout, tsv.txTimeout.Get()) + timeout = smallerTimeout(timeout, txTimeout) } err = tsv.execRequest( @@ -1272,8 +1277,9 @@ func (tsv *TabletServer) ReserveStreamExecute( var timeout time.Duration if transactionID != 0 { allowOnShutdown = true - // Use the transaction timeout. - timeout = tsv.txTimeout.Get() + // Use the transaction timeout. ReserveStreamExecute is used for OLAP + // only, so we can directly fetch the OLAP TX timeout. + timeout = tsv.config.TxTimeoutForWorkload(querypb.ExecuteOptions_OLAP) } err = tsv.execRequest( @@ -1895,17 +1901,6 @@ func (tsv *TabletServer) TxPoolSize() int { return tsv.te.txPool.scp.Capacity() } -// SetTxTimeout changes the transaction timeout to the specified value. -func (tsv *TabletServer) SetTxTimeout(val time.Duration) { - tsv.te.txPool.SetTimeout(val) - tsv.txTimeout.Set(val) -} - -// TxTimeout returns the transaction timeout. -func (tsv *TabletServer) TxTimeout() time.Duration { - return tsv.txTimeout.Get() -} - // SetQueryPlanCacheCap changes the plan cache capacity to the specified value. func (tsv *TabletServer) SetQueryPlanCacheCap(val int) { tsv.qe.SetQueryPlanCacheCap(val) diff --git a/go/vt/vttablet/tabletserver/tabletserver_test.go b/go/vt/vttablet/tabletserver/tabletserver_test.go index 876129db136..36e983d2ecf 100644 --- a/go/vt/vttablet/tabletserver/tabletserver_test.go +++ b/go/vt/vttablet/tabletserver/tabletserver_test.go @@ -1728,11 +1728,11 @@ func TestConfigChanges(t *testing.T) { t.Errorf("tsv.te.txPool.pool.Capacity: %d, want %d", val, newSize) } - tsv.SetTxTimeout(newDuration) - if val := tsv.TxTimeout(); val != newDuration { + tsv.Config().SetTxTimeoutForWorkload(newDuration, querypb.ExecuteOptions_OLTP) + if val := tsv.Config().TxTimeoutForWorkload(querypb.ExecuteOptions_OLTP); val != newDuration { t.Errorf("tsv.TxTimeout: %v, want %v", val, newDuration) } - if val := tsv.te.txPool.Timeout(); val != newDuration { + if val := tsv.te.txPool.env.Config().TxTimeoutForWorkload(querypb.ExecuteOptions_OLTP); val != newDuration { t.Errorf("tsv.te.Pool().Timeout: %v, want %v", val, newDuration) } diff --git a/go/vt/vttablet/tabletserver/tx_pool.go b/go/vt/vttablet/tabletserver/tx_pool.go index a21f5bbb9be..5c2808a41a2 100644 --- a/go/vt/vttablet/tabletserver/tx_pool.go +++ b/go/vt/vttablet/tabletserver/tx_pool.go @@ -22,7 +22,7 @@ import ( "time" "vitess.io/vitess/go/pools" - "vitess.io/vitess/go/sync2" + "vitess.io/vitess/go/timer" "vitess.io/vitess/go/trace" "vitess.io/vitess/go/vt/callerid" @@ -54,11 +54,10 @@ type ( // concern itself with a connections life cycle. The two exceptions are Begin, which creates a new StatefulConnection, // and RollbackAndRelease, which does a Release after doing the rollback. TxPool struct { - env tabletenv.Env - scp *StatefulConnectionPool - transactionTimeout sync2.AtomicDuration - ticks *timer.Timer - limiter txlimiter.TxLimiter + env tabletenv.Env + scp *StatefulConnectionPool + ticks *timer.Timer + limiter txlimiter.TxLimiter logMu sync.Mutex lastLog time.Time @@ -73,18 +72,21 @@ type ( // NewTxPool creates a new TxPool. It's not operational until it's Open'd. func NewTxPool(env tabletenv.Env, limiter txlimiter.TxLimiter) *TxPool { config := env.Config() - transactionTimeout := config.Oltp.TxTimeoutSeconds.Get() axp := &TxPool{ - env: env, - scp: NewStatefulConnPool(env), - transactionTimeout: sync2.NewAtomicDuration(transactionTimeout), - ticks: timer.NewTimer(transactionTimeout / 10), - limiter: limiter, - txStats: env.Exporter().NewTimings("Transactions", "Transaction stats", "operation"), + env: env, + scp: NewStatefulConnPool(env), + ticks: timer.NewTimer(txKillerTimeoutInterval(config)), + limiter: limiter, + txStats: env.Exporter().NewTimings("Transactions", "Transaction stats", "operation"), } // Careful: conns also exports name+"xxx" vars, // but we know it doesn't export Timeout. - env.Exporter().NewGaugeDurationFunc("TransactionTimeout", "Transaction timeout", axp.transactionTimeout.Get) + env.Exporter().NewGaugeDurationFunc("OlapTransactionTimeout", "OLAP transaction timeout", func() time.Duration { + return config.TxTimeoutForWorkload(querypb.ExecuteOptions_OLAP) + }) + env.Exporter().NewGaugeDurationFunc("TransactionTimeout", "Transaction timeout", func() time.Duration { + return config.TxTimeoutForWorkload(querypb.ExecuteOptions_OLTP) + }) return axp } @@ -92,7 +94,9 @@ func NewTxPool(env tabletenv.Env, limiter txlimiter.TxLimiter) *TxPool { // that will kill long-running transactions. func (tp *TxPool) Open(appParams, dbaParams, appDebugParams dbconfigs.Connector) { tp.scp.Open(appParams, dbaParams, appDebugParams) - tp.ticks.Start(func() { tp.transactionKiller() }) + if tp.ticks.Interval() > 0 { + tp.ticks.Start(func() { tp.transactionKiller() }) + } } // Close closes the TxPool. A closed pool can be reopened. @@ -118,8 +122,8 @@ func (tp *TxPool) Shutdown(ctx context.Context) { func (tp *TxPool) transactionKiller() { defer tp.env.LogError() - for _, conn := range tp.scp.GetOutdated(tp.Timeout(), vterrors.TxKillerRollback) { - log.Warningf("killing transaction (exceeded timeout: %v): %s", tp.Timeout(), conn.String(tp.env.Config().SanitizeLogMessages)) + for _, conn := range tp.scp.GetElapsedTimeout(vterrors.TxKillerRollback) { + log.Warningf("killing transaction (exceeded timeout: %v): %s", conn.timeout, conn.String(tp.env.Config().SanitizeLogMessages)) switch { case conn.IsTainted(): conn.Close() @@ -138,7 +142,7 @@ func (tp *TxPool) transactionKiller() { if conn.IsInTransaction() { tp.txComplete(conn, tx.TxKill) } - conn.Releasef("exceeded timeout: %v", tp.Timeout()) + conn.Releasef("exceeded timeout: %v", conn.timeout) } } @@ -230,6 +234,9 @@ func (tp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions, re if err != nil { return nil, "", "", vterrors.Errorf(vtrpcpb.Code_ABORTED, "transaction %d: %v", reservedID, err) } + // Update conn timeout. + timeout := tp.env.Config().TxTimeoutForWorkload(options.GetWorkload()) + conn.SetTimeout(timeout) } else { immediateCaller := callerid.ImmediateCallerIDFromContext(ctx) effectiveCaller := callerid.EffectiveCallerIDFromContext(ctx) @@ -351,19 +358,15 @@ func (tp *TxPool) LogActive() { }) } -// Timeout returns the transaction timeout. -func (tp *TxPool) Timeout() time.Duration { - return tp.transactionTimeout.Get() -} - -// SetTimeout sets the transaction timeout. -func (tp *TxPool) SetTimeout(timeout time.Duration) { - tp.transactionTimeout.Set(timeout) - tp.ticks.SetInterval(timeout / 10) -} - func (tp *TxPool) txComplete(conn *StatefulConnection, reason tx.ReleaseReason) { conn.LogTransaction(reason) tp.limiter.Release(conn.TxProperties().ImmediateCaller, conn.TxProperties().EffectiveCaller) conn.CleanTxState() } + +func txKillerTimeoutInterval(config *tabletenv.TabletConfig) time.Duration { + return smallerTimeout( + config.TxTimeoutForWorkload(querypb.ExecuteOptions_OLAP), + config.TxTimeoutForWorkload(querypb.ExecuteOptions_OLTP), + ) / 10 +} diff --git a/go/vt/vttablet/tabletserver/tx_pool_test.go b/go/vt/vttablet/tabletserver/tx_pool_test.go index 8ab76fe7729..f7356e649ad 100644 --- a/go/vt/vttablet/tabletserver/tx_pool_test.go +++ b/go/vt/vttablet/tabletserver/tx_pool_test.go @@ -350,8 +350,10 @@ func TestTxPoolGetConnRecentlyRemovedTransaction(t *testing.T) { assertErrorMatch(id, "transaction committed") - txPool, _ = newTxPool() - txPool.SetTimeout(1 * time.Millisecond) + env := txPool.env + env.Config().SetTxTimeoutForWorkload(1*time.Millisecond, querypb.ExecuteOptions_OLTP) + env.Config().SetTxTimeoutForWorkload(1*time.Millisecond, querypb.ExecuteOptions_OLAP) + txPool, _ = newTxPoolWithEnv(env) txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) defer txPool.Close() @@ -407,7 +409,7 @@ func TestTxTimeoutKillsTransactions(t *testing.T) { // Let it time out and get killed by the tx killer. time.Sleep(1200 * time.Millisecond) - // Verify that the tx killer rand. + // Verify that the tx killer ran. require.Equal(t, int64(1), txPool.env.Stats().KillCounters.Counts()["Transactions"]-startingKills) // Regression test for #6727: make sure the tx limiter is decremented when the tx killer closes @@ -427,6 +429,192 @@ func TestTxTimeoutKillsTransactions(t *testing.T) { }, limiter.Actions()) } +func TestTxTimeoutDoesNotKillShortLivedTransactions(t *testing.T) { + env := newEnv("TabletServerTest") + env.Config().TxPool.Size = 1 + env.Config().TxPool.MaxWaiters = 0 + env.Config().Oltp.TxTimeoutSeconds = 1 + _, txPool, _, closer := setupWithEnv(t, env) + defer closer() + startingKills := txPool.env.Stats().KillCounters.Counts()["Transactions"] + + im := &querypb.VTGateCallerID{ + Username: "user", + } + ef := &vtrpcpb.CallerID{ + Principal: "principle", + } + + ctxWithCallerID := callerid.NewContext(ctx, ef, im) + + // Start transaction. + conn, _, _, err := txPool.Begin(ctxWithCallerID, &querypb.ExecuteOptions{}, false, 0, nil, nil) + require.NoError(t, err) + conn.Unlock() + + // Sleep for less than the tx timeout + time.Sleep(800 * time.Millisecond) + + // Verify that the tx killer did not run. + require.Equal(t, int64(0), txPool.env.Stats().KillCounters.Counts()["Transactions"]-startingKills) +} + +func TestTxTimeoutKillsOlapTransactions(t *testing.T) { + env := newEnv("TabletServerTest") + env.Config().TxPool.Size = 1 + env.Config().TxPool.MaxWaiters = 0 + env.Config().Oltp.TxTimeoutSeconds = 1 + env.Config().Olap.TxTimeoutSeconds = 2 + _, txPool, _, closer := setupWithEnv(t, env) + defer closer() + startingKills := txPool.env.Stats().KillCounters.Counts()["Transactions"] + + im := &querypb.VTGateCallerID{ + Username: "user", + } + ef := &vtrpcpb.CallerID{ + Principal: "principle", + } + + ctxWithCallerID := callerid.NewContext(ctx, ef, im) + + // Start transaction. + conn, _, _, err := txPool.Begin(ctxWithCallerID, &querypb.ExecuteOptions{ + Workload: querypb.ExecuteOptions_OLAP, + }, false, 0, nil, nil) + require.NoError(t, err) + conn.Unlock() + + // After the OLTP timeout elapses, the tx should not have been killed. + time.Sleep(1200 * time.Millisecond) + require.Equal(t, int64(0), txPool.env.Stats().KillCounters.Counts()["Transactions"]-startingKills) + + // After the OLAP timeout elapses, the tx should have been killed. + time.Sleep(1000 * time.Millisecond) + require.Equal(t, int64(1), txPool.env.Stats().KillCounters.Counts()["Transactions"]-startingKills) +} + +func TestTxTimeoutNotEnforcedForZeroLengthTimeouts(t *testing.T) { + env := newEnv("TabletServerTest") + env.Config().TxPool.Size = 2 + env.Config().TxPool.MaxWaiters = 0 + env.Config().Oltp.TxTimeoutSeconds = 0 + env.Config().Olap.TxTimeoutSeconds = 0 + _, txPool, _, closer := setupWithEnv(t, env) + defer closer() + startingKills := txPool.env.Stats().KillCounters.Counts()["Transactions"] + + im := &querypb.VTGateCallerID{ + Username: "user", + } + ef := &vtrpcpb.CallerID{ + Principal: "principle", + } + + ctxWithCallerID := callerid.NewContext(ctx, ef, im) + + // Start transactions. + conn0, _, _, err := txPool.Begin(ctxWithCallerID, &querypb.ExecuteOptions{}, false, 0, nil, nil) + require.NoError(t, err) + conn1, _, _, err := txPool.Begin(ctxWithCallerID, &querypb.ExecuteOptions{ + Workload: querypb.ExecuteOptions_OLAP, + }, false, 0, nil, nil) + require.NoError(t, err) + conn0.Unlock() + conn1.Unlock() + + // Not really a great test, but we don't want to make unit tests take a + // long time by using a long sleep. Probably a better approach would be to + // either monkeypatch time.Now() or pass in a mock Clock to TxPool. + time.Sleep(2000 * time.Millisecond) + + // OLTP tx is not killed. + require.Equal(t, int64(0), txPool.env.Stats().KillCounters.Counts()["Transactions"]-startingKills) + // OLAP tx is not killed. + require.Equal(t, int64(0), txPool.env.Stats().KillCounters.Counts()["Transactions"]-startingKills) +} + +func TestTxTimeoutReservedConn(t *testing.T) { + env := newEnv("TabletServerTest") + env.Config().TxPool.Size = 1 + env.Config().TxPool.MaxWaiters = 0 + env.Config().Oltp.TxTimeoutSeconds = 1 + env.Config().Olap.TxTimeoutSeconds = 2 + _, txPool, _, closer := setupWithEnv(t, env) + defer closer() + startingRcKills := txPool.env.Stats().KillCounters.Counts()["ReservedConnection"] + startingTxKills := txPool.env.Stats().KillCounters.Counts()["Transactions"] + + im := &querypb.VTGateCallerID{ + Username: "user", + } + ef := &vtrpcpb.CallerID{ + Principal: "principle", + } + + ctxWithCallerID := callerid.NewContext(ctx, ef, im) + + // Start OLAP transaction and return it to pool right away. + conn0, _, _, err := txPool.Begin(ctxWithCallerID, &querypb.ExecuteOptions{ + Workload: querypb.ExecuteOptions_OLAP, + }, false, 0, nil, nil) + require.NoError(t, err) + // Taint the connection. + conn0.Taint(ctxWithCallerID, nil) + conn0.Unlock() + + // tx should not timeout after OLTP timeout. + time.Sleep(1200 * time.Millisecond) + require.Equal(t, int64(0), txPool.env.Stats().KillCounters.Counts()["ReservedConnection"]-startingRcKills) + require.Equal(t, int64(0), txPool.env.Stats().KillCounters.Counts()["Transactions"]-startingTxKills) + + // tx should timeout after OLAP timeout. + time.Sleep(1000 * time.Millisecond) + require.Equal(t, int64(1), txPool.env.Stats().KillCounters.Counts()["ReservedConnection"]-startingRcKills) + require.Equal(t, int64(1), txPool.env.Stats().KillCounters.Counts()["Transactions"]-startingTxKills) +} + +func TestTxTimeoutReusedReservedConn(t *testing.T) { + env := newEnv("TabletServerTest") + env.Config().TxPool.Size = 1 + env.Config().TxPool.MaxWaiters = 0 + env.Config().Oltp.TxTimeoutSeconds = 1 + env.Config().Olap.TxTimeoutSeconds = 2 + _, txPool, _, closer := setupWithEnv(t, env) + defer closer() + startingRcKills := txPool.env.Stats().KillCounters.Counts()["ReservedConnection"] + startingTxKills := txPool.env.Stats().KillCounters.Counts()["Transactions"] + + im := &querypb.VTGateCallerID{ + Username: "user", + } + ef := &vtrpcpb.CallerID{ + Principal: "principle", + } + + ctxWithCallerID := callerid.NewContext(ctx, ef, im) + + // Start OLAP transaction and return it to pool right away. + conn0, _, _, err := txPool.Begin(ctxWithCallerID, &querypb.ExecuteOptions{ + Workload: querypb.ExecuteOptions_OLAP, + }, false, 0, nil, nil) + require.NoError(t, err) + // Taint the connection. + conn0.Taint(ctxWithCallerID, nil) + conn0.Unlock() + + // Reuse underlying connection in an OLTP transaction. + conn1, _, _, err := txPool.Begin(ctxWithCallerID, &querypb.ExecuteOptions{}, false, conn0.ReservedID(), nil, nil) + require.NoError(t, err) + require.Equal(t, conn1.ReservedID(), conn0.ReservedID()) + conn1.Unlock() + + // tx should timeout after OLTP timeout. + time.Sleep(1200 * time.Millisecond) + require.Equal(t, int64(1), txPool.env.Stats().KillCounters.Counts()["ReservedConnection"]-startingRcKills) + require.Equal(t, int64(1), txPool.env.Stats().KillCounters.Counts()["Transactions"]-startingTxKills) +} + func newTxPool() (*TxPool, *fakeLimiter) { return newTxPoolWithEnv(newEnv("TabletServerTest")) }