Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions docs/fix-sync-loop-design.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# fix-sync-loop design

Two bugs block the headless `lobu run → lobu apply → trigger_feed → events appear` data sync loop on a fresh install. Both confirmed via local repro against the embedded server (PGlite, port 8802).

## Bug A — embedded mode never executes `runs(run_type='sync')`

`packages/server/src/tools/admin/manage_feeds.ts:483-509` and `packages/server/src/scheduled/check-due-feeds.ts` both insert pending `runs` rows. They are claimed and executed by the **out-of-process connector-worker daemon** (`packages/connector-worker/src/daemon/worker.ts` + `executor.ts`), which polls `/api/workers/poll` over HTTP. `lobu run` (embedded mode) boots the gateway + task scheduler but never starts the daemon, so feed-sync rows sit in `pending` forever. No `events` ever land.

`packages/server/src/lib/feed-sync.ts::runFeed` exists but only calls `executeCompiledConnector` and discards results — it does NOT persist `events`. The persistence path is the daemon's `client.stream(...)` call, which lands in `worker-api.ts::streamContent` and `insertEvent`. So a "tick that calls `runFeed` per pending run" would not produce events. We need the actual claim → execute → stream → complete pipeline.

### Fix

Run the connector-worker daemon **in-process** under `start-local.ts` / `server.ts` immediately after `bootTaskScheduler`, pointed at `http://127.0.0.1:${PORT}`. Same code path the standalone daemon uses; same atomic claim SQL (`packages/server/src/worker-api.ts::pollWorkerJob`); same complete/stream wiring. No new claim semantics, no double-execution concern — the daemon and any external worker fleet would coordinate via the existing `FOR UPDATE OF r SKIP LOCKED` claim filter.

Implementation:
- New module `packages/server/src/scheduled/embedded-connector-worker.ts` that **constructs `WorkerDaemon` directly** (NOT `startDaemon` — that installs signal handlers + `process.exit`, wrong for in-process use), then `void daemon.start().catch(logger.error)`.
- Started **after `httpServer.listen()` callback fires** so the daemon's boot-time `/api/health` check can resolve. In `start-local.ts`/`server.ts`, move the embedded-daemon spawn into the listen callback (or use a setImmediate post-listen).
- Wired into both `server.ts` and `start-local.ts`. Default ON in embedded mode; opt-out via `LOBU_DISABLE_EMBEDDED_WORKER=1` (e.g. prod with external fleet).
- Stable `worker_id` = `embedded:${hostname()}:${pid}` so claims are attributable in logs.
- Shutdown: call `daemon.stop()` + `await daemon.waitForActiveJobs(30_000)` from the existing `shutdown(signal)` path. Note: `stop()` only flips the running flag; it does NOT interrupt the in-flight `sleep(pollIntervalMs)`. The wait covers in-flight jobs; the daemon exits within `pollIntervalMs` after.

### Race / correctness

- Atomic claim already exists in `worker-api.ts::pollWorkerJob` (`FOR UPDATE OF r SKIP LOCKED LIMIT 1`). Embedded + external daemons can co-exist; whichever calls `/api/workers/poll` first wins the row. No double-execute.
- Heartbeat-lost reaper (`startStaleRunReaper`) already handles crashed/killed runs.
- Default OFF when `WORKER_API_TOKEN` is set AND `LOBU_DISABLE_EMBEDDED_WORKER=1` — so prod with external fleet can opt out.

## Bug B — `/api/workers/poll` 500s with `RangeError: init["status"] must be in the range of 200 to 599`

Repro: hit `/api/workers/poll` with a Bearer that resolves to a valid Better-Auth session (i.e. the install_operator). Server returns 500.

Stack:
```
RangeError at undici/initializeResponse
new Response(body, init)
at [getResponseCache] (@hono/node-server)
at get headers (@hono/node-server)
at set res (hono/context.js:133)
at dispatch (hono/compose.js:38)
```
Comment on lines +33 to +40
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Add a language identifier to the fenced stack-trace block.

This currently triggers markdownlint MD040 and can fail docs linting pipelines.

Suggested fix
-```
+```text
 RangeError at undici/initializeResponse
   new Response(body, init)
   at [getResponseCache] (`@hono/node-server`)
   at get headers (`@hono/node-server`)
   at set res (hono/context.js:133)
   at dispatch (hono/compose.js:38)
</details>

<details>
<summary>🧰 Tools</summary>

<details>
<summary>🪛 markdownlint-cli2 (0.22.1)</summary>

[warning] 33-33: Fenced code blocks should have a language specified

(MD040, fenced-code-language)

</details>

</details>

<details>
<summary>🤖 Prompt for AI Agents</summary>

Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In @docs/fix-sync-loop-design.md around lines 33 - 40, Update the fenced code
block that begins with "RangeError at undici/initializeResponse" to include a
language identifier (use "text") after the opening backticks so the block
becomes ```text; this fixes markdownlint MD040 and prevents docs lint failures
for the stack-trace block in the document.


</details>

<!-- fingerprinting:phantom:triton:hawk -->

<!-- This is an auto-generated comment by CodeRabbit -->


Trap output (instrumented `globalThis.Response` constructor):
- `body` = `(data, arg, headers) => this.#newResponse(data, arg, headers)` — i.e. the Hono Context's `c.body` method
- `init` = a Hono `Context` object (with `init.status` = the `c.status` method, which is a Function)

### Root cause

`packages/server/src/workspace/multi-tenant.ts::resolveAuth` is invoked two ways:
1. As a Hono middleware: `app.use('/foo', mcpAuth)` — `next` returns `Promise<void>`.
2. As a wrapped call: `app.use('/api/workers/*', async (c, next) => mcpAuth(c, async () => { ... return c.json(...); }))` — the cb's return value is a `Response`.

Inside `resolveAuth`, every branch uses the same pattern:
```ts
await setContextAndContinue({...});
return undefined;
```

`setContextAndContinue` returns `next()`. In case (2), `next` is the cb; the cb returns a `Response`; `setContextAndContinue` returns that `Response`. The caller **awaits then discards** it and returns `undefined`.

When the workers/* middleware's cb returns `c.json(..., 403)` (e.g. "Worker token missing device_worker:run scope" — session auth populates `mcpIsAuthenticated=true` but never sets `mcpAuthInfo`, so the scopes check fails), that 403 Response is lost. `mcpAuth` returns `undefined`. The workers middleware returns `undefined`. Hono compose sees `res=undefined && context.finalized=false` → does NOT set `c.res` AND advances to next handler (because the OUTER `next` was called via `setContextAndContinue → cb`-wait, actually the cb returned BEFORE calling outer `next`, so Hono should stop). The actual mechanism for the bad Response getting into `c.res` is somewhere downstream re-wraps via `c.header()`'s line 211 finalized-path which does `this.#res = createResponseInstance(this.#res.body, this.#res)` — but the upstream root cause is the discarded Response.

### Fix

Change `resolveAuth` so it propagates `setContextAndContinue`'s return value instead of discarding it. All eight call sites switch from:
```ts
await setContextAndContinue({...});
return undefined;
```
to:
```ts
return setContextAndContinue({...});
```

(`setContextAndContinue` already returns a Promise resolving to whatever `next()` resolves to. For middleware-style use, `next()` resolves to `undefined` — behavior preserved. For wrapped-cb use, the cb's Response now propagates.)

**Type widening required.** `WorkspaceProvider.resolveAuth` currently takes Hono `Next` (`() => Promise<void>`). `mcpAuth` already widens its `next` param to `() => Promise<unknown>` and casts to `Next` before calling `resolveAuth`. To make TypeScript see the cb's `Response` return value flow through, widen `WorkspaceProvider.resolveAuth`'s `next` param to `() => Promise<Response | undefined | void>` in `packages/server/src/workspace/types.ts`, and stop the `as Next` cast in `auth/middleware.ts`. Existing call sites that pass Hono's Next still typecheck (void is a subtype here).

### Validation

- Curl `/api/workers/poll` with the install_operator's signed session-token bearer: must return JSON (403 "missing device_worker:run scope" if session-only; 200 next_poll if PAT with proper scope). Not 500.
- Existing PAT/OAuth paths unchanged (cb already returns through `setContextAndContinue → next()`).

## Test plan

1. `make build-packages` → clean.
2. `make typecheck` → clean.
3. Boot `lobu run` against PGlite (port 8802). Sign in as install_operator. Trigger a feed via `manage_feeds.trigger_feed`. Wait ≤ 30s. Query `events` table — must have new rows. (Bug A fix.)
4. Curl `POST /api/workers/poll` with a session-token Bearer. Must return 200/403/204 — never 500. Grep server log for "RangeError" — none. (Bug B fix.)

## Schema / migrations

None. No new tables, no column changes.
22 changes: 5 additions & 17 deletions packages/connector-worker/src/bin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@

import { randomUUID } from 'node:crypto';
import { createRequire } from 'node:module';
import type { Env } from '@lobu/connector-sdk';
import { startDaemon } from './daemon/index.js';
import { buildConnectorWorkerEnv } from './env.js';
import { assertExternalDepsResolvable } from './runtime-deps.js';

function printUsage(): void {
Expand Down Expand Up @@ -68,22 +68,10 @@ function parseArgs(args: string[]): { command: string; options: Record<string, s
return { command, options };
}

function buildEnv(): Env {
return {
ENVIRONMENT: process.env.ENVIRONMENT || 'production',
GITHUB_TOKEN: process.env.GITHUB_TOKEN,
GOOGLE_MAPS_API_KEY: process.env.GOOGLE_MAPS_API_KEY,
X_USERNAME: process.env.X_USERNAME,
X_PASSWORD: process.env.X_PASSWORD,
X_EMAIL: process.env.X_EMAIL,
X_2FA_SECRET: process.env.X_2FA_SECRET,
X_COOKIES: process.env.X_COOKIES,
REDDIT_CLIENT_ID: process.env.REDDIT_CLIENT_ID,
REDDIT_CLIENT_SECRET: process.env.REDDIT_CLIENT_SECRET,
REDDIT_USER_AGENT: process.env.REDDIT_USER_AGENT,
WORKER_API_TOKEN: process.env.WORKER_API_TOKEN,
};
}
// Connector-runtime env whitelist now lives in `./env.ts` so the in-process
// embedded worker can import it without pulling in `bin.ts`'s top-level
// `main()` execution.
const buildEnv = buildConnectorWorkerEnv;

async function main(): Promise<void> {
const args = process.argv.slice(2);
Expand Down
11 changes: 11 additions & 0 deletions packages/connector-worker/src/compile-connector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,22 @@ import { EXTERNAL_RUNTIME_DEPS } from './runtime-deps.js';
// gateway-supplied absolute paths.
const HERE = fileURLToPath(new URL('.', import.meta.url));
const WORKER_CONNECTOR_DIR_CANDIDATES = [
// Monorepo: workspace package at packages/connector-worker/src/ →
// packages/connectors/src.
resolve(HERE, '../../../connectors/src'),
resolve(HERE, '../../../connectors/dist'),
resolve(HERE, '../../connectors/src'),
resolve(HERE, '../../connectors/dist'),
resolve(HERE, '../connectors/src'),
// Embedded CLI install (`npx @lobu/cli`): the start-local bundle and the
// connectors directory ship side-by-side under
// node_modules/@lobu/cli/dist/. When the bundled connector-worker code is
// running from that bundle, `HERE` resolves to that same dist dir, so
// `./connectors` is the layout the CLI build produced
// (packages/cli/scripts/build.cjs::copyDirIfExists('../connectors/src',
// 'dist/connectors')). Without this, the embedded worker claims runs and
// fails them with "did not resolve to a local source file".
resolve(HERE, 'connectors'),
resolve(process.cwd(), 'packages/connectors/src'),
resolve(process.cwd(), 'connectors'),
];
Expand Down
34 changes: 34 additions & 0 deletions packages/connector-worker/src/env.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
* Connector-runtime env whitelist.
*
* Connector subprocesses (`SubprocessExecutor.fork`) inherit
* `context.env`, which becomes `process.env` inside the connector child.
* The standalone `connector-worker` CLI builds this set deliberately so
* connectors only see the env vars they actually need (GitHub token,
* provider API keys, etc.) — never the host process's secrets.
*
* Used by both the standalone CLI (`bin.ts`) and the in-process embedded
* worker (`packages/server/src/scheduled/embedded-connector-worker.ts`).
* Lives in its own module so the embedded worker can import the helper
* without pulling in `bin.ts`'s top-level `main()` call (which would
* print CLI usage and `process.exit` on startup).
*/

import type { Env } from '@lobu/connector-sdk';

export function buildConnectorWorkerEnv(): Env {
return {
ENVIRONMENT: process.env.ENVIRONMENT || 'production',
GITHUB_TOKEN: process.env.GITHUB_TOKEN,
GOOGLE_MAPS_API_KEY: process.env.GOOGLE_MAPS_API_KEY,
X_USERNAME: process.env.X_USERNAME,
X_PASSWORD: process.env.X_PASSWORD,
X_EMAIL: process.env.X_EMAIL,
X_2FA_SECRET: process.env.X_2FA_SECRET,
X_COOKIES: process.env.X_COOKIES,
REDDIT_CLIENT_ID: process.env.REDDIT_CLIENT_ID,
REDDIT_CLIENT_SECRET: process.env.REDDIT_CLIENT_SECRET,
REDDIT_USER_AGENT: process.env.REDDIT_USER_AGENT,
WORKER_API_TOKEN: process.env.WORKER_API_TOKEN,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Remove WORKER_API_TOKEN from connector subprocess env whitelist.

This token is for worker↔gateway auth and is already passed to daemon config; exposing it through context.env leaks it into every connector child process.

Suggested fix
 export function buildConnectorWorkerEnv(): Env {
   return {
@@
-    WORKER_API_TOKEN: process.env.WORKER_API_TOKEN,
   };
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
WORKER_API_TOKEN: process.env.WORKER_API_TOKEN,
export function buildConnectorWorkerEnv(): Env {
return {
};
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/connector-worker/src/env.ts` at line 32, Remove WORKER_API_TOKEN
from the connector subprocess environment whitelist so it is not propagated into
connector child processes; locate the env whitelist (the object/constant that
includes the key WORKER_API_TOKEN in packages/connector-worker/src/env.ts or any
code that builds context.env) and delete that entry (or stop adding
process.env.WORKER_API_TOKEN) so the token remains only in daemon/worker config
and is not exposed to connectors.

};
}
23 changes: 13 additions & 10 deletions packages/server/src/auth/middleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import type { Context, Next } from 'hono';
import type { Env } from '../index';
import { getWorkspaceProvider } from '../workspace';
import type { ResolveAuthNext } from '../workspace/types';
import { createAuth } from './index';
import type { AuthInfo } from './oauth/types';

Expand Down Expand Up @@ -93,15 +94,17 @@ export async function requireAuth(c: Context<{ Bindings: Env }>, next: Next) {
* Middleware: MCP authentication (optional auth for MCP endpoints)
* Delegates entirely to WorkspaceProvider.resolveAuth.
*
* `next` is widened past Hono's `Next` so callers that use `mcpAuth(c, cb)`
* with an `async` callback that may short-circuit by returning a `Response`
* (e.g. the /api/workers/* gating middleware) still typecheck — Hono's own
* `Next` (`() => Promise<void>`) is a subtype of this, so the normal
* `app.use(..., mcpAuth)` usage is unchanged.
* `next` is typed as `ResolveAuthNext` (`() => Promise<Response | void>`) so
* callers that use `mcpAuth(c, cb)` with an `async` callback that may
* short-circuit by returning a `Response` (e.g. the /api/workers/* gating
* middleware) actually have that Response propagate back. Hono's own `Next`
* (`() => Promise<void>`) is a subtype, so plain `app.use(..., mcpAuth)`
* usage is unchanged.
*
* See `workspace/types.ts::ResolveAuthNext` for the widening rationale —
* the prior `next as Next` cast silently dropped the cb's Response and
* caused Bug B (workers/poll 500s with the undici RangeError).
*/
export async function mcpAuth(
c: Context<{ Bindings: Env }>,
next: () => Promise<unknown>
) {
return getWorkspaceProvider().resolveAuth(c, next as Next);
export async function mcpAuth(c: Context<{ Bindings: Env }>, next: ResolveAuthNext | Next) {
return getWorkspaceProvider().resolveAuth(c, next as ResolveAuthNext);
}
96 changes: 96 additions & 0 deletions packages/server/src/scheduled/embedded-connector-worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/**
* Embedded connector-worker daemon.
*
* In embedded mode (`lobu run` / `bun run dev`), the gateway and the
* connector-worker run in the same Node process. Previously only the
* gateway booted, which meant `runs(run_type='sync')` rows sat in
* `pending` forever — nothing called `/api/workers/poll`. `manage_feeds`
* with `trigger_feed` would happily enqueue a run; no events ever
* landed.
*
* This module spins up the existing `WorkerDaemon` in-process, pointed
* at the local gateway (`http://127.0.0.1:${PORT}`). The atomic claim
* already lives in `worker-api.ts::pollWorkerJob` (`FOR UPDATE OF r SKIP
* LOCKED LIMIT 1` + `claimed_by = worker_id`), so an embedded daemon and
* any external fleet worker co-exist without double-execution.
*
* Opt-out: set `LOBU_DISABLE_EMBEDDED_WORKER=1` (prod deployments with a
* separate connector-worker pod).
*/

import { hostname } from 'node:os';
import { WorkerDaemon } from '../../../connector-worker/src/daemon/worker';
import { buildConnectorWorkerEnv } from '../../../connector-worker/src/env';
import type { Env } from '../index';
import logger from '../utils/logger';

const DEFAULT_POLL_INTERVAL_MS = 5_000;

export interface EmbeddedConnectorWorkerHandle {
/** Stop polling. In-flight jobs continue to completion (or `wait()`). */
stop(): void;
/** Wait for any in-flight jobs to drain after `stop()`. */
wait(timeoutMs?: number): Promise<boolean>;
}

/**
* Start the embedded connector-worker. Returns a handle the server's
* shutdown path can use to drain in-flight runs.
*
* Must be called AFTER the HTTP server is listening — `WorkerDaemon.start()`
* does a `GET /api/health` check up-front, which will fail if the listener
* isn't ready yet.
*/
export function startEmbeddedConnectorWorker(
serverEnv: Env,
apiUrl: string
): EmbeddedConnectorWorkerHandle | null {
if (process.env.LOBU_DISABLE_EMBEDDED_WORKER === '1') {
logger.info('[embedded-worker] disabled via LOBU_DISABLE_EMBEDDED_WORKER=1');
return null;
}

const workerId = `embedded:${hostname() || 'localhost'}:${process.pid}`;
// Connector subprocesses inherit `context.env` from the WorkerDaemon's
// `env` arg (`SubprocessExecutor.fork` spreads it onto `pickSystemEnv`).
// Passing the gateway's full env would leak ENCRYPTION_KEY,
// BETTER_AUTH_SECRET, DATABASE_URL, and provider secrets into every
// connector run. Re-use the same whitelist the standalone connector-worker
// CLI applies in `packages/connector-worker/src/bin.ts::buildConnectorWorkerEnv`.
const connectorEnv = buildConnectorWorkerEnv();
const daemon = new WorkerDaemon(
{
apiUrl,
workerId,
workerApiToken: serverEnv.WORKER_API_TOKEN,
capabilities: { browser: false },
pollIntervalMs: DEFAULT_POLL_INTERVAL_MS,
maxConcurrentJobs: 1,
},
connectorEnv
);

// Fire-and-forget. `WorkerDaemon.start()` does a one-shot
// `GET /api/health` check up front and throws on failure — if that
// throws, the .catch logs once and the worker is dead until process
// restart (no exponential-retry built into the daemon). Future
// hardening could re-spawn on startup failure, but the listen-callback
// ordering in start-local.ts / server.ts already makes this path
// succeed in practice.
void daemon
.start()
.then(() => logger.info({ workerId }, '[embedded-worker] stopped cleanly'))
.catch((err) => {
logger.error(
{ err, workerId },
'[embedded-worker] failed to start or crashed mid-loop; runs(run_type=sync) will not drain until restart'
);
});

logger.info({ workerId, apiUrl }, '[embedded-worker] started');

return {
stop: () => daemon.stop(),
wait: (timeoutMs?: number) => daemon.waitForActiveJobs(timeoutMs),
};
}
18 changes: 18 additions & 0 deletions packages/server/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,16 @@ async function main() {
// backstop — the lock keeps the two cadences from double-failing rows.
const stopReaper = startStaleRunReaper();

// Embedded connector-worker daemon — polls our own `/api/workers/poll`
// and executes claimed runs in-process. Started AFTER `listen()` below
// so the daemon's boot-time health check resolves. Prod deployments
// running a separate connector-worker pod should set
// `LOBU_DISABLE_EMBEDDED_WORKER=1`.
const { startEmbeddedConnectorWorker } = await import(
'./scheduled/embedded-connector-worker'
);
let embeddedWorker: ReturnType<typeof startEmbeddedConnectorWorker> = null;

const port = parseInt(process.env.PORT || '8787', 10);
const host = process.env.HOST?.trim() || '0.0.0.0';

Expand All @@ -238,6 +248,10 @@ async function main() {
// Graceful shutdown
const shutdown = async (signal: string) => {
logger.info({ signal }, 'Received shutdown signal, stopping gracefully...');
if (embeddedWorker) {
embeddedWorker.stop();
await embeddedWorker.wait(15_000);
}
await vite?.close();
stopReaper();
taskScheduler.stop();
Expand Down Expand Up @@ -308,6 +322,10 @@ async function main() {
logger.error({ err }, 'Connector external dependency check failed');
process.exit(1);
}
// Embedded daemon waits for the listener because its boot health check
// hits `/api/health` on this same process.
const daemonHost = host === '0.0.0.0' ? '127.0.0.1' : host;
embeddedWorker = startEmbeddedConnectorWorker(env, `http://${daemonHost}:${port}`);
});
}

Expand Down
Loading
Loading