diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 725b842ab8d..f6e148f566d 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1413,13 +1413,13 @@ func (js *jetStream) monitorCluster() { go checkHealth() continue } - if didSnap, didStreamRemoval, _, err := js.applyMetaEntries(ce.Entries, ru); err == nil { + if didSnap, err := js.applyMetaEntries(ce.Entries, ru); err == nil { var nb uint64 // Some entries can fail without an error when shutting down, don't move applied forward. if !js.isShuttingDown() { _, nb = n.Applied(ce.Index) } - if js.hasPeerEntries(ce.Entries) || didStreamRemoval || (didSnap && !isLeader) { + if js.hasPeerEntries(ce.Entries) || (didSnap && !isLeader) { doSnapshot() } else if nb > compactSizeMin && time.Since(lastSnapTime) > minSnapDelta { doSnapshot() @@ -1998,8 +1998,8 @@ func (ca *consumerAssignment) recoveryKey() string { return ca.Client.serviceAccount() + ksep + ca.Stream + ksep + ca.Name } -func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bool, bool, bool, error) { - var didSnap, didRemoveStream, didRemoveConsumer bool +func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bool, error) { + var didSnap bool isRecovering := js.isMetaRecovering() for _, e := range entries { @@ -2021,7 +2021,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo sa, err := decodeStreamAssignment(js.srv, buf[1:]) if err != nil { js.srv.Errorf("JetStream cluster failed to decode stream assignment: %q", buf[1:]) - return didSnap, didRemoveStream, didRemoveConsumer, err + return didSnap, err } if isRecovering { js.setStreamAssignmentRecovering(sa) @@ -2035,7 +2035,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo sa, err := decodeStreamAssignment(js.srv, buf[1:]) if err != nil { js.srv.Errorf("JetStream cluster failed to decode stream assignment: %q", buf[1:]) - return didSnap, didRemoveStream, didRemoveConsumer, err + return didSnap, err } if isRecovering { js.setStreamAssignmentRecovering(sa) @@ -2047,13 +2047,12 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo delete(ru.removeConsumers, key) } else { js.processStreamRemoval(sa) - didRemoveStream = true } case assignConsumerOp: ca, err := decodeConsumerAssignment(buf[1:]) if err != nil { js.srv.Errorf("JetStream cluster failed to decode consumer assignment: %q", buf[1:]) - return didSnap, didRemoveStream, didRemoveConsumer, err + return didSnap, err } if isRecovering { js.setConsumerAssignmentRecovering(ca) @@ -2073,7 +2072,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo ca, err := decodeConsumerAssignmentCompressed(buf[1:]) if err != nil { js.srv.Errorf("JetStream cluster failed to decode compressed consumer assignment: %q", buf[1:]) - return didSnap, didRemoveStream, didRemoveConsumer, err + return didSnap, err } if isRecovering { js.setConsumerAssignmentRecovering(ca) @@ -2093,7 +2092,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo ca, err := decodeConsumerAssignment(buf[1:]) if err != nil { js.srv.Errorf("JetStream cluster failed to decode consumer assignment: %q", buf[1:]) - return didSnap, didRemoveStream, didRemoveConsumer, err + return didSnap, err } if isRecovering { js.setConsumerAssignmentRecovering(ca) @@ -2108,13 +2107,12 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo } } else { js.processConsumerRemoval(ca) - didRemoveConsumer = true } case updateStreamOp: sa, err := decodeStreamAssignment(js.srv, buf[1:]) if err != nil { js.srv.Errorf("JetStream cluster failed to decode stream assignment: %q", buf[1:]) - return didSnap, didRemoveStream, didRemoveConsumer, err + return didSnap, err } if isRecovering { js.setStreamAssignmentRecovering(sa) @@ -2124,16 +2122,13 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo delete(ru.removeStreams, key) } else { js.processUpdateStreamAssignment(sa) - // Since an update can be lowering replica count, we want upper layer to treat - // similar to a removal and snapshot to collapse old entries. - didRemoveStream = true } default: panic(fmt.Sprintf("JetStream Cluster Unknown meta entry op type: %v", entryOp(buf[0]))) } } } - return didSnap, didRemoveStream, didRemoveConsumer, nil + return didSnap, nil } func (rg *raftGroup) isMember(id string) bool { diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index 512296a9d6a..50b73756b11 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -6641,13 +6641,13 @@ func TestJetStreamClusterMetaRecoveryUpdatesDeletesConsumers(t *testing.T) { } // Push recovery entries that create the stream & consumer. - _, _, _, err := js.applyMetaEntries(create, ru) + _, err := js.applyMetaEntries(create, ru) require_NoError(t, err) require_Len(t, len(ru.updateConsumers), 1) // Now push another recovery entry that deletes the stream. The // entry that creates the consumer should now be gone. - _, _, _, err = js.applyMetaEntries(delete, ru) + _, err = js.applyMetaEntries(delete, ru) require_NoError(t, err) require_Len(t, len(ru.removeStreams), 1) require_Len(t, len(ru.updateConsumers), 0) @@ -6695,27 +6695,27 @@ func TestJetStreamClusterMetaRecoveryRecreateFileStreamAsMemory(t *testing.T) { } // We created a file-based stream first, but deleted it shortly after. - _, _, _, err := js.applyMetaEntries(createFileStream, ru) + _, err := js.applyMetaEntries(createFileStream, ru) require_NoError(t, err) require_Len(t, len(ru.addStreams), 1) require_Len(t, len(ru.removeStreams), 0) // Now push another recovery entry that deletes the stream. // The file-based stream should not have been created. - _, _, _, err = js.applyMetaEntries(deleteFileStream, ru) + _, err = js.applyMetaEntries(deleteFileStream, ru) require_NoError(t, err) require_Len(t, len(ru.addStreams), 0) require_Len(t, len(ru.removeStreams), 1) // Now stage a memory-based stream to be created. - _, _, _, err = js.applyMetaEntries(createMemoryStream, ru) + _, err = js.applyMetaEntries(createMemoryStream, ru) require_NoError(t, err) require_Len(t, len(ru.addStreams), 1) require_Len(t, len(ru.removeStreams), 0) require_Len(t, len(ru.updateConsumers), 0) // Also create a consumer on that memory-based stream. - _, _, _, err = js.applyMetaEntries(createConsumer, ru) + _, err = js.applyMetaEntries(createConsumer, ru) require_NoError(t, err) require_Len(t, len(ru.addStreams), 1) require_Len(t, len(ru.removeStreams), 0) @@ -6752,19 +6752,19 @@ func TestJetStreamClusterMetaRecoveryConsumerCreateAndRemove(t *testing.T) { } // Creating the consumer should append to update consumers list. - _, _, _, err := js.applyMetaEntries(createConsumer, ru) + _, err := js.applyMetaEntries(createConsumer, ru) require_NoError(t, err) require_Len(t, len(ru.updateConsumers[":TEST"]), 1) require_Len(t, len(ru.removeConsumers), 0) // Deleting the consumer should append to remove consumers list and remove from update list. - _, _, _, err = js.applyMetaEntries(deleteConsumer, ru) + _, err = js.applyMetaEntries(deleteConsumer, ru) require_NoError(t, err) require_Len(t, len(ru.removeConsumers[":TEST"]), 1) require_Len(t, len(ru.updateConsumers[":TEST"]), 0) // When re-creating the consumer, add to update list and remove from remove list. - _, _, _, err = js.applyMetaEntries(createConsumer, ru) + _, err = js.applyMetaEntries(createConsumer, ru) require_NoError(t, err) require_Len(t, len(ru.updateConsumers[":TEST"]), 1) require_Len(t, len(ru.removeConsumers[":TEST"]), 0)