Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIX] fixed an issue where the correct info for an object entry was not calculated #358

Merged
merged 1 commit into from
Sep 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 24 additions & 53 deletions nats-base-client/objectstore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,23 @@ export class ObjectStoreImpl implements ObjectStore {
}
}

async _si(
opts?: Partial<StreamInfoRequestOptions>,
): Promise<StreamInfo | null> {
try {
const info = await this.jsm.streams.info(this.stream, opts);
return info;
} catch (err) {
const nerr = err as NatsError;
if (nerr.code === "404") {
return null;
}
return Promise.reject(err);
}
}

async seal(): Promise<ObjectStoreInfo> {
let info = await this.jsm.streams.info(this.stream);
let info = await this._si();
if (info === null) {
return Promise.reject(new Error("object store not found"));
}
Expand All @@ -273,7 +288,7 @@ export class ObjectStoreImpl implements ObjectStore {
async status(
opts?: Partial<StreamInfoRequestOptions>,
): Promise<ObjectStoreInfo> {
const info = await this.jsm.streams.info(this.stream, opts);
const info = await this._si(opts);
if (info === null) {
return Promise.reject(new Error("object store not found"));
}
Expand Down Expand Up @@ -332,14 +347,15 @@ export class ObjectStoreImpl implements ObjectStore {
sha.update(payload);
info.chunks!++;
info.size! += payload.length;
info.mtime = new Date().toISOString();
const digest = sha.digest("base64");
const pad = digest.length % 3;
const padding = pad > 0 ? "=".repeat(pad) : "";
info.digest = `sha-256=${digest}${padding}`;
info.deleted = false;
proms.push(this.js.publish(chunkSubj, payload));
}
info.mtime = new Date().toISOString();
const digest = sha.digest("base64");
const pad = digest.length % 3;
const padding = pad > 0 ? "=".repeat(pad) : "";
info.digest = `sha-256=${digest}${padding}`;
info.deleted = false;

// trailing md for the object
const h = headers();
h.set(JsHeaders.RollupHdr, JsHeaders.RollupValueSubject);
Expand Down Expand Up @@ -685,48 +701,3 @@ export class ObjectStoreImpl implements ObjectStore {
return Promise.resolve(os);
}
}

class Base64Codec {
static encode(bytes: string | Uint8Array): string {
if (typeof bytes === "string") {
return btoa(bytes);
}
const a = Array.from(bytes);
return btoa(String.fromCharCode(...a));
}

static decode(s: string, binary = false): Uint8Array | string {
const bin = atob(s);
if (!binary) {
return bin;
}
const bytes = new Uint8Array(bin.length);
for (let i = 0; i < bin.length; i++) {
bytes[i] = bin.charCodeAt(i);
}
return bytes;
}
}

class Base64UrlCodec {
static encode(bytes: string | Uint8Array): string {
return Base64UrlCodec.toB64URLEncoding(Base64Codec.encode(bytes));
}

static decode(s: string, binary = false): Uint8Array | string {
return Base64Codec.decode(Base64UrlCodec.fromB64URLEncoding(s), binary);
}

static toB64URLEncoding(b64str: string): string {
b64str = b64str.replace(/=/g, "");
b64str = b64str.replace(/\+/g, "-");
return b64str.replace(/\//g, "_");
}

static fromB64URLEncoding(b64str: string): string {
// pads are % 4, but not necessary on decoding
b64str = b64str.replace(/_/g, "/");
b64str = b64str.replace(/-/g, "+");
return b64str;
}
}
135 changes: 132 additions & 3 deletions tests/objectstore_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,16 @@ import {
} from "https://deno.land/[email protected]/testing/asserts.ts";
import { DataBuffer } from "../nats-base-client/databuffer.ts";
import { crypto } from "https://deno.land/[email protected]/crypto/mod.ts";
import { headers, StorageType, StringCodec } from "../nats-base-client/mod.ts";
import {
Empty,
headers,
StorageType,
StringCodec,
} from "../nats-base-client/mod.ts";
import { assertRejects } from "https://deno.land/[email protected]/testing/asserts.ts";
import { equals } from "https://deno.land/[email protected]/bytes/mod.ts";
import { ObjectInfo, ObjectStoreMeta } from "../nats-base-client/types.ts";
import { SHA256 } from "../nats-base-client/sha256.js";

function readableStreamFrom(data: Uint8Array): ReadableStream<Uint8Array> {
return new ReadableStream<Uint8Array>({
Expand Down Expand Up @@ -72,6 +78,15 @@ function makeData(n: number): Uint8Array {
return data;
}

function digest(data: Uint8Array): string {
const sha = new SHA256();
sha.update(data);
const digest = sha.digest("base64");
const pad = digest.length % 3;
const padding = pad > 0 ? "=".repeat(pad) : "";
return `sha-256=${digest}${padding}`;
}

Deno.test("objectstore - basics", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
if (await notCompatible(ns, nc, "2.6.3")) {
Expand All @@ -84,11 +99,23 @@ Deno.test("objectstore - basics", async () => {
const js = nc.jetstream();
const os = await js.views.os("OBJS", { description: "testing" });

const oi = await os.put({ name: "BLOB" }, readableStreamFrom(blob));
const info = await os.status() as ObjectStoreInfoImpl;
assertEquals(info.description, "testing");
assertEquals(info.ttl, 0);
assertEquals(info.replicas, 1);
assertEquals(info.streamInfo.config.name, "OBJ_OBJS");

const oi = await os.put(
{ name: "BLOB", description: "myblob" },
readableStreamFrom(blob),
);
assertEquals(oi.bucket, "OBJS");
assertEquals(oi.nuid.length, 22);
assertEquals(oi.name, "BLOB");
// assert(1000 > (Date.now() - millis(oi.mtime)));
assertEquals(oi.digest, digest(blob));
assertEquals(oi.description, "myblob");
assertEquals(oi.deleted, false);
assert(typeof oi.mtime === "string");

const jsm = await nc.jetstreamManager();
const si = await jsm.streams.info("OBJ_OBJS");
Expand Down Expand Up @@ -159,6 +186,7 @@ Deno.test("objectstore - chunked content", async () => {
);

const d = await os.get("blob");
assertEquals(d!.info.digest, digest(data));
const vv = await fromReadableStream(d!.data);
equals(vv, data);

Expand Down Expand Up @@ -347,6 +375,8 @@ Deno.test("objectstore - empty entry", async () => {
);
assertEquals(oi.nuid.length, 22);
assertEquals(oi.name, "empty");
assertEquals(oi.digest, digest(new Uint8Array(0)));
assertEquals(oi.chunks, 0);

const or = await os.get("empty");
assert(or !== null);
Expand Down Expand Up @@ -670,3 +700,102 @@ Deno.test("objectstore - sanitize", async () => {
//
// await nc.close();
// });

Deno.test("objectstore - partials", async () => {
const { ns, nc } = await setup(jetstreamServerConf({
max_payload: 1024 * 1024,
}, true));
if (await notCompatible(ns, nc, "2.6.3")) {
return;
}
const js = nc.jetstream();
const os = await js.views.os("test");
const sc = StringCodec();

const data = sc.encode("".padStart(7, "a"));

const info = await os.put(
{ name: "test", options: { max_chunk_size: 2 } },
readableStreamFrom(data),
);
assertEquals(info.chunks, 4);
assertEquals(info.digest, digest(data));

const rs = await os.get("test");
const reader = rs!.data.getReader();
let i = 0;
while (true) {
i++;
const { done, value } = await reader.read();
if (done) {
assertEquals(i, 5);
break;
}
if (i === 4) {
assertEquals(value!.length, 1);
} else {
assertEquals(value!.length, 2);
}
}
await cleanup(ns, nc);
});

Deno.test("objectstore - no store", async () => {
const { ns, nc } = await setup(jetstreamServerConf({
max_payload: 1024 * 1024,
}, true));
if (await notCompatible(ns, nc, "2.6.3")) {
return;
}
const js = nc.jetstream();
const os = await js.views.os("test");
await os.put({ name: "test" }, readableStreamFrom(Empty));
await os.delete("test");
const oi = await os.info("test");
await assertRejects(
async () => {
await os.link("bar", oi!);
},
Error,
"object is deleted",
);

const r = await os.delete("foo");
assertEquals(r, { purged: 0, success: false });

await assertRejects(
async () => {
await os.update("baz", oi!);
},
Error,
"object not found",
);

const jsm = await nc.jetstreamManager();
await jsm.streams.delete("OBJ_test");
await assertRejects(
async () => {
await os.seal();
},
Error,
"object store not found",
);

await assertRejects(
async () => {
await os.status();
},
Error,
"object store not found",
);

await assertRejects(
async () => {
await os.put({ name: "foo" }, readableStreamFrom(Empty));
},
Error,
"stream not found",
);

await cleanup(ns, nc);
});