Skip to content

Commit cfdb5b6

Browse files
authored
internal(direct): handle with optional callback (#153)
* internal(direct): handle with optional callback Removed the unnecessary error parameter from the callback function in the get method to streamline the code. Corrected the handling of errors by ensuring the check is done on the 'done' object directly, improving code clarity and maintainability. Signed-off-by: Alberto Ricart <[email protected]> --------- Signed-off-by: Alberto Ricart <[email protected]>
1 parent 3ced2f5 commit cfdb5b6

File tree

3 files changed

+157
-12
lines changed

3 files changed

+157
-12
lines changed

Diff for: jetstream/src/jsapi_types.ts

+4
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import type { Nanos } from "@nats-io/nats-core";
1717
import { nanos } from "@nats-io/nats-core";
18+
import type { StoredMsg } from "./types.ts";
1819

1920
export interface ApiPaged {
2021
total: number;
@@ -509,11 +510,14 @@ export type DirectMsgRequest =
509510
| NextMsgRequest
510511
| StartTimeMsgRequest;
511512

513+
export type CompletionResult = { err?: Error };
514+
export type BatchCallback<T> = (done: CompletionResult | null, d: T) => void;
512515
export type StartSeq = { seq?: number };
513516
export type StartTime = { start_time?: Date | string };
514517
export type DirectBatchLimits = {
515518
batch?: number;
516519
max_bytes?: number;
520+
callback?: BatchCallback<StoredMsg>;
517521
};
518522
export type DirectBatchStartSeq = StartSeq & DirectBatchLimits;
519523
export type DirectBatchStartTime = StartTime & DirectBatchLimits;

Diff for: jetstream/src/jsm_direct.ts

+53-11
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,14 @@ import type {
2222
} from "./types.ts";
2323
import { DirectMsgHeaders } from "./types.ts";
2424
import type {
25+
CallbackFn,
2526
Codec,
2627
Msg,
2728
MsgHdrs,
2829
NatsConnection,
2930
QueuedIterator,
3031
ReviverFn,
31-
} from "@nats-io/nats-core";
32+
} from "@nats-io/nats-core/internal";
3233
import {
3334
createInbox,
3435
Empty,
@@ -37,6 +38,7 @@ import {
3738
TD,
3839
} from "@nats-io/nats-core/internal";
3940
import type {
41+
CompletionResult,
4042
DirectBatchOptions,
4143
DirectLastFor,
4244
DirectMsgRequest,
@@ -121,40 +123,80 @@ export class DirectStreamAPIImpl extends BaseApiClientImpl
121123
throw new Error(`batch direct require server ${min}`);
122124
}
123125
validateStreamName(stream);
124-
126+
const callback = typeof opts.callback === "function" ? opts.callback : null;
125127
const iter = new QueuedIteratorImpl<StoredMsg>();
128+
129+
function pushIter(
130+
done: CompletionResult | null,
131+
d: StoredMsg | CallbackFn,
132+
) {
133+
if (done) {
134+
iter.push(() => {
135+
done.err ? iter.stop(done.err) : iter.stop();
136+
});
137+
return;
138+
}
139+
iter.push(d);
140+
}
141+
142+
function pushCb(
143+
done: CompletionResult | null,
144+
m: StoredMsg | CallbackFn,
145+
) {
146+
const cb = callback!;
147+
if (typeof m === "function") {
148+
m();
149+
return;
150+
}
151+
cb(done, m);
152+
}
153+
154+
if (callback) {
155+
iter.iterClosed.then((err) => {
156+
push({ err: err ? err : undefined }, {} as StoredMsg);
157+
sub.unsubscribe();
158+
});
159+
}
160+
161+
const push = callback ? pushCb : pushIter;
162+
126163
const inbox = createInbox(this.nc.options.inboxPrefix);
127164
let batchSupported = false;
128165
const sub = this.nc.subscribe(inbox, {
129-
timeout: 10_000,
166+
timeout: 5000,
130167
callback: (err, msg) => {
131168
if (err) {
132-
iter.push(() => {
133-
iter.stop(err);
134-
});
169+
iter.stop(err);
135170
sub.unsubscribe();
136171
return;
137172
}
138173
const status = JetStreamStatus.maybeParseStatus(msg);
139174
if (status) {
140-
iter.push(() => {
141-
status.isEndOfBatch() ? iter.stop() : iter.stop(status.toError());
142-
});
175+
if (status.isEndOfBatch()) {
176+
push({}, () => {
177+
iter.stop();
178+
});
179+
} else {
180+
const err = status.toError();
181+
push({ err }, () => {
182+
iter.stop(err);
183+
});
184+
}
143185
return;
144186
}
145187
if (!batchSupported) {
146188
if (typeof msg.headers?.get("Nats-Num-Pending") !== "string") {
147189
// no batch/max_bytes option was provided, so single response
148190
sub.unsubscribe();
149-
iter.push(() => {
191+
push({}, () => {
150192
iter.stop();
151193
});
152194
} else {
153195
batchSupported = true;
154196
}
155197
}
156198

157-
iter.push(new DirectMsgImpl(msg));
199+
push(null, new DirectMsgImpl(msg));
158200
},
159201
});
160202

Diff for: jetstream/tests/jsm_direct_test.ts

+100-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@ import {
1717
assertArrayIncludes,
1818
assertEquals,
1919
assertExists,
20+
assertIsError,
2021
assertRejects,
22+
fail,
2123
} from "jsr:@std/assert";
2224

2325
import { deferred, delay } from "@nats-io/nats-core";
@@ -27,6 +29,7 @@ import {
2729
JetStreamError,
2830
jetstreamManager,
2931
StorageType,
32+
type StoredMsg,
3033
} from "../src/mod.ts";
3134
import {
3235
cleanup,
@@ -36,9 +39,15 @@ import {
3639
setup,
3740
} from "test_helpers";
3841

42+
import { JetStreamStatusError } from "../src/jserrors.ts";
43+
3944
import type { JetStreamManagerImpl } from "../src/jsclient.ts";
4045
import type { DirectBatchOptions, DirectLastFor } from "../src/jsapi_types.ts";
41-
import type { NatsConnectionImpl } from "@nats-io/nats-core/internal";
46+
import {
47+
type NatsConnectionImpl,
48+
type QueuedIteratorImpl,
49+
TimeoutError,
50+
} from "@nats-io/nats-core/internal";
4251

4352
Deno.test("direct - decoder", async (t) => {
4453
const { ns, nc } = await setup(jetstreamServerConf({}));
@@ -174,6 +183,96 @@ Deno.test("direct - get", async (t) => {
174183
await cleanup(ns, nc);
175184
});
176185

186+
Deno.test("direct callback", async (t) => {
187+
const { ns, nc } = await setup(jetstreamServerConf());
188+
if (await notCompatible(ns, nc, "2.11.0")) {
189+
return;
190+
}
191+
const nci = nc as NatsConnectionImpl;
192+
const jsm = await jetstreamManager(nci) as JetStreamManagerImpl;
193+
194+
await t.step("no stream", async () => {
195+
const d = deferred();
196+
const iter = await jsm.direct.getBatch("hello", {
197+
//@ts-ignore: test
198+
seq: 1,
199+
callback: (done, _) => {
200+
assertExists(done);
201+
assertIsError(done.err, TimeoutError);
202+
d.resolve();
203+
},
204+
}) as QueuedIteratorImpl<StoredMsg>;
205+
206+
const err = await iter.iterClosed;
207+
assertIsError(err, TimeoutError);
208+
});
209+
210+
await t.step("message not found", async () => {
211+
await jsm.streams.add({
212+
name: "empty",
213+
subjects: ["empty"],
214+
storage: StorageType.Memory,
215+
allow_direct: true,
216+
});
217+
218+
const iter = await jsm.direct.getBatch("empty", {
219+
//@ts-ignore: test
220+
seq: 1,
221+
callback: (done, _) => {
222+
assertExists(done);
223+
assertIsError(done.err, JetStreamStatusError, "message not found");
224+
},
225+
}) as QueuedIteratorImpl<StoredMsg>;
226+
227+
const err = await iter.iterClosed;
228+
assertIsError(err, JetStreamStatusError, "message not found");
229+
});
230+
231+
await t.step("6 messages", async () => {
232+
await jsm.streams.add({
233+
name: "A",
234+
subjects: ["a.*"],
235+
storage: StorageType.Memory,
236+
allow_direct: true,
237+
});
238+
239+
const js = jetstream(nc);
240+
await Promise.all([
241+
js.publish("a.a"),
242+
js.publish("a.b"),
243+
js.publish("a.c"),
244+
]);
245+
246+
const buf: StoredMsg[] = [];
247+
248+
const iter = await jsm.direct.getBatch("A", {
249+
batch: 10,
250+
seq: 1,
251+
callback: (done, sm) => {
252+
if (done) {
253+
if (done.err) {
254+
console.log(done.err);
255+
fail(done.err.message);
256+
}
257+
return;
258+
}
259+
buf.push(sm);
260+
},
261+
}) as QueuedIteratorImpl<StoredMsg>;
262+
263+
const err = await iter.iterClosed;
264+
assertEquals(err, undefined);
265+
266+
const subj = buf.map((m) => m.subject);
267+
assertEquals(subj.length, 3);
268+
assertEquals(subj[0], "a.a");
269+
assertEquals(subj[1], "a.b");
270+
assertEquals(subj[2], "a.c");
271+
});
272+
273+
await cleanup(ns, nc);
274+
});
275+
177276
Deno.test("direct - batch", async (t) => {
178277
const { ns, nc } = await setup(jetstreamServerConf());
179278
if (await notCompatible(ns, nc, "2.11.0")) {

0 commit comments

Comments
 (0)