From b045db61c6bd0bd017c80f72bcec066266ed2744 Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Thu, 16 Jul 2020 16:59:12 +0200 Subject: [PATCH 1/2] Close reserved connections when client hangs up When a client to the Vite MySQL service port is closed, we used to just need to rollback and open transactions. Now we need to also remember to close any reserved connections. Signed-off-by: Andres Taylor --- go/vt/vtgate/executor.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index caf86f8bfa8..896922cc842 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -299,7 +299,7 @@ func (e *Executor) handleRollback(ctx context.Context, safeSession *SafeSession, logStats.PlanTime = execStart.Sub(logStats.StartTime) logStats.ShardQueries = uint32(len(safeSession.ShardSessions)) e.updateQueryCounts("Rollback", "", "", int64(logStats.ShardQueries)) - err := e.CloseSession(ctx, safeSession) + err := e.txConn.Rollback(ctx, safeSession) logStats.CommitTime = time.Since(execStart) return &sqltypes.Result{}, err } @@ -342,10 +342,10 @@ func (e *Executor) handleSavepoint(ctx context.Context, safeSession *SafeSession return qr, nil } -// CloseSession closes the current transaction, if any. It is called both for explicit "rollback" -// statements and implicitly when the mysql server closes the connection. +// CloseSession releases the current connection, which rollbacks open transactions and closes reserved connections. +// It is called then the MySQL servers closes the connection to its client. func (e *Executor) CloseSession(ctx context.Context, safeSession *SafeSession) error { - return e.txConn.Rollback(ctx, safeSession) + return e.txConn.Release(ctx, safeSession) } func (e *Executor) handleSet(ctx context.Context, safeSession *SafeSession, sql string, logStats *LogStats) (*sqltypes.Result, error) { From 48cdd3799bd9d298d9b0ba8e4d9e2ad9963c7474 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Fri, 17 Jul 2020 20:41:30 +0530 Subject: [PATCH 2/2] implements queryservice server interface Signed-off-by: Harshit Gangal --- go/vt/vttablet/grpcqueryservice/server.go | 65 ++++++++++++++++++++--- 1 file changed, 59 insertions(+), 6 deletions(-) diff --git a/go/vt/vttablet/grpcqueryservice/server.go b/go/vt/vttablet/grpcqueryservice/server.go index eb9f375abd7..298cdfe91f7 100644 --- a/go/vt/vttablet/grpcqueryservice/server.go +++ b/go/vt/vttablet/grpcqueryservice/server.go @@ -377,18 +377,71 @@ func (q *query) VStreamResults(request *binlogdatapb.VStreamResultsRequest, stre } //ReserveExecute implements the QueryServer interface -func (q *query) ReserveExecute(ctx context.Context, request *querypb.ReserveExecuteRequest) (*querypb.ReserveExecuteResponse, error) { - panic("implement me") +func (q *query) ReserveExecute(ctx context.Context, request *querypb.ReserveExecuteRequest) (response *querypb.ReserveExecuteResponse, err error) { + defer q.server.HandlePanic(&err) + ctx = callerid.NewContext(callinfo.GRPCCallInfo(ctx), + request.EffectiveCallerId, + request.ImmediateCallerId, + ) + result, reservedID, alias, err := q.server.ReserveExecute(ctx, request.Target, request.PreQueries, request.Query.Sql, request.Query.BindVariables, request.TransactionId, request.Options) + if err != nil { + // if we have a valid reservedID, return the error in-band + if reservedID != 0 { + return &querypb.ReserveExecuteResponse{ + Error: vterrors.ToVTRPC(err), + ReservedId: reservedID, + TabletAlias: alias, + }, nil + } + return nil, vterrors.ToGRPC(err) + } + return &querypb.ReserveExecuteResponse{ + Result: sqltypes.ResultToProto3(result), + ReservedId: reservedID, + TabletAlias: alias, + }, nil } //ReserveBeginExecute implements the QueryServer interface -func (q *query) ReserveBeginExecute(ctx context.Context, request *querypb.ReserveBeginExecuteRequest) (*querypb.ReserveBeginExecuteResponse, error) { - panic("implement me") +func (q *query) ReserveBeginExecute(ctx context.Context, request *querypb.ReserveBeginExecuteRequest) (response *querypb.ReserveBeginExecuteResponse, err error) { + defer q.server.HandlePanic(&err) + ctx = callerid.NewContext(callinfo.GRPCCallInfo(ctx), + request.EffectiveCallerId, + request.ImmediateCallerId, + ) + result, transactionID, reservedID, alias, err := q.server.ReserveBeginExecute(ctx, request.Target, request.PreQueries, request.Query.Sql, request.Query.BindVariables, request.Options) + if err != nil { + // if we have a valid reservedID, return the error in-band + if reservedID != 0 { + return &querypb.ReserveBeginExecuteResponse{ + Error: vterrors.ToVTRPC(err), + TransactionId: transactionID, + ReservedId: reservedID, + TabletAlias: alias, + }, nil + } + return nil, vterrors.ToGRPC(err) + } + return &querypb.ReserveBeginExecuteResponse{ + Result: sqltypes.ResultToProto3(result), + TransactionId: transactionID, + ReservedId: reservedID, + TabletAlias: alias, + }, nil } //Release implements the QueryServer interface -func (q *query) Release(ctx context.Context, request *querypb.ReleaseRequest) (*querypb.ReleaseResponse, error) { - panic("implement me") +func (q *query) Release(ctx context.Context, request *querypb.ReleaseRequest) (response *querypb.ReleaseResponse, err error) { + defer q.server.HandlePanic(&err) + ctx = callerid.NewContext(callinfo.GRPCCallInfo(ctx), + request.EffectiveCallerId, + request.ImmediateCallerId, + ) + err = q.server.Release(ctx, request.Target, request.TransactionId, request.ReservedId) + if err != nil { + return nil, vterrors.ToGRPC(err) + } + return &querypb.ReleaseResponse{}, nil } // Register registers the implementation on the provide gRPC Server.