Skip to content

Commit f7e35c7

Browse files
committed
Config
1 parent 45692ec commit f7e35c7

File tree

5 files changed

+119
-51
lines changed

5 files changed

+119
-51
lines changed

packages/world-postgres/src/cli.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { config } from 'dotenv';
44
import { drizzle } from 'drizzle-orm/postgres-js';
55
import { migrate } from 'drizzle-orm/postgres-js/migrator';
66
import postgres from 'postgres';
7+
import { DEFAULT_PG_URL } from './config.js';
78

89
const __dirname = dirname(fileURLToPath(import.meta.url));
910

@@ -14,7 +15,7 @@ async function setupDatabase() {
1415
const connectionString =
1516
process.env.WORKFLOW_POSTGRES_URL ||
1617
process.env.DATABASE_URL ||
17-
'postgres://world:world@localhost:5432/world';
18+
DEFAULT_PG_URL;
1819

1920
console.log('🔧 Setting up database schema...');
2021
console.log(
Lines changed: 55 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,58 @@
11
import type { QueueDriver } from './queue-drivers/types.js';
22

3-
export type PostgresWorldConfig = {
4-
securityToken: string;
5-
connectionString: string;
6-
queueFactory: () => QueueDriver;
3+
export type BaseWorldConfig = {
4+
connectionString?: string;
5+
securityToken?: string;
76
};
7+
8+
export type PostgresWorldConfig = BaseWorldConfig & {
9+
queueFactory?: () => QueueDriver;
10+
};
11+
12+
export type ResolvedBaseWorldConfig = Required<BaseWorldConfig>;
13+
14+
export const DEFAULT_PG_URL = 'postgres://world:world@localhost:5432/world';
15+
export const DEFAULT_SECURITY_TOKEN = 'secret';
16+
export const DEFAULT_JOB_PREFIX = 'workflow_';
17+
export const DEFAULT_QUEUE_CONCURRENCY = 10;
18+
19+
let worldConfig: ResolvedBaseWorldConfig | null = null;
20+
21+
export function loadWorldConfig(
22+
config: BaseWorldConfig = {}
23+
): ResolvedBaseWorldConfig {
24+
worldConfig = {
25+
connectionString:
26+
config.connectionString ??
27+
process.env.WORKFLOW_POSTGRES_URL ??
28+
process.env.DATABASE_URL ??
29+
DEFAULT_PG_URL,
30+
31+
securityToken:
32+
config.securityToken ??
33+
process.env.WORKFLOW_POSTGRES_SECURITY_TOKEN ??
34+
DEFAULT_SECURITY_TOKEN,
35+
};
36+
37+
return worldConfig;
38+
}
39+
40+
export function getWorldConfig(): ResolvedBaseWorldConfig {
41+
if (!worldConfig) {
42+
throw new Error(
43+
'World config not loaded. Call createWorld() or loadWorldConfig().'
44+
);
45+
}
46+
47+
return worldConfig;
48+
}
49+
50+
export function getQueueConfig() {
51+
return {
52+
jobPrefix: process.env.WORKFLOW_POSTGRES_JOB_PREFIX ?? DEFAULT_JOB_PREFIX,
53+
queueConcurrency:
54+
(process.env.WORKFLOW_POSTGRES_WORKER_CONCURRENCY
55+
? parseInt(process.env.WORKFLOW_POSTGRES_WORKER_CONCURRENCY, 10)
56+
: undefined) ?? DEFAULT_QUEUE_CONCURRENCY,
57+
};
58+
}

packages/world-postgres/src/index.ts

Lines changed: 10 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import type { Storage, World } from '@workflow/world';
22
import createPostgres from 'postgres';
3-
import type { PostgresWorldConfig } from './config.js';
3+
import { loadWorldConfig, type PostgresWorldConfig } from './config.js';
44
import { createClient, type Drizzle } from './drizzle/index.js';
55
import { createFunctionProxy } from './proxies/function-proxy.js';
66
import { createHttpProxy } from './proxies/http-proxy.js';
@@ -18,16 +18,15 @@ import {
1818
} from './storage.js';
1919
import { createStreamer } from './streamer.js';
2020

21-
export const DEFAULT_PG_URL = 'postgres://world:world@localhost:5432/world';
22-
2321
export function createWorld(
24-
config: PostgresWorldConfig = {
25-
connectionString: process.env.WORKFLOW_POSTGRES_URL || DEFAULT_PG_URL,
26-
securityToken: process.env.WORKFLOW_POSTGRES_SECURITY_TOKEN || 'secret',
27-
queueFactory: defaultQueueFactory,
28-
}
22+
opts: PostgresWorldConfig = {}
2923
): World & { start(): Promise<void> } {
30-
const queueDriver = config.queueFactory();
24+
const config = loadWorldConfig(opts);
25+
26+
const queueDriver = opts.queueFactory
27+
? opts.queueFactory()
28+
: createPgBossHttpProxyQueue();
29+
3130
const postgres = createPostgres(config.connectionString);
3231
const drizzle = createClient(postgres);
3332

@@ -54,26 +53,13 @@ function createStorage(drizzle: Drizzle): Storage {
5453
};
5554
}
5655

57-
function defaultQueueFactory() {
58-
return createPgBossHttpProxyQueue({
59-
baseUrl: process.env.WORKFLOW_POSTGRES_BASE_URL,
60-
connectionString: process.env.WORKFLOW_POSTGRES_URL || DEFAULT_PG_URL,
61-
securityToken:
62-
process.env.WORKFLOW_POSTGRES_SECURITY_TOKEN || 'this-is-not-safe',
63-
jobPrefix: process.env.WORKFLOW_POSTGRES_JOB_PREFIX,
64-
queueConcurrency:
65-
parseInt(process.env.WORKFLOW_POSTGRES_WORKER_CONCURRENCY || '10', 10) ||
66-
10,
67-
});
68-
}
69-
7056
export type { PostgresWorldConfig } from './config.js';
7157
// Re-export schema for users who want to extend or inspect the database schema
7258
export * from './drizzle/schema.js';
7359

7460
export { createFunctionProxy, createHttpProxy };
7561
export {
7662
createPgBossQueue,
77-
createPgBossFunctionProxyQueue as createPgBossFunctionProxy,
78-
createPgBossHttpProxyQueue as createPgBossHttpProxy,
63+
createPgBossFunctionProxyQueue,
64+
createPgBossHttpProxyQueue,
7965
};

packages/world-postgres/src/queue-drivers/factories.ts

Lines changed: 45 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { getQueueConfig, getWorldConfig } from '../config.js';
12
import { createFunctionProxy } from '../proxies/function-proxy.js';
23
import { createHttpProxy } from '../proxies/http-proxy.js';
34
import { createPgBossQueue } from './pgboss.js';
@@ -9,20 +10,30 @@ import type { QueueDriver } from './types.js';
910
*/
1011
export function createPgBossFunctionProxyQueue(opts: {
1112
jobPrefix?: string;
12-
securityToken: string;
13-
connectionString: string;
13+
securityToken?: string;
14+
connectionString?: string;
1415
queueConcurrency?: number;
1516
stepEntrypoint: (request: Request) => Promise<Response>;
1617
workflowEntrypoint: (request: Request) => Promise<Response>;
1718
}): QueueDriver {
19+
const worldDefaults = getWorldConfig();
20+
const queueDefaults = getQueueConfig();
21+
22+
const config = {
23+
connectionString: opts.connectionString ?? worldDefaults.connectionString,
24+
securityToken: opts.securityToken ?? worldDefaults.securityToken,
25+
jobPrefix: opts.jobPrefix ?? queueDefaults.jobPrefix,
26+
queueConcurrency: opts.queueConcurrency ?? queueDefaults.queueConcurrency,
27+
};
28+
1829
return createPgBossQueue(
1930
{
20-
jobPrefix: opts.jobPrefix,
21-
connectionString: opts.connectionString,
22-
queueConcurrency: opts.queueConcurrency,
31+
jobPrefix: config.jobPrefix,
32+
connectionString: config.connectionString,
33+
queueConcurrency: config.queueConcurrency,
2334
},
2435
createFunctionProxy({
25-
securityToken: opts.securityToken,
36+
securityToken: config.securityToken,
2637
stepEntrypoint: opts.stepEntrypoint,
2738
workflowEntrypoint: opts.workflowEntrypoint,
2839
})
@@ -33,14 +44,34 @@ export function createPgBossFunctionProxyQueue(opts: {
3344
* QueueDriver implementation using pg-boss for job management
3445
* and HTTP for execution.
3546
*/
36-
export function createPgBossHttpProxyQueue(config: {
37-
port?: number;
38-
baseUrl?: string;
39-
jobPrefix?: string;
40-
securityToken: string;
41-
connectionString: string;
42-
queueConcurrency?: number;
43-
}): QueueDriver {
47+
export function createPgBossHttpProxyQueue(
48+
opts: {
49+
port?: number;
50+
baseUrl?: string;
51+
jobPrefix?: string;
52+
securityToken?: string;
53+
connectionString?: string;
54+
queueConcurrency?: number;
55+
} = {}
56+
): QueueDriver {
57+
const worldDefaults = getWorldConfig();
58+
const queueDefaults = getQueueConfig();
59+
60+
const config = {
61+
connectionString: opts.connectionString ?? worldDefaults.connectionString,
62+
securityToken: opts.securityToken ?? worldDefaults.securityToken,
63+
jobPrefix: opts.jobPrefix ?? queueDefaults.jobPrefix,
64+
queueConcurrency: opts.queueConcurrency ?? queueDefaults.queueConcurrency,
65+
66+
port:
67+
opts.port ??
68+
(process.env.WORKFLOW_POSTGRES_APP_PORT
69+
? parseInt(process.env.WORKFLOW_POSTGRES_APP_PORT, 10)
70+
: undefined),
71+
72+
baseUrl: opts.baseUrl ?? process.env.WORKFLOW_POSTGRES_APP_URL,
73+
};
74+
4475
return createPgBossQueue(
4576
{
4677
jobPrefix: config.jobPrefix,

packages/world-postgres/src/queue-drivers/pgboss.ts

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,18 @@ import { MessageData, type QueueDriver } from './types.js';
77
* Takes in a proxy that will handle the actual step/flow execution.
88
*/
99
export function createPgBossQueue(
10-
config: {
11-
jobPrefix?: string;
10+
opts: {
11+
jobPrefix: string;
1212
connectionString: string;
13-
queueConcurrency?: number;
13+
queueConcurrency: number;
1414
},
1515
proxy: WkfProxy
1616
): QueueDriver {
1717
let startPromise: Promise<unknown> | null = null;
18-
const boss = new PgBoss(config.connectionString);
18+
const boss = new PgBoss(opts.connectionString);
1919

20-
const prefix = config.jobPrefix || 'workflow_';
21-
const stepQueueName = `${prefix}steps`;
22-
const workflowQueueName = `${prefix}flows`;
20+
const stepQueueName = `${opts.jobPrefix}steps`;
21+
const workflowQueueName = `${opts.jobPrefix}flows`;
2322

2423
const ensureStarted = async () => {
2524
if (!startPromise) {
@@ -59,7 +58,7 @@ export function createPgBossQueue(
5958
const stepWorker = createWorker(proxy.proxyStep);
6059
const workflowWorker = createWorker(proxy.proxyWorkflow);
6160

62-
for (let i = 0; i < (config.queueConcurrency || 10); i++) {
61+
for (let i = 0; i < opts.queueConcurrency; i++) {
6362
await boss.work(workflowQueueName, workflowWorker);
6463
await boss.work(stepQueueName, stepWorker);
6564
}

0 commit comments

Comments
 (0)