diff --git a/server/jetstream_cluster_long_test.go b/server/jetstream_cluster_long_test.go index 447df501437..ffaa9723c8d 100644 --- a/server/jetstream_cluster_long_test.go +++ b/server/jetstream_cluster_long_test.go @@ -17,6 +17,7 @@ package server import ( + "errors" "fmt" "math/rand" "strings" @@ -497,3 +498,112 @@ func TestLongNRGChainOfBlocks(t *testing.T) { } } } + +func TestLongClusterWorkQueueMessagesNotSkipped(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + iterations := 500_000 + stream := "s1" + subjf := "subj.>" + consumers := map[string]string{ + "c1": "subj.c1", + "c2": "subj.c2.*", + "c3": "subj.c3", + } + + sig := make(chan *nats.Msg, 900) + + _, err := js.AddStream(&nats.StreamConfig{ + Name: stream, + Storage: nats.FileStorage, + Subjects: []string{subjf}, + Retention: nats.WorkQueuePolicy, + Duplicates: time.Minute * 2, + Replicas: 3, + MaxAge: time.Hour, + }) + require_NoError(t, err) + + for name, subjf := range consumers { + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err = js.AddConsumer(stream, &nats.ConsumerConfig{ + Name: name, + FilterSubject: subjf, + Replicas: 3, + AckPolicy: nats.AckExplicitPolicy, + DeliverPolicy: nats.DeliverAllPolicy, + }) + require_NoError(t, err) + + ps, err := js.PullSubscribe(subjf, "", nats.Bind(stream, name)) + require_NoError(t, err) + + go func() { + for { + msgs, err := ps.FetchBatch(300) + if errors.Is(err, nats.ErrTimeout) { + continue + } + if errors.Is(err, nats.ErrConnectionClosed) { + return // ... for when the test finishes + } + require_NoError(t, err) + for msg := range msgs.Messages() { + go func() { + time.Sleep(time.Millisecond * time.Duration(rand.Int31n(100))) + require_NoError(t, msg.Ack()) + sig <- msg + }() + } + } + }() + } + + go func() { + hdrs := nats.Header{} + for i := 1; i <= iterations; i++ { + // Pick a random consumer to hit this time (map iteration order is + // non-deterministic, but break to do it just once). + for _, subj := range consumers { + hdrs.Set("Nats-Msg-Id", fmt.Sprintf("msg-%d", i)) + if strings.HasPrefix(subj, "*") { + subj = strings.Replace(subj, "*", fmt.Sprintf("%d", i), 1) + } + _, err := js.PublishMsg(&nats.Msg{ + Subject: subj, + Header: hdrs, + }) + require_NoError(t, err) + break + } + } + }() + + for i := 1; i <= iterations; i++ { + if i%10000 == 0 { + t.Logf("%d messages out of %d", i, iterations) + } + + select { + case <-sig: + case <-time.After(time.Second * 2): + si, err := js.StreamInfo(stream) + require_NoError(t, err) + t.Logf("Stream info: %+v", si.State) + + for name := range consumers { + ci, err := js.ConsumerInfo(stream, name) + require_NoError(t, err) + t.Logf("Consumer %q info: %+v, %+v", name, ci.AckFloor, ci.Delivered) + } + + t.Fatalf("Didn't receive message %d", i) + } + } +}