diff --git a/jetstream/consumer.ts b/jetstream/consumer.ts index 6ed6a863..2bb1049a 100644 --- a/jetstream/consumer.ts +++ b/jetstream/consumer.ts @@ -420,7 +420,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl } } else { // push the user message - this._push(toJsMsg(msg)); + this._push(toJsMsg(msg, this.consumer.api.timeout)); this.received++; if (this.pending.msgs) { this.pending.msgs--; diff --git a/jetstream/jsclient.ts b/jetstream/jsclient.ts index 58b4487c..03245a06 100644 --- a/jetstream/jsclient.ts +++ b/jetstream/jsclient.ts @@ -276,7 +276,7 @@ export class JetStreamClientImpl extends BaseApiClient if (err) { throw err; } - return toJsMsg(msg); + return toJsMsg(msg, this.timeout); } /* @@ -389,7 +389,7 @@ export class JetStreamClientImpl extends BaseApiClient // if we are doing heartbeats, message resets monitor?.work(); qi.received++; - qi.push(toJsMsg(msg)); + qi.push(toJsMsg(msg, this.timeout)); } }, }); @@ -653,7 +653,7 @@ export class JetStreamClientImpl extends BaseApiClient jsi: JetStreamSubscriptionInfo, ): TypedSubscriptionOptions { const so = {} as TypedSubscriptionOptions; - so.adapter = msgAdapter(jsi.callbackFn === undefined); + so.adapter = msgAdapter(jsi.callbackFn === undefined, this.timeout); so.ingestionFilterFn = JetStreamClientImpl.ingestionFn(jsi.ordered); so.protocolFilterFn = (jm, ingest = false): boolean => { const jsmi = jm as JsMsgImpl; @@ -979,44 +979,50 @@ class JetStreamPullSubscriptionImpl extends JetStreamSubscriptionImpl } } -function msgAdapter(iterator: boolean): MsgAdapter { +function msgAdapter(iterator: boolean, ackTimeout: number): MsgAdapter { if (iterator) { - return iterMsgAdapter; + return iterMsgAdapter(ackTimeout); } else { - return cbMsgAdapter; + return cbMsgAdapter(ackTimeout); } } -function cbMsgAdapter( - err: NatsError | null, - msg: Msg, -): [NatsError | null, JsMsg | null] { - if (err) { - return [err, null]; - } - err = checkJsError(msg); - if (err) { - return [err, null]; - } - // assuming that the protocolFilterFn is set! - return [null, toJsMsg(msg)]; +function cbMsgAdapter(ackTimeout: number): MsgAdapter { + return ( + err: NatsError | null, + msg: Msg, + ): [NatsError | null, JsMsg | null] => { + if (err) { + return [err, null]; + } + err = checkJsError(msg); + if (err) { + return [err, null]; + } + // assuming that the protocolFilterFn is set! + return [null, toJsMsg(msg, ackTimeout)]; + }; } function iterMsgAdapter( - err: NatsError | null, - msg: Msg, -): [NatsError | null, JsMsg | null] { - if (err) { - return [err, null]; - } - // iterator will close if we have an error - // check for errors that shouldn't close it - const ne = checkJsError(msg); - if (ne !== null) { - return [hideNonTerminalJsErrors(ne), null]; - } - // assuming that the protocolFilterFn is set - return [null, toJsMsg(msg)]; + ackTimeout: number, +): MsgAdapter { + return ( + err: NatsError | null, + msg: Msg, + ): [NatsError | null, JsMsg | null] => { + if (err) { + return [err, null]; + } + // iterator will close if we have an error + // check for errors that shouldn't close it + const ne = checkJsError(msg); + if (ne !== null) { + return [hideNonTerminalJsErrors(ne), null]; + } + // assuming that the protocolFilterFn is set + return [null, toJsMsg(msg, ackTimeout)]; + }; } function hideNonTerminalJsErrors(ne: NatsError): NatsError | null { diff --git a/jetstream/jsmsg.ts b/jetstream/jsmsg.ts index 10bcdcf4..734be6d5 100644 --- a/jetstream/jsmsg.ts +++ b/jetstream/jsmsg.ts @@ -110,7 +110,7 @@ export interface JsMsg { * successfully and that the JetStream server should acknowledge back * that the acknowledgement was received. */ - ackAck(): Promise; + ackAck(opts?: Partial<{ timeout: number }>): Promise; /** * Convenience method to parse the message payload as JSON. This method @@ -125,8 +125,8 @@ export interface JsMsg { string(): string; } -export function toJsMsg(m: Msg): JsMsg { - return new JsMsgImpl(m); +export function toJsMsg(m: Msg, ackTimeout = 5000): JsMsg { + return new JsMsgImpl(m, ackTimeout); } export function parseInfo(s: string): DeliveryInfo { @@ -164,10 +164,12 @@ export class JsMsgImpl implements JsMsg { msg: Msg; di?: DeliveryInfo; didAck: boolean; + timeout: number; - constructor(msg: Msg) { + constructor(msg: Msg, timeout: number) { this.msg = msg; this.didAck = false; + this.timeout = timeout; } get subject(): string { @@ -220,7 +222,9 @@ export class JsMsgImpl implements JsMsg { // this has to dig into the internals as the message has access // to the protocol but not the high-level client. - async ackAck(): Promise { + async ackAck(opts?: Partial<{ timeout: number }>): Promise { + opts = opts || {}; + opts.timeout = opts.timeout || this.timeout; const d = deferred(); if (!this.didAck) { this.didAck = true; @@ -229,7 +233,7 @@ export class JsMsgImpl implements JsMsg { const proto = mi.publisher as unknown as ProtocolHandler; const trace = !(proto.options?.noAsyncTraces || false); const r = new RequestOne(proto.muxSubscriptions, this.msg.reply, { - timeout: 1000, + timeout: opts.timeout, }, trace); proto.request(r); try { diff --git a/jetstream/tests/jsmsg_test.ts b/jetstream/tests/jsmsg_test.ts index e0d0b416..f7c18dbb 100644 --- a/jetstream/tests/jsmsg_test.ts +++ b/jetstream/tests/jsmsg_test.ts @@ -29,6 +29,7 @@ import { } from "../../src/mod.ts"; import { JsMsgImpl, parseInfo, toJsMsg } from "../jsmsg.ts"; import { + assertBetween, cleanup, jetstreamServerConf, setup, @@ -225,3 +226,97 @@ Deno.test("jsmsg - explicit consumer ackAck timeout", async () => { await cleanup(ns, nc); }); + +Deno.test("jsmsg - custom consumer ackAck timeout", async () => { + const { ns, nc } = await setup(jetstreamServerConf()); + const jsm = await nc.jetstreamManager() as JetStreamManagerImpl; + await jsm.streams.add({ + name: "A", + subjects: ["a.>"], + storage: StorageType.Memory, + allow_direct: true, + }); + + const js = nc.jetstream(); + await js.publish("a.a"); + + await jsm.consumers.add("A", { durable_name: "a" }); + const c = await js.consumers.get("A", "a"); + const jm = await c.next(); + // change the subject + ((jm as JsMsgImpl).msg as MsgImpl)._reply = "xxxx"; + nc.subscribe("xxxx"); + const start = Date.now(); + await assertRejects( + (): Promise => { + return jm!.ackAck({ timeout: 1500 }); + }, + Error, + "TIMEOUT", + ); + assertBetween(Date.now() - start, 1300, 1700); + await cleanup(ns, nc); +}); + +Deno.test("jsmsg - custom consumer ackAck timeout in jsopts", async () => { + const { ns, nc } = await setup(jetstreamServerConf()); + const jsm = await nc.jetstreamManager() as JetStreamManagerImpl; + await jsm.streams.add({ + name: "A", + subjects: ["a.>"], + storage: StorageType.Memory, + allow_direct: true, + }); + + const js = nc.jetstream({ timeout: 2000 }); + await js.publish("a.a"); + + await jsm.consumers.add("A", { durable_name: "a" }); + const c = await js.consumers.get("A", "a"); + const jm = await c.next(); + // change the subject + ((jm as JsMsgImpl).msg as MsgImpl)._reply = "xxxx"; + nc.subscribe("xxxx"); + const start = Date.now(); + await assertRejects( + (): Promise => { + return jm!.ackAck(); + }, + Error, + "TIMEOUT", + ); + assertBetween(Date.now() - start, 1800, 2200); + + await cleanup(ns, nc); +}); + +Deno.test("jsmsg - ackAck() timeout legacy jsopts", async () => { + const { ns, nc } = await setup(jetstreamServerConf()); + const jsm = await nc.jetstreamManager() as JetStreamManagerImpl; + await jsm.streams.add({ + name: "A", + subjects: ["a.>"], + storage: StorageType.Memory, + allow_direct: true, + }); + + const js = nc.jetstream({ timeout: 1500 }); + await js.publish("a.a"); + + await jsm.consumers.add("A", { durable_name: "a" }); + const jm = await js.pull("A", "a"); + // change the subject + ((jm as JsMsgImpl).msg as MsgImpl)._reply = "xxxx"; + nc.subscribe("xxxx"); + const start = Date.now(); + await assertRejects( + (): Promise => { + return jm!.ackAck(); + }, + Error, + "TIMEOUT", + ); + assertBetween(Date.now() - start, 1300, 1700); + + await cleanup(ns, nc); +});