Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,7 @@ func (e *Executor) Execute(ctx context.Context, method string, safeSession *Safe
warnings.Add("ResultsExceeded", 1)
}

// The mysql plugin runs an implicit rollback whenever a connection closes.
// To avoid spamming the log with no-op rollback records, ignore it if
// it was a no-op record (i.e. didn't issue any queries)
if !(logStats.StmtType == "ROLLBACK" && logStats.ShardQueries == 0) {
logStats.Send()
}
logStats.Send()
return result, err
}

Expand Down Expand Up @@ -308,11 +303,17 @@ func (e *Executor) handleRollback(ctx context.Context, safeSession *SafeSession,
logStats.PlanTime = execStart.Sub(logStats.StartTime)
logStats.ShardQueries = uint32(len(safeSession.ShardSessions))
e.updateQueryCounts("Rollback", "", "", int64(logStats.ShardQueries))
err := e.txConn.Rollback(ctx, safeSession)
err := e.CloseSession(ctx, safeSession)
logStats.CommitTime = time.Since(execStart)
return &sqltypes.Result{}, err
}

// CloseSession closes the current transaction, if any. It is called both for explicit "rollback"
// statements and implicitly when the mysql server closes the connection.
func (e *Executor) CloseSession(ctx context.Context, safeSession *SafeSession) error {
return e.txConn.Rollback(ctx, safeSession)
}

func (e *Executor) handleSet(ctx context.Context, safeSession *SafeSession, sql string, logStats *LogStats) (*sqltypes.Result, error) {
stmt, err := sqlparser.Parse(sql)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ func TestExecutorTransactionsNoAutoCommit(t *testing.T) {
t.Errorf("logstats: expected non-zero CommitTime")
}

// rollback doesn't emit a logstats record when it doesn't do anything
_, err = executor.Execute(context.Background(), "TestExecute", session, "rollback", nil)
// CloseSession doesn't log anything
err = executor.CloseSession(context.Background(), session)
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/plan_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ func TestPlanExecutorTransactionsNoAutoCommit(t *testing.T) {
t.Errorf("logstats: expected non-zero CommitTime")
}

// rollback doesn't emit a logstats record when it doesn't do anything
_, err = executor.Execute(context.Background(), "TestExecute", session, "rollback", nil)
// CloseSession doesn't log anything
err = executor.CloseSession(context.Background(), session)
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/plugin_mysql_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (vh *vtgateHandler) ComResetConnection(c *mysql.Conn) {
if session.InTransaction {
defer atomic.AddInt32(&busyConnections, -1)
}
_, _, err := vh.vtg.Execute(ctx, session, "rollback", make(map[string]*querypb.BindVariable))
err := vh.vtg.CloseSession(ctx, session)
if err != nil {
log.Errorf("Error happened in transaction rollback: %v", err)
}
Expand All @@ -134,7 +134,7 @@ func (vh *vtgateHandler) ConnectionClosed(c *mysql.Conn) {
if session.InTransaction {
defer atomic.AddInt32(&busyConnections, -1)
}
_, _, _ = vh.vtg.Execute(ctx, session, "rollback", make(map[string]*querypb.BindVariable))
_ = vh.vtg.CloseSession(ctx, session)
}

// Regexp to extract parent span id over the sql query
Expand Down
7 changes: 7 additions & 0 deletions go/vt/vtgate/vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,13 @@ handleError:
return nil
}

// CloseSession closes the session, rolling back any implicit transactions. This has the
// same effect as if a "rollback" statement was executed, but does not affect the query
// statistics.
func (vtg *VTGate) CloseSession(ctx context.Context, session *vtgatepb.Session) error {
return vtg.executor.CloseSession(ctx, NewSafeSession(session))
}

// ResolveTransaction resolves the specified 2PC transaction.
func (vtg *VTGate) ResolveTransaction(ctx context.Context, dtid string) error {
return formatError(vtg.txConn.Resolve(ctx, dtid))
Expand Down