diff --git a/server/gsl/gsl.go b/server/gsl/gsl.go index 9fa413d7b67..55f9bad98bd 100644 --- a/server/gsl/gsl.go +++ b/server/gsl/gsl.go @@ -408,6 +408,9 @@ func (n *node[T]) isEmpty() bool { // Return the number of nodes for the given level. func (l *level[T]) numNodes() int { + if l == nil { + return 0 + } num := len(l.nodes) if l.pwc != nil { num++ @@ -489,39 +492,49 @@ func intersectStree[T1 any, T2 comparable](st *stree.SubjectTree[T1], r *level[T if len(nsubj) > 0 { nsubj = append(subj, '.') } - switch { - case r.fwc != nil: + if r.fwc != nil { // We've reached a full wildcard, do a FWC match on the stree at this point // and don't keep iterating downward. nsubj := append(nsubj, '>') st.Match(nsubj, cb) - case r.pwc != nil: + return + } + if r.pwc != nil { // We've found a partial wildcard. We'll keep iterating downwards, but first // check whether there's interest at this level (without triggering dupes) and // match if so. + var done bool nsubj := append(nsubj, '*') if len(r.pwc.subs) > 0 { st.Match(nsubj, cb) + done = true } - if r.pwc.next != nil && r.pwc.next.numNodes() > 0 { + if r.pwc.next.numNodes() > 0 { intersectStree(st, r.pwc.next, nsubj, cb) } - default: - // Normal node with subject literals, keep iterating. - for t, n := range r.nodes { - nsubj := append(nsubj, t...) - if len(n.subs) > 0 { - if subjectHasWildcard(bytesToString(nsubj)) { - st.Match(nsubj, cb) - } else { - if e, ok := st.Find(nsubj); ok { - cb(nsubj, e) - } + if done { + return + } + } + // Normal node with subject literals, keep iterating. + for t, n := range r.nodes { + if r.pwc != nil && r.pwc.next.numNodes() > 0 && n.next.numNodes() > 0 { + // A wildcard at the next level will already visit these descendents + // so skip so we don't callback the same subject more than once. + continue + } + nsubj := append(nsubj, t...) + if len(n.subs) > 0 { + if subjectHasWildcard(bytesToString(nsubj)) { + st.Match(nsubj, cb) + } else { + if e, ok := st.Find(nsubj); ok { + cb(nsubj, e) } } - if n.next != nil && n.next.numNodes() > 0 { - intersectStree(st, n.next, nsubj, cb) - } + } + if n.next.numNodes() > 0 { + intersectStree(st, n.next, nsubj, cb) } } } diff --git a/server/gsl/gsl_test.go b/server/gsl/gsl_test.go index 7ae5d2bb26c..f8b149ed2a8 100644 --- a/server/gsl/gsl_test.go +++ b/server/gsl/gsl_test.go @@ -397,6 +397,35 @@ func TestGenericSublistInterestBasedIntersection(t *testing.T) { require_NoDuplicates(t, got) }) + t.Run("PWCExtended", func(t *testing.T) { + got := map[string]int{} + sl := NewSublist[int]() + require_NoError(t, sl.Insert("stream.*.child", 11)) + require_NoError(t, sl.Insert("stream.A", 22)) + IntersectStree(st, sl, func(subj []byte, entry *struct{}) { + got[string(subj)]++ + }) + require_Len(t, len(got), 2) + require_NoDuplicates(t, got) + }) + + t.Run("PWCExtendedAggressive", func(t *testing.T) { + got := map[string]int{} + sl := NewSublist[int]() + require_NoError(t, sl.Insert("stream.A.child", 11)) + require_NoError(t, sl.Insert("*.A.child", 22)) + require_NoError(t, sl.Insert("stream.*.child", 22)) + require_NoError(t, sl.Insert("stream.A.*", 22)) + require_NoError(t, sl.Insert("stream.*.*", 22)) + require_NoError(t, sl.Insert("*.A.*", 22)) + require_NoError(t, sl.Insert("*.*.child", 22)) + IntersectStree(st, sl, func(subj []byte, entry *struct{}) { + got[string(subj)]++ + }) + require_Len(t, len(got), 1) + require_NoDuplicates(t, got) + }) + t.Run("FWCAll", func(t *testing.T) { got := map[string]int{} sl := NewSublist[int]() diff --git a/server/jetstream_consumer_test.go b/server/jetstream_consumer_test.go index 9928bd4cb38..cb7883d277d 100644 --- a/server/jetstream_consumer_test.go +++ b/server/jetstream_consumer_test.go @@ -9634,6 +9634,61 @@ func TestJetStreamConsumerDeliverAllNonOverlappingFilterSubjects(t *testing.T) { require_Equal(t, i.NumPending, 0) } +// https://github.com/nats-io/nats-server/issues/7336 +func TestJetStreamConsumerDeliverPartialOverlappingFilterSubjects(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnectNewAPI(t, s) + defer nc.Close() + + ctx := context.Background() + _, err := js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{ + Name: "TEST", + Subjects: []string{"stream.>"}, + }) + require_NoError(t, err) + + publishMessageCount := 10 + for i := 0; i < publishMessageCount; i++ { + _, err = js.Publish(ctx, "stream.A", nil) + require_NoError(t, err) + } + + // Create consumer + consumer, err := js.CreateOrUpdateConsumer(ctx, "TEST", jetstream.ConsumerConfig{ + DeliverPolicy: jetstream.DeliverAllPolicy, + FilterSubjects: []string{ + "stream.A", + "stream.*.A", + }, + }) + require_NoError(t, err) + + messages := make(chan jetstream.Msg) + cc, err := consumer.Consume(func(msg jetstream.Msg) { + messages <- msg + msg.Ack() + }) + require_NoError(t, err) + defer cc.Drain() + + var count = 0 + for { + if count == publishMessageCount { + // All messages received. + return + } + select { + case <-messages: + count++ + case <-time.After(2 * time.Second): + t.Errorf("Timeout reached, %d messages received. Exiting.", count) + return + } + } +} + func TestJetStreamConsumerStateAlwaysFromStore(t *testing.T) { s := RunBasicJetStreamServer(t) defer s.Shutdown() diff --git a/server/sublist.go b/server/sublist.go index 2bed802c180..f759cb084fb 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -995,6 +995,9 @@ func (n *node) isEmpty() bool { // Return the number of nodes for the given level. func (l *level) numNodes() int { + if l == nil { + return 0 + } num := len(l.nodes) if l.pwc != nil { num++ @@ -1758,39 +1761,49 @@ func intersectStree[T any](st *stree.SubjectTree[T], r *level, subj []byte, cb f if len(nsubj) > 0 { nsubj = append(subj, '.') } - switch { - case r.fwc != nil: + if r.fwc != nil { // We've reached a full wildcard, do a FWC match on the stree at this point // and don't keep iterating downward. nsubj := append(nsubj, '>') st.Match(nsubj, cb) - case r.pwc != nil: + return + } + if r.pwc != nil { // We've found a partial wildcard. We'll keep iterating downwards, but first // check whether there's interest at this level (without triggering dupes) and // match if so. + var done bool nsubj := append(nsubj, '*') if len(r.pwc.psubs)+len(r.pwc.qsubs) > 0 { st.Match(nsubj, cb) + done = true } - if r.pwc.next != nil && r.pwc.next.numNodes() > 0 { + if r.pwc.next.numNodes() > 0 { intersectStree(st, r.pwc.next, nsubj, cb) } - default: - // Normal node with subject literals, keep iterating. - for t, n := range r.nodes { - nsubj := append(nsubj, t...) - if len(n.psubs)+len(n.qsubs) > 0 { - if subjectHasWildcard(bytesToString(nsubj)) { - st.Match(nsubj, cb) - } else { - if e, ok := st.Find(nsubj); ok { - cb(nsubj, e) - } + if done { + return + } + } + // Normal node with subject literals, keep iterating. + for t, n := range r.nodes { + if r.pwc != nil && r.pwc.next.numNodes() > 0 && n.next.numNodes() > 0 { + // A wildcard at the next level will already visit these descendents + // so skip so we don't callback the same subject more than once. + continue + } + nsubj := append(nsubj, t...) + if len(n.psubs)+len(n.qsubs) > 0 { + if subjectHasWildcard(bytesToString(nsubj)) { + st.Match(nsubj, cb) + } else { + if e, ok := st.Find(nsubj); ok { + cb(nsubj, e) } } - if n.next != nil && n.next.numNodes() > 0 { - intersectStree(st, n.next, nsubj, cb) - } + } + if n.next.numNodes() > 0 { + intersectStree(st, n.next, nsubj, cb) } } } diff --git a/server/sublist_test.go b/server/sublist_test.go index acc5b53af25..e1eaf23d432 100644 --- a/server/sublist_test.go +++ b/server/sublist_test.go @@ -2086,6 +2086,35 @@ func TestSublistInterestBasedIntersection(t *testing.T) { require_NoDuplicates(t, got) }) + t.Run("PWCExtended", func(t *testing.T) { + got := map[string]int{} + sl := NewSublistNoCache() + sl.Insert(newSub("stream.*.child")) + sl.Insert(newSub("stream.A")) + IntersectStree(st, sl, func(subj []byte, entry *struct{}) { + got[string(subj)]++ + }) + require_Len(t, len(got), 2) + require_NoDuplicates(t, got) + }) + + t.Run("PWCExtendedAggressive", func(t *testing.T) { + got := map[string]int{} + sl := NewSublistNoCache() + sl.Insert(newSub("stream.A.child")) + sl.Insert(newSub("*.A.child")) + sl.Insert(newSub("stream.*.child")) + sl.Insert(newSub("stream.A.*")) + sl.Insert(newSub("stream.*.*")) + sl.Insert(newSub("*.A.*")) + sl.Insert(newSub("*.*.child")) + IntersectStree(st, sl, func(subj []byte, entry *struct{}) { + got[string(subj)]++ + }) + require_Len(t, len(got), 1) + require_NoDuplicates(t, got) + }) + t.Run("FWCAll", func(t *testing.T) { got := map[string]int{} sl := NewSublistNoCache()