Skip to content

Commit

Permalink
[FIX] name of the consumer was not captured when binding to an epheme…
Browse files Browse the repository at this point in the history
…ral, this made sub#consumerInfo() fail. (#330)
  • Loading branch information
aricart authored Jul 13, 2022
1 parent e17fb3d commit eacbc3b
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 0 deletions.
5 changes: 5 additions & 0 deletions nats-base-client/jsclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,11 @@ export class JetStreamClientImpl extends BaseApiClient
jsi.last = info;
jsi.config = info.config;
jsi.attached = true;
// if not a durable capture the name of the ephemeral so
// that consumerInfo from the sub will work
if (!jsi.config.durable_name) {
jsi.name = info.name;
}
}
} catch (err) {
//consumer doesn't exist
Expand Down
66 changes: 66 additions & 0 deletions tests/jetstream_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3345,3 +3345,69 @@ Deno.test("jetstream - pull consumer max_bytes rejected on old servers", async (

await cleanup(ns, nc);
});

Deno.test("jetstream - bind ephemeral can get consumer info", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
const { stream, subj } = await initStream(nc);

const jsm = await nc.jetstreamManager();

async function testEphemeral(deliverSubject = ""): Promise<void> {
const ci = await jsm.consumers.add(stream, {
ack_policy: AckPolicy.Explicit,
inactive_threshold: nanos(5000),
deliver_subject: deliverSubject,
});

const js = nc.jetstream();
const opts = consumerOpts();
opts.bind(stream, ci.name);
const sub = deliverSubject
? await js.subscribe(subj, opts)
: await js.pullSubscribe(subj, opts);
const sci = await sub.consumerInfo();
assertEquals(
sci.name,
ci.name,
`failed getting ci for ${deliverSubject ? "push" : "pull"}`,
);
}

await testEphemeral();
await testEphemeral(createInbox());

await cleanup(ns, nc);
});

Deno.test("jetstream - create ephemeral with config can get consumer info", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
const { stream, subj } = await initStream(nc);
const js = nc.jetstream();

async function testEphemeral(deliverSubject = ""): Promise<void> {
const opts = {
stream,
config: {
ack_policy: AckPolicy.Explicit,
deliver_subject: deliverSubject,
},
};
const sub = deliverSubject
? await js.subscribe(subj, opts)
: await js.pullSubscribe(subj, opts);
const ci = await sub.consumerInfo();
assert(
ci.name,
`failed getting name for ${deliverSubject ? "push" : "pull"}`,
);
assert(
!ci.config.durable_name,
`unexpected durable name for ${deliverSubject ? "push" : "pull"}`,
);
}

await testEphemeral();
await testEphemeral(createInbox());

await cleanup(ns, nc);
});

0 comments on commit eacbc3b

Please sign in to comment.