Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 61 additions & 63 deletions go/mysql/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1609,7 +1609,7 @@ func (c *Conn) execQuery(ctx context.Context, query string, handler Handler, mul
// requests to write the rows from the result set.
func (c *Conn) execPrepareStatement(ctx context.Context, stmtID uint32, cursorType byte, handler Handler) error {
prepare := c.PrepareData[stmtID]
if cursorType == NoCursor {
if cursorType == NoCursor || cursorType == ParameterCountAvailable {
fieldSent := false
sendFinished := false // sendFinished is set if the response should just be an OK packet.

Expand Down Expand Up @@ -1664,74 +1664,72 @@ func (c *Conn) execPrepareStatement(ctx context.Context, stmtID uint32, cursorTy
}

return nil
} else {
next := make(chan *sqltypes.Result)
done, quit := make(chan error), make(chan error)
}
next := make(chan *sqltypes.Result)
done, quit := make(chan error), make(chan error)

go func() {
var err error
defer func() {
// pass along error, even if there's a panic
if r := recover(); r != nil {
err = fmt.Errorf("panic while running query for server-side cursor: %v", r)
}
close(next)
done <- err
close(done)
}()
err = handler.ComStmtExecute(ctx, c, prepare, func(qr *sqltypes.Result) error {
// block until query results are sent or receive signal to quit
var qerr error
select {
case next <- qr:
case qerr = <-quit:
}
return qerr
})
go func() {
var err error
defer func() {
// pass along error, even if there's a panic
if r := recover(); r != nil {
err = fmt.Errorf("panic while running query for server-side cursor: %v", r)
}
close(next)
done <- err
close(done)
}()

// Immediately receive the very first query result to write the fields
qr, ok := <-next
if !ok {
<-done
if werr := c.writeErrorPacket(ERUnknownError, SSUnknownSQLState, "unknown error: %v", "missing result set"); werr != nil {
log.Errorf("Error writing query error to %s: %v", c, werr)
return werr
err = handler.ComStmtExecute(ctx, c, prepare, func(qr *sqltypes.Result) error {
// block until query results are sent or receive signal to quit
var qerr error
select {
case next <- qr:
case qerr = <-quit:
}
return nil
}
return qerr
})
}()

if len(qr.Fields) == 0 {
// DML or something without a result set. We do not open a cursor here.
<-done
return c.writeOKPacket(qr.RowsAffected, qr.InsertID, c.StatusFlags, 0)
} else {
// Open the cursor and write the fields.
c.StatusFlags |= uint16(ServerCursorExists)
if err := c.writeFieldsWithoutEOF(qr); err != nil {
log.Errorf("Error writing fields to %s: %v", c, err)
return err
}
// TODO: Look into whether accessing WarningCount
// here after passing `c` to ComStmtExecute in the
// goroutine above races.
if werr := c.writeEndResult(false, 0, 0, handler.WarningCount(c)); werr != nil {
log.Errorf("Error writing result to %s: %v", c, werr)
return werr
}
// After writing the EOF_Packet/OK_Packet above, we
// have told the client the cursor is open.
c.StatusFlags &= ^uint16(ServerCursorExists)
c.cs = &cursorState{
stmtID: stmtID,
next: next,
done: done,
quit: quit,
pending: qr,
}
return nil
// Immediately receive the very first query result to write the fields
qr, ok := <-next
if !ok {
<-done
if werr := c.writeErrorPacket(ERUnknownError, SSUnknownSQLState, "unknown error: %v", "missing result set"); werr != nil {
log.Errorf("Error writing query error to %s: %v", c, werr)
return werr
}
return nil
}

if len(qr.Fields) == 0 {
// DML or something without a result set. We do not open a cursor here.
<-done
return c.writeOKPacket(qr.RowsAffected, qr.InsertID, c.StatusFlags, 0)
}
// Open the cursor and write the fields.
c.StatusFlags |= uint16(ServerCursorExists)
if err := c.writeFieldsWithoutEOF(qr); err != nil {
log.Errorf("Error writing fields to %s: %v", c, err)
return err
}
// TODO: Look into whether accessing WarningCount
// here after passing `c` to ComStmtExecute in the
// goroutine above races.
if werr := c.writeEndResult(false, 0, 0, handler.WarningCount(c)); werr != nil {
log.Errorf("Error writing result to %s: %v", c, werr)
return werr
}
// After writing the EOF_Packet/OK_Packet above, we
// have told the client the cursor is open.
c.StatusFlags &= ^uint16(ServerCursorExists)
c.cs = &cursorState{
stmtID: stmtID,
next: next,
done: done,
quit: quit,
pending: qr,
}
return nil
}

//
Expand Down
11 changes: 6 additions & 5 deletions go/mysql/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,13 @@ const (
)

// Cursor Types. They are received on COM_STMT_EXECUTE()
// See https://mariadb.com/kb/en/com_stmt_execute/
// See https://dev.mysql.com/doc/dev/mysql-server/9.3.0/mysql__com_8h.html#a3e5e9e744ff6f7b989a604fd669977da
const (
NoCursor = iota
ReadOnly
CursorForUpdate
ScrollableCursor
NoCursor uint8 = 0x00
ReadOnly uint8 = 0x01
ForUpdate uint8 = 0x02
Scrollable uint8 = 0x04
ParameterCountAvailable uint8 = 0x08
)

// State Change Information
Expand Down