diff --git a/packages/client/package.json b/packages/client/package.json index 03c0bb819..c03830070 100644 --- a/packages/client/package.json +++ b/packages/client/package.json @@ -9,6 +9,10 @@ ".": { "types": "./dist/index.d.ts", "import": "./dist/index.js" + }, + "./node": { + "types": "./dist/node.d.ts", + "import": "./dist/node.js" } }, "files": [ diff --git a/packages/client/src/__tests__/node.test.ts b/packages/client/src/__tests__/node.test.ts new file mode 100644 index 000000000..a5c5808c8 --- /dev/null +++ b/packages/client/src/__tests__/node.test.ts @@ -0,0 +1,99 @@ +import { describe, expect, test } from "bun:test"; +import { action, defineConnector, Lobu } from "../node.js"; + +describe("hosted connector helpers", () => { + test("registers metadata-only connectors", async () => { + let body: Record | undefined; + const lobu = new Lobu({ + baseUrl: "https://lobu.test/lobu", + org: "acme", + token: "token", + fetch: (async (...args) => { + body = JSON.parse(String(args[1]?.body)) as Record; + return json({ ok: true }); + }) as typeof fetch, + }); + + await lobu.connectors.register( + defineConnector({ + key: "app.crm", + name: "CRM", + version: "1.0.0", + actions: { + refund: action({ + key: "refund", + name: "Refund", + execute: async () => ({ ok: true }), + }), + }, + }) + ); + + expect(body?.action).toBe("install_connector"); + expect((body?.connector_definition as { key?: string }).key).toBe( + "app.crm" + ); + }); + + test("serves action runs through worker endpoints", async () => { + const controller = new AbortController(); + const completed: Record[] = []; + const lobu = new Lobu({ + baseUrl: "https://lobu.test/lobu", + org: "acme", + token: "token", + fetch: (async (input, init) => { + const url = String(input); + if (url.endsWith("/api/workers/poll")) { + return json({ + run_id: 7, + run_type: "action", + connector_key: "app.crm", + action_key: "refund", + action_input: { amount: 10 }, + }); + } + if (url.endsWith("/api/workers/complete-action")) { + completed.push( + JSON.parse(String(init?.body)) as Record + ); + controller.abort(); + return json({ success: true }); + } + throw new Error(`unexpected url ${url}`); + }) as typeof fetch, + }); + + await lobu.connectors.serve( + defineConnector({ + key: "app.crm", + name: "CRM", + version: "1.0.0", + actions: { + refund: action<{ amount: number }>({ + key: "refund", + name: "Refund", + execute: async (_ctx, input) => ({ refunded: input.amount }), + }), + }, + }), + { workerId: "worker-1", signal: controller.signal } + ); + + expect(completed).toEqual([ + { + run_id: 7, + worker_id: "worker-1", + status: "success", + action_output: { refunded: 10 }, + }, + ]); + }); +}); + +function json(body: unknown): Response { + return new Response(JSON.stringify(body), { + status: 200, + headers: { "content-type": "application/json" }, + }); +} diff --git a/packages/client/src/client.ts b/packages/client/src/client.ts index 375d96a80..521242ffe 100644 --- a/packages/client/src/client.ts +++ b/packages/client/src/client.ts @@ -11,9 +11,11 @@ export class Lobu { constructor(options: LobuClientOptions) { this.rest = new LobuRestClient({ baseUrl: options.baseUrl, + apiBaseUrl: options.apiBaseUrl, token: options.token, fetch: options.fetch ?? globalThis.fetch.bind(globalThis), headers: options.headers, + org: options.org, }); this.sessions = { create: (input) => this.createSession(input), diff --git a/packages/client/src/index.ts b/packages/client/src/index.ts index 479731470..2b3ae83ca 100644 --- a/packages/client/src/index.ts +++ b/packages/client/src/index.ts @@ -8,6 +8,7 @@ export type { LobuClientOptions, LobuFetch, LobuHeaders, + LobuInternalRequestOptions, LobuSseEvent, SendMessageOptions, SendMessageResponse, diff --git a/packages/client/src/node.ts b/packages/client/src/node.ts new file mode 100644 index 000000000..3ec0a3976 --- /dev/null +++ b/packages/client/src/node.ts @@ -0,0 +1,393 @@ +import { Lobu as BaseLobu } from "./client.js"; +import type { LobuInternalRequestOptions } from "./types.js"; + +export class Lobu extends BaseLobu { + readonly connectors = { + register: ( + connector: HostedConnector, + options?: LobuInternalRequestOptions + ) => registerConnector(this, connector, options), + serve: ( + connector: HostedConnector, + options?: { workerId?: string; label?: string; signal?: AbortSignal } + ) => serveConnector(this, connector, options), + }; +} + +export type JsonSchema = Record; + +export interface HostedFeedContext< + C = Record, + K = Record, +> { + runId: number; + feedId?: number; + connectionId?: number; + config: C; + checkpoint?: K; + entityIds?: number[]; + emit: (items: HostedEvent[], checkpoint?: K) => Promise; +} + +export interface HostedActionContext { + runId: number; + connectionId?: number; +} + +export interface HostedEvent { + id: string; + title?: string; + payload_type?: "text" | "markdown" | "json_template" | "media" | "empty"; + payload_text: string; + payload_data?: Record; + payload_template?: Record | null; + attachments?: unknown[]; + author_name?: string; + occurred_at?: string | Date; + source_url?: string; + score?: number; + metadata?: Record; + origin_parent_id?: string; + origin_type?: string; + semantic_type?: string; +} + +export interface HostedFeed< + C = Record, + K = Record, +> { + key: string; + name: string; + description?: string; + configSchema?: JsonSchema; + eventKinds?: Record>; + sync: ( + ctx: HostedFeedContext + ) => Promise<{ checkpoint?: K } | undefined>; +} + +export interface HostedAction< + I = Record, + O = Record, +> { + key: string; + name: string; + description?: string; + requiresApproval?: boolean; + inputSchema?: JsonSchema; + outputSchema?: JsonSchema; + execute: (ctx: HostedActionContext, input: I) => Promise; +} + +export interface HostedConnector { + key: string; + name: string; + version: string; + description?: string; + authSchema?: JsonSchema | null; + optionsSchema?: JsonSchema | null; + faviconDomain?: string | null; + feeds?: Record; + actions?: Record; +} + +export function defineConnector(connector: HostedConnector): HostedConnector { + return connector; +} + +export function feed, K = Record>( + value: HostedFeed +): HostedFeed { + return value; +} + +export function action< + I = Record, + O = Record, +>(value: HostedAction): HostedAction { + return value; +} + +export async function registerConnector( + lobu: BaseLobu, + connector: HostedConnector, + options: LobuInternalRequestOptions = {} +): Promise { + return lobu.rest.tool( + "manage_connections", + { + action: "install_connector", + connector_definition: toDefinition(connector), + }, + options + ); +} + +export async function serveConnector( + lobu: BaseLobu, + connector: HostedConnector, + options: { workerId?: string; label?: string; signal?: AbortSignal } = {} +): Promise { + const workerId = options.workerId ?? crypto.randomUUID(); + while (!options.signal?.aborted) { + try { + const poll = await lobu.rest.worker( + "/poll", + { + worker_id: workerId, + platform: "node", + label: options.label ?? connector.name, + connector_keys: [connector.key], + }, + { signal: options.signal } + ); + + if (!poll.run_id) { + await sleep((poll.next_poll_seconds ?? 10) * 1000, options.signal); + continue; + } + + if (poll.connector_key !== connector.key) { + await failRun( + lobu, + workerId, + poll, + `Worker for ${connector.key} cannot run ${poll.connector_key}` + ); + continue; + } + + if (poll.run_type === "sync") { + await runFeed(lobu, workerId, connector, poll); + } else if (poll.run_type === "action") { + await runAction(lobu, workerId, connector, poll); + } else { + await failRun( + lobu, + workerId, + poll, + `Unsupported run_type ${poll.run_type}` + ); + } + } catch (error) { + if (options.signal?.aborted) return; + console.error("[lobu] hosted connector worker error", error); + await sleep(3000, options.signal); + } + } +} + +function toDefinition(connector: HostedConnector): Record { + return { + key: connector.key, + name: connector.name, + description: connector.description, + version: connector.version, + authSchema: connector.authSchema ?? null, + optionsSchema: connector.optionsSchema ?? null, + faviconDomain: connector.faviconDomain ?? null, + feeds: Object.fromEntries( + Object.entries(connector.feeds ?? {}).map(([key, value]) => [ + key, + { + key: value.key, + name: value.name, + description: value.description, + configSchema: value.configSchema, + eventKinds: value.eventKinds, + }, + ]) + ), + actions: Object.fromEntries( + Object.entries(connector.actions ?? {}).map(([key, value]) => [ + key, + { + key: value.key, + name: value.name, + description: value.description, + requiresApproval: value.requiresApproval ?? false, + inputSchema: value.inputSchema, + outputSchema: value.outputSchema, + }, + ]) + ), + }; +} + +type WorkerPollResponse = { + next_poll_seconds?: number; + run_id?: number; + run_type?: string; + connector_key?: string; + feed_key?: string; + feed_id?: number; + connection_id?: number; + config?: Record; + checkpoint?: Record; + entity_ids?: number[]; + action_key?: string; + action_input?: Record; +}; + +async function runFeed( + lobu: BaseLobu, + workerId: string, + connector: HostedConnector, + poll: WorkerPollResponse +): Promise { + const current = poll.feed_key ? connector.feeds?.[poll.feed_key] : undefined; + if (!current || !poll.run_id) { + await complete(lobu, workerId, poll, { + status: "failed", + error_message: `Unknown feed ${poll.feed_key}`, + }); + return; + } + + let itemsCollected = 0; + const emit = async ( + items: HostedEvent[], + checkpoint?: Record + ) => { + itemsCollected += items.length; + await lobu.rest.worker("/stream", { + type: "batch", + run_id: poll.run_id!, + worker_id: workerId, + items: items.map(normalizeEvent), + checkpoint, + }); + }; + + let result: { checkpoint?: Record } | undefined; + try { + result = await current.sync({ + runId: poll.run_id, + feedId: poll.feed_id, + connectionId: poll.connection_id, + config: (poll.config ?? {}) as Record, + checkpoint: poll.checkpoint, + entityIds: poll.entity_ids, + emit, + }); + } catch (error) { + await complete(lobu, workerId, poll, { + status: "failed", + items_collected: itemsCollected, + error_message: error instanceof Error ? error.message : String(error), + }); + return; + } + + await complete(lobu, workerId, poll, { + status: "success", + items_collected: itemsCollected, + checkpoint: result?.checkpoint, + }); +} + +async function runAction( + lobu: BaseLobu, + workerId: string, + connector: HostedConnector, + poll: WorkerPollResponse +): Promise { + const current = poll.action_key + ? connector.actions?.[poll.action_key] + : undefined; + if (!current || !poll.run_id) { + await completeAction(lobu, workerId, poll, { + status: "failed", + error_message: `Unknown action ${poll.action_key}`, + }); + return; + } + + let output: unknown; + try { + output = await current.execute( + { runId: poll.run_id, connectionId: poll.connection_id }, + (poll.action_input ?? {}) as Record + ); + } catch (error) { + await completeAction(lobu, workerId, poll, { + status: "failed", + error_message: error instanceof Error ? error.message : String(error), + }); + return; + } + + await completeAction(lobu, workerId, poll, { + status: "success", + action_output: toRecord(output), + }); +} + +function toRecord(value: unknown): Record { + return value && typeof value === "object" && !Array.isArray(value) + ? (value as Record) + : { result: value }; +} + +function normalizeEvent(event: HostedEvent): Record { + return { + ...event, + occurred_at: + event.occurred_at instanceof Date + ? event.occurred_at.toISOString() + : (event.occurred_at ?? new Date().toISOString()), + }; +} + +function failRun( + lobu: BaseLobu, + workerId: string, + poll: WorkerPollResponse, + message: string +): Promise { + const body = { status: "failed", error_message: message }; + return poll.run_type === "action" + ? completeAction(lobu, workerId, poll, body) + : complete(lobu, workerId, poll, body); +} + +function complete( + lobu: BaseLobu, + workerId: string, + poll: WorkerPollResponse, + body: Record +): Promise { + return lobu.rest.worker("/complete", { + run_id: poll.run_id!, + worker_id: workerId, + ...body, + }); +} + +function completeAction( + lobu: BaseLobu, + workerId: string, + poll: WorkerPollResponse, + body: Record +): Promise { + return lobu.rest.worker("/complete-action", { + run_id: poll.run_id!, + worker_id: workerId, + ...body, + }); +} + +function sleep(ms: number, signal: AbortSignal | undefined): Promise { + if (signal?.aborted) return Promise.resolve(); + return new Promise((resolve) => { + const timeout = setTimeout(resolve, ms); + signal?.addEventListener( + "abort", + () => { + clearTimeout(timeout); + resolve(); + }, + { once: true } + ); + }); +} diff --git a/packages/client/src/rest.ts b/packages/client/src/rest.ts index 204205e6a..7fd01f8ef 100644 --- a/packages/client/src/rest.ts +++ b/packages/client/src/rest.ts @@ -10,6 +10,7 @@ import type { CreateSessionResponse, LobuFetch, LobuHeaders, + LobuInternalRequestOptions, LobuSseEvent, SendMessageOptions, SendMessageResponse, @@ -22,18 +23,28 @@ export class LobuRestClient { private readonly fetchImpl: LobuFetch; private readonly headers: LobuHeaders | undefined; private readonly client: Client; + readonly baseUrl: string; + readonly apiBaseUrl: string; + readonly org: string | undefined; constructor(options: { baseUrl: string; token: TokenProvider; fetch: LobuFetch; headers?: LobuHeaders; + org?: string; + apiBaseUrl?: string; }) { this.token = options.token; this.fetchImpl = options.fetch; this.headers = options.headers; + this.baseUrl = normalizeBaseUrl(options.baseUrl); + this.apiBaseUrl = options.apiBaseUrl + ? normalizeBaseUrl(options.apiBaseUrl) + : this.baseUrl.replace(/\/lobu$/, ""); + this.org = options.org; this.client = createClient({ - baseUrl: normalizeBaseUrl(options.baseUrl), + baseUrl: this.baseUrl, fetch: options.fetch, }); } @@ -139,10 +150,51 @@ export class LobuRestClient { } } + async tool( + toolName: string, + args: Record, + options: LobuInternalRequestOptions = {} + ): Promise { + if (!this.org) throw new Error("Lobu org is required for connector APIs"); + return this.request(`/api/${encodeURIComponent(this.org)}/${toolName}`, { + method: "POST", + body: JSON.stringify(args), + signal: options.signal, + }); + } + + async worker( + path: string, + body: Record, + options: LobuInternalRequestOptions = {} + ): Promise { + return this.request(`/api/workers${path}`, { + method: "POST", + body: JSON.stringify(body), + signal: options.signal, + }); + } + getFetch(): LobuFetch { return this.fetchImpl; } + private async request(path: string, init: RequestInit): Promise { + const response = await this.fetchImpl(`${this.apiBaseUrl}${path}`, { + ...init, + headers: { + ...headersToRecord(this.headers), + ...headersToRecord(init.headers), + ...(await this.authHeaders()), + "Content-Type": "application/json", + Accept: "application/json", + }, + }); + const body = await readBody(response); + if (!response.ok) throw new LobuApiError(response, body); + return body as T; + } + private async authHeaders(): Promise> { return this.authHeadersFor(await resolveToken(this.token)); } @@ -165,6 +217,16 @@ function normalizeBaseUrl(baseUrl: string): string { return trimmed; } +async function readBody(response: Response): Promise { + const text = await response.text(); + if (!text) return null; + try { + return JSON.parse(text); + } catch { + return text; + } +} + function headersToRecord( headers: LobuHeaders | RequestInit["headers"] | undefined ): Record { diff --git a/packages/client/src/types.ts b/packages/client/src/types.ts index 0fe8aef73..1d19a4ca6 100644 --- a/packages/client/src/types.ts +++ b/packages/client/src/types.ts @@ -8,9 +8,11 @@ export type LobuHeaders = export interface LobuClientOptions { baseUrl: string; + apiBaseUrl?: string; token: TokenProvider; fetch?: LobuFetch; headers?: LobuHeaders; + org?: string; } export interface CreateSessionRequest { @@ -77,3 +79,7 @@ export interface StreamEventsOptions { signal?: AbortSignal; headers?: LobuHeaders; } + +export interface LobuInternalRequestOptions { + signal?: AbortSignal; +} diff --git a/packages/server/src/tools/admin/connector-definition-helpers.ts b/packages/server/src/tools/admin/connector-definition-helpers.ts index 273f43616..9806a36c0 100644 --- a/packages/server/src/tools/admin/connector-definition-helpers.ts +++ b/packages/server/src/tools/admin/connector-definition-helpers.ts @@ -1,5 +1,6 @@ import { createHash } from 'node:crypto'; import { getLoginProviderScopes } from '../../auth/config'; +import type { ConnectorMetadata } from '../../utils/connector-compiler'; import { type DbClient, type DbQuery, getDb } from '../../db/client'; import { probeMcpServer } from '../../mcp-proxy/client'; import { @@ -175,6 +176,81 @@ export async function installConnectorDefinitionFromSource(params: { }; } +export async function installHostedConnectorDefinition(params: { + organizationId: string; + metadata: ConnectorMetadata; +}): Promise { + const sql = getDb(); + const metadata = normalizeHostedConnectorMetadata(params.metadata); + + const { updated } = await upsertConnectorDefinitionRecords({ + sql, + organizationId: params.organizationId, + metadata, + versionRecord: { + compiledCode: null, + compiledCodeHash: null, + sourceCode: null, + sourcePath: null, + }, + }); + + const codeHash = createHash('sha256') + .update(JSON.stringify(metadata)) + .digest('hex') + .slice(0, 16); + + return { + connectorKey: metadata.key, + name: metadata.name, + version: metadata.version, + codeHash, + updated, + authSchema: metadata.authSchema, + mcpConfig: metadata.mcpConfig, + openapiConfig: metadata.openapiConfig, + }; +} + +function normalizeHostedConnectorMetadata(raw: ConnectorMetadata): ConnectorMetadata { + if (!raw || typeof raw !== 'object') throw new Error('connector_definition must be an object.'); + if (typeof raw.key !== 'string' || !raw.key.trim()) throw new Error('connector_definition.key is required.'); + if (typeof raw.name !== 'string' || !raw.name.trim()) throw new Error('connector_definition.name is required.'); + if (typeof raw.version !== 'string' || !raw.version.trim()) { + throw new Error('connector_definition.version is required.'); + } + + for (const [kind, entries] of [ + ['feeds', raw.feeds], + ['actions', raw.actions], + ] as const) { + if (entries === null || entries === undefined) continue; + if (typeof entries !== 'object' || Array.isArray(entries)) { + throw new Error(`connector_definition.${kind} must be an object.`); + } + for (const [key, value] of Object.entries(entries)) { + if (!value || typeof value !== 'object' || Array.isArray(value)) { + throw new Error(`connector_definition.${kind}.${key} must be an object.`); + } + const item = value as Record; + if (typeof item.key !== 'string' || typeof item.name !== 'string') { + throw new Error(`connector_definition.${kind}.${key} must include key and name.`); + } + } + } + + return { + ...raw, + authSchema: raw.authSchema ?? null, + feeds: raw.feeds ?? null, + actions: raw.actions ?? null, + optionsSchema: raw.optionsSchema ?? null, + mcpConfig: raw.mcpConfig ?? null, + openapiConfig: raw.openapiConfig ?? null, + runtime: { ...(raw.runtime ?? {}), mode: 'app_hosted' }, + }; +} + export async function installConnectorFromMcpUrl(params: { organizationId: string; mcpUrl: string; diff --git a/packages/server/src/tools/admin/manage_connections.ts b/packages/server/src/tools/admin/manage_connections.ts index 583911f23..912aa17aa 100644 --- a/packages/server/src/tools/admin/manage_connections.ts +++ b/packages/server/src/tools/admin/manage_connections.ts @@ -48,6 +48,7 @@ import { resolveNewConnectionSlug, } from '../../utils/connections'; import { ensureConnectorInstalled } from '../../utils/ensure-connector-installed'; +import type { ConnectorMetadata } from '../../utils/connector-compiler'; import { applyEntityLinkOverrides } from '../../utils/entity-link-overrides'; import { recordChangeEvent, recordLifecycleEvent } from '../../utils/insert-event'; import logger from '../../utils/logger'; @@ -63,6 +64,7 @@ import { getScopedConnectorDefinition, installConnectorDefinitionFromSource, installConnectorFromMcpUrl, + installHostedConnectorDefinition, listScopedConnectorDefinitions, type ScopedConnectorDefinitionRow, toggleConnectorLoginEnabled, @@ -237,6 +239,12 @@ const InstallConnectorAction = Type.Object({ 'URL to a remote MCP server (Streamable HTTP). Probes the server directly, no compilation needed.', }) ), + connector_definition: Type.Optional( + Type.Record(Type.String(), Type.Any(), { + description: + 'Metadata-only connector definition for SDK-hosted functions. Stores schemas only; code runs in the polling SDK worker.', + }) + ), auth_values: Type.Optional( Type.Record(Type.String(), Type.String(), { description: @@ -2264,18 +2272,23 @@ async function handleInstallConnector( ctx: ToolContext ): Promise { try { - const installed = args.mcp_url - ? await installConnectorFromMcpUrl({ + const installed = args.connector_definition + ? await installHostedConnectorDefinition({ organizationId: ctx.organizationId, - mcpUrl: args.mcp_url, + metadata: args.connector_definition as ConnectorMetadata, }) - : await installConnectorDefinitionFromSource({ - organizationId: ctx.organizationId, - sourceUrl: args.source_url, - sourceUri: args.source_uri, - sourceCode: args.source_code, - compiled: args.compiled, - }); + : args.mcp_url + ? await installConnectorFromMcpUrl({ + organizationId: ctx.organizationId, + mcpUrl: args.mcp_url, + }) + : await installConnectorDefinitionFromSource({ + organizationId: ctx.organizationId, + sourceUrl: args.source_url, + sourceUri: args.source_uri, + sourceCode: args.source_code, + compiled: args.compiled, + }); await maybeUpsertAuthAfterInstall(installed, args.auth_values, ctx); diff --git a/packages/server/src/tools/admin/manage_operations.ts b/packages/server/src/tools/admin/manage_operations.ts index 8d84dd9c7..1d8138af4 100644 --- a/packages/server/src/tools/admin/manage_operations.ts +++ b/packages/server/src/tools/admin/manage_operations.ts @@ -639,7 +639,11 @@ async function handleExecute( ORDER BY updated_at DESC, id DESC LIMIT 1 `) as Array<{ runtime: Record | null }>; - const isDeviceBound = defRows[0]?.runtime != null; + const runtime = defRows[0]?.runtime; + const isDeviceBound = runtime != null; + const isAppHosted = runtime?.mode === 'app_hosted'; + // App-hosted connectors also use the device lane: their code lives in the + // SDK process polling /api/workers/*, not in gateway compiled_code. const approvalMode: 'inline' | 'queued' | 'device' = shouldQueue ? 'queued' @@ -654,7 +658,7 @@ async function handleExecute( operationKey: operation.operation_key, operationInput: input, approvalMode, - requireCompiledCode: operation.backend === 'local_action', + requireCompiledCode: operation.backend === 'local_action' && !isAppHosted, }); if (args.watcher_source) { diff --git a/packages/server/src/utils/connector-compiler.ts b/packages/server/src/utils/connector-compiler.ts index fc2b616aa..4819870c0 100644 --- a/packages/server/src/utils/connector-compiler.ts +++ b/packages/server/src/utils/connector-compiler.ts @@ -22,8 +22,9 @@ export interface ConnectorMetadata { openapiConfig?: Record | null; requiredCapability?: string | null; runtime?: { - platforms: Array<'ios' | 'android' | 'macos' | 'windows' | 'linux'>; + platforms?: Array<'ios' | 'android' | 'macos' | 'windows' | 'linux'>; scopes?: string[]; + mode?: string; } | null; } diff --git a/packages/server/src/worker-api.ts b/packages/server/src/worker-api.ts index 824ed83bb..477d15473 100644 --- a/packages/server/src/worker-api.ts +++ b/packages/server/src/worker-api.ts @@ -143,6 +143,7 @@ export async function pollWorkerJob(c: Context<{ Bindings: Env }>) { let platform: string | null = null; let app_version: string | null = null; let label: string | null = null; + let connectorKeys: string[] = []; try { const body = await c.req.json<{ worker_id: string; @@ -150,12 +151,16 @@ export async function pollWorkerJob(c: Context<{ Bindings: Env }>) { platform?: string; app_version?: string; label?: string; + connector_keys?: string[]; }>(); worker_id = body.worker_id; capabilities = body.capabilities ?? {}; platform = body.platform ?? null; app_version = body.app_version ?? null; label = body.label ?? null; + connectorKeys = Array.isArray(body.connector_keys) + ? body.connector_keys.filter((key): key is string => typeof key === 'string' && key.length > 0) + : []; } catch { return c.json({ error: 'Invalid or missing JSON body' }, 400); } @@ -371,7 +376,7 @@ export async function pollWorkerJob(c: Context<{ Bindings: Env }>) { FROM runs r LEFT JOIN connections con ON con.id = r.connection_id LEFT JOIN LATERAL ( - SELECT cd.api_type, cd.required_capability + SELECT cd.api_type, cd.required_capability, cd.runtime FROM connector_definitions cd WHERE cd.key = r.connector_key AND cd.organization_id = r.organization_id @@ -381,6 +386,7 @@ export async function pollWorkerJob(c: Context<{ Bindings: Env }>) { ) cd ON true WHERE r.status = 'pending' AND (r.approval_status = 'auto' OR r.approval_status = 'approved') + AND (${connectorKeys.length === 0} OR r.connector_key = ANY(${pgTextArray(connectorKeys)}::text[])) AND ( -- (1) Connector-worker lanes: sync / action / embed_backfill / auth. -- Browser-only connectors require the browser capability. @@ -409,6 +415,15 @@ export async function pollWorkerJob(c: Context<{ Bindings: Env }>) { AND con.device_worker_id IS NULL AND r.organization_id = ANY(${pgTextArray(orgScopeIds)}::text[]) ) + -- App-hosted SDK workers execute user functions in-process. They + -- advertise the connector_keys they serve; no compiled_code or + -- device pin is required. + OR ( + ${isUserScopedWorker} + AND cd.runtime->>'mode' = 'app_hosted' + AND r.connector_key = ANY(${pgTextArray(connectorKeys)}::text[]) + AND r.organization_id = ANY(${pgTextArray(orgScopeIds)}::text[]) + ) -- ... or any connection explicitly pinned to THIS device (this is -- "run the Reddit connector on my Mac"). Still: a device-only -- connector needs the capability currently advertised, and the @@ -559,7 +574,7 @@ export async function pollWorkerJob(c: Context<{ Bindings: Env }>) { connection_config: Record | null; connection_device_worker_id: string | null; compiled_code: string | null; - connector_runtime: { nix?: { packages?: string[] } | null } | null; + connector_runtime: { nix?: { packages?: string[] } | null; mode?: string } | null; run_created_at: string | Date | null; // Watcher run fields (populated via LEFT JOINs) watcher_id: number | null; @@ -646,7 +661,8 @@ export async function pollWorkerJob(c: Context<{ Bindings: Env }>) { ? findBundledConnectorFile(row.connector_key) !== null : false; const workerWillResolveLocally = !isUserScopedWorker && gatewayHasLocalSource; - if (row.connector_key && !workerWillResolveLocally) { + const isAppHostedConnector = row.connector_runtime?.mode === 'app_hosted'; + if (row.connector_key && !workerWillResolveLocally && !isAppHostedConnector) { try { compiledCode = await resolveConnectorCode(row.connector_key, row.compiled_code); } catch (err) {