diff --git a/server/consumer.go b/server/consumer.go index ddc0f6b82d4..ed319324614 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -3304,11 +3304,10 @@ func (o *consumer) needAck(sseq uint64, subj string) bool { // Check if we are filtered, and if so check if this is even applicable to us. if isFiltered { if subj == _EMPTY_ { - var svp StoreMsg - if _, err := o.mset.store.LoadMsg(sseq, &svp); err != nil { + var err error + if subj, err = o.mset.store.SubjectForSeq(sseq); err != nil { return false } - subj = svp.subj } if !o.isFilteredMatch(subj) { return false diff --git a/server/filestore.go b/server/filestore.go index 5ad6442ac3d..7b79584f46f 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -7399,6 +7399,24 @@ func (mb *msgBlock) msgFromBufEx(buf []byte, sm *StoreMsg, hh hash.Hash64, doCop return sm, nil } +// SubjectForSeq will return what the subject is for this sequence if found. +func (fs *fileStore) SubjectForSeq(seq uint64) (string, error) { + fs.mu.RLock() + if seq < fs.state.FirstSeq { + fs.mu.RUnlock() + return _EMPTY_, ErrStoreMsgNotFound + } + var smv StoreMsg + mb := fs.selectMsgBlock(seq) + fs.mu.RUnlock() + if mb != nil { + if sm, _, _ := mb.fetchMsgNoCopy(seq, &smv); sm != nil { + return sm.subj, nil + } + } + return _EMPTY_, ErrStoreMsgNotFound +} + // LoadMsg will lookup the message by sequence number and return it if found. func (fs *fileStore) LoadMsg(seq uint64, sm *StoreMsg) (*StoreMsg, error) { return fs.msgForSeq(seq, sm) diff --git a/server/filestore_test.go b/server/filestore_test.go index 22592e3f8e4..17aeeae79b6 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -9627,3 +9627,63 @@ func TestFileStoreUpdateConfigTTLState(t *testing.T) { require_NoError(t, fs.UpdateConfig(&cfg)) require_Equal(t, fs.ttls, nil) } + +func TestFileStoreSubjectForSeq(t *testing.T) { + cfg := StreamConfig{ + Name: "foo", + Subjects: []string{"foo.>"}, + Storage: FileStorage, + } + fs, err := newFileStore(FileStoreConfig{StoreDir: t.TempDir()}, cfg) + require_NoError(t, err) + defer fs.Stop() + + seq, _, err := fs.StoreMsg("foo.bar", nil, nil, 0) + require_NoError(t, err) + require_Equal(t, seq, 1) + + _, err = fs.SubjectForSeq(0) + require_Error(t, err, ErrStoreMsgNotFound) + + subj, err := fs.SubjectForSeq(1) + require_NoError(t, err) + require_Equal(t, subj, "foo.bar") + + _, err = fs.SubjectForSeq(2) + require_Error(t, err, ErrStoreMsgNotFound) +} + +func BenchmarkFileStoreSubjectAccesses(b *testing.B) { + fs, err := newFileStore(FileStoreConfig{StoreDir: b.TempDir()}, StreamConfig{ + Name: "foo", + Subjects: []string{"foo.>"}, + Storage: FileStorage, + }) + require_NoError(b, err) + defer fs.Stop() + + seq, _, err := fs.StoreMsg("foo.bar", nil, []byte{1, 2, 3, 4, 5}, 0) + require_NoError(b, err) + require_Equal(b, seq, 1) + + b.Run("SubjectForSeq", func(b *testing.B) { + b.ReportAllocs() + for range b.N { + subj, err := fs.SubjectForSeq(1) + require_NoError(b, err) + require_Equal(b, subj, "foo.bar") + } + }) + + b.Run("LoadMsg", func(b *testing.B) { + b.ReportAllocs() + for range b.N { + // smv is deliberately inside the loop here because that's + // effectively what is happening with needAck. + var smv StoreMsg + sm, err := fs.LoadMsg(1, &smv) + require_NoError(b, err) + require_Equal(b, sm.subj, "foo.bar") + } + }) +} diff --git a/server/memstore.go b/server/memstore.go index 581edb5e562..e90238a4b57 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -1469,6 +1469,19 @@ func (ms *memStore) deleteFirstMsg() bool { return ms.removeMsg(ms.state.FirstSeq, false) } +// SubjectForSeq will return what the subject is for this sequence if found. +func (ms *memStore) SubjectForSeq(seq uint64) (string, error) { + ms.mu.RLock() + defer ms.mu.RUnlock() + if seq < ms.state.FirstSeq { + return _EMPTY_, ErrStoreMsgNotFound + } + if sm, ok := ms.msgs[seq]; ok { + return sm.subj, nil + } + return _EMPTY_, ErrStoreMsgNotFound +} + // LoadMsg will lookup the message by sequence number and return it if found. func (ms *memStore) LoadMsg(seq uint64, smp *StoreMsg) (*StoreMsg, error) { return ms.loadMsgLocked(seq, smp, true) diff --git a/server/memstore_test.go b/server/memstore_test.go index 2f95ee3eb8b..a80c097e51b 100644 --- a/server/memstore_test.go +++ b/server/memstore_test.go @@ -1312,6 +1312,30 @@ func TestMemStoreUpdateConfigTTLState(t *testing.T) { require_Equal(t, ms.ttls, nil) } +func TestMemStoreSubjectForSeq(t *testing.T) { + cfg := StreamConfig{ + Name: "foo", + Subjects: []string{"foo.>"}, + Storage: MemoryStorage, + } + ms, err := newMemStore(&cfg) + require_NoError(t, err) + + seq, _, err := ms.StoreMsg("foo.bar", nil, nil, 0) + require_NoError(t, err) + require_Equal(t, seq, 1) + + _, err = ms.SubjectForSeq(0) + require_Error(t, err, ErrStoreMsgNotFound) + + subj, err := ms.SubjectForSeq(1) + require_NoError(t, err) + require_Equal(t, subj, "foo.bar") + + _, err = ms.SubjectForSeq(2) + require_Error(t, err, ErrStoreMsgNotFound) +} + /////////////////////////////////////////////////////////////////////////// // Benchmarks /////////////////////////////////////////////////////////////////////////// diff --git a/server/store.go b/server/store.go index c97e3afd020..b4df15605d8 100644 --- a/server/store.go +++ b/server/store.go @@ -112,6 +112,7 @@ type StreamStore interface { SubjectsTotals(filterSubject string) map[string]uint64 AllLastSeqs() ([]uint64, error) MultiLastSeqs(filters []string, maxSeq uint64, maxAllowed int) ([]uint64, error) + SubjectForSeq(seq uint64) (string, error) NumPending(sseq uint64, filter string, lastPerSubject bool) (total, validThrough uint64) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bool) (total, validThrough uint64) State() StreamState