diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 032abd402d7..0ea01db0865 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -21,7 +21,6 @@ import ( "encoding/json" "errors" "fmt" - "io" "math" "math/rand" "os" @@ -136,14 +135,15 @@ type raftGroup struct { // streamAssignment is what the meta controller uses to assign streams to peers. type streamAssignment struct { - Client *ClientInfo `json:"client,omitempty"` - Created time.Time `json:"created"` - Config *StreamConfig `json:"stream"` - Group *raftGroup `json:"group"` - Sync string `json:"sync"` - Subject string `json:"subject,omitempty"` - Reply string `json:"reply,omitempty"` - Restore *StreamState `json:"restore_state,omitempty"` + Client *ClientInfo `json:"client,omitempty"` + Created time.Time `json:"created"` + ConfigJSON json.RawMessage `json:"stream"` + Config *StreamConfig `json:"-"` + Group *raftGroup `json:"group"` + Sync string `json:"sync"` + Subject string `json:"subject,omitempty"` + Reply string `json:"reply,omitempty"` + Restore *StreamState `json:"restore_state,omitempty"` // Internal consumers map[string]*consumerAssignment responded bool @@ -155,14 +155,13 @@ type streamAssignment struct { } type unsupportedStreamAssignment struct { - json []byte // The raw JSON content of the assignment, if it's unsupported due to the required API level. reason string info StreamInfo sysc *client infoSub *subscription } -func newUnsupportedStreamAssignment(s *Server, sa *streamAssignment, json []byte) *unsupportedStreamAssignment { +func newUnsupportedStreamAssignment(s *Server, sa *streamAssignment) *unsupportedStreamAssignment { reason := "stopped" if sa.Config != nil && !supportsRequiredApiLevel(sa.Config.Metadata) { if req := getRequiredApiLevel(sa.Config.Metadata); req != _EMPTY_ { @@ -170,7 +169,6 @@ func newUnsupportedStreamAssignment(s *Server, sa *streamAssignment, json []byte } } return &unsupportedStreamAssignment{ - json: json, reason: reason, info: StreamInfo{ Created: sa.Created, @@ -215,15 +213,16 @@ func (usa *unsupportedStreamAssignment) closeInfoSub(s *Server) { // consumerAssignment is what the meta controller uses to assign consumers to streams. type consumerAssignment struct { - Client *ClientInfo `json:"client,omitempty"` - Created time.Time `json:"created"` - Name string `json:"name"` - Stream string `json:"stream"` - Config *ConsumerConfig `json:"consumer"` - Group *raftGroup `json:"group"` - Subject string `json:"subject,omitempty"` - Reply string `json:"reply,omitempty"` - State *ConsumerState `json:"state,omitempty"` + Client *ClientInfo `json:"client,omitempty"` + Created time.Time `json:"created"` + Name string `json:"name"` + Stream string `json:"stream"` + ConfigJSON json.RawMessage `json:"consumer"` + Config *ConsumerConfig `json:"-"` + Group *raftGroup `json:"group"` + Subject string `json:"subject,omitempty"` + Reply string `json:"reply,omitempty"` + State *ConsumerState `json:"state,omitempty"` // Internal responded bool recovering bool @@ -234,14 +233,13 @@ type consumerAssignment struct { } type unsupportedConsumerAssignment struct { - json []byte // The raw JSON content of the assignment, if it's unsupported due to the required API level. reason string info ConsumerInfo sysc *client infoSub *subscription } -func newUnsupportedConsumerAssignment(ca *consumerAssignment, json []byte) *unsupportedConsumerAssignment { +func newUnsupportedConsumerAssignment(ca *consumerAssignment) *unsupportedConsumerAssignment { reason := "stopped" if ca.Config != nil && !supportsRequiredApiLevel(ca.Config.Metadata) { if req := getRequiredApiLevel(ca.Config.Metadata); req != _EMPTY_ { @@ -249,7 +247,6 @@ func newUnsupportedConsumerAssignment(ca *consumerAssignment, json []byte) *unsu } } return &unsupportedConsumerAssignment{ - json: json, reason: reason, info: ConsumerInfo{ Stream: ca.Stream, @@ -294,35 +291,13 @@ func (uca *unsupportedConsumerAssignment) closeInfoSub(s *Server) { } type writeableConsumerAssignment struct { - consumerAssignment - // Internal - unsupportedJson []byte // The raw JSON content of the assignment, if it's unsupported due to the required API level. -} - -func (wca *writeableConsumerAssignment) MarshalJSON() ([]byte, error) { - if wca.unsupportedJson != nil { - return wca.unsupportedJson, nil - } - return json.Marshal(wca.consumerAssignment) -} - -func (wca *writeableConsumerAssignment) UnmarshalJSON(data []byte) error { - var unsupported bool - var ca consumerAssignment - decoder := json.NewDecoder(bytes.NewReader(data)) - decoder.DisallowUnknownFields() - if err := decoder.Decode(&ca); err != nil { - unsupported = true - ca = consumerAssignment{} - if err = json.Unmarshal(data, &ca); err != nil { - return err - } - } - wca.consumerAssignment = ca - if unsupported || (wca.Config != nil && !supportsRequiredApiLevel(wca.Config.Metadata)) { - wca.unsupportedJson = data - } - return nil + Client *ClientInfo `json:"client,omitempty"` + Created time.Time `json:"created"` + Name string `json:"name"` + Stream string `json:"stream"` + ConfigJSON json.RawMessage `json:"consumer"` + Group *raftGroup `json:"group"` + State *ConsumerState `json:"state,omitempty"` } // streamPurge is what the stream leader will replicate when purging a stream. @@ -1542,44 +1517,12 @@ func (js *jetStream) checkClusterSize() { // Represents our stable meta state that we can write out. type writeableStreamAssignment struct { - backingStreamAssignment - // Internal - unsupportedJson []byte // The raw JSON content of the assignment, if it's unsupported due to the required API level. -} - -type backingStreamAssignment struct { - Client *ClientInfo `json:"client,omitempty"` - Created time.Time `json:"created"` - Config *StreamConfig `json:"stream"` - Group *raftGroup `json:"group"` - Sync string `json:"sync"` - Consumers []*writeableConsumerAssignment -} - -func (wsa *writeableStreamAssignment) MarshalJSON() ([]byte, error) { - if wsa.unsupportedJson != nil { - return wsa.unsupportedJson, nil - } - return json.Marshal(wsa.backingStreamAssignment) -} - -func (wsa *writeableStreamAssignment) UnmarshalJSON(data []byte) error { - var unsupported bool - var bsa backingStreamAssignment - decoder := json.NewDecoder(bytes.NewReader(data)) - decoder.DisallowUnknownFields() - if err := decoder.Decode(&bsa); err != nil { - unsupported = true - bsa = backingStreamAssignment{} - if err = json.Unmarshal(data, &bsa); err != nil { - return err - } - } - wsa.backingStreamAssignment = bsa - if unsupported || (wsa.Config != nil && !supportsRequiredApiLevel(wsa.Config.Metadata)) { - wsa.unsupportedJson = data - } - return nil + Client *ClientInfo `json:"client,omitempty"` + Created time.Time `json:"created"` + ConfigJSON json.RawMessage `json:"stream"` + Group *raftGroup `json:"group"` + Sync string `json:"sync"` + Consumers []*writeableConsumerAssignment } func (js *jetStream) clusterStreamConfig(accName, streamName string) (StreamConfig, bool) { @@ -1604,19 +1547,13 @@ func (js *jetStream) metaSnapshot() ([]byte, error) { streams := make([]writeableStreamAssignment, 0, nsa) for _, asa := range cc.streams { for _, sa := range asa { - if sa.unsupported != nil && sa.unsupported.json != nil { - streams = append(streams, writeableStreamAssignment{unsupportedJson: sa.unsupported.json}) - continue - } wsa := writeableStreamAssignment{ - backingStreamAssignment: backingStreamAssignment{ - Client: sa.Client.forAssignmentSnap(), - Created: sa.Created, - Config: sa.Config, - Group: sa.Group, - Sync: sa.Sync, - Consumers: make([]*writeableConsumerAssignment, 0, len(sa.consumers)), - }, + Client: sa.Client.forAssignmentSnap(), + Created: sa.Created, + ConfigJSON: sa.ConfigJSON, + Group: sa.Group, + Sync: sa.Sync, + Consumers: make([]*writeableConsumerAssignment, 0, len(sa.consumers)), } for _, ca := range sa.consumers { // Skip if the consumer is pending, we can't include it in our snapshot. @@ -1624,16 +1561,16 @@ func (js *jetStream) metaSnapshot() ([]byte, error) { if ca.pending { continue } - if ca.unsupported != nil && ca.unsupported.json != nil { - wsa.Consumers = append(wsa.Consumers, &writeableConsumerAssignment{unsupportedJson: ca.unsupported.json}) - nca++ - continue + wca := writeableConsumerAssignment{ + Client: ca.Client.forAssignmentSnap(), + Created: ca.Created, + Name: ca.Name, + Stream: ca.Stream, + ConfigJSON: ca.ConfigJSON, + Group: ca.Group, + State: ca.State, } - cca := *ca - cca.Stream = wsa.Config.Name // Needed for safe roll-backs. - cca.Client = cca.Client.forAssignmentSnap() - cca.Subject, cca.Reply = _EMPTY_, _EMPTY_ - wsa.Consumers = append(wsa.Consumers, &writeableConsumerAssignment{consumerAssignment: cca}) + wsa.Consumers = append(wsa.Consumers, &wca) nca++ } streams = append(streams, wsa) @@ -1685,30 +1622,25 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove // Build our new version here outside of js. streams := make(map[string]map[string]*streamAssignment) for _, wsa := range wsas { - fixCfgMirrorWithDedupWindow(wsa.Config) as := streams[wsa.Client.serviceAccount()] if as == nil { as = make(map[string]*streamAssignment) streams[wsa.Client.serviceAccount()] = as } - sa := &streamAssignment{Client: wsa.Client, Created: wsa.Created, Config: wsa.Config, Group: wsa.Group, Sync: wsa.Sync} - if wsa.unsupportedJson != nil { - sa.unsupported = newUnsupportedStreamAssignment(js.srv, sa, wsa.unsupportedJson) - } + sa := &streamAssignment{Client: wsa.Client, Created: wsa.Created, ConfigJSON: wsa.ConfigJSON, Group: wsa.Group, Sync: wsa.Sync} + decodeStreamAssignmentConfig(js.srv, sa) if len(wsa.Consumers) > 0 { sa.consumers = make(map[string]*consumerAssignment) for _, wca := range wsa.Consumers { if wca.Stream == _EMPTY_ { wca.Stream = sa.Config.Name // Rehydrate from the stream name. } - ca := &consumerAssignment{Client: wca.Client, Created: wca.Created, Name: wca.Name, Stream: wca.Stream, Config: wca.Config, Group: wca.Group, Subject: wca.Subject, Reply: wca.Reply, State: wca.State} - if wca.unsupportedJson != nil { - ca.unsupported = newUnsupportedConsumerAssignment(ca, wca.unsupportedJson) - } + ca := &consumerAssignment{Client: wca.Client, Created: wca.Created, Name: wca.Name, Stream: wca.Stream, ConfigJSON: wca.ConfigJSON, Group: wca.Group, State: wca.State} + decodeConsumerAssignmentConfig(ca) sa.consumers[ca.Name] = ca } } - as[wsa.Config.Name] = sa + as[sa.Config.Name] = sa } js.mu.Lock() @@ -4879,7 +4811,7 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) { // Mark stream as unsupported as well if sa.unsupported == nil { - sa.unsupported = newUnsupportedStreamAssignment(s, sa, nil) + sa.unsupported = newUnsupportedStreamAssignment(s, sa) } sa.unsupported.setupInfoSub(s, sa) js.mu.Unlock() @@ -8026,6 +7958,7 @@ func (s *Server) jsClusteredMsgDeleteRequest(ci *ClientInfo, acc *Account, mset func encodeAddStreamAssignment(sa *streamAssignment) []byte { csa := *sa csa.Client = csa.Client.forProposal() + csa.ConfigJSON, _ = json.Marshal(sa.Config) var bb bytes.Buffer bb.WriteByte(byte(assignStreamOp)) json.NewEncoder(&bb).Encode(csa) @@ -8035,6 +7968,7 @@ func encodeAddStreamAssignment(sa *streamAssignment) []byte { func encodeUpdateStreamAssignment(sa *streamAssignment) []byte { csa := *sa csa.Client = csa.Client.forProposal() + csa.ConfigJSON, _ = json.Marshal(sa.Config) var bb bytes.Buffer bb.WriteByte(byte(updateStreamOp)) json.NewEncoder(&bb).Encode(csa) @@ -8044,6 +7978,7 @@ func encodeUpdateStreamAssignment(sa *streamAssignment) []byte { func encodeDeleteStreamAssignment(sa *streamAssignment) []byte { csa := *sa csa.Client = csa.Client.forProposal() + csa.ConfigJSON, _ = json.Marshal(sa.Config) var bb bytes.Buffer bb.WriteByte(byte(removeStreamOp)) json.NewEncoder(&bb).Encode(csa) @@ -8051,23 +7986,35 @@ func encodeDeleteStreamAssignment(sa *streamAssignment) []byte { } func decodeStreamAssignment(s *Server, buf []byte) (*streamAssignment, error) { - var unsupported bool var sa streamAssignment - decoder := json.NewDecoder(bytes.NewReader(buf)) + if err := json.Unmarshal(buf, &sa); err != nil { + return nil, err + } + if err := decodeStreamAssignmentConfig(s, &sa); err != nil { + return nil, err + } + return &sa, nil +} + +func decodeStreamAssignmentConfig(s *Server, sa *streamAssignment) error { + var unsupported bool + var cfg StreamConfig + decoder := json.NewDecoder(bytes.NewReader(sa.ConfigJSON)) decoder.DisallowUnknownFields() - if err := decoder.Decode(&sa); err != nil { + if err := decoder.Decode(&cfg); err != nil { unsupported = true - sa = streamAssignment{} - if err = json.Unmarshal(buf, &sa); err != nil { - return nil, err + cfg = StreamConfig{} + if err = json.Unmarshal(sa.ConfigJSON, &cfg); err != nil { + return err } } + sa.Config = &cfg fixCfgMirrorWithDedupWindow(sa.Config) if unsupported || (sa.Config != nil && !supportsRequiredApiLevel(sa.Config.Metadata)) { - sa.unsupported = newUnsupportedStreamAssignment(s, &sa, copyBytes(buf)) + sa.unsupported = newUnsupportedStreamAssignment(s, sa) } - return &sa, nil + return nil } func encodeDeleteRange(dr *DeleteRange) []byte { @@ -8485,6 +8432,7 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec func encodeAddConsumerAssignment(ca *consumerAssignment) []byte { cca := *ca cca.Client = cca.Client.forProposal() + cca.ConfigJSON, _ = json.Marshal(ca.Config) var bb bytes.Buffer bb.WriteByte(byte(assignConsumerOp)) json.NewEncoder(&bb).Encode(cca) @@ -8494,6 +8442,7 @@ func encodeAddConsumerAssignment(ca *consumerAssignment) []byte { func encodeDeleteConsumerAssignment(ca *consumerAssignment) []byte { cca := *ca cca.Client = cca.Client.forProposal() + cca.ConfigJSON, _ = json.Marshal(ca.Config) var bb bytes.Buffer bb.WriteByte(byte(removeConsumerOp)) json.NewEncoder(&bb).Encode(cca) @@ -8501,27 +8450,39 @@ func encodeDeleteConsumerAssignment(ca *consumerAssignment) []byte { } func decodeConsumerAssignment(buf []byte) (*consumerAssignment, error) { - var unsupported bool var ca consumerAssignment - decoder := json.NewDecoder(bytes.NewReader(buf)) + if err := json.Unmarshal(buf, &ca); err != nil { + return nil, err + } + if err := decodeConsumerAssignmentConfig(&ca); err != nil { + return nil, err + } + return &ca, nil +} + +func decodeConsumerAssignmentConfig(ca *consumerAssignment) error { + var unsupported bool + var cfg ConsumerConfig + decoder := json.NewDecoder(bytes.NewReader(ca.ConfigJSON)) decoder.DisallowUnknownFields() - if err := decoder.Decode(&ca); err != nil { + if err := decoder.Decode(&cfg); err != nil { unsupported = true - ca = consumerAssignment{} - if err = json.Unmarshal(buf, &ca); err != nil { - return nil, err + cfg = ConsumerConfig{} + if err = json.Unmarshal(ca.ConfigJSON, &cfg); err != nil { + return err } } - + ca.Config = &cfg if unsupported || (ca.Config != nil && !supportsRequiredApiLevel(ca.Config.Metadata)) { - ca.unsupported = newUnsupportedConsumerAssignment(&ca, copyBytes(buf)) + ca.unsupported = newUnsupportedConsumerAssignment(ca) } - return &ca, nil + return nil } func encodeAddConsumerAssignmentCompressed(ca *consumerAssignment) []byte { cca := *ca cca.Client = cca.Client.forProposal() + cca.ConfigJSON, _ = json.Marshal(ca.Config) var bb bytes.Buffer bb.WriteByte(byte(assignCompressedConsumerOp)) s2e := s2.NewWriter(&bb) @@ -8531,32 +8492,16 @@ func encodeAddConsumerAssignmentCompressed(ca *consumerAssignment) []byte { } func decodeConsumerAssignmentCompressed(buf []byte) (*consumerAssignment, error) { - var unsupported bool var ca consumerAssignment bb := bytes.NewBuffer(buf) s2d := s2.NewReader(bb) decoder := json.NewDecoder(s2d) - decoder.DisallowUnknownFields() if err := decoder.Decode(&ca); err != nil { - unsupported = true - ca = consumerAssignment{} - bb = bytes.NewBuffer(buf) - s2d = s2.NewReader(bb) - if err = json.NewDecoder(s2d).Decode(&ca); err != nil { - return nil, err - } + return nil, err } - - if unsupported || (ca.Config != nil && !supportsRequiredApiLevel(ca.Config.Metadata)) { - bb = bytes.NewBuffer(buf) - s2d = s2.NewReader(bb) - dec, err := io.ReadAll(s2d) - if err != nil { - return nil, err - } - ca.unsupported = newUnsupportedConsumerAssignment(&ca, copyBytes(dec)) + if err := decodeConsumerAssignmentConfig(&ca); err != nil { + return nil, err } - return &ca, nil } diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index d1a5c4248a8..512296a9d6a 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -9617,8 +9617,10 @@ func TestJetStreamClusterOfflineStreamAndConsumerAfterAssetCreateOrUpdate(t *tes wsas := getValidMetaSnapshot() require_Len(t, len(wsas), 1) - require_Equal(t, wsas[0].Config.Name, "DowngradeStreamTest") - require_Equal(t, wsas[0].Config.Metadata["_nats.req.level"], strconv.Itoa(math.MaxInt-1)) + nsa := &streamAssignment{ConfigJSON: wsas[0].ConfigJSON} + require_NoError(t, decodeStreamAssignmentConfig(ml, nsa)) + require_Equal(t, nsa.Config.Name, "DowngradeStreamTest") + require_Equal(t, nsa.Config.Metadata["_nats.req.level"], strconv.Itoa(math.MaxInt-1)) // Update a stream that's unsupported. sjs.mu.Lock() @@ -9639,8 +9641,10 @@ func TestJetStreamClusterOfflineStreamAndConsumerAfterAssetCreateOrUpdate(t *tes wsas = getValidMetaSnapshot() require_Len(t, len(wsas), 1) - require_Equal(t, wsas[0].Config.Name, "DowngradeStreamTest") - require_Equal(t, wsas[0].Config.Metadata["_nats.req.level"], strconv.Itoa(math.MaxInt)) + nsa = &streamAssignment{ConfigJSON: wsas[0].ConfigJSON} + require_NoError(t, decodeStreamAssignmentConfig(ml, nsa)) + require_Equal(t, nsa.Config.Name, "DowngradeStreamTest") + require_Equal(t, nsa.Config.Metadata["_nats.req.level"], strconv.Itoa(math.MaxInt)) // Deleting a stream should always work, even if it is unsupported. require_NoError(t, js.DeleteStream("DowngradeStreamTest")) @@ -9724,14 +9728,18 @@ func TestJetStreamClusterOfflineStreamAndConsumerAfterAssetCreateOrUpdate(t *tes wsas = getValidMetaSnapshot() require_Len(t, len(wsas), 1) - require_Equal(t, wsas[0].Config.Name, "DowngradeConsumerTest") - require_Equal(t, wsas[0].Config.Metadata["_nats.req.level"], "0") + nsa = &streamAssignment{ConfigJSON: wsas[0].ConfigJSON} + require_NoError(t, decodeStreamAssignmentConfig(ml, nsa)) + require_Equal(t, nsa.Config.Name, "DowngradeConsumerTest") + require_Equal(t, nsa.Config.Metadata["_nats.req.level"], "0") require_Len(t, len(wsas[0].Consumers), 2) for _, wca := range wsas[0].Consumers { - if wca.Config.Name == "DowngradeConsumerTest" { - require_Equal(t, wca.Config.Metadata["_nats.req.level"], strconv.Itoa(math.MaxInt-1)) + nca := &consumerAssignment{ConfigJSON: wca.ConfigJSON} + require_NoError(t, decodeConsumerAssignmentConfig(nca)) + if nca.Config.Name == "DowngradeConsumerTest" { + require_Equal(t, nca.Config.Metadata["_nats.req.level"], strconv.Itoa(math.MaxInt-1)) } else { - require_Equal(t, wca.Config.Name, "consumer") + require_Equal(t, nca.Config.Name, "consumer") } } @@ -9754,14 +9762,18 @@ func TestJetStreamClusterOfflineStreamAndConsumerAfterAssetCreateOrUpdate(t *tes wsas = getValidMetaSnapshot() require_Len(t, len(wsas), 1) - require_Equal(t, wsas[0].Config.Name, "DowngradeConsumerTest") - require_Equal(t, wsas[0].Config.Metadata["_nats.req.level"], "0") + nsa = &streamAssignment{ConfigJSON: wsas[0].ConfigJSON} + require_NoError(t, decodeStreamAssignmentConfig(ml, nsa)) + require_Equal(t, nsa.Config.Name, "DowngradeConsumerTest") + require_Equal(t, nsa.Config.Metadata["_nats.req.level"], "0") require_Len(t, len(wsas[0].Consumers), 2) for _, wca := range wsas[0].Consumers { - if wca.Config.Name == "DowngradeConsumerTest" { - require_Equal(t, wca.Config.Metadata["_nats.req.level"], strconv.Itoa(math.MaxInt)) + nca := &consumerAssignment{ConfigJSON: wca.ConfigJSON} + require_NoError(t, decodeConsumerAssignmentConfig(nca)) + if nca.Config.Name == "DowngradeConsumerTest" { + require_Equal(t, nca.Config.Metadata["_nats.req.level"], strconv.Itoa(math.MaxInt)) } else { - require_Equal(t, wca.Config.Name, "consumer") + require_Equal(t, nca.Config.Name, "consumer") } } @@ -9771,10 +9783,14 @@ func TestJetStreamClusterOfflineStreamAndConsumerAfterAssetCreateOrUpdate(t *tes wsas = getValidMetaSnapshot() require_Len(t, len(wsas), 1) - require_Equal(t, wsas[0].Config.Name, "DowngradeConsumerTest") - require_Equal(t, wsas[0].Config.Metadata["_nats.req.level"], "0") + nsa = &streamAssignment{ConfigJSON: wsas[0].ConfigJSON} + require_NoError(t, decodeStreamAssignmentConfig(ml, nsa)) + require_Equal(t, nsa.Config.Name, "DowngradeConsumerTest") + require_Equal(t, nsa.Config.Metadata["_nats.req.level"], "0") require_Len(t, len(wsas[0].Consumers), 1) - require_Equal(t, wsas[0].Consumers[0].Config.Name, "consumer") + nca := &consumerAssignment{ConfigJSON: wsas[0].Consumers[0].ConfigJSON} + require_NoError(t, decodeConsumerAssignmentConfig(nca)) + require_Equal(t, nca.Config.Name, "consumer") } func TestJetStreamClusterOfflineStreamAndConsumerAfterDowngrade(t *testing.T) { @@ -9892,7 +9908,9 @@ func TestJetStreamClusterOfflineStreamAndConsumerAfterDowngrade(t *testing.T) { wsas := getValidMetaSnapshot() require_Len(t, len(wsas), 1) - require_Equal(t, wsas[0].Config.Name, "DowngradeStreamTest") + nsa := &streamAssignment{ConfigJSON: wsas[0].ConfigJSON} + require_NoError(t, decodeStreamAssignmentConfig(ml, nsa)) + require_Equal(t, nsa.Config.Name, "DowngradeStreamTest") // Update a stream to be unsupported. sjs.mu.Lock() @@ -9913,7 +9931,9 @@ func TestJetStreamClusterOfflineStreamAndConsumerAfterDowngrade(t *testing.T) { wsas = getValidMetaSnapshot() require_Len(t, len(wsas), 1) - require_Equal(t, wsas[0].Config.Name, "DowngradeStreamTest") + nsa = &streamAssignment{ConfigJSON: wsas[0].ConfigJSON} + require_NoError(t, decodeStreamAssignmentConfig(ml, nsa)) + require_Equal(t, nsa.Config.Name, "DowngradeStreamTest") // Deleting a stream should always work, even if it is unsupported. require_NoError(t, js.DeleteStream("DowngradeStreamTest")) @@ -9927,8 +9947,9 @@ func TestJetStreamClusterOfflineStreamAndConsumerAfterDowngrade(t *testing.T) { sjs.mu.Lock() ccfg := &ConsumerConfig{ - Name: "DowngradeConsumerTest", - Replicas: 3, + Name: "DowngradeConsumerTest", + Replicas: 3, + MaxWaiting: JSWaitQueueDefaultMax, } rg = cc.createGroupForConsumer(ccfg, sa) ca := &consumerAssignment{ @@ -9974,7 +9995,9 @@ func TestJetStreamClusterOfflineStreamAndConsumerAfterDowngrade(t *testing.T) { wsas = getValidMetaSnapshot() require_Len(t, len(wsas), 1) - require_Equal(t, wsas[0].Config.Name, "DowngradeConsumerTest") + nsa = &streamAssignment{ConfigJSON: wsas[0].ConfigJSON} + require_NoError(t, decodeStreamAssignmentConfig(ml, nsa)) + require_Equal(t, nsa.Config.Name, "DowngradeConsumerTest") require_Len(t, len(wsas[0].Consumers), 1) // Update a consumer to be unsupported. @@ -9996,10 +10019,99 @@ func TestJetStreamClusterOfflineStreamAndConsumerAfterDowngrade(t *testing.T) { wsas = getValidMetaSnapshot() require_Len(t, len(wsas), 1) - require_Equal(t, wsas[0].Config.Name, "DowngradeConsumerTest") - require_Equal(t, wsas[0].Config.Metadata["_nats.req.level"], "0") + nsa = &streamAssignment{ConfigJSON: wsas[0].ConfigJSON} + require_NoError(t, decodeStreamAssignmentConfig(ml, nsa)) + require_Equal(t, nsa.Config.Name, "DowngradeConsumerTest") + require_Equal(t, nsa.Config.Metadata["_nats.req.level"], "0") require_Len(t, len(wsas[0].Consumers), 1) - require_Equal(t, wsas[0].Consumers[0].Config.Metadata["_nats.req.level"], strconv.Itoa(math.MaxInt)) + nca := &consumerAssignment{ConfigJSON: wsas[0].Consumers[0].ConfigJSON} + require_NoError(t, decodeConsumerAssignmentConfig(nca)) + require_Equal(t, nca.Config.Metadata["_nats.req.level"], strconv.Itoa(math.MaxInt)) +} + +func TestJetStreamClusterOfflineStreamAndConsumerUpdate(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{Name: "DowngradeTest", Replicas: 3}) + require_NoError(t, err) + _, err = js.AddConsumer("DowngradeTest", &nats.ConsumerConfig{Durable: "D", Replicas: 3}) + require_NoError(t, err) + c.waitOnAllCurrent() + + ml := c.leader() + require_NotNil(t, ml) + sjs := ml.getJetStream() + require_NotNil(t, sjs) + + var sa *streamAssignment + var ca *consumerAssignment + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + sjs.mu.Lock() + defer sjs.mu.Unlock() + sa = sjs.streamAssignment(globalAccountName, "DowngradeTest") + if sa == nil { + return errors.New("stream assignment missing") + } + ca = sjs.consumerAssignment(globalAccountName, "DowngradeTest", "D") + if ca == nil { + return errors.New("consumer assignment missing") + } + return nil + }) + + getValidMetaSnapshot := func() (wsas []writeableStreamAssignment) { + t.Helper() + snap, err := sjs.metaSnapshot() + require_NoError(t, err) + require_True(t, len(snap) > 0) + dec, err := s2.Decode(nil, snap) + require_NoError(t, err) + require_NoError(t, json.Unmarshal(dec, &wsas)) + return wsas + } + + wsas := getValidMetaSnapshot() + require_Len(t, len(wsas), 1) + nsa := &streamAssignment{ConfigJSON: wsas[0].ConfigJSON} + require_NoError(t, decodeStreamAssignmentConfig(ml, nsa)) + require_Equal(t, nsa.Config.Name, "DowngradeTest") + require_Equal(t, nsa.Config.Metadata["_nats.req.level"], "0") + require_Len(t, len(wsas[0].Consumers), 1) + nca := &consumerAssignment{ConfigJSON: wsas[0].Consumers[0].ConfigJSON} + require_NoError(t, decodeConsumerAssignmentConfig(nca)) + require_Equal(t, wsas[0].Consumers[0].Name, "D") + require_Equal(t, nca.Config.Metadata["_nats.req.level"], "0") + + // Update a consumer to be unsupported. + sjs.mu.Lock() + ca.Config.Metadata = map[string]string{"_nats.req.level": strconv.Itoa(math.MaxInt)} + err = sjs.cluster.meta.Propose(encodeAddConsumerAssignment(ca)) + sjs.mu.Unlock() + require_NoError(t, err) + + // Update the stream to be unsupported. + sjs.mu.Lock() + sa.Config.Metadata = map[string]string{"_nats.req.level": strconv.Itoa(math.MaxInt)} + err = sjs.cluster.meta.Propose(encodeUpdateStreamAssignment(sa)) + sjs.mu.Unlock() + require_NoError(t, err) + c.waitOnAllCurrent() + + wsas = getValidMetaSnapshot() + require_Len(t, len(wsas), 1) + nsa = &streamAssignment{ConfigJSON: wsas[0].ConfigJSON} + require_NoError(t, decodeStreamAssignmentConfig(ml, nsa)) + require_Equal(t, nsa.Config.Name, "DowngradeTest") + require_Equal(t, nsa.Config.Metadata["_nats.req.level"], strconv.Itoa(math.MaxInt)) + require_Len(t, len(wsas[0].Consumers), 1) + nca = &consumerAssignment{ConfigJSON: wsas[0].Consumers[0].ConfigJSON} + require_NoError(t, decodeConsumerAssignmentConfig(nca)) + require_Equal(t, wsas[0].Consumers[0].Name, "D") + require_Equal(t, nca.Config.Metadata["_nats.req.level"], strconv.Itoa(math.MaxInt)) } func TestJetStreamClusterOfflineStreamAndConsumerStrictDecoding(t *testing.T) { @@ -10007,31 +10119,28 @@ func TestJetStreamClusterOfflineStreamAndConsumerStrictDecoding(t *testing.T) { defer s.Shutdown() unsupportedJson := []byte("{\"unknown\": true}") + unsupportedStreamJson := []byte(fmt.Sprintf("{\"stream\":%s}", unsupportedJson)) + unsupportedConsumerJson := []byte(fmt.Sprintf("{\"consumer\":%s}", unsupportedJson)) - sa, err := decodeStreamAssignment(s, unsupportedJson) + sa, err := decodeStreamAssignment(s, unsupportedStreamJson) require_NoError(t, err) - require_True(t, bytes.Equal(sa.unsupported.json, unsupportedJson)) + require_True(t, bytes.Equal(sa.ConfigJSON, unsupportedJson)) + require_True(t, sa.unsupported != nil) - ca, err := decodeConsumerAssignment(unsupportedJson) + ca, err := decodeConsumerAssignment(unsupportedConsumerJson) require_NoError(t, err) - require_True(t, bytes.Equal(ca.unsupported.json, unsupportedJson)) + require_True(t, bytes.Equal(ca.ConfigJSON, unsupportedJson)) + require_True(t, ca.unsupported != nil) var bb bytes.Buffer s2e := s2.NewWriter(&bb) - _, err = s2e.Write(unsupportedJson) + _, err = s2e.Write(unsupportedConsumerJson) require_NoError(t, err) require_NoError(t, s2e.Close()) ca, err = decodeConsumerAssignmentCompressed(bb.Bytes()) require_NoError(t, err) - require_True(t, bytes.Equal(ca.unsupported.json, unsupportedJson)) - - var wsa writeableStreamAssignment - require_NoError(t, wsa.UnmarshalJSON(unsupportedJson)) - require_True(t, bytes.Equal(wsa.unsupportedJson, unsupportedJson)) - - var wca writeableConsumerAssignment - require_NoError(t, wca.UnmarshalJSON(unsupportedJson)) - require_True(t, bytes.Equal(wca.unsupportedJson, unsupportedJson)) + require_True(t, bytes.Equal(ca.ConfigJSON, unsupportedJson)) + require_True(t, ca.unsupported != nil) } func TestJetStreamClusterStreamMonitorShutdownWithoutRaftNode(t *testing.T) {