Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions server/jetstream_cluster_3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6545,6 +6545,77 @@ func TestJetStreamClusterStreamPlacementDistribution(t *testing.T) {
}
}

func TestJetStreamClusterSourceWorkingQueueWithLimit(t *testing.T) {
c := createJetStreamClusterExplicit(t, "WQ3", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{Name: "test", Subjects: []string{"test"}, Replicas: 3})
require_NoError(t, err)

_, err = js.AddStream(&nats.StreamConfig{Name: "wq", MaxMsgs: 100, Discard: nats.DiscardNew, Retention: nats.WorkQueuePolicy,
Sources: []*nats.StreamSource{{Name: "test"}}, Replicas: 3})
require_NoError(t, err)

sendBatch := func(subject string, n int) {
for i := 0; i < n; i++ {
_, err = js.Publish(subject, []byte("OK"))
require_NoError(t, err)
}
}
// Populate each one.
sendBatch("test", 300)

checkFor(t, 3*time.Second, 250*time.Millisecond, func() error {
si, err := js.StreamInfo("wq")
require_NoError(t, err)
if si.State.Msgs != 100 {
return fmt.Errorf("Expected 100 msgs, got state: %+v", si.State)
}
return nil
})

_, err = js.AddConsumer("wq", &nats.ConsumerConfig{Durable: "wqc", FilterSubject: "test", AckPolicy: nats.AckExplicitPolicy})
require_NoError(t, err)

ss, err := js.PullSubscribe("test", "wqc", nats.Bind("wq", "wqc"))
require_NoError(t, err)
// we must have at least one message on the transformed subject name (ie no timeout)
f := func(done chan bool) {
for i := 0; i < 300; i++ {
m, err := ss.Fetch(1, nats.MaxWait(3*time.Second))
require_NoError(t, err)
time.Sleep(11 * time.Millisecond)
err = m[0].Ack()
require_NoError(t, err)
}
done <- true
}

var doneChan = make(chan bool)
go f(doneChan)

checkFor(t, 6*time.Second, 100*time.Millisecond, func() error {
si, err := js.StreamInfo("wq")
require_NoError(t, err)
if si.State.Msgs > 0 && si.State.Msgs <= 100 {
return fmt.Errorf("Expected 0 msgs, got: %d", si.State.Msgs)
} else if si.State.Msgs > 100 {
t.Fatalf("Got more than our 100 message limit: %+v", si.State)
}
return nil
})

select {
case <-doneChan:
ss.Drain()
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}
}

func TestJetStreamClusterConsumerPauseViaConfig(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
Expand Down
72 changes: 72 additions & 0 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11896,6 +11896,78 @@ func TestJetStreamSourceBasics(t *testing.T) {
})
}

func TestJetStreamSourceWorkingQueueWithLimit(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

// Client for API requests.
nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{Name: "test", Subjects: []string{"test"}})
require_NoError(t, err)

_, err = js.AddStream(&nats.StreamConfig{Name: "wq", MaxMsgs: 100, Discard: nats.DiscardNew, Retention: nats.WorkQueuePolicy,
Sources: []*nats.StreamSource{{Name: "test"}}})
require_NoError(t, err)

sendBatch := func(subject string, n int) {
for i := 0; i < n; i++ {
_, err = js.Publish(subject, []byte("OK"))
require_NoError(t, err)
}
}
// Populate each one.
sendBatch("test", 300)

checkFor(t, 3*time.Second, 250*time.Millisecond, func() error {
si, err := js.StreamInfo("wq")
require_NoError(t, err)
if si.State.Msgs != 100 {
return fmt.Errorf("Expected 100 msgs, got state: %+v", si.State)
}
return nil
})

_, err = js.AddConsumer("wq", &nats.ConsumerConfig{Durable: "wqc", FilterSubject: "test", AckPolicy: nats.AckExplicitPolicy})
require_NoError(t, err)

ss, err := js.PullSubscribe("test", "wqc", nats.Bind("wq", "wqc"))
require_NoError(t, err)
// we must have at least one message on the transformed subject name (ie no timeout)
f := func(done chan bool) {
for i := 0; i < 300; i++ {
m, err := ss.Fetch(1, nats.MaxWait(3*time.Second))
require_NoError(t, err)
time.Sleep(11 * time.Millisecond)
err = m[0].Ack()
require_NoError(t, err)
}
done <- true
}

var doneChan = make(chan bool)
go f(doneChan)

checkFor(t, 6*time.Second, 100*time.Millisecond, func() error {
si, err := js.StreamInfo("wq")
require_NoError(t, err)
if si.State.Msgs > 0 && si.State.Msgs <= 100 {
return fmt.Errorf("Expected 0 msgs, got: %d", si.State.Msgs)
} else if si.State.Msgs > 100 {
t.Fatalf("Got more than our 100 message limit: %+v", si.State)
}
return nil
})

select {
case <-doneChan:
ss.Drain()
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}
}

func TestJetStreamInputTransform(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()
Expand Down