From 82cb3edb03fde9b792bf991957c4ea46ffdb8fed Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Thu, 24 Apr 2025 10:33:03 +0100 Subject: [PATCH 1/2] Fix subject tree intersection to match literals when followed by additional wildcards Signed-off-by: Neil Twigg --- server/sublist.go | 17 ++++++++++++----- server/sublist_test.go | 19 +++++++++++++++++-- 2 files changed, 29 insertions(+), 7 deletions(-) diff --git a/server/sublist.go b/server/sublist.go index 004150aa9d7..985cc35144e 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -1747,13 +1747,20 @@ func IntersectStree[T any](st *stree.SubjectTree[T], sl *Sublist, cb func(subj [ } func intersectStree[T any](st *stree.SubjectTree[T], r *level, subj []byte, cb func(subj []byte, entry *T)) { + // This level could potentially match literals, despite being followed up by + // additional wildcards. For literals we can use Find since it is considerably + // faster. Then we can carry on checking for further matches in the usual way. + wc := subjectHasWildcard(bytesToString(subj)) + if !wc { + if e, ok := st.Find(subj); ok { + cb(subj, e) + } + } if r.numNodes() == 0 { - // For wildcards we can't avoid Match, but if it's a literal subject at - // this point, using Find is considerably cheaper. - if subjectHasWildcard(bytesToString(subj)) { + // No further recursions to be made at this point but there's still a wildcard + // to match, so let the subject tree work it out. + if wc { st.Match(subj, cb) - } else if e, ok := st.Find(subj); ok { - cb(subj, e) } return } diff --git a/server/sublist_test.go b/server/sublist_test.go index 5e8145c5ec7..e685894bc63 100644 --- a/server/sublist_test.go +++ b/server/sublist_test.go @@ -1990,8 +1990,11 @@ func TestSublistInterestBasedIntersection(t *testing.T) { st.Insert([]byte("one.two.six"), struct{}{}) st.Insert([]byte("one.two.seven"), struct{}{}) st.Insert([]byte("eight.nine"), struct{}{}) + st.Insert([]byte("stream.A"), struct{}{}) + st.Insert([]byte("stream.A.child"), struct{}{}) require_NoDuplicates := func(t *testing.T, got map[string]int) { + t.Helper() for _, c := range got { require_Equal(t, c, 1) } @@ -2044,7 +2047,7 @@ func TestSublistInterestBasedIntersection(t *testing.T) { IntersectStree(st, sl, func(subj []byte, entry *struct{}) { got[string(subj)]++ }) - require_Len(t, len(got), 5) + require_Len(t, len(got), 7) require_NoDuplicates(t, got) }) @@ -2071,6 +2074,18 @@ func TestSublistInterestBasedIntersection(t *testing.T) { require_NoDuplicates(t, got) }) + t.Run("FWCExtended", func(t *testing.T) { + got := map[string]int{} + sl := NewSublistNoCache() + sl.Insert(newSub("stream.A.>")) + sl.Insert(newSub("stream.A")) + IntersectStree(st, sl, func(subj []byte, entry *struct{}) { + got[string(subj)]++ + }) + require_Len(t, len(got), 2) + require_NoDuplicates(t, got) + }) + t.Run("FWCAll", func(t *testing.T) { got := map[string]int{} sl := NewSublistNoCache() @@ -2078,7 +2093,7 @@ func TestSublistInterestBasedIntersection(t *testing.T) { IntersectStree(st, sl, func(subj []byte, entry *struct{}) { got[string(subj)]++ }) - require_Len(t, len(got), 5) + require_Len(t, len(got), 7) require_NoDuplicates(t, got) }) From dc16bbfab05177479f7992ae8d86c7deb300ca07 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Thu, 24 Apr 2025 11:47:32 +0200 Subject: [PATCH 2/2] Test extended full wildcard Signed-off-by: Maurice van Veen --- server/jetstream_consumer_test.go | 58 +++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/server/jetstream_consumer_test.go b/server/jetstream_consumer_test.go index 1a1f4fa207c..48465091fb7 100644 --- a/server/jetstream_consumer_test.go +++ b/server/jetstream_consumer_test.go @@ -17,6 +17,7 @@ package server import ( "bytes" + "context" crand "crypto/rand" "encoding/base64" "encoding/json" @@ -34,6 +35,8 @@ import ( "testing" "time" + "github.com/nats-io/nats.go/jetstream" + "github.com/nats-io/nats.go" "github.com/nats-io/nuid" ) @@ -9473,3 +9476,58 @@ func TestJetStreamConsumerPullMaxBytes(t *testing.T) { checkHeader(m, badReq) checkSubsPending(t, sub, 0) } + +// https://github.com/nats-io/nats-server/issues/6824 +func TestJetStreamConsumerDeliverAllOverlappingFilterSubjects(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnectNewAPI(t, s) + defer nc.Close() + + ctx := context.Background() + _, err := js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{ + Name: "TEST", + Subjects: []string{"stream.>"}, + }) + require_NoError(t, err) + + publishMessageCount := 10 + for i := 0; i < publishMessageCount; i++ { + _, err = js.Publish(ctx, "stream.A", nil) + require_NoError(t, err) + } + + // Create consumer + consumer, err := js.CreateOrUpdateConsumer(ctx, "TEST", jetstream.ConsumerConfig{ + DeliverPolicy: jetstream.DeliverAllPolicy, + FilterSubjects: []string{ + "stream.A", + "stream.A.>", + }, + }) + require_NoError(t, err) + + messages := make(chan jetstream.Msg) + cc, err := consumer.Consume(func(msg jetstream.Msg) { + messages <- msg + msg.Ack() + }) + require_NoError(t, err) + defer cc.Drain() + + var count = 0 + for { + if count == publishMessageCount { + // All messages received. + return + } + select { + case <-messages: + count++ + case <-time.After(2 * time.Second): + t.Errorf("Timeout reached, %d messages received. Exiting.", count) + return + } + } +}