Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions server/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -1348,8 +1348,12 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
var offlineReason string
if !supported {
apiLevel := getRequiredApiLevel(cfg.Metadata)
offlineReason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", apiLevel, JSApiLevel)
s.Warnf(" Detected unsupported stream '%s > %s', delete the stream or upgrade the server to API level %s", a.Name, cfg.StreamConfig.Name, apiLevel)
if strictErr != nil {
offlineReason = fmt.Sprintf("unsupported - config error: %s", strings.TrimPrefix(err.Error(), "json: "))
} else {
offlineReason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", apiLevel, JSApiLevel)
}
s.Warnf(" Detected unsupported stream '%s > %s': %s", a.Name, cfg.StreamConfig.Name, offlineReason)
} else {
offlineReason = fmt.Sprintf("decoding error: %v", strictErr)
s.Warnf(" Error unmarshalling stream metafile %q: %v", metafile, strictErr)
Expand Down Expand Up @@ -1588,8 +1592,12 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
var offlineReason string
if !supported {
apiLevel := getRequiredApiLevel(cfg.Metadata)
offlineReason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", apiLevel, JSApiLevel)
s.Warnf(" Detected unsupported consumer '%s > %s > %s', delete the consumer or upgrade the server to API level %s", a.Name, e.mset.name(), cfg.Name, apiLevel)
if strictErr != nil {
offlineReason = fmt.Sprintf("unsupported - config error: %s", strings.TrimPrefix(err.Error(), "json: "))
} else {
offlineReason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", apiLevel, JSApiLevel)
}
s.Warnf(" Detected unsupported consumer '%s > %s > %s': %s", a.Name, e.mset.name(), cfg.Name, offlineReason)
} else {
offlineReason = fmt.Sprintf("decoding error: %v", strictErr)
s.Warnf(" Error unmarshalling consumer metafile %q: %v", metafile, strictErr)
Expand All @@ -1599,7 +1607,7 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
if !e.mset.closed.Load() {
s.Warnf(" Stopping unsupported stream '%s > %s'", a.Name, e.mset.name())
e.mset.mu.Lock()
e.mset.offlineReason = "stopped"
e.mset.offlineReason = fmt.Sprintf("stopped - unsupported consumer %q", cfg.Name)
e.mset.mu.Unlock()
e.mset.stop(false, false)
}
Expand Down
53 changes: 32 additions & 21 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,15 @@ type unsupportedStreamAssignment struct {
infoSub *subscription
}

func newUnsupportedStreamAssignment(s *Server, sa *streamAssignment) *unsupportedStreamAssignment {
func newUnsupportedStreamAssignment(s *Server, sa *streamAssignment, err error) *unsupportedStreamAssignment {
reason := "stopped"
if sa.Config != nil && !supportsRequiredApiLevel(sa.Config.Metadata) {
if err != nil {
if errstr := err.Error(); strings.HasPrefix(errstr, "json:") {
reason = fmt.Sprintf("unsupported - config error: %s", strings.TrimPrefix(err.Error(), "json: "))
} else {
reason = fmt.Sprintf("stopped - %s", errstr)
}
} else if sa.Config != nil && !supportsRequiredApiLevel(sa.Config.Metadata) {
if req := getRequiredApiLevel(sa.Config.Metadata); req != _EMPTY_ {
reason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", req, JSApiLevel)
}
Expand Down Expand Up @@ -240,9 +246,15 @@ type unsupportedConsumerAssignment struct {
infoSub *subscription
}

func newUnsupportedConsumerAssignment(ca *consumerAssignment) *unsupportedConsumerAssignment {
func newUnsupportedConsumerAssignment(ca *consumerAssignment, err error) *unsupportedConsumerAssignment {
reason := "stopped"
if ca.Config != nil && !supportsRequiredApiLevel(ca.Config.Metadata) {
if err != nil {
if errstr := err.Error(); strings.HasPrefix(errstr, "json:") {
reason = fmt.Sprintf("unsupported - config error: %s", strings.TrimPrefix(err.Error(), "json: "))
} else {
reason = fmt.Sprintf("stopped - %s", errstr)
}
} else if ca.Config != nil && !supportsRequiredApiLevel(ca.Config.Metadata) {
if req := getRequiredApiLevel(ca.Config.Metadata); req != _EMPTY_ {
reason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", getRequiredApiLevel(ca.Config.Metadata), JSApiLevel)
}
Expand Down Expand Up @@ -4003,8 +4015,7 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) {
// If unsupported, we can't register any further.
if sa.unsupported != nil {
sa.unsupported.setupInfoSub(s, sa)
apiLevel := getRequiredApiLevel(sa.Config.Metadata)
s.Warnf("Detected unsupported stream '%s > %s', delete the stream or upgrade the server to API level %s", accName, stream, apiLevel)
s.Warnf("Detected unsupported stream '%s > %s': %s", accName, stream, sa.unsupported.reason)
js.mu.Unlock()

// Need to stop the stream, we can't keep running with an old config.
Expand Down Expand Up @@ -4133,8 +4144,7 @@ func (js *jetStream) processUpdateStreamAssignment(sa *streamAssignment) {
// If unsupported, we can't register any further.
if sa.unsupported != nil {
sa.unsupported.setupInfoSub(s, sa)
apiLevel := getRequiredApiLevel(sa.Config.Metadata)
s.Warnf("Detected unsupported stream '%s > %s', delete the stream or upgrade the server to API level %s", accName, stream, apiLevel)
s.Warnf("Detected unsupported stream '%s > %s': %s", accName, stream, sa.unsupported.reason)
js.mu.Unlock()

// Need to stop the stream, we can't keep running with an old config.
Expand Down Expand Up @@ -4815,12 +4825,11 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
// If unsupported, we can't register any further.
if ca.unsupported != nil {
ca.unsupported.setupInfoSub(s, ca)
apiLevel := getRequiredApiLevel(ca.Config.Metadata)
s.Warnf("Detected unsupported consumer '%s > %s > %s', delete the consumer or upgrade the server to API level %s", accName, stream, ca.Name, apiLevel)
s.Warnf("Detected unsupported consumer '%s > %s > %s': %s", accName, stream, ca.Name, ca.unsupported.reason)

// Mark stream as unsupported as well
if sa.unsupported == nil {
sa.unsupported = newUnsupportedStreamAssignment(s, sa)
sa.unsupported = newUnsupportedStreamAssignment(s, sa, fmt.Errorf("unsupported consumer %q", ca.Name))
}
sa.unsupported.setupInfoSub(s, sa)
js.mu.Unlock()
Expand Down Expand Up @@ -8008,20 +8017,21 @@ func decodeStreamAssignment(s *Server, buf []byte) (*streamAssignment, error) {
func decodeStreamAssignmentConfig(s *Server, sa *streamAssignment) error {
var unsupported bool
var cfg StreamConfig
var err error
decoder := json.NewDecoder(bytes.NewReader(sa.ConfigJSON))
decoder.DisallowUnknownFields()
if err := decoder.Decode(&cfg); err != nil {
if err = decoder.Decode(&cfg); err != nil {
unsupported = true
cfg = StreamConfig{}
if err = json.Unmarshal(sa.ConfigJSON, &cfg); err != nil {
return err
if err2 := json.Unmarshal(sa.ConfigJSON, &cfg); err2 != nil {
return err2
}
}
sa.Config = &cfg
fixCfgMirrorWithDedupWindow(sa.Config)

if unsupported || (sa.Config != nil && !supportsRequiredApiLevel(sa.Config.Metadata)) {
sa.unsupported = newUnsupportedStreamAssignment(s, sa)
if unsupported || err != nil || (sa.Config != nil && !supportsRequiredApiLevel(sa.Config.Metadata)) {
sa.unsupported = newUnsupportedStreamAssignment(s, sa, err)
}
return nil
}
Expand Down Expand Up @@ -8472,18 +8482,19 @@ func decodeConsumerAssignment(buf []byte) (*consumerAssignment, error) {
func decodeConsumerAssignmentConfig(ca *consumerAssignment) error {
var unsupported bool
var cfg ConsumerConfig
var err error
decoder := json.NewDecoder(bytes.NewReader(ca.ConfigJSON))
decoder.DisallowUnknownFields()
if err := decoder.Decode(&cfg); err != nil {
if err = decoder.Decode(&cfg); err != nil {
unsupported = true
cfg = ConsumerConfig{}
if err = json.Unmarshal(ca.ConfigJSON, &cfg); err != nil {
return err
if err2 := json.Unmarshal(ca.ConfigJSON, &cfg); err2 != nil {
return err2
}
}
ca.Config = &cfg
if unsupported || (ca.Config != nil && !supportsRequiredApiLevel(ca.Config.Metadata)) {
ca.unsupported = newUnsupportedConsumerAssignment(ca)
if unsupported || err != nil || (ca.Config != nil && !supportsRequiredApiLevel(ca.Config.Metadata)) {
ca.unsupported = newUnsupportedConsumerAssignment(ca, err)
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9714,7 +9714,7 @@ func TestJetStreamClusterOfflineStreamAndConsumerAfterAssetCreateOrUpdate(t *tes

// Stream should also be reported as offline.
// Specifically, as "stopped" because it's still supported, but can't run due to the unsupported consumer.
expectStreamInfo("stopped", "DowngradeConsumerTest")
expectStreamInfo("stopped - unsupported consumer \"DowngradeConsumerTest\"", "DowngradeConsumerTest")
}

// Consumer should be reported as offline, but healthz should report healthy to not block downgrades.
Expand Down
2 changes: 1 addition & 1 deletion server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21951,7 +21951,7 @@ func TestJetStreamOfflineStreamAndConsumerAfterDowngrade(t *testing.T) {
mset, err = s.globalAccount().lookupStream("DowngradeConsumerTest")
require_NoError(t, err)
require_True(t, mset.closed.Load())
require_Equal(t, mset.offlineReason, "stopped")
require_Equal(t, mset.offlineReason, "stopped - unsupported consumer \"DowngradeConsumerTest\"")

obs := mset.getPublicConsumers()
require_Len(t, len(obs), 1)
Expand Down
Loading