Skip to content

Commit 3a48162

Browse files
authored
[FEAT] implements the ability in KV to request a specific revision for a key (#302)
- [FEAT] implements the ability in KV to request a specific revision for a key - [FEAT] added check when retrieving a key and specifying a sequence (new feature), that the key matched. If not, null is returned as the sequence is not referencing the expected key. - [FIX] conversion from stored message captured the key as provided by the client, and returned an entry with the said key. This issue is only a problem when combined with the new feature to specify a revision for the key, as the entry returned could have been not related to the key. FIX #301
1 parent a148e7e commit 3a48162

File tree

4 files changed

+59
-14
lines changed

4 files changed

+59
-14
lines changed

Diff for: nats-base-client/kv.ts

+22-10
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import {
3535
KvPutOptions,
3636
KvRemove,
3737
KvStatus,
38+
MsgRequest,
3839
PurgeOpts,
3940
PurgeResponse,
4041
RetentionPolicy,
@@ -311,10 +312,10 @@ export class Bucket implements KV, KvRemove {
311312
return data.length;
312313
}
313314

314-
smToEntry(key: string, sm: StoredMsg): KvEntry {
315+
smToEntry(sm: StoredMsg): KvEntry {
315316
return {
316317
bucket: this.bucket,
317-
key: key,
318+
key: sm.subject.substring(this.prefixLen),
318319
value: sm.data,
319320
delta: 0,
320321
created: sm.time,
@@ -324,7 +325,7 @@ export class Bucket implements KV, KvRemove {
324325
};
325326
}
326327

327-
jmToEntry(_k: string, jm: JsMsg): KvEntry {
328+
jmToEntry(jm: JsMsg): KvEntry {
328329
const key = this.decodeKey(jm.subject.substring(this.prefixLen));
329330
return {
330331
bucket: this.bucket,
@@ -367,14 +368,25 @@ export class Bucket implements KV, KvRemove {
367368
return pa.seq;
368369
}
369370

370-
async get(k: string): Promise<KvEntry | null> {
371+
async get(
372+
k: string,
373+
opts?: { revision: number },
374+
): Promise<KvEntry | null> {
371375
const ek = this.encodeKey(k);
372376
this.validateKey(ek);
377+
378+
let arg: MsgRequest = { last_by_subj: this.fullKeyName(ek) };
379+
if (opts && opts.revision > 0) {
380+
arg = { seq: opts.revision };
381+
}
382+
373383
try {
374-
const sm = await this.jsm.streams.getMessage(this.bucketName(), {
375-
last_by_subj: this.fullKeyName(ek),
376-
});
377-
return this.smToEntry(k, sm);
384+
const sm = await this.jsm.streams.getMessage(this.bucketName(), arg);
385+
const ke = this.smToEntry(sm);
386+
if (ke.key !== ek) {
387+
return null;
388+
}
389+
return ke;
378390
} catch (err) {
379391
if (err.message === "no message found") {
380392
return null;
@@ -468,7 +480,7 @@ export class Bucket implements KV, KvRemove {
468480
return;
469481
}
470482
if (jm) {
471-
const e = this.jmToEntry(k, jm);
483+
const e = this.jmToEntry(jm);
472484
qi.push(e);
473485
qi.received++;
474486
//@ts-ignore - function will be removed
@@ -545,7 +557,7 @@ export class Bucket implements KV, KvRemove {
545557
return;
546558
}
547559
if (jm) {
548-
const e = this.jmToEntry(k, jm);
560+
const e = this.jmToEntry(jm);
549561
qi.push(e);
550562
qi.received++;
551563

Diff for: nats-base-client/types.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -914,7 +914,7 @@ export interface KvRemove {
914914
}
915915

916916
export interface RoKV {
917-
get(k: string): Promise<KvEntry | null>;
917+
get(k: string, opts?: { revision: number }): Promise<KvEntry | null>;
918918
history(opts?: { key?: string }): Promise<QueuedIterator<KvEntry>>;
919919
watch(
920920
opts?: { key?: string; headers_only?: boolean; initializedFn?: callbackFn },

Diff for: tests/jetstream_test.ts

-3
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,6 @@ import {
3939
JsMsg,
4040
JsMsgCallback,
4141
JSONCodec,
42-
millis,
43-
Nanos,
4442
nanos,
4543
NatsConnectionImpl,
4644
NatsError,
@@ -51,7 +49,6 @@ import {
5149
StringCodec,
5250
} from "../nats-base-client/internal_mod.ts";
5351
import {
54-
assertArrayIncludes,
5552
assertEquals,
5653
assertRejects,
5754
assertThrows,

Diff for: tests/kv_test.ts

+36
Original file line numberDiff line numberDiff line change
@@ -1271,3 +1271,39 @@ Deno.test("kv - watch init callback exceptions terminate the iterator", async ()
12711271
assertEquals(err.message, "crash");
12721272
await cleanup(ns, nc);
12731273
});
1274+
1275+
Deno.test("kv - get revision", async () => {
1276+
const { ns, nc } = await setup(
1277+
jetstreamServerConf({}, true),
1278+
);
1279+
const js = nc.jetstream();
1280+
const sc = StringCodec();
1281+
1282+
const b = await js.views.kv(nuid.next(), { history: 3 }) as Bucket;
1283+
1284+
async function check(key: string, value: string | null, revision = 0) {
1285+
const e = await b.get(key, { revision });
1286+
if (value === null) {
1287+
assertEquals(e, null);
1288+
} else {
1289+
assertEquals(sc.decode(e!.value), value);
1290+
}
1291+
}
1292+
1293+
await b.put("A", sc.encode("a"));
1294+
await b.put("A", sc.encode("b"));
1295+
await b.put("A", sc.encode("c"));
1296+
1297+
// expect null, as sequence 1, holds "A"
1298+
await check("B", null, 1);
1299+
1300+
await check("A", "c");
1301+
await check("A", "a", 1);
1302+
await check("A", "b", 2);
1303+
1304+
await b.put("A", sc.encode("d"));
1305+
await check("A", "d");
1306+
await check("A", null, 1);
1307+
1308+
await cleanup(ns, nc);
1309+
});

0 commit comments

Comments
 (0)