Skip to content

Commit f858f6c

Browse files
committed
feat: make engine persist logs and status over multiple runs
1 parent 9945dc0 commit f858f6c

File tree

3 files changed

+61
-20
lines changed

3 files changed

+61
-20
lines changed

.changeset/cyan-geese-smell.md

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@cloudflare/workflows-shared": minor
3+
"miniflare": patch
4+
---
5+
6+
Add proper engine persistance in .wrangler and fix multiple workflows in miniflare

fixtures/workflow/tests/index.test.ts

+6
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { rm } from "fs/promises";
12
import { resolve } from "path";
23
import { fetch } from "undici";
34
import { afterAll, beforeAll, describe, it, vi } from "vitest";
@@ -10,6 +11,11 @@ describe("Workflows", () => {
1011
getOutput: () => string;
1112

1213
beforeAll(async () => {
14+
// delete previous run contents because of persistence
15+
await rm(resolve(__dirname, "..") + "/.wrangler", {
16+
force: true,
17+
recursive: true,
18+
});
1319
({ ip, port, stop, getOutput } = await runWranglerDev(
1420
resolve(__dirname, ".."),
1521
[

packages/workflows-shared/src/engine.ts

+49-20
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,10 @@ export type DatabaseInstance = {
5353
ended_on: string | null;
5454
};
5555

56+
const ENGINE_STATUS_KEY = "ENGINE_STATUS";
57+
5658
export class Engine extends DurableObject<Env> {
5759
logs: Array<unknown> = [];
58-
status: InstanceStatus = InstanceStatus.Queued;
5960

6061
isRunning: boolean = false;
6162
accountId: number | undefined;
@@ -66,21 +67,32 @@ export class Engine extends DurableObject<Env> {
6667

6768
constructor(state: DurableObjectState, env: Env) {
6869
super(state, env);
69-
7070
void this.ctx.blockConcurrencyWhile(async () => {
7171
this.ctx.storage.transactionSync(() => {
72-
this.ctx.storage.sql.exec(`
73-
CREATE TABLE IF NOT EXISTS priority_queue (
74-
id INTEGER PRIMARY KEY NOT NULL,
75-
created_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
76-
target_timestamp INTEGER NOT NULL,
77-
action INTEGER NOT NULL, -- should only be 0 or 1 (1 for added, 0 for deleted),
78-
entryType INTEGER NOT NULL,
79-
hash TEXT NOT NULL,
80-
CHECK (action IN (0, 1)), -- guararentee that action can only be 0 or 1
81-
UNIQUE (action, entryType, hash)
82-
)
83-
`);
72+
try {
73+
this.ctx.storage.sql.exec(`
74+
CREATE TABLE IF NOT EXISTS priority_queue (
75+
id INTEGER PRIMARY KEY NOT NULL,
76+
created_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
77+
target_timestamp INTEGER NOT NULL,
78+
action INTEGER NOT NULL, -- should only be 0 or 1 (1 for added, 0 for deleted),
79+
entryType INTEGER NOT NULL,
80+
hash TEXT NOT NULL,
81+
CHECK (action IN (0, 1)), -- guararentee that action can only be 0 or 1
82+
UNIQUE (action, entryType, hash)
83+
);
84+
CREATE TABLE IF NOT EXISTS states (
85+
id INTEGER PRIMARY KEY NOT NULL,
86+
groupKey TEXT,
87+
target TEXT,
88+
metadata TEXT,
89+
event INTEGER NOT NULL
90+
)
91+
`);
92+
} catch (e) {
93+
console.error(e);
94+
throw e;
95+
}
8496
});
8597
});
8698

@@ -96,38 +108,55 @@ export class Engine extends DurableObject<Env> {
96108
target: string | null = null,
97109
metadata: Record<string, unknown>
98110
) {
99-
this.logs.push({
111+
this.ctx.storage.sql.exec(
112+
"INSERT INTO states (event, groupKey, target, metadata) VALUES (?, ?, ?, ?)",
100113
event,
101114
group,
102115
target,
103-
metadata,
104-
});
116+
JSON.stringify(metadata)
117+
);
105118
}
106119

107120
readLogsFromStep(_cacheKey: string): RawInstanceLog[] {
108121
return [];
109122
}
110123

111124
readLogs(): InstanceLogsResponse {
125+
const logs = [
126+
...this.ctx.storage.sql.exec<Record<string, string | number>>(
127+
"SELECT event, groupKey, target, metadata FROM states"
128+
),
129+
];
130+
112131
return {
113132
// @ts-expect-error TODO: Fix this
114-
logs: this.logs,
133+
logs: logs.map((log) => ({
134+
...log,
135+
metadata: JSON.parse(log.metadata as string),
136+
group: log.groupKey,
137+
})),
115138
};
116139
}
117140

118141
async getStatus(
119142
_accountId: number,
120143
_instanceId: string
121144
): Promise<InstanceStatus> {
122-
return this.status;
145+
const res = await this.ctx.storage.get<InstanceStatus>(ENGINE_STATUS_KEY);
146+
147+
// NOTE(lduarte): if status don't exist, means that engine is running for the first time, so we assume queued
148+
if (res === undefined) {
149+
return InstanceStatus.Queued;
150+
}
151+
return res;
123152
}
124153

125154
async setStatus(
126155
accountId: number,
127156
instanceId: string,
128157
status: InstanceStatus
129158
): Promise<void> {
130-
this.status = status;
159+
await this.ctx.storage.put(ENGINE_STATUS_KEY, status);
131160
}
132161

133162
async abort(_reason: string) {

0 commit comments

Comments
 (0)