diff --git a/server/jetstream.go b/server/jetstream.go index 1565243a12d..e570f697750 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -1348,8 +1348,12 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro var offlineReason string if !supported { apiLevel := getRequiredApiLevel(cfg.Metadata) - offlineReason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", apiLevel, JSApiLevel) - s.Warnf(" Detected unsupported stream '%s > %s', delete the stream or upgrade the server to API level %s", a.Name, cfg.StreamConfig.Name, apiLevel) + if strictErr != nil { + offlineReason = fmt.Sprintf("unsupported - config error: %s", strings.TrimPrefix(err.Error(), "json: ")) + } else { + offlineReason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", apiLevel, JSApiLevel) + } + s.Warnf(" Detected unsupported stream '%s > %s': %s", a.Name, cfg.StreamConfig.Name, offlineReason) } else { offlineReason = fmt.Sprintf("decoding error: %v", strictErr) s.Warnf(" Error unmarshalling stream metafile %q: %v", metafile, strictErr) @@ -1588,8 +1592,12 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro var offlineReason string if !supported { apiLevel := getRequiredApiLevel(cfg.Metadata) - offlineReason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", apiLevel, JSApiLevel) - s.Warnf(" Detected unsupported consumer '%s > %s > %s', delete the consumer or upgrade the server to API level %s", a.Name, e.mset.name(), cfg.Name, apiLevel) + if strictErr != nil { + offlineReason = fmt.Sprintf("unsupported - config error: %s", strings.TrimPrefix(err.Error(), "json: ")) + } else { + offlineReason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", apiLevel, JSApiLevel) + } + s.Warnf(" Detected unsupported consumer '%s > %s > %s': %s", a.Name, e.mset.name(), cfg.Name, offlineReason) } else { offlineReason = fmt.Sprintf("decoding error: %v", strictErr) s.Warnf(" Error unmarshalling consumer metafile %q: %v", metafile, strictErr) @@ -1599,7 +1607,7 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro if !e.mset.closed.Load() { s.Warnf(" Stopping unsupported stream '%s > %s'", a.Name, e.mset.name()) e.mset.mu.Lock() - e.mset.offlineReason = "stopped" + e.mset.offlineReason = fmt.Sprintf("stopped - unsupported consumer %q", cfg.Name) e.mset.mu.Unlock() e.mset.stop(false, false) } diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 00ba6c5fc03..1cef7348847 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -162,9 +162,15 @@ type unsupportedStreamAssignment struct { infoSub *subscription } -func newUnsupportedStreamAssignment(s *Server, sa *streamAssignment) *unsupportedStreamAssignment { +func newUnsupportedStreamAssignment(s *Server, sa *streamAssignment, err error) *unsupportedStreamAssignment { reason := "stopped" - if sa.Config != nil && !supportsRequiredApiLevel(sa.Config.Metadata) { + if err != nil { + if errstr := err.Error(); strings.HasPrefix(errstr, "json:") { + reason = fmt.Sprintf("unsupported - config error: %s", strings.TrimPrefix(err.Error(), "json: ")) + } else { + reason = fmt.Sprintf("stopped - %s", errstr) + } + } else if sa.Config != nil && !supportsRequiredApiLevel(sa.Config.Metadata) { if req := getRequiredApiLevel(sa.Config.Metadata); req != _EMPTY_ { reason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", req, JSApiLevel) } @@ -240,9 +246,15 @@ type unsupportedConsumerAssignment struct { infoSub *subscription } -func newUnsupportedConsumerAssignment(ca *consumerAssignment) *unsupportedConsumerAssignment { +func newUnsupportedConsumerAssignment(ca *consumerAssignment, err error) *unsupportedConsumerAssignment { reason := "stopped" - if ca.Config != nil && !supportsRequiredApiLevel(ca.Config.Metadata) { + if err != nil { + if errstr := err.Error(); strings.HasPrefix(errstr, "json:") { + reason = fmt.Sprintf("unsupported - config error: %s", strings.TrimPrefix(err.Error(), "json: ")) + } else { + reason = fmt.Sprintf("stopped - %s", errstr) + } + } else if ca.Config != nil && !supportsRequiredApiLevel(ca.Config.Metadata) { if req := getRequiredApiLevel(ca.Config.Metadata); req != _EMPTY_ { reason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", getRequiredApiLevel(ca.Config.Metadata), JSApiLevel) } @@ -4003,8 +4015,7 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) { // If unsupported, we can't register any further. if sa.unsupported != nil { sa.unsupported.setupInfoSub(s, sa) - apiLevel := getRequiredApiLevel(sa.Config.Metadata) - s.Warnf("Detected unsupported stream '%s > %s', delete the stream or upgrade the server to API level %s", accName, stream, apiLevel) + s.Warnf("Detected unsupported stream '%s > %s': %s", accName, stream, sa.unsupported.reason) js.mu.Unlock() // Need to stop the stream, we can't keep running with an old config. @@ -4133,8 +4144,7 @@ func (js *jetStream) processUpdateStreamAssignment(sa *streamAssignment) { // If unsupported, we can't register any further. if sa.unsupported != nil { sa.unsupported.setupInfoSub(s, sa) - apiLevel := getRequiredApiLevel(sa.Config.Metadata) - s.Warnf("Detected unsupported stream '%s > %s', delete the stream or upgrade the server to API level %s", accName, stream, apiLevel) + s.Warnf("Detected unsupported stream '%s > %s': %s", accName, stream, sa.unsupported.reason) js.mu.Unlock() // Need to stop the stream, we can't keep running with an old config. @@ -4815,12 +4825,11 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) { // If unsupported, we can't register any further. if ca.unsupported != nil { ca.unsupported.setupInfoSub(s, ca) - apiLevel := getRequiredApiLevel(ca.Config.Metadata) - s.Warnf("Detected unsupported consumer '%s > %s > %s', delete the consumer or upgrade the server to API level %s", accName, stream, ca.Name, apiLevel) + s.Warnf("Detected unsupported consumer '%s > %s > %s': %s", accName, stream, ca.Name, ca.unsupported.reason) // Mark stream as unsupported as well if sa.unsupported == nil { - sa.unsupported = newUnsupportedStreamAssignment(s, sa) + sa.unsupported = newUnsupportedStreamAssignment(s, sa, fmt.Errorf("unsupported consumer %q", ca.Name)) } sa.unsupported.setupInfoSub(s, sa) js.mu.Unlock() @@ -8008,20 +8017,21 @@ func decodeStreamAssignment(s *Server, buf []byte) (*streamAssignment, error) { func decodeStreamAssignmentConfig(s *Server, sa *streamAssignment) error { var unsupported bool var cfg StreamConfig + var err error decoder := json.NewDecoder(bytes.NewReader(sa.ConfigJSON)) decoder.DisallowUnknownFields() - if err := decoder.Decode(&cfg); err != nil { + if err = decoder.Decode(&cfg); err != nil { unsupported = true cfg = StreamConfig{} - if err = json.Unmarshal(sa.ConfigJSON, &cfg); err != nil { - return err + if err2 := json.Unmarshal(sa.ConfigJSON, &cfg); err2 != nil { + return err2 } } sa.Config = &cfg fixCfgMirrorWithDedupWindow(sa.Config) - if unsupported || (sa.Config != nil && !supportsRequiredApiLevel(sa.Config.Metadata)) { - sa.unsupported = newUnsupportedStreamAssignment(s, sa) + if unsupported || err != nil || (sa.Config != nil && !supportsRequiredApiLevel(sa.Config.Metadata)) { + sa.unsupported = newUnsupportedStreamAssignment(s, sa, err) } return nil } @@ -8472,18 +8482,19 @@ func decodeConsumerAssignment(buf []byte) (*consumerAssignment, error) { func decodeConsumerAssignmentConfig(ca *consumerAssignment) error { var unsupported bool var cfg ConsumerConfig + var err error decoder := json.NewDecoder(bytes.NewReader(ca.ConfigJSON)) decoder.DisallowUnknownFields() - if err := decoder.Decode(&cfg); err != nil { + if err = decoder.Decode(&cfg); err != nil { unsupported = true cfg = ConsumerConfig{} - if err = json.Unmarshal(ca.ConfigJSON, &cfg); err != nil { - return err + if err2 := json.Unmarshal(ca.ConfigJSON, &cfg); err2 != nil { + return err2 } } ca.Config = &cfg - if unsupported || (ca.Config != nil && !supportsRequiredApiLevel(ca.Config.Metadata)) { - ca.unsupported = newUnsupportedConsumerAssignment(ca) + if unsupported || err != nil || (ca.Config != nil && !supportsRequiredApiLevel(ca.Config.Metadata)) { + ca.unsupported = newUnsupportedConsumerAssignment(ca, err) } return nil } diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index 6912ef791d8..6d1ca28e4b3 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -9714,7 +9714,7 @@ func TestJetStreamClusterOfflineStreamAndConsumerAfterAssetCreateOrUpdate(t *tes // Stream should also be reported as offline. // Specifically, as "stopped" because it's still supported, but can't run due to the unsupported consumer. - expectStreamInfo("stopped", "DowngradeConsumerTest") + expectStreamInfo("stopped - unsupported consumer \"DowngradeConsumerTest\"", "DowngradeConsumerTest") } // Consumer should be reported as offline, but healthz should report healthy to not block downgrades. diff --git a/server/jetstream_test.go b/server/jetstream_test.go index cf1ba2211cb..6762f4fcb86 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -21951,7 +21951,7 @@ func TestJetStreamOfflineStreamAndConsumerAfterDowngrade(t *testing.T) { mset, err = s.globalAccount().lookupStream("DowngradeConsumerTest") require_NoError(t, err) require_True(t, mset.closed.Load()) - require_Equal(t, mset.offlineReason, "stopped") + require_Equal(t, mset.offlineReason, "stopped - unsupported consumer \"DowngradeConsumerTest\"") obs := mset.getPublicConsumers() require_Len(t, len(obs), 1)