diff --git a/server/consumer.go b/server/consumer.go
index af3e34d69e3..987417b5a1b 100644
--- a/server/consumer.go
+++ b/server/consumer.go
@@ -350,6 +350,7 @@ type consumer struct {
 	rdq       []uint64
 	rdqi      avl.SequenceSet
 	rdc       map[uint64]uint64
+	replies   map[uint64]string
 	maxdc     uint64
 	waiting   *waitQueue
 	cfg       ConsumerConfig
@@ -1188,6 +1189,7 @@ func (o *consumer) setLeader(isLeader bool) {
 	o.mu.RLock()
 	mset, closed := o.mset, o.closed
 	movingToClustered := o.node != nil && o.pch == nil
+	movingToNonClustered := o.node == nil && o.pch != nil
 	wasLeader := o.leader.Swap(isLeader)
 	o.mu.RUnlock()
 
@@ -1211,6 +1213,17 @@
 					}
 				}
 				o.mu.Unlock()
+			} else if movingToNonClustered {
+				// We are moving from clustered to non-clustered now.
+				// Set pch to nil so if we scale back up we will recreate the loopAndForward from above.
+				o.mu.Lock()
+				pch := o.pch
+				o.pch = nil
+				select {
+				case pch <- struct{}{}:
+				default:
+				}
+				o.mu.Unlock()
 			}
 			return
 		}
@@ -1394,6 +1407,8 @@
 		// If we were the leader make sure to drain queued up acks.
 		if wasLeader {
 			o.ackMsgs.drain()
+			// Also remove any pending replies since we should not be the one to respond at this point.
+			o.replies = nil
 		}
 		o.mu.Unlock()
 	}
@@ -2041,9 +2056,9 @@ func configsEqualSansDelivery(a, b ConsumerConfig) bool {
 
 // Helper to send a reply to an ack.
 func (o *consumer) sendAckReply(subj string) {
-	o.mu.Lock()
-	defer o.mu.Unlock()
-	o.sendAdvisory(subj, nil)
+	o.mu.RLock()
+	defer o.mu.RUnlock()
+	o.outq.sendMsg(subj, nil)
 }
 
 type jsAckMsg struct {
@@ -2101,9 +2116,11 @@ func (o *consumer) processAck(subject, reply string, hdr int, rmsg []byte) {
 
 	switch {
 	case len(msg) == 0, bytes.Equal(msg, AckAck), bytes.Equal(msg, AckOK):
-		o.processAckMsg(sseq, dseq, dc, true)
+		o.processAckMsg(sseq, dseq, dc, reply, true)
+		// We handle replies for acks in updateAcks
+		skipAckReply = true
 	case bytes.HasPrefix(msg, AckNext):
-		o.processAckMsg(sseq, dseq, dc, true)
+		o.processAckMsg(sseq, dseq, dc, _EMPTY_, true)
 		o.processNextMsgRequest(reply, msg[len(AckNext):])
 		skipAckReply = true
 	case bytes.HasPrefix(msg, AckNak):
@@ -2115,7 +2132,9 @@ func (o *consumer) processAck(subject, reply string, hdr int, rmsg []byte) {
 		if buf := msg[len(AckTerm):]; len(buf) > 0 {
 			reason = string(bytes.TrimSpace(buf))
 		}
-		o.processTerm(sseq, dseq, dc, reason)
+		o.processTerm(sseq, dseq, dc, reason, reply)
+		// We handle replies for acks in updateAcks
+		skipAckReply = true
 	}
 
 	// Ack the ack if requested.
@@ -2150,6 +2169,13 @@ func (o *consumer) updateSkipped(seq uint64) {
 }
 
 func (o *consumer) loopAndForwardProposals(qch chan struct{}) {
+	// On exit make sure we nil out pch.
+	defer func() {
+		o.mu.Lock()
+		o.pch = nil
+		o.mu.Unlock()
+	}()
+
 	o.mu.RLock()
 	node, pch := o.node, o.pch
 	o.mu.RUnlock()
@@ -2160,7 +2186,7 @@ func (o *consumer) loopAndForwardProposals(qch chan struct{}) {
 
 	forwardProposals := func() error {
 		o.mu.Lock()
-		if o.node != node || node.State() != Leader {
+		if o.node == nil || o.node.State() != Leader {
 			o.mu.Unlock()
 			return errors.New("no longer leader")
 		}
@@ -2247,8 +2273,17 @@ func (o *consumer) updateDelivered(dseq, sseq, dc uint64, ts int64) {
 	o.ldt = time.Now()
 }
 
+// Used to remember a pending ack reply in a replicated consumer.
+// Lock should be held.
+func (o *consumer) addAckReply(sseq uint64, reply string) {
+	if o.replies == nil {
+		o.replies = make(map[uint64]string)
+	}
+	o.replies[sseq] = reply
+}
+
 // Lock should be held.
-func (o *consumer) updateAcks(dseq, sseq uint64) {
+func (o *consumer) updateAcks(dseq, sseq uint64, reply string) {
 	if o.node != nil {
 		// Inline for now, use variable compression.
 		var b [2*binary.MaxVarintLen64 + 1]byte
@@ -2257,8 +2292,15 @@
 		n += binary.PutUvarint(b[n:], dseq)
 		n += binary.PutUvarint(b[n:], sseq)
 		o.propose(b[:n])
+		if reply != _EMPTY_ {
+			o.addAckReply(sseq, reply)
+		}
 	} else if o.store != nil {
 		o.store.UpdateAcks(dseq, sseq)
+		if reply != _EMPTY_ {
+			// Already locked so send direct.
+			o.outq.sendMsg(reply, nil)
+		}
 	}
 	// Update activity.
 	o.lat = time.Now()
@@ -2448,9 +2490,9 @@ func (o *consumer) processNak(sseq, dseq, dc uint64, nak []byte) {
 }
 
 // Process a TERM
-func (o *consumer) processTerm(sseq, dseq, dc uint64, reason string) {
+func (o *consumer) processTerm(sseq, dseq, dc uint64, reason, reply string) {
 	// Treat like an ack to suppress redelivery.
-	o.processAckMsg(sseq, dseq, dc, false)
+	o.processAckMsg(sseq, dseq, dc, reply, false)
 
 	o.mu.Lock()
 	defer o.mu.Unlock()
@@ -2553,6 +2595,7 @@ func (o *consumer) applyState(state *ConsumerState) {
 	// This is on startup or leader change. We want to check pending
 	// sooner in case there are inconsistencies etc. Pick between 500ms - 1.5s
 	delay := 500*time.Millisecond + time.Duration(rand.Int63n(1000))*time.Millisecond
+	// If normal is lower than this just use that.
 	if o.cfg.AckWait < delay {
 		delay = o.ackWait(0)
 	}
@@ -2784,7 +2827,7 @@ func (o *consumer) sampleAck(sseq, dseq, dc uint64) {
 	o.sendAdvisory(o.ackEventT, j)
 }
 
-func (o *consumer) processAckMsg(sseq, dseq, dc uint64, doSample bool) {
+func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample bool) {
 	o.mu.Lock()
 	if o.closed {
 		o.mu.Unlock()
 		return
 	}
@@ -2830,7 +2873,6 @@
 				o.adflr = o.dseq - 1
 			}
 		}
-		// We do these regardless.
 		delete(o.rdc, sseq)
 		o.removeFromRedeliverQueue(sseq)
 	case AckAll:
@@ -2856,7 +2898,7 @@
 	}
 
 	// Update underlying store.
-	o.updateAcks(dseq, sseq)
+	o.updateAcks(dseq, sseq, reply)
 
 	clustered := o.node != nil
 
@@ -3748,7 +3790,7 @@ func (o *consumer) checkAckFloor() {
 			o.mu.RUnlock()
 			// If it was pending for us, get rid of it.
 			if isPending {
-				o.processTerm(seq, p.Sequence, rdc, ackTermLimitsReason)
+				o.processTerm(seq, p.Sequence, rdc, ackTermLimitsReason, _EMPTY_)
 			}
 		}
 	} else if numPending > 0 {
@@ -3773,7 +3815,7 @@ func (o *consumer) checkAckFloor() {
 
 	for i := 0; i < len(toTerm); i += 3 {
 		seq, dseq, rdc := toTerm[i], toTerm[i+1], toTerm[i+2]
-		o.processTerm(seq, dseq, rdc, ackTermLimitsReason)
+		o.processTerm(seq, dseq, rdc, ackTermLimitsReason, _EMPTY_)
 	}
 }
 
@@ -3820,6 +3862,7 @@ func (o *consumer) processInboundAcks(qch chan struct{}) {
 	o.mu.RLock()
 	s, mset := o.srv, o.mset
 	hasInactiveThresh := o.cfg.InactiveThreshold > 0
+
 	o.mu.RUnlock()
 
 	if s == nil || mset == nil {
@@ -3960,7 +4003,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
 
 		o.mu.Lock()
 		// consumer is closed when mset is set to nil.
-		if o.mset == nil {
+		if o.closed || o.mset == nil {
 			o.mu.Unlock()
 			return
 		}
@@ -4363,7 +4406,7 @@ func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64,
 		if o.node == nil || o.cfg.Direct {
 			mset.ackq.push(seq)
 		} else {
-			o.updateAcks(dseq, seq)
+			o.updateAcks(dseq, seq, _EMPTY_)
 		}
 	}
 }
@@ -5349,7 +5392,7 @@ func (o *consumer) decStreamPending(sseq uint64, subj string) {
 
 	if wasPending {
 		// We could have the lock for the stream so do this in a go routine.
 		// TODO(dlc) - We should do this with ipq vs naked go routines.
-		go o.processTerm(sseq, p.Sequence, rdc, ackTermUnackedLimitsReason)
+		go o.processTerm(sseq, p.Sequence, rdc, ackTermUnackedLimitsReason, _EMPTY_)
 	}
 }
diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index bd8b19bfa60..6bde805cbc0 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -4790,9 +4790,9 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
 				o.checkStateForInterestStream()
 				// Do a snapshot.
 				doSnapshot(true)
-				// Synchronize followers to our state. Only send out if we have state.
+				// Synchronize followers to our state. Only send out if we have state and nothing pending.
 				if n != nil {
-					if _, _, applied := n.Progress(); applied > 0 {
+					if _, _, applied := n.Progress(); applied > 0 && aq.len() == 0 {
 						if snap, err := o.store.EncodedState(); err == nil {
 							n.SendSnapshot(snap)
 						}
@@ -5015,6 +5015,13 @@ var errConsumerClosed = errors.New("consumer closed")
 
 func (o *consumer) processReplicatedAck(dseq, sseq uint64) error {
 	o.mu.Lock()
+	// Update activity.
+	o.lat = time.Now()
+
+	// Do actual ack update to store.
+	// Always do this to have it recorded.
+	o.store.UpdateAcks(dseq, sseq)
+
 	mset := o.mset
 	if o.closed || mset == nil {
 		o.mu.Unlock()
@@ -5025,11 +5032,11 @@ var errConsumerClosed = errors.New("consumer closed")
 		return errStreamClosed
 	}
 
-	// Update activity.
-	o.lat = time.Now()
-
-	// Do actual ack update to store.
-	o.store.UpdateAcks(dseq, sseq)
+	// Check if we have a reply that was requested.
+	if reply := o.replies[sseq]; reply != _EMPTY_ {
+		o.outq.sendMsg(reply, nil)
+		delete(o.replies, sseq)
+	}
 
 	if o.retention == LimitsPolicy {
 		o.mu.Unlock()
diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go
index 767290c9c64..6b6ade74448 100644
--- a/server/jetstream_cluster_1_test.go
+++ b/server/jetstream_cluster_1_test.go
@@ -21,6 +21,7 @@ import (
 	"context"
 	crand "crypto/rand"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"math/rand"
 	"os"
@@ -4061,8 +4062,10 @@ func TestJetStreamClusterScaleConsumer(t *testing.T) {
 	checkFor(t, time.Second*30, time.Millisecond*250, func() error {
 		if ci, err = js.ConsumerInfo("TEST", "DUR"); err != nil {
 			return err
+		} else if ci.Cluster == nil {
+			return errors.New("no cluster info")
 		} else if ci.Cluster.Leader == _EMPTY_ {
-			return fmt.Errorf("no leader")
+			return errors.New("no leader")
 		} else if len(ci.Cluster.Replicas) != r-1 {
 			return fmt.Errorf("not enough replica, got %d wanted %d", len(ci.Cluster.Replicas), r-1)
 		} else {
diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go
index a381ddd1777..ca77bff985e 100644
--- a/server/jetstream_cluster_4_test.go
+++ b/server/jetstream_cluster_4_test.go
@@ -1429,3 +1429,190 @@ func TestClusteredInterestConsumerFilterEdit(t *testing.T) {
 		t.Fatalf("expected 1 message got %d", nfo.State.Msgs)
 	}
 }
+
+func TestJetStreamClusterDoubleAckRedelivery(t *testing.T) {
+	conf := `
+	listen: 127.0.0.1:-1
+	server_name: %s
+	jetstream: {
+		store_dir: '%s',
+	}
+	cluster {
+		name: %s
+		listen: 127.0.0.1:%d
+		routes = [%s]
+	}
+	server_tags: ["test"]
+	system_account: sys
+	no_auth_user: js
+	accounts {
+		sys { users = [ { user: sys, pass: sys } ] }
+		js {
+			jetstream = enabled
+			users = [ { user: js, pass: js } ]
+		}
+	}`
+	c := createJetStreamClusterWithTemplate(t, conf, "R3F", 3)
+	defer c.shutdown()
+	for _, s := range c.servers {
+		s.optsMu.Lock()
+		s.opts.LameDuckDuration = 15 * time.Second
+		s.opts.LameDuckGracePeriod = -15 * time.Second
+		s.optsMu.Unlock()
+	}
+	s := c.randomNonLeader()
+
+	nc, js := jsClientConnect(t, s)
+	defer nc.Close()
+
+	sc, err := js.AddStream(&nats.StreamConfig{
+		Name:     "LIMITS",
+		Subjects: []string{"foo.>"},
+		Replicas: 3,
+		Storage:  nats.FileStorage,
+	})
+	require_NoError(t, err)
+
+	stepDown := func() {
+		_, err = nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, sc.Config.Name), nil, time.Second)
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	var wg sync.WaitGroup
+	producer := func(name string) {
+		wg.Add(1)
+		nc, js := jsClientConnect(t, s)
+		defer nc.Close()
+		defer wg.Done()
+
+		i := 0
+		payload := []byte(strings.Repeat("Z", 1024))
+		for range time.NewTicker(1 * time.Millisecond).C {
+			select {
+			case <-ctx.Done():
+				return
+			default:
+			}
+			msgID := nats.MsgId(fmt.Sprintf("%s:%d", name, i))
+			js.PublishAsync("foo.bar", payload, msgID, nats.RetryAttempts(10))
+			i++
+		}
+	}
+	go producer("A")
+	go producer("B")
+	go producer("C")
+
+	sub, err := js.PullSubscribe("foo.bar", "ABC", nats.AckWait(5*time.Second), nats.MaxAckPending(1000), nats.PullMaxWaiting(1000))
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	type ackResult struct {
+		ack         *nats.Msg
+		original    *nats.Msg
+		redelivered *nats.Msg
+	}
+	received := make(map[string]int64)
+	acked := make(map[string]*ackResult)
+	errors := make(map[string]error)
+	extraRedeliveries := 0
+
+	wg.Add(1)
+	go func() {
+		nc, js = jsClientConnect(t, s)
+		defer nc.Close()
+		defer wg.Done()
+
+		fetch := func(t *testing.T, batchSize int) {
+			msgs, err := sub.Fetch(batchSize, nats.MaxWait(500*time.Millisecond))
+			if err != nil {
+				return
+			}
+
+			for _, msg := range msgs {
+				meta, err := msg.Metadata()
+				if err != nil {
+					t.Error(err)
+					continue
+				}
+
+				msgID := msg.Header.Get(nats.MsgIdHdr)
+				if meta.NumDelivered > 1 {
+					if err, ok := errors[msgID]; ok {
+						t.Logf("Redelivery after failed Ack Sync: %+v - %+v - error: %v", msg.Reply, msg.Header, err)
+					} else {
+						t.Logf("Redelivery: %+v - %+v", msg.Reply, msg.Header)
+					}
+					if resp, ok := acked[msgID]; ok {
+						t.Errorf("Redelivery after successful Ack Sync: msgID:%v - redelivered:%v - original:%+v - ack:%+v",
+							msgID, msg.Reply, resp.original.Reply, resp.ack)
+						resp.redelivered = msg
+						extraRedeliveries++
+					}
+				}
+				received[msgID]++
+				resp, err := nc.Request(msg.Reply, []byte("+ACK"), 500*time.Millisecond)
+				if err != nil {
+					errors[msgID] = err
+				} else {
+					acked[msgID] = &ackResult{resp, msg, nil}
+				}
+			}
+		}
+
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			default:
+			}
+			fetch(t, 1)
+			fetch(t, 50)
+		}
+	}()
+
+	// Cause a couple of step downs before the restarts as well.
+	time.AfterFunc(5*time.Second, func() { stepDown() })
+	time.AfterFunc(10*time.Second, func() { stepDown() })
+
+	// Let messages be produced, and then restart the servers.
+	<-time.After(15 * time.Second)
+
+NextServer:
+	for _, s := range c.servers {
+		s.lameDuckMode()
+		s.WaitForShutdown()
+		s = c.restartServer(s)
+
+		hctx, hcancel := context.WithTimeout(ctx, 60*time.Second)
+		defer hcancel()
+		for range time.NewTicker(2 * time.Second).C {
+			select {
+			case <-hctx.Done():
+				t.Logf("WRN: Timed out waiting for healthz from %s", s)
+				continue NextServer
+			default:
+			}
+
+			status := s.healthz(nil)
+			if status.StatusCode == 200 {
+				continue NextServer
+			}
+		}
+		// Pause in-between server restarts.
+		time.Sleep(10 * time.Second)
+	}
+
+	// Stop all producer and consumer goroutines to check results.
+	cancel()
+	select {
+	case <-ctx.Done():
+	case <-time.After(10 * time.Second):
+	}
+	wg.Wait()
+	if extraRedeliveries > 0 {
+		t.Fatalf("Received %v redeliveries after a successful ack", extraRedeliveries)
+	}
+}