Skip to content

Commit

Permalink
fix(jetstream): direct batch get missing next_by_subj filter
Browse files Browse the repository at this point in the history
Signed-off-by: Alberto Ricart <[email protected]>
  • Loading branch information
aricart committed Feb 1, 2025
1 parent 1281814 commit e7a0e73
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 0 deletions.
1 change: 1 addition & 0 deletions jetstream/src/jsapi_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,7 @@ export type DirectBatchLimits = {
batch?: number;
max_bytes?: number;
callback?: BatchCallback<StoredMsg>;
next_by_subj?: string;
};
export type DirectBatchStartSeq = StartSeq & DirectBatchLimits;
export type DirectBatchStartTime = StartTime & DirectBatchLimits;
Expand Down
53 changes: 53 additions & 0 deletions jetstream/tests/jsm_direct_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -522,3 +522,56 @@ Deno.test("direct - last message for", async (t) => {

await cleanup(ns, nc);
});

Deno.test("direct - batch next_by_subj", async (t) => {
const { ns, nc } = await setup(jetstreamServerConf());
if (await notCompatible(ns, nc, "2.11.0")) {
return;
}
const nci = nc as NatsConnectionImpl;
const jsm = await jetstreamManager(nci) as JetStreamManagerImpl;
await jsm.streams.add({
name: "A",
subjects: ["a", "b"],
storage: StorageType.Memory,
allow_direct: true,
});

const js = jsm.jetstream();

const buf = [];
for (let i = 0; i < 50; i++) {
buf.push(js.publish(`a`), buf.push(js.publish(`b`)));
}
await Promise.all(buf);

const msgs = [];
let iter = await jsm.direct.getBatch("A", {
seq: 0,
batch: 100,
next_by_subj: "a",
});
for await (const m of iter) {
msgs.push(m.subject);
}
assertEquals(msgs.length, 50);
for (let i = 0; i < 50; i++) {
assertEquals(msgs[i], "a");
}

msgs.length = 0;
iter = await jsm.direct.getBatch("A", {
seq: 50,
batch: 100,
next_by_subj: "b",
});
for await (const m of iter) {
msgs.push(m.subject);
}
assertEquals(msgs.length, 26);
for (let i = 0; i < 26; i++) {
assertEquals(msgs[i], "b");
}

await cleanup(ns, nc);
});

0 comments on commit e7a0e73

Please sign in to comment.