diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 2f6e07fa25a..90428c058b2 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -6545,6 +6545,77 @@ func TestJetStreamClusterStreamPlacementDistribution(t *testing.T) { } } +func TestJetStreamClusterSourceWorkingQueueWithLimit(t *testing.T) { + c := createJetStreamClusterExplicit(t, "WQ3", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{Name: "test", Subjects: []string{"test"}, Replicas: 3}) + require_NoError(t, err) + + _, err = js.AddStream(&nats.StreamConfig{Name: "wq", MaxMsgs: 100, Discard: nats.DiscardNew, Retention: nats.WorkQueuePolicy, + Sources: []*nats.StreamSource{{Name: "test"}}, Replicas: 3}) + require_NoError(t, err) + + sendBatch := func(subject string, n int) { + for i := 0; i < n; i++ { + _, err = js.Publish(subject, []byte("OK")) + require_NoError(t, err) + } + } + // Populate each one. + sendBatch("test", 300) + + checkFor(t, 3*time.Second, 250*time.Millisecond, func() error { + si, err := js.StreamInfo("wq") + require_NoError(t, err) + if si.State.Msgs != 100 { + return fmt.Errorf("Expected 100 msgs, got state: %+v", si.State) + } + return nil + }) + + _, err = js.AddConsumer("wq", &nats.ConsumerConfig{Durable: "wqc", FilterSubject: "test", AckPolicy: nats.AckExplicitPolicy}) + require_NoError(t, err) + + ss, err := js.PullSubscribe("test", "wqc", nats.Bind("wq", "wqc")) + require_NoError(t, err) + // we must have at least one message on the transformed subject name (ie no timeout) + f := func(done chan bool) { + for i := 0; i < 300; i++ { + m, err := ss.Fetch(1, nats.MaxWait(3*time.Second)) + require_NoError(t, err) + time.Sleep(11 * time.Millisecond) + err = m[0].Ack() + require_NoError(t, err) + } + done <- true + } + + var doneChan = make(chan bool) + go f(doneChan) + + checkFor(t, 6*time.Second, 100*time.Millisecond, func() error { + si, err := js.StreamInfo("wq") + require_NoError(t, err) + if si.State.Msgs > 0 && si.State.Msgs <= 100 { + return fmt.Errorf("Expected 0 msgs, got: %d", si.State.Msgs) + } else if si.State.Msgs > 100 { + t.Fatalf("Got more than our 100 message limit: %+v", si.State) + } + return nil + }) + + select { + case <-doneChan: + ss.Drain() + case <-time.After(5 * time.Second): + t.Fatalf("Did not receive completion signal") + } +} + func TestJetStreamClusterConsumerPauseViaConfig(t *testing.T) { c := createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown() diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 21f834c0239..bb985f1ed73 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -11896,6 +11896,78 @@ func TestJetStreamSourceBasics(t *testing.T) { }) } +func TestJetStreamSourceWorkingQueueWithLimit(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + // Client for API requests. + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{Name: "test", Subjects: []string{"test"}}) + require_NoError(t, err) + + _, err = js.AddStream(&nats.StreamConfig{Name: "wq", MaxMsgs: 100, Discard: nats.DiscardNew, Retention: nats.WorkQueuePolicy, + Sources: []*nats.StreamSource{{Name: "test"}}}) + require_NoError(t, err) + + sendBatch := func(subject string, n int) { + for i := 0; i < n; i++ { + _, err = js.Publish(subject, []byte("OK")) + require_NoError(t, err) + } + } + // Populate each one. + sendBatch("test", 300) + + checkFor(t, 3*time.Second, 250*time.Millisecond, func() error { + si, err := js.StreamInfo("wq") + require_NoError(t, err) + if si.State.Msgs != 100 { + return fmt.Errorf("Expected 100 msgs, got state: %+v", si.State) + } + return nil + }) + + _, err = js.AddConsumer("wq", &nats.ConsumerConfig{Durable: "wqc", FilterSubject: "test", AckPolicy: nats.AckExplicitPolicy}) + require_NoError(t, err) + + ss, err := js.PullSubscribe("test", "wqc", nats.Bind("wq", "wqc")) + require_NoError(t, err) + // we must have at least one message on the transformed subject name (ie no timeout) + f := func(done chan bool) { + for i := 0; i < 300; i++ { + m, err := ss.Fetch(1, nats.MaxWait(3*time.Second)) + require_NoError(t, err) + time.Sleep(11 * time.Millisecond) + err = m[0].Ack() + require_NoError(t, err) + } + done <- true + } + + var doneChan = make(chan bool) + go f(doneChan) + + checkFor(t, 6*time.Second, 100*time.Millisecond, func() error { + si, err := js.StreamInfo("wq") + require_NoError(t, err) + if si.State.Msgs > 0 && si.State.Msgs <= 100 { + return fmt.Errorf("Expected 0 msgs, got: %d", si.State.Msgs) + } else if si.State.Msgs > 100 { + t.Fatalf("Got more than our 100 message limit: %+v", si.State) + } + return nil + }) + + select { + case <-doneChan: + ss.Drain() + case <-time.After(5 * time.Second): + t.Fatalf("Did not receive completion signal") + } +} + func TestJetStreamInputTransform(t *testing.T) { s := RunBasicJetStreamServer(t) defer s.Shutdown()