Skip to content

Commit

Permalink
feat(core): opt-in slow consumer notifications
Browse files Browse the repository at this point in the history
Signed-off-by: Alberto Ricart <[email protected]>

Signed-off-by: Alberto Ricart <[email protected]>
  • Loading branch information
aricart committed Nov 9, 2024
1 parent c7d578b commit 423ba19
Show file tree
Hide file tree
Showing 25 changed files with 231 additions and 49 deletions.
2 changes: 1 addition & 1 deletion core/deno.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@nats-io/nats-core",
"version": "3.0.0-36",
"version": "3.0.0-37",
"exports": {
".": "./src/mod.ts",
"./internal": "./src/internal_mod.ts"
Expand Down
2 changes: 1 addition & 1 deletion core/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@nats-io/nats-core",
"version": "3.0.0-36",
"version": "3.0.0-37",
"files": [
"lib/",
"LICENSE",
Expand Down
11 changes: 11 additions & 0 deletions core/src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ export enum Events {
LDM = "ldm",
/** Client received an async error from the server */
Error = "error",
/** Slow Consumer - a buffered subscription (iterator) that is accumulating messages beyond a specify threshold */
SlowConsumer = "slow_consumer",
}

/**
Expand Down Expand Up @@ -74,6 +76,15 @@ export interface SubOpts<T> {
* @param msg
*/
callback?: MsgCallback<T>;

/**
* Number of pending messages in a subscription to exceed prior to considering
* a subscription a Slow Consumer. By default, slow consumer is on a subscription
* is not accounted for.
*
* This is an experimental option.
*/
slow?: number;
}

export interface DnsResolveFn {
Expand Down
12 changes: 11 additions & 1 deletion core/src/nats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import type {
Subscription,
SubscriptionOptions,
} from "./core.ts";
import { createInbox, RequestStrategy } from "./core.ts";
import { createInbox, Events, RequestStrategy } from "./core.ts";
import { errors, InvalidArgumentError, TimeoutError } from "./errors.ts";

export class NatsConnectionImpl implements NatsConnection {
Expand Down Expand Up @@ -138,6 +138,16 @@ export class NatsConnectionImpl implements NatsConnection {
): Subscription {
this._check(subject, true, false);
const sub = new SubscriptionImpl(this.protocol, subject, opts);

if (typeof opts.callback !== "function" && typeof opts.slow === "number") {
const subj = sub.getSubject();
sub.setSlowNotificationFn(opts.slow, (pending: number) => {
this.protocol.dispatchStatus({
type: Events.SlowConsumer,
data: `subscription (${sub.sid}) ${subj} is slow: msgs ${pending}`,
});
});
}
this.protocol.subscribe(sub);
return sub;
}
Expand Down
39 changes: 39 additions & 0 deletions core/src/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,31 @@ export class Connect {
}
}

class SlowNotifier {
slow: number;
cb: (pending: number) => void;
notified: boolean;

constructor(slow: number, cb: (pending: number) => void) {
this.slow = slow;
this.cb = cb;
this.notified = false;
}

maybeNotify(pending: number): void {
// if we are below the threshold reset the ability to notify
if (pending <= this.slow) {
this.notified = false;
} else {
if (!this.notified) {
// crossed the threshold, notify and silence.
this.cb(pending);
this.notified = true;
}
}
}
}

export class SubscriptionImpl extends QueuedIteratorImpl<Msg>
implements Subscription {
sid!: number;
Expand All @@ -119,6 +144,7 @@ export class SubscriptionImpl extends QueuedIteratorImpl<Msg>
cleanupFn?: (sub: Subscription, info?: unknown) => void;
closed: Deferred<void>;
requestSubject?: string;
slow?: SlowNotifier;

constructor(
protocol: ProtocolHandler,
Expand Down Expand Up @@ -160,9 +186,22 @@ export class SubscriptionImpl extends QueuedIteratorImpl<Msg>
}
}

setSlowNotificationFn(slow: number, fn?: (pending: number) => void): void {
this.slow = undefined;
if (fn) {
if (this.noIterator) {
throw new Error("callbacks don't support slow notifications");
}
this.slow = new SlowNotifier(slow, fn);
}
}

callback(err: Error | null, msg: Msg) {
this.cancelTimeout();
err ? this.stop(err) : this.push(msg);
if (!err && this.slow) {
this.slow.maybeNotify(this.getPending());
}
}

close(): void {
Expand Down
2 changes: 1 addition & 1 deletion core/src/version.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
// This file is generated - do not edit
export const version = "3.0.0-36";
export const version = "3.0.0-37";
67 changes: 67 additions & 0 deletions core/tests/basics_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import {
deferred,
delay,
Empty,
Events,
Feature,
headers,
isIP,
Expand Down Expand Up @@ -1539,6 +1540,72 @@ Deno.test("basics - stats", async () => {
await cleanup(ns, nc);
});

Deno.test("basics - slow", async () => {
const { ns, nc } = await setup();

let slow = 0;
(async () => {
for await (const m of nc.status()) {
//@ts-ignore: test
if (m.type === Events.SlowConsumer) {
console.log(m);
slow++;
}
}
})().catch();
const sub = nc.subscribe("test", { slow: 10 });
const s = syncIterator(sub);

// we go over, should have a notification
for (let i = 0; i < 11; i++) {
nc.publish("test", "");
}

await delay(100);
assertEquals(sub.getPending(), 11);
assertEquals(slow, 1);
slow = 0;

// send one more, no more notifications until we drop below 10
nc.publish("test", "");
await delay(100); // 12
assertEquals(sub.getPending(), 12);
assertEquals(slow, 0);

await s.next(); // 12
await s.next(); // 11
await s.next(); // 10

nc.publish("test", ""); // 11
await delay(100);
assertEquals(sub.getPending(), 11);
assertEquals(slow, 0);

// now this will notify
await s.next(); // 11
await s.next(); // 10
await delay(100);
assertEquals(sub.getPending(), 9);

await s.next(); // 9
nc.publish("test", "");
await delay(100);
assertEquals(sub.getPending(), 9);
assertEquals(slow, 0);

nc.publish("test", ""); // 10
await delay(100);
assertEquals(sub.getPending(), 10);
assertEquals(slow, 0);

nc.publish("test", ""); // 11
await delay(100);
assertEquals(sub.getPending(), 11);
assertEquals(slow, 1);

await cleanup(ns, nc);
});

class MM implements Msg {
data!: Uint8Array;
sid: number;
Expand Down
4 changes: 2 additions & 2 deletions jetstream/deno.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@nats-io/jetstream",
"version": "3.0.0-25",
"version": "3.0.0-26",
"exports": {
".": "./src/mod.ts",
"./internal": "./src/internal_mod.ts"
Expand Down Expand Up @@ -33,6 +33,6 @@
"test": "deno test -A --parallel --reload --trace-leaks --quiet tests/ --import-map=import_map.json"
},
"imports": {
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-36"
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-37"
}
}
4 changes: 2 additions & 2 deletions jetstream/import_map.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
"imports": {
"@nats-io/nkeys": "jsr:@nats-io/[email protected]",
"@nats-io/nuid": "jsr:@nats-io/[email protected]",
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-36",
"@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-36/internal",
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-37",
"@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-37/internal",
"test_helpers": "../test_helpers/mod.ts",
"@std/io": "jsr:@std/[email protected]"
}
Expand Down
4 changes: 2 additions & 2 deletions jetstream/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@nats-io/jetstream",
"version": "3.0.0-25",
"version": "3.0.0-26",
"files": [
"lib/",
"LICENSE",
Expand Down Expand Up @@ -34,7 +34,7 @@
},
"description": "jetstream library - this library implements all the base functionality for NATS JetStream for javascript clients",
"dependencies": {
"@nats-io/nats-core": "3.0.0-36"
"@nats-io/nats-core": "3.0.0-37"
},
"devDependencies": {
"@types/node": "^22.7.6",
Expand Down
4 changes: 2 additions & 2 deletions jetstream/src/jsapi_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1010,8 +1010,8 @@ export type ConsumerUpdateConfig = PriorityGroups & {
*/
"inactive_threshold"?: Nanos;
/**
* List of durations in nanoseconds format that represents a retry timescale for
* NaK'd messages or those being normally retried
* List of durations in nanoseconds that represents a retry timescale for
* the redelivery of messages
*/
"backoff"?: Nanos[];
/**
Expand Down
59 changes: 57 additions & 2 deletions jetstream/tests/jetstream_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import {
} from "@nats-io/nats-core";
import {
assert,
assertAlmostEquals,
assertEquals,
assertExists,
assertInstanceOf,
Expand Down Expand Up @@ -707,7 +708,8 @@ Deno.test("jetstream - backoff", async () => {
const { stream, subj } = await initStream(nc);
const jsm = await jetstreamManager(nc);

const backoff = [nanos(250), nanos(1000), nanos(3000)];
const ms = [250, 1000, 3000];
const backoff = ms.map((n) => nanos(n));
const ci = await jsm.consumers.add(stream, {
durable_name: "me",
ack_policy: AckPolicy.Explicit,
Expand All @@ -723,10 +725,11 @@ Deno.test("jetstream - backoff", async () => {
const js = jetstream(nc);
await js.publish(subj);

const when: number[] = [];
const c = await js.consumers.get(stream, "me");
const iter = await c.consume({
callback: (m) => {
console.log(m.info.redeliveryCount);
when.push(Date.now());
if (m.info.redeliveryCount === 4) {
iter.stop();
}
Expand All @@ -735,6 +738,58 @@ Deno.test("jetstream - backoff", async () => {

await iter.closed();

const offset = when.map((n, idx) => {
const p = idx > 0 ? idx - 1 : 0;
return n - when[p];
});

offset.slice(1).forEach((n, idx) => {
assertAlmostEquals(n, ms[idx], 20);
});

await cleanup(ns, nc);
});

Deno.test("jetstream - redelivery", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}));
if (await notCompatible(ns, nc, "2.7.2")) {
return;
}

const { stream, subj } = await initStream(nc);
const jsm = await jetstreamManager(nc);

const ci = await jsm.consumers.add(stream, {
durable_name: "me",
ack_policy: AckPolicy.Explicit,
max_deliver: 4,
ack_wait: nanos(1000),
});

assertEquals(ci.config.max_deliver, 4);

const js = jetstream(nc);
await js.publish(subj);

const c = await js.consumers.get(stream, "me");

let redeliveries = 0;
const iter = await c.consume({
callback: (m) => {
if (m.redelivered) {
redeliveries++;
}
if (m.info.redeliveryCount === 4) {
setTimeout(() => {
iter.stop();
}, 2000);
}
},
});

await iter.closed();
assertEquals(redeliveries, 3);

await cleanup(ns, nc);
});

Expand Down
6 changes: 3 additions & 3 deletions kv/deno.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@nats-io/kv",
"version": "3.0.0-19",
"version": "3.0.0-20",
"exports": {
".": "./src/mod.ts",
"./internal": "./src/internal_mod.ts"
Expand Down Expand Up @@ -33,7 +33,7 @@
"test": "deno test -A --parallel --reload --quiet tests/ --import-map=import_map.json"
},
"imports": {
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-36",
"@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-25"
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-37",
"@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-26"
}
}
8 changes: 4 additions & 4 deletions kv/import_map.json
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
{
"imports": {
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-36",
"@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-36/internal",
"@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-25",
"@nats-io/jetstream/internal": "jsr:@nats-io/jetstream@~3.0.0-25/internal",
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-37",
"@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-37/internal",
"@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-26",
"@nats-io/jetstream/internal": "jsr:@nats-io/jetstream@~3.0.0-26/internal",
"test_helpers": "../test_helpers/mod.ts",
"@nats-io/nkeys": "jsr:@nats-io/[email protected]",
"@nats-io/nuid": "jsr:@nats-io/[email protected]",
Expand Down
Loading

0 comments on commit 423ba19

Please sign in to comment.