Skip to content

Commit b597a36

Browse files
committed
feat(jetstream): priority_groups configuration on consumers and overflow support for pull consumer
https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-42.md Signed-off-by: Alberto Ricart <[email protected]>
1 parent 5ab062e commit b597a36

14 files changed

+687
-48
lines changed

Diff for: core/src/semver.ts

+2
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ export enum Feature {
5353
JS_STREAM_COMPRESSION = "js_stream_compression",
5454
JS_DEFAULT_CONSUMER_LIMITS = "js_default_consumer_limits",
5555
JS_BATCH_DIRECT_GET = "js_batch_direct_get",
56+
JS_PRIORITY_GROUPS = "js_priority_groups",
5657
}
5758

5859
type FeatureVersion = {
@@ -111,6 +112,7 @@ export class Features {
111112
this.set(Feature.JS_STREAM_COMPRESSION, "2.10.0");
112113
this.set(Feature.JS_DEFAULT_CONSUMER_LIMITS, "2.10.0");
113114
this.set(Feature.JS_BATCH_DIRECT_GET, "2.11.0");
115+
this.set(Feature.JS_PRIORITY_GROUPS, "2.12.0");
114116

115117
this.disabled.forEach((f) => {
116118
this.features.delete(f);

Diff for: jetstream/src/consumer.ts

+109-9
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import {
2929
delay,
3030
errors,
3131
Events,
32+
Feature,
3233
IdleHeartbeatMonitor,
3334
nanos,
3435
nuid,
@@ -43,6 +44,7 @@ import { toJsMsg } from "./jsmsg.ts";
4344
import type {
4445
ConsumerConfig,
4546
ConsumerInfo,
47+
OverflowMinPendingAndMinAck,
4648
PullOptions,
4749
} from "./jsapi_types.ts";
4850
import { AckPolicy, DeliverPolicy } from "./jsapi_types.ts";
@@ -54,14 +56,21 @@ import type {
5456
ConsumerCallbackFn,
5557
ConsumerMessages,
5658
ConsumerStatus,
59+
Expires,
5760
FetchMessages,
5861
FetchOptions,
62+
IdleHeartbeat,
63+
MaxBytes,
64+
MaxMessages,
5965
NextOptions,
6066
OrderedConsumerOptions,
6167
PullConsumerOptions,
68+
ThresholdBytes,
69+
ThresholdMessages,
6270
} from "./types.ts";
6371
import { ConsumerDebugEvents, ConsumerEvents } from "./types.ts";
6472
import { JetStreamStatus } from "./jserrors.ts";
73+
import { minValidation } from "./jsutil.ts";
6574

6675
enum PullConsumerType {
6776
Unset = -1,
@@ -85,10 +94,28 @@ export type PullConsumerInternalOptions = {
8594
ordered?: OrderedConsumerOptions;
8695
};
8796

97+
type InternalPullOptions =
98+
& MaxMessages
99+
& MaxBytes
100+
& Expires
101+
& IdleHeartbeat
102+
& ThresholdMessages
103+
& OverflowMinPendingAndMinAck
104+
& ThresholdBytes;
105+
106+
export function isOverflowOptions(
107+
opts: unknown,
108+
): opts is OverflowMinPendingAndMinAck {
109+
const oo = opts as OverflowMinPendingAndMinAck;
110+
return oo && typeof oo.group === "string" ||
111+
typeof oo.min_pending === "number" ||
112+
typeof oo.min_ack_pending === "number";
113+
}
114+
88115
export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
89116
implements ConsumerMessages {
90117
consumer: PullConsumerImpl;
91-
opts: Record<string, number>;
118+
opts: InternalPullOptions;
92119
sub!: Subscription;
93120
monitor: IdleHeartbeatMonitor | null;
94121
pending: { msgs: number; bytes: number; requests: number };
@@ -117,6 +144,13 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
117144
this.inbox = `${this.inboxPrefix}.${this.consumer.serial}`;
118145

119146
if (this.consumer.ordered) {
147+
if (isOverflowOptions(opts)) {
148+
throw errors.InvalidArgumentError.format([
149+
"group",
150+
"min_pending",
151+
"min_ack_pending",
152+
], "cannot be specified for ordered consumers");
153+
}
120154
if (this.consumer.orderedConsumerState === undefined) {
121155
// initialize the state for the order consumer
122156
const ocs = {} as OrderedConsumerState;
@@ -564,9 +598,21 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
564598
pullOptions(): Partial<PullOptions> {
565599
const batch = this.opts.max_messages - this.pending.msgs;
566600
const max_bytes = this.opts.max_bytes - this.pending.bytes;
567-
const idle_heartbeat = nanos(this.opts.idle_heartbeat);
568-
const expires = nanos(this.opts.expires);
569-
return { batch, max_bytes, idle_heartbeat, expires };
601+
const idle_heartbeat = nanos(this.opts.idle_heartbeat!);
602+
const expires = nanos(this.opts.expires!);
603+
604+
const opts = { batch, max_bytes, idle_heartbeat, expires } as PullOptions;
605+
606+
if (isOverflowOptions(this.opts)) {
607+
opts.group = this.opts.group;
608+
if (this.opts.min_pending) {
609+
opts.min_pending = this.opts.min_pending;
610+
}
611+
if (this.opts.min_ack_pending) {
612+
opts.min_ack_pending = this.opts.min_ack_pending;
613+
}
614+
}
615+
return opts;
570616
}
571617

572618
trackTimeout(t: Timeout<unknown>) {
@@ -608,14 +654,15 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
608654
parseOptions(
609655
opts: PullConsumerOptions,
610656
refilling = false,
611-
): Record<string, number> {
612-
const args = (opts || {}) as Record<string, number>;
657+
): InternalPullOptions {
658+
const args = (opts || {}) as InternalPullOptions;
613659
args.max_messages = args.max_messages || 0;
614660
args.max_bytes = args.max_bytes || 0;
615661

616662
if (args.max_messages !== 0 && args.max_bytes !== 0) {
617-
throw new Error(
618-
`only specify one of max_messages or max_bytes`,
663+
throw errors.InvalidArgumentError.format(
664+
["max_messages", "max_bytes"],
665+
"are mutually exclusive",
619666
);
620667
}
621668

@@ -654,6 +701,25 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
654701
args.threshold_bytes = args.threshold_bytes || minBytes;
655702
}
656703

704+
if (isOverflowOptions(opts)) {
705+
const { min, ok } = this.consumer.api.nc.features.get(
706+
Feature.JS_PRIORITY_GROUPS,
707+
);
708+
if (!ok) {
709+
throw new Error(`priority_groups require server ${min}`);
710+
}
711+
validateOverflowPullOptions(opts);
712+
if (opts.group) {
713+
args.group = opts.group;
714+
}
715+
if (opts.min_ack_pending) {
716+
args.min_ack_pending = opts.min_ack_pending;
717+
}
718+
if (opts.min_pending) {
719+
args.min_pending = opts.min_pending;
720+
}
721+
}
722+
657723
return args;
658724
}
659725

@@ -784,7 +850,7 @@ export class PullConsumerImpl implements Consumer {
784850
this.messages = m;
785851
}
786852
// FIXME: need some way to pad this correctly
787-
const to = Math.round(m.opts.expires * 1.05);
853+
const to = Math.round(m.opts.expires! * 1.05);
788854
const timer = timeout(to);
789855
m.closed().catch((err) => {
790856
console.log(err);
@@ -857,3 +923,37 @@ export class PullConsumerImpl implements Consumer {
857923
return this._info;
858924
}
859925
}
926+
927+
export function validateOverflowPullOptions(opts: unknown) {
928+
if (isOverflowOptions(opts)) {
929+
minValidation("group", opts.group);
930+
if (opts.group.length > 16) {
931+
throw errors.InvalidArgumentError.format(
932+
"group",
933+
"must be 16 characters or less",
934+
);
935+
}
936+
937+
const { min_pending, min_ack_pending } = opts;
938+
if (!min_pending && !min_ack_pending) {
939+
throw errors.InvalidArgumentError.format(
940+
["min_pending", "min_ack_pending"],
941+
"at least one must be specified",
942+
);
943+
}
944+
945+
if (min_pending && typeof min_pending !== "number") {
946+
throw errors.InvalidArgumentError.format(
947+
["min_pending"],
948+
"must be a number",
949+
);
950+
}
951+
952+
if (min_ack_pending && typeof min_ack_pending !== "number") {
953+
throw errors.InvalidArgumentError.format(
954+
["min_ack_pending"],
955+
"must be a number",
956+
);
957+
}
958+
}
959+
}

Diff for: jetstream/src/jsapi_types.ts

+61-5
Original file line numberDiff line numberDiff line change
@@ -895,7 +895,12 @@ export interface JetStreamApiStats {
895895
export interface AccountInfoResponse
896896
extends ApiResponse, JetStreamAccountStats {}
897897

898-
export interface ConsumerConfig extends ConsumerUpdateConfig {
898+
export type PriorityGroups = {
899+
priority_groups?: string[];
900+
priority_policy?: PriorityPolicy;
901+
};
902+
903+
export type ConsumerConfig = ConsumerUpdateConfig & {
899904
/**
900905
* The type of acknowledgment required by the Consumer
901906
*/
@@ -952,9 +957,9 @@ export interface ConsumerConfig extends ConsumerUpdateConfig {
952957
* Specified as an ISO date time string (Date#toISOString()).
953958
*/
954959
"pause_until"?: string;
955-
}
960+
};
956961

957-
export interface ConsumerUpdateConfig {
962+
export type ConsumerUpdateConfig = PriorityGroups & {
958963
/**
959964
* A short description of the purpose of this consume
960965
*/
@@ -1037,6 +1042,11 @@ export interface ConsumerUpdateConfig {
10371042
* 2.10.x and better.
10381043
*/
10391044
metadata?: Record<string, string>;
1045+
};
1046+
1047+
export enum PriorityPolicy {
1048+
None = "none",
1049+
Overflow = "overflow",
10401050
}
10411051

10421052
export function defaultConsumer(
@@ -1052,12 +1062,54 @@ export function defaultConsumer(
10521062
}, opts);
10531063
}
10541064

1065+
export type OverflowMinPending = {
1066+
/**
1067+
* The name of the priority_group
1068+
*/
1069+
group: string;
1070+
/**
1071+
* Only deliver messages when num_pending for the consumer is greater than this value
1072+
*/
1073+
min_pending: number;
1074+
};
1075+
1076+
export type OverflowMinAckPending = {
1077+
/**
1078+
* The name of the priority_group
1079+
*/
1080+
group: string;
1081+
/**
1082+
* Only deliver messages when num_ack_pending for the consumer is greater than this value
1083+
*/
1084+
min_ack_pending: number;
1085+
};
1086+
1087+
export type OverflowMinPendingAndMinAck = {
1088+
/**
1089+
* The name of the priority_group
1090+
*/
1091+
group: string;
1092+
/**
1093+
* Only deliver messages when num_pending for the consumer is greater than this value
1094+
*/
1095+
min_pending: number;
1096+
/**
1097+
* Only deliver messages when num_ack_pending for the consumer is greater than this value
1098+
*/
1099+
min_ack_pending: number;
1100+
};
1101+
1102+
export type OverflowOptions =
1103+
| OverflowMinPending
1104+
| OverflowMinAckPending
1105+
| OverflowMinPendingAndMinAck;
1106+
10551107
/**
10561108
* Options for a JetStream pull subscription which define how long
10571109
* the pull request will remain open and limits the amount of data
10581110
* that the server could return.
10591111
*/
1060-
export interface PullOptions {
1112+
export type PullOptions = Partial<OverflowMinPendingAndMinAck> & {
10611113
/**
10621114
* Max number of messages to retrieve in a pull.
10631115
*/
@@ -1076,8 +1128,12 @@ export interface PullOptions {
10761128
* number of messages in the batch to fit within this setting.
10771129
*/
10781130
"max_bytes": number;
1131+
1132+
/**
1133+
* Number of nanos between messages for the server to emit an idle_heartbeat
1134+
*/
10791135
"idle_heartbeat": number;
1080-
}
1136+
};
10811137

10821138
export interface DeliveryInfo {
10831139
/**

Diff for: jetstream/src/jsclient.ts

+21-10
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,12 @@
1515

1616
import { BaseApiClientImpl } from "./jsbaseclient_api.ts";
1717
import { ConsumerAPIImpl } from "./jsmconsumer_api.ts";
18-
import { delay, Empty, QueuedIteratorImpl } from "@nats-io/nats-core/internal";
18+
import {
19+
backoff,
20+
delay,
21+
Empty,
22+
QueuedIteratorImpl,
23+
} from "@nats-io/nats-core/internal";
1924

2025
import { ConsumersImpl, StreamAPIImpl, StreamsImpl } from "./jsmstream_api.ts";
2126

@@ -34,7 +39,7 @@ import type {
3439
StreamAPI,
3540
Streams,
3641
} from "./types.ts";
37-
import { errors, headers } from "@nats-io/nats-core/internal";
42+
import { errors, headers, RequestError } from "@nats-io/nats-core/internal";
3843

3944
import type {
4045
Msg,
@@ -49,7 +54,7 @@ import type {
4954
JetStreamAccountStats,
5055
} from "./jsapi_types.ts";
5156
import { DirectStreamAPIImpl } from "./jsm.ts";
52-
import { JetStreamError } from "./jserrors.ts";
57+
import { JetStreamError, JetStreamNotEnabled } from "./jserrors.ts";
5358

5459
export function toJetStreamClient(
5560
nc: NatsConnection | JetStreamClient,
@@ -205,29 +210,35 @@ export class JetStreamClientImpl extends BaseApiClientImpl
205210
ro.headers = mh;
206211
}
207212

208-
let { retries, retry_delay } = opts as {
213+
let { retries } = opts as {
209214
retries: number;
210-
retry_delay: number;
211215
};
212216
retries = retries || 1;
213-
retry_delay = retry_delay || 250;
217+
const bo = backoff();
214218

215-
let r: Msg;
219+
let r: Msg | null = null;
216220
for (let i = 0; i < retries; i++) {
217221
try {
218222
r = await this.nc.request(subj, data, ro);
219223
// if here we succeeded
220224
break;
221225
} catch (err) {
226+
const re = err instanceof RequestError ? err as RequestError : null;
222227
if (
223-
err instanceof errors.RequestError && err.isNoResponders()
228+
err instanceof errors.TimeoutError ||
229+
re?.isNoResponders() && i + 1 < retries
224230
) {
225-
await delay(retry_delay);
231+
await delay(bo.backoff(i));
226232
} else {
227-
throw err;
233+
throw re?.isNoResponders()
234+
? new JetStreamNotEnabled(`jetstream is not enabled`, {
235+
cause: err,
236+
})
237+
: err;
228238
}
229239
}
230240
}
241+
231242
const pa = this.parseJsResponse(r!) as PubAck;
232243
if (pa.stream === "") {
233244
throw new JetStreamError("invalid ack response");

0 commit comments

Comments
 (0)