diff --git a/server/filestore.go b/server/filestore.go index 8552c84b110..d29d51f88cd 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -4320,11 +4320,13 @@ func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) { if mb.cmp != NoCompression { buf, err := mb.loadBlock(nil) if err != nil { + mb.mu.Unlock() return 0, 0, fmt.Errorf("failed to load block from disk: %w", err) } if mb.bek != nil && len(buf) > 0 { bek, err := genBlockEncryptionKey(mb.fs.fcfg.Cipher, mb.seed, mb.nonce) if err != nil { + mb.mu.Unlock() return 0, 0, err } mb.bek = bek @@ -4332,12 +4334,14 @@ func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) { } buf, err = mb.decompressIfNeeded(buf) if err != nil { + mb.mu.Unlock() return 0, 0, fmt.Errorf("failed to decompress block: %w", err) } buf = buf[:eof] copy(mb.lchk[0:], buf[:len(buf)-checksumSize]) buf, err = mb.cmp.Compress(buf) if err != nil { + mb.mu.Unlock() return 0, 0, fmt.Errorf("failed to recompress block: %w", err) } meta := &CompressionInfo{ @@ -4348,6 +4352,7 @@ func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) { if mb.bek != nil && len(buf) > 0 { bek, err := genBlockEncryptionKey(mb.fs.fcfg.Cipher, mb.seed, mb.nonce) if err != nil { + mb.mu.Unlock() return 0, 0, err } mb.bek = bek @@ -4355,9 +4360,11 @@ func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) { } n, err := mb.writeAt(buf, 0) if err != nil { + mb.mu.Unlock() return 0, 0, fmt.Errorf("failed to rewrite compressed block: %w", err) } if n != len(buf) { + mb.mu.Unlock() return 0, 0, fmt.Errorf("short write (%d != %d)", n, len(buf)) } mb.mfd.Truncate(int64(len(buf))) @@ -7008,6 +7015,7 @@ func (fs *fileStore) Truncate(seq uint64) error { // Set lmb to nlmb and make sure writeable. fs.lmb = nlmb if err := nlmb.enableForWriting(fs.fip); err != nil { + fs.mu.Unlock() return err } diff --git a/server/stream.go b/server/stream.go index 0462a96787e..73694d2ed59 100644 --- a/server/stream.go +++ b/server/stream.go @@ -5611,6 +5611,7 @@ func (mset *stream) swapSigSubs(o *consumer, newFilters []string) { if o.closed || o.mset == nil { o.mu.Unlock() + mset.clsMu.Unlock() return }