Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 19 additions & 22 deletions go/mysql/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ const (
// read or write a packet while one is already used.
ephemeralUnused = iota

// ephemeralWrite means we currently in process of writing currentEphemeralWriteBuffer
// ephemeralWrite means we currently in process of writing from currentEphemeralBuffer
ephemeralWrite

// ephemeralRead means we currently in process of reading into currentEphemeralReadBuffer
// ephemeralRead means we currently in process of reading into currentEphemeralBuffer
ephemeralRead
)

Expand Down Expand Up @@ -141,12 +141,9 @@ type Conn struct {
// - startEphemeralPacket / writeEphemeralPacket methods for writes.
// - readEphemeralPacket / recycleReadPacket methods for reads.
currentEphemeralPolicy int
// TODO (danieltahara): Ultimately get rid of this delineation.
// currentEphemeralWriteBuffer and currentEphemeralReadBuffer used for tracking
// allocated temporary buffers for writes and reads respectively. They can be allocated
// from bufPool or heap and should be recycled in the same manner.
currentEphemeralWriteBuffer *[]byte
currentEphemeralReadBuffer *[]byte
// currentEphemeralBuffer for tracking allocated temporary buffer for writes and reads respectively.
// It can be allocated from bufPool or heap and should be recycled in the same manner.
currentEphemeralBuffer *[]byte
}

// bufPool is used to allocate and free buffers in an efficient way.
Expand Down Expand Up @@ -278,11 +275,11 @@ func (c *Conn) readEphemeralPacket() ([]byte, error) {
c.currentEphemeralPolicy = ephemeralRead
// Use the bufPool.
if length < MaxPacketSize {
c.currentEphemeralReadBuffer = bufPool.Get(length)
if _, err := io.ReadFull(r, *c.currentEphemeralReadBuffer); err != nil {
c.currentEphemeralBuffer = bufPool.Get(length)
if _, err := io.ReadFull(r, *c.currentEphemeralBuffer); err != nil {
return nil, fmt.Errorf("io.ReadFull(packet body of length %v) failed: %v", length, err)
}
return *c.currentEphemeralReadBuffer, nil
return *c.currentEphemeralBuffer, nil
}

// Much slower path, revert to allocating everything from scratch.
Expand Down Expand Up @@ -337,11 +334,11 @@ func (c *Conn) readEphemeralPacketDirect() ([]byte, error) {

c.currentEphemeralPolicy = ephemeralRead
if length < MaxPacketSize {
c.currentEphemeralReadBuffer = bufPool.Get(length)
if _, err := io.ReadFull(r, *c.currentEphemeralReadBuffer); err != nil {
c.currentEphemeralBuffer = bufPool.Get(length)
if _, err := io.ReadFull(r, *c.currentEphemeralBuffer); err != nil {
return nil, fmt.Errorf("io.ReadFull(packet body of length %v) failed: %v", length, err)
}
return *c.currentEphemeralReadBuffer, nil
return *c.currentEphemeralBuffer, nil
}

return nil, fmt.Errorf("readEphemeralPacketDirect doesn't support more than one packet")
Expand All @@ -352,10 +349,10 @@ func (c *Conn) readEphemeralPacketDirect() ([]byte, error) {
func (c *Conn) recycleReadPacket() {
switch c.currentEphemeralPolicy {
case ephemeralRead:
if c.currentEphemeralReadBuffer != nil {
if c.currentEphemeralBuffer != nil {
// We are using the pool, put the buffer back in.
bufPool.Put(c.currentEphemeralReadBuffer)
c.currentEphemeralReadBuffer = nil
bufPool.Put(c.currentEphemeralBuffer)
c.currentEphemeralBuffer = nil
}
case ephemeralUnused, ephemeralWrite:
// Programming error.
Expand Down Expand Up @@ -502,8 +499,8 @@ func (c *Conn) startEphemeralPacket(length int) []byte {

c.currentEphemeralPolicy = ephemeralWrite
// get buffer from pool or it'll be allocated if length is too big
c.currentEphemeralWriteBuffer = bufPool.Get(length)
return *c.currentEphemeralWriteBuffer
c.currentEphemeralBuffer = bufPool.Get(length)
return *c.currentEphemeralBuffer
}

// writeEphemeralPacket writes the packet that was allocated by
Expand All @@ -513,7 +510,7 @@ func (c *Conn) writeEphemeralPacket() error {

switch c.currentEphemeralPolicy {
case ephemeralWrite:
if err := c.writePacket(*c.currentEphemeralWriteBuffer); err != nil {
if err := c.writePacket(*c.currentEphemeralBuffer); err != nil {
return fmt.Errorf("Conn %v: %v", c.ID(), err)
}
case ephemeralUnused, ephemeralRead:
Expand All @@ -530,8 +527,8 @@ func (c *Conn) recycleWritePacket() {
switch c.currentEphemeralPolicy {
case ephemeralWrite:
// Release our reference so the buffer can be gced
bufPool.Put(c.currentEphemeralWriteBuffer)
c.currentEphemeralWriteBuffer = nil
bufPool.Put(c.currentEphemeralBuffer)
c.currentEphemeralBuffer = nil
case ephemeralUnused, ephemeralRead:
// Programming error.
panic(fmt.Errorf("trying to call recycleWritePacket while currentEphemeralPolicy is %d", c.currentEphemeralPolicy))
Expand Down