Skip to content

Commit

Permalink
Merge pull request #715 from nats-io/mux-leaks
Browse files Browse the repository at this point in the history
fixed noMux (request/requestMany) leaked subscriptions
  • Loading branch information
aricart authored Jul 1, 2024
2 parents f5a4365 + abb35f7 commit eb06fad
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 0 deletions.
2 changes: 2 additions & 0 deletions nats-base-client/nats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ export class NatsConnectionImpl implements NatsConnection {
}
},
});
(sub as SubscriptionImpl).requestSubject = subject;

sub.closed
.then(() => {
Expand Down Expand Up @@ -389,6 +390,7 @@ export class NatsConnectionImpl implements NatsConnection {
if (errCtx && err.code !== ErrorCode.Timeout) {
err.stack += `\n\n${errCtx.stack}`;
}
sub.unsubscribe();
d.reject(err);
} else {
err = isRequestError(msg);
Expand Down
79 changes: 79 additions & 0 deletions tests/basics_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,85 @@ Deno.test("basics - no mux requests timeout", async () => {
await cleanup(ns, nc);
});

Deno.test("basics - no mux request timeout doesn't leak subs", async () => {
const { ns, nc } = await setup();

nc.subscribe("q", { callback: () => {} });
const nci = nc as NatsConnectionImpl;
assertEquals(nci.protocol.subscriptions.size(), 1);

await assertRejects(
async () => {
await nc.request("q", Empty, { noMux: true, timeout: 1000 });
},
Error,
"TIMEOUT",
);

assertEquals(nci.protocol.subscriptions.size(), 1);
await cleanup(ns, nc);
});

Deno.test("basics - no mux request no responders doesn't leak subs", async () => {
const { ns, nc } = await setup();

const nci = nc as NatsConnectionImpl;
assertEquals(nci.protocol.subscriptions.size(), 0);

await assertRejects(
async () => {
await nc.request("q", Empty, { noMux: true, timeout: 1000 });
},
Error,
"503",
);

assertEquals(nci.protocol.subscriptions.size(), 0);
await cleanup(ns, nc);
});

Deno.test("basics - no mux request no perms doesn't leak subs", async () => {
const { ns, nc } = await setup({
authorization: {
users: [{
user: "s",
password: "s",
permission: {
publish: "q",
subscribe: "response",
allow_responses: true,
},
}],
},
}, { user: "s", pass: "s" });

const nci = nc as NatsConnectionImpl;
assertEquals(nci.protocol.subscriptions.size(), 0);

await assertRejects(
async () => {
await nc.request("qq", Empty, {
noMux: true,
reply: "response",
timeout: 1000,
});
},
Error,
"Permissions Violation for Publish",
);

await assertRejects(
async () => {
await nc.request("q", Empty, { noMux: true, reply: "r", timeout: 1000 });
},
Error,
"Permissions Violation for Subscription",
);

assertEquals(nci.protocol.subscriptions.size(), 0);
await cleanup(ns, nc);
});

Deno.test("basics - no mux requests", async () => {
const { ns, nc } = await setup({ max_payload: 2048 });
const subj = createInbox();
Expand Down
82 changes: 82 additions & 0 deletions tests/mrequest_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -424,3 +424,85 @@ Deno.test("mreq - lost sub permission", async () => {
await d;
await cleanup(ns, nc);
});

Deno.test("mreq - timeout doesn't leak subs", async () => {
const { ns, nc } = await setup();

nc.subscribe("q", { callback: () => {} });
const nci = nc as NatsConnectionImpl;
assertEquals(nci.protocol.subscriptions.size(), 1);

// there's no error here - the empty response is the timeout
const iter = await nc.requestMany("q", Empty, {
maxWait: 1000,
maxMessages: 10,
noMux: true,
});
for await (const _ of iter) {
// nothing
}

assertEquals(nci.protocol.subscriptions.size(), 1);
await cleanup(ns, nc);
});

Deno.test("mreq - no responder doesn't leak subs", async () => {
const { ns, nc } = await setup();

const nci = nc as NatsConnectionImpl;
assertEquals(nci.protocol.subscriptions.size(), 0);

await assertRejects(
async () => {
const iter = await nc.requestMany("q", Empty, {
noMux: true,
maxWait: 1000,
maxMessages: 10,
});
for await (const _ of iter) {
// nothing
}
},
Error,
"503",
);

// the mux subscription
assertEquals(nci.protocol.subscriptions.size(), 0);
await cleanup(ns, nc);
});

Deno.test("mreq - no mux request no perms doesn't leak subs", async () => {
const { ns, nc } = await setup({
authorization: {
users: [{
user: "s",
password: "s",
permission: {
publish: "q",
subscribe: ">",
allow_responses: true,
},
}],
},
}, { user: "s", pass: "s" });

const nci = nc as NatsConnectionImpl;
assertEquals(nci.protocol.subscriptions.size(), 0);

await assertRejects(
async () => {
const iter = await nc.requestMany("qq", Empty, {
noMux: true,
maxWait: 1000,
});
for await (const _ of iter) {
// nothing
}
},
Error,
"Permissions Violation for Publish",
);

await cleanup(ns, nc);
});

0 comments on commit eb06fad

Please sign in to comment.