From 3228596061a57148749f5cc5aebc4537577d4340 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Wed, 24 Sep 2025 11:53:42 +0200 Subject: [PATCH 1/2] [IMPROVED] Meta snapshot performance without offline assets Signed-off-by: Maurice van Veen --- server/jetstream_cluster.go | 127 +++++++++++++++++++++++------------- 1 file changed, 82 insertions(+), 45 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 032abd402d7..66b62e0553a 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -151,10 +151,10 @@ type streamAssignment struct { reassigning bool // i.e. due to placement issues, lack of resources, etc. resetting bool // i.e. there was an error, and we're stopping and starting the stream err error - unsupported *unsupportedStreamAssignment + unsupported *unsupportedStreamAssignmentState } -type unsupportedStreamAssignment struct { +type unsupportedStreamAssignmentState struct { json []byte // The raw JSON content of the assignment, if it's unsupported due to the required API level. reason string info StreamInfo @@ -162,14 +162,14 @@ type unsupportedStreamAssignment struct { infoSub *subscription } -func newUnsupportedStreamAssignment(s *Server, sa *streamAssignment, json []byte) *unsupportedStreamAssignment { +func newUnsupportedStreamAssignment(s *Server, sa *streamAssignment, json []byte) *unsupportedStreamAssignmentState { reason := "stopped" if sa.Config != nil && !supportsRequiredApiLevel(sa.Config.Metadata) { if req := getRequiredApiLevel(sa.Config.Metadata); req != _EMPTY_ { reason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", req, JSApiLevel) } } - return &unsupportedStreamAssignment{ + return &unsupportedStreamAssignmentState{ json: json, reason: reason, info: StreamInfo{ @@ -181,7 +181,7 @@ func newUnsupportedStreamAssignment(s *Server, sa *streamAssignment, json []byte } } -func (usa *unsupportedStreamAssignment) setupInfoSub(s *Server, sa *streamAssignment) { +func (usa *unsupportedStreamAssignmentState) setupInfoSub(s *Server, sa *streamAssignment) { if usa.infoSub != nil { return } @@ -196,13 +196,13 @@ func (usa *unsupportedStreamAssignment) setupInfoSub(s *Server, sa *streamAssign usa.infoSub, _ = s.systemSubscribe(isubj, _EMPTY_, false, ic, usa.handleClusterStreamInfoRequest) } -func (usa *unsupportedStreamAssignment) handleClusterStreamInfoRequest(_ *subscription, c *client, _ *Account, _, reply string, _ []byte) { +func (usa *unsupportedStreamAssignmentState) handleClusterStreamInfoRequest(_ *subscription, c *client, _ *Account, _, reply string, _ []byte) { s, acc := c.srv, c.acc info := streamInfoClusterResponse{OfflineReason: usa.reason, StreamInfo: usa.info} s.sendDelayedErrResponse(acc, reply, nil, s.jsonResponse(&info), errRespDelay) } -func (usa *unsupportedStreamAssignment) closeInfoSub(s *Server) { +func (usa *unsupportedStreamAssignmentState) closeInfoSub(s *Server) { if usa.infoSub != nil { s.sysUnsubscribe(usa.infoSub) usa.infoSub = nil @@ -230,10 +230,10 @@ type consumerAssignment struct { pending bool deleted bool err error - unsupported *unsupportedConsumerAssignment + unsupported *unsupportedConsumerAssignmentState } -type unsupportedConsumerAssignment struct { +type unsupportedConsumerAssignmentState struct { json []byte // The raw JSON content of the assignment, if it's unsupported due to the required API level. reason string info ConsumerInfo @@ -241,14 +241,14 @@ type unsupportedConsumerAssignment struct { infoSub *subscription } -func newUnsupportedConsumerAssignment(ca *consumerAssignment, json []byte) *unsupportedConsumerAssignment { +func newUnsupportedConsumerAssignment(ca *consumerAssignment, json []byte) *unsupportedConsumerAssignmentState { reason := "stopped" if ca.Config != nil && !supportsRequiredApiLevel(ca.Config.Metadata) { if req := getRequiredApiLevel(ca.Config.Metadata); req != _EMPTY_ { reason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", getRequiredApiLevel(ca.Config.Metadata), JSApiLevel) } } - return &unsupportedConsumerAssignment{ + return &unsupportedConsumerAssignmentState{ json: json, reason: reason, info: ConsumerInfo{ @@ -261,7 +261,7 @@ func newUnsupportedConsumerAssignment(ca *consumerAssignment, json []byte) *unsu } } -func (uca *unsupportedConsumerAssignment) setupInfoSub(s *Server, ca *consumerAssignment) { +func (uca *unsupportedConsumerAssignmentState) setupInfoSub(s *Server, ca *consumerAssignment) { if uca.infoSub != nil { return } @@ -276,13 +276,13 @@ func (uca *unsupportedConsumerAssignment) setupInfoSub(s *Server, ca *consumerAs uca.infoSub, _ = s.systemSubscribe(isubj, _EMPTY_, false, ic, uca.handleClusterConsumerInfoRequest) } -func (uca *unsupportedConsumerAssignment) handleClusterConsumerInfoRequest(_ *subscription, c *client, _ *Account, _, reply string, _ []byte) { +func (uca *unsupportedConsumerAssignmentState) handleClusterConsumerInfoRequest(_ *subscription, c *client, _ *Account, _, reply string, _ []byte) { s, acc := c.srv, c.acc info := consumerInfoClusterResponse{OfflineReason: uca.reason, ConsumerInfo: uca.info} s.sendDelayedErrResponse(acc, reply, nil, s.jsonResponse(&info), errRespDelay) } -func (uca *unsupportedConsumerAssignment) closeInfoSub(s *Server) { +func (uca *unsupportedConsumerAssignmentState) closeInfoSub(s *Server) { if uca.infoSub != nil { s.sysUnsubscribe(uca.infoSub) uca.infoSub = nil @@ -294,7 +294,7 @@ func (uca *unsupportedConsumerAssignment) closeInfoSub(s *Server) { } type writeableConsumerAssignment struct { - consumerAssignment + *consumerAssignment // Internal unsupportedJson []byte // The raw JSON content of the assignment, if it's unsupported due to the required API level. } @@ -318,7 +318,7 @@ func (wca *writeableConsumerAssignment) UnmarshalJSON(data []byte) error { return err } } - wca.consumerAssignment = ca + wca.consumerAssignment = &ca if unsupported || (wca.Config != nil && !supportsRequiredApiLevel(wca.Config.Metadata)) { wca.unsupportedJson = data } @@ -1542,40 +1542,49 @@ func (js *jetStream) checkClusterSize() { // Represents our stable meta state that we can write out. type writeableStreamAssignment struct { - backingStreamAssignment + unsupportedStreamAssignment // Internal unsupportedJson []byte // The raw JSON content of the assignment, if it's unsupported due to the required API level. } -type backingStreamAssignment struct { - Client *ClientInfo `json:"client,omitempty"` - Created time.Time `json:"created"` - Config *StreamConfig `json:"stream"` - Group *raftGroup `json:"group"` - Sync string `json:"sync"` +type unsupportedStreamAssignment struct { + backingStreamAssignment Consumers []*writeableConsumerAssignment } +type supportedStreamAssignment struct { + backingStreamAssignment + Consumers []*consumerAssignment +} + +type backingStreamAssignment struct { + Client *ClientInfo `json:"client,omitempty"` + Created time.Time `json:"created"` + Config *StreamConfig `json:"stream"` + Group *raftGroup `json:"group"` + Sync string `json:"sync"` +} + func (wsa *writeableStreamAssignment) MarshalJSON() ([]byte, error) { if wsa.unsupportedJson != nil { return wsa.unsupportedJson, nil } - return json.Marshal(wsa.backingStreamAssignment) + return json.Marshal(wsa.unsupportedStreamAssignment) } func (wsa *writeableStreamAssignment) UnmarshalJSON(data []byte) error { var unsupported bool - var bsa backingStreamAssignment + var usa unsupportedStreamAssignment decoder := json.NewDecoder(bytes.NewReader(data)) decoder.DisallowUnknownFields() - if err := decoder.Decode(&bsa); err != nil { + if err := decoder.Decode(&usa); err != nil { unsupported = true - bsa = backingStreamAssignment{} - if err = json.Unmarshal(data, &bsa); err != nil { + usa = unsupportedStreamAssignment{} + if err = json.Unmarshal(data, &usa); err != nil { return err } } - wsa.backingStreamAssignment = bsa + wsa.unsupportedStreamAssignment = usa if unsupported || (wsa.Config != nil && !supportsRequiredApiLevel(wsa.Config.Metadata)) { wsa.unsupportedJson = data } @@ -1601,23 +1610,25 @@ func (js *jetStream) metaSnapshot() ([]byte, error) { for _, asa := range cc.streams { nsa += len(asa) } - streams := make([]writeableStreamAssignment, 0, nsa) + streams := make([]supportedStreamAssignment, 0, nsa) + var unsupportedStreams []writeableStreamAssignment for _, asa := range cc.streams { for _, sa := range asa { if sa.unsupported != nil && sa.unsupported.json != nil { - streams = append(streams, writeableStreamAssignment{unsupportedJson: sa.unsupported.json}) + unsupportedStreams = append(unsupportedStreams, writeableStreamAssignment{unsupportedJson: sa.unsupported.json}) continue } - wsa := writeableStreamAssignment{ + wsa := supportedStreamAssignment{ backingStreamAssignment: backingStreamAssignment{ - Client: sa.Client.forAssignmentSnap(), - Created: sa.Created, - Config: sa.Config, - Group: sa.Group, - Sync: sa.Sync, - Consumers: make([]*writeableConsumerAssignment, 0, len(sa.consumers)), + Client: sa.Client.forAssignmentSnap(), + Created: sa.Created, + Config: sa.Config, + Group: sa.Group, + Sync: sa.Sync, }, + Consumers: make([]*consumerAssignment, 0, len(sa.consumers)), } + var unsupportedConsumers []*writeableConsumerAssignment for _, ca := range sa.consumers { // Skip if the consumer is pending, we can't include it in our snapshot. // If the proposal fails after we marked it pending, it would result in a ghost consumer. @@ -1625,7 +1636,7 @@ func (js *jetStream) metaSnapshot() ([]byte, error) { continue } if ca.unsupported != nil && ca.unsupported.json != nil { - wsa.Consumers = append(wsa.Consumers, &writeableConsumerAssignment{unsupportedJson: ca.unsupported.json}) + unsupportedConsumers = append(unsupportedConsumers, &writeableConsumerAssignment{unsupportedJson: ca.unsupported.json}) nca++ continue } @@ -1633,14 +1644,26 @@ func (js *jetStream) metaSnapshot() ([]byte, error) { cca.Stream = wsa.Config.Name // Needed for safe roll-backs. cca.Client = cca.Client.forAssignmentSnap() cca.Subject, cca.Reply = _EMPTY_, _EMPTY_ - wsa.Consumers = append(wsa.Consumers, &writeableConsumerAssignment{consumerAssignment: cca}) + wsa.Consumers = append(wsa.Consumers, &cca) nca++ } + if len(unsupportedConsumers) > 0 { + for _, ca := range wsa.Consumers { + unsupportedConsumers = append(unsupportedConsumers, &writeableConsumerAssignment{consumerAssignment: ca}) + } + unsupportedStreams = append(unsupportedStreams, writeableStreamAssignment{ + unsupportedStreamAssignment: unsupportedStreamAssignment{ + backingStreamAssignment: wsa.backingStreamAssignment, + Consumers: unsupportedConsumers, + }}, + ) + continue + } streams = append(streams, wsa) } } - if len(streams) == 0 { + if len(streams) == 0 && len(unsupportedStreams) == 0 { js.mu.RUnlock() return nil, nil } @@ -1648,15 +1671,29 @@ func (js *jetStream) metaSnapshot() ([]byte, error) { // Track how long it took to marshal the JSON mstart := time.Now() b, err := json.Marshal(streams) - mend := time.Since(mstart) - - js.mu.RUnlock() - // Must not be possible for a JSON marshaling error to result // in an empty snapshot. if err != nil { + js.mu.RUnlock() return nil, err } + // Add unsupported streams separately. + if len(unsupportedStreams) > 0 { + bu, err := json.Marshal(unsupportedStreams) + if err != nil { + js.mu.RUnlock() + return nil, err + } + b = b[:len(b)-1] // Remove the last ']' + if len(streams) > 0 { + b = append(b, ',') + } + b = append(b, bu[1:]...) // Remove the first '[' + } + + mend := time.Since(mstart) + + js.mu.RUnlock() // Track how long it took to compress the JSON cstart := time.Now() From 4fb9b1d50e1d9ad1d0518de8381c0953fe9748e2 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Wed, 24 Sep 2025 16:35:16 +0200 Subject: [PATCH 2/2] [FIXED] Lost consumers on unsupported offline stream Signed-off-by: Maurice van Veen --- server/jetstream_cluster.go | 338 +++++++++++------------------ server/jetstream_cluster_1_test.go | 187 ++++++++++++---- 2 files changed, 271 insertions(+), 254 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 66b62e0553a..0ea01db0865 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -21,7 +21,6 @@ import ( "encoding/json" "errors" "fmt" - "io" "math" "math/rand" "os" @@ -136,14 +135,15 @@ type raftGroup struct { // streamAssignment is what the meta controller uses to assign streams to peers. type streamAssignment struct { - Client *ClientInfo `json:"client,omitempty"` - Created time.Time `json:"created"` - Config *StreamConfig `json:"stream"` - Group *raftGroup `json:"group"` - Sync string `json:"sync"` - Subject string `json:"subject,omitempty"` - Reply string `json:"reply,omitempty"` - Restore *StreamState `json:"restore_state,omitempty"` + Client *ClientInfo `json:"client,omitempty"` + Created time.Time `json:"created"` + ConfigJSON json.RawMessage `json:"stream"` + Config *StreamConfig `json:"-"` + Group *raftGroup `json:"group"` + Sync string `json:"sync"` + Subject string `json:"subject,omitempty"` + Reply string `json:"reply,omitempty"` + Restore *StreamState `json:"restore_state,omitempty"` // Internal consumers map[string]*consumerAssignment responded bool @@ -151,26 +151,24 @@ type streamAssignment struct { reassigning bool // i.e. due to placement issues, lack of resources, etc. resetting bool // i.e. there was an error, and we're stopping and starting the stream err error - unsupported *unsupportedStreamAssignmentState + unsupported *unsupportedStreamAssignment } -type unsupportedStreamAssignmentState struct { - json []byte // The raw JSON content of the assignment, if it's unsupported due to the required API level. +type unsupportedStreamAssignment struct { reason string info StreamInfo sysc *client infoSub *subscription } -func newUnsupportedStreamAssignment(s *Server, sa *streamAssignment, json []byte) *unsupportedStreamAssignmentState { +func newUnsupportedStreamAssignment(s *Server, sa *streamAssignment) *unsupportedStreamAssignment { reason := "stopped" if sa.Config != nil && !supportsRequiredApiLevel(sa.Config.Metadata) { if req := getRequiredApiLevel(sa.Config.Metadata); req != _EMPTY_ { reason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", req, JSApiLevel) } } - return &unsupportedStreamAssignmentState{ - json: json, + return &unsupportedStreamAssignment{ reason: reason, info: StreamInfo{ Created: sa.Created, @@ -181,7 +179,7 @@ func newUnsupportedStreamAssignment(s *Server, sa *streamAssignment, json []byte } } -func (usa *unsupportedStreamAssignmentState) setupInfoSub(s *Server, sa *streamAssignment) { +func (usa *unsupportedStreamAssignment) setupInfoSub(s *Server, sa *streamAssignment) { if usa.infoSub != nil { return } @@ -196,13 +194,13 @@ func (usa *unsupportedStreamAssignmentState) setupInfoSub(s *Server, sa *streamA usa.infoSub, _ = s.systemSubscribe(isubj, _EMPTY_, false, ic, usa.handleClusterStreamInfoRequest) } -func (usa *unsupportedStreamAssignmentState) handleClusterStreamInfoRequest(_ *subscription, c *client, _ *Account, _, reply string, _ []byte) { +func (usa *unsupportedStreamAssignment) handleClusterStreamInfoRequest(_ *subscription, c *client, _ *Account, _, reply string, _ []byte) { s, acc := c.srv, c.acc info := streamInfoClusterResponse{OfflineReason: usa.reason, StreamInfo: usa.info} s.sendDelayedErrResponse(acc, reply, nil, s.jsonResponse(&info), errRespDelay) } -func (usa *unsupportedStreamAssignmentState) closeInfoSub(s *Server) { +func (usa *unsupportedStreamAssignment) closeInfoSub(s *Server) { if usa.infoSub != nil { s.sysUnsubscribe(usa.infoSub) usa.infoSub = nil @@ -215,41 +213,40 @@ func (usa *unsupportedStreamAssignmentState) closeInfoSub(s *Server) { // consumerAssignment is what the meta controller uses to assign consumers to streams. type consumerAssignment struct { - Client *ClientInfo `json:"client,omitempty"` - Created time.Time `json:"created"` - Name string `json:"name"` - Stream string `json:"stream"` - Config *ConsumerConfig `json:"consumer"` - Group *raftGroup `json:"group"` - Subject string `json:"subject,omitempty"` - Reply string `json:"reply,omitempty"` - State *ConsumerState `json:"state,omitempty"` + Client *ClientInfo `json:"client,omitempty"` + Created time.Time `json:"created"` + Name string `json:"name"` + Stream string `json:"stream"` + ConfigJSON json.RawMessage `json:"consumer"` + Config *ConsumerConfig `json:"-"` + Group *raftGroup `json:"group"` + Subject string `json:"subject,omitempty"` + Reply string `json:"reply,omitempty"` + State *ConsumerState `json:"state,omitempty"` // Internal responded bool recovering bool pending bool deleted bool err error - unsupported *unsupportedConsumerAssignmentState + unsupported *unsupportedConsumerAssignment } -type unsupportedConsumerAssignmentState struct { - json []byte // The raw JSON content of the assignment, if it's unsupported due to the required API level. +type unsupportedConsumerAssignment struct { reason string info ConsumerInfo sysc *client infoSub *subscription } -func newUnsupportedConsumerAssignment(ca *consumerAssignment, json []byte) *unsupportedConsumerAssignmentState { +func newUnsupportedConsumerAssignment(ca *consumerAssignment) *unsupportedConsumerAssignment { reason := "stopped" if ca.Config != nil && !supportsRequiredApiLevel(ca.Config.Metadata) { if req := getRequiredApiLevel(ca.Config.Metadata); req != _EMPTY_ { reason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", getRequiredApiLevel(ca.Config.Metadata), JSApiLevel) } } - return &unsupportedConsumerAssignmentState{ - json: json, + return &unsupportedConsumerAssignment{ reason: reason, info: ConsumerInfo{ Stream: ca.Stream, @@ -261,7 +258,7 @@ func newUnsupportedConsumerAssignment(ca *consumerAssignment, json []byte) *unsu } } -func (uca *unsupportedConsumerAssignmentState) setupInfoSub(s *Server, ca *consumerAssignment) { +func (uca *unsupportedConsumerAssignment) setupInfoSub(s *Server, ca *consumerAssignment) { if uca.infoSub != nil { return } @@ -276,13 +273,13 @@ func (uca *unsupportedConsumerAssignmentState) setupInfoSub(s *Server, ca *consu uca.infoSub, _ = s.systemSubscribe(isubj, _EMPTY_, false, ic, uca.handleClusterConsumerInfoRequest) } -func (uca *unsupportedConsumerAssignmentState) handleClusterConsumerInfoRequest(_ *subscription, c *client, _ *Account, _, reply string, _ []byte) { +func (uca *unsupportedConsumerAssignment) handleClusterConsumerInfoRequest(_ *subscription, c *client, _ *Account, _, reply string, _ []byte) { s, acc := c.srv, c.acc info := consumerInfoClusterResponse{OfflineReason: uca.reason, ConsumerInfo: uca.info} s.sendDelayedErrResponse(acc, reply, nil, s.jsonResponse(&info), errRespDelay) } -func (uca *unsupportedConsumerAssignmentState) closeInfoSub(s *Server) { +func (uca *unsupportedConsumerAssignment) closeInfoSub(s *Server) { if uca.infoSub != nil { s.sysUnsubscribe(uca.infoSub) uca.infoSub = nil @@ -294,35 +291,13 @@ func (uca *unsupportedConsumerAssignmentState) closeInfoSub(s *Server) { } type writeableConsumerAssignment struct { - *consumerAssignment - // Internal - unsupportedJson []byte // The raw JSON content of the assignment, if it's unsupported due to the required API level. -} - -func (wca *writeableConsumerAssignment) MarshalJSON() ([]byte, error) { - if wca.unsupportedJson != nil { - return wca.unsupportedJson, nil - } - return json.Marshal(wca.consumerAssignment) -} - -func (wca *writeableConsumerAssignment) UnmarshalJSON(data []byte) error { - var unsupported bool - var ca consumerAssignment - decoder := json.NewDecoder(bytes.NewReader(data)) - decoder.DisallowUnknownFields() - if err := decoder.Decode(&ca); err != nil { - unsupported = true - ca = consumerAssignment{} - if err = json.Unmarshal(data, &ca); err != nil { - return err - } - } - wca.consumerAssignment = &ca - if unsupported || (wca.Config != nil && !supportsRequiredApiLevel(wca.Config.Metadata)) { - wca.unsupportedJson = data - } - return nil + Client *ClientInfo `json:"client,omitempty"` + Created time.Time `json:"created"` + Name string `json:"name"` + Stream string `json:"stream"` + ConfigJSON json.RawMessage `json:"consumer"` + Group *raftGroup `json:"group"` + State *ConsumerState `json:"state,omitempty"` } // streamPurge is what the stream leader will replicate when purging a stream. @@ -1542,53 +1517,12 @@ func (js *jetStream) checkClusterSize() { // Represents our stable meta state that we can write out. type writeableStreamAssignment struct { - unsupportedStreamAssignment - // Internal - unsupportedJson []byte // The raw JSON content of the assignment, if it's unsupported due to the required API level. -} - -type unsupportedStreamAssignment struct { - backingStreamAssignment - Consumers []*writeableConsumerAssignment -} - -type supportedStreamAssignment struct { - backingStreamAssignment - Consumers []*consumerAssignment -} - -type backingStreamAssignment struct { - Client *ClientInfo `json:"client,omitempty"` - Created time.Time `json:"created"` - Config *StreamConfig `json:"stream"` - Group *raftGroup `json:"group"` - Sync string `json:"sync"` -} - -func (wsa *writeableStreamAssignment) MarshalJSON() ([]byte, error) { - if wsa.unsupportedJson != nil { - return wsa.unsupportedJson, nil - } - return json.Marshal(wsa.unsupportedStreamAssignment) -} - -func (wsa *writeableStreamAssignment) UnmarshalJSON(data []byte) error { - var unsupported bool - var usa unsupportedStreamAssignment - decoder := json.NewDecoder(bytes.NewReader(data)) - decoder.DisallowUnknownFields() - if err := decoder.Decode(&usa); err != nil { - unsupported = true - usa = unsupportedStreamAssignment{} - if err = json.Unmarshal(data, &usa); err != nil { - return err - } - } - wsa.unsupportedStreamAssignment = usa - if unsupported || (wsa.Config != nil && !supportsRequiredApiLevel(wsa.Config.Metadata)) { - wsa.unsupportedJson = data - } - return nil + Client *ClientInfo `json:"client,omitempty"` + Created time.Time `json:"created"` + ConfigJSON json.RawMessage `json:"stream"` + Group *raftGroup `json:"group"` + Sync string `json:"sync"` + Consumers []*writeableConsumerAssignment } func (js *jetStream) clusterStreamConfig(accName, streamName string) (StreamConfig, bool) { @@ -1610,60 +1544,40 @@ func (js *jetStream) metaSnapshot() ([]byte, error) { for _, asa := range cc.streams { nsa += len(asa) } - streams := make([]supportedStreamAssignment, 0, nsa) - var unsupportedStreams []writeableStreamAssignment + streams := make([]writeableStreamAssignment, 0, nsa) for _, asa := range cc.streams { for _, sa := range asa { - if sa.unsupported != nil && sa.unsupported.json != nil { - unsupportedStreams = append(unsupportedStreams, writeableStreamAssignment{unsupportedJson: sa.unsupported.json}) - continue - } - wsa := supportedStreamAssignment{ - backingStreamAssignment: backingStreamAssignment{ - Client: sa.Client.forAssignmentSnap(), - Created: sa.Created, - Config: sa.Config, - Group: sa.Group, - Sync: sa.Sync, - }, - Consumers: make([]*consumerAssignment, 0, len(sa.consumers)), + wsa := writeableStreamAssignment{ + Client: sa.Client.forAssignmentSnap(), + Created: sa.Created, + ConfigJSON: sa.ConfigJSON, + Group: sa.Group, + Sync: sa.Sync, + Consumers: make([]*writeableConsumerAssignment, 0, len(sa.consumers)), } - var unsupportedConsumers []*writeableConsumerAssignment for _, ca := range sa.consumers { // Skip if the consumer is pending, we can't include it in our snapshot. // If the proposal fails after we marked it pending, it would result in a ghost consumer. if ca.pending { continue } - if ca.unsupported != nil && ca.unsupported.json != nil { - unsupportedConsumers = append(unsupportedConsumers, &writeableConsumerAssignment{unsupportedJson: ca.unsupported.json}) - nca++ - continue + wca := writeableConsumerAssignment{ + Client: ca.Client.forAssignmentSnap(), + Created: ca.Created, + Name: ca.Name, + Stream: ca.Stream, + ConfigJSON: ca.ConfigJSON, + Group: ca.Group, + State: ca.State, } - cca := *ca - cca.Stream = wsa.Config.Name // Needed for safe roll-backs. - cca.Client = cca.Client.forAssignmentSnap() - cca.Subject, cca.Reply = _EMPTY_, _EMPTY_ - wsa.Consumers = append(wsa.Consumers, &cca) + wsa.Consumers = append(wsa.Consumers, &wca) nca++ } - if len(unsupportedConsumers) > 0 { - for _, ca := range wsa.Consumers { - unsupportedConsumers = append(unsupportedConsumers, &writeableConsumerAssignment{consumerAssignment: ca}) - } - unsupportedStreams = append(unsupportedStreams, writeableStreamAssignment{ - unsupportedStreamAssignment: unsupportedStreamAssignment{ - backingStreamAssignment: wsa.backingStreamAssignment, - Consumers: unsupportedConsumers, - }}, - ) - continue - } streams = append(streams, wsa) } } - if len(streams) == 0 && len(unsupportedStreams) == 0 { + if len(streams) == 0 { js.mu.RUnlock() return nil, nil } @@ -1671,29 +1585,15 @@ func (js *jetStream) metaSnapshot() ([]byte, error) { // Track how long it took to marshal the JSON mstart := time.Now() b, err := json.Marshal(streams) + mend := time.Since(mstart) + + js.mu.RUnlock() + // Must not be possible for a JSON marshaling error to result // in an empty snapshot. if err != nil { - js.mu.RUnlock() return nil, err } - // Add unsupported streams separately. - if len(unsupportedStreams) > 0 { - bu, err := json.Marshal(unsupportedStreams) - if err != nil { - js.mu.RUnlock() - return nil, err - } - b = b[:len(b)-1] // Remove the last ']' - if len(streams) > 0 { - b = append(b, ',') - } - b = append(b, bu[1:]...) // Remove the first '[' - } - - mend := time.Since(mstart) - - js.mu.RUnlock() // Track how long it took to compress the JSON cstart := time.Now() @@ -1722,30 +1622,25 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove // Build our new version here outside of js. streams := make(map[string]map[string]*streamAssignment) for _, wsa := range wsas { - fixCfgMirrorWithDedupWindow(wsa.Config) as := streams[wsa.Client.serviceAccount()] if as == nil { as = make(map[string]*streamAssignment) streams[wsa.Client.serviceAccount()] = as } - sa := &streamAssignment{Client: wsa.Client, Created: wsa.Created, Config: wsa.Config, Group: wsa.Group, Sync: wsa.Sync} - if wsa.unsupportedJson != nil { - sa.unsupported = newUnsupportedStreamAssignment(js.srv, sa, wsa.unsupportedJson) - } + sa := &streamAssignment{Client: wsa.Client, Created: wsa.Created, ConfigJSON: wsa.ConfigJSON, Group: wsa.Group, Sync: wsa.Sync} + decodeStreamAssignmentConfig(js.srv, sa) if len(wsa.Consumers) > 0 { sa.consumers = make(map[string]*consumerAssignment) for _, wca := range wsa.Consumers { if wca.Stream == _EMPTY_ { wca.Stream = sa.Config.Name // Rehydrate from the stream name. } - ca := &consumerAssignment{Client: wca.Client, Created: wca.Created, Name: wca.Name, Stream: wca.Stream, Config: wca.Config, Group: wca.Group, Subject: wca.Subject, Reply: wca.Reply, State: wca.State} - if wca.unsupportedJson != nil { - ca.unsupported = newUnsupportedConsumerAssignment(ca, wca.unsupportedJson) - } + ca := &consumerAssignment{Client: wca.Client, Created: wca.Created, Name: wca.Name, Stream: wca.Stream, ConfigJSON: wca.ConfigJSON, Group: wca.Group, State: wca.State} + decodeConsumerAssignmentConfig(ca) sa.consumers[ca.Name] = ca } } - as[wsa.Config.Name] = sa + as[sa.Config.Name] = sa } js.mu.Lock() @@ -4916,7 +4811,7 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) { // Mark stream as unsupported as well if sa.unsupported == nil { - sa.unsupported = newUnsupportedStreamAssignment(s, sa, nil) + sa.unsupported = newUnsupportedStreamAssignment(s, sa) } sa.unsupported.setupInfoSub(s, sa) js.mu.Unlock() @@ -8063,6 +7958,7 @@ func (s *Server) jsClusteredMsgDeleteRequest(ci *ClientInfo, acc *Account, mset func encodeAddStreamAssignment(sa *streamAssignment) []byte { csa := *sa csa.Client = csa.Client.forProposal() + csa.ConfigJSON, _ = json.Marshal(sa.Config) var bb bytes.Buffer bb.WriteByte(byte(assignStreamOp)) json.NewEncoder(&bb).Encode(csa) @@ -8072,6 +7968,7 @@ func encodeAddStreamAssignment(sa *streamAssignment) []byte { func encodeUpdateStreamAssignment(sa *streamAssignment) []byte { csa := *sa csa.Client = csa.Client.forProposal() + csa.ConfigJSON, _ = json.Marshal(sa.Config) var bb bytes.Buffer bb.WriteByte(byte(updateStreamOp)) json.NewEncoder(&bb).Encode(csa) @@ -8081,6 +7978,7 @@ func encodeUpdateStreamAssignment(sa *streamAssignment) []byte { func encodeDeleteStreamAssignment(sa *streamAssignment) []byte { csa := *sa csa.Client = csa.Client.forProposal() + csa.ConfigJSON, _ = json.Marshal(sa.Config) var bb bytes.Buffer bb.WriteByte(byte(removeStreamOp)) json.NewEncoder(&bb).Encode(csa) @@ -8088,23 +7986,35 @@ func encodeDeleteStreamAssignment(sa *streamAssignment) []byte { } func decodeStreamAssignment(s *Server, buf []byte) (*streamAssignment, error) { - var unsupported bool var sa streamAssignment - decoder := json.NewDecoder(bytes.NewReader(buf)) + if err := json.Unmarshal(buf, &sa); err != nil { + return nil, err + } + if err := decodeStreamAssignmentConfig(s, &sa); err != nil { + return nil, err + } + return &sa, nil +} + +func decodeStreamAssignmentConfig(s *Server, sa *streamAssignment) error { + var unsupported bool + var cfg StreamConfig + decoder := json.NewDecoder(bytes.NewReader(sa.ConfigJSON)) decoder.DisallowUnknownFields() - if err := decoder.Decode(&sa); err != nil { + if err := decoder.Decode(&cfg); err != nil { unsupported = true - sa = streamAssignment{} - if err = json.Unmarshal(buf, &sa); err != nil { - return nil, err + cfg = StreamConfig{} + if err = json.Unmarshal(sa.ConfigJSON, &cfg); err != nil { + return err } } + sa.Config = &cfg fixCfgMirrorWithDedupWindow(sa.Config) if unsupported || (sa.Config != nil && !supportsRequiredApiLevel(sa.Config.Metadata)) { - sa.unsupported = newUnsupportedStreamAssignment(s, &sa, copyBytes(buf)) + sa.unsupported = newUnsupportedStreamAssignment(s, sa) } - return &sa, nil + return nil } func encodeDeleteRange(dr *DeleteRange) []byte { @@ -8522,6 +8432,7 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec func encodeAddConsumerAssignment(ca *consumerAssignment) []byte { cca := *ca cca.Client = cca.Client.forProposal() + cca.ConfigJSON, _ = json.Marshal(ca.Config) var bb bytes.Buffer bb.WriteByte(byte(assignConsumerOp)) json.NewEncoder(&bb).Encode(cca) @@ -8531,6 +8442,7 @@ func encodeAddConsumerAssignment(ca *consumerAssignment) []byte { func encodeDeleteConsumerAssignment(ca *consumerAssignment) []byte { cca := *ca cca.Client = cca.Client.forProposal() + cca.ConfigJSON, _ = json.Marshal(ca.Config) var bb bytes.Buffer bb.WriteByte(byte(removeConsumerOp)) json.NewEncoder(&bb).Encode(cca) @@ -8538,27 +8450,39 @@ func encodeDeleteConsumerAssignment(ca *consumerAssignment) []byte { } func decodeConsumerAssignment(buf []byte) (*consumerAssignment, error) { - var unsupported bool var ca consumerAssignment - decoder := json.NewDecoder(bytes.NewReader(buf)) + if err := json.Unmarshal(buf, &ca); err != nil { + return nil, err + } + if err := decodeConsumerAssignmentConfig(&ca); err != nil { + return nil, err + } + return &ca, nil +} + +func decodeConsumerAssignmentConfig(ca *consumerAssignment) error { + var unsupported bool + var cfg ConsumerConfig + decoder := json.NewDecoder(bytes.NewReader(ca.ConfigJSON)) decoder.DisallowUnknownFields() - if err := decoder.Decode(&ca); err != nil { + if err := decoder.Decode(&cfg); err != nil { unsupported = true - ca = consumerAssignment{} - if err = json.Unmarshal(buf, &ca); err != nil { - return nil, err + cfg = ConsumerConfig{} + if err = json.Unmarshal(ca.ConfigJSON, &cfg); err != nil { + return err } } - + ca.Config = &cfg if unsupported || (ca.Config != nil && !supportsRequiredApiLevel(ca.Config.Metadata)) { - ca.unsupported = newUnsupportedConsumerAssignment(&ca, copyBytes(buf)) + ca.unsupported = newUnsupportedConsumerAssignment(ca) } - return &ca, nil + return nil } func encodeAddConsumerAssignmentCompressed(ca *consumerAssignment) []byte { cca := *ca cca.Client = cca.Client.forProposal() + cca.ConfigJSON, _ = json.Marshal(ca.Config) var bb bytes.Buffer bb.WriteByte(byte(assignCompressedConsumerOp)) s2e := s2.NewWriter(&bb) @@ -8568,32 +8492,16 @@ func encodeAddConsumerAssignmentCompressed(ca *consumerAssignment) []byte { } func decodeConsumerAssignmentCompressed(buf []byte) (*consumerAssignment, error) { - var unsupported bool var ca consumerAssignment bb := bytes.NewBuffer(buf) s2d := s2.NewReader(bb) decoder := json.NewDecoder(s2d) - decoder.DisallowUnknownFields() if err := decoder.Decode(&ca); err != nil { - unsupported = true - ca = consumerAssignment{} - bb = bytes.NewBuffer(buf) - s2d = s2.NewReader(bb) - if err = json.NewDecoder(s2d).Decode(&ca); err != nil { - return nil, err - } + return nil, err } - - if unsupported || (ca.Config != nil && !supportsRequiredApiLevel(ca.Config.Metadata)) { - bb = bytes.NewBuffer(buf) - s2d = s2.NewReader(bb) - dec, err := io.ReadAll(s2d) - if err != nil { - return nil, err - } - ca.unsupported = newUnsupportedConsumerAssignment(&ca, copyBytes(dec)) + if err := decodeConsumerAssignmentConfig(&ca); err != nil { + return nil, err } - return &ca, nil } diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index d1a5c4248a8..512296a9d6a 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -9617,8 +9617,10 @@ func TestJetStreamClusterOfflineStreamAndConsumerAfterAssetCreateOrUpdate(t *tes wsas := getValidMetaSnapshot() require_Len(t, len(wsas), 1) - require_Equal(t, wsas[0].Config.Name, "DowngradeStreamTest") - require_Equal(t, wsas[0].Config.Metadata["_nats.req.level"], strconv.Itoa(math.MaxInt-1)) + nsa := &streamAssignment{ConfigJSON: wsas[0].ConfigJSON} + require_NoError(t, decodeStreamAssignmentConfig(ml, nsa)) + require_Equal(t, nsa.Config.Name, "DowngradeStreamTest") + require_Equal(t, nsa.Config.Metadata["_nats.req.level"], strconv.Itoa(math.MaxInt-1)) // Update a stream that's unsupported. sjs.mu.Lock() @@ -9639,8 +9641,10 @@ func TestJetStreamClusterOfflineStreamAndConsumerAfterAssetCreateOrUpdate(t *tes wsas = getValidMetaSnapshot() require_Len(t, len(wsas), 1) - require_Equal(t, wsas[0].Config.Name, "DowngradeStreamTest") - require_Equal(t, wsas[0].Config.Metadata["_nats.req.level"], strconv.Itoa(math.MaxInt)) + nsa = &streamAssignment{ConfigJSON: wsas[0].ConfigJSON} + require_NoError(t, decodeStreamAssignmentConfig(ml, nsa)) + require_Equal(t, nsa.Config.Name, "DowngradeStreamTest") + require_Equal(t, nsa.Config.Metadata["_nats.req.level"], strconv.Itoa(math.MaxInt)) // Deleting a stream should always work, even if it is unsupported. require_NoError(t, js.DeleteStream("DowngradeStreamTest")) @@ -9724,14 +9728,18 @@ func TestJetStreamClusterOfflineStreamAndConsumerAfterAssetCreateOrUpdate(t *tes wsas = getValidMetaSnapshot() require_Len(t, len(wsas), 1) - require_Equal(t, wsas[0].Config.Name, "DowngradeConsumerTest") - require_Equal(t, wsas[0].Config.Metadata["_nats.req.level"], "0") + nsa = &streamAssignment{ConfigJSON: wsas[0].ConfigJSON} + require_NoError(t, decodeStreamAssignmentConfig(ml, nsa)) + require_Equal(t, nsa.Config.Name, "DowngradeConsumerTest") + require_Equal(t, nsa.Config.Metadata["_nats.req.level"], "0") require_Len(t, len(wsas[0].Consumers), 2) for _, wca := range wsas[0].Consumers { - if wca.Config.Name == "DowngradeConsumerTest" { - require_Equal(t, wca.Config.Metadata["_nats.req.level"], strconv.Itoa(math.MaxInt-1)) + nca := &consumerAssignment{ConfigJSON: wca.ConfigJSON} + require_NoError(t, decodeConsumerAssignmentConfig(nca)) + if nca.Config.Name == "DowngradeConsumerTest" { + require_Equal(t, nca.Config.Metadata["_nats.req.level"], strconv.Itoa(math.MaxInt-1)) } else { - require_Equal(t, wca.Config.Name, "consumer") + require_Equal(t, nca.Config.Name, "consumer") } } @@ -9754,14 +9762,18 @@ func TestJetStreamClusterOfflineStreamAndConsumerAfterAssetCreateOrUpdate(t *tes wsas = getValidMetaSnapshot() require_Len(t, len(wsas), 1) - require_Equal(t, wsas[0].Config.Name, "DowngradeConsumerTest") - require_Equal(t, wsas[0].Config.Metadata["_nats.req.level"], "0") + nsa = &streamAssignment{ConfigJSON: wsas[0].ConfigJSON} + require_NoError(t, decodeStreamAssignmentConfig(ml, nsa)) + require_Equal(t, nsa.Config.Name, "DowngradeConsumerTest") + require_Equal(t, nsa.Config.Metadata["_nats.req.level"], "0") require_Len(t, len(wsas[0].Consumers), 2) for _, wca := range wsas[0].Consumers { - if wca.Config.Name == "DowngradeConsumerTest" { - require_Equal(t, wca.Config.Metadata["_nats.req.level"], strconv.Itoa(math.MaxInt)) + nca := &consumerAssignment{ConfigJSON: wca.ConfigJSON} + require_NoError(t, decodeConsumerAssignmentConfig(nca)) + if nca.Config.Name == "DowngradeConsumerTest" { + require_Equal(t, nca.Config.Metadata["_nats.req.level"], strconv.Itoa(math.MaxInt)) } else { - require_Equal(t, wca.Config.Name, "consumer") + require_Equal(t, nca.Config.Name, "consumer") } } @@ -9771,10 +9783,14 @@ func TestJetStreamClusterOfflineStreamAndConsumerAfterAssetCreateOrUpdate(t *tes wsas = getValidMetaSnapshot() require_Len(t, len(wsas), 1) - require_Equal(t, wsas[0].Config.Name, "DowngradeConsumerTest") - require_Equal(t, wsas[0].Config.Metadata["_nats.req.level"], "0") + nsa = &streamAssignment{ConfigJSON: wsas[0].ConfigJSON} + require_NoError(t, decodeStreamAssignmentConfig(ml, nsa)) + require_Equal(t, nsa.Config.Name, "DowngradeConsumerTest") + require_Equal(t, nsa.Config.Metadata["_nats.req.level"], "0") require_Len(t, len(wsas[0].Consumers), 1) - require_Equal(t, wsas[0].Consumers[0].Config.Name, "consumer") + nca := &consumerAssignment{ConfigJSON: wsas[0].Consumers[0].ConfigJSON} + require_NoError(t, decodeConsumerAssignmentConfig(nca)) + require_Equal(t, nca.Config.Name, "consumer") } func TestJetStreamClusterOfflineStreamAndConsumerAfterDowngrade(t *testing.T) { @@ -9892,7 +9908,9 @@ func TestJetStreamClusterOfflineStreamAndConsumerAfterDowngrade(t *testing.T) { wsas := getValidMetaSnapshot() require_Len(t, len(wsas), 1) - require_Equal(t, wsas[0].Config.Name, "DowngradeStreamTest") + nsa := &streamAssignment{ConfigJSON: wsas[0].ConfigJSON} + require_NoError(t, decodeStreamAssignmentConfig(ml, nsa)) + require_Equal(t, nsa.Config.Name, "DowngradeStreamTest") // Update a stream to be unsupported. sjs.mu.Lock() @@ -9913,7 +9931,9 @@ func TestJetStreamClusterOfflineStreamAndConsumerAfterDowngrade(t *testing.T) { wsas = getValidMetaSnapshot() require_Len(t, len(wsas), 1) - require_Equal(t, wsas[0].Config.Name, "DowngradeStreamTest") + nsa = &streamAssignment{ConfigJSON: wsas[0].ConfigJSON} + require_NoError(t, decodeStreamAssignmentConfig(ml, nsa)) + require_Equal(t, nsa.Config.Name, "DowngradeStreamTest") // Deleting a stream should always work, even if it is unsupported. require_NoError(t, js.DeleteStream("DowngradeStreamTest")) @@ -9927,8 +9947,9 @@ func TestJetStreamClusterOfflineStreamAndConsumerAfterDowngrade(t *testing.T) { sjs.mu.Lock() ccfg := &ConsumerConfig{ - Name: "DowngradeConsumerTest", - Replicas: 3, + Name: "DowngradeConsumerTest", + Replicas: 3, + MaxWaiting: JSWaitQueueDefaultMax, } rg = cc.createGroupForConsumer(ccfg, sa) ca := &consumerAssignment{ @@ -9974,7 +9995,9 @@ func TestJetStreamClusterOfflineStreamAndConsumerAfterDowngrade(t *testing.T) { wsas = getValidMetaSnapshot() require_Len(t, len(wsas), 1) - require_Equal(t, wsas[0].Config.Name, "DowngradeConsumerTest") + nsa = &streamAssignment{ConfigJSON: wsas[0].ConfigJSON} + require_NoError(t, decodeStreamAssignmentConfig(ml, nsa)) + require_Equal(t, nsa.Config.Name, "DowngradeConsumerTest") require_Len(t, len(wsas[0].Consumers), 1) // Update a consumer to be unsupported. @@ -9996,10 +10019,99 @@ func TestJetStreamClusterOfflineStreamAndConsumerAfterDowngrade(t *testing.T) { wsas = getValidMetaSnapshot() require_Len(t, len(wsas), 1) - require_Equal(t, wsas[0].Config.Name, "DowngradeConsumerTest") - require_Equal(t, wsas[0].Config.Metadata["_nats.req.level"], "0") + nsa = &streamAssignment{ConfigJSON: wsas[0].ConfigJSON} + require_NoError(t, decodeStreamAssignmentConfig(ml, nsa)) + require_Equal(t, nsa.Config.Name, "DowngradeConsumerTest") + require_Equal(t, nsa.Config.Metadata["_nats.req.level"], "0") require_Len(t, len(wsas[0].Consumers), 1) - require_Equal(t, wsas[0].Consumers[0].Config.Metadata["_nats.req.level"], strconv.Itoa(math.MaxInt)) + nca := &consumerAssignment{ConfigJSON: wsas[0].Consumers[0].ConfigJSON} + require_NoError(t, decodeConsumerAssignmentConfig(nca)) + require_Equal(t, nca.Config.Metadata["_nats.req.level"], strconv.Itoa(math.MaxInt)) +} + +func TestJetStreamClusterOfflineStreamAndConsumerUpdate(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{Name: "DowngradeTest", Replicas: 3}) + require_NoError(t, err) + _, err = js.AddConsumer("DowngradeTest", &nats.ConsumerConfig{Durable: "D", Replicas: 3}) + require_NoError(t, err) + c.waitOnAllCurrent() + + ml := c.leader() + require_NotNil(t, ml) + sjs := ml.getJetStream() + require_NotNil(t, sjs) + + var sa *streamAssignment + var ca *consumerAssignment + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + sjs.mu.Lock() + defer sjs.mu.Unlock() + sa = sjs.streamAssignment(globalAccountName, "DowngradeTest") + if sa == nil { + return errors.New("stream assignment missing") + } + ca = sjs.consumerAssignment(globalAccountName, "DowngradeTest", "D") + if ca == nil { + return errors.New("consumer assignment missing") + } + return nil + }) + + getValidMetaSnapshot := func() (wsas []writeableStreamAssignment) { + t.Helper() + snap, err := sjs.metaSnapshot() + require_NoError(t, err) + require_True(t, len(snap) > 0) + dec, err := s2.Decode(nil, snap) + require_NoError(t, err) + require_NoError(t, json.Unmarshal(dec, &wsas)) + return wsas + } + + wsas := getValidMetaSnapshot() + require_Len(t, len(wsas), 1) + nsa := &streamAssignment{ConfigJSON: wsas[0].ConfigJSON} + require_NoError(t, decodeStreamAssignmentConfig(ml, nsa)) + require_Equal(t, nsa.Config.Name, "DowngradeTest") + require_Equal(t, nsa.Config.Metadata["_nats.req.level"], "0") + require_Len(t, len(wsas[0].Consumers), 1) + nca := &consumerAssignment{ConfigJSON: wsas[0].Consumers[0].ConfigJSON} + require_NoError(t, decodeConsumerAssignmentConfig(nca)) + require_Equal(t, wsas[0].Consumers[0].Name, "D") + require_Equal(t, nca.Config.Metadata["_nats.req.level"], "0") + + // Update a consumer to be unsupported. + sjs.mu.Lock() + ca.Config.Metadata = map[string]string{"_nats.req.level": strconv.Itoa(math.MaxInt)} + err = sjs.cluster.meta.Propose(encodeAddConsumerAssignment(ca)) + sjs.mu.Unlock() + require_NoError(t, err) + + // Update the stream to be unsupported. + sjs.mu.Lock() + sa.Config.Metadata = map[string]string{"_nats.req.level": strconv.Itoa(math.MaxInt)} + err = sjs.cluster.meta.Propose(encodeUpdateStreamAssignment(sa)) + sjs.mu.Unlock() + require_NoError(t, err) + c.waitOnAllCurrent() + + wsas = getValidMetaSnapshot() + require_Len(t, len(wsas), 1) + nsa = &streamAssignment{ConfigJSON: wsas[0].ConfigJSON} + require_NoError(t, decodeStreamAssignmentConfig(ml, nsa)) + require_Equal(t, nsa.Config.Name, "DowngradeTest") + require_Equal(t, nsa.Config.Metadata["_nats.req.level"], strconv.Itoa(math.MaxInt)) + require_Len(t, len(wsas[0].Consumers), 1) + nca = &consumerAssignment{ConfigJSON: wsas[0].Consumers[0].ConfigJSON} + require_NoError(t, decodeConsumerAssignmentConfig(nca)) + require_Equal(t, wsas[0].Consumers[0].Name, "D") + require_Equal(t, nca.Config.Metadata["_nats.req.level"], strconv.Itoa(math.MaxInt)) } func TestJetStreamClusterOfflineStreamAndConsumerStrictDecoding(t *testing.T) { @@ -10007,31 +10119,28 @@ func TestJetStreamClusterOfflineStreamAndConsumerStrictDecoding(t *testing.T) { defer s.Shutdown() unsupportedJson := []byte("{\"unknown\": true}") + unsupportedStreamJson := []byte(fmt.Sprintf("{\"stream\":%s}", unsupportedJson)) + unsupportedConsumerJson := []byte(fmt.Sprintf("{\"consumer\":%s}", unsupportedJson)) - sa, err := decodeStreamAssignment(s, unsupportedJson) + sa, err := decodeStreamAssignment(s, unsupportedStreamJson) require_NoError(t, err) - require_True(t, bytes.Equal(sa.unsupported.json, unsupportedJson)) + require_True(t, bytes.Equal(sa.ConfigJSON, unsupportedJson)) + require_True(t, sa.unsupported != nil) - ca, err := decodeConsumerAssignment(unsupportedJson) + ca, err := decodeConsumerAssignment(unsupportedConsumerJson) require_NoError(t, err) - require_True(t, bytes.Equal(ca.unsupported.json, unsupportedJson)) + require_True(t, bytes.Equal(ca.ConfigJSON, unsupportedJson)) + require_True(t, ca.unsupported != nil) var bb bytes.Buffer s2e := s2.NewWriter(&bb) - _, err = s2e.Write(unsupportedJson) + _, err = s2e.Write(unsupportedConsumerJson) require_NoError(t, err) require_NoError(t, s2e.Close()) ca, err = decodeConsumerAssignmentCompressed(bb.Bytes()) require_NoError(t, err) - require_True(t, bytes.Equal(ca.unsupported.json, unsupportedJson)) - - var wsa writeableStreamAssignment - require_NoError(t, wsa.UnmarshalJSON(unsupportedJson)) - require_True(t, bytes.Equal(wsa.unsupportedJson, unsupportedJson)) - - var wca writeableConsumerAssignment - require_NoError(t, wca.UnmarshalJSON(unsupportedJson)) - require_True(t, bytes.Equal(wca.unsupportedJson, unsupportedJson)) + require_True(t, bytes.Equal(ca.ConfigJSON, unsupportedJson)) + require_True(t, ca.unsupported != nil) } func TestJetStreamClusterStreamMonitorShutdownWithoutRaftNode(t *testing.T) {