Skip to content

Commit

Permalink
fix: fix duplicate compression on open
Browse files Browse the repository at this point in the history
Signed-off-by: arkbriar <[email protected]>
  • Loading branch information
arkbriar committed Sep 12, 2024
1 parent f9af954 commit 42d0372
Showing 1 changed file with 11 additions and 1 deletion.
12 changes: 11 additions & 1 deletion internal/filechannel/filechannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,11 @@ func (sm *SegmentManager) updateWatermark() {
sm.watermark = min(sm.pinFreq.Front().Key(), sm.readFreq.Front().Key())
}

// The watermark should be monotonically increasing.
if prevWatermark > sm.watermark {
panic("watermark regression")
}

if prevWatermark != sm.watermark {
sm.readCond.Broadcast()
}
Expand Down Expand Up @@ -1019,6 +1024,7 @@ func (fc *FileChannel) Open() error {
}
fc.segmentManager.beginIndex = lowestIndex
fc.segmentManager.watermark = lowestIndex
fc.segmentManager.maxReadIndex = int64(lowestIndex) - 1
}

toCompress := make([]uint32, 0, 4)
Expand All @@ -1028,7 +1034,11 @@ func (fc *FileChannel) Open() error {
case slices.Equal(states, []SegmentFileState{Plain, Compressed}):
_ = os.Remove(fc.segmentManager.SegmentFile(index, Plain))
case slices.Equal(states, []SegmentFileState{Plain}):
if index != fc.segmentManager.CurrentSegmentIndex() {
// If the file is not the last and second last file, compress it.
// The rule aligns with the rotation so that there won't be duplicated compressing:
// when the current file is rotated, the previous file is compressed.
// And the current rotated files are [0, currentIndex-1].
if index+1 < fc.segmentManager.CurrentSegmentIndex() {
toCompress = append(toCompress, index)
}
case slices.Equal(states, []SegmentFileState{Compressed}):
Expand Down

0 comments on commit 42d0372

Please sign in to comment.