
Commit 38204cb

Reduce the zstd encoder state pool size to one (#2375)
Currently a single zstd encoder with default concurrency is used. Default concurrency causes `EncodeAll` to create one encoder state per GOMAXPROCS, i.e. by default one per core. On high-core machines (32+) and at high compression levels this leads to roughly 1 GB of memory consumption per ~32 cores. A 1 GB encoder is expensive compared to the 1 MB payloads usually sent to Kafka.

The new approach limits each encoder to a concurrency of one but allows additional encoders to be allocated on the fly when none is available. Encoders are returned after use, allowing them to be reused.

A benchmark emulating a 96-core system shows the effectiveness of the change.

Previous result:

```
goos: linux
goarch: amd64
pkg: github.com/Shopify/sarama
cpu: 11th Gen Intel(R) Core(TM) i5-1135G7 @ 2.40GHz
BenchmarkZstdMemoryConsumption-8    2    834830801 ns/op    3664055292 B/op    4710 allocs/op
PASS
ok      github.com/Shopify/sarama    2.181s
```

Current result:

```
goos: linux
goarch: amd64
pkg: github.com/Shopify/sarama
cpu: 11th Gen Intel(R) Core(TM) i5-1135G7 @ 2.40GHz
BenchmarkZstdMemoryConsumption-8    5    222605954 ns/op    38960185 B/op    814 allocs/op
PASS
ok      github.com/Shopify/sarama    3.045s
```

Side by side:

```
BenchmarkZstdMemoryConsumption-8    2    834830801 ns/op    3664055292 B/op    4710 allocs/op
BenchmarkZstdMemoryConsumption-8    5    222605954 ns/op    38960185 B/op    814 allocs/op
```

This is a ~4x improvement in total runtime and a ~96x improvement in memory usage for the first 2×96 messages. As a downside, this patch increases how often new encoders are created on the fly, and the maximum number of encoders alive at once may be even higher than before.
1 parent 610514e commit 38204cb
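For illustration, the bounded-channel pool pattern described in the commit message can be sketched as a small standalone program. This is a simplified, hypothetical rendering (the identifiers `idleEncoders`, `getEncoder`, `releaseEncoder`, and `compress` are illustrative only); the actual implementation is the `zstd.go` diff below:

```go
package main

import (
	"fmt"

	"github.com/klauspost/compress/zstd"
)

// At most one idle encoder is kept; extras created under load are dropped
// on release and eventually garbage collected.
var idleEncoders = make(chan *zstd.Encoder, 1)

func getEncoder() *zstd.Encoder {
	select {
	case enc := <-idleEncoders:
		return enc // reuse a pooled encoder
	default:
		// Pool is empty: create a fresh encoder limited to one concurrent state.
		enc, _ := zstd.NewWriter(nil,
			zstd.WithZeroFrames(true),
			zstd.WithEncoderConcurrency(1))
		return enc
	}
}

func releaseEncoder(enc *zstd.Encoder) {
	select {
	case idleEncoders <- enc: // keep it for the next caller
	default: // channel already holds an idle encoder; let this one go
	}
}

func compress(dst, src []byte) []byte {
	enc := getEncoder()
	defer releaseEncoder(enc)
	return enc.EncodeAll(src, dst)
}

func main() {
	out := compress(nil, []byte("hello zstd pool"))
	fmt.Printf("compressed to %d bytes\n", len(out))
}
```

Keeping the channel capacity at one bounds steady-state memory to a single encoder state per compression configuration, while bursts are absorbed by short-lived extra encoders.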

2 files changed (+67 −14)


zstd.go (+38 −14)
```diff
@@ -6,28 +6,49 @@ import (
 	"github.com/klauspost/compress/zstd"
 )
 
+// zstdMaxBufferedEncoders maximum number of not-in-use zstd encoders
+// If the pool of encoders is exhausted then new encoders will be created on the fly
+const zstdMaxBufferedEncoders = 1
+
 type ZstdEncoderParams struct {
 	Level int
 }
 type ZstdDecoderParams struct {
 }
 
-var zstdEncMap, zstdDecMap sync.Map
+var zstdDecMap sync.Map
+
+var zstdAvailableEncoders sync.Map
 
-func getEncoder(params ZstdEncoderParams) *zstd.Encoder {
-	if ret, ok := zstdEncMap.Load(params); ok {
-		return ret.(*zstd.Encoder)
+func getZstdEncoderChannel(params ZstdEncoderParams) chan *zstd.Encoder {
+	if c, ok := zstdAvailableEncoders.Load(params); ok {
+		return c.(chan *zstd.Encoder)
 	}
-	// It's possible to race and create multiple new writers.
-	// Only one will survive GC after use.
-	encoderLevel := zstd.SpeedDefault
-	if params.Level != CompressionLevelDefault {
-		encoderLevel = zstd.EncoderLevelFromZstd(params.Level)
+	c, _ := zstdAvailableEncoders.LoadOrStore(params, make(chan *zstd.Encoder, zstdMaxBufferedEncoders))
+	return c.(chan *zstd.Encoder)
+}
+
+func getZstdEncoder(params ZstdEncoderParams) *zstd.Encoder {
+	select {
+	case enc := <-getZstdEncoderChannel(params):
+		return enc
+	default:
+		encoderLevel := zstd.SpeedDefault
+		if params.Level != CompressionLevelDefault {
+			encoderLevel = zstd.EncoderLevelFromZstd(params.Level)
+		}
+		zstdEnc, _ := zstd.NewWriter(nil, zstd.WithZeroFrames(true),
+			zstd.WithEncoderLevel(encoderLevel),
+			zstd.WithEncoderConcurrency(1))
+		return zstdEnc
+	}
+}
+
+func releaseEncoder(params ZstdEncoderParams, enc *zstd.Encoder) {
+	select {
+	case getZstdEncoderChannel(params) <- enc:
+	default:
 	}
-	zstdEnc, _ := zstd.NewWriter(nil, zstd.WithZeroFrames(true),
-		zstd.WithEncoderLevel(encoderLevel))
-	zstdEncMap.Store(params, zstdEnc)
-	return zstdEnc
 }
 
 func getDecoder(params ZstdDecoderParams) *zstd.Decoder {
@@ -46,5 +67,8 @@ func zstdDecompress(params ZstdDecoderParams, dst, src []byte) ([]byte, error) {
 }
 
 func zstdCompress(params ZstdEncoderParams, dst, src []byte) ([]byte, error) {
-	return getEncoder(params).EncodeAll(src, dst), nil
+	enc := getZstdEncoder(params)
+	out := enc.EncodeAll(src, dst)
+	releaseEncoder(params, enc)
+	return out, nil
 }
```

zstd_test.go (+29)
```diff
@@ -0,0 +1,29 @@
+package sarama
+
+import (
+	"runtime"
+	"testing"
+)
+
+func BenchmarkZstdMemoryConsumption(b *testing.B) {
+	params := ZstdEncoderParams{Level: 9}
+	buf := make([]byte, 1024*1024)
+	for i := 0; i < len(buf); i++ {
+		buf[i] = byte((i / 256) + (i * 257))
+	}
+
+	cpus := 96
+
+	gomaxprocsBackup := runtime.GOMAXPROCS(cpus)
+	b.ReportAllocs()
+	for i := 0; i < b.N; i++ {
+		for j := 0; j < 2*cpus; j++ {
+			_, _ = zstdCompress(params, nil, buf)
+		}
+		// drain the buffered encoder
+		getZstdEncoder(params)
+		// previously this would be achieved with
+		// zstdEncMap.Delete(params)
+	}
+	runtime.GOMAXPROCS(gomaxprocsBackup)
+}
```
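The figures quoted in the commit message can presumably be reproduced with a standard benchmark invocation such as `go test -run '^$' -bench BenchmarkZstdMemoryConsumption`; because the benchmark calls `b.ReportAllocs()`, the B/op and allocs/op columns are reported without any extra flags.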
