Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,9 @@ const (
// For smaller reuse buffers. Usually being generated during contention on the lead write buffer.
// E.g. mirrors/sources etc.
defaultSmallBlockSize = 1 * 1024 * 1024 // 1MB
// NOT an actual block size, but used for the sync.Pools, so that we don't allocate huge buffers
// unnecessarily until there are enough writes to justify it.
defaultTinyBlockSize = 1 * 1024 * 256 // 256KB
// Maximum size for the encrypted head block.
maximumEncryptedBlockSize = 2 * 1024 * 1024 // 2MB
// Default for KV based
Expand Down Expand Up @@ -947,6 +950,12 @@ func (fs *fileStore) writeStreamMeta() error {
}

// Pools to recycle the blocks to help with memory pressure.
var blkPoolTiny = &sync.Pool{
New: func() any {
b := [defaultTinyBlockSize]byte{}
return &b
},
}
var blkPoolSmall = &sync.Pool{
New: func() any {
b := [defaultSmallBlockSize]byte{}
Expand All @@ -969,6 +978,8 @@ var blkPoolBig = &sync.Pool{
// Get a new msg block based on sz estimate.
func getMsgBlockBuf(sz int) (buf []byte) {
switch {
case sz <= defaultTinyBlockSize:
return blkPoolTiny.Get().(*[defaultTinyBlockSize]byte)[:0]
case sz <= defaultSmallBlockSize:
return blkPoolSmall.Get().(*[defaultSmallBlockSize]byte)[:0]
case sz <= defaultMediumBlockSize:
Expand All @@ -986,6 +997,9 @@ func getMsgBlockBuf(sz int) (buf []byte) {
// Recycle the msg block.
func recycleMsgBlockBuf(buf []byte) {
switch cap(buf) {
case defaultTinyBlockSize:
b := (*[defaultTinyBlockSize]byte)(buf[0:defaultTinyBlockSize])
blkPoolTiny.Put(b)
case defaultSmallBlockSize:
b := (*[defaultSmallBlockSize]byte)(buf[0:defaultSmallBlockSize])
blkPoolSmall.Put(b)
Expand Down Expand Up @@ -6237,6 +6251,17 @@ func (mb *msgBlock) writeMsgRecordLocked(rl, seq uint64, subj string, mhdr, msg
// reference. It will now stay strong until the flusher decides it is time to weaken.
mb.ecache.Strengthen()

// Make sure we have enough space to write into. If we don't then we can pull a buffer
// from the next pool size up to save us from reallocating in append() below.
if nsz := len(mb.cache.buf) + int(rl); cap(mb.cache.buf) < nsz {
prev := mb.cache.buf
mb.cache.buf = getMsgBlockBuf(nsz)
if prev != nil {
mb.cache.buf = mb.cache.buf[:copy(mb.cache.buf[:nsz], prev)]
recycleMsgBlockBuf(prev)
}
}

// Indexing
index := len(mb.cache.buf)

Expand Down