Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,9 @@ func checkConsumerCfg(
}

if config.PriorityPolicy != PriorityNone {
if config.DeliverSubject != "" {
return NewJSConsumerPushWithPriorityGroupError()
}
if len(config.PriorityGroups) == 0 {
return NewJSConsumerPriorityPolicyWithoutGroupError()
}
Expand Down
12 changes: 11 additions & 1 deletion server/errors.json
Original file line number Diff line number Diff line change
Expand Up @@ -1758,5 +1758,15 @@
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSConsumerPushWithPriorityGroupErr",
"code": 400,
"error_code": 10178,
"description": "priority groups can not be used with push consumers",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
}
]
]
43 changes: 23 additions & 20 deletions server/jetstream_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2170,27 +2170,29 @@ func TestJetStreamConsumerWithPriorityGroups(t *testing.T) {
cluster.waitOnStreamLeader("$G", "TEST")

for _, test := range []struct {
name string
nc *nats.Conn
stream string
consumer string
groups []string
mode PriorityPolicy
err *ApiError
name string
nc *nats.Conn
stream string
consumer string
groups []string
mode PriorityPolicy
deliverSubject string
err *ApiError
}{
{"Pinned Consumer with Priority Group", nc, "TEST", "PINNED", []string{"A"}, PriorityPinnedClient, nil},
{"Pinned Consumer with Priority Group, clustered", cnc, "TEST", "PINNED", []string{"A"}, PriorityPinnedClient, nil},
{"Overflow Consumer with Priority Group", nc, "TEST", "OVERFLOW", []string{"A"}, PriorityOverflow, nil},
{"Overflow Consumer with Priority Group, clustered", cnc, "TEST", "OVERFLOW", []string{"A"}, PriorityOverflow, nil},
{"Pinned Consumer without Priority Group", nc, "TEST", "PINNED_NO_GROUP", nil, PriorityPinnedClient, &ApiError{ErrCode: uint16(JSConsumerPriorityPolicyWithoutGroup)}},
{"Pinned Consumer without Priority Group, clustered", cnc, "TEST", "PINNED_NO_GROUP", nil, PriorityPinnedClient, &ApiError{ErrCode: uint16(JSConsumerPriorityPolicyWithoutGroup)}},
{"Overflow Consumer without Priority Group", nc, "TEST", "PINNED_NO_GROUP", nil, PriorityOverflow, &ApiError{ErrCode: uint16(JSConsumerPriorityPolicyWithoutGroup)}},
{"Overflow Consumer without Priority Group, clustered", cnc, "TEST", "PINNED_NO_GROUP", nil, PriorityOverflow, &ApiError{ErrCode: uint16(JSConsumerPriorityPolicyWithoutGroup)}},
{"Pinned Consumer with empty Priority Group", nc, "TEST", "PINNED_NO_GROUP", []string{""}, PriorityPinnedClient, &ApiError{ErrCode: uint16(JSConsumerEmptyGroupName)}},
{"Pinned Consumer with empty Priority Group, clustered", cnc, "TEST", "PINNED_NO_GROUP", []string{""}, PriorityPinnedClient, &ApiError{ErrCode: uint16(JSConsumerEmptyGroupName)}},
{"Pinned Consumer with empty Priority Group", nc, "TEST", "PINNED_NO_GROUP", []string{""}, PriorityOverflow, &ApiError{ErrCode: uint16(JSConsumerEmptyGroupName)}},
{"Pinned Consumer with empty Priority Group, clustered", cnc, "TEST", "PINNED_NO_GROUP", []string{""}, PriorityOverflow, &ApiError{ErrCode: uint16(JSConsumerEmptyGroupName)}},
{"Consumer with `none` policy priority", nc, "TEST", "NONE", []string{"A"}, PriorityNone, nil},
{"Pinned Consumer with Priority Group", nc, "TEST", "PINNED", []string{"A"}, PriorityPinnedClient, "", nil},
{"Pinned Consumer with Priority Group, clustered", cnc, "TEST", "PINNED", []string{"A"}, PriorityPinnedClient, "", nil},
{"Overflow Consumer with Priority Group", nc, "TEST", "OVERFLOW", []string{"A"}, PriorityOverflow, "", nil},
{"Overflow Consumer with Priority Group, clustered", cnc, "TEST", "OVERFLOW", []string{"A"}, PriorityOverflow, "", nil},
{"Pinned Consumer without Priority Group", nc, "TEST", "PINNED_NO_GROUP", nil, PriorityPinnedClient, "", &ApiError{ErrCode: uint16(JSConsumerPriorityPolicyWithoutGroup)}},
{"Pinned Consumer without Priority Group, clustered", cnc, "TEST", "PINNED_NO_GROUP", nil, PriorityPinnedClient, "", &ApiError{ErrCode: uint16(JSConsumerPriorityPolicyWithoutGroup)}},
{"Overflow Consumer without Priority Group", nc, "TEST", "PINNED_NO_GROUP", nil, PriorityOverflow, "", &ApiError{ErrCode: uint16(JSConsumerPriorityPolicyWithoutGroup)}},
{"Overflow Consumer without Priority Group, clustered", cnc, "TEST", "PINNED_NO_GROUP", nil, PriorityOverflow, "", &ApiError{ErrCode: uint16(JSConsumerPriorityPolicyWithoutGroup)}},
{"Pinned Consumer with empty Priority Group", nc, "TEST", "PINNED_NO_GROUP", []string{""}, PriorityPinnedClient, "", &ApiError{ErrCode: uint16(JSConsumerEmptyGroupName)}},
{"Pinned Consumer with empty Priority Group, clustered", cnc, "TEST", "PINNED_NO_GROUP", []string{""}, PriorityPinnedClient, "", &ApiError{ErrCode: uint16(JSConsumerEmptyGroupName)}},
{"Pinned Consumer with empty Priority Group", nc, "TEST", "PINNED_NO_GROUP", []string{""}, PriorityOverflow, "", &ApiError{ErrCode: uint16(JSConsumerEmptyGroupName)}},
{"Pinned Consumer with empty Priority Group, clustered", cnc, "TEST", "PINNED_NO_GROUP", []string{""}, PriorityOverflow, "", &ApiError{ErrCode: uint16(JSConsumerEmptyGroupName)}},
{"Consumer with `none` policy priority", nc, "TEST", "NONE", []string{"A"}, PriorityNone, "", nil},
{"Push consumer with Priority Group", nc, "TEST", "PUSH_WITH_POLICY", []string{"A"}, PriorityOverflow, "subject", &ApiError{ErrCode: uint16(JSConsumerPushWithPriorityGroupErr)}},
} {
t.Run(test.name, func(t *testing.T) {

Expand All @@ -2203,6 +2205,7 @@ func TestJetStreamConsumerWithPriorityGroups(t *testing.T) {
PriorityGroups: test.groups,
PriorityPolicy: test.mode,
AckPolicy: AckExplicit,
DeliverSubject: test.deliverSubject,
PinnedTTL: 10 * time.Second,
},
}
Expand Down
14 changes: 14 additions & 0 deletions server/jetstream_errors_generated.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,9 @@ const (
// JSConsumerPushMaxWaitingErr consumer in push mode can not set max waiting
JSConsumerPushMaxWaitingErr ErrorIdentifier = 10080

// JSConsumerPushWithPriorityGroupErr priority groups can not be used with push consumers
JSConsumerPushWithPriorityGroupErr ErrorIdentifier = 10178

// JSConsumerReplacementWithDifferentNameErr consumer replacement durable config not the same
JSConsumerReplacementWithDifferentNameErr ErrorIdentifier = 10106

Expand Down Expand Up @@ -604,6 +607,7 @@ var (
JSConsumerPullRequiresAckErr: {Code: 400, ErrCode: 10084, Description: "consumer in pull mode requires explicit ack policy on workqueue stream"},
JSConsumerPullWithRateLimitErr: {Code: 400, ErrCode: 10086, Description: "consumer in pull mode can not have rate limit set"},
JSConsumerPushMaxWaitingErr: {Code: 400, ErrCode: 10080, Description: "consumer in push mode can not set max waiting"},
JSConsumerPushWithPriorityGroupErr: {Code: 400, ErrCode: 10178, Description: "priority groups can not be used with push consumers"},
JSConsumerReplacementWithDifferentNameErr: {Code: 400, ErrCode: 10106, Description: "consumer replacement durable config not the same"},
JSConsumerReplicasExceedsStream: {Code: 400, ErrCode: 10126, Description: "consumer config replica count exceeds parent stream"},
JSConsumerReplicasShouldMatchStream: {Code: 400, ErrCode: 10134, Description: "consumer config replicas must match interest retention stream's replicas"},
Expand Down Expand Up @@ -1477,6 +1481,16 @@ func NewJSConsumerPushMaxWaitingError(opts ...ErrorOption) *ApiError {
return ApiErrors[JSConsumerPushMaxWaitingErr]
}

// NewJSConsumerPushWithPriorityGroupError creates a new JSConsumerPushWithPriorityGroupErr error: "priority groups can not be used with push consumers"
func NewJSConsumerPushWithPriorityGroupError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}

return ApiErrors[JSConsumerPushWithPriorityGroupErr]
}

// NewJSConsumerReplacementWithDifferentNameError creates a new JSConsumerReplacementWithDifferentNameErr error: "consumer replacement durable config not the same"
func NewJSConsumerReplacementWithDifferentNameError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
Expand Down
Loading