Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: limit bulk API requests to batches of 5,000 #554

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/angry-countries-matter.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"wrangler": patch
---

fix: limit bulk put API requests to batches of 5,000

The `kv:bulk put` command now batches up put requests in groups of 5,000,
displaying progress for each request.
8 changes: 8 additions & 0 deletions .changeset/smooth-tables-admire.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"wrangler": patch
---

fix: limit bulk delete API requests to batches of 5,000

The `kv:bulk delete` command now batches up delete requests in groups of 5,000,
displaying progress for each request.
67 changes: 61 additions & 6 deletions packages/wrangler/src/__tests__/kv.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1130,7 +1130,12 @@ describe("wrangler", () => {
requests.count++;
expect(accountId).toEqual("some-account-id");
expect(namespaceId).toEqual(expectedNamespaceId);
expect(JSON.parse(body as string)).toEqual(expectedKeyValues);
expect(JSON.parse(body as string)).toEqual(
expectedKeyValues.slice(
(requests.count - 1) * 5000,
requests.count * 5000
)
);
return null;
}
);
Expand All @@ -1150,7 +1155,29 @@ describe("wrangler", () => {
`kv:bulk put --namespace-id some-namespace-id keys.json`
);
expect(requests.count).toEqual(1);
expect(std.out).toMatchInlineSnapshot(`""`);
expect(std.out).toMatchInlineSnapshot(`"Success!"`);
expect(std.warn).toMatchInlineSnapshot(`""`);
expect(std.err).toMatchInlineSnapshot(`""`);
});

it("should put the key-values in batches of 5000 parsed from a file", async () => {
const keyValues: KeyValue[] = new Array(12000).fill({
key: "someKey1",
value: "someValue1",
});
writeFileSync("./keys.json", JSON.stringify(keyValues));
const requests = mockPutRequest("some-namespace-id", keyValues);
await runWrangler(
`kv:bulk put --namespace-id some-namespace-id keys.json`
);
expect(requests.count).toEqual(3);
expect(std.out).toMatchInlineSnapshot(`
"Uploaded 0 of 12000.
Copy link
Contributor

@Electroid Electroid Mar 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should land this, but a potential followup could be a percentage?
Something like: Uploaded 7% (1.5k out of 100.3k)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added an issue to track this #555

Uploaded 5000 of 12000.
Uploaded 10000 of 12000.
Uploaded 12000 of 12000.
Success!"
`);
expect(std.warn).toMatchInlineSnapshot(`""`);
expect(std.err).toMatchInlineSnapshot(`""`);
});
Expand Down Expand Up @@ -1222,7 +1249,12 @@ describe("wrangler", () => {
expect(new Headers(headers ?? []).get("Content-Type")).toEqual(
"application/json"
);
expect(JSON.parse(body as string)).toEqual(expectedKeys);
expect(JSON.parse(body as string)).toEqual(
expectedKeys.slice(
(requests.count - 1) * 5000,
requests.count * 5000
)
);
return null;
}
);
Expand All @@ -1246,6 +1278,29 @@ describe("wrangler", () => {
expect(std.err).toMatchInlineSnapshot(`""`);
});

it("should delete the keys in batches of 5000 parsed from a file", async () => {
const keys = new Array(12000).fill("some-key");
writeFileSync("./keys.json", JSON.stringify(keys));
mockConfirm({
text: `Are you sure you want to delete all the keys read from "keys.json" from kv-namespace with id "some-namespace-id"?`,
result: true,
});
const requests = mockDeleteRequest("some-namespace-id", keys);
await runWrangler(
`kv:bulk delete --namespace-id some-namespace-id keys.json`
);
expect(requests.count).toEqual(3);
expect(std.out).toMatchInlineSnapshot(`
"Deleted 0 of 12000.
Deleted 5000 of 12000.
Deleted 10000 of 12000.
Deleted 12000 of 12000.
Success!"
`);
expect(std.warn).toMatchInlineSnapshot(`""`);
expect(std.err).toMatchInlineSnapshot(`""`);
});

it("should not delete the keys if the user confirms no", async () => {
const keys = ["someKey1", "ns:someKey2"];
writeFileSync("./keys.json", JSON.stringify(keys));
Expand Down Expand Up @@ -1323,9 +1378,9 @@ describe("wrangler", () => {
).rejects.toThrowErrorMatchingInlineSnapshot(`
"Unexpected JSON input from \\"keys.json\\".
Expected an array of strings.
The item at index 1 is a number: 12354
The item at index 2 is a object: [object Object]
The item at index 3 is a object: null"
The item at index 1 is type: \\"number\\" - 12354
The item at index 2 is type: \\"object\\" - {\\"key\\":\\"someKey\\"}
The item at index 3 is type: \\"object\\" - null"
`);
expect(std.out).toMatchInlineSnapshot(`""`);
expect(std.warn).toMatchInlineSnapshot(`""`);
Expand Down
24 changes: 21 additions & 3 deletions packages/wrangler/src/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2031,7 +2031,16 @@ export async function main(argv: string[]): Promise<void> {
}

const accountId = await requireAuth(config);
await putBulkKeyValue(accountId, namespaceId, content);
await putBulkKeyValue(
accountId,
namespaceId,
content,
(index, total) => {
console.log(`Uploaded ${index} of ${total}.`);
}
);

console.log("Success!");
}
)
.command(
Expand Down Expand Up @@ -2098,7 +2107,9 @@ export async function main(argv: string[]): Promise<void> {
const key = content[i];
if (typeof key !== "string") {
errors.push(
`The item at index ${i} is a ${typeof key}: ${key}`
`The item at index ${i} is type: "${typeof key}" - ${JSON.stringify(
key
)}`
);
}
}
Expand All @@ -2112,7 +2123,14 @@ export async function main(argv: string[]): Promise<void> {

const accountId = await requireAuth(config);

await deleteBulkKeyValue(accountId, namespaceId, content);
await deleteBulkKeyValue(
accountId,
namespaceId,
content,
(index, total) => {
console.log(`Deleted ${index} of ${total}.`);
}
);

console.log("Success!");
}
Expand Down
58 changes: 42 additions & 16 deletions packages/wrangler/src/kv.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@ import { URLSearchParams } from "node:url";
import { fetchListResult, fetchResult, fetchKVGetValue } from "./cfetch";
import type { Config, Environment } from "./config";

/** The largest number of kv items we can pass to the API in a single request. */
const API_MAX = 10000;
// The const below are halved from the API's true capacity to help avoid
// hammering it with large requests.
const BATCH_KEY_MAX = API_MAX / 2;

type KvArgs = {
binding?: string;
"namespace-id"?: string;
Expand Down Expand Up @@ -163,31 +169,51 @@ export async function getKeyValue(
export async function putBulkKeyValue(
accountId: string,
namespaceId: string,
keyValues: KeyValue[]
keyValues: KeyValue[],
progressCallback: (index: number, total: number) => void
) {
return await fetchResult(
`/accounts/${accountId}/storage/kv/namespaces/${namespaceId}/bulk`,
{
method: "PUT",
body: JSON.stringify(keyValues),
headers: { "Content-Type": "application/json" },
for (let index = 0; index < keyValues.length; index += BATCH_KEY_MAX) {
if (progressCallback && keyValues.length > BATCH_KEY_MAX) {
progressCallback(index, keyValues.length);
}
);

await fetchResult(
`/accounts/${accountId}/storage/kv/namespaces/${namespaceId}/bulk`,
{
method: "PUT",
body: JSON.stringify(keyValues.slice(index, index + BATCH_KEY_MAX)),
headers: { "Content-Type": "application/json" },
}
);
}
if (progressCallback && keyValues.length > BATCH_KEY_MAX) {
progressCallback(keyValues.length, keyValues.length);
}
}

export async function deleteBulkKeyValue(
accountId: string,
namespaceId: string,
keys: string[]
keys: string[],
progressCallback: (index: number, total: number) => void
) {
return await fetchResult(
`/accounts/${accountId}/storage/kv/namespaces/${namespaceId}/bulk`,
{
method: "DELETE",
body: JSON.stringify(keys),
headers: { "Content-Type": "application/json" },
for (let index = 0; index < keys.length; index += BATCH_KEY_MAX) {
if (progressCallback && keys.length > BATCH_KEY_MAX) {
progressCallback(index, keys.length);
}
);

await fetchResult(
`/accounts/${accountId}/storage/kv/namespaces/${namespaceId}/bulk`,
{
method: "DELETE",
body: JSON.stringify(keys.slice(index, index + BATCH_KEY_MAX)),
headers: { "Content-Type": "application/json" },
}
);
}
if (progressCallback && keys.length > BATCH_KEY_MAX) {
progressCallback(keys.length, keys.length);
}
}

export function getNamespaceId(
Expand Down
2 changes: 1 addition & 1 deletion packages/wrangler/src/sites.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ export async function syncAssets(
}
manifest[path.relative(siteAssets.baseDirectory, file)] = assetKey;
}
await putBulkKeyValue(accountId, namespace, upload);
await putBulkKeyValue(accountId, namespace, upload, () => {});
return { manifest, namespace };
}

Expand Down