Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 82 additions & 18 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,8 @@ type consumer struct {
rdqi avl.SequenceSet
rdc map[uint64]uint64
replies map[uint64]string
pendingDeliveries map[uint64]*jsPubMsg // Messages that can be delivered after achieving quorum.
pendingDeliveries map[uint64]*jsPubMsg // Messages that can be delivered after achieving quorum.
waitingDeliveries map[string]*waitingDelivery // (Optional) request timeout messages that need to wait for replicated deliveries first.
maxdc uint64
waiting *waitQueue
cfg ConsumerConfig
Expand Down Expand Up @@ -2540,11 +2541,23 @@ func (o *consumer) addAckReply(sseq uint64, reply string) {
// addReplicatedQueuedMsg parks a message that must only be sent for a replicated
// consumer once the delivered state has reached quorum, and records it as the
// latest outstanding delivery for its delivery subject.
// Lock should be held.
func (o *consumer) addReplicatedQueuedMsg(pmsg *jsPubMsg) {
	// No explicit size limit here; at most this holds maximum ack pending.
	if o.pendingDeliveries == nil {
		o.pendingDeliveries = make(map[uint64]*jsPubMsg)
	}
	o.pendingDeliveries[pmsg.seq] = pmsg

	// No explicit size limit here; at most this holds maximum waiting requests.
	if o.waitingDeliveries == nil {
		o.waitingDeliveries = make(map[string]*waitingDelivery)
	}

	// Fetch the tracking entry for this delivery subject, taking a fresh one
	// from the pool on first use, then point it at the newest queued sequence.
	entry, found := o.waitingDeliveries[pmsg.dsubj]
	if !found {
		entry = wdPool.Get().(*waitingDelivery)
		o.waitingDeliveries[pmsg.dsubj] = entry
	}
	entry.seq = pmsg.seq
}

// Lock should be held.
Expand Down Expand Up @@ -3446,6 +3459,28 @@ func (wr *waitingRequest) recycle() {
}
}

// waitingDelivery describes an (optional) request timeout that is held back
// until the replicated deliveries queued ahead of it have been sent.
type waitingDelivery struct {
	seq uint64 // Sequence of the delivery this entry is waiting on.
	pn  int    // Pending messages.
	pb  int    // Pending bytes.
}

// wdPool is the sync.Pool that waitingDelivery values are drawn from and
// returned to.
var wdPool = sync.Pool{
	New: func() any { return &waitingDelivery{} },
}

// recycle zeroes the entry and puts it back into wdPool.
// Calling it on a nil receiver is a no-op.
func (wd *waitingDelivery) recycle() {
	if wd == nil {
		return
	}
	*wd = waitingDelivery{}
	wdPool.Put(wd)
}

// waiting queue for requests that are waiting for new messages to arrive.
type waitQueue struct {
n, max int
Expand Down Expand Up @@ -3721,8 +3756,19 @@ func (o *consumer) nextWaiting(sz int) *waitingRequest {
}
} else {
// We do check for expiration in `processWaiting`, but it is possible to hit the expiry here, and not there.
hdr := fmt.Appendf(nil, "NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b)
o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
rdWait := o.replicateDeliveries()
if rdWait {
// Check if we need to send the timeout after pending replicated deliveries, or can do so immediately.
if wd, ok := o.waitingDeliveries[wr.reply]; !ok {
rdWait = false
} else {
wd.pn, wd.pb = wr.n, wr.b
}
}
if !rdWait {
hdr := fmt.Appendf(nil, "NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b)
o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
}
o.waiting.removeCurrent()
if o.node != nil {
o.removeClusterPendingRequest(wr.reply)
Expand Down Expand Up @@ -4187,8 +4233,19 @@ func (o *consumer) processWaiting(eos bool) (int, int, int, time.Time) {
for wr := wq.head; wr != nil; {
// Check expiration.
if (eos && wr.noWait && wr.d > 0) || (!wr.expires.IsZero() && now.After(wr.expires)) {
hdr := fmt.Appendf(nil, "NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b)
o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
rdWait := o.replicateDeliveries()
if rdWait {
// Check if we need to send the timeout after pending replicated deliveries, or can do so immediately.
if wd, ok := o.waitingDeliveries[wr.reply]; !ok {
rdWait = false
} else {
wd.pn, wd.pb = wr.n, wr.b
}
}
if !rdWait {
hdr := fmt.Appendf(nil, "NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b)
o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
}
wr = remove(pre, wr)
continue
}
Expand Down Expand Up @@ -4485,7 +4542,6 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
delay time.Duration
sz int
wrn, wrb int
wrNoWait bool
)

o.mu.Lock()
Expand Down Expand Up @@ -4564,7 +4620,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
if o.isPushMode() {
dsubj = o.dsubj
} else if wr := o.nextWaiting(sz); wr != nil {
wrn, wrb, wrNoWait = wr.n, wr.b, wr.noWait
wrn, wrb = wr.n, wr.b
dsubj = wr.reply
if o.cfg.PriorityPolicy == PriorityPinnedClient {
// FIXME(jrm): Can we make this prettier?
Expand Down Expand Up @@ -4639,7 +4695,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
}

// Do actual delivery.
o.deliverMsg(dsubj, ackReply, pmsg, dc, rp, wrNoWait)
o.deliverMsg(dsubj, ackReply, pmsg, dc, rp)

// If given request fulfilled batch size, but there are still pending bytes, send information about it.
if wrn <= 0 && wrb > 0 {
Expand Down Expand Up @@ -4838,7 +4894,7 @@ func convertToHeadersOnly(pmsg *jsPubMsg) {

// Deliver a msg to the consumer.
// Lock should be held and o.mset validated to be non-nil.
func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64, rp RetentionPolicy, wrNoWait bool) {
func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64, rp RetentionPolicy) {
if o.mset == nil {
pmsg.returnToPool()
return
Expand Down Expand Up @@ -4871,15 +4927,10 @@ func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64,
}

// Send message.
// If we're replicated we MUST only send the message AFTER we've got quorum for updating
// delivered state. Otherwise, we could be in an invalid state after a leader change.
// We can send immediately if not replicated, not using acks, or using flow control (incompatible).
// TODO(mvv): If NoWait we also bypass replicating first.
// Ideally we'd only send the NoWait request timeout after replication and delivery.
if o.node == nil || ap == AckNone || o.cfg.FlowControl || wrNoWait {
o.outq.send(pmsg)
} else {
if o.replicateDeliveries() {
o.addReplicatedQueuedMsg(pmsg)
} else {
o.outq.send(pmsg)
}

// Flow control.
Expand All @@ -4902,6 +4953,15 @@ func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64,
}
}

// replicateDeliveries reports whether a delivery has to be replicated before
// it may be sent out.
// A replicated consumer MUST only send a message AFTER quorum has been reached
// on the updated delivered state; otherwise a leader change could leave us in
// an invalid state. Sending immediately is fine when the consumer is not
// replicated, does not use acks, or uses flow control (incompatible).
// Lock should be held.
func (o *consumer) replicateDeliveries() bool {
	switch {
	case o.node == nil:
		// Not replicated.
		return false
	case o.cfg.AckPolicy == AckNone:
		// Not using acks.
		return false
	case o.cfg.FlowControl:
		// Flow control is incompatible with replicating first.
		return false
	default:
		return true
	}
}

func (o *consumer) needFlowControl(sz int) bool {
if o.maxpb == 0 {
return false
Expand Down Expand Up @@ -6148,4 +6208,8 @@ func (o *consumer) resetPendingDeliveries() {
pmsg.returnToPool()
}
o.pendingDeliveries = nil
for _, wd := range o.waitingDeliveries {
wd.recycle()
}
o.waitingDeliveries = nil
}
12 changes: 12 additions & 0 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -5058,8 +5058,20 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea
o.ldt = time.Now()
// Need to send message to the client, since we have quorum to do so now.
if pmsg, ok := o.pendingDeliveries[sseq]; ok {
// Copy delivery subject and sequence first, as the send returns it to the pool and clears it.
dsubj, seq := pmsg.dsubj, pmsg.seq
o.outq.send(pmsg)
delete(o.pendingDeliveries, sseq)

// Might need to send a request timeout after sending the last replicated delivery.
if wd, ok := o.waitingDeliveries[dsubj]; ok && wd.seq == seq {
if wd.pn > 0 || wd.pb > 0 {
hdr := fmt.Appendf(nil, "NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wd.pn, JSPullRequestPendingBytes, wd.pb)
o.outq.send(newJSPubMsg(dsubj, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
}
wd.recycle()
delete(o.waitingDeliveries, dsubj)
}
}
o.mu.Unlock()
if err != nil {
Expand Down
66 changes: 35 additions & 31 deletions server/jetstream_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9688,44 +9688,48 @@ func TestJetStreamConsumerStateAlwaysFromStore(t *testing.T) {
}

func TestJetStreamConsumerPullNoWaitBatchLargerThanPending(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
test := func(t *testing.T, replicas int) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: replicas,
})
require_NoError(t, err)

_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "C",
AckPolicy: nats.AckExplicitPolicy,
FilterSubject: "foo",
})
require_NoError(t, err)
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "C",
AckPolicy: nats.AckExplicitPolicy,
FilterSubject: "foo",
Replicas: replicas,
})
require_NoError(t, err)

req := JSApiConsumerGetNextRequest{Batch: 10, NoWait: true}
req := JSApiConsumerGetNextRequest{Batch: 10, NoWait: true}

for range 5 {
_, err := js.Publish("foo", []byte("OK"))
require_NoError(t, err)
}
for range 5 {
_, err := js.Publish("foo", []byte("OK"))
require_NoError(t, err)
}

sub := sendRequest(t, nc, "rply", req)
defer sub.Unsubscribe()
sub := sendRequest(t, nc, "rply", req)
defer sub.Unsubscribe()

// Should get all 5 messages.
// TODO(mvv): Currently bypassing replicating first, need to figure out
// how to send NoWait's request timeout after replication.
for range 5 {
msg, err := sub.NextMsg(time.Second)
require_NoError(t, err)
if len(msg.Data) == 0 && msg.Header != nil {
t.Fatalf("Expected data, got: %s", msg.Header.Get("Description"))
// Should get all 5 messages.
for range 5 {
msg, err := sub.NextMsg(time.Second)
require_NoError(t, err)
if len(msg.Data) == 0 && msg.Header != nil {
t.Fatalf("Expected data, got: %s", msg.Header.Get("Description"))
}
}
}

t.Run("R1", func(t *testing.T) { test(t, 1) })
t.Run("R3", func(t *testing.T) { test(t, 3) })
}