Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 3 additions & 207 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/topotools"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/engine"
"vitess.io/vitess/go/vt/vtgate/planbuilder"
Expand Down Expand Up @@ -211,115 +210,21 @@ func (e *Executor) execute(ctx context.Context, safeSession *SafeSession, sql st
}

switch stmtType {
case sqlparser.StmtSelect:
return e.handleExec(ctx, safeSession, sql, bindVars, logStats, stmtType)
case sqlparser.StmtInsert, sqlparser.StmtReplace, sqlparser.StmtUpdate, sqlparser.StmtDelete:
safeSession := safeSession

mustCommit := false
if safeSession.Autocommit && !safeSession.InTransaction() {
mustCommit = true
if err := e.txConn.Begin(ctx, safeSession); err != nil {
return nil, err
}
// The defer acts as a failsafe. If commit was successful,
// the rollback will be a no-op.
defer e.txConn.Rollback(ctx, safeSession)
}

// The SetAutocommitable flag should be same as mustCommit.
// If we started a transaction because of autocommit, then mustCommit
// will be true, which means that we can autocommit. If we were already
// in a transaction, it means that the app started it, or we are being
// called recursively. If so, we cannot autocommit because whatever we
// do is likely not final.
// The control flow is such that autocommitable can only be turned on
// at the beginning, but never after.
safeSession.SetAutocommittable(mustCommit)

qr, err := e.handleExec(ctx, safeSession, sql, bindVars, logStats, stmtType)
if err != nil {
return nil, err
}

if mustCommit {
commitStart := time.Now()
if err = e.txConn.Commit(ctx, safeSession); err != nil {
return nil, err
}
logStats.CommitTime = time.Since(commitStart)
}
return qr, nil
case sqlparser.StmtDDL:
return e.handleDDL(ctx, safeSession, sql, bindVars, dest, destKeyspace, destTabletType, logStats)
case sqlparser.StmtSelect, sqlparser.StmtInsert, sqlparser.StmtReplace, sqlparser.StmtUpdate,
sqlparser.StmtDelete, sqlparser.StmtDDL, sqlparser.StmtUse:
return nil, vterrors.New(vtrpcpb.Code_INTERNAL, "BUG: not reachable as handled with plan execute")
case sqlparser.StmtSet:
return e.handleSet(ctx, safeSession, sql, logStats)
case sqlparser.StmtShow:
return e.handleShow(ctx, safeSession, sql, bindVars, dest, destKeyspace, destTabletType, logStats)
case sqlparser.StmtUse:
return e.handleUse(safeSession, sql)
case sqlparser.StmtOther:
return e.handleOther(ctx, safeSession, sql, bindVars, dest, destKeyspace, destTabletType, logStats)
case sqlparser.StmtComment:
return e.handleComment(sql)
case sqlparser.StmtBegin, sqlparser.StmtCommit, sqlparser.StmtRollback:
return nil, vterrors.New(vtrpcpb.Code_INTERNAL, "should be handled by plan_execute")
}
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unrecognized statement: %s", sql)
}

func (e *Executor) handleExec(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, logStats *LogStats, stmtType sqlparser.StatementType) (*sqltypes.Result, error) {
query, comments := sqlparser.SplitMarginComments(sql)
vcursor, _ := newVCursorImpl(ctx, safeSession, comments, e, logStats, e.vm, e.resolver.resolver)
plan, err := e.getPlan(
vcursor,
query,
comments,
bindVars,
skipQueryPlanCache(safeSession),
logStats,
)
execStart := time.Now()
logStats.PlanTime = execStart.Sub(logStats.StartTime)

if err != nil {
logStats.Error = err
return nil, err
}

err = e.addNeededBindVars(plan.BindVarNeeds, bindVars, safeSession)
if err != nil {
logStats.Error = err
return nil, err
}

qr, err := plan.Instructions.Execute(vcursor, bindVars, true)
logStats.ExecuteTime = time.Since(execStart)

e.updateQueryCounts(plan.Instructions.RouteType(), plan.Instructions.GetKeyspaceName(), plan.Instructions.GetTableName(), int64(logStats.ShardQueries))

var errCount uint64
if err != nil {
logStats.Error = err
errCount = 1
} else {
logStats.RowsAffected = qr.RowsAffected
if qr != nil && stmtType == sqlparser.StmtInsert {
safeSession.LastInsertId = qr.InsertID
}
}

// Check if there was partial DML execution. If so, rollback the transaction.
if err != nil && safeSession.InTransaction() && vcursor.rollbackOnPartialExec {
_ = e.txConn.Rollback(ctx, safeSession)
err = vterrors.Errorf(vtrpcpb.Code_ABORTED, "transaction rolled back due to partial DML execution: %v", err)
}

plan.AddStats(1, time.Since(logStats.StartTime), uint64(logStats.ShardQueries), logStats.RowsAffected, errCount)

return qr, err
}

// addNeededBindVars adds bind vars that are needed by the plan
func (e *Executor) addNeededBindVars(bindVarNeeds sqlparser.BindVarNeeds, bindVars map[string]*querypb.BindVariable, session *SafeSession) error {
if bindVarNeeds.NeedDatabase {
Expand Down Expand Up @@ -353,90 +258,6 @@ func (e *Executor) destinationExec(ctx context.Context, safeSession *SafeSession
return e.resolver.Execute(ctx, sql, bindVars, destKeyspace, destTabletType, dest, safeSession, false /* notInTransaction */, safeSession.Options, logStats, false /* canAutocommit */)
}

func (e *Executor) handleDDL(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, dest key.Destination, destKeyspace string, destTabletType topodatapb.TabletType, logStats *LogStats) (*sqltypes.Result, error) {
// Parse the statement to handle vindex operations
// If the statement failed to be properly parsed, fall through anyway
// to broadcast the ddl to all shards.
query, _ := sqlparser.SplitMarginComments(sql)
stmt, err := sqlparser.Parse(query)
if err != nil {
return nil, err
}
ddl, ok := stmt.(*sqlparser.DDL)
if ok {
execStart := time.Now()
logStats.PlanTime = execStart.Sub(logStats.StartTime)
switch ddl.Action {
case sqlparser.CreateVindexStr,
sqlparser.AddVschemaTableStr,
sqlparser.DropVschemaTableStr,
sqlparser.AddColVindexStr,
sqlparser.DropColVindexStr,
sqlparser.AddSequenceStr,
sqlparser.AddAutoIncStr:

err := e.handleVSchemaDDL(ctx, destKeyspace, ddl)
logStats.ExecuteTime = time.Since(execStart)
return &sqltypes.Result{}, err
default:
// fallthrough to broadcast the ddl to all shards
}
}

if destKeyspace == "" {
return nil, errNoKeyspace
}

if dest == nil {
dest = key.DestinationAllShards{}
}

execStart := time.Now()
logStats.PlanTime = execStart.Sub(logStats.StartTime)
result, err := e.destinationExec(ctx, safeSession, sql, bindVars, dest, destKeyspace, destTabletType, logStats)
logStats.ExecuteTime = time.Since(execStart)

e.updateQueryCounts("DDL", "", "", int64(logStats.ShardQueries))

return result, err
}

func (e *Executor) handleVSchemaDDL(ctx context.Context, destKeyspace string, ddl *sqlparser.DDL) error {
vschema := e.vm.GetCurrentSrvVschema()
if vschema == nil {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vschema not loaded")
}

allowed := vschemaacl.Authorized(callerid.ImmediateCallerIDFromContext(ctx))
if !allowed {
return vterrors.Errorf(vtrpcpb.Code_PERMISSION_DENIED, "not authorized to perform vschema operations")

}

// Resolve the keyspace either from the table qualifier or the target keyspace
var ksName string
if !ddl.Table.IsEmpty() {
ksName = ddl.Table.Qualifier.String()
}
if ksName == "" {
ksName = destKeyspace
}
if ksName == "" {
return errNoKeyspace
}

ks := vschema.Keyspaces[ksName]
ks, err := topotools.ApplyVSchemaDDL(ksName, ks, ddl)

if err != nil {
return err
}

vschema.Keyspaces[ksName] = ks

return e.vm.UpdateVSchema(ctx, ksName, vschema)
}

func (e *Executor) handleBegin(ctx context.Context, safeSession *SafeSession, destTabletType topodatapb.TabletType, logStats *LogStats) (*sqltypes.Result, error) {
if destTabletType != topodatapb.TabletType_MASTER {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "transactions are supported only for master tablet types, current type: %v", destTabletType)
Expand Down Expand Up @@ -1119,31 +940,6 @@ func (e *Executor) handleShow(ctx context.Context, safeSession *SafeSession, sql
return e.handleOther(ctx, safeSession, sql, bindVars, dest, destKeyspace, destTabletType, logStats)
}

func (e *Executor) handleUse(safeSession *SafeSession, sql string) (*sqltypes.Result, error) {
stmt, err := sqlparser.Parse(sql)
if err != nil {
return nil, err
}
use, ok := stmt.(*sqlparser.Use)
if !ok {
// This code is unreachable.
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unrecognized USE statement: %v", sql)
}
destKeyspace, destTabletType, _, err := e.ParseDestinationTarget(use.DBName.String())
if err != nil {
return nil, err
}
if _, ok := e.VSchema().Keyspaces[destKeyspace]; destKeyspace != "" && !ok {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid keyspace provided: %s", destKeyspace)
}

if safeSession.InTransaction() && destTabletType != topodatapb.TabletType_MASTER {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "cannot change to a non-master type in the middle of a transaction: %v", destTabletType)
}
safeSession.TargetString = use.DBName.String()
return &sqltypes.Result{}, nil
}

func (e *Executor) handleOther(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, dest key.Destination, destKeyspace string, destTabletType topodatapb.TabletType, logStats *LogStats) (*sqltypes.Result, error) {
if destKeyspace == "" {
return nil, errNoKeyspace
Expand Down