Skip to content

Commit

Permalink
Cache API tests (#447)
Browse files Browse the repository at this point in the history
* Respect `cf.cacheKey` in Cache API

* Fix caching of `ReadableStream`-bodied `Response`s

* Propagate `clock` through `GatewayFactory` for tests

* Catch loopback handler errors and return 500 response

* Add tests for `_getRangeResponse` and `_parseRanges` helpers

* Add `Miniflare` integration test helper

* Bump `undici` to `5.13.0` and add `@cloudflare/workers-types`

* Add Cache API tests
  • Loading branch information
mrbbot committed Nov 1, 2023
1 parent 7be9637 commit cde71d5
Show file tree
Hide file tree
Showing 13 changed files with 930 additions and 46 deletions.
3 changes: 2 additions & 1 deletion packages/miniflare/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@
"kleur": "^4.1.5",
"source-map-support": "0.5.21",
"stoppable": "^1.1.0",
"undici": "^5.12.0",
"undici": "^5.13.0",
"workerd": "^1.20221111.5",
"ws": "^8.11.0",
"youch": "^3.2.2",
"zod": "^3.18.0"
},
"devDependencies": {
"@cloudflare/workers-types": "^4.20221111.1",
"@types/better-sqlite3": "^7.6.2",
"@types/debug": "^4.1.7",
"@types/estree": "^1.0.0",
Expand Down
47 changes: 29 additions & 18 deletions packages/miniflare/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import {
serializeConfig,
} from "./runtime";
import {
Clock,
HttpError,
Log,
MiniflareCoreError,
Expand All @@ -54,6 +55,7 @@ import {
OptionalZodTypeOf,
UnionToIntersection,
ValueOf,
defaultClock,
} from "./shared";
import { anyAbortSignal } from "./shared/signal";
import { waitForRequest } from "./wait";
Expand Down Expand Up @@ -157,6 +159,7 @@ export class Miniflare {
#sharedOpts: PluginSharedOptions;
#workerOpts: PluginWorkerOptions[];
#log: Log;
readonly #clock: Clock;

readonly #runtimeConstructor: RuntimeConstructor;
#runtime?: Runtime;
Expand Down Expand Up @@ -201,6 +204,7 @@ export class Miniflare {
this.#sharedOpts = sharedOpts;
this.#workerOpts = workerOpts;
this.#log = this.#sharedOpts.core.log ?? new NoOpLog();
this.#clock = this.#sharedOpts.core.clock ?? defaultClock;
this.#initPlugins();

// Get supported shell for executing runtime binary
Expand All @@ -220,6 +224,7 @@ export class Miniflare {
if (plugin.gateway !== undefined && plugin.router !== undefined) {
const gatewayFactory = new GatewayFactory<any>(
this.#log,
this.#clock,
this.#sharedOpts.core.cloudflareFetch,
key,
plugin.gateway,
Expand Down Expand Up @@ -350,24 +355,30 @@ export class Miniflare {
});

let response: Response | undefined;
const customService = request.headers.get(HEADER_CUSTOM_SERVICE);
if (customService !== null) {
response = await this.#handleLoopbackCustomService(
request,
customService
);
} else if (url.pathname === "/core/error") {
const workerSrcOpts = this.#workerOpts.map<SourceOptions>(
({ core }) => core
);
response = await handlePrettyErrorRequest(
this.#log,
workerSrcOpts,
request
);
} else {
// TODO: check for proxying/outbound fetch header first (with plans for fetch mocking)
response = await this.#handleLoopbackPlugins(request, url);
try {
const customService = request.headers.get(HEADER_CUSTOM_SERVICE);
if (customService !== null) {
response = await this.#handleLoopbackCustomService(
request,
customService
);
} else if (url.pathname === "/core/error") {
const workerSrcOpts = this.#workerOpts.map<SourceOptions>(
({ core }) => core
);
response = await handlePrettyErrorRequest(
this.#log,
workerSrcOpts,
request
);
} else {
// TODO: check for proxying/outbound fetch header first (with plans for fetch mocking)
response = await this.#handleLoopbackPlugins(request, url);
}
} catch (e: any) {
this.#log.error(e);
res.writeHead(500);
return res.end(e?.stack ?? String(e));
}

if (response === undefined) {
Expand Down
35 changes: 22 additions & 13 deletions packages/miniflare/src/plugins/cache/gateway.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import assert from "assert";
import crypto from "crypto";
import http from "http";
import { AddressInfo } from "net";
Expand Down Expand Up @@ -174,12 +175,13 @@ class HttpParser {
});
}
private listen(request: http.IncomingMessage, response: http.ServerResponse) {
if (request.url) {
response?.socket?.write(
this.responses.get(request.url) ?? new Uint8Array()
);
}
response.end();
assert(request.url !== undefined);
assert(response.socket !== null);
const array = this.responses.get(request.url);
assert(array !== undefined);
// Write response to parse directly to underlying socket
response.socket.write(array);
response.socket.end();
}
public async parse(response: Uint8Array): Promise<ParsedHttpResponse> {
await this.connected;
Expand Down Expand Up @@ -210,11 +212,12 @@ export class CacheGateway {
private readonly clock: Clock
) {}

async match(request: Request): Promise<Response> {
async match(request: Request, cacheKey?: string): Promise<Response> {
// Never cache Workers Sites requests, so we always return on-disk files
if (isSitesRequest(request)) throw new CacheMiss();

const cached = await this.storage.get<CacheMetadata>(request.url);
cacheKey ??= request.url;
const cached = await this.storage.get<CacheMetadata>(cacheKey);
if (cached?.metadata === undefined) throw new CacheMiss();

const response = new CacheResponse(
Expand All @@ -231,11 +234,15 @@ export class CacheGateway {
);
}

async put(request: Request, value: ArrayBuffer): Promise<Response> {
async put(
request: Request,
value: Uint8Array,
cacheKey?: string
): Promise<Response> {
// Never cache Workers Sites requests, so we always return on-disk files
if (isSitesRequest(request)) return new Response(null, { status: 204 });

const response = await HttpParser.get().parse(new Uint8Array(value));
const response = await HttpParser.get().parse(value);

const { storable, expiration, headers } = getExpiration(
this.clock,
Expand All @@ -249,7 +256,8 @@ export class CacheGateway {
throw new StorageFailure();
}

await this.storage.put<CacheMetadata>(request.url, {
cacheKey ??= request.url;
await this.storage.put<CacheMetadata>(cacheKey, {
value: response.body,
expiration: millisToSeconds(this.clock() + expiration),
metadata: {
Expand All @@ -260,8 +268,9 @@ export class CacheGateway {
return new Response(null, { status: 204 });
}

async delete(request: Request): Promise<Response> {
const deleted = await this.storage.delete(request.url);
async delete(request: Request, cacheKey?: string): Promise<Response> {
cacheKey ??= request.url;
const deleted = await this.storage.delete(cacheKey);
// This is an extremely vague error, but it fits with what the cache API in workerd expects
if (!deleted) throw new PurgeFailure();
return new Response(null);
Expand Down
1 change: 1 addition & 0 deletions packages/miniflare/src/plugins/cache/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,4 @@ export const CACHE_PLUGIN: Plugin<
};

export * from "./gateway";
export * from "./range";
109 changes: 104 additions & 5 deletions packages/miniflare/src/plugins/cache/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
PUT,
RouteHandler,
Router,
decodeCfBlob,
decodePersist,
} from "../shared";
import { HEADER_CACHE_WARN_USAGE } from "./constants";
Expand All @@ -23,6 +24,98 @@ function decodeNamespace(headers: Headers) {
return namespace === null ? `default` : `named:${namespace}`;
}

const CR = "\r".charCodeAt(0);
const LF = "\n".charCodeAt(0);

// Remove `Transfer-Encoding: chunked` header from the HTTP `message` if it
// exists. Why do we need to do this?
//
// With the following code:
//
// ```js
// const { readable, writable } = new IdentityTransformStream();
// const encoder = new TextEncoder();
// const writer = writable.getWriter();
// void writer.write(encoder.encode("hello"));
// void writer.write(encoder.encode("world"));
// void writer.close();
// const response = new Response(readable, {
// headers: { "Cache-Control": "max-age=3600" },
// });
// await caches.default.put(key, response);
// ```
//
// ...the Miniflare loopback server will receive the following HTTP request:
//
// ```http
// PUT / HTTP/1.1
// Transfer-Encoding: chunked
// Host: localhost
//
// 4c
// HTTP/1.1 200 OK
// Transfer-Encoding: chunked
// Cache-Control: max-age=3600
//
//
// 5
// hello
// 5
// world
// 0
// ```
//
// The body of this request (what the `body` variable here stores) will be:
//
// ```http
// HTTP/1.1 200 OK
// Transfer-Encoding: chunked
// Cache-Control: max-age=3600
//
//
// helloworld
// ```
//
// ...which is invalid `chunked` `Transfer-Encoding`
// (https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Transfer-Encoding#directives)
// as there aren't any chunk lengths. This is working as intended, as the
// internal version of the cache gateway API wants responses in this format.
// However, the `llhttp` (https://github.com/nodejs/llhttp) parser used by
// `undici` will throw on receiving this.
//
// Therefore, we just remove the `Transfer-Encoding: chunked` header. We never
// reuse sockets when parsing HTTP requests, so we don't need to worry about
// delimiting HTTP messages.
function removeTransferEncodingChunked(message: Buffer): Buffer {
// Split headers from the body by looking for the first instance of
// "\r\n\r\n" signifying end-of-headers
const endOfHeadersIndex = message.findIndex(
(_value, index) =>
message[index] === CR &&
message[index + 1] === LF &&
message[index + 2] === CR &&
message[index + 3] === LF
);
if (endOfHeadersIndex !== -1) {
// `subarray` returns a new `Buffer` that references the original memory
const headers = message.subarray(0, endOfHeadersIndex).toString();
// Try to remove case-insensitive `Transfer-Encoding: chunked` header.
// Might be last header so may not have trailing "\r\n" (only `subarray`ing)
// up to "\r\n\r\n", so match "\r\n" at the start.
const replaced = headers.replace(/\r\nTransfer-Encoding: chunked/i, "");
if (headers.length !== replaced.length) {
// If we removed something, replace the message with a concatenation of
// the new headers and the body
message = Buffer.concat([
Buffer.from(replaced),
message.subarray(endOfHeadersIndex),
]);
}
}
return message;
}

// noinspection DuplicatedCode
export class CacheRouter extends Router<CacheGateway> {
#warnedUsage = false;
#maybeWarnUsage(headers: Headers) {
Expand All @@ -40,8 +133,10 @@ export class CacheRouter extends Router<CacheGateway> {
const uri = decodeURIComponent(params.uri);
const namespace = decodeNamespace(req.headers);
const persist = decodePersist(req.headers);
const cf = decodeCfBlob(req.headers);
const gateway = this.gatewayFactory.get(namespace, persist);
return fallible(gateway.match(new Request(uri, req as RequestInit)));
const key = new Request(uri, req as RequestInit);
return fallible(gateway.match(key, cf.cacheKey));
};

@PUT("/:uri")
Expand All @@ -50,10 +145,12 @@ export class CacheRouter extends Router<CacheGateway> {
const uri = decodeURIComponent(params.uri);
const namespace = decodeNamespace(req.headers);
const persist = decodePersist(req.headers);
const cf = decodeCfBlob(req.headers);
const gateway = this.gatewayFactory.get(namespace, persist);
return fallible(
gateway.put(new Request(uri, req as RequestInit), await req.arrayBuffer())
);
const bodyBuffer = Buffer.from(await req.arrayBuffer());
const bodyArray = new Uint8Array(removeTransferEncodingChunked(bodyBuffer));
const key = new Request(uri, { ...(req as RequestInit), body: undefined });
return fallible(gateway.put(key, bodyArray, cf.cacheKey));
};

@PURGE("/:uri")
Expand All @@ -62,7 +159,9 @@ export class CacheRouter extends Router<CacheGateway> {
const uri = decodeURIComponent(params.uri);
const namespace = decodeNamespace(req.headers);
const persist = decodePersist(req.headers);
const cf = decodeCfBlob(req.headers);
const gateway = this.gatewayFactory.get(namespace, persist);
return fallible(gateway.delete(new Request(uri, req as RequestInit)));
const key = new Request(uri, req as RequestInit);
return fallible(gateway.delete(key, cf.cacheKey));
};
}
7 changes: 6 additions & 1 deletion packages/miniflare/src/plugins/core/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { getCacheServiceName } from "../cache";
import {
BINDING_SERVICE_LOOPBACK,
CloudflareFetchSchema,
HEADER_CF_BLOB,
Plugin,
} from "../shared";
import { HEADER_ERROR_STACK } from "./errors";
Expand Down Expand Up @@ -64,6 +65,7 @@ export const CoreSharedOptionsSchema = z.object({
verbose: z.boolean().optional(),

log: z.instanceof(Log).optional(),
clock: z.function().returns(z.number()).optional(),
cloudflareFetch: CloudflareFetchSchema.optional(),

// TODO: add back validation of cf object
Expand Down Expand Up @@ -338,7 +340,10 @@ export const CORE_PLUGIN: Plugin<
});
}
const services: Service[] = [
{ name: SERVICE_LOOPBACK, external: { http: {} } },
{
name: SERVICE_LOOPBACK,
external: { http: { cfBlobHeader: HEADER_CF_BLOB } },
},
{
name: SERVICE_ENTRY,
worker: {
Expand Down
10 changes: 10 additions & 0 deletions packages/miniflare/src/plugins/shared/constants.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import type { RequestInitCfProperties } from "@cloudflare/workers-types";
import { Headers } from "undici";
import { Worker_Binding } from "../../runtime";
import { Persistence, PersistenceSchema } from "./gateway";

export const SOCKET_ENTRY = "entry";

export const HEADER_PERSIST = "MF-Persist";
// Even though we inject the `cf` blob in the entry script, we still need to
// specify a header, so we receive things like `cf.cacheKey` in loopback
// requests.
export const HEADER_CF_BLOB = "MF-CF-Blob";

export const BINDING_SERVICE_LOOPBACK = "MINIFLARE_LOOPBACK";
export const BINDING_TEXT_PLUGIN = "MINIFLARE_PLUGIN";
Expand Down Expand Up @@ -35,6 +40,11 @@ export function decodePersist(headers: Headers): Persistence {
: PersistenceSchema.parse(JSON.parse(header));
}

export function decodeCfBlob(headers: Headers): RequestInitCfProperties {
const header = headers.get(HEADER_CF_BLOB);
return header === null ? {} : JSON.parse(header);
}

export enum CfHeader {
Error = "cf-r2-error",
Request = "cf-r2-request",
Expand Down
Loading

0 comments on commit cde71d5

Please sign in to comment.