From 18f8f65424adb8505c7584ae01b1823bb648eb6e Mon Sep 17 00:00:00 2001 From: Sunil Pai Date: Wed, 12 Jan 2022 15:32:09 +0000 Subject: [PATCH] refactor: proxy/preview server (#231) Closes #205 This PR refactors how we setup the proxy server between the developer and the edge preview service during `wrangler dev`. Of note, we start the server immediately. We also buffers requests/streams and hold on to the, when starting/refreshing the token. This means a developer should never see `ERR_CONNECTION_REFUSED` error page, or have an older worker respond after making a change to the code. When the token does get refreshed, we flush said streams/requests with the newer values. --- .changeset/angry-schools-walk.md | 7 + packages/wrangler/src/dev.tsx | 89 ++------ packages/wrangler/src/proxy.ts | 343 ++++++++++++++++++++++++------- 3 files changed, 288 insertions(+), 151 deletions(-) create mode 100644 .changeset/angry-schools-walk.md diff --git a/.changeset/angry-schools-walk.md b/.changeset/angry-schools-walk.md new file mode 100644 index 000000000000..bc4b833702a2 --- /dev/null +++ b/.changeset/angry-schools-walk.md @@ -0,0 +1,7 @@ +--- +"wrangler": patch +--- + +refactor: proxy/preview server + +This PR refactors how we setup the proxy server between the developer and the edge preview service during `wrangler dev`. Of note, we start the server immediately. We also buffer requests/streams and hold on to them, when starting/refreshing the token. This means a developer should never see `ERR_CONNECTION_REFUSED` error page, or have an older worker respond after making a change to the code. And when the token does get refreshed, we flush said streams/requests with the newer values, making the iteration process a lot smoother and predictable. diff --git a/packages/wrangler/src/dev.tsx b/packages/wrangler/src/dev.tsx index 429c1cd595e4..4aa5329d4b11 100644 --- a/packages/wrangler/src/dev.tsx +++ b/packages/wrangler/src/dev.tsx @@ -22,7 +22,7 @@ import { getAPIToken } from "./user"; import fetch from "node-fetch"; import makeModuleCollector from "./module-collection"; import { withErrorBoundary, useErrorHandler } from "react-error-boundary"; -import { createHttpProxy } from "./proxy"; +import { usePreviewServer } from "./proxy"; import { execa } from "execa"; import { watch } from "chokidar"; @@ -38,7 +38,7 @@ export type DevProps = { jsxFactory: void | string; jsxFragment: void | string; bindings: CfWorkerInit["bindings"]; - public: void | string; + public: undefined | string; site: void | string; compatibilityDate: void | string; compatibilityFlags: void | string[]; @@ -133,7 +133,7 @@ function Remote(props: { name: void | string; bundle: EsbuildBundle | void; format: CfScriptFormat; - public: void | string; + public: undefined | string; site: void | string; port: number; accountId: void | string; @@ -145,7 +145,7 @@ function Remote(props: { }) { assert(props.accountId, "accountId is required"); assert(props.apiToken, "apiToken is required"); - const token = useWorker({ + const previewToken = useWorker({ name: props.name, bundle: props.bundle, format: props.format, @@ -160,10 +160,14 @@ function Remote(props: { usageModel: props.usageModel, }); - useProxy({ token, publicRoot: props.public, port: props.port }); + usePreviewServer({ + previewToken, + publicRoot: props.public, + port: props.port, + }); useInspector({ - inspectorUrl: token ? token.inspectorUrl.href : undefined, + inspectorUrl: previewToken ? previewToken.inspectorUrl.href : undefined, port: 9229, logToTerminal: true, }); @@ -509,7 +513,7 @@ function useWorker(props: { compatibilityDate: string | void; compatibilityFlags: string[] | void; usageModel: void | "bundled" | "unbound"; -}): CfPreviewToken | void { +}): CfPreviewToken | undefined { const { name, bundle, @@ -524,7 +528,7 @@ function useWorker(props: { usageModel, port, } = props; - const [token, setToken] = useState(); + const [token, setToken] = useState(); // This is the most reliable way to detect whether // something's "happened" in our system; We make a ref and @@ -533,6 +537,8 @@ function useWorker(props: { useEffect(() => { async function start() { + setToken(undefined); // reset token in case we're re-running + if (!bundle) return; if (format === "modules" && bundle.type === "commonjs") { console.error("⎔ Cannot use modules with a commonjs bundle."); @@ -546,7 +552,6 @@ function useWorker(props: { } if (!startedRef.current) { - console.log("⎔ Starting server..."); startedRef.current = true; } else { console.log("⎔ Detected changes, restarting server..."); @@ -624,72 +629,6 @@ function useWorker(props: { return token; } -function useProxy({ - token, - publicRoot, - port, -}: { - token: CfPreviewToken | void; - publicRoot: void | string; - port: number; -}) { - useEffect(() => { - if (!token) return; - // TODO(soon): since headers are added in callbacks, the server - // does not need to restart when changes are made. - const host = token.host; - const proxy = createHttpProxy({ - host, - assetPath: typeof publicRoot === "string" ? publicRoot : null, - onRequest: (headers) => { - headers["cf-workers-preview-token"] = token.value; - }, - onResponse: (headers) => { - for (const [name, value] of Object.entries(headers)) { - // Rewrite the remote host to the local host. - if (typeof value === "string" && value.includes(host)) { - headers[name] = value - .replaceAll(`https://${host}`, `http://localhost:${port}`) - .replaceAll(host, `localhost:${port}`); - } - } - }, - }); - - console.log(`⬣ Listening at http://localhost:${port}`); - - const server = proxy.listen(port); - - // TODO(soon): refactor logging format into its own function - proxy.on("request", function (req, res) { - // log all requests - console.log( - new Date().toLocaleTimeString(), - req.method, - req.url, - res.statusCode - ); - }); - proxy.on("upgrade", (req) => { - console.log( - new Date().toLocaleTimeString(), - req.method, - req.url, - 101, - "(WebSocket)" - ); - }); - proxy.on("error", (err) => { - console.error(new Date().toLocaleTimeString(), err); - }); - - return () => { - proxy.close(); - server.close(); - }; - }, [token, publicRoot, port]); -} - function sleep(period: number) { return new Promise((resolve) => setTimeout(resolve, period)); } diff --git a/packages/wrangler/src/proxy.ts b/packages/wrangler/src/proxy.ts index c3f607d4852e..5a1407bf3a86 100644 --- a/packages/wrangler/src/proxy.ts +++ b/packages/wrangler/src/proxy.ts @@ -1,97 +1,288 @@ import { connect } from "node:http2"; +import type { ServerHttp2Stream } from "node:http2"; import { createServer } from "node:http"; -import type { Server, IncomingHttpHeaders, RequestListener } from "node:http"; +import type { + IncomingHttpHeaders, + RequestListener, + IncomingMessage, + ServerResponse, + Server, +} from "node:http"; import WebSocket from "faye-websocket"; import serveStatic from "serve-static"; +import type { CfPreviewToken } from "./api/preview"; +import { useEffect, useRef } from "react"; -export interface HttpProxyInit { - host: string; - assetPath?: string | null; - onRequest?: (headers: IncomingHttpHeaders) => void; - onResponse?: (headers: IncomingHttpHeaders) => void; +/** + * `usePreviewServer` is a React hook that creates a local development + * server that can be used to develop a Worker. + * + * When we run `wrangler dev`, we start by uploading the compiled worker + * to the preview service, which responds with a preview token. + * (see `useWorker`/`createWorker` for details.) + * We can then use that token to connect to the preview server for a + * great local development experience. Further, as we change the worker, + * we can update the preview token transparently without having to restart + * the development server. + */ + +/** Rewrite request headers to add the preview token. */ +function addCfPreviewTokenHeader( + headers: IncomingHttpHeaders, + previewTokenValue: string +) { + headers["cf-workers-preview-token"] = previewTokenValue; } /** - * Creates a HTTP/1 proxy that sends requests over HTTP/2. + * Rewrite references in request headers + * from the preview host to the local host. */ -export function createHttpProxy(init: HttpProxyInit): Server { - const { host, assetPath, onRequest = () => {}, onResponse = () => {} } = init; - const remote = connect(`https://${host}`); - const local = createServer(); - // HTTP/2 -> HTTP/2 - local.on("stream", (stream, headers: IncomingHttpHeaders) => { - onRequest(headers); - headers[":authority"] = host; - const request = stream.pipe(remote.request(headers)); - request.on("response", (responseHeaders: IncomingHttpHeaders) => { - onResponse(responseHeaders); - stream.respond(responseHeaders); - request.pipe(stream, { end: true }); - }); - }); - // HTTP/1 -> HTTP/2 - const handleRequest: RequestListener = (message, response) => { - const { httpVersionMajor, headers, method, url } = message; - if (httpVersionMajor >= 2) { - return; // Already handled by the "stream" event. +function rewriteRemoteHostToLocalHostInHeaders( + headers: IncomingHttpHeaders, + remoteHost: string, + localPort: number +) { + for (const [name, value] of Object.entries(headers)) { + // Rewrite the remote host to the local host. + if (typeof value === "string" && value.includes(remoteHost)) { + headers[name] = value + .replaceAll(`https://${remoteHost}`, `http://localhost:${localPort}`) + .replaceAll(remoteHost, `localhost:${localPort}`); } - onRequest(headers); - headers[":method"] = method; - headers[":path"] = url; - headers[":authority"] = host; - headers[":scheme"] = "https"; - for (const name of Object.keys(headers)) { - if (HTTP1_HEADERS.has(name.toLowerCase())) { - delete headers[name]; - } + } +} + +export function usePreviewServer({ + previewToken, + publicRoot, + port, +}: { + previewToken: CfPreviewToken | undefined; + publicRoot: undefined | string; + port: number; +}) { + /** Creates an HTTP/1 proxy that sends requests over HTTP/2. */ + const proxyServer = useRef(); + if (!proxyServer.current) { + proxyServer.current = createServer() + .on("request", function (req, res) { + // log all requests + console.log( + new Date().toLocaleTimeString(), + req.method, + req.url, + res.statusCode + ); + }) + .on("upgrade", (req) => { + // log all websocket connections + console.log( + new Date().toLocaleTimeString(), + req.method, + req.url, + 101, + "(WebSocket)" + ); + }) + .on("error", (err) => { + // log all connection errors + console.error(new Date().toLocaleTimeString(), err); + }); + } + + /** + * When we're not connected / getting a fresh token on changes, + * we'd like to buffer streams/requests until we're connected. + * Once connected, we can flush the buffered streams/requests. + * streamBufferRef is used to buffer http/2 streams, while + * requestResponseBufferRef is used to buffer http/1 requests. + */ + const streamBufferRef = useRef< + { stream: ServerHttp2Stream; headers: IncomingHttpHeaders }[] + >([]); + const requestResponseBufferRef = useRef< + { request: IncomingMessage; response: ServerResponse }[] + >([]); + + useEffect(() => { + const proxy = proxyServer.current; + + // If we don't have a token, that means either we're just starting up, + // or we're refreshing the token. + if (!previewToken) { + const cleanupListeners: (() => void)[] = []; + const bufferStream = ( + stream: ServerHttp2Stream, + headers: IncomingHttpHeaders + ) => { + // store the stream in a buffer so we can replay it later + streamBufferRef.current.push({ stream, headers }); + }; + proxy.on("stream", bufferStream); + cleanupListeners.push(() => proxy.off("stream", bufferStream)); + + const bufferRequestResponse = ( + request: IncomingMessage, + response: ServerResponse + ) => { + // store the request and response in a buffer so we can replay it later + requestResponseBufferRef.current.push({ request, response }); + }; + + proxy.on("request", bufferRequestResponse); + cleanupListeners.push(() => proxy.off("request", bufferRequestResponse)); + return () => { + cleanupListeners.forEach((cleanup) => cleanup()); + }; } - const request = message.pipe(remote.request(headers)); - request.on("response", (responseHeaders) => { - const status = responseHeaders[":status"]; - onResponse(responseHeaders); - for (const name of Object.keys(responseHeaders)) { - if (name.startsWith(":")) { - delete responseHeaders[name]; + + // We have a token. Let's proxy requests to the preview end point. + const cleanupListeners: (() => void)[] = []; + + const assetPath = typeof publicRoot === "string" ? publicRoot : null; + + // create a ClientHttp2Session + const remote = connect(`https://${previewToken.host}`); + cleanupListeners.push(() => remote.destroy()); + + /** HTTP/2 -> HTTP/2 */ + function handleStream( + stream: ServerHttp2Stream, + headers: IncomingHttpHeaders + ) { + addCfPreviewTokenHeader(headers, previewToken.value); + headers[":authority"] = previewToken.host; + const request = stream.pipe(remote.request(headers)); + request.on("response", (responseHeaders: IncomingHttpHeaders) => { + rewriteRemoteHostToLocalHostInHeaders( + responseHeaders, + previewToken.host, + port + ); + stream.respond(responseHeaders); + request.pipe(stream, { end: true }); + }); + } + proxy.on("stream", handleStream); + cleanupListeners.push(() => proxy.off("stream", handleStream)); + + // flush and replay buffered streams + streamBufferRef.current.forEach((buffer) => + handleStream(buffer.stream, buffer.headers) + ); + streamBufferRef.current = []; + + /** HTTP/1 -> HTTP/2 */ + const handleRequest: RequestListener = ( + message: IncomingMessage, + response: ServerResponse + ) => { + const { httpVersionMajor, headers, method, url } = message; + if (httpVersionMajor >= 2) { + return; // Already handled by the "stream" event. + } + addCfPreviewTokenHeader(headers, previewToken.value); + headers[":method"] = method; + headers[":path"] = url; + headers[":authority"] = previewToken.host; + headers[":scheme"] = "https"; + for (const name of Object.keys(headers)) { + if (HTTP1_HEADERS.has(name.toLowerCase())) { + delete headers[name]; } } - response.writeHead(status, responseHeaders); - request.pipe(response, { end: true }); - }); - }; - // If an asset path is defined, check the file system - // for a file first and serve if it exists. - if (assetPath) { - const handleAsset = serveStatic(assetPath, { - cacheControl: false, - }); - local.on("request", (request, response) => { - handleAsset(request, response, () => { - handleRequest(request, response); + const request = message.pipe(remote.request(headers)); + request.on("response", (responseHeaders) => { + const status = responseHeaders[":status"]; + rewriteRemoteHostToLocalHostInHeaders( + responseHeaders, + previewToken.host, + port + ); + for (const name of Object.keys(responseHeaders)) { + if (name.startsWith(":")) { + delete responseHeaders[name]; + } + } + response.writeHead(status, responseHeaders); + request.pipe(response, { end: true }); }); - }); - } else { - local.on("request", handleRequest); - } - // HTTP/1 -> WebSocket (over HTTP/1) - local.on("upgrade", (message, socket, body) => { - const { headers, url } = message; - onRequest(headers); - headers["host"] = host; - const localWebsocket = new WebSocket(message, socket, body); - // TODO(soon): Custom WebSocket protocol is not working? - const remoteWebsocketClient = new WebSocket.Client( - `wss://${host}${url}`, - [], - { headers } + }; + + // If an asset path is defined, check the file system + // for a file first and serve if it exists. + const actualHandleRequest = assetPath + ? createHandleAssetsRequest(assetPath, handleRequest) + : handleRequest; + + proxy.on("request", actualHandleRequest); + cleanupListeners.push(() => proxy.off("request", actualHandleRequest)); + + // flush and replay buffered requests + requestResponseBufferRef.current.forEach(({ request, response }) => + actualHandleRequest(request, response) ); - localWebsocket.pipe(remoteWebsocketClient).pipe(localWebsocket); - }); - remote.on("close", () => { - local.close(); + requestResponseBufferRef.current = []; + + /** HTTP/1 -> WebSocket (over HTTP/1) */ + const handleUpgrade = ( + message: IncomingMessage, + socket: WebSocket, + body: Buffer + ) => { + const { headers, url } = message; + addCfPreviewTokenHeader(headers, previewToken.value); + headers["host"] = previewToken.host; + const localWebsocket = new WebSocket(message, socket, body); + // TODO(soon): Custom WebSocket protocol is not working? + const remoteWebsocketClient = new WebSocket.Client( + `wss://${previewToken.host}${url}`, + [], + { headers } + ); + localWebsocket.pipe(remoteWebsocketClient).pipe(localWebsocket); + // We close down websockets whenever we refresh the token. + cleanupListeners.push(() => { + localWebsocket.destroy(); + remoteWebsocketClient.destroy(); + }); + }; + proxy.on("upgrade", handleUpgrade); + cleanupListeners.push(() => proxy.off("upgrade", handleUpgrade)); + + return () => { + cleanupListeners.forEach((d) => d()); + }; + }, [previewToken, publicRoot, port]); + + // Start/stop the server whenever the + // containing component is mounted/unmounted. + useEffect(() => { + proxyServer.current.listen(port); + console.log(`⬣ Listening at http://localhost:${port}`); + + return () => { + proxyServer.current.close(); + }; + }, [port]); +} + +function createHandleAssetsRequest( + assetPath: string, + handleRequest: RequestListener +) { + const handleAsset = serveStatic(assetPath, { + cacheControl: false, }); - return local; + return (request: IncomingMessage, response: ServerResponse) => { + handleAsset(request, response, () => { + handleRequest(request, response); + }); + }; } +/** A Set of headers we want to remove from HTTP/1 requests. */ const HTTP1_HEADERS = new Set([ "host", "connection",