From 3f869229b5e8124e62e0373aed46a1d507ee52ad Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Tue, 16 Jan 2018 21:19:21 -0800 Subject: [PATCH] v3: instant-commit for autocommit If we are in autocommit mode and vtgate does not break a DML into smaller parts, then it has the opportunity to send that statement through to a vttablet as autocommit in a single round-trip. Reviewer instructions: @demmer: I'm not too sure about the vtexplain fixes, or if additional tests are required there. Extra scrutiny may be required there. Implementation notes: * SafeSession has a state machine for tracking autocommit state. * The autocommit state is initialized by executor as needed. * VCursor API has been changed for ExecMultiShard. It now accepts an extra canCommit flag that should be set to true if the engine is executing its final DML. This, combined with the autocommit state will decide if an instant autocommit is possible. --- .../multi-output/deletesharded-output.txt | 4 +- .../multi-output/unsharded-output.txt | 10 +- .../multi-output/updatesharded-output.txt | 4 +- go/vt/vtexplain/vtexplain_vttablet.go | 20 ++ go/vt/vtgate/autocommit_test.go | 325 ++++++++++++++++++ go/vt/vtgate/engine/merge_sort_test.go | 2 +- go/vt/vtgate/engine/primitive.go | 2 +- go/vt/vtgate/engine/route.go | 23 +- go/vt/vtgate/executor.go | 12 + go/vt/vtgate/executor_framework_test.go | 35 ++ go/vt/vtgate/executor_test.go | 7 +- go/vt/vtgate/safe_session.go | 71 +++- go/vt/vtgate/scatter_conn.go | 43 ++- go/vt/vtgate/scatter_conn_test.go | 4 +- go/vt/vtgate/vcursor_impl.go | 8 +- 15 files changed, 527 insertions(+), 43 deletions(-) create mode 100644 go/vt/vtgate/autocommit_test.go diff --git a/data/test/vtexplain/multi-output/deletesharded-output.txt b/data/test/vtexplain/multi-output/deletesharded-output.txt index 52b6d862319..65df48ce6fa 100644 --- a/data/test/vtexplain/multi-output/deletesharded-output.txt +++ b/data/test/vtexplain/multi-output/deletesharded-output.txt @@ -3,7 +3,7 @@ delete from music_extra where id=1 1 ks_sharded/-40: begin 1 ks_sharded/-40: delete from music_extra where id in (1) /* vtgate:: keyspace_id:166b40b44aba4bd6 */ -2 ks_sharded/-40: commit +1 ks_sharded/-40: commit ---------------------------------------------------------------------- delete from music_extra where id=1 and extra='abc' @@ -11,7 +11,7 @@ delete from music_extra where id=1 and extra='abc' 1 ks_sharded/-40: begin 1 ks_sharded/-40: select id from music_extra where id = 1 and extra = 'abc' limit 10001 for update /* vtgate:: keyspace_id:166b40b44aba4bd6 */ 1 ks_sharded/-40: delete from music_extra where id in (1) /* vtgate:: keyspace_id:166b40b44aba4bd6 */ -2 ks_sharded/-40: commit +1 ks_sharded/-40: commit ---------------------------------------------------------------------- delete from user where id=1 diff --git a/data/test/vtexplain/multi-output/unsharded-output.txt b/data/test/vtexplain/multi-output/unsharded-output.txt index 4b6af782c14..e5ece7403a1 100644 --- a/data/test/vtexplain/multi-output/unsharded-output.txt +++ b/data/test/vtexplain/multi-output/unsharded-output.txt @@ -8,7 +8,7 @@ insert into t1 (id,intval,floatval) values (1,2,3.14) 1 ks_unsharded/-: begin 1 ks_unsharded/-: insert into t1(id, intval, floatval) values (1, 2, 3.14) -2 ks_unsharded/-: commit +1 ks_unsharded/-: commit ---------------------------------------------------------------------- update t1 set intval = 10 @@ -16,7 +16,7 @@ update t1 set intval = 10 1 ks_unsharded/-: begin 1 ks_unsharded/-: select id from t1 limit 10001 for update 1 ks_unsharded/-: update t1 set intval = 10 where id in (1) -2 ks_unsharded/-: commit +1 ks_unsharded/-: commit ---------------------------------------------------------------------- update t1 set floatval = 9.99 @@ -24,20 +24,20 @@ update t1 set floatval = 9.99 1 ks_unsharded/-: begin 1 ks_unsharded/-: select id from t1 limit 10001 for update 1 ks_unsharded/-: update t1 set floatval = 9.99 where id in (1) -2 ks_unsharded/-: commit +1 ks_unsharded/-: commit ---------------------------------------------------------------------- delete from t1 where id = 100 1 ks_unsharded/-: begin 1 ks_unsharded/-: delete from t1 where id in (100) -2 ks_unsharded/-: commit +1 ks_unsharded/-: commit ---------------------------------------------------------------------- insert into t1 (id,intval,floatval) values (1,2,3.14) on duplicate key update intval=3, floatval=3.14 1 ks_unsharded/-: begin 1 ks_unsharded/-: insert into t1(id, intval, floatval) values (1, 2, 3.14) on duplicate key update intval = 3, floatval = 3.14 -2 ks_unsharded/-: commit +1 ks_unsharded/-: commit ---------------------------------------------------------------------- diff --git a/data/test/vtexplain/multi-output/updatesharded-output.txt b/data/test/vtexplain/multi-output/updatesharded-output.txt index 62b02b7bea9..6e5a18f3fec 100644 --- a/data/test/vtexplain/multi-output/updatesharded-output.txt +++ b/data/test/vtexplain/multi-output/updatesharded-output.txt @@ -3,7 +3,7 @@ update user set nickname='alice' where id=1 1 ks_sharded/-40: begin 1 ks_sharded/-40: update user set nickname = 'alice' where id in (1) /* vtgate:: keyspace_id:166b40b44aba4bd6 */ -2 ks_sharded/-40: commit +1 ks_sharded/-40: commit ---------------------------------------------------------------------- update user set nickname='alice' where name='alice' @@ -21,7 +21,7 @@ update user set pet='fido' where id=1 1 ks_sharded/-40: begin 1 ks_sharded/-40: update user set pet = 'fido' where id in (1) /* vtgate:: keyspace_id:166b40b44aba4bd6 */ -2 ks_sharded/-40: commit +1 ks_sharded/-40: commit ---------------------------------------------------------------------- update user set name='alicia' where id=1 diff --git a/go/vt/vtexplain/vtexplain_vttablet.go b/go/vt/vtexplain/vtexplain_vttablet.go index 52696d7db05..7a745386cff 100644 --- a/go/vt/vtexplain/vtexplain_vttablet.go +++ b/go/vt/vtexplain/vtexplain_vttablet.go @@ -216,6 +216,26 @@ func (t *explainTablet) ReadTransaction(ctx context.Context, target *querypb.Tar return t.tsv.ReadTransaction(ctx, target, dtid) } +// ExecuteBatch is part of the QueryService interface. +func (t *explainTablet) ExecuteBatch(ctx context.Context, target *querypb.Target, queries []*querypb.BoundQuery, asTransaction bool, transactionID int64, options *querypb.ExecuteOptions) ([]sqltypes.Result, error) { + t.mu.Lock() + t.currentTime = batchTime.Wait() + + // Since the query is simulated being "sent" over the wire we need to + // copy the bindVars into the executor to avoid a data race. + for _, query := range queries { + bindVariables := sqltypes.CopyBindVariables(query.BindVariables) + t.tabletQueries = append(t.tabletQueries, &TabletQuery{ + Time: t.currentTime, + SQL: query.Sql, + BindVars: bindVariables, + }) + } + t.mu.Unlock() + + return t.tsv.ExecuteBatch(ctx, target, queries, asTransaction, transactionID, options) +} + // BeginExecute is part of the QueryService interface. func (t *explainTablet) BeginExecute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (*sqltypes.Result, int64, error) { t.mu.Lock() diff --git a/go/vt/vtgate/autocommit_test.go b/go/vt/vtgate/autocommit_test.go new file mode 100644 index 00000000000..658c4ad313d --- /dev/null +++ b/go/vt/vtgate/autocommit_test.go @@ -0,0 +1,325 @@ +/* +Copyright 2017 Google Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vtgate + +import ( + "testing" + + "golang.org/x/net/context" + + "github.com/youtube/vitess/go/sqltypes" + + querypb "github.com/youtube/vitess/go/vt/proto/query" + vtgatepb "github.com/youtube/vitess/go/vt/proto/vtgate" +) + +// This file contains tests for all the autocommit code paths +// to make sure that single round-trip commits are executed +// correctly whenever possible. + +// TestAutocommitUpdateSharded: instant-commit. +func TestAutocommitUpdateSharded(t *testing.T) { + executor, sbc1, sbc2, _ := createExecutorEnv() + + if _, err := autocommitExec(executor, "update user set a=2 where id = 1"); err != nil { + t.Fatal(err) + } + testBatchQuery(t, "sbc1", sbc1, &querypb.BoundQuery{ + Sql: "update user set a = 2 where id = 1 /* vtgate:: keyspace_id:166b40b44aba4bd6 */", + BindVariables: map[string]*querypb.BindVariable{}, + }) + testAsTransactionCount(t, "sbc1", sbc1, 1) + testCommitCount(t, "sbc1", sbc1, 0) + + testBatchQuery(t, "sbc2", sbc2, nil) + testAsTransactionCount(t, "sbc2", sbc2, 0) + testCommitCount(t, "sbc1", sbc1, 0) +} + +// TestAutocommitUpdateLookup: transaction: select before update. +func TestAutocommitUpdateLookup(t *testing.T) { + executor, sbc1, _, sbclookup := createExecutorEnv() + + if _, err := autocommitExec(executor, "update music set a=2 where id = 2"); err != nil { + t.Fatal(err) + } + + testQueries(t, "sbclookup", sbclookup, []*querypb.BoundQuery{{ + Sql: "select user_id from music_user_map where music_id = :music_id", + BindVariables: map[string]*querypb.BindVariable{ + "music_id": sqltypes.Int64BindVariable(2), + }, + }}) + testAsTransactionCount(t, "sbclookup", sbclookup, 0) + testCommitCount(t, "sbclookup", sbclookup, 1) + + testQueries(t, "sbc1", sbc1, []*querypb.BoundQuery{{ + Sql: "update music set a = 2 where id = 2 /* vtgate:: keyspace_id:166b40b44aba4bd6 */", + BindVariables: map[string]*querypb.BindVariable{}, + }}) + testAsTransactionCount(t, "sbc1", sbc1, 0) + testCommitCount(t, "sbc1", sbc1, 1) +} + +// TestAutocommitUpdateVindexChange: transaction: select & update before final update. +func TestAutocommitUpdateVindexChange(t *testing.T) { + executor, sbc1, _, sbclookup := createExecutorEnv() + + if _, err := autocommitExec(executor, "update user2 set name='myname', lastname='mylastname' where id = 1"); err != nil { + t.Fatal(err) + } + + testQueries(t, "sbclookup", sbclookup, []*querypb.BoundQuery{{ + Sql: "delete from name_lastname_keyspace_id_map where name = :name and lastname = :lastname and keyspace_id = :keyspace_id", + BindVariables: map[string]*querypb.BindVariable{ + "lastname": sqltypes.StringBindVariable("foo"), + "name": sqltypes.Int32BindVariable(1), + "keyspace_id": sqltypes.BytesBindVariable([]byte("\026k@\264J\272K\326")), + }, + }, { + Sql: "insert into name_lastname_keyspace_id_map(name, lastname, keyspace_id) values (:name0, :lastname0, :keyspace_id0)", + BindVariables: map[string]*querypb.BindVariable{ + "name0": sqltypes.BytesBindVariable([]byte("myname")), + "lastname0": sqltypes.BytesBindVariable([]byte("mylastname")), + "keyspace_id0": sqltypes.BytesBindVariable([]byte("\026k@\264J\272K\326")), + }, + }}) + testAsTransactionCount(t, "sbclookup", sbclookup, 0) + testCommitCount(t, "sbclookup", sbclookup, 1) + + testQueries(t, "sbc1", sbc1, []*querypb.BoundQuery{{ + Sql: "select name, lastname from user2 where id = 1 for update", + BindVariables: map[string]*querypb.BindVariable{}, + }, { + Sql: "update user2 set name = 'myname', lastname = 'mylastname' where id = 1 /* vtgate:: keyspace_id:166b40b44aba4bd6 */", + BindVariables: map[string]*querypb.BindVariable{ + "_name0": sqltypes.BytesBindVariable([]byte("myname")), + "_lastname0": sqltypes.BytesBindVariable([]byte("mylastname")), + }, + }}) + testAsTransactionCount(t, "sbc1", sbc1, 0) + testCommitCount(t, "sbc1", sbc1, 1) +} + +// TestAutocommitDeleteSharded: instant-commit. +func TestAutocommitDeleteSharded(t *testing.T) { + executor, sbc1, sbc2, _ := createExecutorEnv() + + if _, err := autocommitExec(executor, "delete from user_extra where user_id = 1"); err != nil { + t.Fatal(err) + } + testBatchQuery(t, "sbc1", sbc1, &querypb.BoundQuery{ + Sql: "delete from user_extra where user_id = 1 /* vtgate:: keyspace_id:166b40b44aba4bd6 */", + BindVariables: map[string]*querypb.BindVariable{}, + }) + testAsTransactionCount(t, "sbc1", sbc1, 1) + testCommitCount(t, "sbc1", sbc1, 0) + + testBatchQuery(t, "sbc2", sbc2, nil) + testAsTransactionCount(t, "sbc2", sbc2, 0) + testCommitCount(t, "sbc1", sbc1, 0) +} + +// TestAutocommitDeleteLookup: transaction: select before update. +func TestAutocommitDeleteLookup(t *testing.T) { + executor, sbc1, _, sbclookup := createExecutorEnv() + + if _, err := autocommitExec(executor, "delete from music where id = 1"); err != nil { + t.Fatal(err) + } + + testQueries(t, "sbclookup", sbclookup, []*querypb.BoundQuery{{ + Sql: "select user_id from music_user_map where music_id = :music_id", + BindVariables: map[string]*querypb.BindVariable{ + "music_id": sqltypes.Int64BindVariable(1), + }, + }, { + Sql: "delete from music_user_map where music_id = :music_id and user_id = :user_id", + BindVariables: map[string]*querypb.BindVariable{ + "music_id": sqltypes.Int32BindVariable(1), + "user_id": sqltypes.Uint64BindVariable(1), + }, + }}) + testAsTransactionCount(t, "sbclookup", sbclookup, 0) + testCommitCount(t, "sbclookup", sbclookup, 1) + + testQueries(t, "sbc1", sbc1, []*querypb.BoundQuery{{ + Sql: "select id from music where id = 1 for update", + BindVariables: map[string]*querypb.BindVariable{}, + }, { + Sql: "delete from music where id = 1 /* vtgate:: keyspace_id:166b40b44aba4bd6 */", + BindVariables: map[string]*querypb.BindVariable{}, + }}) + testAsTransactionCount(t, "sbc1", sbc1, 0) + testCommitCount(t, "sbc1", sbc1, 1) +} + +// TestAutocommitInsertSharded: instant-commit. +func TestAutocommitInsertSharded(t *testing.T) { + executor, sbc1, sbc2, _ := createExecutorEnv() + + if _, err := autocommitExec(executor, "insert into user_extra(user_id, v) values (1, 2)"); err != nil { + t.Fatal(err) + } + testBatchQuery(t, "sbc1", sbc1, &querypb.BoundQuery{ + Sql: "insert into user_extra(user_id, v) values (:_user_id0, 2) /* vtgate:: keyspace_id:166b40b44aba4bd6 */", + BindVariables: map[string]*querypb.BindVariable{ + "_user_id0": sqltypes.Int64BindVariable(1), + }, + }) + testAsTransactionCount(t, "sbc1", sbc1, 1) + testCommitCount(t, "sbc1", sbc1, 0) + + testBatchQuery(t, "sbc2", sbc2, nil) + testAsTransactionCount(t, "sbc2", sbc2, 0) + testCommitCount(t, "sbc1", sbc1, 0) +} + +// TestAutocommitInsertLookup: transaction: select before update. +func TestAutocommitInsertLookup(t *testing.T) { + executor, sbc1, _, sbclookup := createExecutorEnv() + + if _, err := autocommitExec(executor, "insert into user(id, v, name) values (1, 2, 'myname')"); err != nil { + t.Fatal(err) + } + + testQueries(t, "sbclookup", sbclookup, []*querypb.BoundQuery{{ + Sql: "insert into name_user_map(name, user_id) values (:name0, :user_id0)", + BindVariables: map[string]*querypb.BindVariable{ + "name0": sqltypes.BytesBindVariable([]byte("myname")), + "user_id0": sqltypes.Uint64BindVariable(1), + }, + }}) + testAsTransactionCount(t, "sbclookup", sbclookup, 0) + testCommitCount(t, "sbclookup", sbclookup, 1) + + testQueries(t, "sbc1", sbc1, []*querypb.BoundQuery{{ + Sql: "insert into user(id, v, name) values (:_Id0, 2, :_name0) /* vtgate:: keyspace_id:166b40b44aba4bd6 */", + BindVariables: map[string]*querypb.BindVariable{ + "_Id0": sqltypes.Int64BindVariable(1), + "_name0": sqltypes.BytesBindVariable([]byte("myname")), + "__seq0": sqltypes.Int64BindVariable(1), + }, + }}) + testAsTransactionCount(t, "sbc1", sbc1, 0) + testCommitCount(t, "sbc1", sbc1, 1) +} + +func TestAutocommitInsertMultishard(t *testing.T) { + executor, sbc1, sbc2, _ := createExecutorEnv() + + if _, err := autocommitExec(executor, "insert into user_extra(user_id, v) values (1, 2), (3, 4)"); err != nil { + t.Fatal(err) + } + testQueries(t, "sbc1", sbc1, []*querypb.BoundQuery{{ + Sql: "insert into user_extra(user_id, v) values (:_user_id0, 2) /* vtgate:: keyspace_id:166b40b44aba4bd6 */", + BindVariables: map[string]*querypb.BindVariable{ + "_user_id0": sqltypes.Int64BindVariable(1), + "_user_id1": sqltypes.Int64BindVariable(3), + }, + }}) + testAsTransactionCount(t, "sbc1", sbc1, 0) + testCommitCount(t, "sbc1", sbc1, 1) + + testQueries(t, "sbc2", sbc2, []*querypb.BoundQuery{{ + Sql: "insert into user_extra(user_id, v) values (:_user_id1, 4) /* vtgate:: keyspace_id:4eb190c9a2fa169c */", + BindVariables: map[string]*querypb.BindVariable{ + "_user_id0": sqltypes.Int64BindVariable(1), + "_user_id1": sqltypes.Int64BindVariable(3), + }, + }}) + testAsTransactionCount(t, "sbc2", sbc2, 0) + testCommitCount(t, "sbc2", sbc2, 1) +} + +// TestAutocommitInsertAutoinc: instant-commit: sequence fetch is not transactional. +func TestAutocommitInsertAutoinc(t *testing.T) { + executor, _, _, sbclookup := createExecutorEnv() + + if _, err := autocommitExec(executor, "insert into main1(id, name) values (null, 'myname')"); err != nil { + t.Fatal(err) + } + + testQueries(t, "sbclookup", sbclookup, []*querypb.BoundQuery{{ + Sql: "select next :n values from user_seq", + BindVariables: map[string]*querypb.BindVariable{"n": sqltypes.Int64BindVariable(1)}, + }}) + testBatchQuery(t, "sbclookup", sbclookup, &querypb.BoundQuery{ + Sql: "insert into main1(id, name) values (:__seq0, 'myname')", + BindVariables: map[string]*querypb.BindVariable{ + "__seq0": sqltypes.Int64BindVariable(1), + }, + }) + testAsTransactionCount(t, "sbclookup", sbclookup, 1) + testCommitCount(t, "sbclookup", sbclookup, 0) +} + +// TestAutocommitTransactionStarted: no instant-commit. +func TestAutocommitTransactionStarted(t *testing.T) { + executor, sbc1, _, _ := createExecutorEnv() + + session := &vtgatepb.Session{ + TargetString: "@master", + Autocommit: true, + InTransaction: true, + TransactionMode: vtgatepb.TransactionMode_MULTI, + } + sql := "update user set a=2 where id = 1" + + if _, err := executor.Execute(context.Background(), "TestExecute", NewSafeSession(session), sql, map[string]*querypb.BindVariable{}); err != nil { + t.Fatal(err) + } + + testQueries(t, "sbc1", sbc1, []*querypb.BoundQuery{{ + Sql: "update user set a = 2 where id = 1 /* vtgate:: keyspace_id:166b40b44aba4bd6 */", + BindVariables: map[string]*querypb.BindVariable{}, + }}) + testAsTransactionCount(t, "sbc1", sbc1, 0) + testCommitCount(t, "sbc1", sbc1, 0) +} + +// TestAutocommitDirectTarget: no instant-commit. +func TestAutocommitDirectTarget(t *testing.T) { + executor, _, _, sbclookup := createExecutorEnv() + + session := &vtgatepb.Session{ + TargetString: "TestUnsharded/0@master", + Autocommit: true, + TransactionMode: vtgatepb.TransactionMode_MULTI, + } + sql := "insert into simple(val) values ('val')" + + if _, err := executor.Execute(context.Background(), "TestExecute", NewSafeSession(session), sql, map[string]*querypb.BindVariable{}); err != nil { + t.Error(err) + } + testQueries(t, "sbclookup", sbclookup, []*querypb.BoundQuery{{ + Sql: sql + "/* vtgate:: filtered_replication_unfriendly */", + BindVariables: map[string]*querypb.BindVariable{}, + }}) + testAsTransactionCount(t, "sbclookup", sbclookup, 0) + testCommitCount(t, "sbclookup", sbclookup, 1) +} + +func autocommitExec(executor *Executor, sql string) (*sqltypes.Result, error) { + session := &vtgatepb.Session{ + TargetString: "@master", + Autocommit: true, + TransactionMode: vtgatepb.TransactionMode_MULTI, + } + + return executor.Execute(context.Background(), "TestExecute", NewSafeSession(session), sql, map[string]*querypb.BindVariable{}) +} diff --git a/go/vt/vtgate/engine/merge_sort_test.go b/go/vt/vtgate/engine/merge_sort_test.go index 5bdd050899a..bd900acb14a 100644 --- a/go/vt/vtgate/engine/merge_sort_test.go +++ b/go/vt/vtgate/engine/merge_sort_test.go @@ -342,7 +342,7 @@ func (t *fakeVcursor) Execute(method string, query string, bindvars map[string]* panic("unimplemented") } -func (t *fakeVcursor) ExecuteMultiShard(keyspace string, shardQueries map[string]*querypb.BoundQuery, isDML bool) (*sqltypes.Result, error) { +func (t *fakeVcursor) ExecuteMultiShard(keyspace string, shardQueries map[string]*querypb.BoundQuery, isDML, canAutocommit bool) (*sqltypes.Result, error) { panic("unimplemented") } diff --git a/go/vt/vtgate/engine/primitive.go b/go/vt/vtgate/engine/primitive.go index 938a91ca553..07458c30ad7 100644 --- a/go/vt/vtgate/engine/primitive.go +++ b/go/vt/vtgate/engine/primitive.go @@ -43,7 +43,7 @@ type VCursor interface { // Context returns the context of the current request. Context() context.Context Execute(method string, query string, bindvars map[string]*querypb.BindVariable, isDML bool) (*sqltypes.Result, error) - ExecuteMultiShard(keyspace string, shardQueries map[string]*querypb.BoundQuery, isDML bool) (*sqltypes.Result, error) + ExecuteMultiShard(keyspace string, shardQueries map[string]*querypb.BoundQuery, isDML, canAutocommit bool) (*sqltypes.Result, error) ExecuteStandalone(query string, bindvars map[string]*querypb.BindVariable, keyspace, shard string) (*sqltypes.Result, error) StreamExecuteMulti(query string, keyspace string, shardVars map[string]map[string]*querypb.BindVariable, callback func(reply *sqltypes.Result) error) error GetKeyspaceShards(vkeyspace *vindexes.Keyspace) (string, []*topodatapb.ShardReference, error) diff --git a/go/vt/vtgate/engine/route.go b/go/vt/vtgate/engine/route.go index 67a0d3c4737..1e51e73ef1e 100644 --- a/go/vt/vtgate/engine/route.go +++ b/go/vt/vtgate/engine/route.go @@ -333,7 +333,8 @@ func (route *Route) execute(vcursor VCursor, bindVars, joinVars map[string]*quer } shardQueries := route.getShardQueries(route.Query, params) - result, err := vcursor.ExecuteMultiShard(params.ks, shardQueries, isDML) + // canAutocommit should be true only if it's a DML. + result, err := vcursor.ExecuteMultiShard(params.ks, shardQueries, isDML, isDML /* canAutocommit */) if err != nil { return nil, err } @@ -388,7 +389,7 @@ func (route *Route) GetFields(vcursor VCursor, bindVars, joinVars map[string]*qu return nil, err } - qr, err := route.execShard(vcursor, route.FieldQuery, bindVars, ks, shard, false /* isDML */) + qr, err := route.execShard(vcursor, route.FieldQuery, bindVars, ks, shard, false /* isDML */, false /* canAutocommit */) if err != nil { return nil, err } @@ -467,7 +468,7 @@ func (route *Route) execUpdateEqual(vcursor VCursor, bindVars map[string]*queryp return result, nil } rewritten := sqlannotation.AddKeyspaceIDs(route.Query, [][]byte{ksid}, "") - return route.execShard(vcursor, rewritten, bindVars, ks, shard, true /* isDML */) + return route.execShard(vcursor, rewritten, bindVars, ks, shard, true /* isDML */, true /* canAutocommit */) } // execUpdateEqualChangedVindex performs an update when a vindex is being modified @@ -494,7 +495,7 @@ func (route *Route) execUpdateEqualChangedVindex(vcursor VCursor, query string, var err error rewritten := sqlannotation.AddKeyspaceIDs(route.Query, [][]byte{keyspaceID}, "") if route.Subquery != "" { - subQueryResult, err = route.execShard(vcursor, route.Subquery, bindVars, keyspace, shard, false /* isDML */) + subQueryResult, err = route.execShard(vcursor, route.Subquery, bindVars, keyspace, shard, false /* isDML */, false /* canAutocommit */) if err != nil { return nil, vterrors.Wrap(err, "execUpdateEqual") } @@ -503,7 +504,7 @@ func (route *Route) execUpdateEqualChangedVindex(vcursor VCursor, query string, if err != nil { return nil, vterrors.Wrap(err, "execUpdateEqual") } - result, err := route.execShard(vcursor, rewritten, bindVars, keyspace, shard, true /* isDML */) + result, err := route.execShard(vcursor, rewritten, bindVars, keyspace, shard, true /* isDML */, true /* canAutocommit */) if err != nil { return nil, vterrors.Wrap(err, "execUpdateEqual") } @@ -529,7 +530,7 @@ func (route *Route) execDeleteEqual(vcursor VCursor, bindVars map[string]*queryp } } rewritten := sqlannotation.AddKeyspaceIDs(route.Query, [][]byte{ksid}, "") - return route.execShard(vcursor, rewritten, bindVars, ks, shard, true /* isDML */) + return route.execShard(vcursor, rewritten, bindVars, ks, shard, true /* isDML */, true /* canAutocommit */) } func (route *Route) execInsertUnsharded(vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) { @@ -543,7 +544,7 @@ func (route *Route) execInsertUnsharded(vcursor VCursor, bindVars map[string]*qu } shardQueries := route.getShardQueries(route.Query, params) - result, err := vcursor.ExecuteMultiShard(params.ks, shardQueries, true /* isDML */) + result, err := vcursor.ExecuteMultiShard(params.ks, shardQueries, true /* isDML */, true /* canAutocommit */) if err != nil { return nil, vterrors.Wrap(err, "execInsertUnsharded") } @@ -568,7 +569,7 @@ func (route *Route) execInsertSharded(vcursor VCursor, bindVars map[string]*quer return nil, vterrors.Wrap(err, "execInsertSharded") } - result, err := vcursor.ExecuteMultiShard(keyspace, shardQueries, true /* isDML */) + result, err := vcursor.ExecuteMultiShard(keyspace, shardQueries, true /* isDML */, true /* canAutocommit */) if err != nil { return nil, vterrors.Wrap(err, "execInsertSharded") } @@ -689,7 +690,7 @@ func (route *Route) updateChangedVindexes(subQueryResult *sqltypes.Result, vcurs } func (route *Route) deleteVindexEntries(vcursor VCursor, bindVars map[string]*querypb.BindVariable, ks, shard string, ksid []byte) error { - result, err := route.execShard(vcursor, route.Subquery, bindVars, ks, shard, false /* isDML */) + result, err := route.execShard(vcursor, route.Subquery, bindVars, ks, shard, false /* isDML */, false /* canAutocommit */) if err != nil { return err } @@ -1080,13 +1081,13 @@ func (route *Route) execAnyShard(vcursor VCursor, bindVars map[string]*querypb.B return vcursor.ExecuteStandalone(route.Query, bindVars, ks, shard) } -func (route *Route) execShard(vcursor VCursor, query string, bindVars map[string]*querypb.BindVariable, keyspace, shard string, isDML bool) (*sqltypes.Result, error) { +func (route *Route) execShard(vcursor VCursor, query string, bindVars map[string]*querypb.BindVariable, keyspace, shard string, isDML, canAutocommit bool) (*sqltypes.Result, error) { return vcursor.ExecuteMultiShard(keyspace, map[string]*querypb.BoundQuery{ shard: { Sql: query, BindVariables: bindVars, }, - }, isDML) + }, isDML, canAutocommit) } func (route *Route) anyShard(vcursor VCursor, keyspace *vindexes.Keyspace) (string, string, error) { diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index 89b990b0909..c12a96cb85e 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -166,6 +166,16 @@ func (e *Executor) execute(ctx context.Context, safeSession *SafeSession, sql st defer e.txConn.Rollback(ctx, safeSession) } + // The SetAutocommitable flag should be same as mustCommit. + // If we started a transaction because of autocommit, then mustCommit + // will be true, which means that we can autocommit. If we were already + // in a transaction, it means that the app started it, or we are being + // called recursively. If so, we cannot autocommit because whatever we + // do is likely not final. + // The control flow is such that autocommitable can only be turned on + // at the beginning, but never after. + safeSession.SetAutocommitable(mustCommit) + qr, err := e.handleExec(ctx, safeSession, sql, bindVars, target, logStats) if err != nil { return nil, err @@ -202,6 +212,8 @@ func (e *Executor) execute(ctx context.Context, safeSession *SafeSession, sql st func (e *Executor) handleExec(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, target querypb.Target, logStats *LogStats) (*sqltypes.Result, error) { if target.Shard != "" { // V1 mode or V3 mode with a forced shard target + // TODO(sougou): change this flow to go through V3 functions + // which will allow us to benefit from the autocommitable flag. sql = sqlannotation.AnnotateIfDML(sql, nil) if e.normalize { diff --git a/go/vt/vtgate/executor_framework_test.go b/go/vt/vtgate/executor_framework_test.go index 4a70de283d3..2f6fb6af06d 100644 --- a/go/vt/vtgate/executor_framework_test.go +++ b/go/vt/vtgate/executor_framework_test.go @@ -18,6 +18,7 @@ package vtgate import ( "fmt" + "reflect" "strconv" "strings" "testing" @@ -309,6 +310,40 @@ func executorStream(executor *Executor, sql string) (qr *sqltypes.Result, err er return qr, nil } +// testBatchQuery verifies that a single (or no) query ExecuteBatch was performed on the SandboxConn. +func testBatchQuery(t *testing.T, sbcName string, sbc *sandboxconn.SandboxConn, boundQuery *querypb.BoundQuery) { + t.Helper() + + var wantQueries [][]*querypb.BoundQuery + if boundQuery != nil { + wantQueries = [][]*querypb.BoundQuery{{boundQuery}} + } + if !reflect.DeepEqual(sbc.BatchQueries, wantQueries) { + t.Errorf("%s.BatchQueries:\n%+v, want\n%+v\n", sbcName, sbc.BatchQueries, wantQueries) + } +} + +func testAsTransactionCount(t *testing.T, sbcName string, sbc *sandboxconn.SandboxConn, want int) { + t.Helper() + if got, want := sbc.AsTransactionCount.Get(), int64(want); got != want { + t.Errorf("%s.AsTransactionCount: %d, want %d\n", sbcName, got, want) + } +} + +func testQueries(t *testing.T, sbcName string, sbc *sandboxconn.SandboxConn, wantQueries []*querypb.BoundQuery) { + t.Helper() + if !reflect.DeepEqual(sbc.Queries, wantQueries) { + t.Errorf("%s.Queries:\n%+v, want\n%+v\n", sbcName, sbc.Queries, wantQueries) + } +} + +func testCommitCount(t *testing.T, sbcName string, sbc *sandboxconn.SandboxConn, want int) { + t.Helper() + if got, want := sbc.CommitCount.Get(), int64(want); got != want { + t.Errorf("%s.CommitCount: %d, want %d\n", sbcName, got, want) + } +} + func testNonZeroDuration(t *testing.T, what, d string) { t.Helper() time, _ := strconv.ParseFloat(d, 64) diff --git a/go/vt/vtgate/executor_test.go b/go/vt/vtgate/executor_test.go index 15a249c5a16..3f3d0b2f0bd 100644 --- a/go/vt/vtgate/executor_test.go +++ b/go/vt/vtgate/executor_test.go @@ -385,7 +385,9 @@ func TestExecutorAutocommit(t *testing.T) { t.Errorf("Commit count: %d, want %d", got, want) } - startCount = sbclookup.CommitCount.Get() + // In the following section, we look at AsTransaction count instead of CommitCount because + // the update results in a single round-trip ExecuteBatch call. + startCount = sbclookup.AsTransactionCount.Get() _, err = executor.Execute(context.Background(), "TestExecute", session, "update main1 set id=1", nil) if err != nil { t.Fatal(err) @@ -394,7 +396,7 @@ func TestExecutorAutocommit(t *testing.T) { if !proto.Equal(session.Session, wantSession) { t.Errorf("autocommit=1: %v, want %v", session.Session, wantSession) } - if got, want := sbclookup.CommitCount.Get(), startCount+1; got != want { + if got, want := sbclookup.AsTransactionCount.Get(), startCount+1; got != want { t.Errorf("Commit count: %d, want %d", got, want) } @@ -407,6 +409,7 @@ func TestExecutorAutocommit(t *testing.T) { } // autocommit = 1, "begin" + session.Reset() startCount = sbclookup.CommitCount.Get() _, err = executor.Execute(context.Background(), "TestExecute", session, "begin", nil) if err != nil { diff --git a/go/vt/vtgate/safe_session.go b/go/vt/vtgate/safe_session.go index e829296e869..629676cf24e 100644 --- a/go/vt/vtgate/safe_session.go +++ b/go/vt/vtgate/safe_session.go @@ -32,11 +32,36 @@ import ( // (the use pattern is 'Find', if not found, then 'Append', // for a single shard) type SafeSession struct { - mu sync.Mutex - mustRollback bool + mu sync.Mutex + mustRollback bool + autocommitState autocommitState *vtgatepb.Session } +// autocommitState keeps track of whether a single round-trip +// commit to vttablet is possible. It starts as autocommitable +// if we started a transaction because of the autocommit flag +// being set. Otherwise, it starts as notAutocommitable. +// If execute is recursively called using the same session, +// like from a vindex, we will already be in a transaction, +// and this should cause the state to become notAutocommitable. +// +// SafeSession lets you request a commit token, which will +// be issued if the state is autocommitable, and ShardSessions +// is empty, implying that no intermediate transactions were started. +// If so, the state transitions to autocommited, which is terminal. +// If the token is succesfully issued, the caller has to perform +// the commit. If a token cannot be issued, then a traditional +// commit has to be performed at the outermost level where +// the autocommitable transition happened. +type autocommitState int + +const ( + notAutocommittable = autocommitState(iota) + autocommittable + autocommitted +) + // NewSafeSession returns a new SafeSession based on the Session func NewSafeSession(sessn *vtgatepb.Session) *SafeSession { return &SafeSession{Session: sessn} @@ -52,6 +77,41 @@ func NewAutocommitSession(sessn *vtgatepb.Session) *SafeSession { return NewSafeSession(newSession) } +// SetAutocommitable sets the state to autocommitable if true. +// Otherwise, it's notAutocommitable. +func (session *SafeSession) SetAutocommitable(flag bool) { + session.mu.Lock() + defer session.mu.Unlock() + + if session.autocommitState == autocommitted { + panic("BUG: SetAutocommitable: unexpected autocommit state") + } + + if flag { + session.autocommitState = autocommittable + } else { + session.autocommitState = notAutocommittable + } +} + +// AutocommitApproval returns true if we can perform a single round-trip +// autocommit. If so, the caller is responsible for commiting their +// transaction. +func (session *SafeSession) AutocommitApproval() bool { + session.mu.Lock() + defer session.mu.Unlock() + + if session.autocommitState == autocommitted { + panic("BUG: AutocommitToken: unexpected autocommit state") + } + + if session.autocommitState == autocommittable && len(session.ShardSessions) == 0 { + session.autocommitState = autocommitted + return true + } + return false +} + // InTransaction returns true if we are in a transaction func (session *SafeSession) InTransaction() bool { if session == nil || session.Session == nil { @@ -81,6 +141,11 @@ func (session *SafeSession) Find(keyspace, shard string, tabletType topodatapb.T func (session *SafeSession) Append(shardSession *vtgatepb.Session_ShardSession, txMode vtgatepb.TransactionMode) error { session.mu.Lock() defer session.mu.Unlock() + + if session.autocommitState == autocommitted { + panic("BUG: SafeSession.Append: unexpected autocommit state") + } + // Always append, in order for rollback to succeed. session.ShardSessions = append(session.ShardSessions, shardSession) if session.isSingleDB(txMode) && len(session.ShardSessions) > 1 { @@ -124,6 +189,8 @@ func (session *SafeSession) Reset() { } session.mu.Lock() defer session.mu.Unlock() + session.mustRollback = false + session.autocommitState = notAutocommittable session.Session.InTransaction = false session.SingleDb = false session.ShardSessions = nil diff --git a/go/vt/vtgate/scatter_conn.go b/go/vt/vtgate/scatter_conn.go index ca17e1b26c1..167102652d8 100644 --- a/go/vt/vtgate/scatter_conn.go +++ b/go/vt/vtgate/scatter_conn.go @@ -166,6 +166,7 @@ func (stc *ScatterConn) ExecuteMultiShard( tabletType topodatapb.TabletType, session *SafeSession, notInTransaction bool, + canAutocommit bool, ) (*sqltypes.Result, error) { // mu protects qr @@ -176,6 +177,8 @@ func (stc *ScatterConn) ExecuteMultiShard( shards = append(shards, shard) } + canCommit := len(shards) == 1 && canAutocommit && session.AutocommitApproval() + err := stc.multiGoTransaction( ctx, "Execute", @@ -185,23 +188,25 @@ func (stc *ScatterConn) ExecuteMultiShard( session, notInTransaction, func(target *querypb.Target, shouldBegin bool, transactionID int64) (int64, error) { - var innerqr *sqltypes.Result - var opts *querypb.ExecuteOptions + var ( + innerqr *sqltypes.Result + err error + opts *querypb.ExecuteOptions + ) if session != nil && session.Session != nil { opts = session.Session.Options } - if shouldBegin { - var err error + + switch { + case canCommit: + innerqr, err = stc.executeAutocommit(ctx, target, shardQueries[target.Shard].Sql, shardQueries[target.Shard].BindVariables, opts) + case shouldBegin: innerqr, transactionID, err = stc.gateway.BeginExecute(ctx, target, shardQueries[target.Shard].Sql, shardQueries[target.Shard].BindVariables, opts) - if err != nil { - return transactionID, err - } - } else { - var err error + default: innerqr, err = stc.gateway.Execute(ctx, target, shardQueries[target.Shard].Sql, shardQueries[target.Shard].BindVariables, transactionID, opts) - if err != nil { - return transactionID, err - } + } + if err != nil { + return transactionID, err } mu.Lock() @@ -212,6 +217,20 @@ func (stc *ScatterConn) ExecuteMultiShard( return qr, err } +func (stc *ScatterConn) executeAutocommit(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (*sqltypes.Result, error) { + queries := []*querypb.BoundQuery{{ + Sql: sql, + BindVariables: bindVariables, + }} + // ExecuteBatch is a stop-gap because it's the only function that can currently do + // single round-trip commit. + qrs, err := stc.gateway.ExecuteBatch(ctx, target, queries, true /* asTransaction */, 0, options) + if err != nil { + return nil, err + } + return &qrs[0], nil +} + // ExecuteEntityIds executes queries that are shard specific. func (stc *ScatterConn) ExecuteEntityIds( ctx context.Context, diff --git a/go/vt/vtgate/scatter_conn_test.go b/go/vt/vtgate/scatter_conn_test.go index b054162372c..a7cd74e44a2 100644 --- a/go/vt/vtgate/scatter_conn_test.go +++ b/go/vt/vtgate/scatter_conn_test.go @@ -57,7 +57,7 @@ func TestScatterConnExecuteMulti(t *testing.T) { shardQueries[shard] = query } - return sc.ExecuteMultiShard(context.Background(), "TestScatterConnExecuteMultiShard", shardQueries, topodatapb.TabletType_REPLICA, nil, false) + return sc.ExecuteMultiShard(context.Background(), "TestScatterConnExecuteMultiShard", shardQueries, topodatapb.TabletType_REPLICA, nil, false, false) }) } @@ -249,7 +249,7 @@ func TestMultiExecs(t *testing.T) { shardQueries[shard] = query } - _, _ = sc.ExecuteMultiShard(context.Background(), "TestMultiExecs", shardQueries, topodatapb.TabletType_REPLICA, nil, false) + _, _ = sc.ExecuteMultiShard(context.Background(), "TestMultiExecs", shardQueries, topodatapb.TabletType_REPLICA, nil, false, false) if len(sbc0.Queries) == 0 || len(sbc1.Queries) == 0 { t.Fatalf("didn't get expected query") } diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go index e3d92f74c77..a2e642e22f3 100644 --- a/go/vt/vtgate/vcursor_impl.go +++ b/go/vt/vtgate/vcursor_impl.go @@ -110,9 +110,9 @@ func (vc *vcursorImpl) Execute(method string, query string, BindVars map[string] } // ExecuteMultiShard executes different queries on different shards and returns the combined result. -func (vc *vcursorImpl) ExecuteMultiShard(keyspace string, shardQueries map[string]*querypb.BoundQuery, isDML bool) (*sqltypes.Result, error) { +func (vc *vcursorImpl) ExecuteMultiShard(keyspace string, shardQueries map[string]*querypb.BoundQuery, isDML, canAutocommit bool) (*sqltypes.Result, error) { atomic.AddUint32(&vc.logStats.ShardQueries, uint32(len(shardQueries))) - qr, err := vc.executor.scatterConn.ExecuteMultiShard(vc.ctx, keyspace, commentedShardQueries(shardQueries, vc.trailingComments), vc.target.TabletType, vc.safeSession, false) + qr, err := vc.executor.scatterConn.ExecuteMultiShard(vc.ctx, keyspace, commentedShardQueries(shardQueries, vc.trailingComments), vc.target.TabletType, vc.safeSession, false, canAutocommit) if err == nil { vc.hasPartialDML = true } @@ -127,7 +127,9 @@ func (vc *vcursorImpl) ExecuteStandalone(query string, BindVars map[string]*quer BindVariables: BindVars, }, } - return vc.executor.scatterConn.ExecuteMultiShard(vc.ctx, keyspace, bq, vc.target.TabletType, NewAutocommitSession(vc.safeSession.Session), false) + // The canAutocommit flag is not significant because we currently don't execute DMLs through ExecuteStandalone. + // But we set it to true for future-proofing this function. + return vc.executor.scatterConn.ExecuteMultiShard(vc.ctx, keyspace, bq, vc.target.TabletType, NewAutocommitSession(vc.safeSession.Session), false, true /* canAutocommit */) } // StreamExeculteMulti is the streaming version of ExecuteMultiShard.