-
-
Notifications
You must be signed in to change notification settings - Fork 67
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
chore: add examples for reusing workers
- Loading branch information
1 parent
d1eea3f
commit b7dfda4
Showing
2 changed files
with
204 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,145 @@ | ||
// @ts-ignore | ||
import { STATUS_CODE } from 'https://deno.land/std/http/status.ts'; | ||
|
||
interface Worker { | ||
new(key: string, rid: string): Worker; | ||
|
||
fetch(request: Request, options?: any): Promise<Response>; | ||
dispose(): void; | ||
|
||
get active(): boolean; | ||
} | ||
|
||
const SESSION_HEADER_NAME = 'X-Edge-Runtime-Session-Id'; | ||
const WORKERS = new Map<string, Worker>(); | ||
|
||
setInterval(() => { | ||
const shouldBeRemoved: string[] = []; | ||
|
||
for (const [uuid, worker] of WORKERS) { | ||
if (!worker.active) { | ||
shouldBeRemoved.push(uuid); | ||
} | ||
} | ||
|
||
for (const uuid of shouldBeRemoved) { | ||
console.log("deleted: ", uuid); | ||
WORKERS.delete(uuid); | ||
} | ||
}, 2500); | ||
|
||
console.log('main function started (session mode)'); | ||
|
||
Deno.serve(async (req: Request) => { | ||
const headers = new Headers({ | ||
'Content-Type': 'application/json', | ||
}); | ||
|
||
const url = new URL(req.url); | ||
const { pathname } = url; | ||
|
||
// handle health checks | ||
if (pathname === '/_internal/health') { | ||
return new Response( | ||
JSON.stringify({ 'message': 'ok' }), | ||
{ | ||
status: STATUS_CODE.OK, | ||
headers, | ||
}, | ||
); | ||
} | ||
|
||
if (pathname === '/_internal/metric') { | ||
const metric = await EdgeRuntime.getRuntimeMetrics(); | ||
return Response.json(metric); | ||
} | ||
|
||
const path_parts = pathname.split('/'); | ||
const service_name = path_parts[1]; | ||
|
||
if (!service_name || service_name === '') { | ||
const error = { msg: 'missing function name in request' }; | ||
return new Response( | ||
JSON.stringify(error), | ||
{ status: STATUS_CODE.BadRequest, headers: { 'Content-Type': 'application/json' } }, | ||
); | ||
} | ||
|
||
const servicePath = `./examples/${service_name}`; | ||
const createWorker = async (): Promise<Worker> => { | ||
const memoryLimitMb = 150; | ||
const workerTimeoutMs = 30 * 1000; | ||
const noModuleCache = false; | ||
|
||
const importMapPath = null; | ||
const envVarsObj = Deno.env.toObject(); | ||
const envVars = Object.keys(envVarsObj).map((k) => [k, envVarsObj[k]]); | ||
const forceCreate = false; | ||
const netAccessDisabled = false; | ||
const cpuTimeSoftLimitMs = 10000; | ||
const cpuTimeHardLimitMs = 20000; | ||
|
||
return await EdgeRuntime.userWorkers.create({ | ||
servicePath, | ||
memoryLimitMb, | ||
workerTimeoutMs, | ||
noModuleCache, | ||
importMapPath, | ||
envVars, | ||
forceCreate, | ||
netAccessDisabled, | ||
cpuTimeSoftLimitMs, | ||
cpuTimeHardLimitMs, | ||
}); | ||
}; | ||
|
||
const callWorker = async () => { | ||
|
||
try { | ||
let worker: Worker | null = null; | ||
|
||
if (req.headers.get(SESSION_HEADER_NAME)) { | ||
const sessionId = req.headers.get(SESSION_HEADER_NAME)!; | ||
const complexSessionId = `${servicePath}/${sessionId}`; | ||
|
||
let maybeWorker = WORKERS.get(complexSessionId); | ||
|
||
if (maybeWorker && maybeWorker.active) { | ||
worker = maybeWorker; | ||
} | ||
} | ||
|
||
if (!worker) { | ||
worker = await createWorker(); | ||
} | ||
|
||
let resp = await worker.fetch(req); | ||
|
||
if (resp.headers.has(SESSION_HEADER_NAME)) { | ||
const sessionIdFromWorker = resp.headers.get(SESSION_HEADER_NAME)!; | ||
const complexSessionId = `${servicePath}/${sessionIdFromWorker}`; | ||
|
||
WORKERS.set(complexSessionId, worker); | ||
} | ||
|
||
return resp; | ||
} catch (e) { | ||
console.error(e); | ||
|
||
if (e instanceof Deno.errors.WorkerRequestCancelled) { | ||
headers.append('Connection', 'close'); | ||
} | ||
|
||
const error = { msg: e.toString() }; | ||
return new Response( | ||
JSON.stringify(error), | ||
{ | ||
status: STATUS_CODE.InternalServerError, | ||
headers, | ||
}, | ||
); | ||
} | ||
}; | ||
|
||
return callWorker(); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
// @ts-ignore | ||
import { STATUS_CODE } from "https://deno.land/std/http/status.ts"; | ||
|
||
const SESSION_HEADER_NAME = "X-Edge-Runtime-Session-Id"; | ||
const SESSIONS = new Map<string, object>(); | ||
|
||
function makeNewSession(): [string, object] { | ||
const uuid = crypto.randomUUID(); | ||
const storage = {}; | ||
|
||
SESSIONS.set(uuid, storage); | ||
return [uuid, storage]; | ||
} | ||
|
||
function getSessionStorageFromRequest(req: Request): object | void { | ||
let maybeSessionId = req.headers.get(SESSION_HEADER_NAME); | ||
|
||
if (typeof maybeSessionId === "string" && SESSIONS.has(maybeSessionId)) { | ||
return SESSIONS.get(maybeSessionId); | ||
} | ||
} | ||
|
||
Deno.serve((req: Request) => { | ||
const headers = new Headers(); | ||
let storage: object; | ||
|
||
if (req.headers.get(SESSION_HEADER_NAME)) { | ||
const maybeStorage = getSessionStorageFromRequest(req); | ||
|
||
if (!maybeStorage) { | ||
return new Response(null, { | ||
status: STATUS_CODE.BadRequest | ||
}); | ||
} | ||
|
||
storage = maybeStorage; | ||
} else { | ||
const [sessionId, newStorage] = makeNewSession(); | ||
|
||
headers.set(SESSION_HEADER_NAME, sessionId); | ||
|
||
storage = newStorage; | ||
} | ||
|
||
if (!("count" in storage)) { | ||
storage["count"] = 0; | ||
} else { | ||
(storage["count"] as number)++; | ||
} | ||
|
||
const count = storage["count"] as number; | ||
|
||
return new Response( | ||
JSON.stringify({ count }), | ||
{ | ||
headers | ||
} | ||
); | ||
}); |