Skip to content

Commit

Permalink
feat(JS): getConsumerFromInfo method for pull consumer retrieval
Browse files Browse the repository at this point in the history
Introduce a `getConsumerFromInfo` method to retrieve a pull consumer without fetching ConsumerInfo. This ensures efficient consumer handling while avoiding unnecessary API calls. Included corresponding tests to validate functionality.

Signed-off-by: Alberto Ricart <[email protected]>
  • Loading branch information
aricart committed Jan 14, 2025
1 parent 6543319 commit f78c0c2
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 3 deletions.
9 changes: 8 additions & 1 deletion jetstream/src/jsmstream_api.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2023 The NATS Authors
* Copyright 2021-2025 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand Down Expand Up @@ -244,6 +244,13 @@ export class ConsumersImpl implements Consumers {
}
}

getConsumerFromInfo(ci: ConsumerInfo): Consumer {
if (typeof ci.config.deliver_subject === "string") {
throw new Error("not a pull consumer");
}
return new PullConsumerImpl(this.api, ci);
}

async ordered(
stream: string,
opts: Partial<OrderedConsumerOptions> = {},
Expand Down
19 changes: 18 additions & 1 deletion jetstream/src/types.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023-2024 The NATS Authors
* Copyright 2023-2025 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand Down Expand Up @@ -806,6 +806,17 @@ export type Consumers = {
| Partial<OrderedConsumerOptions>,
): Promise<Consumer>;

/**
* Returns a PullConsumer configured for the specified ConsumerInfo.
* Note this API doesn't create or check that the consumer exists. It
* simply returns a Consumer that you can use to process messages.
* This method should only be used when avoiding extra ConsumerInfo.
* If the underlying consumer doesn't exist, the consumer will raise
* heartbeat_missed notifications.
* @param ci
*/
getConsumerFromInfo(ci: ConsumerInfo): Consumer;

getPushConsumer(
stream: string,
name?:
Expand Down Expand Up @@ -978,6 +989,12 @@ export type Stream = {

best(): Promise<Stream>;

/**
* Returns the named consumer pull consumer. If instead of a name
* an OrderedConsumerOptions configuration is passed, an
* ordered pull consumer is created and returned.
* @param name
*/
getConsumer(
name?: string | Partial<OrderedConsumerOptions>,
): Promise<Consumer>;
Expand Down
47 changes: 46 additions & 1 deletion jetstream/tests/consumers_test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2024 The NATS Authors
* Copyright 2022-2025 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand All @@ -16,6 +16,7 @@ import { initStream } from "./jstest_util.ts";
import {
assertEquals,
assertExists,
assertFalse,
assertRejects,
assertStringIncludes,
} from "jsr:@std/assert";
Expand Down Expand Up @@ -533,3 +534,47 @@ Deno.test("consumers - processed", async () => {

await cleanup(ns, nc);
});

Deno.test("consumers - getConsumerFromInfo doesn't do info", async () => {
const { ns, nc } = await setup(jetstreamServerConf(), {
inboxPrefix: "x",
});
const jsm = await jetstreamManager(nc);
await jsm.streams.add({ name: "messages", subjects: ["hello"] });

const js = jetstream(nc);
await js.publish("hello");
await js.publish("hello");

const ci = await jsm.consumers.add("messages", {
durable_name: "c",
deliver_policy: DeliverPolicy.All,
ack_policy: AckPolicy.Explicit,
ack_wait: nanos(3000),
max_waiting: 500,
});

let heardInfo = false;
nc.subscribe("$JS.API.CONSUMER.INFO.messages.c", {
callback: (_, m) => {
heardInfo = true;
},
});

const consumer = js.consumers.getConsumerFromInfo(ci);
const iter = await consumer.consume({
callback: (m) => {
m.ack();
if (m.info.pending === 0) {
iter.stop();
}
},
});

await iter.closed();
assertEquals(iter.getProcessed(), 2);
assertEquals(iter.getReceived(), 2);
assertFalse(heardInfo);

await cleanup(ns, nc);
});

0 comments on commit f78c0c2

Please sign in to comment.