From f2c350cc7b75437f74c97338ea3a6cbbae66059d Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Fri, 31 Jul 2020 23:43:38 +0530 Subject: [PATCH 1/7] remove single shard session from session shardSessions Signed-off-by: Harshit Gangal --- go/vt/vtgate/safe_session.go | 48 ++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/go/vt/vtgate/safe_session.go b/go/vt/vtgate/safe_session.go index 0d6aeae9909..a4de83364ee 100644 --- a/go/vt/vtgate/safe_session.go +++ b/go/vt/vtgate/safe_session.go @@ -360,3 +360,51 @@ func (session *SafeSession) SetPreQueries() []string { } return result } + +//ResetShard reset the shard session for the provided tablet alias. +func (session *SafeSession) ResetShard(tabletAlias *topodatapb.TabletAlias) error { + session.mu.Lock() + defer session.mu.Unlock() + + // Always append, in order for rollback to succeed. + switch session.commitOrder { + case vtgatepb.CommitOrder_NORMAL: + newSessions, err := removeShard(tabletAlias, session.ShardSessions) + if err != nil { + return err + } + session.ShardSessions = newSessions + case vtgatepb.CommitOrder_PRE: + newSessions, err := removeShard(tabletAlias, session.PreSessions) + if err != nil { + return err + } + session.PreSessions = newSessions + case vtgatepb.CommitOrder_POST: + newSessions, err := removeShard(tabletAlias, session.PostSessions) + if err != nil { + return err + } + session.PostSessions = newSessions + default: + // Should be unreachable + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "BUG: SafeSession.ResetShard: unexpected commitOrder") + } + return nil +} + +func removeShard(tabletAlias *topodatapb.TabletAlias, sessions []*vtgatepb.Session_ShardSession) ([]*vtgatepb.Session_ShardSession, error) { + idx := -1 + for i, session := range sessions { + if proto.Equal(session.TabletAlias, tabletAlias) { + if session.TransactionId != 0 { + return nil, vterrors.New(vtrpcpb.Code_INTERNAL, "BUG: SafeSession.ResetShard: in transaction") + } + idx = i + } + } + if 
idx == -1 { + return sessions, nil + } + return append(sessions[:idx], sessions[idx+1:]...), nil +} From 945eb59528c980ceb5a79631e93390a252e175dd Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Fri, 31 Jul 2020 23:46:03 +0530 Subject: [PATCH 2/7] reserved non transaction - removes the reserved shard session on conn failure Signed-off-by: Harshit Gangal --- go/vt/vtgate/scatter_conn.go | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/go/vt/vtgate/scatter_conn.go b/go/vt/vtgate/scatter_conn.go index 343e73122da..1d8ec9657ef 100644 --- a/go/vt/vtgate/scatter_conn.go +++ b/go/vt/vtgate/scatter_conn.go @@ -22,6 +22,8 @@ import ( "sync" "time" + "vitess.io/vitess/go/mysql" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/vttablet/queryservice" @@ -188,36 +190,30 @@ func (stc *ScatterConn) ExecuteMultiShard( } } + qs, err := getQueryService(rs, info) + if err != nil { + return nil, err + } + switch info.actionNeeded { case nothing: - qs, err := getQueryService(rs, info) - if err != nil { - return nil, err - } innerqr, err = qs.Execute(ctx, rs.Target, queries[i].Sql, queries[i].BindVariables, info.transactionID, info.reservedID, opts) if err != nil { + resetShardSession(info, err, session) return nil, err } case begin: - qs, err := getQueryService(rs, info) - if err != nil { - return nil, err - } innerqr, transactionID, alias, err = qs.BeginExecute(ctx, rs.Target, session.Savepoints, queries[i].Sql, queries[i].BindVariables, info.reservedID, opts) if err != nil { return info.updateTransactionID(transactionID, alias), err } case reserve: - qs, err := getQueryService(rs, info) - if err != nil { - return nil, err - } innerqr, reservedID, alias, err = qs.ReserveExecute(ctx, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, info.transactionID, opts) if err != nil { return info.updateReservedID(reservedID, alias), err } case reserveBegin: - innerqr, transactionID, 
reservedID, alias, err = rs.Gateway.ReserveBeginExecute(ctx, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, opts) + innerqr, transactionID, reservedID, alias, err = qs.ReserveBeginExecute(ctx, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, opts) if err != nil { return info.updateTransactionAndReservedID(transactionID, reservedID, alias), err } @@ -242,6 +238,15 @@ func (stc *ScatterConn) ExecuteMultiShard( return qr, allErrors.GetErrors() } +func resetShardSession(info *shardActionInfo, err error, session *SafeSession) { + if info.reservedID != 0 && info.transactionID == 0 { + sqlErr := mysql.NewSQLErrorFromError(err).(*mysql.SQLError) + if sqlErr.Number() == mysql.CRServerGone || sqlErr.Number() == mysql.CRServerLost { + session.ResetShard(info.alias) + } + } +} + func getQueryService(rs *srvtopo.ResolvedShard, info *shardActionInfo) (queryservice.QueryService, error) { _, usingLegacyGw := rs.Gateway.(*DiscoveryGateway) if usingLegacyGw { From 6f67c0b7b685d26af595fbf7fcdbc388a015b732 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Fri, 31 Jul 2020 23:49:17 +0530 Subject: [PATCH 3/7] added endtoend to mock reserved connection closing Signed-off-by: Harshit Gangal --- .../vtgate/setstatement/sysvar_test.go | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/go/test/endtoend/vtgate/setstatement/sysvar_test.go b/go/test/endtoend/vtgate/setstatement/sysvar_test.go index afc0f7e7e88..92596ba2728 100644 --- a/go/test/endtoend/vtgate/setstatement/sysvar_test.go +++ b/go/test/endtoend/vtgate/setstatement/sysvar_test.go @@ -211,6 +211,41 @@ func TestStartTxAndSetSystemVariableAndThenSuccessfulCommit(t *testing.T) { assertMatches(t, conn, "select @@sql_safe_updates", "[[INT64(1)]]") } +func TestSetSystemVarWithConnError(t *testing.T) { + vtParams := mysql.ConnParams{ + Host: "localhost", + Port: clusterInstance.VtgateMySQLPort, + } + + conn, err := mysql.Connect(context.Background(), 
&vtParams) + require.NoError(t, err) + defer conn.Close() + + checkedExec(t, conn, "delete from test") + checkedExec(t, conn, "insert into test (id, val1) values (1, null), (4, null)") + + checkedExec(t, conn, "set sql_safe_updates = 1") // this should force us into a reserved connection + assertMatches(t, conn, "select id from test order by id", "[[INT64(1)] [INT64(4)]]") + qr := checkedExec(t, conn, "select connection_id() from test where id = 1") + + // kill the mysql connection shard which has transaction open. + vttablet1 := clusterInstance.Keyspaces[0].Shards[0].MasterTablet() // -80 + _, err = vttablet1.VttabletProcess.QueryTablet(fmt.Sprintf("kill %s", qr.Rows[0][0].ToString()), keyspaceName, false) + require.NoError(t, err) + + // first query to 80- shard should pass + assertMatches(t, conn, "select id, val1 from test where id = 4", "[[INT64(4) NULL]]") + + // first query to -80 shard will fail + _, err = exec(t, conn, "insert into test (id, val1) values (2, null)") + require.Error(t, err) + + // subsequent queries on -80 will pass + assertMatches(t, conn, "select id from test where id = 2", "[]") + assertMatches(t, conn, "insert into test (id, val1) values (2, null)", "[]") + assertMatches(t, conn, "select id from test where id = 2", "[[INT64(2)]]") +} + func assertMatches(t *testing.T, conn *mysql.Conn, query, expected string) { t.Helper() qr, err := exec(t, conn, query) From db38174ef93b9dc6c15b03eed1db03de2012311f Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Sat, 1 Aug 2020 01:29:11 +0530 Subject: [PATCH 4/7] use new healthcheck gateway in tx_conn test for reserved connection Signed-off-by: Harshit Gangal --- go/vt/vtgate/scatter_conn.go | 3 ++- go/vt/vtgate/tx_conn_test.go | 20 ++++++++++---------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/go/vt/vtgate/scatter_conn.go b/go/vt/vtgate/scatter_conn.go index 1d8ec9657ef..3435b27daa8 100644 --- a/go/vt/vtgate/scatter_conn.go +++ b/go/vt/vtgate/scatter_conn.go @@ -175,6 +175,7 
@@ func (stc *ScatterConn) ExecuteMultiShard( err error opts *querypb.ExecuteOptions alias *topodatapb.TabletAlias + qs queryservice.QueryService ) transactionID := info.transactionID reservedID := info.reservedID @@ -190,7 +191,7 @@ func (stc *ScatterConn) ExecuteMultiShard( } } - qs, err := getQueryService(rs, info) + qs, err = getQueryService(rs, info) if err != nil { return nil, err } diff --git a/go/vt/vtgate/tx_conn_test.go b/go/vt/vtgate/tx_conn_test.go index f1edd17aa73..1e0fefdebb2 100644 --- a/go/vt/vtgate/tx_conn_test.go +++ b/go/vt/vtgate/tx_conn_test.go @@ -112,7 +112,7 @@ func TestTxConnCommitSuccess(t *testing.T) { } func TestTxConnReservedCommitSuccess(t *testing.T) { - sc, sbc0, sbc1, rss0, _, rss01 := newLegacyTestTxConnEnv(t, "TestTxConn") + sc, sbc0, sbc1, rss0, _, rss01 := newTestTxConnEnv(t, "TestTxConn") sc.txConn.mode = vtgatepb.TransactionMode_MULTI // Sequence the executes to ensure commit order @@ -154,7 +154,7 @@ func TestTxConnReservedCommitSuccess(t *testing.T) { }, TransactionId: 1, ReservedId: 1, - TabletAlias: sbc0.Tablet().Alias, + TabletAlias: sbc1.Tablet().Alias, }}, } utils.MustMatch(t, &wantSession, session.Session, "Session") @@ -178,7 +178,7 @@ func TestTxConnReservedCommitSuccess(t *testing.T) { TabletType: topodatapb.TabletType_MASTER, }, ReservedId: 2, - TabletAlias: sbc0.Tablet().Alias, + TabletAlias: sbc1.Tablet().Alias, }}, } utils.MustMatch(t, &wantSession, session.Session, "Session") @@ -574,7 +574,7 @@ func TestTxConnCommitOrderSuccess(t *testing.T) { } func TestTxConnReservedCommitOrderSuccess(t *testing.T) { - sc, sbc0, sbc1, rss0, rss1, _ := newLegacyTestTxConnEnv(t, "TestTxConn") + sc, sbc0, sbc1, rss0, rss1, _ := newTestTxConnEnv(t, "TestTxConn") sc.txConn.mode = vtgatepb.TransactionMode_MULTI queries := []*querypb.BoundQuery{{ @@ -661,7 +661,7 @@ func TestTxConnReservedCommitOrderSuccess(t *testing.T) { }, TransactionId: 1, ReservedId: 1, - TabletAlias: sbc0.Tablet().Alias, + TabletAlias: sbc1.Tablet().Alias, 
}}, } utils.MustMatch(t, &wantSession, session.Session, "Session") @@ -699,7 +699,7 @@ func TestTxConnReservedCommitOrderSuccess(t *testing.T) { TabletType: topodatapb.TabletType_MASTER, }, ReservedId: 2, - TabletAlias: sbc0.Tablet().Alias, + TabletAlias: sbc1.Tablet().Alias, }}, } utils.MustMatch(t, &wantSession, session.Session, "Session") @@ -857,7 +857,7 @@ func TestTxConnRollback(t *testing.T) { } func TestTxConnReservedRollback(t *testing.T) { - sc, sbc0, sbc1, rss0, _, rss01 := newLegacyTestTxConnEnv(t, "TxConnReservedRollback") + sc, sbc0, sbc1, rss0, _, rss01 := newTestTxConnEnv(t, "TxConnReservedRollback") session := NewSafeSession(&vtgatepb.Session{InTransaction: true, InReservedConn: true}) sc.ExecuteMultiShard(ctx, rss0, queries, session, false, false) @@ -892,19 +892,19 @@ func TestTxConnReservedRollback(t *testing.T) { } func TestTxConnReservedRollbackFailure(t *testing.T) { - sc, sbc0, sbc1, rss0, _, rss01 := newLegacyTestTxConnEnv(t, "TxConnReservedRollback") + sc, sbc0, sbc1, rss0, _, rss01 := newTestTxConnEnv(t, "TxConnReservedRollback") session := NewSafeSession(&vtgatepb.Session{InTransaction: true, InReservedConn: true}) sc.ExecuteMultiShard(ctx, rss0, queries, session, false, false) sc.ExecuteMultiShard(ctx, rss01, twoQueries, session, false, false) sbc1.MustFailCodes[vtrpcpb.Code_INVALID_ARGUMENT] = 1 - require.Error(t, + assert.Error(t, sc.txConn.Rollback(ctx, session)) wantSession := vtgatepb.Session{ InReservedConn: true, Warnings: []*querypb.QueryWarning{{ - Message: "rollback encountered an error and connection to all shard for this session is released: Code: INVALID_ARGUMENT\nINVALID_ARGUMENT error\n\ntarget: TxConnReservedRollback.1.master, used tablet: aa-0 (1)", + Message: "rollback encountered an error and connection to all shard for this session is released: Code: INVALID_ARGUMENT\nINVALID_ARGUMENT error\n", }}, } utils.MustMatch(t, &wantSession, session.Session, "Session") From 11da205cf5588e7bbbaeffa6ccf1c5c8d6eb56d2 Mon Sep 17 
00:00:00 2001 From: Harshit Gangal Date: Sat, 1 Aug 2020 03:26:23 +0530 Subject: [PATCH 5/7] added additional e2e test for reserved conn with partial shard with tx Signed-off-by: Harshit Gangal --- .../vtgate/setstatement/sysvar_test.go | 40 ++++++++++++++++++- 1 file changed, 38 insertions(+), 2 deletions(-) diff --git a/go/test/endtoend/vtgate/setstatement/sysvar_test.go b/go/test/endtoend/vtgate/setstatement/sysvar_test.go index 92596ba2728..0eccc545ec6 100644 --- a/go/test/endtoend/vtgate/setstatement/sysvar_test.go +++ b/go/test/endtoend/vtgate/setstatement/sysvar_test.go @@ -211,7 +211,7 @@ func TestStartTxAndSetSystemVariableAndThenSuccessfulCommit(t *testing.T) { assertMatches(t, conn, "select @@sql_safe_updates", "[[INT64(1)]]") } -func TestSetSystemVarWithConnError(t *testing.T) { +func TestSetSystemVarAutocommitWithConnError(t *testing.T) { vtParams := mysql.ConnParams{ Host: "localhost", Port: clusterInstance.VtgateMySQLPort, @@ -243,7 +243,43 @@ func TestSetSystemVarWithConnError(t *testing.T) { // subsequent queries on -80 will pass assertMatches(t, conn, "select id from test where id = 2", "[]") assertMatches(t, conn, "insert into test (id, val1) values (2, null)", "[]") - assertMatches(t, conn, "select id from test where id = 2", "[[INT64(2)]]") + assertMatches(t, conn, "select id, @@sql_safe_updates from test where id = 2", "[[INT64(2) INT64(1)]]") +} + +func TestSetSystemVarInTxWithConnError(t *testing.T) { + vtParams := mysql.ConnParams{ + Host: "localhost", + Port: clusterInstance.VtgateMySQLPort, + } + + conn, err := mysql.Connect(context.Background(), &vtParams) + require.NoError(t, err) + defer conn.Close() + + checkedExec(t, conn, "delete from test") + checkedExec(t, conn, "insert into test (id, val1) values (1, null), (4, null)") + + checkedExec(t, conn, "set sql_safe_updates = 1") // this should force us into a reserved connection + qr := checkedExec(t, conn, "select connection_id() from test where id = 4") + checkedExec(t, conn, "begin") 
+ checkedExec(t, conn, "insert into test (id, val1) values (2, null)") + + // kill the mysql connection shard which has transaction open. + vttablet1 := clusterInstance.Keyspaces[0].Shards[1].MasterTablet() // 80- + _, err = vttablet1.VttabletProcess.QueryTablet(fmt.Sprintf("kill %s", qr.Rows[0][0].ToString()), keyspaceName, false) + require.NoError(t, err) + + // query to -80 shard should pass and remain in transaction. + assertMatches(t, conn, "select id, val1 from test where id = 2", "[[INT64(2) NULL]]") + checkedExec(t, conn, "rollback") + assertMatches(t, conn, "select id, val1 from test where id = 2", "[]") + + // first query to 80- shard will fail + _, err = exec(t, conn, "select @@sql_safe_updates from test where id = 4") + require.Error(t, err) + + // subsequent queries on 80- will pass + assertMatches(t, conn, "select id, @@sql_safe_updates from test where id = 4", "[[INT64(4) INT64(1)]]") } func assertMatches(t *testing.T, conn *mysql.Conn, query, expected string) { From f8bf23f0713b4f5895e19a1fe3b881cf56bed07b Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Sat, 1 Aug 2020 03:52:29 +0530 Subject: [PATCH 6/7] added unit test on reserved conn failure Signed-off-by: Harshit Gangal --- go/vt/vtgate/legacy_scatter_conn_test.go | 7 ++++++- go/vt/vtgate/scatter_conn_test.go | 19 +++++++++++++++++++ go/vt/vttablet/sandboxconn/sandboxconn.go | 5 +++++ 3 files changed, 30 insertions(+), 1 deletion(-) diff --git a/go/vt/vtgate/legacy_scatter_conn_test.go b/go/vt/vtgate/legacy_scatter_conn_test.go index 09bcfbfb963..f5bc4a4f2af 100644 --- a/go/vt/vtgate/legacy_scatter_conn_test.go +++ b/go/vt/vtgate/legacy_scatter_conn_test.go @@ -358,6 +358,11 @@ func TestLegaceHealthCheckFailsOnReservedConnections(t *testing.T) { } func executeOnShards(t *testing.T, res *srvtopo.Resolver, keyspace string, sc *ScatterConn, session *SafeSession, destinations []key.Destination) { + t.Helper() + require.Empty(t, executeOnShardsReturnsErr(t, res, keyspace, sc, session, 
destinations)) +} + +func executeOnShardsReturnsErr(t *testing.T, res *srvtopo.Resolver, keyspace string, sc *ScatterConn, session *SafeSession, destinations []key.Destination) error { t.Helper() rss, _, err := res.ResolveDestinations(ctx, keyspace, topodatapb.TabletType_REPLICA, nil, destinations) require.NoError(t, err) @@ -372,7 +377,7 @@ func executeOnShards(t *testing.T, res *srvtopo.Resolver, keyspace string, sc *S } _, errs := sc.ExecuteMultiShard(ctx, rss, queries, session, false, false) - require.Empty(t, errs) + return vterrors.Aggregate(errs) } func TestMultiExecs(t *testing.T) { diff --git a/go/vt/vtgate/scatter_conn_test.go b/go/vt/vtgate/scatter_conn_test.go index 0f0445d105a..6e88aa5db7d 100644 --- a/go/vt/vtgate/scatter_conn_test.go +++ b/go/vt/vtgate/scatter_conn_test.go @@ -20,6 +20,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/test/utils" @@ -280,3 +281,21 @@ func TestReservedBeginTableDriven(t *testing.T) { }) } } + +func TestReservedConnFail(t *testing.T) { + keyspace := "keyspace" + createSandbox(keyspace) + hc := discovery.NewFakeHealthCheck() + sc := newTestScatterConn(hc, new(sandboxTopo), "aa") + sbc0 := hc.AddTestTablet("aa", "0", 1, keyspace, "0", topodatapb.TabletType_REPLICA, true, 1, nil) + _ = hc.AddTestTablet("aa", "1", 1, keyspace, "1", topodatapb.TabletType_REPLICA, true, 1, nil) + res := srvtopo.NewResolver(&sandboxTopo{}, sc.gateway, "aa") + + session := NewSafeSession(&vtgatepb.Session{InTransaction: false, InReservedConn: true}) + destinations := []key.Destination{key.DestinationShard("0")} + executeOnShards(t, res, keyspace, sc, session, destinations) + assert.Equal(t, 1, len(session.ShardSessions)) + sbc0.ShardErr = mysql.NewSQLError(mysql.CRServerGone, mysql.SSUnknownSQLState, "lost connection") + _ = executeOnShardsReturnsErr(t, res, keyspace, sc, session, destinations) + assert.Zero(t, len(session.ShardSessions)) +} diff --git 
a/go/vt/vttablet/sandboxconn/sandboxconn.go b/go/vt/vttablet/sandboxconn/sandboxconn.go index 2437e782553..7e58e839228 100644 --- a/go/vt/vttablet/sandboxconn/sandboxconn.go +++ b/go/vt/vttablet/sandboxconn/sandboxconn.go @@ -104,6 +104,8 @@ type SandboxConn struct { sExecMu sync.Mutex execMu sync.Mutex + + ShardErr error } var _ queryservice.QueryService = (*SandboxConn)(nil) // compile-time interface check @@ -125,6 +127,9 @@ func (sbc *SandboxConn) getError() error { sbc.MustFailCodes[code] = count - 1 return vterrors.New(code, fmt.Sprintf("%v error", code)) } + if sbc.ShardErr != nil { + return sbc.ShardErr + } return nil } From b466a323850dcd898f46621403273f26a2ab5e8f Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Thu, 13 Aug 2020 22:07:03 +0530 Subject: [PATCH 7/7] addressed review comments Signed-off-by: Harshit Gangal --- go/vt/vtgate/scatter_conn.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/vt/vtgate/scatter_conn.go b/go/vt/vtgate/scatter_conn.go index 3435b27daa8..342f80778f5 100644 --- a/go/vt/vtgate/scatter_conn.go +++ b/go/vt/vtgate/scatter_conn.go @@ -200,7 +200,7 @@ func (stc *ScatterConn) ExecuteMultiShard( case nothing: innerqr, err = qs.Execute(ctx, rs.Target, queries[i].Sql, queries[i].BindVariables, info.transactionID, info.reservedID, opts) if err != nil { - resetShardSession(info, err, session) + checkAndResetShardSession(info, err, session) return nil, err } case begin: @@ -239,7 +239,7 @@ func (stc *ScatterConn) ExecuteMultiShard( return qr, allErrors.GetErrors() } -func resetShardSession(info *shardActionInfo, err error, session *SafeSession) { +func checkAndResetShardSession(info *shardActionInfo, err error, session *SafeSession) { if info.reservedID != 0 && info.transactionID == 0 { sqlErr := mysql.NewSQLErrorFromError(err).(*mysql.SQLError) if sqlErr.Number() == mysql.CRServerGone || sqlErr.Number() == mysql.CRServerLost {