diff --git a/server/filestore.go b/server/filestore.go index 77499a6b079..33e0737c23b 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -2560,7 +2560,6 @@ func copyMsgBlocks(src []*msgBlock) []*msgBlock { // GetSeqFromTime looks for the first sequence number that has // the message with >= timestamp. -// FIXME(dlc) - inefficient, and dumb really. Make this better. func (fs *fileStore) GetSeqFromTime(t time.Time) uint64 { fs.mu.RLock() lastSeq := fs.state.LastSeq @@ -2580,14 +2579,17 @@ func (fs *fileStore) GetSeqFromTime(t time.Time) uint64 { lseq := atomic.LoadUint64(&mb.last.seq) var smv StoreMsg - - // Linear search, hence the dumb part.. ts := t.UnixNano() - for seq := fseq; seq <= lseq; seq++ { - sm, _, _ := mb.fetchMsgNoCopy(seq, &smv) - if sm != nil && sm.ts >= ts { - return sm.seq - } + + // Because sort.Search expects range [0,off), we have to manually + // calculate the offset from the first sequence. + off := int(lseq - fseq + 1) + i := sort.Search(off, func(i int) bool { + sm, _, _ := mb.fetchMsgNoCopy(fseq+uint64(i), &smv) + return sm != nil && sm.ts >= ts + }) + if i < off { + return fseq + uint64(i) } return 0 } @@ -6886,14 +6888,25 @@ func (fs *fileStore) selectMsgBlockForStart(minTime time.Time) *msgBlock { fs.mu.RLock() defer fs.mu.RUnlock() - t := minTime.UnixNano() - for _, mb := range fs.blks { + // Binary search for first block where last.ts >= t. + i, _ := slices.BinarySearchFunc(fs.blks, minTime.UnixNano(), func(mb *msgBlock, target int64) int { mb.mu.RLock() - found := t <= mb.last.ts + last := mb.last.ts mb.mu.RUnlock() - if found { - return mb + switch { + case last < target: + return -1 + case last > target: + return 1 + default: + return 0 } + }) + + // BinarySearchFunc returns an insertion point if not found. + // Either way, i is the index of the first mb where mb.last.ts >= t. + if i < len(fs.blks) { + return fs.blks[i] } return nil } diff --git a/server/filestore_test.go b/server/filestore_test.go index 5ae0881ad21..80b4ce5a0fb 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -10534,3 +10534,55 @@ func TestFileStoreCorruptedNonOrderedSequences(t *testing.T) { }) } } + +func BenchmarkFileStoreGetSeqFromTime(b *testing.B) { + fs, err := newFileStore( + FileStoreConfig{ + StoreDir: b.TempDir(), + BlockSize: 16, + }, + StreamConfig{ + Name: "foo", + Subjects: []string{"foo.>"}, + Storage: FileStorage, + }, + ) + require_NoError(b, err) + defer fs.Stop() + + for range 4096 { + _, _, err := fs.StoreMsg("foo.bar", nil, []byte{1, 2, 3, 4, 5}, 0) + require_NoError(b, err) + } + + fs.mu.RLock() + fs.blks[0].mu.RLock() + fs.lmb.mu.RLock() + start := time.Unix(0, fs.blks[0].first.ts) + middle := time.Unix(0, fs.blks[0].first.ts+(fs.lmb.last.ts-fs.blks[0].first.ts)/2) + end := time.Unix(0, fs.lmb.last.ts) + fs.blks[0].mu.RUnlock() + fs.lmb.mu.RUnlock() + fs.mu.RUnlock() + + b.Run("Start", func(b *testing.B) { + b.ReportAllocs() + for range b.N { + fs.GetSeqFromTime(start) + } + }) + + b.Run("Middle", func(b *testing.B) { + b.ReportAllocs() + for range b.N { + fs.GetSeqFromTime(middle) + } + }) + + b.Run("End", func(b *testing.B) { + b.ReportAllocs() + for range b.N { + fs.GetSeqFromTime(end) + } + }) +}