diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index e480db7080c..8b2e54cb168 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -10447,6 +10447,122 @@ func TestJetStreamClusterJszRaftLeaderReporting(t *testing.T) { } } +func TestJetStreamClusterNoInterestDesyncOnConsumerCreate(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + Retention: nats.InterestPolicy, + }) + require_NoError(t, err) + + // Pick a random server that will not know about the new consumer being created. + // If servers determine "no interest" individually, these servers will desync. + rs := c.randomNonLeader() + sjs := rs.getJetStream() + meta := sjs.getMetaGroup() + require_NoError(t, meta.PauseApply()) + + sub, err := js.PullSubscribe(_EMPTY_, "DURABLE", nats.BindStream("TEST")) + require_NoError(t, err) + defer sub.Drain() + + checkConsumersAssigned := func(expected int) { + t.Helper() + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + var count int + for _, s := range c.servers { + _, _, jsa := s.globalAccount().getJetStreamFromAccount() + if jsa.consumerAssigned("TEST", "DURABLE") { + count++ + } + } + if count != expected { + return fmt.Errorf("expected %d, got %d", expected, count) + } + return nil + }) + } + // Confirm only two servers know about the consumer. + checkConsumersAssigned(2) + c.waitOnConsumerLeader(globalAccountName, "TEST", "DURABLE") + + // Publish a single message. All servers will receive this, but only two will store it. + _, err = js.Publish("foo", nil) + require_NoError(t, err) + checkLastSeq := func(lseq uint64) { + t.Helper() + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + for _, s := range c.servers { + mset, err := s.globalAccount().lookupStream("TEST") + if err != nil { + return err + } + if seq := mset.lastSeq(); seq != lseq { + return fmt.Errorf("expected %d, got %d", lseq, seq) + } + } + return nil + }) + } + checkLastSeq(1) + + // Resume the meta layer such that the consumer gets created on the remaining server. + meta.ResumeApply() + checkConsumersAssigned(3) + + // All servers will now store another published message. + _, err = js.Publish("foo", nil) + require_NoError(t, err) + checkLastSeq(2) + + // Make sure the consumer leader is on the same server that didn't store the first message. + cl := c.consumerLeader(globalAccountName, "TEST", "DURABLE") + if cl != rs { + mset, err := cl.globalAccount().lookupStream("TEST") + require_NoError(t, err) + o := mset.lookupConsumer("DURABLE") + require_NotNil(t, o) + n := o.raftNode() + require_NoError(t, n.StepDown(rs.NodeName())) + c.waitOnConsumerLeader(globalAccountName, "TEST", "DURABLE") + cl = c.consumerLeader(globalAccountName, "TEST", "DURABLE") + require_Equal(t, cl, rs) + } + + // Since the consumer leader is the same as the server that didn't store the first message, + // it can only receive and ack the second message. + msgs, err := sub.Fetch(1, nats.MaxWait(time.Second)) + require_NoError(t, err) + require_Len(t, len(msgs), 1) + metadata, err := msgs[0].Metadata() + require_NoError(t, err) + require_Equal(t, metadata.Sequence.Stream, 2) + require_Equal(t, metadata.NumPending, 0) + require_NoError(t, msgs[0].AckSync()) + + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + // The servers will eventually be synced up again, but this relies on the interest state being checked. + for _, s := range c.servers { + if s == rs { + continue + } + mset, err := s.globalAccount().lookupStream("TEST") + if err != nil { + return err + } + mset.checkInterestState() + } + return checkState(t, c, globalAccountName, "TEST") + }) +} + // // DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times) // Add at the end of jetstream_cluster__test.go, with being the highest value. diff --git a/server/stream.go b/server/stream.go index bbc9834593f..5c72329d065 100644 --- a/server/stream.go +++ b/server/stream.go @@ -7607,7 +7607,19 @@ func (mset *stream) ackMsg(o *consumer, seq uint64) bool { // Only propose message deletion to the stream if we're consumer leader, otherwise all followers would also propose. // We must be the consumer leader, since we know for sure we've stored the message and don't register as pre-ack. if o != nil && !o.IsLeader() { + // Currently, interest-based streams can race on "no interest" because consumer creates/updates go over + // the meta layer and published messages go over the stream layer. Some servers could then either store + // or not store some initial set of messages that gained new interest. To get the stream back in sync, + // we allow moving the first sequence up. + // TODO(mvv): later on only the stream leader should determine "no interest" + interestRaiseFirst := mset.cfg.Retention == InterestPolicy && seq == state.FirstSeq mset.mu.Unlock() + if interestRaiseFirst { + if _, err := store.RemoveMsg(seq); err == ErrStoreEOF { + // This should not happen, but being pedantic. + mset.registerPreAckLock(o, seq) + } + } // Must still mark as removal if follower. If we become leader later, we must be able to retry the proposal. return true }