Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 87 additions & 76 deletions go/vt/proto/vtgate/vtgate.pb.go

Large diffs are not rendered by default.

10 changes: 6 additions & 4 deletions go/vt/vtcombo/tablet_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,14 +474,16 @@ func (itc *internalTabletConn) HandlePanic(err *error) {
}

//ReserveBeginExecute is part of the QueryService interface.
func (itc *internalTabletConn) ReserveBeginExecute(ctx context.Context, target *querypb.Target, sql string, preQueries []string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (*sqltypes.Result, int64, int64, *topodatapb.TabletAlias, error) {
res, transactionID, reservedID, alias, err := itc.tablet.qsc.QueryService().ReserveBeginExecute(ctx, target, sql, preQueries, bindVariables, options)
func (itc *internalTabletConn) ReserveBeginExecute(ctx context.Context, target *querypb.Target, preQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (*sqltypes.Result, int64, int64, *topodatapb.TabletAlias, error) {
bindVariables = sqltypes.CopyBindVariables(bindVariables)
res, transactionID, reservedID, alias, err := itc.tablet.qsc.QueryService().ReserveBeginExecute(ctx, target, preQueries, sql, bindVariables, options)
return res, transactionID, reservedID, alias, tabletconn.ErrorFromGRPC(vterrors.ToGRPC(err))
}

//ReserveBeginExecute is part of the QueryService interface.
func (itc *internalTabletConn) ReserveExecute(ctx context.Context, target *querypb.Target, sql string, preQueries []string, bindVariables map[string]*querypb.BindVariable, txID int64, options *querypb.ExecuteOptions) (*sqltypes.Result, int64, *topodatapb.TabletAlias, error) {
res, reservedID, alias, err := itc.tablet.qsc.QueryService().ReserveExecute(ctx, target, sql, preQueries, bindVariables, txID, options)
func (itc *internalTabletConn) ReserveExecute(ctx context.Context, target *querypb.Target, preQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, transactionID int64, options *querypb.ExecuteOptions) (*sqltypes.Result, int64, *topodatapb.TabletAlias, error) {
bindVariables = sqltypes.CopyBindVariables(bindVariables)
res, reservedID, alias, err := itc.tablet.qsc.QueryService().ReserveExecute(ctx, target, preQueries, sql, bindVariables, transactionID, options)
return res, reservedID, alias, tabletconn.ErrorFromGRPC(vterrors.ToGRPC(err))
}

Expand Down
3 changes: 3 additions & 0 deletions go/vt/vterrors/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ func Aggregate(errors []error) error {
if len(errors) == 0 {
return nil
}
if len(errors) == 1 {
return errors[0]
}
return New(aggregateCodes(errors), aggregateErrors(errors))
}

Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func (e *Executor) addNeededBindVars(bindVarNeeds sqlparser.BindVarNeeds, bindVa
}

func (e *Executor) destinationExec(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, dest key.Destination, destKeyspace string, destTabletType topodatapb.TabletType, logStats *LogStats) (*sqltypes.Result, error) {
return e.resolver.Execute(ctx, sql, bindVars, destKeyspace, destTabletType, dest, safeSession, false /* notInTransaction */, safeSession.Options, logStats, false /* canAutocommit */)
return e.resolver.Execute(ctx, sql, bindVars, destKeyspace, destTabletType, dest, safeSession, safeSession.Options, logStats, false /* canAutocommit */)
}

func (e *Executor) handleBegin(ctx context.Context, safeSession *SafeSession, destTabletType topodatapb.TabletType, logStats *LogStats) (*sqltypes.Result, error) {
Expand Down Expand Up @@ -333,7 +333,7 @@ func (e *Executor) handleSavepoint(ctx context.Context, safeSession *SafeSession
queries[i] = &querypb.BoundQuery{Sql: sql}
}

qr, errs := e.ExecuteMultiShard(ctx, rss, queries, safeSession, false, false)
qr, errs := e.ExecuteMultiShard(ctx, rss, queries, safeSession, false /*autocommit*/)
err := vterrors.Aggregate(errs)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1653,8 +1653,8 @@ func (e *Executor) handlePrepare(ctx context.Context, safeSession *SafeSession,
}

// ExecuteMultiShard implements the IExecutor interface
func (e *Executor) ExecuteMultiShard(ctx context.Context, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, session *SafeSession, notInTransaction bool, autocommit bool) (qr *sqltypes.Result, errs []error) {
return e.scatterConn.ExecuteMultiShard(ctx, rss, queries, session, notInTransaction, autocommit)
func (e *Executor) ExecuteMultiShard(ctx context.Context, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, session *SafeSession, autocommit bool) (qr *sqltypes.Result, errs []error) {
return e.scatterConn.ExecuteMultiShard(ctx, rss, queries, session, autocommit)
}

// StreamExecuteMulti implements the IExecutor interface
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func TestExecutorAutocommit(t *testing.T) {
assert.NotEqual(t, uint64(0), logStats.RowsAffected, "logstats: expected non-zero RowsAffected")

// autocommit = 1, "begin"
session.Reset()
session.ResetTx()
startCount = sbclookup.CommitCount.Get()
_, err = executor.Execute(ctx, "TestExecute", session, "begin", nil)
require.NoError(t, err)
Expand Down
19 changes: 13 additions & 6 deletions go/vt/vtgate/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ func (res *Resolver) Execute(
tabletType topodatapb.TabletType,
destination key.Destination,
session *SafeSession,
notInTransaction bool,
options *querypb.ExecuteOptions,
logStats *LogStats,
canAutocommit bool,
Expand All @@ -78,17 +77,25 @@ func (res *Resolver) Execute(

autocommit := len(rss) == 1 && canAutocommit && session.AutocommitApproval()

queries := make([]*querypb.BoundQuery, len(rss))
for i := range rss {
queries[i] = &querypb.BoundQuery{
Sql: sql,
BindVariables: bindVars,
}
}

session.SetOptions(options)

for {
qr, err := res.scatterConn.Execute(
qr, errors := res.scatterConn.ExecuteMultiShard(
ctx,
sql,
bindVars,
rss,
queries,
session,
notInTransaction,
options,
autocommit,
)
err = vterrors.Aggregate(errors)
if isRetryableError(err) {
newRss, err := res.resolver.ResolveDestination(ctx, keyspace, tabletType, destination)
if err != nil {
Expand Down
102 changes: 86 additions & 16 deletions go/vt/vtgate/safe_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

// SafeSession is a mutex-protected version of the Session.
// It is thread-safe if each thread only accesses one shard.
// (the use pattern is 'Find', if not found, then 'Append',
// (the use pattern is 'Find', if not found, then 'AppendOrUpdate',
// for a single shard)
type SafeSession struct {
mu sync.Mutex
Expand Down Expand Up @@ -85,18 +85,35 @@ func NewAutocommitSession(sessn *vtgatepb.Session) *SafeSession {
return NewSafeSession(newSession)
}

// ResetTx clears the session
func (session *SafeSession) ResetTx() {
session.mu.Lock()
defer session.mu.Unlock()
session.mustRollback = false
session.autocommitState = notAutocommittable
session.Session.InTransaction = false
session.commitOrder = vtgatepb.CommitOrder_NORMAL
session.Savepoints = nil
if !session.Session.InReservedConn {
session.ShardSessions = nil
session.PreSessions = nil
session.PostSessions = nil
}
}

// Reset clears the session
func (session *SafeSession) Reset() {
session.mu.Lock()
defer session.mu.Unlock()
session.mustRollback = false
session.autocommitState = notAutocommittable
session.Session.InTransaction = false
session.commitOrder = vtgatepb.CommitOrder_NORMAL
session.Savepoints = nil
session.ShardSessions = nil
session.PreSessions = nil
session.PostSessions = nil
session.commitOrder = vtgatepb.CommitOrder_NORMAL
session.Savepoints = nil
session.Session.InReservedConn = false
}

// SetAutocommittable sets the state to autocommitable if true.
Expand Down Expand Up @@ -151,7 +168,7 @@ func (session *SafeSession) InTransaction() bool {
}

// Find returns the transactionId and tabletAlias, if any, for a session
func (session *SafeSession) Find(keyspace, shard string, tabletType topodatapb.TabletType) (transactionID int64, alias *topodatapb.TabletAlias) {
func (session *SafeSession) Find(keyspace, shard string, tabletType topodatapb.TabletType) (transactionID int64, reservedID int64, alias *topodatapb.TabletAlias) {
session.mu.Lock()
defer session.mu.Unlock()
sessions := session.ShardSessions
Expand All @@ -163,41 +180,80 @@ func (session *SafeSession) Find(keyspace, shard string, tabletType topodatapb.T
}
for _, shardSession := range sessions {
if keyspace == shardSession.Target.Keyspace && tabletType == shardSession.Target.TabletType && shard == shardSession.Target.Shard {
return shardSession.TransactionId, shardSession.TabletAlias
return shardSession.TransactionId, shardSession.ReservedId, shardSession.TabletAlias
}
}
return 0, 0, nil
}

func addOrUpdate(shardSession *vtgatepb.Session_ShardSession, sessions []*vtgatepb.Session_ShardSession) ([]*vtgatepb.Session_ShardSession, error) {
appendSession := true
for i, sess := range sessions {
targetedAtSameTablet := sess.Target.Keyspace == shardSession.Target.Keyspace &&
sess.Target.TabletType == shardSession.Target.TabletType &&
sess.Target.Shard == shardSession.Target.Shard
if targetedAtSameTablet {
if sess.TabletAlias != shardSession.TabletAlias {
return nil, vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "got a different alias for the same target")
}
// replace the old info with the new one
sessions[i] = shardSession
appendSession = false
break
}
}
return 0, nil
if appendSession {
sessions = append(sessions, shardSession)
}

return sessions, nil
}

// Append adds a new ShardSession
func (session *SafeSession) Append(shardSession *vtgatepb.Session_ShardSession, txMode vtgatepb.TransactionMode) error {
// AppendOrUpdate adds a new ShardSession, or updates an existing one if one already exists for the given shard session
func (session *SafeSession) AppendOrUpdate(shardSession *vtgatepb.Session_ShardSession, txMode vtgatepb.TransactionMode) error {
session.mu.Lock()
defer session.mu.Unlock()

if session.autocommitState == autocommitted {
// Unreachable.
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "BUG: SafeSession.Append: unexpected autocommit state")
// Should be unreachable
return vterrors.New(vtrpcpb.Code_INTERNAL, "BUG: SafeSession.AppendOrUpdate: unexpected autocommit state")
}
if !session.Session.InTransaction {
// Unreachable.
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "BUG: SafeSession.Append: not in transaction")
if !(session.Session.InTransaction || session.Session.InReservedConn) {
// Should be unreachable
return vterrors.New(vtrpcpb.Code_INTERNAL, "BUG: SafeSession.AppendOrUpdate: not in transaction and not in reserved connection")
}
session.autocommitState = notAutocommittable

// Always append, in order for rollback to succeed.
switch session.commitOrder {
case vtgatepb.CommitOrder_NORMAL:
session.ShardSessions = append(session.ShardSessions, shardSession)
newSessions, err := addOrUpdate(shardSession, session.ShardSessions)
if err != nil {
return err
}
session.ShardSessions = newSessions
// isSingle is enforced only for normmal commit order operations.
if session.isSingleDB(txMode) && len(session.ShardSessions) > 1 {
session.mustRollback = true
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "multi-db transaction attempted: %v", session.ShardSessions)
}
case vtgatepb.CommitOrder_PRE:
session.PreSessions = append(session.PreSessions, shardSession)
newSessions, err := addOrUpdate(shardSession, session.PreSessions)
if err != nil {
return err
}
session.PreSessions = newSessions
case vtgatepb.CommitOrder_POST:
session.PostSessions = append(session.PostSessions, shardSession)
newSessions, err := addOrUpdate(shardSession, session.PostSessions)
if err != nil {
return err
}
session.PostSessions = newSessions
default:
// Should be unreachable
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "BUG: SafeSession.AppendOrUpdate: unexpected commitOrder")
}

return nil
}

Expand Down Expand Up @@ -264,9 +320,23 @@ func (session *SafeSession) SetSystemVariable(name string, expr string) {
session.SystemVariables[name] = expr
}

//SetOptions sets the options
func (session *SafeSession) SetOptions(options *querypb.ExecuteOptions) {
session.mu.Lock()
defer session.mu.Unlock()
session.Options = options
}

//StoreSavepoint stores the savepoint and release savepoint queries in the session
func (session *SafeSession) StoreSavepoint(sql string) {
session.mu.Lock()
defer session.mu.Unlock()
session.Savepoints = append(session.Savepoints, sql)
}

//InReservedConn returns true if the session needs to execute on a dedicated connection
func (session *SafeSession) InReservedConn() bool {
session.mu.Lock()
defer session.mu.Unlock()
return session.Session.InReservedConn
}
48 changes: 48 additions & 0 deletions go/vt/vtgate/safe_session_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
Copyright 2020 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package vtgate

import (
"testing"

"github.com/stretchr/testify/require"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
)

func TestFailToMultiShardWhenSetToSingleDb(t *testing.T) {
session := NewSafeSession(&vtgatepb.Session{
InTransaction: true, TransactionMode: vtgatepb.TransactionMode_SINGLE,
})

sess0 := &vtgatepb.Session_ShardSession{
Target: &querypb.Target{Keyspace: "keyspace", Shard: "0"},
TabletAlias: &topodatapb.TabletAlias{Cell: "cell", Uid: 0},
TransactionId: 1,
}
sess1 := &vtgatepb.Session_ShardSession{
Target: &querypb.Target{Keyspace: "keyspace", Shard: "1"},
TabletAlias: &topodatapb.TabletAlias{Cell: "cell", Uid: 1},
TransactionId: 1,
}

err := session.AppendOrUpdate(sess0, vtgatepb.TransactionMode_SINGLE)
require.NoError(t, err)
err = session.AppendOrUpdate(sess1, vtgatepb.TransactionMode_SINGLE)
require.Error(t, err)
}
Loading