From edfb77b195695cbd0cf41520ae1f9568cfba8a89 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Tue, 19 May 2026 17:43:20 +0100 Subject: [PATCH 01/13] wip(spike): prune dead exports from @lobu/connector-sdk MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Removes re-exports/types with zero consumers across packages/, examples/, scripts/, config/ outside of the SDK itself: - FeedMode enum (`'sync' | 'virtual'`) — only ever lived in the SDK - MatchStrategy — used internally inside identity-types but never imported externally; keep the value, drop the re-export - isSourceNativeEventType / SOURCE_NATIVE_EVENT_TYPES + event-taxonomy.ts — whole file existed only to be re-exported, no consumers anywhere - ReactionEntity — embedded in ReactionContext; nothing imports it directly - Stealth/CDP browser re-exports (humanWait, randomScroll, getRandomDelay, testBotDetection, StealthBrowser, launchStealthBrowser, withErrorCapture, withHttpRetry, BrowserAuthCascadeError, BrowserNetworkConfig, discoverChromeListeningPorts, discoverChromeProcessCdpUrls, normalizeCdpUrl, tryWebSocketCdp) — kept internal to the SDK where launcher/acquire use them; the index just stops claiming they're public surface. Rationale: SDK is the public surface third-party connectors consume. Every re-export is a backwards-compat obligation. Verified zero external uses with `rg -l "\bSYM\b" packages/ --type=ts | grep -v connector-sdk/src` per symbol before deletion. Typecheck clean: packages/connector-sdk, packages/connectors (modulo pre-existing whatsapp baileys-typings drift), packages/connector-worker, packages/cli, packages/server (modulo pre-existing guardrails/aggregator.ts SkillPreToolGuardrail breakage on main). --- packages/connector-sdk/src/connector-types.ts | 7 ----- packages/connector-sdk/src/event-taxonomy.ts | 31 ------------------- packages/connector-sdk/src/index.ts | 18 +---------- 3 files changed, 1 insertion(+), 55 deletions(-) delete mode 100644 packages/connector-sdk/src/event-taxonomy.ts diff --git a/packages/connector-sdk/src/connector-types.ts b/packages/connector-sdk/src/connector-types.ts index 92062a4f1..69441559d 100644 --- a/packages/connector-sdk/src/connector-types.ts +++ b/packages/connector-sdk/src/connector-types.ts @@ -315,13 +315,6 @@ export const IDENTITY = { export type IdentityNamespace = (typeof IDENTITY)[keyof typeof IDENTITY]; -export enum FeedMode { - /** Connector code runs on worker, syncs data */ - sync = 'sync', - /** Virtual feed backed by saved queries (future) */ - virtual = 'virtual', -} - // ============================================================================= // Action Definition // ============================================================================= diff --git a/packages/connector-sdk/src/event-taxonomy.ts b/packages/connector-sdk/src/event-taxonomy.ts deleted file mode 100644 index a401a6f89..000000000 --- a/packages/connector-sdk/src/event-taxonomy.ts +++ /dev/null @@ -1,31 +0,0 @@ -export const SOURCE_NATIVE_EVENT_TYPES = [ - 'article', - 'ask_hn', - 'comment', - 'commit', - 'discussion', - 'email', - 'file', - 'issue', - 'issue_comment', - 'message', - 'photo', - 'post', - 'pr_comment', - 'pull_request', - 'reply', - 'repository', - 'review', - 'section', - 'show_hn', - 'story', - 'thread', - 'tweet', - 'video', -] as const; - -const SOURCE_NATIVE_EVENT_TYPE_SET = new Set(SOURCE_NATIVE_EVENT_TYPES); - -export function isSourceNativeEventType(value: string | null | undefined): boolean { - return typeof value === 'string' && SOURCE_NATIVE_EVENT_TYPE_SET.has(value); -} diff --git a/packages/connector-sdk/src/index.ts b/packages/connector-sdk/src/index.ts index 90dbe2acc..c9f70ba8f 100644 --- a/packages/connector-sdk/src/index.ts +++ b/packages/connector-sdk/src/index.ts @@ -38,7 +38,6 @@ export type { EventEnvelope, Feed, FeedDefinition, - FeedMode, IdentityNamespace, Run, RunStatus, @@ -62,10 +61,8 @@ export { DerivedRelationshipMetadata, FactEventMetadata, IDENTITY_FACT_SEMANTIC_TYPE, - MatchStrategy, RelationshipTypeIdentityMetadata, } from './identity-types.js'; -export { isSourceNativeEventType, SOURCE_NATIVE_EVENT_TYPES } from './event-taxonomy.js'; export { normalizeAuthUserId, normalizeEmail, @@ -108,31 +105,18 @@ export type { AcquireBrowserOptions, AcquiredBrowser } from './browser/acquire.j export { acquireBrowser, BrowserAuthCascadeError } from './browser/acquire.js'; export type { CdpVersionInfo, ResolveCdpOptions } from './browser/cdp.js'; export { - discoverChromeListeningPorts, - discoverChromeProcessCdpUrls, fetchCdpVersionInfo, - normalizeCdpUrl, resolveCdpUrl, - tryWebSocketCdp, } from './browser/cdp.js'; export { CdpPage } from './browser/cdp-page.js'; export type { BrowserLaunchOptions, EnhancedBrowser } from './browser/launcher.js'; export { captureErrorArtifacts, launchBrowser, - withErrorCapture, } from './browser/launcher.js'; -export type { StealthBrowser, StealthBrowserOptions } from './browser/stealth.js'; -export { - getRandomDelay, - humanWait, - launchStealthBrowser, - randomScroll, - testBotDetection, -} from './browser/stealth.js'; export type { BrowserNetworkConfig, BrowserNetworkResult } from './browser-network.js'; export { browserNetworkSync } from './browser-network.js'; -export type { ReactionContext, ReactionEntity } from './reaction-sdk.js'; +export type { ReactionContext } from './reaction-sdk.js'; export type { Checkpoint, Content, From a159f230104ae7e11eb1891d4c1d432ac481b58b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Tue, 19 May 2026 17:49:47 +0100 Subject: [PATCH 02/13] wip(spike): collapse legacy executor adapter, speak V1 SDK shapes over IPC MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The connector-worker executor stack was a multi-hop adapter that repackaged the V1 ConnectorRuntime contract (SyncContext/ActionContext/ AuthContext + EventEnvelope) into a legacy "feed/options" shape with magic underscore-prefixed keys (__action_key, __auth_mode, __feed_key, __entity_ids, __auth_config, __auth_previous_credentials), serialised that over IPC, unpacked it back to V1 in the child runner, then packed the result into a legacy `FeedSyncResult { contents: Content[] }`, serialised that, unpacked it in the daemon executor, and re-normalised each item a second time inside `processContent`. Concretely deleted: - `Content`, `Checkpoint`, `FeedOptions`, `FeedSyncResult`, `SessionState` from `packages/connector-sdk/src/types.ts` (only `Env` remains — it has 20+ downstream consumers in the gateway). The SDK now exposes one taxonomy: `SyncContext` / `ActionContext` / `AuthContext` / `EventEnvelope` from `connector-types.ts`. - `buildConnectorExecutionContext`, `mergeExecutionSessionState`, `mergeExecutionEnv`, `normalizeEventEnvelope`, `getActionOutput` from `connector-worker/src/executor/runtime.ts` (~120 LOC of pack/unpack). - The `__action_key` / `__auth_mode` / `__feed_key` / `__entity_ids` / `__auth_config` / `__auth_previous_credentials` / `stripInternalOptions` protocol on the IPC boundary. - The double `normalizeEventEnvelope` call (child runner already normalises, daemon's `processContent` then normalised again on the resulting `Content` — a no-op pass on already-normalised fields). The new contract: `ExecutorJob` is a discriminated union (`mode: 'sync' | 'action' | 'authenticate'`) carrying the V1 SDK shapes verbatim, and `ExecutorResult` mirrors `SyncResult` / `ActionResult` / `AuthResult` the same way. `child-runner.ts` reads the job and dispatches; the parent's `subprocess.ts` is mode-agnostic (it just plumbs events, checkpoints, and auth artifacts through hooks). Net diff: -242 LOC across the in-scope packages (sdk + worker), and the result/return paths from `executeSyncRun`, `executeActionRun`, `executeAuthRun` collapse from "consume legacy FeedSyncResult and unpack the right slot" to mode-narrowed direct access. Typecheck status: - packages/connector-sdk: clean - packages/connector-worker: clean - packages/connectors: clean (pre-existing whatsapp baileys-typings drift in src/whatsapp.ts only) - packages/cli: clean - packages/server: two BREAKING call-sites under packages/server (out of spike scope): src/lib/feed-sync.ts (executeCompiledConnector still passing the flat legacy params) src/tools/admin/manage_operations.ts (getActionOutput re-export gone) These reach across the package boundary into connector-worker internals via relative path / dynamic import — flagged as architectural leak in REPORT.md. Updating them is a one-screen rewrite to the new `{job: {mode, ...}}` signature plus `result.output` / `result.events` mode narrowing. Left for a focused follow-up PR so the spike stays within the read-only constraint on out-of-scope packages. The legacy test (`executor-heartbeat.test.ts`) was updated to the new shape; the underlying `bun test` integration cannot run in this worktree (unrelated `@lobu/embeddings` module-resolution issue that reproduces on the pre-spike commit too, recorded in REPORT.md). --- packages/connector-sdk/src/index.ts | 9 +- packages/connector-sdk/src/types.ts | 92 +---------- .../src/__tests__/executor-heartbeat.test.ts | 12 +- .../connector-worker/src/daemon/executor.ts | 144 ++++++++++------- .../src/executor/child-runner.ts | 135 +++++----------- .../src/executor/interface.ts | 89 +++++++---- .../connector-worker/src/executor/runtime.ts | 150 ++---------------- .../src/executor/subprocess.ts | 79 ++++----- 8 files changed, 234 insertions(+), 476 deletions(-) diff --git a/packages/connector-sdk/src/index.ts b/packages/connector-sdk/src/index.ts index c9f70ba8f..95e54a5e8 100644 --- a/packages/connector-sdk/src/index.ts +++ b/packages/connector-sdk/src/index.ts @@ -117,11 +117,4 @@ export { export type { BrowserNetworkConfig, BrowserNetworkResult } from './browser-network.js'; export { browserNetworkSync } from './browser-network.js'; export type { ReactionContext } from './reaction-sdk.js'; -export type { - Checkpoint, - Content, - Env, - FeedOptions, - FeedSyncResult, - SessionState, -} from './types.js'; +export type { Env } from './types.js'; diff --git a/packages/connector-sdk/src/types.ts b/packages/connector-sdk/src/types.ts index 98f8a89f9..91ba79d27 100644 --- a/packages/connector-sdk/src/types.ts +++ b/packages/connector-sdk/src/types.ts @@ -1,87 +1,7 @@ -/** - * Checkpoint data structure for tracking feed sync state - */ -export interface Checkpoint { - // Required for all feeds - used to filter content by time - last_timestamp?: Date; - - // Metadata - updated_at: Date; - total_items_processed?: number; - - // Platform-specific fields should extend this interface -} - -/** - * Sync result containing extracted content and updated checkpoint - * - * Note: checkpoint can be null for feeds that use incremental checkpoint - * updates via updateCheckpointFn during pagination (e.g., Reddit) - */ -export interface FeedSyncResult { - contents: Content[]; - checkpoint: Checkpoint | null; - metadata?: { - items_found: number; - items_skipped: number; - rate_limit_remaining?: number; - next_sync_recommended_at?: Date; - parent_map?: Record; // For hierarchical content (e.g., GitHub comments -> issues) - [key: string]: any; // Allow additional feed-specific metadata - }; - /** - * Auth state to persist after sync (browser cookies, etc.) - * Will be saved back to the linked auth profile for browser-based connectors. - */ - auth_update?: Record; -} - -/** - * Extracted content from platform - */ -export interface Content { - origin_id: string; // Platform's unique ID - payload_text: string; // Main text content - title?: string; // Title of content (e.g., post title, issue title, review subject) - author_name?: string; // Username/display name - source_url: string; // Link to original content - occurred_at: Date; // When content was posted - - // Source-native item type (e.g. 'thread', 'message', 'email', 'issue', 'review') - origin_type?: string; - - // Semantic type inside Lobu (defaults to 'content' for raw connector ingests) - semantic_type?: string; - - // Calculated engagement score (0-100, calculated by feed implementation) - score: number; - - // Optional parent reference for hierarchical content - origin_parent_id?: string | null; - - // Metadata including engagement metrics (platform-specific) - // Engagement fields: score, upvotes, downvotes, rating, helpful_count, reply_count, likes, views, retweets, replies, comments - // Platform fields: post_id, parent_id, etc. - metadata?: Record; -} - -/** - * Feed options passed from MCP tool - */ -export interface FeedOptions { - /** - * Number of days to look back when collecting historical data - * Default: 365 (1 year) - */ - lookback_days?: number; - - // Platform-specific options defined in each feed - [key: string]: any; -} - /** * Consolidated environment bindings used across the platform. - * This is the single source of truth for environment variable types. + * This is the single source of truth for environment variable types + * read by the gateway, worker, and connector code. */ export interface Env { // Environment @@ -154,11 +74,3 @@ export interface Env { // Allow any other env vars accessed via c.env[key] [key: string]: string | undefined; } - -/** - * Base session state type - feeds define their own specific types - * Values can come from env vars (defaults) or DB (per-connection overrides) - * At runtime, DB values override env defaults - */ -export type SessionState = Record; - diff --git a/packages/connector-worker/src/__tests__/executor-heartbeat.test.ts b/packages/connector-worker/src/__tests__/executor-heartbeat.test.ts index c1e805ead..950cf20d3 100644 --- a/packages/connector-worker/src/__tests__/executor-heartbeat.test.ts +++ b/packages/connector-worker/src/__tests__/executor-heartbeat.test.ts @@ -20,8 +20,8 @@ import { afterEach, beforeEach, describe, expect, mock, spyOn, test } from 'bun: type AnyFn = (...args: any[]) => any; const executeCompiledConnectorMock = mock(async () => ({ - contents: [], - checkpoint: null, + mode: 'action', + output: { ok: true }, })); const batchGenerateEmbeddingsMock = mock(async (texts: string[]) => @@ -30,8 +30,6 @@ const batchGenerateEmbeddingsMock = mock(async (texts: string[]) => mock.module('../executor/runtime.js', () => ({ executeCompiledConnector: executeCompiledConnectorMock, - getActionOutput: () => ({ ok: true }), - normalizeEventEnvelope: (e: Record) => e, })); mock.module('../embeddings.js', () => ({ @@ -133,7 +131,7 @@ describe('executor heartbeats (lobu#860)', () => { // have fired — simulates the action body taking 60+ seconds. executeCompiledConnectorMock.mockImplementationOnce(async () => { await fireIntervalTicks(2); - return { contents: [{ result: { ok: true } }], checkpoint: null }; + return { mode: 'action', output: { ok: true } }; }); const job = { @@ -185,8 +183,8 @@ describe('executor heartbeats (lobu#860)', () => { test('heartbeat interval is cleared after a successful action run', async () => { const client = makeStubClient(); executeCompiledConnectorMock.mockImplementationOnce(async () => ({ - contents: [{ result: { ok: true } }], - checkpoint: null, + mode: 'action', + output: { ok: true }, })); const job = { diff --git a/packages/connector-worker/src/daemon/executor.ts b/packages/connector-worker/src/daemon/executor.ts index 75c15400d..0ce3cc420 100644 --- a/packages/connector-worker/src/daemon/executor.ts +++ b/packages/connector-worker/src/daemon/executor.ts @@ -5,14 +5,10 @@ * Generates embeddings and streams results. */ -import type { Checkpoint, Content, Env } from '@lobu/connector-sdk'; +import type { Env, EventEnvelope } from '@lobu/connector-sdk'; import { compileConnectorFromFile, findBundledConnectorFile } from '../compile-connector.js'; import { batchGenerateEmbeddings, generateEmbedding } from '../embeddings.js'; -import { - executeCompiledConnector, - getActionOutput, - normalizeEventEnvelope, -} from '../executor/runtime.js'; +import { executeCompiledConnector } from '../executor/runtime.js'; import { SubprocessExecutor } from '../executor/subprocess.js'; import type { ContentItem, ExecutorClient, PollResponse } from './client.js'; @@ -207,23 +203,21 @@ async function executeSyncRun( }; const result = await executeCompiledConnector({ - mode: 'sync', compiledCode: compiled_code, - config: (feedConfig ?? {}) as Record, - checkpoint: checkpoint as unknown as Checkpoint | null, - env, - connectionCredentials: ((job.connection_credentials as Record) ?? - null) as Record | null, - sessionState: (job.session_state ?? null) as Record | null, - credentials, - feedKey: feed_key, - entityIds: job.entity_ids ?? [], - apiType: 'api', executor: subprocessExecutor, + job: { + mode: 'sync', + config: mergeEnv(env, job.connection_credentials, feedConfig), + checkpoint: checkpoint as Record | null, + env, + sessionState: (job.session_state ?? null) as Record | null, + credentials: credentials ?? null, + feedKey: feed_key, + entityIds: job.entity_ids ?? [], + }, hooks: { - collectContents: false, onCheckpointUpdate: async (nextCheckpoint) => { - lastCheckpoint = nextCheckpoint as Record | null; + lastCheckpoint = nextCheckpoint; if (!lastCheckpoint) return; try { await client.stream({ @@ -236,9 +230,9 @@ async function executeSyncRun( console.error('[executor] Checkpoint flush failed:', err); } }, - onContentChunk: async (items) => { - for (const item of items) { - const contentItem = await processContent(item, cfg.generateEmbeddings); + onEventChunk: async (events) => { + for (const event of events) { + const contentItem = await processEvent(event, cfg.generateEmbeddings); batch.push(contentItem); itemsCollectedSoFar++; @@ -250,7 +244,10 @@ async function executeSyncRun( }, }); - lastCheckpoint = result.checkpoint as unknown as Record | null; + if (result.mode !== 'sync') { + throw new Error(`Expected sync result, got mode=${result.mode}`); + } + lastCheckpoint = result.checkpoint; await flushBatch(); @@ -369,20 +366,23 @@ async function executeActionRun( try { const result = await executeCompiledConnector({ - mode: 'action', compiledCode: compiled_code, - actionKey: action_key, - actionInput: (action_input ?? {}) as Record, - env, - connectionCredentials: ((job.connection_credentials as Record) ?? - null) as Record | null, - credentials, - apiType: 'api', executor: subprocessExecutor, + job: { + mode: 'action', + actionKey: action_key, + actionInput: (action_input ?? {}) as Record, + config: mergeEnv(env, job.connection_credentials, null), + env, + sessionState: null, + credentials: credentials ?? null, + }, }); - // For actions, the "contents" array may contain a single result envelope - const actionOutput = getActionOutput(result); + if (result.mode !== 'action') { + throw new Error(`Expected action result, got mode=${result.mode}`); + } + const actionOutput = result.output; await client.completeAction({ run_id, @@ -459,14 +459,15 @@ async function executeAuthRun( try { const result = await executeCompiledConnector({ - mode: 'authenticate', compiledCode: compiled_code, - previousCredentials: previous_credentials ?? null, - env, executor: subprocessExecutor, - apiType: 'api', + job: { + mode: 'authenticate', + config: {}, + previousCredentials: previous_credentials ?? null, + env, + }, hooks: { - collectContents: false, onAuthArtifact: async (artifact) => { try { await client.emitAuthArtifact({ @@ -498,7 +499,7 @@ async function executeAuthRun( clearInterval(heartbeatInterval); - if (!result.auth_result?.credentials) { + if (result.mode !== 'authenticate' || !result.auth?.credentials) { await client.completeAuth({ run_id, worker_id: client.id, @@ -512,8 +513,8 @@ async function executeAuthRun( run_id, worker_id: client.id, status: 'success', - credentials: result.auth_result.credentials, - metadata: result.auth_result.metadata, + credentials: result.auth.credentials, + metadata: result.auth.metadata, }); console.error(`[executor] Auth run ${run_id} completed`); @@ -675,30 +676,53 @@ async function executeEmbedBackfillRun( } /** - * Process a content item - convert to ContentItem and optionally generate embedding + * Merge the run-level env, the per-connection stored credentials, and the + * per-feed config into the single `config` object that the connector's + * `sync()` / `execute()` sees. Connection credentials override env (per-conn + * trumps fleet-wide); feed config wins last (per-feed trumps connection). */ -async function processContent(item: Content, generateEmbeddings: boolean): Promise { - const normalized = normalizeEventEnvelope(item as Record); +function mergeEnv( + env: Env, + connectionCredentials: Record | undefined | null, + feedConfig: Record | undefined | null +): Record { + return { + ...(env as unknown as Record), + ...((connectionCredentials ?? {}) as Record), + ...((feedConfig ?? {}) as Record), + }; +} + +/** + * Convert a V1 EventEnvelope (the SDK's standard sync output) into the + * gateway-bound ContentItem shape, optionally generating an embedding. + */ +async function processEvent( + event: EventEnvelope, + generateEmbeddings: boolean +): Promise { + const occurredAtIso = + event.occurred_at instanceof Date + ? event.occurred_at.toISOString() + : (event.occurred_at as unknown as string); + const contentItem: ContentItem = { - id: normalized.origin_id, - title: normalized.title, - payload_text: normalized.payload_text, - author_name: normalized.author_name, - occurred_at: - normalized.occurred_at instanceof Date - ? normalized.occurred_at.toISOString() - : (normalized.occurred_at as unknown as string), - source_url: normalized.source_url, - score: normalized.score, - metadata: normalized.metadata, - origin_parent_id: normalized.origin_parent_id || undefined, - origin_type: normalized.origin_type, - semantic_type: normalized.semantic_type, + id: event.origin_id, + title: event.title, + payload_text: event.payload_text, + author_name: event.author_name, + occurred_at: occurredAtIso, + source_url: event.source_url ?? undefined, + score: typeof event.score === 'number' ? event.score : 0, + metadata: event.metadata ?? {}, + origin_parent_id: event.origin_parent_id ?? undefined, + origin_type: event.origin_type, + semantic_type: event.semantic_type ?? event.origin_type, }; if (generateEmbeddings) { try { - const textForEmbedding = [normalized.title, normalized.payload_text] + const textForEmbedding = [event.title, event.payload_text] .filter(Boolean) .join(' ') .trim(); @@ -706,7 +730,7 @@ async function processContent(item: Content, generateEmbeddings: boolean): Promi contentItem.embedding = await generateEmbedding(textForEmbedding); } } catch (err) { - console.error(`[executor] Embedding generation failed for ${normalized.origin_id}:`, err); + console.error(`[executor] Embedding generation failed for ${event.origin_id}:`, err); } } diff --git a/packages/connector-worker/src/executor/child-runner.ts b/packages/connector-worker/src/executor/child-runner.ts index 7268ec966..0d43a05c7 100644 --- a/packages/connector-worker/src/executor/child-runner.ts +++ b/packages/connector-worker/src/executor/child-runner.ts @@ -1,40 +1,23 @@ /** * Child Runner - Entry point for forked subprocess * - * Receives compiled connector code + context via IPC, - * dynamically imports the ConnectorRuntime class, and executes sync()/execute(). - * Streams normalized content chunks and checkpoint updates back via IPC. + * Receives compiled connector code + an ExecutorJob via IPC, dynamically + * imports the ConnectorRuntime class, and dispatches to sync() / execute() / + * authenticate() using the V1 SDK shapes directly — no magic-key adapter. */ import { randomBytes } from 'node:crypto'; import { rm, writeFile } from 'node:fs/promises'; import { join } from 'node:path'; import { pathToFileURL } from 'node:url'; -import type { SyncResult } from '@lobu/connector-sdk'; -import type { FeedSyncResult } from './interface.js'; -import { normalizeEventEnvelope } from './runtime.js'; +import type { EventEnvelope, SyncResult } from '@lobu/connector-sdk'; +import type { ExecutorJob, ExecutorResult } from './interface.js'; -const CONTENT_CHUNK_SIZE = 100; +const EVENT_CHUNK_SIZE = 100; interface ChildMessage { compiledCode: string; - context: { - options: Record; - checkpoint: any; - env: Record; - sessionState?: Record | null; - apiType: 'api' | 'browser'; - }; -} - -function stripInternalOptions(options: Record): Record { - const result: Record = {}; - for (const [key, value] of Object.entries(options)) { - if (!key.startsWith('__')) { - result[key] = value; - } - } - return result; + job: ExecutorJob; } function findRuntimeClass(mod: Record) { @@ -101,23 +84,14 @@ function awaitAuthSignal( }); } -async function executeConnectorRuntime(instance: any, context: ChildMessage['context']) { - const isAction = typeof context.options?.__action_key === 'string'; - const isAuth = context.options?.__auth_mode === true; - const credentials = context.sessionState?.oauth ?? null; - const publicConfig = stripInternalOptions(context.options ?? {}); - const runtimeConfig = { - ...(context.env ?? {}), - ...publicConfig, - }; - - if (isAuth) { +async function executeConnectorRuntime( + instance: any, + job: ExecutorJob +): Promise { + if (job.mode === 'authenticate') { const authResult = await instance.authenticate({ - config: (context.options?.__auth_config ?? {}) as Record, - previousCredentials: (context.options?.__auth_previous_credentials ?? null) as Record< - string, - unknown - > | null, + config: job.config, + previousCredentials: job.previousCredentials, emit: async (artifact: Record) => { await sendIPC({ type: 'auth_artifact', artifact }); }, @@ -129,57 +103,33 @@ async function executeConnectorRuntime(instance: any, context: ChildMessage['con throw new Error('authenticate() returned no credentials'); } - const result: FeedSyncResult = { - contents: [], - checkpoint: null, - auth_result: { - credentials: authResult.credentials, - metadata: authResult.metadata, - }, + return { + mode: 'authenticate', + auth: { credentials: authResult.credentials, metadata: authResult.metadata }, }; - return result; } - if (isAction) { - const actionKey = context.options.__action_key; - const actionInput = (context.options.__action_input ?? {}) as Record; + if (job.mode === 'action') { const actionResult = await instance.execute({ - actionKey, - input: actionInput, - credentials, - config: runtimeConfig, + actionKey: job.actionKey, + input: job.actionInput, + credentials: job.credentials, + config: { ...job.env, ...job.config }, }); if (!actionResult?.success) { - throw new Error(actionResult?.error || `Action '${actionKey}' failed`); + throw new Error(actionResult?.error || `Action '${job.actionKey}' failed`); } - const result: FeedSyncResult = { - contents: [ - { - origin_id: `action-${actionKey}-${Date.now()}`, - payload_text: '', - source_url: '', - occurred_at: new Date(), - score: 0, - metadata: actionResult.output ?? {}, - }, - ], - checkpoint: context.checkpoint ?? null, - metadata: { - items_found: 0, - items_skipped: 0, - }, - }; - return result; + return { mode: 'action', output: actionResult.output ?? {} }; } - const emitEvents = async (events: unknown[]) => { - const normalized = events.map((event: any) => normalizeEventEnvelope(event)); - for (let index = 0; index < normalized.length; index += CONTENT_CHUNK_SIZE) { + // mode === 'sync' + const emitEvents = async (events: EventEnvelope[]) => { + for (let index = 0; index < events.length; index += EVENT_CHUNK_SIZE) { await sendIPC({ - type: 'content_chunk', - items: normalized.slice(index, index + CONTENT_CHUNK_SIZE), + type: 'event_chunk', + events: events.slice(index, index + EVENT_CHUNK_SIZE), }); } }; @@ -189,22 +139,24 @@ async function executeConnectorRuntime(instance: any, context: ChildMessage['con }; const syncResult = (await instance.sync({ - feedKey: context.options?.__feed_key, - config: runtimeConfig, - checkpoint: context.checkpoint ?? null, - credentials, - entityIds: (context.options?.__entity_ids as number[] | undefined) ?? [], - sessionState: context.sessionState ?? null, + feedKey: job.feedKey, + config: { ...job.env, ...job.config }, + checkpoint: job.checkpoint, + credentials: job.credentials, + entityIds: job.entityIds, + sessionState: job.sessionState, emitEvents, updateCheckpoint, })) as SyncResult; const events = Array.isArray(syncResult?.events) ? syncResult.events : []; await emitEvents(events); - const result: FeedSyncResult = { - contents: [], - checkpoint: (syncResult?.checkpoint ?? null) as any, - auth_update: syncResult?.auth_update ?? undefined, + + return { + mode: 'sync', + events: [], + checkpoint: (syncResult?.checkpoint ?? null) as Record | null, + auth_update: syncResult?.auth_update ?? null, metadata: { items_found: typeof syncResult?.metadata?.items_found === 'number' @@ -217,7 +169,6 @@ async function executeConnectorRuntime(instance: any, context: ChildMessage['con ...(syncResult?.metadata ?? {}), }, }; - return result; } /** Send an IPC message and wait for it to be flushed to the parent. */ @@ -330,7 +281,7 @@ async function main() { ); try { - const { compiledCode, context } = msg as ChildMessage; + const { compiledCode, job } = msg as ChildMessage; // Write compiled code to temp file for dynamic import. // - `flag: 'wx'` fails if the file already exists (no symlink follow, @@ -352,7 +303,7 @@ async function main() { } const instance = new (RuntimeClass as new () => any)(); - const result = await executeConnectorRuntime(instance, context); + const result = await executeConnectorRuntime(instance, job); // Send result back to parent (wait for IPC flush before exiting) await sendIPC({ type: 'result', result }); diff --git a/packages/connector-worker/src/executor/interface.ts b/packages/connector-worker/src/executor/interface.ts index 7f8fb1187..c2b218453 100644 --- a/packages/connector-worker/src/executor/interface.ts +++ b/packages/connector-worker/src/executor/interface.ts @@ -1,39 +1,66 @@ -import type { Checkpoint, Content, FeedOptions, SessionState } from '@lobu/connector-sdk'; +import type { AuthResult, EventEnvelope, SyncCredentials } from '@lobu/connector-sdk'; /** - * Result shape returned by the subprocess executor. - * Mirrors the legacy SyncResult from the SDK (contents + checkpoint). + * Executor mode discriminator. The executor speaks the same V1 SDK shapes + * the connector code expects — no more magic `__action_key` / `__feed_key` / + * `__auth_mode` packing. */ -export interface FeedSyncResult { - contents: Content[]; - checkpoint: Checkpoint | null; - metadata?: Record; - auth_update?: Record; - /** Set when the subprocess completed an authenticate() run. */ - auth_result?: { - credentials: Record; - metadata?: Record; - }; -} +export type ExecutorJob = + | { + mode: 'sync'; + feedKey?: string | null; + config: Record; + checkpoint: Record | null; + entityIds: number[]; + credentials: SyncCredentials | null; + sessionState: Record | null; + env: Record; + } + | { + mode: 'action'; + actionKey: string; + actionInput: Record; + config: Record; + credentials: SyncCredentials | null; + sessionState: Record | null; + env: Record; + } + | { + mode: 'authenticate'; + config: Record; + previousCredentials: Record | null; + env: Record; + }; /** - * Context passed to the executor for a single sync job + * Result shape returned by the executor. One discriminated union per mode + * mirrors the SDK's `SyncResult` / `ActionResult` / `AuthResult` directly. */ -export interface SyncContext { - options: FeedOptions; - checkpoint: Checkpoint | null; - env: Record; - sessionState?: SessionState | null; - apiType: 'api' | 'browser'; -} +export type ExecutorResult = + | { + mode: 'sync'; + events: EventEnvelope[]; + checkpoint: Record | null; + auth_update?: Record | null; + metadata?: Record; + } + | { + mode: 'action'; + output: Record; + } + | { + mode: 'authenticate'; + auth: AuthResult; + }; export interface ExecutionHooks { - onContentChunk?: (items: Content[]) => Promise | void; - onCheckpointUpdate?: (checkpoint: Checkpoint | null) => Promise | void; - collectContents?: boolean; - /** Auth runs: connector emits an artifact (QR/redirect/prompt/status). */ + /** Sync runs: connector streamed a chunk of events (and we should persist them). */ + onEventChunk?: (events: EventEnvelope[]) => Promise | void; + /** Sync runs: connector pushed an incremental checkpoint update. */ + onCheckpointUpdate?: (checkpoint: Record | null) => Promise | void; + /** Auth runs: connector emitted an artifact (QR/redirect/prompt/status). */ onAuthArtifact?: (artifact: Record) => Promise | void; - /** Auth runs: connector pauses until a named signal arrives. */ + /** Auth runs: connector paused until a named signal arrives. */ onAwaitAuthSignal?: ( name: string, options?: { timeoutMs?: number } @@ -41,13 +68,13 @@ export interface ExecutionHooks { } /** - * Pluggable executor interface - * Allows swapping between subprocess execution, direct execution, etc. + * Pluggable executor interface. The only implementation today is + * `SubprocessExecutor`; the seam stays around so tests can stub it. */ export interface SyncExecutor { execute( compiledCode: string, - context: SyncContext, + job: ExecutorJob, hooks?: ExecutionHooks - ): Promise; + ): Promise; } diff --git a/packages/connector-worker/src/executor/runtime.ts b/packages/connector-worker/src/executor/runtime.ts index 541b21b2e..27f2c9d3b 100644 --- a/packages/connector-worker/src/executor/runtime.ts +++ b/packages/connector-worker/src/executor/runtime.ts @@ -1,147 +1,17 @@ -import type { Checkpoint, Content, FeedOptions, SessionState } from '@lobu/connector-sdk'; -import type { ExecutionHooks, FeedSyncResult, SyncContext, SyncExecutor } from './interface.js'; +import type { ExecutionHooks, ExecutorJob, ExecutorResult, SyncExecutor } from './interface.js'; import { SubprocessExecutor } from './subprocess.js'; -interface ConnectorOAuthCredentials { - provider: string; - accessToken: string; - refreshToken?: string | null; - expiresAt?: string | null; - scope?: string | null; -} - -interface BaseExecutionParams { +/** + * Top-level entry point used by the daemon executor. Just delegates to a + * `SyncExecutor` implementation (defaults to `SubprocessExecutor`) with the + * V1 SDK shapes — no more magic-key adapter layer in between. + */ +export async function executeCompiledConnector(params: { compiledCode: string; - env?: Record; - connectionCredentials?: Record | null; - sessionState?: SessionState | null; - credentials?: ConnectorOAuthCredentials | null; - apiType?: 'api' | 'browser'; + job: ExecutorJob; executor?: SyncExecutor; hooks?: ExecutionHooks; -} - -interface SyncConnectorExecutionParams extends BaseExecutionParams { - mode: 'sync'; - config?: Record | null; - checkpoint?: Checkpoint | null; - feedKey?: string | null; - entityIds?: number[] | null; -} - -interface ActionConnectorExecutionParams extends BaseExecutionParams { - mode: 'action'; - actionKey: string; - actionInput?: Record | null; - checkpoint?: Checkpoint | null; -} - -interface AuthConnectorExecutionParams extends BaseExecutionParams { - mode: 'authenticate'; - /** Connector-specific auth input (rare). */ - config?: Record | null; - /** Existing credentials for re-auth flows. */ - previousCredentials?: Record | null; -} - -type ConnectorExecutionParams = - | SyncConnectorExecutionParams - | ActionConnectorExecutionParams - | AuthConnectorExecutionParams; - -function mergeExecutionEnv( - env?: Record, - connectionCredentials?: Record | null -): Record { - return { - ...(env ?? {}), - ...(connectionCredentials ?? {}), - }; -} - -function mergeExecutionSessionState( - sessionState?: SessionState | null, - credentials?: ConnectorOAuthCredentials | null -): SessionState | null { - if (!sessionState && !credentials) return null; - - return { - ...(sessionState ?? {}), - ...(credentials ? { oauth: credentials } : {}), - }; -} - -function buildConnectorExecutionContext(params: ConnectorExecutionParams): SyncContext { - const env = mergeExecutionEnv(params.env, params.connectionCredentials); - const sessionState = mergeExecutionSessionState(params.sessionState, params.credentials); - - if (params.mode === 'action') { - return { - options: { - __action_key: params.actionKey, - __action_input: params.actionInput ?? {}, - ...((params.actionInput ?? {}) as Record), - } as FeedOptions, - checkpoint: (params.checkpoint ?? null) as Checkpoint | null, - env, - sessionState, - apiType: params.apiType ?? 'api', - }; - } - - if (params.mode === 'authenticate') { - return { - options: { - __auth_mode: true, - __auth_config: params.config ?? {}, - __auth_previous_credentials: params.previousCredentials ?? null, - } as FeedOptions, - checkpoint: null, - env, - sessionState, - apiType: params.apiType ?? 'api', - }; - } - - return { - options: { - ...(params.config ?? {}), - __feed_key: params.feedKey, - __entity_ids: params.entityIds ?? [], - } as FeedOptions, - checkpoint: (params.checkpoint ?? null) as Checkpoint | null, - env, - sessionState, - apiType: params.apiType ?? 'api', - }; -} - -export async function executeCompiledConnector( - params: ConnectorExecutionParams -): Promise { +}): Promise { const executor = params.executor ?? new SubprocessExecutor(); - const context = buildConnectorExecutionContext(params); - return executor.execute(params.compiledCode, context, params.hooks); -} - -export function normalizeEventEnvelope(event: Record): Content { - const originType = event.origin_type; - const rawDate = event.occurred_at ?? event.published_at; - return { - origin_id: event.origin_id ?? event.external_id, - payload_text: event.payload_text ?? event.content ?? '', - title: event.title, - author_name: event.author_name ?? event.author, - source_url: event.source_url ?? event.url ?? '', - occurred_at: rawDate ? new Date(rawDate) : new Date(), - origin_type: originType, - semantic_type: event.semantic_type ?? originType, - score: typeof event.score === 'number' ? event.score : 0, - origin_parent_id: event.origin_parent_id ?? event.parent_external_id ?? null, - metadata: event.metadata ?? {}, - }; -} - -export function getActionOutput(result: FeedSyncResult): Record { - return (result.contents[0]?.metadata ?? {}) as Record; + return executor.execute(params.compiledCode, params.job, params.hooks); } diff --git a/packages/connector-worker/src/executor/subprocess.ts b/packages/connector-worker/src/executor/subprocess.ts index 36ddf098c..cb6ec225a 100644 --- a/packages/connector-worker/src/executor/subprocess.ts +++ b/packages/connector-worker/src/executor/subprocess.ts @@ -11,7 +11,8 @@ import { existsSync } from 'node:fs'; import { createRequire } from 'node:module'; import { dirname, join } from 'node:path'; import { fileURLToPath } from 'node:url'; -import type { ExecutionHooks, FeedSyncResult, SyncContext, SyncExecutor } from './interface.js'; +import type { EventEnvelope } from '@lobu/connector-sdk'; +import type { ExecutionHooks, ExecutorJob, ExecutorResult, SyncExecutor } from './interface.js'; import { StreamRedactor, redactOutput } from './redact.js'; /** @@ -115,6 +116,10 @@ const DEFAULT_OPTIONS: SubprocessExecutorOptions = { maxOldSpaceSize: 512, }; +function jobEnv(job: ExecutorJob): Record { + return job.env; +} + export class SubprocessExecutor implements SyncExecutor { private options: SubprocessExecutorOptions; @@ -124,10 +129,10 @@ export class SubprocessExecutor implements SyncExecutor { async execute( compiledCode: string, - context: SyncContext, + job: ExecutorJob, hooks?: ExecutionHooks - ): Promise { - return new Promise((resolve, reject) => { + ): Promise { + return new Promise((resolve, reject) => { let childRunnerPath = join(__dirname, 'child-runner.js'); const childRunnerTsPath = join(__dirname, 'child-runner.ts'); @@ -163,18 +168,14 @@ export class SubprocessExecutor implements SyncExecutor { const child = fork(childRunnerPath, [], { stdio: ['pipe', 'pipe', 'pipe', 'ipc'], execArgv, - env: { ...pickSystemEnv(), ...context.env } as NodeJS.ProcessEnv, + env: { ...pickSystemEnv(), ...jobEnv(job) } as NodeJS.ProcessEnv, }); let resolved = false; let terminalMessageReceived = false; let timedOut = false; - let latestCheckpoint = context.checkpoint; - let finalMetadata: Record | undefined; - let finalAuthUpdate: Record | undefined; - let finalAuthResult: FeedSyncResult['auth_result'] | undefined; - const collectedContents: FeedSyncResult['contents'] = []; - const collectContents = hooks?.collectContents !== false; + let latestCheckpoint = + job.mode === 'sync' ? job.checkpoint : null; let processingChain = Promise.resolve(); // Per-stream ring buffers — preserve the *tail* (most recent bytes), @@ -250,13 +251,10 @@ export class SubprocessExecutor implements SyncExecutor { // so validate shape at this trust boundary before dereferencing fields. const onMessage = (msg: any) => { if (!msg || typeof msg !== 'object' || typeof msg.type !== 'string') return; - if (msg.type === 'content_chunk') { - const items = Array.isArray(msg.items) ? msg.items : []; + if (msg.type === 'event_chunk') { + const events: EventEnvelope[] = Array.isArray(msg.events) ? msg.events : []; queueTask(async () => { - if (collectContents) { - collectedContents.push(...items); - } - await hooks?.onContentChunk?.(items); + await hooks?.onEventChunk?.(events); }); return; } @@ -319,29 +317,15 @@ export class SubprocessExecutor implements SyncExecutor { if (msg.type === 'result') { terminalMessageReceived = true; - const result = msg.result as FeedSyncResult; + const result = msg.result as ExecutorResult; queueTask(async () => { - finalMetadata = result.metadata; - finalAuthUpdate = result.auth_update; - finalAuthResult = result.auth_result; - latestCheckpoint = result.checkpoint; - - if (Array.isArray(result.contents) && result.contents.length > 0) { - if (collectContents) { - collectedContents.push(...result.contents); - } - await hooks?.onContentChunk?.(result.contents); + // For sync results, surface the trailing checkpoint to callers + // through the result; in-flight `checkpoint_update` messages have + // already been forwarded via the hook. + if (result.mode === 'sync') { + latestCheckpoint = result.checkpoint; } - - settle(() => - resolve({ - contents: collectedContents, - checkpoint: latestCheckpoint, - metadata: finalMetadata, - auth_update: finalAuthUpdate, - auth_result: finalAuthResult, - }) - ); + settle(() => resolve(result)); }); return; } @@ -440,20 +424,19 @@ export class SubprocessExecutor implements SyncExecutor { child.stdout?.on('data', onStdout); child.stderr?.on('data', onStderr); - // Send the compiled code and context to the child. Use the callback - // form so a failed send (e.g. child died before IPC handshake, or fork - // resolved to a non-existent file) rejects the executor promise + // Hint to keep linter quiet when latestCheckpoint isn't consumed in + // non-sync modes; it's the parent-side mirror of the sync checkpoint + // stream and only relevant when hooks.onCheckpointUpdate is wired. + void latestCheckpoint; + + // Send the compiled code and job descriptor to the child. Use the + // callback form so a failed send (e.g. child died before IPC handshake, + // or fork resolved to a non-existent file) rejects the executor promise // instead of going unhandled on the IPC channel. child.send( { compiledCode, - context: { - options: context.options, - checkpoint: context.checkpoint, - env: context.env, - sessionState: context.sessionState, - apiType: context.apiType, - }, + job, }, (err) => { if (err) { From b3a9a4f6cbcb94819dd8a9e3ea583440de24f523 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Tue, 19 May 2026 17:59:52 +0100 Subject: [PATCH 03/13] wip(spike): finish landing connector V1 IPC migration in server call-sites MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Updates the two server callers that reached into `connector-worker`'s executor internals via relative path + dynamic import (flagged as an abstraction leak in REPORT.md spike 2): - `packages/server/src/lib/feed-sync.ts:7` was importing from `'../../../connector-worker/src/executor/runtime'` — now goes through the proper `@lobu/connector-worker/executor/runtime` package export. The sync call switches to the new `{ compiledCode, job: { mode: 'sync', ... }, hooks }` signature; item-count tracking moves from `result.contents.length` to an `onEventChunk` counter (events are streamed via the hook now, not collected in the result). - `packages/server/src/tools/admin/manage_operations.ts:207` used a dynamic `await import("../../../../connector-worker/...")` to grab `executeCompiledConnector` + `getActionOutput`. The latter no longer exists (spike 2 deleted it), so this call-site goes through `@lobu/connector-worker/executor/runtime` for `executeCompiledConnector` alone and mode-narrows directly on `result.output`. Typecheck status: - packages/connector-sdk: clean - packages/connector-worker: clean - packages/connectors: clean (modulo pre-existing whatsapp/baileys drift on main) - packages/cli: clean - packages/server: clean (modulo pre-existing gateway/guardrails/aggregator.ts errors on main) `make build-packages` was run to refresh the connector-worker .d.ts so the new exports resolve through the package boundary instead of through the workspace .ts sources. --- packages/server/src/lib/feed-sync.ts | 37 +++++++++++++------ .../src/tools/admin/manage_operations.ts | 35 +++++++++++------- 2 files changed, 46 insertions(+), 26 deletions(-) diff --git a/packages/server/src/lib/feed-sync.ts b/packages/server/src/lib/feed-sync.ts index c6335f47f..3bee3a5ac 100644 --- a/packages/server/src/lib/feed-sync.ts +++ b/packages/server/src/lib/feed-sync.ts @@ -4,7 +4,7 @@ * Extracted from scripts/sync-local.ts for programmatic reuse. */ -import { executeCompiledConnector } from '../../../connector-worker/src/executor/runtime'; +import { executeCompiledConnector } from '@lobu/connector-worker/executor/runtime'; import { getDb, parsePgNumberArray } from '../db/client'; import { resolveConnectorCode } from '../utils/ensure-connector-installed'; import { mergeExecutionConfig, resolveExecutionAuth } from '../utils/execution-context'; @@ -112,20 +112,33 @@ export async function runFeed(feed: FeedRecord): Promise<{ itemCount: number }> logContext: { feedId: feed.id }, logMessage: 'Failed to resolve feed credentials', }); + let itemCount = 0; const result = await executeCompiledConnector({ - mode: 'sync', compiledCode, - config: mergeExecutionConfig(feed.connection_config, feed.config), - checkpoint: feed.checkpoint as any, - env: process.env as Record, - connectionCredentials, - sessionState, - credentials, - feedKey: feed.feed_key, - entityIds: feed.entity_ids, - apiType: (feed.api_type as 'api' | 'browser') || 'api', + job: { + mode: 'sync', + config: mergeExecutionConfig( + feed.connection_config, + connectionCredentials, + feed.config + ), + checkpoint: feed.checkpoint, + env: process.env as Record, + sessionState, + credentials, + feedKey: feed.feed_key, + entityIds: feed.entity_ids, + }, + hooks: { + onEventChunk: async (events) => { + itemCount += events.length; + }, + }, }); - const itemCount = result.contents.length; + + if (result.mode !== 'sync') { + throw new Error(`Expected sync result, got mode=${result.mode}`); + } logger.info({ feedId: feed.id, itemCount }, 'Feed sync completed'); return { itemCount }; diff --git a/packages/server/src/tools/admin/manage_operations.ts b/packages/server/src/tools/admin/manage_operations.ts index 5927c94e9..611e5c86f 100644 --- a/packages/server/src/tools/admin/manage_operations.ts +++ b/packages/server/src/tools/admin/manage_operations.ts @@ -204,25 +204,32 @@ async function executeLocalActionInline( }); try { - const { executeCompiledConnector, getActionOutput } = await import( - '../../../../connector-worker/src/executor/runtime' + const { executeCompiledConnector } = await import( + '@lobu/connector-worker/executor/runtime' + ); + const envStrings = Object.fromEntries( + Object.entries(env).filter(([, value]) => typeof value === 'string') ); const result = await executeCompiledConnector({ - mode: 'action', compiledCode, - actionKey: - operation.backend_config.backend === 'local_action' - ? operation.backend_config.actionKey - : operation.operation_key, - actionInput, - env: Object.fromEntries(Object.entries(env).filter(([, value]) => typeof value === 'string')), - connectionCredentials, - sessionState, - credentials, - apiType: 'api', + job: { + mode: 'action', + actionKey: + operation.backend_config.backend === 'local_action' + ? operation.backend_config.actionKey + : operation.operation_key, + actionInput, + config: { ...envStrings, ...connectionCredentials }, + env: envStrings, + sessionState, + credentials, + }, }); - const actionOutput = getActionOutput(result); + if (result.mode !== 'action') { + throw new Error(`Expected action result, got mode=${result.mode}`); + } + const actionOutput = result.output; await sql`UPDATE runs SET status = 'completed', completed_at = NOW(), action_output = ${sql.json(actionOutput)} WHERE id = ${runId} AND organization_id = ${organizationId}`; return { status: 'completed', output: actionOutput }; } catch (error) { From 035adcc8b56ef815d0bf04e8d78bf44e14e2f0f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Tue, 19 May 2026 18:02:45 +0100 Subject: [PATCH 04/13] wip(spike): default execute() on ConnectorRuntime, prune 19 boilerplate stubs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Most connectors don't expose actions — `rg -c "Actions not supported" packages/connectors/src` returned 19 identical paste-jobs of async execute(_ctx: ActionContext): Promise { return { success: false, error: 'Actions not supported' }; } (one minor variant: whatsapp's `'Actions not supported in v1.'`, and revolut's tab-indented double-quoted version). Parallel to how `authenticate()` already had a default-throws implementation, this turns `execute()` from `abstract` into a default-rejecting method on the base class. Connectors that don't declare any `actions` in their `ConnectorDefinition` no longer need to write the stub. Concrete deltas: - `packages/connector-sdk/src/connector-runtime.ts`: `execute` becomes a non-abstract method returning `{ success: false, error: 'Actions not supported' }`. Doc comment updated to call out the new optionality. - 19 connectors lose the 3-line stub and the dead `type ActionContext, type ActionResult` imports it required: capterra, g2, glassdoor, gmaps, google_play, hackernews, ios_appstore, linkedin, microsoft_outlook, producthunt, reddit, revolut, rss, spotify, trustpilot, website, whatsapp (kept Auth imports for `authenticate`), x, youtube. Net: -114 LOC across packages/connectors, +10 LOC in the SDK base class. Compatibility checks: - `child-runner.ts findRuntimeClass` checks `prototype?.sync` AND `prototype?.execute`. The inherited default on `ConnectorRuntime.prototype` still satisfies `Foo.prototype.execute` access (prototype-chain lookup), so the discovery heuristic continues to accept these subclasses. - The 8 device-only "throws BRIDGE_ONLY from execute()" stubs in apple_health, apple_photos, apple_screen_time, chrome, chrome_bookmarks, chrome_downloads, chrome_history, local_directory, whatsapp_local are NOT touched — those communicate "this connector only runs on a device worker" (a different semantic from "actions not supported") and the user's brief was specifically the 16+ "Actions not supported" pastes. Typecheck status (after `bun run build` in connector-sdk to regenerate .d.ts so connector-package consumers see the new default): - packages/connector-sdk: clean - packages/connector-worker: clean - packages/connectors: 126 errors — IDENTICAL count to pre-spike baseline (verified via git stash + recount). All are the pre-existing whatsapp baileys typings drift plus the dom-lib / .ts-import- extension / unknown-type drift in capterra, g2, glassdoor, etc. on main. None are caused by this spike. - packages/cli: clean - packages/server: clean (modulo pre-existing gateway/guardrails/aggregator.ts errors). --- packages/connector-sdk/src/connector-runtime.ts | 12 ++++++++++-- packages/connectors/src/capterra.ts | 6 ------ packages/connectors/src/g2.ts | 6 ------ packages/connectors/src/glassdoor.ts | 6 ------ packages/connectors/src/gmaps.ts | 6 ------ packages/connectors/src/google_play.ts | 6 ------ packages/connectors/src/hackernews.ts | 7 ------- packages/connectors/src/ios_appstore.ts | 6 ------ packages/connectors/src/linkedin.ts | 6 ------ packages/connectors/src/microsoft_outlook.ts | 7 ------- packages/connectors/src/producthunt.ts | 7 ------- packages/connectors/src/reddit.ts | 7 ------- packages/connectors/src/revolut.ts | 6 ------ packages/connectors/src/rss.ts | 7 ------- packages/connectors/src/spotify.ts | 7 ------- packages/connectors/src/trustpilot.ts | 6 ------ packages/connectors/src/website.ts | 7 ------- packages/connectors/src/whatsapp.ts | 6 ------ packages/connectors/src/x.ts | 6 ------ packages/connectors/src/youtube.ts | 7 ------- 20 files changed, 10 insertions(+), 124 deletions(-) diff --git a/packages/connector-sdk/src/connector-runtime.ts b/packages/connector-sdk/src/connector-runtime.ts index dc4e0203b..d04b8e598 100644 --- a/packages/connector-sdk/src/connector-runtime.ts +++ b/packages/connector-sdk/src/connector-runtime.ts @@ -21,7 +21,11 @@ import type { * Subclasses must: * - Set `definition` with connector metadata * - Implement `sync()` for feed data ingestion - * - Implement `execute()` for action execution + * + * Subclasses may optionally override `execute()` and `authenticate()`; both + * have safe defaults (action rejected with `{ success: false, ... }`, auth + * throws). Connectors that don't declare any `actions` in their definition + * need not override `execute()`. * * @example * ```ts @@ -59,11 +63,15 @@ export abstract class ConnectorRuntime { * Execute an action on the connected service. * * Called either inline (low-risk) or by the worker (high-risk with approval). + * Default implementation rejects with "Actions not supported" — connectors + * that don't declare any `actions` in their definition need not override. * * @param ctx - Action context with action key, input, and credentials * @returns Action result with output data */ - abstract execute(ctx: ActionContext): Promise; + async execute(_ctx: ActionContext): Promise { + return { success: false, error: 'Actions not supported' }; + } /** * Run an interactive authentication flow that produces credentials for the diff --git a/packages/connectors/src/capterra.ts b/packages/connectors/src/capterra.ts index 8ad4be954..7cc0bcd00 100644 --- a/packages/connectors/src/capterra.ts +++ b/packages/connectors/src/capterra.ts @@ -4,8 +4,6 @@ */ import { - type ActionContext, - type ActionResult, type ConnectorDefinition, ConnectorRuntime, calculateEngagementScore, @@ -270,8 +268,4 @@ export default class CapterraConnector extends ConnectorRuntime { }; }); } - - async execute(_ctx: ActionContext): Promise { - return { success: false, error: 'Actions not supported' }; - } } diff --git a/packages/connectors/src/g2.ts b/packages/connectors/src/g2.ts index c74351c22..e596e8277 100644 --- a/packages/connectors/src/g2.ts +++ b/packages/connectors/src/g2.ts @@ -5,8 +5,6 @@ */ import { - type ActionContext, - type ActionResult, type ConnectorDefinition, ConnectorRuntime, calculateEngagementScore, @@ -283,8 +281,4 @@ export default class G2Connector extends ConnectorRuntime { }; }); } - - async execute(_ctx: ActionContext): Promise { - return { success: false, error: 'Actions not supported' }; - } } diff --git a/packages/connectors/src/glassdoor.ts b/packages/connectors/src/glassdoor.ts index 527258181..396c778c7 100644 --- a/packages/connectors/src/glassdoor.ts +++ b/packages/connectors/src/glassdoor.ts @@ -6,8 +6,6 @@ import { createHash } from 'node:crypto'; import { - type ActionContext, - type ActionResult, type ConnectorDefinition, ConnectorRuntime, calculateEngagementScore, @@ -288,8 +286,4 @@ export default class GlassdoorConnector extends ConnectorRuntime { }; }); } - - async execute(_ctx: ActionContext): Promise { - return { success: false, error: 'Actions not supported' }; - } } diff --git a/packages/connectors/src/gmaps.ts b/packages/connectors/src/gmaps.ts index 8b7333438..1d90ee6fa 100644 --- a/packages/connectors/src/gmaps.ts +++ b/packages/connectors/src/gmaps.ts @@ -5,8 +5,6 @@ */ import { - type ActionContext, - type ActionResult, type ConnectorDefinition, ConnectorRuntime, calculateEngagementScore, @@ -190,8 +188,4 @@ export default class GoogleMapsConnector extends ConnectorRuntime { }, }; } - - async execute(_ctx: ActionContext): Promise { - return { success: false, error: 'Actions not supported' }; - } } diff --git a/packages/connectors/src/google_play.ts b/packages/connectors/src/google_play.ts index 0f5db5083..516dde71e 100644 --- a/packages/connectors/src/google_play.ts +++ b/packages/connectors/src/google_play.ts @@ -6,8 +6,6 @@ */ import { - type ActionContext, - type ActionResult, type ConnectorDefinition, ConnectorRuntime, calculateEngagementScore, @@ -362,8 +360,4 @@ export default class GooglePlayConnector extends ConnectorRuntime { }, }; } - - async execute(_ctx: ActionContext): Promise { - return { success: false, error: 'Actions not supported' }; - } } diff --git a/packages/connectors/src/hackernews.ts b/packages/connectors/src/hackernews.ts index 0d1873c27..04279291b 100644 --- a/packages/connectors/src/hackernews.ts +++ b/packages/connectors/src/hackernews.ts @@ -7,8 +7,6 @@ import TurndownService from 'turndown'; import { - type ActionContext, - type ActionResult, type ConnectorDefinition, ConnectorRuntime, calculateEngagementScore, @@ -325,11 +323,6 @@ export default class HackerNewsConnector extends ConnectorRuntime { // ------------------------------------------------------------------------- // execute // ------------------------------------------------------------------------- - - async execute(_ctx: ActionContext): Promise { - return { success: false, error: 'Actions not supported' }; - } - // ------------------------------------------------------------------------- // Transform helpers // ------------------------------------------------------------------------- diff --git a/packages/connectors/src/ios_appstore.ts b/packages/connectors/src/ios_appstore.ts index dd08b13f5..6036ad5dd 100644 --- a/packages/connectors/src/ios_appstore.ts +++ b/packages/connectors/src/ios_appstore.ts @@ -5,8 +5,6 @@ */ import { - type ActionContext, - type ActionResult, type ConnectorDefinition, ConnectorRuntime, calculateEngagementScore, @@ -219,8 +217,4 @@ export default class IOSAppStoreConnector extends ConnectorRuntime { }, }; } - - async execute(_ctx: ActionContext): Promise { - return { success: false, error: 'Actions not supported' }; - } } diff --git a/packages/connectors/src/linkedin.ts b/packages/connectors/src/linkedin.ts index a29fb3577..e7969dc65 100644 --- a/packages/connectors/src/linkedin.ts +++ b/packages/connectors/src/linkedin.ts @@ -9,8 +9,6 @@ */ import { - type ActionContext, - type ActionResult, browserNetworkSync, type ConnectorDefinition, ConnectorRuntime, @@ -487,8 +485,4 @@ export default class LinkedInConnector extends ConnectorRuntime { }, }; } - - async execute(_ctx: ActionContext): Promise { - return { success: false, error: 'Actions not supported' }; - } } diff --git a/packages/connectors/src/microsoft_outlook.ts b/packages/connectors/src/microsoft_outlook.ts index 432bc82e6..8fa5316f1 100644 --- a/packages/connectors/src/microsoft_outlook.ts +++ b/packages/connectors/src/microsoft_outlook.ts @@ -6,8 +6,6 @@ */ import { - type ActionContext, - type ActionResult, type ConnectorDefinition, ConnectorRuntime, type EventEnvelope, @@ -244,11 +242,6 @@ export default class MicrosoftOutlookConnector extends ConnectorRuntime { // ------------------------------------------------------------------------- // execute // ------------------------------------------------------------------------- - - async execute(_ctx: ActionContext): Promise { - return { success: false, error: 'Actions not supported' }; - } - // ------------------------------------------------------------------------- // Feed: messages // ------------------------------------------------------------------------- diff --git a/packages/connectors/src/producthunt.ts b/packages/connectors/src/producthunt.ts index 66bb8c19b..356a66da0 100644 --- a/packages/connectors/src/producthunt.ts +++ b/packages/connectors/src/producthunt.ts @@ -6,8 +6,6 @@ */ import { - type ActionContext, - type ActionResult, type ConnectorDefinition, ConnectorRuntime, calculateEngagementScore, @@ -396,11 +394,6 @@ export default class ProductHuntConnector extends ConnectorRuntime { // ------------------------------------------------------------------------- // execute // ------------------------------------------------------------------------- - - async execute(_ctx: ActionContext): Promise { - return { success: false, error: 'Actions not supported' }; - } - // ------------------------------------------------------------------------- // Transform helpers // ------------------------------------------------------------------------- diff --git a/packages/connectors/src/reddit.ts b/packages/connectors/src/reddit.ts index 039d50b3c..3cf3ef159 100644 --- a/packages/connectors/src/reddit.ts +++ b/packages/connectors/src/reddit.ts @@ -6,8 +6,6 @@ */ import { - type ActionContext, - type ActionResult, type ConnectorDefinition, ConnectorRuntime, calculateEngagementScore, @@ -393,11 +391,6 @@ export default class RedditConnector extends ConnectorRuntime { // ------------------------------------------------------------------------- // execute // ------------------------------------------------------------------------- - - async execute(_ctx: ActionContext): Promise { - return { success: false, error: 'Actions not supported' }; - } - // ------------------------------------------------------------------------- // App-only OAuth // ------------------------------------------------------------------------- diff --git a/packages/connectors/src/revolut.ts b/packages/connectors/src/revolut.ts index d14e43dcc..b0542fec6 100644 --- a/packages/connectors/src/revolut.ts +++ b/packages/connectors/src/revolut.ts @@ -19,8 +19,6 @@ */ import { - type ActionContext, - type ActionResult, browserNetworkSync, type ConnectorDefinition, ConnectorRuntime, @@ -562,8 +560,4 @@ export default class RevolutConnector extends ConnectorRuntime { }, }; } - - async execute(_ctx: ActionContext): Promise { - return { success: false, error: "Actions not supported" }; - } } diff --git a/packages/connectors/src/rss.ts b/packages/connectors/src/rss.ts index 7a2d8f4a3..eebf654d8 100644 --- a/packages/connectors/src/rss.ts +++ b/packages/connectors/src/rss.ts @@ -7,8 +7,6 @@ */ import { - type ActionContext, - type ActionResult, type ConnectorDefinition, ConnectorRuntime, type EventEnvelope, @@ -202,11 +200,6 @@ export default class RSSConnector extends ConnectorRuntime { // ------------------------------------------------------------------------- // execute // ------------------------------------------------------------------------- - - async execute(_ctx: ActionContext): Promise { - return { success: false, error: 'Actions not supported' }; - } - // ------------------------------------------------------------------------- // Feed fetching & parsing // ------------------------------------------------------------------------- diff --git a/packages/connectors/src/spotify.ts b/packages/connectors/src/spotify.ts index e46e0467f..7c668d54e 100644 --- a/packages/connectors/src/spotify.ts +++ b/packages/connectors/src/spotify.ts @@ -6,8 +6,6 @@ */ import { - type ActionContext, - type ActionResult, type ConnectorDefinition, ConnectorRuntime, type EventEnvelope, @@ -303,11 +301,6 @@ export default class SpotifyConnector extends ConnectorRuntime { // ------------------------------------------------------------------------- // execute // ------------------------------------------------------------------------- - - async execute(_ctx: ActionContext): Promise { - return { success: false, error: 'Actions not supported' }; - } - // ------------------------------------------------------------------------- // Feed: saved_tracks // ------------------------------------------------------------------------- diff --git a/packages/connectors/src/trustpilot.ts b/packages/connectors/src/trustpilot.ts index 1f95a16a1..9797b1852 100644 --- a/packages/connectors/src/trustpilot.ts +++ b/packages/connectors/src/trustpilot.ts @@ -5,8 +5,6 @@ */ import { - type ActionContext, - type ActionResult, type ConnectorDefinition, ConnectorRuntime, calculateEngagementScore, @@ -207,8 +205,4 @@ export default class TrustpilotConnector extends ConnectorRuntime { }; }); } - - async execute(_ctx: ActionContext): Promise { - return { success: false, error: 'Actions not supported' }; - } } diff --git a/packages/connectors/src/website.ts b/packages/connectors/src/website.ts index f5442137c..106a0682c 100644 --- a/packages/connectors/src/website.ts +++ b/packages/connectors/src/website.ts @@ -10,8 +10,6 @@ import { createHash } from 'node:crypto'; import TurndownService from 'turndown'; import { - type ActionContext, - type ActionResult, type ConnectorDefinition, ConnectorRuntime, type EventEnvelope, @@ -290,11 +288,6 @@ export default class WebsiteConnector extends ConnectorRuntime { metadata: { pages_scraped: urls.length, events_created: events.length }, }; } - - async execute(_ctx: ActionContext): Promise { - return { success: false, error: 'Actions not supported' }; - } - private async dismissOverlays(page: Page): Promise { const dismissLabels = [ 'Accept', diff --git a/packages/connectors/src/whatsapp.ts b/packages/connectors/src/whatsapp.ts index 28e1cf897..8b3817a00 100644 --- a/packages/connectors/src/whatsapp.ts +++ b/packages/connectors/src/whatsapp.ts @@ -11,8 +11,6 @@ */ import { - type ActionContext, - type ActionResult, type AuthContext, type AuthResult, type ConnectorDefinition, @@ -428,10 +426,6 @@ export default class WhatsAppConnector extends ConnectorRuntime { throw error; } } - - async execute(_ctx: ActionContext): Promise { - return { success: false, error: 'Actions not supported in v1.' }; - } } // --------------------------------------------------------------------------- diff --git a/packages/connectors/src/x.ts b/packages/connectors/src/x.ts index 30f1103b2..c1decc0b7 100644 --- a/packages/connectors/src/x.ts +++ b/packages/connectors/src/x.ts @@ -7,8 +7,6 @@ */ import { - type ActionContext, - type ActionResult, browserNetworkSync, type ConnectorDefinition, ConnectorRuntime, @@ -529,8 +527,4 @@ export default class XConnector extends ConnectorRuntime { return syncViaBrowser(ctx, config, checkpoint); } - - async execute(_ctx: ActionContext): Promise { - return { success: false, error: 'Actions not supported' }; - } } diff --git a/packages/connectors/src/youtube.ts b/packages/connectors/src/youtube.ts index c9bbfe197..b6b317406 100644 --- a/packages/connectors/src/youtube.ts +++ b/packages/connectors/src/youtube.ts @@ -7,8 +7,6 @@ */ import { - type ActionContext, - type ActionResult, type ConnectorDefinition, ConnectorRuntime, calculateEngagementScore, @@ -425,11 +423,6 @@ export default class YouTubeConnector extends ConnectorRuntime { // ------------------------------------------------------------------------- // execute // ------------------------------------------------------------------------- - - async execute(_ctx: ActionContext): Promise { - return { success: false, error: 'Actions not supported' }; - } - // ------------------------------------------------------------------------- // YouTube API helpers // ------------------------------------------------------------------------- From e4a3b8803d9caf1f84555f6dbdda7e424b7d6218 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Tue, 19 May 2026 18:10:50 +0100 Subject: [PATCH 05/13] wip(spike): consolidate connector compile pipelines MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three packages each shipped their own near-identical copy of `findBundledConnectorFile` + `compileConnectorFromFile` + the `npm:` specifier esbuild plugin + the `lobu`/`@lobu/connector-sdk` aliasing + the mtime-keyed LRU compile cache: - packages/connector-worker/src/compile-connector.ts (~150 LOC) - packages/cli/src/commands/_lib/connector-loader.ts (~120 LOC) - packages/server/src/utils/connector-catalog.ts (~150 LOC of the file, the rest stays — metadata extraction, URI normalisation, catalog iteration) Comments in two of the three already said things like "single source of truth lives in @lobu/connector-worker. Duplicated here as a flat string array to keep this file self-contained" and "Mirror the worker-side resolver in compile-connector.ts: try the subdir layout first … Keep these in sync if either side changes." That's the drift surface this spike closes. New shared module: `packages/connector-worker/src/compile/index.ts`, exported as `@lobu/connector-worker/compile`. (Originally `./build` but the repo root `.gitignore` has `**/build/` which silently ate the new directory; renamed to `./compile` so it tracks.) Surface: - `EXTERNAL_RUNTIME_DEPS` — re-exported from `runtime-deps.ts`. - `findBundledConnectorFile(key, candidateDirs)` — pure helper; the per-environment candidate-dir list is the parameter that genuinely differs (worker image vs CLI dist-bundle vs gateway pod). No internal cache; callers that need one (`server/connector-catalog.ts`, `cli/connector-loader.ts`) wrap their own thin Map. - `createNpmSpecifierPlugin({ onUnresolved })` — esbuild plugin parametrised on the warn hook (server wants to log via pino, CLI/ worker stay silent). - `createConnectorCompiler({ cacheMax?, sdkEntry?, onUnresolvedNpm? })` → `{ compileConnectorFromFile }`. Each instance owns its own 8-entry LRU; callers that want per-process isolation create their own. Net diff: -381 / +296 (with the new 229-LOC shared module) = -85 LOC across the three packages plus zero risk of the three implementations drifting from each other. Server now declares `@lobu/connector-worker: workspace:*` as a real dep (it was already importing from it after the prior round-2 spike, but without a corresponding package.json entry). Typecheck status (`make typecheck` from worktree root): **clean.** The pre-existing `gateway/guardrails/aggregator.ts` `SkillPreToolGuardrail` errors that REPORT.md called out turned out to be stale dist artifacts in this worktree — once `make build-packages` re-emitted `@lobu/core`'s .d.ts, those resolved too. Whole repo typecheck is green. --- bun.lock | 1 + .../cli/src/commands/_lib/connector-loader.ts | 116 +-------- packages/connector-worker/package.json | 4 + .../connector-worker/src/compile-connector.ts | 168 ++----------- .../connector-worker/src/compile/index.ts | 229 ++++++++++++++++++ packages/server/package.json | 1 + .../server/src/utils/connector-catalog.ts | 158 ++---------- 7 files changed, 296 insertions(+), 381 deletions(-) create mode 100644 packages/connector-worker/src/compile/index.ts diff --git a/bun.lock b/bun.lock index ecbc11110..28766dad8 100644 --- a/bun.lock +++ b/bun.lock @@ -340,6 +340,7 @@ "@hono/node-server": "^1.13.7", "@hono/zod-openapi": "^1.2.1", "@lobu/connector-sdk": "workspace:*", + "@lobu/connector-worker": "workspace:*", "@lobu/core": "workspace:*", "@mariozechner/pi-ai": "^0.51.6", "@modelcontextprotocol/sdk": "^1.27.1", diff --git a/packages/cli/src/commands/_lib/connector-loader.ts b/packages/cli/src/commands/_lib/connector-loader.ts index 5421f87ee..991591bc2 100644 --- a/packages/cli/src/commands/_lib/connector-loader.ts +++ b/packages/cli/src/commands/_lib/connector-loader.ts @@ -1,33 +1,22 @@ /** * Connector source resolution + compilation for the CLI. * - * Mirrors `packages/server/src/utils/connector-catalog.ts` (the server-side - * versions of these helpers) but inlined here so the CLI doesn't depend on - * @lobu/server — that package is private and never published, while the CLI - * is what end users install from npm. - * * Lookup order for bundled connector source: * 1. dist/connectors/ next to this file (published CLI runtime — see * packages/cli/scripts/build.cjs which copies packages/connectors/src * there at build time). * 2. ../../../connectors/src relative to this file (monorepo dev). * 3. process.cwd()/packages/connectors/src (running from a parent dir). + * + * The resolver + esbuild bundle pipeline themselves live in + * `@lobu/connector-worker/compile` so the three call-sites (worker, CLI, + * server) share one implementation. */ -import { existsSync } from "node:fs"; -import { mkdtemp, readFile, rm, stat } from "node:fs/promises"; -import { createRequire } from "node:module"; -import { tmpdir } from "node:os"; -import { join, resolve } from "node:path"; -import { build, type Plugin } from "esbuild"; - -const require_ = createRequire(import.meta.url); -const SDK_ENTRY = require_.resolve("@lobu/connector-sdk"); - -// Single source of truth lives in @lobu/connector-worker. Duplicated here as -// a flat string array to keep this file self-contained — drift cost is low -// (the list is tiny and rarely changes; if a new external dep is added the -// CLI surfaces a clear bundling error and we update both places). -const EXTERNAL_RUNTIME_DEPS = ["playwright", "sharp", "jimp"] as const; +import { resolve } from "node:path"; +import { + createConnectorCompiler, + findBundledConnectorFile as findInDirs, +} from "@lobu/connector-worker/compile"; const SOURCE_DIR_CANDIDATES = [ // Published CLI runtime: packages/cli/scripts/build.cjs copies the @@ -44,92 +33,11 @@ const bundledFileCache = new Map(); export function findBundledConnectorFile(key: string): string | null { const cached = bundledFileCache.get(key); if (cached !== undefined) return cached; - // Mirror the resolver in @lobu/connector-worker's compile-connector.ts: - // subdir layout (`browser.evaluate` → `browser/evaluate.ts`) first, then - // the flat underscore convention (`chrome.tabs` → `chrome_tabs.ts`). - const candidates = [ - `${key.replace(/\./g, "/")}.ts`, - `${key.replace(/\./g, "_")}.ts`, - ]; - let found: string | null = null; - outer: for (const dir of SOURCE_DIR_CANDIDATES) { - for (const fileName of candidates) { - const filePath = resolve(dir, fileName); - if (!filePath.startsWith(`${dir}/`)) continue; - if (existsSync(filePath)) { - found = filePath; - break outer; - } - } - } + const found = findInDirs(key, SOURCE_DIR_CANDIDATES); bundledFileCache.set(key, found); return found; } -// Connectors import npm deps with the `npm:` prefix. Strip it so esbuild -// resolves the bare specifier against the local node_modules; mark unresolved -// ones external so the CLI bundle still produces (the runtime will fail loud -// if a missing dep is actually used). -const npmSpecifierPlugin: Plugin = { - name: "npm-specifier", - setup(b) { - b.onResolve({ filter: /^npm:/ }, async (args) => { - const bare = args.path - .slice(4) - .replace(/^(@[^/]+\/[^/@]+)@[^/]*/, "$1") - .replace(/^([^/@]+)@[^/]*/, "$1"); - const resolved = await b.resolve(bare, { - resolveDir: args.resolveDir, - kind: args.kind, - }); - if (resolved.errors.length > 0) { - return { path: bare, external: true, errors: [], warnings: [] }; - } - return resolved; - }); - }, -}; - -const compiledFileCache = new Map(); +const compiler = createConnectorCompiler(); -export async function compileConnectorFromFile( - filePath: string -): Promise { - let mtimeMs: number | null = null; - try { - mtimeMs = (await stat(filePath)).mtimeMs; - const cached = compiledFileCache.get(filePath); - if (cached && cached.mtimeMs === mtimeMs) return cached.code; - } catch { - // stat failed — fall through and let the build surface the real error. - } - - const tmpDir = await mkdtemp(join(tmpdir(), "lobu-cli-connector-")); - const outPath = join(tmpDir, "out.mjs"); - - try { - await build({ - entryPoints: [filePath], - outfile: outPath, - bundle: true, - format: "esm", - platform: "node", - target: "node20", - alias: { lobu: SDK_ENTRY, "@lobu/connector-sdk": SDK_ENTRY }, - banner: { - js: "import { createRequire as __createRequire } from 'module'; const require = __createRequire(import.meta.url);", - }, - plugins: [npmSpecifierPlugin], - external: [...EXTERNAL_RUNTIME_DEPS], - write: true, - minify: false, - sourcemap: false, - }); - - const code = await readFile(outPath, "utf-8"); - if (mtimeMs !== null) compiledFileCache.set(filePath, { mtimeMs, code }); - return code; - } finally { - await rm(tmpDir, { recursive: true, force: true }); - } -} +export const compileConnectorFromFile = compiler.compileConnectorFromFile; diff --git a/packages/connector-worker/package.json b/packages/connector-worker/package.json index 2aaff17e8..59e4568f2 100644 --- a/packages/connector-worker/package.json +++ b/packages/connector-worker/package.json @@ -23,6 +23,10 @@ "./executor/interface": { "import": "./dist/executor/interface.js", "types": "./dist/executor/interface.d.ts" + }, + "./compile": { + "import": "./dist/compile/index.js", + "types": "./dist/compile/index.d.ts" } }, "files": [ diff --git a/packages/connector-worker/src/compile-connector.ts b/packages/connector-worker/src/compile-connector.ts index 00b5840ad..3521784ab 100644 --- a/packages/connector-worker/src/compile-connector.ts +++ b/packages/connector-worker/src/compile-connector.ts @@ -1,41 +1,35 @@ /** - * Worker-side connector compiler. + * Worker-side connector resolver + compile entry point. * - * Until lobu#771's perf brainstorm, the gateway compiled connector bundles - * via esbuild and shipped the ~13 MB output inline in every `/api/workers/poll` - * response. The gateway pod held all ~29 connector bundles in its - * compile cache (~384 MB, dominant heap occupant under the 1 GiB limit; - * see lobu#771 for the heap-snapshot trail). + * Until lobu#771, the gateway compiled connector bundles via esbuild and + * shipped the ~13 MB output inline in every `/api/workers/poll` response. + * The gateway pod held all ~29 connector bundles in its compile cache + * (~384 MB, dominant heap occupant under the 1 GiB limit). For fleet + * workers (and embedded mode where worker + gateway share a host), the + * bundled connector .ts source is on disk in both pods — the gateway + * doesn't need to compile or ship it. The gateway now sends only the + * `connector_key` and this module compiles locally in the worker process. * - * For fleet workers (and embedded mode where worker + gateway share a host), - * the bundled connector .ts source is on disk in both pods — the gateway - * doesn't need to compile or ship it. The gateway now sends - * `connector_source_path` (the absolute path that `findBundledConnectorFile` - * resolved) and this module compiles locally in the worker process. + * Device workers (Lobu Mac Bridge, etc.) keep getting `compiled_code` + * inline — they don't have the connectors directory on disk. * - * Device workers (Lobu Mac Bridge, etc.) keep getting `compiled_code` inline - * — they don't have the connectors directory on disk. - * - * This file mirrors the server's `utils/connector-catalog.ts` compile pipeline. - * Kept here (rather than shared) so the connector-worker package doesn't take - * a runtime dep on the server package's bundle layout. + * The resolver + esbuild bundle pipeline themselves are owned by the + * shared `./build` module so the gateway and CLI sides don't drift. + * This file just supplies the worker-image-specific candidate dirs. */ -import { existsSync } from 'node:fs'; -import { mkdtemp, readFile, rm, stat } from 'node:fs/promises'; -import { createRequire } from 'node:module'; -import { tmpdir } from 'node:os'; -import { join, resolve } from 'node:path'; +import { resolve } from 'node:path'; import { fileURLToPath } from 'node:url'; -import { build, type Plugin } from 'esbuild'; -import { EXTERNAL_RUNTIME_DEPS } from './runtime-deps.js'; +import { + createConnectorCompiler, + findBundledConnectorFile as findInDirs, +} from './compile/index.js'; // Worker-side resolver for the bundled connectors directory. The gateway's -// `findBundledConnectorFile` resolves to paths that exist in the gateway -// image (e.g. /app/packages/server/dist/connectors); those paths do not -// exist in the worker image, which has the sources at -// /app/packages/connectors. Each side resolves locally instead of trusting -// gateway-supplied absolute paths. +// resolver targets paths that exist in the gateway image +// (e.g. /app/packages/server/dist/connectors); those paths do not exist +// in the worker image, which has the sources at /app/packages/connectors. +// Each side resolves locally instead of trusting gateway-supplied paths. const HERE = fileURLToPath(new URL('.', import.meta.url)); const WORKER_CONNECTOR_DIR_CANDIDATES = [ // Monorepo: workspace package at packages/connector-worker/src/ → @@ -58,120 +52,10 @@ const WORKER_CONNECTOR_DIR_CANDIDATES = [ resolve(process.cwd(), 'connectors'), ]; -// Strict regex for connector_key: lowercase letters/digits, optional dots -// for namespacing, underscores for word separators. Defense-in-depth even -// though keys come from a trusted DB column — we're about to use the value -// to construct a filesystem path. -const CONNECTOR_KEY_RE = /^[a-z][a-z0-9]*(?:[._][a-z0-9]+)*$/; - export function findBundledConnectorFile(key: string): string | null { - if (!CONNECTOR_KEY_RE.test(key)) return null; - // Two filename conventions: - // - Subdirectory layout (preferred for grouped primitives): the dot in - // `browser.evaluate` maps to `browser/evaluate.ts`. Lets us co-locate - // related connectors without renaming the key. - // - Flat-with-underscores (existing convention): `chrome.tabs` → - // `chrome_tabs.ts`, `apple_health` → `apple_health.ts`. - // Try subdirectory first so newer primitives win when both happen to exist. - const candidates = [ - `${key.replace(/\./g, '/')}.ts`, - `${key.replace(/\./g, '_')}.ts`, - ]; - for (const dir of WORKER_CONNECTOR_DIR_CANDIDATES) { - for (const fileName of candidates) { - const filePath = resolve(dir, fileName); - // Belt-and-braces: the resolved path must stay under the candidate - // dir. CONNECTOR_KEY_RE already forbids `..`, but the regex doesn't - // know about our path-joining choices. - if (!filePath.startsWith(`${dir}/`)) continue; - if (existsSync(filePath)) return filePath; - } - } - return null; -} - -const require_ = createRequire(import.meta.url); -const SDK_ENTRY = require_.resolve('@lobu/connector-sdk'); - -// Connectors declare runtime npm deps via `import x from 'npm:foo@1.2.3'`. -// Strip the prefix so esbuild resolves the bare name against node_modules; -// when the package isn't installed here, mark it external so the bundle -// still emits. The worker image is expected to provide it. -const npmSpecifierPlugin: Plugin = { - name: 'npm-specifier', - setup(b) { - b.onResolve({ filter: /^npm:/ }, async (args) => { - const bare = args.path - .slice(4) - .replace(/^(@[^/]+\/[^/@]+)@[^/]*/, '$1') - .replace(/^([^/@]+)@[^/]*/, '$1'); - const resolved = await b.resolve(bare, { - resolveDir: args.resolveDir, - kind: args.kind, - }); - if (resolved.errors.length > 0) { - return { path: bare, external: true, errors: [], warnings: [] }; - } - return resolved; - }); - }, -}; - -// LRU-capped cache. Worker daemon is long-lived; one entry per recently-used -// connector. See the matching cache in -// `packages/server/src/utils/connector-catalog.ts` for the cap rationale. -const COMPILED_FILE_CACHE_MAX = 8; -const compiledFileCache = new Map(); - -function touchCacheEntry(filePath: string, entry: { mtimeMs: number; code: string }): void { - compiledFileCache.delete(filePath); - compiledFileCache.set(filePath, entry); - while (compiledFileCache.size > COMPILED_FILE_CACHE_MAX) { - const oldest = compiledFileCache.keys().next().value; - if (oldest === undefined) break; - compiledFileCache.delete(oldest); - } + return findInDirs(key, WORKER_CONNECTOR_DIR_CANDIDATES); } -export async function compileConnectorFromFile(filePath: string): Promise { - let mtimeMs: number | null = null; - try { - mtimeMs = (await stat(filePath)).mtimeMs; - const cached = compiledFileCache.get(filePath); - if (cached && cached.mtimeMs === mtimeMs) { - touchCacheEntry(filePath, cached); - return cached.code; - } - } catch { - // stat failed — let the build surface the real error. - } - - const tmpDir = await mkdtemp(join(tmpdir(), 'lobu-connector-worker-')); - const outPath = join(tmpDir, 'out.mjs'); +const compiler = createConnectorCompiler(); - try { - await build({ - entryPoints: [filePath], - outfile: outPath, - bundle: true, - format: 'esm', - platform: 'node', - target: 'node20', - alias: { lobu: SDK_ENTRY, '@lobu/connector-sdk': SDK_ENTRY }, - banner: { - js: `import { createRequire as __createRequire } from 'module'; const require = __createRequire(import.meta.url);`, - }, - plugins: [npmSpecifierPlugin], - external: [...EXTERNAL_RUNTIME_DEPS], - write: true, - minify: false, - sourcemap: false, - }); - - const code = await readFile(outPath, 'utf-8'); - if (mtimeMs !== null) touchCacheEntry(filePath, { mtimeMs, code }); - return code; - } finally { - await rm(tmpDir, { recursive: true, force: true }); - } -} +export const compileConnectorFromFile = compiler.compileConnectorFromFile; diff --git a/packages/connector-worker/src/compile/index.ts b/packages/connector-worker/src/compile/index.ts new file mode 100644 index 000000000..8c5575a51 --- /dev/null +++ b/packages/connector-worker/src/compile/index.ts @@ -0,0 +1,229 @@ +/** + * Shared connector compile pipeline. + * + * Three packages (`@lobu/connector-worker` itself, `@lobu/cli`, and + * `@lobu/server`) each used to ship their own near-identical copies of: + * + * - `findBundledConnectorFile(key)` — walks a list of candidate dirs + * trying both filename conventions (`browser.evaluate → browser/evaluate.ts` + * and `chrome.tabs → chrome_tabs.ts`). + * - `compileConnectorFromFile(filePath)` — esbuild bundle with the + * `npm:` specifier plugin, the `lobu` / `@lobu/connector-sdk` aliases, + * `EXTERNAL_RUNTIME_DEPS` externalised, and an mtime-keyed LRU cache. + * - The `npm:` specifier resolver plugin. + * - The `EXTERNAL_RUNTIME_DEPS` constant. + * + * Three copies meant three "keep these in sync" comments and three places + * to fix every esbuild-flag or candidate-dir change. This module is the + * one place that owns those mechanics; each caller supplies its own + * candidate-dir list (and optional warn hook) since those are genuinely + * environment-specific (gateway pod vs worker pod vs npm-installed CLI). + */ + +import { existsSync } from 'node:fs'; +import { mkdtemp, readFile, rm, stat } from 'node:fs/promises'; +import { createRequire } from 'node:module'; +import { tmpdir } from 'node:os'; +import { join, resolve } from 'node:path'; +import { build, type Plugin } from 'esbuild'; +import { EXTERNAL_RUNTIME_DEPS } from '../runtime-deps.js'; + +export { EXTERNAL_RUNTIME_DEPS } from '../runtime-deps.js'; + +// Strict regex for connector_key: lowercase letters/digits, optional dots +// for namespacing, underscores for word separators. Defense-in-depth even +// though keys come from a trusted DB column — we're about to use the value +// to construct a filesystem path. +const CONNECTOR_KEY_RE = /^[a-z][a-z0-9]*(?:[._][a-z0-9]+)*$/; + +/** + * Resolve a connector_key to a `.ts` source file under one of the supplied + * candidate directories. + * + * Tries two filename conventions in order: + * - subdirectory layout: `browser.evaluate` → `browser/evaluate.ts` + * (lets us group related primitives without renaming the key); + * - flat-with-underscores: `chrome.tabs` → `chrome_tabs.ts` + * (existing convention). + * + * Returns the absolute path of the first match, or `null` if none exists. + * Performs no caching of its own — callers that hit this on a hot path + * (gateway worker-poll, CLI compile loop) can layer their own memo on + * top, since the right TTL depends on whether they expect new connector + * files to appear at runtime. + */ +export function findBundledConnectorFile( + key: string, + candidateDirs: readonly string[] +): string | null { + if (!CONNECTOR_KEY_RE.test(key)) return null; + const candidates = [ + `${key.replace(/\./g, '/')}.ts`, + `${key.replace(/\./g, '_')}.ts`, + ]; + for (const dir of candidateDirs) { + for (const fileName of candidates) { + const filePath = resolve(dir, fileName); + // Belt-and-braces: the resolved path must stay under the candidate + // dir. CONNECTOR_KEY_RE already forbids `..`, but the regex doesn't + // know about our path-joining choices. + if (!filePath.startsWith(`${dir}/`)) continue; + if (existsSync(filePath)) return filePath; + } + } + return null; +} + +/** + * Resolve the `@lobu/connector-sdk` module entry from this module's + * perspective. Used as the esbuild `alias` target so connector code that + * imports `from 'lobu'` or `from '@lobu/connector-sdk'` resolves to the + * same physical file the runtime will import — avoiding the + * `instanceof ConnectorRuntime` cross-realm trap. + */ +function resolveSdkEntry(): string { + const require_ = createRequire(import.meta.url); + return require_.resolve('@lobu/connector-sdk'); +} + +interface NpmSpecifierPluginOptions { + /** + * Called when a `npm:foo@1.2.3` import resolves to a package that's not + * installed in the current environment. The plugin externalises the + * import (so the bundle still emits) and the runtime must supply it. + * Use this hook to log / surface the externalisation. + */ + onUnresolved?: (info: { bareSpecifier: string; importer: string }) => void; +} + +/** + * esbuild plugin that strips the `npm:` prefix from connector imports + * (`import x from 'npm:foo@1.2.3'`) and resolves the bare specifier + * against node_modules. When the package isn't installed in the build + * environment, externalises so the bundle still produces — the runtime + * is expected to provide it. + */ +export function createNpmSpecifierPlugin(options?: NpmSpecifierPluginOptions): Plugin { + return { + name: 'npm-specifier', + setup(b) { + b.onResolve({ filter: /^npm:/ }, async (args) => { + const bare = args.path + .slice(4) + .replace(/^(@[^/]+\/[^/@]+)@[^/]*/, '$1') + .replace(/^([^/@]+)@[^/]*/, '$1'); + const resolved = await b.resolve(bare, { + resolveDir: args.resolveDir, + kind: args.kind, + }); + if (resolved.errors.length > 0) { + options?.onUnresolved?.({ bareSpecifier: bare, importer: args.importer }); + return { path: bare, external: true, errors: [], warnings: [] }; + } + return resolved; + }); + }, + }; +} + +interface CompileOptions { + /** + * Max entries kept in the mtime-keyed LRU. Each entry is the full + * compiled bundle (~13 MB today, smaller as transitive deps are + * externalised). Cap default 8 keeps memory bounded; pass a smaller + * value in memory-constrained environments. + * @default 8 + */ + cacheMax?: number; + /** + * Override the `@lobu/connector-sdk` entry esbuild aliases against. + * Defaults to the SDK resolved from this module's `require.resolve`. + * Overriding is only useful when the caller knows of a more + * appropriate physical file (e.g. a server bundle that wants to point + * back at a sibling dist file). + */ + sdkEntry?: string; + /** + * Hook fired when `npm:` specifiers fail to resolve and the import is + * externalised. Forwarded to `createNpmSpecifierPlugin`. + */ + onUnresolvedNpm?: NpmSpecifierPluginOptions['onUnresolved']; +} + +const DEFAULT_CACHE_MAX = 8; + +/** + * Compile a single connector source file to an ESM bundle string, + * suitable for the executor's subprocess `import()` step. + * + * The returned bundle: + * - is ESM (`format: 'esm'`, `target: 'node20'`); + * - aliases `lobu` and `@lobu/connector-sdk` to the SDK entry so + * connectors targeting either specifier resolve to the same module; + * - has a banner injecting a CJS-compatible `require` shim; + * - externalises `EXTERNAL_RUNTIME_DEPS` (native deps + Playwright); + * - is mtime-cached: a repeat call with the same `filePath` whose + * mtime hasn't changed returns the cached bundle without hitting + * esbuild. + */ +export function createConnectorCompiler(options?: CompileOptions) { + const cacheMax = options?.cacheMax ?? DEFAULT_CACHE_MAX; + const sdkEntry = options?.sdkEntry ?? resolveSdkEntry(); + const compiledFileCache = new Map(); + const plugin = createNpmSpecifierPlugin({ onUnresolved: options?.onUnresolvedNpm }); + + function touchCacheEntry(filePath: string, entry: { mtimeMs: number; code: string }): void { + compiledFileCache.delete(filePath); + compiledFileCache.set(filePath, entry); + while (compiledFileCache.size > cacheMax) { + const oldest = compiledFileCache.keys().next().value; + if (oldest === undefined) break; + compiledFileCache.delete(oldest); + } + } + + async function compileConnectorFromFile(filePath: string): Promise { + let mtimeMs: number | null = null; + try { + mtimeMs = (await stat(filePath)).mtimeMs; + const cached = compiledFileCache.get(filePath); + if (cached && cached.mtimeMs === mtimeMs) { + touchCacheEntry(filePath, cached); + return cached.code; + } + } catch { + // stat failed — let the build surface the real error. + } + + const tmpDir = await mkdtemp(join(tmpdir(), 'lobu-connector-')); + const outPath = join(tmpDir, 'out.mjs'); + + try { + await build({ + entryPoints: [filePath], + outfile: outPath, + bundle: true, + format: 'esm', + platform: 'node', + target: 'node20', + alias: { lobu: sdkEntry, '@lobu/connector-sdk': sdkEntry }, + banner: { + js: `import { createRequire as __createRequire } from 'module'; const require = __createRequire(import.meta.url);`, + }, + plugins: [plugin], + external: [...EXTERNAL_RUNTIME_DEPS], + write: true, + minify: false, + sourcemap: false, + }); + + const code = await readFile(outPath, 'utf-8'); + if (mtimeMs !== null) touchCacheEntry(filePath, { mtimeMs, code }); + return code; + } finally { + await rm(tmpDir, { recursive: true, force: true }); + } + } + + return { compileConnectorFromFile }; +} diff --git a/packages/server/package.json b/packages/server/package.json index 03251c6a6..c12ad5fe5 100644 --- a/packages/server/package.json +++ b/packages/server/package.json @@ -37,6 +37,7 @@ "@hono/zod-openapi": "^1.2.1", "@lobu/core": "workspace:*", "@lobu/connector-sdk": "workspace:*", + "@lobu/connector-worker": "workspace:*", "@mariozechner/pi-ai": "^0.51.6", "@modelcontextprotocol/sdk": "^1.27.1", "@polyglot-sql/sdk": "^0.1.13", diff --git a/packages/server/src/utils/connector-catalog.ts b/packages/server/src/utils/connector-catalog.ts index 88463d0fb..1f308caae 100644 --- a/packages/server/src/utils/connector-catalog.ts +++ b/packages/server/src/utils/connector-catalog.ts @@ -1,17 +1,14 @@ import { existsSync } from 'node:fs'; -import { mkdtemp, readdir, readFile, rm, stat } from 'node:fs/promises'; -import { createRequire } from 'node:module'; -import { tmpdir } from 'node:os'; -import { extname, join, relative, resolve } from 'node:path'; +import { readdir, stat } from 'node:fs/promises'; +import { extname, relative, resolve } from 'node:path'; import { fileURLToPath, pathToFileURL } from 'node:url'; -import { build, type Plugin } from 'esbuild'; -import { EXTERNAL_RUNTIME_DEPS } from '../../../connector-worker/src/runtime-deps'; +import { + createConnectorCompiler, + findBundledConnectorFile as findInDirs, +} from '@lobu/connector-worker/compile'; import { extractConnectorMetadata } from './connector-compiler'; import logger from './logger'; -const require_ = createRequire(import.meta.url); -const SDK_ENTRY = require_.resolve('@lobu/connector-sdk'); - const DEFAULT_CONNECTOR_DIR_CANDIDATES = [ // Published CLI runtime: packages/cli/scripts/build.cjs copies bundled // connector source files next to server.bundle.mjs at dist/connectors. @@ -26,42 +23,18 @@ const DEFAULT_CONNECTOR_DIR_CANDIDATES = [ resolve(process.cwd(), 'connectors'), ]; -// Connectors declare their npm deps via `import x from 'npm:foo@1.2.3'`. This -// plugin strips the prefix so esbuild can resolve the bare package against -// node_modules. When the package isn't installed in the gateway image (e.g. -// heavyweight deps like `baileys` that only the worker pod needs), we mark -// the import as external rather than failing the whole bundle. The bundle -// still emits `import 'baileys'` and the worker — which has those packages -// installed — can run it. Without this, one un-installable npm dep takes -// down the entire connector-catalog path, breaking /api/workers/poll for -// every worker. -const npmSpecifierPlugin: Plugin = { - name: 'npm-specifier', - setup(b) { - b.onResolve({ filter: /^npm:/ }, async (args) => { - const bare = args.path - .slice(4) - .replace(/^(@[^/]+\/[^/@]+)@[^/]*/, '$1') - .replace(/^([^/@]+)@[^/]*/, '$1'); - const resolved = await b.resolve(bare, { - resolveDir: args.resolveDir, - kind: args.kind, - }); - if (resolved.errors.length > 0) { - // Package isn't installed in this image — externalize so the bundle - // still produces. Worker runtime supplies the implementation. - // Log so an actual typo or missing-dep regression is diagnosable - // rather than silently producing a bundle that crashes at runtime. - logger.warn( - { package: bare, importer: args.importer }, - 'externalising npm:* import — package not resolvable in gateway image (worker runtime must provide it)' - ); - return { path: bare, external: true, errors: [], warnings: [] }; - } - return resolved; - }); +const connectorCompiler = createConnectorCompiler({ + onUnresolvedNpm: ({ bareSpecifier, importer }) => { + // Package isn't installed in this image — externalize so the bundle + // still produces. Worker runtime supplies the implementation. + // Log so an actual typo or missing-dep regression is diagnosable + // rather than silently producing a bundle that crashes at runtime. + logger.warn( + { package: bareSpecifier, importer }, + 'externalising npm:* import — package not resolvable in gateway image (worker runtime must provide it)' + ); }, -}; +}); type CachedMetadata = | { @@ -134,22 +107,7 @@ const bundledFileCache = new Map(); export function findBundledConnectorFile(key: string): string | null { const cached = bundledFileCache.get(key); if (cached !== undefined) return cached; - // Mirror the worker-side resolver in compile-connector.ts: try the - // subdir layout first (`browser.evaluate` → `browser/evaluate.ts`) and - // fall back to the flat underscore convention (`chrome.tabs` → - // `chrome_tabs.ts`). Keep these in sync if either side changes. - const candidates = [`${key.replace(/\./g, '/')}.ts`, `${key.replace(/\./g, '_')}.ts`]; - let found: string | null = null; - outer: for (const dir of DEFAULT_CONNECTOR_DIR_CANDIDATES) { - for (const fileName of candidates) { - const filePath = resolve(dir, fileName); - if (!filePath.startsWith(`${dir}/`)) continue; - if (existsSync(filePath)) { - found = filePath; - break outer; - } - } - } + const found = findInDirs(key, DEFAULT_CONNECTOR_DIR_CANDIDATES); bundledFileCache.set(key, found); return found; } @@ -222,81 +180,11 @@ export function getConfiguredConnectorCatalogUris(rawUris?: string): string[] { return [...normalized]; } -// Compiling a connector spawns esbuild; on the hot paths (feed sync, worker -// poll) the same bundled .ts file is recompiled repeatedly. Cache the output -// keyed by file mtime so a recompile only happens when the source actually -// changes. Process restart (= deploy, = SDK rebuild) clears it. -// -// LRU-capped: each entry is the full compiled bundle (~13 MB today, smaller -// once non-essential transitive deps are externalized — see -// EXTERNAL_RUNTIME_DEPS). Before the cap, prod was seeing the cache hold -// every connector's bundle indefinitely (~29 × 13 MB = 384 MB resident, the -// dominant heap occupant under the 1 GiB pod limit; see lobu#771 postmortem -// for the heap-snapshot trail). The cap keeps the cache to the working set -// of recently-used connectors and lets V8 reclaim the rest. -const COMPILED_FILE_CACHE_MAX = 8; -const compiledFileCache = new Map(); - -function touchCacheEntry(filePath: string, entry: { mtimeMs: number; code: string }): void { - compiledFileCache.delete(filePath); - compiledFileCache.set(filePath, entry); - while (compiledFileCache.size > COMPILED_FILE_CACHE_MAX) { - const oldest = compiledFileCache.keys().next().value; - if (oldest === undefined) break; - compiledFileCache.delete(oldest); - } -} - -export async function compileConnectorFromFile(filePath: string): Promise { - let mtimeMs: number | null = null; - try { - mtimeMs = (await stat(filePath)).mtimeMs; - const cached = compiledFileCache.get(filePath); - if (cached && cached.mtimeMs === mtimeMs) { - // Move to end of insertion order = mark as most-recently-used. - touchCacheEntry(filePath, cached); - return cached.code; - } - } catch { - // stat failed — fall through and let the build surface the real error. - } - - const tmpDir = await mkdtemp(join(tmpdir(), 'lobu-connector-')); - const outPath = join(tmpDir, 'out.mjs'); - - try { - await build({ - entryPoints: [filePath], - outfile: outPath, - bundle: true, - format: 'esm', - platform: 'node', - target: 'node20', - alias: { lobu: SDK_ENTRY, '@lobu/connector-sdk': SDK_ENTRY }, - banner: { - js: `import { createRequire as __createRequire } from 'module'; const require = __createRequire(import.meta.url);`, - }, - plugins: [npmSpecifierPlugin], - // Single source of truth: EXTERNAL_RUNTIME_DEPS in - // packages/connector-worker/src/runtime-deps.ts. Only natives / - // runtime-installed deps belong there (playwright ships browsers via - // `npx playwright install`). Pure JS deps (pino, link-preview-js) must be - // bundled — externalising them previously caused every connector run to - // fail with "Cannot find package 'pino'" because the worker image didn't - // ship them. - external: [...EXTERNAL_RUNTIME_DEPS], - write: true, - minify: false, - sourcemap: false, - }); - - const code = await readFile(outPath, 'utf-8'); - if (mtimeMs !== null) touchCacheEntry(filePath, { mtimeMs, code }); - return code; - } finally { - await rm(tmpDir, { recursive: true, force: true }); - } -} +// `compileConnectorFromFile` is owned by `@lobu/connector-worker/compile` +// (LRU-capped at 8 entries, keyed by file mtime, identical to the previous +// implementation that lived here — see lobu#771 for the cap rationale). +// Re-exported here so existing server callers keep their import paths. +export const compileConnectorFromFile = connectorCompiler.compileConnectorFromFile; async function extractConnectorCatalogMetadata( filePath: string From 580b48a20a649b0f3fb9866b7514bd9c7ec0224f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Tue, 19 May 2026 18:34:52 +0100 Subject: [PATCH 06/13] =?UTF-8?q?wip(spike):=20fix=20CLI=20runtime=20break?= =?UTF-8?q?=20=E2=80=94=20V1=20IPC=20contract=20in=20connector-run-cmd?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Round-3 follow-up. Spike 92320888 deleted the legacy executor adapter in `@lobu/connector-worker` and migrated `daemon/executor.ts` plus the two `packages/server` call-sites. The CLI's `lobu connector run` path (`packages/cli/src/commands/_lib/connector-run-cmd.ts:434`) was missed and would have crashed at runtime on every invocation. The slip was masked by an `(executeCompiledConnector as any)` cast on the call site — typecheck couldn't see the contract mismatch. `make build-packages` is bundler-only and didn't catch it either. Concrete fixes: - Remove the `as any` cast on the call site so future contract drift is a typecheck error. - Replace the flat-shape call (`{ mode: 'sync', compiledCode, config, checkpoint, env, connectionCredentials, sessionState, credentials, feedKey, entityIds, apiType, hooks: { onContentChunk, collectContents } }`) with the V1 shape: `{ compiledCode, job: { mode: 'sync', config, checkpoint, env, sessionState, credentials, feedKey, entityIds }, hooks: { onEventChunk } }`. - Switch `onContentChunk` → `onEventChunk` and accumulate events into a local `EventEnvelope[]` (the executor used to collect them into `result.contents` for the caller via `collectContents: true`; that path is gone, sync is streaming-only now). - Replace all `result.contents.*` reads with `collectedEvents.*` for the artifact JSON, the summary line, and the `summarizeEvents` call. - Mode-narrow on `result.mode === 'sync'` before reading `result.checkpoint` / `result.metadata`. Verification (typecheck-alone-is-insufficient per the round-3 brief): 1. `bunx tsc --noEmit` in `packages/cli` — clean. 2. `make build-packages` — clean. 3. Runtime smoke test in `packages/cli`: - load `dist/commands/_lib/connector-run-cmd.js` and confirm the `connectorRun` export resolves; - dynamic-import `@lobu/connector-worker/executor/runtime` the same way the CLI does at runtime, confirm `executeCompiledConnector` resolves; - feed it a synthetic 5-line connector whose `sync()` throws `'intentional smoke-test failure'`, with the new V1 job shape; - assert the executor forks a subprocess, the subprocess loads the bundle, the connector throws, and the error round-trips back with the expected payload. Result: `OK: V1 contract end-to-end (subprocess threw expected payload)`. The OLD shape would have died with TypeError on `result.contents.length` or shape-validation in `buildConnectorExecutionContext`. --- .../src/commands/_lib/connector-run-cmd.ts | 51 +++++++++++-------- 1 file changed, 29 insertions(+), 22 deletions(-) diff --git a/packages/cli/src/commands/_lib/connector-run-cmd.ts b/packages/cli/src/commands/_lib/connector-run-cmd.ts index 9e8e936a3..1f7d7da0a 100644 --- a/packages/cli/src/commands/_lib/connector-run-cmd.ts +++ b/packages/cli/src/commands/_lib/connector-run-cmd.ts @@ -26,6 +26,7 @@ import { homedir } from "node:os"; import { join } from "node:path"; import { printText } from "../memory/_lib/output.js"; +import type { EventEnvelope } from "@lobu/connector-sdk"; import { resolveContext } from "../../internal/context.js"; import { getUsableToken, resolveOrg } from "../memory/_lib/openclaw-auth.js"; import { @@ -422,31 +423,37 @@ export async function connectorRun( process.once("SIGINT", () => onSignal("SIGINT")); process.once("SIGTERM", () => onSignal("SIGTERM")); - // Stream progress to stderr (so --json output to stdout stays clean). - let streamedCount = 0; - const onContentChunk = (items: any[]) => { - streamedCount += items.length; - process.stderr.write(` ... ${streamedCount} events so far\n`); + // Sync events stream via the onEventChunk hook now (the executor used to + // collect them into result.contents for us — that path is gone). Collect + // locally so the artifact/--json output still has the full payload. + const collectedEvents: EventEnvelope[] = []; + const onEventChunk = (events: EventEnvelope[]) => { + for (const event of events) collectedEvents.push(event); + process.stderr.write(` ... ${collectedEvents.length} events so far\n`); }; printText(`Running ${connectorKey} (subprocess executor)...`); const startMs = Date.now(); - const result = await (executeCompiledConnector as any)({ - mode: "sync", + const result = await executeCompiledConnector({ compiledCode, - config: mergedConfig, - checkpoint, - env: process.env as Record, - connectionCredentials: {}, - sessionState, - credentials: {}, - feedKey: feed?.feed_key ?? `${connectorKey}-cli-run`, - entityIds: [], - apiType: "browser" as const, - hooks: { onContentChunk, collectContents: true }, + job: { + mode: "sync", + config: mergedConfig, + checkpoint, + env: process.env as Record, + sessionState, + credentials: null, + feedKey: feed?.feed_key ?? `${connectorKey}-cli-run`, + entityIds: [], + }, + hooks: { onEventChunk }, }); const durationMs = Date.now() - startMs; + if (result.mode !== "sync") { + throw new Error(`Expected sync result, got mode=${result.mode}`); + } + // Save artifact for debugging / sharing. ~/.lobu/cache/connector-runs/.json. const cacheDir = join(homedir(), ".lobu", "cache", "connector-runs"); mkdirSync(cacheDir, { recursive: true }); @@ -459,9 +466,9 @@ export async function connectorRun( config: mergedConfig, input_checkpoint: checkpoint, duration_ms: durationMs, - event_count: result.contents.length, + event_count: collectedEvents.length, next_checkpoint: result.checkpoint, - events: result.contents, + events: collectedEvents, metadata: result.metadata ?? {}, }; await writeFile(artifactPath, JSON.stringify(artifact, null, 2), "utf-8"); @@ -471,10 +478,10 @@ export async function connectorRun( } else { printText(""); printText(`✓ Completed in ${(durationMs / 1000).toFixed(1)}s`); - printText(` Events: ${result.contents.length}`); - if (result.contents.length > 0) { + printText(` Events: ${collectedEvents.length}`); + if (collectedEvents.length > 0) { printText(" Sample:"); - printText(summarizeEvents(result.contents)); + printText(summarizeEvents(collectedEvents)); } if (result.checkpoint) { printText(` Next checkpoint: ${JSON.stringify(result.checkpoint)}`); From aa4b09f570e37e8f5ce14b1aac03324f689030ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Tue, 19 May 2026 18:36:37 +0100 Subject: [PATCH 07/13] wip(spike): make sync executor result streaming-only, strip magic-key comment MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Round-3 follow-up. The `sync` arm of `ExecutorResult` advertised `events: EventEnvelope[]` but `child-runner.ts` always returned `events: []` because the V1 contract streams events via `hooks.onEventChunk` and the executor doesn't collect on the result side. The shape was lying. Picked the option the code already implements: sync is streaming-only. Concrete changes: - `packages/connector-worker/src/executor/interface.ts`: - `ExecutorResult` sync arm loses the `events` field. New doc-block spells out the contract ("events leave via `hooks.onEventChunk`, never collected onto the result — callers that need a list build it themselves in the hook") and points at the CLI as the reference consumer. - Top-of-file comment no longer mentions the long-dead `__action_key` / `__feed_key` / `__auth_mode` magic-key protocol. `git grep` for those identifiers across the monorepo now returns zero hits (verified post-commit). - `packages/connector-worker/src/executor/child-runner.ts`: - The post-`sync()` `events: []` field is gone from the returned `ExecutorResult`. - Renamed the local `events` variable to `trailingEvents` to make it obvious it captures the *return-value* path (connectors that build the full list before returning) as distinct from incremental `emitEvents()` calls during sync. Both paths still funnel through the same `event_chunk` IPC so the parent sees one uniform stream. Verification: - Monorepo grep `rg -n '__action_key|__feed_key|__auth_mode' packages/ examples/ scripts/ config/`: zero hits. - `rg -n 'result\.events' packages/connector-worker/src packages/cli/src packages/server/src`: the remaining hits are all unrelated (chat SSE `eventsUrl`, embedding-fetch JSON response, the child-runner's own read of `syncResult.events` from the SDK type — which is the connector's `SyncResult.events`, not the executor's). No consumer reads sync `result.events` from `ExecutorResult` anywhere. - Typecheck: `connector-sdk`, `connector-worker`, `cli`, `server` clean; `connectors` has only pre-existing whatsapp/dom-lib drift on main. --- .../connector-worker/src/executor/child-runner.ts | 12 ++++++++---- packages/connector-worker/src/executor/interface.ts | 10 ++++++---- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/packages/connector-worker/src/executor/child-runner.ts b/packages/connector-worker/src/executor/child-runner.ts index 0d43a05c7..6ab5514d6 100644 --- a/packages/connector-worker/src/executor/child-runner.ts +++ b/packages/connector-worker/src/executor/child-runner.ts @@ -149,19 +149,23 @@ async function executeConnectorRuntime( updateCheckpoint, })) as SyncResult; - const events = Array.isArray(syncResult?.events) ? syncResult.events : []; - await emitEvents(events); + // Sync is streaming-only on the executor boundary: connectors that build + // the full list before returning still arrive here as `syncResult.events`, + // we just forward them through the same `emitEvents` IPC channel so the + // parent sees one uniform stream regardless of whether the connector + // streamed incrementally or returned in one shot. + const trailingEvents = Array.isArray(syncResult?.events) ? syncResult.events : []; + await emitEvents(trailingEvents); return { mode: 'sync', - events: [], checkpoint: (syncResult?.checkpoint ?? null) as Record | null, auth_update: syncResult?.auth_update ?? null, metadata: { items_found: typeof syncResult?.metadata?.items_found === 'number' ? syncResult.metadata.items_found - : events.length, + : trailingEvents.length, items_skipped: typeof syncResult?.metadata?.items_skipped === 'number' ? syncResult.metadata.items_skipped diff --git a/packages/connector-worker/src/executor/interface.ts b/packages/connector-worker/src/executor/interface.ts index c2b218453..771155bb5 100644 --- a/packages/connector-worker/src/executor/interface.ts +++ b/packages/connector-worker/src/executor/interface.ts @@ -2,8 +2,8 @@ import type { AuthResult, EventEnvelope, SyncCredentials } from '@lobu/connector /** * Executor mode discriminator. The executor speaks the same V1 SDK shapes - * the connector code expects — no more magic `__action_key` / `__feed_key` / - * `__auth_mode` packing. + * the connector code expects: `SyncContext` / `ActionContext` / `AuthContext` + * in, `SyncResult` / `ActionResult` / `AuthResult` out, no envelope. */ export type ExecutorJob = | { @@ -34,12 +34,14 @@ export type ExecutorJob = /** * Result shape returned by the executor. One discriminated union per mode - * mirrors the SDK's `SyncResult` / `ActionResult` / `AuthResult` directly. + * mirrors the SDK's `ActionResult` / `AuthResult` directly. Sync is + * streaming-only: events leave via `hooks.onEventChunk`, never collected + * onto the result — callers that need a list build it themselves in the + * hook (see e.g. `packages/cli/src/commands/_lib/connector-run-cmd.ts`). */ export type ExecutorResult = | { mode: 'sync'; - events: EventEnvelope[]; checkpoint: Record | null; auth_update?: Record | null; metadata?: Record; From 3b0cbcb6d48666e36021e39d59a91978dc32f7a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Tue, 19 May 2026 18:38:06 +0100 Subject: [PATCH 08/13] =?UTF-8?q?wip(spike):=20close=20server=E2=86=92work?= =?UTF-8?q?er=20source=20leaks=20(relative-path=20imports)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Round-3 follow-up. Two server files still reached into `packages/connector-worker/src/runtime-deps.ts` via relative path even after round-2 spike `ad4e92d6` added `@lobu/connector-worker/compile` and declared the package dep. Cross-package leaks like these silently break the moment the worker's source layout shifts (dist-only consumer runtimes, esm-vs-cjs flips, file relocations) and they're the exact pattern round-3 was about closing. - `packages/server/src/server.ts:44`: `import { assertExternalDepsResolvable } from '../../connector-worker/src/runtime-deps'` - `packages/server/src/utils/connector-compiler.ts:8`: `import { EXTERNAL_RUNTIME_DEPS } from '../../../connector-worker/src/runtime-deps'` Both now go through `@lobu/connector-worker/compile`. The compile module already re-exported `EXTERNAL_RUNTIME_DEPS` (round-2 spike `ad4e92d6`); this spike adds `assertExternalDepsResolvable` to the same export so the boot-time helper has a public-export path too. Both belong in `connector-worker` (not `@lobu/core`) because they describe deps the worker's compile pipeline externalises — moving them out would split that concern across two packages. Verification: - `rg -n "from ['\"]\\.\\./\\.\\./.*connector-worker|from ['\"]\\.\\./\\.\\./\\.\\./connector-worker" packages/server packages/cli --type=ts`: zero hits. - `make build-packages`: clean. - `bunx tsc --noEmit` in `packages/server`: clean. --- packages/connector-worker/src/compile/index.ts | 2 +- packages/server/src/server.ts | 2 +- packages/server/src/utils/connector-compiler.ts | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/connector-worker/src/compile/index.ts b/packages/connector-worker/src/compile/index.ts index 8c5575a51..f0a9b4b0e 100644 --- a/packages/connector-worker/src/compile/index.ts +++ b/packages/connector-worker/src/compile/index.ts @@ -28,7 +28,7 @@ import { join, resolve } from 'node:path'; import { build, type Plugin } from 'esbuild'; import { EXTERNAL_RUNTIME_DEPS } from '../runtime-deps.js'; -export { EXTERNAL_RUNTIME_DEPS } from '../runtime-deps.js'; +export { assertExternalDepsResolvable, EXTERNAL_RUNTIME_DEPS } from '../runtime-deps.js'; // Strict regex for connector_key: lowercase letters/digits, optional dots // for namespacing, underscores for word separators. Defense-in-depth even diff --git a/packages/server/src/server.ts b/packages/server/src/server.ts index a5ef10e10..5c92caba2 100644 --- a/packages/server/src/server.ts +++ b/packages/server/src/server.ts @@ -41,7 +41,7 @@ import { import { startStaleRunReaper } from './scheduled/check-stalled-executions'; import { bootTaskScheduler } from './scheduled/jobs'; import * as Sentry from '@sentry/node'; -import { assertExternalDepsResolvable } from '../../connector-worker/src/runtime-deps'; +import { assertExternalDepsResolvable } from '@lobu/connector-worker/compile'; import { isSentryReported, markSentryReported } from './sentry'; import { getEnvFromProcess } from './utils/env'; import logger from './utils/logger'; diff --git a/packages/server/src/utils/connector-compiler.ts b/packages/server/src/utils/connector-compiler.ts index 5ac6b3ee9..fc2b616aa 100644 --- a/packages/server/src/utils/connector-compiler.ts +++ b/packages/server/src/utils/connector-compiler.ts @@ -5,7 +5,7 @@ * and metadata extraction (finds ConnectorRuntime subclass with sync()+execute()). */ -import { EXTERNAL_RUNTIME_DEPS } from '../../../connector-worker/src/runtime-deps'; +import { EXTERNAL_RUNTIME_DEPS } from '@lobu/connector-worker/compile'; import { type CompileResult, compileSource, extractMetadata } from './compiler-core'; export interface ConnectorMetadata { From 793f1dfb4a9007f31fd95f8d3f97ee986204a792 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Tue, 19 May 2026 18:41:13 +0100 Subject: [PATCH 09/13] wip(spike): drop _ctx prefix on base execute(), update README guidance MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Round-3 follow-up covering targets 4 and 5. - `packages/connector-sdk/src/connector-runtime.ts:72`: dropped the `_` prefix on the `ctx` parameter of the base `execute()`. The repo convention (AGENTS.md: "When fixing unused-parameter errors, delete the parameter rather than prefixing with `_`") doesn't apply literally here because the parameter is contractually required: subclasses overriding `execute(ctx: ActionContext)` rely on the base signature accepting an `ActionContext`. Verified by deleting the param and watching every action-supporting connector (`google_calendar`, `google_gmail`, `chrome`, `revolut`) fail typecheck with `Target signature provides too few arguments. Expected 1 or more, but got 0.`. Restored the param without the underscore and added a single `// biome-ignore lint/correctness/noUnusedFunctionParameters` line with the rationale ("contract signature — subclasses receive the full ActionContext") so future readers see why the param looks unused but stays. - `packages/connectors/src/README.md:353`: deleted the paragraph that told new connector authors to paste the 3-line "actions not supported" stub. Replaced with one sentence pointing at the new default impl on `ConnectorRuntime` ("do nothing — the base class ships a default `execute()` that returns `{ success: false, error: 'Actions not supported' }`. Omit the method entirely."). Verification: - `make typecheck` from the worktree root: clean. - 19 connectors that already lost the stub in spike 02f990fb still compile against the new base. --- packages/connector-sdk/src/connector-runtime.ts | 5 ++++- packages/connectors/src/README.md | 11 ++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/packages/connector-sdk/src/connector-runtime.ts b/packages/connector-sdk/src/connector-runtime.ts index d04b8e598..93e218df9 100644 --- a/packages/connector-sdk/src/connector-runtime.ts +++ b/packages/connector-sdk/src/connector-runtime.ts @@ -65,11 +65,14 @@ export abstract class ConnectorRuntime { * Called either inline (low-risk) or by the worker (high-risk with approval). * Default implementation rejects with "Actions not supported" — connectors * that don't declare any `actions` in their definition need not override. + * The `ctx` parameter is part of the public contract (subclasses overriding + * this method receive the full `ActionContext`); the base impl ignores it. * * @param ctx - Action context with action key, input, and credentials * @returns Action result with output data */ - async execute(_ctx: ActionContext): Promise { + // biome-ignore lint/correctness/noUnusedFunctionParameters: contract signature — subclasses receive the full ActionContext + async execute(ctx: ActionContext): Promise { return { success: false, error: 'Actions not supported' }; } diff --git a/packages/connectors/src/README.md b/packages/connectors/src/README.md index dbf3584a7..96470bd99 100644 --- a/packages/connectors/src/README.md +++ b/packages/connectors/src/README.md @@ -350,13 +350,10 @@ async execute(ctx: ActionContext): Promise { } ``` -If your connector doesn't support actions: - -```typescript -async execute(_ctx: ActionContext): Promise { - return { success: false, error: 'Actions not supported' }; -} -``` +If your connector doesn't support actions, do nothing — the base +`ConnectorRuntime` class ships a default `execute()` that returns +`{ success: false, error: 'Actions not supported' }`. Omit the method +entirely. ## Options Schema From 4e595af07cb57ae745c4c49755239cf4a2301e57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Tue, 19 May 2026 18:51:15 +0100 Subject: [PATCH 10/13] wip(spike): clean up stale doc references after V1 IPC migration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Round-4 doc polish. - `packages/connectors/src/README.md`: the Quick Start (top of file) still showed the deleted `execute(_ctx: ActionContext): Promise { return { success: false, error: 'Actions not supported' }; }` stub on every new connector — contradicting the actions-not-supported guidance later in the same file ("omit the method entirely"). Removed the stub from the Quick Start class body and dropped the now-unused `ActionContext` / `ActionResult` imports from its import block. The later section that does show actions (the `github`-style example around line 331) still imports them locally and is unchanged. - `packages/cli/src/commands/_lib/connector-run-cmd.ts:344`: comment said "Build the SyncContext shape that executeCompiledConnector expects" — but as of spike `76a4677c` we build an `ExecutorJob`, not a `SyncContext`. One-word edit. Verification: - `rg -n "execute\(_ctx|ActionContext|ActionResult" packages/connectors/src/README.md` returns one hit (line 331 — the actions-supported example), not the Quick Start or the no-actions guidance. - `make typecheck`: clean. --- packages/cli/src/commands/_lib/connector-run-cmd.ts | 2 +- packages/connectors/src/README.md | 6 ------ 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/packages/cli/src/commands/_lib/connector-run-cmd.ts b/packages/cli/src/commands/_lib/connector-run-cmd.ts index 1f7d7da0a..75957cc95 100644 --- a/packages/cli/src/commands/_lib/connector-run-cmd.ts +++ b/packages/cli/src/commands/_lib/connector-run-cmd.ts @@ -341,7 +341,7 @@ export async function connectorRun( printText(`Compiling ${connectorKey} from ${sourcePath}...`); const compiledCode = await compileConnectorFromFile(sourcePath); - // Build the SyncContext shape that executeCompiledConnector expects. + // Build the ExecutorJob shape that executeCompiledConnector expects. // For mirror profiles we layer two acquisition paths: // 1. DevToolsActivePort lookup against the source Chrome's // user-data root. If the file is there, Chrome is exposing a diff --git a/packages/connectors/src/README.md b/packages/connectors/src/README.md index 96470bd99..c3f74ff9e 100644 --- a/packages/connectors/src/README.md +++ b/packages/connectors/src/README.md @@ -10,8 +10,6 @@ import { ConnectorRuntime, type SyncContext, type SyncResult, - type ActionContext, - type ActionResult, type EventEnvelope, } from '@lobu/connector-sdk'; @@ -63,10 +61,6 @@ export default class MyConnector extends ConnectorRuntime { metadata: { items_found: events.length }, }; } - - async execute(_ctx: ActionContext): Promise { - return { success: false, error: 'Actions not supported' }; - } } ``` From b37481ea015234ecd835fc3c7dba99f77d82fca7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Tue, 19 May 2026 19:02:05 +0100 Subject: [PATCH 11/13] wip(spike): refresh README + integration-test fixtures to V1 shapes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Round-5 polish — final doc/fixture drift after the V1 IPC migration. - `packages/connectors/src/README.md`: rewrote the "How connector code runs" section. Pre-spike it described the gateway shipping ~13 MB `compiled_code` inline in every worker-poll response and the worker loading it blindly. After spikes `ad4e92d6` (compile pipeline consolidation) and `92320888` (V1 IPC), fleet workers receive only `connector_key` and compile locally via the shared `@lobu/connector-worker/compile` module; the gateway only inlines `compiled_code` for device/DB-only workers without source on disk. Also updated the IPC step to name the live wire format (`ExecutorJob` / `ExecutorResult`, `event_chunk` for streaming). - `packages/connector-worker/integration-tests/subprocess.test.ts` + `subprocess-source-mode.test.ts`: fixtures still constructed a `BASE_CONTEXT: SyncContext` shaped as the deleted V0 `{ options, checkpoint, env, apiType }`. Tests still passed because the fixture was passed positionally and the diagnostic-capture tests trigger crash/throw paths before the executor inspects most fields, but the comment + type alias misled future readers. Migrated to `BASE_JOB: ExecutorJob` with the V1 shape (`mode: 'sync'`, `feedKey`, `config`, `checkpoint`, `entityIds`, `credentials`, `sessionState`, `env`). Also fixed the synthetic-connector `compiled()` template: the inline `execute()` returned `{ contents: [], checkpoint: null }` (V0 `FeedSyncResult` shape) — replaced with V1 `ActionResult` (`{ success: false, error: 'no actions' }`) so the fixture is internally consistent with what `findRuntimeClass` expects. Verification: - `bun test packages/connector-worker/integration-tests`: 8 pass, 0 fail. - `rg -n "\\bSyncContext\\b" packages/connector-worker packages/cli/src`: remaining hits are all live SDK-type references (the executor interface comment names the SDK type, the integration test's comment explains the synthetic `sync(_ctx)` receives a V1 `SyncContext`) — no stale fixture references. README `SyncContext` references all document the live SDK type. - `make typecheck`: clean. --- .../subprocess-source-mode.test.ts | 19 +++++---- .../integration-tests/subprocess.test.ts | 39 ++++++++++++------- packages/connectors/src/README.md | 12 +++--- 3 files changed, 44 insertions(+), 26 deletions(-) diff --git a/packages/connector-worker/integration-tests/subprocess-source-mode.test.ts b/packages/connector-worker/integration-tests/subprocess-source-mode.test.ts index eb03dbbde..3bdee9591 100644 --- a/packages/connector-worker/integration-tests/subprocess-source-mode.test.ts +++ b/packages/connector-worker/integration-tests/subprocess-source-mode.test.ts @@ -17,23 +17,28 @@ * natively, so the import works as-is. */ import { describe, expect, test } from 'bun:test'; -import type { SyncContext } from '../src/executor/interface.ts'; +import type { ExecutorJob } from '../src/executor/interface.ts'; import { SubprocessError, SubprocessExecutor } from '../src/executor/subprocess.ts'; -const BASE_CONTEXT: SyncContext = { - options: {} as any, +// Minimal V1 ExecutorJob — see subprocess.test.ts for shape rationale. +const BASE_JOB: ExecutorJob = { + mode: 'sync', + feedKey: 'integration-test', + config: {}, checkpoint: null, + entityIds: [], + credentials: null, + sessionState: null, env: {}, - apiType: 'api', }; function compiled(body: string): string { return ` class ConnectorRuntime { - async sync(_ctx, _hooks) { + async sync(_ctx) { ${body} } - async execute() { return { contents: [], checkpoint: null }; } + async execute() { return { success: false, error: 'no actions' }; } } module.exports = { ConnectorRuntime }; `; @@ -52,7 +57,7 @@ describe('SubprocessExecutor (source-mode, Bun runtime)', () => { console.log('source-mode child ran'); process.exit(1); `), - BASE_CONTEXT + BASE_JOB ); } catch (e) { err = e as SubprocessError; diff --git a/packages/connector-worker/integration-tests/subprocess.test.ts b/packages/connector-worker/integration-tests/subprocess.test.ts index 7937ad138..2bc8077b3 100644 --- a/packages/connector-worker/integration-tests/subprocess.test.ts +++ b/packages/connector-worker/integration-tests/subprocess.test.ts @@ -12,23 +12,36 @@ * runs that haven't built the workspace. */ import { describe, expect, test } from 'bun:test'; -import type { SyncContext } from '../dist/executor/interface.js'; +import type { ExecutorJob } from '../dist/executor/interface.js'; import { SubprocessError, SubprocessExecutor } from '../dist/executor/subprocess.js'; -const BASE_CONTEXT: SyncContext = { - options: {} as any, +// Minimal V1 ExecutorJob — the subprocess executor only reads what each +// connector body needs; the diagnostic tests below exercise the crash / +// throw / redact paths, which don't touch most fields, but the shape +// itself has to be valid `ExecutorJob` so the parent's IPC envelope is +// well-formed. +const BASE_JOB: ExecutorJob = { + mode: 'sync', + feedKey: 'integration-test', + config: {}, checkpoint: null, + entityIds: [], + credentials: null, + sessionState: null, env: {}, - apiType: 'api', }; +// Synthetic connector source. The class needs `sync` and `execute` on its +// prototype for `findRuntimeClass` to accept it. `sync` receives a V1 +// `SyncContext` (with `emitEvents` / `updateCheckpoint` hooks), `execute` +// returns a V1 `ActionResult` — both unused by these crash-path tests. function compiled(body: string): string { return ` class ConnectorRuntime { - async sync(_ctx, _hooks) { + async sync(_ctx) { ${body} } - async execute() { return { contents: [], checkpoint: null }; } + async execute() { return { success: false, error: 'no actions' }; } } module.exports = { ConnectorRuntime }; `; @@ -45,7 +58,7 @@ describe('SubprocessExecutor diagnostic capture', () => { console.log('about to die hard'); process.exit(1); `), - BASE_CONTEXT + BASE_JOB ); } catch (e) { err = e as SubprocessError; @@ -65,7 +78,7 @@ describe('SubprocessExecutor diagnostic capture', () => { compiled(` throw new Error('connector blew up'); `), - BASE_CONTEXT + BASE_JOB ); } catch (e) { err = e as SubprocessError; @@ -84,7 +97,7 @@ describe('SubprocessExecutor diagnostic capture', () => { setTimeout(() => { throw new Error('async tick throw'); }, 0); await new Promise(() => {}); `), - BASE_CONTEXT + BASE_JOB ); } catch (e) { err = e as SubprocessError; @@ -103,7 +116,7 @@ describe('SubprocessExecutor diagnostic capture', () => { Promise.reject(new Error('dangling rejection')); await new Promise(() => {}); `), - BASE_CONTEXT + BASE_JOB ); } catch (e) { err = e as SubprocessError; @@ -123,7 +136,7 @@ describe('SubprocessExecutor diagnostic capture', () => { console.error('CH_API_KEY=longvaluesecret789'); process.exit(1); `), - BASE_CONTEXT + BASE_JOB ); } catch (e) { err = e as SubprocessError; @@ -142,7 +155,7 @@ describe('SubprocessExecutor diagnostic capture', () => { compiled(` throw new Error('upstream failed: api_key=sk_live_abcdefghijklmn123'); `), - BASE_CONTEXT + BASE_JOB ); } catch (e) { err = e as SubprocessError; @@ -163,7 +176,7 @@ describe('SubprocessExecutor diagnostic capture', () => { compiled(` await new Promise(() => {}); `), - BASE_CONTEXT + BASE_JOB ); } catch (e) { err = e as SubprocessError; diff --git a/packages/connectors/src/README.md b/packages/connectors/src/README.md index c3f74ff9e..a4da2ae8b 100644 --- a/packages/connectors/src/README.md +++ b/packages/connectors/src/README.md @@ -493,12 +493,12 @@ Connectors can also be installed manually via `client.connections.installConnect ### How connector code runs -1. The server checks `connector_versions.compiled_code` — if present, uses it directly -2. If `compiled_code` is NULL (bundled connectors), it compiles from `connectors/{source_path}` on disk via esbuild -3. The compiled code is sent to the worker, written to a temp file (`.connector-child-{pid}.mjs`), and loaded via dynamic `import()` -4. Each sync/action runs in an **isolated child process** with a 10-minute timeout and 512MB memory limit -5. The child process has a restricted environment — only `PATH`, `HOME`, `TMPDIR`, `TZ`, `NODE_ENV`, `NODE_PATH`, and `PLAYWRIGHT_BROWSERS_PATH` are available as env vars -6. Secrets flow through `ctx.credentials` and `ctx.config`, not environment variables +1. For fleet workers and embedded-mode hosts (worker + gateway share a host), the gateway sends only `connector_key` in the worker-poll response — both pods have the `.ts` source on disk, and the worker compiles locally via the shared pipeline at `@lobu/connector-worker/compile`. For DB-only / device workers without source on disk, the gateway sends `compiled_code` inline. +2. The compiled bundle is written to a temp file (`.connector-child-{pid}-{rand}.mjs`) under cwd and loaded via dynamic `import()` inside a forked child process. +3. The parent and child speak `ExecutorJob` / `ExecutorResult` over IPC — the same V1 SDK shapes (`SyncContext` / `ActionContext` / `AuthContext` in, `SyncResult` / `ActionResult` / `AuthResult` out, no envelope). Sync events stream via `event_chunk` IPC messages as the connector emits them. +4. Each sync/action runs in an **isolated child process** with a 10-minute timeout and 512MB memory limit. +5. The child process has a restricted environment — only `PATH`, `HOME`, `TMPDIR`, `TZ`, `NODE_ENV`, `NODE_PATH`, and `PLAYWRIGHT_BROWSERS_PATH` are available as env vars. +6. Secrets flow through `ctx.credentials` and `ctx.config`, not environment variables. This means edits to `.ts` files in `connectors/` take effect on the next sync without reinstalling. From a43d003ff98f7635e2456d123d5d86fc8add4669 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Tue, 19 May 2026 19:05:48 +0100 Subject: [PATCH 12/13] wip(spike): fix stale ./build doc-comment after compile/ rename MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Round-6 one-liner. compile-connector.ts:17 referenced './build' but the shared module was renamed to './compile' in round-2 (the gitignore gotcha — '\*\*/build/' silently swallowed the original dir name). Comment now matches the actual import path. --- packages/connector-worker/src/compile-connector.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/connector-worker/src/compile-connector.ts b/packages/connector-worker/src/compile-connector.ts index 3521784ab..d357c60ad 100644 --- a/packages/connector-worker/src/compile-connector.ts +++ b/packages/connector-worker/src/compile-connector.ts @@ -14,7 +14,7 @@ * inline — they don't have the connectors directory on disk. * * The resolver + esbuild bundle pipeline themselves are owned by the - * shared `./build` module so the gateway and CLI sides don't drift. + * shared `./compile` module so the gateway and CLI sides don't drift. * This file just supplies the worker-image-specific candidate dirs. */ From 8563dc377be6e5d1aa7082c2138b411bb995158d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Tue, 19 May 2026 20:14:41 +0100 Subject: [PATCH 13/13] fix(ci): build connector-worker before integration vitest suite MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The integration job built only `core` + `connector-sdk` before invoking the server's vitest suite. After spike ad4e92d6 the server's `packages/server/src/utils/connector-catalog.ts` imports `@lobu/connector-worker/compile` — Node's package resolver follows the `exports` field to `./dist/compile/index.js` and fails when dist is absent. vitest's vite-node loader propagates the failure as a load error, breaking every integration test whose import graph transitively touches `queue-helpers` / `worker-api` / `connector-catalog` — all 30 files in the suite. The unit job already builds connector-worker (line 57-67); the integration job had a smaller list that pre-dated the shared-compile-module refactor. Root cause verification: 1. Reproduced locally by `rm -rf packages/connector-worker/dist` and running `LOBU_TEST_BACKEND=pglite vitest run` from packages/server: `Error: Failed to load url @lobu/connector-worker/compile … Does the file exist?` on every integration file — same symptom as the CI run #26117699107 attached to PR #931. 2. Rebuilt connector-worker (`cd packages/connector-worker && bun run build`), re-ran the same vitest invocation — 73 of 76 files load and pass. The 2 remaining failures are pre-existing on main (`run-script-runtime.test.ts` needs Node 22's isolated-vm abi127 prebuild; `install-operator.test.ts` fails identically on origin/main when checked out and re-run, both unrelated to this fix). Considered alternatives: - `"bun": "./src/compile/index.ts"` conditional export — rejected: the integration job runs Node (line 142 of ci.yml: "We run them under Node (not bun) … the sandbox runtime requires isolated-vm which is a V8 native addon that bun cannot load"), so the `bun` condition wouldn't activate. - `"node": "./src/compile/index.ts"` source-mapped export — rejected: the source is .ts; Node can't load .ts without a loader, and installing one for CI tests would also slow every Node consumer in production. The build-dist approach matches every other workspace package. - Switching to relative imports — rejected: that's the leak round 3 closed. The fix belongs in CI, not by undoing the package boundary. The added embeddings build is required because connector-worker's src/embeddings.ts imports `@lobu/embeddings` and resolves it via the workspace symlink → dist. Same dependency order the unit job already honors (line 66 of the unit job builds both in the same order). --- .github/workflows/ci.yml | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8b93af865..614255caf 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -192,9 +192,21 @@ jobs: run: bun install - name: Build packages server depends on + # embeddings → connector-worker order matters: connector-worker's + # src/embeddings.ts imports `@lobu/embeddings` and resolves it via + # the workspace symlink → dist. connector-worker itself is required + # because `packages/server/src/utils/connector-catalog.ts` imports + # `@lobu/connector-worker/compile` (the shared compile pipeline), + # which resolves via the package's `exports` field to + # `./dist/compile/index.js`. Vitest's vite-node resolver follows + # the `import` condition and fails to load if dist is absent — + # which transitively breaks every integration file whose import + # graph touches `queue-helpers` / `worker-api` / `connector-catalog`. run: | cd packages/core && bun run build && cd ../.. cd packages/connector-sdk && bun run build && cd ../.. + cd packages/embeddings && bun run build && cd ../.. + cd packages/connector-worker && bun run build && cd ../.. - name: Verify Postgres health (fail fast if pgvector setup is broken) run: |