Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,7 @@ type consumer struct {
rdqi avl.SequenceSet
rdc map[uint64]uint64
replies map[uint64]string
pendingDeliveries map[uint64]*jsPubMsg // Messages that can be delivered after achieving quorum.
maxdc uint64
waiting *waitQueue
cfg ConsumerConfig
Expand Down Expand Up @@ -1533,6 +1534,7 @@ func (o *consumer) setLeader(isLeader bool) {
o.rdq = nil
o.rdqi.Empty()
o.pending = nil
o.resetPendingDeliveries()
// ok if they are nil, we protect inside unsubscribe()
o.unsubscribe(o.ackSub)
o.unsubscribe(o.reqSub)
Expand Down Expand Up @@ -2170,6 +2172,11 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
if cfg.MaxAckPending != o.cfg.MaxAckPending {
o.maxp = cfg.MaxAckPending
o.signalNewMessages()
// If MaxAckPending is lowered, we could have allocated a pending deliveries map of larger size.
// Reset it here, so we can shrink the map.
if cfg.MaxAckPending < o.cfg.MaxAckPending {
o.resetPendingDeliveries()
}
}
// AckWait
if cfg.AckWait != o.cfg.AckWait {
Expand Down Expand Up @@ -2529,6 +2536,16 @@ func (o *consumer) addAckReply(sseq uint64, reply string) {
o.replies[sseq] = reply
}

// Used to remember messages that need to be sent for a replicated consumer, after delivered quorum.
// Lock should be held.
func (o *consumer) addReplicatedQueuedMsg(pmsg *jsPubMsg) {
// Is not explicitly limited in size, but will at maximum hold maximum ack pending.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we sure this comment is true?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The stream sequence is used as a key, which is the same for o.pending. That means it can only ever become as large as max length of o.pending, i.e. MaxAckPending.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok so when new message comes into stream and we kick the consumer we just bail because we hit max pending yes?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

o.getNextMsg bails based on hitting max pending, indeed.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understood but I am wondering if we should short circuit in stream's signalConsumers()? WDYT?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could potentially be an additional optimization, although can't say how large the gain would be.
Either way I think it should be in a different PR.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's do that PR after this one that short circuits.

if o.pendingDeliveries == nil {
o.pendingDeliveries = make(map[uint64]*jsPubMsg)
}
o.pendingDeliveries[pmsg.seq] = pmsg
}

// Lock should be held.
func (o *consumer) updateAcks(dseq, sseq uint64, reply string) {
if o.node != nil {
Expand Down Expand Up @@ -3089,7 +3106,6 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
}

// Check if this ack is above the current pointer to our next to deliver.
// This could happen on a cooperative takeover with high speed deliveries.
if sseq >= o.sseq {
// Let's make sure this is valid.
// This is only received on the consumer leader, so should never be higher
Expand Down Expand Up @@ -4842,7 +4858,14 @@ func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64,
}

// Send message.
o.outq.send(pmsg)
// If we're replicated we MUST only send the message AFTER we've got quorum for updating
// delivered state. Otherwise, we could be in an invalid state after a leader change.
// We can send immediately if not replicated, not using acks, or using flow control (incompatible).
if o.node == nil || ap == AckNone || o.cfg.FlowControl {
o.outq.send(pmsg)
} else {
o.addReplicatedQueuedMsg(pmsg)
}

// Flow control.
if o.maxpb > 0 && o.needFlowControl(psz) {
Expand Down Expand Up @@ -6104,3 +6127,10 @@ func (o *consumer) stopAndClearPtmr() {
stopAndClearTimer(&o.ptmr)
o.ptmrEnd = time.Time{}
}

// resetPendingDeliveries returns every message still awaiting delivered-state
// quorum back to the pool and drops the map so it can be reallocated (and
// potentially shrunk) on next use.
// Lock should be held.
func (o *consumer) resetPendingDeliveries() {
	if o.pendingDeliveries == nil {
		return
	}
	for _, msg := range o.pendingDeliveries {
		msg.returnToPool()
	}
	o.pendingDeliveries = nil
}
5 changes: 5 additions & 0 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -5043,6 +5043,11 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea
o.mu.Lock()
err = o.store.UpdateDelivered(dseq, sseq, dc, ts)
o.ldt = time.Now()
// Need to send message to the client, since we have quorum to do so now.
if pmsg, ok := o.pendingDeliveries[sseq]; ok {
o.outq.send(pmsg)
delete(o.pendingDeliveries, sseq)
}
o.mu.Unlock()
if err != nil {
panic(err.Error())
Expand Down
124 changes: 124 additions & 0 deletions server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8118,6 +8118,130 @@ func TestJetStreamClusterInvalidJSACKOverRoute(t *testing.T) {
}
}

// TestJetStreamClusterConsumerOnlyDeliverMsgAfterQuorum verifies that a replicated
// consumer withholds message delivery until the delivered-state update has reached
// quorum: with raft proposals blocked a fetch must time out, and once proposals are
// re-enabled the same fetch must succeed.
func TestJetStreamClusterConsumerOnlyDeliverMsgAfterQuorum(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	// R3 stream and R3 explicit-ack consumer so delivered state is replicated.
	_, err := js.AddStream(&nats.StreamConfig{
		Name:      "TEST",
		Subjects:  []string{"foo"},
		Retention: nats.LimitsPolicy,
		Replicas:  3,
	})
	require_NoError(t, err)

	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
		Durable:   "CONSUMER",
		AckPolicy: nats.AckExplicitPolicy,
		Replicas:  3,
		AckWait:   2 * time.Second,
	})
	require_NoError(t, err)

	_, err = js.Publish("foo", nil)
	require_NoError(t, err)

	// Wait until all replicas agree on stream state before interfering with raft.
	checkFor(t, time.Second, 100*time.Millisecond, func() error {
		return checkState(t, c, globalAccountName, "TEST")
	})

	// Reach into the consumer leader to get at its raft node.
	cl := c.consumerLeader(globalAccountName, "TEST", "CONSUMER")
	acc, err := cl.lookupAccount(globalAccountName)
	require_NoError(t, err)
	mset, err := acc.lookupStream("TEST")
	require_NoError(t, err)
	o := mset.lookupConsumer("CONSUMER")
	require_NotNil(t, o)
	rn := o.raftNode().(*raft)

	// Force the leader to not be able to make proposals.
	// NOTE(review): setting werr directly assumes proposals check this error — confirm
	// against the raft implementation.
	rn.Lock()
	rn.werr = errors.New("block proposals")
	rn.Unlock()

	sub, err := js.PullSubscribe("foo", "CONSUMER")
	require_NoError(t, err)
	defer sub.Unsubscribe()

	// We must only receive a message AFTER quorum was met for updating delivered state.
	// This should time out since proposals are blocked.
	msgs, err := sub.Fetch(1, nats.MaxWait(2*time.Second))
	require_Error(t, err, nats.ErrTimeout)
	require_Len(t, len(msgs), 0)

	// Allow proposals to be made again.
	rn.Lock()
	rn.werr = nil
	rn.Unlock()

	// Now it should pass.
	msgs, err = sub.Fetch(1, nats.MaxWait(2*time.Second))
	require_NoError(t, err)
	require_Len(t, len(msgs), 1)
	require_NoError(t, msgs[0].AckSync())
}

// TestJetStreamClusterConsumerResetPendingDeliveriesOnMaxAckPendingUpdate verifies that
// the pendingDeliveries map is kept when MaxAckPending is raised but reset (allowing the
// map to shrink) when MaxAckPending is lowered via a consumer config update.
func TestJetStreamClusterConsumerResetPendingDeliveriesOnMaxAckPendingUpdate(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:      "TEST",
		Subjects:  []string{"foo"},
		Retention: nats.LimitsPolicy,
		Replicas:  3,
	})
	require_NoError(t, err)

	cfg := &nats.ConsumerConfig{
		Durable:       "CONSUMER",
		AckPolicy:     nats.AckExplicitPolicy,
		Replicas:      3,
		AckWait:       2 * time.Second,
		MaxAckPending: 100,
	}
	_, err = js.AddConsumer("TEST", cfg)
	require_NoError(t, err)

	// Reach into the consumer leader to manipulate internal state directly.
	cl := c.consumerLeader(globalAccountName, "TEST", "CONSUMER")
	acc, err := cl.lookupAccount(globalAccountName)
	require_NoError(t, err)
	mset, err := acc.lookupStream("TEST")
	require_NoError(t, err)
	o := mset.lookupConsumer("CONSUMER")
	require_NotNil(t, o)

	// Seed one entry so we can observe whether updateConfig resets the map.
	o.mu.Lock()
	o.pendingDeliveries = map[uint64]*jsPubMsg{0: getJSPubMsgFromPool()}
	o.mu.Unlock()

	// Increasing does not reset pending deliveries.
	cfg.MaxAckPending++
	_, err = js.UpdateConsumer("TEST", cfg)
	require_NoError(t, err)

	o.mu.Lock()
	l := len(o.pendingDeliveries)
	o.mu.Unlock()
	require_Equal(t, l, 1)

	// Decreasing does reset pending deliveries, so we can shrink the map.
	cfg.MaxAckPending--
	_, err = js.UpdateConsumer("TEST", cfg)
	require_NoError(t, err)

	o.mu.Lock()
	l = len(o.pendingDeliveries)
	o.mu.Unlock()
	require_Equal(t, l, 0)
}

//
// DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
// Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.
Expand Down
4 changes: 2 additions & 2 deletions server/norace_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5078,7 +5078,7 @@ func TestNoRaceJetStreamPullConsumersAndInteriorDeletes(t *testing.T) {
Durable: "foo",
FilterSubject: "foo",
MaxAckPending: 20000,
AckWait: time.Minute,
AckWait: 5 * time.Second,
AckPolicy: nats.AckExplicitPolicy,
})
require_NoError(t, err)
Expand Down Expand Up @@ -5147,7 +5147,7 @@ func TestNoRaceJetStreamPullConsumersAndInteriorDeletes(t *testing.T) {
case <-ch:
// OK
case <-time.After(30 * time.Second):
t.Fatalf("Consumers took too long to consumer all messages")
t.Fatalf("Consumers took too long to consume all messages")
}
}

Expand Down