Skip to content

Commit

Permalink
feat(core): added ability for subscriptions to have a slow option t…
Browse files Browse the repository at this point in the history
…hat when specified will check the depth of the iterator and report when the subscription is running slow.

fix #125

Signed-off-by: Alberto Ricart <[email protected]>
  • Loading branch information
aricart committed Nov 9, 2024
1 parent 68f2ea5 commit 68bb674
Show file tree
Hide file tree
Showing 21 changed files with 152 additions and 25 deletions.
2 changes: 1 addition & 1 deletion core/deno.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@nats-io/nats-core",
"version": "3.0.0-36",
"version": "3.0.0-37",
"exports": {
".": "./src/mod.ts",
"./internal": "./src/internal_mod.ts"
Expand Down
2 changes: 1 addition & 1 deletion core/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@nats-io/nats-core",
"version": "3.0.0-36",
"version": "3.0.0-37",
"files": [
"lib/",
"LICENSE",
Expand Down
11 changes: 11 additions & 0 deletions core/src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ export enum Events {
LDM = "ldm",
/** Client received an async error from the server */
Error = "error",
/** Slow Consumer - a buffered subscription (iterator) that is accumulating messages beyond a specify threshold */
SlowConsumer = "slow_consumer",
}

/**
Expand Down Expand Up @@ -74,6 +76,15 @@ export interface SubOpts<T> {
* @param msg
*/
callback?: MsgCallback<T>;

/**
* Number of pending messages in a subscription to exceed prior to considering
* a subscription a Slow Consumer. By default, slow consumer is on a subscription
* is not accounted for.
*
* This is an experimental option.
*/
slow?: number;
}

export interface DnsResolveFn {
Expand Down
12 changes: 11 additions & 1 deletion core/src/nats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import type {
Subscription,
SubscriptionOptions,
} from "./core.ts";
import { createInbox, RequestStrategy } from "./core.ts";
import { createInbox, Events, RequestStrategy } from "./core.ts";
import { errors, InvalidArgumentError, TimeoutError } from "./errors.ts";

export class NatsConnectionImpl implements NatsConnection {
Expand Down Expand Up @@ -138,6 +138,16 @@ export class NatsConnectionImpl implements NatsConnection {
): Subscription {
this._check(subject, true, false);
const sub = new SubscriptionImpl(this.protocol, subject, opts);

if (typeof opts.callback !== "function" && typeof opts.slow === "number") {
const subj = sub.getSubject();
sub.setSlowNotificationFn(opts.slow, (pending: number) => {
this.protocol.dispatchStatus({
type: Events.SlowConsumer,
data: `subscription (${sub.sid}) ${subj} is slow: msgs ${pending}`,
});
});
}
this.protocol.subscribe(sub);
return sub;
}
Expand Down
39 changes: 39 additions & 0 deletions core/src/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,31 @@ export class Connect {
}
}

class SlowNotifier {
slow: number;
cb: (pending: number) => void;
notified: boolean;

constructor(slow: number, cb: (pending: number) => void) {
this.slow = slow;
this.cb = cb;
this.notified = false;
}

maybeNotify(pending: number): void {
// if we are below the threshold reset the ability to notify
if (pending <= this.slow) {
this.notified = false;
} else {
if (!this.notified) {
// crossed the threshold, notify and silence.
this.cb(pending);
this.notified = true;
}
}
}
}

export class SubscriptionImpl extends QueuedIteratorImpl<Msg>
implements Subscription {
sid!: number;
Expand All @@ -119,6 +144,7 @@ export class SubscriptionImpl extends QueuedIteratorImpl<Msg>
cleanupFn?: (sub: Subscription, info?: unknown) => void;
closed: Deferred<void>;
requestSubject?: string;
slow?: SlowNotifier;

constructor(
protocol: ProtocolHandler,
Expand Down Expand Up @@ -160,9 +186,22 @@ export class SubscriptionImpl extends QueuedIteratorImpl<Msg>
}
}

setSlowNotificationFn(slow: number, fn?: (pending: number) => void): void {
this.slow = undefined;
if (fn) {
if (this.noIterator) {
throw new Error("callbacks don't support slow notifications");
}
this.slow = new SlowNotifier(slow, fn);
}
}

callback(err: Error | null, msg: Msg) {
this.cancelTimeout();
err ? this.stop(err) : this.push(msg);
if (!err && this.slow) {
this.slow.maybeNotify(this.getPending());
}
}

close(): void {
Expand Down
2 changes: 1 addition & 1 deletion core/src/version.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
// This file is generated - do not edit
export const version = "3.0.0-36";
export const version = "3.0.0-37";
67 changes: 67 additions & 0 deletions core/tests/basics_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import {
deferred,
delay,
Empty,
Events,
Feature,
headers,
isIP,
Expand Down Expand Up @@ -1539,6 +1540,72 @@ Deno.test("basics - stats", async () => {
await cleanup(ns, nc);
});

Deno.test("basics - slow", async () => {
const { ns, nc } = await setup();

let slow = 0;
(async () => {
for await (const m of nc.status()) {
//@ts-ignore: test
if (m.type === Events.SlowConsumer) {
console.log(m);
slow++;
}
}
})().catch();
const sub = nc.subscribe("test", { slow: 10 });
const s = syncIterator(sub);

// we go over, should have a notification
for (let i = 0; i < 11; i++) {
nc.publish("test", "");
}

await delay(100);
assertEquals(sub.getPending(), 11);
assertEquals(slow, 1);
slow = 0;

// send one more, no more notifications until we drop below 10
nc.publish("test", "");
await delay(100); // 12
assertEquals(sub.getPending(), 12);
assertEquals(slow, 0);

await s.next(); // 12
await s.next(); // 11
await s.next(); // 10

nc.publish("test", ""); // 11
await delay(100);
assertEquals(sub.getPending(), 11);
assertEquals(slow, 0);

// now this will notify
await s.next(); // 11
await s.next(); // 10
await delay(100);
assertEquals(sub.getPending(), 9);

await s.next(); // 9
nc.publish("test", "");
await delay(100);
assertEquals(sub.getPending(), 9);
assertEquals(slow, 0);

nc.publish("test", ""); // 10
await delay(100);
assertEquals(sub.getPending(), 10);
assertEquals(slow, 0);

nc.publish("test", ""); // 11
await delay(100);
assertEquals(sub.getPending(), 11);
assertEquals(slow, 1);

await cleanup(ns, nc);
});

class MM implements Msg {
data!: Uint8Array;
sid: number;
Expand Down
2 changes: 1 addition & 1 deletion jetstream/deno.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@
"test": "deno test -A --parallel --reload --trace-leaks --quiet tests/ --import-map=import_map.json"
},
"imports": {
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-36"
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-37"
}
}
4 changes: 2 additions & 2 deletions jetstream/import_map.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
"imports": {
"@nats-io/nkeys": "jsr:@nats-io/[email protected]",
"@nats-io/nuid": "jsr:@nats-io/[email protected]",
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-36",
"@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-36/internal",
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-37",
"@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-37/internal",
"test_helpers": "../test_helpers/mod.ts",
"@std/io": "jsr:@std/[email protected]"
}
Expand Down
2 changes: 1 addition & 1 deletion jetstream/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
},
"description": "jetstream library - this library implements all the base functionality for NATS JetStream for javascript clients",
"dependencies": {
"@nats-io/nats-core": "3.0.0-36"
"@nats-io/nats-core": "3.0.0-37"
},
"devDependencies": {
"@types/node": "^22.7.6",
Expand Down
2 changes: 1 addition & 1 deletion kv/deno.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
"test": "deno test -A --parallel --reload --quiet tests/ --import-map=import_map.json"
},
"imports": {
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-36",
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-37",
"@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-25"
}
}
4 changes: 2 additions & 2 deletions kv/import_map.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"imports": {
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-36",
"@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-36/internal",
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-37",
"@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-37/internal",
"@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-25",
"@nats-io/jetstream/internal": "jsr:@nats-io/jetstream@~3.0.0-25/internal",
"test_helpers": "../test_helpers/mod.ts",
Expand Down
2 changes: 1 addition & 1 deletion kv/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
"description": "kv library - this library implements all the base functionality for NATS KV javascript clients",
"dependencies": {
"@nats-io/jetstream": "3.0.0-25",
"@nats-io/nats-core": "3.0.0-36"
"@nats-io/nats-core": "3.0.0-37"
},
"devDependencies": {
"@types/node": "^22.7.6",
Expand Down
2 changes: 1 addition & 1 deletion obj/deno.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
"test": "deno test -A --parallel --reload --quiet tests/ --import-map=import_map.json"
},
"imports": {
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-36",
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-37",
"@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-25"
}
}
4 changes: 2 additions & 2 deletions obj/import_map.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"imports": {
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-36",
"@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-36/internal",
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-37",
"@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-37/internal",
"@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-25",
"@nats-io/jetstream/internal": "jsr:@nats-io/jetstream@~3.0.0-25/internal",
"test_helpers": "../test_helpers/mod.ts",
Expand Down
2 changes: 1 addition & 1 deletion obj/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
"description": "obj library - this library implements all the base functionality for NATS objectstore for javascript clients",
"dependencies": {
"@nats-io/jetstream": "3.0.0-25",
"@nats-io/nats-core": "3.0.0-36"
"@nats-io/nats-core": "3.0.0-37"
},
"devDependencies": {
"@types/node": "^22.7.6",
Expand Down
2 changes: 1 addition & 1 deletion services/deno.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@
"test": "deno test -A --parallel --reload --quiet tests/ --import-map=import_map.json"
},
"imports": {
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-36"
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-37"
}
}
4 changes: 2 additions & 2 deletions services/import_map.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"imports": {
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-36",
"@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-36/internal",
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-37",
"@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-37/internal",
"test_helpers": "../test_helpers/mod.ts",
"@nats-io/nkeys": "jsr:@nats-io/[email protected]",
"@nats-io/nuid": "jsr:@nats-io/[email protected]",
Expand Down
2 changes: 1 addition & 1 deletion services/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
},
"description": "services library - this library implements all the base functionality for NATS services for javascript clients",
"dependencies": {
"@nats-io/nats-core": "3.0.0-36"
"@nats-io/nats-core": "3.0.0-37"
},
"devDependencies": {
"@types/node": "^22.7.6",
Expand Down
2 changes: 1 addition & 1 deletion transport-deno/deno.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
},
"imports": {
"@std/io": "jsr:@std/[email protected]",
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-36",
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-37",
"@nats-io/nkeys": "jsr:@nats-io/[email protected]",
"@nats-io/nuid": "jsr:@nats-io/[email protected]"
}
Expand Down
8 changes: 4 additions & 4 deletions transport-node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
"node": ">= 18.0.0"
},
"dependencies": {
"@nats-io/nats-core": "3.0.0-36",
"@nats-io/nats-core": "3.0.0-37",
"@nats-io/nkeys": "1.2.0-7",
"@nats-io/nuid": "2.0.1-2"
},
Expand All @@ -64,8 +64,8 @@
"nats-jwt": "^0.0.9",
"shx": "^0.3.3",
"typescript": "5.6.3",
"@nats-io/jetstream": "3.0.0-24",
"@nats-io/kv": "3.0.0-18",
"@nats-io/obj": "3.0.0-19"
"@nats-io/jetstream": "3.0.0-25",
"@nats-io/kv": "3.0.0-19",
"@nats-io/obj": "3.0.0-20"
}
}

0 comments on commit 68bb674

Please sign in to comment.