From a1d50b1f93650419ce61032fdc03b3f2f40518dd Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Fri, 16 Apr 2021 10:47:50 +0530 Subject: [PATCH 01/21] added failing tests Signed-off-by: Harshit Gangal --- .../endtoend/vtgate/reservedconn/main_test.go | 2 +- .../vtgate/reservedconn/reserve_conn_test.go | 85 +++++++++++++++++++ 2 files changed, 86 insertions(+), 1 deletion(-) create mode 100644 go/test/endtoend/vtgate/reservedconn/reserve_conn_test.go diff --git a/go/test/endtoend/vtgate/reservedconn/main_test.go b/go/test/endtoend/vtgate/reservedconn/main_test.go index e3b5ec78567..4e2ce26e9ab 100644 --- a/go/test/endtoend/vtgate/reservedconn/main_test.go +++ b/go/test/endtoend/vtgate/reservedconn/main_test.go @@ -121,7 +121,7 @@ func TestMain(m *testing.M) { VSchema: vSchema, } clusterInstance.VtTabletExtraArgs = []string{"-queryserver-config-transaction-timeout", "5"} - if err := clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 1, false); err != nil { + if err := clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 1, true); err != nil { return 1 } diff --git a/go/test/endtoend/vtgate/reservedconn/reserve_conn_test.go b/go/test/endtoend/vtgate/reservedconn/reserve_conn_test.go new file mode 100644 index 00000000000..5a9fbebc14b --- /dev/null +++ b/go/test/endtoend/vtgate/reservedconn/reserve_conn_test.go @@ -0,0 +1,85 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package reservedconn + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/mysql" +) + +func TestServingChange(t *testing.T) { + conn, err := mysql.Connect(context.Background(), &vtParams) + require.NoError(t, err) + defer conn.Close() + + // enable reserved connection. + checkedExec(t, conn, "set enable_system_settings = true") + checkedExec(t, conn, fmt.Sprintf("use %s@rdonly", keyspaceName)) + checkedExec(t, conn, "set sql_mode = ''") + + // this will create reserved connection on rdonly on -80 and 80- shards. + checkedExec(t, conn, "select * from test") + + // changing rdonly tablet to spare (non serving). + rdonlyTablet := clusterInstance.Keyspaces[0].Shards[0].Rdonly() + err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", rdonlyTablet.Alias, "spare") + require.NoError(t, err) + + // this should fail as there is no rdonly present + _, err = exec(t, conn, "select * from test") + require.Error(t, err) + + // changing replica tablet to rdonly to make rdonly available for serving. + replicaTablet := clusterInstance.Keyspaces[0].Shards[0].Replica() + err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", replicaTablet.Alias, "rdonly") + require.NoError(t, err) + + // this should pass now as there is rdonly present + _, err = exec(t, conn, "select * from test") + require.NoError(t, err) + + // This test currently failed with error: vttablet: rpc error: code = FailedPrecondition desc = operation not allowed in state NOT_SERVING (errno 1105) (sqlstate HY000) during query: select * from test +} + +func TestTabletChange(t *testing.T) { + conn, err := mysql.Connect(context.Background(), &vtParams) + require.NoError(t, err) + defer conn.Close() + + // enable reserved connection. + checkedExec(t, conn, "set enable_system_settings = true") + checkedExec(t, conn, fmt.Sprintf("use %s@master", keyspaceName)) + checkedExec(t, conn, "set sql_mode = ''") + + // this will create reserved connection on master on -80 and 80- shards. + checkedExec(t, conn, "select * from test") + + // Change Master + err = clusterInstance.VtctlclientProcess.ExecuteCommand("PlannedReparentShard", "-keyspace_shard", fmt.Sprintf("%s/%s", keyspaceName, "-80")) + require.NoError(t, err) + + // this should pass as there is new master tablet and is serving. + _, err = exec(t, conn, "select * from test") + require.NoError(t, err) + + // This test currently failed with error: vttablet: rpc error: code = FailedPrecondition desc = operation not allowed in state NOT_SERVING (errno 1105) (sqlstate HY000) during query: select * from test +} From e85e6b7861471bd0c5ca293c60c50ea398fa3e95 Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Fri, 16 Apr 2021 08:25:13 +0200 Subject: [PATCH 02/21] reconnect reserved connection if tablet is not serving Signed-off-by: Andres Taylor --- go/test/endtoend/cluster/cluster_process.go | 2 +- go/vt/vtgate/scatter_conn.go | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/go/test/endtoend/cluster/cluster_process.go b/go/test/endtoend/cluster/cluster_process.go index c92de019d90..d7a0ce7035d 100644 --- a/go/test/endtoend/cluster/cluster_process.go +++ b/go/test/endtoend/cluster/cluster_process.go @@ -42,7 +42,7 @@ const ( ) var ( - keepData = flag.Bool("keep-data", false, "don't delete the per-test VTDATAROOT subfolders") + keepData = flag.Bool("keep-data", true, "don't delete the per-test VTDATAROOT subfolders") topoFlavor = flag.String("topo-flavor", "etcd2", "choose a topo server from etcd2, zk2 or consul") isCoverage = flag.Bool("is-coverage", false, "whether coverage is required") forceVTDATAROOT = flag.String("force-vtdataroot", "", "force path for VTDATAROOT, which may already be populated") diff --git a/go/vt/vtgate/scatter_conn.go b/go/vt/vtgate/scatter_conn.go index ad8df6b6a0c..066581150af 100644 --- a/go/vt/vtgate/scatter_conn.go +++ b/go/vt/vtgate/scatter_conn.go @@ -278,7 +278,8 @@ func (stc *ScatterConn) ExecuteMultiShard( return qr, allErrors.GetErrors() } -var errRegx = regexp.MustCompile("transaction ([a-z0-9:]+) (?:ended|not found)") +var txClosed = regexp.MustCompile("transaction ([a-z0-9:]+) (?:ended|not found)") +var notServing = regexp.MustCompile("operation not allowed in state (NOT_SERVING|SHUTTING_DOWN)") func checkAndResetShardSession(info *shardActionInfo, err error, session *SafeSession) bool { if info.reservedID != 0 && info.transactionID == 0 && wasConnectionClosed(err) { @@ -677,10 +678,12 @@ func (stc *ScatterConn) ExecuteLock( func wasConnectionClosed(err error) bool { sqlErr := mysql.NewSQLErrorFromError(err).(*mysql.SQLError) + message := sqlErr.Error() return sqlErr.Number() == mysql.CRServerGone || sqlErr.Number() == mysql.CRServerLost || - (sqlErr.Number() == mysql.ERQueryInterrupted && errRegx.MatchString(sqlErr.Error())) + (sqlErr.Number() == mysql.ERQueryInterrupted && txClosed.MatchString(message)) || + (sqlErr.Number() == mysql.ERUnknownError && notServing.MatchString(message)) } // actionInfo looks at the current session, and returns information about what needs to be done for this tablet From 8f55ff93f8b2892899b935d577fe7238da560146 Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Fri, 16 Apr 2021 08:59:07 +0200 Subject: [PATCH 03/21] separated end2end tests to make them less fragile Signed-off-by: Andres Taylor --- .../reservedconn/reconnect1/main_test.go | 187 ++++++++++++++++++ .../reservedconn/reconnect2/main_test.go | 177 +++++++++++++++++ .../vtgate/reservedconn/reserve_conn_test.go | 85 -------- go/vt/vtgate/scatter_conn_test.go | 7 + 4 files changed, 371 insertions(+), 85 deletions(-) create mode 100644 go/test/endtoend/vtgate/reservedconn/reconnect1/main_test.go create mode 100644 go/test/endtoend/vtgate/reservedconn/reconnect2/main_test.go delete mode 100644 go/test/endtoend/vtgate/reservedconn/reserve_conn_test.go diff --git a/go/test/endtoend/vtgate/reservedconn/reconnect1/main_test.go b/go/test/endtoend/vtgate/reservedconn/reconnect1/main_test.go new file mode 100644 index 00000000000..024a8a56cca --- /dev/null +++ b/go/test/endtoend/vtgate/reservedconn/reconnect1/main_test.go @@ -0,0 +1,187 @@ +/* +Copyright 2020 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package reservedconn + +import ( + "context" + "flag" + "fmt" + "os" + "testing" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/test/endtoend/cluster" +) + +var ( + clusterInstance *cluster.LocalProcessCluster + vtParams mysql.ConnParams + keyspaceName = "ks" + cell = "zone1" + hostname = "localhost" + sqlSchema = ` + create table test( + id bigint, + val1 varchar(16), + val2 int, + val3 float, + primary key(id) + )Engine=InnoDB; + +CREATE TABLE test_vdx ( + val1 varchar(16) NOT NULL, + keyspace_id binary(8), + UNIQUE KEY (val1) +) ENGINE=Innodb; +` + + vSchema = ` + { + "sharded":true, + "vindexes": { + "hash_index": { + "type": "hash" + }, + "lookup1": { + "type": "consistent_lookup", + "params": { + "table": "test_vdx", + "from": "val1", + "to": "keyspace_id", + "ignore_nulls": "true" + }, + "owner": "test" + }, + "unicode_vdx":{ + "type": "unicode_loose_md5" + } + }, + "tables": { + "test":{ + "column_vindexes": [ + { + "column": "id", + "name": "hash_index" + }, + { + "column": "val1", + "name": "lookup1" + } + ] + }, + "test_vdx":{ + "column_vindexes": [ + { + "column": "val1", + "name": "unicode_vdx" + } + ] + } + } + } + ` +) + +func TestMain(m *testing.M) { + defer cluster.PanicHandler(nil) + flag.Parse() + + exitCode := func() int { + clusterInstance = cluster.NewCluster(cell, hostname) + defer clusterInstance.Teardown() + + // Start topo server + if err := clusterInstance.StartTopo(); err != nil { + return 1 + } + + // Start keyspace + keyspace := &cluster.Keyspace{ + Name: keyspaceName, + SchemaSQL: sqlSchema, + VSchema: vSchema, + } + clusterInstance.VtTabletExtraArgs = []string{"-queryserver-config-transaction-timeout", "5"} + if err := clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 1, true); err != nil { + return 1 + } + + // Start vtgate + clusterInstance.VtGateExtraArgs = []string{"-lock_heartbeat_time", "2s"} + vtgateProcess := clusterInstance.NewVtgateInstance() + vtgateProcess.SysVarSetEnabled = true + if err := vtgateProcess.Setup(); err != nil { + return 1 + } + + vtParams = mysql.ConnParams{ + Host: clusterInstance.Hostname, + Port: clusterInstance.VtgateMySQLPort, + } + return m.Run() + }() + os.Exit(exitCode) +} + +func TestServingChange(t *testing.T) { + conn, err := mysql.Connect(context.Background(), &vtParams) + require.NoError(t, err) + defer conn.Close() + + // enable reserved connection. + checkedExec(t, conn, "set enable_system_settings = true") + checkedExec(t, conn, fmt.Sprintf("use %s@rdonly", keyspaceName)) + checkedExec(t, conn, "set sql_mode = ''") + + // this will create reserved connection on rdonly on -80 and 80- shards. + checkedExec(t, conn, "select * from test") + + // changing rdonly tablet to spare (non serving). + rdonlyTablet := clusterInstance.Keyspaces[0].Shards[0].Rdonly() + err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", rdonlyTablet.Alias, "spare") + require.NoError(t, err) + + // this should fail as there is no rdonly present + _, err = exec(t, conn, "select * from test") + require.Error(t, err) + + // changing replica tablet to rdonly to make rdonly available for serving. + replicaTablet := clusterInstance.Keyspaces[0].Shards[0].Replica() + err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", replicaTablet.Alias, "rdonly") + require.NoError(t, err) + + // this should pass now as there is rdonly present + _, err = exec(t, conn, "select * from test") + require.NoError(t, err) + + // This test currently failed with error: vttablet: rpc error: code = FailedPrecondition desc = operation not allowed in state NOT_SERVING (errno 1105) (sqlstate HY000) during query: select * from test +} + +func exec(t *testing.T, conn *mysql.Conn, query string) (*sqltypes.Result, error) { + t.Helper() + return conn.ExecuteFetch(query, 1000, true) +} + +func checkedExec(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result { + t.Helper() + qr, err := conn.ExecuteFetch(query, 1000, true) + require.NoError(t, err) + return qr +} diff --git a/go/test/endtoend/vtgate/reservedconn/reconnect2/main_test.go b/go/test/endtoend/vtgate/reservedconn/reconnect2/main_test.go new file mode 100644 index 00000000000..5e4a6118b25 --- /dev/null +++ b/go/test/endtoend/vtgate/reservedconn/reconnect2/main_test.go @@ -0,0 +1,177 @@ +/* +Copyright 2020 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package reservedconn + +import ( + "context" + "flag" + "fmt" + "os" + "testing" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/test/endtoend/cluster" +) + +var ( + clusterInstance *cluster.LocalProcessCluster + vtParams mysql.ConnParams + keyspaceName = "ks" + cell = "zone1" + hostname = "localhost" + sqlSchema = ` + create table test( + id bigint, + val1 varchar(16), + val2 int, + val3 float, + primary key(id) + )Engine=InnoDB; + +CREATE TABLE test_vdx ( + val1 varchar(16) NOT NULL, + keyspace_id binary(8), + UNIQUE KEY (val1) +) ENGINE=Innodb; +` + + vSchema = ` + { + "sharded":true, + "vindexes": { + "hash_index": { + "type": "hash" + }, + "lookup1": { + "type": "consistent_lookup", + "params": { + "table": "test_vdx", + "from": "val1", + "to": "keyspace_id", + "ignore_nulls": "true" + }, + "owner": "test" + }, + "unicode_vdx":{ + "type": "unicode_loose_md5" + } + }, + "tables": { + "test":{ + "column_vindexes": [ + { + "column": "id", + "name": "hash_index" + }, + { + "column": "val1", + "name": "lookup1" + } + ] + }, + "test_vdx":{ + "column_vindexes": [ + { + "column": "val1", + "name": "unicode_vdx" + } + ] + } + } + } + ` +) + +func TestMain(m *testing.M) { + defer cluster.PanicHandler(nil) + flag.Parse() + + exitCode := func() int { + clusterInstance = cluster.NewCluster(cell, hostname) + defer clusterInstance.Teardown() + + // Start topo server + if err := clusterInstance.StartTopo(); err != nil { + return 1 + } + + // Start keyspace + keyspace := &cluster.Keyspace{ + Name: keyspaceName, + SchemaSQL: sqlSchema, + VSchema: vSchema, + } + clusterInstance.VtTabletExtraArgs = []string{"-queryserver-config-transaction-timeout", "5"} + if err := clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 1, true); err != nil { + return 1 + } + + // Start vtgate + clusterInstance.VtGateExtraArgs = []string{"-lock_heartbeat_time", "2s"} + vtgateProcess := clusterInstance.NewVtgateInstance() + vtgateProcess.SysVarSetEnabled = true + if err := vtgateProcess.Setup(); err != nil { + return 1 + } + + vtParams = mysql.ConnParams{ + Host: clusterInstance.Hostname, + Port: clusterInstance.VtgateMySQLPort, + } + return m.Run() + }() + os.Exit(exitCode) +} + +func TestTabletChange(t *testing.T) { + conn, err := mysql.Connect(context.Background(), &vtParams) + require.NoError(t, err) + defer conn.Close() + + // enable reserved connection. + checkedExec(t, conn, "set enable_system_settings = true") + checkedExec(t, conn, fmt.Sprintf("use %s@master", keyspaceName)) + checkedExec(t, conn, "set sql_mode = ''") + + // this will create reserved connection on master on -80 and 80- shards. + checkedExec(t, conn, "select * from test") + + // Change Master + err = clusterInstance.VtctlclientProcess.ExecuteCommand("PlannedReparentShard", "-keyspace_shard", fmt.Sprintf("%s/%s", keyspaceName, "-80")) + require.NoError(t, err) + + // this should pass as there is new master tablet and is serving. + _, err = exec(t, conn, "select * from test") + require.NoError(t, err) + + // This test currently failed with error: vttablet: rpc error: code = FailedPrecondition desc = operation not allowed in state NOT_SERVING (errno 1105) (sqlstate HY000) during query: select * from test +} + +func exec(t *testing.T, conn *mysql.Conn, query string) (*sqltypes.Result, error) { + t.Helper() + return conn.ExecuteFetch(query, 1000, true) +} + +func checkedExec(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result { + t.Helper() + qr, err := conn.ExecuteFetch(query, 1000, true) + require.NoError(t, err) + return qr +} diff --git a/go/test/endtoend/vtgate/reservedconn/reserve_conn_test.go b/go/test/endtoend/vtgate/reservedconn/reserve_conn_test.go deleted file mode 100644 index 5a9fbebc14b..00000000000 --- a/go/test/endtoend/vtgate/reservedconn/reserve_conn_test.go +++ /dev/null @@ -1,85 +0,0 @@ -/* -Copyright 2021 The Vitess Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package reservedconn - -import ( - "context" - "fmt" - "testing" - - "github.com/stretchr/testify/require" - - "vitess.io/vitess/go/mysql" -) - -func TestServingChange(t *testing.T) { - conn, err := mysql.Connect(context.Background(), &vtParams) - require.NoError(t, err) - defer conn.Close() - - // enable reserved connection. - checkedExec(t, conn, "set enable_system_settings = true") - checkedExec(t, conn, fmt.Sprintf("use %s@rdonly", keyspaceName)) - checkedExec(t, conn, "set sql_mode = ''") - - // this will create reserved connection on rdonly on -80 and 80- shards. - checkedExec(t, conn, "select * from test") - - // changing rdonly tablet to spare (non serving). - rdonlyTablet := clusterInstance.Keyspaces[0].Shards[0].Rdonly() - err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", rdonlyTablet.Alias, "spare") - require.NoError(t, err) - - // this should fail as there is no rdonly present - _, err = exec(t, conn, "select * from test") - require.Error(t, err) - - // changing replica tablet to rdonly to make rdonly available for serving. - replicaTablet := clusterInstance.Keyspaces[0].Shards[0].Replica() - err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", replicaTablet.Alias, "rdonly") - require.NoError(t, err) - - // this should pass now as there is rdonly present - _, err = exec(t, conn, "select * from test") - require.NoError(t, err) - - // This test currently failed with error: vttablet: rpc error: code = FailedPrecondition desc = operation not allowed in state NOT_SERVING (errno 1105) (sqlstate HY000) during query: select * from test -} - -func TestTabletChange(t *testing.T) { - conn, err := mysql.Connect(context.Background(), &vtParams) - require.NoError(t, err) - defer conn.Close() - - // enable reserved connection. - checkedExec(t, conn, "set enable_system_settings = true") - checkedExec(t, conn, fmt.Sprintf("use %s@master", keyspaceName)) - checkedExec(t, conn, "set sql_mode = ''") - - // this will create reserved connection on master on -80 and 80- shards. - checkedExec(t, conn, "select * from test") - - // Change Master - err = clusterInstance.VtctlclientProcess.ExecuteCommand("PlannedReparentShard", "-keyspace_shard", fmt.Sprintf("%s/%s", keyspaceName, "-80")) - require.NoError(t, err) - - // this should pass as there is new master tablet and is serving. - _, err = exec(t, conn, "select * from test") - require.NoError(t, err) - - // This test currently failed with error: vttablet: rpc error: code = FailedPrecondition desc = operation not allowed in state NOT_SERVING (errno 1105) (sqlstate HY000) during query: select * from test -} diff --git a/go/vt/vtgate/scatter_conn_test.go b/go/vt/vtgate/scatter_conn_test.go index 5f84a56246d..31dfd7f6ed9 100644 --- a/go/vt/vtgate/scatter_conn_test.go +++ b/go/vt/vtgate/scatter_conn_test.go @@ -321,6 +321,13 @@ func TestReservedConnFail(t *testing.T) { assert.Equal(t, 2, len(sbc0.Queries), "one for the failed attempt, and one for the retry") require.Equal(t, 1, len(session.ShardSessions)) assert.NotEqual(t, oldRId, session.Session.ShardSessions[0].ReservedId, "should have recreated a reserved connection since the last connection was lost") + + sbc0.Queries = nil + sbc0.EphemeralShardErr = mysql.NewSQLError(mysql.ERUnknownError, mysql.SSUnknownSQLState, "operation not allowed in state NOT_SERVING during query: query1") + _ = executeOnShardsReturnsErr(t, res, keyspace, sc, session, destinations) + assert.Equal(t, 2, len(sbc0.Queries), "one for the failed attempt, and one for the retry") + require.Equal(t, 1, len(session.ShardSessions)) + assert.NotEqual(t, oldRId, session.Session.ShardSessions[0].ReservedId, "should have recreated a reserved connection since the last connection was lost") } func TestIsConnClosed(t *testing.T) { From 88385be7d0bd800253d7f0876a01e7dc0180cfbd Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Fri, 16 Apr 2021 10:30:10 +0200 Subject: [PATCH 04/21] add test config Signed-off-by: Andres Taylor --- test/config.json | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/test/config.json b/test/config.json index 83ea100469d..2dd92552847 100644 --- a/test/config.json +++ b/test/config.json @@ -606,6 +606,24 @@ "RetryMax": 0, "Tags": [] }, + "vtgate_reserved_conn1": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/reservedconn/reconnect1"], + "Command": [], + "Manual": false, + "Shard": "17", + "RetryMax": 0, + "Tags": [] + }, + "vtgate_reserved_conn2": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/reservedconn/reconnect2"], + "Command": [], + "Manual": false, + "Shard": "17", + "RetryMax": 0, + "Tags": [] + }, "vtgate_transaction": { "File": "unused.go", "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/transaction"], From 8f768dac9658438d40b2694d850918ed21aaafb7 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Sun, 18 Apr 2021 17:32:09 +0530 Subject: [PATCH 05/21] upadte e2e test to be more compact Signed-off-by: Harshit Gangal --- go/test/endtoend/cluster/cluster_process.go | 2 +- .../endtoend/vtgate/reservedconn/main_test.go | 2 +- .../reservedconn/reconnect1/main_test.go | 66 ++++------------- .../reservedconn/reconnect2/main_test.go | 70 +++++-------------- 4 files changed, 33 insertions(+), 107 deletions(-) diff --git a/go/test/endtoend/cluster/cluster_process.go b/go/test/endtoend/cluster/cluster_process.go index d7a0ce7035d..c92de019d90 100644 --- a/go/test/endtoend/cluster/cluster_process.go +++ b/go/test/endtoend/cluster/cluster_process.go @@ -42,7 +42,7 @@ const ( ) var ( - keepData = flag.Bool("keep-data", true, "don't delete the per-test VTDATAROOT subfolders") + keepData = flag.Bool("keep-data", false, "don't delete the per-test VTDATAROOT subfolders") topoFlavor = flag.String("topo-flavor", "etcd2", "choose a topo server from etcd2, zk2 or consul") isCoverage = flag.Bool("is-coverage", false, "whether coverage is required") forceVTDATAROOT = flag.String("force-vtdataroot", "", "force path for VTDATAROOT, which may already be populated") diff --git a/go/test/endtoend/vtgate/reservedconn/main_test.go b/go/test/endtoend/vtgate/reservedconn/main_test.go index 4e2ce26e9ab..e3b5ec78567 100644 --- a/go/test/endtoend/vtgate/reservedconn/main_test.go +++ b/go/test/endtoend/vtgate/reservedconn/main_test.go @@ -121,7 +121,7 @@ func TestMain(m *testing.M) { VSchema: vSchema, } clusterInstance.VtTabletExtraArgs = []string{"-queryserver-config-transaction-timeout", "5"} - if err := clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 1, true); err != nil { + if err := clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 1, false); err != nil { return 1 } diff --git a/go/test/endtoend/vtgate/reservedconn/reconnect1/main_test.go b/go/test/endtoend/vtgate/reservedconn/reconnect1/main_test.go index 024a8a56cca..70bc4c8501b 100644 --- a/go/test/endtoend/vtgate/reservedconn/reconnect1/main_test.go +++ b/go/test/endtoend/vtgate/reservedconn/reconnect1/main_test.go @@ -19,10 +19,11 @@ package reservedconn import ( "context" "flag" - "fmt" "os" "testing" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql" @@ -36,21 +37,7 @@ var ( keyspaceName = "ks" cell = "zone1" hostname = "localhost" - sqlSchema = ` - create table test( - id bigint, - val1 varchar(16), - val2 int, - val3 float, - primary key(id) - )Engine=InnoDB; - -CREATE TABLE test_vdx ( - val1 varchar(16) NOT NULL, - keyspace_id binary(8), - UNIQUE KEY (val1) -) ENGINE=Innodb; -` + sqlSchema = `create table test(id bigint primary key)Engine=InnoDB;` vSchema = ` { @@ -58,20 +45,7 @@ CREATE TABLE test_vdx ( "vindexes": { "hash_index": { "type": "hash" - }, - "lookup1": { - "type": "consistent_lookup", - "params": { - "table": "test_vdx", - "from": "val1", - "to": "keyspace_id", - "ignore_nulls": "true" - }, - "owner": "test" - }, - "unicode_vdx":{ - "type": "unicode_loose_md5" - } + } }, "tables": { "test":{ @@ -79,18 +53,6 @@ CREATE TABLE test_vdx ( { "column": "id", "name": "hash_index" - }, - { - "column": "val1", - "name": "lookup1" - } - ] - }, - "test_vdx":{ - "column_vindexes": [ - { - "column": "val1", - "name": "unicode_vdx" } ] } @@ -118,16 +80,13 @@ func TestMain(m *testing.M) { SchemaSQL: sqlSchema, VSchema: vSchema, } - clusterInstance.VtTabletExtraArgs = []string{"-queryserver-config-transaction-timeout", "5"} if err := clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 1, true); err != nil { return 1 } // Start vtgate - clusterInstance.VtGateExtraArgs = []string{"-lock_heartbeat_time", "2s"} - vtgateProcess := clusterInstance.NewVtgateInstance() - vtgateProcess.SysVarSetEnabled = true - if err := vtgateProcess.Setup(); err != nil { + clusterInstance.VtGateExtraArgs = []string{"-lock_heartbeat_time", "2s", "-enable_system_settings=true"} // enable reserved connection. + if err := clusterInstance.StartVtgate(); err != nil { return 1 } @@ -145,9 +104,7 @@ func TestServingChange(t *testing.T) { require.NoError(t, err) defer conn.Close() - // enable reserved connection. - checkedExec(t, conn, "set enable_system_settings = true") - checkedExec(t, conn, fmt.Sprintf("use %s@rdonly", keyspaceName)) + checkedExec(t, conn, "use @rdonly") checkedExec(t, conn, "set sql_mode = ''") // this will create reserved connection on rdonly on -80 and 80- shards. @@ -167,10 +124,11 @@ func TestServingChange(t *testing.T) { err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", replicaTablet.Alias, "rdonly") require.NoError(t, err) - // this should pass now as there is rdonly present - _, err = exec(t, conn, "select * from test") - require.NoError(t, err) - + for i := 0; i < 10; i++ { + // this should pass now as there is rdonly present + _, err = exec(t, conn, "select * from test") + assert.NoError(t, err, "failed for case: %d", i) + } // This test currently failed with error: vttablet: rpc error: code = FailedPrecondition desc = operation not allowed in state NOT_SERVING (errno 1105) (sqlstate HY000) during query: select * from test } diff --git a/go/test/endtoend/vtgate/reservedconn/reconnect2/main_test.go b/go/test/endtoend/vtgate/reservedconn/reconnect2/main_test.go index 5e4a6118b25..4ea17a66dfe 100644 --- a/go/test/endtoend/vtgate/reservedconn/reconnect2/main_test.go +++ b/go/test/endtoend/vtgate/reservedconn/reconnect2/main_test.go @@ -23,6 +23,8 @@ import ( "os" "testing" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql" @@ -36,21 +38,7 @@ var ( keyspaceName = "ks" cell = "zone1" hostname = "localhost" - sqlSchema = ` - create table test( - id bigint, - val1 varchar(16), - val2 int, - val3 float, - primary key(id) - )Engine=InnoDB; - -CREATE TABLE test_vdx ( - val1 varchar(16) NOT NULL, - keyspace_id binary(8), - UNIQUE KEY (val1) -) ENGINE=Innodb; -` + sqlSchema = `create table test(id bigint primary key)Engine=InnoDB;` vSchema = ` { @@ -58,20 +46,7 @@ CREATE TABLE test_vdx ( "vindexes": { "hash_index": { "type": "hash" - }, - "lookup1": { - "type": "consistent_lookup", - "params": { - "table": "test_vdx", - "from": "val1", - "to": "keyspace_id", - "ignore_nulls": "true" - }, - "owner": "test" - }, - "unicode_vdx":{ - "type": "unicode_loose_md5" - } + } }, "tables": { "test":{ @@ -79,18 +54,6 @@ CREATE TABLE test_vdx ( { "column": "id", "name": "hash_index" - }, - { - "column": "val1", - "name": "lookup1" - } - ] - }, - "test_vdx":{ - "column_vindexes": [ - { - "column": "val1", - "name": "unicode_vdx" } ] } @@ -124,10 +87,8 @@ func TestMain(m *testing.M) { } // Start vtgate - clusterInstance.VtGateExtraArgs = []string{"-lock_heartbeat_time", "2s"} - vtgateProcess := clusterInstance.NewVtgateInstance() - vtgateProcess.SysVarSetEnabled = true - if err := vtgateProcess.Setup(); err != nil { + clusterInstance.VtGateExtraArgs = []string{"-lock_heartbeat_time", "2s", "-enable_system_settings=true"} // enable reserved connection. + if err := clusterInstance.StartVtgate(); err != nil { return 1 } @@ -145,9 +106,7 @@ func TestTabletChange(t *testing.T) { require.NoError(t, err) defer conn.Close() - // enable reserved connection. - checkedExec(t, conn, "set enable_system_settings = true") - checkedExec(t, conn, fmt.Sprintf("use %s@master", keyspaceName)) + checkedExec(t, conn, "use @master") checkedExec(t, conn, "set sql_mode = ''") // this will create reserved connection on master on -80 and 80- shards. @@ -157,10 +116,19 @@ func TestTabletChange(t *testing.T) { err = clusterInstance.VtctlclientProcess.ExecuteCommand("PlannedReparentShard", "-keyspace_shard", fmt.Sprintf("%s/%s", keyspaceName, "-80")) require.NoError(t, err) - // this should pass as there is new master tablet and is serving. - _, err = exec(t, conn, "select * from test") + // just to make sure that a new connection is able to successfully run these queries + conn2, err := mysql.Connect(context.Background(), &vtParams) require.NoError(t, err) - + defer conn2.Close() + checkedExec(t, conn2, "set enable_system_settings = true") + checkedExec(t, conn2, fmt.Sprintf("use %s@master", keyspaceName)) + checkedExec(t, conn2, "select * from test") + + for i := 0; i < 10; i++ { + // this should pass as there is new master tablet and is serving. + _, err = exec(t, conn, "select * from test") + assert.NoError(t, err, "failed for case: %d", i) + } // This test currently failed with error: vttablet: rpc error: code = FailedPrecondition desc = operation not allowed in state NOT_SERVING (errno 1105) (sqlstate HY000) during query: select * from test } From 9233352926a6522d597e37a97b622b2dba9e56a4 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Sun, 18 Apr 2021 23:10:25 +0530 Subject: [PATCH 06/21] check and send to new tablet on the same targed based on the error received Signed-off-by: Harshit Gangal --- go/vt/vtgate/safe_session.go | 2 +- go/vt/vtgate/scatter_conn.go | 66 ++++++++++++++++++++++++++---------- 2 files changed, 49 insertions(+), 19 deletions(-) diff --git a/go/vt/vtgate/safe_session.go b/go/vt/vtgate/safe_session.go index 05db3b7ffa4..dd63c2cd301 100644 --- a/go/vt/vtgate/safe_session.go +++ b/go/vt/vtgate/safe_session.go @@ -533,7 +533,7 @@ func removeShard(tabletAlias *topodatapb.TabletAlias, sessions []*vtgatepb.Sessi } } if idx == -1 { - return sessions, nil + return nil, vterrors.New(vtrpcpb.Code_INTERNAL, "[BUG] tried to remove missing shard") } return append(sessions[:idx], sessions[idx+1:]...), nil } diff --git a/go/vt/vtgate/scatter_conn.go b/go/vt/vtgate/scatter_conn.go index 066581150af..f2023bec177 100644 --- a/go/vt/vtgate/scatter_conn.go +++ b/go/vt/vtgate/scatter_conn.go @@ -145,6 +145,14 @@ func (stc *ScatterConn) endAction(startTime time.Time, allErrors *concurrency.Al stc.timings.Record(statsKey, startTime) } +type reset int + +const ( + none reset = iota + shard + newQS +) + // ExecuteMultiShard is like Execute, // but each shard gets its own Sql Queries and BindVariables. // @@ -220,8 +228,13 @@ func (stc *ScatterConn) ExecuteMultiShard( case nothing: innerqr, err = qs.Execute(ctx, rs.Target, queries[i].Sql, queries[i].BindVariables, info.transactionID, info.reservedID, opts) if err != nil { - shouldRetry := checkAndResetShardSession(info, err, session) - if shouldRetry { + retry := checkAndResetShardSession(info, err, session) + switch retry { + case newQS: + // Current tablet is not available, try querying new tablet using gateway. + qs = rs.Gateway + fallthrough + case shard: // we seem to have lost our connection. if it was a reserved connection, let's try to recreate it info.actionNeeded = reserve innerqr, reservedID, alias, err = qs.ReserveExecute(ctx, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, 0 /*transactionId*/, opts) @@ -236,8 +249,13 @@ func (stc *ScatterConn) ExecuteMultiShard( if transactionID != 0 { return info.updateTransactionID(transactionID, alias), err } - shouldRetry := checkAndResetShardSession(info, err, session) - if shouldRetry { + retry := checkAndResetShardSession(info, err, session) + switch retry { + case newQS: + // Current tablet is not available, try querying new tablet using gateway. + qs = rs.Gateway + fallthrough + case shard: // we seem to have lost our connection. if it was a reserved connection, let's try to recreate it info.actionNeeded = reserveBegin innerqr, transactionID, reservedID, alias, err = qs.ReserveBeginExecute(ctx, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, opts) @@ -278,24 +296,27 @@ func (stc *ScatterConn) ExecuteMultiShard( return qr, allErrors.GetErrors() } -var txClosed = regexp.MustCompile("transaction ([a-z0-9:]+) (?:ended|not found)") -var notServing = regexp.MustCompile("operation not allowed in state (NOT_SERVING|SHUTTING_DOWN)") - -func checkAndResetShardSession(info *shardActionInfo, err error, session *SafeSession) bool { - if info.reservedID != 0 && info.transactionID == 0 && wasConnectionClosed(err) { +func checkAndResetShardSession(info *shardActionInfo, err error, session *SafeSession) reset { + retry := none + if info.reservedID != 0 && info.transactionID == 0 { + if wasConnectionClosed(err) { + retry = shard + } + if requireNewQS(err) { + retry = newQS + } + } + if retry != none { session.ResetShard(info.alias) - return true } - return false + return retry } func getQueryService(rs *srvtopo.ResolvedShard, info *shardActionInfo) (queryservice.QueryService, error) { _, usingLegacyGw := rs.Gateway.(*DiscoveryGateway) - if usingLegacyGw { - switch info.actionNeeded { - case reserve, reserveBegin: - return nil, vterrors.New(vtrpcpb.Code_UNIMPLEMENTED, "reserved connections are not supported on old gen gateway") - } + if usingLegacyGw && + (info.actionNeeded == reserve || info.actionNeeded == reserveBegin) { + return nil, vterrors.New(vtrpcpb.Code_UNIMPLEMENTED, "reserved connections are not supported on old gen gateway") } if usingLegacyGw || info.alias == nil { return rs.Gateway, nil @@ -676,14 +697,23 @@ func (stc *ScatterConn) ExecuteLock( return qr, err } +var txClosed = regexp.MustCompile("transaction ([a-z0-9:]+) (?:ended|not found)") +var notServing = regexp.MustCompile("operation not allowed in state (NOT_SERVING|SHUTTING_DOWN)") +var wrongTabletType = regexp.MustCompile("invalid tablet type:") + func wasConnectionClosed(err error) bool { sqlErr := mysql.NewSQLErrorFromError(err).(*mysql.SQLError) message := sqlErr.Error() return sqlErr.Number() == mysql.CRServerGone || sqlErr.Number() == mysql.CRServerLost || - (sqlErr.Number() == mysql.ERQueryInterrupted && txClosed.MatchString(message)) || - (sqlErr.Number() == mysql.ERUnknownError && notServing.MatchString(message)) + (sqlErr.Number() == mysql.ERQueryInterrupted && txClosed.MatchString(message)) +} + +func requireNewQS(err error) bool { + code := vterrors.Code(err) + msg := err.Error() + return code == vtrpcpb.Code_FAILED_PRECONDITION && (notServing.MatchString(msg) || wrongTabletType.MatchString(msg)) } // actionInfo looks at the current session, and returns information about what needs to be done for this tablet From 968a32b92e0ebd01da7dd4abd24e74d1d340d75b Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Sun, 18 Apr 2021 23:11:03 +0530 Subject: [PATCH 07/21] updated e2e test Signed-off-by: Harshit Gangal --- .../endtoend/vtgate/reservedconn/main_test.go | 6 ++---- .../vtgate/reservedconn/reconnect1/main_test.go | 13 +++++++------ .../vtgate/reservedconn/reconnect2/main_test.go | 17 +++-------------- 3 files changed, 12 insertions(+), 24 deletions(-) diff --git a/go/test/endtoend/vtgate/reservedconn/main_test.go b/go/test/endtoend/vtgate/reservedconn/main_test.go index e3b5ec78567..ebe133a2c81 100644 --- a/go/test/endtoend/vtgate/reservedconn/main_test.go +++ b/go/test/endtoend/vtgate/reservedconn/main_test.go @@ -126,10 +126,8 @@ func TestMain(m *testing.M) { } // Start vtgate - clusterInstance.VtGateExtraArgs = []string{"-lock_heartbeat_time", "2s"} - vtgateProcess := clusterInstance.NewVtgateInstance() - vtgateProcess.SysVarSetEnabled = true - if err := vtgateProcess.Setup(); err != nil { + clusterInstance.VtGateExtraArgs = []string{"-lock_heartbeat_time", "2s", "-enable_system_settings=true"} // enable reserved connection. + if err := clusterInstance.StartVtgate(); err != nil { return 1 } diff --git a/go/test/endtoend/vtgate/reservedconn/reconnect1/main_test.go b/go/test/endtoend/vtgate/reservedconn/reconnect1/main_test.go index 70bc4c8501b..73fd2025bbf 100644 --- a/go/test/endtoend/vtgate/reservedconn/reconnect1/main_test.go +++ b/go/test/endtoend/vtgate/reservedconn/reconnect1/main_test.go @@ -21,6 +21,7 @@ import ( "flag" "os" "testing" + "time" "github.com/stretchr/testify/assert" @@ -124,12 +125,12 @@ func TestServingChange(t *testing.T) { err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", replicaTablet.Alias, "rdonly") require.NoError(t, err) - for i := 0; i < 10; i++ { - // this should pass now as there is rdonly present - _, err = exec(t, conn, "select * from test") - assert.NoError(t, err, "failed for case: %d", i) - } - // This test currently failed with error: vttablet: rpc error: code = FailedPrecondition desc = operation not allowed in state NOT_SERVING (errno 1105) (sqlstate HY000) during query: select * from test + // added some sleep time for VTGate to know the healthy rdonly. + time.Sleep(5 * time.Second) + + // this should pass now as there is rdonly present + _, err = exec(t, conn, "select * from test") + assert.NoError(t, err) } func exec(t *testing.T, conn *mysql.Conn, query string) (*sqltypes.Result, error) { diff --git a/go/test/endtoend/vtgate/reservedconn/reconnect2/main_test.go b/go/test/endtoend/vtgate/reservedconn/reconnect2/main_test.go index 4ea17a66dfe..c2427a84f66 100644 --- a/go/test/endtoend/vtgate/reservedconn/reconnect2/main_test.go +++ b/go/test/endtoend/vtgate/reservedconn/reconnect2/main_test.go @@ -116,20 +116,9 @@ func TestTabletChange(t *testing.T) { err = clusterInstance.VtctlclientProcess.ExecuteCommand("PlannedReparentShard", "-keyspace_shard", fmt.Sprintf("%s/%s", keyspaceName, "-80")) require.NoError(t, err) - // just to make sure that a new connection is able to successfully run these queries - conn2, err := mysql.Connect(context.Background(), &vtParams) - require.NoError(t, err) - defer conn2.Close() - checkedExec(t, conn2, "set enable_system_settings = true") - checkedExec(t, conn2, fmt.Sprintf("use %s@master", keyspaceName)) - checkedExec(t, conn2, "select * from test") - - for i := 0; i < 10; i++ { - // this should pass as there is new master tablet and is serving. - _, err = exec(t, conn, "select * from test") - assert.NoError(t, err, "failed for case: %d", i) - } - // This test currently failed with error: vttablet: rpc error: code = FailedPrecondition desc = operation not allowed in state NOT_SERVING (errno 1105) (sqlstate HY000) during query: select * from test + // this should pass as there is new master tablet and is serving. + _, err = exec(t, conn, "select * from test") + assert.NoError(t, err) } func exec(t *testing.T, conn *mysql.Conn, query string) (*sqltypes.Result, error) { From 85039b35bea4950f6a9e9af87728036ec7efbcfb Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 19 Apr 2021 08:57:48 +0530 Subject: [PATCH 08/21] fix unit test Signed-off-by: Harshit Gangal --- go/vt/vtgate/scatter_conn_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/go/vt/vtgate/scatter_conn_test.go b/go/vt/vtgate/scatter_conn_test.go index 31dfd7f6ed9..12d57ad40ab 100644 --- a/go/vt/vtgate/scatter_conn_test.go +++ b/go/vt/vtgate/scatter_conn_test.go @@ -19,6 +19,8 @@ package vtgate import ( "testing" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "github.com/stretchr/testify/assert" "vitess.io/vitess/go/mysql" @@ -323,7 +325,7 @@ func TestReservedConnFail(t *testing.T) { assert.NotEqual(t, oldRId, session.Session.ShardSessions[0].ReservedId, "should have recreated a reserved connection since the last connection was lost") sbc0.Queries = nil - sbc0.EphemeralShardErr = mysql.NewSQLError(mysql.ERUnknownError, mysql.SSUnknownSQLState, "operation not allowed in state NOT_SERVING during query: query1") + sbc0.EphemeralShardErr = vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "operation not allowed in state NOT_SERVING during query: query1") _ = executeOnShardsReturnsErr(t, res, keyspace, sc, session, destinations) assert.Equal(t, 2, len(sbc0.Queries), "one for the failed attempt, and one for the retry") require.Equal(t, 1, len(session.ShardSessions)) From 88be8f2919d67349cbe98764e4a18dfd4e1b1229 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 19 Apr 2021 09:13:53 +0530 Subject: [PATCH 09/21] added test for invalid target Signed-off-by: Harshit Gangal --- go/vt/vtgate/scatter_conn_test.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/go/vt/vtgate/scatter_conn_test.go b/go/vt/vtgate/scatter_conn_test.go index 12d57ad40ab..dc259f1a8de 100644 --- a/go/vt/vtgate/scatter_conn_test.go +++ b/go/vt/vtgate/scatter_conn_test.go @@ -330,6 +330,13 @@ func TestReservedConnFail(t *testing.T) { assert.Equal(t, 2, len(sbc0.Queries), "one for the failed attempt, and one for the retry") require.Equal(t, 1, len(session.ShardSessions)) assert.NotEqual(t, oldRId, session.Session.ShardSessions[0].ReservedId, "should have recreated a reserved connection since the last connection was lost") + + sbc0.Queries = nil + sbc0.EphemeralShardErr = vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "invalid tablet type: REPLICA, want: MASTER or MASTER") + _ = executeOnShardsReturnsErr(t, res, keyspace, sc, session, destinations) + assert.Equal(t, 2, len(sbc0.Queries), "one for the failed attempt, and one for the retry") + require.Equal(t, 1, len(session.ShardSessions)) + assert.NotEqual(t, oldRId, session.Session.ShardSessions[0].ReservedId, "should have recreated a reserved connection since the last connection was lost") } func TestIsConnClosed(t *testing.T) { From e5de1c7c44efbede40af27bfb950e1032ac5356b Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 19 Apr 2021 10:11:38 +0530 Subject: [PATCH 10/21] added ping command to e2e test to make rdonly available Signed-off-by: Harshit Gangal --- .../vtgate/reservedconn/reconnect1/main_test.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/go/test/endtoend/vtgate/reservedconn/reconnect1/main_test.go b/go/test/endtoend/vtgate/reservedconn/reconnect1/main_test.go index 73fd2025bbf..001b47ae78f 100644 --- a/go/test/endtoend/vtgate/reservedconn/reconnect1/main_test.go +++ b/go/test/endtoend/vtgate/reservedconn/reconnect1/main_test.go @@ -21,7 +21,6 @@ import ( "flag" "os" "testing" - "time" "github.com/stretchr/testify/assert" @@ -108,11 +107,16 @@ func TestServingChange(t *testing.T) { checkedExec(t, conn, "use @rdonly") checkedExec(t, conn, "set sql_mode = ''") + rdonlyTablet := clusterInstance.Keyspaces[0].Shards[0].Rdonly() + + // to see/make the new rdonly available + err = clusterInstance.VtctlclientProcess.ExecuteCommand("Ping", rdonlyTablet.Alias) + require.NoError(t, err) + // this will create reserved connection on rdonly on -80 and 80- shards. checkedExec(t, conn, "select * from test") // changing rdonly tablet to spare (non serving). - rdonlyTablet := clusterInstance.Keyspaces[0].Shards[0].Rdonly() err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", rdonlyTablet.Alias, "spare") require.NoError(t, err) @@ -125,8 +129,9 @@ func TestServingChange(t *testing.T) { err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", replicaTablet.Alias, "rdonly") require.NoError(t, err) - // added some sleep time for VTGate to know the healthy rdonly. - time.Sleep(5 * time.Second) + // to see/make the new rdonly available + err = clusterInstance.VtctlclientProcess.ExecuteCommand("Ping", replicaTablet.Alias) + require.NoError(t, err) // this should pass now as there is rdonly present _, err = exec(t, conn, "select * from test") From f5bf2f490e1dc0d61622a21daad5ec39ca69a8a2 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 19 Apr 2021 17:47:52 +0530 Subject: [PATCH 11/21] return query service using alias only if the query service is serving traffic and also has valid target Signed-off-by: Harshit Gangal --- go/vt/discovery/fake_healthcheck.go | 2 +- go/vt/discovery/healthcheck.go | 110 +++++++++--------- go/vt/srvtopo/resolver.go | 2 +- go/vt/vtgate/discoverygateway.go | 2 +- go/vt/vtgate/gateway.go | 4 +- go/vt/vtgate/scatter_conn.go | 2 +- go/vt/vtgate/tabletgateway.go | 4 +- go/vt/vtgate/tx_conn.go | 2 +- go/vt/vttablet/sandboxconn/sandboxconn.go | 2 +- .../tabletconntest/fakequeryservice.go | 2 +- 10 files changed, 70 insertions(+), 62 deletions(-) diff --git a/go/vt/discovery/fake_healthcheck.go b/go/vt/discovery/fake_healthcheck.go index c762eb48bec..b8c3d1ce1bf 100644 --- a/go/vt/discovery/fake_healthcheck.go +++ b/go/vt/discovery/fake_healthcheck.go @@ -146,7 +146,7 @@ func (fhc *FakeHealthCheck) ReplaceTablet(old, new *topodatapb.Tablet) { } // TabletConnection returns the TabletConn of the given tablet. -func (fhc *FakeHealthCheck) TabletConnection(alias *topodatapb.TabletAlias) (queryservice.QueryService, error) { +func (fhc *FakeHealthCheck) TabletConnection(alias *topodatapb.TabletAlias, target *querypb.Target) (queryservice.QueryService, error) { aliasStr := topoproto.TabletAliasString(alias) fhc.mu.RLock() defer fhc.mu.RUnlock() diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 4af9dfd7202..aa549d70c70 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -45,19 +45,19 @@ import ( "sync" "time" - "vitess.io/vitess/go/vt/proto/vtrpc" - "vitess.io/vitess/go/vt/vterrors" - "vitess.io/vitess/go/vt/vttablet/queryservice" + "github.com/golang/protobuf/proto" "vitess.io/vitess/go/flagutil" - - "vitess.io/vitess/go/vt/topo" - "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/proto/query" - "vitess.io/vitess/go/vt/proto/topodata" - "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/queryservice" + + querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + topoprotopb "vitess.io/vitess/go/vt/topo/topoproto" ) var ( @@ -72,7 +72,7 @@ var ( //TODO(deepthi): change these vars back to unexported when discoveryGateway is removed // AllowedTabletTypes is the list of allowed tablet types. e.g. {MASTER, REPLICA} - AllowedTabletTypes []topodata.TabletType + AllowedTabletTypes []topodatapb.TabletType // TabletFilters are the keyspace|shard or keyrange filters to apply to the full set of tablets TabletFilters flagutil.StringListValue // KeyspacesToWatch - if provided this specifies which keyspaces should be @@ -144,7 +144,7 @@ func init() { // Flags are not parsed at this point and the default value of the flag (just the hostname) will be used. ParseTabletURLTemplateFromFlag() flag.Var(&TabletFilters, "tablet_filters", "Specifies a comma-separated list of 'keyspace|shard_name or keyrange' values to filter the tablets to watch") - topoproto.TabletTypeListVar(&AllowedTabletTypes, "allowed_tablet_types", "Specifies the tablet types this vtgate is allowed to route queries to") + topoprotopb.TabletTypeListVar(&AllowedTabletTypes, "allowed_tablet_types", "Specifies the tablet types this vtgate is allowed to route queries to") flag.Var(&KeyspacesToWatch, "keyspaces_to_watch", "Specifies which keyspaces this vtgate should have access to while routing queries or accessing the vschema") } @@ -152,11 +152,11 @@ func init() { // It is separated out to enable unit testing. type TabletRecorder interface { // AddTablet adds the tablet. - AddTablet(tablet *topodata.Tablet) + AddTablet(tablet *topodatapb.Tablet) // RemoveTablet removes the tablet. - RemoveTablet(tablet *topodata.Tablet) + RemoveTablet(tablet *topodatapb.Tablet) // ReplaceTablet does an AddTablet and RemoveTablet in one call, effectively replacing the old tablet with the new. - ReplaceTablet(old, new *topodata.Tablet) + ReplaceTablet(old, new *topodatapb.Tablet) } type keyspaceShardTabletType string @@ -174,10 +174,10 @@ type HealthCheck interface { // each given target before returning. // It will return ctx.Err() if the context is canceled. // It will return an error if it can't read the necessary topology records. - WaitForAllServingTablets(ctx context.Context, targets []*query.Target) error + WaitForAllServingTablets(ctx context.Context, targets []*querypb.Target) error // TabletConnection returns the TabletConn of the given tablet. - TabletConnection(alias *topodata.TabletAlias) (queryservice.QueryService, error) + TabletConnection(alias *topodatapb.TabletAlias, target *querypb.Target) (queryservice.QueryService, error) // RegisterStats registers the connection counts stats RegisterStats() @@ -188,7 +188,7 @@ type HealthCheck interface { // the most recent tablet of type master. // This returns a copy of the data so that callers can access without // synchronization - GetHealthyTabletStats(target *query.Target) []*TabletHealth + GetHealthyTabletStats(target *querypb.Target) []*TabletHealth // Subscribe adds a listener. Used by vtgate buffer to learn about master changes. Subscribe() chan *TabletHealth @@ -312,7 +312,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur // AddTablet adds the tablet, and starts health check. // It does not block on making connection. // name is an optional tag for the tablet, e.g. an alternative address. -func (hc *HealthCheckImpl) AddTablet(tablet *topodata.Tablet) { +func (hc *HealthCheckImpl) AddTablet(tablet *topodatapb.Tablet) { // check whether grpc port is present on tablet, if not return if tablet.PortMap["grpc"] == 0 { return @@ -326,7 +326,7 @@ func (hc *HealthCheckImpl) AddTablet(tablet *topodata.Tablet) { return } ctx, cancelFunc := context.WithCancel(context.Background()) - target := &query.Target{ + target := &querypb.Target{ Keyspace: tablet.Keyspace, Shard: tablet.Shard, TabletType: tablet.Type, @@ -340,7 +340,7 @@ func (hc *HealthCheckImpl) AddTablet(tablet *topodata.Tablet) { // add to our datastore key := hc.keyFromTarget(target) - tabletAlias := topoproto.TabletAliasString(tablet.Alias) + tabletAlias := topoprotopb.TabletAliasString(tablet.Alias) if _, ok := hc.healthByAlias[tabletAliasString(tabletAlias)]; ok { // We should not add a tablet that we already have log.Errorf("Program bug: tried to add existing tablet: %v to healthcheck", tabletAlias) @@ -363,23 +363,23 @@ func (hc *HealthCheckImpl) AddTablet(tablet *topodata.Tablet) { // RemoveTablet removes the tablet, and stops the health check. // It does not block. -func (hc *HealthCheckImpl) RemoveTablet(tablet *topodata.Tablet) { +func (hc *HealthCheckImpl) RemoveTablet(tablet *topodatapb.Tablet) { hc.deleteTablet(tablet) } // ReplaceTablet removes the old tablet and adds the new tablet. -func (hc *HealthCheckImpl) ReplaceTablet(old, new *topodata.Tablet) { +func (hc *HealthCheckImpl) ReplaceTablet(old, new *topodatapb.Tablet) { hc.RemoveTablet(old) hc.AddTablet(new) } -func (hc *HealthCheckImpl) deleteTablet(tablet *topodata.Tablet) { +func (hc *HealthCheckImpl) deleteTablet(tablet *topodatapb.Tablet) { log.Infof("Removing tablet from healthcheck: %v", tablet) hc.mu.Lock() defer hc.mu.Unlock() key := hc.keyFromTablet(tablet) - tabletAlias := tabletAliasString(topoproto.TabletAliasString(tablet.Alias)) + tabletAlias := tabletAliasString(topoprotopb.TabletAliasString(tablet.Alias)) // delete from authoritative map th, ok := hc.healthByAlias[tabletAlias] if !ok { @@ -404,17 +404,17 @@ func (hc *HealthCheckImpl) deleteTablet(tablet *topodata.Tablet) { } } -func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Target, trivialUpdate bool, isPrimaryUp bool) { +func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *querypb.Target, trivialUpdate bool, isPrimaryUp bool) { // hc.healthByAlias is authoritative, it should be updated hc.mu.Lock() defer hc.mu.Unlock() - tabletAlias := tabletAliasString(topoproto.TabletAliasString(th.Tablet.Alias)) + tabletAlias := tabletAliasString(topoprotopb.TabletAliasString(th.Tablet.Alias)) targetKey := hc.keyFromTarget(th.Target) targetChanged := prevTarget.TabletType != th.Target.TabletType || prevTarget.Keyspace != th.Target.Keyspace || prevTarget.Shard != th.Target.Shard if targetChanged { // Error counter has to be set here in case we get a new tablet type for the first time in a stream response - hcErrorCounters.Add([]string{th.Target.Keyspace, th.Target.Shard, topoproto.TabletTypeLString(th.Target.TabletType)}, 0) + hcErrorCounters.Add([]string{th.Target.Keyspace, th.Target.Shard, topoprotopb.TabletTypeLString(th.Target.TabletType)}, 0) // keyspace and shard are not expected to change, but just in case ... // move this tabletHealthCheck to the correct map oldTargetKey := hc.keyFromTarget(prevTarget) @@ -427,7 +427,7 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Targ // add it to the map by target hc.healthData[targetKey][tabletAlias] = th - isPrimary := th.Target.TabletType == topodata.TabletType_MASTER + isPrimary := th.Target.TabletType == topodatapb.TabletType_MASTER switch { case isPrimary && isPrimaryUp: if len(hc.healthy[targetKey]) == 0 { @@ -437,9 +437,9 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Targ // need to replace it. if th.MasterTermStartTime < hc.healthy[targetKey][0].MasterTermStartTime { log.Warningf("not marking healthy master %s as Up for %s because its MasterTermStartTime is smaller than the highest known timestamp from previous MASTERs %s: %d < %d ", - topoproto.TabletAliasString(th.Tablet.Alias), - topoproto.KeyspaceShardString(th.Target.Keyspace, th.Target.Shard), - topoproto.TabletAliasString(hc.healthy[targetKey][0].Tablet.Alias), + topoprotopb.TabletAliasString(th.Tablet.Alias), + topoprotopb.KeyspaceShardString(th.Target.Keyspace, th.Target.Shard), + topoprotopb.TabletAliasString(hc.healthy[targetKey][0].Tablet.Alias), th.MasterTermStartTime, hc.healthy[targetKey][0].MasterTermStartTime) } else { @@ -456,18 +456,18 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Targ // We re-sort the healthy tablet list whenever we get a health update for tablets we can route to. // Tablets from other cells for non-master targets should not trigger a re-sort; // they should also be excluded from healthy list. - if th.Target.TabletType != topodata.TabletType_MASTER && hc.isIncluded(th.Target.TabletType, th.Tablet.Alias) { + if th.Target.TabletType != topodatapb.TabletType_MASTER && hc.isIncluded(th.Target.TabletType, th.Tablet.Alias) { hc.recomputeHealthy(targetKey) } - if targetChanged && prevTarget.TabletType != topodata.TabletType_MASTER && hc.isIncluded(th.Target.TabletType, th.Tablet.Alias) { // also recompute old target's healthy list + if targetChanged && prevTarget.TabletType != topodatapb.TabletType_MASTER && hc.isIncluded(th.Target.TabletType, th.Tablet.Alias) { // also recompute old target's healthy list oldTargetKey := hc.keyFromTarget(prevTarget) hc.recomputeHealthy(oldTargetKey) } } - isNewPrimary := isPrimary && prevTarget.TabletType != topodata.TabletType_MASTER + isNewPrimary := isPrimary && prevTarget.TabletType != topodatapb.TabletType_MASTER if isNewPrimary { - log.Errorf("Adding 1 to MasterPromoted counter for target: %v, tablet: %v, tabletType: %v", prevTarget, topoproto.TabletAliasString(th.Tablet.Alias), th.Target.TabletType) + log.Errorf("Adding 1 to MasterPromoted counter for target: %v, tablet: %v, tabletType: %v", prevTarget, topoprotopb.TabletAliasString(th.Tablet.Alias), th.Target.TabletType) hcMasterPromotedCounters.Add([]string{th.Target.Keyspace, th.Target.Shard}, 1) } @@ -579,7 +579,7 @@ func (hc *HealthCheckImpl) Close() error { // the most recent tablet of type master. // This returns a copy of the data so that callers can access without // synchronization -func (hc *HealthCheckImpl) GetHealthyTabletStats(target *query.Target) []*TabletHealth { +func (hc *HealthCheckImpl) GetHealthyTabletStats(target *querypb.Target) []*TabletHealth { var result []*TabletHealth hc.mu.Lock() defer hc.mu.Unlock() @@ -593,7 +593,7 @@ func (hc *HealthCheckImpl) GetHealthyTabletStats(target *query.Target) []*Tablet // The returned array is owned by the caller. // For TabletType_MASTER, this will only return at most one entry, // the most recent tablet of type master. -func (hc *HealthCheckImpl) getTabletStats(target *query.Target) []*TabletHealth { +func (hc *HealthCheckImpl) getTabletStats(target *querypb.Target) []*TabletHealth { var result []*TabletHealth hc.mu.Lock() defer hc.mu.Unlock() @@ -607,8 +607,8 @@ func (hc *HealthCheckImpl) getTabletStats(target *query.Target) []*TabletHealth // WaitForTablets waits for at least one tablet in the given // keyspace / shard / tablet type before returning. The tablets do not // have to be healthy. It will return ctx.Err() if the context is canceled. -func (hc *HealthCheckImpl) WaitForTablets(ctx context.Context, keyspace, shard string, tabletType topodata.TabletType) error { - targets := []*query.Target{ +func (hc *HealthCheckImpl) WaitForTablets(ctx context.Context, keyspace, shard string, tabletType topodatapb.TabletType) error { + targets := []*querypb.Target{ { Keyspace: keyspace, Shard: shard, @@ -622,13 +622,13 @@ func (hc *HealthCheckImpl) WaitForTablets(ctx context.Context, keyspace, shard s // each given target before returning. // It will return ctx.Err() if the context is canceled. // It will return an error if it can't read the necessary topology records. -func (hc *HealthCheckImpl) WaitForAllServingTablets(ctx context.Context, targets []*query.Target) error { +func (hc *HealthCheckImpl) WaitForAllServingTablets(ctx context.Context, targets []*querypb.Target) error { return hc.waitForTablets(ctx, targets, true) } // FilterTargetsByKeyspaces only returns the targets that are part of the provided keyspaces -func FilterTargetsByKeyspaces(keyspaces []string, targets []*query.Target) []*query.Target { - filteredTargets := make([]*query.Target, 0) +func FilterTargetsByKeyspaces(keyspaces []string, targets []*querypb.Target) []*querypb.Target { + filteredTargets := make([]*querypb.Target, 0) // Keep them all if there are no keyspaces to watch if len(KeyspacesToWatch) == 0 { @@ -647,7 +647,7 @@ func FilterTargetsByKeyspaces(keyspaces []string, targets []*query.Target) []*qu } // waitForTablets is the internal method that polls for tablets. -func (hc *HealthCheckImpl) waitForTablets(ctx context.Context, targets []*query.Target, requireServing bool) error { +func (hc *HealthCheckImpl) waitForTablets(ctx context.Context, targets []*querypb.Target, requireServing bool) error { targets = FilterTargetsByKeyspaces(KeyspacesToWatch, targets) for { @@ -693,25 +693,31 @@ func (hc *HealthCheckImpl) waitForTablets(ctx context.Context, targets []*query. } // TabletConnection returns the Connection to a given tablet. -func (hc *HealthCheckImpl) TabletConnection(alias *topodata.TabletAlias) (queryservice.QueryService, error) { +func (hc *HealthCheckImpl) TabletConnection(alias *topodatapb.TabletAlias, target *querypb.Target) (queryservice.QueryService, error) { hc.mu.Lock() - thc := hc.healthByAlias[tabletAliasString(topoproto.TabletAliasString(alias))] + thc := hc.healthByAlias[tabletAliasString(topoprotopb.TabletAliasString(alias))] hc.mu.Unlock() if thc == nil || thc.Conn == nil { //TODO: test that throws this error - return nil, vterrors.Errorf(vtrpc.Code_NOT_FOUND, "tablet: %v is either down or nonexistent", alias) + return nil, vterrors.Errorf(vtrpcpb.Code_NOT_FOUND, "tablet: %v is either down or nonexistent", alias) + } + if !thc.Serving { + return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "tablet: %v is not serving", alias) + } + if !proto.Equal(thc.Target, target) { + return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "tablet: target mismatch %v vs %v", thc.Target, target) } return thc.Connection(), nil } // Target includes cell which we ignore here // because tabletStatsCache is intended to be per-cell -func (hc *HealthCheckImpl) keyFromTarget(target *query.Target) keyspaceShardTabletType { - return keyspaceShardTabletType(fmt.Sprintf("%s.%s.%s", target.Keyspace, target.Shard, topoproto.TabletTypeLString(target.TabletType))) +func (hc *HealthCheckImpl) keyFromTarget(target *querypb.Target) keyspaceShardTabletType { + return keyspaceShardTabletType(fmt.Sprintf("%s.%s.%s", target.Keyspace, target.Shard, topoprotopb.TabletTypeLString(target.TabletType))) } -func (hc *HealthCheckImpl) keyFromTablet(tablet *topodata.Tablet) keyspaceShardTabletType { - return keyspaceShardTabletType(fmt.Sprintf("%s.%s.%s", tablet.Keyspace, tablet.Shard, topoproto.TabletTypeLString(tablet.Type))) +func (hc *HealthCheckImpl) keyFromTablet(tablet *topodatapb.Tablet) keyspaceShardTabletType { + return keyspaceShardTabletType(fmt.Sprintf("%s.%s.%s", tablet.Keyspace, tablet.Shard, topoprotopb.TabletTypeLString(tablet.Type))) } // getAliasByCell should only be called while holding hc.mu @@ -728,8 +734,8 @@ func (hc *HealthCheckImpl) getAliasByCell(cell string) string { return alias } -func (hc *HealthCheckImpl) isIncluded(tabletType topodata.TabletType, tabletAlias *topodata.TabletAlias) bool { - if tabletType == topodata.TabletType_MASTER { +func (hc *HealthCheckImpl) isIncluded(tabletType topodatapb.TabletType, tabletAlias *topodatapb.TabletAlias) bool { + if tabletType == topodatapb.TabletType_MASTER { return true } if tabletAlias.Cell == hc.cell { diff --git a/go/vt/srvtopo/resolver.go b/go/vt/srvtopo/resolver.go index 7134b0b9852..e668e0e248c 100644 --- a/go/vt/srvtopo/resolver.go +++ b/go/vt/srvtopo/resolver.go @@ -40,7 +40,7 @@ type Gateway interface { queryservice.QueryService // QueryServiceByAlias returns a QueryService - QueryServiceByAlias(alias *topodatapb.TabletAlias) (queryservice.QueryService, error) + QueryServiceByAlias(alias *topodatapb.TabletAlias, target *querypb.Target) (queryservice.QueryService, error) } // A Resolver can resolve keyspace ids and key ranges into ResolvedShard* diff --git a/go/vt/vtgate/discoverygateway.go b/go/vt/vtgate/discoverygateway.go index e324e998efd..b5d24c28829 100644 --- a/go/vt/vtgate/discoverygateway.go +++ b/go/vt/vtgate/discoverygateway.go @@ -405,6 +405,6 @@ func (dg *DiscoveryGateway) getStatsAggregator(target *querypb.Target) *TabletSt } // QueryServiceByAlias satisfies the Gateway interface -func (dg *DiscoveryGateway) QueryServiceByAlias(_ *topodatapb.TabletAlias) (queryservice.QueryService, error) { +func (dg *DiscoveryGateway) QueryServiceByAlias(_ *topodatapb.TabletAlias, _ *querypb.Target) (queryservice.QueryService, error) { return nil, vterrors.New(vtrpcpb.Code_UNIMPLEMENTED, "DiscoveryGateway does not implement QueryServiceByAlias") } diff --git a/go/vt/vtgate/gateway.go b/go/vt/vtgate/gateway.go index 1398ad5419e..86a015ab77a 100644 --- a/go/vt/vtgate/gateway.go +++ b/go/vt/vtgate/gateway.go @@ -17,6 +17,8 @@ import ( "flag" "time" + querypb "vitess.io/vitess/go/vt/proto/query" + "context" "vitess.io/vitess/go/vt/log" @@ -68,7 +70,7 @@ type Gateway interface { TabletsCacheStatus() discovery.TabletsCacheStatusList // TabletByAlias returns a QueryService - QueryServiceByAlias(alias *topodatapb.TabletAlias) (queryservice.QueryService, error) + QueryServiceByAlias(alias *topodatapb.TabletAlias, target *querypb.Target) (queryservice.QueryService, error) } // Creator is the factory method which can create the actual gateway object. diff --git a/go/vt/vtgate/scatter_conn.go b/go/vt/vtgate/scatter_conn.go index f2023bec177..56212260a1c 100644 --- a/go/vt/vtgate/scatter_conn.go +++ b/go/vt/vtgate/scatter_conn.go @@ -321,7 +321,7 @@ func getQueryService(rs *srvtopo.ResolvedShard, info *shardActionInfo) (queryser if usingLegacyGw || info.alias == nil { return rs.Gateway, nil } - return rs.Gateway.QueryServiceByAlias(info.alias) + return rs.Gateway.QueryServiceByAlias(info.alias, rs.Target) } func (stc *ScatterConn) processOneStreamingResult(mu *sync.Mutex, fieldSent *bool, qr *sqltypes.Result, callback func(*sqltypes.Result) error) error { diff --git a/go/vt/vtgate/tabletgateway.go b/go/vt/vtgate/tabletgateway.go index 5556cafdb80..5efe4276421 100644 --- a/go/vt/vtgate/tabletgateway.go +++ b/go/vt/vtgate/tabletgateway.go @@ -132,8 +132,8 @@ func NewTabletGateway(ctx context.Context, hc discovery.HealthCheck, serv srvtop } // QueryServiceByAlias satisfies the Gateway interface -func (gw *TabletGateway) QueryServiceByAlias(alias *topodatapb.TabletAlias) (queryservice.QueryService, error) { - return gw.hc.TabletConnection(alias) +func (gw *TabletGateway) QueryServiceByAlias(alias *topodatapb.TabletAlias, target *querypb.Target) (queryservice.QueryService, error) { + return gw.hc.TabletConnection(alias, nil) } // RegisterStats registers the stats to export the lag since the last refresh diff --git a/go/vt/vtgate/tx_conn.go b/go/vt/vtgate/tx_conn.go index 4747dcf2664..b056b8db508 100644 --- a/go/vt/vtgate/tx_conn.go +++ b/go/vt/vtgate/tx_conn.go @@ -87,7 +87,7 @@ func (txc *TxConn) queryService(alias *topodatapb.TabletAlias) (queryservice.Que if qs != nil { return qs, nil } - return txc.gateway.QueryServiceByAlias(alias) + return txc.gateway.QueryServiceByAlias(alias, nil) } func (txc *TxConn) commitShard(ctx context.Context, s *vtgatepb.Session_ShardSession) error { diff --git a/go/vt/vttablet/sandboxconn/sandboxconn.go b/go/vt/vttablet/sandboxconn/sandboxconn.go index 63d8819d516..c99e826b0dd 100644 --- a/go/vt/vttablet/sandboxconn/sandboxconn.go +++ b/go/vt/vttablet/sandboxconn/sandboxconn.go @@ -481,7 +481,7 @@ func (sbc *SandboxConn) VStreamResults(ctx context.Context, target *querypb.Targ } // QueryServiceByAlias is part of the Gateway interface. -func (sbc *SandboxConn) QueryServiceByAlias(_ *topodatapb.TabletAlias) (queryservice.QueryService, error) { +func (sbc *SandboxConn) QueryServiceByAlias(_ *topodatapb.TabletAlias, _ *querypb.Target) (queryservice.QueryService, error) { return sbc, nil } diff --git a/go/vt/vttablet/tabletconntest/fakequeryservice.go b/go/vt/vttablet/tabletconntest/fakequeryservice.go index 84849ea06ea..9fb15d053d5 100644 --- a/go/vt/vttablet/tabletconntest/fakequeryservice.go +++ b/go/vt/vttablet/tabletconntest/fakequeryservice.go @@ -720,7 +720,7 @@ func (f *FakeQueryService) VStreamResults(ctx context.Context, target *querypb.T } // QueryServiceByAlias satisfies the Gateway interface -func (f *FakeQueryService) QueryServiceByAlias(_ *topodatapb.TabletAlias) (queryservice.QueryService, error) { +func (f *FakeQueryService) QueryServiceByAlias(_ *topodatapb.TabletAlias, _ *querypb.Target) (queryservice.QueryService, error) { panic("not implemented") } From 44e99bc3ae239f2aa9d895542139555e2b1b8441 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 19 Apr 2021 23:52:27 +0530 Subject: [PATCH 12/21] moved few errors and regex to vterrors as were used at multiple place and were needed to be in sync Signed-off-by: Harshit Gangal --- go/vt/discovery/fake_healthcheck.go | 15 +++++---- go/vt/discovery/healthcheck.go | 4 +-- go/vt/vterrors/constants.go | 34 ++++++++++++++++++++ go/vt/vtgate/scatter_conn.go | 4 +-- go/vt/vtgate/tabletgateway.go | 2 +- go/vt/vttablet/tabletserver/state_manager.go | 6 ++-- 6 files changed, 49 insertions(+), 16 deletions(-) create mode 100644 go/vt/vterrors/constants.go diff --git a/go/vt/discovery/fake_healthcheck.go b/go/vt/discovery/fake_healthcheck.go index b8c3d1ce1bf..d5d3a9d9549 100644 --- a/go/vt/discovery/fake_healthcheck.go +++ b/go/vt/discovery/fake_healthcheck.go @@ -21,20 +21,18 @@ import ( "sort" "sync" - "vitess.io/vitess/go/sync2" - "github.com/golang/protobuf/proto" - "vitess.io/vitess/go/vt/proto/vtrpc" - "vitess.io/vitess/go/vt/vterrors" - + "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/queryservice" "vitess.io/vitess/go/vt/vttablet/sandboxconn" querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) var ( @@ -146,16 +144,19 @@ func (fhc *FakeHealthCheck) ReplaceTablet(old, new *topodatapb.Tablet) { } // TabletConnection returns the TabletConn of the given tablet. -func (fhc *FakeHealthCheck) TabletConnection(alias *topodatapb.TabletAlias, target *querypb.Target) (queryservice.QueryService, error) { +func (fhc *FakeHealthCheck) TabletConnection(alias *topodatapb.TabletAlias, _ *querypb.Target) (queryservice.QueryService, error) { aliasStr := topoproto.TabletAliasString(alias) fhc.mu.RLock() defer fhc.mu.RUnlock() for _, item := range fhc.items { if proto.Equal(alias, item.ts.Tablet.Alias) { + if !item.ts.Serving { + return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, vterrors.NotServing) + } return item.ts.Conn, nil } } - return nil, vterrors.Errorf(vtrpc.Code_NOT_FOUND, "tablet %v not found", aliasStr) + return nil, vterrors.Errorf(vtrpcpb.Code_NOT_FOUND, "tablet %v not found", aliasStr) } // CacheStatus returns the status for each tablet diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index aa549d70c70..a63710c728d 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -702,10 +702,10 @@ func (hc *HealthCheckImpl) TabletConnection(alias *topodatapb.TabletAlias, targe return nil, vterrors.Errorf(vtrpcpb.Code_NOT_FOUND, "tablet: %v is either down or nonexistent", alias) } if !thc.Serving { - return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "tablet: %v is not serving", alias) + return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, vterrors.NotServing) } if !proto.Equal(thc.Target, target) { - return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "tablet: target mismatch %v vs %v", thc.Target, target) + return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "%s: target mismatch %v vs %v", vterrors.WrongTablet, thc.Target, target) } return thc.Connection(), nil } diff --git a/go/vt/vterrors/constants.go b/go/vt/vterrors/constants.go new file mode 100644 index 00000000000..f602b9f0054 --- /dev/null +++ b/go/vt/vterrors/constants.go @@ -0,0 +1,34 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vterrors + +import "regexp" + +// Operation not allowed error +const ( + NotServing = "operation not allowed in state NOT_SERVING" + ShuttingDown = "operation not allowed in state SHUTTING_DOWN" +) + +// RxOp regex for operation not allowed error +var RxOp = regexp.MustCompile("operation not allowed in state (NOT_SERVING|SHUTTING_DOWN)") + +// WrongTablet for invalid tablet type error +const WrongTablet = "invalid tablet type" + +// RxWrongTablet regex for invalid tablet type error +var RxWrongTablet = regexp.MustCompile("invalid tablet type") diff --git a/go/vt/vtgate/scatter_conn.go b/go/vt/vtgate/scatter_conn.go index 56212260a1c..32d99951d60 100644 --- a/go/vt/vtgate/scatter_conn.go +++ b/go/vt/vtgate/scatter_conn.go @@ -698,8 +698,6 @@ func (stc *ScatterConn) ExecuteLock( } var txClosed = regexp.MustCompile("transaction ([a-z0-9:]+) (?:ended|not found)") -var notServing = regexp.MustCompile("operation not allowed in state (NOT_SERVING|SHUTTING_DOWN)") -var wrongTabletType = regexp.MustCompile("invalid tablet type:") func wasConnectionClosed(err error) bool { sqlErr := mysql.NewSQLErrorFromError(err).(*mysql.SQLError) @@ -713,7 +711,7 @@ func wasConnectionClosed(err error) bool { func requireNewQS(err error) bool { code := vterrors.Code(err) msg := err.Error() - return code == vtrpcpb.Code_FAILED_PRECONDITION && (notServing.MatchString(msg) || wrongTabletType.MatchString(msg)) + return code == vtrpcpb.Code_FAILED_PRECONDITION && (vterrors.RxOp.MatchString(msg) || vterrors.RxWrongTablet.MatchString(msg)) } // actionInfo looks at the current session, and returns information about what needs to be done for this tablet diff --git a/go/vt/vtgate/tabletgateway.go b/go/vt/vtgate/tabletgateway.go index 5efe4276421..504945bd70a 100644 --- a/go/vt/vtgate/tabletgateway.go +++ b/go/vt/vtgate/tabletgateway.go @@ -133,7 +133,7 @@ func NewTabletGateway(ctx context.Context, hc discovery.HealthCheck, serv srvtop // QueryServiceByAlias satisfies the Gateway interface func (gw *TabletGateway) QueryServiceByAlias(alias *topodatapb.TabletAlias, target *querypb.Target) (queryservice.QueryService, error) { - return gw.hc.TabletConnection(alias, nil) + return gw.hc.TabletConnection(alias, target) } // RegisterStats registers the stats to export the lag since the last refresh diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index c1498e43f44..87433ebb858 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -348,13 +348,13 @@ func (sm *stateManager) StartRequest(ctx context.Context, target *querypb.Target if sm.state != StateServing || !sm.replHealthy { // This specific error string needs to be returned for vtgate buffering to work. - return vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "operation not allowed in state NOT_SERVING") + return vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, vterrors.NotServing) } shuttingDown := sm.wantState != StateServing if shuttingDown && !allowOnShutdown { // This specific error string needs to be returned for vtgate buffering to work. - return vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "operation not allowed in state SHUTTING_DOWN") + return vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, vterrors.ShuttingDown) } if target != nil { @@ -369,7 +369,7 @@ func (sm *stateManager) StartRequest(ctx context.Context, target *querypb.Target goto ok } } - return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "invalid tablet type: %v, want: %v or %v", target.TabletType, sm.target.TabletType, sm.alsoAllow) + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "%s: %v, want: %v or %v", vterrors.WrongTablet, target.TabletType, sm.target.TabletType, sm.alsoAllow) } } else { if !tabletenv.IsLocalContext(ctx) { From cb593e0c04e8550ffec2733571fd5289acc84ed0 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Tue, 20 Apr 2021 00:03:26 +0530 Subject: [PATCH 13/21] added unit test for query served through different tablet if the current tablet is not servicable without querying the tablet and using heathcheck at vtgate Signed-off-by: Harshit Gangal --- go/vt/vtgate/scatter_conn_test.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/go/vt/vtgate/scatter_conn_test.go b/go/vt/vtgate/scatter_conn_test.go index dc259f1a8de..3512bbab1dd 100644 --- a/go/vt/vtgate/scatter_conn_test.go +++ b/go/vt/vtgate/scatter_conn_test.go @@ -323,6 +323,7 @@ func TestReservedConnFail(t *testing.T) { assert.Equal(t, 2, len(sbc0.Queries), "one for the failed attempt, and one for the retry") require.Equal(t, 1, len(session.ShardSessions)) assert.NotEqual(t, oldRId, session.Session.ShardSessions[0].ReservedId, "should have recreated a reserved connection since the last connection was lost") + oldRId = session.Session.ShardSessions[0].ReservedId sbc0.Queries = nil sbc0.EphemeralShardErr = vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "operation not allowed in state NOT_SERVING during query: query1") @@ -330,6 +331,7 @@ func TestReservedConnFail(t *testing.T) { assert.Equal(t, 2, len(sbc0.Queries), "one for the failed attempt, and one for the retry") require.Equal(t, 1, len(session.ShardSessions)) assert.NotEqual(t, oldRId, session.Session.ShardSessions[0].ReservedId, "should have recreated a reserved connection since the last connection was lost") + oldRId = session.Session.ShardSessions[0].ReservedId sbc0.Queries = nil sbc0.EphemeralShardErr = vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "invalid tablet type: REPLICA, want: MASTER or MASTER") @@ -337,6 +339,24 @@ func TestReservedConnFail(t *testing.T) { assert.Equal(t, 2, len(sbc0.Queries), "one for the failed attempt, and one for the retry") require.Equal(t, 1, len(session.ShardSessions)) assert.NotEqual(t, oldRId, session.Session.ShardSessions[0].ReservedId, "should have recreated a reserved connection since the last connection was lost") + oldRId = session.Session.ShardSessions[0].ReservedId + + tablet0 := sbc0.Tablet() + ths := hc.GetHealthyTabletStats(&querypb.Target{ + Keyspace: tablet0.GetKeyspace(), + Shard: tablet0.GetShard(), + TabletType: tablet0.GetType(), + }) + ths[0].Serving = false + sbc0Rep := hc.AddTestTablet("aa", "0", 2, keyspace, "0", topodatapb.TabletType_REPLICA, true, 1, nil) + + sbc0.Queries = nil + _ = executeOnShardsReturnsErr(t, res, keyspace, sc, session, destinations) + assert.Equal(t, 0, len(sbc0.Queries), "one for the failed attempt, and one for the retry") + assert.Equal(t, 1, len(sbc0Rep.Queries), "one for the failed attempt, and one for the retry") + require.Equal(t, 1, len(session.ShardSessions)) + assert.NotEqual(t, oldRId, session.Session.ShardSessions[0].ReservedId, "should have recreated a reserved connection since the last connection was lost") + } func TestIsConnClosed(t *testing.T) { From 8f15813731951da6eaeaf4c3981c2358ea589e45 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Tue, 20 Apr 2021 00:13:07 +0530 Subject: [PATCH 14/21] added logic to check and reset shard session and also to acquire new query service connection if it matches the criteria Signed-off-by: Harshit Gangal --- go/vt/vtgate/scatter_conn.go | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/go/vt/vtgate/scatter_conn.go b/go/vt/vtgate/scatter_conn.go index 32d99951d60..88e0a5dd5cf 100644 --- a/go/vt/vtgate/scatter_conn.go +++ b/go/vt/vtgate/scatter_conn.go @@ -221,7 +221,19 @@ func (stc *ScatterConn) ExecuteMultiShard( qs, err = getQueryService(rs, info) if err != nil { - return nil, err + switch info.actionNeeded { + case nothing: + info.actionNeeded = reserve + case begin: + info.actionNeeded = reserveBegin + default: + return nil, err + } + retry := checkAndResetShardSession(info, err, session) + if retry != newQS { + return nil, err + } + qs = rs.Gateway } switch info.actionNeeded { From 3311045f940b1e6dd7ede3edd82e2d3bdb4ef610 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Tue, 20 Apr 2021 00:28:14 +0530 Subject: [PATCH 15/21] added unit test to check query serving if the alias tablet target is changed Signed-off-by: Harshit Gangal --- go/vt/discovery/fake_healthcheck.go | 6 ++++- go/vt/vtgate/scatter_conn_test.go | 34 ++++++++++++++++++++++++++--- 2 files changed, 36 insertions(+), 4 deletions(-) diff --git a/go/vt/discovery/fake_healthcheck.go b/go/vt/discovery/fake_healthcheck.go index d5d3a9d9549..8420a1bb294 100644 --- a/go/vt/discovery/fake_healthcheck.go +++ b/go/vt/discovery/fake_healthcheck.go @@ -144,7 +144,7 @@ func (fhc *FakeHealthCheck) ReplaceTablet(old, new *topodatapb.Tablet) { } // TabletConnection returns the TabletConn of the given tablet. -func (fhc *FakeHealthCheck) TabletConnection(alias *topodatapb.TabletAlias, _ *querypb.Target) (queryservice.QueryService, error) { +func (fhc *FakeHealthCheck) TabletConnection(alias *topodatapb.TabletAlias, target *querypb.Target) (queryservice.QueryService, error) { aliasStr := topoproto.TabletAliasString(alias) fhc.mu.RLock() defer fhc.mu.RUnlock() @@ -153,6 +153,10 @@ func (fhc *FakeHealthCheck) TabletConnection(alias *topodatapb.TabletAlias, _ *q if !item.ts.Serving { return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, vterrors.NotServing) } + if !proto.Equal(item.ts.Target, target) { + return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "%s: target mismatch %v vs %v", vterrors.WrongTablet, item.ts.Target, target) + } + return item.ts.Conn, nil } } diff --git a/go/vt/vtgate/scatter_conn_test.go b/go/vt/vtgate/scatter_conn_test.go index 3512bbab1dd..4a5a8dcd8d5 100644 --- a/go/vt/vtgate/scatter_conn_test.go +++ b/go/vt/vtgate/scatter_conn_test.go @@ -340,23 +340,51 @@ func TestReservedConnFail(t *testing.T) { require.Equal(t, 1, len(session.ShardSessions)) assert.NotEqual(t, oldRId, session.Session.ShardSessions[0].ReservedId, "should have recreated a reserved connection since the last connection was lost") oldRId = session.Session.ShardSessions[0].ReservedId + oldAlias := session.Session.ShardSessions[0].TabletAlias + // Test Setup tablet0 := sbc0.Tablet() ths := hc.GetHealthyTabletStats(&querypb.Target{ Keyspace: tablet0.GetKeyspace(), Shard: tablet0.GetShard(), TabletType: tablet0.GetType(), }) - ths[0].Serving = false + sbc0Th := ths[0] + sbc0Th.Serving = false sbc0Rep := hc.AddTestTablet("aa", "0", 2, keyspace, "0", topodatapb.TabletType_REPLICA, true, 1, nil) sbc0.Queries = nil _ = executeOnShardsReturnsErr(t, res, keyspace, sc, session, destinations) - assert.Equal(t, 0, len(sbc0.Queries), "one for the failed attempt, and one for the retry") - assert.Equal(t, 1, len(sbc0Rep.Queries), "one for the failed attempt, and one for the retry") + assert.Equal(t, 0, len(sbc0.Queries), "no attempt should be made as the tablet is not serving") + assert.Equal(t, 1, len(sbc0Rep.Queries), "first attempt should pass as it is healthy") require.Equal(t, 1, len(session.ShardSessions)) assert.NotEqual(t, oldRId, session.Session.ShardSessions[0].ReservedId, "should have recreated a reserved connection since the last connection was lost") + assert.NotEqual(t, oldAlias, session.Session.ShardSessions[0].TabletAlias, "tablet alias should have changed as this is a different tablet") + oldRId = session.Session.ShardSessions[0].ReservedId + oldAlias = session.Session.ShardSessions[0].TabletAlias + + // Test Setup + tablet0Rep := sbc0Rep.Tablet() + newThs := hc.GetHealthyTabletStats(&querypb.Target{ + Keyspace: tablet0Rep.GetKeyspace(), + Shard: tablet0Rep.GetShard(), + TabletType: tablet0Rep.GetType(), + }) + sbc0RepTh := newThs[0] + sbc0RepTh.Target = &querypb.Target{ + Keyspace: tablet0Rep.GetKeyspace(), + Shard: tablet0Rep.GetShard(), + TabletType: topodatapb.TabletType_SPARE, + } + sbc0Th.Serving = true + sbc0Rep.Queries = nil + _ = executeOnShardsReturnsErr(t, res, keyspace, sc, session, destinations) + assert.Equal(t, 1, len(sbc0.Queries), "first attempt should pass as it is healthy and matches the target") + assert.Equal(t, 0, len(sbc0Rep.Queries), " no attempt should be made as the tablet target is changed") + require.Equal(t, 1, len(session.ShardSessions)) + assert.NotEqual(t, oldRId, session.Session.ShardSessions[0].ReservedId, "should have recreated a reserved connection since the last connection was lost") + assert.NotEqual(t, oldAlias, session.Session.ShardSessions[0].TabletAlias, "tablet alias should have changed as this is a different tablet") } func TestIsConnClosed(t *testing.T) { From c161abafd99204ad6cde35c8d2e3ee2c6cb18ef0 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Tue, 20 Apr 2021 10:46:46 +0530 Subject: [PATCH 16/21] check target only if it is not nil Signed-off-by: Harshit Gangal --- go/vt/discovery/fake_healthcheck.go | 2 +- go/vt/discovery/healthcheck.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/go/vt/discovery/fake_healthcheck.go b/go/vt/discovery/fake_healthcheck.go index 8420a1bb294..bbb1ec98baa 100644 --- a/go/vt/discovery/fake_healthcheck.go +++ b/go/vt/discovery/fake_healthcheck.go @@ -153,7 +153,7 @@ func (fhc *FakeHealthCheck) TabletConnection(alias *topodatapb.TabletAlias, targ if !item.ts.Serving { return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, vterrors.NotServing) } - if !proto.Equal(item.ts.Target, target) { + if target != nil && !proto.Equal(item.ts.Target, target) { return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "%s: target mismatch %v vs %v", vterrors.WrongTablet, item.ts.Target, target) } diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index a63710c728d..9db28a1494e 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -704,7 +704,7 @@ func (hc *HealthCheckImpl) TabletConnection(alias *topodatapb.TabletAlias, targe if !thc.Serving { return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, vterrors.NotServing) } - if !proto.Equal(thc.Target, target) { + if target != nil && !proto.Equal(thc.Target, target) { return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "%s: target mismatch %v vs %v", vterrors.WrongTablet, thc.Target, target) } return thc.Connection(), nil From 10a03d3ad56f56b17bd428286e00b061006a0201 Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Tue, 20 Apr 2021 08:43:59 +0200 Subject: [PATCH 17/21] refactor Signed-off-by: Andres Taylor --- go/vt/vtgate/scatter_conn.go | 59 ++++++++++++++++++++++-------------- 1 file changed, 37 insertions(+), 22 deletions(-) diff --git a/go/vt/vtgate/scatter_conn.go b/go/vt/vtgate/scatter_conn.go index 88e0a5dd5cf..197a4f9c540 100644 --- a/go/vt/vtgate/scatter_conn.go +++ b/go/vt/vtgate/scatter_conn.go @@ -221,6 +221,10 @@ func (stc *ScatterConn) ExecuteMultiShard( qs, err = getQueryService(rs, info) if err != nil { + // an error here could mean that the tablet we were targeting earlier has changed type. + // if we have a transaction, we'll have to fail, but if we only had a reserved connection, + // we can create a new reserved connection to a new tablet that is on the right shard + // and has the right type switch info.actionNeeded { case nothing: info.actionNeeded = reserve @@ -236,49 +240,60 @@ func (stc *ScatterConn) ExecuteMultiShard( qs = rs.Gateway } + retryConnection := func(resetTabletConnAndExec func(), failUpdate func() *shardActionInfo) { + retry := checkAndResetShardSession(info, err, session) + switch retry { + case newQS: + // Current tablet is not available, try querying new tablet using gateway. + qs = rs.Gateway + fallthrough + case shard: + // if we need to reset a reserved connection, here is our chance to try executing again, + // against a new connection + resetTabletConnAndExec() + } + // err will have been changed by the call above + if err != nil { + info = failUpdate() + } + } + switch info.actionNeeded { case nothing: innerqr, err = qs.Execute(ctx, rs.Target, queries[i].Sql, queries[i].BindVariables, info.transactionID, info.reservedID, opts) if err != nil { - retry := checkAndResetShardSession(info, err, session) - switch retry { - case newQS: - // Current tablet is not available, try querying new tablet using gateway. - qs = rs.Gateway - fallthrough - case shard: - // we seem to have lost our connection. if it was a reserved connection, let's try to recreate it + retryConnection(func() { + // we seem to have lost our connection. it was a reserved connection, let's try to recreate it info.actionNeeded = reserve innerqr, reservedID, alias, err = qs.ReserveExecute(ctx, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, 0 /*transactionId*/, opts) - } + }, func() *shardActionInfo { + // we failed, let's clear out any lingering reserve ID + return info.updateReservedID(reservedID, alias) + }) if err != nil { - return info.updateReservedID(reservedID, alias), err + return info, err } } + case begin: innerqr, transactionID, alias, err = qs.BeginExecute(ctx, rs.Target, session.Savepoints, queries[i].Sql, queries[i].BindVariables, info.reservedID, opts) if err != nil { if transactionID != 0 { return info.updateTransactionID(transactionID, alias), err } - retry := checkAndResetShardSession(info, err, session) - switch retry { - case newQS: - // Current tablet is not available, try querying new tablet using gateway. - qs = rs.Gateway - fallthrough - case shard: - // we seem to have lost our connection. if it was a reserved connection, let's try to recreate it + retryConnection(func() { + // we seem to have lost our connection. it was a reserved connection, let's try to recreate it info.actionNeeded = reserveBegin innerqr, transactionID, reservedID, alias, err = qs.ReserveBeginExecute(ctx, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, opts) - } + }, func() *shardActionInfo { + return info.updateTransactionAndReservedID(transactionID, reservedID, alias) + }) if err != nil { - return info.updateTransactionAndReservedID(transactionID, reservedID, alias), err + return info, err } - } case reserve: - innerqr, reservedID, alias, err = qs.ReserveExecute(ctx, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, info.transactionID, opts) + innerqr, reservedID, alias, err = qs.ReserveExecute(ctx, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, 0 /*transactionId*/, opts) if err != nil { return info.updateReservedID(reservedID, alias), err } From 761415539977da63e12136a15c26c95d983f688d Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Tue, 20 Apr 2021 15:11:32 +0530 Subject: [PATCH 18/21] more refactor Signed-off-by: Harshit Gangal --- go/vt/vtgate/scatter_conn.go | 71 ++++++++++-------------------------- 1 file changed, 20 insertions(+), 51 deletions(-) diff --git a/go/vt/vtgate/scatter_conn.go b/go/vt/vtgate/scatter_conn.go index 197a4f9c540..ea8e27b26c9 100644 --- a/go/vt/vtgate/scatter_conn.go +++ b/go/vt/vtgate/scatter_conn.go @@ -214,8 +214,8 @@ func (stc *ScatterConn) ExecuteMultiShard( if autocommit { // As this is auto-commit, the transactionID is supposed to be zero. - if info.transactionID != int64(0) { - return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "in autocommit mode, transactionID should be zero but was: %d", info.transactionID) + if transactionID != int64(0) { + return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "in autocommit mode, transactionID should be zero but was: %d", transactionID) } } @@ -240,7 +240,7 @@ func (stc *ScatterConn) ExecuteMultiShard( qs = rs.Gateway } - retryConnection := func(resetTabletConnAndExec func(), failUpdate func() *shardActionInfo) { + retryRequest := func(exec func()) { retry := checkAndResetShardSession(info, err, session) switch retry { case newQS: @@ -250,11 +250,7 @@ func (stc *ScatterConn) ExecuteMultiShard( case shard: // if we need to reset a reserved connection, here is our chance to try executing again, // against a new connection - resetTabletConnAndExec() - } - // err will have been changed by the call above - if err != nil { - info = failUpdate() + exec() } } @@ -262,49 +258,38 @@ func (stc *ScatterConn) ExecuteMultiShard( case nothing: innerqr, err = qs.Execute(ctx, rs.Target, queries[i].Sql, queries[i].BindVariables, info.transactionID, info.reservedID, opts) if err != nil { - retryConnection(func() { + retryRequest(func() { // we seem to have lost our connection. it was a reserved connection, let's try to recreate it info.actionNeeded = reserve innerqr, reservedID, alias, err = qs.ReserveExecute(ctx, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, 0 /*transactionId*/, opts) - }, func() *shardActionInfo { - // we failed, let's clear out any lingering reserve ID - return info.updateReservedID(reservedID, alias) }) - if err != nil { - return info, err - } } - case begin: - innerqr, transactionID, alias, err = qs.BeginExecute(ctx, rs.Target, session.Savepoints, queries[i].Sql, queries[i].BindVariables, info.reservedID, opts) + innerqr, transactionID, alias, err = qs.BeginExecute(ctx, rs.Target, session.Savepoints, queries[i].Sql, queries[i].BindVariables, reservedID, opts) if err != nil { if transactionID != 0 { - return info.updateTransactionID(transactionID, alias), err + // if we had an open transaction, we can't repair anything and have to exit here. + // we still keep the transaction open - an error doesn't immediately close the transaction + break } - retryConnection(func() { + retryRequest(func() { // we seem to have lost our connection. it was a reserved connection, let's try to recreate it info.actionNeeded = reserveBegin innerqr, transactionID, reservedID, alias, err = qs.ReserveBeginExecute(ctx, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, opts) - }, func() *shardActionInfo { - return info.updateTransactionAndReservedID(transactionID, reservedID, alias) }) - if err != nil { - return info, err - } } case reserve: - innerqr, reservedID, alias, err = qs.ReserveExecute(ctx, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, 0 /*transactionId*/, opts) - if err != nil { - return info.updateReservedID(reservedID, alias), err - } + innerqr, reservedID, alias, err = qs.ReserveExecute(ctx, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, transactionID, opts) case reserveBegin: innerqr, transactionID, reservedID, alias, err = qs.ReserveBeginExecute(ctx, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, opts) - if err != nil { - return info.updateTransactionAndReservedID(transactionID, reservedID, alias), err - } default: return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] unexpected actionNeeded on query execution: %v", info.actionNeeded) } + // We need to new shard info irrespective of the error. + newInfo := info.updateTransactionAndReservedID(transactionID, reservedID, alias) + if err != nil { + return newInfo, err + } mu.Lock() defer mu.Unlock() @@ -312,7 +297,7 @@ func (stc *ScatterConn) ExecuteMultiShard( if ignoreMaxMemoryRows || len(qr.Rows) <= *maxMemoryRows { qr.AppendResult(innerqr) } - return info.updateTransactionAndReservedID(transactionID, reservedID, alias), nil + return newInfo, nil }, ) @@ -334,7 +319,7 @@ func checkAndResetShardSession(info *shardActionInfo, err error, session *SafeSe } } if retry != none { - session.ResetShard(info.alias) + _ = session.ResetShard(info.alias) } return retry } @@ -796,25 +781,9 @@ type shardActionInfo struct { alias *topodatapb.TabletAlias } -func (sai *shardActionInfo) updateTransactionID(txID int64, alias *topodatapb.TabletAlias) *shardActionInfo { - if txID == 0 { - // As transaction id is ZERO, there is nothing to update in session shard sessions. - return nil - } - return sai.updateTransactionAndReservedID(txID, sai.reservedID, alias) -} - -func (sai *shardActionInfo) updateReservedID(rID int64, alias *topodatapb.TabletAlias) *shardActionInfo { - if rID == 0 { - // As reserved id is ZERO, there is nothing to update in session shard sessions. - return nil - } - return sai.updateTransactionAndReservedID(sai.transactionID, rID, alias) -} - func (sai *shardActionInfo) updateTransactionAndReservedID(txID int64, rID int64, alias *topodatapb.TabletAlias) *shardActionInfo { - if txID == 0 && rID == 0 { - // As transaction id and reserved id is ZERO, there is nothing to update in session shard sessions. + if txID == sai.transactionID && rID == sai.reservedID { + // As transaction id and reserved id have not changed, there is nothing to update in session shard sessions. return nil } newInfo := *sai From 992ed58430375c1a0d6c41f114539f29e21dc749 Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Wed, 21 Apr 2021 19:54:40 +0200 Subject: [PATCH 19/21] addressed review comments Signed-off-by: Andres Taylor --- .../reservedconn/reconnect1/main_test.go | 2 +- .../reservedconn/reconnect2/main_test.go | 2 +- go/vt/discovery/healthcheck.go | 99 +++++++++---------- go/vt/vterrors/constants.go | 4 +- 4 files changed, 53 insertions(+), 54 deletions(-) diff --git a/go/test/endtoend/vtgate/reservedconn/reconnect1/main_test.go b/go/test/endtoend/vtgate/reservedconn/reconnect1/main_test.go index 001b47ae78f..cd0d4f29665 100644 --- a/go/test/endtoend/vtgate/reservedconn/reconnect1/main_test.go +++ b/go/test/endtoend/vtgate/reservedconn/reconnect1/main_test.go @@ -1,5 +1,5 @@ /* -Copyright 2020 The Vitess Authors. +Copyright 2021 The Vitess Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/go/test/endtoend/vtgate/reservedconn/reconnect2/main_test.go b/go/test/endtoend/vtgate/reservedconn/reconnect2/main_test.go index c2427a84f66..627f203647e 100644 --- a/go/test/endtoend/vtgate/reservedconn/reconnect2/main_test.go +++ b/go/test/endtoend/vtgate/reservedconn/reconnect2/main_test.go @@ -1,5 +1,5 @@ /* -Copyright 2020 The Vitess Authors. +Copyright 2021 The Vitess Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 9db28a1494e..90a64a788aa 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -50,14 +50,13 @@ import ( "vitess.io/vitess/go/flagutil" "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/queryservice" - - querypb "vitess.io/vitess/go/vt/proto/query" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" - topoprotopb "vitess.io/vitess/go/vt/topo/topoproto" ) var ( @@ -72,7 +71,7 @@ var ( //TODO(deepthi): change these vars back to unexported when discoveryGateway is removed // AllowedTabletTypes is the list of allowed tablet types. e.g. {MASTER, REPLICA} - AllowedTabletTypes []topodatapb.TabletType + AllowedTabletTypes []topodata.TabletType // TabletFilters are the keyspace|shard or keyrange filters to apply to the full set of tablets TabletFilters flagutil.StringListValue // KeyspacesToWatch - if provided this specifies which keyspaces should be @@ -144,7 +143,7 @@ func init() { // Flags are not parsed at this point and the default value of the flag (just the hostname) will be used. ParseTabletURLTemplateFromFlag() flag.Var(&TabletFilters, "tablet_filters", "Specifies a comma-separated list of 'keyspace|shard_name or keyrange' values to filter the tablets to watch") - topoprotopb.TabletTypeListVar(&AllowedTabletTypes, "allowed_tablet_types", "Specifies the tablet types this vtgate is allowed to route queries to") + topoproto.TabletTypeListVar(&AllowedTabletTypes, "allowed_tablet_types", "Specifies the tablet types this vtgate is allowed to route queries to") flag.Var(&KeyspacesToWatch, "keyspaces_to_watch", "Specifies which keyspaces this vtgate should have access to while routing queries or accessing the vschema") } @@ -152,11 +151,11 @@ func init() { // It is separated out to enable unit testing. type TabletRecorder interface { // AddTablet adds the tablet. - AddTablet(tablet *topodatapb.Tablet) + AddTablet(tablet *topodata.Tablet) // RemoveTablet removes the tablet. - RemoveTablet(tablet *topodatapb.Tablet) + RemoveTablet(tablet *topodata.Tablet) // ReplaceTablet does an AddTablet and RemoveTablet in one call, effectively replacing the old tablet with the new. - ReplaceTablet(old, new *topodatapb.Tablet) + ReplaceTablet(old, new *topodata.Tablet) } type keyspaceShardTabletType string @@ -174,10 +173,10 @@ type HealthCheck interface { // each given target before returning. // It will return ctx.Err() if the context is canceled. // It will return an error if it can't read the necessary topology records. - WaitForAllServingTablets(ctx context.Context, targets []*querypb.Target) error + WaitForAllServingTablets(ctx context.Context, targets []*query.Target) error // TabletConnection returns the TabletConn of the given tablet. - TabletConnection(alias *topodatapb.TabletAlias, target *querypb.Target) (queryservice.QueryService, error) + TabletConnection(alias *topodata.TabletAlias, target *query.Target) (queryservice.QueryService, error) // RegisterStats registers the connection counts stats RegisterStats() @@ -188,7 +187,7 @@ type HealthCheck interface { // the most recent tablet of type master. // This returns a copy of the data so that callers can access without // synchronization - GetHealthyTabletStats(target *querypb.Target) []*TabletHealth + GetHealthyTabletStats(target *query.Target) []*TabletHealth // Subscribe adds a listener. Used by vtgate buffer to learn about master changes. Subscribe() chan *TabletHealth @@ -312,7 +311,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur // AddTablet adds the tablet, and starts health check. // It does not block on making connection. // name is an optional tag for the tablet, e.g. an alternative address. -func (hc *HealthCheckImpl) AddTablet(tablet *topodatapb.Tablet) { +func (hc *HealthCheckImpl) AddTablet(tablet *topodata.Tablet) { // check whether grpc port is present on tablet, if not return if tablet.PortMap["grpc"] == 0 { return @@ -326,7 +325,7 @@ func (hc *HealthCheckImpl) AddTablet(tablet *topodatapb.Tablet) { return } ctx, cancelFunc := context.WithCancel(context.Background()) - target := &querypb.Target{ + target := &query.Target{ Keyspace: tablet.Keyspace, Shard: tablet.Shard, TabletType: tablet.Type, @@ -340,7 +339,7 @@ func (hc *HealthCheckImpl) AddTablet(tablet *topodatapb.Tablet) { // add to our datastore key := hc.keyFromTarget(target) - tabletAlias := topoprotopb.TabletAliasString(tablet.Alias) + tabletAlias := topoproto.TabletAliasString(tablet.Alias) if _, ok := hc.healthByAlias[tabletAliasString(tabletAlias)]; ok { // We should not add a tablet that we already have log.Errorf("Program bug: tried to add existing tablet: %v to healthcheck", tabletAlias) @@ -363,23 +362,23 @@ func (hc *HealthCheckImpl) AddTablet(tablet *topodatapb.Tablet) { // RemoveTablet removes the tablet, and stops the health check. // It does not block. -func (hc *HealthCheckImpl) RemoveTablet(tablet *topodatapb.Tablet) { +func (hc *HealthCheckImpl) RemoveTablet(tablet *topodata.Tablet) { hc.deleteTablet(tablet) } // ReplaceTablet removes the old tablet and adds the new tablet. -func (hc *HealthCheckImpl) ReplaceTablet(old, new *topodatapb.Tablet) { +func (hc *HealthCheckImpl) ReplaceTablet(old, new *topodata.Tablet) { hc.RemoveTablet(old) hc.AddTablet(new) } -func (hc *HealthCheckImpl) deleteTablet(tablet *topodatapb.Tablet) { +func (hc *HealthCheckImpl) deleteTablet(tablet *topodata.Tablet) { log.Infof("Removing tablet from healthcheck: %v", tablet) hc.mu.Lock() defer hc.mu.Unlock() key := hc.keyFromTablet(tablet) - tabletAlias := tabletAliasString(topoprotopb.TabletAliasString(tablet.Alias)) + tabletAlias := tabletAliasString(topoproto.TabletAliasString(tablet.Alias)) // delete from authoritative map th, ok := hc.healthByAlias[tabletAlias] if !ok { @@ -404,17 +403,17 @@ func (hc *HealthCheckImpl) deleteTablet(tablet *topodatapb.Tablet) { } } -func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *querypb.Target, trivialUpdate bool, isPrimaryUp bool) { +func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Target, trivialUpdate bool, isPrimaryUp bool) { // hc.healthByAlias is authoritative, it should be updated hc.mu.Lock() defer hc.mu.Unlock() - tabletAlias := tabletAliasString(topoprotopb.TabletAliasString(th.Tablet.Alias)) + tabletAlias := tabletAliasString(topoproto.TabletAliasString(th.Tablet.Alias)) targetKey := hc.keyFromTarget(th.Target) targetChanged := prevTarget.TabletType != th.Target.TabletType || prevTarget.Keyspace != th.Target.Keyspace || prevTarget.Shard != th.Target.Shard if targetChanged { // Error counter has to be set here in case we get a new tablet type for the first time in a stream response - hcErrorCounters.Add([]string{th.Target.Keyspace, th.Target.Shard, topoprotopb.TabletTypeLString(th.Target.TabletType)}, 0) + hcErrorCounters.Add([]string{th.Target.Keyspace, th.Target.Shard, topoproto.TabletTypeLString(th.Target.TabletType)}, 0) // keyspace and shard are not expected to change, but just in case ... // move this tabletHealthCheck to the correct map oldTargetKey := hc.keyFromTarget(prevTarget) @@ -427,7 +426,7 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *querypb.Ta // add it to the map by target hc.healthData[targetKey][tabletAlias] = th - isPrimary := th.Target.TabletType == topodatapb.TabletType_MASTER + isPrimary := th.Target.TabletType == topodata.TabletType_MASTER switch { case isPrimary && isPrimaryUp: if len(hc.healthy[targetKey]) == 0 { @@ -437,9 +436,9 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *querypb.Ta // need to replace it. if th.MasterTermStartTime < hc.healthy[targetKey][0].MasterTermStartTime { log.Warningf("not marking healthy master %s as Up for %s because its MasterTermStartTime is smaller than the highest known timestamp from previous MASTERs %s: %d < %d ", - topoprotopb.TabletAliasString(th.Tablet.Alias), - topoprotopb.KeyspaceShardString(th.Target.Keyspace, th.Target.Shard), - topoprotopb.TabletAliasString(hc.healthy[targetKey][0].Tablet.Alias), + topoproto.TabletAliasString(th.Tablet.Alias), + topoproto.KeyspaceShardString(th.Target.Keyspace, th.Target.Shard), + topoproto.TabletAliasString(hc.healthy[targetKey][0].Tablet.Alias), th.MasterTermStartTime, hc.healthy[targetKey][0].MasterTermStartTime) } else { @@ -456,18 +455,18 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *querypb.Ta // We re-sort the healthy tablet list whenever we get a health update for tablets we can route to. // Tablets from other cells for non-master targets should not trigger a re-sort; // they should also be excluded from healthy list. - if th.Target.TabletType != topodatapb.TabletType_MASTER && hc.isIncluded(th.Target.TabletType, th.Tablet.Alias) { + if th.Target.TabletType != topodata.TabletType_MASTER && hc.isIncluded(th.Target.TabletType, th.Tablet.Alias) { hc.recomputeHealthy(targetKey) } - if targetChanged && prevTarget.TabletType != topodatapb.TabletType_MASTER && hc.isIncluded(th.Target.TabletType, th.Tablet.Alias) { // also recompute old target's healthy list + if targetChanged && prevTarget.TabletType != topodata.TabletType_MASTER && hc.isIncluded(th.Target.TabletType, th.Tablet.Alias) { // also recompute old target's healthy list oldTargetKey := hc.keyFromTarget(prevTarget) hc.recomputeHealthy(oldTargetKey) } } - isNewPrimary := isPrimary && prevTarget.TabletType != topodatapb.TabletType_MASTER + isNewPrimary := isPrimary && prevTarget.TabletType != topodata.TabletType_MASTER if isNewPrimary { - log.Errorf("Adding 1 to MasterPromoted counter for target: %v, tablet: %v, tabletType: %v", prevTarget, topoprotopb.TabletAliasString(th.Tablet.Alias), th.Target.TabletType) + log.Errorf("Adding 1 to MasterPromoted counter for target: %v, tablet: %v, tabletType: %v", prevTarget, topoproto.TabletAliasString(th.Tablet.Alias), th.Target.TabletType) hcMasterPromotedCounters.Add([]string{th.Target.Keyspace, th.Target.Shard}, 1) } @@ -579,7 +578,7 @@ func (hc *HealthCheckImpl) Close() error { // the most recent tablet of type master. // This returns a copy of the data so that callers can access without // synchronization -func (hc *HealthCheckImpl) GetHealthyTabletStats(target *querypb.Target) []*TabletHealth { +func (hc *HealthCheckImpl) GetHealthyTabletStats(target *query.Target) []*TabletHealth { var result []*TabletHealth hc.mu.Lock() defer hc.mu.Unlock() @@ -593,7 +592,7 @@ func (hc *HealthCheckImpl) GetHealthyTabletStats(target *querypb.Target) []*Tabl // The returned array is owned by the caller. // For TabletType_MASTER, this will only return at most one entry, // the most recent tablet of type master. -func (hc *HealthCheckImpl) getTabletStats(target *querypb.Target) []*TabletHealth { +func (hc *HealthCheckImpl) getTabletStats(target *query.Target) []*TabletHealth { var result []*TabletHealth hc.mu.Lock() defer hc.mu.Unlock() @@ -607,8 +606,8 @@ func (hc *HealthCheckImpl) getTabletStats(target *querypb.Target) []*TabletHealt // WaitForTablets waits for at least one tablet in the given // keyspace / shard / tablet type before returning. The tablets do not // have to be healthy. It will return ctx.Err() if the context is canceled. -func (hc *HealthCheckImpl) WaitForTablets(ctx context.Context, keyspace, shard string, tabletType topodatapb.TabletType) error { - targets := []*querypb.Target{ +func (hc *HealthCheckImpl) WaitForTablets(ctx context.Context, keyspace, shard string, tabletType topodata.TabletType) error { + targets := []*query.Target{ { Keyspace: keyspace, Shard: shard, @@ -622,13 +621,13 @@ func (hc *HealthCheckImpl) WaitForTablets(ctx context.Context, keyspace, shard s // each given target before returning. // It will return ctx.Err() if the context is canceled. // It will return an error if it can't read the necessary topology records. -func (hc *HealthCheckImpl) WaitForAllServingTablets(ctx context.Context, targets []*querypb.Target) error { +func (hc *HealthCheckImpl) WaitForAllServingTablets(ctx context.Context, targets []*query.Target) error { return hc.waitForTablets(ctx, targets, true) } // FilterTargetsByKeyspaces only returns the targets that are part of the provided keyspaces -func FilterTargetsByKeyspaces(keyspaces []string, targets []*querypb.Target) []*querypb.Target { - filteredTargets := make([]*querypb.Target, 0) +func FilterTargetsByKeyspaces(keyspaces []string, targets []*query.Target) []*query.Target { + filteredTargets := make([]*query.Target, 0) // Keep them all if there are no keyspaces to watch if len(KeyspacesToWatch) == 0 { @@ -647,7 +646,7 @@ func FilterTargetsByKeyspaces(keyspaces []string, targets []*querypb.Target) []* } // waitForTablets is the internal method that polls for tablets. -func (hc *HealthCheckImpl) waitForTablets(ctx context.Context, targets []*querypb.Target, requireServing bool) error { +func (hc *HealthCheckImpl) waitForTablets(ctx context.Context, targets []*query.Target, requireServing bool) error { targets = FilterTargetsByKeyspaces(KeyspacesToWatch, targets) for { @@ -693,31 +692,31 @@ func (hc *HealthCheckImpl) waitForTablets(ctx context.Context, targets []*queryp } // TabletConnection returns the Connection to a given tablet. -func (hc *HealthCheckImpl) TabletConnection(alias *topodatapb.TabletAlias, target *querypb.Target) (queryservice.QueryService, error) { +func (hc *HealthCheckImpl) TabletConnection(alias *topodata.TabletAlias, target *query.Target) (queryservice.QueryService, error) { hc.mu.Lock() - thc := hc.healthByAlias[tabletAliasString(topoprotopb.TabletAliasString(alias))] + thc := hc.healthByAlias[tabletAliasString(topoproto.TabletAliasString(alias))] hc.mu.Unlock() if thc == nil || thc.Conn == nil { //TODO: test that throws this error - return nil, vterrors.Errorf(vtrpcpb.Code_NOT_FOUND, "tablet: %v is either down or nonexistent", alias) + return nil, vterrors.Errorf(vtrpc.Code_NOT_FOUND, "tablet: %v is either down or nonexistent", alias) } if !thc.Serving { - return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, vterrors.NotServing) + return nil, vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, vterrors.NotServing) } if target != nil && !proto.Equal(thc.Target, target) { - return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "%s: target mismatch %v vs %v", vterrors.WrongTablet, thc.Target, target) + return nil, vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "%s: target mismatch %v vs %v", vterrors.WrongTablet, thc.Target, target) } return thc.Connection(), nil } // Target includes cell which we ignore here // because tabletStatsCache is intended to be per-cell -func (hc *HealthCheckImpl) keyFromTarget(target *querypb.Target) keyspaceShardTabletType { - return keyspaceShardTabletType(fmt.Sprintf("%s.%s.%s", target.Keyspace, target.Shard, topoprotopb.TabletTypeLString(target.TabletType))) +func (hc *HealthCheckImpl) keyFromTarget(target *query.Target) keyspaceShardTabletType { + return keyspaceShardTabletType(fmt.Sprintf("%s.%s.%s", target.Keyspace, target.Shard, topoproto.TabletTypeLString(target.TabletType))) } -func (hc *HealthCheckImpl) keyFromTablet(tablet *topodatapb.Tablet) keyspaceShardTabletType { - return keyspaceShardTabletType(fmt.Sprintf("%s.%s.%s", tablet.Keyspace, tablet.Shard, topoprotopb.TabletTypeLString(tablet.Type))) +func (hc *HealthCheckImpl) keyFromTablet(tablet *topodata.Tablet) keyspaceShardTabletType { + return keyspaceShardTabletType(fmt.Sprintf("%s.%s.%s", tablet.Keyspace, tablet.Shard, topoproto.TabletTypeLString(tablet.Type))) } // getAliasByCell should only be called while holding hc.mu @@ -734,8 +733,8 @@ func (hc *HealthCheckImpl) getAliasByCell(cell string) string { return alias } -func (hc *HealthCheckImpl) isIncluded(tabletType topodatapb.TabletType, tabletAlias *topodatapb.TabletAlias) bool { - if tabletType == topodatapb.TabletType_MASTER { +func (hc *HealthCheckImpl) isIncluded(tabletType topodata.TabletType, tabletAlias *topodata.TabletAlias) bool { + if tabletType == topodata.TabletType_MASTER { return true } if tabletAlias.Cell == hc.cell { diff --git a/go/vt/vterrors/constants.go b/go/vt/vterrors/constants.go index f602b9f0054..63df8d02727 100644 --- a/go/vt/vterrors/constants.go +++ b/go/vt/vterrors/constants.go @@ -28,7 +28,7 @@ const ( var RxOp = regexp.MustCompile("operation not allowed in state (NOT_SERVING|SHUTTING_DOWN)") // WrongTablet for invalid tablet type error -const WrongTablet = "invalid tablet type" +const WrongTablet = "wrong tablet type" // RxWrongTablet regex for invalid tablet type error -var RxWrongTablet = regexp.MustCompile("invalid tablet type") +var RxWrongTablet = regexp.MustCompile(WrongTablet) From 987b0ed2473caccc8c21376666b13bb427112850 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Thu, 22 Apr 2021 12:20:16 +0530 Subject: [PATCH 20/21] check for invalid and wrong keyworkd in regex Signed-off-by: Harshit Gangal --- go/vt/vterrors/constants.go | 2 +- go/vt/vttablet/tabletserver/state_manager.go | 29 +++++-------------- .../tabletserver/state_manager_test.go | 4 +-- 3 files changed, 11 insertions(+), 24 deletions(-) diff --git a/go/vt/vterrors/constants.go b/go/vt/vterrors/constants.go index 63df8d02727..d66a97464d7 100644 --- a/go/vt/vterrors/constants.go +++ b/go/vt/vterrors/constants.go @@ -31,4 +31,4 @@ var RxOp = regexp.MustCompile("operation not allowed in state (NOT_SERVING|SHUTT const WrongTablet = "wrong tablet type" // RxWrongTablet regex for invalid tablet type error -var RxWrongTablet = regexp.MustCompile(WrongTablet) +var RxWrongTablet = regexp.MustCompile("(wrong|invalid) tablet type") diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index 87433ebb858..39cab911fd2 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -357,27 +357,10 @@ func (sm *stateManager) StartRequest(ctx context.Context, target *querypb.Target return vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, vterrors.ShuttingDown) } - if target != nil { - switch { - case target.Keyspace != sm.target.Keyspace: - return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid keyspace %v does not match expected %v", target.Keyspace, sm.target.Keyspace) - case target.Shard != sm.target.Shard: - return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid shard %v does not match expected %v", target.Shard, sm.target.Shard) - case target.TabletType != sm.target.TabletType: - for _, otherType := range sm.alsoAllow { - if target.TabletType == otherType { - goto ok - } - } - return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "%s: %v, want: %v or %v", vterrors.WrongTablet, target.TabletType, sm.target.TabletType, sm.alsoAllow) - } - } else { - if !tabletenv.IsLocalContext(ctx) { - return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "No target") - } + err = sm.verifyTargetLocked(ctx, target) + if err != nil { + return err } - -ok: sm.requests.Add(1) return nil } @@ -392,6 +375,10 @@ func (sm *stateManager) EndRequest() { func (sm *stateManager) VerifyTarget(ctx context.Context, target *querypb.Target) error { sm.mu.Lock() defer sm.mu.Unlock() + return sm.verifyTargetLocked(ctx, target) +} + +func (sm *stateManager) verifyTargetLocked(ctx context.Context, target *querypb.Target) error { if target != nil { switch { case target.Keyspace != sm.target.Keyspace: @@ -404,7 +391,7 @@ func (sm *stateManager) VerifyTarget(ctx context.Context, target *querypb.Target return nil } } - return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "invalid tablet type: %v, want: %v or %v", target.TabletType, sm.target.TabletType, sm.alsoAllow) + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "%s: %v, want: %v or %v", vterrors.WrongTablet, sm.target.TabletType, sm.target.TabletType, sm.alsoAllow) } } else { if !tabletenv.IsLocalContext(ctx) { diff --git a/go/vt/vttablet/tabletserver/state_manager_test.go b/go/vt/vttablet/tabletserver/state_manager_test.go index 577dbc8f6ad..53396e052ba 100644 --- a/go/vt/vttablet/tabletserver/state_manager_test.go +++ b/go/vt/vttablet/tabletserver/state_manager_test.go @@ -531,9 +531,9 @@ func TestStateManagerValidations(t *testing.T) { target.Shard = "" target.TabletType = topodatapb.TabletType_REPLICA err = sm.StartRequest(ctx, target, false) - assert.Contains(t, err.Error(), "invalid tablet type") + assert.Contains(t, err.Error(), "wrong tablet type") err = sm.VerifyTarget(ctx, target) - assert.Contains(t, err.Error(), "invalid tablet type") + assert.Contains(t, err.Error(), "wrong tablet type") sm.alsoAllow = []topodatapb.TabletType{topodatapb.TabletType_REPLICA} err = sm.StartRequest(ctx, target, false) From a225828978140674132913d8e407dd4255740634 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Thu, 22 Apr 2021 13:07:38 +0530 Subject: [PATCH 21/21] making test more robust Signed-off-by: Harshit Gangal --- .../vtgate/reservedconn/reconnect1/main_test.go | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/go/test/endtoend/vtgate/reservedconn/reconnect1/main_test.go b/go/test/endtoend/vtgate/reservedconn/reconnect1/main_test.go index cd0d4f29665..598d23345fe 100644 --- a/go/test/endtoend/vtgate/reservedconn/reconnect1/main_test.go +++ b/go/test/endtoend/vtgate/reservedconn/reconnect1/main_test.go @@ -107,16 +107,15 @@ func TestServingChange(t *testing.T) { checkedExec(t, conn, "use @rdonly") checkedExec(t, conn, "set sql_mode = ''") - rdonlyTablet := clusterInstance.Keyspaces[0].Shards[0].Rdonly() - - // to see/make the new rdonly available - err = clusterInstance.VtctlclientProcess.ExecuteCommand("Ping", rdonlyTablet.Alias) - require.NoError(t, err) - - // this will create reserved connection on rdonly on -80 and 80- shards. - checkedExec(t, conn, "select * from test") + // to see rdonly is available and + // also this will create reserved connection on rdonly on -80 and 80- shards. + _, err = exec(t, conn, "select * from test") + for err != nil { + _, err = exec(t, conn, "select * from test") + } // changing rdonly tablet to spare (non serving). + rdonlyTablet := clusterInstance.Keyspaces[0].Shards[0].Rdonly() err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", rdonlyTablet.Alias, "spare") require.NoError(t, err)