Skip to content

Commit

Permalink
merge of #715
Browse files Browse the repository at this point in the history
  • Loading branch information
aricart committed Jul 1, 2024
1 parent f7c823d commit e6d1688
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 0 deletions.
2 changes: 2 additions & 0 deletions core/src/nats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ export class NatsConnectionImpl implements NatsConnection {
}
},
});
(sub as SubscriptionImpl).requestSubject = subject;

sub.closed
.then(() => {
Expand Down Expand Up @@ -375,6 +376,7 @@ export class NatsConnectionImpl implements NatsConnection {
if (errCtx && err.code !== ErrorCode.Timeout) {
err.stack += `\n\n${errCtx.stack}`;
}
sub.unsubscribe();
d.reject(err);
} else {
err = isRequestError(msg);
Expand Down
79 changes: 79 additions & 0 deletions core/tests/basics_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,85 @@ Deno.test("basics - no mux requests", async () => {
await cleanup(ns, nc);
});

Deno.test("basics - no mux request timeout doesn't leak subs", async () => {
const { ns, nc } = await _setup(connect);

nc.subscribe("q", { callback: () => {} });
const nci = nc as NatsConnectionImpl;
assertEquals(nci.protocol.subscriptions.size(), 1);

await assertRejects(
async () => {
await nc.request("q", Empty, { noMux: true, timeout: 1000 });
},
Error,
"TIMEOUT",
);

assertEquals(nci.protocol.subscriptions.size(), 1);
await cleanup(ns, nc);
});

Deno.test("basics - no mux request no responders doesn't leak subs", async () => {
const { ns, nc } = await _setup(connect);

const nci = nc as NatsConnectionImpl;
assertEquals(nci.protocol.subscriptions.size(), 0);

await assertRejects(
async () => {
await nc.request("q", Empty, { noMux: true, timeout: 1000 });
},
Error,
"503",
);

assertEquals(nci.protocol.subscriptions.size(), 0);
await cleanup(ns, nc);
});

Deno.test("basics - no mux request no perms doesn't leak subs", async () => {
const { ns, nc } = await _setup(connect, {
authorization: {
users: [{
user: "s",
password: "s",
permission: {
publish: "q",
subscribe: "response",
allow_responses: true,
},
}],
},
}, { user: "s", pass: "s" });

const nci = nc as NatsConnectionImpl;
assertEquals(nci.protocol.subscriptions.size(), 0);

await assertRejects(
async () => {
await nc.request("qq", Empty, {
noMux: true,
reply: "response",
timeout: 1000,
});
},
Error,
"Permissions Violation for Publish",
);

await assertRejects(
async () => {
await nc.request("q", Empty, { noMux: true, reply: "r", timeout: 1000 });
},
Error,
"Permissions Violation for Subscription",
);

assertEquals(nci.protocol.subscriptions.size(), 0);
await cleanup(ns, nc);
});

Deno.test("basics - no max_payload messages", async () => {
const { ns, nc } = await _setup(connect, { max_payload: 2048 });
const nci = nc as NatsConnectionImpl;
Expand Down
82 changes: 82 additions & 0 deletions core/tests/mrequest_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -420,3 +420,85 @@ Deno.test("mreq - lost sub permission", async () => {
await d;
await cleanup(ns, nc);
});

Deno.test("mreq - timeout doesn't leak subs", async () => {
const { ns, nc } = await _setup(connect);

nc.subscribe("q", { callback: () => {} });
const nci = nc as NatsConnectionImpl;
assertEquals(nci.protocol.subscriptions.size(), 1);

// there's no error here - the empty response is the timeout
const iter = await nc.requestMany("q", Empty, {
maxWait: 1000,
maxMessages: 10,
noMux: true,
});
for await (const _ of iter) {
// nothing
}

assertEquals(nci.protocol.subscriptions.size(), 1);
await cleanup(ns, nc);
});

Deno.test("mreq - no responder doesn't leak subs", async () => {
const { ns, nc } = await _setup(connect);

const nci = nc as NatsConnectionImpl;
assertEquals(nci.protocol.subscriptions.size(), 0);

await assertRejects(
async () => {
const iter = await nc.requestMany("q", Empty, {
noMux: true,
maxWait: 1000,
maxMessages: 10,
});
for await (const _ of iter) {
// nothing
}
},
Error,
"503",
);

// the mux subscription
assertEquals(nci.protocol.subscriptions.size(), 0);
await cleanup(ns, nc);
});

Deno.test("mreq - no mux request no perms doesn't leak subs", async () => {
const { ns, nc } = await _setup(connect, {
authorization: {
users: [{
user: "s",
password: "s",
permission: {
publish: "q",
subscribe: ">",
allow_responses: true,
},
}],
},
}, { user: "s", pass: "s" });

const nci = nc as NatsConnectionImpl;
assertEquals(nci.protocol.subscriptions.size(), 0);

await assertRejects(
async () => {
const iter = await nc.requestMany("qq", Empty, {
noMux: true,
maxWait: 1000,
});
for await (const _ of iter) {
// nothing
}
},
Error,
"Permissions Violation for Publish",
);

await cleanup(ns, nc);
});

0 comments on commit e6d1688

Please sign in to comment.