Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 26 additions & 13 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2560,7 +2560,6 @@ func copyMsgBlocks(src []*msgBlock) []*msgBlock {

// GetSeqFromTime looks for the first sequence number that has
// the message with >= timestamp.
// FIXME(dlc) - inefficient, and dumb really. Make this better.
func (fs *fileStore) GetSeqFromTime(t time.Time) uint64 {
fs.mu.RLock()
lastSeq := fs.state.LastSeq
Expand All @@ -2580,14 +2579,17 @@ func (fs *fileStore) GetSeqFromTime(t time.Time) uint64 {
lseq := atomic.LoadUint64(&mb.last.seq)

var smv StoreMsg

// Linear search, hence the dumb part..
ts := t.UnixNano()
for seq := fseq; seq <= lseq; seq++ {
sm, _, _ := mb.fetchMsgNoCopy(seq, &smv)
if sm != nil && sm.ts >= ts {
return sm.seq
}

// Because sort.Search expects range [0,off), we have to manually
// calculate the offset from the first sequence.
off := int(lseq - fseq + 1)
i := sort.Search(off, func(i int) bool {
sm, _, _ := mb.fetchMsgNoCopy(fseq+uint64(i), &smv)
return sm != nil && sm.ts >= ts
})
if i < off {
return fseq + uint64(i)
}
return 0
}
Expand Down Expand Up @@ -6886,14 +6888,25 @@ func (fs *fileStore) selectMsgBlockForStart(minTime time.Time) *msgBlock {
fs.mu.RLock()
defer fs.mu.RUnlock()

t := minTime.UnixNano()
for _, mb := range fs.blks {
// Binary search for first block where last.ts >= t.
i, _ := slices.BinarySearchFunc(fs.blks, minTime.UnixNano(), func(mb *msgBlock, target int64) int {
mb.mu.RLock()
found := t <= mb.last.ts
last := mb.last.ts
mb.mu.RUnlock()
if found {
return mb
switch {
case last < target:
return -1
case last > target:
return 1
default:
return 0
}
})

// BinarySearchFunc returns an insertion point if not found.
// Either way, i is the index of the first mb where mb.last.ts >= t.
if i < len(fs.blks) {
return fs.blks[i]
}
return nil
}
Expand Down
52 changes: 52 additions & 0 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10534,3 +10534,55 @@ func TestFileStoreCorruptedNonOrderedSequences(t *testing.T) {
})
}
}

func BenchmarkFileStoreGetSeqFromTime(b *testing.B) {
fs, err := newFileStore(
FileStoreConfig{
StoreDir: b.TempDir(),
BlockSize: 16,
},
StreamConfig{
Name: "foo",
Subjects: []string{"foo.>"},
Storage: FileStorage,
},
)
require_NoError(b, err)
defer fs.Stop()

for range 4096 {
_, _, err := fs.StoreMsg("foo.bar", nil, []byte{1, 2, 3, 4, 5}, 0)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could use StoreMsgRaw() and pass a timestamp, and avoid the time.Sleep() at the end of the loop.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't really need to specify our own timestamps as even the nanosecond precision should be enough (the small block size ensures every message gets its own block), but good spot that I left the sleep in, I will remove it.

require_NoError(b, err)
}

fs.mu.RLock()
fs.blks[0].mu.RLock()
fs.lmb.mu.RLock()
start := time.Unix(0, fs.blks[0].first.ts)
middle := time.Unix(0, fs.blks[0].first.ts+(fs.lmb.last.ts-fs.blks[0].first.ts)/2)
end := time.Unix(0, fs.lmb.last.ts)
fs.blks[0].mu.RUnlock()
fs.lmb.mu.RUnlock()
fs.mu.RUnlock()

b.Run("Start", func(b *testing.B) {
b.ReportAllocs()
for range b.N {
fs.GetSeqFromTime(start)
}
})

b.Run("Middle", func(b *testing.B) {
b.ReportAllocs()
for range b.N {
fs.GetSeqFromTime(middle)
}
})

b.Run("End", func(b *testing.B) {
b.ReportAllocs()
for range b.N {
fs.GetSeqFromTime(end)
}
})
}
Loading