From f78c0c23a02318aae6324f2ef80b4172c9ea16d6 Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Tue, 14 Jan 2025 14:27:06 -0600 Subject: [PATCH] feat(JS): `getConsumerFromInfo` method for pull consumer retrieval Introduce a `getConsumerFromInfo` method to retrieve a pull consumer without fetching ConsumerInfo. This ensures efficient consumer handling while avoiding unnecessary API calls. Included corresponding tests to validate functionality. Signed-off-by: Alberto Ricart --- jetstream/src/jsmstream_api.ts | 9 +++++- jetstream/src/types.ts | 19 ++++++++++++- jetstream/tests/consumers_test.ts | 47 ++++++++++++++++++++++++++++++- 3 files changed, 72 insertions(+), 3 deletions(-) diff --git a/jetstream/src/jsmstream_api.ts b/jetstream/src/jsmstream_api.ts index 8b663470..d0f4f26c 100644 --- a/jetstream/src/jsmstream_api.ts +++ b/jetstream/src/jsmstream_api.ts @@ -1,5 +1,5 @@ /* - * Copyright 2021-2023 The NATS Authors + * Copyright 2021-2025 The NATS Authors * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -244,6 +244,13 @@ export class ConsumersImpl implements Consumers { } } + getConsumerFromInfo(ci: ConsumerInfo): Consumer { + if (typeof ci.config.deliver_subject === "string") { + throw new Error("not a pull consumer"); + } + return new PullConsumerImpl(this.api, ci); + } + async ordered( stream: string, opts: Partial = {}, diff --git a/jetstream/src/types.ts b/jetstream/src/types.ts index 01dc704a..a65d18d5 100644 --- a/jetstream/src/types.ts +++ b/jetstream/src/types.ts @@ -1,5 +1,5 @@ /* - * Copyright 2023-2024 The NATS Authors + * Copyright 2023-2025 The NATS Authors * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -806,6 +806,17 @@ export type Consumers = { | Partial, ): Promise; + /** + * Returns a PullConsumer configured for the specified ConsumerInfo. + * Note this API doesn't create or check that the consumer exists. It + * simply returns a Consumer that you can use to process messages. + * This method should only be used when avoiding extra ConsumerInfo. + * If the underlying consumer doesn't exist, the consumer will raise + * heartbeat_missed notifications. + * @param ci + */ + getConsumerFromInfo(ci: ConsumerInfo): Consumer; + getPushConsumer( stream: string, name?: @@ -978,6 +989,12 @@ export type Stream = { best(): Promise; + /** + * Returns the named consumer pull consumer. If instead of a name + * an OrderedConsumerOptions configuration is passed, an + * ordered pull consumer is created and returned. + * @param name + */ getConsumer( name?: string | Partial, ): Promise; diff --git a/jetstream/tests/consumers_test.ts b/jetstream/tests/consumers_test.ts index 207a6d79..e80f3a15 100644 --- a/jetstream/tests/consumers_test.ts +++ b/jetstream/tests/consumers_test.ts @@ -1,5 +1,5 @@ /* - * Copyright 2022-2024 The NATS Authors + * Copyright 2022-2025 The NATS Authors * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -16,6 +16,7 @@ import { initStream } from "./jstest_util.ts"; import { assertEquals, assertExists, + assertFalse, assertRejects, assertStringIncludes, } from "jsr:@std/assert"; @@ -533,3 +534,47 @@ Deno.test("consumers - processed", async () => { await cleanup(ns, nc); }); + +Deno.test("consumers - getConsumerFromInfo doesn't do info", async () => { + const { ns, nc } = await setup(jetstreamServerConf(), { + inboxPrefix: "x", + }); + const jsm = await jetstreamManager(nc); + await jsm.streams.add({ name: "messages", subjects: ["hello"] }); + + const js = jetstream(nc); + await js.publish("hello"); + await js.publish("hello"); + + const ci = await jsm.consumers.add("messages", { + durable_name: "c", + deliver_policy: DeliverPolicy.All, + ack_policy: AckPolicy.Explicit, + ack_wait: nanos(3000), + max_waiting: 500, + }); + + let heardInfo = false; + nc.subscribe("$JS.API.CONSUMER.INFO.messages.c", { + callback: (_, m) => { + heardInfo = true; + }, + }); + + const consumer = js.consumers.getConsumerFromInfo(ci); + const iter = await consumer.consume({ + callback: (m) => { + m.ack(); + if (m.info.pending === 0) { + iter.stop(); + } + }, + }); + + await iter.closed(); + assertEquals(iter.getProcessed(), 2); + assertEquals(iter.getReceived(), 2); + assertFalse(heardInfo); + + await cleanup(ns, nc); +});