diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8b93af865..614255caf 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -192,9 +192,21 @@ jobs: run: bun install - name: Build packages server depends on + # embeddings → connector-worker order matters: connector-worker's + # src/embeddings.ts imports `@lobu/embeddings` and resolves it via + # the workspace symlink → dist. connector-worker itself is required + # because `packages/server/src/utils/connector-catalog.ts` imports + # `@lobu/connector-worker/compile` (the shared compile pipeline), + # which resolves via the package's `exports` field to + # `./dist/compile/index.js`. Vitest's vite-node resolver follows + # the `import` condition and fails to load if dist is absent — + # which transitively breaks every integration file whose import + # graph touches `queue-helpers` / `worker-api` / `connector-catalog`. run: | cd packages/core && bun run build && cd ../.. cd packages/connector-sdk && bun run build && cd ../.. + cd packages/embeddings && bun run build && cd ../.. + cd packages/connector-worker && bun run build && cd ../.. - name: Verify Postgres health (fail fast if pgvector setup is broken) run: | diff --git a/bun.lock b/bun.lock index ecbc11110..28766dad8 100644 --- a/bun.lock +++ b/bun.lock @@ -340,6 +340,7 @@ "@hono/node-server": "^1.13.7", "@hono/zod-openapi": "^1.2.1", "@lobu/connector-sdk": "workspace:*", + "@lobu/connector-worker": "workspace:*", "@lobu/core": "workspace:*", "@mariozechner/pi-ai": "^0.51.6", "@modelcontextprotocol/sdk": "^1.27.1", diff --git a/packages/cli/src/commands/_lib/connector-loader.ts b/packages/cli/src/commands/_lib/connector-loader.ts index 5421f87ee..991591bc2 100644 --- a/packages/cli/src/commands/_lib/connector-loader.ts +++ b/packages/cli/src/commands/_lib/connector-loader.ts @@ -1,33 +1,22 @@ /** * Connector source resolution + compilation for the CLI. * - * Mirrors `packages/server/src/utils/connector-catalog.ts` (the server-side - * versions of these helpers) but inlined here so the CLI doesn't depend on - * @lobu/server — that package is private and never published, while the CLI - * is what end users install from npm. - * * Lookup order for bundled connector source: * 1. dist/connectors/ next to this file (published CLI runtime — see * packages/cli/scripts/build.cjs which copies packages/connectors/src * there at build time). * 2. ../../../connectors/src relative to this file (monorepo dev). * 3. process.cwd()/packages/connectors/src (running from a parent dir). + * + * The resolver + esbuild bundle pipeline themselves live in + * `@lobu/connector-worker/compile` so the three call-sites (worker, CLI, + * server) share one implementation. */ -import { existsSync } from "node:fs"; -import { mkdtemp, readFile, rm, stat } from "node:fs/promises"; -import { createRequire } from "node:module"; -import { tmpdir } from "node:os"; -import { join, resolve } from "node:path"; -import { build, type Plugin } from "esbuild"; - -const require_ = createRequire(import.meta.url); -const SDK_ENTRY = require_.resolve("@lobu/connector-sdk"); - -// Single source of truth lives in @lobu/connector-worker. Duplicated here as -// a flat string array to keep this file self-contained — drift cost is low -// (the list is tiny and rarely changes; if a new external dep is added the -// CLI surfaces a clear bundling error and we update both places). -const EXTERNAL_RUNTIME_DEPS = ["playwright", "sharp", "jimp"] as const; +import { resolve } from "node:path"; +import { + createConnectorCompiler, + findBundledConnectorFile as findInDirs, +} from "@lobu/connector-worker/compile"; const SOURCE_DIR_CANDIDATES = [ // Published CLI runtime: packages/cli/scripts/build.cjs copies the @@ -44,92 +33,11 @@ const bundledFileCache = new Map(); export function findBundledConnectorFile(key: string): string | null { const cached = bundledFileCache.get(key); if (cached !== undefined) return cached; - // Mirror the resolver in @lobu/connector-worker's compile-connector.ts: - // subdir layout (`browser.evaluate` → `browser/evaluate.ts`) first, then - // the flat underscore convention (`chrome.tabs` → `chrome_tabs.ts`). - const candidates = [ - `${key.replace(/\./g, "/")}.ts`, - `${key.replace(/\./g, "_")}.ts`, - ]; - let found: string | null = null; - outer: for (const dir of SOURCE_DIR_CANDIDATES) { - for (const fileName of candidates) { - const filePath = resolve(dir, fileName); - if (!filePath.startsWith(`${dir}/`)) continue; - if (existsSync(filePath)) { - found = filePath; - break outer; - } - } - } + const found = findInDirs(key, SOURCE_DIR_CANDIDATES); bundledFileCache.set(key, found); return found; } -// Connectors import npm deps with the `npm:` prefix. Strip it so esbuild -// resolves the bare specifier against the local node_modules; mark unresolved -// ones external so the CLI bundle still produces (the runtime will fail loud -// if a missing dep is actually used). -const npmSpecifierPlugin: Plugin = { - name: "npm-specifier", - setup(b) { - b.onResolve({ filter: /^npm:/ }, async (args) => { - const bare = args.path - .slice(4) - .replace(/^(@[^/]+\/[^/@]+)@[^/]*/, "$1") - .replace(/^([^/@]+)@[^/]*/, "$1"); - const resolved = await b.resolve(bare, { - resolveDir: args.resolveDir, - kind: args.kind, - }); - if (resolved.errors.length > 0) { - return { path: bare, external: true, errors: [], warnings: [] }; - } - return resolved; - }); - }, -}; - -const compiledFileCache = new Map(); +const compiler = createConnectorCompiler(); -export async function compileConnectorFromFile( - filePath: string -): Promise { - let mtimeMs: number | null = null; - try { - mtimeMs = (await stat(filePath)).mtimeMs; - const cached = compiledFileCache.get(filePath); - if (cached && cached.mtimeMs === mtimeMs) return cached.code; - } catch { - // stat failed — fall through and let the build surface the real error. - } - - const tmpDir = await mkdtemp(join(tmpdir(), "lobu-cli-connector-")); - const outPath = join(tmpDir, "out.mjs"); - - try { - await build({ - entryPoints: [filePath], - outfile: outPath, - bundle: true, - format: "esm", - platform: "node", - target: "node20", - alias: { lobu: SDK_ENTRY, "@lobu/connector-sdk": SDK_ENTRY }, - banner: { - js: "import { createRequire as __createRequire } from 'module'; const require = __createRequire(import.meta.url);", - }, - plugins: [npmSpecifierPlugin], - external: [...EXTERNAL_RUNTIME_DEPS], - write: true, - minify: false, - sourcemap: false, - }); - - const code = await readFile(outPath, "utf-8"); - if (mtimeMs !== null) compiledFileCache.set(filePath, { mtimeMs, code }); - return code; - } finally { - await rm(tmpDir, { recursive: true, force: true }); - } -} +export const compileConnectorFromFile = compiler.compileConnectorFromFile; diff --git a/packages/cli/src/commands/_lib/connector-run-cmd.ts b/packages/cli/src/commands/_lib/connector-run-cmd.ts index 9e8e936a3..75957cc95 100644 --- a/packages/cli/src/commands/_lib/connector-run-cmd.ts +++ b/packages/cli/src/commands/_lib/connector-run-cmd.ts @@ -26,6 +26,7 @@ import { homedir } from "node:os"; import { join } from "node:path"; import { printText } from "../memory/_lib/output.js"; +import type { EventEnvelope } from "@lobu/connector-sdk"; import { resolveContext } from "../../internal/context.js"; import { getUsableToken, resolveOrg } from "../memory/_lib/openclaw-auth.js"; import { @@ -340,7 +341,7 @@ export async function connectorRun( printText(`Compiling ${connectorKey} from ${sourcePath}...`); const compiledCode = await compileConnectorFromFile(sourcePath); - // Build the SyncContext shape that executeCompiledConnector expects. + // Build the ExecutorJob shape that executeCompiledConnector expects. // For mirror profiles we layer two acquisition paths: // 1. DevToolsActivePort lookup against the source Chrome's // user-data root. If the file is there, Chrome is exposing a @@ -422,31 +423,37 @@ export async function connectorRun( process.once("SIGINT", () => onSignal("SIGINT")); process.once("SIGTERM", () => onSignal("SIGTERM")); - // Stream progress to stderr (so --json output to stdout stays clean). - let streamedCount = 0; - const onContentChunk = (items: any[]) => { - streamedCount += items.length; - process.stderr.write(` ... ${streamedCount} events so far\n`); + // Sync events stream via the onEventChunk hook now (the executor used to + // collect them into result.contents for us — that path is gone). Collect + // locally so the artifact/--json output still has the full payload. + const collectedEvents: EventEnvelope[] = []; + const onEventChunk = (events: EventEnvelope[]) => { + for (const event of events) collectedEvents.push(event); + process.stderr.write(` ... ${collectedEvents.length} events so far\n`); }; printText(`Running ${connectorKey} (subprocess executor)...`); const startMs = Date.now(); - const result = await (executeCompiledConnector as any)({ - mode: "sync", + const result = await executeCompiledConnector({ compiledCode, - config: mergedConfig, - checkpoint, - env: process.env as Record, - connectionCredentials: {}, - sessionState, - credentials: {}, - feedKey: feed?.feed_key ?? `${connectorKey}-cli-run`, - entityIds: [], - apiType: "browser" as const, - hooks: { onContentChunk, collectContents: true }, + job: { + mode: "sync", + config: mergedConfig, + checkpoint, + env: process.env as Record, + sessionState, + credentials: null, + feedKey: feed?.feed_key ?? `${connectorKey}-cli-run`, + entityIds: [], + }, + hooks: { onEventChunk }, }); const durationMs = Date.now() - startMs; + if (result.mode !== "sync") { + throw new Error(`Expected sync result, got mode=${result.mode}`); + } + // Save artifact for debugging / sharing. ~/.lobu/cache/connector-runs/.json. const cacheDir = join(homedir(), ".lobu", "cache", "connector-runs"); mkdirSync(cacheDir, { recursive: true }); @@ -459,9 +466,9 @@ export async function connectorRun( config: mergedConfig, input_checkpoint: checkpoint, duration_ms: durationMs, - event_count: result.contents.length, + event_count: collectedEvents.length, next_checkpoint: result.checkpoint, - events: result.contents, + events: collectedEvents, metadata: result.metadata ?? {}, }; await writeFile(artifactPath, JSON.stringify(artifact, null, 2), "utf-8"); @@ -471,10 +478,10 @@ export async function connectorRun( } else { printText(""); printText(`✓ Completed in ${(durationMs / 1000).toFixed(1)}s`); - printText(` Events: ${result.contents.length}`); - if (result.contents.length > 0) { + printText(` Events: ${collectedEvents.length}`); + if (collectedEvents.length > 0) { printText(" Sample:"); - printText(summarizeEvents(result.contents)); + printText(summarizeEvents(collectedEvents)); } if (result.checkpoint) { printText(` Next checkpoint: ${JSON.stringify(result.checkpoint)}`); diff --git a/packages/connector-sdk/src/connector-runtime.ts b/packages/connector-sdk/src/connector-runtime.ts index dc4e0203b..93e218df9 100644 --- a/packages/connector-sdk/src/connector-runtime.ts +++ b/packages/connector-sdk/src/connector-runtime.ts @@ -21,7 +21,11 @@ import type { * Subclasses must: * - Set `definition` with connector metadata * - Implement `sync()` for feed data ingestion - * - Implement `execute()` for action execution + * + * Subclasses may optionally override `execute()` and `authenticate()`; both + * have safe defaults (action rejected with `{ success: false, ... }`, auth + * throws). Connectors that don't declare any `actions` in their definition + * need not override `execute()`. * * @example * ```ts @@ -59,11 +63,18 @@ export abstract class ConnectorRuntime { * Execute an action on the connected service. * * Called either inline (low-risk) or by the worker (high-risk with approval). + * Default implementation rejects with "Actions not supported" — connectors + * that don't declare any `actions` in their definition need not override. + * The `ctx` parameter is part of the public contract (subclasses overriding + * this method receive the full `ActionContext`); the base impl ignores it. * * @param ctx - Action context with action key, input, and credentials * @returns Action result with output data */ - abstract execute(ctx: ActionContext): Promise; + // biome-ignore lint/correctness/noUnusedFunctionParameters: contract signature — subclasses receive the full ActionContext + async execute(ctx: ActionContext): Promise { + return { success: false, error: 'Actions not supported' }; + } /** * Run an interactive authentication flow that produces credentials for the diff --git a/packages/connector-sdk/src/connector-types.ts b/packages/connector-sdk/src/connector-types.ts index 92062a4f1..69441559d 100644 --- a/packages/connector-sdk/src/connector-types.ts +++ b/packages/connector-sdk/src/connector-types.ts @@ -315,13 +315,6 @@ export const IDENTITY = { export type IdentityNamespace = (typeof IDENTITY)[keyof typeof IDENTITY]; -export enum FeedMode { - /** Connector code runs on worker, syncs data */ - sync = 'sync', - /** Virtual feed backed by saved queries (future) */ - virtual = 'virtual', -} - // ============================================================================= // Action Definition // ============================================================================= diff --git a/packages/connector-sdk/src/event-taxonomy.ts b/packages/connector-sdk/src/event-taxonomy.ts deleted file mode 100644 index a401a6f89..000000000 --- a/packages/connector-sdk/src/event-taxonomy.ts +++ /dev/null @@ -1,31 +0,0 @@ -export const SOURCE_NATIVE_EVENT_TYPES = [ - 'article', - 'ask_hn', - 'comment', - 'commit', - 'discussion', - 'email', - 'file', - 'issue', - 'issue_comment', - 'message', - 'photo', - 'post', - 'pr_comment', - 'pull_request', - 'reply', - 'repository', - 'review', - 'section', - 'show_hn', - 'story', - 'thread', - 'tweet', - 'video', -] as const; - -const SOURCE_NATIVE_EVENT_TYPE_SET = new Set(SOURCE_NATIVE_EVENT_TYPES); - -export function isSourceNativeEventType(value: string | null | undefined): boolean { - return typeof value === 'string' && SOURCE_NATIVE_EVENT_TYPE_SET.has(value); -} diff --git a/packages/connector-sdk/src/index.ts b/packages/connector-sdk/src/index.ts index 90dbe2acc..95e54a5e8 100644 --- a/packages/connector-sdk/src/index.ts +++ b/packages/connector-sdk/src/index.ts @@ -38,7 +38,6 @@ export type { EventEnvelope, Feed, FeedDefinition, - FeedMode, IdentityNamespace, Run, RunStatus, @@ -62,10 +61,8 @@ export { DerivedRelationshipMetadata, FactEventMetadata, IDENTITY_FACT_SEMANTIC_TYPE, - MatchStrategy, RelationshipTypeIdentityMetadata, } from './identity-types.js'; -export { isSourceNativeEventType, SOURCE_NATIVE_EVENT_TYPES } from './event-taxonomy.js'; export { normalizeAuthUserId, normalizeEmail, @@ -108,36 +105,16 @@ export type { AcquireBrowserOptions, AcquiredBrowser } from './browser/acquire.j export { acquireBrowser, BrowserAuthCascadeError } from './browser/acquire.js'; export type { CdpVersionInfo, ResolveCdpOptions } from './browser/cdp.js'; export { - discoverChromeListeningPorts, - discoverChromeProcessCdpUrls, fetchCdpVersionInfo, - normalizeCdpUrl, resolveCdpUrl, - tryWebSocketCdp, } from './browser/cdp.js'; export { CdpPage } from './browser/cdp-page.js'; export type { BrowserLaunchOptions, EnhancedBrowser } from './browser/launcher.js'; export { captureErrorArtifacts, launchBrowser, - withErrorCapture, } from './browser/launcher.js'; -export type { StealthBrowser, StealthBrowserOptions } from './browser/stealth.js'; -export { - getRandomDelay, - humanWait, - launchStealthBrowser, - randomScroll, - testBotDetection, -} from './browser/stealth.js'; export type { BrowserNetworkConfig, BrowserNetworkResult } from './browser-network.js'; export { browserNetworkSync } from './browser-network.js'; -export type { ReactionContext, ReactionEntity } from './reaction-sdk.js'; -export type { - Checkpoint, - Content, - Env, - FeedOptions, - FeedSyncResult, - SessionState, -} from './types.js'; +export type { ReactionContext } from './reaction-sdk.js'; +export type { Env } from './types.js'; diff --git a/packages/connector-sdk/src/types.ts b/packages/connector-sdk/src/types.ts index 98f8a89f9..91ba79d27 100644 --- a/packages/connector-sdk/src/types.ts +++ b/packages/connector-sdk/src/types.ts @@ -1,87 +1,7 @@ -/** - * Checkpoint data structure for tracking feed sync state - */ -export interface Checkpoint { - // Required for all feeds - used to filter content by time - last_timestamp?: Date; - - // Metadata - updated_at: Date; - total_items_processed?: number; - - // Platform-specific fields should extend this interface -} - -/** - * Sync result containing extracted content and updated checkpoint - * - * Note: checkpoint can be null for feeds that use incremental checkpoint - * updates via updateCheckpointFn during pagination (e.g., Reddit) - */ -export interface FeedSyncResult { - contents: Content[]; - checkpoint: Checkpoint | null; - metadata?: { - items_found: number; - items_skipped: number; - rate_limit_remaining?: number; - next_sync_recommended_at?: Date; - parent_map?: Record; // For hierarchical content (e.g., GitHub comments -> issues) - [key: string]: any; // Allow additional feed-specific metadata - }; - /** - * Auth state to persist after sync (browser cookies, etc.) - * Will be saved back to the linked auth profile for browser-based connectors. - */ - auth_update?: Record; -} - -/** - * Extracted content from platform - */ -export interface Content { - origin_id: string; // Platform's unique ID - payload_text: string; // Main text content - title?: string; // Title of content (e.g., post title, issue title, review subject) - author_name?: string; // Username/display name - source_url: string; // Link to original content - occurred_at: Date; // When content was posted - - // Source-native item type (e.g. 'thread', 'message', 'email', 'issue', 'review') - origin_type?: string; - - // Semantic type inside Lobu (defaults to 'content' for raw connector ingests) - semantic_type?: string; - - // Calculated engagement score (0-100, calculated by feed implementation) - score: number; - - // Optional parent reference for hierarchical content - origin_parent_id?: string | null; - - // Metadata including engagement metrics (platform-specific) - // Engagement fields: score, upvotes, downvotes, rating, helpful_count, reply_count, likes, views, retweets, replies, comments - // Platform fields: post_id, parent_id, etc. - metadata?: Record; -} - -/** - * Feed options passed from MCP tool - */ -export interface FeedOptions { - /** - * Number of days to look back when collecting historical data - * Default: 365 (1 year) - */ - lookback_days?: number; - - // Platform-specific options defined in each feed - [key: string]: any; -} - /** * Consolidated environment bindings used across the platform. - * This is the single source of truth for environment variable types. + * This is the single source of truth for environment variable types + * read by the gateway, worker, and connector code. */ export interface Env { // Environment @@ -154,11 +74,3 @@ export interface Env { // Allow any other env vars accessed via c.env[key] [key: string]: string | undefined; } - -/** - * Base session state type - feeds define their own specific types - * Values can come from env vars (defaults) or DB (per-connection overrides) - * At runtime, DB values override env defaults - */ -export type SessionState = Record; - diff --git a/packages/connector-worker/integration-tests/subprocess-source-mode.test.ts b/packages/connector-worker/integration-tests/subprocess-source-mode.test.ts index eb03dbbde..3bdee9591 100644 --- a/packages/connector-worker/integration-tests/subprocess-source-mode.test.ts +++ b/packages/connector-worker/integration-tests/subprocess-source-mode.test.ts @@ -17,23 +17,28 @@ * natively, so the import works as-is. */ import { describe, expect, test } from 'bun:test'; -import type { SyncContext } from '../src/executor/interface.ts'; +import type { ExecutorJob } from '../src/executor/interface.ts'; import { SubprocessError, SubprocessExecutor } from '../src/executor/subprocess.ts'; -const BASE_CONTEXT: SyncContext = { - options: {} as any, +// Minimal V1 ExecutorJob — see subprocess.test.ts for shape rationale. +const BASE_JOB: ExecutorJob = { + mode: 'sync', + feedKey: 'integration-test', + config: {}, checkpoint: null, + entityIds: [], + credentials: null, + sessionState: null, env: {}, - apiType: 'api', }; function compiled(body: string): string { return ` class ConnectorRuntime { - async sync(_ctx, _hooks) { + async sync(_ctx) { ${body} } - async execute() { return { contents: [], checkpoint: null }; } + async execute() { return { success: false, error: 'no actions' }; } } module.exports = { ConnectorRuntime }; `; @@ -52,7 +57,7 @@ describe('SubprocessExecutor (source-mode, Bun runtime)', () => { console.log('source-mode child ran'); process.exit(1); `), - BASE_CONTEXT + BASE_JOB ); } catch (e) { err = e as SubprocessError; diff --git a/packages/connector-worker/integration-tests/subprocess.test.ts b/packages/connector-worker/integration-tests/subprocess.test.ts index 7937ad138..2bc8077b3 100644 --- a/packages/connector-worker/integration-tests/subprocess.test.ts +++ b/packages/connector-worker/integration-tests/subprocess.test.ts @@ -12,23 +12,36 @@ * runs that haven't built the workspace. */ import { describe, expect, test } from 'bun:test'; -import type { SyncContext } from '../dist/executor/interface.js'; +import type { ExecutorJob } from '../dist/executor/interface.js'; import { SubprocessError, SubprocessExecutor } from '../dist/executor/subprocess.js'; -const BASE_CONTEXT: SyncContext = { - options: {} as any, +// Minimal V1 ExecutorJob — the subprocess executor only reads what each +// connector body needs; the diagnostic tests below exercise the crash / +// throw / redact paths, which don't touch most fields, but the shape +// itself has to be valid `ExecutorJob` so the parent's IPC envelope is +// well-formed. +const BASE_JOB: ExecutorJob = { + mode: 'sync', + feedKey: 'integration-test', + config: {}, checkpoint: null, + entityIds: [], + credentials: null, + sessionState: null, env: {}, - apiType: 'api', }; +// Synthetic connector source. The class needs `sync` and `execute` on its +// prototype for `findRuntimeClass` to accept it. `sync` receives a V1 +// `SyncContext` (with `emitEvents` / `updateCheckpoint` hooks), `execute` +// returns a V1 `ActionResult` — both unused by these crash-path tests. function compiled(body: string): string { return ` class ConnectorRuntime { - async sync(_ctx, _hooks) { + async sync(_ctx) { ${body} } - async execute() { return { contents: [], checkpoint: null }; } + async execute() { return { success: false, error: 'no actions' }; } } module.exports = { ConnectorRuntime }; `; @@ -45,7 +58,7 @@ describe('SubprocessExecutor diagnostic capture', () => { console.log('about to die hard'); process.exit(1); `), - BASE_CONTEXT + BASE_JOB ); } catch (e) { err = e as SubprocessError; @@ -65,7 +78,7 @@ describe('SubprocessExecutor diagnostic capture', () => { compiled(` throw new Error('connector blew up'); `), - BASE_CONTEXT + BASE_JOB ); } catch (e) { err = e as SubprocessError; @@ -84,7 +97,7 @@ describe('SubprocessExecutor diagnostic capture', () => { setTimeout(() => { throw new Error('async tick throw'); }, 0); await new Promise(() => {}); `), - BASE_CONTEXT + BASE_JOB ); } catch (e) { err = e as SubprocessError; @@ -103,7 +116,7 @@ describe('SubprocessExecutor diagnostic capture', () => { Promise.reject(new Error('dangling rejection')); await new Promise(() => {}); `), - BASE_CONTEXT + BASE_JOB ); } catch (e) { err = e as SubprocessError; @@ -123,7 +136,7 @@ describe('SubprocessExecutor diagnostic capture', () => { console.error('CH_API_KEY=longvaluesecret789'); process.exit(1); `), - BASE_CONTEXT + BASE_JOB ); } catch (e) { err = e as SubprocessError; @@ -142,7 +155,7 @@ describe('SubprocessExecutor diagnostic capture', () => { compiled(` throw new Error('upstream failed: api_key=sk_live_abcdefghijklmn123'); `), - BASE_CONTEXT + BASE_JOB ); } catch (e) { err = e as SubprocessError; @@ -163,7 +176,7 @@ describe('SubprocessExecutor diagnostic capture', () => { compiled(` await new Promise(() => {}); `), - BASE_CONTEXT + BASE_JOB ); } catch (e) { err = e as SubprocessError; diff --git a/packages/connector-worker/package.json b/packages/connector-worker/package.json index 2aaff17e8..59e4568f2 100644 --- a/packages/connector-worker/package.json +++ b/packages/connector-worker/package.json @@ -23,6 +23,10 @@ "./executor/interface": { "import": "./dist/executor/interface.js", "types": "./dist/executor/interface.d.ts" + }, + "./compile": { + "import": "./dist/compile/index.js", + "types": "./dist/compile/index.d.ts" } }, "files": [ diff --git a/packages/connector-worker/src/__tests__/executor-heartbeat.test.ts b/packages/connector-worker/src/__tests__/executor-heartbeat.test.ts index c1e805ead..950cf20d3 100644 --- a/packages/connector-worker/src/__tests__/executor-heartbeat.test.ts +++ b/packages/connector-worker/src/__tests__/executor-heartbeat.test.ts @@ -20,8 +20,8 @@ import { afterEach, beforeEach, describe, expect, mock, spyOn, test } from 'bun: type AnyFn = (...args: any[]) => any; const executeCompiledConnectorMock = mock(async () => ({ - contents: [], - checkpoint: null, + mode: 'action', + output: { ok: true }, })); const batchGenerateEmbeddingsMock = mock(async (texts: string[]) => @@ -30,8 +30,6 @@ const batchGenerateEmbeddingsMock = mock(async (texts: string[]) => mock.module('../executor/runtime.js', () => ({ executeCompiledConnector: executeCompiledConnectorMock, - getActionOutput: () => ({ ok: true }), - normalizeEventEnvelope: (e: Record) => e, })); mock.module('../embeddings.js', () => ({ @@ -133,7 +131,7 @@ describe('executor heartbeats (lobu#860)', () => { // have fired — simulates the action body taking 60+ seconds. executeCompiledConnectorMock.mockImplementationOnce(async () => { await fireIntervalTicks(2); - return { contents: [{ result: { ok: true } }], checkpoint: null }; + return { mode: 'action', output: { ok: true } }; }); const job = { @@ -185,8 +183,8 @@ describe('executor heartbeats (lobu#860)', () => { test('heartbeat interval is cleared after a successful action run', async () => { const client = makeStubClient(); executeCompiledConnectorMock.mockImplementationOnce(async () => ({ - contents: [{ result: { ok: true } }], - checkpoint: null, + mode: 'action', + output: { ok: true }, })); const job = { diff --git a/packages/connector-worker/src/compile-connector.ts b/packages/connector-worker/src/compile-connector.ts index 00b5840ad..d357c60ad 100644 --- a/packages/connector-worker/src/compile-connector.ts +++ b/packages/connector-worker/src/compile-connector.ts @@ -1,41 +1,35 @@ /** - * Worker-side connector compiler. + * Worker-side connector resolver + compile entry point. * - * Until lobu#771's perf brainstorm, the gateway compiled connector bundles - * via esbuild and shipped the ~13 MB output inline in every `/api/workers/poll` - * response. The gateway pod held all ~29 connector bundles in its - * compile cache (~384 MB, dominant heap occupant under the 1 GiB limit; - * see lobu#771 for the heap-snapshot trail). + * Until lobu#771, the gateway compiled connector bundles via esbuild and + * shipped the ~13 MB output inline in every `/api/workers/poll` response. + * The gateway pod held all ~29 connector bundles in its compile cache + * (~384 MB, dominant heap occupant under the 1 GiB limit). For fleet + * workers (and embedded mode where worker + gateway share a host), the + * bundled connector .ts source is on disk in both pods — the gateway + * doesn't need to compile or ship it. The gateway now sends only the + * `connector_key` and this module compiles locally in the worker process. * - * For fleet workers (and embedded mode where worker + gateway share a host), - * the bundled connector .ts source is on disk in both pods — the gateway - * doesn't need to compile or ship it. The gateway now sends - * `connector_source_path` (the absolute path that `findBundledConnectorFile` - * resolved) and this module compiles locally in the worker process. + * Device workers (Lobu Mac Bridge, etc.) keep getting `compiled_code` + * inline — they don't have the connectors directory on disk. * - * Device workers (Lobu Mac Bridge, etc.) keep getting `compiled_code` inline - * — they don't have the connectors directory on disk. - * - * This file mirrors the server's `utils/connector-catalog.ts` compile pipeline. - * Kept here (rather than shared) so the connector-worker package doesn't take - * a runtime dep on the server package's bundle layout. + * The resolver + esbuild bundle pipeline themselves are owned by the + * shared `./compile` module so the gateway and CLI sides don't drift. + * This file just supplies the worker-image-specific candidate dirs. */ -import { existsSync } from 'node:fs'; -import { mkdtemp, readFile, rm, stat } from 'node:fs/promises'; -import { createRequire } from 'node:module'; -import { tmpdir } from 'node:os'; -import { join, resolve } from 'node:path'; +import { resolve } from 'node:path'; import { fileURLToPath } from 'node:url'; -import { build, type Plugin } from 'esbuild'; -import { EXTERNAL_RUNTIME_DEPS } from './runtime-deps.js'; +import { + createConnectorCompiler, + findBundledConnectorFile as findInDirs, +} from './compile/index.js'; // Worker-side resolver for the bundled connectors directory. The gateway's -// `findBundledConnectorFile` resolves to paths that exist in the gateway -// image (e.g. /app/packages/server/dist/connectors); those paths do not -// exist in the worker image, which has the sources at -// /app/packages/connectors. Each side resolves locally instead of trusting -// gateway-supplied absolute paths. +// resolver targets paths that exist in the gateway image +// (e.g. /app/packages/server/dist/connectors); those paths do not exist +// in the worker image, which has the sources at /app/packages/connectors. +// Each side resolves locally instead of trusting gateway-supplied paths. const HERE = fileURLToPath(new URL('.', import.meta.url)); const WORKER_CONNECTOR_DIR_CANDIDATES = [ // Monorepo: workspace package at packages/connector-worker/src/ → @@ -58,120 +52,10 @@ const WORKER_CONNECTOR_DIR_CANDIDATES = [ resolve(process.cwd(), 'connectors'), ]; -// Strict regex for connector_key: lowercase letters/digits, optional dots -// for namespacing, underscores for word separators. Defense-in-depth even -// though keys come from a trusted DB column — we're about to use the value -// to construct a filesystem path. -const CONNECTOR_KEY_RE = /^[a-z][a-z0-9]*(?:[._][a-z0-9]+)*$/; - export function findBundledConnectorFile(key: string): string | null { - if (!CONNECTOR_KEY_RE.test(key)) return null; - // Two filename conventions: - // - Subdirectory layout (preferred for grouped primitives): the dot in - // `browser.evaluate` maps to `browser/evaluate.ts`. Lets us co-locate - // related connectors without renaming the key. - // - Flat-with-underscores (existing convention): `chrome.tabs` → - // `chrome_tabs.ts`, `apple_health` → `apple_health.ts`. - // Try subdirectory first so newer primitives win when both happen to exist. - const candidates = [ - `${key.replace(/\./g, '/')}.ts`, - `${key.replace(/\./g, '_')}.ts`, - ]; - for (const dir of WORKER_CONNECTOR_DIR_CANDIDATES) { - for (const fileName of candidates) { - const filePath = resolve(dir, fileName); - // Belt-and-braces: the resolved path must stay under the candidate - // dir. CONNECTOR_KEY_RE already forbids `..`, but the regex doesn't - // know about our path-joining choices. - if (!filePath.startsWith(`${dir}/`)) continue; - if (existsSync(filePath)) return filePath; - } - } - return null; -} - -const require_ = createRequire(import.meta.url); -const SDK_ENTRY = require_.resolve('@lobu/connector-sdk'); - -// Connectors declare runtime npm deps via `import x from 'npm:foo@1.2.3'`. -// Strip the prefix so esbuild resolves the bare name against node_modules; -// when the package isn't installed here, mark it external so the bundle -// still emits. The worker image is expected to provide it. -const npmSpecifierPlugin: Plugin = { - name: 'npm-specifier', - setup(b) { - b.onResolve({ filter: /^npm:/ }, async (args) => { - const bare = args.path - .slice(4) - .replace(/^(@[^/]+\/[^/@]+)@[^/]*/, '$1') - .replace(/^([^/@]+)@[^/]*/, '$1'); - const resolved = await b.resolve(bare, { - resolveDir: args.resolveDir, - kind: args.kind, - }); - if (resolved.errors.length > 0) { - return { path: bare, external: true, errors: [], warnings: [] }; - } - return resolved; - }); - }, -}; - -// LRU-capped cache. Worker daemon is long-lived; one entry per recently-used -// connector. See the matching cache in -// `packages/server/src/utils/connector-catalog.ts` for the cap rationale. -const COMPILED_FILE_CACHE_MAX = 8; -const compiledFileCache = new Map(); - -function touchCacheEntry(filePath: string, entry: { mtimeMs: number; code: string }): void { - compiledFileCache.delete(filePath); - compiledFileCache.set(filePath, entry); - while (compiledFileCache.size > COMPILED_FILE_CACHE_MAX) { - const oldest = compiledFileCache.keys().next().value; - if (oldest === undefined) break; - compiledFileCache.delete(oldest); - } + return findInDirs(key, WORKER_CONNECTOR_DIR_CANDIDATES); } -export async function compileConnectorFromFile(filePath: string): Promise { - let mtimeMs: number | null = null; - try { - mtimeMs = (await stat(filePath)).mtimeMs; - const cached = compiledFileCache.get(filePath); - if (cached && cached.mtimeMs === mtimeMs) { - touchCacheEntry(filePath, cached); - return cached.code; - } - } catch { - // stat failed — let the build surface the real error. - } - - const tmpDir = await mkdtemp(join(tmpdir(), 'lobu-connector-worker-')); - const outPath = join(tmpDir, 'out.mjs'); +const compiler = createConnectorCompiler(); - try { - await build({ - entryPoints: [filePath], - outfile: outPath, - bundle: true, - format: 'esm', - platform: 'node', - target: 'node20', - alias: { lobu: SDK_ENTRY, '@lobu/connector-sdk': SDK_ENTRY }, - banner: { - js: `import { createRequire as __createRequire } from 'module'; const require = __createRequire(import.meta.url);`, - }, - plugins: [npmSpecifierPlugin], - external: [...EXTERNAL_RUNTIME_DEPS], - write: true, - minify: false, - sourcemap: false, - }); - - const code = await readFile(outPath, 'utf-8'); - if (mtimeMs !== null) touchCacheEntry(filePath, { mtimeMs, code }); - return code; - } finally { - await rm(tmpDir, { recursive: true, force: true }); - } -} +export const compileConnectorFromFile = compiler.compileConnectorFromFile; diff --git a/packages/connector-worker/src/compile/index.ts b/packages/connector-worker/src/compile/index.ts new file mode 100644 index 000000000..f0a9b4b0e --- /dev/null +++ b/packages/connector-worker/src/compile/index.ts @@ -0,0 +1,229 @@ +/** + * Shared connector compile pipeline. + * + * Three packages (`@lobu/connector-worker` itself, `@lobu/cli`, and + * `@lobu/server`) each used to ship their own near-identical copies of: + * + * - `findBundledConnectorFile(key)` — walks a list of candidate dirs + * trying both filename conventions (`browser.evaluate → browser/evaluate.ts` + * and `chrome.tabs → chrome_tabs.ts`). + * - `compileConnectorFromFile(filePath)` — esbuild bundle with the + * `npm:` specifier plugin, the `lobu` / `@lobu/connector-sdk` aliases, + * `EXTERNAL_RUNTIME_DEPS` externalised, and an mtime-keyed LRU cache. + * - The `npm:` specifier resolver plugin. + * - The `EXTERNAL_RUNTIME_DEPS` constant. + * + * Three copies meant three "keep these in sync" comments and three places + * to fix every esbuild-flag or candidate-dir change. This module is the + * one place that owns those mechanics; each caller supplies its own + * candidate-dir list (and optional warn hook) since those are genuinely + * environment-specific (gateway pod vs worker pod vs npm-installed CLI). + */ + +import { existsSync } from 'node:fs'; +import { mkdtemp, readFile, rm, stat } from 'node:fs/promises'; +import { createRequire } from 'node:module'; +import { tmpdir } from 'node:os'; +import { join, resolve } from 'node:path'; +import { build, type Plugin } from 'esbuild'; +import { EXTERNAL_RUNTIME_DEPS } from '../runtime-deps.js'; + +export { assertExternalDepsResolvable, EXTERNAL_RUNTIME_DEPS } from '../runtime-deps.js'; + +// Strict regex for connector_key: lowercase letters/digits, optional dots +// for namespacing, underscores for word separators. Defense-in-depth even +// though keys come from a trusted DB column — we're about to use the value +// to construct a filesystem path. +const CONNECTOR_KEY_RE = /^[a-z][a-z0-9]*(?:[._][a-z0-9]+)*$/; + +/** + * Resolve a connector_key to a `.ts` source file under one of the supplied + * candidate directories. + * + * Tries two filename conventions in order: + * - subdirectory layout: `browser.evaluate` → `browser/evaluate.ts` + * (lets us group related primitives without renaming the key); + * - flat-with-underscores: `chrome.tabs` → `chrome_tabs.ts` + * (existing convention). + * + * Returns the absolute path of the first match, or `null` if none exists. + * Performs no caching of its own — callers that hit this on a hot path + * (gateway worker-poll, CLI compile loop) can layer their own memo on + * top, since the right TTL depends on whether they expect new connector + * files to appear at runtime. + */ +export function findBundledConnectorFile( + key: string, + candidateDirs: readonly string[] +): string | null { + if (!CONNECTOR_KEY_RE.test(key)) return null; + const candidates = [ + `${key.replace(/\./g, '/')}.ts`, + `${key.replace(/\./g, '_')}.ts`, + ]; + for (const dir of candidateDirs) { + for (const fileName of candidates) { + const filePath = resolve(dir, fileName); + // Belt-and-braces: the resolved path must stay under the candidate + // dir. CONNECTOR_KEY_RE already forbids `..`, but the regex doesn't + // know about our path-joining choices. + if (!filePath.startsWith(`${dir}/`)) continue; + if (existsSync(filePath)) return filePath; + } + } + return null; +} + +/** + * Resolve the `@lobu/connector-sdk` module entry from this module's + * perspective. Used as the esbuild `alias` target so connector code that + * imports `from 'lobu'` or `from '@lobu/connector-sdk'` resolves to the + * same physical file the runtime will import — avoiding the + * `instanceof ConnectorRuntime` cross-realm trap. + */ +function resolveSdkEntry(): string { + const require_ = createRequire(import.meta.url); + return require_.resolve('@lobu/connector-sdk'); +} + +interface NpmSpecifierPluginOptions { + /** + * Called when a `npm:foo@1.2.3` import resolves to a package that's not + * installed in the current environment. The plugin externalises the + * import (so the bundle still emits) and the runtime must supply it. + * Use this hook to log / surface the externalisation. + */ + onUnresolved?: (info: { bareSpecifier: string; importer: string }) => void; +} + +/** + * esbuild plugin that strips the `npm:` prefix from connector imports + * (`import x from 'npm:foo@1.2.3'`) and resolves the bare specifier + * against node_modules. When the package isn't installed in the build + * environment, externalises so the bundle still produces — the runtime + * is expected to provide it. + */ +export function createNpmSpecifierPlugin(options?: NpmSpecifierPluginOptions): Plugin { + return { + name: 'npm-specifier', + setup(b) { + b.onResolve({ filter: /^npm:/ }, async (args) => { + const bare = args.path + .slice(4) + .replace(/^(@[^/]+\/[^/@]+)@[^/]*/, '$1') + .replace(/^([^/@]+)@[^/]*/, '$1'); + const resolved = await b.resolve(bare, { + resolveDir: args.resolveDir, + kind: args.kind, + }); + if (resolved.errors.length > 0) { + options?.onUnresolved?.({ bareSpecifier: bare, importer: args.importer }); + return { path: bare, external: true, errors: [], warnings: [] }; + } + return resolved; + }); + }, + }; +} + +interface CompileOptions { + /** + * Max entries kept in the mtime-keyed LRU. Each entry is the full + * compiled bundle (~13 MB today, smaller as transitive deps are + * externalised). Cap default 8 keeps memory bounded; pass a smaller + * value in memory-constrained environments. + * @default 8 + */ + cacheMax?: number; + /** + * Override the `@lobu/connector-sdk` entry esbuild aliases against. + * Defaults to the SDK resolved from this module's `require.resolve`. + * Overriding is only useful when the caller knows of a more + * appropriate physical file (e.g. a server bundle that wants to point + * back at a sibling dist file). + */ + sdkEntry?: string; + /** + * Hook fired when `npm:` specifiers fail to resolve and the import is + * externalised. Forwarded to `createNpmSpecifierPlugin`. + */ + onUnresolvedNpm?: NpmSpecifierPluginOptions['onUnresolved']; +} + +const DEFAULT_CACHE_MAX = 8; + +/** + * Compile a single connector source file to an ESM bundle string, + * suitable for the executor's subprocess `import()` step. + * + * The returned bundle: + * - is ESM (`format: 'esm'`, `target: 'node20'`); + * - aliases `lobu` and `@lobu/connector-sdk` to the SDK entry so + * connectors targeting either specifier resolve to the same module; + * - has a banner injecting a CJS-compatible `require` shim; + * - externalises `EXTERNAL_RUNTIME_DEPS` (native deps + Playwright); + * - is mtime-cached: a repeat call with the same `filePath` whose + * mtime hasn't changed returns the cached bundle without hitting + * esbuild. + */ +export function createConnectorCompiler(options?: CompileOptions) { + const cacheMax = options?.cacheMax ?? DEFAULT_CACHE_MAX; + const sdkEntry = options?.sdkEntry ?? resolveSdkEntry(); + const compiledFileCache = new Map(); + const plugin = createNpmSpecifierPlugin({ onUnresolved: options?.onUnresolvedNpm }); + + function touchCacheEntry(filePath: string, entry: { mtimeMs: number; code: string }): void { + compiledFileCache.delete(filePath); + compiledFileCache.set(filePath, entry); + while (compiledFileCache.size > cacheMax) { + const oldest = compiledFileCache.keys().next().value; + if (oldest === undefined) break; + compiledFileCache.delete(oldest); + } + } + + async function compileConnectorFromFile(filePath: string): Promise { + let mtimeMs: number | null = null; + try { + mtimeMs = (await stat(filePath)).mtimeMs; + const cached = compiledFileCache.get(filePath); + if (cached && cached.mtimeMs === mtimeMs) { + touchCacheEntry(filePath, cached); + return cached.code; + } + } catch { + // stat failed — let the build surface the real error. + } + + const tmpDir = await mkdtemp(join(tmpdir(), 'lobu-connector-')); + const outPath = join(tmpDir, 'out.mjs'); + + try { + await build({ + entryPoints: [filePath], + outfile: outPath, + bundle: true, + format: 'esm', + platform: 'node', + target: 'node20', + alias: { lobu: sdkEntry, '@lobu/connector-sdk': sdkEntry }, + banner: { + js: `import { createRequire as __createRequire } from 'module'; const require = __createRequire(import.meta.url);`, + }, + plugins: [plugin], + external: [...EXTERNAL_RUNTIME_DEPS], + write: true, + minify: false, + sourcemap: false, + }); + + const code = await readFile(outPath, 'utf-8'); + if (mtimeMs !== null) touchCacheEntry(filePath, { mtimeMs, code }); + return code; + } finally { + await rm(tmpDir, { recursive: true, force: true }); + } + } + + return { compileConnectorFromFile }; +} diff --git a/packages/connector-worker/src/daemon/executor.ts b/packages/connector-worker/src/daemon/executor.ts index 75c15400d..0ce3cc420 100644 --- a/packages/connector-worker/src/daemon/executor.ts +++ b/packages/connector-worker/src/daemon/executor.ts @@ -5,14 +5,10 @@ * Generates embeddings and streams results. */ -import type { Checkpoint, Content, Env } from '@lobu/connector-sdk'; +import type { Env, EventEnvelope } from '@lobu/connector-sdk'; import { compileConnectorFromFile, findBundledConnectorFile } from '../compile-connector.js'; import { batchGenerateEmbeddings, generateEmbedding } from '../embeddings.js'; -import { - executeCompiledConnector, - getActionOutput, - normalizeEventEnvelope, -} from '../executor/runtime.js'; +import { executeCompiledConnector } from '../executor/runtime.js'; import { SubprocessExecutor } from '../executor/subprocess.js'; import type { ContentItem, ExecutorClient, PollResponse } from './client.js'; @@ -207,23 +203,21 @@ async function executeSyncRun( }; const result = await executeCompiledConnector({ - mode: 'sync', compiledCode: compiled_code, - config: (feedConfig ?? {}) as Record, - checkpoint: checkpoint as unknown as Checkpoint | null, - env, - connectionCredentials: ((job.connection_credentials as Record) ?? - null) as Record | null, - sessionState: (job.session_state ?? null) as Record | null, - credentials, - feedKey: feed_key, - entityIds: job.entity_ids ?? [], - apiType: 'api', executor: subprocessExecutor, + job: { + mode: 'sync', + config: mergeEnv(env, job.connection_credentials, feedConfig), + checkpoint: checkpoint as Record | null, + env, + sessionState: (job.session_state ?? null) as Record | null, + credentials: credentials ?? null, + feedKey: feed_key, + entityIds: job.entity_ids ?? [], + }, hooks: { - collectContents: false, onCheckpointUpdate: async (nextCheckpoint) => { - lastCheckpoint = nextCheckpoint as Record | null; + lastCheckpoint = nextCheckpoint; if (!lastCheckpoint) return; try { await client.stream({ @@ -236,9 +230,9 @@ async function executeSyncRun( console.error('[executor] Checkpoint flush failed:', err); } }, - onContentChunk: async (items) => { - for (const item of items) { - const contentItem = await processContent(item, cfg.generateEmbeddings); + onEventChunk: async (events) => { + for (const event of events) { + const contentItem = await processEvent(event, cfg.generateEmbeddings); batch.push(contentItem); itemsCollectedSoFar++; @@ -250,7 +244,10 @@ async function executeSyncRun( }, }); - lastCheckpoint = result.checkpoint as unknown as Record | null; + if (result.mode !== 'sync') { + throw new Error(`Expected sync result, got mode=${result.mode}`); + } + lastCheckpoint = result.checkpoint; await flushBatch(); @@ -369,20 +366,23 @@ async function executeActionRun( try { const result = await executeCompiledConnector({ - mode: 'action', compiledCode: compiled_code, - actionKey: action_key, - actionInput: (action_input ?? {}) as Record, - env, - connectionCredentials: ((job.connection_credentials as Record) ?? - null) as Record | null, - credentials, - apiType: 'api', executor: subprocessExecutor, + job: { + mode: 'action', + actionKey: action_key, + actionInput: (action_input ?? {}) as Record, + config: mergeEnv(env, job.connection_credentials, null), + env, + sessionState: null, + credentials: credentials ?? null, + }, }); - // For actions, the "contents" array may contain a single result envelope - const actionOutput = getActionOutput(result); + if (result.mode !== 'action') { + throw new Error(`Expected action result, got mode=${result.mode}`); + } + const actionOutput = result.output; await client.completeAction({ run_id, @@ -459,14 +459,15 @@ async function executeAuthRun( try { const result = await executeCompiledConnector({ - mode: 'authenticate', compiledCode: compiled_code, - previousCredentials: previous_credentials ?? null, - env, executor: subprocessExecutor, - apiType: 'api', + job: { + mode: 'authenticate', + config: {}, + previousCredentials: previous_credentials ?? null, + env, + }, hooks: { - collectContents: false, onAuthArtifact: async (artifact) => { try { await client.emitAuthArtifact({ @@ -498,7 +499,7 @@ async function executeAuthRun( clearInterval(heartbeatInterval); - if (!result.auth_result?.credentials) { + if (result.mode !== 'authenticate' || !result.auth?.credentials) { await client.completeAuth({ run_id, worker_id: client.id, @@ -512,8 +513,8 @@ async function executeAuthRun( run_id, worker_id: client.id, status: 'success', - credentials: result.auth_result.credentials, - metadata: result.auth_result.metadata, + credentials: result.auth.credentials, + metadata: result.auth.metadata, }); console.error(`[executor] Auth run ${run_id} completed`); @@ -675,30 +676,53 @@ async function executeEmbedBackfillRun( } /** - * Process a content item - convert to ContentItem and optionally generate embedding + * Merge the run-level env, the per-connection stored credentials, and the + * per-feed config into the single `config` object that the connector's + * `sync()` / `execute()` sees. Connection credentials override env (per-conn + * trumps fleet-wide); feed config wins last (per-feed trumps connection). */ -async function processContent(item: Content, generateEmbeddings: boolean): Promise { - const normalized = normalizeEventEnvelope(item as Record); +function mergeEnv( + env: Env, + connectionCredentials: Record | undefined | null, + feedConfig: Record | undefined | null +): Record { + return { + ...(env as unknown as Record), + ...((connectionCredentials ?? {}) as Record), + ...((feedConfig ?? {}) as Record), + }; +} + +/** + * Convert a V1 EventEnvelope (the SDK's standard sync output) into the + * gateway-bound ContentItem shape, optionally generating an embedding. + */ +async function processEvent( + event: EventEnvelope, + generateEmbeddings: boolean +): Promise { + const occurredAtIso = + event.occurred_at instanceof Date + ? event.occurred_at.toISOString() + : (event.occurred_at as unknown as string); + const contentItem: ContentItem = { - id: normalized.origin_id, - title: normalized.title, - payload_text: normalized.payload_text, - author_name: normalized.author_name, - occurred_at: - normalized.occurred_at instanceof Date - ? normalized.occurred_at.toISOString() - : (normalized.occurred_at as unknown as string), - source_url: normalized.source_url, - score: normalized.score, - metadata: normalized.metadata, - origin_parent_id: normalized.origin_parent_id || undefined, - origin_type: normalized.origin_type, - semantic_type: normalized.semantic_type, + id: event.origin_id, + title: event.title, + payload_text: event.payload_text, + author_name: event.author_name, + occurred_at: occurredAtIso, + source_url: event.source_url ?? undefined, + score: typeof event.score === 'number' ? event.score : 0, + metadata: event.metadata ?? {}, + origin_parent_id: event.origin_parent_id ?? undefined, + origin_type: event.origin_type, + semantic_type: event.semantic_type ?? event.origin_type, }; if (generateEmbeddings) { try { - const textForEmbedding = [normalized.title, normalized.payload_text] + const textForEmbedding = [event.title, event.payload_text] .filter(Boolean) .join(' ') .trim(); @@ -706,7 +730,7 @@ async function processContent(item: Content, generateEmbeddings: boolean): Promi contentItem.embedding = await generateEmbedding(textForEmbedding); } } catch (err) { - console.error(`[executor] Embedding generation failed for ${normalized.origin_id}:`, err); + console.error(`[executor] Embedding generation failed for ${event.origin_id}:`, err); } } diff --git a/packages/connector-worker/src/executor/child-runner.ts b/packages/connector-worker/src/executor/child-runner.ts index 7268ec966..6ab5514d6 100644 --- a/packages/connector-worker/src/executor/child-runner.ts +++ b/packages/connector-worker/src/executor/child-runner.ts @@ -1,40 +1,23 @@ /** * Child Runner - Entry point for forked subprocess * - * Receives compiled connector code + context via IPC, - * dynamically imports the ConnectorRuntime class, and executes sync()/execute(). - * Streams normalized content chunks and checkpoint updates back via IPC. + * Receives compiled connector code + an ExecutorJob via IPC, dynamically + * imports the ConnectorRuntime class, and dispatches to sync() / execute() / + * authenticate() using the V1 SDK shapes directly — no magic-key adapter. */ import { randomBytes } from 'node:crypto'; import { rm, writeFile } from 'node:fs/promises'; import { join } from 'node:path'; import { pathToFileURL } from 'node:url'; -import type { SyncResult } from '@lobu/connector-sdk'; -import type { FeedSyncResult } from './interface.js'; -import { normalizeEventEnvelope } from './runtime.js'; +import type { EventEnvelope, SyncResult } from '@lobu/connector-sdk'; +import type { ExecutorJob, ExecutorResult } from './interface.js'; -const CONTENT_CHUNK_SIZE = 100; +const EVENT_CHUNK_SIZE = 100; interface ChildMessage { compiledCode: string; - context: { - options: Record; - checkpoint: any; - env: Record; - sessionState?: Record | null; - apiType: 'api' | 'browser'; - }; -} - -function stripInternalOptions(options: Record): Record { - const result: Record = {}; - for (const [key, value] of Object.entries(options)) { - if (!key.startsWith('__')) { - result[key] = value; - } - } - return result; + job: ExecutorJob; } function findRuntimeClass(mod: Record) { @@ -101,23 +84,14 @@ function awaitAuthSignal( }); } -async function executeConnectorRuntime(instance: any, context: ChildMessage['context']) { - const isAction = typeof context.options?.__action_key === 'string'; - const isAuth = context.options?.__auth_mode === true; - const credentials = context.sessionState?.oauth ?? null; - const publicConfig = stripInternalOptions(context.options ?? {}); - const runtimeConfig = { - ...(context.env ?? {}), - ...publicConfig, - }; - - if (isAuth) { +async function executeConnectorRuntime( + instance: any, + job: ExecutorJob +): Promise { + if (job.mode === 'authenticate') { const authResult = await instance.authenticate({ - config: (context.options?.__auth_config ?? {}) as Record, - previousCredentials: (context.options?.__auth_previous_credentials ?? null) as Record< - string, - unknown - > | null, + config: job.config, + previousCredentials: job.previousCredentials, emit: async (artifact: Record) => { await sendIPC({ type: 'auth_artifact', artifact }); }, @@ -129,57 +103,33 @@ async function executeConnectorRuntime(instance: any, context: ChildMessage['con throw new Error('authenticate() returned no credentials'); } - const result: FeedSyncResult = { - contents: [], - checkpoint: null, - auth_result: { - credentials: authResult.credentials, - metadata: authResult.metadata, - }, + return { + mode: 'authenticate', + auth: { credentials: authResult.credentials, metadata: authResult.metadata }, }; - return result; } - if (isAction) { - const actionKey = context.options.__action_key; - const actionInput = (context.options.__action_input ?? {}) as Record; + if (job.mode === 'action') { const actionResult = await instance.execute({ - actionKey, - input: actionInput, - credentials, - config: runtimeConfig, + actionKey: job.actionKey, + input: job.actionInput, + credentials: job.credentials, + config: { ...job.env, ...job.config }, }); if (!actionResult?.success) { - throw new Error(actionResult?.error || `Action '${actionKey}' failed`); + throw new Error(actionResult?.error || `Action '${job.actionKey}' failed`); } - const result: FeedSyncResult = { - contents: [ - { - origin_id: `action-${actionKey}-${Date.now()}`, - payload_text: '', - source_url: '', - occurred_at: new Date(), - score: 0, - metadata: actionResult.output ?? {}, - }, - ], - checkpoint: context.checkpoint ?? null, - metadata: { - items_found: 0, - items_skipped: 0, - }, - }; - return result; + return { mode: 'action', output: actionResult.output ?? {} }; } - const emitEvents = async (events: unknown[]) => { - const normalized = events.map((event: any) => normalizeEventEnvelope(event)); - for (let index = 0; index < normalized.length; index += CONTENT_CHUNK_SIZE) { + // mode === 'sync' + const emitEvents = async (events: EventEnvelope[]) => { + for (let index = 0; index < events.length; index += EVENT_CHUNK_SIZE) { await sendIPC({ - type: 'content_chunk', - items: normalized.slice(index, index + CONTENT_CHUNK_SIZE), + type: 'event_chunk', + events: events.slice(index, index + EVENT_CHUNK_SIZE), }); } }; @@ -189,27 +139,33 @@ async function executeConnectorRuntime(instance: any, context: ChildMessage['con }; const syncResult = (await instance.sync({ - feedKey: context.options?.__feed_key, - config: runtimeConfig, - checkpoint: context.checkpoint ?? null, - credentials, - entityIds: (context.options?.__entity_ids as number[] | undefined) ?? [], - sessionState: context.sessionState ?? null, + feedKey: job.feedKey, + config: { ...job.env, ...job.config }, + checkpoint: job.checkpoint, + credentials: job.credentials, + entityIds: job.entityIds, + sessionState: job.sessionState, emitEvents, updateCheckpoint, })) as SyncResult; - const events = Array.isArray(syncResult?.events) ? syncResult.events : []; - await emitEvents(events); - const result: FeedSyncResult = { - contents: [], - checkpoint: (syncResult?.checkpoint ?? null) as any, - auth_update: syncResult?.auth_update ?? undefined, + // Sync is streaming-only on the executor boundary: connectors that build + // the full list before returning still arrive here as `syncResult.events`, + // we just forward them through the same `emitEvents` IPC channel so the + // parent sees one uniform stream regardless of whether the connector + // streamed incrementally or returned in one shot. + const trailingEvents = Array.isArray(syncResult?.events) ? syncResult.events : []; + await emitEvents(trailingEvents); + + return { + mode: 'sync', + checkpoint: (syncResult?.checkpoint ?? null) as Record | null, + auth_update: syncResult?.auth_update ?? null, metadata: { items_found: typeof syncResult?.metadata?.items_found === 'number' ? syncResult.metadata.items_found - : events.length, + : trailingEvents.length, items_skipped: typeof syncResult?.metadata?.items_skipped === 'number' ? syncResult.metadata.items_skipped @@ -217,7 +173,6 @@ async function executeConnectorRuntime(instance: any, context: ChildMessage['con ...(syncResult?.metadata ?? {}), }, }; - return result; } /** Send an IPC message and wait for it to be flushed to the parent. */ @@ -330,7 +285,7 @@ async function main() { ); try { - const { compiledCode, context } = msg as ChildMessage; + const { compiledCode, job } = msg as ChildMessage; // Write compiled code to temp file for dynamic import. // - `flag: 'wx'` fails if the file already exists (no symlink follow, @@ -352,7 +307,7 @@ async function main() { } const instance = new (RuntimeClass as new () => any)(); - const result = await executeConnectorRuntime(instance, context); + const result = await executeConnectorRuntime(instance, job); // Send result back to parent (wait for IPC flush before exiting) await sendIPC({ type: 'result', result }); diff --git a/packages/connector-worker/src/executor/interface.ts b/packages/connector-worker/src/executor/interface.ts index 7f8fb1187..771155bb5 100644 --- a/packages/connector-worker/src/executor/interface.ts +++ b/packages/connector-worker/src/executor/interface.ts @@ -1,39 +1,68 @@ -import type { Checkpoint, Content, FeedOptions, SessionState } from '@lobu/connector-sdk'; +import type { AuthResult, EventEnvelope, SyncCredentials } from '@lobu/connector-sdk'; /** - * Result shape returned by the subprocess executor. - * Mirrors the legacy SyncResult from the SDK (contents + checkpoint). + * Executor mode discriminator. The executor speaks the same V1 SDK shapes + * the connector code expects: `SyncContext` / `ActionContext` / `AuthContext` + * in, `SyncResult` / `ActionResult` / `AuthResult` out, no envelope. */ -export interface FeedSyncResult { - contents: Content[]; - checkpoint: Checkpoint | null; - metadata?: Record; - auth_update?: Record; - /** Set when the subprocess completed an authenticate() run. */ - auth_result?: { - credentials: Record; - metadata?: Record; - }; -} +export type ExecutorJob = + | { + mode: 'sync'; + feedKey?: string | null; + config: Record; + checkpoint: Record | null; + entityIds: number[]; + credentials: SyncCredentials | null; + sessionState: Record | null; + env: Record; + } + | { + mode: 'action'; + actionKey: string; + actionInput: Record; + config: Record; + credentials: SyncCredentials | null; + sessionState: Record | null; + env: Record; + } + | { + mode: 'authenticate'; + config: Record; + previousCredentials: Record | null; + env: Record; + }; /** - * Context passed to the executor for a single sync job + * Result shape returned by the executor. One discriminated union per mode + * mirrors the SDK's `ActionResult` / `AuthResult` directly. Sync is + * streaming-only: events leave via `hooks.onEventChunk`, never collected + * onto the result — callers that need a list build it themselves in the + * hook (see e.g. `packages/cli/src/commands/_lib/connector-run-cmd.ts`). */ -export interface SyncContext { - options: FeedOptions; - checkpoint: Checkpoint | null; - env: Record; - sessionState?: SessionState | null; - apiType: 'api' | 'browser'; -} +export type ExecutorResult = + | { + mode: 'sync'; + checkpoint: Record | null; + auth_update?: Record | null; + metadata?: Record; + } + | { + mode: 'action'; + output: Record; + } + | { + mode: 'authenticate'; + auth: AuthResult; + }; export interface ExecutionHooks { - onContentChunk?: (items: Content[]) => Promise | void; - onCheckpointUpdate?: (checkpoint: Checkpoint | null) => Promise | void; - collectContents?: boolean; - /** Auth runs: connector emits an artifact (QR/redirect/prompt/status). */ + /** Sync runs: connector streamed a chunk of events (and we should persist them). */ + onEventChunk?: (events: EventEnvelope[]) => Promise | void; + /** Sync runs: connector pushed an incremental checkpoint update. */ + onCheckpointUpdate?: (checkpoint: Record | null) => Promise | void; + /** Auth runs: connector emitted an artifact (QR/redirect/prompt/status). */ onAuthArtifact?: (artifact: Record) => Promise | void; - /** Auth runs: connector pauses until a named signal arrives. */ + /** Auth runs: connector paused until a named signal arrives. */ onAwaitAuthSignal?: ( name: string, options?: { timeoutMs?: number } @@ -41,13 +70,13 @@ export interface ExecutionHooks { } /** - * Pluggable executor interface - * Allows swapping between subprocess execution, direct execution, etc. + * Pluggable executor interface. The only implementation today is + * `SubprocessExecutor`; the seam stays around so tests can stub it. */ export interface SyncExecutor { execute( compiledCode: string, - context: SyncContext, + job: ExecutorJob, hooks?: ExecutionHooks - ): Promise; + ): Promise; } diff --git a/packages/connector-worker/src/executor/runtime.ts b/packages/connector-worker/src/executor/runtime.ts index 541b21b2e..27f2c9d3b 100644 --- a/packages/connector-worker/src/executor/runtime.ts +++ b/packages/connector-worker/src/executor/runtime.ts @@ -1,147 +1,17 @@ -import type { Checkpoint, Content, FeedOptions, SessionState } from '@lobu/connector-sdk'; -import type { ExecutionHooks, FeedSyncResult, SyncContext, SyncExecutor } from './interface.js'; +import type { ExecutionHooks, ExecutorJob, ExecutorResult, SyncExecutor } from './interface.js'; import { SubprocessExecutor } from './subprocess.js'; -interface ConnectorOAuthCredentials { - provider: string; - accessToken: string; - refreshToken?: string | null; - expiresAt?: string | null; - scope?: string | null; -} - -interface BaseExecutionParams { +/** + * Top-level entry point used by the daemon executor. Just delegates to a + * `SyncExecutor` implementation (defaults to `SubprocessExecutor`) with the + * V1 SDK shapes — no more magic-key adapter layer in between. + */ +export async function executeCompiledConnector(params: { compiledCode: string; - env?: Record; - connectionCredentials?: Record | null; - sessionState?: SessionState | null; - credentials?: ConnectorOAuthCredentials | null; - apiType?: 'api' | 'browser'; + job: ExecutorJob; executor?: SyncExecutor; hooks?: ExecutionHooks; -} - -interface SyncConnectorExecutionParams extends BaseExecutionParams { - mode: 'sync'; - config?: Record | null; - checkpoint?: Checkpoint | null; - feedKey?: string | null; - entityIds?: number[] | null; -} - -interface ActionConnectorExecutionParams extends BaseExecutionParams { - mode: 'action'; - actionKey: string; - actionInput?: Record | null; - checkpoint?: Checkpoint | null; -} - -interface AuthConnectorExecutionParams extends BaseExecutionParams { - mode: 'authenticate'; - /** Connector-specific auth input (rare). */ - config?: Record | null; - /** Existing credentials for re-auth flows. */ - previousCredentials?: Record | null; -} - -type ConnectorExecutionParams = - | SyncConnectorExecutionParams - | ActionConnectorExecutionParams - | AuthConnectorExecutionParams; - -function mergeExecutionEnv( - env?: Record, - connectionCredentials?: Record | null -): Record { - return { - ...(env ?? {}), - ...(connectionCredentials ?? {}), - }; -} - -function mergeExecutionSessionState( - sessionState?: SessionState | null, - credentials?: ConnectorOAuthCredentials | null -): SessionState | null { - if (!sessionState && !credentials) return null; - - return { - ...(sessionState ?? {}), - ...(credentials ? { oauth: credentials } : {}), - }; -} - -function buildConnectorExecutionContext(params: ConnectorExecutionParams): SyncContext { - const env = mergeExecutionEnv(params.env, params.connectionCredentials); - const sessionState = mergeExecutionSessionState(params.sessionState, params.credentials); - - if (params.mode === 'action') { - return { - options: { - __action_key: params.actionKey, - __action_input: params.actionInput ?? {}, - ...((params.actionInput ?? {}) as Record), - } as FeedOptions, - checkpoint: (params.checkpoint ?? null) as Checkpoint | null, - env, - sessionState, - apiType: params.apiType ?? 'api', - }; - } - - if (params.mode === 'authenticate') { - return { - options: { - __auth_mode: true, - __auth_config: params.config ?? {}, - __auth_previous_credentials: params.previousCredentials ?? null, - } as FeedOptions, - checkpoint: null, - env, - sessionState, - apiType: params.apiType ?? 'api', - }; - } - - return { - options: { - ...(params.config ?? {}), - __feed_key: params.feedKey, - __entity_ids: params.entityIds ?? [], - } as FeedOptions, - checkpoint: (params.checkpoint ?? null) as Checkpoint | null, - env, - sessionState, - apiType: params.apiType ?? 'api', - }; -} - -export async function executeCompiledConnector( - params: ConnectorExecutionParams -): Promise { +}): Promise { const executor = params.executor ?? new SubprocessExecutor(); - const context = buildConnectorExecutionContext(params); - return executor.execute(params.compiledCode, context, params.hooks); -} - -export function normalizeEventEnvelope(event: Record): Content { - const originType = event.origin_type; - const rawDate = event.occurred_at ?? event.published_at; - return { - origin_id: event.origin_id ?? event.external_id, - payload_text: event.payload_text ?? event.content ?? '', - title: event.title, - author_name: event.author_name ?? event.author, - source_url: event.source_url ?? event.url ?? '', - occurred_at: rawDate ? new Date(rawDate) : new Date(), - origin_type: originType, - semantic_type: event.semantic_type ?? originType, - score: typeof event.score === 'number' ? event.score : 0, - origin_parent_id: event.origin_parent_id ?? event.parent_external_id ?? null, - metadata: event.metadata ?? {}, - }; -} - -export function getActionOutput(result: FeedSyncResult): Record { - return (result.contents[0]?.metadata ?? {}) as Record; + return executor.execute(params.compiledCode, params.job, params.hooks); } diff --git a/packages/connector-worker/src/executor/subprocess.ts b/packages/connector-worker/src/executor/subprocess.ts index 36ddf098c..cb6ec225a 100644 --- a/packages/connector-worker/src/executor/subprocess.ts +++ b/packages/connector-worker/src/executor/subprocess.ts @@ -11,7 +11,8 @@ import { existsSync } from 'node:fs'; import { createRequire } from 'node:module'; import { dirname, join } from 'node:path'; import { fileURLToPath } from 'node:url'; -import type { ExecutionHooks, FeedSyncResult, SyncContext, SyncExecutor } from './interface.js'; +import type { EventEnvelope } from '@lobu/connector-sdk'; +import type { ExecutionHooks, ExecutorJob, ExecutorResult, SyncExecutor } from './interface.js'; import { StreamRedactor, redactOutput } from './redact.js'; /** @@ -115,6 +116,10 @@ const DEFAULT_OPTIONS: SubprocessExecutorOptions = { maxOldSpaceSize: 512, }; +function jobEnv(job: ExecutorJob): Record { + return job.env; +} + export class SubprocessExecutor implements SyncExecutor { private options: SubprocessExecutorOptions; @@ -124,10 +129,10 @@ export class SubprocessExecutor implements SyncExecutor { async execute( compiledCode: string, - context: SyncContext, + job: ExecutorJob, hooks?: ExecutionHooks - ): Promise { - return new Promise((resolve, reject) => { + ): Promise { + return new Promise((resolve, reject) => { let childRunnerPath = join(__dirname, 'child-runner.js'); const childRunnerTsPath = join(__dirname, 'child-runner.ts'); @@ -163,18 +168,14 @@ export class SubprocessExecutor implements SyncExecutor { const child = fork(childRunnerPath, [], { stdio: ['pipe', 'pipe', 'pipe', 'ipc'], execArgv, - env: { ...pickSystemEnv(), ...context.env } as NodeJS.ProcessEnv, + env: { ...pickSystemEnv(), ...jobEnv(job) } as NodeJS.ProcessEnv, }); let resolved = false; let terminalMessageReceived = false; let timedOut = false; - let latestCheckpoint = context.checkpoint; - let finalMetadata: Record | undefined; - let finalAuthUpdate: Record | undefined; - let finalAuthResult: FeedSyncResult['auth_result'] | undefined; - const collectedContents: FeedSyncResult['contents'] = []; - const collectContents = hooks?.collectContents !== false; + let latestCheckpoint = + job.mode === 'sync' ? job.checkpoint : null; let processingChain = Promise.resolve(); // Per-stream ring buffers — preserve the *tail* (most recent bytes), @@ -250,13 +251,10 @@ export class SubprocessExecutor implements SyncExecutor { // so validate shape at this trust boundary before dereferencing fields. const onMessage = (msg: any) => { if (!msg || typeof msg !== 'object' || typeof msg.type !== 'string') return; - if (msg.type === 'content_chunk') { - const items = Array.isArray(msg.items) ? msg.items : []; + if (msg.type === 'event_chunk') { + const events: EventEnvelope[] = Array.isArray(msg.events) ? msg.events : []; queueTask(async () => { - if (collectContents) { - collectedContents.push(...items); - } - await hooks?.onContentChunk?.(items); + await hooks?.onEventChunk?.(events); }); return; } @@ -319,29 +317,15 @@ export class SubprocessExecutor implements SyncExecutor { if (msg.type === 'result') { terminalMessageReceived = true; - const result = msg.result as FeedSyncResult; + const result = msg.result as ExecutorResult; queueTask(async () => { - finalMetadata = result.metadata; - finalAuthUpdate = result.auth_update; - finalAuthResult = result.auth_result; - latestCheckpoint = result.checkpoint; - - if (Array.isArray(result.contents) && result.contents.length > 0) { - if (collectContents) { - collectedContents.push(...result.contents); - } - await hooks?.onContentChunk?.(result.contents); + // For sync results, surface the trailing checkpoint to callers + // through the result; in-flight `checkpoint_update` messages have + // already been forwarded via the hook. + if (result.mode === 'sync') { + latestCheckpoint = result.checkpoint; } - - settle(() => - resolve({ - contents: collectedContents, - checkpoint: latestCheckpoint, - metadata: finalMetadata, - auth_update: finalAuthUpdate, - auth_result: finalAuthResult, - }) - ); + settle(() => resolve(result)); }); return; } @@ -440,20 +424,19 @@ export class SubprocessExecutor implements SyncExecutor { child.stdout?.on('data', onStdout); child.stderr?.on('data', onStderr); - // Send the compiled code and context to the child. Use the callback - // form so a failed send (e.g. child died before IPC handshake, or fork - // resolved to a non-existent file) rejects the executor promise + // Hint to keep linter quiet when latestCheckpoint isn't consumed in + // non-sync modes; it's the parent-side mirror of the sync checkpoint + // stream and only relevant when hooks.onCheckpointUpdate is wired. + void latestCheckpoint; + + // Send the compiled code and job descriptor to the child. Use the + // callback form so a failed send (e.g. child died before IPC handshake, + // or fork resolved to a non-existent file) rejects the executor promise // instead of going unhandled on the IPC channel. child.send( { compiledCode, - context: { - options: context.options, - checkpoint: context.checkpoint, - env: context.env, - sessionState: context.sessionState, - apiType: context.apiType, - }, + job, }, (err) => { if (err) { diff --git a/packages/connectors/src/README.md b/packages/connectors/src/README.md index dbf3584a7..a4da2ae8b 100644 --- a/packages/connectors/src/README.md +++ b/packages/connectors/src/README.md @@ -10,8 +10,6 @@ import { ConnectorRuntime, type SyncContext, type SyncResult, - type ActionContext, - type ActionResult, type EventEnvelope, } from '@lobu/connector-sdk'; @@ -63,10 +61,6 @@ export default class MyConnector extends ConnectorRuntime { metadata: { items_found: events.length }, }; } - - async execute(_ctx: ActionContext): Promise { - return { success: false, error: 'Actions not supported' }; - } } ``` @@ -350,13 +344,10 @@ async execute(ctx: ActionContext): Promise { } ``` -If your connector doesn't support actions: - -```typescript -async execute(_ctx: ActionContext): Promise { - return { success: false, error: 'Actions not supported' }; -} -``` +If your connector doesn't support actions, do nothing — the base +`ConnectorRuntime` class ships a default `execute()` that returns +`{ success: false, error: 'Actions not supported' }`. Omit the method +entirely. ## Options Schema @@ -502,12 +493,12 @@ Connectors can also be installed manually via `client.connections.installConnect ### How connector code runs -1. The server checks `connector_versions.compiled_code` — if present, uses it directly -2. If `compiled_code` is NULL (bundled connectors), it compiles from `connectors/{source_path}` on disk via esbuild -3. The compiled code is sent to the worker, written to a temp file (`.connector-child-{pid}.mjs`), and loaded via dynamic `import()` -4. Each sync/action runs in an **isolated child process** with a 10-minute timeout and 512MB memory limit -5. The child process has a restricted environment — only `PATH`, `HOME`, `TMPDIR`, `TZ`, `NODE_ENV`, `NODE_PATH`, and `PLAYWRIGHT_BROWSERS_PATH` are available as env vars -6. Secrets flow through `ctx.credentials` and `ctx.config`, not environment variables +1. For fleet workers and embedded-mode hosts (worker + gateway share a host), the gateway sends only `connector_key` in the worker-poll response — both pods have the `.ts` source on disk, and the worker compiles locally via the shared pipeline at `@lobu/connector-worker/compile`. For DB-only / device workers without source on disk, the gateway sends `compiled_code` inline. +2. The compiled bundle is written to a temp file (`.connector-child-{pid}-{rand}.mjs`) under cwd and loaded via dynamic `import()` inside a forked child process. +3. The parent and child speak `ExecutorJob` / `ExecutorResult` over IPC — the same V1 SDK shapes (`SyncContext` / `ActionContext` / `AuthContext` in, `SyncResult` / `ActionResult` / `AuthResult` out, no envelope). Sync events stream via `event_chunk` IPC messages as the connector emits them. +4. Each sync/action runs in an **isolated child process** with a 10-minute timeout and 512MB memory limit. +5. The child process has a restricted environment — only `PATH`, `HOME`, `TMPDIR`, `TZ`, `NODE_ENV`, `NODE_PATH`, and `PLAYWRIGHT_BROWSERS_PATH` are available as env vars. +6. Secrets flow through `ctx.credentials` and `ctx.config`, not environment variables. This means edits to `.ts` files in `connectors/` take effect on the next sync without reinstalling. diff --git a/packages/connectors/src/capterra.ts b/packages/connectors/src/capterra.ts index 8ad4be954..7cc0bcd00 100644 --- a/packages/connectors/src/capterra.ts +++ b/packages/connectors/src/capterra.ts @@ -4,8 +4,6 @@ */ import { - type ActionContext, - type ActionResult, type ConnectorDefinition, ConnectorRuntime, calculateEngagementScore, @@ -270,8 +268,4 @@ export default class CapterraConnector extends ConnectorRuntime { }; }); } - - async execute(_ctx: ActionContext): Promise { - return { success: false, error: 'Actions not supported' }; - } } diff --git a/packages/connectors/src/g2.ts b/packages/connectors/src/g2.ts index c74351c22..e596e8277 100644 --- a/packages/connectors/src/g2.ts +++ b/packages/connectors/src/g2.ts @@ -5,8 +5,6 @@ */ import { - type ActionContext, - type ActionResult, type ConnectorDefinition, ConnectorRuntime, calculateEngagementScore, @@ -283,8 +281,4 @@ export default class G2Connector extends ConnectorRuntime { }; }); } - - async execute(_ctx: ActionContext): Promise { - return { success: false, error: 'Actions not supported' }; - } } diff --git a/packages/connectors/src/glassdoor.ts b/packages/connectors/src/glassdoor.ts index 527258181..396c778c7 100644 --- a/packages/connectors/src/glassdoor.ts +++ b/packages/connectors/src/glassdoor.ts @@ -6,8 +6,6 @@ import { createHash } from 'node:crypto'; import { - type ActionContext, - type ActionResult, type ConnectorDefinition, ConnectorRuntime, calculateEngagementScore, @@ -288,8 +286,4 @@ export default class GlassdoorConnector extends ConnectorRuntime { }; }); } - - async execute(_ctx: ActionContext): Promise { - return { success: false, error: 'Actions not supported' }; - } } diff --git a/packages/connectors/src/gmaps.ts b/packages/connectors/src/gmaps.ts index 8b7333438..1d90ee6fa 100644 --- a/packages/connectors/src/gmaps.ts +++ b/packages/connectors/src/gmaps.ts @@ -5,8 +5,6 @@ */ import { - type ActionContext, - type ActionResult, type ConnectorDefinition, ConnectorRuntime, calculateEngagementScore, @@ -190,8 +188,4 @@ export default class GoogleMapsConnector extends ConnectorRuntime { }, }; } - - async execute(_ctx: ActionContext): Promise { - return { success: false, error: 'Actions not supported' }; - } } diff --git a/packages/connectors/src/google_play.ts b/packages/connectors/src/google_play.ts index 0f5db5083..516dde71e 100644 --- a/packages/connectors/src/google_play.ts +++ b/packages/connectors/src/google_play.ts @@ -6,8 +6,6 @@ */ import { - type ActionContext, - type ActionResult, type ConnectorDefinition, ConnectorRuntime, calculateEngagementScore, @@ -362,8 +360,4 @@ export default class GooglePlayConnector extends ConnectorRuntime { }, }; } - - async execute(_ctx: ActionContext): Promise { - return { success: false, error: 'Actions not supported' }; - } } diff --git a/packages/connectors/src/hackernews.ts b/packages/connectors/src/hackernews.ts index 0d1873c27..04279291b 100644 --- a/packages/connectors/src/hackernews.ts +++ b/packages/connectors/src/hackernews.ts @@ -7,8 +7,6 @@ import TurndownService from 'turndown'; import { - type ActionContext, - type ActionResult, type ConnectorDefinition, ConnectorRuntime, calculateEngagementScore, @@ -325,11 +323,6 @@ export default class HackerNewsConnector extends ConnectorRuntime { // ------------------------------------------------------------------------- // execute // ------------------------------------------------------------------------- - - async execute(_ctx: ActionContext): Promise { - return { success: false, error: 'Actions not supported' }; - } - // ------------------------------------------------------------------------- // Transform helpers // ------------------------------------------------------------------------- diff --git a/packages/connectors/src/ios_appstore.ts b/packages/connectors/src/ios_appstore.ts index dd08b13f5..6036ad5dd 100644 --- a/packages/connectors/src/ios_appstore.ts +++ b/packages/connectors/src/ios_appstore.ts @@ -5,8 +5,6 @@ */ import { - type ActionContext, - type ActionResult, type ConnectorDefinition, ConnectorRuntime, calculateEngagementScore, @@ -219,8 +217,4 @@ export default class IOSAppStoreConnector extends ConnectorRuntime { }, }; } - - async execute(_ctx: ActionContext): Promise { - return { success: false, error: 'Actions not supported' }; - } } diff --git a/packages/connectors/src/linkedin.ts b/packages/connectors/src/linkedin.ts index a29fb3577..e7969dc65 100644 --- a/packages/connectors/src/linkedin.ts +++ b/packages/connectors/src/linkedin.ts @@ -9,8 +9,6 @@ */ import { - type ActionContext, - type ActionResult, browserNetworkSync, type ConnectorDefinition, ConnectorRuntime, @@ -487,8 +485,4 @@ export default class LinkedInConnector extends ConnectorRuntime { }, }; } - - async execute(_ctx: ActionContext): Promise { - return { success: false, error: 'Actions not supported' }; - } } diff --git a/packages/connectors/src/microsoft_outlook.ts b/packages/connectors/src/microsoft_outlook.ts index 432bc82e6..8fa5316f1 100644 --- a/packages/connectors/src/microsoft_outlook.ts +++ b/packages/connectors/src/microsoft_outlook.ts @@ -6,8 +6,6 @@ */ import { - type ActionContext, - type ActionResult, type ConnectorDefinition, ConnectorRuntime, type EventEnvelope, @@ -244,11 +242,6 @@ export default class MicrosoftOutlookConnector extends ConnectorRuntime { // ------------------------------------------------------------------------- // execute // ------------------------------------------------------------------------- - - async execute(_ctx: ActionContext): Promise { - return { success: false, error: 'Actions not supported' }; - } - // ------------------------------------------------------------------------- // Feed: messages // ------------------------------------------------------------------------- diff --git a/packages/connectors/src/producthunt.ts b/packages/connectors/src/producthunt.ts index 66bb8c19b..356a66da0 100644 --- a/packages/connectors/src/producthunt.ts +++ b/packages/connectors/src/producthunt.ts @@ -6,8 +6,6 @@ */ import { - type ActionContext, - type ActionResult, type ConnectorDefinition, ConnectorRuntime, calculateEngagementScore, @@ -396,11 +394,6 @@ export default class ProductHuntConnector extends ConnectorRuntime { // ------------------------------------------------------------------------- // execute // ------------------------------------------------------------------------- - - async execute(_ctx: ActionContext): Promise { - return { success: false, error: 'Actions not supported' }; - } - // ------------------------------------------------------------------------- // Transform helpers // ------------------------------------------------------------------------- diff --git a/packages/connectors/src/reddit.ts b/packages/connectors/src/reddit.ts index 039d50b3c..3cf3ef159 100644 --- a/packages/connectors/src/reddit.ts +++ b/packages/connectors/src/reddit.ts @@ -6,8 +6,6 @@ */ import { - type ActionContext, - type ActionResult, type ConnectorDefinition, ConnectorRuntime, calculateEngagementScore, @@ -393,11 +391,6 @@ export default class RedditConnector extends ConnectorRuntime { // ------------------------------------------------------------------------- // execute // ------------------------------------------------------------------------- - - async execute(_ctx: ActionContext): Promise { - return { success: false, error: 'Actions not supported' }; - } - // ------------------------------------------------------------------------- // App-only OAuth // ------------------------------------------------------------------------- diff --git a/packages/connectors/src/revolut.ts b/packages/connectors/src/revolut.ts index d14e43dcc..b0542fec6 100644 --- a/packages/connectors/src/revolut.ts +++ b/packages/connectors/src/revolut.ts @@ -19,8 +19,6 @@ */ import { - type ActionContext, - type ActionResult, browserNetworkSync, type ConnectorDefinition, ConnectorRuntime, @@ -562,8 +560,4 @@ export default class RevolutConnector extends ConnectorRuntime { }, }; } - - async execute(_ctx: ActionContext): Promise { - return { success: false, error: "Actions not supported" }; - } } diff --git a/packages/connectors/src/rss.ts b/packages/connectors/src/rss.ts index 7a2d8f4a3..eebf654d8 100644 --- a/packages/connectors/src/rss.ts +++ b/packages/connectors/src/rss.ts @@ -7,8 +7,6 @@ */ import { - type ActionContext, - type ActionResult, type ConnectorDefinition, ConnectorRuntime, type EventEnvelope, @@ -202,11 +200,6 @@ export default class RSSConnector extends ConnectorRuntime { // ------------------------------------------------------------------------- // execute // ------------------------------------------------------------------------- - - async execute(_ctx: ActionContext): Promise { - return { success: false, error: 'Actions not supported' }; - } - // ------------------------------------------------------------------------- // Feed fetching & parsing // ------------------------------------------------------------------------- diff --git a/packages/connectors/src/spotify.ts b/packages/connectors/src/spotify.ts index e46e0467f..7c668d54e 100644 --- a/packages/connectors/src/spotify.ts +++ b/packages/connectors/src/spotify.ts @@ -6,8 +6,6 @@ */ import { - type ActionContext, - type ActionResult, type ConnectorDefinition, ConnectorRuntime, type EventEnvelope, @@ -303,11 +301,6 @@ export default class SpotifyConnector extends ConnectorRuntime { // ------------------------------------------------------------------------- // execute // ------------------------------------------------------------------------- - - async execute(_ctx: ActionContext): Promise { - return { success: false, error: 'Actions not supported' }; - } - // ------------------------------------------------------------------------- // Feed: saved_tracks // ------------------------------------------------------------------------- diff --git a/packages/connectors/src/trustpilot.ts b/packages/connectors/src/trustpilot.ts index 1f95a16a1..9797b1852 100644 --- a/packages/connectors/src/trustpilot.ts +++ b/packages/connectors/src/trustpilot.ts @@ -5,8 +5,6 @@ */ import { - type ActionContext, - type ActionResult, type ConnectorDefinition, ConnectorRuntime, calculateEngagementScore, @@ -207,8 +205,4 @@ export default class TrustpilotConnector extends ConnectorRuntime { }; }); } - - async execute(_ctx: ActionContext): Promise { - return { success: false, error: 'Actions not supported' }; - } } diff --git a/packages/connectors/src/website.ts b/packages/connectors/src/website.ts index f5442137c..106a0682c 100644 --- a/packages/connectors/src/website.ts +++ b/packages/connectors/src/website.ts @@ -10,8 +10,6 @@ import { createHash } from 'node:crypto'; import TurndownService from 'turndown'; import { - type ActionContext, - type ActionResult, type ConnectorDefinition, ConnectorRuntime, type EventEnvelope, @@ -290,11 +288,6 @@ export default class WebsiteConnector extends ConnectorRuntime { metadata: { pages_scraped: urls.length, events_created: events.length }, }; } - - async execute(_ctx: ActionContext): Promise { - return { success: false, error: 'Actions not supported' }; - } - private async dismissOverlays(page: Page): Promise { const dismissLabels = [ 'Accept', diff --git a/packages/connectors/src/whatsapp.ts b/packages/connectors/src/whatsapp.ts index 28e1cf897..8b3817a00 100644 --- a/packages/connectors/src/whatsapp.ts +++ b/packages/connectors/src/whatsapp.ts @@ -11,8 +11,6 @@ */ import { - type ActionContext, - type ActionResult, type AuthContext, type AuthResult, type ConnectorDefinition, @@ -428,10 +426,6 @@ export default class WhatsAppConnector extends ConnectorRuntime { throw error; } } - - async execute(_ctx: ActionContext): Promise { - return { success: false, error: 'Actions not supported in v1.' }; - } } // --------------------------------------------------------------------------- diff --git a/packages/connectors/src/x.ts b/packages/connectors/src/x.ts index 30f1103b2..c1decc0b7 100644 --- a/packages/connectors/src/x.ts +++ b/packages/connectors/src/x.ts @@ -7,8 +7,6 @@ */ import { - type ActionContext, - type ActionResult, browserNetworkSync, type ConnectorDefinition, ConnectorRuntime, @@ -529,8 +527,4 @@ export default class XConnector extends ConnectorRuntime { return syncViaBrowser(ctx, config, checkpoint); } - - async execute(_ctx: ActionContext): Promise { - return { success: false, error: 'Actions not supported' }; - } } diff --git a/packages/connectors/src/youtube.ts b/packages/connectors/src/youtube.ts index c9bbfe197..b6b317406 100644 --- a/packages/connectors/src/youtube.ts +++ b/packages/connectors/src/youtube.ts @@ -7,8 +7,6 @@ */ import { - type ActionContext, - type ActionResult, type ConnectorDefinition, ConnectorRuntime, calculateEngagementScore, @@ -425,11 +423,6 @@ export default class YouTubeConnector extends ConnectorRuntime { // ------------------------------------------------------------------------- // execute // ------------------------------------------------------------------------- - - async execute(_ctx: ActionContext): Promise { - return { success: false, error: 'Actions not supported' }; - } - // ------------------------------------------------------------------------- // YouTube API helpers // ------------------------------------------------------------------------- diff --git a/packages/server/package.json b/packages/server/package.json index 03251c6a6..c12ad5fe5 100644 --- a/packages/server/package.json +++ b/packages/server/package.json @@ -37,6 +37,7 @@ "@hono/zod-openapi": "^1.2.1", "@lobu/core": "workspace:*", "@lobu/connector-sdk": "workspace:*", + "@lobu/connector-worker": "workspace:*", "@mariozechner/pi-ai": "^0.51.6", "@modelcontextprotocol/sdk": "^1.27.1", "@polyglot-sql/sdk": "^0.1.13", diff --git a/packages/server/src/lib/feed-sync.ts b/packages/server/src/lib/feed-sync.ts index c6335f47f..3bee3a5ac 100644 --- a/packages/server/src/lib/feed-sync.ts +++ b/packages/server/src/lib/feed-sync.ts @@ -4,7 +4,7 @@ * Extracted from scripts/sync-local.ts for programmatic reuse. */ -import { executeCompiledConnector } from '../../../connector-worker/src/executor/runtime'; +import { executeCompiledConnector } from '@lobu/connector-worker/executor/runtime'; import { getDb, parsePgNumberArray } from '../db/client'; import { resolveConnectorCode } from '../utils/ensure-connector-installed'; import { mergeExecutionConfig, resolveExecutionAuth } from '../utils/execution-context'; @@ -112,20 +112,33 @@ export async function runFeed(feed: FeedRecord): Promise<{ itemCount: number }> logContext: { feedId: feed.id }, logMessage: 'Failed to resolve feed credentials', }); + let itemCount = 0; const result = await executeCompiledConnector({ - mode: 'sync', compiledCode, - config: mergeExecutionConfig(feed.connection_config, feed.config), - checkpoint: feed.checkpoint as any, - env: process.env as Record, - connectionCredentials, - sessionState, - credentials, - feedKey: feed.feed_key, - entityIds: feed.entity_ids, - apiType: (feed.api_type as 'api' | 'browser') || 'api', + job: { + mode: 'sync', + config: mergeExecutionConfig( + feed.connection_config, + connectionCredentials, + feed.config + ), + checkpoint: feed.checkpoint, + env: process.env as Record, + sessionState, + credentials, + feedKey: feed.feed_key, + entityIds: feed.entity_ids, + }, + hooks: { + onEventChunk: async (events) => { + itemCount += events.length; + }, + }, }); - const itemCount = result.contents.length; + + if (result.mode !== 'sync') { + throw new Error(`Expected sync result, got mode=${result.mode}`); + } logger.info({ feedId: feed.id, itemCount }, 'Feed sync completed'); return { itemCount }; diff --git a/packages/server/src/server.ts b/packages/server/src/server.ts index a5ef10e10..5c92caba2 100644 --- a/packages/server/src/server.ts +++ b/packages/server/src/server.ts @@ -41,7 +41,7 @@ import { import { startStaleRunReaper } from './scheduled/check-stalled-executions'; import { bootTaskScheduler } from './scheduled/jobs'; import * as Sentry from '@sentry/node'; -import { assertExternalDepsResolvable } from '../../connector-worker/src/runtime-deps'; +import { assertExternalDepsResolvable } from '@lobu/connector-worker/compile'; import { isSentryReported, markSentryReported } from './sentry'; import { getEnvFromProcess } from './utils/env'; import logger from './utils/logger'; diff --git a/packages/server/src/tools/admin/manage_operations.ts b/packages/server/src/tools/admin/manage_operations.ts index 5927c94e9..611e5c86f 100644 --- a/packages/server/src/tools/admin/manage_operations.ts +++ b/packages/server/src/tools/admin/manage_operations.ts @@ -204,25 +204,32 @@ async function executeLocalActionInline( }); try { - const { executeCompiledConnector, getActionOutput } = await import( - '../../../../connector-worker/src/executor/runtime' + const { executeCompiledConnector } = await import( + '@lobu/connector-worker/executor/runtime' + ); + const envStrings = Object.fromEntries( + Object.entries(env).filter(([, value]) => typeof value === 'string') ); const result = await executeCompiledConnector({ - mode: 'action', compiledCode, - actionKey: - operation.backend_config.backend === 'local_action' - ? operation.backend_config.actionKey - : operation.operation_key, - actionInput, - env: Object.fromEntries(Object.entries(env).filter(([, value]) => typeof value === 'string')), - connectionCredentials, - sessionState, - credentials, - apiType: 'api', + job: { + mode: 'action', + actionKey: + operation.backend_config.backend === 'local_action' + ? operation.backend_config.actionKey + : operation.operation_key, + actionInput, + config: { ...envStrings, ...connectionCredentials }, + env: envStrings, + sessionState, + credentials, + }, }); - const actionOutput = getActionOutput(result); + if (result.mode !== 'action') { + throw new Error(`Expected action result, got mode=${result.mode}`); + } + const actionOutput = result.output; await sql`UPDATE runs SET status = 'completed', completed_at = NOW(), action_output = ${sql.json(actionOutput)} WHERE id = ${runId} AND organization_id = ${organizationId}`; return { status: 'completed', output: actionOutput }; } catch (error) { diff --git a/packages/server/src/utils/connector-catalog.ts b/packages/server/src/utils/connector-catalog.ts index 88463d0fb..1f308caae 100644 --- a/packages/server/src/utils/connector-catalog.ts +++ b/packages/server/src/utils/connector-catalog.ts @@ -1,17 +1,14 @@ import { existsSync } from 'node:fs'; -import { mkdtemp, readdir, readFile, rm, stat } from 'node:fs/promises'; -import { createRequire } from 'node:module'; -import { tmpdir } from 'node:os'; -import { extname, join, relative, resolve } from 'node:path'; +import { readdir, stat } from 'node:fs/promises'; +import { extname, relative, resolve } from 'node:path'; import { fileURLToPath, pathToFileURL } from 'node:url'; -import { build, type Plugin } from 'esbuild'; -import { EXTERNAL_RUNTIME_DEPS } from '../../../connector-worker/src/runtime-deps'; +import { + createConnectorCompiler, + findBundledConnectorFile as findInDirs, +} from '@lobu/connector-worker/compile'; import { extractConnectorMetadata } from './connector-compiler'; import logger from './logger'; -const require_ = createRequire(import.meta.url); -const SDK_ENTRY = require_.resolve('@lobu/connector-sdk'); - const DEFAULT_CONNECTOR_DIR_CANDIDATES = [ // Published CLI runtime: packages/cli/scripts/build.cjs copies bundled // connector source files next to server.bundle.mjs at dist/connectors. @@ -26,42 +23,18 @@ const DEFAULT_CONNECTOR_DIR_CANDIDATES = [ resolve(process.cwd(), 'connectors'), ]; -// Connectors declare their npm deps via `import x from 'npm:foo@1.2.3'`. This -// plugin strips the prefix so esbuild can resolve the bare package against -// node_modules. When the package isn't installed in the gateway image (e.g. -// heavyweight deps like `baileys` that only the worker pod needs), we mark -// the import as external rather than failing the whole bundle. The bundle -// still emits `import 'baileys'` and the worker — which has those packages -// installed — can run it. Without this, one un-installable npm dep takes -// down the entire connector-catalog path, breaking /api/workers/poll for -// every worker. -const npmSpecifierPlugin: Plugin = { - name: 'npm-specifier', - setup(b) { - b.onResolve({ filter: /^npm:/ }, async (args) => { - const bare = args.path - .slice(4) - .replace(/^(@[^/]+\/[^/@]+)@[^/]*/, '$1') - .replace(/^([^/@]+)@[^/]*/, '$1'); - const resolved = await b.resolve(bare, { - resolveDir: args.resolveDir, - kind: args.kind, - }); - if (resolved.errors.length > 0) { - // Package isn't installed in this image — externalize so the bundle - // still produces. Worker runtime supplies the implementation. - // Log so an actual typo or missing-dep regression is diagnosable - // rather than silently producing a bundle that crashes at runtime. - logger.warn( - { package: bare, importer: args.importer }, - 'externalising npm:* import — package not resolvable in gateway image (worker runtime must provide it)' - ); - return { path: bare, external: true, errors: [], warnings: [] }; - } - return resolved; - }); +const connectorCompiler = createConnectorCompiler({ + onUnresolvedNpm: ({ bareSpecifier, importer }) => { + // Package isn't installed in this image — externalize so the bundle + // still produces. Worker runtime supplies the implementation. + // Log so an actual typo or missing-dep regression is diagnosable + // rather than silently producing a bundle that crashes at runtime. + logger.warn( + { package: bareSpecifier, importer }, + 'externalising npm:* import — package not resolvable in gateway image (worker runtime must provide it)' + ); }, -}; +}); type CachedMetadata = | { @@ -134,22 +107,7 @@ const bundledFileCache = new Map(); export function findBundledConnectorFile(key: string): string | null { const cached = bundledFileCache.get(key); if (cached !== undefined) return cached; - // Mirror the worker-side resolver in compile-connector.ts: try the - // subdir layout first (`browser.evaluate` → `browser/evaluate.ts`) and - // fall back to the flat underscore convention (`chrome.tabs` → - // `chrome_tabs.ts`). Keep these in sync if either side changes. - const candidates = [`${key.replace(/\./g, '/')}.ts`, `${key.replace(/\./g, '_')}.ts`]; - let found: string | null = null; - outer: for (const dir of DEFAULT_CONNECTOR_DIR_CANDIDATES) { - for (const fileName of candidates) { - const filePath = resolve(dir, fileName); - if (!filePath.startsWith(`${dir}/`)) continue; - if (existsSync(filePath)) { - found = filePath; - break outer; - } - } - } + const found = findInDirs(key, DEFAULT_CONNECTOR_DIR_CANDIDATES); bundledFileCache.set(key, found); return found; } @@ -222,81 +180,11 @@ export function getConfiguredConnectorCatalogUris(rawUris?: string): string[] { return [...normalized]; } -// Compiling a connector spawns esbuild; on the hot paths (feed sync, worker -// poll) the same bundled .ts file is recompiled repeatedly. Cache the output -// keyed by file mtime so a recompile only happens when the source actually -// changes. Process restart (= deploy, = SDK rebuild) clears it. -// -// LRU-capped: each entry is the full compiled bundle (~13 MB today, smaller -// once non-essential transitive deps are externalized — see -// EXTERNAL_RUNTIME_DEPS). Before the cap, prod was seeing the cache hold -// every connector's bundle indefinitely (~29 × 13 MB = 384 MB resident, the -// dominant heap occupant under the 1 GiB pod limit; see lobu#771 postmortem -// for the heap-snapshot trail). The cap keeps the cache to the working set -// of recently-used connectors and lets V8 reclaim the rest. -const COMPILED_FILE_CACHE_MAX = 8; -const compiledFileCache = new Map(); - -function touchCacheEntry(filePath: string, entry: { mtimeMs: number; code: string }): void { - compiledFileCache.delete(filePath); - compiledFileCache.set(filePath, entry); - while (compiledFileCache.size > COMPILED_FILE_CACHE_MAX) { - const oldest = compiledFileCache.keys().next().value; - if (oldest === undefined) break; - compiledFileCache.delete(oldest); - } -} - -export async function compileConnectorFromFile(filePath: string): Promise { - let mtimeMs: number | null = null; - try { - mtimeMs = (await stat(filePath)).mtimeMs; - const cached = compiledFileCache.get(filePath); - if (cached && cached.mtimeMs === mtimeMs) { - // Move to end of insertion order = mark as most-recently-used. - touchCacheEntry(filePath, cached); - return cached.code; - } - } catch { - // stat failed — fall through and let the build surface the real error. - } - - const tmpDir = await mkdtemp(join(tmpdir(), 'lobu-connector-')); - const outPath = join(tmpDir, 'out.mjs'); - - try { - await build({ - entryPoints: [filePath], - outfile: outPath, - bundle: true, - format: 'esm', - platform: 'node', - target: 'node20', - alias: { lobu: SDK_ENTRY, '@lobu/connector-sdk': SDK_ENTRY }, - banner: { - js: `import { createRequire as __createRequire } from 'module'; const require = __createRequire(import.meta.url);`, - }, - plugins: [npmSpecifierPlugin], - // Single source of truth: EXTERNAL_RUNTIME_DEPS in - // packages/connector-worker/src/runtime-deps.ts. Only natives / - // runtime-installed deps belong there (playwright ships browsers via - // `npx playwright install`). Pure JS deps (pino, link-preview-js) must be - // bundled — externalising them previously caused every connector run to - // fail with "Cannot find package 'pino'" because the worker image didn't - // ship them. - external: [...EXTERNAL_RUNTIME_DEPS], - write: true, - minify: false, - sourcemap: false, - }); - - const code = await readFile(outPath, 'utf-8'); - if (mtimeMs !== null) touchCacheEntry(filePath, { mtimeMs, code }); - return code; - } finally { - await rm(tmpDir, { recursive: true, force: true }); - } -} +// `compileConnectorFromFile` is owned by `@lobu/connector-worker/compile` +// (LRU-capped at 8 entries, keyed by file mtime, identical to the previous +// implementation that lived here — see lobu#771 for the cap rationale). +// Re-exported here so existing server callers keep their import paths. +export const compileConnectorFromFile = connectorCompiler.compileConnectorFromFile; async function extractConnectorCatalogMetadata( filePath: string diff --git a/packages/server/src/utils/connector-compiler.ts b/packages/server/src/utils/connector-compiler.ts index 5ac6b3ee9..fc2b616aa 100644 --- a/packages/server/src/utils/connector-compiler.ts +++ b/packages/server/src/utils/connector-compiler.ts @@ -5,7 +5,7 @@ * and metadata extraction (finds ConnectorRuntime subclass with sync()+execute()). */ -import { EXTERNAL_RUNTIME_DEPS } from '../../../connector-worker/src/runtime-deps'; +import { EXTERNAL_RUNTIME_DEPS } from '@lobu/connector-worker/compile'; import { type CompileResult, compileSource, extractMetadata } from './compiler-core'; export interface ConnectorMetadata {