Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 61 additions & 18 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ type consumer struct {
rdq []uint64
rdqi avl.SequenceSet
rdc map[uint64]uint64
replies map[uint64]string
maxdc uint64
waiting *waitQueue
cfg ConsumerConfig
Expand Down Expand Up @@ -1188,6 +1189,7 @@ func (o *consumer) setLeader(isLeader bool) {
o.mu.RLock()
mset, closed := o.mset, o.closed
movingToClustered := o.node != nil && o.pch == nil
movingToNonClustered := o.node == nil && o.pch != nil
wasLeader := o.leader.Swap(isLeader)
o.mu.RUnlock()

Expand All @@ -1211,6 +1213,17 @@ func (o *consumer) setLeader(isLeader bool) {
}
}
o.mu.Unlock()
} else if movingToNonClustered {
// We are moving from clustered to non-clustered now.
// Set pch to nil so if we scale back up we will recreate the loopAndForward from above.
o.mu.Lock()
pch := o.pch
o.pch = nil
select {
case pch <- struct{}{}:
default:
}
o.mu.Unlock()
}
return
}
Expand Down Expand Up @@ -1394,6 +1407,8 @@ func (o *consumer) setLeader(isLeader bool) {
// If we were the leader make sure to drain queued up acks.
if wasLeader {
o.ackMsgs.drain()
// Also remove any pending replies since we should not be the one to respond at this point.
o.replies = nil
}
o.mu.Unlock()
}
Expand Down Expand Up @@ -2041,9 +2056,9 @@ func configsEqualSansDelivery(a, b ConsumerConfig) bool {

// sendAckReply is a helper to send a reply to an ack.
//
// It hands a message with a nil payload for subj to o.outq.sendMsg.
//
// The consumer lock is acquired here, so callers must NOT already hold it.
// Only a read lock is taken because no consumer field is modified, and it is
// taken exactly once: assuming o.mu is a sync.RWMutex, it is not reentrant,
// so mixing Lock and RLock in the same call would block forever.
func (o *consumer) sendAckReply(subj string) {
	o.mu.RLock()
	defer o.mu.RUnlock()
	o.outq.sendMsg(subj, nil)
}

type jsAckMsg struct {
Expand Down Expand Up @@ -2101,9 +2116,11 @@ func (o *consumer) processAck(subject, reply string, hdr int, rmsg []byte) {

switch {
case len(msg) == 0, bytes.Equal(msg, AckAck), bytes.Equal(msg, AckOK):
o.processAckMsg(sseq, dseq, dc, true)
o.processAckMsg(sseq, dseq, dc, reply, true)
// We handle replies for acks in updateAcks
skipAckReply = true
case bytes.HasPrefix(msg, AckNext):
o.processAckMsg(sseq, dseq, dc, true)
o.processAckMsg(sseq, dseq, dc, _EMPTY_, true)
o.processNextMsgRequest(reply, msg[len(AckNext):])
skipAckReply = true
case bytes.HasPrefix(msg, AckNak):
Expand All @@ -2115,7 +2132,9 @@ func (o *consumer) processAck(subject, reply string, hdr int, rmsg []byte) {
if buf := msg[len(AckTerm):]; len(buf) > 0 {
reason = string(bytes.TrimSpace(buf))
}
o.processTerm(sseq, dseq, dc, reason)
o.processTerm(sseq, dseq, dc, reason, reply)
// We handle replies for acks in updateAcks
skipAckReply = true
}

// Ack the ack if requested.
Expand Down Expand Up @@ -2150,6 +2169,13 @@ func (o *consumer) updateSkipped(seq uint64) {
}

func (o *consumer) loopAndForwardProposals(qch chan struct{}) {
// On exit make sure we nil out pch.
defer func() {
o.mu.Lock()
o.pch = nil
o.mu.Unlock()
}()

o.mu.RLock()
node, pch := o.node, o.pch
o.mu.RUnlock()
Expand All @@ -2160,7 +2186,7 @@ func (o *consumer) loopAndForwardProposals(qch chan struct{}) {

forwardProposals := func() error {
o.mu.Lock()
if o.node != node || node.State() != Leader {
if o.node == nil || o.node.State() != Leader {
o.mu.Unlock()
return errors.New("no longer leader")
}
Expand Down Expand Up @@ -2247,8 +2273,17 @@ func (o *consumer) updateDelivered(dseq, sseq, dc uint64, ts int64) {
o.ldt = time.Now()
}

// addAckReply remembers reply as the pending ack reply for sseq so it can be
// looked up later; used for a replicated consumer. The replies map is
// allocated lazily on first use, and an existing entry for sseq is overwritten.
// Lock should be held.
func (o *consumer) addAckReply(sseq uint64, reply string) {
	pending := o.replies
	if pending == nil {
		// First pending reply: create the map and publish it on the consumer.
		pending = make(map[uint64]string)
		o.replies = pending
	}
	pending[sseq] = reply
}

// Lock should be held.
func (o *consumer) updateAcks(dseq, sseq uint64) {
func (o *consumer) updateAcks(dseq, sseq uint64, reply string) {
if o.node != nil {
// Inline for now, use variable compression.
var b [2*binary.MaxVarintLen64 + 1]byte
Expand All @@ -2257,8 +2292,15 @@ func (o *consumer) updateAcks(dseq, sseq uint64) {
n += binary.PutUvarint(b[n:], dseq)
n += binary.PutUvarint(b[n:], sseq)
o.propose(b[:n])
if reply != _EMPTY_ {
o.addAckReply(sseq, reply)
}
} else if o.store != nil {
o.store.UpdateAcks(dseq, sseq)
if reply != _EMPTY_ {
// Already locked so send direct.
o.outq.sendMsg(reply, nil)
}
}
// Update activity.
o.lat = time.Now()
Expand Down Expand Up @@ -2448,9 +2490,9 @@ func (o *consumer) processNak(sseq, dseq, dc uint64, nak []byte) {
}

// Process a TERM
func (o *consumer) processTerm(sseq, dseq, dc uint64, reason string) {
func (o *consumer) processTerm(sseq, dseq, dc uint64, reason, reply string) {
// Treat like an ack to suppress redelivery.
o.processAckMsg(sseq, dseq, dc, false)
o.processAckMsg(sseq, dseq, dc, reply, false)

o.mu.Lock()
defer o.mu.Unlock()
Expand Down Expand Up @@ -2553,6 +2595,7 @@ func (o *consumer) applyState(state *ConsumerState) {
// This is on startup or leader change. We want to check pending
// sooner in case there are inconsistencies etc. Pick between 500ms - 1.5s
delay := 500*time.Millisecond + time.Duration(rand.Int63n(1000))*time.Millisecond

// If normal is lower than this just use that.
if o.cfg.AckWait < delay {
delay = o.ackWait(0)
Expand Down Expand Up @@ -2784,7 +2827,7 @@ func (o *consumer) sampleAck(sseq, dseq, dc uint64) {
o.sendAdvisory(o.ackEventT, j)
}

func (o *consumer) processAckMsg(sseq, dseq, dc uint64, doSample bool) {
func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample bool) {
o.mu.Lock()
if o.closed {
o.mu.Unlock()
Expand Down Expand Up @@ -2830,7 +2873,6 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, doSample bool) {
o.adflr = o.dseq - 1
}
}
// We do these regardless.
delete(o.rdc, sseq)
o.removeFromRedeliverQueue(sseq)
case AckAll:
Expand All @@ -2856,7 +2898,7 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, doSample bool) {
}

// Update underlying store.
o.updateAcks(dseq, sseq)
o.updateAcks(dseq, sseq, reply)

clustered := o.node != nil

Expand Down Expand Up @@ -3748,7 +3790,7 @@ func (o *consumer) checkAckFloor() {
o.mu.RUnlock()
// If it was pending for us, get rid of it.
if isPending {
o.processTerm(seq, p.Sequence, rdc, ackTermLimitsReason)
o.processTerm(seq, p.Sequence, rdc, ackTermLimitsReason, _EMPTY_)
}
}
} else if numPending > 0 {
Expand All @@ -3773,7 +3815,7 @@ func (o *consumer) checkAckFloor() {

for i := 0; i < len(toTerm); i += 3 {
seq, dseq, rdc := toTerm[i], toTerm[i+1], toTerm[i+2]
o.processTerm(seq, dseq, rdc, ackTermLimitsReason)
o.processTerm(seq, dseq, rdc, ackTermLimitsReason, _EMPTY_)
}
}

Expand Down Expand Up @@ -3820,6 +3862,7 @@ func (o *consumer) processInboundAcks(qch chan struct{}) {
o.mu.RLock()
s, mset := o.srv, o.mset
hasInactiveThresh := o.cfg.InactiveThreshold > 0

o.mu.RUnlock()

if s == nil || mset == nil {
Expand Down Expand Up @@ -3960,7 +4003,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
o.mu.Lock()

// consumer is closed when mset is set to nil.
if o.mset == nil {
if o.closed || o.mset == nil {
o.mu.Unlock()
return
}
Expand Down Expand Up @@ -4363,7 +4406,7 @@ func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64,
if o.node == nil || o.cfg.Direct {
mset.ackq.push(seq)
} else {
o.updateAcks(dseq, seq)
o.updateAcks(dseq, seq, _EMPTY_)
}
}
}
Expand Down Expand Up @@ -5349,7 +5392,7 @@ func (o *consumer) decStreamPending(sseq uint64, subj string) {
if wasPending {
// We could have the lock for the stream so do this in a go routine.
// TODO(dlc) - We should do this with ipq vs naked go routines.
go o.processTerm(sseq, p.Sequence, rdc, ackTermUnackedLimitsReason)
go o.processTerm(sseq, p.Sequence, rdc, ackTermUnackedLimitsReason, _EMPTY_)
}
}

Expand Down
21 changes: 14 additions & 7 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -4790,9 +4790,9 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
o.checkStateForInterestStream()
// Do a snapshot.
doSnapshot(true)
// Synchronize followers to our state. Only send out if we have state.
// Synchronize followers to our state. Only send out if we have state and nothing pending.
if n != nil {
if _, _, applied := n.Progress(); applied > 0 {
if _, _, applied := n.Progress(); applied > 0 && aq.len() == 0 {
if snap, err := o.store.EncodedState(); err == nil {
n.SendSnapshot(snap)
}
Expand Down Expand Up @@ -5015,6 +5015,13 @@ var errConsumerClosed = errors.New("consumer closed")

func (o *consumer) processReplicatedAck(dseq, sseq uint64) error {
o.mu.Lock()
// Update activity.
o.lat = time.Now()

// Do actual ack update to store.
// Always do this to have it recorded.
o.store.UpdateAcks(dseq, sseq)

mset := o.mset
if o.closed || mset == nil {
o.mu.Unlock()
Expand All @@ -5025,11 +5032,11 @@ func (o *consumer) processReplicatedAck(dseq, sseq uint64) error {
return errStreamClosed
}

// Update activity.
o.lat = time.Now()

// Do actual ack update to store.
o.store.UpdateAcks(dseq, sseq)
// Check if we have a reply that was requested.
if reply := o.replies[sseq]; reply != _EMPTY_ {
o.outq.sendMsg(reply, nil)
delete(o.replies, sseq)
}

if o.retention == LimitsPolicy {
o.mu.Unlock()
Expand Down
5 changes: 4 additions & 1 deletion server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
crand "crypto/rand"
"encoding/json"
"errors"
"fmt"
"math/rand"
"os"
Expand Down Expand Up @@ -4061,8 +4062,10 @@ func TestJetStreamClusterScaleConsumer(t *testing.T) {
checkFor(t, time.Second*30, time.Millisecond*250, func() error {
if ci, err = js.ConsumerInfo("TEST", "DUR"); err != nil {
return err
} else if ci.Cluster == nil {
return errors.New("no cluster info")
} else if ci.Cluster.Leader == _EMPTY_ {
return fmt.Errorf("no leader")
return errors.New("no leader")
} else if len(ci.Cluster.Replicas) != r-1 {
return fmt.Errorf("not enough replica, got %d wanted %d", len(ci.Cluster.Replicas), r-1)
} else {
Expand Down
Loading