diff --git a/server/consumer.go b/server/consumer.go index 293e06923a9..1617a0dfda2 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -21,6 +21,8 @@ import ( "fmt" "math" "math/rand" + "os" + "path/filepath" "reflect" "regexp" "slices" @@ -71,6 +73,13 @@ type ConsumerInfo struct { PriorityGroups []PriorityGroupState `json:"priority_groups,omitempty"` } +// consumerInfoClusterResponse is a response used in a cluster to communicate the consumer info +// back to the meta leader as part of a consumer list request. +type consumerInfoClusterResponse struct { + ConsumerInfo + OfflineReason string `json:"offline_reason,omitempty"` // Reporting when a consumer is offline. +} + type PriorityGroupState struct { Group string `json:"group"` PinnedClientID string `json:"pinned_client_id,omitempty"` @@ -510,6 +519,10 @@ type consumer struct { /// pinnedTtl is the remaining time before the current PinId expires. pinnedTtl *time.Timer pinnedTS time.Time + + // If standalone/single-server, the offline reason needs to be stored directly in the consumer. + // Otherwise, if clustered it will be part of the consumer assignment. + offlineReason string } // A single subject filter. @@ -5059,7 +5072,7 @@ func (o *consumer) setMaxPendingBytes(limit int) { // This does some quick sanity checks to see if we should re-calculate num pending. // Lock should be held. func (o *consumer) checkNumPending() uint64 { - if o.mset != nil { + if o.mset != nil && o.mset.store != nil { var state StreamState o.mset.store.FastState(&state) npc := o.numPending() @@ -6103,6 +6116,14 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error { } else { err = store.Stop() } + } else if dflag { + // If there's no store (for example, when it's offline), manually delete the directories. 
+ o.mu.RLock() + stream, consumer := o.stream, o.name + o.mu.RUnlock() + accDir := filepath.Join(js.config.StoreDir, a.GetName()) + consumersDir := filepath.Join(accDir, streamsDir, stream, consumerDir) + os.RemoveAll(filepath.Join(consumersDir, consumer)) } return err diff --git a/server/errors.json b/server/errors.json index 0e2b1ee149a..94d5cb6615a 100644 --- a/server/errors.json +++ b/server/errors.json @@ -1918,5 +1918,25 @@ "help": "", "url": "", "deprecates": "" + }, + { + "constant": "JSStreamOfflineReasonErrF", + "code": 500, + "error_code": 10194, + "description": "stream is offline: {err}", + "comment": "", + "help": "", + "url": "", + "deprecates": "" + }, + { + "constant": "JSConsumerOfflineReasonErrF", + "code": 500, + "error_code": 10195, + "description": "consumer is offline: {err}", + "comment": "", + "help": "", + "url": "", + "deprecates": "" } ] \ No newline at end of file diff --git a/server/jetstream.go b/server/jetstream.go index 76e5c308d50..21458ab1d16 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -14,6 +14,7 @@ package server import ( + "bytes" "crypto/hmac" "crypto/sha256" "encoding/binary" @@ -1333,8 +1334,54 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro } var cfg FileStreamInfo - if err := json.Unmarshal(buf, &cfg); err != nil { - s.Warnf(" Error unmarshalling stream metafile %q: %v", metafile, err) + decoder := json.NewDecoder(bytes.NewReader(buf)) + decoder.DisallowUnknownFields() + strictErr := decoder.Decode(&cfg) + if strictErr != nil { + cfg = FileStreamInfo{} + if err := json.Unmarshal(buf, &cfg); err != nil { + s.Warnf(" Error unmarshalling stream metafile %q: %v", metafile, err) + continue + } + } + if supported := supportsRequiredApiLevel(cfg.Metadata); !supported || strictErr != nil { + var offlineReason string + if !supported { + apiLevel := getRequiredApiLevel(cfg.Metadata) + offlineReason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", 
apiLevel, JSApiLevel) + s.Warnf(" Detected unsupported stream '%s > %s', delete the stream or upgrade the server to API level %s", a.Name, cfg.StreamConfig.Name, apiLevel) + } else { + offlineReason = fmt.Sprintf("decoding error: %v", strictErr) + s.Warnf(" Error unmarshalling stream metafile %q: %v", metafile, strictErr) + } + singleServerMode := !s.JetStreamIsClustered() && s.standAloneMode() + if singleServerMode { + // Fake a stream, so we can respond to API requests as single-server. + mset := &stream{ + acc: a, + jsa: jsa, + cfg: cfg.StreamConfig, + js: js, + srv: s, + stype: cfg.Storage, + consumers: make(map[string]*consumer), + active: false, + created: time.Now().UTC(), + offlineReason: offlineReason, + } + if !cfg.Created.IsZero() { + mset.created = cfg.Created + } + mset.closed.Store(true) + + jsa.mu.Lock() + jsa.streams[cfg.Name] = mset + jsa.mu.Unlock() + + // Now do the consumers. + odir := filepath.Join(sdir, fi.Name(), consumerDir) + consumers = append(consumers, &ce{mset, odir}) + } continue } @@ -1510,13 +1557,66 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro } var cfg FileConsumerInfo - if err := json.Unmarshal(buf, &cfg); err != nil { - s.Warnf(" Error unmarshalling consumer metafile %q: %v", metafile, err) + decoder := json.NewDecoder(bytes.NewReader(buf)) + decoder.DisallowUnknownFields() + strictErr := decoder.Decode(&cfg) + if strictErr != nil { + cfg = FileConsumerInfo{} + if err := json.Unmarshal(buf, &cfg); err != nil { + s.Warnf(" Error unmarshalling consumer metafile %q: %v", metafile, err) + continue + } + } + if supported := supportsRequiredApiLevel(cfg.Metadata); !supported || strictErr != nil { + var offlineReason string + if !supported { + apiLevel := getRequiredApiLevel(cfg.Metadata) + offlineReason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", apiLevel, JSApiLevel) + s.Warnf(" Detected unsupported consumer '%s > %s > %s', delete the consumer or upgrade the 
server to API level %s", a.Name, e.mset.name(), cfg.Name, apiLevel) + } else { + offlineReason = fmt.Sprintf("decoding error: %v", strictErr) + s.Warnf(" Error unmarshalling consumer metafile %q: %v", metafile, strictErr) + } + singleServerMode := !s.JetStreamIsClustered() && s.standAloneMode() + if singleServerMode { + if !e.mset.closed.Load() { + s.Warnf(" Stopping unsupported stream '%s > %s'", a.Name, e.mset.name()) + e.mset.mu.Lock() + e.mset.offlineReason = "stopped" + e.mset.mu.Unlock() + e.mset.stop(false, false) + } + + // Fake a consumer, so we can respond to API requests as single-server. + o := &consumer{ + mset: e.mset, + js: s.getJetStream(), + acc: a, + srv: s, + cfg: cfg.ConsumerConfig, + active: false, + stream: e.mset.name(), + name: cfg.Name, + dseq: 1, + sseq: 1, + created: time.Now().UTC(), + closed: true, + offlineReason: offlineReason, + } + if !cfg.Created.IsZero() { + o.created = cfg.Created + } + + e.mset.mu.Lock() + e.mset.setConsumer(o) + e.mset.mu.Unlock() + } continue } + isEphemeral := !isDurableConsumer(&cfg.ConsumerConfig) if isEphemeral { - // This is an ephermal consumer and this could fail on restart until + // This is an ephemeral consumer and this could fail on restart until // the consumer can reconnect. We will create it as a durable and switch it. 
cfg.ConsumerConfig.Durable = ofi.Name() } diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 05b88aa94b4..dbf0e6b14d7 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -499,8 +499,9 @@ type JSApiStreamListRequest struct { type JSApiStreamListResponse struct { ApiResponse ApiPaged - Streams []*StreamInfo `json:"streams"` - Missing []string `json:"missing,omitempty"` + Streams []*StreamInfo `json:"streams"` + Missing []string `json:"missing,omitempty"` + Offline map[string]string `json:"offline,omitempty"` } const JSApiStreamListResponseType = "io.nats.jetstream.api.v1.stream_list_response" @@ -766,8 +767,9 @@ const JSApiConsumerNamesResponseType = "io.nats.jetstream.api.v1.consumer_names_ type JSApiConsumerListResponse struct { ApiResponse ApiPaged - Consumers []*ConsumerInfo `json:"consumers"` - Missing []string `json:"missing,omitempty"` + Consumers []*ConsumerInfo `json:"consumers"` + Missing []string `json:"missing,omitempty"` + Offline map[string]string `json:"offline,omitempty"` } const JSApiConsumerListResponseType = "io.nats.jetstream.api.v1.consumer_list_response" @@ -1794,6 +1796,11 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account, s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } + if mset.offlineReason != _EMPTY_ { + resp.Error = NewJSStreamOfflineReasonError(errors.New(mset.offlineReason)) + s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay) + return + } // Update asset version metadata. 
setStaticStreamMetadata(&cfg) @@ -2035,7 +2042,17 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, s offset = scnt } + var missingNames []string for _, mset := range msets[offset:] { + if mset.offlineReason != _EMPTY_ { + if resp.Offline == nil { + resp.Offline = make(map[string]string, 1) + } + resp.Offline[mset.getCfgName()] = mset.offlineReason + missingNames = append(missingNames, mset.getCfgName()) + continue + } + config := mset.config() resp.Streams = append(resp.Streams, &StreamInfo{ Created: mset.createdTime(), @@ -2053,6 +2070,7 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, s resp.Total = scnt resp.Limit = JSApiListLimit resp.Offset = offset + resp.Missing = missingNames s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) } @@ -2097,6 +2115,13 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s if sa != nil { clusterWideConsCount = len(sa.consumers) offline = s.allPeersOffline(sa.Group) + if sa.unsupported != nil && sa.Group != nil && cc.meta != nil && sa.Group.isMember(cc.meta.ID()) { + // If we're a member for this stream, and it's not supported, report it as offline. 
+ resp.Error = NewJSStreamOfflineReasonError(errors.New(sa.unsupported.reason)) + s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay) + js.mu.RUnlock() + return + } } js.mu.RUnlock() @@ -2202,6 +2227,12 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s } } + if mset.offlineReason != _EMPTY_ { + resp.Error = NewJSStreamOfflineReasonError(errors.New(mset.offlineReason)) + s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay) + return + } + config := mset.config() resp.StreamInfo = &StreamInfo{ Created: mset.createdTime(), @@ -3584,6 +3615,10 @@ func (s *Server) jsMsgGetRequest(sub *subscription, c *client, _ *Account, subje s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } + if mset.offlineReason != _EMPTY_ { + // Just let the request time out. + return + } // Reject request if we can't guarantee the precondition of min last sequence. if req.MinLastSeq > 0 && mset.lastSeq() < req.MinLastSeq { @@ -3687,6 +3722,11 @@ func (s *Server) jsConsumerUnpinRequest(sub *subscription, c *client, _ *Account s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } + if sa.unsupported != nil { + js.mu.RUnlock() + // Just let the request time out. + return + } ca, ok := sa.consumers[consumer] if !ok || ca == nil { @@ -3695,6 +3735,11 @@ func (s *Server) jsConsumerUnpinRequest(sub *subscription, c *client, _ *Account s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } + if ca.unsupported != nil { + js.mu.RUnlock() + // Just let the request time out. + return + } js.mu.RUnlock() // Then check if we are the leader. 
@@ -3726,12 +3771,20 @@ func (s *Server) jsConsumerUnpinRequest(sub *subscription, c *client, _ *Account s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } + if mset.offlineReason != _EMPTY_ { + // Just let the request time out. + return + } o := mset.lookupConsumer(consumer) if o == nil { resp.Error = NewJSConsumerNotFoundError() s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } + if o.offlineReason != _EMPTY_ { + // Just let the request time out. + return + } var foundPriority bool for _, group := range o.config().PriorityGroups { @@ -4604,8 +4657,18 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } + if stream.offlineReason != _EMPTY_ { + resp.Error = NewJSStreamOfflineReasonError(errors.New(stream.offlineReason)) + s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay) + return + } if o := stream.lookupConsumer(consumerName); o != nil { + if o.offlineReason != _EMPTY_ { + resp.Error = NewJSConsumerOfflineReasonError(errors.New(o.offlineReason)) + s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay) + return + } // If the consumer already exists then don't allow updating the PauseUntil, just set // it back to whatever the current configured value is. 
o.mu.RLock() @@ -4855,7 +4918,16 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, _ *Account, offset = ocnt } + var missingNames []string for _, o := range obs[offset:] { + if o.offlineReason != _EMPTY_ { + if resp.Offline == nil { + resp.Offline = make(map[string]string, 1) + } + resp.Offline[o.name] = o.offlineReason + missingNames = append(missingNames, o.name) + continue + } if cinfo := o.info(); cinfo != nil { resp.Consumers = append(resp.Consumers, cinfo) } @@ -4866,6 +4938,7 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, _ *Account, resp.Total = ocnt resp.Limit = JSApiListLimit resp.Offset = offset + resp.Missing = missingNames s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) } @@ -4922,6 +4995,13 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account, offline = s.allPeersOffline(rg) isMember = rg.isMember(ourID) } + if ca.unsupported != nil && isMember { + // If we're a member for this consumer, and it's not supported, report it as offline. + resp.Error = NewJSConsumerOfflineReasonError(errors.New(ca.unsupported.reason)) + s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay) + js.mu.RUnlock() + return + } } // Capture consumer leader here. isConsumerLeader := cc.isConsumerLeader(acc.Name, streamName, consumerName) @@ -5048,6 +5128,12 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account, return } + if obs.offlineReason != _EMPTY_ { + resp.Error = NewJSConsumerOfflineReasonError(errors.New(obs.offlineReason)) + s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay) + return + } + if resp.ConsumerInfo = setDynamicConsumerInfoMetadata(obs.info()); resp.ConsumerInfo == nil { // This consumer returned nil which means it's closed. Respond with not found. 
resp.Error = NewJSConsumerNotFoundError() @@ -5199,6 +5285,11 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } + if sa.unsupported != nil { + js.mu.RUnlock() + // Just let the request time out. + return + } ca, ok := sa.consumers[consumer] if !ok || ca == nil { @@ -5207,6 +5298,11 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } + if ca.unsupported != nil { + js.mu.RUnlock() + // Just let the request time out. + return + } nca := *ca ncfg := *ca.Config @@ -5240,6 +5336,10 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } + if mset.offlineReason != _EMPTY_ { + // Just let the request time out. + return + } obs := mset.lookupConsumer(consumer) if obs == nil { @@ -5247,6 +5347,10 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } + if obs.offlineReason != _EMPTY_ { + // Just let the request time out. + return + } ncfg := obs.cfg pauseUTC := req.PauseUntil.UTC() diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index dfb9d375a48..852979e1d6d 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -21,6 +21,7 @@ import ( "encoding/json" "errors" "fmt" + "io" "math" "math/rand" "os" @@ -147,6 +148,66 @@ type streamAssignment struct { reassigning bool // i.e. due to placement issues, lack of resources, etc. resetting bool // i.e. 
there was an error, and we're stopping and starting the stream err error + unsupported *unsupportedStreamAssignment +} + +type unsupportedStreamAssignment struct { + json []byte // The raw JSON content of the assignment, if it's unsupported due to the required API level. + reason string + info StreamInfo + sysc *client + infoSub *subscription +} + +func newUnsupportedStreamAssignment(s *Server, sa *streamAssignment, json []byte) *unsupportedStreamAssignment { + reason := "stopped" + if sa.Config != nil && !supportsRequiredApiLevel(sa.Config.Metadata) { + if req := getRequiredApiLevel(sa.Config.Metadata); req != _EMPTY_ { + reason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", req, JSApiLevel) + } + } + return &unsupportedStreamAssignment{ + json: json, + reason: reason, + info: StreamInfo{ + Created: sa.Created, + Config: *setDynamicStreamMetadata(sa.Config), + Domain: s.getOpts().JetStreamDomain, + TimeStamp: time.Now().UTC(), + }, + } +} + +func (usa *unsupportedStreamAssignment) setupInfoSub(s *Server, sa *streamAssignment) { + if usa.infoSub != nil { + return + } + + // Bind to the system account. + ic := s.createInternalJetStreamClient() + ic.registerWithAccount(s.SystemAccount()) + usa.sysc = ic + + // Note below the way we subscribe here is so that we can send requests to ourselves. 
+ isubj := fmt.Sprintf(clusterStreamInfoT, sa.Client.serviceAccount(), sa.Config.Name) + usa.infoSub, _ = s.systemSubscribe(isubj, _EMPTY_, false, ic, usa.handleClusterStreamInfoRequest) +} + +func (usa *unsupportedStreamAssignment) handleClusterStreamInfoRequest(_ *subscription, c *client, _ *Account, _, reply string, _ []byte) { + s, acc := c.srv, c.acc + info := streamInfoClusterResponse{OfflineReason: usa.reason, StreamInfo: usa.info} + s.sendDelayedErrResponse(acc, reply, nil, s.jsonResponse(&info), errRespDelay) +} + +func (usa *unsupportedStreamAssignment) closeInfoSub(s *Server) { + if usa.infoSub != nil { + s.sysUnsubscribe(usa.infoSub) + usa.infoSub = nil + } + if usa.sysc != nil { + usa.sysc.closeConnection(ClientClosed) + usa.sysc = nil + } } // consumerAssignment is what the meta controller uses to assign consumers to streams. @@ -161,11 +222,104 @@ type consumerAssignment struct { Reply string `json:"reply,omitempty"` State *ConsumerState `json:"state,omitempty"` // Internal - responded bool - recovering bool - pending bool - deleted bool - err error + responded bool + recovering bool + pending bool + deleted bool + err error + unsupported *unsupportedConsumerAssignment +} + +type unsupportedConsumerAssignment struct { + json []byte // The raw JSON content of the assignment, if it's unsupported due to the required API level. 
+	reason  string
+	info    ConsumerInfo
+	sysc    *client
+	infoSub *subscription
+}
+
+func newUnsupportedConsumerAssignment(ca *consumerAssignment, json []byte) *unsupportedConsumerAssignment {
+	reason := "stopped"
+	if ca.Config != nil && !supportsRequiredApiLevel(ca.Config.Metadata) {
+		if req := getRequiredApiLevel(ca.Config.Metadata); req != _EMPTY_ {
+			reason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", req, JSApiLevel)
+		}
+	}
+	return &unsupportedConsumerAssignment{
+		json:   json,
+		reason: reason,
+		info: ConsumerInfo{
+			Stream:    ca.Stream,
+			Name:      ca.Name,
+			Created:   ca.Created,
+			Config:    setDynamicConsumerMetadata(ca.Config),
+			TimeStamp: time.Now().UTC(),
+		},
+	}
+}
+
+func (uca *unsupportedConsumerAssignment) setupInfoSub(s *Server, ca *consumerAssignment) {
+	if uca.infoSub != nil {
+		return
+	}
+
+	// Bind to the system account.
+	ic := s.createInternalJetStreamClient()
+	ic.registerWithAccount(s.SystemAccount())
+	uca.sysc = ic
+
+	// Note below the way we subscribe here is so that we can send requests to ourselves.
+ isubj := fmt.Sprintf(clusterConsumerInfoT, ca.Client.serviceAccount(), ca.Stream, ca.Name) + uca.infoSub, _ = s.systemSubscribe(isubj, _EMPTY_, false, ic, uca.handleClusterConsumerInfoRequest) +} + +func (uca *unsupportedConsumerAssignment) handleClusterConsumerInfoRequest(_ *subscription, c *client, _ *Account, _, reply string, _ []byte) { + s, acc := c.srv, c.acc + info := consumerInfoClusterResponse{OfflineReason: uca.reason, ConsumerInfo: uca.info} + s.sendDelayedErrResponse(acc, reply, nil, s.jsonResponse(&info), errRespDelay) +} + +func (uca *unsupportedConsumerAssignment) closeInfoSub(s *Server) { + if uca.infoSub != nil { + s.sysUnsubscribe(uca.infoSub) + uca.infoSub = nil + } + if uca.sysc != nil { + uca.sysc.closeConnection(ClientClosed) + uca.sysc = nil + } +} + +type writeableConsumerAssignment struct { + consumerAssignment + // Internal + unsupportedJson []byte // The raw JSON content of the assignment, if it's unsupported due to the required API level. +} + +func (wca *writeableConsumerAssignment) MarshalJSON() ([]byte, error) { + if wca.unsupportedJson != nil { + return wca.unsupportedJson, nil + } + return json.Marshal(wca.consumerAssignment) +} + +func (wca *writeableConsumerAssignment) UnmarshalJSON(data []byte) error { + var unsupported bool + var ca consumerAssignment + decoder := json.NewDecoder(bytes.NewReader(data)) + decoder.DisallowUnknownFields() + if err := decoder.Decode(&ca); err != nil { + unsupported = true + ca = consumerAssignment{} + if err = json.Unmarshal(data, &ca); err != nil { + return err + } + } + wca.consumerAssignment = ca + if unsupported || (wca.Config != nil && !supportsRequiredApiLevel(wca.Config.Metadata)) { + wca.unsupportedJson = data + } + return nil } // streamPurge is what the stream leader will replicate when purging a stream. @@ -450,6 +604,10 @@ func (cc *jetStreamCluster) isStreamCurrent(account, stream string) bool { // For R1 it will make sure the stream is present on this server. 
func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) error { js.mu.RLock() + if sa != nil && sa.unsupported != nil { + js.mu.RUnlock() + return nil + } s, cc := js.srv, js.cluster if cc == nil { // Non-clustered mode @@ -503,10 +661,14 @@ func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) error { // isConsumerHealthy will determine if the consumer is up to date. // For R1 it will make sure the consunmer is present on this server. func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consumerAssignment) error { + js.mu.RLock() + if ca != nil && ca.unsupported != nil { + js.mu.RUnlock() + return nil + } if mset == nil { return errors.New("stream missing") } - js.mu.RLock() s, cc := js.srv, js.cluster if cc == nil { // Non-clustered mode @@ -1388,12 +1550,44 @@ func (js *jetStream) checkClusterSize() { // Represents our stable meta state that we can write out. type writeableStreamAssignment struct { + backingStreamAssignment + // Internal + unsupportedJson []byte // The raw JSON content of the assignment, if it's unsupported due to the required API level. 
+} + +type backingStreamAssignment struct { Client *ClientInfo `json:"client,omitempty"` Created time.Time `json:"created"` Config *StreamConfig `json:"stream"` Group *raftGroup `json:"group"` Sync string `json:"sync"` - Consumers []*consumerAssignment + Consumers []*writeableConsumerAssignment +} + +func (wsa *writeableStreamAssignment) MarshalJSON() ([]byte, error) { + if wsa.unsupportedJson != nil { + return wsa.unsupportedJson, nil + } + return json.Marshal(wsa.backingStreamAssignment) +} + +func (wsa *writeableStreamAssignment) UnmarshalJSON(data []byte) error { + var unsupported bool + var bsa backingStreamAssignment + decoder := json.NewDecoder(bytes.NewReader(data)) + decoder.DisallowUnknownFields() + if err := decoder.Decode(&bsa); err != nil { + unsupported = true + bsa = backingStreamAssignment{} + if err = json.Unmarshal(data, &bsa); err != nil { + return err + } + } + wsa.backingStreamAssignment = bsa + if unsupported || (wsa.Config != nil && !supportsRequiredApiLevel(wsa.Config.Metadata)) { + wsa.unsupportedJson = data + } + return nil } func (js *jetStream) clusterStreamConfig(accName, streamName string) (StreamConfig, bool) { @@ -1418,13 +1612,19 @@ func (js *jetStream) metaSnapshot() ([]byte, error) { streams := make([]writeableStreamAssignment, 0, nsa) for _, asa := range cc.streams { for _, sa := range asa { + if sa.unsupported != nil && sa.unsupported.json != nil { + streams = append(streams, writeableStreamAssignment{unsupportedJson: sa.unsupported.json}) + continue + } wsa := writeableStreamAssignment{ - Client: sa.Client.forAssignmentSnap(), - Created: sa.Created, - Config: sa.Config, - Group: sa.Group, - Sync: sa.Sync, - Consumers: make([]*consumerAssignment, 0, len(sa.consumers)), + backingStreamAssignment: backingStreamAssignment{ + Client: sa.Client.forAssignmentSnap(), + Created: sa.Created, + Config: sa.Config, + Group: sa.Group, + Sync: sa.Sync, + Consumers: make([]*writeableConsumerAssignment, 0, len(sa.consumers)), + }, } for _, ca 
:= range sa.consumers { // Skip if the consumer is pending, we can't include it in our snapshot. @@ -1432,11 +1632,16 @@ func (js *jetStream) metaSnapshot() ([]byte, error) { if ca.pending { continue } + if ca.unsupported != nil && ca.unsupported.json != nil { + wsa.Consumers = append(wsa.Consumers, &writeableConsumerAssignment{unsupportedJson: ca.unsupported.json}) + nca++ + continue + } cca := *ca cca.Stream = wsa.Config.Name // Needed for safe roll-backs. cca.Client = cca.Client.forAssignmentSnap() cca.Subject, cca.Reply = _EMPTY_, _EMPTY_ - wsa.Consumers = append(wsa.Consumers, &cca) + wsa.Consumers = append(wsa.Consumers, &writeableConsumerAssignment{consumerAssignment: cca}) nca++ } streams = append(streams, wsa) @@ -1495,11 +1700,18 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove streams[wsa.Client.serviceAccount()] = as } sa := &streamAssignment{Client: wsa.Client, Created: wsa.Created, Config: wsa.Config, Group: wsa.Group, Sync: wsa.Sync} + if wsa.unsupportedJson != nil { + sa.unsupported = newUnsupportedStreamAssignment(js.srv, sa, wsa.unsupportedJson) + } if len(wsa.Consumers) > 0 { sa.consumers = make(map[string]*consumerAssignment) - for _, ca := range wsa.Consumers { - if ca.Stream == _EMPTY_ { - ca.Stream = sa.Config.Name // Rehydrate from the stream name. + for _, wca := range wsa.Consumers { + if wca.Stream == _EMPTY_ { + wca.Stream = sa.Config.Name // Rehydrate from the stream name. 
+ } + ca := &consumerAssignment{Client: wca.Client, Created: wca.Created, Name: wca.Name, Stream: wca.Stream, Config: wca.Config, Group: wca.Group, Subject: wca.Subject, Reply: wca.Reply, State: wca.State} + if wca.unsupportedJson != nil { + ca.unsupported = newUnsupportedConsumerAssignment(ca, wca.unsupportedJson) } sa.consumers[ca.Name] = ca } @@ -1702,6 +1914,9 @@ func (js *jetStream) processAddPeer(peer string) { for _, asa := range cc.streams { for _, sa := range asa { + if sa.unsupported != nil { + continue + } if sa.missingPeers() { // Make sure the right cluster etc. if si.cluster != sa.Client.Cluster { @@ -1713,6 +1928,9 @@ func (js *jetStream) processAddPeer(peer string) { // Send our proposal for this csa. Also use same group definition for all the consumers as well. cc.meta.Propose(encodeAddStreamAssignment(csa)) for _, ca := range sa.consumers { + if ca.unsupported != nil { + continue + } // Ephemerals are R=1, so only auto-remap durables, or R>1. if ca.Config.Durable != _EMPTY_ || len(ca.Group.Peers) > 1 { cca := ca.copyGroup() @@ -1770,6 +1988,9 @@ func (js *jetStream) processRemovePeer(peer string) { for _, asa := range cc.streams { for _, sa := range asa { + if sa.unsupported != nil { + continue + } if rg := sa.Group; rg.isMember(peer) { js.removePeerFromStreamLocked(sa, peer) } @@ -1803,6 +2024,9 @@ func (js *jetStream) removePeerFromStreamLocked(sa *streamAssignment, peer strin cc.meta.Propose(encodeAddStreamAssignment(csa)) rg := csa.Group for _, ca := range sa.consumers { + if ca.unsupported != nil { + continue + } // Ephemerals are R=1, so only auto-remap durables, or R>1. 
if ca.Config.Durable != _EMPTY_ { cca := ca.copyGroup() @@ -1869,7 +2093,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo buf := e.Data switch entryOp(buf[0]) { case assignStreamOp: - sa, err := decodeStreamAssignment(buf[1:]) + sa, err := decodeStreamAssignment(js.srv, buf[1:]) if err != nil { js.srv.Errorf("JetStream cluster failed to decode stream assignment: %q", buf[1:]) return didSnap, didRemoveStream, didRemoveConsumer, err @@ -1879,11 +2103,11 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo key := sa.recoveryKey() ru.addStreams[key] = sa delete(ru.removeStreams, key) - } else if js.processStreamAssignment(sa) { - didRemoveStream = true + } else { + js.processStreamAssignment(sa) } case removeStreamOp: - sa, err := decodeStreamAssignment(buf[1:]) + sa, err := decodeStreamAssignment(js.srv, buf[1:]) if err != nil { js.srv.Errorf("JetStream cluster failed to decode stream assignment: %q", buf[1:]) return didSnap, didRemoveStream, didRemoveConsumer, err @@ -1962,7 +2186,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo didRemoveConsumer = true } case updateStreamOp: - sa, err := decodeStreamAssignment(buf[1:]) + sa, err := decodeStreamAssignment(js.srv, buf[1:]) if err != nil { js.srv.Errorf("JetStream cluster failed to decode stream assignment: %q", buf[1:]) return didSnap, didRemoveStream, didRemoveConsumer, err @@ -2668,6 +2892,9 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps js.mu.RLock() var needToWait bool for name, c := range sa.consumers { + if c.unsupported != nil { + continue + } for _, peer := range c.Group.Peers { // If we have peers still in the old set block. 
if oldPeerSet[peer] { @@ -2927,6 +3154,9 @@ func (mset *stream) resetClusteredState(err error) bool { if cc := js.cluster; cc != nil && cc.meta != nil { ourID := cc.meta.ID() for _, ca := range sa.consumers { + if ca.unsupported != nil { + continue + } if rg := ca.Group; rg != nil && rg.isMember(ourID) { rg.node = nil // Erase group raft/node state. consumers = append(consumers, ca) @@ -3497,7 +3727,7 @@ func (js *jetStream) streamAssignment(account, stream string) (sa *streamAssignm } // processStreamAssignment is called when followers have replicated an assignment. -func (js *jetStream) processStreamAssignment(sa *streamAssignment) bool { +func (js *jetStream) processStreamAssignment(sa *streamAssignment) { js.mu.Lock() s, cc := js.srv, js.cluster accName, stream := sa.Client.serviceAccount(), sa.Config.Name @@ -3516,26 +3746,57 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) bool { if s == nil || noMeta { js.mu.Unlock() - return false + return } accStreams := cc.streams[accName] if accStreams == nil { accStreams = make(map[string]*streamAssignment) - } else if osa := accStreams[stream]; osa != nil && osa != sa { - // Copy over private existing state from former SA. - if sa.Group != nil { - sa.Group.node = osa.Group.node + } else if osa := accStreams[stream]; osa != nil { + if osa != sa { + // Copy over private existing state from former SA. + if sa.Group != nil { + sa.Group.node = osa.Group.node + } + sa.consumers = osa.consumers + sa.responded = osa.responded + sa.err = osa.err + } + // Unsubscribe if it was previously unsupported. + if osa.unsupported != nil { + osa.unsupported.closeInfoSub(js.srv) + // If we've seen unsupported once, it remains for the lifetime of this server process. + if sa.unsupported == nil { + sa.unsupported = osa.unsupported + } } - sa.consumers = osa.consumers - sa.responded = osa.responded - sa.err = osa.err } // Update our state. 
accStreams[stream] = sa cc.streams[accName] = accStreams hasResponded := sa.responded + + // If unsupported, we can't register any further. + if sa.unsupported != nil { + sa.unsupported.setupInfoSub(s, sa) + apiLevel := getRequiredApiLevel(sa.Config.Metadata) + s.Warnf("Detected unsupported stream '%s > %s', delete the stream or upgrade the server to API level %s", accName, stream, apiLevel) + js.mu.Unlock() + + // Need to stop the stream, we can't keep running with an old config. + acc, err := s.LookupAccount(accName) + if err != nil { + return + } + mset, err := acc.lookupStream(stream) + if err != nil || mset.closed.Load() { + return + } + s.Warnf("Stopping unsupported stream '%s > %s'", accName, stream) + mset.stop(false, false) + return + } js.mu.Unlock() acc, err := s.LookupAccount(accName) @@ -3556,11 +3817,9 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) bool { } else { s.Debugf(ll) } - return false + return } - var didRemove bool - // Check if this is for us.. if isMember { js.processClusterCreateStream(acc, sa) @@ -3574,10 +3833,7 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) bool { js.mu.Lock() cc.streamsCheck = true js.mu.Unlock() - return false } - - return didRemove } // processUpdateStreamAssignment is called when followers have replicated an updated assignment. @@ -3641,6 +3897,36 @@ func (js *jetStream) processUpdateStreamAssignment(sa *streamAssignment) { sa.Group.node = nil } } + + // Unsubscribe if it was previously unsupported. + if osa.unsupported != nil { + osa.unsupported.closeInfoSub(js.srv) + // If we've seen unsupported once, it remains for the lifetime of this server process. + if sa.unsupported == nil { + sa.unsupported = osa.unsupported + } + } + + // If unsupported, we can't register any further. 
+ if sa.unsupported != nil { + sa.unsupported.setupInfoSub(s, sa) + apiLevel := getRequiredApiLevel(sa.Config.Metadata) + s.Warnf("Detected unsupported stream '%s > %s', delete the stream or upgrade the server to API level %s", accName, stream, apiLevel) + js.mu.Unlock() + + // Need to stop the stream, we can't keep running with an old config. + acc, err := s.LookupAccount(accName) + if err != nil { + return + } + mset, err := acc.lookupStream(stream) + if err != nil || mset.closed.Load() { + return + } + s.Warnf("Stopping unsupported stream '%s > %s'", accName, stream) + mset.stop(false, false) + return + } js.mu.Unlock() acc, err := s.LookupAccount(accName) @@ -4255,6 +4541,13 @@ func (js *jetStream) processStreamRemoval(sa *streamAssignment) { accStreams := cc.streams[sa.Client.serviceAccount()] needDelete := accStreams != nil && accStreams[stream] != nil if needDelete { + if osa := accStreams[stream]; osa != nil && osa.unsupported != nil { + osa.unsupported.closeInfoSub(js.srv) + // Remember we used to be unsupported, just so we can send a successful delete response. 
+ if sa.unsupported == nil { + sa.unsupported = osa.unsupported + } + } delete(accStreams, stream) if len(accStreams) == 0 { delete(cc.streams, sa.Client.serviceAccount()) @@ -4275,7 +4568,7 @@ func (js *jetStream) processClusterDeleteStream(sa *streamAssignment, isMember, s := js.srv node := sa.Group.node hadLeader := node == nil || !node.Leaderless() - offline := s.allPeersOffline(sa.Group) + offline := s.allPeersOffline(sa.Group) || sa.unsupported != nil var isMetaLeader bool if cc := js.cluster; cc != nil { isMetaLeader = cc.isLeader() @@ -4313,7 +4606,7 @@ func (js *jetStream) processClusterDeleteStream(sa *streamAssignment, isMember, } // This is a stop gap cleanup in case - // 1) the account does not exist (and mset couldn't be stopped) and/or + // 1) the account or mset does not exist and/or // 2) node was nil (and couldn't be deleted) if !stopped || node == nil { if sacc := s.SystemAccount(); sacc != nil { @@ -4408,6 +4701,15 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) { } ca.responded = oca.responded ca.err = oca.err + + // Unsubscribe if it was previously unsupported. + if oca.unsupported != nil { + oca.unsupported.closeInfoSub(s) + // If we've seen unsupported once, it remains for the lifetime of this server process. + if ca.unsupported == nil { + ca.unsupported = oca.unsupported + } + } } // Capture the optional state. We will pass it along if we are a member to apply. @@ -4419,6 +4721,35 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) { // Ok to replace an existing one, we check on process call below. sa.consumers[ca.Name] = ca ca.pending = false + + // If unsupported, we can't register any further. 
+ if ca.unsupported != nil { + ca.unsupported.setupInfoSub(s, ca) + apiLevel := getRequiredApiLevel(ca.Config.Metadata) + s.Warnf("Detected unsupported consumer '%s > %s > %s', delete the consumer or upgrade the server to API level %s", accName, stream, ca.Name, apiLevel) + + // Mark stream as unsupported as well + if sa.unsupported == nil { + sa.unsupported = newUnsupportedStreamAssignment(s, sa, nil) + } + sa.unsupported.setupInfoSub(s, sa) + js.mu.Unlock() + + // Be conservative by protecting the whole stream, even if just one consumer is unsupported. + // This ensures it's safe, even with Interest-based retention where it would otherwise + // continue accepting but dropping messages. + acc, err := s.LookupAccount(accName) + if err != nil { + return + } + mset, err := acc.lookupStream(stream) + if err != nil || mset.closed.Load() { + return + } + s.Warnf("Stopping unsupported stream '%s > %s'", accName, stream) + mset.stop(false, false) + return + } js.mu.Unlock() acc, err := s.LookupAccount(accName) @@ -4514,6 +4845,7 @@ func (js *jetStream) processConsumerRemoval(ca *consumerAssignment) { js.mu.Unlock() return } + wasLeader := cc.isConsumerLeader(ca.Client.serviceAccount(), ca.Stream, ca.Name) // Delete from our state. @@ -4526,6 +4858,10 @@ func (js *jetStream) processConsumerRemoval(ca *consumerAssignment) { needDelete = true oca.deleted = true delete(sa.consumers, ca.Name) + // Remember we used to be unsupported, just so we can send a successful delete response. 
+ if ca.unsupported == nil { + ca.unsupported = oca.unsupported + } } } } @@ -4807,7 +5143,7 @@ func (js *jetStream) processClusterDeleteConsumer(ca *consumerAssignment, wasLea js.mu.RLock() s := js.srv node := ca.Group.node - offline := s.allPeersOffline(ca.Group) + offline := s.allPeersOffline(ca.Group) || ca.unsupported != nil var isMetaLeader bool if cc := js.cluster; cc != nil { isMetaLeader = cc.isLeader() @@ -4815,6 +5151,7 @@ func (js *jetStream) processClusterDeleteConsumer(ca *consumerAssignment, wasLea recovering := ca.recovering js.mu.RUnlock() + stopped := false var resp = JSApiConsumerDeleteResponse{ApiResponse: ApiResponse{Type: JSApiConsumerDeleteResponseType}} var err error var acc *Account @@ -4824,13 +5161,9 @@ func (js *jetStream) processClusterDeleteConsumer(ca *consumerAssignment, wasLea if mset, _ := acc.lookupStream(ca.Stream); mset != nil { if o := mset.lookupConsumer(ca.Name); o != nil { err = o.stopWithFlags(true, false, true, wasLeader) + stopped = true } } - } else if ca.Group != nil { - // We have a missing account, see if we can cleanup. - if sacc := s.SystemAccount(); sacc != nil { - os.RemoveAll(filepath.Join(js.config.StoreDir, sacc.GetName(), defaultStoreDirName, ca.Group.Name)) - } } // Always delete the node if present. 
@@ -4838,6 +5171,19 @@ func (js *jetStream) processClusterDeleteConsumer(ca *consumerAssignment, wasLea node.Delete() } + // This is a stop gap cleanup in case + // 1) the account, mset, or consumer does not exist and/or + // 2) node was nil (and couldn't be deleted) + if !stopped || node == nil { + if sacc := s.SystemAccount(); sacc != nil { + os.RemoveAll(filepath.Join(js.config.StoreDir, sacc.GetName(), defaultStoreDirName, ca.Group.Name)) + } + } + + accDir := filepath.Join(js.config.StoreDir, ca.Client.serviceAccount()) + consumersDir := filepath.Join(accDir, streamsDir, ca.Stream, consumerDir) + os.RemoveAll(filepath.Join(consumersDir, ca.Name)) + if !wasLeader || ca.Reply == _EMPTY_ { if !(offline && isMetaLeader) { return @@ -5871,6 +6217,9 @@ func (js *jetStream) processLeaderChange(isLeader bool) { cc := js.cluster for acc, asa := range cc.streams { for _, sa := range asa { + if sa.unsupported != nil { + continue + } if sa.Sync == _EMPTY_ { s.Warnf("Stream assignment corrupt for stream '%s > %s'", acc, sa.Config.Name) nsa := &streamAssignment{Group: sa.Group, Config: sa.Config, Subject: sa.Subject, Reply: sa.Reply, Client: sa.Client} @@ -6123,6 +6472,9 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo peerHA := make(map[string]int, len(peers)) for _, asa := range cc.streams { for _, sa := range asa { + if sa.unsupported != nil { + continue + } isHA := len(sa.Group.Peers) > 1 for _, peer := range sa.Group.Peers { peerStreams[peer]++ @@ -7044,11 +7396,11 @@ func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, filt // Create an inbox for our responses and send out our requests. s.mu.Lock() inbox := s.newRespInbox() - rc := make(chan *StreamInfo, len(streams)) + rc := make(chan *streamInfoClusterResponse, len(streams)) // Store our handler. 
s.sys.replies[inbox] = func(sub *subscription, _ *client, _ *Account, subject, _ string, msg []byte) { - var si StreamInfo + var si streamInfoClusterResponse if err := json.Unmarshal(msg, &si); err != nil { s.Warnf("Error unmarshalling clustered stream info response:%v", err) return @@ -7116,10 +7468,14 @@ LOOP: si.State.Consumers = consCount } delete(sent, si.Config.Name) - resp.Streams = append(resp.Streams, si) - // Check to see if we are done. - if len(resp.Streams) == len(streams) { - break LOOP + if si.OfflineReason == _EMPTY_ { + resp.Streams = append(resp.Streams, &si.StreamInfo) + } else if _, ok := resp.Offline[si.Config.Name]; !ok { + if resp.Offline == nil { + resp.Offline = make(map[string]string, 1) + } + resp.Offline[si.Config.Name] = si.OfflineReason + missingNames = append(missingNames, si.Config.Name) } } } @@ -7191,11 +7547,11 @@ func (s *Server) jsClusteredConsumerListRequest(acc *Account, ci *ClientInfo, of // Create an inbox for our responses and send out requests. s.mu.Lock() inbox := s.newRespInbox() - rc := make(chan *ConsumerInfo, len(consumers)) + rc := make(chan *consumerInfoClusterResponse, len(consumers)) // Store our handler. s.sys.replies[inbox] = func(sub *subscription, _ *client, _ *Account, subject, _ string, msg []byte) { - var ci ConsumerInfo + var ci consumerInfoClusterResponse if err := json.Unmarshal(msg, &ci); err != nil { s.Warnf("Error unmarshaling clustered consumer info response:%v", err) return @@ -7259,10 +7615,14 @@ LOOP: break LOOP case ci := <-rc: delete(sent, ci.Name) - resp.Consumers = append(resp.Consumers, ci) - // Check to see if we are done. 
- if len(resp.Consumers) == len(consumers) { - break LOOP + if ci.OfflineReason == _EMPTY_ { + resp.Consumers = append(resp.Consumers, &ci.ConsumerInfo) + } else if _, ok := resp.Offline[ci.Name]; !ok { + if resp.Offline == nil { + resp.Offline = make(map[string]string, 1) + } + resp.Offline[ci.Name] = ci.OfflineReason + missingNames = append(missingNames, ci.Name) } } } @@ -7415,14 +7775,24 @@ func encodeDeleteStreamAssignment(sa *streamAssignment) []byte { return bb.Bytes() } -func decodeStreamAssignment(buf []byte) (*streamAssignment, error) { +func decodeStreamAssignment(s *Server, buf []byte) (*streamAssignment, error) { + var unsupported bool var sa streamAssignment - err := json.Unmarshal(buf, &sa) - if err != nil { - return nil, err + decoder := json.NewDecoder(bytes.NewReader(buf)) + decoder.DisallowUnknownFields() + if err := decoder.Decode(&sa); err != nil { + unsupported = true + sa = streamAssignment{} + if err = json.Unmarshal(buf, &sa); err != nil { + return nil, err + } } fixCfgMirrorWithDedupWindow(sa.Config) - return &sa, err + + if unsupported || (sa.Config != nil && !supportsRequiredApiLevel(sa.Config.Metadata)) { + sa.unsupported = newUnsupportedStreamAssignment(s, &sa, copyBytes(buf)) + } + return &sa, nil } func encodeDeleteRange(dr *DeleteRange) []byte { @@ -7554,6 +7924,9 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec // Don't count DIRECTS. total := 0 for cn, ca := range sa.consumers { + if ca.unsupported != nil { + continue + } // If the consumer name is specified and we think it already exists, then // we're likely updating an existing consumer, so don't count it. Otherwise // we will incorrectly return NewJSMaximumConsumersLimitError for an update. 
@@ -7853,9 +8226,22 @@ func encodeDeleteConsumerAssignment(ca *consumerAssignment) []byte { } func decodeConsumerAssignment(buf []byte) (*consumerAssignment, error) { + var unsupported bool var ca consumerAssignment - err := json.Unmarshal(buf, &ca) - return &ca, err + decoder := json.NewDecoder(bytes.NewReader(buf)) + decoder.DisallowUnknownFields() + if err := decoder.Decode(&ca); err != nil { + unsupported = true + ca = consumerAssignment{} + if err = json.Unmarshal(buf, &ca); err != nil { + return nil, err + } + } + + if unsupported || (ca.Config != nil && !supportsRequiredApiLevel(ca.Config.Metadata)) { + ca.unsupported = newUnsupportedConsumerAssignment(&ca, copyBytes(buf)) + } + return &ca, nil } func encodeAddConsumerAssignmentCompressed(ca *consumerAssignment) []byte { @@ -7870,10 +8256,33 @@ func encodeAddConsumerAssignmentCompressed(ca *consumerAssignment) []byte { } func decodeConsumerAssignmentCompressed(buf []byte) (*consumerAssignment, error) { + var unsupported bool var ca consumerAssignment bb := bytes.NewBuffer(buf) s2d := s2.NewReader(bb) - return &ca, json.NewDecoder(s2d).Decode(&ca) + decoder := json.NewDecoder(s2d) + decoder.DisallowUnknownFields() + if err := decoder.Decode(&ca); err != nil { + unsupported = true + ca = consumerAssignment{} + bb = bytes.NewBuffer(buf) + s2d = s2.NewReader(bb) + if err = json.NewDecoder(s2d).Decode(&ca); err != nil { + return nil, err + } + } + + if unsupported || (ca.Config != nil && !supportsRequiredApiLevel(ca.Config.Metadata)) { + bb = bytes.NewBuffer(buf) + s2d = s2.NewReader(bb) + dec, err := io.ReadAll(s2d) + if err != nil { + return nil, err + } + ca.unsupported = newUnsupportedConsumerAssignment(&ca, copyBytes(dec)) + } + + return &ca, nil } var errBadStreamMsg = errors.New("jetstream cluster bad replicated stream msg") @@ -8950,6 +9359,9 @@ func (js *jetStream) streamAlternates(ci *ClientInfo, stream string) []StreamAlt var alts []StreamAlternate for _, sa := range cc.streams[acc.Name] { + if 
sa.unsupported != nil { + continue + } // Add in ourselves and any mirrors. if sa.Config.Name == stream || (sa.Config.Mirror != nil && sa.Config.Mirror.Name == stream) { alts = append(alts, StreamAlternate{Name: sa.Config.Name, Domain: domain, Cluster: sa.Group.Cluster}) diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index b78aaa5ebd7..93da665c76d 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -23,16 +23,19 @@ import ( "encoding/json" "errors" "fmt" + "math" "math/rand" "os" "path/filepath" "reflect" + "strconv" "strings" "sync" "sync/atomic" "testing" "time" + "github.com/klauspost/compress/s2" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats.go" ) @@ -10134,6 +10137,552 @@ func TestJetStreamClusterScheduledDelayedMessage(t *testing.T) { } } +func TestJetStreamClusterOfflineStreamAndConsumerAfterAssetCreateOrUpdate(t *testing.T) { + clusterName := "R3S" + c := createJetStreamClusterExplicit(t, clusterName, 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + ml := c.leader() + require_NotNil(t, ml) + + sjs := ml.getJetStream() + require_NotNil(t, sjs) + sjs.mu.Lock() + cc := sjs.cluster + if cc == nil || cc.meta == nil { + sjs.mu.Unlock() + t.Fatalf("Expected cluster to be initialized") + } + + restart := func() { + t.Helper() + for _, s := range c.servers { + sjs = s.getJetStream() + snap, err := sjs.metaSnapshot() + require_NoError(t, err) + meta := sjs.getMetaGroup() + meta.InstallSnapshot(snap) + } + + c.stopAll() + c.restartAllSamePorts() + c.waitOnLeader() + ml = c.leader() + require_NotNil(t, ml) + require_NoError(t, nc.ForceReconnect()) + + sjs = ml.getJetStream() + require_NotNil(t, sjs) + sjs.mu.Lock() + cc = sjs.cluster + if cc == nil || cc.meta == nil { + sjs.mu.Unlock() + t.Fatalf("Expected cluster to be initialized") + } + sjs.mu.Unlock() + } + + getValidMetaSnapshot := func() (wsas []writeableStreamAssignment) { + 
t.Helper() + snap, err := sjs.metaSnapshot() + require_NoError(t, err) + require_True(t, len(snap) > 0) + dec, err := s2.Decode(nil, snap) + require_NoError(t, err) + require_NoError(t, json.Unmarshal(dec, &wsas)) + return wsas + } + + // Create a stream that's unsupported. + ci := &ClientInfo{ + Account: globalAccountName, + Cluster: clusterName, + } + scfg := &StreamConfig{ + Name: "DowngradeStreamTest", + Storage: FileStorage, + Replicas: 3, + Metadata: map[string]string{"_nats.req.level": strconv.Itoa(math.MaxInt - 1)}, + } + rg, perr := sjs.createGroupForStream(ci, scfg) + if perr != nil { + sjs.mu.Unlock() + require_NoError(t, perr) + } + sa := &streamAssignment{ + Config: scfg, + Group: rg, + Created: time.Now().UTC(), + Client: ci, + } + err := cc.meta.Propose(encodeAddStreamAssignment(sa)) + sjs.mu.Unlock() + require_NoError(t, err) + c.waitOnAllCurrent() + + unsupported := func(requiredApiLevel int) string { + return fmt.Sprintf("unsupported - required API level: %d, current API level: %d", requiredApiLevel, JSApiLevel) + } + expectStreamInfo := func(offlineReason, streamName string) { + var msg *nats.Msg + checkFor(t, 3*time.Second, 200*time.Millisecond, func() error { + msg, err = nc.Request(fmt.Sprintf(JSApiStreamInfoT, streamName), nil, time.Second) + return err + }) + var si JSApiStreamInfoResponse + require_NoError(t, json.Unmarshal(msg.Data, &si)) + require_NotNil(t, si.Error) + require_Error(t, si.Error, NewJSStreamOfflineReasonError(errors.New(offlineReason))) + + var sn JSApiStreamNamesResponse + msg, err = nc.Request(JSApiStreams, nil, time.Second) + require_NoError(t, err) + require_NoError(t, json.Unmarshal(msg.Data, &sn)) + require_Len(t, len(sn.Streams), 1) + require_Equal(t, sn.Streams[0], streamName) + + var sl JSApiStreamListResponse + msg, err = nc.Request(JSApiStreamList, nil, 2*time.Second) + require_NoError(t, err) + require_NoError(t, json.Unmarshal(msg.Data, &sl)) + require_Len(t, len(sl.Streams), 0) + require_Len(t, 
len(sl.Missing), 1) + require_Equal(t, sl.Missing[0], streamName) + require_Len(t, len(sl.Offline), 1) + require_Equal(t, sl.Offline[streamName], offlineReason) + } + + // Stream should be reported as offline, but healthz should report healthy to not block downgrades. + expectStreamInfo(unsupported(math.MaxInt-1), "DowngradeStreamTest") + health := ml.healthz(&HealthzOptions{}) + require_Equal(t, health.StatusCode, 200) + restart() + expectStreamInfo(unsupported(math.MaxInt-1), "DowngradeStreamTest") + health = ml.healthz(&HealthzOptions{}) + require_Equal(t, health.StatusCode, 200) + + wsas := getValidMetaSnapshot() + require_Len(t, len(wsas), 1) + require_Equal(t, wsas[0].Config.Name, "DowngradeStreamTest") + require_Equal(t, wsas[0].Config.Metadata["_nats.req.level"], strconv.Itoa(math.MaxInt-1)) + + // Update a stream that's unsupported. + sjs.mu.Lock() + scfg.Metadata = map[string]string{"_nats.req.level": strconv.Itoa(math.MaxInt)} + err = cc.meta.Propose(encodeUpdateStreamAssignment(sa)) + sjs.mu.Unlock() + require_NoError(t, err) + c.waitOnAllCurrent() + + // Stream should be reported as offline, but healthz should report healthy to not block downgrades. + expectStreamInfo(unsupported(math.MaxInt), "DowngradeStreamTest") + health = ml.healthz(&HealthzOptions{}) + require_Equal(t, health.StatusCode, 200) + restart() + expectStreamInfo(unsupported(math.MaxInt), "DowngradeStreamTest") + health = ml.healthz(&HealthzOptions{}) + require_Equal(t, health.StatusCode, 200) + + wsas = getValidMetaSnapshot() + require_Len(t, len(wsas), 1) + require_Equal(t, wsas[0].Config.Name, "DowngradeStreamTest") + require_Equal(t, wsas[0].Config.Metadata["_nats.req.level"], strconv.Itoa(math.MaxInt)) + + // Deleting a stream should always work, even if it is unsupported. + require_NoError(t, js.DeleteStream("DowngradeStreamTest")) + snap, err := sjs.metaSnapshot() + require_NoError(t, err) + require_True(t, snap == nil) + + // Create a supported stream and consumer. 
+ _, err = js.AddStream(&nats.StreamConfig{Name: "DowngradeConsumerTest", Replicas: 3}) + require_NoError(t, err) + _, err = js.AddConsumer("DowngradeConsumerTest", &nats.ConsumerConfig{Name: "consumer"}) + require_NoError(t, err) + + // Create a consumer that's unsupported. + sjs.mu.Lock() + ccfg := &ConsumerConfig{ + Name: "DowngradeConsumerTest", + Replicas: 3, + Metadata: map[string]string{"_nats.req.level": strconv.Itoa(math.MaxInt - 1)}, + } + rg = cc.createGroupForConsumer(ccfg, sa) + ca := &consumerAssignment{ + Config: ccfg, + Group: rg, + Stream: "DowngradeConsumerTest", + Name: "DowngradeConsumerTest", + Created: time.Now().UTC(), + Client: ci, + } + err = cc.meta.Propose(encodeAddConsumerAssignment(ca)) + sjs.mu.Unlock() + require_NoError(t, err) + c.waitOnAllCurrent() + + expectConsumerInfo := func(offlineReason string) { + var msg *nats.Msg + checkFor(t, 3*time.Second, 200*time.Millisecond, func() error { + msg, err = nc.Request(fmt.Sprintf(JSApiConsumerInfoT, "DowngradeConsumerTest", "DowngradeConsumerTest"), nil, time.Second) + return err + }) + var ci JSApiConsumerInfoResponse + require_NoError(t, json.Unmarshal(msg.Data, &ci)) + require_NotNil(t, ci.Error) + require_Error(t, ci.Error, NewJSConsumerOfflineReasonError(errors.New(offlineReason))) + + var cn JSApiConsumerNamesResponse + msg, err = nc.Request(fmt.Sprintf(JSApiConsumersT, "DowngradeConsumerTest"), nil, time.Second) + require_NoError(t, err) + require_NoError(t, json.Unmarshal(msg.Data, &cn)) + require_Len(t, len(cn.Consumers), 2) + require_Equal(t, cn.Consumers[0], "DowngradeConsumerTest") + for _, name := range cn.Consumers { + require_True(t, name == "consumer" || name == "DowngradeConsumerTest") + } + + var cl JSApiConsumerListResponse + msg, err = nc.Request(fmt.Sprintf(JSApiConsumerListT, "DowngradeConsumerTest"), nil, 5*time.Second) + require_NoError(t, err) + require_NoError(t, json.Unmarshal(msg.Data, &cl)) + require_Len(t, len(cl.Consumers), 0) + require_Len(t, len(cl.Missing), 
2) + for _, name := range cl.Missing { + require_True(t, name == "consumer" || name == "DowngradeConsumerTest") + } + require_Len(t, len(cl.Offline), 1) + require_Equal(t, cl.Offline["DowngradeConsumerTest"], offlineReason) + + // Stream should also be reported as offline. + // Specifically, as "stopped" because it's still supported, but can't run due to the unsupported consumer. + expectStreamInfo("stopped", "DowngradeConsumerTest") + } + + // Consumer should be reported as offline, but healthz should report healthy to not block downgrades. + expectConsumerInfo(unsupported(math.MaxInt - 1)) + health = ml.healthz(&HealthzOptions{}) + require_Equal(t, health.StatusCode, 200) + restart() + expectConsumerInfo(unsupported(math.MaxInt - 1)) + health = ml.healthz(&HealthzOptions{}) + require_Equal(t, health.StatusCode, 200) + + wsas = getValidMetaSnapshot() + require_Len(t, len(wsas), 1) + require_Equal(t, wsas[0].Config.Name, "DowngradeConsumerTest") + require_Equal(t, wsas[0].Config.Metadata["_nats.req.level"], "0") + require_Len(t, len(wsas[0].Consumers), 2) + for _, wca := range wsas[0].Consumers { + if wca.Config.Name == "DowngradeConsumerTest" { + require_Equal(t, wca.Config.Metadata["_nats.req.level"], strconv.Itoa(math.MaxInt-1)) + } else { + require_Equal(t, wca.Config.Name, "consumer") + } + } + + // Update a consumer (with compressed data) that's unsupported. + ccfg.Metadata = map[string]string{"_nats.req.level": strconv.Itoa(math.MaxInt)} + sjs.mu.Lock() + err = cc.meta.Propose(encodeAddConsumerAssignmentCompressed(ca)) + sjs.mu.Unlock() + require_NoError(t, err) + c.waitOnAllCurrent() + + // Consumer should be reported as offline, but healthz should report healthy to not block downgrades. 
+ expectConsumerInfo(unsupported(math.MaxInt)) + health = ml.healthz(&HealthzOptions{}) + require_Equal(t, health.StatusCode, 200) + restart() + expectConsumerInfo(unsupported(math.MaxInt)) + health = ml.healthz(&HealthzOptions{}) + require_Equal(t, health.StatusCode, 200) + + wsas = getValidMetaSnapshot() + require_Len(t, len(wsas), 1) + require_Equal(t, wsas[0].Config.Name, "DowngradeConsumerTest") + require_Equal(t, wsas[0].Config.Metadata["_nats.req.level"], "0") + require_Len(t, len(wsas[0].Consumers), 2) + for _, wca := range wsas[0].Consumers { + if wca.Config.Name == "DowngradeConsumerTest" { + require_Equal(t, wca.Config.Metadata["_nats.req.level"], strconv.Itoa(math.MaxInt)) + } else { + require_Equal(t, wca.Config.Name, "consumer") + } + } + + // Deleting a consumer should always work, even if it is unsupported. + require_NoError(t, js.DeleteConsumer("DowngradeConsumerTest", "DowngradeConsumerTest")) + c.waitOnAllCurrent() + + wsas = getValidMetaSnapshot() + require_Len(t, len(wsas), 1) + require_Equal(t, wsas[0].Config.Name, "DowngradeConsumerTest") + require_Equal(t, wsas[0].Config.Metadata["_nats.req.level"], "0") + require_Len(t, len(wsas[0].Consumers), 1) + require_Equal(t, wsas[0].Consumers[0].Config.Name, "consumer") +} + +func TestJetStreamClusterOfflineStreamAndConsumerAfterDowngrade(t *testing.T) { + clusterName := "R3S" + c := createJetStreamClusterExplicit(t, clusterName, 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + ml := c.leader() + require_NotNil(t, ml) + + sjs := ml.getJetStream() + require_NotNil(t, sjs) + sjs.mu.Lock() + cc := sjs.cluster + if cc == nil || cc.meta == nil { + sjs.mu.Unlock() + t.Fatalf("Expected cluster to be initialized") + } + + restart := func() { + t.Helper() + for _, s := range c.servers { + sjs = s.getJetStream() + snap, err := sjs.metaSnapshot() + require_NoError(t, err) + meta := sjs.getMetaGroup() + meta.InstallSnapshot(snap) + } + + c.stopAll() + 
c.restartAllSamePorts() + c.waitOnLeader() + ml = c.leader() + require_NotNil(t, ml) + require_NoError(t, nc.ForceReconnect()) + + sjs = ml.getJetStream() + require_NotNil(t, sjs) + sjs.mu.Lock() + cc = sjs.cluster + if cc == nil || cc.meta == nil { + sjs.mu.Unlock() + t.Fatalf("Expected cluster to be initialized") + } + sjs.mu.Unlock() + } + + getValidMetaSnapshot := func() (wsas []writeableStreamAssignment) { + t.Helper() + snap, err := sjs.metaSnapshot() + require_NoError(t, err) + require_True(t, len(snap) > 0) + dec, err := s2.Decode(nil, snap) + require_NoError(t, err) + require_NoError(t, json.Unmarshal(dec, &wsas)) + return wsas + } + + // Create a stream that's unsupported. + ci := &ClientInfo{ + Account: globalAccountName, + Cluster: clusterName, + } + scfg := &StreamConfig{ + Name: "DowngradeStreamTest", + Storage: FileStorage, + Replicas: 3, + } + rg, perr := sjs.createGroupForStream(ci, scfg) + if perr != nil { + sjs.mu.Unlock() + require_NoError(t, perr) + } + sa := &streamAssignment{ + Config: scfg, + Group: rg, + Created: time.Now().UTC(), + Client: ci, + } + err := cc.meta.Propose(encodeAddStreamAssignment(sa)) + sjs.mu.Unlock() + require_NoError(t, err) + c.waitOnStreamLeader(globalAccountName, "DowngradeStreamTest") + + expectStreamInfo := func(offline bool) { + if !offline { + c.waitOnStreamLeader(globalAccountName, "DowngradeStreamTest") + } + var msg *nats.Msg + checkFor(t, 3*time.Second, 200*time.Millisecond, func() error { + msg, err = nc.Request(fmt.Sprintf(JSApiStreamInfoT, "DowngradeStreamTest"), nil, time.Second) + return err + }) + var si JSApiStreamInfoResponse + require_NoError(t, json.Unmarshal(msg.Data, &si)) + if !offline { + require_True(t, si.Error == nil) + } else { + require_NotNil(t, si.Error) + require_Contains(t, si.Error.Error(), "stream is offline", "unsupported", "required API level") + } + } + + // Stream is still supported, so it should be available and healthz should report healthy. 
+ expectStreamInfo(false) + health := ml.healthz(&HealthzOptions{}) + require_Equal(t, health.StatusCode, 200) + restart() + expectStreamInfo(false) + health = ml.healthz(&HealthzOptions{}) + require_Equal(t, health.StatusCode, 200) + + wsas := getValidMetaSnapshot() + require_Len(t, len(wsas), 1) + require_Equal(t, wsas[0].Config.Name, "DowngradeStreamTest") + + // Update a stream to be unsupported. + sjs.mu.Lock() + scfg.Metadata = map[string]string{"_nats.req.level": strconv.Itoa(math.MaxInt)} + err = cc.meta.Propose(encodeUpdateStreamAssignment(sa)) + sjs.mu.Unlock() + require_NoError(t, err) + c.waitOnAllCurrent() + + // Stream should be reported as offline, but healthz should report healthy to not block downgrades. + expectStreamInfo(true) + health = ml.healthz(&HealthzOptions{}) + require_Equal(t, health.StatusCode, 200) + restart() + expectStreamInfo(true) + health = ml.healthz(&HealthzOptions{}) + require_Equal(t, health.StatusCode, 200) + + wsas = getValidMetaSnapshot() + require_Len(t, len(wsas), 1) + require_Equal(t, wsas[0].Config.Name, "DowngradeStreamTest") + + // Deleting a stream should always work, even if it is unsupported. + require_NoError(t, js.DeleteStream("DowngradeStreamTest")) + snap, err := sjs.metaSnapshot() + require_NoError(t, err) + require_True(t, snap == nil) + + // Create a supported stream and consumer. 
+ _, err = js.AddStream(&nats.StreamConfig{Name: "DowngradeConsumerTest", Replicas: 3}) + require_NoError(t, err) + + sjs.mu.Lock() + ccfg := &ConsumerConfig{ + Name: "DowngradeConsumerTest", + Replicas: 3, + } + rg = cc.createGroupForConsumer(ccfg, sa) + ca := &consumerAssignment{ + Config: ccfg, + Group: rg, + Stream: "DowngradeConsumerTest", + Name: "DowngradeConsumerTest", + Created: time.Now().UTC(), + Client: ci, + } + err = cc.meta.Propose(encodeAddConsumerAssignment(ca)) + sjs.mu.Unlock() + require_NoError(t, err) + c.waitOnConsumerLeader(globalAccountName, "DowngradeConsumerTest", "DowngradeConsumerTest") + + expectConsumerInfo := func(offline bool) { + if !offline { + c.waitOnConsumerLeader(globalAccountName, "DowngradeConsumerTest", "DowngradeConsumerTest") + } + var msg *nats.Msg + checkFor(t, 3*time.Second, 200*time.Millisecond, func() error { + msg, err = nc.Request(fmt.Sprintf(JSApiConsumerInfoT, "DowngradeConsumerTest", "DowngradeConsumerTest"), nil, 2*time.Second) + return err + }) + var ci JSApiConsumerInfoResponse + require_NoError(t, json.Unmarshal(msg.Data, &ci)) + if !offline { + require_True(t, ci.Error == nil) + } else { + require_NotNil(t, ci.Error) + require_Contains(t, ci.Error.Error(), "consumer is offline", "unsupported", "required API level") + } + } + + // Consumer is still supported, so it should be available and healthz should report healthy. + expectConsumerInfo(false) + health = ml.healthz(&HealthzOptions{}) + require_Equal(t, health.StatusCode, 200) + restart() + expectConsumerInfo(false) + health = ml.healthz(&HealthzOptions{}) + require_Equal(t, health.StatusCode, 200) + + wsas = getValidMetaSnapshot() + require_Len(t, len(wsas), 1) + require_Equal(t, wsas[0].Config.Name, "DowngradeConsumerTest") + require_Len(t, len(wsas[0].Consumers), 1) + + // Update a consumer to be unsupported. 
+ ccfg.Metadata = map[string]string{"_nats.req.level": strconv.Itoa(math.MaxInt)} + sjs.mu.Lock() + err = cc.meta.Propose(encodeAddConsumerAssignment(ca)) + sjs.mu.Unlock() + require_NoError(t, err) + c.waitOnAllCurrent() + + // Consumer should be reported as offline, but healthz should report healthy to not block downgrades. + expectConsumerInfo(true) + health = ml.healthz(&HealthzOptions{}) + require_Equal(t, health.StatusCode, 200) + restart() + expectConsumerInfo(true) + health = ml.healthz(&HealthzOptions{}) + require_Equal(t, health.StatusCode, 200) + + wsas = getValidMetaSnapshot() + require_Len(t, len(wsas), 1) + require_Equal(t, wsas[0].Config.Name, "DowngradeConsumerTest") + require_Equal(t, wsas[0].Config.Metadata["_nats.req.level"], "0") + require_Len(t, len(wsas[0].Consumers), 1) + require_Equal(t, wsas[0].Consumers[0].Config.Metadata["_nats.req.level"], strconv.Itoa(math.MaxInt)) +} + +func TestJetStreamClusterOfflineStreamAndConsumerStrictDecoding(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + unsupportedJson := []byte("{\"unknown\": true}") + + sa, err := decodeStreamAssignment(s, unsupportedJson) + require_NoError(t, err) + require_True(t, bytes.Equal(sa.unsupported.json, unsupportedJson)) + + ca, err := decodeConsumerAssignment(unsupportedJson) + require_NoError(t, err) + require_True(t, bytes.Equal(ca.unsupported.json, unsupportedJson)) + + var bb bytes.Buffer + s2e := s2.NewWriter(&bb) + _, err = s2e.Write(unsupportedJson) + require_NoError(t, err) + require_NoError(t, s2e.Close()) + ca, err = decodeConsumerAssignmentCompressed(bb.Bytes()) + require_NoError(t, err) + require_True(t, bytes.Equal(ca.unsupported.json, unsupportedJson)) + + var wsa writeableStreamAssignment + require_NoError(t, wsa.UnmarshalJSON(unsupportedJson)) + require_True(t, bytes.Equal(wsa.unsupportedJson, unsupportedJson)) + + var wca writeableConsumerAssignment + require_NoError(t, wca.UnmarshalJSON(unsupportedJson)) + require_True(t, 
bytes.Equal(wca.unsupportedJson, unsupportedJson)) +} + // // DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times) // Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value. diff --git a/server/jetstream_errors_generated.go b/server/jetstream_errors_generated.go index 865d1422a06..99048acfd70 100644 --- a/server/jetstream_errors_generated.go +++ b/server/jetstream_errors_generated.go @@ -200,6 +200,9 @@ const ( // JSConsumerOfflineErr consumer is offline JSConsumerOfflineErr ErrorIdentifier = 10119 + // JSConsumerOfflineReasonErrF consumer is offline: {err} + JSConsumerOfflineReasonErrF ErrorIdentifier = 10195 + // JSConsumerOnMappedErr consumer direct on a mapped consumer JSConsumerOnMappedErr ErrorIdentifier = 10092 @@ -515,6 +518,9 @@ const ( // JSStreamOfflineErr stream is offline JSStreamOfflineErr ErrorIdentifier = 10118 + // JSStreamOfflineReasonErrF stream is offline: {err} + JSStreamOfflineReasonErrF ErrorIdentifier = 10194 + // JSStreamPurgeFailedF Generic stream purge failure error string ({err}) JSStreamPurgeFailedF ErrorIdentifier = 10110 @@ -649,6 +655,7 @@ var ( JSConsumerNameTooLongErrF: {Code: 400, ErrCode: 10102, Description: "consumer name is too long, maximum allowed is {max}"}, JSConsumerNotFoundErr: {Code: 404, ErrCode: 10014, Description: "consumer not found"}, JSConsumerOfflineErr: {Code: 500, ErrCode: 10119, Description: "consumer is offline"}, + JSConsumerOfflineReasonErrF: {Code: 500, ErrCode: 10195, Description: "consumer is offline: {err}"}, JSConsumerOnMappedErr: {Code: 400, ErrCode: 10092, Description: "consumer direct on a mapped consumer"}, JSConsumerOverlappingSubjectFilters: {Code: 400, ErrCode: 10138, Description: "consumer subject filters cannot overlap"}, JSConsumerPriorityPolicyWithoutGroup: {Code: 400, ErrCode: 10159, Description: "Setting PriorityPolicy requires at least one PriorityGroup to be set"}, @@ -754,6 +761,7 @@ var ( JSStreamNotFoundErr: {Code: 404, ErrCode: 10059, Description: 
"stream not found"}, JSStreamNotMatchErr: {Code: 400, ErrCode: 10060, Description: "expected stream does not match"}, JSStreamOfflineErr: {Code: 500, ErrCode: 10118, Description: "stream is offline"}, + JSStreamOfflineReasonErrF: {Code: 500, ErrCode: 10194, Description: "stream is offline: {err}"}, JSStreamPurgeFailedF: {Code: 500, ErrCode: 10110, Description: "{err}"}, JSStreamReplicasNotSupportedErr: {Code: 500, ErrCode: 10074, Description: "replicas > 1 not supported in non-clustered mode"}, JSStreamReplicasNotUpdatableErr: {Code: 400, ErrCode: 10061, Description: "Replicas configuration can not be updated"}, @@ -1517,6 +1525,22 @@ func NewJSConsumerOfflineError(opts ...ErrorOption) *ApiError { return ApiErrors[JSConsumerOfflineErr] } +// NewJSConsumerOfflineReasonError creates a new JSConsumerOfflineReasonErrF error: "consumer is offline: {err}" +func NewJSConsumerOfflineReasonError(err error, opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + e := ApiErrors[JSConsumerOfflineReasonErrF] + args := e.toReplacerArgs([]interface{}{"{err}", err}) + return &ApiError{ + Code: e.Code, + ErrCode: e.ErrCode, + Description: strings.NewReplacer(args...).Replace(e.Description), + } +} + // NewJSConsumerOnMappedError creates a new JSConsumerOnMappedErr error: "consumer direct on a mapped consumer" func NewJSConsumerOnMappedError(opts ...ErrorOption) *ApiError { eopts := parseOpts(opts) @@ -2705,6 +2729,22 @@ func NewJSStreamOfflineError(opts ...ErrorOption) *ApiError { return ApiErrors[JSStreamOfflineErr] } +// NewJSStreamOfflineReasonError creates a new JSStreamOfflineReasonErrF error: "stream is offline: {err}" +func NewJSStreamOfflineReasonError(err error, opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + e := ApiErrors[JSStreamOfflineReasonErrF] + args := e.toReplacerArgs([]interface{}{"{err}", err}) + return &ApiError{ + Code: 
e.Code, + ErrCode: e.ErrCode, + Description: strings.NewReplacer(args...).Replace(e.Description), + } +} + // NewJSStreamPurgeFailedError creates a new JSStreamPurgeFailedF error: "{err}" func NewJSStreamPurgeFailedError(err error, opts ...ErrorOption) *ApiError { eopts := parseOpts(opts) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 202dd18b08a..2d802e06d02 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -21811,3 +21811,112 @@ func TestJetStreamScheduledMirrorOrSource(t *testing.T) { }) require_Error(t, err, NewJSSourceWithMsgSchedulesError()) } + +func TestJetStreamOfflineStreamAndConsumerAfterDowngrade(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + port := s.getOpts().Port + sd := s.JetStreamConfig().StoreDir + + _, err := s.globalAccount().addStream(&StreamConfig{ + Name: "DowngradeStreamTest", + Storage: FileStorage, + Replicas: 1, + Metadata: map[string]string{"_nats.req.level": strconv.Itoa(math.MaxInt)}, + }) + require_NoError(t, err) + + s.Shutdown() + s = RunJetStreamServerOnPort(port, sd) + defer s.Shutdown() + + nc := clientConnectToServer(t, s) + defer nc.Close() + + offlineReason := fmt.Sprintf("unsupported - required API level: %d, current API level: %d", math.MaxInt, JSApiLevel) + msg, err := nc.Request(fmt.Sprintf(JSApiStreamInfoT, "DowngradeStreamTest"), nil, time.Second) + require_NoError(t, err) + var si JSApiStreamInfoResponse + require_NoError(t, json.Unmarshal(msg.Data, &si)) + require_NotNil(t, si.Error) + require_Error(t, si.Error, NewJSStreamOfflineReasonError(errors.New(offlineReason))) + + var sn JSApiStreamNamesResponse + msg, err = nc.Request(JSApiStreams, nil, time.Second) + require_NoError(t, err) + require_NoError(t, json.Unmarshal(msg.Data, &sn)) + require_Len(t, len(sn.Streams), 1) + require_Equal(t, sn.Streams[0], "DowngradeStreamTest") + + var sl JSApiStreamListResponse + msg, err = nc.Request(JSApiStreamList, nil, time.Second) + require_NoError(t, err) + 
require_NoError(t, json.Unmarshal(msg.Data, &sl)) + require_Len(t, len(sl.Streams), 0) + require_Len(t, len(sl.Missing), 1) + require_Equal(t, sl.Missing[0], "DowngradeStreamTest") + require_Len(t, len(sl.Offline), 1) + require_Equal(t, sl.Offline["DowngradeStreamTest"], offlineReason) + + mset, err := s.globalAccount().lookupStream("DowngradeStreamTest") + require_NoError(t, err) + require_True(t, mset.closed.Load()) + require_Equal(t, mset.offlineReason, offlineReason) + require_NoError(t, mset.delete()) + + s.Shutdown() + s = RunJetStreamServerOnPort(port, sd) + defer s.Shutdown() + + _, err = s.globalAccount().addStream(&StreamConfig{ + Name: "DowngradeConsumerTest", + Storage: FileStorage, + Replicas: 1, + }) + require_NoError(t, err) + mset, err = s.globalAccount().lookupStream("DowngradeConsumerTest") + require_NoError(t, err) + _, err = mset.addConsumer(&ConsumerConfig{ + Name: "DowngradeConsumerTest", + Metadata: map[string]string{"_nats.req.level": strconv.Itoa(math.MaxInt)}, + }) + require_NoError(t, err) + + s.Shutdown() + s = RunJetStreamServerOnPort(port, sd) + defer s.Shutdown() + + mset, err = s.globalAccount().lookupStream("DowngradeConsumerTest") + require_NoError(t, err) + require_True(t, mset.closed.Load()) + require_Equal(t, mset.offlineReason, "stopped") + + obs := mset.getPublicConsumers() + require_Len(t, len(obs), 1) + require_True(t, obs[0].isClosed()) + require_Equal(t, obs[0].offlineReason, offlineReason) + + msg, err = nc.Request(fmt.Sprintf(JSApiConsumerInfoT, "DowngradeConsumerTest", "DowngradeConsumerTest"), nil, time.Second) + require_NoError(t, err) + var ci JSApiConsumerInfoResponse + require_NoError(t, json.Unmarshal(msg.Data, &ci)) + require_NotNil(t, ci.Error) + require_Error(t, ci.Error, NewJSConsumerOfflineReasonError(errors.New(offlineReason))) + + var cn JSApiConsumerNamesResponse + msg, err = nc.Request(fmt.Sprintf(JSApiConsumersT, "DowngradeConsumerTest"), nil, time.Second) + require_NoError(t, err) + require_NoError(t, 
json.Unmarshal(msg.Data, &cn)) + require_Len(t, len(cn.Consumers), 1) + require_Equal(t, cn.Consumers[0], "DowngradeConsumerTest") + + var cl JSApiConsumerListResponse + msg, err = nc.Request(fmt.Sprintf(JSApiConsumerListT, "DowngradeConsumerTest"), nil, time.Second) + require_NoError(t, err) + require_NoError(t, json.Unmarshal(msg.Data, &cl)) + require_Len(t, len(cl.Consumers), 0) + require_Len(t, len(cl.Missing), 1) + require_Equal(t, cl.Missing[0], "DowngradeConsumerTest") + require_Len(t, len(cl.Offline), 1) + require_Equal(t, cl.Offline["DowngradeConsumerTest"], offlineReason) +} diff --git a/server/jetstream_versioning.go b/server/jetstream_versioning.go index e5f10f3dd9e..08a7b51fc59 100644 --- a/server/jetstream_versioning.go +++ b/server/jetstream_versioning.go @@ -24,6 +24,23 @@ const ( JSServerLevelMetadataKey = "_nats.level" ) +// getRequiredApiLevel returns the required API level for the JetStream asset. +func getRequiredApiLevel(metadata map[string]string) string { + if l, ok := metadata[JSRequiredLevelMetadataKey]; ok && l != _EMPTY_ { + return l + } + return _EMPTY_ +} + +// supportsRequiredApiLevel returns whether the required API level for the JetStream asset is supported. +func supportsRequiredApiLevel(metadata map[string]string) bool { + if l := getRequiredApiLevel(metadata); l != _EMPTY_ { + li, err := strconv.Atoi(l) + return err == nil && li <= JSApiLevel + } + return true +} + // setStaticStreamMetadata sets JetStream stream metadata, like the server version and API level. // Any dynamic metadata is removed, it must not be stored and only be added for responses. func setStaticStreamMetadata(cfg *StreamConfig) { @@ -65,10 +82,15 @@ func setStaticStreamMetadata(cfg *StreamConfig) { // setDynamicStreamMetadata adds dynamic fields into the (copied) metadata. 
func setDynamicStreamMetadata(cfg *StreamConfig) *StreamConfig { - newCfg := *cfg + var newCfg StreamConfig + if cfg != nil { + newCfg = *cfg + } newCfg.Metadata = make(map[string]string) - for key, value := range cfg.Metadata { - newCfg.Metadata[key] = value + if cfg != nil { + for key, value := range cfg.Metadata { + newCfg.Metadata[key] = value + } } newCfg.Metadata[JSServerVersionMetadataKey] = VERSION newCfg.Metadata[JSServerLevelMetadataKey] = strconv.Itoa(JSApiLevel) @@ -136,10 +158,15 @@ func setStaticConsumerMetadata(cfg *ConsumerConfig) { // setDynamicConsumerMetadata adds dynamic fields into the (copied) metadata. func setDynamicConsumerMetadata(cfg *ConsumerConfig) *ConsumerConfig { - newCfg := *cfg + var newCfg ConsumerConfig + if cfg != nil { + newCfg = *cfg + } newCfg.Metadata = make(map[string]string) - for key, value := range cfg.Metadata { - newCfg.Metadata[key] = value + if cfg != nil { + for key, value := range cfg.Metadata { + newCfg.Metadata[key] = value + } } newCfg.Metadata[JSServerVersionMetadataKey] = VERSION newCfg.Metadata[JSServerLevelMetadataKey] = strconv.Itoa(JSApiLevel) diff --git a/server/jetstream_versioning_test.go b/server/jetstream_versioning_test.go index fc65b268b5a..150c7bf7329 100644 --- a/server/jetstream_versioning_test.go +++ b/server/jetstream_versioning_test.go @@ -30,6 +30,19 @@ import ( "github.com/nats-io/nats.go" ) +func TestGetAndSupportsRequiredApiLevel(t *testing.T) { + require_Equal(t, getRequiredApiLevel(nil), _EMPTY_) + require_Equal(t, getRequiredApiLevel(map[string]string{}), _EMPTY_) + require_Equal(t, getRequiredApiLevel(map[string]string{JSRequiredLevelMetadataKey: "1"}), "1") + require_Equal(t, getRequiredApiLevel(map[string]string{JSRequiredLevelMetadataKey: "text"}), "text") + + require_True(t, supportsRequiredApiLevel(nil)) + require_True(t, supportsRequiredApiLevel(map[string]string{})) + require_True(t, supportsRequiredApiLevel(map[string]string{JSRequiredLevelMetadataKey: "1"})) + require_True(t, 
supportsRequiredApiLevel(map[string]string{JSRequiredLevelMetadataKey: strconv.Itoa(JSApiLevel)})) + require_False(t, supportsRequiredApiLevel(map[string]string{JSRequiredLevelMetadataKey: "text"})) +} + func metadataAtLevel(featureLevel string) map[string]string { return map[string]string{ JSRequiredLevelMetadataKey: featureLevel, diff --git a/server/monitor.go b/server/monitor.go index 2fe64fd530b..84d44279c8c 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -3842,6 +3842,9 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus { } for stream, sa := range asa { + if sa != nil && sa.unsupported != nil { + continue + } // Make sure we can look up if err := js.isStreamHealthy(acc, sa); err != nil { if !details { diff --git a/server/stream.go b/server/stream.go index 1661ecb6632..852d75536d9 100644 --- a/server/stream.go +++ b/server/stream.go @@ -111,10 +111,10 @@ type StreamConfig struct { SubjectDeleteMarkerTTL time.Duration `json:"subject_delete_marker_ttl,omitempty"` // AllowMsgCounter allows a stream to use (only) counter CRDTs. - AllowMsgCounter bool `json:"allow_msg_counter"` + AllowMsgCounter bool `json:"allow_msg_counter,omitempty"` // AllowAtomicPublish allows atomic batch publishing into the stream. - AllowAtomicPublish bool `json:"allow_atomic"` + AllowAtomicPublish bool `json:"allow_atomic,omitempty"` // AllowMsgSchedules allows the scheduling of messages. AllowMsgSchedules bool `json:"allow_msg_schedules,omitempty"` @@ -228,6 +228,13 @@ type StreamInfo struct { TimeStamp time.Time `json:"ts"` } +// streamInfoClusterResponse is a response used in a cluster to communicate the stream info +// back to the meta leader as part of a stream list request. +type streamInfoClusterResponse struct { + StreamInfo + OfflineReason string `json:"offline_reason,omitempty"` // Reporting when a stream is offline. 
+} + type StreamAlternate struct { Name string `json:"name"` Domain string `json:"domain,omitempty"` @@ -422,6 +429,10 @@ type stream struct { monitorWg sync.WaitGroup // Wait group for the monitor routine. batches *batching // Inflight batches prior to committing them. + + // If standalone/single-server, the offline reason needs to be stored directly in the stream. + // Otherwise, if clustered it will be part of the stream assignment. + offlineReason string } // inflightSubjectRunningTotal stores a running total of inflight messages for a specific subject. @@ -6754,7 +6765,7 @@ func (mset *stream) delete() error { // Internal function to stop or delete the stream. func (mset *stream) stop(deleteFlag, advisory bool) error { mset.mu.RLock() - js, jsa, name := mset.js, mset.jsa, mset.cfg.Name + js, jsa, name, offlineReason := mset.js, mset.jsa, mset.cfg.Name, mset.offlineReason mset.mu.RUnlock() if jsa == nil { @@ -6765,7 +6776,10 @@ func (mset *stream) stop(deleteFlag, advisory bool) error { // Remove from our account map first. jsa.mu.Lock() - delete(jsa.streams, name) + // Preserve in the account if it's marked offline, to have it remain queryable. + if deleteFlag || offlineReason == _EMPTY_ { + delete(jsa.streams, name) + } accName := jsa.account.Name jsa.mu.Unlock() @@ -6798,9 +6812,12 @@ func (mset *stream) stop(deleteFlag, advisory bool) error { for _, o := range mset.consumers { obs = append(obs, o) } - mset.clsMu.Lock() - mset.consumers, mset.cList, mset.csl = nil, nil, nil - mset.clsMu.Unlock() + // Preserve the consumers if it's marked offline, to have them remain queryable. + if deleteFlag || offlineReason == _EMPTY_ { + mset.clsMu.Lock() + mset.consumers, mset.cList, mset.csl = nil, nil, nil + mset.clsMu.Unlock() + } // Check if we are a mirror. 
if mset.mirror != nil && mset.mirror.sub != nil { @@ -6902,14 +6919,17 @@ func (mset *stream) stop(deleteFlag, advisory bool) error { } if deleteFlag { + // cleanup directories after the stream + accDir := filepath.Join(js.config.StoreDir, accName) if store != nil { // Ignore errors. store.Delete(false) + } else { + streamDir := filepath.Join(accDir, streamsDir) + os.RemoveAll(filepath.Join(streamDir, name)) } // Release any resources. js.releaseStreamResources(&mset.cfg) - // cleanup directories after the stream - accDir := filepath.Join(js.config.StoreDir, accName) // Do cleanup in separate go routine similar to how fs will use purge here.. go func() { // no op if not empty