diff --git a/server/consumer.go b/server/consumer.go index 00aeb7a9cf3..4bd251686d3 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -820,6 +820,9 @@ func checkConsumerCfg( } if config.PriorityPolicy != PriorityNone { + if config.DeliverSubject != "" { + return NewJSConsumerPushWithPriorityGroupError() + } if len(config.PriorityGroups) == 0 { return NewJSConsumerPriorityPolicyWithoutGroupError() } diff --git a/server/errors.json b/server/errors.json index b23e9579ede..4e60d490cd2 100644 --- a/server/errors.json +++ b/server/errors.json @@ -1758,5 +1758,15 @@ "help": "", "url": "", "deprecates": "" + }, + { + "constant": "JSConsumerPushWithPriorityGroupErr", + "code": 400, + "error_code": 10178, + "description": "priority groups can not be used with push consumers", + "comment": "", + "help": "", + "url": "", + "deprecates": "" } -] +] \ No newline at end of file diff --git a/server/jetstream_consumer_test.go b/server/jetstream_consumer_test.go index cdb74d0939f..3e3fad3e294 100644 --- a/server/jetstream_consumer_test.go +++ b/server/jetstream_consumer_test.go @@ -2170,27 +2170,29 @@ func TestJetStreamConsumerWithPriorityGroups(t *testing.T) { cluster.waitOnStreamLeader("$G", "TEST") for _, test := range []struct { - name string - nc *nats.Conn - stream string - consumer string - groups []string - mode PriorityPolicy - err *ApiError + name string + nc *nats.Conn + stream string + consumer string + groups []string + mode PriorityPolicy + deliverSubject string + err *ApiError }{ - {"Pinned Consumer with Priority Group", nc, "TEST", "PINNED", []string{"A"}, PriorityPinnedClient, nil}, - {"Pinned Consumer with Priority Group, clustered", cnc, "TEST", "PINNED", []string{"A"}, PriorityPinnedClient, nil}, - {"Overflow Consumer with Priority Group", nc, "TEST", "OVERFLOW", []string{"A"}, PriorityOverflow, nil}, - {"Overflow Consumer with Priority Group, clustered", cnc, "TEST", "OVERFLOW", []string{"A"}, PriorityOverflow, nil}, - {"Pinned Consumer without Priority Group", nc, "TEST", "PINNED_NO_GROUP", nil, PriorityPinnedClient, &ApiError{ErrCode: uint16(JSConsumerPriorityPolicyWithoutGroup)}}, - {"Pinned Consumer without Priority Group, clustered", cnc, "TEST", "PINNED_NO_GROUP", nil, PriorityPinnedClient, &ApiError{ErrCode: uint16(JSConsumerPriorityPolicyWithoutGroup)}}, - {"Overflow Consumer without Priority Group", nc, "TEST", "PINNED_NO_GROUP", nil, PriorityOverflow, &ApiError{ErrCode: uint16(JSConsumerPriorityPolicyWithoutGroup)}}, - {"Overflow Consumer without Priority Group, clustered", cnc, "TEST", "PINNED_NO_GROUP", nil, PriorityOverflow, &ApiError{ErrCode: uint16(JSConsumerPriorityPolicyWithoutGroup)}}, - {"Pinned Consumer with empty Priority Group", nc, "TEST", "PINNED_NO_GROUP", []string{""}, PriorityPinnedClient, &ApiError{ErrCode: uint16(JSConsumerEmptyGroupName)}}, - {"Pinned Consumer with empty Priority Group, clustered", cnc, "TEST", "PINNED_NO_GROUP", []string{""}, PriorityPinnedClient, &ApiError{ErrCode: uint16(JSConsumerEmptyGroupName)}}, - {"Pinned Consumer with empty Priority Group", nc, "TEST", "PINNED_NO_GROUP", []string{""}, PriorityOverflow, &ApiError{ErrCode: uint16(JSConsumerEmptyGroupName)}}, - {"Pinned Consumer with empty Priority Group, clustered", cnc, "TEST", "PINNED_NO_GROUP", []string{""}, PriorityOverflow, &ApiError{ErrCode: uint16(JSConsumerEmptyGroupName)}}, - {"Consumer with `none` policy priority", nc, "TEST", "NONE", []string{"A"}, PriorityNone, nil}, + {"Pinned Consumer with Priority Group", nc, "TEST", "PINNED", []string{"A"}, PriorityPinnedClient, "", nil}, + {"Pinned Consumer with Priority Group, clustered", cnc, "TEST", "PINNED", []string{"A"}, PriorityPinnedClient, "", nil}, + {"Overflow Consumer with Priority Group", nc, "TEST", "OVERFLOW", []string{"A"}, PriorityOverflow, "", nil}, + {"Overflow Consumer with Priority Group, clustered", cnc, "TEST", "OVERFLOW", []string{"A"}, PriorityOverflow, "", nil}, + {"Pinned Consumer without Priority Group", nc, "TEST", "PINNED_NO_GROUP", nil, PriorityPinnedClient, "", &ApiError{ErrCode: uint16(JSConsumerPriorityPolicyWithoutGroup)}}, + {"Pinned Consumer without Priority Group, clustered", cnc, "TEST", "PINNED_NO_GROUP", nil, PriorityPinnedClient, "", &ApiError{ErrCode: uint16(JSConsumerPriorityPolicyWithoutGroup)}}, + {"Overflow Consumer without Priority Group", nc, "TEST", "PINNED_NO_GROUP", nil, PriorityOverflow, "", &ApiError{ErrCode: uint16(JSConsumerPriorityPolicyWithoutGroup)}}, + {"Overflow Consumer without Priority Group, clustered", cnc, "TEST", "PINNED_NO_GROUP", nil, PriorityOverflow, "", &ApiError{ErrCode: uint16(JSConsumerPriorityPolicyWithoutGroup)}}, + {"Pinned Consumer with empty Priority Group", nc, "TEST", "PINNED_NO_GROUP", []string{""}, PriorityPinnedClient, "", &ApiError{ErrCode: uint16(JSConsumerEmptyGroupName)}}, + {"Pinned Consumer with empty Priority Group, clustered", cnc, "TEST", "PINNED_NO_GROUP", []string{""}, PriorityPinnedClient, "", &ApiError{ErrCode: uint16(JSConsumerEmptyGroupName)}}, + {"Pinned Consumer with empty Priority Group", nc, "TEST", "PINNED_NO_GROUP", []string{""}, PriorityOverflow, "", &ApiError{ErrCode: uint16(JSConsumerEmptyGroupName)}}, + {"Pinned Consumer with empty Priority Group, clustered", cnc, "TEST", "PINNED_NO_GROUP", []string{""}, PriorityOverflow, "", &ApiError{ErrCode: uint16(JSConsumerEmptyGroupName)}}, + {"Consumer with `none` policy priority", nc, "TEST", "NONE", []string{"A"}, PriorityNone, "", nil}, + {"Push consumer with Priority Group", nc, "TEST", "PUSH_WITH_POLICY", []string{"A"}, PriorityOverflow, "subject", &ApiError{ErrCode: uint16(JSConsumerPushWithPriorityGroupErr)}}, } { t.Run(test.name, func(t *testing.T) { @@ -2203,6 +2205,7 @@ func TestJetStreamConsumerWithPriorityGroups(t *testing.T) { PriorityGroups: test.groups, PriorityPolicy: test.mode, AckPolicy: AckExplicit, + DeliverSubject: test.deliverSubject, PinnedTTL: 10 * time.Second, }, } diff --git a/server/jetstream_errors_generated.go b/server/jetstream_errors_generated.go index f44eabcab22..018780f8d00 100644 --- a/server/jetstream_errors_generated.go +++ b/server/jetstream_errors_generated.go @@ -209,6 +209,9 @@ const ( // JSConsumerPushMaxWaitingErr consumer in push mode can not set max waiting JSConsumerPushMaxWaitingErr ErrorIdentifier = 10080 + // JSConsumerPushWithPriorityGroupErr priority groups can not be used with push consumers + JSConsumerPushWithPriorityGroupErr ErrorIdentifier = 10178 + // JSConsumerReplacementWithDifferentNameErr consumer replacement durable config not the same JSConsumerReplacementWithDifferentNameErr ErrorIdentifier = 10106 @@ -604,6 +607,7 @@ var ( JSConsumerPullRequiresAckErr: {Code: 400, ErrCode: 10084, Description: "consumer in pull mode requires explicit ack policy on workqueue stream"}, JSConsumerPullWithRateLimitErr: {Code: 400, ErrCode: 10086, Description: "consumer in pull mode can not have rate limit set"}, JSConsumerPushMaxWaitingErr: {Code: 400, ErrCode: 10080, Description: "consumer in push mode can not set max waiting"}, + JSConsumerPushWithPriorityGroupErr: {Code: 400, ErrCode: 10178, Description: "priority groups can not be used with push consumers"}, JSConsumerReplacementWithDifferentNameErr: {Code: 400, ErrCode: 10106, Description: "consumer replacement durable config not the same"}, JSConsumerReplicasExceedsStream: {Code: 400, ErrCode: 10126, Description: "consumer config replica count exceeds parent stream"}, JSConsumerReplicasShouldMatchStream: {Code: 400, ErrCode: 10134, Description: "consumer config replicas must match interest retention stream's replicas"}, @@ -1477,6 +1481,16 @@ func NewJSConsumerPushMaxWaitingError(opts ...ErrorOption) *ApiError { return ApiErrors[JSConsumerPushMaxWaitingErr] } +// NewJSConsumerPushWithPriorityGroupError creates a new JSConsumerPushWithPriorityGroupErr error: "priority groups can not be used with push consumers" +func NewJSConsumerPushWithPriorityGroupError(opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + return ApiErrors[JSConsumerPushWithPriorityGroupErr] +} + // NewJSConsumerReplacementWithDifferentNameError creates a new JSConsumerReplacementWithDifferentNameErr error: "consumer replacement durable config not the same" func NewJSConsumerReplacementWithDifferentNameError(opts ...ErrorOption) *ApiError { eopts := parseOpts(opts)