diff --git a/server/consumer.go b/server/consumer.go index 080ec7a1fdc..226e5a3177e 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -4519,7 +4519,8 @@ func (o *consumer) processWaiting(eos bool) (int, int, int, time.Time) { var pre *waitingRequest for wr := wq.head; wr != nil; { // Check expiration. - if (eos && wr.noWait && wr.d > 0) || (!wr.expires.IsZero() && now.After(wr.expires)) { + expires := !wr.expires.IsZero() && now.After(wr.expires) + if (eos && wr.noWait) || expires { rdWait := o.replicateDeliveries() if rdWait { // Check if we need to send the timeout after pending replicated deliveries, or can do so immediately. @@ -4528,13 +4529,26 @@ func (o *consumer) processWaiting(eos bool) (int, int, int, time.Time) { } else { wd.pn, wd.pb = wr.n, wr.b } + // If we still need to wait for replicated deliveries, remove from waiting list. + if rdWait { + wr = remove(pre, wr) + continue + } } - if !rdWait { + // Normally it's a timeout. + if expires { hdr := fmt.Appendf(nil, "NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b) o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0)) + wr = remove(pre, wr) + continue + } else if wr.expires.IsZero() || wr.d > 0 { + // But if we're NoWait without expiry, we've reached the end of the stream, and we've not delivered any messages. + // Return no messages instead, which is the same as if we'd rejected the pull request initially. + hdr := fmt.Appendf(nil, "NATS/1.0 404 No Messages\r\n\r\n") + o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0)) + wr = remove(pre, wr) + continue } - wr = remove(pre, wr) - continue } // Now check interest. interest := wr.acc.sl.HasInterest(wr.interest) diff --git a/server/jetstream_consumer_test.go b/server/jetstream_consumer_test.go index cb7883d277d..9cc79db264d 100644 --- a/server/jetstream_consumer_test.go +++ b/server/jetstream_consumer_test.go @@ -10341,3 +10341,86 @@ func TestJetStreamConsumerMaxDeliverUnderflow(t *testing.T) { o.mu.RUnlock() require_Equal(t, maxdc, 0) } + +// https://github.com/nats-io/nats-server/issues/7457 +func TestJetStreamConsumerNoWaitNoMessagesOnEos(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"foo"}}) + require_NoError(t, err) + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "CONSUMER"}) + require_NoError(t, err) + + sub, err := nc.SubscribeSync("reply") + require_NoError(t, err) + defer sub.Drain() + require_NoError(t, nc.Flush()) + + mset, err := s.globalAccount().lookupStream("TEST") + require_NoError(t, err) + o := mset.lookupConsumer("CONSUMER") + require_NotNil(t, o) + + // Fiddle with the pending count such that the NoWait request will go through, + // and the "404 No Messages" will be sent when hitting the end of the stream. + o.mu.Lock() + o.npc++ + o.mu.Unlock() + + req := &JSApiConsumerGetNextRequest{NoWait: true} + jreq, err := json.Marshal(req) + require_NoError(t, err) + o.processNextMsgRequest("reply", jreq) + + msg, err := sub.NextMsg(time.Second) + require_NoError(t, err) + require_Equal(t, msg.Header.Get("Status"), "404") + require_Equal(t, msg.Header.Get("Description"), "No Messages") +} + +// https://github.com/nats-io/nats-server/issues/5373 +func TestJetStreamConsumerNoWaitNoMessagesOnEosWithDeliveredMsgs(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"foo"}}) + require_NoError(t, err) + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "CONSUMER"}) + require_NoError(t, err) + + _, err = js.Publish("foo", []byte("msg")) + require_NoError(t, err) + + sub, err := nc.SubscribeSync("reply") + require_NoError(t, err) + defer sub.Drain() + require_NoError(t, nc.Flush()) + + mset, err := s.globalAccount().lookupStream("TEST") + require_NoError(t, err) + o := mset.lookupConsumer("CONSUMER") + require_NotNil(t, o) + + req := &JSApiConsumerGetNextRequest{NoWait: true, Batch: 2} + jreq, err := json.Marshal(req) + require_NoError(t, err) + o.processNextMsgRequest("reply", jreq) + + msg, err := sub.NextMsg(time.Second) + require_NoError(t, err) + require_Equal(t, msg.Subject, "foo") + require_Equal(t, string(msg.Data), "msg") + + // We requested two messages but the stream only contained 1. + msg, err = sub.NextMsg(time.Second) + require_NoError(t, err) + require_Equal(t, msg.Header.Get("Status"), "404") + require_Equal(t, msg.Header.Get("Description"), "No Messages") +}