Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 65 additions & 61 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1220,6 +1220,52 @@ type recoveryUpdates struct {
updateConsumers map[string]map[string]*consumerAssignment
}

// removeStream records a stream removal during recovery. Any pending add or
// update for the same stream is superseded, and all pending consumer updates
// and removals tracked under the stream's key are dropped with it.
func (ru *recoveryUpdates) removeStream(sa *streamAssignment) {
	key := sa.recoveryKey()
	// Clear out anything pending for this stream; the removal wins.
	delete(ru.addStreams, key)
	delete(ru.updateStreams, key)
	delete(ru.updateConsumers, key)
	delete(ru.removeConsumers, key)
	ru.removeStreams[key] = sa
}

// addStream records a stream add during recovery, superseding any pending
// removal for the same stream.
func (ru *recoveryUpdates) addStream(sa *streamAssignment) {
	key := sa.recoveryKey()
	// A later add cancels an earlier pending removal.
	delete(ru.removeStreams, key)
	ru.addStreams[key] = sa
}

// updateStream records a stream update during recovery, superseding any
// pending add or removal for the same stream.
func (ru *recoveryUpdates) updateStream(sa *streamAssignment) {
	key := sa.recoveryKey()
	// The update replaces any pending add or removal for this stream.
	delete(ru.addStreams, key)
	delete(ru.removeStreams, key)
	ru.updateStreams[key] = sa
}

// removeConsumer records a consumer removal during recovery, superseding any
// pending add/update for the same consumer under the same stream.
func (ru *recoveryUpdates) removeConsumer(ca *consumerAssignment) {
	skey := ca.streamRecoveryKey()
	key := ca.recoveryKey()
	// delete on a nil map is a no-op, so no existence check is needed here.
	delete(ru.updateConsumers[skey], key)
	pending := ru.removeConsumers[skey]
	if pending == nil {
		pending = make(map[string]*consumerAssignment)
		ru.removeConsumers[skey] = pending
	}
	pending[key] = ca
}

// addOrUpdateConsumer records a consumer add/update during recovery,
// superseding any pending removal for the same consumer under the same stream.
func (ru *recoveryUpdates) addOrUpdateConsumer(ca *consumerAssignment) {
	skey := ca.streamRecoveryKey()
	key := ca.recoveryKey()
	// delete on a nil map is a no-op, so no existence check is needed here.
	delete(ru.removeConsumers[skey], key)
	pending := ru.updateConsumers[skey]
	if pending == nil {
		pending = make(map[string]*consumerAssignment)
		ru.updateConsumers[skey] = pending
	}
	pending[key] = ca
}

// Called after recovery of the cluster on startup to check for any orphans.
// Streams and consumers are recovered from disk, and the meta layer's mappings
// should clean them up, but under crash scenarios there could be orphans.
Expand Down Expand Up @@ -1714,25 +1760,28 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove
for _, sa := range saDel {
js.setStreamAssignmentRecovering(sa)
if isRecovering {
key := sa.recoveryKey()
ru.removeStreams[key] = sa
delete(ru.addStreams, key)
delete(ru.updateStreams, key)
delete(ru.updateConsumers, key)
delete(ru.removeConsumers, key)
ru.removeStream(sa)
} else {
js.processStreamRemoval(sa)
}
}
// Now do add for the streams. Also add in all consumers.
for _, sa := range saAdd {
js.setStreamAssignmentRecovering(sa)
js.processStreamAssignment(sa)
if isRecovering {
ru.addStream(sa)
} else {
js.processStreamAssignment(sa)
}

// We can simply process the consumers.
for _, ca := range sa.consumers {
js.setConsumerAssignmentRecovering(ca)
js.processConsumerAssignment(ca)
if isRecovering {
ru.addOrUpdateConsumer(ca)
} else {
js.processConsumerAssignment(ca)
}
}
}

Expand All @@ -1741,10 +1790,7 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove
for _, sa := range saChk {
js.setStreamAssignmentRecovering(sa)
if isRecovering {
key := sa.recoveryKey()
ru.updateStreams[key] = sa
delete(ru.addStreams, key)
delete(ru.removeStreams, key)
ru.updateStream(sa)
} else {
js.processUpdateStreamAssignment(sa)
}
Expand All @@ -1754,31 +1800,15 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove
for _, ca := range caDel {
js.setConsumerAssignmentRecovering(ca)
if isRecovering {
key := ca.recoveryKey()
skey := ca.streamRecoveryKey()
if _, ok := ru.removeConsumers[skey]; !ok {
ru.removeConsumers[skey] = map[string]*consumerAssignment{}
}
ru.removeConsumers[skey][key] = ca
if consumers, ok := ru.updateConsumers[skey]; ok {
delete(consumers, key)
}
ru.removeConsumer(ca)
} else {
js.processConsumerRemoval(ca)
}
}
for _, ca := range caAdd {
js.setConsumerAssignmentRecovering(ca)
if isRecovering {
key := ca.recoveryKey()
skey := ca.streamRecoveryKey()
if consumers, ok := ru.removeConsumers[skey]; ok {
delete(consumers, key)
}
if _, ok := ru.updateConsumers[skey]; !ok {
ru.updateConsumers[skey] = map[string]*consumerAssignment{}
}
ru.updateConsumers[skey][key] = ca
ru.addOrUpdateConsumer(ca)
} else {
js.processConsumerAssignment(ca)
}
Expand Down Expand Up @@ -2046,9 +2076,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
}
if isRecovering {
js.setStreamAssignmentRecovering(sa)
key := sa.recoveryKey()
ru.addStreams[key] = sa
delete(ru.removeStreams, key)
ru.addStream(sa)
} else {
js.processStreamAssignment(sa)
}
Expand All @@ -2060,12 +2088,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
}
if isRecovering {
js.setStreamAssignmentRecovering(sa)
key := sa.recoveryKey()
ru.removeStreams[key] = sa
delete(ru.addStreams, key)
delete(ru.updateStreams, key)
delete(ru.updateConsumers, key)
delete(ru.removeConsumers, key)
ru.removeStream(sa)
} else {
js.processStreamRemoval(sa)
}
Expand All @@ -2077,15 +2100,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
}
if isRecovering {
js.setConsumerAssignmentRecovering(ca)
key := ca.recoveryKey()
skey := ca.streamRecoveryKey()
if consumers, ok := ru.removeConsumers[skey]; ok {
delete(consumers, key)
}
if _, ok := ru.updateConsumers[skey]; !ok {
ru.updateConsumers[skey] = map[string]*consumerAssignment{}
}
ru.updateConsumers[skey][key] = ca
ru.addOrUpdateConsumer(ca)
} else {
js.processConsumerAssignment(ca)
}
Expand Down Expand Up @@ -2117,15 +2132,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
}
if isRecovering {
js.setConsumerAssignmentRecovering(ca)
key := ca.recoveryKey()
skey := ca.streamRecoveryKey()
if _, ok := ru.removeConsumers[skey]; !ok {
ru.removeConsumers[skey] = map[string]*consumerAssignment{}
}
ru.removeConsumers[skey][key] = ca
if consumers, ok := ru.updateConsumers[skey]; ok {
delete(consumers, key)
}
ru.removeConsumer(ca)
} else {
js.processConsumerRemoval(ca)
}
Expand All @@ -2137,10 +2144,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
}
if isRecovering {
js.setStreamAssignmentRecovering(sa)
key := sa.recoveryKey()
ru.updateStreams[key] = sa
delete(ru.addStreams, key)
delete(ru.removeStreams, key)
ru.updateStream(sa)
} else {
js.processUpdateStreamAssignment(sa)
}
Expand Down
82 changes: 82 additions & 0 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4184,6 +4184,88 @@ func TestJetStreamClusterMetaSnapshotMustNotIncludePendingConsumers(t *testing.T
}
}

func TestJetStreamClusterMetaSnapshotReCreateConsistency(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Replicas: 3})
	require_NoError(t, err)

	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Name: "consumer", Replicas: 3})
	require_NoError(t, err)

	leader := c.leader()
	ljs := leader.getJetStream()
	ljs.mu.Lock()
	sa := ljs.streamAssignment(globalAccountName, "TEST")
	ca := ljs.consumerAssignment(globalAccountName, "TEST", "consumer")

	// Remember the original Raft group names so we can verify later that
	// recovery never resurrected them.
	prevStreamGroup, prevConsumerGroup := sa.Group.Name, ca.Group.Name
	streamDelete := encodeDeleteStreamAssignment(sa)

	// Build re-created assignments with a different group and replica count.
	nsa, nca := sa.copyGroup(), ca.copyGroup()
	nsa.Group.Name, nsa.Config.Replicas = "new-group", 1
	nca.Group.Name, nca.Config.Replicas = "new-group", 1
	ljs.mu.Unlock()

	// Capture the snapshot while the stream still exists, then delete it so
	// recovery starts fresh.
	snap, err := ljs.metaSnapshot()
	require_NoError(t, err)
	require_NoError(t, js.DeleteStream("TEST"))
	nc.Close()

	ru := &recoveryUpdates{
		removeStreams:   make(map[string]*streamAssignment),
		removeConsumers: make(map[string]map[string]*consumerAssignment),
		addStreams:      make(map[string]*streamAssignment),
		updateStreams:   make(map[string]*streamAssignment),
		updateConsumers: make(map[string]map[string]*consumerAssignment),
	}

	// Simulate recovering:
	// - snapshot with a stream and consumer
	// - normal entry deleting the stream
	// - normal entry re-adding the stream and consumer under different configs
	// This should result in a consistent state.
	ljs.mu.Lock()
	ljs.metaRecovering = true
	ljs.mu.Unlock()
	_, err = ljs.applyMetaEntries([]*Entry{
		newEntry(EntrySnapshot, snap),
		newEntry(EntryNormal, streamDelete),
		newEntry(EntryNormal, encodeAddStreamAssignment(nsa)),
		newEntry(EntryNormal, encodeAddConsumerAssignment(nca)),
	}, ru)
	require_NoError(t, err)

	// Recovery should contain exactly the stream and consumer create.
	require_Len(t, len(ru.addStreams), 1)
	require_Len(t, len(ru.updateConsumers), 1)

	// Apply the pending recovery updates.
	for _, rsa := range ru.addStreams {
		ljs.processStreamAssignment(rsa)
	}
	for _, cas := range ru.updateConsumers {
		for _, rca := range cas {
			ljs.processConsumerAssignment(rca)
		}
	}

	// Recovery must not have created Raft nodes for the old groups.
	require_True(t, leader.lookupRaftNode(prevStreamGroup) == nil)
	require_True(t, leader.lookupRaftNode(prevConsumerGroup) == nil)
}

func TestJetStreamClusterConsumerDontSendSnapshotOnLeaderChange(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
Expand Down
Loading