Skip to content

Commit 735f33b

Browse files
authored
feat(consumer): use buffer pools for decompression (#2484)
Signed-off-by: Ronan Harmegnies <[email protected]>
1 parent 669d2bc commit 735f33b

File tree

1 file changed

+43
-6
lines changed

1 file changed

+43
-6
lines changed

decompress.go

+43-6
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"bytes"
55
"compress/gzip"
66
"fmt"
7-
"io"
87
"sync"
98

109
snappy "github.com/eapache/go-xerial-snappy"
@@ -19,6 +18,19 @@ var (
1918
}
2019

2120
gzipReaderPool sync.Pool
21+
22+
bufferPool = sync.Pool{
23+
New: func() interface{} {
24+
return new(bytes.Buffer)
25+
},
26+
}
27+
28+
bytesPool = sync.Pool{
29+
New: func() interface{} {
30+
res := make([]byte, 0, 4096)
31+
return &res
32+
},
33+
}
2234
)
2335

2436
func decompress(cc CompressionCodec, data []byte) ([]byte, error) {
@@ -38,9 +50,17 @@ func decompress(cc CompressionCodec, data []byte) ([]byte, error) {
3850
return nil, err
3951
}
4052

41-
defer gzipReaderPool.Put(reader)
53+
buffer := bufferPool.Get().(*bytes.Buffer)
54+
_, err = buffer.ReadFrom(reader)
55+
// copy the buffer to a new slice with the correct length
56+
// reuse gzipReader and buffer
57+
gzipReaderPool.Put(reader)
58+
res := make([]byte, buffer.Len())
59+
copy(res, buffer.Bytes())
60+
buffer.Reset()
61+
bufferPool.Put(buffer)
4262

43-
return io.ReadAll(reader)
63+
return res, err
4464
case CompressionSnappy:
4565
return snappy.Decode(data)
4666
case CompressionLZ4:
@@ -50,11 +70,28 @@ func decompress(cc CompressionCodec, data []byte) ([]byte, error) {
5070
} else {
5171
reader.Reset(bytes.NewReader(data))
5272
}
53-
defer lz4ReaderPool.Put(reader)
73+
buffer := bufferPool.Get().(*bytes.Buffer)
74+
_, err := buffer.ReadFrom(reader)
75+
// copy the buffer to a new slice with the correct length
76+
// reuse lz4Reader and buffer
77+
lz4ReaderPool.Put(reader)
78+
res := make([]byte, buffer.Len())
79+
copy(res, buffer.Bytes())
80+
buffer.Reset()
81+
bufferPool.Put(buffer)
5482

55-
return io.ReadAll(reader)
83+
return res, err
5684
case CompressionZSTD:
57-
return zstdDecompress(ZstdDecoderParams{}, nil, data)
85+
buffer := *bytesPool.Get().(*[]byte)
86+
var err error
87+
buffer, err = zstdDecompress(ZstdDecoderParams{}, buffer, data)
88+
// copy the buffer to a new slice with the correct length and reuse buffer
89+
res := make([]byte, len(buffer))
90+
copy(res, buffer)
91+
buffer = buffer[:0]
92+
bytesPool.Put(&buffer)
93+
94+
return res, err
5895
default:
5996
return nil, PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", cc)}
6097
}

0 commit comments

Comments
 (0)