diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index ef123ae2ff3..2e4166b9b85 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -42,7 +42,6 @@ import ( "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/srvtopo" "vitess.io/vitess/go/vt/topo/topoproto" - "vitess.io/vitess/go/vt/topotools" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/engine" "vitess.io/vitess/go/vt/vtgate/planbuilder" @@ -211,115 +210,21 @@ func (e *Executor) execute(ctx context.Context, safeSession *SafeSession, sql st } switch stmtType { - case sqlparser.StmtSelect: - return e.handleExec(ctx, safeSession, sql, bindVars, logStats, stmtType) - case sqlparser.StmtInsert, sqlparser.StmtReplace, sqlparser.StmtUpdate, sqlparser.StmtDelete: - safeSession := safeSession - - mustCommit := false - if safeSession.Autocommit && !safeSession.InTransaction() { - mustCommit = true - if err := e.txConn.Begin(ctx, safeSession); err != nil { - return nil, err - } - // The defer acts as a failsafe. If commit was successful, - // the rollback will be a no-op. - defer e.txConn.Rollback(ctx, safeSession) - } - - // The SetAutocommitable flag should be same as mustCommit. - // If we started a transaction because of autocommit, then mustCommit - // will be true, which means that we can autocommit. If we were already - // in a transaction, it means that the app started it, or we are being - // called recursively. If so, we cannot autocommit because whatever we - // do is likely not final. - // The control flow is such that autocommitable can only be turned on - // at the beginning, but never after. - safeSession.SetAutocommittable(mustCommit) - - qr, err := e.handleExec(ctx, safeSession, sql, bindVars, logStats, stmtType) - if err != nil { - return nil, err - } - - if mustCommit { - commitStart := time.Now() - if err = e.txConn.Commit(ctx, safeSession); err != nil { - return nil, err - } - logStats.CommitTime = time.Since(commitStart) - } - return qr, nil - case sqlparser.StmtDDL: - return e.handleDDL(ctx, safeSession, sql, bindVars, dest, destKeyspace, destTabletType, logStats) + case sqlparser.StmtSelect, sqlparser.StmtInsert, sqlparser.StmtReplace, sqlparser.StmtUpdate, + sqlparser.StmtDelete, sqlparser.StmtDDL, sqlparser.StmtUse: + return nil, vterrors.New(vtrpcpb.Code_INTERNAL, "BUG: not reachable as handled with plan execute") case sqlparser.StmtSet: return e.handleSet(ctx, safeSession, sql, logStats) case sqlparser.StmtShow: return e.handleShow(ctx, safeSession, sql, bindVars, dest, destKeyspace, destTabletType, logStats) - case sqlparser.StmtUse: - return e.handleUse(safeSession, sql) case sqlparser.StmtOther: return e.handleOther(ctx, safeSession, sql, bindVars, dest, destKeyspace, destTabletType, logStats) case sqlparser.StmtComment: return e.handleComment(sql) - case sqlparser.StmtBegin, sqlparser.StmtCommit, sqlparser.StmtRollback: - return nil, vterrors.New(vtrpcpb.Code_INTERNAL, "should be handled by plan_execute") } return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unrecognized statement: %s", sql) } -func (e *Executor) handleExec(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, logStats *LogStats, stmtType sqlparser.StatementType) (*sqltypes.Result, error) { - query, comments := sqlparser.SplitMarginComments(sql) - vcursor, _ := newVCursorImpl(ctx, safeSession, comments, e, logStats, e.vm, e.resolver.resolver) - plan, err := e.getPlan( - vcursor, - query, - comments, - bindVars, - skipQueryPlanCache(safeSession), - logStats, - ) - execStart := time.Now() - logStats.PlanTime = execStart.Sub(logStats.StartTime) - - if err != nil { - logStats.Error = err - return nil, err - } - - err = e.addNeededBindVars(plan.BindVarNeeds, bindVars, safeSession) - if err != nil { - logStats.Error = err - return nil, err - } - - qr, err := plan.Instructions.Execute(vcursor, bindVars, true) - logStats.ExecuteTime = time.Since(execStart) - - e.updateQueryCounts(plan.Instructions.RouteType(), plan.Instructions.GetKeyspaceName(), plan.Instructions.GetTableName(), int64(logStats.ShardQueries)) - - var errCount uint64 - if err != nil { - logStats.Error = err - errCount = 1 - } else { - logStats.RowsAffected = qr.RowsAffected - if qr != nil && stmtType == sqlparser.StmtInsert { - safeSession.LastInsertId = qr.InsertID - } - } - - // Check if there was partial DML execution. If so, rollback the transaction. - if err != nil && safeSession.InTransaction() && vcursor.rollbackOnPartialExec { - _ = e.txConn.Rollback(ctx, safeSession) - err = vterrors.Errorf(vtrpcpb.Code_ABORTED, "transaction rolled back due to partial DML execution: %v", err) - } - - plan.AddStats(1, time.Since(logStats.StartTime), uint64(logStats.ShardQueries), logStats.RowsAffected, errCount) - - return qr, err -} - // addNeededBindVars adds bind vars that are needed by the plan func (e *Executor) addNeededBindVars(bindVarNeeds sqlparser.BindVarNeeds, bindVars map[string]*querypb.BindVariable, session *SafeSession) error { if bindVarNeeds.NeedDatabase { @@ -353,90 +258,6 @@ func (e *Executor) destinationExec(ctx context.Context, safeSession *SafeSession return e.resolver.Execute(ctx, sql, bindVars, destKeyspace, destTabletType, dest, safeSession, false /* notInTransaction */, safeSession.Options, logStats, false /* canAutocommit */) } -func (e *Executor) handleDDL(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, dest key.Destination, destKeyspace string, destTabletType topodatapb.TabletType, logStats *LogStats) (*sqltypes.Result, error) { - // Parse the statement to handle vindex operations - // If the statement failed to be properly parsed, fall through anyway - // to broadcast the ddl to all shards. - query, _ := sqlparser.SplitMarginComments(sql) - stmt, err := sqlparser.Parse(query) - if err != nil { - return nil, err - } - ddl, ok := stmt.(*sqlparser.DDL) - if ok { - execStart := time.Now() - logStats.PlanTime = execStart.Sub(logStats.StartTime) - switch ddl.Action { - case sqlparser.CreateVindexStr, - sqlparser.AddVschemaTableStr, - sqlparser.DropVschemaTableStr, - sqlparser.AddColVindexStr, - sqlparser.DropColVindexStr, - sqlparser.AddSequenceStr, - sqlparser.AddAutoIncStr: - - err := e.handleVSchemaDDL(ctx, destKeyspace, ddl) - logStats.ExecuteTime = time.Since(execStart) - return &sqltypes.Result{}, err - default: - // fallthrough to broadcast the ddl to all shards - } - } - - if destKeyspace == "" { - return nil, errNoKeyspace - } - - if dest == nil { - dest = key.DestinationAllShards{} - } - - execStart := time.Now() - logStats.PlanTime = execStart.Sub(logStats.StartTime) - result, err := e.destinationExec(ctx, safeSession, sql, bindVars, dest, destKeyspace, destTabletType, logStats) - logStats.ExecuteTime = time.Since(execStart) - - e.updateQueryCounts("DDL", "", "", int64(logStats.ShardQueries)) - - return result, err -} - -func (e *Executor) handleVSchemaDDL(ctx context.Context, destKeyspace string, ddl *sqlparser.DDL) error { - vschema := e.vm.GetCurrentSrvVschema() - if vschema == nil { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vschema not loaded") - } - - allowed := vschemaacl.Authorized(callerid.ImmediateCallerIDFromContext(ctx)) - if !allowed { - return vterrors.Errorf(vtrpcpb.Code_PERMISSION_DENIED, "not authorized to perform vschema operations") - - } - - // Resolve the keyspace either from the table qualifier or the target keyspace - var ksName string - if !ddl.Table.IsEmpty() { - ksName = ddl.Table.Qualifier.String() - } - if ksName == "" { - ksName = destKeyspace - } - if ksName == "" { - return errNoKeyspace - } - - ks := vschema.Keyspaces[ksName] - ks, err := topotools.ApplyVSchemaDDL(ksName, ks, ddl) - - if err != nil { - return err - } - - vschema.Keyspaces[ksName] = ks - - return e.vm.UpdateVSchema(ctx, ksName, vschema) -} - func (e *Executor) handleBegin(ctx context.Context, safeSession *SafeSession, destTabletType topodatapb.TabletType, logStats *LogStats) (*sqltypes.Result, error) { if destTabletType != topodatapb.TabletType_MASTER { return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "transactions are supported only for master tablet types, current type: %v", destTabletType) @@ -1119,31 +940,6 @@ func (e *Executor) handleShow(ctx context.Context, safeSession *SafeSession, sql return e.handleOther(ctx, safeSession, sql, bindVars, dest, destKeyspace, destTabletType, logStats) } -func (e *Executor) handleUse(safeSession *SafeSession, sql string) (*sqltypes.Result, error) { - stmt, err := sqlparser.Parse(sql) - if err != nil { - return nil, err - } - use, ok := stmt.(*sqlparser.Use) - if !ok { - // This code is unreachable. - return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unrecognized USE statement: %v", sql) - } - destKeyspace, destTabletType, _, err := e.ParseDestinationTarget(use.DBName.String()) - if err != nil { - return nil, err - } - if _, ok := e.VSchema().Keyspaces[destKeyspace]; destKeyspace != "" && !ok { - return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid keyspace provided: %s", destKeyspace) - } - - if safeSession.InTransaction() && destTabletType != topodatapb.TabletType_MASTER { - return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "cannot change to a non-master type in the middle of a transaction: %v", destTabletType) - } - safeSession.TargetString = use.DBName.String() - return &sqltypes.Result{}, nil -} - func (e *Executor) handleOther(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, dest key.Destination, destKeyspace string, destTabletType topodatapb.TabletType, logStats *LogStats) (*sqltypes.Result, error) { if destKeyspace == "" { return nil, errNoKeyspace