Skip to content

Commit

Permalink
[BREAKING] [FIX] ObjectStore implementations prior to this one used a…
Browse files Browse the repository at this point in the history
… sha256 library that generated incorrect digests for files 512MB and larger. These digests prevent other tools from accessing objects `put` by the JavaScript ObjectStore implementation. Included in the fix is a migration tool that walks the specified objectstore and calculates hashes, if found to be a match for the bad-hashes, it updates the hash in the metadata entry for the object.

Signed-off-by: Alberto Ricart <[email protected]>
  • Loading branch information
aricart committed Feb 7, 2025
1 parent edc3dbe commit 130e062
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 21 deletions.
58 changes: 44 additions & 14 deletions bin/fix-os-hashes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,17 @@
import { parse } from "https://deno.land/[email protected]/flags/mod.ts";
import { ObjectStoreImpl, ServerObjectInfo } from "../jetstream/objectstore.ts";
import {
Base64UrlPaddedCodec,
connect,
ConnectionOptions,
credsAuthenticator,
} from "https://raw.githubusercontent.com/nats-io/nats.deno/main/src/mod.ts";
import { Base64UrlPaddedCodec } from "../nats-base-client/base64.ts";
import {
SHA256 as BAD_SHA256,
} from "https://raw.githubusercontent.com/nats-io/nats.deno/refs/tags/v1.29.1/nats-base-client/sha256.js";
import { consumerOpts } from "../jetstream/internal_mod.ts";
import { consumerOpts } from "../jetstream/mod.ts";
import { sha256 } from "../nats-base-client/js-sha256.js";
import { checkSha256, parseSha256 } from "../jetstream/sha_digest.parser.ts";

const argv = parse(
Deno.args,
Expand Down Expand Up @@ -54,9 +55,9 @@ if (argv.h || argv.help) {
"\nThis tool fixes metadata entries in an object store that were written",
);
console.log(
"with recalculated hashes. Please backup your object stores",
"with hashes that were calculated incorrectly due to a bug in the sha256 library.",
);
console.log("before using this tool.");
console.log("Please backup your object stores before using this tool.");

Deno.exit(1);
}
Expand Down Expand Up @@ -95,10 +96,32 @@ async function fixDigests(os: ObjectStoreImpl): Promise<void> {
let fixes = 0;
const entries = await os.list();
for (const entry of entries) {
if (!entry.digest.startsWith("SHA-256=")) {
console.error(
`ignoring entry ${entry.name} - unknown objectstore digest:`,
entry.digest,
);
continue;
}
// plain digest string
const digest = entry.digest.substring(8);
const parsedDigest = parseSha256(digest);
if (parsedDigest === null) {
console.error(
`ignoring entry ${entry.name} - unable to parse digest:`,
digest,
);
continue;
}

const badSha = new BAD_SHA256();
const sha = sha256.create();
let badHash = new Uint8Array(0);
let hash = new Uint8Array(0);

const oc = consumerOpts();
oc.orderedConsumer();

const subj = `$O.${os.name}.C.${entry.nuid}`;
let needsFixing = false;

Expand All @@ -109,19 +132,26 @@ async function fixDigests(os: ObjectStoreImpl): Promise<void> {
sha.update(m.data);
}
if (m.info.pending === 0) {
const hash = sha.digest();
const badHash = badSha.digest();
for (let i = 0; i < hash.length; i++) {
if (hash[i] !== badHash[i]) {
needsFixing = true;
fixes++;
break;
}
}
badHash = badSha.digest();
hash = sha.digest();
break;
}
}
sub.unsubscribe();

if (checkSha256(parsedDigest, badHash)) {
// this one could be bad
if (!checkSha256(badHash, hash)) {
console.log(
`[WARN] entry ${entry.name} has a bad hash: ${
Base64UrlPaddedCodec.encode(badHash)
} - should be ${Base64UrlPaddedCodec.encode(hash)}`,
);
needsFixing = true;
fixes++;
}
}

if (argv.check) {
continue;
}
Expand All @@ -131,7 +161,7 @@ async function fixDigests(os: ObjectStoreImpl): Promise<void> {
last_by_subj: metaSubject,
});
const info = m.json<ServerObjectInfo>();
const digest = Base64UrlPaddedCodec.encode(sha.digest());
const digest = Base64UrlPaddedCodec.encode(hash);
info.digest = `SHA-256=${digest}`;
try {
await js.publish(metaSubject, JSON.stringify(info));
Expand Down
15 changes: 12 additions & 3 deletions jetstream/objectstore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import {
} from "./jsapi_types.ts";
import { JsMsg } from "./jsmsg.ts";
import { PubHeaders } from "./jsclient.ts";
import { checkSha256, parseSha256 } from "./sha_digest.parser.ts";

export const osPrefix = "OBJ_";
export const digestType = "SHA-256=";
Expand Down Expand Up @@ -525,6 +526,16 @@ export class ObjectStoreImpl implements ObjectStore {
return os.get(ln);
}

if (!info.digest.startsWith(digestType)) {
return Promise.reject(new Error(`unknown digest type: ${info.digest}`));
}
const digest = parseSha256(info.digest.substring(8));
if (digest === null) {
return Promise.reject(
new Error(`unable to parse digest: ${info.digest}`),
);
}

const d = deferred<Error | null>();

const r: Partial<ObjectResult> = {
Expand All @@ -551,9 +562,7 @@ export class ObjectStoreImpl implements ObjectStore {
controller!.enqueue(jm.data);
}
if (jm.info.pending === 0) {
const hash = Base64UrlPaddedCodec.encode(sha.digest());
const digest = `${digestType}${hash}`;
if (digest !== info.digest) {
if (!checkSha256(digest, sha.digest())) {
controller!.error(
new Error(
`received a corrupt object, digests do not match received: ${info.digest} calculated ${digest}`,
Expand Down
104 changes: 104 additions & 0 deletions jetstream/sha_digest.parser.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright 2024 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

export function parseSha256(s: string): Uint8Array | null {
return toByteArray(s);
}

function isHex(s: string): boolean {
// contains valid hex characters only
const hexRegex = /^[0-9A-Fa-f]+$/;
if (!hexRegex.test(s)) {
// non-hex characters
return false;
}

// check for mixed-case strings - paranoid base64 sneaked in
const isAllUpperCase = /^[0-9A-F]+$/.test(s);
const isAllLowerCase = /^[0-9a-f]+$/.test(s);
if (!(isAllUpperCase || isAllLowerCase)) {
return false;
}

// ensure the input string length is even
return s.length % 2 === 0;
}

function isBase64(s: string): boolean {
// test for padded or normal base64
return /^[A-Za-z0-9\-_]*(={0,2})?$/.test(s) ||
/^[A-Za-z0-9+/]*(={0,2})?$/.test(s);
}

function detectEncoding(input: string): "hex" | "b64" | "" {
// hex is more reliable to flush out...
if (isHex(input)) {
return "hex";
} else if (isBase64(input)) {
return "b64";
}
return "";
}

function hexToByteArray(s: string): Uint8Array {
if (s.length % 2 !== 0) {
throw new Error("hex string must have an even length");
}
const a = new Uint8Array(s.length / 2);
for (let i = 0; i < s.length; i += 2) {
// parse hex two chars at a time
a[i / 2] = parseInt(s.substring(i, i + 2), 16);
}
return a;
}

function base64ToByteArray(s: string): Uint8Array {
// could be url friendly
s = s.replace(/-/g, "+");
s = s.replace(/_/g, "/");
const sbin = atob(s);
return Uint8Array.from(sbin, (c) => c.charCodeAt(0));
}

function toByteArray(input: string): Uint8Array | null {
const encoding = detectEncoding(input);
switch (encoding) {
case "hex":
return hexToByteArray(input);
case "b64":
return base64ToByteArray(input);
}
return null;
}

export function checkSha256(
a: string | Uint8Array,
b: string | Uint8Array,
): boolean {
const aBytes = typeof a === "string" ? parseSha256(a) : a;
const bBytes = typeof b === "string" ? parseSha256(b) : b;
if (aBytes === null || bBytes === null) {
return false;
}
if (aBytes.length !== bBytes.length) {
return false;
}
for (let i = 0; i < aBytes.length; i++) {
if (aBytes[i] !== bBytes[i]) {
return false;
}
}
return true;
}
2 changes: 1 addition & 1 deletion jetstream/tests/consumers_ordered_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -979,7 +979,7 @@ Deno.test("ordered consumers - consume reset", async () => {
const c = await js.consumers.get("A") as OrderedPullConsumerImpl;

// after the first message others will get published
let iter = await c.consume({ max_messages: 11, expires: 5000 });
const iter = await c.consume({ max_messages: 11, expires: 5000 });
countResets(iter).catch();
const sequences = [];
for await (const m of iter) {
Expand Down
2 changes: 1 addition & 1 deletion jetstream/tests/kv_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2166,7 +2166,7 @@ Deno.test("kv - watcher on server restart", async () => {
});

Deno.test("kv - maxBucketSize doesn't override max_bytes", async () => {
let { ns, nc } = await setup(
const { ns, nc } = await setup(
jetstreamServerConf({}),
);
const js = nc.jetstream();
Expand Down
4 changes: 2 additions & 2 deletions tests/auth_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1273,7 +1273,7 @@ Deno.test("auth - sub permission queue", async () => {
const qA = deferred();
nc.subscribe("q", {
queue: "A",
callback: (err, msg) => {
callback: (err, _msg) => {
if (err) {
qA.reject(err);
}
Expand All @@ -1283,7 +1283,7 @@ Deno.test("auth - sub permission queue", async () => {
const qBad = deferred<NatsError>();
nc.subscribe("q", {
queue: "bad",
callback: (err, msg) => {
callback: (err, _msg) => {
if (err) {
qBad.resolve(err);
}
Expand Down

0 comments on commit 130e062

Please sign in to comment.