Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 31 additions & 18 deletions server/gsl/gsl.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,9 @@ func (n *node[T]) isEmpty() bool {

// Return the number of nodes for the given level.
func (l *level[T]) numNodes() int {
if l == nil {
return 0
}
num := len(l.nodes)
if l.pwc != nil {
num++
Expand Down Expand Up @@ -489,39 +492,49 @@ func intersectStree[T1 any, T2 comparable](st *stree.SubjectTree[T1], r *level[T
if len(nsubj) > 0 {
nsubj = append(subj, '.')
}
switch {
case r.fwc != nil:
if r.fwc != nil {
// We've reached a full wildcard, do a FWC match on the stree at this point
// and don't keep iterating downward.
nsubj := append(nsubj, '>')
st.Match(nsubj, cb)
case r.pwc != nil:
return
}
if r.pwc != nil {
// We've found a partial wildcard. We'll keep iterating downwards, but first
// check whether there's interest at this level (without triggering dupes) and
// match if so.
var done bool
nsubj := append(nsubj, '*')
if len(r.pwc.subs) > 0 {
st.Match(nsubj, cb)
done = true
}
if r.pwc.next != nil && r.pwc.next.numNodes() > 0 {
if r.pwc.next.numNodes() > 0 {
intersectStree(st, r.pwc.next, nsubj, cb)
}
default:
// Normal node with subject literals, keep iterating.
for t, n := range r.nodes {
nsubj := append(nsubj, t...)
if len(n.subs) > 0 {
if subjectHasWildcard(bytesToString(nsubj)) {
st.Match(nsubj, cb)
} else {
if e, ok := st.Find(nsubj); ok {
cb(nsubj, e)
}
if done {
return
}
}
// Normal node with subject literals, keep iterating.
for t, n := range r.nodes {
if r.pwc != nil && r.pwc.next.numNodes() > 0 && n.next.numNodes() > 0 {
// A wildcard at the next level will already visit these descendents
// so skip so we don't callback the same subject more than once.
continue
}
nsubj := append(nsubj, t...)
if len(n.subs) > 0 {
if subjectHasWildcard(bytesToString(nsubj)) {
st.Match(nsubj, cb)
} else {
if e, ok := st.Find(nsubj); ok {
cb(nsubj, e)
}
}
if n.next != nil && n.next.numNodes() > 0 {
intersectStree(st, n.next, nsubj, cb)
}
}
if n.next.numNodes() > 0 {
intersectStree(st, n.next, nsubj, cb)
}
}
}
Expand Down
29 changes: 29 additions & 0 deletions server/gsl/gsl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,35 @@ func TestGenericSublistInterestBasedIntersection(t *testing.T) {
require_NoDuplicates(t, got)
})

t.Run("PWCExtended", func(t *testing.T) {
got := map[string]int{}
sl := NewSublist[int]()
require_NoError(t, sl.Insert("stream.*.child", 11))
require_NoError(t, sl.Insert("stream.A", 22))
IntersectStree(st, sl, func(subj []byte, entry *struct{}) {
got[string(subj)]++
})
require_Len(t, len(got), 2)
require_NoDuplicates(t, got)
})

t.Run("PWCExtendedAggressive", func(t *testing.T) {
got := map[string]int{}
sl := NewSublist[int]()
require_NoError(t, sl.Insert("stream.A.child", 11))
require_NoError(t, sl.Insert("*.A.child", 22))
require_NoError(t, sl.Insert("stream.*.child", 22))
require_NoError(t, sl.Insert("stream.A.*", 22))
require_NoError(t, sl.Insert("stream.*.*", 22))
require_NoError(t, sl.Insert("*.A.*", 22))
require_NoError(t, sl.Insert("*.*.child", 22))
IntersectStree(st, sl, func(subj []byte, entry *struct{}) {
got[string(subj)]++
})
require_Len(t, len(got), 1)
require_NoDuplicates(t, got)
})

t.Run("FWCAll", func(t *testing.T) {
got := map[string]int{}
sl := NewSublist[int]()
Expand Down
55 changes: 55 additions & 0 deletions server/jetstream_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9634,6 +9634,61 @@ func TestJetStreamConsumerDeliverAllNonOverlappingFilterSubjects(t *testing.T) {
require_Equal(t, i.NumPending, 0)
}

// https://github.com/nats-io/nats-server/issues/7336
func TestJetStreamConsumerDeliverPartialOverlappingFilterSubjects(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnectNewAPI(t, s)
defer nc.Close()

ctx := context.Background()
_, err := js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
Name: "TEST",
Subjects: []string{"stream.>"},
})
require_NoError(t, err)

publishMessageCount := 10
for i := 0; i < publishMessageCount; i++ {
_, err = js.Publish(ctx, "stream.A", nil)
require_NoError(t, err)
}

// Create consumer
consumer, err := js.CreateOrUpdateConsumer(ctx, "TEST", jetstream.ConsumerConfig{
DeliverPolicy: jetstream.DeliverAllPolicy,
FilterSubjects: []string{
"stream.A",
"stream.*.A",
},
})
require_NoError(t, err)

messages := make(chan jetstream.Msg)
cc, err := consumer.Consume(func(msg jetstream.Msg) {
messages <- msg
msg.Ack()
})
require_NoError(t, err)
defer cc.Drain()

var count = 0
for {
if count == publishMessageCount {
// All messages received.
return
}
select {
case <-messages:
count++
case <-time.After(2 * time.Second):
t.Errorf("Timeout reached, %d messages received. Exiting.", count)
return
}
}
}

func TestJetStreamConsumerStateAlwaysFromStore(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()
Expand Down
49 changes: 31 additions & 18 deletions server/sublist.go
Original file line number Diff line number Diff line change
Expand Up @@ -995,6 +995,9 @@ func (n *node) isEmpty() bool {

// Return the number of nodes for the given level.
func (l *level) numNodes() int {
if l == nil {
return 0
}
num := len(l.nodes)
if l.pwc != nil {
num++
Expand Down Expand Up @@ -1758,39 +1761,49 @@ func intersectStree[T any](st *stree.SubjectTree[T], r *level, subj []byte, cb f
if len(nsubj) > 0 {
nsubj = append(subj, '.')
}
switch {
case r.fwc != nil:
if r.fwc != nil {
// We've reached a full wildcard, do a FWC match on the stree at this point
// and don't keep iterating downward.
nsubj := append(nsubj, '>')
st.Match(nsubj, cb)
case r.pwc != nil:
return
}
if r.pwc != nil {
// We've found a partial wildcard. We'll keep iterating downwards, but first
// check whether there's interest at this level (without triggering dupes) and
// match if so.
var done bool
nsubj := append(nsubj, '*')
if len(r.pwc.psubs)+len(r.pwc.qsubs) > 0 {
st.Match(nsubj, cb)
done = true
}
if r.pwc.next != nil && r.pwc.next.numNodes() > 0 {
if r.pwc.next.numNodes() > 0 {
intersectStree(st, r.pwc.next, nsubj, cb)
}
default:
// Normal node with subject literals, keep iterating.
for t, n := range r.nodes {
nsubj := append(nsubj, t...)
if len(n.psubs)+len(n.qsubs) > 0 {
if subjectHasWildcard(bytesToString(nsubj)) {
st.Match(nsubj, cb)
} else {
if e, ok := st.Find(nsubj); ok {
cb(nsubj, e)
}
if done {
return
}
}
// Normal node with subject literals, keep iterating.
for t, n := range r.nodes {
if r.pwc != nil && r.pwc.next.numNodes() > 0 && n.next.numNodes() > 0 {
// A wildcard at the next level will already visit these descendents
// so skip so we don't callback the same subject more than once.
continue
}
nsubj := append(nsubj, t...)
if len(n.psubs)+len(n.qsubs) > 0 {
if subjectHasWildcard(bytesToString(nsubj)) {
st.Match(nsubj, cb)
} else {
if e, ok := st.Find(nsubj); ok {
cb(nsubj, e)
}
}
if n.next != nil && n.next.numNodes() > 0 {
intersectStree(st, n.next, nsubj, cb)
}
}
if n.next.numNodes() > 0 {
intersectStree(st, n.next, nsubj, cb)
}
}
}
29 changes: 29 additions & 0 deletions server/sublist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2086,6 +2086,35 @@ func TestSublistInterestBasedIntersection(t *testing.T) {
require_NoDuplicates(t, got)
})

t.Run("PWCExtended", func(t *testing.T) {
got := map[string]int{}
sl := NewSublistNoCache()
sl.Insert(newSub("stream.*.child"))
sl.Insert(newSub("stream.A"))
IntersectStree(st, sl, func(subj []byte, entry *struct{}) {
got[string(subj)]++
})
require_Len(t, len(got), 2)
require_NoDuplicates(t, got)
})

t.Run("PWCExtendedAggressive", func(t *testing.T) {
got := map[string]int{}
sl := NewSublistNoCache()
sl.Insert(newSub("stream.A.child"))
sl.Insert(newSub("*.A.child"))
sl.Insert(newSub("stream.*.child"))
sl.Insert(newSub("stream.A.*"))
sl.Insert(newSub("stream.*.*"))
sl.Insert(newSub("*.A.*"))
sl.Insert(newSub("*.*.child"))
IntersectStree(st, sl, func(subj []byte, entry *struct{}) {
got[string(subj)]++
})
require_Len(t, len(got), 1)
require_NoDuplicates(t, got)
})

t.Run("FWCAll", func(t *testing.T) {
got := map[string]int{}
sl := NewSublistNoCache()
Expand Down
Loading