diff --git a/server/jetstream_api.go b/server/jetstream_api.go index ddfe9e47165..a653b5036ab 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1224,11 +1224,6 @@ func (s *Server) jsAccountInfoRequest(sub *subscription, c *client, _ *Account, } var resp = JSApiAccountInfoResponse{ApiResponse: ApiResponse{Type: JSApiAccountInfoResponseType}} - if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSRequiredApiLevelError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } // Determine if we should proceed here when we are in clustered mode. if s.JetStreamIsClustered() { @@ -1247,6 +1242,12 @@ func (s *Server) jsAccountInfoRequest(sub *subscription, c *client, _ *Account, } } + if errorOnRequiredApiLevel(hdr) { + resp.Error = NewJSRequiredApiLevelError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + if hasJS, doErr := acc.checkJetStream(); !hasJS { if !doErr { return @@ -1319,11 +1320,6 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account, } var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}} - if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSRequiredApiLevelError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } // Determine if we should proceed here when we are in clustered mode. if s.JetStreamIsClustered() { @@ -1342,6 +1338,12 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account, } } + if errorOnRequiredApiLevel(hdr) { + resp.Error = NewJSRequiredApiLevelError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + if hasJS, doErr := acc.checkJetStream(); !hasJS { if doErr { resp.Error = NewJSNotEnabledForAccountError() @@ -1436,11 +1438,6 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account, } var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}} - if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSRequiredApiLevelError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } // Determine if we should proceed here when we are in clustered mode. if s.JetStreamIsClustered() { @@ -1459,6 +1456,12 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account, } } + if errorOnRequiredApiLevel(hdr) { + resp.Error = NewJSRequiredApiLevelError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + if hasJS, doErr := acc.checkJetStream(); !hasJS { if doErr { resp.Error = NewJSNotEnabledForAccountError() @@ -1539,11 +1542,6 @@ func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, _ *Account, } var resp = JSApiStreamNamesResponse{ApiResponse: ApiResponse{Type: JSApiStreamNamesResponseType}} - if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSRequiredApiLevelError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } // Determine if we should proceed here when we are in clustered mode. if s.JetStreamIsClustered() { @@ -1562,6 +1560,12 @@ func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, _ *Account, } } + if errorOnRequiredApiLevel(hdr) { + resp.Error = NewJSRequiredApiLevelError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + if hasJS, doErr := acc.checkJetStream(); !hasJS { if doErr { resp.Error = NewJSNotEnabledForAccountError() @@ -1674,11 +1678,6 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, s ApiResponse: ApiResponse{Type: JSApiStreamListResponseType}, Streams: []*StreamInfo{}, } - if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSRequiredApiLevelError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } // Determine if we should proceed here when we are in clustered mode. if s.JetStreamIsClustered() { @@ -1697,6 +1696,12 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, s } } + if errorOnRequiredApiLevel(hdr) { + resp.Error = NewJSRequiredApiLevelError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + if hasJS, doErr := acc.checkJetStream(); !hasJS { if doErr { resp.Error = NewJSNotEnabledForAccountError() @@ -1797,11 +1802,6 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s if rt := getHeader(JSResponseType, hdr); len(rt) > 0 && string(rt) == jsCreateResponse { resp.ApiResponse.Type = JSApiStreamCreateResponseType } - if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSRequiredApiLevelError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } var clusterWideConsCount int @@ -1891,6 +1891,12 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s } } + if errorOnRequiredApiLevel(hdr) { + resp.Error = NewJSRequiredApiLevelError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + if hasJS, doErr := acc.checkJetStream(); !hasJS { if doErr { resp.Error = NewJSNotEnabledForAccountError() @@ -2016,11 +2022,6 @@ func (s *Server) jsStreamLeaderStepDownRequest(sub *subscription, c *client, _ * name := tokenAt(subject, 6) var resp = JSApiStreamLeaderStepDownResponse{ApiResponse: ApiResponse{Type: JSApiStreamLeaderStepDownResponseType}} - if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSRequiredApiLevelError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } // If we are not in clustered mode this is a failed request. if !s.JetStreamIsClustered() { @@ -2052,6 +2053,12 @@ func (s *Server) jsStreamLeaderStepDownRequest(sub *subscription, c *client, _ * return } + if errorOnRequiredApiLevel(hdr) { + resp.Error = NewJSRequiredApiLevelError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + if hasJS, doErr := acc.checkJetStream(); !hasJS { if doErr { resp.Error = NewJSNotEnabledForAccountError() @@ -2128,11 +2135,6 @@ func (s *Server) jsConsumerLeaderStepDownRequest(sub *subscription, c *client, _ } var resp = JSApiConsumerLeaderStepDownResponse{ApiResponse: ApiResponse{Type: JSApiConsumerLeaderStepDownResponseType}} - if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSRequiredApiLevelError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } // If we are not in clustered mode this is a failed request. if !s.JetStreamIsClustered() { @@ -2167,6 +2169,13 @@ func (s *Server) jsConsumerLeaderStepDownRequest(sub *subscription, c *client, _ } else if sa == nil { return } + + if errorOnRequiredApiLevel(hdr) { + resp.Error = NewJSRequiredApiLevelError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + var ca *consumerAssignment if sa.consumers != nil { ca = sa.consumers[consumer] @@ -2254,11 +2263,6 @@ func (s *Server) jsStreamRemovePeerRequest(sub *subscription, c *client, _ *Acco name := tokenAt(subject, 6) var resp = JSApiStreamRemovePeerResponse{ApiResponse: ApiResponse{Type: JSApiStreamRemovePeerResponseType}} - if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSRequiredApiLevelError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } // If we are not in clustered mode this is a failed request. if !s.JetStreamIsClustered() { @@ -2287,6 +2291,12 @@ func (s *Server) jsStreamRemovePeerRequest(sub *subscription, c *client, _ *Acco return } + if errorOnRequiredApiLevel(hdr) { + resp.Error = NewJSRequiredApiLevelError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + if hasJS, doErr := acc.checkJetStream(); !hasJS { if doErr { resp.Error = NewJSNotEnabledForAccountError() @@ -2779,11 +2789,6 @@ func (s *Server) jsLeaderAccountPurgeRequest(sub *subscription, c *client, _ *Ac accName := tokenAt(subject, 5) var resp = JSApiAccountPurgeResponse{ApiResponse: ApiResponse{Type: JSApiAccountPurgeResponseType}} - if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSRequiredApiLevelError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } if !s.JetStreamIsClustered() { var streams []*stream @@ -2824,6 +2829,12 @@ func (s *Server) jsLeaderAccountPurgeRequest(sub *subscription, c *client, _ *Ac return } + if errorOnRequiredApiLevel(hdr) { + resp.Error = NewJSRequiredApiLevelError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + if js.isMetaRecovering() { // While in recovery mode, the data structures are not fully initialized resp.Error = NewJSClusterNotAvailError() @@ -3046,11 +3057,6 @@ func (s *Server) jsStreamDeleteRequest(sub *subscription, c *client, _ *Account, } var resp = JSApiStreamDeleteResponse{ApiResponse: ApiResponse{Type: JSApiStreamDeleteResponseType}} - if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSRequiredApiLevelError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } // Determine if we should proceed here when we are in clustered mode. if s.JetStreamIsClustered() { @@ -3069,6 +3075,12 @@ func (s *Server) jsStreamDeleteRequest(sub *subscription, c *client, _ *Account, } } + if errorOnRequiredApiLevel(hdr) { + resp.Error = NewJSRequiredApiLevelError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + if hasJS, doErr := acc.checkJetStream(); !hasJS { if doErr { resp.Error = NewJSNotEnabledForAccountError() @@ -3121,11 +3133,6 @@ func (s *Server) jsMsgDeleteRequest(sub *subscription, c *client, _ *Account, su stream := tokenAt(subject, 6) var resp = JSApiMsgDeleteResponse{ApiResponse: ApiResponse{Type: JSApiMsgDeleteResponseType}} - if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSRequiredApiLevelError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } // If we are in clustered mode we need to be the stream leader to proceed. if s.JetStreamIsClustered() { @@ -3174,6 +3181,12 @@ func (s *Server) jsMsgDeleteRequest(sub *subscription, c *client, _ *Account, su } } + if errorOnRequiredApiLevel(hdr) { + resp.Error = NewJSRequiredApiLevelError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + if hasJS, doErr := acc.checkJetStream(); !hasJS { if doErr { resp.Error = NewJSNotEnabledForAccountError() @@ -3245,11 +3258,6 @@ func (s *Server) jsMsgGetRequest(sub *subscription, c *client, _ *Account, subje stream := tokenAt(subject, 6) var resp = JSApiMsgGetResponse{ApiResponse: ApiResponse{Type: JSApiMsgGetResponseType}} - if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSRequiredApiLevelError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } // If we are in clustered mode we need to be the stream leader to proceed. if s.JetStreamIsClustered() { @@ -3298,6 +3306,12 @@ func (s *Server) jsMsgGetRequest(sub *subscription, c *client, _ *Account, subje } } + if errorOnRequiredApiLevel(hdr) { + resp.Error = NewJSRequiredApiLevelError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + if hasJS, doErr := acc.checkJetStream(); !hasJS { if doErr { resp.Error = NewJSNotEnabledForAccountError() @@ -3402,31 +3416,8 @@ func (s *Server) jsConsumerUnpinRequest(sub *subscription, c *client, _ *Account stream := streamNameFromSubject(subject) consumer := consumerNameFromSubject(subject) - var req JSApiConsumerUnpinRequest var resp = JSApiConsumerUnpinResponse{ApiResponse: ApiResponse{Type: JSApiConsumerUnpinResponseType}} - if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSRequiredApiLevelError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - if err := json.Unmarshal(msg, &req); err != nil { - resp.Error = NewJSInvalidJSONError(err) - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - - if req.Group == _EMPTY_ { - resp.Error = NewJSInvalidJSONError(errors.New("consumer group not specified")) - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - - if !validGroupName.MatchString(req.Group) { - resp.Error = NewJSConsumerInvalidGroupNameError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } if s.JetStreamIsClustered() { // Check to make sure the stream is assigned. js, cc := s.getJetStreamCluster() @@ -3478,6 +3469,31 @@ func (s *Server) jsConsumerUnpinRequest(sub *subscription, c *client, _ *Account } } + if errorOnRequiredApiLevel(hdr) { + resp.Error = NewJSRequiredApiLevelError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + + var req JSApiConsumerUnpinRequest + if err := json.Unmarshal(msg, &req); err != nil { + resp.Error = NewJSInvalidJSONError(err) + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + + if req.Group == _EMPTY_ { + resp.Error = NewJSInvalidJSONError(errors.New("consumer group not specified")) + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + + if !validGroupName.MatchString(req.Group) { + resp.Error = NewJSConsumerInvalidGroupNameError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + if hasJS, doErr := acc.checkJetStream(); !hasJS { if doErr { resp.Error = NewJSNotEnabledForAccountError() @@ -3541,11 +3557,6 @@ func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, _ *Account, stream := streamNameFromSubject(subject) var resp = JSApiStreamPurgeResponse{ApiResponse: ApiResponse{Type: JSApiStreamPurgeResponseType}} - if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSRequiredApiLevelError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } // If we are in clustered mode we need to be the stream leader to proceed. if s.JetStreamIsClustered() { @@ -3597,6 +3608,12 @@ func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, _ *Account, } } + if errorOnRequiredApiLevel(hdr) { + resp.Error = NewJSRequiredApiLevelError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + if hasJS, doErr := acc.checkJetStream(); !hasJS { if doErr { resp.Error = NewJSNotEnabledForAccountError() @@ -4214,11 +4231,6 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun } var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}} - if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSRequiredApiLevelError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } var req CreateConsumerRequest if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil { @@ -4255,6 +4267,20 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun } } + if errorOnRequiredApiLevel(hdr) { + resp.Error = NewJSRequiredApiLevelError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + + if hasJS, doErr := acc.checkJetStream(); !hasJS { + if doErr { + resp.Error = NewJSNotEnabledForAccountError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + } + return + } + var streamName, consumerName, filteredSubject string var rt ccReqType @@ -4287,14 +4313,6 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun } } - if hasJS, doErr := acc.checkJetStream(); !hasJS { - if doErr { - resp.Error = NewJSNotEnabledForAccountError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - } - return - } - if streamName != req.Stream { resp.Error = NewJSStreamMismatchError() s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) @@ -4439,11 +4457,6 @@ func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, _ *Account ApiResponse: ApiResponse{Type: JSApiConsumerNamesResponseType}, Consumers: []string{}, } - if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSRequiredApiLevelError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } // Determine if we should proceed here when we are in clustered mode. if s.JetStreamIsClustered() { @@ -4462,6 +4475,12 @@ func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, _ *Account } } + if errorOnRequiredApiLevel(hdr) { + resp.Error = NewJSRequiredApiLevelError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + if hasJS, doErr := acc.checkJetStream(); !hasJS { if doErr { resp.Error = NewJSNotEnabledForAccountError() @@ -4566,11 +4585,6 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, _ *Account, ApiResponse: ApiResponse{Type: JSApiConsumerListResponseType}, Consumers: []*ConsumerInfo{}, } - if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSRequiredApiLevelError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } // Determine if we should proceed here when we are in clustered mode. if s.JetStreamIsClustered() { @@ -4590,7 +4604,7 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, _ *Account, } if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSClusterNotAvailError() + resp.Error = NewJSRequiredApiLevelError() s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } @@ -4680,11 +4694,6 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account, consumerName := consumerNameFromSubject(subject) var resp = JSApiConsumerInfoResponse{ApiResponse: ApiResponse{Type: JSApiConsumerInfoResponseType}} - if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSRequiredApiLevelError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } if !isEmptyRequest(msg) { resp.Error = NewJSNotEmptyRequestError() @@ -4835,6 +4844,12 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account, } } + if errorOnRequiredApiLevel(hdr) { + resp.Error = NewJSRequiredApiLevelError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + if !acc.JetStreamEnabled() { resp.Error = NewJSNotEnabledForAccountError() s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) @@ -4882,11 +4897,6 @@ func (s *Server) jsConsumerDeleteRequest(sub *subscription, c *client, _ *Accoun } var resp = JSApiConsumerDeleteResponse{ApiResponse: ApiResponse{Type: JSApiConsumerDeleteResponseType}} - if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSRequiredApiLevelError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } // Determine if we should proceed here when we are in clustered mode. if s.JetStreamIsClustered() { @@ -4905,6 +4915,12 @@ func (s *Server) jsConsumerDeleteRequest(sub *subscription, c *client, _ *Accoun } } + if errorOnRequiredApiLevel(hdr) { + resp.Error = NewJSRequiredApiLevelError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + if hasJS, doErr := acc.checkJetStream(); !hasJS { if doErr { resp.Error = NewJSNotEnabledForAccountError() @@ -4960,11 +4976,6 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account var req JSApiConsumerPauseRequest var resp = JSApiConsumerPauseResponse{ApiResponse: ApiResponse{Type: JSApiConsumerPauseResponseType}} - if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSRequiredApiLevelError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } if isJSONObjectOrArray(msg) { if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil { @@ -4992,6 +5003,12 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account } } + if errorOnRequiredApiLevel(hdr) { + resp.Error = NewJSRequiredApiLevelError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + if hasJS, doErr := acc.checkJetStream(); !hasJS { if doErr { resp.Error = NewJSNotEnabledForAccountError() diff --git a/server/jetstream_versioning_test.go b/server/jetstream_versioning_test.go index c98a1cc6ec3..49c0dd75f64 100644 --- a/server/jetstream_versioning_test.go +++ b/server/jetstream_versioning_test.go @@ -23,6 +23,7 @@ import ( "math" "reflect" "strconv" + "strings" "testing" "time" @@ -658,7 +659,12 @@ func TestJetStreamApiErrorOnRequiredApiLevel(t *testing.T) { var resp ApiResponse require_NoError(t, json.Unmarshal(msg.Data, &resp)) require_True(t, resp.Error != nil) - require_Error(t, resp.Error, NewJSRequiredApiLevelError()) + // Peer remove or stepdown is not supported if not clustered. + if strings.Contains(apiSubject, ".STEPDOWN.") || strings.Contains(apiSubject, ".PEER.") { + require_Error(t, resp.Error, NewJSClusterRequiredError()) + } else { + require_Error(t, resp.Error, NewJSRequiredApiLevelError()) + } }) } }