Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 5 additions & 14 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -4313,8 +4313,11 @@ func (mb *msgBlock) eraseMsg(seq uint64, ri, rl int) error {

// Truncate this message block to the storedMsg.
func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) {
mb.mu.Lock()
defer mb.mu.Unlock()

// Make sure we are loaded to process messages etc.
if err := mb.loadMsgs(); err != nil {
if err := mb.loadMsgsWithLock(); err != nil {
return 0, 0, err
}

Expand All @@ -4328,8 +4331,6 @@ func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) {

var purged, bytes uint64

mb.mu.Lock()

checkDmap := mb.dmap.Size() > 0
var smv StoreMsg

Expand Down Expand Up @@ -4365,28 +4366,24 @@ func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) {
if mb.cmp != NoCompression {
buf, err := mb.loadBlock(nil)
if err != nil {
mb.mu.Unlock()
return 0, 0, fmt.Errorf("failed to load block from disk: %w", err)
}
if mb.bek != nil && len(buf) > 0 {
bek, err := genBlockEncryptionKey(mb.fs.fcfg.Cipher, mb.seed, mb.nonce)
if err != nil {
mb.mu.Unlock()
return 0, 0, err
}
mb.bek = bek
mb.bek.XORKeyStream(buf, buf)
}
buf, err = mb.decompressIfNeeded(buf)
if err != nil {
mb.mu.Unlock()
return 0, 0, fmt.Errorf("failed to decompress block: %w", err)
}
buf = buf[:eof]
copy(mb.lchk[0:], buf[:len(buf)-checksumSize])
buf, err = mb.cmp.Compress(buf)
if err != nil {
mb.mu.Unlock()
return 0, 0, fmt.Errorf("failed to recompress block: %w", err)
}
meta := &CompressionInfo{
Expand All @@ -4397,19 +4394,16 @@ func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) {
if mb.bek != nil && len(buf) > 0 {
bek, err := genBlockEncryptionKey(mb.fs.fcfg.Cipher, mb.seed, mb.nonce)
if err != nil {
mb.mu.Unlock()
return 0, 0, err
}
mb.bek = bek
mb.bek.XORKeyStream(buf, buf)
}
n, err := mb.writeAt(buf, 0)
if err != nil {
mb.mu.Unlock()
return 0, 0, fmt.Errorf("failed to rewrite compressed block: %w", err)
}
if n != len(buf) {
mb.mu.Unlock()
return 0, 0, fmt.Errorf("short write (%d != %d)", n, len(buf))
}
mb.mfd.Truncate(int64(len(buf)))
Expand All @@ -4422,7 +4416,6 @@ func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) {
mb.mfd.ReadAt(lchk[:], eof-8)
copy(mb.lchk[0:], lchk[:])
} else {
mb.mu.Unlock()
return 0, 0, fmt.Errorf("failed to truncate msg block %d, file not open", mb.index)
}

Expand All @@ -4436,10 +4429,8 @@ func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) {
// Redo per subject info for this block.
mb.resetPerSubjectInfo()

mb.mu.Unlock()

// Load msgs again.
mb.loadMsgs()
mb.loadMsgsWithLock()

return purged, bytes, nil
}
Expand Down