Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,21 @@ jobs:
run: bun install

- name: Build packages server depends on
# embeddings → connector-worker order matters: connector-worker's
# src/embeddings.ts imports `@lobu/embeddings` and resolves it via
# the workspace symlink → dist. connector-worker itself is required
# because `packages/server/src/utils/connector-catalog.ts` imports
# `@lobu/connector-worker/compile` (the shared compile pipeline),
# which resolves via the package's `exports` field to
# `./dist/compile/index.js`. Vitest's vite-node resolver follows
# the `import` condition and fails to load if dist is absent —
# which transitively breaks every integration file whose import
# graph touches `queue-helpers` / `worker-api` / `connector-catalog`.
run: |
cd packages/core && bun run build && cd ../..
cd packages/connector-sdk && bun run build && cd ../..
cd packages/embeddings && bun run build && cd ../..
cd packages/connector-worker && bun run build && cd ../..

- name: Verify Postgres health (fail fast if pgvector setup is broken)
run: |
Expand Down
1 change: 1 addition & 0 deletions bun.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

116 changes: 12 additions & 104 deletions packages/cli/src/commands/_lib/connector-loader.ts
Original file line number Diff line number Diff line change
@@ -1,33 +1,22 @@
/**
* Connector source resolution + compilation for the CLI.
*
* Mirrors `packages/server/src/utils/connector-catalog.ts` (the server-side
* versions of these helpers) but inlined here so the CLI doesn't depend on
* @lobu/server — that package is private and never published, while the CLI
* is what end users install from npm.
*
* Lookup order for bundled connector source:
* 1. dist/connectors/ next to this file (published CLI runtime — see
* packages/cli/scripts/build.cjs which copies packages/connectors/src
* there at build time).
* 2. ../../../connectors/src relative to this file (monorepo dev).
* 3. process.cwd()/packages/connectors/src (running from a parent dir).
*
* The resolver + esbuild bundle pipeline themselves live in
* `@lobu/connector-worker/compile` so the three call-sites (worker, CLI,
* server) share one implementation.
*/
import { existsSync } from "node:fs";
import { mkdtemp, readFile, rm, stat } from "node:fs/promises";
import { createRequire } from "node:module";
import { tmpdir } from "node:os";
import { join, resolve } from "node:path";
import { build, type Plugin } from "esbuild";

const require_ = createRequire(import.meta.url);
const SDK_ENTRY = require_.resolve("@lobu/connector-sdk");

// Single source of truth lives in @lobu/connector-worker. Duplicated here as
// a flat string array to keep this file self-contained — drift cost is low
// (the list is tiny and rarely changes; if a new external dep is added the
// CLI surfaces a clear bundling error and we update both places).
const EXTERNAL_RUNTIME_DEPS = ["playwright", "sharp", "jimp"] as const;
import { resolve } from "node:path";
import {
createConnectorCompiler,
findBundledConnectorFile as findInDirs,
} from "@lobu/connector-worker/compile";

const SOURCE_DIR_CANDIDATES = [
// Published CLI runtime: packages/cli/scripts/build.cjs copies the
Expand All @@ -44,92 +33,11 @@ const bundledFileCache = new Map<string, string | null>();
export function findBundledConnectorFile(key: string): string | null {
const cached = bundledFileCache.get(key);
if (cached !== undefined) return cached;
// Mirror the resolver in @lobu/connector-worker's compile-connector.ts:
// subdir layout (`browser.evaluate` → `browser/evaluate.ts`) first, then
// the flat underscore convention (`chrome.tabs` → `chrome_tabs.ts`).
const candidates = [
`${key.replace(/\./g, "/")}.ts`,
`${key.replace(/\./g, "_")}.ts`,
];
let found: string | null = null;
outer: for (const dir of SOURCE_DIR_CANDIDATES) {
for (const fileName of candidates) {
const filePath = resolve(dir, fileName);
if (!filePath.startsWith(`${dir}/`)) continue;
if (existsSync(filePath)) {
found = filePath;
break outer;
}
}
}
const found = findInDirs(key, SOURCE_DIR_CANDIDATES);
bundledFileCache.set(key, found);
return found;
}

// Connectors import npm deps with the `npm:` prefix. Strip it so esbuild
// resolves the bare specifier against the local node_modules; mark unresolved
// ones external so the CLI bundle still produces (the runtime will fail loud
// if a missing dep is actually used).
const npmSpecifierPlugin: Plugin = {
name: "npm-specifier",
setup(b) {
b.onResolve({ filter: /^npm:/ }, async (args) => {
const bare = args.path
.slice(4)
.replace(/^(@[^/]+\/[^/@]+)@[^/]*/, "$1")
.replace(/^([^/@]+)@[^/]*/, "$1");
const resolved = await b.resolve(bare, {
resolveDir: args.resolveDir,
kind: args.kind,
});
if (resolved.errors.length > 0) {
return { path: bare, external: true, errors: [], warnings: [] };
}
return resolved;
});
},
};

const compiledFileCache = new Map<string, { mtimeMs: number; code: string }>();
const compiler = createConnectorCompiler();

export async function compileConnectorFromFile(
filePath: string
): Promise<string> {
let mtimeMs: number | null = null;
try {
mtimeMs = (await stat(filePath)).mtimeMs;
const cached = compiledFileCache.get(filePath);
if (cached && cached.mtimeMs === mtimeMs) return cached.code;
} catch {
// stat failed — fall through and let the build surface the real error.
}

const tmpDir = await mkdtemp(join(tmpdir(), "lobu-cli-connector-"));
const outPath = join(tmpDir, "out.mjs");

try {
await build({
entryPoints: [filePath],
outfile: outPath,
bundle: true,
format: "esm",
platform: "node",
target: "node20",
alias: { lobu: SDK_ENTRY, "@lobu/connector-sdk": SDK_ENTRY },
banner: {
js: "import { createRequire as __createRequire } from 'module'; const require = __createRequire(import.meta.url);",
},
plugins: [npmSpecifierPlugin],
external: [...EXTERNAL_RUNTIME_DEPS],
write: true,
minify: false,
sourcemap: false,
});

const code = await readFile(outPath, "utf-8");
if (mtimeMs !== null) compiledFileCache.set(filePath, { mtimeMs, code });
return code;
} finally {
await rm(tmpDir, { recursive: true, force: true });
}
}
export const compileConnectorFromFile = compiler.compileConnectorFromFile;
53 changes: 30 additions & 23 deletions packages/cli/src/commands/_lib/connector-run-cmd.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import { homedir } from "node:os";
import { join } from "node:path";
import { printText } from "../memory/_lib/output.js";

import type { EventEnvelope } from "@lobu/connector-sdk";
import { resolveContext } from "../../internal/context.js";
import { getUsableToken, resolveOrg } from "../memory/_lib/openclaw-auth.js";
import {
Expand Down Expand Up @@ -340,7 +341,7 @@ export async function connectorRun(
printText(`Compiling ${connectorKey} from ${sourcePath}...`);
const compiledCode = await compileConnectorFromFile(sourcePath);

// Build the SyncContext shape that executeCompiledConnector expects.
// Build the ExecutorJob shape that executeCompiledConnector expects.
// For mirror profiles we layer two acquisition paths:
// 1. DevToolsActivePort lookup against the source Chrome's
// user-data root. If the file is there, Chrome is exposing a
Expand Down Expand Up @@ -422,31 +423,37 @@ export async function connectorRun(
process.once("SIGINT", () => onSignal("SIGINT"));
process.once("SIGTERM", () => onSignal("SIGTERM"));

// Stream progress to stderr (so --json output to stdout stays clean).
let streamedCount = 0;
const onContentChunk = (items: any[]) => {
streamedCount += items.length;
process.stderr.write(` ... ${streamedCount} events so far\n`);
// Sync events stream via the onEventChunk hook now (the executor used to
// collect them into result.contents for us — that path is gone). Collect
// locally so the artifact/--json output still has the full payload.
const collectedEvents: EventEnvelope[] = [];
const onEventChunk = (events: EventEnvelope[]) => {
for (const event of events) collectedEvents.push(event);
process.stderr.write(` ... ${collectedEvents.length} events so far\n`);
};

printText(`Running ${connectorKey} (subprocess executor)...`);
const startMs = Date.now();
const result = await (executeCompiledConnector as any)({
mode: "sync",
const result = await executeCompiledConnector({
compiledCode,
config: mergedConfig,
checkpoint,
env: process.env as Record<string, string | undefined>,
connectionCredentials: {},
sessionState,
credentials: {},
feedKey: feed?.feed_key ?? `${connectorKey}-cli-run`,
entityIds: [],
apiType: "browser" as const,
hooks: { onContentChunk, collectContents: true },
job: {
mode: "sync",
config: mergedConfig,
checkpoint,
env: process.env as Record<string, string | undefined>,
sessionState,
credentials: null,
feedKey: feed?.feed_key ?? `${connectorKey}-cli-run`,
entityIds: [],
},
hooks: { onEventChunk },
});
const durationMs = Date.now() - startMs;

if (result.mode !== "sync") {
throw new Error(`Expected sync result, got mode=${result.mode}`);
}

// Save artifact for debugging / sharing. ~/.lobu/cache/connector-runs/<ts>.json.
const cacheDir = join(homedir(), ".lobu", "cache", "connector-runs");
mkdirSync(cacheDir, { recursive: true });
Expand All @@ -459,9 +466,9 @@ export async function connectorRun(
config: mergedConfig,
input_checkpoint: checkpoint,
duration_ms: durationMs,
event_count: result.contents.length,
event_count: collectedEvents.length,
next_checkpoint: result.checkpoint,
events: result.contents,
events: collectedEvents,
metadata: result.metadata ?? {},
};
await writeFile(artifactPath, JSON.stringify(artifact, null, 2), "utf-8");
Expand All @@ -471,10 +478,10 @@ export async function connectorRun(
} else {
printText("");
printText(`✓ Completed in ${(durationMs / 1000).toFixed(1)}s`);
printText(` Events: ${result.contents.length}`);
if (result.contents.length > 0) {
printText(` Events: ${collectedEvents.length}`);
if (collectedEvents.length > 0) {
printText(" Sample:");
printText(summarizeEvents(result.contents));
printText(summarizeEvents(collectedEvents));
}
if (result.checkpoint) {
printText(` Next checkpoint: ${JSON.stringify(result.checkpoint)}`);
Expand Down
15 changes: 13 additions & 2 deletions packages/connector-sdk/src/connector-runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ import type {
* Subclasses must:
* - Set `definition` with connector metadata
* - Implement `sync()` for feed data ingestion
* - Implement `execute()` for action execution
*
* Subclasses may optionally override `execute()` and `authenticate()`; both
* have safe defaults (action rejected with `{ success: false, ... }`, auth
* throws). Connectors that don't declare any `actions` in their definition
* need not override `execute()`.
*
* @example
* ```ts
Expand Down Expand Up @@ -59,11 +63,18 @@ export abstract class ConnectorRuntime {
* Execute an action on the connected service.
*
* Called either inline (low-risk) or by the worker (high-risk with approval).
* Default implementation rejects with "Actions not supported" — connectors
* that don't declare any `actions` in their definition need not override.
* The `ctx` parameter is part of the public contract (subclasses overriding
* this method receive the full `ActionContext`); the base impl ignores it.
*
* @param ctx - Action context with action key, input, and credentials
* @returns Action result with output data
*/
abstract execute(ctx: ActionContext): Promise<ActionResult>;
// biome-ignore lint/correctness/noUnusedFunctionParameters: contract signature — subclasses receive the full ActionContext
async execute(ctx: ActionContext): Promise<ActionResult> {
return { success: false, error: 'Actions not supported' };
}

/**
* Run an interactive authentication flow that produces credentials for the
Expand Down
7 changes: 0 additions & 7 deletions packages/connector-sdk/src/connector-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -315,13 +315,6 @@ export const IDENTITY = {

export type IdentityNamespace = (typeof IDENTITY)[keyof typeof IDENTITY];

export enum FeedMode {
/** Connector code runs on worker, syncs data */
sync = 'sync',
/** Virtual feed backed by saved queries (future) */
virtual = 'virtual',
}

// =============================================================================
// Action Definition
// =============================================================================
Expand Down
31 changes: 0 additions & 31 deletions packages/connector-sdk/src/event-taxonomy.ts

This file was deleted.

Loading
Loading