diff --git a/server/filestore.go b/server/filestore.go index 862897077a2..ec07a53d641 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -356,6 +356,9 @@ const ( // For smaller reuse buffers. Usually being generated during contention on the lead write buffer. // E.g. mirrors/sources etc. defaultSmallBlockSize = 1 * 1024 * 1024 // 1MB + // NOT an actual block size, but used for the sync.Pools, so that we don't allocate huge buffers + // unnecessarily until there are enough writes to justify it. + defaultTinyBlockSize = 1 * 1024 * 256 // 256KB // Maximum size for the encrypted head block. maximumEncryptedBlockSize = 2 * 1024 * 1024 // 2MB // Default for KV based @@ -947,6 +950,12 @@ func (fs *fileStore) writeStreamMeta() error { } // Pools to recycle the blocks to help with memory pressure. +var blkPoolTiny = &sync.Pool{ + New: func() any { + b := [defaultTinyBlockSize]byte{} + return &b + }, +} var blkPoolSmall = &sync.Pool{ New: func() any { b := [defaultSmallBlockSize]byte{} @@ -969,6 +978,8 @@ var blkPoolBig = &sync.Pool{ // Get a new msg block based on sz estimate. func getMsgBlockBuf(sz int) (buf []byte) { switch { + case sz <= defaultTinyBlockSize: + return blkPoolTiny.Get().(*[defaultTinyBlockSize]byte)[:0] case sz <= defaultSmallBlockSize: return blkPoolSmall.Get().(*[defaultSmallBlockSize]byte)[:0] case sz <= defaultMediumBlockSize: @@ -986,6 +997,9 @@ func getMsgBlockBuf(sz int) (buf []byte) { // Recycle the msg block. func recycleMsgBlockBuf(buf []byte) { switch cap(buf) { + case defaultTinyBlockSize: + b := (*[defaultTinyBlockSize]byte)(buf[0:defaultTinyBlockSize]) + blkPoolTiny.Put(b) case defaultSmallBlockSize: b := (*[defaultSmallBlockSize]byte)(buf[0:defaultSmallBlockSize]) blkPoolSmall.Put(b) @@ -6237,6 +6251,17 @@ func (mb *msgBlock) writeMsgRecordLocked(rl, seq uint64, subj string, mhdr, msg // reference. It will now stay strong until the flusher decides it is time to weaken. mb.ecache.Strengthen() + // Make sure we have enough space to write into. If we don't then we can pull a buffer + // from the next pool size up to save us from reallocating in append() below. + if nsz := len(mb.cache.buf) + int(rl); cap(mb.cache.buf) < nsz { + prev := mb.cache.buf + mb.cache.buf = getMsgBlockBuf(nsz) + if prev != nil { + mb.cache.buf = mb.cache.buf[:copy(mb.cache.buf[:nsz], prev)] + recycleMsgBlockBuf(prev) + } + } + // Indexing index := len(mb.cache.buf)