-
-
Notifications
You must be signed in to change notification settings - Fork 1.8k
[FIXED] Missing tombstones in file store for PurgeEx and Compact
#6685
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1158,12 +1158,22 @@ func (fs *fileStore) rebuildStateLocked(ld *LostStreamData) { | |
| fs.state.Msgs += mb.msgs | ||
| fs.state.Bytes += mb.bytes | ||
| fseq := atomic.LoadUint64(&mb.first.seq) | ||
| if fs.state.FirstSeq == 0 || fseq < fs.state.FirstSeq { | ||
| if fs.state.FirstSeq == 0 || (fseq < fs.state.FirstSeq && mb.first.ts != 0) { | ||
| fs.state.FirstSeq = fseq | ||
| fs.state.FirstTime = time.Unix(0, mb.first.ts).UTC() | ||
| if mb.first.ts == 0 { | ||
| fs.state.FirstTime = time.Time{} | ||
| } else { | ||
| fs.state.FirstTime = time.Unix(0, mb.first.ts).UTC() | ||
| } | ||
| } | ||
| if lseq := atomic.LoadUint64(&mb.last.seq); lseq > fs.state.LastSeq { | ||
| fs.state.LastSeq = lseq | ||
| if mb.last.ts == 0 { | ||
| fs.state.LastTime = time.Time{} | ||
| } else { | ||
| fs.state.LastTime = time.Unix(0, mb.last.ts).UTC() | ||
| } | ||
| } | ||
| fs.state.LastSeq = atomic.LoadUint64(&mb.last.seq) | ||
| fs.state.LastTime = time.Unix(0, mb.last.ts).UTC() | ||
| mb.mu.RUnlock() | ||
| } | ||
| } | ||
|
|
@@ -1556,7 +1566,7 @@ func (fs *fileStore) debug(format string, args ...any) { | |
| func updateTrackingState(state *StreamState, mb *msgBlock) { | ||
| if state.FirstSeq == 0 { | ||
| state.FirstSeq = mb.first.seq | ||
| } else if mb.first.seq < state.FirstSeq { | ||
| } else if mb.first.seq < state.FirstSeq && mb.first.ts != 0 { | ||
| state.FirstSeq = mb.first.seq | ||
| } | ||
| if mb.last.seq > state.LastSeq { | ||
|
|
@@ -2033,7 +2043,8 @@ func (fs *fileStore) recoverMsgs() error { | |
| fs.removeMsgBlockFromList(mb) | ||
| continue | ||
| } | ||
| if fseq := atomic.LoadUint64(&mb.first.seq); fs.state.FirstSeq == 0 || fseq < fs.state.FirstSeq { | ||
| fseq := atomic.LoadUint64(&mb.first.seq) | ||
| if fs.state.FirstSeq == 0 || (fseq < fs.state.FirstSeq && mb.first.ts != 0) { | ||
| fs.state.FirstSeq = fseq | ||
| if mb.first.ts == 0 { | ||
| fs.state.FirstTime = time.Time{} | ||
|
|
@@ -7807,13 +7818,10 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64, _ /* noMarke | |
| if firstSeqNeedsUpdate { | ||
| fs.selectNextFirst() | ||
| } | ||
| fseq := fs.state.FirstSeq | ||
|
|
||
| // Write any tombstones as needed. | ||
| for _, tomb := range tombs { | ||
| if tomb.seq > fseq { | ||
| fs.writeTombstone(tomb.seq, tomb.ts) | ||
| } | ||
| fs.writeTombstone(tomb.seq, tomb.ts) | ||
| } | ||
|
|
||
| os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)) | ||
|
|
@@ -8005,6 +8013,7 @@ func (fs *fileStore) compact(seq uint64, _ /* noMarkers */ bool) (uint64, error) | |
|
|
||
| var smv StoreMsg | ||
| var err error | ||
| var tombs []msgId | ||
|
|
||
| smb.mu.Lock() | ||
| if atomic.LoadUint64(&smb.first.seq) == seq { | ||
|
|
@@ -8040,6 +8049,7 @@ func (fs *fileStore) compact(seq uint64, _ /* noMarkers */ bool) (uint64, error) | |
| // Update fss | ||
| smb.removeSeqPerSubject(sm.subj, mseq) | ||
| fs.removePerSubject(sm.subj, !noMarkers && fs.cfg.SubjectDeleteMarkerTTL > 0) | ||
| tombs = append(tombs, msgId{sm.seq, sm.ts}) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we not do this if we know we are going to nil out below? Also maybe allocate the tombs array if we know we will not be re-writing the block.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't think we can know apriori if we'll be allowed to rewrite the block or not? We can check if it's >2MB, but we don't know how many bytes are going to be removed and if we'll drop below half of I've added the append here so we don't need to scan the block twice. Once for calculating bytes, and one for collecting tombs. Now it's just one loop. What should the tombs array be allocated to?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can look at dmap to get an idea no? And current size of course of the block..
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The dmap length for allocation of tombs I think..
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can't really allocate to the length of dmap since it's not related to the amount of tombstones that need to be written. Any tombstones that do need to be written, are not deleted so are also not part of the dmap. Think it's best to leave as-is for now, given it's the same implementation as what we already have in |
||
| } | ||
| } | ||
|
|
||
|
|
@@ -8106,12 +8116,19 @@ func (fs *fileStore) compact(seq uint64, _ /* noMarkers */ bool) (uint64, error) | |
| smb.fss = nil | ||
| smb.clearCacheAndOffset() | ||
| smb.rbytes = uint64(len(nbuf)) | ||
| // Make sure we don't write any additional tombstones. | ||
| tombs = nil | ||
| } | ||
| } | ||
|
|
||
| SKIP: | ||
| smb.mu.Unlock() | ||
|
|
||
| // Write any tombstones as needed. | ||
| for _, tomb := range tombs { | ||
derekcollison marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| fs.writeTombstone(tomb.seq, tomb.ts) | ||
| } | ||
|
|
||
| if deleted > 0 { | ||
| // Update block map. | ||
| if fs.bim != nil { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.