From 9123dbf24403fae3ea4dc76d09fd0a52e38a6d06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Tue, 19 May 2026 19:27:10 +0100 Subject: [PATCH 1/2] fix(sync): wire embedded feed-sync executor + workers/poll RangeError Two bugs blocked the headless `lobu run -> lobu apply -> trigger_feed -> events appear` data sync loop on a fresh install. Both confirmed via local repro against the embedded server. Bug A: embedded mode never executed runs(run_type='sync'). The gateway booted but no connector-worker ever called /api/workers/poll, so feed- sync rows sat in `pending` forever. New module wires the existing `WorkerDaemon` in-process, started after `listen()` so its boot-time health check can resolve. Opt-out via LOBU_DISABLE_EMBEDDED_WORKER=1. Atomic claim already lives in worker-api.ts (FOR UPDATE OF r SKIP LOCKED + claimed_by) so embedded + external fleet workers co-exist. Bug B: /api/workers/poll 500'd with `RangeError: init["status"] must be in the range of 200 to 599` whenever a valid Better-Auth session token bearer hit it. Root cause: `MultiTenantProvider.resolveAuth`'s `setContextAndContinue` helper returned `next()` (which can be either Hono's plain `Next` or a wrapped cb that may return a Response), but every caller did `await setContextAndContinue(...); return undefined;` - discarding the cb's Response. The workers/* gating middleware's "Worker token missing device_worker:run scope" 403 was silently dropped, Hono never saw it, and a downstream `c.header()` re-wrap on the half-initialized response collapsed via `new Response(c.body, c)` with the Hono Context as init -> init.status = c.status (a function) -> RangeError. Switch every caller to `return setContextAndContinue(...)`; widen the helper's return type and the WorkspaceProvider.resolveAuth `next` param (new `ResolveAuthNext`) so the Response now propagates. E2E reproducer (PGlite, port 8802): - Pre-fix: trigger_feed -> run sits forever; poll with session bearer -> 500. - Post-fix: trigger_feed -> run completes (336 events for HN openai search); poll -> 200/403 depending on auth, zero RangeErrors. Closes the install_operator headless sync loop on a fresh install. --- docs/fix-sync-loop-design.md | 92 +++++++++++++++++++ packages/server/src/auth/middleware.ts | 23 +++-- .../scheduled/embedded-connector-worker.ts | 84 +++++++++++++++++ packages/server/src/server.ts | 18 ++++ packages/server/src/start-local.ts | 17 ++++ packages/server/src/workspace/multi-tenant.ts | 38 ++++---- packages/server/src/workspace/types.ts | 25 ++++- 7 files changed, 264 insertions(+), 33 deletions(-) create mode 100644 docs/fix-sync-loop-design.md create mode 100644 packages/server/src/scheduled/embedded-connector-worker.ts diff --git a/docs/fix-sync-loop-design.md b/docs/fix-sync-loop-design.md new file mode 100644 index 000000000..f5261bc7e --- /dev/null +++ b/docs/fix-sync-loop-design.md @@ -0,0 +1,92 @@ +# fix-sync-loop design + +Two bugs block the headless `lobu run → lobu apply → trigger_feed → events appear` data sync loop on a fresh install. Both confirmed via local repro against the embedded server (PGlite, port 8802). + +## Bug A — embedded mode never executes `runs(run_type='sync')` + +`packages/server/src/tools/admin/manage_feeds.ts:483-509` and `packages/server/src/scheduled/check-due-feeds.ts` both insert pending `runs` rows. They are claimed and executed by the **out-of-process connector-worker daemon** (`packages/connector-worker/src/daemon/worker.ts` + `executor.ts`), which polls `/api/workers/poll` over HTTP. `lobu run` (embedded mode) boots the gateway + task scheduler but never starts the daemon, so feed-sync rows sit in `pending` forever. No `events` ever land. + +`packages/server/src/lib/feed-sync.ts::runFeed` exists but only calls `executeCompiledConnector` and discards results — it does NOT persist `events`. The persistence path is the daemon's `client.stream(...)` call, which lands in `worker-api.ts::streamContent` and `insertEvent`. So a "tick that calls `runFeed` per pending run" would not produce events. We need the actual claim → execute → stream → complete pipeline. + +### Fix + +Run the connector-worker daemon **in-process** under `start-local.ts` / `server.ts` immediately after `bootTaskScheduler`, pointed at `http://127.0.0.1:${PORT}`. Same code path the standalone daemon uses; same atomic claim SQL (`packages/server/src/worker-api.ts::pollWorkerJob`); same complete/stream wiring. No new claim semantics, no double-execution concern — the daemon and any external worker fleet would coordinate via the existing `FOR UPDATE OF r SKIP LOCKED` claim filter. + +Implementation: +- New module `packages/server/src/scheduled/embedded-connector-worker.ts` that **constructs `WorkerDaemon` directly** (NOT `startDaemon` — that installs signal handlers + `process.exit`, wrong for in-process use), then `void daemon.start().catch(logger.error)`. +- Started **after `httpServer.listen()` callback fires** so the daemon's boot-time `/api/health` check can resolve. In `start-local.ts`/`server.ts`, move the embedded-daemon spawn into the listen callback (or use a setImmediate post-listen). +- Wired into both `server.ts` and `start-local.ts`. Default ON in embedded mode; opt-out via `LOBU_DISABLE_EMBEDDED_WORKER=1` (e.g. prod with external fleet). +- Stable `worker_id` = `embedded:${hostname()}:${pid}` so claims are attributable in logs. +- Shutdown: call `daemon.stop()` + `await daemon.waitForActiveJobs(30_000)` from the existing `shutdown(signal)` path. Note: `stop()` only flips the running flag; it does NOT interrupt the in-flight `sleep(pollIntervalMs)`. The wait covers in-flight jobs; the daemon exits within `pollIntervalMs` after. + +### Race / correctness + +- Atomic claim already exists in `worker-api.ts::pollWorkerJob` (`FOR UPDATE OF r SKIP LOCKED LIMIT 1`). Embedded + external daemons can co-exist; whichever calls `/api/workers/poll` first wins the row. No double-execute. +- Heartbeat-lost reaper (`startStaleRunReaper`) already handles crashed/killed runs. +- Default OFF when `WORKER_API_TOKEN` is set AND `LOBU_DISABLE_EMBEDDED_WORKER=1` — so prod with external fleet can opt out. + +## Bug B — `/api/workers/poll` 500s with `RangeError: init["status"] must be in the range of 200 to 599` + +Repro: hit `/api/workers/poll` with a Bearer that resolves to a valid Better-Auth session (i.e. the install_operator). Server returns 500. + +Stack: +``` +RangeError at undici/initializeResponse + new Response(body, init) + at [getResponseCache] (@hono/node-server) + at get headers (@hono/node-server) + at set res (hono/context.js:133) + at dispatch (hono/compose.js:38) +``` + +Trap output (instrumented `globalThis.Response` constructor): +- `body` = `(data, arg, headers) => this.#newResponse(data, arg, headers)` — i.e. the Hono Context's `c.body` method +- `init` = a Hono `Context` object (with `init.status` = the `c.status` method, which is a Function) + +### Root cause + +`packages/server/src/workspace/multi-tenant.ts::resolveAuth` is invoked two ways: +1. As a Hono middleware: `app.use('/foo', mcpAuth)` — `next` returns `Promise`. +2. As a wrapped call: `app.use('/api/workers/*', async (c, next) => mcpAuth(c, async () => { ... return c.json(...); }))` — the cb's return value is a `Response`. + +Inside `resolveAuth`, every branch uses the same pattern: +```ts +await setContextAndContinue({...}); +return undefined; +``` + +`setContextAndContinue` returns `next()`. In case (2), `next` is the cb; the cb returns a `Response`; `setContextAndContinue` returns that `Response`. The caller **awaits then discards** it and returns `undefined`. + +When the workers/* middleware's cb returns `c.json(..., 403)` (e.g. "Worker token missing device_worker:run scope" — session auth populates `mcpIsAuthenticated=true` but never sets `mcpAuthInfo`, so the scopes check fails), that 403 Response is lost. `mcpAuth` returns `undefined`. The workers middleware returns `undefined`. Hono compose sees `res=undefined && context.finalized=false` → does NOT set `c.res` AND advances to next handler (because the OUTER `next` was called via `setContextAndContinue → cb`-wait, actually the cb returned BEFORE calling outer `next`, so Hono should stop). The actual mechanism for the bad Response getting into `c.res` is somewhere downstream re-wraps via `c.header()`'s line 211 finalized-path which does `this.#res = createResponseInstance(this.#res.body, this.#res)` — but the upstream root cause is the discarded Response. + +### Fix + +Change `resolveAuth` so it propagates `setContextAndContinue`'s return value instead of discarding it. All eight call sites switch from: +```ts +await setContextAndContinue({...}); +return undefined; +``` +to: +```ts +return setContextAndContinue({...}); +``` + +(`setContextAndContinue` already returns a Promise resolving to whatever `next()` resolves to. For middleware-style use, `next()` resolves to `undefined` — behavior preserved. For wrapped-cb use, the cb's Response now propagates.) + +**Type widening required.** `WorkspaceProvider.resolveAuth` currently takes Hono `Next` (`() => Promise`). `mcpAuth` already widens its `next` param to `() => Promise` and casts to `Next` before calling `resolveAuth`. To make TypeScript see the cb's `Response` return value flow through, widen `WorkspaceProvider.resolveAuth`'s `next` param to `() => Promise` in `packages/server/src/workspace/types.ts`, and stop the `as Next` cast in `auth/middleware.ts`. Existing call sites that pass Hono's Next still typecheck (void is a subtype here). + +### Validation + +- Curl `/api/workers/poll` with the install_operator's signed session-token bearer: must return JSON (403 "missing device_worker:run scope" if session-only; 200 next_poll if PAT with proper scope). Not 500. +- Existing PAT/OAuth paths unchanged (cb already returns through `setContextAndContinue → next()`). + +## Test plan + +1. `make build-packages` → clean. +2. `make typecheck` → clean. +3. Boot `lobu run` against PGlite (port 8802). Sign in as install_operator. Trigger a feed via `manage_feeds.trigger_feed`. Wait ≤ 30s. Query `events` table — must have new rows. (Bug A fix.) +4. Curl `POST /api/workers/poll` with a session-token Bearer. Must return 200/403/204 — never 500. Grep server log for "RangeError" — none. (Bug B fix.) + +## Schema / migrations + +None. No new tables, no column changes. diff --git a/packages/server/src/auth/middleware.ts b/packages/server/src/auth/middleware.ts index be99135be..48e44ca56 100644 --- a/packages/server/src/auth/middleware.ts +++ b/packages/server/src/auth/middleware.ts @@ -7,6 +7,7 @@ import type { Context, Next } from 'hono'; import type { Env } from '../index'; import { getWorkspaceProvider } from '../workspace'; +import type { ResolveAuthNext } from '../workspace/types'; import { createAuth } from './index'; import type { AuthInfo } from './oauth/types'; @@ -93,15 +94,17 @@ export async function requireAuth(c: Context<{ Bindings: Env }>, next: Next) { * Middleware: MCP authentication (optional auth for MCP endpoints) * Delegates entirely to WorkspaceProvider.resolveAuth. * - * `next` is widened past Hono's `Next` so callers that use `mcpAuth(c, cb)` - * with an `async` callback that may short-circuit by returning a `Response` - * (e.g. the /api/workers/* gating middleware) still typecheck — Hono's own - * `Next` (`() => Promise`) is a subtype of this, so the normal - * `app.use(..., mcpAuth)` usage is unchanged. + * `next` is typed as `ResolveAuthNext` (`() => Promise`) so + * callers that use `mcpAuth(c, cb)` with an `async` callback that may + * short-circuit by returning a `Response` (e.g. the /api/workers/* gating + * middleware) actually have that Response propagate back. Hono's own `Next` + * (`() => Promise`) is a subtype, so plain `app.use(..., mcpAuth)` + * usage is unchanged. + * + * See `workspace/types.ts::ResolveAuthNext` for the widening rationale — + * the prior `next as Next` cast silently dropped the cb's Response and + * caused Bug B (workers/poll 500s with the undici RangeError). */ -export async function mcpAuth( - c: Context<{ Bindings: Env }>, - next: () => Promise -) { - return getWorkspaceProvider().resolveAuth(c, next as Next); +export async function mcpAuth(c: Context<{ Bindings: Env }>, next: ResolveAuthNext | Next) { + return getWorkspaceProvider().resolveAuth(c, next as ResolveAuthNext); } diff --git a/packages/server/src/scheduled/embedded-connector-worker.ts b/packages/server/src/scheduled/embedded-connector-worker.ts new file mode 100644 index 000000000..0f4b19b39 --- /dev/null +++ b/packages/server/src/scheduled/embedded-connector-worker.ts @@ -0,0 +1,84 @@ +/** + * Embedded connector-worker daemon. + * + * In embedded mode (`lobu run` / `bun run dev`), the gateway and the + * connector-worker run in the same Node process. Previously only the + * gateway booted, which meant `runs(run_type='sync')` rows sat in + * `pending` forever — nothing called `/api/workers/poll`. `manage_feeds` + * with `trigger_feed` would happily enqueue a run; no events ever + * landed. + * + * This module spins up the existing `WorkerDaemon` in-process, pointed + * at the local gateway (`http://127.0.0.1:${PORT}`). The atomic claim + * already lives in `worker-api.ts::pollWorkerJob` (`FOR UPDATE OF r SKIP + * LOCKED LIMIT 1` + `claimed_by = worker_id`), so an embedded daemon and + * any external fleet worker co-exist without double-execution. + * + * Opt-out: set `LOBU_DISABLE_EMBEDDED_WORKER=1` (prod deployments with a + * separate connector-worker pod). + */ + +import { hostname } from 'node:os'; +import { WorkerDaemon } from '../../../connector-worker/src/daemon/worker'; +import type { Env } from '../index'; +import logger from '../utils/logger'; + +const DEFAULT_POLL_INTERVAL_MS = 5_000; + +export interface EmbeddedConnectorWorkerHandle { + /** Stop polling. In-flight jobs continue to completion (or `wait()`). */ + stop(): void; + /** Wait for any in-flight jobs to drain after `stop()`. */ + wait(timeoutMs?: number): Promise; +} + +/** + * Start the embedded connector-worker. Returns a handle the server's + * shutdown path can use to drain in-flight runs. + * + * Must be called AFTER the HTTP server is listening — `WorkerDaemon.start()` + * does a `GET /api/health` check up-front, which will fail if the listener + * isn't ready yet. + */ +export function startEmbeddedConnectorWorker( + env: Env, + apiUrl: string +): EmbeddedConnectorWorkerHandle | null { + if (process.env.LOBU_DISABLE_EMBEDDED_WORKER === '1') { + logger.info('[embedded-worker] disabled via LOBU_DISABLE_EMBEDDED_WORKER=1'); + return null; + } + + const workerId = `embedded:${hostname() || 'localhost'}:${process.pid}`; + const daemon = new WorkerDaemon( + { + apiUrl, + workerId, + workerApiToken: env.WORKER_API_TOKEN, + capabilities: { browser: false }, + pollIntervalMs: DEFAULT_POLL_INTERVAL_MS, + maxConcurrentJobs: 1, + }, + env + ); + + // Fire-and-forget. WorkerDaemon.start() loops until stop() and surfaces + // poll errors via console.error internally; failure here is recoverable + // (next tick retries) so we don't want to crash the gateway boot. + void daemon + .start() + .then(() => logger.info({ workerId }, '[embedded-worker] stopped cleanly')) + .catch((err) => { + logger.error( + { err, workerId }, + '[embedded-worker] crashed; runs(run_type=sync) will not drain until restart' + ); + }); + + logger.info({ workerId, apiUrl }, '[embedded-worker] started'); + + return { + stop: () => daemon.stop(), + wait: (timeoutMs?: number) => daemon.waitForActiveJobs(timeoutMs), + }; +} diff --git a/packages/server/src/server.ts b/packages/server/src/server.ts index afaf9af23..a5ef10e10 100644 --- a/packages/server/src/server.ts +++ b/packages/server/src/server.ts @@ -216,6 +216,16 @@ async function main() { // backstop — the lock keeps the two cadences from double-failing rows. const stopReaper = startStaleRunReaper(); + // Embedded connector-worker daemon — polls our own `/api/workers/poll` + // and executes claimed runs in-process. Started AFTER `listen()` below + // so the daemon's boot-time health check resolves. Prod deployments + // running a separate connector-worker pod should set + // `LOBU_DISABLE_EMBEDDED_WORKER=1`. + const { startEmbeddedConnectorWorker } = await import( + './scheduled/embedded-connector-worker' + ); + let embeddedWorker: ReturnType = null; + const port = parseInt(process.env.PORT || '8787', 10); const host = process.env.HOST?.trim() || '0.0.0.0'; @@ -238,6 +248,10 @@ async function main() { // Graceful shutdown const shutdown = async (signal: string) => { logger.info({ signal }, 'Received shutdown signal, stopping gracefully...'); + if (embeddedWorker) { + embeddedWorker.stop(); + await embeddedWorker.wait(15_000); + } await vite?.close(); stopReaper(); taskScheduler.stop(); @@ -308,6 +322,10 @@ async function main() { logger.error({ err }, 'Connector external dependency check failed'); process.exit(1); } + // Embedded daemon waits for the listener because its boot health check + // hits `/api/health` on this same process. + const daemonHost = host === '0.0.0.0' ? '127.0.0.1' : host; + embeddedWorker = startEmbeddedConnectorWorker(env, `http://${daemonHost}:${port}`); }); } diff --git a/packages/server/src/start-local.ts b/packages/server/src/start-local.ts index 78e403409..27dabff4c 100644 --- a/packages/server/src/start-local.ts +++ b/packages/server/src/start-local.ts @@ -180,6 +180,15 @@ async function main() { const { startStaleRunReaper } = await import('./scheduled/check-stalled-executions'); const stopReaper = startStaleRunReaper(); + // Embedded connector-worker daemon — same process executes + // `runs(run_type='sync')` by polling our own `/api/workers/poll`. + // Started AFTER `listen()` so the daemon's boot-time health check + // can resolve. Opt-out: `LOBU_DISABLE_EMBEDDED_WORKER=1`. + const { startEmbeddedConnectorWorker } = await import( + './scheduled/embedded-connector-worker' + ); + let embeddedWorker: ReturnType = null; + const wrapper = new Hono<{ Bindings: Env }>(); wrapper.use('*', async (c, next) => { // Stash the peer TCP remote-address so handlers that need to enforce @@ -214,6 +223,11 @@ async function main() { const shutdown = async (signal: string) => { logger.info({ signal }, 'Shutting down'); + if (embeddedWorker) { + embeddedWorker.stop(); + // Best-effort drain; don't block shutdown forever on a stuck connector. + await embeddedWorker.wait(15_000); + } stopReaper(); stopScheduler(); await vite?.close(); @@ -270,6 +284,9 @@ async function main() { httpServer.listen(PORT, HOST, () => { logger.info(`Lobu running at http://${HOST}:${PORT}`); logger.info(`Data: ${DATA_DIR}`); + // Embedded daemon must wait for the listener — its boot-time + // health check hits `/api/health` on this same process. + embeddedWorker = startEmbeddedConnectorWorker(env, `http://${HOST}:${PORT}`); }); } diff --git a/packages/server/src/workspace/multi-tenant.ts b/packages/server/src/workspace/multi-tenant.ts index b7760d77f..fa79ab7dd 100644 --- a/packages/server/src/workspace/multi-tenant.ts +++ b/packages/server/src/workspace/multi-tenant.ts @@ -1,5 +1,4 @@ import { verifyWorkerToken } from '@lobu/core'; -import type { Next } from 'hono'; import { getAuthConfig as getAuthConfigFromEnv } from '../auth/config'; import { createAuth } from '../auth/index'; import { OAuthProvider } from '../auth/oauth/provider'; @@ -14,6 +13,7 @@ import type { AuthConfigData, HonoContext, OrgInfo, + ResolveAuthNext, ResolvedOwner, WorkspaceProvider, } from './types'; @@ -134,7 +134,7 @@ export class MultiTenantProvider implements WorkspaceProvider { logger.info('[MultiTenantProvider] Initialized'); } - async resolveAuth(c: HonoContext, next: Next): Promise { + async resolveAuth(c: HonoContext, next: ResolveAuthNext): Promise { const authHeader = c.req.header('Authorization'); const sql = getDb(); const baseUrl = getConfiguredPublicOrigin() ?? new URL(c.req.url).origin; @@ -236,7 +236,7 @@ export class MultiTenantProvider implements WorkspaceProvider { return role; } - function setContextAndContinue( + async function setContextAndContinue( overrides: Partial<{ mcpAuthInfo: AuthInfo | null; mcpIsAuthenticated: boolean; @@ -246,7 +246,7 @@ export class MultiTenantProvider implements WorkspaceProvider { session: unknown; authSource: 'session' | 'pat' | 'oauth' | null; }> - ) { + ): Promise { if (overrides.mcpAuthInfo !== undefined) c.set('mcpAuthInfo', overrides.mcpAuthInfo); if (overrides.mcpIsAuthenticated !== undefined) c.set('mcpIsAuthenticated', overrides.mcpIsAuthenticated); @@ -255,7 +255,11 @@ export class MultiTenantProvider implements WorkspaceProvider { if (overrides.user !== undefined) c.set('user', overrides.user as any); if (overrides.session !== undefined) c.set('session', overrides.session as any); if (overrides.authSource !== undefined) c.set('authSource', overrides.authSource); - return next(); + // The cb (workers/* gating mw) may return a Response to short-circuit; + // Hono's plain `Next` returns void. `next()` resolves to one of those — + // pass it back to the caller so a short-circuit Response actually + // reaches Hono compose and gets installed as c.res. See Bug B fix doc. + return (await next()) ?? undefined; } // 1) Embedded worker direct-auth for the in-process lobu-memory MCP. @@ -317,7 +321,7 @@ export class MultiTenantProvider implements WorkspaceProvider { 403 ); } - await setContextAndContinue({ + return setContextAndContinue({ mcpAuthInfo: { userId: directAuthUserId, organizationId: requestedOrgId, @@ -331,7 +335,6 @@ export class MultiTenantProvider implements WorkspaceProvider { memberRole: directAuthRole, authSource: 'pat', }); - return undefined; } // 2) Bearer token auth (PAT or OAuth) @@ -402,14 +405,13 @@ export class MultiTenantProvider implements WorkspaceProvider { if (!effectiveOrgId) { if (isUnscopedRoute) { - await setContextAndContinue({ + return setContextAndContinue({ mcpAuthInfo: authInfo, mcpIsAuthenticated: true, organizationId: null, memberRole: null, authSource: isPat ? 'pat' : 'oauth', }); - return undefined; } return c.json( { @@ -471,7 +473,7 @@ export class MultiTenantProvider implements WorkspaceProvider { bearerUser = null; } - await setContextAndContinue({ + return setContextAndContinue({ mcpAuthInfo: authInfo, mcpIsAuthenticated: true, organizationId: effectiveOrgId, @@ -479,7 +481,6 @@ export class MultiTenantProvider implements WorkspaceProvider { user: bearerUser, authSource: isPat ? 'pat' : 'oauth', }); - return undefined; } // end of `else` branch (PAT/OAuth verify hit) } @@ -521,7 +522,7 @@ export class MultiTenantProvider implements WorkspaceProvider { if (session?.user && session.session) { if (!requestedOrgId) { if (isUnscopedRoute) { - await setContextAndContinue({ + return setContextAndContinue({ mcpIsAuthenticated: true, organizationId: null, memberRole: null, @@ -529,7 +530,6 @@ export class MultiTenantProvider implements WorkspaceProvider { session: session.session, authSource: 'session', }); - return undefined; } return c.json( { error: 'invalid_request', error_description: 'Organization slug is required in URL' }, @@ -539,7 +539,7 @@ export class MultiTenantProvider implements WorkspaceProvider { const role = await getMembershipRole(requestedOrgId, session.session.userId); if (role) { - await setContextAndContinue({ + return setContextAndContinue({ mcpIsAuthenticated: true, organizationId: requestedOrgId, memberRole: role, @@ -547,7 +547,6 @@ export class MultiTenantProvider implements WorkspaceProvider { session: session.session, authSource: 'session', }); - return undefined; } // Non-member: only allow through for public-readable endpoints @@ -560,7 +559,7 @@ export class MultiTenantProvider implements WorkspaceProvider { 403 ); } - await setContextAndContinue({ + return setContextAndContinue({ mcpIsAuthenticated: false, organizationId: requestedOrgId, memberRole: null, @@ -568,7 +567,6 @@ export class MultiTenantProvider implements WorkspaceProvider { session: session.session, authSource: 'session', }); - return undefined; } } catch { // Session validation failed, continue to anonymous @@ -593,8 +591,7 @@ export class MultiTenantProvider implements WorkspaceProvider { // 3) Anonymous: allow through with null org for discovery (tools/list, initialize) // tools/call will enforce org context at the handler level. if (!requestedOrgId) { - await setContextAndContinue({ organizationId: null, memberRole: null }); - return undefined; + return setContextAndContinue({ organizationId: null, memberRole: null }); } if (!allowOrgLevelPublicRead && !allowAnonymousPublicOrgMcp) { @@ -608,11 +605,10 @@ export class MultiTenantProvider implements WorkspaceProvider { ); } - await setContextAndContinue({ + return setContextAndContinue({ organizationId: requestedOrgId, memberRole: null, }); - return undefined; } async listOrganizations(search?: string, userId?: string | null): Promise { diff --git a/packages/server/src/workspace/types.ts b/packages/server/src/workspace/types.ts index 45f640a97..113312b34 100644 --- a/packages/server/src/workspace/types.ts +++ b/packages/server/src/workspace/types.ts @@ -1,6 +1,27 @@ -import type { Context, Next } from 'hono'; +import type { Context } from 'hono'; import type { Env } from '../index'; +/** + * Widened `next` for `resolveAuth`. + * + * Hono's own `Next` is `() => Promise`. That's correct when `mcpAuth` + * is used as `app.use('/path', mcpAuth)` — the inner handler chain returns + * void. + * + * `mcpAuth` is also used as `mcpAuth(c, async () => { ...; return c.json(...); })` + * (see the `/api/workers/*` gating middleware in `index.ts`) — the cb may + * short-circuit by returning a `Response`. If `resolveAuth`'s `next` is + * typed as `Promise`, TypeScript collapses the cb's Response return + * type, the helpers inside `resolveAuth` infer `Promise`, and every + * caller has to choose between discarding the cb's return (silent 500s — + * see Bug B fix doc) or fighting the type checker. + * + * Widening to `Promise` lets the cb's `Response` flow back + * through `setContextAndContinue → next()` and out to Hono's `dispatch` + * which sets `c.res` correctly. + */ +export type ResolveAuthNext = () => Promise; + export interface OrgInfo { id: string; name: string; @@ -33,7 +54,7 @@ export interface WorkspaceProvider { init(): Promise; /** Hono middleware: resolve auth + workspace context for a request */ - resolveAuth(c: HonoContext, next: Next): Promise; + resolveAuth(c: HonoContext, next: ResolveAuthNext): Promise; /** List organizations the user is a member of */ listOrganizations(search?: string, userId?: string | null): Promise; From 7dc32dd99e48f9856904102ea03df1eb27fcdb93 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Tue, 19 May 2026 19:42:36 +0100 Subject: [PATCH 2/2] fix(embedded-worker): sanitize connector env + resolve bundled CLI source MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two follow-ups from pi review: 1. Secret leak: passing the gateway's full env into WorkerDaemon would spread DATABASE_URL / ENCRYPTION_KEY / BETTER_AUTH_SECRET / provider secrets onto every connector subprocess (`SubprocessExecutor.fork` does `{...pickSystemEnv(), ...context.env}`). The standalone connector-worker CLI deliberately whitelists which env vars connectors see via `buildEnv()`. Extract that whitelist to its own module (`packages/connector-worker/src/env.ts::buildConnectorWorkerEnv`) so the embedded daemon can re-use it without pulling in `bin.ts`'s top-level `main()` execution. 2. Published-CLI connector resolution: the worker-side compile resolver in `packages/connector-worker/src/compile-connector.ts` didn't include a candidate for `node_modules/@lobu/cli/dist/connectors`, where `packages/cli/scripts/build.cjs` actually copies bundled connector sources. In the monorepo this didn't matter because `packages/connectors/src` was reachable via the `../../../connectors/src` candidate, so my repro passed — but a fresh `npx @lobu/cli` install would have claimed every sync run and failed it with "did not resolve to a local source file". Add `resolve(HERE, 'connectors')` so the bundled-CLI layout works. --- packages/connector-worker/src/bin.ts | 22 +++--------- .../connector-worker/src/compile-connector.ts | 11 ++++++ packages/connector-worker/src/env.ts | 34 +++++++++++++++++++ .../scheduled/embedded-connector-worker.ts | 26 ++++++++++---- 4 files changed, 69 insertions(+), 24 deletions(-) create mode 100644 packages/connector-worker/src/env.ts diff --git a/packages/connector-worker/src/bin.ts b/packages/connector-worker/src/bin.ts index 7ba42d4eb..8fa21146c 100644 --- a/packages/connector-worker/src/bin.ts +++ b/packages/connector-worker/src/bin.ts @@ -10,8 +10,8 @@ import { randomUUID } from 'node:crypto'; import { createRequire } from 'node:module'; -import type { Env } from '@lobu/connector-sdk'; import { startDaemon } from './daemon/index.js'; +import { buildConnectorWorkerEnv } from './env.js'; import { assertExternalDepsResolvable } from './runtime-deps.js'; function printUsage(): void { @@ -68,22 +68,10 @@ function parseArgs(args: string[]): { command: string; options: Record { const args = process.argv.slice(2); diff --git a/packages/connector-worker/src/compile-connector.ts b/packages/connector-worker/src/compile-connector.ts index 59151b8e5..00b5840ad 100644 --- a/packages/connector-worker/src/compile-connector.ts +++ b/packages/connector-worker/src/compile-connector.ts @@ -38,11 +38,22 @@ import { EXTERNAL_RUNTIME_DEPS } from './runtime-deps.js'; // gateway-supplied absolute paths. const HERE = fileURLToPath(new URL('.', import.meta.url)); const WORKER_CONNECTOR_DIR_CANDIDATES = [ + // Monorepo: workspace package at packages/connector-worker/src/ → + // packages/connectors/src. resolve(HERE, '../../../connectors/src'), resolve(HERE, '../../../connectors/dist'), resolve(HERE, '../../connectors/src'), resolve(HERE, '../../connectors/dist'), resolve(HERE, '../connectors/src'), + // Embedded CLI install (`npx @lobu/cli`): the start-local bundle and the + // connectors directory ship side-by-side under + // node_modules/@lobu/cli/dist/. When the bundled connector-worker code is + // running from that bundle, `HERE` resolves to that same dist dir, so + // `./connectors` is the layout the CLI build produced + // (packages/cli/scripts/build.cjs::copyDirIfExists('../connectors/src', + // 'dist/connectors')). Without this, the embedded worker claims runs and + // fails them with "did not resolve to a local source file". + resolve(HERE, 'connectors'), resolve(process.cwd(), 'packages/connectors/src'), resolve(process.cwd(), 'connectors'), ]; diff --git a/packages/connector-worker/src/env.ts b/packages/connector-worker/src/env.ts new file mode 100644 index 000000000..05bc27706 --- /dev/null +++ b/packages/connector-worker/src/env.ts @@ -0,0 +1,34 @@ +/** + * Connector-runtime env whitelist. + * + * Connector subprocesses (`SubprocessExecutor.fork`) inherit + * `context.env`, which becomes `process.env` inside the connector child. + * The standalone `connector-worker` CLI builds this set deliberately so + * connectors only see the env vars they actually need (GitHub token, + * provider API keys, etc.) — never the host process's secrets. + * + * Used by both the standalone CLI (`bin.ts`) and the in-process embedded + * worker (`packages/server/src/scheduled/embedded-connector-worker.ts`). + * Lives in its own module so the embedded worker can import the helper + * without pulling in `bin.ts`'s top-level `main()` call (which would + * print CLI usage and `process.exit` on startup). + */ + +import type { Env } from '@lobu/connector-sdk'; + +export function buildConnectorWorkerEnv(): Env { + return { + ENVIRONMENT: process.env.ENVIRONMENT || 'production', + GITHUB_TOKEN: process.env.GITHUB_TOKEN, + GOOGLE_MAPS_API_KEY: process.env.GOOGLE_MAPS_API_KEY, + X_USERNAME: process.env.X_USERNAME, + X_PASSWORD: process.env.X_PASSWORD, + X_EMAIL: process.env.X_EMAIL, + X_2FA_SECRET: process.env.X_2FA_SECRET, + X_COOKIES: process.env.X_COOKIES, + REDDIT_CLIENT_ID: process.env.REDDIT_CLIENT_ID, + REDDIT_CLIENT_SECRET: process.env.REDDIT_CLIENT_SECRET, + REDDIT_USER_AGENT: process.env.REDDIT_USER_AGENT, + WORKER_API_TOKEN: process.env.WORKER_API_TOKEN, + }; +} diff --git a/packages/server/src/scheduled/embedded-connector-worker.ts b/packages/server/src/scheduled/embedded-connector-worker.ts index 0f4b19b39..eca6cc8cb 100644 --- a/packages/server/src/scheduled/embedded-connector-worker.ts +++ b/packages/server/src/scheduled/embedded-connector-worker.ts @@ -20,6 +20,7 @@ import { hostname } from 'node:os'; import { WorkerDaemon } from '../../../connector-worker/src/daemon/worker'; +import { buildConnectorWorkerEnv } from '../../../connector-worker/src/env'; import type { Env } from '../index'; import logger from '../utils/logger'; @@ -41,7 +42,7 @@ export interface EmbeddedConnectorWorkerHandle { * isn't ready yet. */ export function startEmbeddedConnectorWorker( - env: Env, + serverEnv: Env, apiUrl: string ): EmbeddedConnectorWorkerHandle | null { if (process.env.LOBU_DISABLE_EMBEDDED_WORKER === '1') { @@ -50,28 +51,39 @@ export function startEmbeddedConnectorWorker( } const workerId = `embedded:${hostname() || 'localhost'}:${process.pid}`; + // Connector subprocesses inherit `context.env` from the WorkerDaemon's + // `env` arg (`SubprocessExecutor.fork` spreads it onto `pickSystemEnv`). + // Passing the gateway's full env would leak ENCRYPTION_KEY, + // BETTER_AUTH_SECRET, DATABASE_URL, and provider secrets into every + // connector run. Re-use the same whitelist the standalone connector-worker + // CLI applies in `packages/connector-worker/src/bin.ts::buildConnectorWorkerEnv`. + const connectorEnv = buildConnectorWorkerEnv(); const daemon = new WorkerDaemon( { apiUrl, workerId, - workerApiToken: env.WORKER_API_TOKEN, + workerApiToken: serverEnv.WORKER_API_TOKEN, capabilities: { browser: false }, pollIntervalMs: DEFAULT_POLL_INTERVAL_MS, maxConcurrentJobs: 1, }, - env + connectorEnv ); - // Fire-and-forget. WorkerDaemon.start() loops until stop() and surfaces - // poll errors via console.error internally; failure here is recoverable - // (next tick retries) so we don't want to crash the gateway boot. + // Fire-and-forget. `WorkerDaemon.start()` does a one-shot + // `GET /api/health` check up front and throws on failure — if that + // throws, the .catch logs once and the worker is dead until process + // restart (no exponential-retry built into the daemon). Future + // hardening could re-spawn on startup failure, but the listen-callback + // ordering in start-local.ts / server.ts already makes this path + // succeed in practice. void daemon .start() .then(() => logger.info({ workerId }, '[embedded-worker] stopped cleanly')) .catch((err) => { logger.error( { err, workerId }, - '[embedded-worker] crashed; runs(run_type=sync) will not drain until restart' + '[embedded-worker] failed to start or crashed mid-loop; runs(run_type=sync) will not drain until restart' ); });