diff --git a/data/test/vtexplain/multi-output/deletesharded-output.txt b/data/test/vtexplain/multi-output/deletesharded-output.txt index 52b6d862319..65df48ce6fa 100644 --- a/data/test/vtexplain/multi-output/deletesharded-output.txt +++ b/data/test/vtexplain/multi-output/deletesharded-output.txt @@ -3,7 +3,7 @@ delete from music_extra where id=1 1 ks_sharded/-40: begin 1 ks_sharded/-40: delete from music_extra where id in (1) /* vtgate:: keyspace_id:166b40b44aba4bd6 */ -2 ks_sharded/-40: commit +1 ks_sharded/-40: commit ---------------------------------------------------------------------- delete from music_extra where id=1 and extra='abc' @@ -11,7 +11,7 @@ delete from music_extra where id=1 and extra='abc' 1 ks_sharded/-40: begin 1 ks_sharded/-40: select id from music_extra where id = 1 and extra = 'abc' limit 10001 for update /* vtgate:: keyspace_id:166b40b44aba4bd6 */ 1 ks_sharded/-40: delete from music_extra where id in (1) /* vtgate:: keyspace_id:166b40b44aba4bd6 */ -2 ks_sharded/-40: commit +1 ks_sharded/-40: commit ---------------------------------------------------------------------- delete from user where id=1 diff --git a/data/test/vtexplain/multi-output/unsharded-output.txt b/data/test/vtexplain/multi-output/unsharded-output.txt index 4b6af782c14..e5ece7403a1 100644 --- a/data/test/vtexplain/multi-output/unsharded-output.txt +++ b/data/test/vtexplain/multi-output/unsharded-output.txt @@ -8,7 +8,7 @@ insert into t1 (id,intval,floatval) values (1,2,3.14) 1 ks_unsharded/-: begin 1 ks_unsharded/-: insert into t1(id, intval, floatval) values (1, 2, 3.14) -2 ks_unsharded/-: commit +1 ks_unsharded/-: commit ---------------------------------------------------------------------- update t1 set intval = 10 @@ -16,7 +16,7 @@ update t1 set intval = 10 1 ks_unsharded/-: begin 1 ks_unsharded/-: select id from t1 limit 10001 for update 1 ks_unsharded/-: update t1 set intval = 10 where id in (1) -2 ks_unsharded/-: commit +1 ks_unsharded/-: commit ---------------------------------------------------------------------- update t1 set floatval = 9.99 @@ -24,20 +24,20 @@ update t1 set floatval = 9.99 1 ks_unsharded/-: begin 1 ks_unsharded/-: select id from t1 limit 10001 for update 1 ks_unsharded/-: update t1 set floatval = 9.99 where id in (1) -2 ks_unsharded/-: commit +1 ks_unsharded/-: commit ---------------------------------------------------------------------- delete from t1 where id = 100 1 ks_unsharded/-: begin 1 ks_unsharded/-: delete from t1 where id in (100) -2 ks_unsharded/-: commit +1 ks_unsharded/-: commit ---------------------------------------------------------------------- insert into t1 (id,intval,floatval) values (1,2,3.14) on duplicate key update intval=3, floatval=3.14 1 ks_unsharded/-: begin 1 ks_unsharded/-: insert into t1(id, intval, floatval) values (1, 2, 3.14) on duplicate key update intval = 3, floatval = 3.14 -2 ks_unsharded/-: commit +1 ks_unsharded/-: commit ---------------------------------------------------------------------- diff --git a/data/test/vtexplain/multi-output/updatesharded-output.txt b/data/test/vtexplain/multi-output/updatesharded-output.txt index 62b02b7bea9..6e5a18f3fec 100644 --- a/data/test/vtexplain/multi-output/updatesharded-output.txt +++ b/data/test/vtexplain/multi-output/updatesharded-output.txt @@ -3,7 +3,7 @@ update user set nickname='alice' where id=1 1 ks_sharded/-40: begin 1 ks_sharded/-40: update user set nickname = 'alice' where id in (1) /* vtgate:: keyspace_id:166b40b44aba4bd6 */ -2 ks_sharded/-40: commit +1 ks_sharded/-40: commit ---------------------------------------------------------------------- update user set nickname='alice' where name='alice' @@ -21,7 +21,7 @@ update user set pet='fido' where id=1 1 ks_sharded/-40: begin 1 ks_sharded/-40: update user set pet = 'fido' where id in (1) /* vtgate:: keyspace_id:166b40b44aba4bd6 */ -2 ks_sharded/-40: commit +1 ks_sharded/-40: commit ---------------------------------------------------------------------- update user set name='alicia' where id=1 diff --git a/go/vt/vtexplain/vtexplain_vttablet.go b/go/vt/vtexplain/vtexplain_vttablet.go index 52696d7db05..7a745386cff 100644 --- a/go/vt/vtexplain/vtexplain_vttablet.go +++ b/go/vt/vtexplain/vtexplain_vttablet.go @@ -216,6 +216,26 @@ func (t *explainTablet) ReadTransaction(ctx context.Context, target *querypb.Tar return t.tsv.ReadTransaction(ctx, target, dtid) } +// ExecuteBatch is part of the QueryService interface. +func (t *explainTablet) ExecuteBatch(ctx context.Context, target *querypb.Target, queries []*querypb.BoundQuery, asTransaction bool, transactionID int64, options *querypb.ExecuteOptions) ([]sqltypes.Result, error) { + t.mu.Lock() + t.currentTime = batchTime.Wait() + + // Since the query is simulated being "sent" over the wire we need to + // copy the bindVars into the executor to avoid a data race. + for _, query := range queries { + bindVariables := sqltypes.CopyBindVariables(query.BindVariables) + t.tabletQueries = append(t.tabletQueries, &TabletQuery{ + Time: t.currentTime, + SQL: query.Sql, + BindVars: bindVariables, + }) + } + t.mu.Unlock() + + return t.tsv.ExecuteBatch(ctx, target, queries, asTransaction, transactionID, options) +} + // BeginExecute is part of the QueryService interface. func (t *explainTablet) BeginExecute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (*sqltypes.Result, int64, error) { t.mu.Lock() diff --git a/go/vt/vtgate/autocommit_test.go b/go/vt/vtgate/autocommit_test.go new file mode 100644 index 00000000000..658c4ad313d --- /dev/null +++ b/go/vt/vtgate/autocommit_test.go @@ -0,0 +1,325 @@ +/* +Copyright 2017 Google Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vtgate + +import ( + "testing" + + "golang.org/x/net/context" + + "github.com/youtube/vitess/go/sqltypes" + + querypb "github.com/youtube/vitess/go/vt/proto/query" + vtgatepb "github.com/youtube/vitess/go/vt/proto/vtgate" +) + +// This file contains tests for all the autocommit code paths +// to make sure that single round-trip commits are executed +// correctly whenever possible. + +// TestAutocommitUpdateSharded: instant-commit. +func TestAutocommitUpdateSharded(t *testing.T) { + executor, sbc1, sbc2, _ := createExecutorEnv() + + if _, err := autocommitExec(executor, "update user set a=2 where id = 1"); err != nil { + t.Fatal(err) + } + testBatchQuery(t, "sbc1", sbc1, &querypb.BoundQuery{ + Sql: "update user set a = 2 where id = 1 /* vtgate:: keyspace_id:166b40b44aba4bd6 */", + BindVariables: map[string]*querypb.BindVariable{}, + }) + testAsTransactionCount(t, "sbc1", sbc1, 1) + testCommitCount(t, "sbc1", sbc1, 0) + + testBatchQuery(t, "sbc2", sbc2, nil) + testAsTransactionCount(t, "sbc2", sbc2, 0) + testCommitCount(t, "sbc1", sbc1, 0) +} + +// TestAutocommitUpdateLookup: transaction: select before update. +func TestAutocommitUpdateLookup(t *testing.T) { + executor, sbc1, _, sbclookup := createExecutorEnv() + + if _, err := autocommitExec(executor, "update music set a=2 where id = 2"); err != nil { + t.Fatal(err) + } + + testQueries(t, "sbclookup", sbclookup, []*querypb.BoundQuery{{ + Sql: "select user_id from music_user_map where music_id = :music_id", + BindVariables: map[string]*querypb.BindVariable{ + "music_id": sqltypes.Int64BindVariable(2), + }, + }}) + testAsTransactionCount(t, "sbclookup", sbclookup, 0) + testCommitCount(t, "sbclookup", sbclookup, 1) + + testQueries(t, "sbc1", sbc1, []*querypb.BoundQuery{{ + Sql: "update music set a = 2 where id = 2 /* vtgate:: keyspace_id:166b40b44aba4bd6 */", + BindVariables: map[string]*querypb.BindVariable{}, + }}) + testAsTransactionCount(t, "sbc1", sbc1, 0) + testCommitCount(t, "sbc1", sbc1, 1) +} + +// TestAutocommitUpdateVindexChange: transaction: select & update before final update. +func TestAutocommitUpdateVindexChange(t *testing.T) { + executor, sbc1, _, sbclookup := createExecutorEnv() + + if _, err := autocommitExec(executor, "update user2 set name='myname', lastname='mylastname' where id = 1"); err != nil { + t.Fatal(err) + } + + testQueries(t, "sbclookup", sbclookup, []*querypb.BoundQuery{{ + Sql: "delete from name_lastname_keyspace_id_map where name = :name and lastname = :lastname and keyspace_id = :keyspace_id", + BindVariables: map[string]*querypb.BindVariable{ + "lastname": sqltypes.StringBindVariable("foo"), + "name": sqltypes.Int32BindVariable(1), + "keyspace_id": sqltypes.BytesBindVariable([]byte("\026k@\264J\272K\326")), + }, + }, { + Sql: "insert into name_lastname_keyspace_id_map(name, lastname, keyspace_id) values (:name0, :lastname0, :keyspace_id0)", + BindVariables: map[string]*querypb.BindVariable{ + "name0": sqltypes.BytesBindVariable([]byte("myname")), + "lastname0": sqltypes.BytesBindVariable([]byte("mylastname")), + "keyspace_id0": sqltypes.BytesBindVariable([]byte("\026k@\264J\272K\326")), + }, + }}) + testAsTransactionCount(t, "sbclookup", sbclookup, 0) + testCommitCount(t, "sbclookup", sbclookup, 1) + + testQueries(t, "sbc1", sbc1, []*querypb.BoundQuery{{ + Sql: "select name, lastname from user2 where id = 1 for update", + BindVariables: map[string]*querypb.BindVariable{}, + }, { + Sql: "update user2 set name = 'myname', lastname = 'mylastname' where id = 1 /* vtgate:: keyspace_id:166b40b44aba4bd6 */", + BindVariables: map[string]*querypb.BindVariable{ + "_name0": sqltypes.BytesBindVariable([]byte("myname")), + "_lastname0": sqltypes.BytesBindVariable([]byte("mylastname")), + }, + }}) + testAsTransactionCount(t, "sbc1", sbc1, 0) + testCommitCount(t, "sbc1", sbc1, 1) +} + +// TestAutocommitDeleteSharded: instant-commit. +func TestAutocommitDeleteSharded(t *testing.T) { + executor, sbc1, sbc2, _ := createExecutorEnv() + + if _, err := autocommitExec(executor, "delete from user_extra where user_id = 1"); err != nil { + t.Fatal(err) + } + testBatchQuery(t, "sbc1", sbc1, &querypb.BoundQuery{ + Sql: "delete from user_extra where user_id = 1 /* vtgate:: keyspace_id:166b40b44aba4bd6 */", + BindVariables: map[string]*querypb.BindVariable{}, + }) + testAsTransactionCount(t, "sbc1", sbc1, 1) + testCommitCount(t, "sbc1", sbc1, 0) + + testBatchQuery(t, "sbc2", sbc2, nil) + testAsTransactionCount(t, "sbc2", sbc2, 0) + testCommitCount(t, "sbc1", sbc1, 0) +} + +// TestAutocommitDeleteLookup: transaction: select before update. +func TestAutocommitDeleteLookup(t *testing.T) { + executor, sbc1, _, sbclookup := createExecutorEnv() + + if _, err := autocommitExec(executor, "delete from music where id = 1"); err != nil { + t.Fatal(err) + } + + testQueries(t, "sbclookup", sbclookup, []*querypb.BoundQuery{{ + Sql: "select user_id from music_user_map where music_id = :music_id", + BindVariables: map[string]*querypb.BindVariable{ + "music_id": sqltypes.Int64BindVariable(1), + }, + }, { + Sql: "delete from music_user_map where music_id = :music_id and user_id = :user_id", + BindVariables: map[string]*querypb.BindVariable{ + "music_id": sqltypes.Int32BindVariable(1), + "user_id": sqltypes.Uint64BindVariable(1), + }, + }}) + testAsTransactionCount(t, "sbclookup", sbclookup, 0) + testCommitCount(t, "sbclookup", sbclookup, 1) + + testQueries(t, "sbc1", sbc1, []*querypb.BoundQuery{{ + Sql: "select id from music where id = 1 for update", + BindVariables: map[string]*querypb.BindVariable{}, + }, { + Sql: "delete from music where id = 1 /* vtgate:: keyspace_id:166b40b44aba4bd6 */", + BindVariables: map[string]*querypb.BindVariable{}, + }}) + testAsTransactionCount(t, "sbc1", sbc1, 0) + testCommitCount(t, "sbc1", sbc1, 1) +} + +// TestAutocommitInsertSharded: instant-commit. +func TestAutocommitInsertSharded(t *testing.T) { + executor, sbc1, sbc2, _ := createExecutorEnv() + + if _, err := autocommitExec(executor, "insert into user_extra(user_id, v) values (1, 2)"); err != nil { + t.Fatal(err) + } + testBatchQuery(t, "sbc1", sbc1, &querypb.BoundQuery{ + Sql: "insert into user_extra(user_id, v) values (:_user_id0, 2) /* vtgate:: keyspace_id:166b40b44aba4bd6 */", + BindVariables: map[string]*querypb.BindVariable{ + "_user_id0": sqltypes.Int64BindVariable(1), + }, + }) + testAsTransactionCount(t, "sbc1", sbc1, 1) + testCommitCount(t, "sbc1", sbc1, 0) + + testBatchQuery(t, "sbc2", sbc2, nil) + testAsTransactionCount(t, "sbc2", sbc2, 0) + testCommitCount(t, "sbc1", sbc1, 0) +} + +// TestAutocommitInsertLookup: transaction: select before update. +func TestAutocommitInsertLookup(t *testing.T) { + executor, sbc1, _, sbclookup := createExecutorEnv() + + if _, err := autocommitExec(executor, "insert into user(id, v, name) values (1, 2, 'myname')"); err != nil { + t.Fatal(err) + } + + testQueries(t, "sbclookup", sbclookup, []*querypb.BoundQuery{{ + Sql: "insert into name_user_map(name, user_id) values (:name0, :user_id0)", + BindVariables: map[string]*querypb.BindVariable{ + "name0": sqltypes.BytesBindVariable([]byte("myname")), + "user_id0": sqltypes.Uint64BindVariable(1), + }, + }}) + testAsTransactionCount(t, "sbclookup", sbclookup, 0) + testCommitCount(t, "sbclookup", sbclookup, 1) + + testQueries(t, "sbc1", sbc1, []*querypb.BoundQuery{{ + Sql: "insert into user(id, v, name) values (:_Id0, 2, :_name0) /* vtgate:: keyspace_id:166b40b44aba4bd6 */", + BindVariables: map[string]*querypb.BindVariable{ + "_Id0": sqltypes.Int64BindVariable(1), + "_name0": sqltypes.BytesBindVariable([]byte("myname")), + "__seq0": sqltypes.Int64BindVariable(1), + }, + }}) + testAsTransactionCount(t, "sbc1", sbc1, 0) + testCommitCount(t, "sbc1", sbc1, 1) +} + +func TestAutocommitInsertMultishard(t *testing.T) { + executor, sbc1, sbc2, _ := createExecutorEnv() + + if _, err := autocommitExec(executor, "insert into user_extra(user_id, v) values (1, 2), (3, 4)"); err != nil { + t.Fatal(err) + } + testQueries(t, "sbc1", sbc1, []*querypb.BoundQuery{{ + Sql: "insert into user_extra(user_id, v) values (:_user_id0, 2) /* vtgate:: keyspace_id:166b40b44aba4bd6 */", + BindVariables: map[string]*querypb.BindVariable{ + "_user_id0": sqltypes.Int64BindVariable(1), + "_user_id1": sqltypes.Int64BindVariable(3), + }, + }}) + testAsTransactionCount(t, "sbc1", sbc1, 0) + testCommitCount(t, "sbc1", sbc1, 1) + + testQueries(t, "sbc2", sbc2, []*querypb.BoundQuery{{ + Sql: "insert into user_extra(user_id, v) values (:_user_id1, 4) /* vtgate:: keyspace_id:4eb190c9a2fa169c */", + BindVariables: map[string]*querypb.BindVariable{ + "_user_id0": sqltypes.Int64BindVariable(1), + "_user_id1": sqltypes.Int64BindVariable(3), + }, + }}) + testAsTransactionCount(t, "sbc2", sbc2, 0) + testCommitCount(t, "sbc2", sbc2, 1) +} + +// TestAutocommitInsertAutoinc: instant-commit: sequence fetch is not transactional. +func TestAutocommitInsertAutoinc(t *testing.T) { + executor, _, _, sbclookup := createExecutorEnv() + + if _, err := autocommitExec(executor, "insert into main1(id, name) values (null, 'myname')"); err != nil { + t.Fatal(err) + } + + testQueries(t, "sbclookup", sbclookup, []*querypb.BoundQuery{{ + Sql: "select next :n values from user_seq", + BindVariables: map[string]*querypb.BindVariable{"n": sqltypes.Int64BindVariable(1)}, + }}) + testBatchQuery(t, "sbclookup", sbclookup, &querypb.BoundQuery{ + Sql: "insert into main1(id, name) values (:__seq0, 'myname')", + BindVariables: map[string]*querypb.BindVariable{ + "__seq0": sqltypes.Int64BindVariable(1), + }, + }) + testAsTransactionCount(t, "sbclookup", sbclookup, 1) + testCommitCount(t, "sbclookup", sbclookup, 0) +} + +// TestAutocommitTransactionStarted: no instant-commit. +func TestAutocommitTransactionStarted(t *testing.T) { + executor, sbc1, _, _ := createExecutorEnv() + + session := &vtgatepb.Session{ + TargetString: "@master", + Autocommit: true, + InTransaction: true, + TransactionMode: vtgatepb.TransactionMode_MULTI, + } + sql := "update user set a=2 where id = 1" + + if _, err := executor.Execute(context.Background(), "TestExecute", NewSafeSession(session), sql, map[string]*querypb.BindVariable{}); err != nil { + t.Fatal(err) + } + + testQueries(t, "sbc1", sbc1, []*querypb.BoundQuery{{ + Sql: "update user set a = 2 where id = 1 /* vtgate:: keyspace_id:166b40b44aba4bd6 */", + BindVariables: map[string]*querypb.BindVariable{}, + }}) + testAsTransactionCount(t, "sbc1", sbc1, 0) + testCommitCount(t, "sbc1", sbc1, 0) +} + +// TestAutocommitDirectTarget: no instant-commit. +func TestAutocommitDirectTarget(t *testing.T) { + executor, _, _, sbclookup := createExecutorEnv() + + session := &vtgatepb.Session{ + TargetString: "TestUnsharded/0@master", + Autocommit: true, + TransactionMode: vtgatepb.TransactionMode_MULTI, + } + sql := "insert into simple(val) values ('val')" + + if _, err := executor.Execute(context.Background(), "TestExecute", NewSafeSession(session), sql, map[string]*querypb.BindVariable{}); err != nil { + t.Error(err) + } + testQueries(t, "sbclookup", sbclookup, []*querypb.BoundQuery{{ + Sql: sql + "/* vtgate:: filtered_replication_unfriendly */", + BindVariables: map[string]*querypb.BindVariable{}, + }}) + testAsTransactionCount(t, "sbclookup", sbclookup, 0) + testCommitCount(t, "sbclookup", sbclookup, 1) +} + +func autocommitExec(executor *Executor, sql string) (*sqltypes.Result, error) { + session := &vtgatepb.Session{ + TargetString: "@master", + Autocommit: true, + TransactionMode: vtgatepb.TransactionMode_MULTI, + } + + return executor.Execute(context.Background(), "TestExecute", NewSafeSession(session), sql, map[string]*querypb.BindVariable{}) +} diff --git a/go/vt/vtgate/engine/merge_sort_test.go b/go/vt/vtgate/engine/merge_sort_test.go index 5bdd050899a..bd900acb14a 100644 --- a/go/vt/vtgate/engine/merge_sort_test.go +++ b/go/vt/vtgate/engine/merge_sort_test.go @@ -342,7 +342,7 @@ func (t *fakeVcursor) Execute(method string, query string, bindvars map[string]* panic("unimplemented") } -func (t *fakeVcursor) ExecuteMultiShard(keyspace string, shardQueries map[string]*querypb.BoundQuery, isDML bool) (*sqltypes.Result, error) { +func (t *fakeVcursor) ExecuteMultiShard(keyspace string, shardQueries map[string]*querypb.BoundQuery, isDML, canAutocommit bool) (*sqltypes.Result, error) { panic("unimplemented") } diff --git a/go/vt/vtgate/engine/primitive.go b/go/vt/vtgate/engine/primitive.go index 938a91ca553..07458c30ad7 100644 --- a/go/vt/vtgate/engine/primitive.go +++ b/go/vt/vtgate/engine/primitive.go @@ -43,7 +43,7 @@ type VCursor interface { // Context returns the context of the current request. Context() context.Context Execute(method string, query string, bindvars map[string]*querypb.BindVariable, isDML bool) (*sqltypes.Result, error) - ExecuteMultiShard(keyspace string, shardQueries map[string]*querypb.BoundQuery, isDML bool) (*sqltypes.Result, error) + ExecuteMultiShard(keyspace string, shardQueries map[string]*querypb.BoundQuery, isDML, canAutocommit bool) (*sqltypes.Result, error) ExecuteStandalone(query string, bindvars map[string]*querypb.BindVariable, keyspace, shard string) (*sqltypes.Result, error) StreamExecuteMulti(query string, keyspace string, shardVars map[string]map[string]*querypb.BindVariable, callback func(reply *sqltypes.Result) error) error GetKeyspaceShards(vkeyspace *vindexes.Keyspace) (string, []*topodatapb.ShardReference, error) diff --git a/go/vt/vtgate/engine/route.go b/go/vt/vtgate/engine/route.go index 67a0d3c4737..1e51e73ef1e 100644 --- a/go/vt/vtgate/engine/route.go +++ b/go/vt/vtgate/engine/route.go @@ -333,7 +333,8 @@ func (route *Route) execute(vcursor VCursor, bindVars, joinVars map[string]*quer } shardQueries := route.getShardQueries(route.Query, params) - result, err := vcursor.ExecuteMultiShard(params.ks, shardQueries, isDML) + // canAutocommit should be true only if it's a DML. + result, err := vcursor.ExecuteMultiShard(params.ks, shardQueries, isDML, isDML /* canAutocommit */) if err != nil { return nil, err } @@ -388,7 +389,7 @@ func (route *Route) GetFields(vcursor VCursor, bindVars, joinVars map[string]*qu return nil, err } - qr, err := route.execShard(vcursor, route.FieldQuery, bindVars, ks, shard, false /* isDML */) + qr, err := route.execShard(vcursor, route.FieldQuery, bindVars, ks, shard, false /* isDML */, false /* canAutocommit */) if err != nil { return nil, err } @@ -467,7 +468,7 @@ func (route *Route) execUpdateEqual(vcursor VCursor, bindVars map[string]*queryp return result, nil } rewritten := sqlannotation.AddKeyspaceIDs(route.Query, [][]byte{ksid}, "") - return route.execShard(vcursor, rewritten, bindVars, ks, shard, true /* isDML */) + return route.execShard(vcursor, rewritten, bindVars, ks, shard, true /* isDML */, true /* canAutocommit */) } // execUpdateEqualChangedVindex performs an update when a vindex is being modified @@ -494,7 +495,7 @@ func (route *Route) execUpdateEqualChangedVindex(vcursor VCursor, query string, var err error rewritten := sqlannotation.AddKeyspaceIDs(route.Query, [][]byte{keyspaceID}, "") if route.Subquery != "" { - subQueryResult, err = route.execShard(vcursor, route.Subquery, bindVars, keyspace, shard, false /* isDML */) + subQueryResult, err = route.execShard(vcursor, route.Subquery, bindVars, keyspace, shard, false /* isDML */, false /* canAutocommit */) if err != nil { return nil, vterrors.Wrap(err, "execUpdateEqual") } @@ -503,7 +504,7 @@ func (route *Route) execUpdateEqualChangedVindex(vcursor VCursor, query string, if err != nil { return nil, vterrors.Wrap(err, "execUpdateEqual") } - result, err := route.execShard(vcursor, rewritten, bindVars, keyspace, shard, true /* isDML */) + result, err := route.execShard(vcursor, rewritten, bindVars, keyspace, shard, true /* isDML */, true /* canAutocommit */) if err != nil { return nil, vterrors.Wrap(err, "execUpdateEqual") } @@ -529,7 +530,7 @@ func (route *Route) execDeleteEqual(vcursor VCursor, bindVars map[string]*queryp } } rewritten := sqlannotation.AddKeyspaceIDs(route.Query, [][]byte{ksid}, "") - return route.execShard(vcursor, rewritten, bindVars, ks, shard, true /* isDML */) + return route.execShard(vcursor, rewritten, bindVars, ks, shard, true /* isDML */, true /* canAutocommit */) } func (route *Route) execInsertUnsharded(vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) { @@ -543,7 +544,7 @@ func (route *Route) execInsertUnsharded(vcursor VCursor, bindVars map[string]*qu } shardQueries := route.getShardQueries(route.Query, params) - result, err := vcursor.ExecuteMultiShard(params.ks, shardQueries, true /* isDML */) + result, err := vcursor.ExecuteMultiShard(params.ks, shardQueries, true /* isDML */, true /* canAutocommit */) if err != nil { return nil, vterrors.Wrap(err, "execInsertUnsharded") } @@ -568,7 +569,7 @@ func (route *Route) execInsertSharded(vcursor VCursor, bindVars map[string]*quer return nil, vterrors.Wrap(err, "execInsertSharded") } - result, err := vcursor.ExecuteMultiShard(keyspace, shardQueries, true /* isDML */) + result, err := vcursor.ExecuteMultiShard(keyspace, shardQueries, true /* isDML */, true /* canAutocommit */) if err != nil { return nil, vterrors.Wrap(err, "execInsertSharded") } @@ -689,7 +690,7 @@ func (route *Route) updateChangedVindexes(subQueryResult *sqltypes.Result, vcurs } func (route *Route) deleteVindexEntries(vcursor VCursor, bindVars map[string]*querypb.BindVariable, ks, shard string, ksid []byte) error { - result, err := route.execShard(vcursor, route.Subquery, bindVars, ks, shard, false /* isDML */) + result, err := route.execShard(vcursor, route.Subquery, bindVars, ks, shard, false /* isDML */, false /* canAutocommit */) if err != nil { return err } @@ -1080,13 +1081,13 @@ func (route *Route) execAnyShard(vcursor VCursor, bindVars map[string]*querypb.B return vcursor.ExecuteStandalone(route.Query, bindVars, ks, shard) } -func (route *Route) execShard(vcursor VCursor, query string, bindVars map[string]*querypb.BindVariable, keyspace, shard string, isDML bool) (*sqltypes.Result, error) { +func (route *Route) execShard(vcursor VCursor, query string, bindVars map[string]*querypb.BindVariable, keyspace, shard string, isDML, canAutocommit bool) (*sqltypes.Result, error) { return vcursor.ExecuteMultiShard(keyspace, map[string]*querypb.BoundQuery{ shard: { Sql: query, BindVariables: bindVars, }, - }, isDML) + }, isDML, canAutocommit) } func (route *Route) anyShard(vcursor VCursor, keyspace *vindexes.Keyspace) (string, string, error) { diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index 89b990b0909..c12a96cb85e 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -166,6 +166,16 @@ func (e *Executor) execute(ctx context.Context, safeSession *SafeSession, sql st defer e.txConn.Rollback(ctx, safeSession) } + // The SetAutocommitable flag should be same as mustCommit. + // If we started a transaction because of autocommit, then mustCommit + // will be true, which means that we can autocommit. If we were already + // in a transaction, it means that the app started it, or we are being + // called recursively. If so, we cannot autocommit because whatever we + // do is likely not final. + // The control flow is such that autocommitable can only be turned on + // at the beginning, but never after. + safeSession.SetAutocommitable(mustCommit) + qr, err := e.handleExec(ctx, safeSession, sql, bindVars, target, logStats) if err != nil { return nil, err @@ -202,6 +212,8 @@ func (e *Executor) execute(ctx context.Context, safeSession *SafeSession, sql st func (e *Executor) handleExec(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, target querypb.Target, logStats *LogStats) (*sqltypes.Result, error) { if target.Shard != "" { // V1 mode or V3 mode with a forced shard target + // TODO(sougou): change this flow to go through V3 functions + // which will allow us to benefit from the autocommitable flag. sql = sqlannotation.AnnotateIfDML(sql, nil) if e.normalize { diff --git a/go/vt/vtgate/executor_framework_test.go b/go/vt/vtgate/executor_framework_test.go index 4a70de283d3..2f6fb6af06d 100644 --- a/go/vt/vtgate/executor_framework_test.go +++ b/go/vt/vtgate/executor_framework_test.go @@ -18,6 +18,7 @@ package vtgate import ( "fmt" + "reflect" "strconv" "strings" "testing" @@ -309,6 +310,40 @@ func executorStream(executor *Executor, sql string) (qr *sqltypes.Result, err er return qr, nil } +// testBatchQuery verifies that a single (or no) query ExecuteBatch was performed on the SandboxConn. +func testBatchQuery(t *testing.T, sbcName string, sbc *sandboxconn.SandboxConn, boundQuery *querypb.BoundQuery) { + t.Helper() + + var wantQueries [][]*querypb.BoundQuery + if boundQuery != nil { + wantQueries = [][]*querypb.BoundQuery{{boundQuery}} + } + if !reflect.DeepEqual(sbc.BatchQueries, wantQueries) { + t.Errorf("%s.BatchQueries:\n%+v, want\n%+v\n", sbcName, sbc.BatchQueries, wantQueries) + } +} + +func testAsTransactionCount(t *testing.T, sbcName string, sbc *sandboxconn.SandboxConn, want int) { + t.Helper() + if got, want := sbc.AsTransactionCount.Get(), int64(want); got != want { + t.Errorf("%s.AsTransactionCount: %d, want %d\n", sbcName, got, want) + } +} + +func testQueries(t *testing.T, sbcName string, sbc *sandboxconn.SandboxConn, wantQueries []*querypb.BoundQuery) { + t.Helper() + if !reflect.DeepEqual(sbc.Queries, wantQueries) { + t.Errorf("%s.Queries:\n%+v, want\n%+v\n", sbcName, sbc.Queries, wantQueries) + } +} + +func testCommitCount(t *testing.T, sbcName string, sbc *sandboxconn.SandboxConn, want int) { + t.Helper() + if got, want := sbc.CommitCount.Get(), int64(want); got != want { + t.Errorf("%s.CommitCount: %d, want %d\n", sbcName, got, want) + } +} + func testNonZeroDuration(t *testing.T, what, d string) { t.Helper() time, _ := strconv.ParseFloat(d, 64) diff --git a/go/vt/vtgate/executor_test.go b/go/vt/vtgate/executor_test.go index 15a249c5a16..3f3d0b2f0bd 100644 --- a/go/vt/vtgate/executor_test.go +++ b/go/vt/vtgate/executor_test.go @@ -385,7 +385,9 @@ func TestExecutorAutocommit(t *testing.T) { t.Errorf("Commit count: %d, want %d", got, want) } - startCount = sbclookup.CommitCount.Get() + // In the following section, we look at AsTransaction count instead of CommitCount because + // the update results in a single round-trip ExecuteBatch call. + startCount = sbclookup.AsTransactionCount.Get() _, err = executor.Execute(context.Background(), "TestExecute", session, "update main1 set id=1", nil) if err != nil { t.Fatal(err) @@ -394,7 +396,7 @@ func TestExecutorAutocommit(t *testing.T) { if !proto.Equal(session.Session, wantSession) { t.Errorf("autocommit=1: %v, want %v", session.Session, wantSession) } - if got, want := sbclookup.CommitCount.Get(), startCount+1; got != want { + if got, want := sbclookup.AsTransactionCount.Get(), startCount+1; got != want { t.Errorf("Commit count: %d, want %d", got, want) } @@ -407,6 +409,7 @@ func TestExecutorAutocommit(t *testing.T) { } // autocommit = 1, "begin" + session.Reset() startCount = sbclookup.CommitCount.Get() _, err = executor.Execute(context.Background(), "TestExecute", session, "begin", nil) if err != nil { diff --git a/go/vt/vtgate/safe_session.go b/go/vt/vtgate/safe_session.go index e829296e869..629676cf24e 100644 --- a/go/vt/vtgate/safe_session.go +++ b/go/vt/vtgate/safe_session.go @@ -32,11 +32,36 @@ import ( // (the use pattern is 'Find', if not found, then 'Append', // for a single shard) type SafeSession struct { - mu sync.Mutex - mustRollback bool + mu sync.Mutex + mustRollback bool + autocommitState autocommitState *vtgatepb.Session } +// autocommitState keeps track of whether a single round-trip +// commit to vttablet is possible. It starts as autocommitable +// if we started a transaction because of the autocommit flag +// being set. Otherwise, it starts as notAutocommitable. +// If execute is recursively called using the same session, +// like from a vindex, we will already be in a transaction, +// and this should cause the state to become notAutocommitable. +// +// SafeSession lets you request a commit token, which will +// be issued if the state is autocommitable, and ShardSessions +// is empty, implying that no intermediate transactions were started. +// If so, the state transitions to autocommited, which is terminal. +// If the token is succesfully issued, the caller has to perform +// the commit. If a token cannot be issued, then a traditional +// commit has to be performed at the outermost level where +// the autocommitable transition happened. +type autocommitState int + +const ( + notAutocommittable = autocommitState(iota) + autocommittable + autocommitted +) + // NewSafeSession returns a new SafeSession based on the Session func NewSafeSession(sessn *vtgatepb.Session) *SafeSession { return &SafeSession{Session: sessn} @@ -52,6 +77,41 @@ func NewAutocommitSession(sessn *vtgatepb.Session) *SafeSession { return NewSafeSession(newSession) } +// SetAutocommitable sets the state to autocommitable if true. +// Otherwise, it's notAutocommitable. +func (session *SafeSession) SetAutocommitable(flag bool) { + session.mu.Lock() + defer session.mu.Unlock() + + if session.autocommitState == autocommitted { + panic("BUG: SetAutocommitable: unexpected autocommit state") + } + + if flag { + session.autocommitState = autocommittable + } else { + session.autocommitState = notAutocommittable + } +} + +// AutocommitApproval returns true if we can perform a single round-trip +// autocommit. If so, the caller is responsible for commiting their +// transaction. +func (session *SafeSession) AutocommitApproval() bool { + session.mu.Lock() + defer session.mu.Unlock() + + if session.autocommitState == autocommitted { + panic("BUG: AutocommitToken: unexpected autocommit state") + } + + if session.autocommitState == autocommittable && len(session.ShardSessions) == 0 { + session.autocommitState = autocommitted + return true + } + return false +} + // InTransaction returns true if we are in a transaction func (session *SafeSession) InTransaction() bool { if session == nil || session.Session == nil { @@ -81,6 +141,11 @@ func (session *SafeSession) Find(keyspace, shard string, tabletType topodatapb.T func (session *SafeSession) Append(shardSession *vtgatepb.Session_ShardSession, txMode vtgatepb.TransactionMode) error { session.mu.Lock() defer session.mu.Unlock() + + if session.autocommitState == autocommitted { + panic("BUG: SafeSession.Append: unexpected autocommit state") + } + // Always append, in order for rollback to succeed. session.ShardSessions = append(session.ShardSessions, shardSession) if session.isSingleDB(txMode) && len(session.ShardSessions) > 1 { @@ -124,6 +189,8 @@ func (session *SafeSession) Reset() { } session.mu.Lock() defer session.mu.Unlock() + session.mustRollback = false + session.autocommitState = notAutocommittable session.Session.InTransaction = false session.SingleDb = false session.ShardSessions = nil diff --git a/go/vt/vtgate/scatter_conn.go b/go/vt/vtgate/scatter_conn.go index ca17e1b26c1..167102652d8 100644 --- a/go/vt/vtgate/scatter_conn.go +++ b/go/vt/vtgate/scatter_conn.go @@ -166,6 +166,7 @@ func (stc *ScatterConn) ExecuteMultiShard( tabletType topodatapb.TabletType, session *SafeSession, notInTransaction bool, + canAutocommit bool, ) (*sqltypes.Result, error) { // mu protects qr @@ -176,6 +177,8 @@ func (stc *ScatterConn) ExecuteMultiShard( shards = append(shards, shard) } + canCommit := len(shards) == 1 && canAutocommit && session.AutocommitApproval() + err := stc.multiGoTransaction( ctx, "Execute", @@ -185,23 +188,25 @@ func (stc *ScatterConn) ExecuteMultiShard( session, notInTransaction, func(target *querypb.Target, shouldBegin bool, transactionID int64) (int64, error) { - var innerqr *sqltypes.Result - var opts *querypb.ExecuteOptions + var ( + innerqr *sqltypes.Result + err error + opts *querypb.ExecuteOptions + ) if session != nil && session.Session != nil { opts = session.Session.Options } - if shouldBegin { - var err error + + switch { + case canCommit: + innerqr, err = stc.executeAutocommit(ctx, target, shardQueries[target.Shard].Sql, shardQueries[target.Shard].BindVariables, opts) + case shouldBegin: innerqr, transactionID, err = stc.gateway.BeginExecute(ctx, target, shardQueries[target.Shard].Sql, shardQueries[target.Shard].BindVariables, opts) - if err != nil { - return transactionID, err - } - } else { - var err error + default: innerqr, err = stc.gateway.Execute(ctx, target, shardQueries[target.Shard].Sql, shardQueries[target.Shard].BindVariables, transactionID, opts) - if err != nil { - return transactionID, err - } + } + if err != nil { + return transactionID, err } mu.Lock() @@ -212,6 +217,20 @@ func (stc *ScatterConn) ExecuteMultiShard( return qr, err } +func (stc *ScatterConn) executeAutocommit(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (*sqltypes.Result, error) { + queries := []*querypb.BoundQuery{{ + Sql: sql, + BindVariables: bindVariables, + }} + // ExecuteBatch is a stop-gap because it's the only function that can currently do + // single round-trip commit. + qrs, err := stc.gateway.ExecuteBatch(ctx, target, queries, true /* asTransaction */, 0, options) + if err != nil { + return nil, err + } + return &qrs[0], nil +} + // ExecuteEntityIds executes queries that are shard specific. func (stc *ScatterConn) ExecuteEntityIds( ctx context.Context, diff --git a/go/vt/vtgate/scatter_conn_test.go b/go/vt/vtgate/scatter_conn_test.go index b054162372c..a7cd74e44a2 100644 --- a/go/vt/vtgate/scatter_conn_test.go +++ b/go/vt/vtgate/scatter_conn_test.go @@ -57,7 +57,7 @@ func TestScatterConnExecuteMulti(t *testing.T) { shardQueries[shard] = query } - return sc.ExecuteMultiShard(context.Background(), "TestScatterConnExecuteMultiShard", shardQueries, topodatapb.TabletType_REPLICA, nil, false) + return sc.ExecuteMultiShard(context.Background(), "TestScatterConnExecuteMultiShard", shardQueries, topodatapb.TabletType_REPLICA, nil, false, false) }) } @@ -249,7 +249,7 @@ func TestMultiExecs(t *testing.T) { shardQueries[shard] = query } - _, _ = sc.ExecuteMultiShard(context.Background(), "TestMultiExecs", shardQueries, topodatapb.TabletType_REPLICA, nil, false) + _, _ = sc.ExecuteMultiShard(context.Background(), "TestMultiExecs", shardQueries, topodatapb.TabletType_REPLICA, nil, false, false) if len(sbc0.Queries) == 0 || len(sbc1.Queries) == 0 { t.Fatalf("didn't get expected query") } diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go index e3d92f74c77..a2e642e22f3 100644 --- a/go/vt/vtgate/vcursor_impl.go +++ b/go/vt/vtgate/vcursor_impl.go @@ -110,9 +110,9 @@ func (vc *vcursorImpl) Execute(method string, query string, BindVars map[string] } // ExecuteMultiShard executes different queries on different shards and returns the combined result. -func (vc *vcursorImpl) ExecuteMultiShard(keyspace string, shardQueries map[string]*querypb.BoundQuery, isDML bool) (*sqltypes.Result, error) { +func (vc *vcursorImpl) ExecuteMultiShard(keyspace string, shardQueries map[string]*querypb.BoundQuery, isDML, canAutocommit bool) (*sqltypes.Result, error) { atomic.AddUint32(&vc.logStats.ShardQueries, uint32(len(shardQueries))) - qr, err := vc.executor.scatterConn.ExecuteMultiShard(vc.ctx, keyspace, commentedShardQueries(shardQueries, vc.trailingComments), vc.target.TabletType, vc.safeSession, false) + qr, err := vc.executor.scatterConn.ExecuteMultiShard(vc.ctx, keyspace, commentedShardQueries(shardQueries, vc.trailingComments), vc.target.TabletType, vc.safeSession, false, canAutocommit) if err == nil { vc.hasPartialDML = true } @@ -127,7 +127,9 @@ func (vc *vcursorImpl) ExecuteStandalone(query string, BindVars map[string]*quer BindVariables: BindVars, }, } - return vc.executor.scatterConn.ExecuteMultiShard(vc.ctx, keyspace, bq, vc.target.TabletType, NewAutocommitSession(vc.safeSession.Session), false) + // The canAutocommit flag is not significant because we currently don't execute DMLs through ExecuteStandalone. + // But we set it to true for future-proofing this function. + return vc.executor.scatterConn.ExecuteMultiShard(vc.ctx, keyspace, bq, vc.target.TabletType, NewAutocommitSession(vc.safeSession.Session), false, true /* canAutocommit */) } // StreamExeculteMulti is the streaming version of ExecuteMultiShard.