Skip to content

Commit

Permalink
Merge branch 'main' into fix-134
Browse files Browse the repository at this point in the history
  • Loading branch information
aricart committed Nov 13, 2024
2 parents 18d8b49 + ab296de commit 46f17ff
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 11 deletions.
7 changes: 7 additions & 0 deletions core/src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ export type ForceReconnectStatus = {
type: "forceReconnect";
};

export type SlowConsumerStatus = {
type: "slowConsumer";
sub: Subscription;
pending: number;
};

export type Status =
| DisconnectStatus
| ReconnectStatus
Expand All @@ -68,6 +74,7 @@ export type Status =
| ServerErrorStatus
| ClientPingStatus
| StaleConnectionStatus
| SlowConsumerStatus
| ForceReconnectStatus;

export type MsgCallback<T> = (err: Error | null, msg: T) => void;
Expand Down
8 changes: 4 additions & 4 deletions core/src/nats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import type {
Subscription,
SubscriptionOptions,
} from "./core.ts";
import { createInbox, Events, RequestStrategy } from "./core.ts";
import { createInbox, RequestStrategy } from "./core.ts";
import { errors, InvalidArgumentError, TimeoutError } from "./errors.ts";

export class NatsConnectionImpl implements NatsConnection {
Expand Down Expand Up @@ -130,11 +130,11 @@ export class NatsConnectionImpl implements NatsConnection {
const sub = new SubscriptionImpl(this.protocol, subject, opts);

if (typeof opts.callback !== "function" && typeof opts.slow === "number") {
const subj = sub.getSubject();
sub.setSlowNotificationFn(opts.slow, (pending: number) => {
this.protocol.dispatchStatus({
type: Events.SlowConsumer,
data: `subscription (${sub.sid}) ${subj} is slow: msgs ${pending}`,
type: "slowConsumer",
sub,
pending,
});
});
}
Expand Down
3 changes: 1 addition & 2 deletions core/tests/basics_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import {
deferred,
delay,
Empty,
Events,
Feature,
headers,
isIP,
Expand Down Expand Up @@ -1547,7 +1546,7 @@ Deno.test("basics - slow", async () => {
(async () => {
for await (const m of nc.status()) {
//@ts-ignore: test
if (m.type === Events.SlowConsumer) {
if (m.type === "slowConsumer") {
console.log(m);
slow++;
}
Expand Down
10 changes: 5 additions & 5 deletions core/tests/timeout_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,27 @@ import {
assertRejects,
assertStringIncludes,
} from "jsr:@std/assert";
import { connect } from "./connect.ts";
import { createInbox, Empty, errors } from "../src/internal_mod.ts";
import { cleanup, setup } from "../../test_helpers/mod.ts";

Deno.test("timeout - request noMux stack is useful", async () => {
const nc = await connect({ servers: "demo.nats.io" });
const { ns, nc } = await setup();
const subj = createInbox();
const err = await assertRejects(() => {
return nc.request(subj, Empty, { noMux: true, timeout: 250 });
}, errors.RequestError);
assertInstanceOf(err.cause, errors.NoRespondersError);
assertStringIncludes((err as Error).stack || "", "timeout_test");
await nc.close();
await cleanup(ns, nc);
});

Deno.test("timeout - request stack is useful", async () => {
const nc = await connect({ servers: "demo.nats.io" });
const { ns, nc } = await setup();
const subj = createInbox();
const err = await assertRejects(() => {
return nc.request(subj, Empty, { timeout: 250 });
}, errors.RequestError);
assertInstanceOf(err.cause, errors.NoRespondersError);
assertStringIncludes((err as Error).stack || "", "timeout_test");
await nc.close();
await cleanup(ns, nc);
});

0 comments on commit 46f17ff

Please sign in to comment.