From 66277d2938909f8d26a5a0606e2386bf21f288d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Thu, 28 May 2026 17:21:10 +0100 Subject: [PATCH 1/6] feat: migrate LinkedIn (and scaffold X) to Owletto extension network-intercept MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds an extension-driven network-interception path for LinkedIn-style scrapers, alongside the existing Playwright `browserNetworkSync` stack. Playwright stays as the default; new path is opt-in per connection via `use_extension: true` plus the presence of a chrome dispatcher in `ctx.sessionState`. What ships: - packages/owletto: chrome connector gets three new actions — network_intercept_start/drain/stop — built on chrome.debugger + CDP Network domain. (See submodule commit for details.) - packages/connectors/src/chrome.ts: declares the three new actions in the connector definition (input/output JSON schemas). - packages/connector-sdk/src/extension-network.ts: new `extensionNetworkSync` helper that mirrors `browserNetworkSync`'s shape but routes through a caller-supplied `ChromeActionDispatcher`. 6 unit tests cover the navigate → start → drain → stop sequence, pagination, auth-check bail, non-JSON / binary body handling, and error cleanup. - packages/connectors/src/linkedin.ts: branches on a `use_extension` config flag — when set and a dispatcher is injected, routes through the extension path via `extensionNetworkSync`. Default (no flag) = unchanged Playwright path. Two new private methods `syncUpdatesViaExtension` / `syncJobsViaExtension` reuse the existing parsers and checkpoint logic; only the transport differs. Not yet shipped (called out as follow-ups in PR body): - Server-side bridge that injects `ctx.sessionState.chrome_dispatcher` into a server-side connector run when the connection is paired with an extension worker. Without this, `use_extension: true` falls back to Playwright (graceful no-op). - X (twitter) port — scaffolded path mirrors LinkedIn's; not migrated in this PR. - E2E run against a real LinkedIn company page via the extension. Submodule pointer bumped to the matching owletto branch HEAD. --- .../src/__tests__/extension-network.test.ts | 255 ++++++++++++++++ .../connector-sdk/src/extension-network.ts | 289 ++++++++++++++++++ packages/connector-sdk/src/index.ts | 13 + packages/connectors/src/chrome.ts | 109 +++++++ packages/connectors/src/linkedin.ts | 188 +++++++++++- packages/owletto | 2 +- 6 files changed, 853 insertions(+), 3 deletions(-) create mode 100644 packages/connector-sdk/src/__tests__/extension-network.test.ts create mode 100644 packages/connector-sdk/src/extension-network.ts diff --git a/packages/connector-sdk/src/__tests__/extension-network.test.ts b/packages/connector-sdk/src/__tests__/extension-network.test.ts new file mode 100644 index 000000000..ca5edf696 --- /dev/null +++ b/packages/connector-sdk/src/__tests__/extension-network.test.ts @@ -0,0 +1,255 @@ +/** + * Unit tests for extensionNetworkSync. Drives the helper with a stub + * ChromeActionDispatcher that records dispatched action_keys and returns + * canned observations — including a programmable buffer of intercepted + * responses the drain action hands back. + */ + +import { describe, expect, test } from 'bun:test'; +import type { + ChromeActionDispatcher, + InterceptedResponse, + NavigateObservation, + NetworkInterceptDrainObservation, + NetworkInterceptStartObservation, +} from '../extension-network.js'; +import { extensionNetworkSync } from '../extension-network.js'; + +interface DispatchLog { + action: string; + input: Record; +} + +function makeDispatcher({ + navUrl = 'https://www.linkedin.com/company/openai/posts/', + drainQueues = [] as InterceptedResponse[][], +}: { + navUrl?: string; + drainQueues?: InterceptedResponse[][]; +} = {}): { dispatcher: ChromeActionDispatcher; log: DispatchLog[] } { + const log: DispatchLog[] = []; + let drainIdx = 0; + const dispatcher: ChromeActionDispatcher = { + dispatch: async (action: string, input: Record) => { + log.push({ action, input }); + if (action === 'navigate') { + return { + tab_id: 555, + current_url: navUrl, + title: 'stub', + } as NavigateObservation as never; + } + if (action === 'network_intercept_start') { + return { + session_id: 'netint-stub-1', + tab_id: 555, + resumed: false, + } as NetworkInterceptStartObservation as never; + } + if (action === 'network_intercept_drain') { + const batch = drainQueues[drainIdx] ?? []; + drainIdx++; + return { + session_id: 'netint-stub-1', + drained: batch.length, + missing: false, + responses: batch, + } as NetworkInterceptDrainObservation as never; + } + // network_intercept_stop, evaluate (scroll), close_tab — caller doesn't + // care about the return value. + return {} as never; + }, + }; + return { dispatcher, log }; +} + +function makeResponse(url: string, json: unknown): InterceptedResponse { + return { + url, + status: 200, + mime: 'application/json', + body: JSON.stringify(json), + base64_encoded: false, + truncated: false, + ts: Date.now(), + }; +} + +describe('extensionNetworkSync', () => { + test('navigate → start → drain → stop → close_tab sequence', async () => { + const { dispatcher, log } = makeDispatcher({ + drainQueues: [ + [makeResponse('https://x.com/api/foo', { items: [1, 2, 3] })], + // Empty drain → triggers no-new-items stop on scroll 1. + [], + ], + }); + const result = await extensionNetworkSync({ + dispatcher, + url: 'https://x.com/feed', + config: { + interceptPatterns: ['**/api/**'], + maxScrolls: 3, + scrollDelayMs: 0, + responseTimeoutMs: 0, + }, + parseResponse: (_url, json) => { + const j = json as { items: number[] }; + return j.items ?? []; + }, + }); + expect(result.items).toEqual([1, 2, 3]); + expect(result.backend).toBe('extension'); + expect(result.apiCallCount).toBe(1); + const actions = log.map((l) => l.action); + expect(actions[0]).toBe('navigate'); + expect(actions[1]).toBe('network_intercept_start'); + // At least one drain, one stop, one close_tab. + expect(actions).toContain('network_intercept_drain'); + expect(actions).toContain('network_intercept_stop'); + expect(actions).toContain('close_tab'); + }); + + test('respects checkAuth and bails early', async () => { + const { dispatcher } = makeDispatcher({ + navUrl: 'https://www.linkedin.com/login', + }); + await expect( + extensionNetworkSync({ + dispatcher, + url: 'https://www.linkedin.com/company/openai/posts/', + config: { interceptPatterns: ['**/voyager/**'] }, + parseResponse: () => [], + checkAuth: (currentUrl) => + !currentUrl.includes('/login') && !currentUrl.includes('/authwall'), + }), + ).rejects.toThrow(/auth check failed/); + }); + + test('paginates across multiple drains until empty', async () => { + const { dispatcher, log } = makeDispatcher({ + drainQueues: [ + [makeResponse('https://x.com/api/p1', { items: ['a'] })], + [makeResponse('https://x.com/api/p2', { items: ['b'] })], + [makeResponse('https://x.com/api/p3', { items: ['c'] })], + [], + ], + }); + const result = await extensionNetworkSync({ + dispatcher, + url: 'https://x.com/feed', + config: { + interceptPatterns: ['**/api/**'], + maxScrolls: 5, + scrollDelayMs: 0, + responseTimeoutMs: 0, + }, + parseResponse: (_url, json) => (json as { items: string[] }).items, + }); + expect(result.items).toEqual(['a', 'b', 'c']); + const evalCalls = log.filter( + (l) => l.action === 'evaluate' && typeof l.input.expression === 'string', + ); + // Scrolled at least once before stopping. + expect(evalCalls.length).toBeGreaterThanOrEqual(1); + }); + + test('skips non-JSON intercepted bodies without crashing', async () => { + const { dispatcher } = makeDispatcher({ + drainQueues: [ + [ + { + url: 'https://x.com/api/broken', + status: 200, + mime: 'application/json', + body: 'not json {{{', + base64_encoded: false, + truncated: false, + ts: Date.now(), + }, + makeResponse('https://x.com/api/good', { items: ['ok'] }), + ], + [], + ], + }); + const result = await extensionNetworkSync({ + dispatcher, + url: 'https://x.com/feed', + config: { + interceptPatterns: ['**/api/**'], + maxScrolls: 1, + scrollDelayMs: 0, + responseTimeoutMs: 0, + }, + parseResponse: (_url, json) => (json as { items: string[] }).items, + }); + expect(result.items).toEqual(['ok']); + }); + + test('skips base64 (binary) bodies', async () => { + const { dispatcher } = makeDispatcher({ + drainQueues: [ + [ + { + url: 'https://x.com/api/img', + status: 200, + mime: 'image/png', + body: 'AAAAAA==', + base64_encoded: true, + truncated: false, + ts: Date.now(), + }, + ], + [], + ], + }); + const result = await extensionNetworkSync({ + dispatcher, + url: 'https://x.com/feed', + config: { + interceptPatterns: ['**/api/**'], + maxScrolls: 1, + scrollDelayMs: 0, + responseTimeoutMs: 0, + }, + parseResponse: () => ['parsed'], + }); + expect(result.items).toEqual([]); + }); + + test('always stops session + closes tab on error', async () => { + const log: DispatchLog[] = []; + const dispatcher: ChromeActionDispatcher = { + dispatch: async (action: string, input: Record) => { + log.push({ action, input }); + if (action === 'navigate') { + return { tab_id: 555, current_url: 'https://x.com/feed', title: '' } as never; + } + if (action === 'network_intercept_start') { + return { session_id: 's', tab_id: 555, resumed: false } as never; + } + if (action === 'network_intercept_drain') { + throw new Error('boom'); + } + return {} as never; + }, + }; + await expect( + extensionNetworkSync({ + dispatcher, + url: 'https://x.com/feed', + config: { + interceptPatterns: ['**/api/**'], + maxScrolls: 0, + scrollDelayMs: 0, + responseTimeoutMs: 0, + }, + parseResponse: () => [], + }), + ).rejects.toThrow(/boom/); + const actions = log.map((l) => l.action); + expect(actions).toContain('network_intercept_stop'); + expect(actions).toContain('close_tab'); + }); +}); diff --git a/packages/connector-sdk/src/extension-network.ts b/packages/connector-sdk/src/extension-network.ts new file mode 100644 index 000000000..631c48919 --- /dev/null +++ b/packages/connector-sdk/src/extension-network.ts @@ -0,0 +1,289 @@ +/** + * Extension Network Sync + * + * Mirror of `browserNetworkSync` (browser-network.ts) that runs against the + * Owletto Chrome extension instead of a Playwright-launched browser. Same + * shape — `interceptPatterns`, `parseResponse`, scroll loop — but the + * driver is a series of `chrome.*` connector actions enqueued through the + * caller-supplied `ChromeActionDispatcher`. + * + * Why this exists: server-side connectors today use `browserNetworkSync`, + * which spawns a Playwright window for every run. That's heavy and runs + * outside the user's real session (cookies / cdp-url plumbing). The + * extension stack already gives us a debugger-attached tab inside the + * user's signed-in Chrome — adding a Network-domain primitive + * (apps/chrome/network-intercept.js) lets the same parse pipeline run + * there for free. + * + * Wire shape: the dispatcher returns the same `observation` envelope the + * extension produces on /api/workers/complete-action. Connectors don't + * need to know how the run is routed (sync vs queued) — they just await + * the dispatcher. + * + * Non-goal: this does NOT replace `browserNetworkSync` yet. The two + * coexist while we migrate, gated per connector via a config flag. + */ + +import { sdkLogger } from './logger.js'; + +// ── Wire types (mirror apps/chrome/network-intercept.js + chrome connector) ── + +export type ExtensionNetworkPattern = string | { regex: string; flags?: string }; + +export interface InterceptedResponse { + url: string; + status: number; + mime: string; + /** Body text. May be truncated; see `truncated`. */ + body: string; + /** True when Chrome returned the body base64-encoded (e.g. binary content). */ + base64_encoded?: boolean; + truncated?: boolean; + ts: number; +} + +export interface NavigateObservation { + tab_id: number; + current_url: string; + title: string; + [k: string]: unknown; +} + +export interface NetworkInterceptStartObservation { + session_id: string; + tab_id: number; + resumed: boolean; + [k: string]: unknown; +} + +export interface NetworkInterceptDrainObservation { + session_id: string; + drained: number; + missing: boolean; + responses: InterceptedResponse[]; + [k: string]: unknown; +} + +export type ChromeActionInput = Record; +export type ChromeActionOutput = Record; + +/** + * Caller-supplied bridge to the Owletto extension. The server wires this + * to its run-scheduling API (enqueue a run on the chrome connector with + * `action_key=`, await /complete-action, surface the observation). + * + * Decoupled so the connector code is unit-testable: tests pass a stub + * dispatcher, the production worker passes the real one. + */ +export interface ChromeActionDispatcher { + dispatch( + action_key: string, + action_input: ChromeActionInput + ): Promise; +} + +// ── Config ──────────────────────────────────────────────────────────────── + +export interface ExtensionNetworkConfig { + /** URL patterns to intercept (glob string or {regex} object). */ + interceptPatterns: ExtensionNetworkPattern[]; + /** Maximum scroll iterations (default 10). */ + maxScrolls?: number; + /** Delay between scrolls in ms (default 2000). */ + scrollDelayMs?: number; + /** + * How long to wait for at least one response after each scroll before + * declaring no-more-pages. Default 5000. + */ + responseTimeoutMs?: number; + /** + * Per-session response buffer cap. Default 200 — higher than the + * Playwright path because the extension caps the buffer FIFO-style and + * we don't want a slow drain to lose batches between scrolls. + */ + maxBufferResponses?: number; + /** Per-response body cap. Default 1 MiB. */ + maxBodyBytes?: number; +} + +const DEFAULT_CONFIG = { + maxScrolls: 10, + scrollDelayMs: 2000, + responseTimeoutMs: 5000, + maxBufferResponses: 200, + maxBodyBytes: 1024 * 1024, +}; + +export interface ExtensionNetworkResult { + items: TItem[]; + apiCallCount: number; + backend: 'extension'; +} + +// ── Main entrypoint ─────────────────────────────────────────────────────── + +/** + * Drive a navigate → start → (scroll → drain){,n} → stop pipeline against + * the extension. Mirrors `browserNetworkSync` but emits action runs instead + * of driving a Playwright Page. + */ +export async function extensionNetworkSync(opts: { + dispatcher: ChromeActionDispatcher; + config: ExtensionNetworkConfig; + url: string; + /** + * Parse one intercepted JSON response into zero-or-more items. The + * extension hands us the raw body string; we JSON.parse here so the + * connector's parser sees the same `unknown` it does in the Playwright + * path. + */ + parseResponse: (url: string, json: unknown) => TItem[]; + /** + * Best-effort auth check from the post-navigate URL. The extension + * returns the resolved `current_url` after Page.frameStoppedLoading; + * callers compare it against known redirect destinations + * (/login, /authwall, …). + */ + checkAuth?: (currentUrl: string) => boolean; + /** + * Custom pagination trigger. Defaults to dispatching an `evaluate` + * action that runs `window.scrollTo(0, document.documentElement.scrollHeight)`. + */ + triggerNextPage?: (tabId: number, dispatcher: ChromeActionDispatcher) => Promise; +}): Promise> { + const cfg = { ...DEFAULT_CONFIG, ...opts.config }; + const items: TItem[] = []; + let apiCallCount = 0; + + // 1. navigate into a fresh background tab. + const navObs = await opts.dispatcher.dispatch('navigate', { + url: opts.url, + open_in_new_tab: true, + wait_for_load: true, + }); + const tabId = navObs.tab_id; + sdkLogger.info( + { tabId, currentUrl: navObs.current_url }, + '[ExtensionNetwork] navigated' + ); + + if (opts.checkAuth && !opts.checkAuth(navObs.current_url)) { + await safeStop(opts.dispatcher, null); + await safeCloseTab(opts.dispatcher, tabId); + throw new Error( + 'extensionNetworkSync: auth check failed — Chrome session is not logged in to this site' + ); + } + + // 2. start intercept BEFORE any scroll so the first batch of responses + // (the page's initial XHR/GraphQL calls during render) is captured. The + // extension's start() attaches the Network domain synchronously; any + // request that's already in-flight when start() returns will be seen. + // Anything that finished before start() is lost — same as the Playwright + // page.on('response') lifecycle. + const startObs = await opts.dispatcher.dispatch( + 'network_intercept_start', + { + tab_id: tabId, + patterns: opts.config.interceptPatterns, + max_buffer_responses: cfg.maxBufferResponses, + max_body_bytes: cfg.maxBodyBytes, + } + ); + const sessionId = startObs.session_id; + + try { + // 3. give the initial render a chance to fire its XHRs, then drain. + await sleep(cfg.responseTimeoutMs); + apiCallCount += await drainInto(items, opts, sessionId); + + // 4. scroll loop. Each iteration: trigger pagination, wait, drain. + let prev = items.length; + for (let n = 0; n < cfg.maxScrolls; n++) { + const trigger = + opts.triggerNextPage ?? + (async (tid: number, dispatch: ChromeActionDispatcher) => { + await dispatch.dispatch('evaluate', { + tab_id: tid, + expression: + 'window.scrollTo(0, document.documentElement.scrollHeight); 1', + }); + }); + await trigger(tabId, opts.dispatcher); + await sleep(cfg.scrollDelayMs); + apiCallCount += await drainInto(items, opts, sessionId); + + if (items.length === prev) { + sdkLogger.info( + { scroll: n + 1 }, + '[ExtensionNetwork] no new items, stopping pagination' + ); + break; + } + sdkLogger.info( + { scroll: n + 1, newItems: items.length - prev, total: items.length }, + '[ExtensionNetwork] scroll' + ); + prev = items.length; + } + + return { items, apiCallCount, backend: 'extension' }; + } finally { + await safeStop(opts.dispatcher, sessionId); + await safeCloseTab(opts.dispatcher, tabId); + } + + // Helper kept inside the closure so it sees the `cfg` + `opts` typed Ts above. + async function drainInto( + sink: TItem[], + o: typeof opts, + sid: string + ): Promise { + const drained = await o.dispatcher.dispatch( + 'network_intercept_drain', + { session_id: sid } + ); + let calls = 0; + for (const resp of drained.responses ?? []) { + calls++; + if (resp.base64_encoded) { + // Skip binary bodies — connectors using this helper want JSON. + continue; + } + let json: unknown; + try { + json = JSON.parse(resp.body); + } catch { + sdkLogger.warn( + { url: resp.url, bodyLen: resp.body?.length ?? 0 }, + '[ExtensionNetwork] non-JSON intercepted body, skipped' + ); + continue; + } + const parsed = o.parseResponse(resp.url, json); + sink.push(...parsed); + } + return calls; + } +} + +async function safeStop(dispatcher: ChromeActionDispatcher, sessionId: string | null) { + if (!sessionId) return; + try { + await dispatcher.dispatch('network_intercept_stop', { session_id: sessionId }); + } catch (err) { + sdkLogger.warn({ err, sessionId }, '[ExtensionNetwork] stop failed (already gone?)'); + } +} + +async function safeCloseTab(dispatcher: ChromeActionDispatcher, tabId: number) { + try { + await dispatcher.dispatch('close_tab', { tab_id: tabId }); + } catch (err) { + sdkLogger.warn({ err, tabId }, '[ExtensionNetwork] close_tab failed (already gone?)'); + } +} + +function sleep(ms: number): Promise { + return new Promise((r) => setTimeout(r, ms)); +} diff --git a/packages/connector-sdk/src/index.ts b/packages/connector-sdk/src/index.ts index b083dcdc4..241d75231 100644 --- a/packages/connector-sdk/src/index.ts +++ b/packages/connector-sdk/src/index.ts @@ -143,6 +143,19 @@ export { } from './browser/launcher.js'; export type { BrowserNetworkConfig, BrowserNetworkResult } from './browser-network.js'; export { browserNetworkSync } from './browser-network.js'; +export type { + ChromeActionDispatcher, + ChromeActionInput, + ChromeActionOutput, + ExtensionNetworkConfig, + ExtensionNetworkPattern, + ExtensionNetworkResult, + InterceptedResponse, + NavigateObservation, + NetworkInterceptDrainObservation, + NetworkInterceptStartObservation, +} from './extension-network.js'; +export { extensionNetworkSync } from './extension-network.js'; export type { ReactionContext } from './reaction-sdk.js'; export type { ReactionClient } from './reaction-client-types.js'; export type { diff --git a/packages/connectors/src/chrome.ts b/packages/connectors/src/chrome.ts index 528c917c2..ad7c55040 100644 --- a/packages/connectors/src/chrome.ts +++ b/packages/connectors/src/chrome.ts @@ -312,6 +312,115 @@ export default class ChromeConnector extends ConnectorRuntime { properties: { tab_id: { type: 'integer' } }, }, }, + network_intercept_start: { + key: 'network_intercept_start', + name: 'Start network interception', + description: + 'Attach CDP Network and start buffering response bodies whose URL matches one of `patterns`. Returns a `session_id` the caller uses with drain/stop. Idempotent on resume — passing the same `session_id` is a no-op after a service-worker eviction. Survives SW eviction (buffer persisted to chrome.storage.session).', + requiresApproval: false, + inputSchema: { + type: 'object', + required: ['patterns'], + properties: { + tab_id: tabIdSchema, + session_id: { + type: 'string', + description: 'Reuse an existing session id. Mints a fresh one when omitted.', + }, + patterns: { + type: 'array', + minItems: 1, + items: { + oneOf: [ + { type: 'string', description: 'URL glob (** = any path; * = path segment).' }, + { + type: 'object', + required: ['regex'], + properties: { + regex: { type: 'string' }, + flags: { type: 'string' }, + }, + description: 'RegExp serialized for the wire.', + }, + ], + }, + description: + 'URL patterns to capture. Matched against response.url at receive time.', + }, + max_buffer_responses: { + type: 'integer', + minimum: 1, + maximum: 1000, + description: 'FIFO buffer cap per session. Default 100.', + }, + max_body_bytes: { + type: 'integer', + minimum: 1024, + maximum: 10 * 1024 * 1024, + description: 'Per-response body cap. Bodies above are truncated. Default 1 MiB.', + }, + }, + }, + outputSchema: { + type: 'object', + properties: { + session_id: { type: 'string' }, + tab_id: { type: 'integer' }, + resumed: { type: 'boolean' }, + }, + }, + }, + network_intercept_drain: { + key: 'network_intercept_drain', + name: 'Drain buffered network responses', + description: + 'Return all buffered intercepted responses for a session, atomically clearing the buffer.', + requiresApproval: false, + inputSchema: { + type: 'object', + required: ['session_id'], + properties: { + session_id: { type: 'string' }, + }, + }, + outputSchema: { + type: 'object', + properties: { + session_id: { type: 'string' }, + drained: { type: 'integer' }, + missing: { type: 'boolean' }, + responses: { + type: 'array', + items: { + type: 'object', + properties: { + url: { type: 'string' }, + status: { type: 'integer' }, + mime: { type: 'string' }, + body: { type: 'string' }, + base64_encoded: { type: 'boolean' }, + truncated: { type: 'boolean' }, + ts: { type: 'integer' }, + }, + }, + }, + }, + }, + }, + network_intercept_stop: { + key: 'network_intercept_stop', + name: 'Stop network interception', + description: + 'Remove the CDP listener for the session and delete its buffer. Detaches the debugger when this is the last live session on the tab.', + requiresApproval: false, + inputSchema: { + type: 'object', + required: ['session_id'], + properties: { + session_id: { type: 'string' }, + }, + }, + }, evaluate: { key: 'evaluate', name: 'Evaluate JS', diff --git a/packages/connectors/src/linkedin.ts b/packages/connectors/src/linkedin.ts index 19eecb247..8f1341b41 100644 --- a/packages/connectors/src/linkedin.ts +++ b/packages/connectors/src/linkedin.ts @@ -12,10 +12,12 @@ import { browserNetworkSync, + type ChromeActionDispatcher, type ConnectorDefinition, ConnectorRuntime, calculateEngagementScore, type EventEnvelope, + extensionNetworkSync, type SyncContext, type SyncResult, } from '@lobu/connector-sdk'; @@ -59,6 +61,28 @@ function normalizeCheckpointPostId(postId?: string): string | undefined { return postId.startsWith('li_post_') ? postId.slice('li_post_'.length) : postId; } +/** + * Discover a chrome-extension action dispatcher from the run context. The + * extension path is opted in by: + * 1. `config.use_extension === true` on the connection, AND + * 2. `ctx.sessionState.chrome_dispatcher` being a `ChromeActionDispatcher` + * (a handle the server-side bridge injects when a paired Owletto + * extension worker is available to claim the run). + * + * Returning null falls through to the existing Playwright + cookies stack. + * That keeps every existing deployment working unchanged. + */ +function pickExtensionDispatcher( + ctx: SyncContext, + config: Record +): ChromeActionDispatcher | null { + if (config.use_extension !== true) return null; + const handle = (ctx.sessionState as Record | null | undefined) + ?.chrome_dispatcher as ChromeActionDispatcher | undefined; + if (!handle || typeof handle.dispatch !== 'function') return null; + return handle; +} + export function filterPostsSinceCheckpoint( posts: LinkedInPost[], checkpoint: LinkedInCheckpoint @@ -194,6 +218,13 @@ function parseJobListings(_url: string, json: unknown): LinkedInJob[] { // ── Config Schemas ──────────────────────────────────────────── +const useExtensionProp = { + type: 'boolean', + default: false, + description: + 'Route the scrape through the paired Owletto Chrome extension instead of a Playwright-launched browser. Requires a paired Chrome connector worker; falls back to Playwright when no dispatcher is available. Default: false during the migration window.', +}; + const companyUpdatesConfigSchema = { type: 'object', required: ['company_url'], @@ -209,6 +240,7 @@ const companyUpdatesConfigSchema = { default: 5, description: 'Maximum scroll iterations for pagination (default: 5)', }, + use_extension: useExtensionProp, }, }; @@ -227,6 +259,7 @@ const jobsConfigSchema = { default: 3, description: 'Maximum scroll iterations for job listings (default: 3)', }, + use_extension: useExtensionProp, }, }; @@ -321,6 +354,32 @@ export default class LinkedInConnector extends ConnectorRuntime { // Normalize URL - remove trailing slash const baseUrl = companyUrl.replace(/\/$/, ''); + // ── Extension path (opt-in) ────────────────────────────────────────── + // + // When the connection's session state carries a `chrome_dispatcher` + // handle (injected by the server-side bridge that pairs LinkedIn runs + // with a paired Owletto extension worker) AND the connection config + // enables it, route the whole sync through the extension's network- + // intercept primitive instead of Playwright. The Playwright path + // stays as the default fallback during the migration window. + // + // The dispatcher contract (extensionNetworkSync ⇄ background.js): + // dispatch('navigate', {url, open_in_new_tab: true}) + // dispatch('network_intercept_start', {tab_id, patterns, ...}) + // dispatch('network_intercept_drain', {session_id}) + // dispatch('network_intercept_stop', {session_id}) + // dispatch('close_tab', {tab_id}) + // + // See PR body "Migration sequencing" for the server-side wiring. + const dispatcher = pickExtensionDispatcher(ctx, config); + const maxScrolls = (config.max_scrolls as number) ?? (feedKey === 'jobs' ? 3 : 5); + if (dispatcher) { + if (feedKey === 'jobs') { + return this.syncJobsViaExtension(baseUrl, maxScrolls, checkpoint, dispatcher); + } + return this.syncUpdatesViaExtension(baseUrl, maxScrolls, checkpoint, dispatcher); + } + const userDataDir = getBrowserUserDataDir(ctx.sessionState); const cdpUrlFromSession = getBrowserCdpUrl(ctx.sessionState); const cdpUrl = cdpUrlFromSession ?? 'auto'; @@ -336,8 +395,6 @@ export default class LinkedInConnector extends ConnectorRuntime { validateCookieNotExpired(cookies, 'li_at', 'linkedin'); } - const maxScrolls = (config.max_scrolls as number) ?? (feedKey === 'jobs' ? 3 : 5); - if (feedKey === 'jobs') { return this.syncJobs(baseUrl, cookies, maxScrolls, checkpoint, userDataDir, cdpUrl); } @@ -345,6 +402,133 @@ export default class LinkedInConnector extends ConnectorRuntime { return this.syncUpdates(baseUrl, cookies, maxScrolls, checkpoint, userDataDir, cdpUrl); } + // ── Extension-backed sync paths ──────────────────────────────────────── + // + // These mirror syncUpdates / syncJobs but consume extensionNetworkSync + // instead of browserNetworkSync. They share the parsers and checkpoint + // logic; the diff is purely the transport layer. + + private async syncUpdatesViaExtension( + baseUrl: string, + maxScrolls: number, + checkpoint: LinkedInCheckpoint, + dispatcher: ChromeActionDispatcher + ): Promise { + const postsUrl = `${baseUrl}/posts/`; + const result = await extensionNetworkSync({ + dispatcher, + url: postsUrl, + config: { + interceptPatterns: [ + { regex: 'voyager/api/graphql\\?variables=.*ORGANIZATION_MEMBER_FEED' }, + { regex: 'voyager/api/graphql\\?variables=.*organizationalPageUrn' }, + ], + maxScrolls, + scrollDelayMs: 3000, + responseTimeoutMs: 8000, + }, + parseResponse: parseCompanyUpdates, + checkAuth: (currentUrl) => + !currentUrl.includes('/login') && !currentUrl.includes('/authwall'), + }); + + const posts = filterPostsSinceCheckpoint(result.items, checkpoint); + const events: EventEnvelope[] = posts.map((post) => ({ + origin_id: `li_post_${post.id}`, + payload_text: post.text, + author_name: post.author, + occurred_at: post.publishedAt, + origin_type: 'post', + source_url: `https://www.linkedin.com/feed/update/urn:li:activity:${post.id}`, + score: calculateEngagementScore('linkedin', { + upvotes: post.likes, + reply_count: post.comments, + }), + metadata: { + author_headline: post.authorHeadline, + likes: post.likes, + comments: post.comments, + shares: post.shares, + }, + })); + events.sort( + (a, b) => new Date(b.occurred_at).getTime() - new Date(a.occurred_at).getTime() + ); + + return { + events, + checkpoint: { + last_post_id: posts[0]?.id ?? checkpoint.last_post_id, + last_timestamp: events[0]?.occurred_at?.toISOString?.() ?? checkpoint.last_timestamp, + } as unknown as Record, + // No cookie persistence on the extension path — auth lives in the + // user's signed-in Chrome session, not in our cookie cache. + metadata: { + items_found: events.length, + items_skipped: result.items.length - posts.length, + api_calls: result.apiCallCount, + backend: 'extension', + }, + }; + } + + private async syncJobsViaExtension( + baseUrl: string, + maxScrolls: number, + checkpoint: LinkedInCheckpoint, + dispatcher: ChromeActionDispatcher + ): Promise { + const jobsUrl = `${baseUrl}/jobs/`; + const result = await extensionNetworkSync({ + dispatcher, + url: jobsUrl, + config: { + interceptPatterns: [ + { regex: 'voyager/api/graphql.*jobPosting', flags: 'i' }, + { regex: 'voyager/api/search/dash/.*jobs', flags: 'i' }, + { regex: 'voyager/api/organization/.*jobs', flags: 'i' }, + ], + maxScrolls, + scrollDelayMs: 3000, + responseTimeoutMs: 8000, + }, + parseResponse: parseJobListings, + checkAuth: (currentUrl) => + !currentUrl.includes('/login') && !currentUrl.includes('/authwall'), + }); + + const seenIds = new Set(); + const jobs = result.items.filter((j) => { + if (!j.id || seenIds.has(j.id)) return false; + seenIds.add(j.id); + return true; + }); + jobs.sort((a, b) => b.postedAt.getTime() - a.postedAt.getTime()); + + const events: EventEnvelope[] = jobs.map((job) => ({ + origin_id: `li_job_${job.id}`, + payload_text: job.description ?? job.title, + title: job.title, + occurred_at: job.postedAt, + origin_type: 'job_posting', + source_url: job.url, + metadata: { location: job.location }, + })); + + return { + events, + checkpoint: { + last_job_id: jobs[0]?.id ?? checkpoint.last_job_id, + last_timestamp: jobs[0]?.postedAt?.toISOString?.() ?? checkpoint.last_timestamp, + } as unknown as Record, + metadata: { + items_found: events.length, + api_calls: result.apiCallCount, + backend: 'extension', + }, + }; + } + private async syncUpdates( baseUrl: string, cookies: any[], diff --git a/packages/owletto b/packages/owletto index 6bcdb9c71..6b51be054 160000 --- a/packages/owletto +++ b/packages/owletto @@ -1 +1 @@ -Subproject commit 6bcdb9c71645c98d7ee543d9c387747b864b8b78 +Subproject commit 6b51be054b5b931de3a74aa40588d975b76764fc From 881800b7b8cb5ba0007ea937d848e98ee7e6c483 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Thu, 28 May 2026 17:55:58 +0100 Subject: [PATCH 2/6] feat: chrome-extension dispatcher bridge for LinkedIn (delete Playwright fallback) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the loop on the LinkedIn extension migration so the connector runs end-to-end on the paired Owletto Chrome extension. No more dual path. ## What landed ### Server bridge — POST /api/workers/dispatch-chrome-action Connector-worker fleets call this from inside a running sync to dispatch a chrome connector action against the paired extension in the same org. The endpoint: - authorizes the parent sync run (must be running, claimed by the caller) - picks an online chrome connection in the parent run's org (browser.debugger capability + last_seen within 20 min) - enqueues a device-bound chrome action run via createConnectorOperationRun - awaits completion via waitForDeviceActionRun (now exported from manage_operations) — the same Postgres-mediated wait the existing manage_operations.execute path uses for device-bound calls Reuses the existing device-action runs queue. No new state machine, no new auth surface. Multi-replica safe by construction: every signal rides Postgres rows, the chrome extension's /api/workers/complete-action POST can land on any replica and finalize the run. ### Connector-worker IPC reverse channel - ExecutionHooks gains onChromeDispatch. - Subprocess executor relays chrome_dispatch_request / chrome_dispatch_response IPC messages between child connector code and the daemon hook (mirrors the existing auth-signal channel shape). - child-runner.ts splices a live { dispatch } object onto sessionState before invoking sync(). The dispatcher rides IPC up to the daemon, the daemon calls the gateway bridge, the bridge waits for the extension, and the observation flows back down. - ExecutorClient.dispatchChromeAction posts to the new gateway endpoint (trusted worker auth via WORKER_API_TOKEN). ### LinkedIn — DELETE the Playwright fallback - Removed all browserNetworkSync code paths and the cookie cascade. - Removed the use_extension config flag (no dual path, no opt-in). - Removed the browser auth method from the authSchema; the extension carries auth implicitly from the user's signed-in Chrome. - sync() now calls requireExtensionDispatcher(ctx) which throws a clear 'no paired Owletto extension' error when none is reachable. No silent Playwright fallback. - definition.version bumped 1.1.0 → 2.0.0 (breaking config schema change). Revolut + X are unchanged — they still use browserNetworkSync. Migrating them is a separate follow-up; this PR is the LinkedIn cutover. ## Verification - bun run typecheck — clean (server + owletto strict). - make build-packages — clean. - bun test packages/connector-worker — 60 pass (existing IPC test surface green; the new IPC channel is a parallel branch). - bun test packages/connector-sdk/src/__tests__/extension-network.test.ts — 6 pass (existing extensionNetworkSync contract unchanged). - bun test packages/server/src/__tests__/unit/connectors/linkedin.test.ts — 2 pass (existing checkpoint-filter logic unchanged). - bun test packages/server/src/__tests__/unit — 240 pass / 16 skip / 0 fail. ## E2E — NOT VALIDATED in this dispatch (dead-end called per AGENTS.md hard gate) The user's live Owletto extension is paired against PROD (app.lobu.ai) in the buremba org. To E2E this branch we'd need to either: (a) deploy the branch image to prod and run a real LinkedIn sync there (Flux auto-deploys main; this branch hasn't merged yet), OR (b) re-pair the user's extension to a local make-dev gateway, apply the chrome 2.x + linkedin 2.x connector definitions to a fresh local org, create a linkedin connection, drive a sync, and verify Voyager responses land. That's a multi-step re-pair + auth + sync session that wasn't safe to do from this dispatch without explicit 'go ahead and re-pair' from the user (overwrites their current buremba pairing). Per AGENTS.md 'E2E before merge (hard gate) … if you can't reproduce, BAIL': bailing on the live LinkedIn proof. Next-step options spelled out in the agent's report. The buremba prod chrome connector_definition is still at version 0.2.0 (network_intercept_* actions not yet present). After this lands and deploys, applying chrome 0.3.0+ to buremba is the first E2E step. ## Diff stat (lobu) packages/connector-worker/src/daemon/client.ts | +46 packages/connector-worker/src/daemon/executor.ts | +13 packages/connector-worker/src/executor/child-runner.ts| +65 packages/connector-worker/src/executor/interface.ts | +13 packages/connector-worker/src/executor/subprocess.ts | +47 packages/connectors/src/linkedin.ts | +49 / -226 (net -177) packages/server/src/index.ts | +4 packages/server/src/tools/admin/manage_operations.ts | +1 / -1 (export) packages/server/src/worker-api/dispatch-chrome-action.ts | new (~170) --- .../connector-worker/src/daemon/client.ts | 46 +++ .../connector-worker/src/daemon/executor.ts | 13 + .../src/executor/child-runner.ts | 65 +++- .../src/executor/interface.ts | 13 + .../src/executor/subprocess.ts | 47 +++ packages/connectors/src/linkedin.ts | 277 +++--------------- packages/server/src/index.ts | 4 + .../src/tools/admin/manage_operations.ts | 2 +- .../src/worker-api/dispatch-chrome-action.ts | 168 +++++++++++ 9 files changed, 392 insertions(+), 243 deletions(-) create mode 100644 packages/server/src/worker-api/dispatch-chrome-action.ts diff --git a/packages/connector-worker/src/daemon/client.ts b/packages/connector-worker/src/daemon/client.ts index 3f85655da..cf8685d7b 100644 --- a/packages/connector-worker/src/daemon/client.ts +++ b/packages/connector-worker/src/daemon/client.ts @@ -39,6 +39,13 @@ export interface ExecutorClient { emitAuthArtifact(req: EmitAuthArtifactRequest): Promise; pollAuthSignal(req: PollAuthSignalRequest): Promise; completeAuth(req: CompleteAuthRequest): Promise; + /** + * Forward a chrome-extension action call from the running connector to the + * gateway, which enqueues a chrome connector action run, waits for the + * paired Owletto extension to claim/complete, and returns the observation + * — multi-replica safe because the wait is Postgres-mediated. + */ + dispatchChromeAction(req: DispatchChromeActionRequest): Promise>; } // ============================================ @@ -207,6 +214,26 @@ export interface PollAuthSignalResponse { signal?: Record; } +/** + * Request the gateway dispatch a chrome connector action on behalf of the + * currently-running sync. The gateway resolves a paired chrome connection + * in the same org and posts the result back via /api/workers/complete-action, + * the same path used by the chrome extension for its own action runs. + */ +export interface DispatchChromeActionRequest { + /** run_id of the *parent* sync run (used to scope the dispatch to that org). */ + parent_run_id: number; + worker_id: string; + action_key: string; + action_input: Record; +} + +export interface DispatchChromeActionResponse { + status: 'completed' | 'failed' | 'timeout'; + output?: Record; + error_message?: string; +} + export interface CompleteAuthRequest { run_id: number; worker_id: string; @@ -377,6 +404,25 @@ export class WorkerClient implements ExecutorClient { await this.requestVoid('/api/workers/complete-auth', req as unknown as Record); } + /** + * Forward a chrome connector action call to the gateway. Blocks until the + * paired Owletto extension completes the run or the gateway-side budget + * times out. Throws on failure/timeout with the gateway's error message. + */ + async dispatchChromeAction(req: DispatchChromeActionRequest): Promise> { + const result = await this.requestJson( + '/api/workers/dispatch-chrome-action', + req as unknown as Record + ); + if (result.status === 'completed') { + return result.output ?? {}; + } + throw new Error( + result.error_message ?? + `Chrome action '${req.action_key}' ${result.status === 'timeout' ? 'timed out' : 'failed'}` + ); + } + /** * Health check */ diff --git a/packages/connector-worker/src/daemon/executor.ts b/packages/connector-worker/src/daemon/executor.ts index a151d4cc6..670d84532 100644 --- a/packages/connector-worker/src/daemon/executor.ts +++ b/packages/connector-worker/src/daemon/executor.ts @@ -242,6 +242,19 @@ async function executeSyncRun( } } }, + onChromeDispatch: async (actionKey, actionInput) => { + // Forward to the gateway's dispatch endpoint. The endpoint + // resolves a paired chrome connection in the same org as run_id, + // inserts an 'action' run, and waits for the Owletto extension + // worker to claim + complete. Multi-replica safe — all state in + // Postgres; either replica can host the wait. + return client.dispatchChromeAction({ + parent_run_id: run_id, + worker_id: client.id, + action_key: actionKey, + action_input: actionInput, + }); + }, }, }); diff --git a/packages/connector-worker/src/executor/child-runner.ts b/packages/connector-worker/src/executor/child-runner.ts index 6ab5514d6..319424802 100644 --- a/packages/connector-worker/src/executor/child-runner.ts +++ b/packages/connector-worker/src/executor/child-runner.ts @@ -46,6 +46,41 @@ const pendingSignalWaiters = new Map< >(); const authAbortController = new AbortController(); +// --------------------------------------------------------------------------- +// Chrome-action dispatch reverse channel: connectors call +// `ctx.sessionState.chrome_dispatcher.dispatch(action_key, input)` from inside +// `sync()`; the call is routed over IPC to the parent (connector-worker +// daemon), which posts to the gateway's +// /api/workers/dispatch-chrome-action endpoint. The endpoint inserts a chrome +// connector action run, waits for the paired Owletto extension to claim + +// complete it, and returns the action_output back along the chain. +// +// One IPC request id per call. Resolves with the observation; rejects with +// the gateway-side error_message on failure. +// --------------------------------------------------------------------------- + +let nextDispatchRequestId = 1; +const pendingDispatchWaiters = new Map< + number, + { resolve: (v: Record) => void; reject: (e: Error) => void } +>(); + +function dispatchChromeAction( + actionKey: string, + actionInput: Record +): Promise> { + return new Promise>((resolve, reject) => { + const requestId = nextDispatchRequestId++; + pendingDispatchWaiters.set(requestId, { resolve, reject }); + void sendIPC({ + type: 'chrome_dispatch_request', + requestId, + actionKey, + actionInput, + }); + }); +} + function awaitAuthSignal( name: string, options?: { timeoutMs?: number } @@ -138,13 +173,27 @@ async function executeConnectorRuntime( await sendIPC({ type: 'checkpoint_update', checkpoint: checkpoint ?? null }); }; + // Always splice a live `chrome_dispatcher` handle onto sessionState. The + // dispatcher is a JS object that closes over the IPC channel — it can't + // travel through the wire, so we re-create it in the child every run. + // Connectors that don't need it simply ignore the field; calling + // .dispatch() with no online paired Owletto extension surfaces a clean + // error from the gateway-side bridge. + const sessionStateForSync = { + ...(job.sessionState ?? {}), + chrome_dispatcher: { + dispatch: (actionKey: string, actionInput: Record) => + dispatchChromeAction(actionKey, actionInput), + }, + } as Record; + const syncResult = (await instance.sync({ feedKey: job.feedKey, config: { ...job.env, ...job.config }, checkpoint: job.checkpoint, credentials: job.credentials, entityIds: job.entityIds, - sessionState: job.sessionState, + sessionState: sessionStateForSync, emitEvents, updateCheckpoint, })) as SyncResult; @@ -248,6 +297,20 @@ async function main() { let started = false; // Wait for message from parent process.on('message', async (msg: any) => { + // Chrome-dispatch reverse channel: parent ships the + // /dispatch-chrome-action observation (or error) back to us. + if (msg?.type === 'chrome_dispatch_response') { + const waiter = pendingDispatchWaiters.get(msg.requestId); + if (waiter) { + pendingDispatchWaiters.delete(msg.requestId); + if (msg.error) { + waiter.reject(new Error(String(msg.error))); + } else { + waiter.resolve((msg.output ?? {}) as Record); + } + } + return; + } // Auth-mode reverse channel: parent sends signal payloads + abort. if (msg?.type === 'await_signal_response') { const waiter = pendingSignalWaiters.get(msg.requestId); diff --git a/packages/connector-worker/src/executor/interface.ts b/packages/connector-worker/src/executor/interface.ts index bec4f1303..c0437167d 100644 --- a/packages/connector-worker/src/executor/interface.ts +++ b/packages/connector-worker/src/executor/interface.ts @@ -67,6 +67,19 @@ export interface ExecutionHooks { name: string, options?: { timeoutMs?: number } ) => Promise>; + /** + * Sync runs: connector code invoked + * `ctx.sessionState.chrome_dispatcher.dispatch(actionKey, actionInput)`. + * The host (connector-worker daemon) forwards the call to the gateway + * (POST /api/workers/dispatch-chrome-action), which inserts a chrome + * connector action run, waits for the paired Owletto extension to claim + * and complete it, and returns the observation. Implementations MUST + * reject when no extension is reachable. + */ + onChromeDispatch?: ( + actionKey: string, + actionInput: Record + ) => Promise>; } /** Per-run execution options independent of the job payload. */ diff --git a/packages/connector-worker/src/executor/subprocess.ts b/packages/connector-worker/src/executor/subprocess.ts index 2eb411f90..23a2fe174 100644 --- a/packages/connector-worker/src/executor/subprocess.ts +++ b/packages/connector-worker/src/executor/subprocess.ts @@ -326,6 +326,53 @@ export class SubprocessExecutor implements SyncExecutor { return; } + if (msg.type === 'chrome_dispatch_request') { + const requestId = msg.requestId; + const actionKey = typeof msg.actionKey === 'string' ? msg.actionKey : ''; + const actionInput = + msg.actionInput && typeof msg.actionInput === 'object' + ? (msg.actionInput as Record) + : {}; + queueTask(async () => { + if (!hooks?.onChromeDispatch) { + try { + child.send({ + type: 'chrome_dispatch_response', + requestId, + error: + 'chrome_dispatcher is not available in this execution context (no onChromeDispatch hook)', + }); + } catch { + /* ignore */ + } + return; + } + try { + const output = await hooks.onChromeDispatch(actionKey, actionInput); + try { + child.send({ + type: 'chrome_dispatch_response', + requestId, + output, + }); + } catch { + /* IPC closed — child already exited. */ + } + } catch (err) { + try { + child.send({ + type: 'chrome_dispatch_response', + requestId, + error: err instanceof Error ? err.message : String(err), + }); + } catch { + /* IPC closed — child already exited. */ + } + } + }); + return; + } + if (msg.type === 'await_signal_request') { const requestId = msg.requestId; const name = msg.name; diff --git a/packages/connectors/src/linkedin.ts b/packages/connectors/src/linkedin.ts index 8f1341b41..142c52679 100644 --- a/packages/connectors/src/linkedin.ts +++ b/packages/connectors/src/linkedin.ts @@ -1,17 +1,20 @@ /** * LinkedIn Connector * - * Scrapes LinkedIn company pages via browser network interception. - * Uses Playwright to navigate company pages and intercept Voyager API responses. - * Auth via Chrome attach over CDP — the user signs in once to a dedicated - * Chrome window started by `lobu memory browser-auth`, and the connector - * connects to that live profile at sync time. + * Scrapes LinkedIn company pages via the paired Owletto Chrome extension's + * network-intercept primitive. The extension runs inside the user's real + * Chrome session — no Playwright, no cookie cache, no `--remote-debugging- + * port` plumbing. We attach the CDP Network domain in the user's signed-in + * tab, drive scroll pagination, and parse the Voyager API responses the + * page emits. * - * Follows the same pattern as the X (Twitter) connector. + * Auth is implicit: the user is already signed into linkedin.com in the + * paired Chrome. There is no fallback path — if no online Owletto extension + * is reachable in the connection's org, this sync fails fast with a clear + * "no paired Owletto extension" error. */ import { - browserNetworkSync, type ChromeActionDispatcher, type ConnectorDefinition, ConnectorRuntime, @@ -21,12 +24,6 @@ import { type SyncContext, type SyncResult, } from '@lobu/connector-sdk'; -import { - getBrowserCdpUrl, - getBrowserCookies, - getBrowserUserDataDir, - validateCookieNotExpired, -} from './browser-scraper-utils'; // ── Types ────────────────────────────────────────────────────── @@ -62,24 +59,23 @@ function normalizeCheckpointPostId(postId?: string): string | undefined { } /** - * Discover a chrome-extension action dispatcher from the run context. The - * extension path is opted in by: - * 1. `config.use_extension === true` on the connection, AND - * 2. `ctx.sessionState.chrome_dispatcher` being a `ChromeActionDispatcher` - * (a handle the server-side bridge injects when a paired Owletto - * extension worker is available to claim the run). - * - * Returning null falls through to the existing Playwright + cookies stack. - * That keeps every existing deployment working unchanged. + * Pull the chrome action dispatcher from sessionState. The connector-worker + * subprocess (child-runner.ts) splices a live `chrome_dispatcher` object + * onto every sync's sessionState; the dispatcher's `dispatch()` rides an + * IPC channel up to the daemon and out to the gateway's + * /api/workers/dispatch-chrome-action bridge. When no paired Owletto + * extension is online in the connection's org, the bridge returns the + * `failed` status and the dispatcher throws — we surface that as the sync + * failure verbatim. */ -function pickExtensionDispatcher( - ctx: SyncContext, - config: Record -): ChromeActionDispatcher | null { - if (config.use_extension !== true) return null; +function requireExtensionDispatcher(ctx: SyncContext): ChromeActionDispatcher { const handle = (ctx.sessionState as Record | null | undefined) ?.chrome_dispatcher as ChromeActionDispatcher | undefined; - if (!handle || typeof handle.dispatch !== 'function') return null; + if (!handle || typeof handle.dispatch !== 'function') { + throw new Error( + 'LinkedIn connector requires a paired Owletto Chrome extension. No chrome_dispatcher was injected into sessionState — re-run on a connector-worker that has the dispatcher bridge.' + ); + } return handle; } @@ -218,13 +214,6 @@ function parseJobListings(_url: string, json: unknown): LinkedInJob[] { // ── Config Schemas ──────────────────────────────────────────── -const useExtensionProp = { - type: 'boolean', - default: false, - description: - 'Route the scrape through the paired Owletto Chrome extension instead of a Playwright-launched browser. Requires a paired Chrome connector worker; falls back to Playwright when no dispatcher is available. Default: false during the migration window.', -}; - const companyUpdatesConfigSchema = { type: 'object', required: ['company_url'], @@ -240,7 +229,6 @@ const companyUpdatesConfigSchema = { default: 5, description: 'Maximum scroll iterations for pagination (default: 5)', }, - use_extension: useExtensionProp, }, }; @@ -259,7 +247,6 @@ const jobsConfigSchema = { default: 3, description: 'Maximum scroll iterations for job listings (default: 3)', }, - use_extension: useExtensionProp, }, }; @@ -269,17 +256,14 @@ export default class LinkedInConnector extends ConnectorRuntime { readonly definition: ConnectorDefinition = { key: 'linkedin', name: 'LinkedIn', - description: 'Scrapes LinkedIn company pages for posts, hiring signals, and team data.', - version: '1.1.0', + description: + 'Scrapes LinkedIn company pages for posts, hiring signals, and team data via the paired Owletto Chrome extension.', + version: '2.0.0', faviconDomain: 'linkedin.com', authSchema: { methods: [ { - type: 'browser', - capture: 'cli', - requiredDomains: ['linkedin.com', '.linkedin.com'], - description: - 'Preferred auth mode for LinkedIn scraping. The CLI launches a dedicated Chrome with remote debugging; you log into LinkedIn once and the connector attaches over CDP, harvesting cookies live from that session.', + type: 'none', }, { type: 'oauth', @@ -293,7 +277,7 @@ export default class LinkedInConnector extends ConnectorRuntime { clientIdKey: 'LINKEDIN_CLIENT_ID', clientSecretKey: 'LINKEDIN_CLIENT_SECRET', description: - 'Optional LinkedIn OAuth app config for sign-in and future API-based access. Current company page and jobs feeds still scrape via browser session cookies.', + 'Optional LinkedIn OAuth app config for sign-in. Current company page and jobs feeds run via the Chrome extension; OAuth is here for downstream sign-in flows.', setupInstructions: 'Create a LinkedIn OAuth app, add {{redirect_uri}} as the callback URL, then paste the client ID and client secret below.', }, @@ -353,62 +337,16 @@ export default class LinkedInConnector extends ConnectorRuntime { // Normalize URL - remove trailing slash const baseUrl = companyUrl.replace(/\/$/, ''); - - // ── Extension path (opt-in) ────────────────────────────────────────── - // - // When the connection's session state carries a `chrome_dispatcher` - // handle (injected by the server-side bridge that pairs LinkedIn runs - // with a paired Owletto extension worker) AND the connection config - // enables it, route the whole sync through the extension's network- - // intercept primitive instead of Playwright. The Playwright path - // stays as the default fallback during the migration window. - // - // The dispatcher contract (extensionNetworkSync ⇄ background.js): - // dispatch('navigate', {url, open_in_new_tab: true}) - // dispatch('network_intercept_start', {tab_id, patterns, ...}) - // dispatch('network_intercept_drain', {session_id}) - // dispatch('network_intercept_stop', {session_id}) - // dispatch('close_tab', {tab_id}) - // - // See PR body "Migration sequencing" for the server-side wiring. - const dispatcher = pickExtensionDispatcher(ctx, config); const maxScrolls = (config.max_scrolls as number) ?? (feedKey === 'jobs' ? 3 : 5); - if (dispatcher) { - if (feedKey === 'jobs') { - return this.syncJobsViaExtension(baseUrl, maxScrolls, checkpoint, dispatcher); - } - return this.syncUpdatesViaExtension(baseUrl, maxScrolls, checkpoint, dispatcher); - } - - const userDataDir = getBrowserUserDataDir(ctx.sessionState); - const cdpUrlFromSession = getBrowserCdpUrl(ctx.sessionState); - const cdpUrl = cdpUrlFromSession ?? 'auto'; - // No need to require cookies when the device tells us to attach directly - // (managed --user-data-dir on disk, or an explicit CDP endpoint pointed - // at the user's running Chrome). The cookie cascade is only the fallback - // for the cloud/auto path. - const skipServerCookies = !!userDataDir || !!cdpUrlFromSession; - const cookies = skipServerCookies - ? [] - : getBrowserCookies(ctx.checkpoint as any, ctx.sessionState as any, 'linkedin'); - if (!skipServerCookies) { - validateCookieNotExpired(cookies, 'li_at', 'linkedin'); - } + const dispatcher = requireExtensionDispatcher(ctx); if (feedKey === 'jobs') { - return this.syncJobs(baseUrl, cookies, maxScrolls, checkpoint, userDataDir, cdpUrl); + return this.syncJobs(baseUrl, maxScrolls, checkpoint, dispatcher); } - - return this.syncUpdates(baseUrl, cookies, maxScrolls, checkpoint, userDataDir, cdpUrl); + return this.syncUpdates(baseUrl, maxScrolls, checkpoint, dispatcher); } - // ── Extension-backed sync paths ──────────────────────────────────────── - // - // These mirror syncUpdates / syncJobs but consume extensionNetworkSync - // instead of browserNetworkSync. They share the parsers and checkpoint - // logic; the diff is purely the transport layer. - - private async syncUpdatesViaExtension( + private async syncUpdates( baseUrl: string, maxScrolls: number, checkpoint: LinkedInCheckpoint, @@ -461,8 +399,8 @@ export default class LinkedInConnector extends ConnectorRuntime { last_post_id: posts[0]?.id ?? checkpoint.last_post_id, last_timestamp: events[0]?.occurred_at?.toISOString?.() ?? checkpoint.last_timestamp, } as unknown as Record, - // No cookie persistence on the extension path — auth lives in the - // user's signed-in Chrome session, not in our cookie cache. + // No cookie persistence — auth lives in the user's signed-in Chrome, + // not in our cookie cache. metadata: { items_found: events.length, items_skipped: result.items.length - posts.length, @@ -472,7 +410,7 @@ export default class LinkedInConnector extends ConnectorRuntime { }; } - private async syncJobsViaExtension( + private async syncJobs( baseUrl: string, maxScrolls: number, checkpoint: LinkedInCheckpoint, @@ -528,147 +466,4 @@ export default class LinkedInConnector extends ConnectorRuntime { }, }; } - - private async syncUpdates( - baseUrl: string, - cookies: any[], - maxScrolls: number, - checkpoint: LinkedInCheckpoint, - userDataDir: string | undefined, - cdpUrl: string | 'auto' - ): Promise { - const postsUrl = `${baseUrl}/posts/`; - - const result = await browserNetworkSync({ - config: { - interceptPatterns: [ - /voyager\/api\/graphql\?variables=.*ORGANIZATION_MEMBER_FEED/, - /voyager\/api\/graphql\?variables=.*organizationalPageUrn/, - ], - authDomains: ['linkedin.com', '.linkedin.com', '.www.linkedin.com'], - stealth: true, - maxScrolls, - scrollDelayMs: 3000, - responseTimeoutMs: 8000, - navigationTimeoutMs: 20000, - }, - url: postsUrl, - cdpUrl, - cookies, - userDataDir, - parseResponse: parseCompanyUpdates, - checkAuth: async (page) => { - const url = page.url(); - return !url.includes('/login') && !url.includes('/authwall'); - }, - }); - - const posts = filterPostsSinceCheckpoint(result.items, checkpoint); - - const events: EventEnvelope[] = posts.map((post) => ({ - origin_id: `li_post_${post.id}`, - payload_text: post.text, - author_name: post.author, - occurred_at: post.publishedAt, - origin_type: 'post', - source_url: `https://www.linkedin.com/feed/update/urn:li:activity:${post.id}`, - score: calculateEngagementScore('linkedin', { - upvotes: post.likes, - reply_count: post.comments, - }), - metadata: { - author_headline: post.authorHeadline, - likes: post.likes, - comments: post.comments, - shares: post.shares, - }, - })); - - events.sort((a, b) => new Date(b.occurred_at).getTime() - new Date(a.occurred_at).getTime()); - - return { - events, - checkpoint: { - last_post_id: posts[0]?.id ?? checkpoint.last_post_id, - last_timestamp: events[0]?.occurred_at?.toISOString?.() ?? checkpoint.last_timestamp, - } as unknown as Record, - auth_update: { cookies: result.cookies }, - metadata: { - items_found: events.length, - items_skipped: result.items.length - posts.length, - api_calls: result.apiCallCount, - }, - }; - } - - private async syncJobs( - baseUrl: string, - cookies: any[], - maxScrolls: number, - checkpoint: LinkedInCheckpoint, - userDataDir: string | undefined, - cdpUrl: string | 'auto' - ): Promise { - const jobsUrl = `${baseUrl}/jobs/`; - - const result = await browserNetworkSync({ - config: { - interceptPatterns: [ - /voyager\/api\/graphql.*jobPosting/i, - /voyager\/api\/search\/dash\/.*jobs/i, - /voyager\/api\/organization\/.*jobs/i, - ], - authDomains: ['linkedin.com', '.linkedin.com', '.www.linkedin.com'], - stealth: true, - maxScrolls, - scrollDelayMs: 3000, - responseTimeoutMs: 8000, - navigationTimeoutMs: 20000, - }, - url: jobsUrl, - cdpUrl, - cookies, - userDataDir, - parseResponse: parseJobListings, - checkAuth: async (page) => { - const url = page.url(); - return !url.includes('/login') && !url.includes('/authwall'); - }, - }); - - // Deduplicate - const seenIds = new Set(); - const jobs = result.items.filter((j) => { - if (!j.id || seenIds.has(j.id)) return false; - seenIds.add(j.id); - return true; - }); - - jobs.sort((a, b) => b.postedAt.getTime() - a.postedAt.getTime()); - - const events: EventEnvelope[] = jobs.map((job) => ({ - origin_id: `li_job_${job.id}`, - payload_text: job.description ?? job.title, - title: job.title, - occurred_at: job.postedAt, - origin_type: 'job_posting', - source_url: job.url, - metadata: { - location: job.location, - }, - })); - - return { - events, - checkpoint: { - last_job_id: jobs[0]?.id ?? checkpoint.last_job_id, - last_timestamp: jobs[0]?.postedAt?.toISOString?.() ?? checkpoint.last_timestamp, - } as unknown as Record, - auth_update: { cookies: result.cookies }, - metadata: { - items_found: events.length, - api_calls: result.apiCallCount, - }, - }; - } } diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index e6785eda5..6e0ff3f75 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -791,6 +791,10 @@ app.post('/api/workers/heartbeat', heartbeat); app.post('/api/workers/stream', streamContent); app.post('/api/workers/complete', completeWorkerJob); app.post('/api/workers/complete-action', completeActionRun); +// Bridge that lets connector-worker fleets dispatch chrome connector actions +// against a paired Owletto extension. See dispatch-chrome-action.ts. +import { dispatchChromeAction } from './worker-api/dispatch-chrome-action'; +app.post('/api/workers/dispatch-chrome-action', dispatchChromeAction); app.post('/api/workers/complete-embeddings', completeEmbeddings); app.post('/api/workers/me/runs/:runId/complete-watcher', completeWatcherRun); app.post('/api/workers/me/watchers/:watcher_id/trigger', triggerWatcherForDevice); diff --git a/packages/server/src/tools/admin/manage_operations.ts b/packages/server/src/tools/admin/manage_operations.ts index 8d84dd9c7..a8bc05c8f 100644 --- a/packages/server/src/tools/admin/manage_operations.ts +++ b/packages/server/src/tools/admin/manage_operations.ts @@ -482,7 +482,7 @@ async function handleListAvailable( // 20-30s) could exhaust a flat-100s deadline before the worker even // claimed the run, marking it timeout while the worker was about to // pick it up. -async function waitForDeviceActionRun( +export async function waitForDeviceActionRun( runId: number, organizationId: string ): Promise<{ diff --git a/packages/server/src/worker-api/dispatch-chrome-action.ts b/packages/server/src/worker-api/dispatch-chrome-action.ts new file mode 100644 index 000000000..562a424a8 --- /dev/null +++ b/packages/server/src/worker-api/dispatch-chrome-action.ts @@ -0,0 +1,168 @@ +/** + * POST /api/workers/dispatch-chrome-action + * + * Thin bridge: a connector running on the connector-worker fleet wants to + * call a chrome connector action against the paired Owletto extension in + * the same org. We: + * + * 1. Look up the parent sync run's org from runs. + * 2. Pick an online chrome connection in that org. + * 3. Enqueue an action run via `createConnectorOperationRun` (the same + * helper `manage_operations.execute` uses for device-bound calls). + * 4. Await completion via the shared `waitForDeviceActionRun` (also + * reused from manage_operations). + * 5. Return the action_output. + * + * No new state machine, no new queue — the device-action runs queue does + * the work end-to-end, the same way the user drove it manually via + * `POST /api/{org}/manage_operations { action: 'execute' }` earlier today. + * + * Multi-replica safe by reuse: all signalling is via Postgres rows on the + * `runs` table; the chrome extension's `/api/workers/complete-action` POST + * can land on any replica and finalize the run row. + */ + +import type { Context } from 'hono'; +import { getDb } from '../db/client'; +import type { Env } from '../index'; +import { waitForDeviceActionRun } from '../tools/admin/manage_operations'; +import { errorMessage } from '../utils/errors'; +import logger from '../utils/logger'; +import { createConnectorOperationRun } from '../utils/queue-helpers'; + +interface DispatchChromeActionBody { + parent_run_id: number; + worker_id: string; + action_key: string; + action_input: Record; +} + +// Online window for chrome extension device workers, in minutes. Matches +// the /api/me/devices "online" flag. +const DEVICE_ONLINE_WINDOW_MINUTES = 20; + +export async function dispatchChromeAction(c: Context<{ Bindings: Env }>) { + let body: DispatchChromeActionBody; + try { + body = await c.req.json(); + } catch { + return c.json({ error: 'Invalid JSON body' }, 400); + } + + if (typeof body.parent_run_id !== 'number' || !body.parent_run_id) { + return c.json({ error: 'parent_run_id is required' }, 400); + } + if (!body.worker_id?.trim()) { + return c.json({ error: 'worker_id is required' }, 400); + } + if (!body.action_key?.trim()) { + return c.json({ error: 'action_key is required' }, 400); + } + + const sql = getDb(); + + // (1) Authorize: parent run must exist, be a running sync claimed by + // this worker. We don't re-gate on workerAuthMode — the parent claim + // already gated org access; trusted callers (WORKER_API_TOKEN) are + // server-side fleets that already have full access. + const parentRows = (await sql` + SELECT r.organization_id, r.status, r.claimed_by, r.run_type + FROM runs r + WHERE r.id = ${body.parent_run_id} + LIMIT 1 + `) as Array<{ + organization_id: string; + status: string; + claimed_by: string | null; + run_type: string; + }>; + if (parentRows.length === 0) { + return c.json({ error: 'parent_run not found' }, 404); + } + const parentRun = parentRows[0]; + if (parentRun.status !== 'running') { + return c.json( + { error: `parent_run is ${parentRun.status}, must be running` }, + 409 + ); + } + if (parentRun.claimed_by !== body.worker_id) { + return c.json({ error: 'parent_run is not claimed by this worker' }, 403); + } + if (parentRun.run_type !== 'sync') { + return c.json( + { error: `parent_run must be a sync run, got ${parentRun.run_type}` }, + 400 + ); + } + const organizationId = parentRun.organization_id; + + // (2) Pick an online chrome connection in this org. + const chromeConnectionRows = (await sql` + SELECT + con.id AS connection_id, + con.device_worker_id, + dw.last_seen_at + FROM connections con + JOIN device_workers dw ON dw.id = con.device_worker_id + WHERE con.organization_id = ${organizationId} + AND con.connector_key = 'chrome' + AND con.status = 'active' + AND con.deleted_at IS NULL + AND dw.capabilities::jsonb @> '["browser.debugger"]'::jsonb + AND dw.last_seen_at > now() - make_interval(mins => ${DEVICE_ONLINE_WINDOW_MINUTES}) + ORDER BY dw.last_seen_at DESC + LIMIT 1 + `) as Array<{ + connection_id: number; + device_worker_id: string; + last_seen_at: Date | string; + }>; + + if (chromeConnectionRows.length === 0) { + return c.json({ + status: 'failed', + error_message: + 'No online paired Owletto Chrome extension in this organization. Pair a Chrome extension first (and make sure it is running).', + }); + } + const chromeConnection = chromeConnectionRows[0]; + + // (3) Insert a device-bound chrome connector action run. Same helper + // manage_operations.execute uses for device-bound calls. + let runId: number; + try { + runId = await createConnectorOperationRun({ + organizationId, + connectionId: chromeConnection.connection_id, + connectorKey: 'chrome', + operationKey: body.action_key, + operationInput: body.action_input ?? {}, + approvalMode: 'device', + requireCompiledCode: false, + }); + } catch (err) { + const msg = errorMessage(err); + logger.error( + { err: msg, parent_run_id: body.parent_run_id, action_key: body.action_key }, + '[dispatchChromeAction] createConnectorOperationRun failed' + ); + return c.json({ status: 'failed', error_message: msg }); + } + + logger.info( + { + run_id: runId, + parent_run_id: body.parent_run_id, + action_key: body.action_key, + chrome_connection_id: chromeConnection.connection_id, + device_worker_id: chromeConnection.device_worker_id, + }, + '[dispatchChromeAction] dispatched' + ); + + // (4) Wait for the chrome extension to claim and complete. Shared with + // manage_operations.execute's device-bound path. + const result = await waitForDeviceActionRun(runId, organizationId); + return c.json(result); +} From 7235e53eb1667d8c39fe7fd7b63287eb2d4640af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Thu, 28 May 2026 18:07:39 +0100 Subject: [PATCH 3/6] fix: address pi review blockers on PR #1132 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Reorder extensionNetworkSync to start interception BEFORE navigating to the target URL. Open a scratch tab at about:blank → start CDP Network listener → navigate to opts.url. Previously navigate ran first, so initial Voyager / GraphQL XHRs that landed during page render completed before start() attached and were silently lost (pi blocker #1 on PR #1132). - Plumb opts.config.allowedOrigins through every dispatched chrome action's action_input.allowed_origins. The chrome extension's per-run ctx (apps/chrome/background.js) pulls allowed_origins off run.config or action_input and enforces it in tools.js / network-intercept.js. Without this the dispatched runs landed with an empty allowlist (permissive), defeating the connector author's origin gate (pi blocker #2). - LinkedIn now sets allowedOrigins: ['linkedin.com', '*.linkedin.com'] on both feeds. - Add a 240s hard ceiling to child-runner pendingDispatchWaiters and reject on IPC send failure so a wedged daemon / disconnected IPC channel can't leave sync() hanging indefinitely (pi suggested follow-up). Matches the gateway-side QUEUE_BUDGET_MS + POST_CLAIM_BUDGET_MS (60s + 120s) plus buffer. Verification: - bun test packages/connector-sdk/src/__tests__/extension-network.test.ts: 6 pass / 0 fail. - bun test packages/connector-worker: 60 pass / 0 fail. - make typecheck: clean. The 6 existing extension-network tests still describe the about:blank → start → navigate flow correctly because the stub dispatcher returns the same current_url on every navigate call (regardless of which URL was passed). --- .../connector-sdk/src/extension-network.ts | 88 +++++++++++++------ .../src/executor/child-runner.ts | 38 +++++++- packages/connectors/src/linkedin.ts | 2 + 3 files changed, 101 insertions(+), 27 deletions(-) diff --git a/packages/connector-sdk/src/extension-network.ts b/packages/connector-sdk/src/extension-network.ts index 631c48919..3cf6bb7cb 100644 --- a/packages/connector-sdk/src/extension-network.ts +++ b/packages/connector-sdk/src/extension-network.ts @@ -104,6 +104,19 @@ export interface ExtensionNetworkConfig { maxBufferResponses?: number; /** Per-response body cap. Default 1 MiB. */ maxBodyBytes?: number; + /** + * Origins the dispatched chrome actions are allowed to touch. Each entry + * is either an exact host (`linkedin.com`) or a wildcard (`*.linkedin.com`, + * `linkedin.com/*`); see apps/chrome/tools.js / network-intercept.js + * `urlHostInAllowlist` / `enforceAllowedOriginFromTab`. + * + * Forwarded on every dispatched action's `action_input.allowed_origins`, + * mirroring how the chrome extension's per-run ctx normally pulls them + * off `run.config.allowed_origins`. When omitted, the extension's gate + * defaults to permissive — set this from every connector for defense in + * depth. + */ + allowedOrigins?: string[]; } const DEFAULT_CONFIG = { @@ -154,33 +167,33 @@ export async function extensionNetworkSync(opts: { const cfg = { ...DEFAULT_CONFIG, ...opts.config }; const items: TItem[] = []; let apiCallCount = 0; + // Threaded through every dispatched action's input so the extension's + // per-run allowedOrigins gate (apps/chrome/background.js reads from + // run.config or action_input) blocks anything off-host. Omitted from the + // payload when the caller didn't set it — the extension treats an empty + // array as permissive. + const allowedOriginsInput = opts.config.allowedOrigins + ? { allowed_origins: opts.config.allowedOrigins } + : {}; - // 1. navigate into a fresh background tab. - const navObs = await opts.dispatcher.dispatch('navigate', { - url: opts.url, + // 1. Open an about:blank tab WITHOUT navigating yet. We need the Network + // domain listener live BEFORE the page starts loading — otherwise the + // first batch of Voyager XHRs the page fires during initial render + // completes before our start() listener attaches and we miss them. Pi + // review caught this; LinkedIn was the canonical case. + const blankNavObs = await opts.dispatcher.dispatch('navigate', { + url: 'about:blank', open_in_new_tab: true, wait_for_load: true, + ...allowedOriginsInput, }); - const tabId = navObs.tab_id; - sdkLogger.info( - { tabId, currentUrl: navObs.current_url }, - '[ExtensionNetwork] navigated' - ); - - if (opts.checkAuth && !opts.checkAuth(navObs.current_url)) { - await safeStop(opts.dispatcher, null); - await safeCloseTab(opts.dispatcher, tabId); - throw new Error( - 'extensionNetworkSync: auth check failed — Chrome session is not logged in to this site' - ); - } + const tabId = blankNavObs.tab_id; + sdkLogger.info({ tabId }, '[ExtensionNetwork] opened scratch tab'); - // 2. start intercept BEFORE any scroll so the first batch of responses - // (the page's initial XHR/GraphQL calls during render) is captured. The - // extension's start() attaches the Network domain synchronously; any - // request that's already in-flight when start() returns will be seen. - // Anything that finished before start() is lost — same as the Playwright - // page.on('response') lifecycle. + // 2. Start the intercept on the empty tab BEFORE the real navigation + // happens, so every response fired during the page's initial render is + // captured. Anything that landed before start() is lost — and since the + // tab is at about:blank, nothing has landed yet. const startObs = await opts.dispatcher.dispatch( 'network_intercept_start', { @@ -188,16 +201,37 @@ export async function extensionNetworkSync(opts: { patterns: opts.config.interceptPatterns, max_buffer_responses: cfg.maxBufferResponses, max_body_bytes: cfg.maxBodyBytes, + ...allowedOriginsInput, } ); const sessionId = startObs.session_id; try { - // 3. give the initial render a chance to fire its XHRs, then drain. + // 3. Now navigate to the real URL. Initial XHRs land into the live + // buffer. + const navObs = await opts.dispatcher.dispatch('navigate', { + tab_id: tabId, + url: opts.url, + open_in_new_tab: false, + wait_for_load: true, + ...allowedOriginsInput, + }); + sdkLogger.info( + { tabId, currentUrl: navObs.current_url }, + '[ExtensionNetwork] navigated' + ); + + if (opts.checkAuth && !opts.checkAuth(navObs.current_url)) { + throw new Error( + 'extensionNetworkSync: auth check failed — Chrome session is not logged in to this site' + ); + } + + // 4. give the initial render a chance to fire its XHRs, then drain. await sleep(cfg.responseTimeoutMs); apiCallCount += await drainInto(items, opts, sessionId); - // 4. scroll loop. Each iteration: trigger pagination, wait, drain. + // 5. scroll loop. Each iteration: trigger pagination, wait, drain. let prev = items.length; for (let n = 0; n < cfg.maxScrolls; n++) { const trigger = @@ -207,6 +241,7 @@ export async function extensionNetworkSync(opts: { tab_id: tid, expression: 'window.scrollTo(0, document.documentElement.scrollHeight); 1', + ...allowedOriginsInput, }); }); await trigger(tabId, opts.dispatcher); @@ -241,7 +276,7 @@ export async function extensionNetworkSync(opts: { ): Promise { const drained = await o.dispatcher.dispatch( 'network_intercept_drain', - { session_id: sid } + { session_id: sid, ...allowedOriginsInput } ); let calls = 0; for (const resp of drained.responses ?? []) { @@ -278,6 +313,9 @@ async function safeStop(dispatcher: ChromeActionDispatcher, sessionId: string | async function safeCloseTab(dispatcher: ChromeActionDispatcher, tabId: number) { try { + // close_tab is intentionally not gated by allowedOrigins on the extension + // side (a tab the connector opened is owned by the connector regardless + // of where it ended up), so we don't need to forward the allowlist here. await dispatcher.dispatch('close_tab', { tab_id: tabId }); } catch (err) { sdkLogger.warn({ err, tabId }, '[ExtensionNetwork] close_tab failed (already gone?)'); diff --git a/packages/connector-worker/src/executor/child-runner.ts b/packages/connector-worker/src/executor/child-runner.ts index 319424802..ee7cf48b5 100644 --- a/packages/connector-worker/src/executor/child-runner.ts +++ b/packages/connector-worker/src/executor/child-runner.ts @@ -65,18 +65,52 @@ const pendingDispatchWaiters = new Map< { resolve: (v: Record) => void; reject: (e: Error) => void } >(); +// Hard ceiling per dispatch. The gateway bridge itself caps at +// QUEUE_BUDGET_MS (60s) + POST_CLAIM_BUDGET_MS (120s) = 180s, plus a small +// buffer for HTTP round-trip. We give the child 240s before forcibly +// rejecting so a wedged daemon/IPC channel can't leave sync() hanging +// indefinitely. Caught by pi review of #1132. +const CHROME_DISPATCH_HARD_TIMEOUT_MS = 240_000; + function dispatchChromeAction( actionKey: string, actionInput: Record ): Promise> { return new Promise>((resolve, reject) => { const requestId = nextDispatchRequestId++; - pendingDispatchWaiters.set(requestId, { resolve, reject }); - void sendIPC({ + const timer = setTimeout(() => { + if (pendingDispatchWaiters.delete(requestId)) { + reject( + new Error( + `chrome_dispatcher.dispatch('${actionKey}') exceeded ${CHROME_DISPATCH_HARD_TIMEOUT_MS}ms; IPC may be wedged` + ) + ); + } + }, CHROME_DISPATCH_HARD_TIMEOUT_MS); + pendingDispatchWaiters.set(requestId, { + resolve: (v) => { + clearTimeout(timer); + resolve(v); + }, + reject: (e) => { + clearTimeout(timer); + reject(e); + }, + }); + sendIPC({ type: 'chrome_dispatch_request', requestId, actionKey, actionInput, + }).catch((err: unknown) => { + if (pendingDispatchWaiters.delete(requestId)) { + clearTimeout(timer); + reject( + new Error( + `chrome_dispatcher.dispatch('${actionKey}') IPC send failed: ${err instanceof Error ? err.message : String(err)}` + ) + ); + } }); }); } diff --git a/packages/connectors/src/linkedin.ts b/packages/connectors/src/linkedin.ts index 142c52679..d9dd167ec 100644 --- a/packages/connectors/src/linkedin.ts +++ b/packages/connectors/src/linkedin.ts @@ -361,6 +361,7 @@ export default class LinkedInConnector extends ConnectorRuntime { { regex: 'voyager/api/graphql\\?variables=.*ORGANIZATION_MEMBER_FEED' }, { regex: 'voyager/api/graphql\\?variables=.*organizationalPageUrn' }, ], + allowedOrigins: ['linkedin.com', '*.linkedin.com'], maxScrolls, scrollDelayMs: 3000, responseTimeoutMs: 8000, @@ -426,6 +427,7 @@ export default class LinkedInConnector extends ConnectorRuntime { { regex: 'voyager/api/search/dash/.*jobs', flags: 'i' }, { regex: 'voyager/api/organization/.*jobs', flags: 'i' }, ], + allowedOrigins: ['linkedin.com', '*.linkedin.com'], maxScrolls, scrollDelayMs: 3000, responseTimeoutMs: 8000, From 65e6bd451d56b21e31ead368ecfa3fa37608755f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Thu, 28 May 2026 18:25:35 +0100 Subject: [PATCH 4/6] chore: bump owletto pointer for about:blank allowedOrigins fix (pi v2 blocker on #1132) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit owletto 7d3b5ed4 fixes the pi v2 blocker on this PR: 'LinkedIn extension sync is blocked before navigation: extensionNetworkSync passes allowed_origins while opening/starting interception on about:blank, but Owletto rejects about:blank as outside the allowlist.' Carve-out about:blank as always allowed in: - apps/chrome/tools.js enforceAllowedOrigin() (covers tool_navigate + enforceAllowedOriginFromTab via delegation) - apps/chrome/network-intercept.js enforceTabUrlAgainstAllowedOrigins() (covers tool_network_intercept_start on the scratch tab) Per-response listener inside start() still enforces allowedOrigins on every captured response URL, so the carve-out is bare-tab-attach only. owletto regression test: apps/chrome/network-intercept.test.js 'allows start on about:blank with allowedOrigins set, then filters captures on the real URL' → drives start-on-blank → fires CDP events → asserts only the allowed-host response made it through. 19 network-intercept tests pass / 0 fail (was 18 / 0). --- packages/owletto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/owletto b/packages/owletto index 6b51be054..7d3b5ed4f 160000 --- a/packages/owletto +++ b/packages/owletto @@ -1 +1 @@ -Subproject commit 6b51be054b5b931de3a74aa40588d975b76764fc +Subproject commit 7d3b5ed4f55239b0a0744a4618576e2ba3e4c2ca From fc168609d147cf6c63096bbc2e6ebbfc71e3543f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Thu, 28 May 2026 18:41:20 +0100 Subject: [PATCH 5/6] =?UTF-8?q?fix:=20address=20pi=20v3=20blocker=20on=20#?= =?UTF-8?q?1132=20=E2=80=94=20refcounted=20debugger=20lease=20+=20safer=20?= =?UTF-8?q?start=20cleanup?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pi v3 review (verdict bug_free 30, 1 blocker): 'network_intercept.js:199 attaches CDP without updating tools.js debugger ownership; extensionNetworkSync's next navigate action re-attaches the same tab via withDebugger and will fail before LinkedIn sync can load the real page.' + 'Wrap network_intercept_start in the cleanup try/finally too: initialize sessionId to null before start, conditionally stop when set, and always close the opened tab if start throws.' Both addressed: owletto (submodule bump → 6ab0f54): introduce a shared refcounted debugger-lease module (apps/chrome/debugger-lease.js) that withDebugger (action tools) and tool_network_intercept_start/stop both hold leases on. Physical chrome.debugger.attach happens on the 0→1 transition; detach on 1→0. Overlapping owners just bump the count, so extensionNetworkSync's navigate → start → navigate(real URL) flow no longer hits 'Another debugger is already attached'. +2 regression tests on the lease interplay (51 owletto tests pass, was 49). connector-sdk (extension-network.ts): restructure the start+navigate+drain into a single try/finally where sessionId starts null and is set on successful start(). safeStop(null) is a no-op (existing), and safeCloseTab always runs in the finally — so a thrown network_intercept_start now closes the about:blank scratch tab instead of leaking it. 6 sdk tests still pass. Verification: - bun test packages/owletto/apps/chrome → 51 pass / 0 fail - bun test packages/connector-sdk/src/__tests__/extension-network.test.ts → 6 / 0 - make typecheck → clean --- .../connector-sdk/src/extension-network.ts | 44 ++++++++++++------- packages/owletto | 2 +- 2 files changed, 28 insertions(+), 18 deletions(-) diff --git a/packages/connector-sdk/src/extension-network.ts b/packages/connector-sdk/src/extension-network.ts index 3cf6bb7cb..d3058cd4e 100644 --- a/packages/connector-sdk/src/extension-network.ts +++ b/packages/connector-sdk/src/extension-network.ts @@ -190,23 +190,33 @@ export async function extensionNetworkSync(opts: { const tabId = blankNavObs.tab_id; sdkLogger.info({ tabId }, '[ExtensionNetwork] opened scratch tab'); - // 2. Start the intercept on the empty tab BEFORE the real navigation - // happens, so every response fired during the page's initial render is - // captured. Anything that landed before start() is lost — and since the - // tab is at about:blank, nothing has landed yet. - const startObs = await opts.dispatcher.dispatch( - 'network_intercept_start', - { - tab_id: tabId, - patterns: opts.config.interceptPatterns, - max_buffer_responses: cfg.maxBufferResponses, - max_body_bytes: cfg.maxBodyBytes, - ...allowedOriginsInput, - } - ); - const sessionId = startObs.session_id; + // sessionId is set once network_intercept_start returns. The cleanup + // block below stops the session iff it's set (so a thrown start() still + // closes the tab without trying to stop a never-started session). Pi v2 + // suggested fix. + let sessionId: string | null = null; try { + // 2. Start the intercept on the empty tab BEFORE the real navigation + // happens, so every response fired during the page's initial render is + // captured. Anything that landed before start() is lost — and since the + // tab is at about:blank, nothing has landed yet. + const startObs = await opts.dispatcher.dispatch( + 'network_intercept_start', + { + tab_id: tabId, + patterns: opts.config.interceptPatterns, + max_buffer_responses: cfg.maxBufferResponses, + max_body_bytes: cfg.maxBodyBytes, + ...allowedOriginsInput, + } + ); + sessionId = startObs.session_id; + // Capture the just-set session id in a non-nullable local so the typed + // drainInto calls below don't have to wrestle with the let-binding + // type. The outer sessionId variable stays nullable for the cleanup. + const liveSessionId: string = sessionId; + // 3. Now navigate to the real URL. Initial XHRs land into the live // buffer. const navObs = await opts.dispatcher.dispatch('navigate', { @@ -229,7 +239,7 @@ export async function extensionNetworkSync(opts: { // 4. give the initial render a chance to fire its XHRs, then drain. await sleep(cfg.responseTimeoutMs); - apiCallCount += await drainInto(items, opts, sessionId); + apiCallCount += await drainInto(items, opts, liveSessionId); // 5. scroll loop. Each iteration: trigger pagination, wait, drain. let prev = items.length; @@ -246,7 +256,7 @@ export async function extensionNetworkSync(opts: { }); await trigger(tabId, opts.dispatcher); await sleep(cfg.scrollDelayMs); - apiCallCount += await drainInto(items, opts, sessionId); + apiCallCount += await drainInto(items, opts, liveSessionId); if (items.length === prev) { sdkLogger.info( diff --git a/packages/owletto b/packages/owletto index 7d3b5ed4f..6ab0f5459 160000 --- a/packages/owletto +++ b/packages/owletto @@ -1 +1 @@ -Subproject commit 7d3b5ed4f55239b0a0744a4618576e2ba3e4c2ca +Subproject commit 6ab0f5459598e76370e17d4a79ea1b74bda919a3 From 314c13fee289c3ae2d9a4ab8245e9e7b62307432 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Thu, 28 May 2026 18:57:53 +0100 Subject: [PATCH 6/6] docs: tighten extension-network.ts comments (pi v4 non-blocking nits) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Replace stale 'Non-goal' header that referenced a per-connector use_extension config flag — that flag is gone (LinkedIn is fully on the extension path; Revolut + X still use browserNetworkSync, no flag needed). Reword to describe the actual migration scope. - Drop 'Pi review caught this' meta from the navigate-before-start comment; keep only the technical rationale. Pi v4 verdict was already bug_free 78 / 0 blockers; these were the two non-blocking suggested_fixes. --- packages/connector-sdk/src/extension-network.ts | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/packages/connector-sdk/src/extension-network.ts b/packages/connector-sdk/src/extension-network.ts index d3058cd4e..50244c6b6 100644 --- a/packages/connector-sdk/src/extension-network.ts +++ b/packages/connector-sdk/src/extension-network.ts @@ -20,8 +20,9 @@ * need to know how the run is routed (sync vs queued) — they just await * the dispatcher. * - * Non-goal: this does NOT replace `browserNetworkSync` yet. The two - * coexist while we migrate, gated per connector via a config flag. + * Migration scope: LinkedIn is fully on this path (no Playwright + * fallback). Revolut and X still use `browserNetworkSync`; we keep that + * helper alive for them and drop it once every consumer has migrated. */ import { sdkLogger } from './logger.js'; @@ -176,11 +177,10 @@ export async function extensionNetworkSync(opts: { ? { allowed_origins: opts.config.allowedOrigins } : {}; - // 1. Open an about:blank tab WITHOUT navigating yet. We need the Network - // domain listener live BEFORE the page starts loading — otherwise the + // 1. Open an about:blank tab WITHOUT navigating yet. The Network domain + // listener must be live BEFORE the page starts loading — otherwise the // first batch of Voyager XHRs the page fires during initial render - // completes before our start() listener attaches and we miss them. Pi - // review caught this; LinkedIn was the canonical case. + // completes before our start() listener attaches and we miss them. const blankNavObs = await opts.dispatcher.dispatch('navigate', { url: 'about:blank', open_in_new_tab: true,