diff --git a/.changeset/cyan-geese-smell.md b/.changeset/cyan-geese-smell.md new file mode 100644 index 000000000000..1d3b0fbdbab8 --- /dev/null +++ b/.changeset/cyan-geese-smell.md @@ -0,0 +1,6 @@ +--- +"@cloudflare/workflows-shared": minor +"miniflare": minor +--- + +Add proper engine persistence in .wrangler and fix multiple workflows in miniflare diff --git a/fixtures/workflow-multiple/package.json b/fixtures/workflow-multiple/package.json new file mode 100644 index 000000000000..ca7184f54cc4 --- /dev/null +++ b/fixtures/workflow-multiple/package.json @@ -0,0 +1,17 @@ +{ + "name": "my-workflow-multiple", + "private": true, + "scripts": { + "deploy": "wrangler deploy", + "start": "wrangler dev", + "test:ci": "vitest" + }, + "devDependencies": { + "@cloudflare/workers-types": "^4.20241106.0", + "undici": "catalog:default", + "wrangler": "workspace:*" + }, + "volta": { + "extends": "../../package.json" + } +} diff --git a/fixtures/workflow-multiple/src/index.ts b/fixtures/workflow-multiple/src/index.ts new file mode 100644 index 000000000000..3498d07e2f24 --- /dev/null +++ b/fixtures/workflow-multiple/src/index.ts @@ -0,0 +1,90 @@ +import { + WorkerEntrypoint, + WorkflowEntrypoint, + WorkflowEvent, + WorkflowStep, +} from "cloudflare:workers"; + +type Params = { + name: string; +}; + +export class Demo extends WorkflowEntrypoint<{}, Params> { + async run(event: WorkflowEvent, step: WorkflowStep) { + const { timestamp, payload } = event; + + await step.sleep("Wait", "1 second"); + + const result = await step.do("First step", async function () { + return { + output: "First step result", + }; + }); + + await step.sleep("Wait", "1 second"); + + const result2 = await step.do("Second step", async function () { + return { + output: "workflow1", + }; + }); + + return [result, result2, timestamp, payload, "workflow1"]; + } +} + +export class Demo2 extends WorkflowEntrypoint<{}, Params> { + async run(event: WorkflowEvent, step: WorkflowStep) { + const { timestamp, payload } 
= event; + + await step.sleep("Wait", "1 second"); + + const result = await step.do("First step", async function () { + return { + output: "First step result", + }; + }); + + await step.sleep("Wait", "1 second"); + + const result2 = await step.do("Second step", async function () { + return { + output: "workflow2", + }; + }); + + return [result, result2, timestamp, payload, "workflow2"]; + } +} + +type Env = { + WORKFLOW: Workflow; + WORKFLOW2: Workflow; +}; + +export default class extends WorkerEntrypoint { + async fetch(req: Request) { + const url = new URL(req.url); + const id = url.searchParams.get("id"); + const workflowName = url.searchParams.get("workflowName"); + + if (url.pathname === "/favicon.ico") { + return new Response(null, { status: 404 }); + } + let workflowToUse = + workflowName == "2" ? this.env.WORKFLOW2 : this.env.WORKFLOW; + + let handle: WorkflowInstance; + if (url.pathname === "/create") { + if (id === null) { + handle = await workflowToUse.create(); + } else { + handle = await workflowToUse.create({ id }); + } + } else { + handle = await workflowToUse.get(id); + } + + return Response.json({ status: await handle.status(), id: handle.id }); + } +} diff --git a/fixtures/workflow-multiple/tests/index.test.ts b/fixtures/workflow-multiple/tests/index.test.ts new file mode 100644 index 000000000000..29268cd0ea29 --- /dev/null +++ b/fixtures/workflow-multiple/tests/index.test.ts @@ -0,0 +1,127 @@ +import { rm } from "fs/promises"; +import { resolve } from "path"; +import { fetch } from "undici"; +import { afterAll, beforeAll, describe, it, vi } from "vitest"; +import { runWranglerDev } from "../../shared/src/run-wrangler-long-lived"; + +describe("Workflows", () => { + let ip: string, + port: number, + stop: (() => Promise) | undefined, + getOutput: () => string; + + beforeAll(async () => { + // delete previous run contents because of persistence + await rm(resolve(__dirname, "..") + "/.wrangler", { + force: true, + recursive: true, + }); + ({ ip, 
port, stop, getOutput } = await runWranglerDev( + resolve(__dirname, ".."), + ["--port=0", "--inspector-port=0"] + )); + }); + + afterAll(async () => { + await stop?.(); + }); + + async function fetchJson(url: string) { + const response = await fetch(url, { + headers: { + "MF-Disable-Pretty-Error": "1", + }, + }); + const text = await response.text(); + + try { + return JSON.parse(text); + } catch (err) { + throw new Error(`Couldn't parse JSON:\n\n${text}`); + } + } + + it("creates two instances with same id in two different workflows", async ({ + expect, + }) => { + const createResult = { + id: "test", + status: { + status: "running", + output: [], + }, + }; + + await Promise.all([ + expect( + fetchJson(`http://${ip}:${port}/create?workflowName=1&id=test`) + ).resolves.toStrictEqual(createResult), + expect( + fetchJson(`http://${ip}:${port}/create?workflowName=2&id=test`) + ).resolves.toStrictEqual(createResult), + ]); + + const firstResult = { + id: "test", + status: { + status: "running", + output: [{ output: "First step result" }], + }, + }; + await Promise.all([ + vi.waitFor( + async () => { + await expect( + fetchJson(`http://${ip}:${port}/status?workflowName=1&id=test`) + ).resolves.toStrictEqual(firstResult); + }, + { timeout: 5000 } + ), + vi.waitFor( + async () => { + await expect( + fetchJson(`http://${ip}:${port}/status?workflowName=2&id=test`) + ).resolves.toStrictEqual(firstResult); + }, + { timeout: 5000 } + ), + ]); + + await Promise.all([ + await vi.waitFor( + async () => { + await expect( + fetchJson(`http://${ip}:${port}/status?workflowName=1&id=test`) + ).resolves.toStrictEqual({ + id: "test", + status: { + status: "complete", + output: [ + { output: "First step result" }, + { output: "workflow1" }, + ], + }, + }); + }, + { timeout: 5000 } + ), + await vi.waitFor( + async () => { + await expect( + fetchJson(`http://${ip}:${port}/status?workflowName=2&id=test`) + ).resolves.toStrictEqual({ + id: "test", + status: { + status: "complete", + output: 
[ + { output: "First step result" }, + { output: "workflow2" }, + ], + }, + }); + }, + { timeout: 5000 } + ), + ]); + }); +}); diff --git a/fixtures/workflow-multiple/tests/tsconfig.json b/fixtures/workflow-multiple/tests/tsconfig.json new file mode 100644 index 000000000000..d2ce7f144694 --- /dev/null +++ b/fixtures/workflow-multiple/tests/tsconfig.json @@ -0,0 +1,7 @@ +{ + "extends": "@cloudflare/workers-tsconfig/tsconfig.json", + "compilerOptions": { + "types": ["node"] + }, + "include": ["**/*.ts", "../../../node-types.d.ts"] +} diff --git a/fixtures/workflow-multiple/tsconfig.json b/fixtures/workflow-multiple/tsconfig.json new file mode 100644 index 000000000000..856398634a5e --- /dev/null +++ b/fixtures/workflow-multiple/tsconfig.json @@ -0,0 +1,13 @@ +{ + "compilerOptions": { + "target": "ES2020", + "module": "CommonJS", + "lib": ["ES2020"], + "types": ["@cloudflare/workers-types"], + "moduleResolution": "node", + "noEmit": true, + "skipLibCheck": true + }, + "include": ["**/*.ts"], + "exclude": ["tests"] +} diff --git a/fixtures/workflow-multiple/vitest.config.mts b/fixtures/workflow-multiple/vitest.config.mts new file mode 100644 index 000000000000..846cddc41995 --- /dev/null +++ b/fixtures/workflow-multiple/vitest.config.mts @@ -0,0 +1,9 @@ +import { defineProject, mergeConfig } from "vitest/config"; +import configShared from "../../vitest.shared"; + +export default mergeConfig( + configShared, + defineProject({ + test: {}, + }) +); diff --git a/fixtures/workflow-multiple/wrangler.toml b/fixtures/workflow-multiple/wrangler.toml new file mode 100644 index 000000000000..c984cab94231 --- /dev/null +++ b/fixtures/workflow-multiple/wrangler.toml @@ -0,0 +1,14 @@ +#:schema node_modules/wrangler/config-schema.json +name = "my-workflow-demo" +main = "src/index.ts" +compatibility_date = "2024-10-22" + +[[workflows]] +binding = "WORKFLOW" +name = "my-workflow" +class_name = "Demo" + +[[workflows]] +binding = "WORKFLOW2" +name = "my-workflow-2" +class_name = "Demo2" 
\ No newline at end of file diff --git a/fixtures/workflow/tests/index.test.ts b/fixtures/workflow/tests/index.test.ts index ce5a11e4983e..592feeef7c0d 100644 --- a/fixtures/workflow/tests/index.test.ts +++ b/fixtures/workflow/tests/index.test.ts @@ -1,3 +1,4 @@ +import { rm } from "fs/promises"; import { resolve } from "path"; import { fetch } from "undici"; import { afterAll, beforeAll, describe, it, vi } from "vitest"; @@ -10,14 +11,14 @@ describe("Workflows", () => { getOutput: () => string; beforeAll(async () => { + // delete previous run contents because of persistence + await rm(resolve(__dirname, "..") + "/.wrangler", { + force: true, + recursive: true, + }); ({ ip, port, stop, getOutput } = await runWranglerDev( resolve(__dirname, ".."), - [ - "--port=0", - "--inspector-port=0", - "--upstream-protocol=https", - "--host=prod.example.org", - ] + ["--port=0", "--inspector-port=0"] )); }); diff --git a/fixtures/workflow/worker-configuration.d.ts b/fixtures/workflow/worker-configuration.d.ts deleted file mode 100644 index ad79d683e0fb..000000000000 --- a/fixtures/workflow/worker-configuration.d.ts +++ /dev/null @@ -1,5 +0,0 @@ -// Generated by Wrangler by running `wrangler types` - -interface Env { - WORKFLOW: Workflow; -} diff --git a/packages/miniflare/src/plugins/workflows/index.ts b/packages/miniflare/src/plugins/workflows/index.ts index edd08d2e0be1..4d4a53a10225 100644 --- a/packages/miniflare/src/plugins/workflows/index.ts +++ b/packages/miniflare/src/plugins/workflows/index.ts @@ -62,15 +62,20 @@ export const WORKFLOWS_PLUGIN: Plugin< sharedOptions.workflowsPersist ); await fs.mkdir(persistPath, { recursive: true }); - const storageService: Service = { - name: WORKFLOWS_STORAGE_SERVICE_NAME, + // each workflow should get its own storage service + const storageServices: Service[] = Object.entries( + options.workflows ?? 
{} + ).map(([_, workflow]) => ({ + name: `${WORKFLOWS_STORAGE_SERVICE_NAME}-${workflow.name}`, disk: { path: persistPath, writable: true }, - }; + })); // this creates one miniflare service per workflow that the user's script has. we should dedupe engine definition later const services = Object.entries(options.workflows ?? {}).map( ([_bindingName, workflow]) => { - const uniqueKey = `miniflare-workflows`; + // NOTE(lduarte): the engine unique namespace key must be unique per workflow definition + // otherwise workerd will crash because there's two equal DO namespaces + const uniqueKey = `miniflare-workflows-${workflow.name}`; const workflowsBinding: Service = { name: `${WORKFLOWS_PLUGIN_NAME}:${workflow.name}`, @@ -90,8 +95,9 @@ export const WORKFLOWS_PLUGIN: Plugin< preventEviction: true, }, ], - // this might conflict between workflows - durableObjectStorage: { localDisk: WORKFLOWS_STORAGE_SERVICE_NAME }, + durableObjectStorage: { + localDisk: `${WORKFLOWS_STORAGE_SERVICE_NAME}-${workflow.name}`, + }, bindings: [ { name: "ENGINE", @@ -116,7 +122,7 @@ export const WORKFLOWS_PLUGIN: Plugin< return []; } - return [storageService, ...services]; + return [...storageServices, ...services]; }, getPersistPath({ workflowsPersist }, tmpPath) { diff --git a/packages/miniflare/test/plugins/workflows/index.spec.ts b/packages/miniflare/test/plugins/workflows/index.spec.ts new file mode 100644 index 000000000000..ac19a418e868 --- /dev/null +++ b/packages/miniflare/test/plugins/workflows/index.spec.ts @@ -0,0 +1,72 @@ +import * as fs from "fs/promises"; +import { scheduler } from "timers/promises"; +import test from "ava"; +import { Miniflare, MiniflareOptions } from "miniflare"; +import { useTmp } from "../../test-shared"; + +const WORKFLOW_SCRIPT = () => ` +import { WorkflowEntrypoint } from "cloudflare:workers"; +export class MyWorkflow extends WorkflowEntrypoint { + async run(event, step) { + await step.do("i'm a step?", async () => "yes you are") + + return "I'm a output 
string" + } + } + export default { + async fetch(request, env, ctx) { + const workflow = await env.MY_WORKFLOW.create({id: "i'm an id"}) + + return new Response(JSON.stringify(await workflow.status())) + }, + };`; + +test("persists Workflow data on file-system between runs", async (t) => { + const tmp = await useTmp(t); + const opts: MiniflareOptions = { + name: "worker", + compatibilityDate: "2024-11-20", + modules: true, + script: WORKFLOW_SCRIPT(), + workflows: { + MY_WORKFLOW: { + className: "MyWorkflow", + name: "MY_WORKFLOW", + }, + }, + workflowsPersist: tmp, + }; + let mf = new Miniflare(opts); + t.teardown(() => mf.dispose()); + + let res = await mf.dispatchFetch("http://localhost"); + t.is(await res.text(), '{"status":"running","output":[]}'); + + // there's no waitUntil in ava haha + const begin = performance.now(); + let success = false; + let test = ""; + while (performance.now() - begin < 2000) { + const res = await mf.dispatchFetch("http://localhost"); + console.log(test); + test = await res.text(); + if (test === '{"status":"complete","output":["yes you are"]}') { + success = true; + break; + } + await scheduler.wait(50); + } + t.true(success, `Condition was not met in 2000ms - output is ${test}`); + + // check if files were committed + const names = await fs.readdir(tmp); + t.deepEqual(names, ["miniflare-workflows-MY_WORKFLOW"]); + + // restart miniflare + await mf.dispose(); + mf = new Miniflare(opts); + + // state should be persisted now + res = await mf.dispatchFetch("http://localhost"); + t.is(await res.text(), '{"status":"complete","output":["yes you are"]}'); +}); diff --git a/packages/workflows-shared/src/engine.ts b/packages/workflows-shared/src/engine.ts index a72f3f5fe68e..4a80f6cdbdc1 100644 --- a/packages/workflows-shared/src/engine.ts +++ b/packages/workflows-shared/src/engine.ts @@ -53,9 +53,10 @@ export type DatabaseInstance = { ended_on: string | null; }; +const ENGINE_STATUS_KEY = "ENGINE_STATUS"; + export class Engine extends 
DurableObject { logs: Array = []; - status: InstanceStatus = InstanceStatus.Queued; isRunning: boolean = false; accountId: number | undefined; @@ -66,21 +67,32 @@ export class Engine extends DurableObject { constructor(state: DurableObjectState, env: Env) { super(state, env); - void this.ctx.blockConcurrencyWhile(async () => { this.ctx.storage.transactionSync(() => { - this.ctx.storage.sql.exec(` - CREATE TABLE IF NOT EXISTS priority_queue ( - id INTEGER PRIMARY KEY NOT NULL, - created_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - target_timestamp INTEGER NOT NULL, - action INTEGER NOT NULL, -- should only be 0 or 1 (1 for added, 0 for deleted), - entryType INTEGER NOT NULL, - hash TEXT NOT NULL, - CHECK (action IN (0, 1)), -- guararentee that action can only be 0 or 1 - UNIQUE (action, entryType, hash) - ) - `); + try { + this.ctx.storage.sql.exec(` + CREATE TABLE IF NOT EXISTS priority_queue ( + id INTEGER PRIMARY KEY NOT NULL, + created_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + target_timestamp INTEGER NOT NULL, + action INTEGER NOT NULL, -- should only be 0 or 1 (1 for added, 0 for deleted), + entryType INTEGER NOT NULL, + hash TEXT NOT NULL, + CHECK (action IN (0, 1)), -- guararentee that action can only be 0 or 1 + UNIQUE (action, entryType, hash) + ); + CREATE TABLE IF NOT EXISTS states ( + id INTEGER PRIMARY KEY NOT NULL, + groupKey TEXT, + target TEXT, + metadata TEXT, + event INTEGER NOT NULL + ) + `); + } catch (e) { + console.error(e); + throw e; + } }); }); @@ -96,12 +108,13 @@ export class Engine extends DurableObject { target: string | null = null, metadata: Record ) { - this.logs.push({ + this.ctx.storage.sql.exec( + "INSERT INTO states (event, groupKey, target, metadata) VALUES (?, ?, ?, ?)", event, group, target, - metadata, - }); + JSON.stringify(metadata) + ); } readLogsFromStep(_cacheKey: string): RawInstanceLog[] { @@ -109,9 +122,19 @@ export class Engine extends DurableObject { } readLogs(): InstanceLogsResponse { + const logs = [ + 
...this.ctx.storage.sql.exec>( + "SELECT event, groupKey, target, metadata FROM states" + ), + ]; + return { // @ts-expect-error TODO: Fix this - logs: this.logs, + logs: logs.map((log) => ({ + ...log, + metadata: JSON.parse(log.metadata as string), + group: log.groupKey, + })), }; } @@ -119,7 +142,13 @@ export class Engine extends DurableObject { _accountId: number, _instanceId: string ): Promise { - return this.status; + const res = await this.ctx.storage.get(ENGINE_STATUS_KEY); + + // NOTE(lduarte): if no status has been stored yet, the engine is running for the first time, so we assume queued + if (res === undefined) { + return InstanceStatus.Queued; + } + return res; } async setStatus( @@ -127,7 +156,7 @@ export class Engine extends DurableObject { instanceId: string, status: InstanceStatus ): Promise { - this.status = status; + await this.ctx.storage.put(ENGINE_STATUS_KEY, status); } async abort(_reason: string) { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 73acc46b8003..86cb00ea9c0f 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -741,6 +741,18 @@ importers: specifier: workspace:* version: link:../../packages/wrangler + fixtures/workflow-multiple: + devDependencies: + '@cloudflare/workers-types': + specifier: ^4.20241106.0 + version: 4.20241106.0 + undici: + specifier: catalog:default + version: 5.28.4 + wrangler: + specifier: workspace:* + version: link:../../packages/wrangler + packages/chrome-devtools-patches: devDependencies: patch-package: