Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions changelog/22.0/22.0.0/summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
- [LAST_INSERT_ID(x)](#last-insert-id)
- [Maximum Idle Connections in the Pool](#max-idle-connections)
- [Filtering Query logs on Error](#query-logs)
- [MultiQuery RPC in vtgate](#multiquery)
- **[Optimization](#optimization)**
- [Prepared Statement](#prepared-statement)
- **[RPC Changes](#rpc-changes)**
Expand Down Expand Up @@ -266,6 +267,14 @@ The `querylog-mode` setting can be configured to `error` to log only queries tha

---

#### <a id="multiquery"/>MultiQuery RPC in vtgate</a>

New RPCs in vtgate have been added that allow users to pass multiple queries in a single sql string. It behaves the same way MySQL does where-in multiple result sets for the queries are returned in the same order as the queries were passed until an error is encountered. The new RPCs are `ExecuteMulti` and `StreamExecuteMulti`.

A new flag `--mysql-server-multi-query-protocol` has also been added that makes the server use this new implementation. This flag is set to `false` by default, so the old implementation is used by default. The new implementation is more efficient and allows for better performance when executing multiple queries in a single RPC call.

---

### <a id="optimization"/>Optimization</a>

#### <a id="prepared-statement"/>Prepared Statement</a>
Expand Down
38 changes: 38 additions & 0 deletions go/cmd/vtgateclienttest/services/callerid.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
querypb "vitess.io/vitess/go/vt/proto/query"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vtgate/vtgateservice"
)

Expand Down Expand Up @@ -104,3 +105,40 @@ func (c *callerIDClient) StreamExecute(ctx context.Context, mysqlCtx vtgateservi
}
return c.fallbackClient.StreamExecute(ctx, mysqlCtx, session, sql, bindVariables, callback)
}

// ExecuteMulti is part of the VTGateService interface
func (c *callerIDClient) ExecuteMulti(ctx context.Context, mysqlCtx vtgateservice.MySQLConnection, session *vtgatepb.Session, sqlString string) (newSession *vtgatepb.Session, qrs []*sqltypes.Result, err error) {
queries, err := sqlparser.NewTestParser().SplitStatementToPieces(sqlString)
if err != nil {
return session, nil, err
}
var result *sqltypes.Result
for _, query := range queries {
session, result, err = c.Execute(ctx, mysqlCtx, session, query, nil, false)
if err != nil {
return session, qrs, err
}
qrs = append(qrs, result)
}
return session, qrs, nil
}

// StreamExecuteMulti is part of the VTGateService interface
func (c *callerIDClient) StreamExecuteMulti(ctx context.Context, mysqlCtx vtgateservice.MySQLConnection, session *vtgatepb.Session, sqlString string, callback func(qr sqltypes.QueryResponse, more bool, firstPacket bool) error) (*vtgatepb.Session, error) {
queries, err := sqlparser.NewTestParser().SplitStatementToPieces(sqlString)
if err != nil {
return session, err
}
for idx, query := range queries {
firstPacket := true
session, err = c.StreamExecute(ctx, mysqlCtx, session, query, nil, func(result *sqltypes.Result) error {
err = callback(sqltypes.QueryResponse{QueryResult: result}, idx < len(queries)-1, firstPacket)
firstPacket = false
return err
})
if err != nil {
return session, err
}
}
return session, nil
}
12 changes: 12 additions & 0 deletions go/cmd/vtgateclienttest/services/echo.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,18 @@ func (c *echoClient) StreamExecute(ctx context.Context, mysqlCtx vtgateservice.M
return c.fallbackClient.StreamExecute(ctx, mysqlCtx, session, sql, bindVariables, callback)
}

// ExecuteMulti is part of the VTGateService interface
func (c *echoClient) ExecuteMulti(ctx context.Context, mysqlCtx vtgateservice.MySQLConnection, session *vtgatepb.Session, sqlString string) (newSession *vtgatepb.Session, qrs []*sqltypes.Result, err error) {
// Look at https://github.com/vitessio/vitess/pull/18059 for details on how to implement this.
panic("unimplemented")
}

// StreamExecuteMulti is part of the VTGateService interface
func (c *echoClient) StreamExecuteMulti(ctx context.Context, mysqlCtx vtgateservice.MySQLConnection, session *vtgatepb.Session, sqlString string, callback func(qr sqltypes.QueryResponse, more bool, firstPacket bool) error) (*vtgatepb.Session, error) {
// Look at https://github.com/vitessio/vitess/pull/18059 for details on how to implement this.
panic("unimplemented")
}

func (c *echoClient) ExecuteBatch(ctx context.Context, session *vtgatepb.Session, sqlList []string, bindVariablesList []map[string]*querypb.BindVariable) (*vtgatepb.Session, []sqltypes.QueryResponse, error) {
if len(sqlList) > 0 && strings.HasPrefix(sqlList[0], EchoPrefix) {
var queryResponse []sqltypes.QueryResponse
Expand Down
12 changes: 12 additions & 0 deletions go/cmd/vtgateclienttest/services/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,18 @@ func (c *errorClient) StreamExecute(ctx context.Context, mysqlCtx vtgateservice.
return c.fallbackClient.StreamExecute(ctx, mysqlCtx, session, sql, bindVariables, callback)
}

// ExecuteMulti is part of the VTGateService interface
func (c *errorClient) ExecuteMulti(ctx context.Context, mysqlCtx vtgateservice.MySQLConnection, session *vtgatepb.Session, sqlString string) (newSession *vtgatepb.Session, qrs []*sqltypes.Result, err error) {
// Look at https://github.com/vitessio/vitess/pull/18059 for details on how to implement this.
panic("unimplemented")
}

// StreamExecuteMulti is part of the VTGateService interface
func (c *errorClient) StreamExecuteMulti(ctx context.Context, mysqlCtx vtgateservice.MySQLConnection, session *vtgatepb.Session, sqlString string, callback func(qr sqltypes.QueryResponse, more bool, firstPacket bool) error) (*vtgatepb.Session, error) {
// Look at https://github.com/vitessio/vitess/pull/18059 for details on how to implement this.
panic("unimplemented")
}

func (c *errorClient) Prepare(ctx context.Context, session *vtgatepb.Session, sql string) (*vtgatepb.Session, []*querypb.Field, uint16, error) {
if err := requestToPartialError(sql, session); err != nil {
return session, nil, 0, err
Expand Down
8 changes: 8 additions & 0 deletions go/cmd/vtgateclienttest/services/fallback.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ func (c fallbackClient) StreamExecute(ctx context.Context, mysqlCtx vtgateservic
return c.fallback.StreamExecute(ctx, mysqlCtx, session, sql, bindVariables, callback)
}

func (c fallbackClient) ExecuteMulti(ctx context.Context, mysqlCtx vtgateservice.MySQLConnection, session *vtgatepb.Session, sqlString string) (newSession *vtgatepb.Session, qrs []*sqltypes.Result, err error) {
return c.fallback.ExecuteMulti(ctx, mysqlCtx, session, sqlString)
}

func (c fallbackClient) StreamExecuteMulti(ctx context.Context, mysqlCtx vtgateservice.MySQLConnection, session *vtgatepb.Session, sqlString string, callback func(qr sqltypes.QueryResponse, more bool, firstPacket bool) error) (*vtgatepb.Session, error) {
return c.fallback.StreamExecuteMulti(ctx, mysqlCtx, session, sqlString, callback)
}

func (c fallbackClient) Prepare(ctx context.Context, session *vtgatepb.Session, sql string) (*vtgatepb.Session, []*querypb.Field, uint16, error) {
return c.fallback.Prepare(ctx, session, sql)
}
Expand Down
8 changes: 8 additions & 0 deletions go/cmd/vtgateclienttest/services/terminal.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ func (c *terminalClient) Prepare(ctx context.Context, session *vtgatepb.Session,
return session, nil, 0, errTerminal
}

func (c *terminalClient) ExecuteMulti(ctx context.Context, mysqlCtx vtgateservice.MySQLConnection, session *vtgatepb.Session, sqlString string) (newSession *vtgatepb.Session, qrs []*sqltypes.Result, err error) {
return session, nil, errTerminal
}

func (c *terminalClient) StreamExecuteMulti(ctx context.Context, mysqlCtx vtgateservice.MySQLConnection, session *vtgatepb.Session, sqlString string, callback func(qr sqltypes.QueryResponse, more bool, firstPacket bool) error) (*vtgatepb.Session, error) {
return session, errTerminal
}

func (c *terminalClient) CloseSession(ctx context.Context, session *vtgatepb.Session) error {
return errTerminal
}
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ Flags:
--mycnf_tmp_dir string mysql tmp directory
--mysql-server-drain-onterm If set, the server waits for --onterm_timeout for already connected clients to complete their in flight work
--mysql-server-keepalive-period duration TCP period between keep-alives
--mysql-server-multi-query-protocol If set, the server will use the new implementation of handling queries where-in multiple queries are sent together.
--mysql-server-pool-conn-read-buffers If set, the server will pool incoming connection read buffers
--mysql-shell-backup-location string location where the backup will be stored
--mysql-shell-dump-flags string flags to pass to mysql shell dump utility. This should be a JSON string and will be saved in the MANIFEST (default "{\"threads\": 4}")
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vtgate.txt
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ Flags:
--min_number_serving_vttablets int The minimum number of vttablets for each replicating tablet_type (e.g. replica, rdonly) that will be continue to be used even with replication lag above discovery_low_replication_lag, but still below discovery_high_replication_lag_minimum_serving. (default 2)
--mysql-server-drain-onterm If set, the server waits for --onterm_timeout for already connected clients to complete their in flight work
--mysql-server-keepalive-period duration TCP period between keep-alives
--mysql-server-multi-query-protocol If set, the server will use the new implementation of handling queries where-in multiple queries are sent together.
--mysql-server-pool-conn-read-buffers If set, the server will pool incoming connection read buffers
--mysql_allow_clear_text_without_tls If set, the server will allow the use of a clear text password over non-SSL connections.
--mysql_auth_server_impl string Which auth server implementation to use. Options: none, ldap, clientcert, static, vault. (default "static")
Expand Down
154 changes: 154 additions & 0 deletions go/mysql/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"sync/atomic"
"time"

"github.com/spf13/pflag"

"vitess.io/vitess/go/bucketpool"
"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/mysql/sqlerror"
Expand All @@ -38,6 +40,7 @@ import (
"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/vterrors"
)

Expand Down Expand Up @@ -67,6 +70,19 @@ const (
ephemeralRead
)

var (
mysqlMultiQuery = false
)

func registerConnFlags(fs *pflag.FlagSet) {
fs.BoolVar(&mysqlMultiQuery, "mysql-server-multi-query-protocol", mysqlMultiQuery, "If set, the server will use the new implementation of handling queries where-in multiple queries are sent together.")
}

func init() {
servenv.OnParseFor("vtgate", registerConnFlags)
servenv.OnParseFor("vtcombo", registerConnFlags)
}

// A Getter has a Get()
type Getter interface {
Get() *querypb.VTGateCallerID
Expand Down Expand Up @@ -914,6 +930,9 @@ func (c *Conn) handleNextCommand(handler Handler) bool {
res := c.execQuery("use "+sqlescape.EscapeID(db), handler, false)
return res != connErr
case ComQuery:
if mysqlMultiQuery {
return c.handleComQueryMulti(handler, data)
}
return c.handleComQuery(handler, data)
case ComPing:
return c.handleComPing()
Expand Down Expand Up @@ -1279,6 +1298,141 @@ func (c *Conn) handleComPing() bool {
return true
}

// handleComQueryMulti is a newer version of handleComQuery that uses
// the StreamExecuteMulti and ExecuteMulti RPC calls to push the splitting of statements
// down to Vtgate.
func (c *Conn) handleComQueryMulti(handler Handler, data []byte) (kontinue bool) {
c.startWriterBuffering()
defer func() {
if err := c.endWriterBuffering(); err != nil {
log.Errorf("conn %v: flush() failed: %v", c.ID(), err)
kontinue = false
}
}()

queryStart := time.Now()
query := c.parseComQuery(data)
c.recycleReadPacket()

res := c.execQueryMulti(query, handler)
if res != execSuccess {
return res != connErr
}

timings.Record(queryTimingKey, queryStart)
return true
}

// execQueryMulti is a newer version of execQuery that uses
// the StreamExecuteMulti and ExecuteMulti RPC calls to push the splitting of statements
// down to Vtgate.
func (c *Conn) execQueryMulti(query string, handler Handler) execResult {
// needsEndPacket signifies whether we have need to send the last packet to the client
// for a given query. This is used to determine whether we should send an
// end packet after the query is done or not. Initially we don't need to send an end packet
// so we initialize this value to false.
needsEndPacket := false
callbackCalled := false
var res = execSuccess

err := handler.ComQueryMulti(c, query, func(qr sqltypes.QueryResponse, more bool, firstPacket bool) error {
callbackCalled = true
flag := c.StatusFlags
if more {
flag |= ServerMoreResultsExists
}

// firstPacket tells us that this is the start of a new query result.
// If we haven't sent a last packet yet, we should send the end result packet.
if firstPacket && needsEndPacket {
if err := c.writeEndResult(true, 0, 0, handler.WarningCount(c)); err != nil {
log.Errorf("Error writing result to %s: %v", c, err)
return err
}
}

// We receive execution errors in a query as part of the QueryResponse.
// We check for those errors and send a error packet. If we are unable
// to send the error packet, then there is a connection error too.
if qr.QueryError != nil {
res = execErr
if !c.writeErrorPacketFromErrorAndLog(qr.QueryError) {
res = connErr
}
return nil
}

if firstPacket {
// The first packet signifies the start of a new query result.
// So we reset the needsEndPacket variable to signify we haven't sent the last
// packet for this query.
needsEndPacket = true
if len(qr.QueryResult.Fields) == 0 {

// A successful callback with no fields means that this was a
// DML or other write-only operation.
//
// We should not send any more packets after this, but make sure
// to extract the affected rows and last insert id from the result
// struct here since clients expect it.
ok := PacketOK{
affectedRows: qr.QueryResult.RowsAffected,
lastInsertID: qr.QueryResult.InsertID,
statusFlags: flag,
warnings: handler.WarningCount(c),
info: "",
sessionStateData: qr.QueryResult.SessionStateChanges,
}
needsEndPacket = false
return c.writeOKPacket(&ok)
}

if err := c.writeFields(qr.QueryResult); err != nil {
return err
}
}

return c.writeRows(qr.QueryResult)
})

// If callback was not called, we expect an error.
// It is possible that we don't get a callback if some condition checks
// fail before the query starts execution. In this case, we need to write some
// error back.
if !callbackCalled {
// This is just a failsafe. Should never happen.
if err == nil || err == io.EOF {
err = sqlerror.NewSQLErrorFromError(errors.New("unexpected: query ended without no results and no error"))
}
if !c.writeErrorPacketFromErrorAndLog(err) {
return connErr
}
return execErr
}

if res != execSuccess {
// We failed during the stream itself.
return res
}

if err != nil {
// We can't send an error in the middle of a stream.
// All we can do is abort the send, which will cause a 2013.
log.Errorf("Error in the middle of a stream to %s: %v", c, err)
return connErr
}

// If we haven't sent the final packet for the last query, we should send that too.
if needsEndPacket {
if err := c.writeEndResult(false, 0, 0, handler.WarningCount(c)); err != nil {
log.Errorf("Error writing result to %s: %v", c, err)
return connErr
}
}

return execSuccess
}

var errEmptyStatement = sqlerror.NewSQLError(sqlerror.EREmptyQuery, sqlerror.SSClientError, "Query was empty")

func (c *Conn) handleComQuery(handler Handler, data []byte) (kontinue bool) {
Expand Down
Loading
Loading