Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions server/jetstream_cluster_long_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package server

import (
"errors"
"fmt"
"math/rand"
"strings"
Expand Down Expand Up @@ -497,3 +498,112 @@ func TestLongNRGChainOfBlocks(t *testing.T) {
}
}
}

func TestLongClusterWorkQueueMessagesNotSkipped(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

iterations := 500_000
stream := "s1"
subjf := "subj.>"
consumers := map[string]string{
"c1": "subj.c1",
"c2": "subj.c2.*",
"c3": "subj.c3",
}

sig := make(chan *nats.Msg, 900)

_, err := js.AddStream(&nats.StreamConfig{
Name: stream,
Storage: nats.FileStorage,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should add testing MemoryStorage too to this test I think

Subjects: []string{subjf},
Retention: nats.WorkQueuePolicy,
Duplicates: time.Minute * 2,
Replicas: 3,
MaxAge: time.Hour,
})
require_NoError(t, err)

for name, subjf := range consumers {
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err = js.AddConsumer(stream, &nats.ConsumerConfig{
Name: name,
FilterSubject: subjf,
Replicas: 3,
AckPolicy: nats.AckExplicitPolicy,
DeliverPolicy: nats.DeliverAllPolicy,
})
require_NoError(t, err)

ps, err := js.PullSubscribe(subjf, "", nats.Bind(stream, name))
require_NoError(t, err)

go func() {
for {
msgs, err := ps.FetchBatch(300)
if errors.Is(err, nats.ErrTimeout) {
continue
}
if errors.Is(err, nats.ErrConnectionClosed) {
return // ... for when the test finishes
}
require_NoError(t, err)
for msg := range msgs.Messages() {
go func() {
time.Sleep(time.Millisecond * time.Duration(rand.Int31n(100)))
require_NoError(t, msg.Ack())
sig <- msg
}()
}
}
}()
}

go func() {
hdrs := nats.Header{}
for i := 1; i <= iterations; i++ {
// Pick a random consumer to hit this time (map iteration order is
// non-deterministic, but break to do it just once).
for _, subj := range consumers {
hdrs.Set("Nats-Msg-Id", fmt.Sprintf("msg-%d", i))
if strings.HasPrefix(subj, "*") {
subj = strings.Replace(subj, "*", fmt.Sprintf("%d", i), 1)
}
_, err := js.PublishMsg(&nats.Msg{
Subject: subj,
Header: hdrs,
})
require_NoError(t, err)
break
}
}
}()

for i := 1; i <= iterations; i++ {
if i%10000 == 0 {
t.Logf("%d messages out of %d", i, iterations)
}

select {
case <-sig:
case <-time.After(time.Second * 2):
si, err := js.StreamInfo(stream)
require_NoError(t, err)
t.Logf("Stream info: %+v", si.State)

for name := range consumers {
ci, err := js.ConsumerInfo(stream, name)
require_NoError(t, err)
t.Logf("Consumer %q info: %+v, %+v", name, ci.AckFloor, ci.Delivered)
}

t.Fatalf("Didn't receive message %d", i)
}
}
}