Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
d6f23d8
feat: add com query multi implementation along with stream execute an…
GuptaManan100 Mar 25, 2025
265da77
feat: add stream execute multi and execute multi rpc
GuptaManan100 Mar 26, 2025
011b49b
Merge remote-tracking branch 'upstream/main' into split-vtgate
GuptaManan100 Mar 27, 2025
847314b
feat: add a flag to control whether to use the new implementation or not
GuptaManan100 Mar 27, 2025
02c012e
feat: fix error handling so that its sent in the callback
GuptaManan100 Mar 27, 2025
739fb57
test: use the new implementation across all the tests
GuptaManan100 Mar 27, 2025
3492604
Merge remote-tracking branch 'upstream/main' into split-vtgate
GuptaManan100 Apr 1, 2025
c882978
test: add e2e test
GuptaManan100 Apr 1, 2025
1fa0084
feat: fix bug in more flag
GuptaManan100 Apr 1, 2025
bf87df8
feat: add new package to run in e2e setup
GuptaManan100 Apr 1, 2025
40de906
test: fix vtcombo output
GuptaManan100 Apr 1, 2025
4320260
feat: minor refactor
GuptaManan100 Apr 4, 2025
75b0202
refactor: change comment
GuptaManan100 Apr 4, 2025
0372c73
Merge remote-tracking branch 'upstream/main' into split-vtgate
GuptaManan100 Apr 4, 2025
545b749
feat: add vtgateconn implementation and add tests for it
GuptaManan100 Apr 7, 2025
964dfb9
feat: change flag name
GuptaManan100 Apr 7, 2025
2d6f60d
feat: add more tests for invalid cases
GuptaManan100 Apr 7, 2025
0533b09
feat: remove test code that isn't used
GuptaManan100 Apr 8, 2025
40a2485
feat: add unit tests for the connection code and fix bug for not gett…
GuptaManan100 Apr 8, 2025
e2258c2
feat: add tests for plugin implementation
GuptaManan100 Apr 8, 2025
fd56759
feat: add test for empty query
GuptaManan100 Apr 8, 2025
0c8ce28
feat: fix flag output
GuptaManan100 Apr 8, 2025
2119b36
feat: add summary changes
GuptaManan100 Apr 8, 2025
1ef902f
feat: add grpc tests too
GuptaManan100 Apr 8, 2025
a1471f0
Merge remote-tracking branch 'upstream/main' into split-vtgate
GuptaManan100 Apr 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions changelog/22.0/22.0.0/summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
- [LAST_INSERT_ID(x)](#last-insert-id)
- [Maximum Idle Connections in the Pool](#max-idle-connections)
- [Filtering Query logs on Error](#query-logs)
- [MultiQuery RPC in vtgate](#multiquery)
- **[Optimization](#optimization)**
- [Prepared Statement](#prepared-statement)
- **[RPC Changes](#rpc-changes)**
Expand Down Expand Up @@ -280,6 +281,14 @@ The `querylog-mode` setting can be configured to `error` to log only queries tha

---

#### <a id="multiquery"/>MultiQuery RPC in vtgate</a>

New RPCs in vtgate have been added that allow users to pass multiple queries in a single sql string. It behaves the same way MySQL does where-in multiple result sets for the queries are returned in the same order as the queries were passed until an error is encountered. The new RPCs are `ExecuteMulti` and `StreamExecuteMulti`.

A new flag `--mysql-server-multi-query-protocol` has also been added that makes the server use this new implementation. This flag is set to `false` by default, so the old implementation is used by default. The new implementation is more efficient and allows for better performance when executing multiple queries in a single RPC call.

---

### <a id="optimization"/>Optimization</a>

#### <a id="prepared-statement"/>Prepared Statement</a>
Expand Down
38 changes: 38 additions & 0 deletions go/cmd/vtgateclienttest/services/callerid.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
querypb "vitess.io/vitess/go/vt/proto/query"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vtgate/vtgateservice"
)

Expand Down Expand Up @@ -104,3 +105,40 @@ func (c *callerIDClient) StreamExecute(ctx context.Context, mysqlCtx vtgateservi
}
return c.fallbackClient.StreamExecute(ctx, mysqlCtx, session, sql, bindVariables, callback)
}

// ExecuteMulti is part of the VTGateService interface
func (c *callerIDClient) ExecuteMulti(ctx context.Context, mysqlCtx vtgateservice.MySQLConnection, session *vtgatepb.Session, sqlString string) (newSession *vtgatepb.Session, qrs []*sqltypes.Result, err error) {
queries, err := sqlparser.NewTestParser().SplitStatementToPieces(sqlString)
if err != nil {
return session, nil, err
}
var result *sqltypes.Result
for _, query := range queries {
session, result, err = c.Execute(ctx, mysqlCtx, session, query, nil, false)
if err != nil {
return session, qrs, err
}
qrs = append(qrs, result)
}
return session, qrs, nil
}

// StreamExecuteMulti is part of the VTGateService interface
func (c *callerIDClient) StreamExecuteMulti(ctx context.Context, mysqlCtx vtgateservice.MySQLConnection, session *vtgatepb.Session, sqlString string, callback func(qr sqltypes.QueryResponse, more bool, firstPacket bool) error) (*vtgatepb.Session, error) {
queries, err := sqlparser.NewTestParser().SplitStatementToPieces(sqlString)
if err != nil {
return session, err
}
for idx, query := range queries {
firstPacket := true
session, err = c.StreamExecute(ctx, mysqlCtx, session, query, nil, func(result *sqltypes.Result) error {
err = callback(sqltypes.QueryResponse{QueryResult: result}, idx < len(queries)-1, firstPacket)
firstPacket = false
return err
})
if err != nil {
return session, err
}
}
return session, nil
}
12 changes: 12 additions & 0 deletions go/cmd/vtgateclienttest/services/echo.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,18 @@ func (c *echoClient) StreamExecute(ctx context.Context, mysqlCtx vtgateservice.M
return c.fallbackClient.StreamExecute(ctx, mysqlCtx, session, sql, bindVariables, callback)
}

// ExecuteMulti is part of the VTGateService interface
func (c *echoClient) ExecuteMulti(ctx context.Context, mysqlCtx vtgateservice.MySQLConnection, session *vtgatepb.Session, sqlString string) (newSession *vtgatepb.Session, qrs []*sqltypes.Result, err error) {
// Look at https://github.com/vitessio/vitess/pull/18059 for details on how to implement this.
panic("unimplemented")
}

// StreamExecuteMulti is part of the VTGateService interface
func (c *echoClient) StreamExecuteMulti(ctx context.Context, mysqlCtx vtgateservice.MySQLConnection, session *vtgatepb.Session, sqlString string, callback func(qr sqltypes.QueryResponse, more bool, firstPacket bool) error) (*vtgatepb.Session, error) {
// Look at https://github.com/vitessio/vitess/pull/18059 for details on how to implement this.
panic("unimplemented")
}

func (c *echoClient) ExecuteBatch(ctx context.Context, session *vtgatepb.Session, sqlList []string, bindVariablesList []map[string]*querypb.BindVariable) (*vtgatepb.Session, []sqltypes.QueryResponse, error) {
if len(sqlList) > 0 && strings.HasPrefix(sqlList[0], EchoPrefix) {
var queryResponse []sqltypes.QueryResponse
Expand Down
12 changes: 12 additions & 0 deletions go/cmd/vtgateclienttest/services/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,18 @@ func (c *errorClient) StreamExecute(ctx context.Context, mysqlCtx vtgateservice.
return c.fallbackClient.StreamExecute(ctx, mysqlCtx, session, sql, bindVariables, callback)
}

// ExecuteMulti is part of the VTGateService interface
func (c *errorClient) ExecuteMulti(ctx context.Context, mysqlCtx vtgateservice.MySQLConnection, session *vtgatepb.Session, sqlString string) (newSession *vtgatepb.Session, qrs []*sqltypes.Result, err error) {
// Look at https://github.com/vitessio/vitess/pull/18059 for details on how to implement this.
panic("unimplemented")
}

// StreamExecuteMulti is part of the VTGateService interface
func (c *errorClient) StreamExecuteMulti(ctx context.Context, mysqlCtx vtgateservice.MySQLConnection, session *vtgatepb.Session, sqlString string, callback func(qr sqltypes.QueryResponse, more bool, firstPacket bool) error) (*vtgatepb.Session, error) {
// Look at https://github.com/vitessio/vitess/pull/18059 for details on how to implement this.
panic("unimplemented")
}

func (c *errorClient) Prepare(ctx context.Context, session *vtgatepb.Session, sql string) (*vtgatepb.Session, []*querypb.Field, uint16, error) {
if err := requestToPartialError(sql, session); err != nil {
return session, nil, 0, err
Expand Down
8 changes: 8 additions & 0 deletions go/cmd/vtgateclienttest/services/fallback.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ func (c fallbackClient) StreamExecute(ctx context.Context, mysqlCtx vtgateservic
return c.fallback.StreamExecute(ctx, mysqlCtx, session, sql, bindVariables, callback)
}

func (c fallbackClient) ExecuteMulti(ctx context.Context, mysqlCtx vtgateservice.MySQLConnection, session *vtgatepb.Session, sqlString string) (newSession *vtgatepb.Session, qrs []*sqltypes.Result, err error) {
return c.fallback.ExecuteMulti(ctx, mysqlCtx, session, sqlString)
}

func (c fallbackClient) StreamExecuteMulti(ctx context.Context, mysqlCtx vtgateservice.MySQLConnection, session *vtgatepb.Session, sqlString string, callback func(qr sqltypes.QueryResponse, more bool, firstPacket bool) error) (*vtgatepb.Session, error) {
return c.fallback.StreamExecuteMulti(ctx, mysqlCtx, session, sqlString, callback)
}

func (c fallbackClient) Prepare(ctx context.Context, session *vtgatepb.Session, sql string) (*vtgatepb.Session, []*querypb.Field, uint16, error) {
return c.fallback.Prepare(ctx, session, sql)
}
Expand Down
8 changes: 8 additions & 0 deletions go/cmd/vtgateclienttest/services/terminal.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ func (c *terminalClient) Prepare(ctx context.Context, session *vtgatepb.Session,
return session, nil, 0, errTerminal
}

func (c *terminalClient) ExecuteMulti(ctx context.Context, mysqlCtx vtgateservice.MySQLConnection, session *vtgatepb.Session, sqlString string) (newSession *vtgatepb.Session, qrs []*sqltypes.Result, err error) {
return session, nil, errTerminal
}

func (c *terminalClient) StreamExecuteMulti(ctx context.Context, mysqlCtx vtgateservice.MySQLConnection, session *vtgatepb.Session, sqlString string, callback func(qr sqltypes.QueryResponse, more bool, firstPacket bool) error) (*vtgatepb.Session, error) {
return session, errTerminal
}

func (c *terminalClient) CloseSession(ctx context.Context, session *vtgatepb.Session) error {
return errTerminal
}
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ Flags:
--mycnf_tmp_dir string mysql tmp directory
--mysql-server-drain-onterm If set, the server waits for --onterm_timeout for already connected clients to complete their in flight work
--mysql-server-keepalive-period duration TCP period between keep-alives
--mysql-server-multi-query-protocol If set, the server will use the new implementation of handling queries where-in multiple queries are sent together.
--mysql-server-pool-conn-read-buffers If set, the server will pool incoming connection read buffers
--mysql-shell-backup-location string location where the backup will be stored
--mysql-shell-dump-flags string flags to pass to mysql shell dump utility. This should be a JSON string and will be saved in the MANIFEST (default "{\"threads\": 4}")
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vtgate.txt
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ Flags:
--min_number_serving_vttablets int The minimum number of vttablets for each replicating tablet_type (e.g. replica, rdonly) that will be continue to be used even with replication lag above discovery_low_replication_lag, but still below discovery_high_replication_lag_minimum_serving. (default 2)
--mysql-server-drain-onterm If set, the server waits for --onterm_timeout for already connected clients to complete their in flight work
--mysql-server-keepalive-period duration TCP period between keep-alives
--mysql-server-multi-query-protocol If set, the server will use the new implementation of handling queries where-in multiple queries are sent together.
--mysql-server-pool-conn-read-buffers If set, the server will pool incoming connection read buffers
--mysql_allow_clear_text_without_tls If set, the server will allow the use of a clear text password over non-SSL connections.
--mysql_auth_server_impl string Which auth server implementation to use. Options: none, ldap, clientcert, static, vault. (default "static")
Expand Down
154 changes: 154 additions & 0 deletions go/mysql/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"sync/atomic"
"time"

"github.com/spf13/pflag"

"vitess.io/vitess/go/bucketpool"
"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/mysql/sqlerror"
Expand All @@ -38,6 +40,7 @@ import (
"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/vterrors"
)

Expand Down Expand Up @@ -67,6 +70,19 @@ const (
ephemeralRead
)

var (
mysqlMultiQuery = false
)

func registerConnFlags(fs *pflag.FlagSet) {
fs.BoolVar(&mysqlMultiQuery, "mysql-server-multi-query-protocol", mysqlMultiQuery, "If set, the server will use the new implementation of handling queries where-in multiple queries are sent together.")
}

func init() {
servenv.OnParseFor("vtgate", registerConnFlags)
servenv.OnParseFor("vtcombo", registerConnFlags)
}

// A Getter has a Get()
type Getter interface {
Get() *querypb.VTGateCallerID
Expand Down Expand Up @@ -914,6 +930,9 @@ func (c *Conn) handleNextCommand(handler Handler) bool {
res := c.execQuery("use "+sqlescape.EscapeID(db), handler, false)
return res != connErr
case ComQuery:
if mysqlMultiQuery {
return c.handleComQueryMulti(handler, data)
}
return c.handleComQuery(handler, data)
case ComPing:
return c.handleComPing()
Expand Down Expand Up @@ -1279,6 +1298,141 @@ func (c *Conn) handleComPing() bool {
return true
}

// handleComQueryMulti is a newer version of handleComQuery that uses
// the StreamExecuteMulti and ExecuteMulti RPC calls to push the splitting of statements
// down to Vtgate.
func (c *Conn) handleComQueryMulti(handler Handler, data []byte) (kontinue bool) {
c.startWriterBuffering()
defer func() {
if err := c.endWriterBuffering(); err != nil {
log.Errorf("conn %v: flush() failed: %v", c.ID(), err)
kontinue = false
}
}()

queryStart := time.Now()
query := c.parseComQuery(data)
c.recycleReadPacket()

res := c.execQueryMulti(query, handler)
if res != execSuccess {
return res != connErr
}

timings.Record(queryTimingKey, queryStart)
return true
}

// execQueryMulti is a newer version of execQuery that uses
// the StreamExecuteMulti and ExecuteMulti RPC calls to push the splitting of statements
// down to Vtgate.
func (c *Conn) execQueryMulti(query string, handler Handler) execResult {
// needsEndPacket signifies whether we have need to send the last packet to the client
// for a given query. This is used to determine whether we should send an
// end packet after the query is done or not. Initially we don't need to send an end packet
// so we initialize this value to false.
needsEndPacket := false
callbackCalled := false
var res = execSuccess

err := handler.ComQueryMulti(c, query, func(qr sqltypes.QueryResponse, more bool, firstPacket bool) error {
callbackCalled = true
flag := c.StatusFlags
if more {
flag |= ServerMoreResultsExists
}

// firstPacket tells us that this is the start of a new query result.
// If we haven't sent a last packet yet, we should send the end result packet.
if firstPacket && needsEndPacket {
if err := c.writeEndResult(true, 0, 0, handler.WarningCount(c)); err != nil {
log.Errorf("Error writing result to %s: %v", c, err)
return err
}
}

// We receive execution errors in a query as part of the QueryResponse.
// We check for those errors and send a error packet. If we are unable
// to send the error packet, then there is a connection error too.
if qr.QueryError != nil {
res = execErr
if !c.writeErrorPacketFromErrorAndLog(qr.QueryError) {
res = connErr
}
return nil
}

if firstPacket {
// The first packet signifies the start of a new query result.
// So we reset the needsEndPacket variable to signify we haven't sent the last
// packet for this query.
needsEndPacket = true
if len(qr.QueryResult.Fields) == 0 {

// A successful callback with no fields means that this was a
// DML or other write-only operation.
//
// We should not send any more packets after this, but make sure
// to extract the affected rows and last insert id from the result
// struct here since clients expect it.
ok := PacketOK{
affectedRows: qr.QueryResult.RowsAffected,
lastInsertID: qr.QueryResult.InsertID,
statusFlags: flag,
warnings: handler.WarningCount(c),
info: "",
sessionStateData: qr.QueryResult.SessionStateChanges,
}
needsEndPacket = false
return c.writeOKPacket(&ok)
}

if err := c.writeFields(qr.QueryResult); err != nil {
return err
}
}

return c.writeRows(qr.QueryResult)
})

// If callback was not called, we expect an error.
// It is possible that we don't get a callback if some condition checks
// fail before the query starts execution. In this case, we need to write some
// error back.
if !callbackCalled {
// This is just a failsafe. Should never happen.
if err == nil || err == io.EOF {
err = sqlerror.NewSQLErrorFromError(errors.New("unexpected: query ended without no results and no error"))
}
if !c.writeErrorPacketFromErrorAndLog(err) {
return connErr
}
return execErr
}

if res != execSuccess {
// We failed during the stream itself.
return res
}

if err != nil {
// We can't send an error in the middle of a stream.
// All we can do is abort the send, which will cause a 2013.
log.Errorf("Error in the middle of a stream to %s: %v", c, err)
return connErr
}

// If we haven't sent the final packet for the last query, we should send that too.
if needsEndPacket {
if err := c.writeEndResult(false, 0, 0, handler.WarningCount(c)); err != nil {
log.Errorf("Error writing result to %s: %v", c, err)
return connErr
}
}

return execSuccess
}

var errEmptyStatement = sqlerror.NewSQLError(sqlerror.EREmptyQuery, sqlerror.SSClientError, "Query was empty")

func (c *Conn) handleComQuery(handler Handler, data []byte) (kontinue bool) {
Expand Down
Loading
Loading