From f9e07205b51e736daeb5d391bf1da13a3439397d Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Thu, 6 Nov 2025 19:50:59 +0100 Subject: [PATCH] [FIXED] Consumer not found after meta recovery Signed-off-by: Maurice van Veen --- server/jetstream_cluster.go | 6 ++- server/jetstream_cluster_4_test.go | 84 +++++++++++++++++++++++++++++- 2 files changed, 87 insertions(+), 3 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 56fffc0ed5a..3fd1b8a4dfc 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1771,15 +1771,19 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove } // Now do add for the streams. Also add in all consumers. for _, sa := range saAdd { + consumers := sa.consumers js.setStreamAssignmentRecovering(sa) if isRecovering { + // Since we're recovering and storing up changes, we'll need to clear out these consumers. + // Some might be removed, and we'll recover those later, so the stream assignment must not be able to remember them. + sa.consumers = nil ru.addStream(sa) } else { js.processStreamAssignment(sa) } // We can simply process the consumers. - for _, ca := range sa.consumers { + for _, ca := range consumers { js.setConsumerAssignmentRecovering(ca) if isRecovering { ru.addOrUpdateConsumer(ca) diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index ea952989648..b275b0e9169 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -4214,6 +4214,8 @@ func TestJetStreamClusterMetaSnapshotReCreateConsistency(t *testing.T) { cca := ca.copyGroup() csa.Group.Name, csa.Config.Replicas = "new-group", 1 cca.Group.Name, cca.Config.Replicas = "new-group", 1 + streamAdd := encodeAddStreamAssignment(csa) + consumerAdd := encodeAddConsumerAssignment(cca) mjs.mu.Unlock() // Get the snapshot before removing the stream below so we can recover fresh. 
@@ -4241,8 +4243,8 @@ func TestJetStreamClusterMetaSnapshotReCreateConsistency(t *testing.T) { _, err = mjs.applyMetaEntries([]*Entry{ newEntry(EntrySnapshot, snap), newEntry(EntryNormal, streamDelete), - newEntry(EntryNormal, encodeAddStreamAssignment(csa)), - newEntry(EntryNormal, encodeAddConsumerAssignment(cca)), + newEntry(EntryNormal, streamAdd), + newEntry(EntryNormal, consumerAdd), }, ru) require_NoError(t, err) @@ -4267,6 +4269,84 @@ func TestJetStreamClusterMetaSnapshotReCreateConsistency(t *testing.T) { require_True(t, n2 == nil) } +func TestJetStreamClusterMetaSnapshotConsumerDeleteConsistency(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + scfg := &nats.StreamConfig{Name: "TEST", Replicas: 1} + _, err := js.AddStream(scfg) + require_NoError(t, err) + + ccfg := &nats.ConsumerConfig{Name: "consumer", Replicas: 1} + _, err = js.AddConsumer("TEST", ccfg) + require_NoError(t, err) + + sl := c.streamLeader(globalAccountName, "TEST") + mjs := sl.getJetStream() + mjs.mu.Lock() + ca := mjs.consumerAssignment(globalAccountName, "TEST", "consumer") + ca.Created = time.Time{} // Simulate this consumer existed for a while already. + deleteConsumer := encodeDeleteConsumerAssignment(ca) + mjs.mu.Unlock() + + // Get the snapshot before removing the stream below so we can recover fresh. 
+ snap, err := mjs.metaSnapshot() + require_NoError(t, err) + require_NoError(t, js.DeleteStream("TEST")) + nc.Close() + + ru := &recoveryUpdates{ + removeStreams: make(map[string]*streamAssignment), + removeConsumers: make(map[string]map[string]*consumerAssignment), + addStreams: make(map[string]*streamAssignment), + updateStreams: make(map[string]*streamAssignment), + updateConsumers: make(map[string]map[string]*consumerAssignment), + } + + // Simulate recovering: + // - snapshot with a stream and consumer + // - normal entry deleting the consumer + // This should result in a consistent state. + mjs.mu.Lock() + mjs.metaRecovering = true + mjs.mu.Unlock() + _, err = mjs.applyMetaEntries([]*Entry{ + newEntry(EntrySnapshot, snap), + newEntry(EntryNormal, deleteConsumer), + }, ru) + require_NoError(t, err) + + // Recovery should contain the stream create and the consumer delete. + require_Len(t, len(ru.updateConsumers), 1) + require_Len(t, len(ru.removeConsumers), 1) + require_Len(t, len(ru.addStreams), 1) + + // Process those updates. + for _, cas := range ru.updateConsumers { + require_Len(t, len(cas), 0) + } + for _, cas := range ru.removeConsumers { + for _, ca = range cas { + mjs.processConsumerRemoval(ca) + } + } + for _, sa := range ru.addStreams { + mjs.processStreamAssignment(sa) + } + mjs.clearMetaRecovering() + + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + hs := sl.healthz(&HealthzOptions{}) + if hs.Error != _EMPTY_ { + return errors.New(hs.Error) // Would previously error with "consumer not found". + } + return nil + }) +} + func TestJetStreamClusterConsumerDontSendSnapshotOnLeaderChange(t *testing.T) { c := createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown()