diff --git a/go/pools/numbered.go b/go/pools/numbered.go index f1e76488164..19f8bb1ef95 100644 --- a/go/pools/numbered.go +++ b/go/pools/numbered.go @@ -131,13 +131,15 @@ func (nu *Numbered) Get(id int64, purpose string) (val interface{}, err error) { } // Put unlocks a resource for someone else to use. -func (nu *Numbered) Put(id int64) { +func (nu *Numbered) Put(id int64, updateTime bool) { nu.mu.Lock() defer nu.mu.Unlock() if nw, ok := nu.resources[id]; ok { nw.inUse = false nw.purpose = "" - nw.timeUsed = time.Now() + if updateTime { + nw.timeUsed = time.Now() + } } } @@ -162,7 +164,7 @@ func (nu *Numbered) GetOutdated(age time.Duration, purpose string) (vals []inter if nw.inUse || !nw.enforceTimeout { continue } - if nw.timeCreated.Add(age).Sub(now) <= 0 { + if nw.timeUsed.Add(age).Sub(now) <= 0 { nw.inUse = true nw.purpose = purpose vals = append(vals, nw.val) diff --git a/go/pools/numbered_test.go b/go/pools/numbered_test.go index 467e220fed5..65ae5764cf9 100644 --- a/go/pools/numbered_test.go +++ b/go/pools/numbered_test.go @@ -44,7 +44,7 @@ func TestNumbered(t *testing.T) { if _, err = p.Get(id, "test1"); err.Error() != "in use: test" { t.Errorf("want 'in use: test', got '%v'", err) } - p.Put(id) + p.Put(id, true) if _, err = p.Get(1, "test2"); err.Error() != "not found" { t.Errorf("want 'not found', got '%v'", err) } @@ -75,9 +75,9 @@ func TestNumbered(t *testing.T) { t.Errorf("want 'in use: by outdated', got '%v'", err) } for _, v := range vals { - p.Put(v.(int64)) + p.Put(v.(int64), true) } - p.Put(2) // put to 2 to ensure it's not idle + p.Put(2, true) // put to 2 to ensure it's not idle time.Sleep(100 * time.Millisecond) // p has 0, 1, 2 (2 is idle) diff --git a/go/vt/vttablet/endtoend/connkilling/connkiller_test.go b/go/vt/vttablet/endtoend/connkilling/connkiller_test.go new file mode 100644 index 00000000000..d94051f3c53 --- /dev/null +++ b/go/vt/vttablet/endtoend/connkilling/connkiller_test.go @@ -0,0 +1,110 @@ +/* +Copyright 2020 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +/* +All tests in this package come with a three second time out for OLTP session +*/ +package connkilling + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + "vitess.io/vitess/go/vt/vttablet/endtoend/framework" +) + +func TestTxKillerKillsTransactionsInReservedConnections(t *testing.T) { + client := framework.NewClient() + defer client.Release() + + _, err := client.ReserveBeginExecute("select 42", nil, nil) + require.NoError(t, err) + + assertIsKilledWithin5Seconds(t, client) +} + +func TestTxKillerDoesNotKillReservedConnectionsInUse(t *testing.T) { + client := framework.NewClient() + defer client.Release() + + _, err := client.ReserveExecute("select 42", nil, nil) + require.NoError(t, err) + + assertIsNotKilledOver5Second(t, client) +} + +func TestTxKillerCountsTimeFromTxStartedNotStatefulConnCreated(t *testing.T) { + client := framework.NewClient() + defer client.Release() + + // reserve connection at 0th second + _, err := client.ReserveExecute("select 42", nil, nil) + require.NoError(t, err) + + // elapsed 2 seconds + time.Sleep(2 * time.Second) + + // update the timer on tx start - new tx timer starts + _, err = client.BeginExecute("select 44", nil, nil) + require.NoError(t, err) + + // elapsed 1 second from tx and 3 second from reserved conn. + time.Sleep(1 * time.Second) + _, err = client.Execute("select 43", nil) + require.NoError(t, err) + + // elapsed 2 second from tx and 4 second from reserved conn. It does not fail. + time.Sleep(1 * time.Second) + _, err = client.Execute("select 43", nil) + require.NoError(t, err) + + assertIsKilledWithin5Seconds(t, client) +} + +func TestTxKillerKillsTransactionThreeSecondsAfterCreation(t *testing.T) { + client := framework.NewClient() + defer client.Release() + + _, err := client.BeginExecute("select 42", nil, nil) + require.NoError(t, err) + + assertIsKilledWithin5Seconds(t, client) +} + +func assertIsNotKilledOver5Second(t *testing.T, client *framework.QueryClient) { + for i := 0; i < 5; i++ { + _, err := client.Execute("select 43", nil) + require.NoError(t, err) + time.Sleep(1 * time.Second) + } +} + +func assertIsKilledWithin5Seconds(t *testing.T, client *framework.QueryClient) { + var err error + // when it is used once per second + for i := 0; i < 5; i++ { + _, err = client.Execute("select 43", nil) + if err != nil { + break + } + time.Sleep(1 * time.Second) + } + + // then it should still be killed. transactions are tracked per tx-creation time and not last-used time + require.Error(t, err) + require.Contains(t, err.Error(), "exceeded timeout: 3s") +} diff --git a/go/vt/vttablet/endtoend/connkilling/main_test.go b/go/vt/vttablet/endtoend/connkilling/main_test.go new file mode 100644 index 00000000000..9e4aa6c2e55 --- /dev/null +++ b/go/vt/vttablet/endtoend/connkilling/main_test.go @@ -0,0 +1,303 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package connkilling + +import ( + "errors" + "flag" + "fmt" + "io/ioutil" + "os" + "testing" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/vt/tableacl" + "vitess.io/vitess/go/vt/tableacl/simpleacl" + "vitess.io/vitess/go/vt/vttablet/endtoend/framework" + "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" + "vitess.io/vitess/go/vt/vttest" + + vttestpb "vitess.io/vitess/go/vt/proto/vttest" +) + +var ( + connParams mysql.ConnParams + connAppDebugParams mysql.ConnParams +) + +func TestMain(m *testing.M) { + flag.Parse() // Do not remove this comment, import into google3 depends on it + tabletenv.Init() + + exitCode := func() int { + // Launch MySQL. + // We need a Keyspace in the topology, so the DbName is set. + // We need a Shard too, so the database 'vttest' is created. + cfg := vttest.Config{ + Topology: &vttestpb.VTTestTopology{ + Keyspaces: []*vttestpb.Keyspace{ + { + Name: "vttest", + Shards: []*vttestpb.Shard{ + { + Name: "0", + DbNameOverride: "vttest", + }, + }, + }, + }, + }, + OnlyMySQL: true, + } + if err := cfg.InitSchemas("vttest", testSchema, nil); err != nil { + fmt.Fprintf(os.Stderr, "InitSchemas failed: %v\n", err) + return 1 + } + defer os.RemoveAll(cfg.SchemaDir) + cluster := vttest.LocalCluster{ + Config: cfg, + } + if err := cluster.Setup(); err != nil { + fmt.Fprintf(os.Stderr, "could not launch mysql: %v\n", err) + return 1 + } + defer cluster.TearDown() + + connParams = cluster.MySQLConnParams() + connAppDebugParams = cluster.MySQLAppDebugConnParams() + config := tabletenv.NewDefaultConfig() + config.Oltp.TxTimeoutSeconds = tabletenv.Seconds(3) + err := framework.StartCustomServer(connParams, connAppDebugParams, cluster.DbName(), config) + if err != nil { + fmt.Fprintf(os.Stderr, "%v", err) + return 1 + } + defer framework.StopServer() + + err = initTableACL() + if err != nil { + fmt.Fprintf(os.Stderr, "%v", err) + return 1 + } + + return m.Run() + }() + os.Exit(exitCode) +} + +func initTableACL() error { + file, err := ioutil.TempFile("", "tableacl.json") + if err != nil { + return err + } + defer os.Remove(file.Name()) + + n, err := file.WriteString(tableACLConfig) + if err != nil { + return err + } + if n != len(tableACLConfig) { + return errors.New("table acl: short write") + } + file.Close() + tableacl.Register("simpleacl", &simpleacl.Factory{}) + tableacl.Init(file.Name(), func() { framework.Server.ClearQueryPlanCache() }) + return nil +} + +var testSchema = `create table vitess_test(intval int default 0, floatval float default null, charval varchar(10) default null, binval varbinary(256) default null, primary key(intval)); +create table vitess_test_debuguser(intval int default 0, floatval float default null, charval varchar(256) default null, binval varbinary(256) default null, primary key(intval)); +grant select, show databases, process on *.* to 'vt_appdebug'@'localhost'; +revoke select on *.* from 'vt_appdebug'@'localhost'; +insert into vitess_test_debuguser values(1, 1.12345, 0xC2A2, 0x00FF), (2, null, '', null), (3, null, null, null); +insert into vitess_test values(1, 1.12345, 0xC2A2, 0x00FF), (2, null, '', null), (3, null, null, null); + +create table vitess_a(eid bigint default 0, id int default 1, name varchar(128) default null, foo varbinary(128) default null, primary key(eid, id)); +create table vitess_b(eid bigint default 0, id int default 0, primary key(eid, id)); +create table vitess_c(eid bigint default 0, name varchar(128) default 'name', foo varbinary(128) default null, primary key(eid, name)); +create table vitess_d(eid bigint default null, id int default null); +create table vitess_e(eid bigint auto_increment, id int default 1, name varchar(128) default 'name', foo varchar(128) default null, primary key(eid, id, name)); +create table vitess_f(vb varbinary(16) default 'ab', id int default null, primary key(vb)); +create table upsert_test(id1 int default 0, id2 int default null, primary key (id1)); +create unique index id2_idx on upsert_test(id2); +insert into vitess_a(eid, id, name, foo) values(1, 1, 'abcd', 'efgh'), (1, 2, 'bcde', 'fghi'); +insert into vitess_b(eid, id) values(1, 1), (1, 2); +insert into vitess_c(eid, name, foo) values(10, 'abcd', '20'), (11, 'bcde', '30'); +create table vitess_mixed_case(Col1 int default 0, COL2 int default null, primary key(col1)); + +CREATE TABLE vitess_autoinc_seq ( + id bigint(20) unsigned NOT NULL AUTO_INCREMENT, + name varchar(255) NOT NULL, + sequence bigint(20) unsigned NOT NULL DEFAULT '0', + PRIMARY KEY (id), + UNIQUE KEY name (name) +); + +create table vitess_big(id int default 0, string1 varchar(128) default null, string2 varchar(100) default null, string3 char(1) default null, string4 varchar(50) default null, string5 varchar(50) default null, string6 varchar(16) default null, string7 varchar(120) default null, bigint1 bigint(20) default null, bigint2 bigint(20) default null, integer1 int default null, tinyint1 tinyint(4) default null, primary key(id)); + +create table vitess_ints(tiny tinyint default 0, tinyu tinyint unsigned default null, small smallint default null, smallu smallint unsigned default null, medium mediumint default null, mediumu mediumint unsigned default null, normal int default null, normalu int unsigned default null, big bigint default null, bigu bigint unsigned default null, y year default null, primary key(tiny)); +create table vitess_fracts(id int default 0, deci decimal(5,2) default null, num numeric(5,2) default null, f float default null, d double default null, primary key(id)); +create table vitess_strings(vb varbinary(16) default 'vb', c char(16) default null, vc varchar(16) default null, b binary(4) default null, tb tinyblob default null, bl blob default null, ttx tinytext default null, tx text default null, en enum('a','b') default null, s set('a','b') default null, primary key(vb)); +create table vitess_misc(id int default 0, b bit(8) default null, d date default null, dt datetime default null, t time default null, g geometry default null, primary key(id)); +create table vitess_bit_default(id bit(8) default b'101', primary key(id)); + +create table vitess_bool(auto int auto_increment, bval tinyint(1) default 0, sval varchar(16) default '', ival int default null, primary key (auto)); + +create table vitess_seq(id int default 0, next_id bigint default null, cache bigint default null, increment bigint default null, primary key(id)) comment 'vitess_sequence'; +insert into vitess_seq(id, next_id, cache) values(0, 1, 3); + +create table vitess_reset_seq(id int default 0, next_id bigint default null, cache bigint default null, increment bigint default null, primary key(id)) comment 'vitess_sequence'; +insert into vitess_reset_seq(id, next_id, cache) values(0, 1, 3); + +create table vitess_part(id int, data varchar(16), primary key(id)); +alter table vitess_part partition by range (id) (partition p0 values less than (10), partition p1 values less than (maxvalue)); + +create table vitess_acl_no_access(key1 bigint default 0, key2 bigint default null, primary key(key1)); +create table vitess_acl_read_only(key1 bigint default 0, key2 bigint default null, primary key(key1)); +create table vitess_acl_read_write(key1 bigint default 0, key2 bigint default null, primary key(key1)); +create table vitess_acl_admin(key1 bigint default 0, key2 bigint default null, primary key(key1)); +create table vitess_acl_unmatched(key1 bigint default 0, key2 bigint default null, primary key(key1)); +create table vitess_acl_all_user_read_only(key1 bigint default 0, key2 bigint default null, primary key(key1));` + +var tableACLConfig = `{ + "table_groups": [ + { + "name": "mysql", + "table_names_or_prefixes": [""], + "readers": ["dev"], + "writers": ["dev"], + "admins": ["dev"] + }, + { + "name": "vitess_cached", + "table_names_or_prefixes": ["vitess_nocache", "vitess_cached%"], + "readers": ["dev"], + "writers": ["dev"], + "admins": ["dev"] + }, + { + "name": "vitess_renamed", + "table_names_or_prefixes": ["vitess_renamed%"], + "readers": ["dev"], + "writers": ["dev"], + "admins": ["dev"] + }, + { + "name": "vitess_part", + "table_names_or_prefixes": ["vitess_part%"], + "readers": ["dev"], + "writers": ["dev"], + "admins": ["dev"] + }, + { + "name": "vitess", + "table_names_or_prefixes": ["vitess_a", "vitess_b", "vitess_c", "dual", "vitess_d", "vitess_temp", "vitess_e", "vitess_f", "vitess_mixed_case", "upsert_test", "vitess_strings", "vitess_fracts", "vitess_ints", "vitess_misc", "vitess_bit_default", "vitess_big", "vitess_view", "vitess_json", "vitess_bool", "vitess_autoinc_seq"], + "readers": ["dev"], + "writers": ["dev"], + "admins": ["dev"] + }, + { + "name": "vitess_test", + "table_names_or_prefixes": ["vitess_test"], + "readers": ["dev"], + "writers": ["dev"], + "admins": ["dev"] + }, + { + "name": "vitess_test_ddl", + "table_names_or_prefixes": ["vitess_test_ddl"], + "readers": ["dev"], + "writers": ["dev"], + "admins": ["dev"] + }, + { + "name": "vitess_seq", + "table_names_or_prefixes": ["vitess_seq"], + "readers": ["dev"], + "writers": ["dev"], + "admins": ["dev"] + }, + { + "name": "vitess_reset_seq", + "table_names_or_prefixes": ["vitess_reset_seq"], + "readers": ["dev"], + "writers": ["dev"], + "admins": ["dev"] + }, + { + "name": "vitess_acl_unmatched", + "table_names_or_prefixes": ["vitess_acl_unmatched"], + "readers": ["dev"], + "writers": ["dev"], + "admins": ["dev"] + }, + { + "name": "vitess_acl_no_access", + "table_names_or_prefixes": ["vitess_acl_no_access"] + }, + { + "name": "vitess_acl_read_only", + "table_names_or_prefixes": ["vitess_acl_read_only"], + "readers": ["dev"] + }, + { + "name": "vitess_acl_read_write", + "table_names_or_prefixes": ["vitess_acl_read_write"], + "readers": ["dev"], + "writers": ["dev"] + }, + { + "name": "vitess_acl_admin", + "table_names_or_prefixes": ["vitess_acl_admin"], + "readers": ["dev"], + "writers": ["dev"], + "admins": ["dev"] + }, + { + "name": "vitess_acl_all_user_read_only", + "table_names_or_prefixes": ["vitess_acl_all_user_read_only"], + "readers": ["dev"] + }, + { + "name": "vitess_acl_appdebug", + "table_names_or_prefixes": ["vitess_test_debuguser"], + "readers": ["dev", "vt_appdebug"], + "writers": ["dev", "vt_appdebug"] + }, + { + "name": "version", + "table_names_or_prefixes": ["vitess_version"], + "readers": ["dev"], + "writers": ["dev"], + "admins": ["dev"] + }, + { + "name": "schema_version", + "table_names_or_prefixes": ["schema_version"], + "readers": ["dev"], + "writers": ["dev"], + "admins": ["dev"] + }, + { + "name": "historian_test1", + "table_names_or_prefixes": ["historian_test1"], + "readers": ["dev"], + "writers": ["dev"], + "admins": ["dev"] + } + ] +}` diff --git a/go/vt/vttablet/endtoend/framework/server.go b/go/vt/vttablet/endtoend/framework/server.go index 32be799bb1c..ba60cdcc466 100644 --- a/go/vt/vttablet/endtoend/framework/server.go +++ b/go/vt/vttablet/endtoend/framework/server.go @@ -53,10 +53,10 @@ var ( TopoServer *topo.Server ) -// StartServer starts the server and initializes +// StartCustomServer starts the server and initializes // all the global variables. This function should only be called // once at the beginning of the test. -func StartServer(connParams, connAppDebugParams mysql.ConnParams, dbName string) error { +func StartCustomServer(connParams, connAppDebugParams mysql.ConnParams, dbName string, config *tabletenv.TabletConfig) error { // Setup a fake vtgate server. protocol := "resolveTest" *vtgateconn.VtgateProtocol = protocol @@ -68,14 +68,6 @@ func StartServer(connParams, connAppDebugParams mysql.ConnParams, dbName string) dbcfgs := dbconfigs.NewTestDBConfigs(connParams, connAppDebugParams, dbName) - config := tabletenv.NewDefaultConfig() - config.StrictTableACL = true - config.TwoPCEnable = true - config.TwoPCAbandonAge = 1 - config.TwoPCCoordinatorAddress = "fake" - config.HotRowProtection.Mode = tabletenv.Enable - config.TrackSchemaVersions = true - Target = querypb.Target{ Keyspace: "vttest", Shard: "0", @@ -108,6 +100,20 @@ func StartServer(connParams, connAppDebugParams mysql.ConnParams, dbName string) return nil } +// StartServer starts the server and initializes +// all the global variables. This function should only be called +// once at the beginning of the test. +func StartServer(connParams, connAppDebugParams mysql.ConnParams, dbName string) error { + config := tabletenv.NewDefaultConfig() + config.StrictTableACL = true + config.TwoPCEnable = true + config.TwoPCAbandonAge = 1 + config.TwoPCCoordinatorAddress = "fake" + config.HotRowProtection.Mode = tabletenv.Enable + config.TrackSchemaVersions = true + return StartCustomServer(connParams, connAppDebugParams, dbName, config) +} + // StopServer must be called once all the tests are done. func StopServer() { Server.StopService() diff --git a/go/vt/vttablet/tabletserver/stateful_connection.go b/go/vt/vttablet/tabletserver/stateful_connection.go index 17b353094ac..2fe980c544a 100644 --- a/go/vt/vttablet/tabletserver/stateful_connection.go +++ b/go/vt/vttablet/tabletserver/stateful_connection.go @@ -116,13 +116,25 @@ func (sc *StatefulConnection) execWithRetry(ctx context.Context, query string, m // Unlock returns the connection to the pool. The connection remains active. // This method is idempotent and can be called multiple times func (sc *StatefulConnection) Unlock() { + // when in a transaction, we count from the time created, so each use of the connection does not update the time + updateTime := !sc.IsInTransaction() + sc.unlock(updateTime) +} + +// UnlockUpdateTime returns the connection to the pool. The connection remains active. +// This method is idempotent and can be called multiple times +func (sc *StatefulConnection) UnlockUpdateTime() { + sc.unlock(true) +} + +func (sc *StatefulConnection) unlock(updateTime bool) { if sc.dbConn == nil { return } if sc.dbConn.IsClosed() { sc.Releasef("unlocked closed connection") } else { - sc.pool.markAsNotInUse(sc.ConnID) + sc.pool.markAsNotInUse(sc.ConnID, updateTime) } } diff --git a/go/vt/vttablet/tabletserver/stateful_connection_pool.go b/go/vt/vttablet/tabletserver/stateful_connection_pool.go index f4e7acc32cb..3a8051589ed 100644 --- a/go/vt/vttablet/tabletserver/stateful_connection_pool.go +++ b/go/vt/vttablet/tabletserver/stateful_connection_pool.go @@ -185,8 +185,8 @@ func (sf *StatefulConnectionPool) unregister(id tx.ConnID, reason string) { } //markAsNotInUse marks the connection as not in use at the moment -func (sf *StatefulConnectionPool) markAsNotInUse(id tx.ConnID) { - sf.active.Put(id) +func (sf *StatefulConnectionPool) markAsNotInUse(id tx.ConnID, updateTime bool) { + sf.active.Put(id, updateTime) } // Capacity returns the pool capacity. diff --git a/go/vt/vttablet/tabletserver/tx_engine.go b/go/vt/vttablet/tabletserver/tx_engine.go index 88e9df05bd6..3437356d22d 100644 --- a/go/vt/vttablet/tabletserver/tx_engine.go +++ b/go/vt/vttablet/tabletserver/tx_engine.go @@ -247,7 +247,7 @@ func (te *TxEngine) Begin(ctx context.Context, preQueries []string, reservedID i if err != nil { return 0, "", err } - defer conn.Unlock() + defer conn.UnlockUpdateTime() return conn.ID(), beginSQL, err } @@ -570,7 +570,7 @@ func (te *TxEngine) ReserveBegin(ctx context.Context, options *querypb.ExecuteOp if err != nil { return 0, vterrors.Wrap(err, "TxEngine.ReserveBegin") } - defer conn.Unlock() + defer conn.UnlockUpdateTime() _, err = te.txPool.begin(ctx, options, te.state == AcceptingReadOnly, conn, nil) if err != nil { conn.Close() diff --git a/go/vt/vttablet/tabletserver/tx_pool.go b/go/vt/vttablet/tabletserver/tx_pool.go index f9598297528..ecfbde88ce2 100644 --- a/go/vt/vttablet/tabletserver/tx_pool.go +++ b/go/vt/vttablet/tabletserver/tx_pool.go @@ -125,13 +125,21 @@ func (tp *TxPool) transactionKiller() { defer tp.env.LogError() for _, conn := range tp.scp.GetOutdated(tp.Timeout(), "for tx killer rollback") { log.Warningf("killing transaction (exceeded timeout: %v): %s", tp.Timeout(), conn.String()) - if conn.IsTainted() { + switch { + case conn.IsTainted(): + conn.Close() tp.env.Stats().KillCounters.Add("ReservedConnection", 1) + case conn.IsInTransaction(): + _, err := conn.Exec(context.Background(), "rollback", 1, false) + if err != nil { + conn.Close() + } + tp.env.Stats().KillCounters.Add("Transactions", 1) } - if conn.IsInTransaction() { + // For logging, as transaction is killed as the connection is closed. + if conn.IsTainted() && conn.IsInTransaction() { tp.env.Stats().KillCounters.Add("Transactions", 1) } - conn.Close() conn.Releasef("exceeded timeout: %v", tp.Timeout()) } }