Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3304,11 +3304,10 @@ func (o *consumer) needAck(sseq uint64, subj string) bool {
// Check if we are filtered, and if so check if this is even applicable to us.
if isFiltered {
if subj == _EMPTY_ {
var svp StoreMsg
if _, err := o.mset.store.LoadMsg(sseq, &svp); err != nil {
var err error
if subj, err = o.mset.store.SubjectForSeq(sseq); err != nil {
return false
}
subj = svp.subj
}
if !o.isFilteredMatch(subj) {
return false
Expand Down
18 changes: 18 additions & 0 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -7399,6 +7399,24 @@ func (mb *msgBlock) msgFromBufEx(buf []byte, sm *StoreMsg, hh hash.Hash64, doCop
return sm, nil
}

// SubjectForSeq will return what the subject is for this sequence if found.
func (fs *fileStore) SubjectForSeq(seq uint64) (string, error) {
fs.mu.RLock()
if seq < fs.state.FirstSeq {
fs.mu.RUnlock()
return _EMPTY_, ErrStoreMsgNotFound
}
var smv StoreMsg
mb := fs.selectMsgBlock(seq)
fs.mu.RUnlock()
if mb != nil {
if sm, _, _ := mb.fetchMsgNoCopy(seq, &smv); sm != nil {
return sm.subj, nil
}
}
return _EMPTY_, ErrStoreMsgNotFound
}

// LoadMsg will lookup the message by sequence number and return it if found.
func (fs *fileStore) LoadMsg(seq uint64, sm *StoreMsg) (*StoreMsg, error) {
return fs.msgForSeq(seq, sm)
Expand Down
60 changes: 60 additions & 0 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9627,3 +9627,63 @@ func TestFileStoreUpdateConfigTTLState(t *testing.T) {
require_NoError(t, fs.UpdateConfig(&cfg))
require_Equal(t, fs.ttls, nil)
}

func TestFileStoreSubjectForSeq(t *testing.T) {
cfg := StreamConfig{
Name: "foo",
Subjects: []string{"foo.>"},
Storage: FileStorage,
}
fs, err := newFileStore(FileStoreConfig{StoreDir: t.TempDir()}, cfg)
require_NoError(t, err)
defer fs.Stop()

seq, _, err := fs.StoreMsg("foo.bar", nil, nil, 0)
require_NoError(t, err)
require_Equal(t, seq, 1)

_, err = fs.SubjectForSeq(0)
require_Error(t, err, ErrStoreMsgNotFound)

subj, err := fs.SubjectForSeq(1)
require_NoError(t, err)
require_Equal(t, subj, "foo.bar")

_, err = fs.SubjectForSeq(2)
require_Error(t, err, ErrStoreMsgNotFound)
}

func BenchmarkFileStoreSubjectAccesses(b *testing.B) {
fs, err := newFileStore(FileStoreConfig{StoreDir: b.TempDir()}, StreamConfig{
Name: "foo",
Subjects: []string{"foo.>"},
Storage: FileStorage,
})
require_NoError(b, err)
defer fs.Stop()

seq, _, err := fs.StoreMsg("foo.bar", nil, []byte{1, 2, 3, 4, 5}, 0)
require_NoError(b, err)
require_Equal(b, seq, 1)

b.Run("SubjectForSeq", func(b *testing.B) {
b.ReportAllocs()
for range b.N {
subj, err := fs.SubjectForSeq(1)
require_NoError(b, err)
require_Equal(b, subj, "foo.bar")
}
})

b.Run("LoadMsg", func(b *testing.B) {
b.ReportAllocs()
for range b.N {
// smv is deliberately inside the loop here because that's
// effectively what is happening with needAck.
var smv StoreMsg
sm, err := fs.LoadMsg(1, &smv)
require_NoError(b, err)
require_Equal(b, sm.subj, "foo.bar")
}
})
}
13 changes: 13 additions & 0 deletions server/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1469,6 +1469,19 @@ func (ms *memStore) deleteFirstMsg() bool {
return ms.removeMsg(ms.state.FirstSeq, false)
}

// SubjectForSeq will return what the subject is for this sequence if found.
func (ms *memStore) SubjectForSeq(seq uint64) (string, error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
if seq < ms.state.FirstSeq {
return _EMPTY_, ErrStoreMsgNotFound
}
if sm, ok := ms.msgs[seq]; ok {
return sm.subj, nil
}
return _EMPTY_, ErrStoreMsgNotFound
}

// LoadMsg will lookup the message by sequence number and return it if found.
func (ms *memStore) LoadMsg(seq uint64, smp *StoreMsg) (*StoreMsg, error) {
return ms.loadMsgLocked(seq, smp, true)
Expand Down
24 changes: 24 additions & 0 deletions server/memstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1312,6 +1312,30 @@ func TestMemStoreUpdateConfigTTLState(t *testing.T) {
require_Equal(t, ms.ttls, nil)
}

func TestMemStoreSubjectForSeq(t *testing.T) {
cfg := StreamConfig{
Name: "foo",
Subjects: []string{"foo.>"},
Storage: MemoryStorage,
}
ms, err := newMemStore(&cfg)
require_NoError(t, err)

seq, _, err := ms.StoreMsg("foo.bar", nil, nil, 0)
require_NoError(t, err)
require_Equal(t, seq, 1)

_, err = ms.SubjectForSeq(0)
require_Error(t, err, ErrStoreMsgNotFound)

subj, err := ms.SubjectForSeq(1)
require_NoError(t, err)
require_Equal(t, subj, "foo.bar")

_, err = ms.SubjectForSeq(2)
require_Error(t, err, ErrStoreMsgNotFound)
}

///////////////////////////////////////////////////////////////////////////
// Benchmarks
///////////////////////////////////////////////////////////////////////////
Expand Down
1 change: 1 addition & 0 deletions server/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ type StreamStore interface {
SubjectsTotals(filterSubject string) map[string]uint64
AllLastSeqs() ([]uint64, error)
MultiLastSeqs(filters []string, maxSeq uint64, maxAllowed int) ([]uint64, error)
SubjectForSeq(seq uint64) (string, error)
NumPending(sseq uint64, filter string, lastPerSubject bool) (total, validThrough uint64)
NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bool) (total, validThrough uint64)
State() StreamState
Expand Down