Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 12 additions & 48 deletions go/mysql/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,10 @@ const (
// read or write a packet while one is already used.
ephemeralUnused = iota

// ephemeralWriteSingleBuffer means a single buffer was
// allocated to write a packet. It is in
// c.currentEphemeralWriteBuffer. The first four bytes contain size
// and sequence.
ephemeralWriteSingleBuffer

// ephemeralWriteBigBuffer means a big buffer was allocated to
// write a packet, and will need to be split when sending.
// The allocated buffer is in c.currentEphemeralWriteBuffer.
ephemeralWriteBigBuffer

// ephemeralRead means we are using currentEphemeralReadBuffer, which might be allocated from heap or pool.
// On recycle smaller buffers will be returned to pool and bigger (> MaxPacketSize) discarded.
// ephemeralWrite means we currently in process of writing currentEphemeralWriteBuffer
ephemeralWrite

// ephemeralRead means we currently in process of reading into currentEphemeralReadBuffer
ephemeralRead
)

Expand Down Expand Up @@ -152,7 +143,8 @@ type Conn struct {
currentEphemeralPolicy int
// TODO (danieltahara): Ultimately get rid of this delineation.
// currentEphemeralWriteBuffer and currentEphemeralReadBuffer used for tracking
// allocated temporary buffers for writes and reads respectively.
// allocated temporary buffers for writes and reads respectively. They can be allocated
// from bufPool or heap and should be recycled in the same manner.
currentEphemeralWriteBuffer *[]byte
currentEphemeralReadBuffer *[]byte
}
Expand Down Expand Up @@ -365,7 +357,7 @@ func (c *Conn) recycleReadPacket() {
bufPool.Put(c.currentEphemeralReadBuffer)
c.currentEphemeralReadBuffer = nil
}
case ephemeralUnused, ephemeralWriteSingleBuffer, ephemeralWriteBigBuffer:
case ephemeralUnused, ephemeralWrite:
// Programming error.
panic(fmt.Errorf("trying to call recycleReadPacket while currentEphemeralPolicy is %d", c.currentEphemeralPolicy))
}
Expand Down Expand Up @@ -508,23 +500,9 @@ func (c *Conn) startEphemeralPacket(length int) []byte {
panic("startEphemeralPacket cannot be used while a packet is already started.")
}

// get buffer from pool
if length < MaxPacketSize {
c.currentEphemeralPolicy = ephemeralWriteSingleBuffer

c.currentEphemeralWriteBuffer = bufPool.Get(length + 4)
(*c.currentEphemeralWriteBuffer)[0] = byte(length)
(*c.currentEphemeralWriteBuffer)[1] = byte(length >> 8)
(*c.currentEphemeralWriteBuffer)[2] = byte(length >> 16)
(*c.currentEphemeralWriteBuffer)[3] = c.sequence
c.sequence++
return (*c.currentEphemeralWriteBuffer)[4:]
}

// Even slower path: create a full size buffer and return it.
c.currentEphemeralPolicy = ephemeralWriteBigBuffer
data := make([]byte, length)
c.currentEphemeralWriteBuffer = &data
c.currentEphemeralPolicy = ephemeralWrite
// get buffer from pool or it'll be allocated if length is too big
c.currentEphemeralWriteBuffer = bufPool.Get(length)
return *c.currentEphemeralWriteBuffer
}

Expand All @@ -534,16 +512,7 @@ func (c *Conn) writeEphemeralPacket() error {
defer c.recycleWritePacket()

switch c.currentEphemeralPolicy {
case ephemeralWriteSingleBuffer:
// Write the allocated buffer as a single buffer.
// It has both header and data.
if n, err := c.getWriter().Write(*c.currentEphemeralWriteBuffer); err != nil {
return fmt.Errorf("Conn %v: Write(*c.currentEphemeralWriteBuffer) failed: %v", c.ID(), err)
} else if n != len(*c.currentEphemeralWriteBuffer) {
return fmt.Errorf("Conn %v: Write(*c.currentEphemeralWriteBuffer) returned a short write: %v < %v", c.ID(), n, len(*c.currentEphemeralWriteBuffer))
}
case ephemeralWriteBigBuffer:
// This is the slower path for big data.
case ephemeralWrite:
if err := c.writePacket(*c.currentEphemeralWriteBuffer); err != nil {
return fmt.Errorf("Conn %v: %v", c.ID(), err)
}
Expand All @@ -559,15 +528,10 @@ func (c *Conn) writeEphemeralPacket() error {
// after writeEphemeralPacket was called.
func (c *Conn) recycleWritePacket() {
switch c.currentEphemeralPolicy {
case ephemeralWriteSingleBuffer:
case ephemeralWrite:
// Release our reference so the buffer can be gced
bufPool.Put(c.currentEphemeralWriteBuffer)
c.currentEphemeralWriteBuffer = nil
case ephemeralWriteBigBuffer:
// We allocated a one-time buffer we can't re-use.
// N.B. Unlike the read packet, we actually assign the big buffer to currentEphemeralReadBuffer,
// so we should remove our reference to it.
c.currentEphemeralWriteBuffer = nil
case ephemeralUnused, ephemeralRead:
// Programming error.
panic(fmt.Errorf("trying to call recycleWritePacket while currentEphemeralPolicy is %d", c.currentEphemeralPolicy))
Expand Down