diff --git a/go/test/endtoend/vtgate/unsharded/main_test.go b/go/test/endtoend/vtgate/unsharded/main_test.go index f5c3ebce188..4dc3fd70287 100644 --- a/go/test/endtoend/vtgate/unsharded/main_test.go +++ b/go/test/endtoend/vtgate/unsharded/main_test.go @@ -22,6 +22,7 @@ import ( "fmt" "os" "testing" + "time" "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" @@ -113,6 +114,7 @@ func TestMain(m *testing.M) { SchemaSQL: SchemaSQL, VSchema: VSchema, } + clusterInstance.VtTabletExtraArgs = []string{"-queryserver-config-transaction-timeout", "3"} if err := clusterInstance.StartUnshardedKeyspace(*Keyspace, 0, false); err != nil { return 1 } @@ -215,6 +217,29 @@ func TestDDLUnsharded(t *testing.T) { assertMatches(t, conn, "show tables", `[[VARCHAR("allDefaults")] [VARCHAR("t1")]]`) } +func TestReservedConnDML(t *testing.T) { + defer cluster.PanicHandler(t) + ctx := context.Background() + vtParams := mysql.ConnParams{ + Host: "localhost", + Port: clusterInstance.VtgateMySQLPort, + } + conn, err := mysql.Connect(ctx, &vtParams) + require.NoError(t, err) + defer conn.Close() + + exec(t, conn, `set default_week_format = 1`) + exec(t, conn, `begin`) + exec(t, conn, `insert into allDefaults () values ()`) + exec(t, conn, `commit`) + + time.Sleep(6 * time.Second) + + exec(t, conn, `begin`) + exec(t, conn, `insert into allDefaults () values ()`) + exec(t, conn, `commit`) +} + func exec(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result { t.Helper() qr, err := conn.ExecuteFetch(query, 1000, true) diff --git a/go/vt/vtgate/executor_dml_test.go b/go/vt/vtgate/executor_dml_test.go index 0902738fefc..1219fd4c96f 100644 --- a/go/vt/vtgate/executor_dml_test.go +++ b/go/vt/vtgate/executor_dml_test.go @@ -23,6 +23,8 @@ import ( "github.com/stretchr/testify/assert" + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/test/utils" "github.com/stretchr/testify/require" @@ -1737,3 +1739,55 @@ func TestDeleteLookupOwnedEqual(t *testing.T) { utils.MustMatch(t, sbc1.Queries, sbc1wantQueries, "") utils.MustMatch(t, sbc2.Queries, sbc2wantQueries, "") } + +func TestReservedConnDML(t *testing.T) { + executor, _, _, sbc := createExecutorEnv() + + logChan := QueryLogger.Subscribe("TestReservedConnDML") + defer QueryLogger.Unsubscribe(logChan) + + ctx := context.Background() + *sysVarSetEnabled = true + session := NewAutocommitSession(&vtgatepb.Session{}) + + _, err := executor.Execute(ctx, "TestReservedConnDML", session, "use "+KsTestUnsharded, nil) + require.NoError(t, err) + + wantQueries := []*querypb.BoundQuery{ + {Sql: "select 1 from dual where @@default_week_format != 1", BindVariables: map[string]*querypb.BindVariable{}}, + } + sbc.SetResults([]*sqltypes.Result{ + sqltypes.MakeTestResult(sqltypes.MakeTestFields("id", "int64"), "1"), + }) + _, err = executor.Execute(ctx, "TestReservedConnDML", session, "set default_week_format = 1", nil) + require.NoError(t, err) + utils.MustMatch(t, wantQueries, sbc.Queries) + + _, err = executor.Execute(ctx, "TestReservedConnDML", session, "begin", nil) + require.NoError(t, err) + + wantQueries = append(wantQueries, + &querypb.BoundQuery{Sql: "set @@default_week_format = 1", BindVariables: map[string]*querypb.BindVariable{}}, + &querypb.BoundQuery{Sql: "insert into simple values ()", BindVariables: map[string]*querypb.BindVariable{}}) + _, err = executor.Execute(ctx, "TestReservedConnDML", session, "insert into simple() values ()", nil) + require.NoError(t, err) + utils.MustMatch(t, wantQueries, sbc.Queries) + + _, err = executor.Execute(ctx, "TestReservedConnDML", session, "commit", nil) + require.NoError(t, err) + + _, err = executor.Execute(ctx, "TestReservedConnDML", session, "begin", nil) + require.NoError(t, err) + + sbc.EphemeralShardErr = mysql.NewSQLError(mysql.CRServerGone, mysql.SSUnknownSQLState, "connection gone") + // as the first time the query fails due to connection loss i.e. reserved conn lost. It will be recreated to set statement will be executed again. + wantQueries = append(wantQueries, + &querypb.BoundQuery{Sql: "set @@default_week_format = 1", BindVariables: map[string]*querypb.BindVariable{}}, + &querypb.BoundQuery{Sql: "insert into simple values ()", BindVariables: map[string]*querypb.BindVariable{}}) + _, err = executor.Execute(ctx, "TestReservedConnDML", session, "insert into simple() values ()", nil) + require.NoError(t, err) + utils.MustMatch(t, wantQueries, sbc.Queries) + + _, err = executor.Execute(ctx, "TestReservedConnDML", session, "commit", nil) + require.NoError(t, err) +} diff --git a/go/vt/vtgate/scatter_conn.go b/go/vt/vtgate/scatter_conn.go index 12565c6af09..e63f5499f80 100644 --- a/go/vt/vtgate/scatter_conn.go +++ b/go/vt/vtgate/scatter_conn.go @@ -227,13 +227,25 @@ func (stc *ScatterConn) ExecuteMultiShard( innerqr, reservedID, alias, err = qs.ReserveExecute(ctx, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, 0 /*transactionId*/, opts) } if err != nil { - return nil, err + return info.updateReservedID(reservedID, alias), err } } case begin: innerqr, transactionID, alias, err = qs.BeginExecute(ctx, rs.Target, session.Savepoints, queries[i].Sql, queries[i].BindVariables, info.reservedID, opts) if err != nil { - return info.updateTransactionID(transactionID, alias), err + if transactionID != 0 { + return info.updateTransactionID(transactionID, alias), err + } + shouldRetry := checkAndResetShardSession(info, err, session) + if shouldRetry { + // we seem to have lost our connection. if it was a reserved connection, let's try to recreate it + info.actionNeeded = reserveBegin + innerqr, transactionID, reservedID, alias, err = qs.ReserveBeginExecute(ctx, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, opts) + } + if err != nil { + return info.updateTransactionAndReservedID(transactionID, reservedID, alias), err + } + } case reserve: innerqr, reservedID, alias, err = qs.ReserveExecute(ctx, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, info.transactionID, opts) @@ -727,14 +739,26 @@ type shardActionInfo struct { } func (sai *shardActionInfo) updateTransactionID(txID int64, alias *topodatapb.TabletAlias) *shardActionInfo { + if txID == 0 { + // As transaction id is ZERO, there is nothing to update in session shard sessions. + return nil + } return sai.updateTransactionAndReservedID(txID, sai.reservedID, alias) } func (sai *shardActionInfo) updateReservedID(rID int64, alias *topodatapb.TabletAlias) *shardActionInfo { + if rID == 0 { + // As reserved id is ZERO, there is nothing to update in session shard sessions. + return nil + } return sai.updateTransactionAndReservedID(sai.transactionID, rID, alias) } func (sai *shardActionInfo) updateTransactionAndReservedID(txID int64, rID int64, alias *topodatapb.TabletAlias) *shardActionInfo { + if txID == 0 && rID == 0 { + // As transaction id and reserved id is ZERO, there is nothing to update in session shard sessions. + return nil + } newInfo := *sai newInfo.reservedID = rID newInfo.transactionID = txID diff --git a/go/vt/vttablet/sandboxconn/sandboxconn.go b/go/vt/vttablet/sandboxconn/sandboxconn.go index 3a9816118b1..823cb312de1 100644 --- a/go/vt/vttablet/sandboxconn/sandboxconn.go +++ b/go/vt/vttablet/sandboxconn/sandboxconn.go @@ -441,7 +441,7 @@ func (sbc *SandboxConn) HandlePanic(err *error) { //ReserveBeginExecute implements the QueryService interface func (sbc *SandboxConn) ReserveBeginExecute(ctx context.Context, target *querypb.Target, preQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (*sqltypes.Result, int64, int64, *topodatapb.TabletAlias, error) { reservedID := sbc.reserve(ctx, target, preQueries, bindVariables, 0, options) - result, transactionID, alias, err := sbc.BeginExecute(ctx, target, preQueries, sql, bindVariables, reservedID, options) + result, transactionID, alias, err := sbc.BeginExecute(ctx, target, nil, sql, bindVariables, reservedID, options) if transactionID != 0 { sbc.txIDToRID[transactionID] = reservedID } diff --git a/go/vt/vttablet/tabletserver/tx_pool.go b/go/vt/vttablet/tabletserver/tx_pool.go index 1a884038939..7962207137b 100644 --- a/go/vt/vttablet/tabletserver/tx_pool.go +++ b/go/vt/vttablet/tabletserver/tx_pool.go @@ -231,6 +231,9 @@ func (tp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions, re var err error if reservedID != 0 { conn, err = tp.scp.GetAndLock(reservedID, "start transaction on reserve conn") + if err != nil { + return nil, "", vterrors.Errorf(vtrpcpb.Code_ABORTED, "transaction %d: %v", reservedID, err) + } } else { immediateCaller := callerid.ImmediateCallerIDFromContext(ctx) effectiveCaller := callerid.EffectiveCallerIDFromContext(ctx)