From a37b37558763ad1e50bfc9f9b25ea3c0f1036928 Mon Sep 17 00:00:00 2001
From: Alexander Morozov
Date: Tue, 4 Sep 2018 22:42:38 -0700
Subject: [PATCH] [go/mysql] use tiered pool for buffers to avoid false hits

Previously, when we got a buffer smaller than the requested size from
the pool, we just put it back and allocated a new one, which can be
quite inefficient. Instead, use multiple pools (buckets) which always
return buffers >= the requested size.

Signed-off-by: Alexander Morozov
---
 go/bucketpool/bucketpool.go      |  91 ++++++++++++++
 go/bucketpool/bucketpool_test.go | 202 +++++++++++++++++++++++++++++++
 go/mysql/conn.go                 |  29 +----
 3 files changed, 297 insertions(+), 25 deletions(-)
 create mode 100644 go/bucketpool/bucketpool.go
 create mode 100644 go/bucketpool/bucketpool_test.go

diff --git a/go/bucketpool/bucketpool.go b/go/bucketpool/bucketpool.go
new file mode 100644
index 00000000000..be0ec18aa7d
--- /dev/null
+++ b/go/bucketpool/bucketpool.go
@@ -0,0 +1,91 @@
+package bucketpool
+
+import (
+	"math"
+	"sync"
+)
+
+type sizedPool struct {
+	size int
+	pool sync.Pool
+}
+
+func newSizedPool(size int) *sizedPool {
+	return &sizedPool{
+		size: size,
+		pool: sync.Pool{
+			New: func() interface{} { return makeSlicePointer(size) },
+		},
+	}
+}
+
+// Pool is actually multiple pools which store buffers of specific size.
+// i.e. it can be three pools which return buffers 32K, 64K and 128K.
+type Pool struct {
+	minSize int
+	maxSize int
+	pools   []*sizedPool
+}
+
+// New returns Pool which has buckets from minSize to maxSize.
+// Bucket sizes increase by powers of two, i.e. with multiplier 2: [2b, 4b, 8b, ..., 1024b].
+// The last pool is always capped to maxSize.
+func New(minSize, maxSize int) *Pool {
+	if maxSize < minSize {
+		panic("maxSize can't be less than minSize")
+	}
+	const multiplier = 2
+	var pools []*sizedPool
+	curSize := minSize
+	for curSize < maxSize {
+		pools = append(pools, newSizedPool(curSize))
+		curSize *= multiplier
+	}
+	pools = append(pools, newSizedPool(maxSize))
+	return &Pool{
+		minSize: minSize,
+		maxSize: maxSize,
+		pools:   pools,
+	}
+}
+
+func (p *Pool) findPool(size int) *sizedPool {
+	if size > p.maxSize {
+		return nil
+	}
+	idx := int(math.Ceil(math.Log2(float64(size) / float64(p.minSize))))
+	if idx < 0 {
+		idx = 0
+	}
+	if idx > len(p.pools)-1 {
+		return nil
+	}
+	return p.pools[idx]
+}
+
+// Get returns pointer to []byte which has len size.
+// If there is no bucket with buffers >= size, slice will be allocated.
+func (p *Pool) Get(size int) *[]byte {
+	sp := p.findPool(size)
+	if sp == nil {
+		return makeSlicePointer(size)
+	}
+	buf := sp.pool.Get().(*[]byte)
+	*buf = (*buf)[:size]
+	return buf
+}
+
+// Put returns the pointed-to slice to its size bucket; slices for which there is no bucket are discarded.
+func (p *Pool) Put(b *[]byte) {
+	sp := p.findPool(cap(*b))
+	if sp == nil {
+		return
+	}
+	*b = (*b)[:cap(*b)]
+	sp.pool.Put(b)
+}
+
+func makeSlicePointer(size int) *[]byte {
+	data := make([]byte, size)
+	return &data
+}
diff --git a/go/bucketpool/bucketpool_test.go b/go/bucketpool/bucketpool_test.go
new file mode 100644
index 00000000000..a7e2cf0e75c
--- /dev/null
+++ b/go/bucketpool/bucketpool_test.go
@@ -0,0 +1,202 @@
+package bucketpool
+
+import (
+	"math/rand"
+	"testing"
+)
+
+func TestPool(t *testing.T) {
+	maxSize := 16384
+	pool := New(1024, maxSize)
+	if pool.maxSize != maxSize {
+		t.Fatalf("Invalid max pool size: %d, expected %d", pool.maxSize, maxSize)
+	}
+	if len(pool.pools) != 5 {
+		t.Fatalf("Invalid number of pools: %d, expected %d", len(pool.pools), 5)
+	}
+
+	buf := pool.Get(64)
+	if len(*buf) != 64 {
+		t.Fatalf("unexpected buf length: %d", len(*buf))
+	}
+	if cap(*buf) != 1024 {
+		t.Fatalf("unexpected buf cap: %d", cap(*buf))
+	}
+
+	// get from same pool, check that length is right
+	buf = pool.Get(128)
+	if len(*buf) != 128 {
+		t.Fatalf("unexpected buf length: %d", len(*buf))
+	}
+	if cap(*buf) != 1024 {
+		t.Fatalf("unexpected buf cap: %d", cap(*buf))
+	}
+	pool.Put(buf)
+
+	// get boundary size
+	buf = pool.Get(1024)
+	if len(*buf) != 1024 {
+		t.Fatalf("unexpected buf length: %d", len(*buf))
+	}
+	if cap(*buf) != 1024 {
+		t.Fatalf("unexpected buf cap: %d", cap(*buf))
+	}
+	pool.Put(buf)
+
+	// get from the middle
+	buf = pool.Get(5000)
+	if len(*buf) != 5000 {
+		t.Fatalf("unexpected buf length: %d", len(*buf))
+	}
+	if cap(*buf) != 8192 {
+		t.Fatalf("unexpected buf cap: %d", cap(*buf))
+	}
+	pool.Put(buf)
+
+	// check last pool
+	buf = pool.Get(16383)
+	if len(*buf) != 16383 {
+		t.Fatalf("unexpected buf length: %d", len(*buf))
+	}
+	if cap(*buf) != 16384 {
+		t.Fatalf("unexpected buf cap: %d", cap(*buf))
+	}
+	pool.Put(buf)
+
+	// get big buffer
+	buf = pool.Get(16385)
+	if len(*buf) != 16385 {
+		t.Fatalf("unexpected buf length: %d", len(*buf))
+	}
+	if cap(*buf) != 16385 {
+		t.Fatalf("unexpected buf cap: %d", cap(*buf))
+	}
+	pool.Put(buf)
+}
+
+func TestPoolOneSize(t *testing.T) {
+	maxSize := 1024
+	pool := New(1024, maxSize)
+	if pool.maxSize != maxSize {
+		t.Fatalf("Invalid max pool size: %d, expected %d", pool.maxSize, maxSize)
+	}
+	buf := pool.Get(64)
+	if len(*buf) != 64 {
+		t.Fatalf("unexpected buf length: %d", len(*buf))
+	}
+	if cap(*buf) != 1024 {
+		t.Fatalf("unexpected buf cap: %d", cap(*buf))
+	}
+	pool.Put(buf)
+
+	buf = pool.Get(1025)
+	if len(*buf) != 1025 {
+		t.Fatalf("unexpected buf length: %d", len(*buf))
+	}
+	if cap(*buf) != 1025 {
+		t.Fatalf("unexpected buf cap: %d", cap(*buf))
+	}
+	pool.Put(buf)
+}
+
+func TestPoolTwoSizeNotMultiplier(t *testing.T) {
+	maxSize := 2000
+	pool := New(1024, maxSize)
+	if pool.maxSize != maxSize {
+		t.Fatalf("Invalid max pool size: %d, expected %d", pool.maxSize, maxSize)
+	}
+	buf := pool.Get(64)
+	if len(*buf) != 64 {
+		t.Fatalf("unexpected buf length: %d", len(*buf))
+	}
+	if cap(*buf) != 1024 {
+		t.Fatalf("unexpected buf cap: %d", cap(*buf))
+	}
+	pool.Put(buf)
+
+	buf = pool.Get(2001)
+	if len(*buf) != 2001 {
+		t.Fatalf("unexpected buf length: %d", len(*buf))
+	}
+	if cap(*buf) != 2001 {
+		t.Fatalf("unexpected buf cap: %d", cap(*buf))
+	}
+	pool.Put(buf)
+}
+
+func TestPoolWeirdMaxSize(t *testing.T) {
+	maxSize := 15000
+	pool := New(1024, maxSize)
+	if pool.maxSize != maxSize {
+		t.Fatalf("Invalid max pool size: %d, expected %d", pool.maxSize, maxSize)
+	}
+
+	buf := pool.Get(14000)
+	if len(*buf) != 14000 {
+		t.Fatalf("unexpected buf length: %d", len(*buf))
+	}
+	if cap(*buf) != 15000 {
+		t.Fatalf("unexpected buf cap: %d", cap(*buf))
+	}
+	pool.Put(buf)
+
+	buf = pool.Get(16383)
+	if len(*buf) != 16383 {
+		t.Fatalf("unexpected buf length: %d", len(*buf))
+	}
+	if cap(*buf) != 16383 {
+		t.Fatalf("unexpected buf cap: %d", cap(*buf))
+	}
+	pool.Put(buf)
+}
+
+func TestFuzz(t *testing.T) {
+	maxTestSize := 16384
+	for i := 0; i < 20000; i++ {
+		minSize := rand.Intn(maxTestSize)
+		maxSize := rand.Intn(maxTestSize-minSize) + minSize
+		p := New(minSize, maxSize)
+		bufSize := rand.Intn(maxTestSize)
+		buf := p.Get(bufSize)
+		if len(*buf) != bufSize {
+			t.Fatalf("Invalid length %d, expected %d", len(*buf), bufSize)
+		}
+		sPool := p.findPool(bufSize)
+		if sPool == nil {
+			if cap(*buf) != len(*buf) {
+				t.Fatalf("Invalid cap %d, expected %d", cap(*buf), len(*buf))
+			}
+		} else {
+			if cap(*buf) != sPool.size {
+				t.Fatalf("Invalid cap %d, expected %d", cap(*buf), sPool.size)
+			}
+		}
+		p.Put(buf)
+	}
+}
+
+func BenchmarkPool(b *testing.B) {
+	pool := New(2, 16384)
+	b.SetParallelism(16)
+	b.ResetTimer()
+	b.RunParallel(func(pb *testing.PB) {
+		for pb.Next() {
+			randomSize := rand.Intn(pool.maxSize)
+			data := pool.Get(randomSize)
+			pool.Put(data)
+		}
+	})
+}
+
+func BenchmarkPoolGet(b *testing.B) {
+	pool := New(2, 16384)
+	b.SetParallelism(16)
+	b.ResetTimer()
+	b.RunParallel(func(pb *testing.PB) {
+		for pb.Next() {
+			randomSize := rand.Intn(pool.maxSize)
+			data := pool.Get(randomSize)
+			_ = data
+		}
+	})
+}
diff --git a/go/mysql/conn.go b/go/mysql/conn.go
index 4a08057ead4..f5f4d48e637 100644
--- a/go/mysql/conn.go
+++ b/go/mysql/conn.go
@@ -22,8 +22,8 @@ import (
 	"io"
 	"net"
 	"strings"
-	"sync"
 
+	"vitess.io/vitess/go/bucketpool"
 	querypb "vitess.io/vitess/go/vt/proto/query"
 )
 
@@ -190,28 +190,7 @@ type Conn struct {
 }
 
 // bufPool is used to allocate and free buffers in an efficient way.
-var bufPool = sync.Pool{}
-
-// length is always > connBufferSize here, for smaller buffers static buffer field is used
-func getBuf(length int) *[]byte {
-	i := bufPool.Get()
-	if i == nil {
-		buf := make([]byte, length)
-		return &buf
-	}
-	// We got an array from the pool, see if it's
-	// big enough.
-	buf := i.(*[]byte)
-	if cap(*buf) >= length {
-		// big enough, shrink to length and use it.
-		*buf = (*buf)[:length]
-		return buf
-	}
-	// not big enough: allocate a new one, put smaller buffer back to the pool
-	bufPool.Put(buf)
-	data := make([]byte, length)
-	return &data
-}
+var bufPool = bucketpool.New(connBufferSize, MaxPacketSize)
 
 // newConn is an internal method to create a Conn. Used by client and server
 // side for common creation code.
@@ -337,7 +316,7 @@ func (c *Conn) readEphemeralPacket() ([]byte, error) {
 	// Slightly slower path: single packet. Use the bufPool.
 	if length < MaxPacketSize {
 		c.currentEphemeralPolicy = ephemeralReadSingleBuffer
-		c.currentEphemeralReadBuffer = getBuf(length)
+		c.currentEphemeralReadBuffer = bufPool.Get(length)
 		if _, err := io.ReadFull(c.reader, *c.currentEphemeralReadBuffer); err != nil {
 			return nil, fmt.Errorf("io.ReadFull(packet body of length %v) failed: %v", length, err)
 		}
@@ -553,7 +532,7 @@ func (c *Conn) startEphemeralPacket(length int) []byte {
 
 	if length < MaxPacketSize {
 		c.currentEphemeralPolicy = ephemeralWriteSingleBuffer
-		c.currentEphemeralWriteBuffer = getBuf(length + 4)
+		c.currentEphemeralWriteBuffer = bufPool.Get(length + 4)
 		(*c.currentEphemeralWriteBuffer)[0] = byte(length)
 		(*c.currentEphemeralWriteBuffer)[1] = byte(length >> 8)
 		(*c.currentEphemeralWriteBuffer)[2] = byte(length >> 16)