diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index a40e9590cd1..800d190703d 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1220,6 +1220,52 @@ type recoveryUpdates struct { updateConsumers map[string]map[string]*consumerAssignment } +func (ru *recoveryUpdates) removeStream(sa *streamAssignment) { + key := sa.recoveryKey() + ru.removeStreams[key] = sa + delete(ru.addStreams, key) + delete(ru.updateStreams, key) + delete(ru.updateConsumers, key) + delete(ru.removeConsumers, key) +} + +func (ru *recoveryUpdates) addStream(sa *streamAssignment) { + key := sa.recoveryKey() + ru.addStreams[key] = sa + delete(ru.removeStreams, key) +} + +func (ru *recoveryUpdates) updateStream(sa *streamAssignment) { + key := sa.recoveryKey() + ru.updateStreams[key] = sa + delete(ru.addStreams, key) + delete(ru.removeStreams, key) +} + +func (ru *recoveryUpdates) removeConsumer(ca *consumerAssignment) { + key := ca.recoveryKey() + skey := ca.streamRecoveryKey() + if _, ok := ru.removeConsumers[skey]; !ok { + ru.removeConsumers[skey] = map[string]*consumerAssignment{} + } + ru.removeConsumers[skey][key] = ca + if consumers, ok := ru.updateConsumers[skey]; ok { + delete(consumers, key) + } +} + +func (ru *recoveryUpdates) addOrUpdateConsumer(ca *consumerAssignment) { + key := ca.recoveryKey() + skey := ca.streamRecoveryKey() + if consumers, ok := ru.removeConsumers[skey]; ok { + delete(consumers, key) + } + if _, ok := ru.updateConsumers[skey]; !ok { + ru.updateConsumers[skey] = map[string]*consumerAssignment{} + } + ru.updateConsumers[skey][key] = ca +} + // Called after recovery of the cluster on startup to check for any orphans. // Streams and consumers are recovered from disk, and the meta layer's mappings // should clean them up, but under crash scenarios there could be orphans. @@ -1714,12 +1760,7 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove for _, sa := range saDel { js.setStreamAssignmentRecovering(sa) if isRecovering { - key := sa.recoveryKey() - ru.removeStreams[key] = sa - delete(ru.addStreams, key) - delete(ru.updateStreams, key) - delete(ru.updateConsumers, key) - delete(ru.removeConsumers, key) + ru.removeStream(sa) } else { js.processStreamRemoval(sa) } @@ -1727,12 +1768,20 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove // Now do add for the streams. Also add in all consumers. for _, sa := range saAdd { js.setStreamAssignmentRecovering(sa) - js.processStreamAssignment(sa) + if isRecovering { + ru.addStream(sa) + } else { + js.processStreamAssignment(sa) + } // We can simply process the consumers. for _, ca := range sa.consumers { js.setConsumerAssignmentRecovering(ca) - js.processConsumerAssignment(ca) + if isRecovering { + ru.addOrUpdateConsumer(ca) + } else { + js.processConsumerAssignment(ca) + } } } @@ -1741,10 +1790,7 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove for _, sa := range saChk { js.setStreamAssignmentRecovering(sa) if isRecovering { - key := sa.recoveryKey() - ru.updateStreams[key] = sa - delete(ru.addStreams, key) - delete(ru.removeStreams, key) + ru.updateStream(sa) } else { js.processUpdateStreamAssignment(sa) } @@ -1754,15 +1800,7 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove for _, ca := range caDel { js.setConsumerAssignmentRecovering(ca) if isRecovering { - key := ca.recoveryKey() - skey := ca.streamRecoveryKey() - if _, ok := ru.removeConsumers[skey]; !ok { - ru.removeConsumers[skey] = map[string]*consumerAssignment{} - } - ru.removeConsumers[skey][key] = ca - if consumers, ok := ru.updateConsumers[skey]; ok { - delete(consumers, key) - } + ru.removeConsumer(ca) } else { js.processConsumerRemoval(ca) } @@ -1770,15 +1808,7 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove for _, ca := range caAdd { js.setConsumerAssignmentRecovering(ca) if isRecovering { - key := ca.recoveryKey() - skey := ca.streamRecoveryKey() - if consumers, ok := ru.removeConsumers[skey]; ok { - delete(consumers, key) - } - if _, ok := ru.updateConsumers[skey]; !ok { - ru.updateConsumers[skey] = map[string]*consumerAssignment{} - } - ru.updateConsumers[skey][key] = ca + ru.addOrUpdateConsumer(ca) } else { js.processConsumerAssignment(ca) } @@ -2046,9 +2076,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo } if isRecovering { js.setStreamAssignmentRecovering(sa) - key := sa.recoveryKey() - ru.addStreams[key] = sa - delete(ru.removeStreams, key) + ru.addStream(sa) } else { js.processStreamAssignment(sa) } @@ -2060,12 +2088,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo } if isRecovering { js.setStreamAssignmentRecovering(sa) - key := sa.recoveryKey() - ru.removeStreams[key] = sa - delete(ru.addStreams, key) - delete(ru.updateStreams, key) - delete(ru.updateConsumers, key) - delete(ru.removeConsumers, key) + ru.removeStream(sa) } else { js.processStreamRemoval(sa) } @@ -2077,15 +2100,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo } if isRecovering { js.setConsumerAssignmentRecovering(ca) - key := ca.recoveryKey() - skey := ca.streamRecoveryKey() - if consumers, ok := ru.removeConsumers[skey]; ok { - delete(consumers, key) - } - if _, ok := ru.updateConsumers[skey]; !ok { - ru.updateConsumers[skey] = map[string]*consumerAssignment{} - } - ru.updateConsumers[skey][key] = ca + ru.addOrUpdateConsumer(ca) } else { js.processConsumerAssignment(ca) } @@ -2117,15 +2132,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo } if isRecovering { js.setConsumerAssignmentRecovering(ca) - key := ca.recoveryKey() - skey := ca.streamRecoveryKey() - if _, ok := ru.removeConsumers[skey]; !ok { - ru.removeConsumers[skey] = map[string]*consumerAssignment{} - } - ru.removeConsumers[skey][key] = ca - if consumers, ok := ru.updateConsumers[skey]; ok { - delete(consumers, key) - } + ru.removeConsumer(ca) } else { js.processConsumerRemoval(ca) } @@ -2137,10 +2144,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo } if isRecovering { js.setStreamAssignmentRecovering(sa) - key := sa.recoveryKey() - ru.updateStreams[key] = sa - delete(ru.addStreams, key) - delete(ru.removeStreams, key) + ru.updateStream(sa) } else { js.processUpdateStreamAssignment(sa) } diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 250bacb7aab..61015f8b978 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -4184,6 +4184,88 @@ func TestJetStreamClusterMetaSnapshotMustNotIncludePendingConsumers(t *testing.T } } +func TestJetStreamClusterMetaSnapshotReCreateConsistency(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + scfg := &nats.StreamConfig{Name: "TEST", Replicas: 3} + _, err := js.AddStream(scfg) + require_NoError(t, err) + + ccfg := &nats.ConsumerConfig{Name: "consumer", Replicas: 3} + _, err = js.AddConsumer("TEST", ccfg) + require_NoError(t, err) + + ml := c.leader() + mjs := ml.getJetStream() + mjs.mu.Lock() + sa := mjs.streamAssignment(globalAccountName, "TEST") + ca := mjs.consumerAssignment(globalAccountName, "TEST", "consumer") + + oldStreamGroup := sa.Group.Name + oldConsumerGroup := ca.Group.Name + streamDelete := encodeDeleteStreamAssignment(sa) + + csa := sa.copyGroup() + cca := ca.copyGroup() + csa.Group.Name, csa.Config.Replicas = "new-group", 1 + cca.Group.Name, cca.Config.Replicas = "new-group", 1 + mjs.mu.Unlock() + + // Get the snapshot before removing the stream below so we can recover fresh. + snap, err := mjs.metaSnapshot() + require_NoError(t, err) + require_NoError(t, js.DeleteStream("TEST")) + nc.Close() + + ru := &recoveryUpdates{ + removeStreams: make(map[string]*streamAssignment), + removeConsumers: make(map[string]map[string]*consumerAssignment), + addStreams: make(map[string]*streamAssignment), + updateStreams: make(map[string]*streamAssignment), + updateConsumers: make(map[string]map[string]*consumerAssignment), + } + + // Simulate recovering: + // - snapshot with a stream and consumer + // - normal entry deleting the stream + // - normal entry re-adding the stream and consumer under different configs + // This should result in a consistent state. + mjs.mu.Lock() + mjs.metaRecovering = true + mjs.mu.Unlock() + _, err = mjs.applyMetaEntries([]*Entry{ + newEntry(EntrySnapshot, snap), + newEntry(EntryNormal, streamDelete), + newEntry(EntryNormal, encodeAddStreamAssignment(csa)), + newEntry(EntryNormal, encodeAddConsumerAssignment(cca)), + }, ru) + require_NoError(t, err) + + // Recovery should contain the stream and consumer create. + require_Len(t, len(ru.addStreams), 1) + require_Len(t, len(ru.updateConsumers), 1) + + // Process those updates. + for _, sa = range ru.addStreams { + mjs.processStreamAssignment(sa) + } + for _, cas := range ru.updateConsumers { + for _, ca = range cas { + mjs.processConsumerAssignment(ca) + } + } + + // Should not have created old Raft nodes during recovery. + n1 := ml.lookupRaftNode(oldStreamGroup) + n2 := ml.lookupRaftNode(oldConsumerGroup) + require_True(t, n1 == nil) + require_True(t, n2 == nil) +} + func TestJetStreamClusterConsumerDontSendSnapshotOnLeaderChange(t *testing.T) { c := createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown()