Skip to content

Commit

Permalink
Merge pull request #708 from nats-io/fix-ackack-timeout
Browse files Browse the repository at this point in the history
Honor JS timeout in ackAck() and also allow to override
  • Loading branch information
aricart authored Jun 20, 2024
2 parents 8b7c1f4 + 24df2ca commit 8a97f4c
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 40 deletions.
2 changes: 1 addition & 1 deletion jetstream/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
}
} else {
// push the user message
this._push(toJsMsg(msg));
this._push(toJsMsg(msg, this.consumer.api.timeout));
this.received++;
if (this.pending.msgs) {
this.pending.msgs--;
Expand Down
72 changes: 39 additions & 33 deletions jetstream/jsclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ export class JetStreamClientImpl extends BaseApiClient
if (err) {
throw err;
}
return toJsMsg(msg);
return toJsMsg(msg, this.timeout);
}

/*
Expand Down Expand Up @@ -389,7 +389,7 @@ export class JetStreamClientImpl extends BaseApiClient
// if we are doing heartbeats, message resets
monitor?.work();
qi.received++;
qi.push(toJsMsg(msg));
qi.push(toJsMsg(msg, this.timeout));
}
},
});
Expand Down Expand Up @@ -653,7 +653,7 @@ export class JetStreamClientImpl extends BaseApiClient
jsi: JetStreamSubscriptionInfo,
): TypedSubscriptionOptions<JsMsg> {
const so = {} as TypedSubscriptionOptions<JsMsg>;
so.adapter = msgAdapter(jsi.callbackFn === undefined);
so.adapter = msgAdapter(jsi.callbackFn === undefined, this.timeout);
so.ingestionFilterFn = JetStreamClientImpl.ingestionFn(jsi.ordered);
so.protocolFilterFn = (jm, ingest = false): boolean => {
const jsmi = jm as JsMsgImpl;
Expand Down Expand Up @@ -979,44 +979,50 @@ class JetStreamPullSubscriptionImpl extends JetStreamSubscriptionImpl
}
}

function msgAdapter(iterator: boolean): MsgAdapter<JsMsg> {
function msgAdapter(iterator: boolean, ackTimeout: number): MsgAdapter<JsMsg> {
if (iterator) {
return iterMsgAdapter;
return iterMsgAdapter(ackTimeout);
} else {
return cbMsgAdapter;
return cbMsgAdapter(ackTimeout);
}
}

function cbMsgAdapter(
err: NatsError | null,
msg: Msg,
): [NatsError | null, JsMsg | null] {
if (err) {
return [err, null];
}
err = checkJsError(msg);
if (err) {
return [err, null];
}
// assuming that the protocolFilterFn is set!
return [null, toJsMsg(msg)];
function cbMsgAdapter(ackTimeout: number): MsgAdapter<JsMsg> {
return (
err: NatsError | null,
msg: Msg,
): [NatsError | null, JsMsg | null] => {
if (err) {
return [err, null];
}
err = checkJsError(msg);
if (err) {
return [err, null];
}
// assuming that the protocolFilterFn is set!
return [null, toJsMsg(msg, ackTimeout)];
};
}

function iterMsgAdapter(
err: NatsError | null,
msg: Msg,
): [NatsError | null, JsMsg | null] {
if (err) {
return [err, null];
}
// iterator will close if we have an error
// check for errors that shouldn't close it
const ne = checkJsError(msg);
if (ne !== null) {
return [hideNonTerminalJsErrors(ne), null];
}
// assuming that the protocolFilterFn is set
return [null, toJsMsg(msg)];
ackTimeout: number,
): MsgAdapter<JsMsg> {
return (
err: NatsError | null,
msg: Msg,
): [NatsError | null, JsMsg | null] => {
if (err) {
return [err, null];
}
// iterator will close if we have an error
// check for errors that shouldn't close it
const ne = checkJsError(msg);
if (ne !== null) {
return [hideNonTerminalJsErrors(ne), null];
}
// assuming that the protocolFilterFn is set
return [null, toJsMsg(msg, ackTimeout)];
};
}

function hideNonTerminalJsErrors(ne: NatsError): NatsError | null {
Expand Down
16 changes: 10 additions & 6 deletions jetstream/jsmsg.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ export interface JsMsg {
* successfully and that the JetStream server should acknowledge back
* that the acknowledgement was received.
*/
ackAck(): Promise<boolean>;
ackAck(opts?: Partial<{ timeout: number }>): Promise<boolean>;

/**
* Convenience method to parse the message payload as JSON. This method
Expand All @@ -125,8 +125,8 @@ export interface JsMsg {
string(): string;
}

export function toJsMsg(m: Msg): JsMsg {
return new JsMsgImpl(m);
export function toJsMsg(m: Msg, ackTimeout = 5000): JsMsg {
return new JsMsgImpl(m, ackTimeout);
}

export function parseInfo(s: string): DeliveryInfo {
Expand Down Expand Up @@ -164,10 +164,12 @@ export class JsMsgImpl implements JsMsg {
msg: Msg;
di?: DeliveryInfo;
didAck: boolean;
timeout: number;

constructor(msg: Msg) {
constructor(msg: Msg, timeout: number) {
this.msg = msg;
this.didAck = false;
this.timeout = timeout;
}

get subject(): string {
Expand Down Expand Up @@ -220,7 +222,9 @@ export class JsMsgImpl implements JsMsg {

// this has to dig into the internals as the message has access
// to the protocol but not the high-level client.
async ackAck(): Promise<boolean> {
async ackAck(opts?: Partial<{ timeout: number }>): Promise<boolean> {
opts = opts || {};
opts.timeout = opts.timeout || this.timeout;
const d = deferred<boolean>();
if (!this.didAck) {
this.didAck = true;
Expand All @@ -229,7 +233,7 @@ export class JsMsgImpl implements JsMsg {
const proto = mi.publisher as unknown as ProtocolHandler;
const trace = !(proto.options?.noAsyncTraces || false);
const r = new RequestOne(proto.muxSubscriptions, this.msg.reply, {
timeout: 1000,
timeout: opts.timeout,
}, trace);
proto.request(r);
try {
Expand Down
95 changes: 95 additions & 0 deletions jetstream/tests/jsmsg_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import {
} from "../../src/mod.ts";
import { JsMsgImpl, parseInfo, toJsMsg } from "../jsmsg.ts";
import {
assertBetween,
cleanup,
jetstreamServerConf,
setup,
Expand Down Expand Up @@ -225,3 +226,97 @@ Deno.test("jsmsg - explicit consumer ackAck timeout", async () => {

await cleanup(ns, nc);
});

Deno.test("jsmsg - custom consumer ackAck timeout", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const jsm = await nc.jetstreamManager() as JetStreamManagerImpl;
await jsm.streams.add({
name: "A",
subjects: ["a.>"],
storage: StorageType.Memory,
allow_direct: true,
});

const js = nc.jetstream();
await js.publish("a.a");

await jsm.consumers.add("A", { durable_name: "a" });
const c = await js.consumers.get("A", "a");
const jm = await c.next();
// change the subject
((jm as JsMsgImpl).msg as MsgImpl)._reply = "xxxx";
nc.subscribe("xxxx");
const start = Date.now();
await assertRejects(
(): Promise<boolean> => {
return jm!.ackAck({ timeout: 1500 });
},
Error,
"TIMEOUT",
);
assertBetween(Date.now() - start, 1300, 1700);
await cleanup(ns, nc);
});

Deno.test("jsmsg - custom consumer ackAck timeout in jsopts", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const jsm = await nc.jetstreamManager() as JetStreamManagerImpl;
await jsm.streams.add({
name: "A",
subjects: ["a.>"],
storage: StorageType.Memory,
allow_direct: true,
});

const js = nc.jetstream({ timeout: 2000 });
await js.publish("a.a");

await jsm.consumers.add("A", { durable_name: "a" });
const c = await js.consumers.get("A", "a");
const jm = await c.next();
// change the subject
((jm as JsMsgImpl).msg as MsgImpl)._reply = "xxxx";
nc.subscribe("xxxx");
const start = Date.now();
await assertRejects(
(): Promise<boolean> => {
return jm!.ackAck();
},
Error,
"TIMEOUT",
);
assertBetween(Date.now() - start, 1800, 2200);

await cleanup(ns, nc);
});

Deno.test("jsmsg - ackAck() timeout legacy jsopts", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const jsm = await nc.jetstreamManager() as JetStreamManagerImpl;
await jsm.streams.add({
name: "A",
subjects: ["a.>"],
storage: StorageType.Memory,
allow_direct: true,
});

const js = nc.jetstream({ timeout: 1500 });
await js.publish("a.a");

await jsm.consumers.add("A", { durable_name: "a" });
const jm = await js.pull("A", "a");
// change the subject
((jm as JsMsgImpl).msg as MsgImpl)._reply = "xxxx";
nc.subscribe("xxxx");
const start = Date.now();
await assertRejects(
(): Promise<boolean> => {
return jm!.ackAck();
},
Error,
"TIMEOUT",
);
assertBetween(Date.now() - start, 1300, 1700);

await cleanup(ns, nc);
});

0 comments on commit 8a97f4c

Please sign in to comment.