From 7d6b0a8b24c52380de834c24154949098338f516 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Thu, 23 Oct 2025 14:20:36 +0200 Subject: [PATCH 1/2] [FIXED] Consumer send 404 No Messages on EOS Signed-off-by: Maurice van Veen --- server/consumer.go | 22 +++++++++++++---- server/jetstream_consumer_test.go | 40 +++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 4 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 080ec7a1fdc..b6721885089 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -4519,7 +4519,8 @@ func (o *consumer) processWaiting(eos bool) (int, int, int, time.Time) { var pre *waitingRequest for wr := wq.head; wr != nil; { // Check expiration. - if (eos && wr.noWait && wr.d > 0) || (!wr.expires.IsZero() && now.After(wr.expires)) { + expires := !wr.expires.IsZero() && now.After(wr.expires) + if (eos && wr.noWait) || expires { rdWait := o.replicateDeliveries() if rdWait { // Check if we need to send the timeout after pending replicated deliveries, or can do so immediately. @@ -4528,13 +4529,26 @@ func (o *consumer) processWaiting(eos bool) (int, int, int, time.Time) { } else { wd.pn, wd.pb = wr.n, wr.b } + // If we still need to wait for replicated deliveries, remove from waiting list. + if rdWait { + wr = remove(pre, wr) + continue + } } - if !rdWait { + // Normally it's a timeout. + if expires || !wr.noWait || wr.d > 0 { hdr := fmt.Appendf(nil, "NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b) o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0)) + wr = remove(pre, wr) + continue + } else if wr.expires.IsZero() { + // But if we're NoWait without expiry, we've reached the end of the stream, and we've not delivered any messages. + // Return no messages instead, which is the same as if we'd rejected the pull request initially. + hdr := fmt.Appendf(nil, "NATS/1.0 404 No Messages\r\n\r\n") + o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0)) + wr = remove(pre, wr) + continue } - wr = remove(pre, wr) - continue } // Now check interest. interest := wr.acc.sl.HasInterest(wr.interest) diff --git a/server/jetstream_consumer_test.go b/server/jetstream_consumer_test.go index cb7883d277d..e6eae9d9753 100644 --- a/server/jetstream_consumer_test.go +++ b/server/jetstream_consumer_test.go @@ -10341,3 +10341,43 @@ func TestJetStreamConsumerMaxDeliverUnderflow(t *testing.T) { o.mu.RUnlock() require_Equal(t, maxdc, 0) } + +// https://github.com/nats-io/nats-server/issues/7457 +func TestJetStreamConsumerNoWaitNoMessagesOnEos(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"foo"}}) + require_NoError(t, err) + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "CONSUMER"}) + require_NoError(t, err) + + sub, err := nc.SubscribeSync("reply") + require_NoError(t, err) + defer sub.Drain() + require_NoError(t, nc.Flush()) + + mset, err := s.globalAccount().lookupStream("TEST") + require_NoError(t, err) + o := mset.lookupConsumer("CONSUMER") + require_NotNil(t, o) + + // Fiddle with the pending count such that the NoWait request will go through, + // and the "404 No Messages" will be sent when hitting the end of the stream. + o.mu.Lock() + o.npc++ + o.mu.Unlock() + + req := &JSApiConsumerGetNextRequest{NoWait: true} + jreq, err := json.Marshal(req) + require_NoError(t, err) + o.processNextMsgRequest("reply", jreq) + + msg, err := sub.NextMsg(time.Second) + require_NoError(t, err) + require_Equal(t, msg.Header.Get("Status"), "404") + require_Equal(t, msg.Header.Get("Description"), "No Messages") +} From b5b86fb306fec4588f7d968fe1e479d7eb98e2b8 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Thu, 23 Oct 2025 17:28:37 +0200 Subject: [PATCH 2/2] (2.14) [FIXED] Consumer send 404 No Messages on EOS after delivering messages Signed-off-by: Maurice van Veen --- server/consumer.go | 4 +-- server/jetstream_consumer_test.go | 43 +++++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 2 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index b6721885089..226e5a3177e 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -4536,12 +4536,12 @@ func (o *consumer) processWaiting(eos bool) (int, int, int, time.Time) { } } // Normally it's a timeout. - if expires || !wr.noWait || wr.d > 0 { + if expires { hdr := fmt.Appendf(nil, "NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b) o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0)) wr = remove(pre, wr) continue - } else if wr.expires.IsZero() { + } else if wr.expires.IsZero() || wr.d > 0 { // But if we're NoWait without expiry, we've reached the end of the stream, and we've not delivered any messages. // Return no messages instead, which is the same as if we'd rejected the pull request initially. hdr := fmt.Appendf(nil, "NATS/1.0 404 No Messages\r\n\r\n") diff --git a/server/jetstream_consumer_test.go b/server/jetstream_consumer_test.go index e6eae9d9753..9cc79db264d 100644 --- a/server/jetstream_consumer_test.go +++ b/server/jetstream_consumer_test.go @@ -10381,3 +10381,46 @@ func TestJetStreamConsumerNoWaitNoMessagesOnEos(t *testing.T) { require_Equal(t, msg.Header.Get("Status"), "404") require_Equal(t, msg.Header.Get("Description"), "No Messages") } + +// https://github.com/nats-io/nats-server/issues/5373 +func TestJetStreamConsumerNoWaitNoMessagesOnEosWithDeliveredMsgs(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"foo"}}) + require_NoError(t, err) + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "CONSUMER"}) + require_NoError(t, err) + + _, err = js.Publish("foo", []byte("msg")) + require_NoError(t, err) + + sub, err := nc.SubscribeSync("reply") + require_NoError(t, err) + defer sub.Drain() + require_NoError(t, nc.Flush()) + + mset, err := s.globalAccount().lookupStream("TEST") + require_NoError(t, err) + o := mset.lookupConsumer("CONSUMER") + require_NotNil(t, o) + + req := &JSApiConsumerGetNextRequest{NoWait: true, Batch: 2} + jreq, err := json.Marshal(req) + require_NoError(t, err) + o.processNextMsgRequest("reply", jreq) + + msg, err := sub.NextMsg(time.Second) + require_NoError(t, err) + require_Equal(t, msg.Subject, "foo") + require_Equal(t, string(msg.Data), "msg") + + // We requested two messages but the stream only contained 1. + msg, err = sub.NextMsg(time.Second) + require_NoError(t, err) + require_Equal(t, msg.Header.Get("Status"), "404") + require_Equal(t, msg.Header.Get("Description"), "No Messages") +}