Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 11 additions & 16 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1413,13 +1413,13 @@ func (js *jetStream) monitorCluster() {
go checkHealth()
continue
}
if didSnap, didStreamRemoval, _, err := js.applyMetaEntries(ce.Entries, ru); err == nil {
if didSnap, err := js.applyMetaEntries(ce.Entries, ru); err == nil {
var nb uint64
// Some entries can fail without an error when shutting down, don't move applied forward.
if !js.isShuttingDown() {
_, nb = n.Applied(ce.Index)
}
if js.hasPeerEntries(ce.Entries) || didStreamRemoval || (didSnap && !isLeader) {
if js.hasPeerEntries(ce.Entries) || (didSnap && !isLeader) {
doSnapshot()
} else if nb > compactSizeMin && time.Since(lastSnapTime) > minSnapDelta {
doSnapshot()
Expand Down Expand Up @@ -1998,8 +1998,8 @@ func (ca *consumerAssignment) recoveryKey() string {
return ca.Client.serviceAccount() + ksep + ca.Stream + ksep + ca.Name
}

func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bool, bool, bool, error) {
var didSnap, didRemoveStream, didRemoveConsumer bool
func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bool, error) {
var didSnap bool
isRecovering := js.isMetaRecovering()

for _, e := range entries {
Expand All @@ -2021,7 +2021,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
sa, err := decodeStreamAssignment(js.srv, buf[1:])
if err != nil {
js.srv.Errorf("JetStream cluster failed to decode stream assignment: %q", buf[1:])
return didSnap, didRemoveStream, didRemoveConsumer, err
return didSnap, err
}
if isRecovering {
js.setStreamAssignmentRecovering(sa)
Expand All @@ -2035,7 +2035,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
sa, err := decodeStreamAssignment(js.srv, buf[1:])
if err != nil {
js.srv.Errorf("JetStream cluster failed to decode stream assignment: %q", buf[1:])
return didSnap, didRemoveStream, didRemoveConsumer, err
return didSnap, err
}
if isRecovering {
js.setStreamAssignmentRecovering(sa)
Expand All @@ -2047,13 +2047,12 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
delete(ru.removeConsumers, key)
} else {
js.processStreamRemoval(sa)
didRemoveStream = true
}
case assignConsumerOp:
ca, err := decodeConsumerAssignment(buf[1:])
if err != nil {
js.srv.Errorf("JetStream cluster failed to decode consumer assignment: %q", buf[1:])
return didSnap, didRemoveStream, didRemoveConsumer, err
return didSnap, err
}
if isRecovering {
js.setConsumerAssignmentRecovering(ca)
Expand All @@ -2073,7 +2072,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
ca, err := decodeConsumerAssignmentCompressed(buf[1:])
if err != nil {
js.srv.Errorf("JetStream cluster failed to decode compressed consumer assignment: %q", buf[1:])
return didSnap, didRemoveStream, didRemoveConsumer, err
return didSnap, err
}
if isRecovering {
js.setConsumerAssignmentRecovering(ca)
Expand All @@ -2093,7 +2092,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
ca, err := decodeConsumerAssignment(buf[1:])
if err != nil {
js.srv.Errorf("JetStream cluster failed to decode consumer assignment: %q", buf[1:])
return didSnap, didRemoveStream, didRemoveConsumer, err
return didSnap, err
}
if isRecovering {
js.setConsumerAssignmentRecovering(ca)
Expand All @@ -2108,13 +2107,12 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
}
} else {
js.processConsumerRemoval(ca)
didRemoveConsumer = true
}
case updateStreamOp:
sa, err := decodeStreamAssignment(js.srv, buf[1:])
if err != nil {
js.srv.Errorf("JetStream cluster failed to decode stream assignment: %q", buf[1:])
return didSnap, didRemoveStream, didRemoveConsumer, err
return didSnap, err
}
if isRecovering {
js.setStreamAssignmentRecovering(sa)
Expand All @@ -2124,16 +2122,13 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
delete(ru.removeStreams, key)
} else {
js.processUpdateStreamAssignment(sa)
// Since an update can be lowering replica count, we want upper layer to treat
// similar to a removal and snapshot to collapse old entries.
didRemoveStream = true
}
default:
panic(fmt.Sprintf("JetStream Cluster Unknown meta entry op type: %v", entryOp(buf[0])))
}
}
}
return didSnap, didRemoveStream, didRemoveConsumer, nil
return didSnap, nil
}

func (rg *raftGroup) isMember(id string) bool {
Expand Down
18 changes: 9 additions & 9 deletions server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6641,13 +6641,13 @@ func TestJetStreamClusterMetaRecoveryUpdatesDeletesConsumers(t *testing.T) {
}

// Push recovery entries that create the stream & consumer.
_, _, _, err := js.applyMetaEntries(create, ru)
_, err := js.applyMetaEntries(create, ru)
require_NoError(t, err)
require_Len(t, len(ru.updateConsumers), 1)

// Now push another recovery entry that deletes the stream. The
// entry that creates the consumer should now be gone.
_, _, _, err = js.applyMetaEntries(delete, ru)
_, err = js.applyMetaEntries(delete, ru)
require_NoError(t, err)
require_Len(t, len(ru.removeStreams), 1)
require_Len(t, len(ru.updateConsumers), 0)
Expand Down Expand Up @@ -6695,27 +6695,27 @@ func TestJetStreamClusterMetaRecoveryRecreateFileStreamAsMemory(t *testing.T) {
}

// We created a file-based stream first, but deleted it shortly after.
_, _, _, err := js.applyMetaEntries(createFileStream, ru)
_, err := js.applyMetaEntries(createFileStream, ru)
require_NoError(t, err)
require_Len(t, len(ru.addStreams), 1)
require_Len(t, len(ru.removeStreams), 0)

// Now push another recovery entry that deletes the stream.
// The file-based stream should not have been created.
_, _, _, err = js.applyMetaEntries(deleteFileStream, ru)
_, err = js.applyMetaEntries(deleteFileStream, ru)
require_NoError(t, err)
require_Len(t, len(ru.addStreams), 0)
require_Len(t, len(ru.removeStreams), 1)

// Now stage a memory-based stream to be created.
_, _, _, err = js.applyMetaEntries(createMemoryStream, ru)
_, err = js.applyMetaEntries(createMemoryStream, ru)
require_NoError(t, err)
require_Len(t, len(ru.addStreams), 1)
require_Len(t, len(ru.removeStreams), 0)
require_Len(t, len(ru.updateConsumers), 0)

// Also create a consumer on that memory-based stream.
_, _, _, err = js.applyMetaEntries(createConsumer, ru)
_, err = js.applyMetaEntries(createConsumer, ru)
require_NoError(t, err)
require_Len(t, len(ru.addStreams), 1)
require_Len(t, len(ru.removeStreams), 0)
Expand Down Expand Up @@ -6752,19 +6752,19 @@ func TestJetStreamClusterMetaRecoveryConsumerCreateAndRemove(t *testing.T) {
}

// Creating the consumer should append to update consumers list.
_, _, _, err := js.applyMetaEntries(createConsumer, ru)
_, err := js.applyMetaEntries(createConsumer, ru)
require_NoError(t, err)
require_Len(t, len(ru.updateConsumers[":TEST"]), 1)
require_Len(t, len(ru.removeConsumers), 0)

// Deleting the consumer should append to remove consumers list and remove from update list.
_, _, _, err = js.applyMetaEntries(deleteConsumer, ru)
_, err = js.applyMetaEntries(deleteConsumer, ru)
require_NoError(t, err)
require_Len(t, len(ru.removeConsumers[":TEST"]), 1)
require_Len(t, len(ru.updateConsumers[":TEST"]), 0)

// When re-creating the consumer, add to update list and remove from remove list.
_, _, _, err = js.applyMetaEntries(createConsumer, ru)
_, err = js.applyMetaEntries(createConsumer, ru)
require_NoError(t, err)
require_Len(t, len(ru.updateConsumers[":TEST"]), 1)
require_Len(t, len(ru.removeConsumers[":TEST"]), 0)
Expand Down