From 09a6a15c6bcbcda70856354ef3dc6fd4507e8a9c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Wed, 20 May 2026 02:20:50 +0100 Subject: [PATCH 1/3] refactor(server): extract createServerLifecycle to eliminate Postgres/PGlite drift (#948) Both entries (server.ts, start-local.ts) now call a shared lifecycle spine in server-lifecycle.ts. Middleware ordering, route mounts, httpServer timeouts, shutdown sequence, and signal wiring are identical by construction. Mode differences are confined to four named hooks (databaseReadiness, preListenHooks, postListenHooks, extraTeardown) and per-mode resource construction in the entry files. Closes #948. --- .../src/__tests__/server-lifecycle.test.ts | 347 +++++++ packages/server/src/server-lifecycle.ts | 442 +++++++++ packages/server/src/server.ts | 463 ++------- packages/server/src/start-local.ts | 919 ++++++++---------- 4 files changed, 1268 insertions(+), 903 deletions(-) create mode 100644 packages/server/src/__tests__/server-lifecycle.test.ts create mode 100644 packages/server/src/server-lifecycle.ts diff --git a/packages/server/src/__tests__/server-lifecycle.test.ts b/packages/server/src/__tests__/server-lifecycle.test.ts new file mode 100644 index 000000000..c79ec09df --- /dev/null +++ b/packages/server/src/__tests__/server-lifecycle.test.ts @@ -0,0 +1,347 @@ +/** + * Contract tests for the shared server lifecycle spine. + * + * The point of these tests is to lock the invariants that drift between + * `server.ts` (Postgres) and `start-local.ts` (PGlite) used to break (issue + * #948 + the #943 7-hygiene catch-up): + * + * 1. Middleware ordering on the Hono wrapper: + * peer-address stash → env-inject → sentry-5xx-capture → onError + * 2. Route mounts: `/lobu` mounted only when lobuApp is non-null; `/` always. + * 3. httpServer timeouts: keepAliveTimeout=75000, headersTimeout=76000. + * 4. Shutdown ordering documented in createServerLifecycle(). + * 5. `serializeBootError` walks nested cause chains and never returns `{}`. + * + * The wrapper-app and serializer assertions exercise real code paths; + * the lifecycle-shape assertions read the source so anything renaming the + * shutdown step labels has to update the test in the same PR. + */ + +import { readFileSync } from "node:fs"; +import { dirname, join } from "node:path"; +import { fileURLToPath } from "node:url"; +import { describe, expect, it, vi } from "vitest"; + +vi.mock("@sentry/node", () => ({ + captureException: vi.fn(), + captureMessage: vi.fn(), +})); + +vi.mock("../utils/logger", () => { + const noop = (): void => undefined; + // Recursive `child` is required because several modules (e.g. + // identity/connectors/google.ts) call `logger.child(...)` at module-load + // time. Match pino's interface so any caller's `.info / .warn / .error / + // .child` works without instrumentation. + const make = (): Record => { + const self: Record = { + info: noop, + warn: noop, + error: noop, + debug: noop, + trace: noop, + fatal: noop, + }; + self.child = () => make(); + return self; + }; + const logger = make(); + return { default: logger }; +}); + +vi.mock("../sentry", () => { + const reported = new WeakSet(); + return { + isSentryReported: vi.fn((c: { req: unknown }) => + reported.has(c.req as object), + ), + markSentryReported: vi.fn((c: { req: unknown }) => { + reported.add(c.req as object); + }), + }; +}); + +// The wrapper imports `mainApp` from `./index` to mount at `/`. The real +// module pulls in ~1370 lines of routes + auth + connector graphs we don't +// need here, and forces a Postgres connection at load time. Replace it with +// a real Hono app constructed via async `Hono` import inside the factory so +// the mock matches the same shape the wrapper expects (Hono with `.fetch`). +vi.mock("../index", async () => { + const { Hono } = await import("hono"); + const app = new Hono(); + app.get("/health", (c) => c.text("main-ok")); + return { app }; +}); + +const LIFECYCLE_SOURCE = readFileSync( + join(dirname(fileURLToPath(import.meta.url)), "..", "server-lifecycle.ts"), + "utf8", +); + +describe("serializeBootError", () => { + it("returns message + stack for a plain Error", async () => { + const { serializeBootError } = await import("../server-lifecycle"); + const err = new Error("boom"); + const out = serializeBootError(err); + expect(out.type).toBe("Error"); + expect(out.message).toBe("boom"); + expect(typeof out.stack).toBe("string"); + }); + + it("walks nested cause chains", async () => { + const { serializeBootError } = await import("../server-lifecycle"); + const inner = new Error("inner"); + const outer = new Error("outer", { cause: inner }); + const out = serializeBootError(outer); + expect(out.message).toBe("outer"); + const cause = out.cause as Record | undefined; + expect(cause?.message).toBe("inner"); + }); + + it("preserves ZodError-shaped issues array", async () => { + const { serializeBootError } = await import("../server-lifecycle"); + const err = Object.assign(new Error("validation failed"), { + issues: [{ path: ["DATABASE_URL"], message: "required" }], + }); + const out = serializeBootError(err); + expect(out.issues).toEqual([ + { path: ["DATABASE_URL"], message: "required" }, + ]); + }); + + it("handles non-object values without throwing", async () => { + const { serializeBootError } = await import("../server-lifecycle"); + expect(serializeBootError("a string")).toEqual({ + value: "a string", + type: "string", + }); + expect(serializeBootError(null)).toEqual({ value: "null" }); + expect(serializeBootError(undefined)).toEqual({ value: "undefined" }); + }); +}); + +describe("buildWrapperApp", () => { + it("mounts mainApp at / and lobuApp at /lobu when present", async () => { + const { buildWrapperApp } = await import("../server-lifecycle"); + const { Hono } = await import("hono"); + const lobuApp = new Hono(); + lobuApp.get("/ping", (c) => c.text("lobu-pong")); + const wrapper = buildWrapperApp({} as never, lobuApp); + + const lobuRes = await wrapper.request("/lobu/ping"); + expect(lobuRes.status).toBe(200); + expect(await lobuRes.text()).toBe("lobu-pong"); + + const mainRes = await wrapper.request("/health"); + expect(mainRes.status).toBe(200); + expect(await mainRes.text()).toBe("main-ok"); + }); + + it("skips the /lobu mount when lobuApp is null", async () => { + const { buildWrapperApp } = await import("../server-lifecycle"); + const wrapper = buildWrapperApp({} as never, null); + + const lobuRes = await wrapper.request("/lobu/ping"); + expect(lobuRes.status).toBe(404); + }); + + it("injects env onto c.env without dropping adapter fields", async () => { + const { buildWrapperApp } = await import("../server-lifecycle"); + const { Hono } = await import("hono"); + const lobuApp = new Hono(); + // Probe runs against the lobuApp so it sees the wrapper's middleware. + lobuApp.get("/probe", (c) => { + // env was merged: app secrets are visible + const seenSecret = (c.env as { SECRET?: string }).SECRET; + // adapter field was preserved: `incoming` still set when the runner + // injects it (we set a fake below to prove Object.assign doesn't drop it) + const incoming = (c.env as { incoming?: unknown }).incoming; + return c.json({ seenSecret, hasIncoming: incoming !== undefined }); + }); + const wrapper = buildWrapperApp({ SECRET: "shh" } as never, lobuApp); + + // Hono's `request()` helper doesn't simulate the Node adapter's + // `c.env.incoming`. Bind an `incoming` field via a one-shot middleware + // BEFORE the wrapper's stack runs to mimic what @hono/node-server does. + const outer = new Hono(); + outer.use("*", async (c, next) => { + if (!c.env) c.env = {}; + (c.env as { incoming?: unknown }).incoming = { + socket: { remoteAddress: "127.0.0.1" }, + }; + return next(); + }); + outer.route("/", wrapper); + + const res = await outer.request("/lobu/probe"); + expect(res.status).toBe(200); + const body = (await res.json()) as { + seenSecret: string; + hasIncoming: boolean; + }; + expect(body.seenSecret).toBe("shh"); + expect(body.hasIncoming).toBe(true); + }); + + it("stashes peer remote address into c.var before env-inject runs", async () => { + const { buildWrapperApp } = await import("../server-lifecycle"); + const { Hono } = await import("hono"); + const lobuApp = new Hono(); + lobuApp.get("/peer", (c) => c.text(c.get("peerRemoteAddress") ?? "none")); + const wrapper = buildWrapperApp({} as never, lobuApp); + + const outer = new Hono(); + outer.use("*", async (c, next) => { + if (!c.env) c.env = {}; + (c.env as { incoming?: unknown }).incoming = { + socket: { remoteAddress: "10.0.0.1" }, + }; + return next(); + }); + outer.route("/", wrapper); + + const res = await outer.request("/lobu/peer"); + expect(await res.text()).toBe("10.0.0.1"); + }); + + it("captures 5xx responses to Sentry via the post-response middleware", async () => { + const { buildWrapperApp } = await import("../server-lifecycle"); + const sentry = await import("@sentry/node"); + const { Hono } = await import("hono"); + const lobuApp = new Hono(); + // Routes that try/catch internally and return c.json(..., 500) — the + // framework never sees the exception, so onError doesn't fire. The + // post-response middleware is the only thing that catches these. + lobuApp.get("/silent-500", (c) => c.json({ error: "inner caught" }, 500)); + const wrapper = buildWrapperApp({} as never, lobuApp); + + const res = await wrapper.request("/lobu/silent-500"); + expect(res.status).toBe(500); + expect(sentry.captureMessage).toHaveBeenCalled(); + const calls = (sentry.captureMessage as ReturnType).mock + .calls; + const lastCall = calls[calls.length - 1] ?? []; + const [message, opts] = lastCall; + expect(message).toBe("inner caught"); + expect(opts.level).toBe("error"); + expect(opts.tags.source).toBe("http_response"); + }); + + it("routes thrown exceptions through onError + Sentry.captureException", async () => { + const { buildWrapperApp } = await import("../server-lifecycle"); + const sentry = await import("@sentry/node"); + const { Hono } = await import("hono"); + const lobuApp = new Hono(); + lobuApp.get("/boom", () => { + throw new Error("thrown from route"); + }); + const wrapper = buildWrapperApp({} as never, lobuApp); + + const res = await wrapper.request("/lobu/boom"); + expect(res.status).toBe(500); + expect(sentry.captureException).toHaveBeenCalled(); + const calls = (sentry.captureException as ReturnType).mock + .calls; + const lastCall = calls[calls.length - 1] ?? []; + const [errArg] = lastCall; + expect((errArg as Error).message).toBe("thrown from route"); + }); + + it("does NOT double-report when onError fires after post-response middleware", async () => { + const { buildWrapperApp } = await import("../server-lifecycle"); + const sentry = await import("@sentry/node"); + const { Hono } = await import("hono"); + const captureMessage = sentry.captureMessage as ReturnType; + const captureException = sentry.captureException as ReturnType< + typeof vi.fn + >; + captureMessage.mockClear(); + captureException.mockClear(); + + const lobuApp = new Hono(); + lobuApp.get("/boom", () => { + throw new Error("thrown"); + }); + const wrapper = buildWrapperApp({} as never, lobuApp); + + await wrapper.request("/lobu/boom"); + // onError marks the request as reported via markSentryReported BEFORE + // the post-response middleware runs; the latter must skip the 5xx path. + expect(captureException).toHaveBeenCalledTimes(1); + expect(captureMessage).toHaveBeenCalledTimes(0); + }); +}); + +describe("createServerLifecycle (source-level contract)", () => { + // These assertions read the source file. They exist so a code reviewer + // (human or pi) can't silently reorder shutdown or drop a step without + // updating the test in the same change. Functional ordering is also + // exercised by an explicit grep-and-position check below. + + function indexOf(needle: string): number { + const idx = LIFECYCLE_SOURCE.indexOf(needle); + if (idx === -1) { + throw new Error( + `server-lifecycle.ts: expected substring not found: ${JSON.stringify(needle)}`, + ); + } + return idx; + } + + it("locks httpServer keep-alive timeouts at 75/76s", () => { + expect(LIFECYCLE_SOURCE).toContain("httpServer.keepAliveTimeout = 75_000"); + expect(LIFECYCLE_SOURCE).toContain("httpServer.headersTimeout = 76_000"); + // Header timeout MUST be strictly greater than keep-alive. + expect(76_000).toBeGreaterThan(75_000); + }); + + it("runs databaseReadiness before workspace + gateway init", () => { + const dbReady = indexOf("await databaseReadiness()"); + const workspace = indexOf("await initWorkspaceProvider()"); + const gateway = indexOf("await initLobuGateway()"); + expect(dbReady).toBeLessThan(workspace); + expect(workspace).toBeLessThan(gateway); + }); + + it("runs preListenHooks before httpServer.listen", () => { + const preHooks = indexOf("for (const hook of preListenHooks)"); + const listen = indexOf("httpServer.listen(port, host"); + expect(preHooks).toBeLessThan(listen); + }); + + it("starts the embedded connector worker inside the listen callback", () => { + const listen = indexOf("httpServer.listen(port, host"); + const embedded = indexOf("embeddedWorker = startEmbeddedConnectorWorker"); + const postHooks = indexOf("for (const hook of postListenHooks)"); + expect(embedded).toBeGreaterThan(listen); + expect(postHooks).toBeGreaterThan(listen); + // postListenHooks fire BEFORE the embedded worker so any synchronous + // dep-resolve check can fail-fast without leaving a worker registered. + expect(postHooks).toBeLessThan(embedded); + }); + + it("shuts down in the documented order", () => { + const worker = indexOf("embeddedWorker.stop()"); + const vite = indexOf("await vite?.close()"); + const reaper = indexOf("stopReaper()"); + const scheduler = indexOf("taskScheduler.stop()"); + const gateway = indexOf("await stopLobuGateway()"); + const db = indexOf("await closeDbSingleton()"); + const extra = indexOf("for (const teardown of extraTeardown)"); + const close = indexOf("httpServer.close();"); + + expect(worker).toBeLessThan(vite); + expect(vite).toBeLessThan(reaper); + expect(reaper).toBeLessThan(scheduler); + expect(scheduler).toBeLessThan(gateway); + expect(gateway).toBeLessThan(db); + expect(db).toBeLessThan(extra); + expect(extra).toBeLessThan(close); + }); + + it("registers SIGTERM and SIGINT handlers", () => { + // Accept either quote style — biome may rewrite ' → " on save. + expect(/process\.on\(['"]SIGTERM['"]/.test(LIFECYCLE_SOURCE)).toBe(true); + expect(/process\.on\(['"]SIGINT['"]/.test(LIFECYCLE_SOURCE)).toBe(true); + }); +}); diff --git a/packages/server/src/server-lifecycle.ts b/packages/server/src/server-lifecycle.ts new file mode 100644 index 000000000..bda1fa08a --- /dev/null +++ b/packages/server/src/server-lifecycle.ts @@ -0,0 +1,442 @@ +/** + * Shared server lifecycle spine. + * + * Both entry points — `server.ts` (Postgres) and `start-local.ts` (PGlite) — + * call into `createServerLifecycle()` so middleware ordering, route mounts, + * httpServer timeouts, shutdown sequence, and signal wiring stay identical + * by construction. Drift between the two modes was the root cause of #948; + * the only way to express a per-mode difference now is the four named hooks + * on `ServerLifecycleConfig`. + * + * Do not add `new Hono`, `app.use`, `app.route`, `http.createServer`, or + * `process.on('SIGTERM' | 'SIGINT', …)` to either entry — they belong here. + */ + +import http from "node:http"; +import v8 from "node:v8"; +import { getRequestListener } from "@hono/node-server"; +import * as Sentry from "@sentry/node"; +import { Hono } from "hono"; +import { closeDbSingleton } from "./db/client"; +import { mountViteDev } from "./dev-vite"; +import type { Env } from "./index"; +import { app as mainApp } from "./index"; +import { + getLobuCoreServices, + initLobuGateway, + stopLobuGateway, +} from "./lobu/gateway"; +import { startStaleRunReaper } from "./scheduled/check-stalled-executions"; +import { startEmbeddedConnectorWorker } from "./scheduled/embedded-connector-worker"; +import { bootTaskScheduler } from "./scheduled/jobs"; +import { isSentryReported, markSentryReported } from "./sentry"; +import logger from "./utils/logger"; +import { initWorkspaceProvider } from "./workspace"; + +export type ServerMode = "postgres" | "pglite"; + +export interface ServerLifecycleConfig { + mode: ServerMode; + env: Env; + host: string; + port: number; + /** + * Runs before workspace/gateway init. Postgres asserts the migrations + * ledger matches the bundled migrations dir; PGlite runs them. + */ + databaseReadiness: () => Promise; + /** + * Runs after gateway + scheduler boot, before `httpServer.listen()`. + * PGlite uses this for `ensureInstallOperator` + `ensureDefaultAgent`. + */ + preListenHooks?: Array<() => Promise | void>; + /** + * Runs synchronously inside the `httpServer.listen()` callback, after the + * listener is live but before the embedded connector worker starts. + * Postgres uses this for the connector external-deps resolvability check. + */ + postListenHooks?: Array<() => void>; + /** + * Runs during shutdown AFTER `stopLobuGateway` + `closeDbSingleton`, in + * declared order, before `httpServer.close()`. PGlite uses this for the + * embeddings child kill, socket-server stop, and PGlite db close. + */ + extraTeardown?: Array<() => Promise | void>; +} + +export interface ServerLifecycleHandles { + /** Starts the listener and registers signal handlers. Resolves once listening. */ + start: () => Promise; +} + +/** + * Apply the LOBU_DEV_PROJECT_PATH fallback so downstream + * `buildGatewayConfig()` can derive worker paths even when the server is + * invoked from a package subdir (`cd packages/server && bun run dev`) or + * via `lobu run` from a project subdir. Both entries call this before + * lifecycle construction. + */ +export function applyDevProjectPathDefault(packageRepoRoot: string): void { + if (!process.env.LOBU_DEV_PROJECT_PATH) { + process.env.LOBU_DEV_PROJECT_PATH = packageRepoRoot; + } +} + +/** + * Defensive error → plain-object serializer for the top-level boot catch. + * + * pino's logger registers `err` / `error` serializers, but + * `JSON.stringify(new Error('boom'))` returns `{}` because Error's own + * properties are non-enumerable. If anything drops the pino serializer + * config (older image, bundler tree-shake, etc.), Docker users see only + * `"error":{}` with zero signal — exactly what #766 reported. Walk the + * error manually so the log line always carries message + stack regardless + * of pino config, ZodError `issues`, AggregateError children, or wrapped + * `cause` chains. + */ +export function serializeBootError(err: unknown): Record { + if (err === null || err === undefined) return { value: String(err) }; + if (typeof err !== "object") return { value: String(err), type: typeof err }; + const e = err as Error & { + code?: unknown; + cause?: unknown; + issues?: unknown; + errors?: unknown; + }; + const out: Record = { + type: e?.constructor?.name ?? "Error", + message: typeof e.message === "string" ? e.message : String(e), + }; + if (typeof e.stack === "string") out.stack = e.stack; + if (e.code !== undefined) out.code = e.code; + if (Array.isArray(e.issues)) out.issues = e.issues; + if (Array.isArray(e.errors)) { + out.errors = e.errors.map((child) => serializeBootError(child)); + } + if (e.cause !== undefined && e.cause !== err) { + out.cause = serializeBootError(e.cause); + } + return out; +} + +/** + * Run from each entry's `main().catch(...)`. Logs structured + plain-text + * fallback, then `process.exit(1)`. Never returns. + */ +export function reportBootFailure(err: unknown): never { + const serialized = serializeBootError(err); + logger.error( + { err: serialized, error: serialized }, + "Failed to start server", + ); + process.stderr.write( + `Failed to start server: ${serialized.type ?? "Error"}: ${serialized.message ?? ""}\n`, + ); + if (typeof serialized.stack === "string") { + process.stderr.write(`${serialized.stack}\n`); + } + process.exit(1); +} + +/** + * Build a Hono wrapper app with the canonical middleware stack and route + * mounts. Extracted so the contract test can assert middleware ordering + * without standing up the full lifecycle. + * + * Middleware order (locked): + * 1. peer-remote-address stash (read `c.env.incoming.socket.remoteAddress` + * BEFORE the env-inject middleware replaces shared fields) + * 2. env-inject (`Object.assign(c.env, env)` — preserves + * `c.env.incoming` so the Node adapter's `getConnInfo` keeps working) + * 3. sentry 5xx response capture (for inner-catch returns that never throw) + * 4. `app.onError` for thrown exceptions + * + * Route mounts: + * - `/lobu` → `lobuApp` (only when non-null) + * - `/` → `mainApp` + */ +export function buildWrapperApp( + env: Env, + lobuApp: Hono | null, + mountedMainApp: Hono = mainApp, +): Hono<{ Bindings: Env }> { + const wrapper = new Hono<{ Bindings: Env }>(); + + // 1. peer remote address stash — must run BEFORE env injection because + // env injection mutates `c.env` and could blow away the adapter field. + // @hono/node-server hands the request's IncomingMessage via `c.env.incoming` + // so `getConnInfo` can read `socket.remoteAddress`. Loopback-trust endpoints + // (e.g. `/api/local-init`) need this peer address to enforce their boundary. + wrapper.use("*", async (c, next) => { + const incoming = ( + c.env as + | { incoming?: { socket?: { remoteAddress?: string } } } + | undefined + )?.incoming; + const peerRemoteAddress = incoming?.socket?.remoteAddress ?? null; + if (peerRemoteAddress) c.set("peerRemoteAddress", peerRemoteAddress); + return next(); + }); + + // 2. Env injection — `Object.assign(c.env, env)` merges the app-wide + // config into the Hono adapter's `c.env` WITHOUT dropping adapter-set + // fields like `incoming`. The earlier replace-strategy + // (`c.env = env as Env`) silently broke `getConnInfo` and any other + // helper that read those fields. When `c.env` is undefined (only happens + // outside the Node adapter — e.g. in unit tests via `app.request()`), + // seed it with an empty object so the merge still works. + wrapper.use("*", async (c, next) => { + if (!c.env) c.env = {} as Env; + Object.assign(c.env, env); + return next(); + }); + + // 3. Server-error capture. Two layers cover both shapes of failing route: + // (a) routes that throw — handled by `app.onError` below. + // (b) routes that try/catch internally and `return c.json(..., 500)` — + // the framework never sees the exception, so onError doesn't fire. + // This post-response middleware catches anything with status >= 500 + // so silent 500s still reach Sentry. + // Either layer marks the request reported so we don't double-count. + // `Sentry.captureMessage` no-ops when `Sentry.init` was skipped (no DSN), + // so this is safe to wire unconditionally. + wrapper.use("*", async (c, next) => { + await next(); + if (c.res.status >= 500 && !isSentryReported(c)) { + let body: unknown = null; + try { + body = await c.res.clone().json(); + } catch { + // response wasn't JSON; ignore + } + const message = + (body && + typeof body === "object" && + "error" in body && + typeof (body as { error?: unknown }).error === "string" + ? (body as { error: string }).error + : null) ?? `HTTP ${c.res.status} from ${c.req.method} ${c.req.path}`; + Sentry.captureMessage(message, { + level: "error", + tags: { + source: "http_response", + http_method: c.req.method, + http_status: String(c.res.status), + }, + extra: { + path: c.req.path, + url: c.req.url, + response_body: body, + }, + }); + markSentryReported(c); + } + }); + + // 4. Catch-all error handler for thrown exceptions that bubble past route + // catches. Preserves the original stack trace. + wrapper.onError((err, c) => { + if (!isSentryReported(c)) { + Sentry.captureException(err, { + tags: { + source: "app_onError", + http_method: c.req.method, + }, + extra: { + path: c.req.path, + url: c.req.url, + }, + }); + markSentryReported(c); + } + // `sentryReported:true` tells the pino → Sentry forwarder in logger.ts + // to skip — Sentry already has this exception via captureException above. + logger.error( + { err, path: c.req.path, sentryReported: true }, + "Unhandled error in HTTP handler", + ); + return c.json({ error: "Internal server error" }, 500); + }); + + // Route mounts. `/lobu` is the public Agent API + bundled docs; without + // it `/lobu/api/v1/agents/*` returns 404 (this was the gap behind #940). + if (lobuApp) { + wrapper.route("/lobu", lobuApp); + } + wrapper.route("/", mountedMainApp); + + return wrapper; +} + +/** + * Optional SIGUSR2 → V8 heap snapshot wiring. Off by default because + * snapshots contain in-memory secrets (DB URL, OAuth tokens, secret-proxy + * cache). Operator opts in by setting `ALLOW_HEAP_SNAPSHOT=1`. + * + * Blocks the event loop for ~seconds (proportional to heap size) and + * requires ~heap-size extra memory while writing. Single-flight + fixed + * filename (`/tmp/lobu.heapsnapshot`) so a stuck-on flag can't fill tmpfs. + */ +function maybeWireHeapSnapshot(): void { + if (process.env.ALLOW_HEAP_SNAPSHOT !== "1") return; + const SNAPSHOT_PATH = "/tmp/lobu.heapsnapshot"; + let inProgress = false; + process.on("SIGUSR2", () => { + if (inProgress) { + logger.warn("[heap] SIGUSR2 ignored — snapshot already in progress"); + return; + } + inProgress = true; + logger.warn( + { path: SNAPSHOT_PATH }, + "[heap] SIGUSR2 received — writing heap snapshot (blocks event loop)", + ); + try { + v8.writeHeapSnapshot(SNAPSHOT_PATH); + logger.warn({ path: SNAPSHOT_PATH }, "[heap] snapshot written"); + } catch (err) { + logger.error({ err }, "[heap] writeHeapSnapshot failed"); + } finally { + inProgress = false; + } + }); + logger.warn( + "[heap] ALLOW_HEAP_SNAPSHOT=1 — SIGUSR2 will write heap dumps to " + + SNAPSHOT_PATH + + ". Unset and roll the pod when done; snapshots contain secrets.", + ); +} + +/** + * Build the shared lifecycle. Returns a `start()` that boots the full stack + * per the canonical ordering. Both entries call this with mode-specific + * hooks; everything else is identical by construction. + */ +export function createServerLifecycle( + config: ServerLifecycleConfig, +): ServerLifecycleHandles { + const { + mode, + env, + host, + port, + databaseReadiness, + preListenHooks = [], + postListenHooks = [], + extraTeardown = [], + } = config; + + const start = async (): Promise => { + // 1. Database readiness — Postgres asserts schema; PGlite runs migrations. + await databaseReadiness(); + + // 2. Workspace provider — required before gateway boot. + await initWorkspaceProvider(); + + // 3. Embedded Lobu gateway. Owns the public Agent API mounted at `/lobu` + // by `buildWrapperApp`. + const lobuApp = await initLobuGateway(); + + // 4. Task scheduler. Every periodic platform-internal job — token + // refresh, MCP DB cleanup, watcher automation — runs as a row in + // `public.runs` with cron-driven self-rescheduling. + const taskScheduler = await bootTaskScheduler(getLobuCoreServices(), env); + + // 5. 30s connector-run heartbeat-lost reaper. Cross-pod coordinated + // via advisory lock; the TaskScheduler cron also calls reapStaleRuns() + // every 5min as a backstop without double-failing rows. + const stopReaper = startStaleRunReaper(); + + // 6. Wrapper app + HTTP server. Timeouts are locked at 75/76s so SSE + // streams (MCP) survive idle periods above the typical 60s LB timeout. + const wrapper = buildWrapperApp(env, lobuApp); + const honoListener = getRequestListener(wrapper.fetch); + const httpServer = http.createServer(); + httpServer.keepAliveTimeout = 75_000; + httpServer.headersTimeout = 76_000; + + // 7. Vite dev middleware in development; otherwise wire Hono directly. + const vite = await mountViteDev(httpServer, honoListener); + if (!vite) { + httpServer.on("request", honoListener); + } + + // 8. Pre-listen hooks (PGlite: install-operator + default-agent). + for (const hook of preListenHooks) { + await hook(); + } + + // 9. Shutdown wiring — declared once, called from both SIGTERM and SIGINT. + // Embedded worker handle is captured in the listen callback below. + let embeddedWorker: ReturnType = null; + + const shutdown = async (signal: string): Promise => { + logger.info( + { signal, mode }, + "Received shutdown signal, stopping gracefully...", + ); + // Order matters: + // a. Stop accepting new work from the embedded connector worker. + if (embeddedWorker) { + embeddedWorker.stop(); + await embeddedWorker.wait(15_000); + } + // b. Close Vite (HMR sockets) before tearing down the http server + // so dev-mode listeners detach cleanly. + await vite?.close(); + // c. Stop the reaper poll loop. + stopReaper(); + // d. Stop the task scheduler dispatch loop. + taskScheduler.stop(); + // e. Drain MCP sessions / DB listeners / secret-proxy. Gateway + // holds postgres.js connections that must be released before + // mode-specific db teardown runs. + await stopLobuGateway(); + // f. Close the postgres.js singleton pool. + await closeDbSingleton(); + // g. Mode-specific teardown (PGlite kills embeddings child, stops + // socket server, closes the in-process db). + for (const teardown of extraTeardown) { + await teardown(); + } + // h. Finally, close the listener. Matches the historical behavior of + // not awaiting (server.ts:260; start-local.ts:322). + httpServer.close(); + process.exit(0); + }; + process.on("SIGTERM", () => { + void shutdown("SIGTERM"); + }); + process.on("SIGINT", () => { + void shutdown("SIGINT"); + }); + + // 10. Optional heap-snapshot wiring (gated on ALLOW_HEAP_SNAPSHOT=1). + maybeWireHeapSnapshot(); + + // 11. Listen. Post-listen hooks fire inside the callback so any + // `require.resolve` walks they do (connector dep check) don't add to + // cold-boot/readiness latency. The embedded connector daemon waits for + // the listener because its boot-time health check hits `/api/health`. + logger.info({ host, port, mode }, "Starting server"); + await new Promise((resolve) => { + httpServer.listen(port, host, () => { + logger.info( + { host, port, mode }, + `Lobu running at http://${host}:${port}`, + ); + for (const hook of postListenHooks) { + hook(); + } + const daemonHost = host === "0.0.0.0" ? "127.0.0.1" : host; + embeddedWorker = startEmbeddedConnectorWorker( + env, + `http://${daemonHost}:${port}`, + ); + resolve(); + }); + }); + }; + + return { start }; +} diff --git a/packages/server/src/server.ts b/packages/server/src/server.ts index 5c92caba2..952b52632 100644 --- a/packages/server/src/server.ts +++ b/packages/server/src/server.ts @@ -1,11 +1,12 @@ /** - * Node.js Server Entry Point + * Node.js Server Entry Point (Postgres mode). * - * This file starts the Hono server with @hono/node-server and sets up: - * - HTTP server with environment injection - * - Vite dev server in development (middleware mode, same port) - * - Scheduled maintenance tasks - * - Sentry error tracking + * Mode-specific bootstrap only. The shared spine + * (Hono wrapper, middleware, route mounts, httpServer timeouts, Vite, + * scheduler boot, signal handlers, shutdown ordering) lives in + * `./server-lifecycle.ts`. DO NOT add `new Hono`, `app.use`, `app.route`, + * `http.createServer`, or `process.on('SIGTERM' | 'SIGINT', …)` here — they + * belong in the lifecycle. */ // Refuse to boot under an unsupported Node major (isolated-vm gate). The @@ -13,376 +14,108 @@ // first one — ESM evaluates sibling imports in textual order, so anything // above this line would otherwise run first and could itself crash on the // unsupported runtime. -import './utils/assert-node-version'; +import "./utils/assert-node-version"; // Sentry must init before any other imports for auto-instrumentation -import './instrument'; +import "./instrument"; -import dotenv from 'dotenv'; +import dotenv from "dotenv"; dotenv.config(); -import http from 'node:http'; -import { createRequire } from 'node:module'; -import path from 'node:path'; -import { fileURLToPath } from 'node:url'; -import v8 from 'node:v8'; -import { getRequestListener } from '@hono/node-server'; -import { Hono } from 'hono'; -import { closeDbSingleton, getDb, probeListenNotify } from './db/client'; -import { mountViteDev } from './dev-vite'; -import type { Env } from './index'; -import { app as mainApp } from './index'; +import { createRequire } from "node:module"; +import path from "node:path"; +import { fileURLToPath } from "node:url"; +import { assertExternalDepsResolvable } from "@lobu/connector-worker/compile"; +import { getDb, probeListenNotify } from "./db/client"; import { - getLobuCoreServices, - initLobuGateway, - stopLobuGateway, -} from './lobu/gateway'; -import { startStaleRunReaper } from './scheduled/check-stalled-executions'; -import { bootTaskScheduler } from './scheduled/jobs'; -import * as Sentry from '@sentry/node'; -import { assertExternalDepsResolvable } from '@lobu/connector-worker/compile'; -import { isSentryReported, markSentryReported } from './sentry'; -import { getEnvFromProcess } from './utils/env'; -import logger from './utils/logger'; -import { assertSchemaUpToDate } from './utils/schema-version-check'; -import { initWorkspaceProvider } from './workspace'; - -// Create a wrapper app that injects environment into each request -const app = new Hono<{ Bindings: Env }>(); + applyDevProjectPathDefault, + createServerLifecycle, + reportBootFailure, +} from "./server-lifecycle"; +import { getEnvFromProcess } from "./utils/env"; +import logger from "./utils/logger"; +import { assertSchemaUpToDate } from "./utils/schema-version-check"; // Resolve repo root from this source file: …/packages/server/src/server.ts → repo root. const PACKAGE_REPO_ROOT = path.resolve( - fileURLToPath(new URL('.', import.meta.url)), - '../../..' + fileURLToPath(new URL(".", import.meta.url)), + "../../..", ); -// Make LOBU_DEV_PROJECT_PATH defaultable when invoked from the package dir -// (`cd packages/server && bun run dev`). Downstream consumers like -// the embedded gateway's buildGatewayConfig() read this to derive worker -// paths; without this fallback they'd resolve against process.cwd(). -if (!process.env.LOBU_DEV_PROJECT_PATH) { - process.env.LOBU_DEV_PROJECT_PATH = PACKAGE_REPO_ROOT; -} - -// Inject environment variables into Hono context. The snapshot is immutable -// post-boot and callers only read it, so assign it by reference instead of -// spreading it onto a fresh `c.env` object on every request. -const env = getEnvFromProcess(); -app.use('*', async (c, next) => { - // @hono/node-server hands the request's IncomingMessage to handlers via - // c.env.incoming (so `getConnInfo` can read socket.remoteAddress). The - // assignment below replaces c.env with our app-wide config object, which - // would lose that reference; stash the peer address in c.var first so - // handlers that need the actual TCP peer (e.g. `/api/local-init`'s - // loopback-peer defense) can still get at it. - const incoming = (c.env as { incoming?: { socket?: { remoteAddress?: string } } })?.incoming; - const peerRemoteAddress = incoming?.socket?.remoteAddress ?? null; - if (peerRemoteAddress) c.set('peerRemoteAddress', peerRemoteAddress); - c.env = env as Env; - return next(); -}); - -// Server-error capture. Two layers because routes split into two shapes: -// -// (a) routes that throw and let the framework respond — caught by -// `app.onError` below, preserving the full stack trace. -// (b) routes that try/catch internally and `return c.json(..., 500)` — the -// framework never sees the exception, so onError doesn't fire. The -// post-response middleware below catches anything with `status >= 500` -// so silent 500s still reach Sentry, even if the stack is gone. -// -// Either layer marks the request as reported so we don't double-count when -// both paths converge (e.g. an inner catch already called captureServerError). -app.use('*', async (c, next) => { - await next(); - if (c.res.status >= 500 && !isSentryReported(c)) { - let body: unknown = null; - try { - body = await c.res.clone().json(); - } catch { - // response wasn't JSON; ignore - } - const message = - (body && typeof body === 'object' && 'error' in body && typeof (body as { error?: unknown }).error === 'string' - ? (body as { error: string }).error - : null) ?? `HTTP ${c.res.status} from ${c.req.method} ${c.req.path}`; - Sentry.captureMessage(message, { - level: 'error', - tags: { - source: 'http_response', - http_method: c.req.method, - http_status: String(c.res.status), - }, - extra: { - path: c.req.path, - url: c.req.url, - response_body: body, - }, - }); - markSentryReported(c); - } -}); - -// Catch-all error handler for thrown exceptions that bubble past route catches. -// Preserves the original stack trace and returns a generic 500 so handlers -// don't have to remember to wrap themselves. -app.onError((err, c) => { - if (!isSentryReported(c)) { - Sentry.captureException(err, { - tags: { - source: 'app_onError', - http_method: c.req.method, - }, - extra: { - path: c.req.path, - url: c.req.url, - }, - }); - markSentryReported(c); - } - // sentryReported:true tells the pino → Sentry forwarder in logger.ts - // to skip — Sentry already has this exception via the explicit - // captureException above. Without the marker, we'd send the same - // event twice. - logger.error({ err, path: c.req.path, sentryReported: true }, 'Unhandled error in HTTP handler'); - return c.json({ error: 'Internal server error' }, 500); -}); - -/** - * Main server startup - */ -async function main() { - const databaseUrl = process.env.DATABASE_URL?.trim(); - if (!databaseUrl) { - throw new Error( - 'DATABASE_URL is required. Use a PostgreSQL connection string (for local dev run: pnpm dev:all).' - ); - } - process.env.DATABASE_URL = databaseUrl; - - // Refuse to boot if the image expects a migration the database hasn't - // applied. Skippable via SKIP_SCHEMA_VERSION_CHECK=1 for emergency - // forward-flight (e.g. rolling back to an older image whose migrations - // dir is a strict prefix of what's already applied). See - // utils/schema-version-check.ts for the 2026-05-16 incident this guards. - if (process.env.SKIP_SCHEMA_VERSION_CHECK !== '1') { - const migrationsDir = - process.env.LOBU_MIGRATIONS_DIR?.trim() || path.join(PACKAGE_REPO_ROOT, 'db', 'migrations'); - await assertSchemaUpToDate(getDb(), { migrationsDir }); - } else { - logger.warn('[schema-check] SKIP_SCHEMA_VERSION_CHECK=1 — skipping boot-time assertion'); - } - - // Verify LISTEN/NOTIFY actually delivers. This is a *detector*, not a gate: - // the runs-queue has a 200ms SKIP-LOCKED poll fallback that keeps the queue - // correct even when LISTEN is silently dropped (transaction-mode pgbouncer, - // RDS Proxy, etc.). Failing the probe just means wakeup latency degrades to - // the poll interval — not an outage. Log loudly so ops can fix the pooler - // config, but do not refuse to boot. - if (process.env.SKIP_LISTEN_NOTIFY_PROBE !== '1') { - try { - await probeListenNotify(); - logger.info('[DB] LISTEN/NOTIFY probe ok'); - } catch (err) { - logger.warn( - { err }, - '[DB] LISTEN/NOTIFY probe failed — runs-queue will fall back to 200ms poll. Fix the pooler config to restore real-time wakeups.' - ); - } - } - - // Initialize workspace provider - await initWorkspaceProvider(); - - // Initialize embedded Lobu gateway (requires DATABASE_URL) - const lobuApp = await initLobuGateway(); - if (lobuApp) { - app.route('/lobu', lobuApp); - } - - // Mount the main app after any embedded sub-app routes are registered. - app.route('/', mainApp); - - // Boot the unified task scheduler. Every periodic platform-internal job — - // token refresh, MCP DB cleanup, watcher automation, etc. — runs as a row - // in `public.runs` (run_type='task') with cron-driven self-rescheduling. - // Cross-pod coordination is the runs-queue claim path. - const taskScheduler = await bootTaskScheduler(getLobuCoreServices(), env); - - // 30s interval that reaps connector runs whose worker missed heartbeat past - // RUNS_REAPER_STALE_AFTER_SECONDS. Cross-pod coordinated via advisory lock. - // The TaskScheduler cron also calls reapStaleRuns() every 5min as a - // backstop — the lock keeps the two cadences from double-failing rows. - const stopReaper = startStaleRunReaper(); - - // Embedded connector-worker daemon — polls our own `/api/workers/poll` - // and executes claimed runs in-process. Started AFTER `listen()` below - // so the daemon's boot-time health check resolves. Prod deployments - // running a separate connector-worker pod should set - // `LOBU_DISABLE_EMBEDDED_WORKER=1`. - const { startEmbeddedConnectorWorker } = await import( - './scheduled/embedded-connector-worker' - ); - let embeddedWorker: ReturnType = null; - - const port = parseInt(process.env.PORT || '8787', 10); - const host = process.env.HOST?.trim() || '0.0.0.0'; - - const honoListener = getRequestListener(app.fetch); - const httpServer = http.createServer(); - // Increase keep-alive timeout so SSE streams (MCP) survive idle periods. - // Node.js defaults to 5 s, which kills SSE GET connections before async - // 202 tool-call responses can be delivered back via the stream. - httpServer.keepAliveTimeout = 75_000; // 75 s — above typical 60 s LB idle timeout - httpServer.headersTimeout = 76_000; // must be strictly > keepAliveTimeout - - // In development this attaches a Vite dev server (middleware mode, HMR) and - // returns it; in prod (or if Vite fails) it returns null and Hono handles - // every request directly. - const vite = await mountViteDev(httpServer, honoListener); - if (!vite) { - httpServer.on('request', honoListener); - } - - // Graceful shutdown - const shutdown = async (signal: string) => { - logger.info({ signal }, 'Received shutdown signal, stopping gracefully...'); - if (embeddedWorker) { - embeddedWorker.stop(); - await embeddedWorker.wait(15_000); - } - await vite?.close(); - stopReaper(); - taskScheduler.stop(); - await stopLobuGateway(); - await closeDbSingleton(); - httpServer.close(); - process.exit(0); - }; - process.on('SIGTERM', () => shutdown('SIGTERM')); - process.on('SIGINT', () => shutdown('SIGINT')); - - // SIGUSR2 → V8 heap snapshot. Off by default because snapshots contain - // in-memory secrets (DB URL, OAuth tokens, secret-proxy cache) and - // workers spawn as the same Linux UID. Operator opts in by setting - // ALLOW_HEAP_SNAPSHOT=1 on the pod, sends `kubectl exec ... kill -USR2 1`, - // copies the file out, then unsets the env / rolls the pod. - // - // Blocks the event loop for ~seconds (proportional to heap size) and - // requires ~heap-size extra memory while writing. Don't trigger from a - // pod close to the cgroup limit or it will OOM mid-snapshot. Trigger - // also blocks /health/ready (DB SELECT 1) — drain via Service first if - // multi-replica. - // - // Single-flight + fixed filename: subsequent signals while a snapshot is - // in progress are ignored, and the path is `/tmp/lobu.heapsnapshot` - // (overwritten each time) so a stuck-on flag can't fill the tmpfs. - if (process.env.ALLOW_HEAP_SNAPSHOT === '1') { - const SNAPSHOT_PATH = '/tmp/lobu.heapsnapshot'; - let inProgress = false; - process.on('SIGUSR2', () => { - if (inProgress) { - logger.warn('[heap] SIGUSR2 ignored — snapshot already in progress'); - return; - } - inProgress = true; - logger.warn( - { path: SNAPSHOT_PATH }, - '[heap] SIGUSR2 received — writing heap snapshot (blocks event loop)' - ); - try { - v8.writeHeapSnapshot(SNAPSHOT_PATH); - logger.warn({ path: SNAPSHOT_PATH }, '[heap] snapshot written'); - } catch (err) { - logger.error({ err }, '[heap] writeHeapSnapshot failed'); - } finally { - inProgress = false; - } - }); - logger.warn( - '[heap] ALLOW_HEAP_SNAPSHOT=1 — SIGUSR2 will write heap dumps to ' + - SNAPSHOT_PATH + - '. Unset and roll the pod when done; snapshots contain secrets.' - ); - } - - // Start HTTP server - logger.info({ port }, 'Starting server'); - - httpServer.listen(port, host, () => { - logger.info({ host, port }, `Server running at http://${host}:${port}`); - // Crash loud if the runtime image is missing any connector external dep, - // instead of letting every feed silently fail with "Missing npm - // dependency: X" hours later. Run this after listen() so the synchronous - // require.resolve walk doesn't add to cold-boot/readiness latency. - try { - assertExternalDepsResolvable(createRequire(import.meta.url).resolve); - } catch (err) { - logger.error({ err }, 'Connector external dependency check failed'); - process.exit(1); - } - // Embedded daemon waits for the listener because its boot health check - // hits `/api/health` on this same process. - const daemonHost = host === '0.0.0.0' ? '127.0.0.1' : host; - embeddedWorker = startEmbeddedConnectorWorker(env, `http://${daemonHost}:${port}`); - }); -} - -/** - * Defensive error → plain-object serializer for the top-level boot catch. - * - * pino's logger registers `err` / `error` serializers (see utils/logger.ts), - * but `JSON.stringify(new Error('boom'))` returns `{}` because Error's own - * properties are non-enumerable. If anything ever drops the pino serializer - * config (older image, bundler tree-shake, etc.), Docker users see only - * `"error":{}` with zero signal — exactly what issue #766 reported. Walk - * the error manually so the log line always carries message + stack - * regardless of pino config, ZodError `issues`, AggregateError children, - * or wrapped `cause` chains. - */ -function serializeBootError(err: unknown): Record { - if (err === null || err === undefined) return { value: String(err) }; - if (typeof err !== 'object') return { value: String(err), type: typeof err }; - const e = err as Error & { - code?: unknown; - cause?: unknown; - issues?: unknown; - errors?: unknown; - }; - const out: Record = { - type: e?.constructor?.name ?? 'Error', - message: typeof e.message === 'string' ? e.message : String(e), - }; - if (typeof e.stack === 'string') out.stack = e.stack; - if (e.code !== undefined) out.code = e.code; - // ZodError surfaces failing field paths in `issues`; preserve them so the - // log shows *which* env var or config field failed validation. - if (Array.isArray(e.issues)) out.issues = e.issues; - // AggregateError stashes children in `errors`. - if (Array.isArray(e.errors)) { - out.errors = e.errors.map((child) => serializeBootError(child)); - } - if (e.cause !== undefined && e.cause !== err) { - out.cause = serializeBootError(e.cause); - } - return out; +applyDevProjectPathDefault(PACKAGE_REPO_ROOT); + +async function main(): Promise { + const databaseUrl = process.env.DATABASE_URL?.trim(); + if (!databaseUrl) { + throw new Error( + "DATABASE_URL is required. Use a PostgreSQL connection string (for local dev run: pnpm dev:all).", + ); + } + process.env.DATABASE_URL = databaseUrl; + + const env = getEnvFromProcess(); + const port = parseInt(process.env.PORT || "8787", 10); + const host = process.env.HOST?.trim() || "0.0.0.0"; + + const databaseReadiness = async (): Promise => { + // Refuse to boot if the image expects a migration the database hasn't + // applied. Skippable via SKIP_SCHEMA_VERSION_CHECK=1 for emergency + // forward-flight (e.g. rolling back to an older image whose migrations + // dir is a strict prefix of what's already applied). See + // utils/schema-version-check.ts for the 2026-05-16 incident this guards. + if (process.env.SKIP_SCHEMA_VERSION_CHECK !== "1") { + const migrationsDir = + process.env.LOBU_MIGRATIONS_DIR?.trim() || + path.join(PACKAGE_REPO_ROOT, "db", "migrations"); + await assertSchemaUpToDate(getDb(), { migrationsDir }); + } else { + logger.warn( + "[schema-check] SKIP_SCHEMA_VERSION_CHECK=1 — skipping boot-time assertion", + ); + } + + // Verify LISTEN/NOTIFY actually delivers. This is a *detector*, not a + // gate: the runs-queue has a 200ms SKIP-LOCKED poll fallback that keeps + // the queue correct even when LISTEN is silently dropped (transaction-mode + // pgbouncer, RDS Proxy, etc.). Failing the probe just means wakeup + // latency degrades to the poll interval — not an outage. + if (process.env.SKIP_LISTEN_NOTIFY_PROBE !== "1") { + try { + await probeListenNotify(); + logger.info("[DB] LISTEN/NOTIFY probe ok"); + } catch (err) { + logger.warn( + { err }, + "[DB] LISTEN/NOTIFY probe failed — runs-queue will fall back to 200ms poll. Fix the pooler config to restore real-time wakeups.", + ); + } + } + }; + + const lifecycle = createServerLifecycle({ + mode: "postgres", + env, + host, + port, + databaseReadiness, + // Crash loud if the runtime image is missing any connector external dep, + // instead of letting every feed silently fail with "Missing npm + // dependency: X" hours later. Run after listen() so the synchronous + // require.resolve walk doesn't add to cold-boot/readiness latency. + postListenHooks: [ + () => { + try { + assertExternalDepsResolvable(createRequire(import.meta.url).resolve); + } catch (err) { + logger.error({ err }, "Connector external dependency check failed"); + process.exit(1); + } + }, + ], + }); + + await lifecycle.start(); } -// Start the server -main().catch((error) => { - const serialized = serializeBootError(error); - // Log under both `err` (pino convention) and `error` (legacy key) so the - // line is useful whether or not the pino error serializer is wired. - logger.error({ err: serialized, error: serialized }, 'Failed to start server'); - // Also print a plain-text line to stderr as a last-resort fallback for - // anyone whose log shipper drops structured fields — `docker logs` users - // hit this in #766. - process.stderr.write( - `Failed to start server: ${serialized.type ?? 'Error'}: ${serialized.message ?? ''}\n` - ); - if (typeof serialized.stack === 'string') { - process.stderr.write(`${serialized.stack}\n`); - } - process.exit(1); -}); +main().catch(reportBootFailure); diff --git a/packages/server/src/start-local.ts b/packages/server/src/start-local.ts index 28e86d33c..66fa21c34 100644 --- a/packages/server/src/start-local.ts +++ b/packages/server/src/start-local.ts @@ -1,604 +1,447 @@ /** - * Local Server Entry Point (PGlite) + * Local Server Entry Point (PGlite mode). * - * Runs the full Lobu stack in a single command: - * - PGlite (WASM Postgres with pgvector + pg_trgm) — in-process - * - Hono HTTP server — in-process - * - Embeddings service — child process on port 8790 - * - Maintenance scheduler — in-process + * Mode-specific bootstrap only: + * - apply user-config / forced env-var writes BEFORE anything reads env + * - start PGlite + socket server + run migrations + * - fork embeddings child + * - hand off to `createServerLifecycle()` for the shared spine * - * Data stored at ~/.lobu/data/ (configurable via LOBU_DATA_DIR). + * The shared spine (Hono wrapper, middleware, route mounts, httpServer + * timeouts, Vite, scheduler boot, signal handlers, shutdown ordering) lives + * in `./server-lifecycle.ts`. DO NOT add `new Hono`, `app.use`, `app.route`, + * `http.createServer`, or `process.on('SIGTERM' | 'SIGINT', …)` here. */ // Refuse to boot under an unsupported Node major (isolated-vm gate). Module // asserts on load, so this must be the first import; see assert-node-version.ts. -import './utils/assert-node-version'; +import "./utils/assert-node-version"; // Sentry must init before any other imports for auto-instrumentation // (postgres.js, http, etc.). No-op when SENTRY_DSN is unset, which is the // common case for `lobu run` installs — the import is cheap. -import './instrument'; +import "./instrument"; -import { fork } from 'node:child_process'; -import { randomBytes } from 'node:crypto'; -import { existsSync, mkdirSync } from 'node:fs'; -import http from 'node:http'; -import { createRequire } from 'node:module'; -import { homedir } from 'node:os'; -import { dirname, join } from 'node:path'; -import { fileURLToPath } from 'node:url'; +import { fork } from "node:child_process"; +import { randomBytes } from "node:crypto"; +import { existsSync, mkdirSync } from "node:fs"; +import http from "node:http"; +import { createRequire } from "node:module"; +import { homedir } from "node:os"; +import { dirname, join } from "node:path"; +import { fileURLToPath } from "node:url"; -import dotenv from 'dotenv'; +import dotenv from "dotenv"; dotenv.config(); -import { applyUserServerConfigToEnv } from './utils/user-config'; +import { applyUserServerConfigToEnv } from "./utils/user-config"; // After dotenv (project .env) so .env wins; before the module-level DATA_DIR // / PORT / HOST reads below so user-config overrides from // ~/.config/lobu/config.json land in time. // // DATABASE_URL is also filled in, but this bundle always boots PGlite and -// overwrites it (line ~141). External-Postgres routing happens upstream in +// overwrites it below. External-Postgres routing happens upstream in // `lobu run` (packages/cli/src/commands/dev.ts), which switches bundles when // the user config or env pins DATABASE_URL. So in practice only LOBU_DATA_DIR // / PORT / HOST flow through this call. applyUserServerConfigToEnv(); -import { ensureDefaultAgent } from './auth/default-provisioning'; -import { ensureInstallOperator } from './auth/install-operator'; - -import { PGlite } from '@electric-sql/pglite'; -import { pg_trgm } from '@electric-sql/pglite/contrib/pg_trgm'; -import { vector } from '@electric-sql/pglite/vector'; -import { PGLiteSocketServer } from '@electric-sql/pglite-socket'; -import { getRequestListener } from '@hono/node-server'; -import { assertExternalDepsResolvable } from '@lobu/connector-worker/compile'; -import * as Sentry from '@sentry/node'; -import { Hono } from 'hono'; -import { closeDbSingleton } from './db/client'; -import { listMigrationFiles, loadMigrationUpSection } from './db/migration-loader'; -import type { Env } from './index'; -import { isSentryReported, markSentryReported } from './sentry'; -import { getEnvFromProcess } from './utils/env'; -import logger from './utils/logger'; - -const DATA_DIR = process.env.LOBU_DATA_DIR || join(homedir(), '.lobu', 'data'); -const PORT = parseInt(process.env.PORT || '8787', 10); +import { PGlite } from "@electric-sql/pglite"; +import { pg_trgm } from "@electric-sql/pglite/contrib/pg_trgm"; +import { vector } from "@electric-sql/pglite/vector"; +import { PGLiteSocketServer } from "@electric-sql/pglite-socket"; +import { ensureDefaultAgent } from "./auth/default-provisioning"; +import { ensureInstallOperator } from "./auth/install-operator"; +import { + listMigrationFiles, + loadMigrationUpSection, +} from "./db/migration-loader"; +import { getEnvFromProcess } from "./utils/env"; +import logger from "./utils/logger"; + +const DATA_DIR = process.env.LOBU_DATA_DIR || join(homedir(), ".lobu", "data"); +const PORT = parseInt(process.env.PORT || "8787", 10); // Loopback-only by default: the embedded local-runner ships a // loopback-trust endpoint (`POST /api/local-init`) that mints worker-scoped // PATs for the bootstrap user with no auth challenge. Binding to 0.0.0.0 // would expose that to anyone on the LAN. Operators who explicitly want // LAN/WAN reachability must set `HOST=0.0.0.0` themselves. -const HOST = process.env.HOST?.trim() || '127.0.0.1'; -const EMBEDDINGS_PORT = parseInt(process.env.EMBEDDINGS_PORT || '0', 10); -const APP_ROOT = join(fileURLToPath(new URL('.', import.meta.url)), '..'); -const PACKAGE_REPO_ROOT = join(APP_ROOT, '..', '..'); +const HOST = process.env.HOST?.trim() || "127.0.0.1"; +const EMBEDDINGS_PORT = parseInt(process.env.EMBEDDINGS_PORT || "0", 10); +const APP_ROOT = join(fileURLToPath(new URL(".", import.meta.url)), ".."); +const PACKAGE_REPO_ROOT = join(APP_ROOT, "..", ".."); const require = createRequire(import.meta.url); -// Mirror server.ts: downstream `buildGatewayConfig()` in the embedded -// gateway derives worker paths from LOBU_DEV_PROJECT_PATH. Without this -// fallback, users running `lobu run` from a project subdir get a wrong -// cwd-relative resolve (see CLAUDE.md: "lobu run from a project subdir"). -if (!process.env.LOBU_DEV_PROJECT_PATH) { - process.env.LOBU_DEV_PROJECT_PATH = PACKAGE_REPO_ROOT; -} - -function resolveExistingPath(...candidates: Array): string | null { - for (const candidate of candidates) { - if (candidate && existsSync(candidate)) { - return candidate; - } - } - return null; +function resolveExistingPath( + ...candidates: Array +): string | null { + for (const candidate of candidates) { + if (candidate && existsSync(candidate)) { + return candidate; + } + } + return null; } function readPositiveIntEnv(name: string, fallback: number): number { - const raw = process.env[name]?.trim(); - if (!raw) return fallback; - const parsed = Number.parseInt(raw, 10); - return Number.isFinite(parsed) && parsed >= 0 ? parsed : fallback; + const raw = process.env[name]?.trim(); + if (!raw) return fallback; + const parsed = Number.parseInt(raw, 10); + return Number.isFinite(parsed) && parsed >= 0 ? parsed : fallback; } function isTruthyEnv(name: string): boolean { - return /^(1|true|yes|on)$/i.test(process.env[name]?.trim() ?? ''); + return /^(1|true|yes|on)$/i.test(process.env[name]?.trim() ?? ""); } -async function main() { - mkdirSync(DATA_DIR, { recursive: true }); - - // Set all env vars FIRST — before any imports that might read them - if (!process.env.BETTER_AUTH_SECRET) { - process.env.BETTER_AUTH_SECRET = randomBytes(32).toString('base64'); - logger.info('Generated ephemeral BETTER_AUTH_SECRET — set in .env to persist sessions'); - } - if (!process.env.JWT_SECRET) { - process.env.JWT_SECRET = randomBytes(32).toString('base64'); - } - if (!process.env.PUBLIC_WEB_URL) { - process.env.PUBLIC_WEB_URL = `http://localhost:${PORT}`; - } - if (!process.env.NODE_ENV) { - process.env.NODE_ENV = 'development'; - } - process.env.PGSSLMODE = 'disable'; - process.env.LOBU_DISABLE_PREPARE = '1'; - // Single-user mode default: the embedded runner spawns its own PGlite, - // seeds a single bootstrap user, and is expected to be used by exactly - // one operator on one machine. Block additional sign-ups so the - // operator can't accidentally fork into a second account (one for the - // Mac app + CLI, one for the web UI) by visiting /sign-up. Operators - // who actually want multi-user mode set LOBU_SINGLE_USER=0 explicitly. - if (process.env.LOBU_SINGLE_USER === undefined) { - process.env.LOBU_SINGLE_USER = '1'; - } - - // ─── PGlite ────────────────────────────────────────────────── - - logger.info({ dataDir: DATA_DIR }, 'Starting PGlite'); - const db = await PGlite.create({ - dataDir: DATA_DIR, - extensions: { vector, pg_trgm }, - }); - - // ─── PGlite Socket Server ──────────────────────────────────── - // Start socket FIRST, then run everything (including migrations) - // through it. No direct PGlite access after this point. - - const pgSocketPort = parseInt(process.env.PG_SOCKET_PORT || '0', 10); - const socketServer = new PGLiteSocketServer({ - db, - port: pgSocketPort, - maxConnections: readPositiveIntEnv('LOBU_PGLITE_SOCKET_MAX_CONNECTIONS', 64), - idleTimeout: readPositiveIntEnv('LOBU_PGLITE_SOCKET_IDLE_TIMEOUT_MS', 0), - debug: isTruthyEnv('LOBU_PGLITE_SOCKET_DEBUG'), - }); - socketServer.addEventListener('error', (event: Event) => { - logger.error({ error: (event as CustomEvent).detail }, 'PGlite socket server error'); - }); - socketServer.addEventListener('close', () => { - logger.warn('PGlite socket server closed'); - }); - // Wait for listening event to get the actual port (especially when port=0) - const actualPgPort = await new Promise((resolve) => { - socketServer.addEventListener('listening', (e: Event) => { - resolve((e as CustomEvent).detail?.port ?? pgSocketPort); - }); - socketServer.start(); - }); - // sslmode=disable is required — PGlite socket doesn't support SSL negotiation - const dbUrl = `postgresql://postgres@127.0.0.1:${actualPgPort}/postgres?sslmode=disable`; - process.env.DATABASE_URL = dbUrl; - logger.info({ port: actualPgPort }, 'PGlite socket server ready'); - - // Run migrations through the socket (not direct PGlite) - await runMigrations(dbUrl); - - // ─── Embeddings Service (child process) ────────────────────── - - const embeddingsChild = await startEmbeddings(); - - // ─── App Server ────────────────────────────────────────────── - - const { app: mainApp } = await import('./index'); - const { initWorkspaceProvider } = await import('./workspace'); - const { initLobuGateway, getLobuCoreServices, stopLobuGateway } = await import('./lobu/gateway'); - const { bootTaskScheduler } = await import('./scheduled/jobs'); - - await initWorkspaceProvider(); - const lobuApp = await initLobuGateway(); - - const env = getEnvFromProcess(); - const taskScheduler = await bootTaskScheduler(getLobuCoreServices(), env); - const stopScheduler = () => taskScheduler.stop(); - - // 30s connector-run heartbeat-lost reaper (see check-stalled-executions.ts). - // Same module used by the production server entrypoint; advisory lock makes - // it safe to also have the 5min TaskScheduler cron firing the same sweep. - const { startStaleRunReaper } = await import('./scheduled/check-stalled-executions'); - const stopReaper = startStaleRunReaper(); - - // Embedded connector-worker daemon — same process executes - // `runs(run_type='sync')` by polling our own `/api/workers/poll`. - // Started AFTER `listen()` so the daemon's boot-time health check - // can resolve. Opt-out: `LOBU_DISABLE_EMBEDDED_WORKER=1`. - const { startEmbeddedConnectorWorker } = await import( - './scheduled/embedded-connector-worker' - ); - let embeddedWorker: ReturnType = null; - - const wrapper = new Hono<{ Bindings: Env }>(); - wrapper.use('*', async (c, next) => { - // Stash the peer TCP remote-address so handlers that need to enforce - // a loopback-peer trust boundary (e.g. `/api/local-init`) can read it - // from c.var. `Object.assign(c.env, env)` below preserves - // `c.env.incoming` (the IncomingMessage Hono's Node adapter set), so - // we read from there — same path `getConnInfo` uses. - const incoming = (c.env as { incoming?: { socket?: { remoteAddress?: string } } })?.incoming; - const peerRemoteAddress = incoming?.socket?.remoteAddress ?? null; - if (peerRemoteAddress) c.set('peerRemoteAddress', peerRemoteAddress); - Object.assign(c.env, env); - return next(); - }); - - // Server-error capture. Mirrors server.ts: routes either throw (caught by - // onError below, full stack preserved) or try/catch and return c.json(..., - // 500) (only the response status is visible, so capture from the post- - // response middleware). Either branch marks the request reported so the - // pino → Sentry forwarder doesn't double-count. No-op when SENTRY_DSN is - // unset because Sentry.init was a no-op (`./instrument`). - wrapper.use('*', async (c, next) => { - await next(); - if (c.res.status >= 500 && !isSentryReported(c)) { - let body: unknown = null; - try { - body = await c.res.clone().json(); - } catch { - // response wasn't JSON; ignore - } - const message = - (body && typeof body === 'object' && 'error' in body && typeof (body as { error?: unknown }).error === 'string' - ? (body as { error: string }).error - : null) ?? `HTTP ${c.res.status} from ${c.req.method} ${c.req.path}`; - Sentry.captureMessage(message, { - level: 'error', - tags: { - source: 'http_response', - http_method: c.req.method, - http_status: String(c.res.status), - }, - extra: { - path: c.req.path, - url: c.req.url, - response_body: body, - }, - }); - markSentryReported(c); - } - }); - - wrapper.onError((err, c) => { - if (!isSentryReported(c)) { - Sentry.captureException(err, { - tags: { - source: 'app_onError', - http_method: c.req.method, - }, - extra: { - path: c.req.path, - url: c.req.url, - }, - }); - markSentryReported(c); - } - logger.error({ err, path: c.req.path, sentryReported: true }, 'Unhandled error in HTTP handler'); - return c.json({ error: 'Internal server error' }, 500); - }); - - // Mount the embedded Lobu gateway under /lobu (mirrors server.ts:199-202). - // Without this, the public Agent API (`/lobu/api/v1/agents/*`) and bundled - // docs are 404 in PGlite mode — only the org-scoped REST app at `/` works. - // This was the missing piece behind PR #637, which only fixed the Postgres - // entrypoint. The Sentry middleware above wraps both this and the `/` mount. - if (lobuApp) { - wrapper.route('/lobu', lobuApp); - } - wrapper.route('/', mainApp); - - const honoListener = getRequestListener(wrapper.fetch); - const httpServer = http.createServer(); - // SSE streams (MCP) must survive idle periods — Node defaults to 5s. - httpServer.keepAliveTimeout = 75_000; - httpServer.headersTimeout = 76_000; - - // In development, serve the SPA with Vite HMR (middleware mode); otherwise - // Hono handles every request directly. Dynamically imported so this entry - // keeps its lazy-load discipline (assert-node-version / instrument first). - const { mountViteDev } = await import('./dev-vite'); - const vite = await mountViteDev(httpServer, honoListener); - if (!vite) { - httpServer.on('request', honoListener); - } - - // ─── Graceful Shutdown ─────────────────────────────────────── - - const shutdown = async (signal: string) => { - logger.info({ signal }, 'Shutting down'); - if (embeddedWorker) { - embeddedWorker.stop(); - // Best-effort drain; don't block shutdown forever on a stuck connector. - await embeddedWorker.wait(15_000); - } - stopReaper(); - stopScheduler(); - await vite?.close(); - // Drain MCP sessions / DB listeners / secret-proxy before tearing down - // PGlite-owned resources below — gateway holds postgres.js connections - // that talk to the socket server. - await stopLobuGateway(); - // Close the postgres.js singleton pool before tearing down the socket - // server underneath it; otherwise pooled connections hang on EPIPE. - await closeDbSingleton(); - httpServer.close(); - embeddingsChild?.kill(); - await socketServer.stop(); - await db.close(); - process.exit(0); - }; - process.on('SIGTERM', () => shutdown('SIGTERM')); - process.on('SIGINT', () => shutdown('SIGINT')); - - // ─── Install operator ──────────────────────────────────────── - // Runs BEFORE listen so headless installs (CI, containers, /tmp scaffolds - // without a browser) can sign in via better-auth without a chicken-and-egg - // /sign-up step. Provisions a synthetic `install_operator` user whose - // password is the install's ENCRYPTION_KEY. Idempotent — re-running on a - // boot where the operator already exists is a no-op. See - // `docs/install-operator-bootstrap.md`. - // - // Carve-outs in auth/index.tsx + auth/config.ts exclude this row from - // every human-discovery surface (signup count, member list, password - // reset, magic link, OAuth account-linking) so the operator never - // collides with real human users. - try { - await ensureInstallOperator(); - } catch (err) { - logger.error({ err }, 'Install-operator provisioning failed'); - // Don't crash the server — the operator only matters for headless - // installs; a browser-based signup still works. But log it loudly. - } - - // ─── Default agent (Mac-app onboarding) ────────────────────── - // Default-agent provisioning is deferred to first-user creation. The - // `databaseHooks.user.create.after` hook in auth/index.tsx provisions the - // personal org; ensureDefaultAgent runs the next time `lobu run` boots - // after the user exists. - try { - const personalOrgRows = (await import('postgres')).default(dbUrl, { max: 1 }); - try { - const rows = - (await personalOrgRows`SELECT id FROM "organization" WHERE (metadata::jsonb)->>'personal_org_for_user_id' IS NOT NULL ORDER BY "createdAt" ASC LIMIT 1`) as unknown as Array<{ id: string }>; - const orgId = rows[0]?.id; - if (orgId) await ensureDefaultAgent(orgId); - } finally { - await personalOrgRows.end({ timeout: 1 }); - } - } catch (err) { - logger.warn({ err }, 'Default-agent provisioning failed'); - } - - // ─── Listen ────────────────────────────────────────────────── - - httpServer.listen(PORT, HOST, () => { - logger.info(`Lobu running at http://${HOST}:${PORT}`); - logger.info(`Data: ${DATA_DIR}`); - // Crash loud if the runtime image is missing any connector external dep, - // instead of letting every feed silently fail with "Missing npm - // dependency: X" hours later. Run after listen() so the synchronous - // require.resolve walk doesn't add to cold-boot latency. - try { - assertExternalDepsResolvable(require.resolve); - } catch (err) { - logger.error({ err }, 'Connector external dependency check failed'); - process.exit(1); - } - // Embedded daemon must wait for the listener — its boot-time - // health check hits `/api/health` on this same process. - embeddedWorker = startEmbeddedConnectorWorker(env, `http://${HOST}:${PORT}`); - }); +async function main(): Promise { + mkdirSync(DATA_DIR, { recursive: true }); + + // Set all env vars FIRST — before any imports that might read them. The + // server-lifecycle module is imported dynamically below for exactly this + // reason: its transitive imports (`./index`, gateway, scheduler) read env + // at module-evaluation time, and pglite socket setup must finish first. + if (!process.env.BETTER_AUTH_SECRET) { + process.env.BETTER_AUTH_SECRET = randomBytes(32).toString("base64"); + logger.info( + "Generated ephemeral BETTER_AUTH_SECRET — set in .env to persist sessions", + ); + } + if (!process.env.JWT_SECRET) { + process.env.JWT_SECRET = randomBytes(32).toString("base64"); + } + if (!process.env.PUBLIC_WEB_URL) { + process.env.PUBLIC_WEB_URL = `http://localhost:${PORT}`; + } + if (!process.env.NODE_ENV) { + process.env.NODE_ENV = "development"; + } + process.env.PGSSLMODE = "disable"; + process.env.LOBU_DISABLE_PREPARE = "1"; + // Single-user mode default: the embedded runner spawns its own PGlite, + // seeds a single bootstrap user, and is expected to be used by exactly + // one operator on one machine. Block additional sign-ups so the + // operator can't accidentally fork into a second account (one for the + // Mac app + CLI, one for the web UI) by visiting /sign-up. Operators + // who actually want multi-user mode set LOBU_SINGLE_USER=0 explicitly. + if (process.env.LOBU_SINGLE_USER === undefined) { + process.env.LOBU_SINGLE_USER = "1"; + } + + if (!process.env.LOBU_DEV_PROJECT_PATH) { + // Mirror server.ts: downstream `buildGatewayConfig()` derives worker + // paths from LOBU_DEV_PROJECT_PATH. Without this fallback, users running + // `lobu run` from a project subdir get a wrong cwd-relative resolve. + process.env.LOBU_DEV_PROJECT_PATH = PACKAGE_REPO_ROOT; + } + + // ─── PGlite ────────────────────────────────────────────────── + + logger.info({ dataDir: DATA_DIR }, "Starting PGlite"); + const db = await PGlite.create({ + dataDir: DATA_DIR, + extensions: { vector, pg_trgm }, + }); + + // ─── PGlite Socket Server ──────────────────────────────────── + // Start socket FIRST, then run everything (including migrations) + // through it. No direct PGlite access after this point. + + const pgSocketPort = parseInt(process.env.PG_SOCKET_PORT || "0", 10); + const socketServer = new PGLiteSocketServer({ + db, + port: pgSocketPort, + maxConnections: readPositiveIntEnv( + "LOBU_PGLITE_SOCKET_MAX_CONNECTIONS", + 64, + ), + idleTimeout: readPositiveIntEnv("LOBU_PGLITE_SOCKET_IDLE_TIMEOUT_MS", 0), + debug: isTruthyEnv("LOBU_PGLITE_SOCKET_DEBUG"), + }); + socketServer.addEventListener("error", (event: Event) => { + logger.error( + { error: (event as CustomEvent).detail }, + "PGlite socket server error", + ); + }); + socketServer.addEventListener("close", () => { + logger.warn("PGlite socket server closed"); + }); + // Wait for listening event to get the actual port (especially when port=0) + const actualPgPort = await new Promise((resolve) => { + socketServer.addEventListener("listening", (e: Event) => { + resolve((e as CustomEvent).detail?.port ?? pgSocketPort); + }); + socketServer.start(); + }); + // sslmode=disable is required — PGlite socket doesn't support SSL negotiation + const dbUrl = `postgresql://postgres@127.0.0.1:${actualPgPort}/postgres?sslmode=disable`; + process.env.DATABASE_URL = dbUrl; + logger.info({ port: actualPgPort }, "PGlite socket server ready"); + + // ─── Embeddings Service (child process) ────────────────────── + + const embeddingsChild = await startEmbeddings(); + + // ─── Lifecycle ─────────────────────────────────────────────── + // Dynamic import: env mutation above must land before the lifecycle's + // transitive imports (gateway, scheduler, ./index) evaluate at module load. + // This collapses the previous fan-out of seven `await import(...)` sites + // (one per helper) into a single boundary. + const { createServerLifecycle, reportBootFailure } = await import( + "./server-lifecycle" + ); + + const env = getEnvFromProcess(); + + // Personal-org id for default-agent provisioning. Resolved once during the + // pre-listen phase rather than per-call, so the dynamic postgres import + // happens with a hot DATABASE_URL. + let personalOrgId: string | null = null; + + const lifecycle = createServerLifecycle({ + mode: "pglite", + env, + host: HOST, + port: PORT, + databaseReadiness: () => runMigrations(dbUrl), + preListenHooks: [ + // Runs BEFORE listen so headless installs (CI, containers, /tmp + // scaffolds without a browser) can sign in via better-auth without + // a chicken-and-egg /sign-up step. Provisions a synthetic + // `install_operator` user whose password is the install's + // ENCRYPTION_KEY. Idempotent — re-running on a boot where the + // operator already exists is a no-op. See + // `docs/install-operator-bootstrap.md`. + async () => { + try { + await ensureInstallOperator(); + } catch (err) { + logger.error({ err }, "Install-operator provisioning failed"); + // Don't crash the server — the operator only matters for headless + // installs; a browser-based signup still works. + } + }, + // Default-agent provisioning. Deferred to first-user creation in the + // `databaseHooks.user.create.after` hook; this resolves the personal + // org id on each boot so a returning user picks up the default agent. + async () => { + try { + const personalOrgRows = (await import("postgres")).default(dbUrl, { + max: 1, + }); + try { + const rows = (await personalOrgRows` + SELECT id FROM "organization" + WHERE (metadata::jsonb)->>'personal_org_for_user_id' IS NOT NULL + ORDER BY "createdAt" ASC LIMIT 1 + `) as unknown as Array<{ id: string }>; + personalOrgId = rows[0]?.id ?? null; + if (personalOrgId) await ensureDefaultAgent(personalOrgId); + } finally { + await personalOrgRows.end({ timeout: 1 }); + } + } catch (err) { + logger.warn({ err }, "Default-agent provisioning failed"); + } + }, + ], + // PGlite-specific teardown — runs after stopLobuGateway + closeDbSingleton + // so gateway's postgres.js connections release before the socket goes + // away underneath them. + extraTeardown: [ + () => { + embeddingsChild?.kill(); + }, + () => socketServer.stop(), + () => db.close(), + ], + }); + + try { + await lifecycle.start(); + logger.info(`Data: ${DATA_DIR}`); + } catch (err) { + // Bridge to reportBootFailure so PGlite-mode boot crashes get the same + // structured + plain-text fallback logging as Postgres mode. + reportBootFailure(err); + } } // ─── Migrations ────────────────────────────────────────────────── -async function runMigrations(dbUrl: string) { - // Embedded boot runs the same migrations dbmate uses for prod, applied - // unconditionally. After the schema squash (2026-05-19), the migrations - // dir is a single baseline + any forward deltas; both are idempotent - // enough to replay on a pre-initialized DB: - // - The baseline starts with `CREATE TABLE` against a fresh schema - // and is gated by a `schema_migrations` row insertion. On a DB that - // has the baseline applied, dbmate-style version tracking skips the - // file; we do the same below. - // - Forward deltas use `IF NOT EXISTS` discipline so re-application - // against an already-migrated DB is a no-op. - const pg = await import('postgres'); - const sql = pg.default(dbUrl, { max: 1 }); - - try { - const migrationsDir = resolveExistingPath( - // Published @lobu/cli copies migrations next to start-local.bundle.mjs - // under dist/db/migrations. - join(fileURLToPath(new URL('.', import.meta.url)), 'db', 'migrations'), - join(APP_ROOT, 'db', 'migrations'), - // Monorepo `bun run --filter @lobu/server dev:local`: APP_ROOT is - // packages/server/, so the migrations live two levels up at repo root. - join(APP_ROOT, '..', '..', 'db', 'migrations'), - join(process.cwd(), 'db', 'migrations'), - join(process.cwd(), '..', '..', 'db', 'migrations') - ); - if (!migrationsDir) { - throw new Error('Migrations directory not found.'); - } - - // Make sure the `schema_migrations` ledger exists before we read it. - await sql.unsafe(` +async function runMigrations(dbUrl: string): Promise { + // Embedded boot runs the same migrations dbmate uses for prod, applied + // unconditionally. After the schema squash (2026-05-19), the migrations + // dir is a single baseline + any forward deltas; both are idempotent + // enough to replay on a pre-initialized DB: + // - The baseline starts with `CREATE TABLE` against a fresh schema + // and is gated by a `schema_migrations` row insertion. On a DB that + // has the baseline applied, dbmate-style version tracking skips the + // file; we do the same below. + // - Forward deltas use `IF NOT EXISTS` discipline so re-application + // against an already-migrated DB is a no-op. + const pg = await import("postgres"); + const sql = pg.default(dbUrl, { max: 1 }); + + try { + const migrationsDir = resolveExistingPath( + // Published @lobu/cli copies migrations next to start-local.bundle.mjs + // under dist/db/migrations. + join(fileURLToPath(new URL(".", import.meta.url)), "db", "migrations"), + join(APP_ROOT, "db", "migrations"), + // Monorepo `bun run --filter @lobu/server dev:local`: APP_ROOT is + // packages/server/, so the migrations live two levels up at repo root. + join(APP_ROOT, "..", "..", "db", "migrations"), + join(process.cwd(), "db", "migrations"), + join(process.cwd(), "..", "..", "db", "migrations"), + ); + if (!migrationsDir) { + throw new Error("Migrations directory not found."); + } + + // Make sure the `schema_migrations` ledger exists before we read it. + await sql.unsafe(` CREATE TABLE IF NOT EXISTS public.schema_migrations ( version character varying(128) NOT NULL PRIMARY KEY ) `); - const appliedRows = (await sql.unsafe( - `SELECT version FROM public.schema_migrations` - )) as Array<{ version: string }>; - const applied = new Set(appliedRows.map((r) => r.version)); - - // Versions whose contents are known to be fully covered by an existing - // schema (i.e. the squashed baseline). When one of these errors with a - // duplicate-object SQLSTATE the DB is already at the target state and we - // can safely record the version as applied. This is intentionally narrow: - // any future delta migration must use `IF NOT EXISTS` discipline rather - // than relying on this fallback, or its mid-file failures could mask - // schema drift. - const IDEMPOTENT_BASELINE_VERSIONS = new Set(['00000000000000']); - - logger.info('Running migrations...'); - for (const file of listMigrationFiles(migrationsDir)) { - // Filename convention is `_.sql`; the version is the - // leading underscore-separated prefix. - const version = file.split('_')[0] ?? ''; - if (applied.has(version)) { - continue; - } - const migrationSql = loadMigrationUpSection(migrationsDir, file); - if (!migrationSql) continue; - - await sql.unsafe('SET search_path TO public'); - try { - await sql.unsafe(migrationSql); - } catch (err) { - // The squashed baseline uses plain `CREATE FUNCTION` / `CREATE TABLE` - // for cleanliness, so replaying it against a DB that already has the - // schema raises `42723` (duplicate function) / `42P07` (duplicate - // table) / `42710` (duplicate object). When the failing file is the - // baseline, that's exactly the no-op case `lobu run` should treat as - // success. For any other migration the duplicate error is surfaced - // unchanged so partial failures cannot silently advance the ledger - // (see `IDEMPOTENT_BASELINE_VERSIONS` above). - const code = (err as { code?: string } | null)?.code; - const isDuplicateObject = - code === '42723' || code === '42P07' || code === '42710'; - if (!isDuplicateObject || !IDEMPOTENT_BASELINE_VERSIONS.has(version)) { - throw err; - } - logger.info( - { migration: file, version, pgErrorCode: code }, - 'Migration already applied (idempotent skip)' - ); - } - await sql` + const appliedRows = (await sql.unsafe( + `SELECT version FROM public.schema_migrations`, + )) as Array<{ version: string }>; + const applied = new Set(appliedRows.map((r) => r.version)); + + // Versions whose contents are known to be fully covered by an existing + // schema (i.e. the squashed baseline). When one of these errors with a + // duplicate-object SQLSTATE the DB is already at the target state and we + // can safely record the version as applied. This is intentionally narrow: + // any future delta migration must use `IF NOT EXISTS` discipline rather + // than relying on this fallback, or its mid-file failures could mask + // schema drift. + const IDEMPOTENT_BASELINE_VERSIONS = new Set(["00000000000000"]); + + logger.info("Running migrations..."); + for (const file of listMigrationFiles(migrationsDir)) { + // Filename convention is `_.sql`; the version is the + // leading underscore-separated prefix. + const version = file.split("_")[0] ?? ""; + if (applied.has(version)) { + continue; + } + const migrationSql = loadMigrationUpSection(migrationsDir, file); + if (!migrationSql) continue; + + await sql.unsafe("SET search_path TO public"); + try { + await sql.unsafe(migrationSql); + } catch (err) { + // The squashed baseline uses plain `CREATE FUNCTION` / `CREATE TABLE` + // for cleanliness, so replaying it against a DB that already has the + // schema raises `42723` (duplicate function) / `42P07` (duplicate + // table) / `42710` (duplicate object). When the failing file is the + // baseline, that's exactly the no-op case `lobu run` should treat as + // success. For any other migration the duplicate error is surfaced + // unchanged so partial failures cannot silently advance the ledger + // (see `IDEMPOTENT_BASELINE_VERSIONS` above). + const code = (err as { code?: string } | null)?.code; + const isDuplicateObject = + code === "42723" || code === "42P07" || code === "42710"; + if (!isDuplicateObject || !IDEMPOTENT_BASELINE_VERSIONS.has(version)) { + throw err; + } + logger.info( + { migration: file, version, pgErrorCode: code }, + "Migration already applied (idempotent skip)", + ); + } + await sql` INSERT INTO public.schema_migrations (version) VALUES (${version}) ON CONFLICT DO NOTHING `; - } + } - logger.info('Migrations complete'); - } finally { - await sql.end(); - } + logger.info("Migrations complete"); + } finally { + await sql.end(); + } } - // ─── Embeddings (child process) ────────────────────────────────── function findFreePort(): Promise { - return new Promise((resolve, reject) => { - const srv = http.createServer(); - srv.listen(0, '127.0.0.1', () => { - const addr = srv.address(); - const port = typeof addr === 'object' && addr ? addr.port : 0; - srv.close(() => resolve(port)); - }); - srv.on('error', reject); - }); + return new Promise((resolve, reject) => { + const srv = http.createServer(); + srv.listen(0, "127.0.0.1", () => { + const addr = srv.address(); + const port = typeof addr === "object" && addr ? addr.port : 0; + srv.close(() => resolve(port)); + }); + srv.on("error", reject); + }); } async function startEmbeddings(): Promise | null> { - const publishedServerPath = (() => { - try { - return fileURLToPath(import.meta.resolve('@lobu/embeddings/server')); - } catch { - return null; - } - })(); - const serverPath = resolveExistingPath( - join(APP_ROOT, 'packages', 'embeddings', 'src', 'server.ts'), - join(process.cwd(), 'packages', 'embeddings', 'src', 'server.ts'), - ...(publishedServerPath ? [publishedServerPath] : []) - ); - if (!serverPath) { - logger.warn('Embeddings service not found — embedding generation will not be available'); - return null; - } - - const port = EMBEDDINGS_PORT || (await findFreePort()); - const isTypescriptServer = serverPath.endsWith('.ts'); - let execArgv: string[] = []; - if (isTypescriptServer) { - const tsxPackageJson = require.resolve('tsx/package.json'); - const tsxLoaderPath = join(dirname(tsxPackageJson), 'dist', 'loader.mjs'); - execArgv = ['--import', tsxLoaderPath]; - } - - const child = fork(serverPath, [], { - execArgv, - env: { ...process.env, PORT: String(port) }, - stdio: ['ignore', 'pipe', 'pipe', 'ipc'], - }); - - process.env.EMBEDDINGS_SERVICE_URL = `http://127.0.0.1:${port}`; - - child.stdout?.on('data', (data: Buffer) => { - const msg = data.toString().trim(); - if (msg) logger.info({ service: 'embeddings' }, msg); - }); - - child.stderr?.on('data', (data: Buffer) => { - const msg = data.toString().trim(); - if (msg) logger.warn({ service: 'embeddings' }, msg); - }); - - child.on('exit', (code) => { - if (code !== 0 && code !== null) { - logger.warn({ code }, 'Embeddings service exited'); - } - }); - - return child; -} - -/** - * Defensive error → plain-object serializer for the top-level boot catch. - * - * Mirrors server.ts's helper (see #766): `JSON.stringify(new Error('boom'))` - * returns `{}` because Error's own properties are non-enumerable. Walks the - * error manually so a boot failure (ZodError, AggregateError, wrapped cause - * chain, etc.) always carries message + stack regardless of pino serializer - * config or log-shipper field stripping. - */ -function serializeBootError(err: unknown): Record { - if (err === null || err === undefined) return { value: String(err) }; - if (typeof err !== 'object') return { value: String(err), type: typeof err }; - const e = err as Error & { - code?: unknown; - cause?: unknown; - issues?: unknown; - errors?: unknown; - }; - const out: Record = { - type: e?.constructor?.name ?? 'Error', - message: typeof e.message === 'string' ? e.message : String(e), - }; - if (typeof e.stack === 'string') out.stack = e.stack; - if (e.code !== undefined) out.code = e.code; - if (Array.isArray(e.issues)) out.issues = e.issues; - if (Array.isArray(e.errors)) { - out.errors = e.errors.map((child) => serializeBootError(child)); - } - if (e.cause !== undefined && e.cause !== err) { - out.cause = serializeBootError(e.cause); - } - return out; + const publishedServerPath = (() => { + try { + return fileURLToPath(import.meta.resolve("@lobu/embeddings/server")); + } catch { + return null; + } + })(); + const serverPath = resolveExistingPath( + join(APP_ROOT, "packages", "embeddings", "src", "server.ts"), + join(process.cwd(), "packages", "embeddings", "src", "server.ts"), + ...(publishedServerPath ? [publishedServerPath] : []), + ); + if (!serverPath) { + logger.warn( + "Embeddings service not found — embedding generation will not be available", + ); + return null; + } + + const port = EMBEDDINGS_PORT || (await findFreePort()); + const isTypescriptServer = serverPath.endsWith(".ts"); + let execArgv: string[] = []; + if (isTypescriptServer) { + const tsxPackageJson = require.resolve("tsx/package.json"); + const tsxLoaderPath = join(dirname(tsxPackageJson), "dist", "loader.mjs"); + execArgv = ["--import", tsxLoaderPath]; + } + + const child = fork(serverPath, [], { + execArgv, + env: { ...process.env, PORT: String(port) }, + stdio: ["ignore", "pipe", "pipe", "ipc"], + }); + + process.env.EMBEDDINGS_SERVICE_URL = `http://127.0.0.1:${port}`; + + child.stdout?.on("data", (data: Buffer) => { + const msg = data.toString().trim(); + if (msg) logger.info({ service: "embeddings" }, msg); + }); + + child.stderr?.on("data", (data: Buffer) => { + const msg = data.toString().trim(); + if (msg) logger.warn({ service: "embeddings" }, msg); + }); + + child.on("exit", (code) => { + if (code !== 0 && code !== null) { + logger.warn({ code }, "Embeddings service exited"); + } + }); + + return child; } -main().catch((error) => { - const serialized = serializeBootError(error); - logger.error({ err: serialized, error: serialized }, 'Failed to start'); - // Plain-text stderr fallback for log shippers that drop structured fields. - process.stderr.write( - `Failed to start: ${serialized.type ?? 'Error'}: ${serialized.message ?? ''}\n` - ); - if (typeof serialized.stack === 'string') { - process.stderr.write(`${serialized.stack}\n`); - } - process.exit(1); +main().catch(async (error) => { + // Imported lazily so a crash in the env-setup block above (which runs + // before the lifecycle import) still reaches stderr. + const { reportBootFailure } = await import("./server-lifecycle"); + reportBootFailure(error); }); From 02f66b2096d2a9589ecef45f8d6017b8cbc39d70 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Wed, 20 May 2026 02:26:18 +0100 Subject: [PATCH 2/3] fix(server-lifecycle): tighten mountedMainApp param type to Hono<{Bindings:Env}> Per-package tsc (cd packages/server && bunx tsc --noEmit) rejects the contravariant assignment of a typed mainApp to a parameter declared as plain Hono. The root bun run typecheck is permissive enough to miss this; CI runs both. --- packages/server/src/server-lifecycle.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/server/src/server-lifecycle.ts b/packages/server/src/server-lifecycle.ts index bda1fa08a..972dde177 100644 --- a/packages/server/src/server-lifecycle.ts +++ b/packages/server/src/server-lifecycle.ts @@ -158,7 +158,7 @@ export function reportBootFailure(err: unknown): never { export function buildWrapperApp( env: Env, lobuApp: Hono | null, - mountedMainApp: Hono = mainApp, + mountedMainApp: Hono<{ Bindings: Env }> = mainApp, ): Hono<{ Bindings: Env }> { const wrapper = new Hono<{ Bindings: Env }>(); From 4b76d89986e46a7c4344e558704df47e0dbe20ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Wed, 20 May 2026 02:37:22 +0100 Subject: [PATCH 3/3] =?UTF-8?q?fix(server-lifecycle):=20close=20pi-review?= =?UTF-8?q?=20gaps=20=E2=80=94=20PGlite=20deps=20check,=20shutdown=20safet?= =?UTF-8?q?y?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. PGlite was missing assertExternalDepsResolvable() in postListenHooks; adding it back closes the exact drift this refactor exists to prevent. Caught by pi review on #951. 2. Wrap every shutdown step in a safe() helper so one rejecting teardown (e.g. stopLobuGateway) can no longer block the rest from running and leave the process stuck with a bound listener. 3. Single-flight guard on the shutdown handler: SIGTERM+SIGINT (or a double-tap on either) used to race gateway-stop / extraTeardown / process.exit. The guard short-circuits the second entry. Strengthens the lifecycle contract test with assertions for the safe() wrapper count and the single-flight guard so future refactors can't silently re-introduce either gap. --- .../src/__tests__/server-lifecycle.test.ts | 35 +++++++++--- packages/server/src/server-lifecycle.ts | 57 ++++++++++++++----- packages/server/src/start-local.ts | 17 ++++++ 3 files changed, 87 insertions(+), 22 deletions(-) diff --git a/packages/server/src/__tests__/server-lifecycle.test.ts b/packages/server/src/__tests__/server-lifecycle.test.ts index c79ec09df..65ea9a4ba 100644 --- a/packages/server/src/__tests__/server-lifecycle.test.ts +++ b/packages/server/src/__tests__/server-lifecycle.test.ts @@ -321,13 +321,16 @@ describe("createServerLifecycle (source-level contract)", () => { }); it("shuts down in the documented order", () => { - const worker = indexOf("embeddedWorker.stop()"); - const vite = indexOf("await vite?.close()"); - const reaper = indexOf("stopReaper()"); - const scheduler = indexOf("taskScheduler.stop()"); - const gateway = indexOf("await stopLobuGateway()"); - const db = indexOf("await closeDbSingleton()"); - const extra = indexOf("for (const teardown of extraTeardown)"); + // Each step is wrapped in `safe("", …)` so a failing teardown + // can't block the rest. Order-check by the step label which is stable + // across refactors of the wrapper. + const worker = indexOf('safe("embeddedWorker.stop"'); + const vite = indexOf('safe("vite.close"'); + const reaper = indexOf('safe("stopReaper"'); + const scheduler = indexOf('safe("taskScheduler.stop"'); + const gateway = indexOf('safe("stopLobuGateway"'); + const db = indexOf('safe("closeDbSingleton"'); + const extra = indexOf("safe(`extraTeardown["); const close = indexOf("httpServer.close();"); expect(worker).toBeLessThan(vite); @@ -339,6 +342,24 @@ describe("createServerLifecycle (source-level contract)", () => { expect(extra).toBeLessThan(close); }); + it("wraps every shutdown step in a safe() helper (one failing step does not skip the rest)", () => { + // The `safe()` wrapper is what guarantees that — for example — a + // rejecting `stopLobuGateway()` doesn't leave the listener bound and + // the process pinned. If a future refactor inlines a raw `await` for + // any step, this assertion catches it. + const safeCalls = LIFECYCLE_SOURCE.match(/safe\((`extraTeardown\[|")/g); + expect(safeCalls?.length ?? 0).toBeGreaterThanOrEqual(7); + }); + + it("single-flights concurrent shutdown signals", () => { + // SIGTERM and SIGINT can both arrive (or one can fire twice during a + // supervisor restart). The guard short-circuits the second entry so + // gateway-stop / extraTeardown / process.exit don't race. + expect(LIFECYCLE_SOURCE).toContain("let shutdownStarted = false"); + expect(LIFECYCLE_SOURCE).toContain("if (shutdownStarted)"); + expect(LIFECYCLE_SOURCE).toContain("shutdownStarted = true"); + }); + it("registers SIGTERM and SIGINT handlers", () => { // Accept either quote style — biome may rewrite ' → " on save. expect(/process\.on\(['"]SIGTERM['"]/.test(LIFECYCLE_SOURCE)).toBe(true); diff --git a/packages/server/src/server-lifecycle.ts b/packages/server/src/server-lifecycle.ts index 972dde177..5cb090d61 100644 --- a/packages/server/src/server-lifecycle.ts +++ b/packages/server/src/server-lifecycle.ts @@ -375,41 +375,68 @@ export function createServerLifecycle( { signal, mode }, "Received shutdown signal, stopping gracefully...", ); + // Each step is wrapped in try/catch so one failing teardown can't + // block the rest — we still want the listener closed and the + // process gone, even if (say) the gateway drain rejects. Catch + + // log + continue. + const safe = async (step: string, fn: () => Promise | void) => { + try { + await fn(); + } catch (err) { + logger.error({ err, step, mode }, "Shutdown step failed; continuing"); + } + }; // Order matters: // a. Stop accepting new work from the embedded connector worker. if (embeddedWorker) { - embeddedWorker.stop(); - await embeddedWorker.wait(15_000); + const worker = embeddedWorker; + await safe("embeddedWorker.stop", async () => { + worker.stop(); + await worker.wait(15_000); + }); } // b. Close Vite (HMR sockets) before tearing down the http server // so dev-mode listeners detach cleanly. - await vite?.close(); + await safe("vite.close", async () => { + await vite?.close(); + }); // c. Stop the reaper poll loop. - stopReaper(); + await safe("stopReaper", () => stopReaper()); // d. Stop the task scheduler dispatch loop. - taskScheduler.stop(); + await safe("taskScheduler.stop", () => taskScheduler.stop()); // e. Drain MCP sessions / DB listeners / secret-proxy. Gateway // holds postgres.js connections that must be released before // mode-specific db teardown runs. - await stopLobuGateway(); + await safe("stopLobuGateway", () => stopLobuGateway()); // f. Close the postgres.js singleton pool. - await closeDbSingleton(); + await safe("closeDbSingleton", () => closeDbSingleton()); // g. Mode-specific teardown (PGlite kills embeddings child, stops // socket server, closes the in-process db). - for (const teardown of extraTeardown) { - await teardown(); + for (let i = 0; i < extraTeardown.length; i++) { + await safe(`extraTeardown[${i}]`, extraTeardown[i]); } // h. Finally, close the listener. Matches the historical behavior of // not awaiting (server.ts:260; start-local.ts:322). httpServer.close(); process.exit(0); }; - process.on("SIGTERM", () => { - void shutdown("SIGTERM"); - }); - process.on("SIGINT", () => { - void shutdown("SIGINT"); - }); + // Single-flight guard: SIGTERM+SIGINT or a double-tap on either must + // not run shutdown concurrently — concurrent gateway-stop calls race + // the secret-proxy close, and concurrent process.exit calls leak. + let shutdownStarted = false; + const onSignal = (signal: string) => { + if (shutdownStarted) { + logger.warn( + { signal, mode }, + "Shutdown already in progress; ignoring signal", + ); + return; + } + shutdownStarted = true; + void shutdown(signal); + }; + process.on("SIGTERM", () => onSignal("SIGTERM")); + process.on("SIGINT", () => onSignal("SIGINT")); // 10. Optional heap-snapshot wiring (gated on ALLOW_HEAP_SNAPSHOT=1). maybeWireHeapSnapshot(); diff --git a/packages/server/src/start-local.ts b/packages/server/src/start-local.ts index 66fa21c34..acd8f7297 100644 --- a/packages/server/src/start-local.ts +++ b/packages/server/src/start-local.ts @@ -52,6 +52,7 @@ import { PGlite } from "@electric-sql/pglite"; import { pg_trgm } from "@electric-sql/pglite/contrib/pg_trgm"; import { vector } from "@electric-sql/pglite/vector"; import { PGLiteSocketServer } from "@electric-sql/pglite-socket"; +import { assertExternalDepsResolvable } from "@lobu/connector-worker/compile"; import { ensureDefaultAgent } from "./auth/default-provisioning"; import { ensureInstallOperator } from "./auth/install-operator"; import { @@ -248,6 +249,22 @@ async function main(): Promise { } }, ], + // Mirror server.ts: crash loud if the runtime image is missing any + // connector external dep, instead of letting each feed silently fail + // with "Missing npm dependency: X" hours later. Runs after listen() so + // the sync require.resolve walk doesn't add to cold-boot latency. + // Without this hook, PGlite mode silently re-introduces the drift the + // refactor exists to prevent — flagged by pi review on #951. + postListenHooks: [ + () => { + try { + assertExternalDepsResolvable(require.resolve); + } catch (err) { + logger.error({ err }, "Connector external dependency check failed"); + process.exit(1); + } + }, + ], // PGlite-specific teardown — runs after stopLobuGateway + closeDbSingleton // so gateway's postgres.js connections release before the socket goes // away underneath them.