From f9f36c8ba520160d73fa2b42b7f9736a165c1621 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Wed, 13 May 2026 05:48:13 +0100 Subject: [PATCH] =?UTF-8?q?refactor:=20simplification=20sweep=20=E2=80=94?= =?UTF-8?q?=20dead=20code,=20dedup,=20indirection?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements the low-risk "TOP 3"-style items from a parallel analysis sweep. No behavior change intended; build + typecheck pass. Buckets touched: - core: drop unused `PlatformError`/`SessionError` error classes + tests; drop unused `safeJsonStringify`. - connector-sdk: delete the dead legacy `IFeed` contract + `ParentFeedDefinition`, `SearchResult`, and the whole `FeedAuth*` family (zero refs; connectors use `ConnectorRuntime`/`ConnectorAuthSchema`). Trim the `index.ts` re-exports. - openclaw-plugin: delete the dead-by-default `memory-wiki-compat.ts` spike (~960 LOC) + its `memoryWikiCompat` config branch, manifest `wiki_*`/`memory_*` tool entries, and unit test wiring. - server (gateway): delete the unwired standalone gateway CLI (`gateway/cli/index.ts`, `startGateway`, `loadEnvFile`, `displayGatewayConfig`); drop the dead `FixedWindowRateLimiter`/`getClientIp` from `gateway/utils/rate-limiter.ts` (keep `sweepExpiredRateLimits`); delete the empty `TelegramResponseStrategy` subclass; collapse `resolveSettingsLookupUserId` now that `OAUTH_PLATFORMS` is empty; fold `createTokenVerifier` into `agent-ownership.ts` and delete the one-liner `token-verifier.ts`. - server (connect): drop the `example.invalid` placeholder dance in `oauth-providers.ts` — `resolveProviderConfig` now returns optional URLs and callers null-check the one they use. - server (misc dedup/cleanup): dedupe `pgBigintArray` in test-fixtures; drop the unused `resource` field from the invalidation `events/emitter.ts` shape; collapse the no-op `DEV_FALLBACK_FROM` record in `email/send.ts`. - no-dynamic-imports rule: convert `await import(...)` to static imports in the SDK namespaces (`knowledge`/`entities`/`watchers`), `watchers/reaction-executor.ts`, `server.ts` (lobu gateway / db / scheduled jobs), and `index.ts` (`/health/scheduler` per-request import); hoist `decrypt` import in `postgres-stores.ts`. --- packages/connector-sdk/src/index.ts | 10 - packages/connector-sdk/src/types.ts | 167 --- packages/core/src/__tests__/errors.test.ts | 42 - packages/core/src/errors.ts | 48 - packages/core/src/utils/json.ts | 15 - packages/openclaw-plugin/openclaw.plugin.json | 33 +- packages/openclaw-plugin/src/index.ts | 35 - .../openclaw-plugin/src/memory-wiki-compat.ts | 960 ------------------ packages/openclaw-plugin/src/types.ts | 20 - .../test/unit/manifest-contracts.test.ts | 2 - .../test/unit/memory-wiki-compat.test.ts | 515 ---------- .../src/__tests__/setup/test-fixtures.ts | 7 +- .../server/src/auth/__tests__/config.test.ts | 7 - packages/server/src/auth/config.ts | 7 - .../server/src/connect/oauth-providers.ts | 31 +- packages/server/src/email/send.ts | 7 +- packages/server/src/events/emitter.ts | 2 - .../__tests__/utils/rate-limiter.test.ts | 114 --- packages/server/src/gateway/cli/gateway.ts | 152 +-- packages/server/src/gateway/cli/index.ts | 74 -- packages/server/src/gateway/config/index.ts | 66 -- .../connections/platform-strategies/index.ts | 10 - .../src/gateway/routes/public/agent-config.ts | 2 +- .../gateway/routes/public/agent-history.ts | 2 +- .../src/gateway/routes/public/channels.ts | 2 +- .../gateway/routes/shared/agent-ownership.ts | 39 +- .../gateway/routes/shared/token-verifier.ts | 34 - .../server/src/gateway/utils/rate-limiter.ts | 120 +-- packages/server/src/index.ts | 2 +- .../server/src/lobu/stores/postgres-stores.ts | 24 +- .../server/src/sandbox/namespaces/entities.ts | 4 +- .../src/sandbox/namespaces/knowledge.ts | 16 +- .../server/src/sandbox/namespaces/watchers.ts | 4 +- packages/server/src/server.ts | 13 +- .../server/src/watchers/reaction-executor.ts | 2 +- 35 files changed, 87 insertions(+), 2501 deletions(-) delete mode 100644 packages/openclaw-plugin/src/memory-wiki-compat.ts delete mode 100644 packages/openclaw-plugin/test/unit/memory-wiki-compat.test.ts delete mode 100644 packages/server/src/gateway/__tests__/utils/rate-limiter.test.ts delete mode 100644 packages/server/src/gateway/cli/index.ts delete mode 100644 packages/server/src/gateway/routes/shared/token-verifier.ts diff --git a/packages/connector-sdk/src/index.ts b/packages/connector-sdk/src/index.ts index 85e499886..90dbe2acc 100644 --- a/packages/connector-sdk/src/index.ts +++ b/packages/connector-sdk/src/index.ts @@ -137,17 +137,7 @@ export type { Checkpoint, Content, Env, - FeedAuthBrowserMethod, - FeedAuthEnvField, - FeedAuthEnvKeysMethod, - FeedAuthMethod, - FeedAuthNoneMethod, - FeedAuthOAuthMethod, - FeedAuthSchema, FeedOptions, FeedSyncResult, - IFeed, - ParentFeedDefinition, - SearchResult, SessionState, } from './types.js'; diff --git a/packages/connector-sdk/src/types.ts b/packages/connector-sdk/src/types.ts index 56a86f926..a98d39167 100644 --- a/packages/connector-sdk/src/types.ts +++ b/packages/connector-sdk/src/types.ts @@ -1,5 +1,3 @@ -import type { TObject } from '@sinclair/typebox'; - /** * Checkpoint data structure for tracking feed sync state */ @@ -38,12 +36,6 @@ export interface FeedSyncResult { auth_update?: Record; } -export interface ParentFeedDefinition { - type: string; - options: FeedOptions; - description?: string; -} - /** * Extracted content from platform */ @@ -73,16 +65,6 @@ export interface Content { metadata?: Record; } -/** - * Search result from platform search - */ -export interface SearchResult { - url: string; // Link to the resource (company page, app listing, etc.) - title: string; // Name or title of the result - description: string; // Brief description of the result - metadata?: Record; // Structured config data (e.g., { subreddit: "spotify", content_type: "posts", requires_parent: "posts" }) -} - /** * Feed options passed from MCP tool */ @@ -175,152 +157,3 @@ export interface Env { */ export type SessionState = Record; -/** - * Auth field definition for connector environment keys - */ -export interface FeedAuthEnvField { - key: string; - label?: string; - description?: string; - example?: string; - secret?: boolean; -} - -export interface FeedAuthNoneMethod { - type: 'none'; -} - -export interface FeedAuthEnvKeysMethod { - type: 'env_keys'; - required?: boolean; - scope?: 'connection' | 'organization'; - fields: FeedAuthEnvField[]; - description?: string; -} - -export interface FeedAuthOAuthMethod { - type: 'oauth'; - provider: string; - requiredScopes: string[]; - optionalScopes?: string[]; - required?: boolean; - scope?: 'connection' | 'organization'; - description?: string; - authorizationUrl?: string; - tokenUrl?: string; - userinfoUrl?: string; - authParams?: Record; - tokenEndpointAuthMethod?: 'client_secret_post' | 'client_secret_basic' | 'none'; - usePkce?: boolean; - loginScopes?: string[]; - clientIdKey?: string; - clientSecretKey?: string; - setupInstructions?: string; - loginProvisioning?: { - autoCreateConnection?: boolean; - }; -} - -export interface FeedAuthBrowserMethod { - type: 'browser'; - required?: boolean; - description?: string; - capture?: 'cli'; -} - -export type FeedAuthMethod = - | FeedAuthNoneMethod - | FeedAuthEnvKeysMethod - | FeedAuthOAuthMethod - | FeedAuthBrowserMethod; - -export interface FeedAuthSchema { - methods: FeedAuthMethod[]; -} - -/** - * Main feed interface - */ -export interface IFeed { - /** - * Unique identifier for this feed type - */ - readonly type: string; - - /** - * Human-readable display name for this feed - */ - readonly displayName: string; - - /** - * API type: 'api' for HTTP/REST APIs, 'browser' for browser rendering - */ - readonly apiType: 'api' | 'browser'; - - /** - * Feed mode: 'entity' for platforms with specific pages (repos, subreddits, companies) - * or 'search' for query-based platforms (Hacker News, Twitter search) - */ - readonly feedMode: 'entity' | 'search'; - - /** - * TypeBox schema for validating feed options - */ - readonly optionsSchema: TObject; - - /** - * Default SQL formula to calculate normalized score (0-100) - * Can reference: f.score, f.content_length, f.metadata, f.occurred_at - * Can use window functions like PERCENT_RANK() - * User can override this per-connection via connections.scoring_formula - */ - readonly defaultScoringFormula: string; - - /** - * Pull new content from platform - */ - pull( - options: FeedOptions, - checkpoint: Checkpoint | null, - env: Env, - sessionState?: SessionState | null, - updateCheckpointFn?: (checkpoint: Checkpoint) => Promise - ): Promise; - - /** - * Validate feed options before saving to database - */ - validateOptions(options: FeedOptions): string | null; - - /** - * Get rate limit information for this platform - */ - getRateLimit(): { - requests_per_minute: number; - requests_per_hour?: number; - recommended_interval_ms: number; - }; - - /** - * Search platform for entities - * Optional method - not all platforms may support search - */ - search?(searchTerm: string, env: Env): Promise; - - /** - * Generate a URL for the connection from options - */ - urlFromOptions(options: FeedOptions): string; - - /** - * Generate a human-readable display label from options - */ - displayLabelFromOptions(options: FeedOptions): string; - - /** - * Return parent feed definitions required to preserve hierarchy. - */ - getParentFeedDefinitions(options: FeedOptions): ParentFeedDefinition[]; - - readonly authSchema?: FeedAuthSchema; -} diff --git a/packages/core/src/__tests__/errors.test.ts b/packages/core/src/__tests__/errors.test.ts index 83ef9ea49..d39a78c72 100644 --- a/packages/core/src/__tests__/errors.test.ts +++ b/packages/core/src/__tests__/errors.test.ts @@ -4,8 +4,6 @@ import { ConfigError, ErrorCode, OrchestratorError, - PlatformError, - SessionError, WorkerError, WorkspaceError, } from "../errors"; @@ -106,46 +104,6 @@ describe("WorkspaceError", () => { }); }); -describe("PlatformError", () => { - test("stores platform and operation", () => { - const err = new PlatformError("slack", "send", "rate limited"); - expect(err.name).toBe("PlatformError"); - expect(err.platform).toBe("slack"); - expect(err.operation).toBe("send"); - expect(err.message).toBe("rate limited"); - }); - - test("toJSON includes platform alongside base fields", () => { - const cause = new Error("429"); - const err = new PlatformError("slack", "send", "rate limited", cause); - const json = err.toJSON(); - expect(json.platform).toBe("slack"); - expect(json.name).toBe("PlatformError"); - expect(json.message).toBe("rate limited"); - expect(json.operation).toBe("send"); - expect(json.cause).toBe("429"); - }); -}); - -describe("SessionError", () => { - test("stores sessionKey and code", () => { - const err = new SessionError("sess-1", "EXPIRED", "session expired"); - expect(err.name).toBe("SessionError"); - expect(err.sessionKey).toBe("sess-1"); - expect(err.code).toBe("EXPIRED"); - expect(err.message).toBe("session expired"); - }); - - test("toJSON includes sessionKey and code", () => { - const err = new SessionError("sess-1", "EXPIRED", "session expired"); - const json = err.toJSON(); - expect(json.sessionKey).toBe("sess-1"); - expect(json.code).toBe("EXPIRED"); - expect(json.name).toBe("SessionError"); - expect(json.message).toBe("session expired"); - }); -}); - describe("OrchestratorError", () => { test("stores code, details, shouldRetry default false", () => { const err = new OrchestratorError( diff --git a/packages/core/src/errors.ts b/packages/core/src/errors.ts index 15a821426..ecc6671e7 100644 --- a/packages/core/src/errors.ts +++ b/packages/core/src/errors.ts @@ -71,54 +71,6 @@ export class WorkspaceError extends BaseError { } } -/** - * Error class for platform-related operations (Slack, WhatsApp, etc.) - */ -export class PlatformError extends BaseError { - override readonly name = "PlatformError"; - - constructor( - public platform: string, - operation: string, - message: string, - cause?: Error - ) { - super(message, cause); - this.operation = operation; - } - - override toJSON(): Record { - return { - ...super.toJSON(), - platform: this.platform, - }; - } -} - -/** - * Error class for session-related operations - */ -export class SessionError extends BaseError { - readonly name = "SessionError"; - - constructor( - public sessionKey: string, - public code: string, - message: string, - cause?: Error - ) { - super(message, cause); - } - - toJSON(): Record { - return { - ...super.toJSON(), - sessionKey: this.sessionKey, - code: this.code, - }; - } -} - // ErrorCode enum for orchestration operations export enum ErrorCode { DATABASE_CONNECTION_FAILED = "DATABASE_CONNECTION_FAILED", diff --git a/packages/core/src/utils/json.ts b/packages/core/src/utils/json.ts index 0e8e94618..e63dbdf0e 100644 --- a/packages/core/src/utils/json.ts +++ b/packages/core/src/utils/json.ts @@ -21,21 +21,6 @@ export function safeJsonParse( } } -/** - * Safely stringify value to JSON - * Returns null on stringify failure instead of throwing - */ -export function safeJsonStringify(value: unknown): string | null { - try { - return JSON.stringify(value); - } catch (error) { - logger.error("JSON stringify failed", { - error: error instanceof Error ? error.message : String(error), - }); - return null; - } -} - /** * Stringify a value to JSON, converting bigint values to numbers (when safe) * or strings. Use this when serializing query results that may contain bigint columns. diff --git a/packages/openclaw-plugin/openclaw.plugin.json b/packages/openclaw-plugin/openclaw.plugin.json index d0d5271cd..fc14d9e88 100644 --- a/packages/openclaw-plugin/openclaw.plugin.json +++ b/packages/openclaw-plugin/openclaw.plugin.json @@ -14,14 +14,7 @@ "lobu_search_sdk", "lobu_query_sdk", "lobu_query_sql", - "lobu_run_sdk", - "wiki_status", - "wiki_search", - "wiki_get", - "wiki_apply", - "wiki_lint", - "memory_search", - "memory_get" + "lobu_run_sdk" ] }, "configSchema": { @@ -67,30 +60,6 @@ "type": "boolean", "default": true, "description": "When true, captures conversation observations as long-term memories after each agent session." - }, - "memoryWikiCompat": { - "description": "Spike/compat mode: register OpenClaw memory-wiki style tools (wiki_status, wiki_search, wiki_get, wiki_apply, wiki_lint) backed by existing Lobu MCP tools. Does not write a separate wiki vault.", - "oneOf": [ - { "type": "boolean" }, - { - "type": "object", - "additionalProperties": false, - "properties": { - "enabled": { - "type": "boolean", - "default": false - }, - "fanoutTimeoutMs": { - "type": "integer", - "minimum": 1000, - "maximum": 90000, - "default": 30000, - "description": "Per-fanout timeout (ms) for SDK-backed wiki calls. Slow paths are dropped and the tool returns partial results with a degraded marker. Defaults to 30000." - } - } - } - ], - "default": false } } }, diff --git a/packages/openclaw-plugin/src/index.ts b/packages/openclaw-plugin/src/index.ts index 61ffa8b9c..7922ca1b3 100644 --- a/packages/openclaw-plugin/src/index.ts +++ b/packages/openclaw-plugin/src/index.ts @@ -11,7 +11,6 @@ import { dirname, resolve } from 'node:path'; import { fileURLToPath } from 'node:url'; import { promisify } from 'node:util'; import { renderFallbackSystemContext } from './lobu-guidance.js'; -import { registerMemoryWikiCompatTools } from './memory-wiki-compat.js'; import type { McpToolDefinition, McpToolResponse, @@ -271,35 +270,6 @@ function asPositiveInt(value: unknown, fallback: number): number { return n > 0 ? n : fallback; } -const DEFAULT_WIKI_FANOUT_TIMEOUT_MS = 30_000; -const MIN_WIKI_FANOUT_TIMEOUT_MS = 1_000; -const MAX_WIKI_FANOUT_TIMEOUT_MS = 90_000; - -function clampFanoutTimeoutMs(value: unknown): number { - if (typeof value !== 'number' || !Number.isFinite(value)) { - return DEFAULT_WIKI_FANOUT_TIMEOUT_MS; - } - if (value < MIN_WIKI_FANOUT_TIMEOUT_MS) return MIN_WIKI_FANOUT_TIMEOUT_MS; - if (value > MAX_WIKI_FANOUT_TIMEOUT_MS) return MAX_WIKI_FANOUT_TIMEOUT_MS; - return Math.floor(value); -} - -function resolveMemoryWikiCompatConfig(value: unknown): { - enabled: boolean; - fanoutTimeoutMs: number; -} { - if (typeof value === 'boolean') { - return { enabled: value, fanoutTimeoutMs: DEFAULT_WIKI_FANOUT_TIMEOUT_MS }; - } - if (isRecord(value)) { - return { - enabled: asBoolean(value.enabled, false), - fanoutTimeoutMs: clampFanoutTimeoutMs(value.fanoutTimeoutMs), - }; - } - return { enabled: false, fanoutTimeoutMs: DEFAULT_WIKI_FANOUT_TIMEOUT_MS }; -} - function getLogger(api: Record): PluginLogger { const logger = api.logger; if ( @@ -381,7 +351,6 @@ function resolvePluginConfig(api: Record, pluginId: string): Re autoRecall: asBoolean(cfg.autoRecall, true), autoCapture: asBoolean(cfg.autoCapture, true), recallLimit: asPositiveInt(cfg.recallLimit, DEFAULT_RECALL_LIMIT), - memoryWikiCompat: resolveMemoryWikiCompatConfig(cfg.memoryWikiCompat), }; } @@ -1310,10 +1279,6 @@ const plugin = { registerMcpTools(config, registerTool, log); } - if (registerTool && config.memoryWikiCompat.enabled) { - registerMemoryWikiCompatTools(config, registerTool, log, callMcpTool); - } - // Inject workspace instructions (dynamic from server) or fallback (static). // When autoRecall is enabled, also inject recalled memories. { diff --git a/packages/openclaw-plugin/src/memory-wiki-compat.ts b/packages/openclaw-plugin/src/memory-wiki-compat.ts deleted file mode 100644 index c0235e438..000000000 --- a/packages/openclaw-plugin/src/memory-wiki-compat.ts +++ /dev/null @@ -1,960 +0,0 @@ -import type { McpToolResponse, ResolvedPluginConfig } from './types.js'; - -type PluginLogger = { - info(message: string, ...args: unknown[]): void; - warn(message: string, ...args: unknown[]): void; - error(message: string, ...args: unknown[]): void; - debug?(message: string, ...args: unknown[]): void; -}; - -type McpToolCallOptions = { rawJson?: boolean; signal?: AbortSignal }; - -type McpToolCaller = ( - config: ResolvedPluginConfig, - toolName: string, - args: Record, - options?: McpToolCallOptions -) => Promise; - -function isRecord(value: unknown): value is Record { - return typeof value === 'object' && value !== null; -} - -type FanoutOutcome = - | { ok: true; value: T; durationMs: number } - | { ok: false; reason: 'timeout' | 'error'; error: string; durationMs: number }; - -/** - * Race a fanout part against a wall-clock timeout. Returns a tagged result so - * callers can return partial responses (with a `degraded`/`timeouts` marker) - * instead of letting one slow upstream block the whole wiki tool response. - * - * If the underlying MCP caller supports AbortSignal, the timeout also aborts - * the in-flight request so CLI harnesses and long-lived plugin hosts don't - * accumulate orphaned HTTP calls after returning a degraded partial result. - */ -async function raceFanout( - label: string, - start: (signal: AbortSignal) => Promise, - timeoutMs: number -): Promise> { - // Defensive: callers occasionally pass an unresolved or non-finite timeout - // (e.g. a third-party config that didn't go through `resolveMemoryWikiCompatConfig`). - // Fall back to a safe default rather than letting `setTimeout(undefined)` fire immediately. - const safeTimeoutMs = - typeof timeoutMs === 'number' && Number.isFinite(timeoutMs) && timeoutMs >= 1 - ? Math.floor(timeoutMs) - : 30_000; - const started = Date.now(); - const controller = new AbortController(); - let timer: ReturnType | null = null; - const timeoutPromise = new Promise>((resolve) => { - timer = setTimeout(() => { - controller.abort(`${label} timed out after ${safeTimeoutMs}ms`); - resolve({ - ok: false, - reason: 'timeout', - error: `${label} timed out after ${safeTimeoutMs}ms`, - durationMs: Date.now() - started, - }); - }, safeTimeoutMs); - }); - const settled = start(controller.signal).then, FanoutOutcome>( - (value) => ({ ok: true, value, durationMs: Date.now() - started }), - (error) => ({ - ok: false, - reason: controller.signal.aborted ? 'timeout' : 'error', - error: - error instanceof Error - ? error.message - : typeof controller.signal.reason === 'string' - ? controller.signal.reason - : String(error), - durationMs: Date.now() - started, - }) - ); - try { - return await Promise.race([settled, timeoutPromise]); - } finally { - if (timer) clearTimeout(timer); - } -} - -function asString(value: unknown): string | null { - return typeof value === 'string' && value.trim().length > 0 ? value.trim() : null; -} - -function extractTextFromContent(content: Array<{ type: string; text: string }>): string { - return content - .filter((c) => c.type === 'text' && typeof c.text === 'string') - .map((c) => c.text) - .join('\n'); -} - -function parseJsonText(text: string): T | null { - const trimmed = text.trim(); - if (!trimmed) return null; - try { - return JSON.parse(trimmed) as T; - } catch { - return null; - } -} - -async function callMcpToolJson( - callMcpTool: McpToolCaller, - config: ResolvedPluginConfig, - toolName: string, - args: Record, - options?: Pick -): Promise { - const result = await callMcpTool(config, toolName, args, { rawJson: true, signal: options?.signal }); - if (!result) return null; - if (result.isError) { - throw new Error(`MCP tool ${toolName} returned error: ${extractTextFromContent(result.content).slice(0, 240)}`); - } - return parseJsonText(extractTextFromContent(result.content)); -} - -/** - * Execute a TypeScript snippet via the MCP `query_sdk` (read-only) or `run_sdk` (writes) - * tool. INTERNAL_REST_TOOLS like list_watchers / read_knowledge / manage_watchers are not - * exposed on the MCP wire (their `internal: true` flag hides them from tools/list); the - * agent-facing path for these capabilities is to script over the ClientSDK. - */ -function stripMarkdownJsonFence(text: string): string { - const fenced = text.match(/^\s*```(?:json)?\s*\n([\s\S]*?)\n```\s*$/i); - return fenced ? fenced[1] : text; -} - -async function runSdkScript( - callMcpTool: McpToolCaller, - config: ResolvedPluginConfig, - mode: 'read' | 'write', - script: string, - options?: Pick -): Promise { - const toolName = mode === 'read' ? 'query_sdk' : 'run_sdk'; - const raw = await callMcpTool(config, toolName, { script }, { rawJson: true, signal: options?.signal }); - if (!raw) return null; - if (raw.isError) { - throw new Error(`MCP tool ${toolName} returned error: ${extractTextFromContent(raw.content).slice(0, 240)}`); - } - // The sandbox tool returns its envelope wrapped in a ```json fenced block. - const text = stripMarkdownJsonFence(extractTextFromContent(raw.content)); - const envelope = parseJsonText<{ - success?: boolean; - error?: { name?: string; message?: string }; - return_value?: T; - }>(text); - if (envelope == null) return null; - if (envelope.success === false) { - const name = envelope.error?.name ?? 'SdkScriptError'; - const message = envelope.error?.message ?? 'sandbox script failed'; - throw new Error(`${toolName} reported ${name}: ${message}`); - } - // Real query_sdk wraps the script return in `return_value` (which may - // legitimately be `null` when the script returns null); tests pass the - // value directly in the response and rely on the fallback when the key - // isn't present at all. - if (Object.prototype.hasOwnProperty.call(envelope, 'return_value')) { - return (envelope.return_value ?? null) as T; - } - return envelope as unknown as T; -} - -type WikiCorpus = 'memory' | 'wiki' | 'all'; - -type WikiSearchResult = { - corpus: 'memory' | 'wiki'; - path: string; - title: string; - kind: 'memory' | 'source' | 'claim' | 'synthesis' | 'report' | 'watcher'; - score: number; - snippet: string; - id?: string | number; - source_url?: string | null; - details?: Record; -}; - -const WikiCorpusSchema = { - type: 'string', - enum: ['memory', 'wiki', 'all'], -}; - -const WikiSearchSchema = { - type: 'object', - additionalProperties: false, - properties: { - query: { type: 'string', minLength: 1 }, - maxResults: { type: 'number', minimum: 1 }, - corpus: WikiCorpusSchema, - backend: { type: 'string', enum: ['shared', 'local'] }, - mode: { - type: 'string', - enum: ['auto', 'find-person', 'route-question', 'source-evidence', 'raw-claim'], - }, - }, - required: ['query'], -}; - -const MemorySearchSchema = { - type: 'object', - additionalProperties: false, - properties: { - query: { type: 'string', minLength: 1 }, - maxResults: { type: 'number', minimum: 1 }, - }, - required: ['query'], -}; - -const WikiGetSchema = { - type: 'object', - additionalProperties: false, - properties: { - lookup: { type: 'string', minLength: 1 }, - fromLine: { type: 'number', minimum: 1 }, - lineCount: { type: 'number', minimum: 1 }, - corpus: WikiCorpusSchema, - backend: { type: 'string', enum: ['shared', 'local'] }, - }, - required: ['lookup'], -}; - -const WikiApplySchema = { - type: 'object', - additionalProperties: true, - properties: { - op: { type: 'string', enum: ['create_synthesis', 'update_metadata'] }, - title: { type: 'string' }, - body: { type: 'string' }, - lookup: { type: 'string' }, - sourceIds: { type: 'array', items: { type: 'string' } }, - claims: { type: 'array', items: { type: 'object', additionalProperties: true } }, - confidence: { type: ['number', 'null'], minimum: 0, maximum: 1 }, - status: { type: 'string' }, - watcher_id: { type: ['string', 'number'] }, - window_id: { type: 'number' }, - corrections: { type: 'array', items: { type: 'object', additionalProperties: true } }, - metadata: { type: 'object', additionalProperties: true }, - }, - required: ['op'], -}; - -function normalizeCorpus(value: unknown): WikiCorpus { - return value === 'memory' || value === 'wiki' || value === 'all' ? value : 'wiki'; -} - -function readPositiveNumber(value: unknown, fallback: number, max: number): number { - if (typeof value !== 'number' || !Number.isFinite(value)) return fallback; - return Math.min(Math.max(1, Math.floor(value)), max); -} - -function stringifyResult(value: unknown): string { - return typeof value === 'string' ? value : JSON.stringify(value, null, 2); -} - -function asObjectArray(value: unknown): Array> { - return Array.isArray(value) ? value.filter(isRecord) : []; -} - -function pickSnippet(value: unknown, maxLength = 280): string { - const text = typeof value === 'string' ? value : value == null ? '' : stringifyResult(value); - const compact = text.replace(/\s+/g, ' ').trim(); - return compact.length > maxLength ? `${compact.slice(0, maxLength - 1)}…` : compact; -} - -function matchesQuery(row: Record, query: string): boolean { - const lower = query.toLowerCase(); - return ['name', 'watcher_name', 'slug', 'description', 'title', 'summary'] - .map((key) => row[key]) - .some((value) => typeof value === 'string' && value.toLowerCase().includes(lower)); -} - -function memoryResultsFromSearch(raw: unknown, maxResults: number): WikiSearchResult[] { - if (!isRecord(raw)) return []; - const results: WikiSearchResult[] = []; - for (const item of asObjectArray(raw.content).slice(0, maxResults)) { - const id = item.id; - const title = - typeof item.title === 'string' && item.title.trim() - ? item.title.trim() - : `Memory item ${String(id ?? results.length + 1)}`; - results.push({ - corpus: 'memory', - path: id != null ? `sources/events/${String(id)}` : `sources/memory/${results.length + 1}`, - title, - kind: 'source', - score: typeof item.similarity === 'number' ? item.similarity : 0.5, - snippet: pickSnippet(item.text_content ?? item.payload_text ?? item.content ?? item.metadata), - id: typeof id === 'number' || typeof id === 'string' ? id : undefined, - source_url: typeof item.source_url === 'string' ? item.source_url : null, - details: { source: 'search_memory.content' }, - }); - } - for (const item of asObjectArray(raw.matches).slice(0, Math.max(0, maxResults - results.length))) { - const id = item.id; - const name = typeof item.name === 'string' ? item.name : `Entity ${String(id ?? '')}`.trim(); - results.push({ - corpus: 'memory', - path: id != null ? `entities/${String(id)}` : `entities/${name}`, - title: name, - kind: 'memory', - score: typeof item.match_score === 'number' ? item.match_score : 0.4, - snippet: pickSnippet(item.metadata ?? item.content ?? item.match_reason), - id: typeof id === 'number' || typeof id === 'string' ? id : undefined, - details: { source: 'search_memory.matches', entity_type: item.type ?? item.entity_type }, - }); - } - return results.slice(0, maxResults); -} - -async function searchMemoryCorpus( - callMcpTool: McpToolCaller, - config: ResolvedPluginConfig, - query: string, - maxResults: number, - options?: Pick -): Promise { - const raw = await callMcpToolJson(callMcpTool, config, 'search_memory', { - query, - include_content: true, - content_limit: maxResults, - include_connections: false, - limit: maxResults, - }, options); - return memoryResultsFromSearch(raw, maxResults); -} - -function watcherResultsFromList(raw: unknown, query: string, maxResults: number): WikiSearchResult[] { - const watchers = isRecord(raw) ? asObjectArray(raw.watchers) : []; - const scored = watchers - .map((watcher) => { - const watcherId = watcher.watcher_id ?? watcher.id; - const title = - typeof watcher.watcher_name === 'string' - ? watcher.watcher_name - : typeof watcher.name === 'string' - ? watcher.name - : `Watcher ${String(watcherId ?? '')}`.trim(); - const haystackMatch = matchesQuery({ ...watcher, title }, query); - const pending = typeof watcher.pending_content_count === 'number' ? watcher.pending_content_count : 0; - const historical = - typeof watcher.historical_content_count === 'number' ? watcher.historical_content_count : 0; - return { - result: { - corpus: 'wiki' as const, - path: watcherId != null ? `reports/watchers/${String(watcherId)}` : `reports/watchers/${title}`, - title, - kind: 'report' as const, - score: haystackMatch ? 0.8 : historical > 0 ? 0.35 : 0.2, - snippet: pickSnippet( - watcher.description ?? - watcher.slug ?? - `${historical} historical items, ${pending} pending analysis` - ), - id: typeof watcherId === 'number' || typeof watcherId === 'string' ? watcherId : undefined, - source_url: typeof watcher.view_url === 'string' ? watcher.view_url : null, - details: { - source: 'client.watchers.list', - pending_content_count: pending, - historical_content_count: historical, - }, - }, - include: haystackMatch || query.trim().length === 0, - }; - }) - .filter((entry) => entry.include) - .map((entry) => entry.result) - .sort((a, b) => b.score - a.score); - return scored.slice(0, maxResults); -} - -function contentResultsFromReadKnowledge( - raw: unknown, - semanticType: 'claim' | 'synthesis', - maxResults: number -): WikiSearchResult[] { - if (!isRecord(raw)) return []; - return asObjectArray(raw.content) - .slice(0, maxResults) - .map((item, index) => { - const id = item.id; - const title = - typeof item.title === 'string' && item.title.trim() - ? item.title.trim() - : `${semanticType[0].toUpperCase()}${semanticType.slice(1)} ${String(id ?? index + 1)}`; - return { - corpus: 'wiki' as const, - path: - semanticType === 'claim' - ? `claims/${String(id ?? index + 1)}` - : `syntheses/${String(id ?? index + 1)}`, - title, - kind: semanticType, - score: 0.7, - snippet: pickSnippet(item.text_content ?? item.payload_text ?? item.content ?? item.metadata), - id: typeof id === 'number' || typeof id === 'string' ? id : undefined, - source_url: typeof item.source_url === 'string' ? item.source_url : null, - details: { source: 'client.knowledge.read', semantic_type: semanticType }, - }; - }); -} - -async function searchWikiCorpus( - callMcpTool: McpToolCaller, - config: ResolvedPluginConfig, - query: string, - maxResults: number, - options?: Pick -): Promise { - const includeContent = query.length >= 3; - const script = ` -export default async (_ctx, client) => { - const query = ${JSON.stringify(query)}; - const limit = ${maxResults}; - const includeContent = ${includeContent}; - const [watchers, claims, syntheses] = await Promise.all([ - client.watchers.list({ include_details: false }).catch((e) => ({ error: String(e) })), - includeContent - ? client.knowledge.read({ query, semantic_type: 'claim', limit }).catch((e) => ({ error: String(e) })) - : null, - includeContent - ? client.knowledge.read({ query, semantic_type: 'synthesis', limit }).catch((e) => ({ error: String(e) })) - : null, - ]); - return { watchers, claims, syntheses }; -}; -`; - const sdkResult = await runSdkScript<{ - watchers: unknown; - claims: unknown; - syntheses: unknown; - }>(callMcpTool, config, 'read', script, options); - if (!sdkResult) return []; - return [ - ...contentResultsFromReadKnowledge(sdkResult.claims, 'claim', maxResults), - ...contentResultsFromReadKnowledge(sdkResult.syntheses, 'synthesis', maxResults), - ...watcherResultsFromList(sdkResult.watchers, query, maxResults), - ] - .sort((a, b) => b.score - a.score) - .slice(0, maxResults); -} - -function formatWikiSearchResults( - query: string, - corpus: WikiCorpus, - results: WikiSearchResult[], - meta: { degraded: boolean; timeouts: string[] } = { degraded: false, timeouts: [] } -): string { - const degradedSuffix = meta.degraded - ? `\nWarning: partial results — fanout ${meta.timeouts.length ? `timed out on [${meta.timeouts.join(', ')}]` : 'reported errors'}.` - : ''; - if (results.length === 0) { - return `No Lobu wiki compatibility results for ${JSON.stringify(query)} in corpus=${corpus}.${degradedSuffix}`; - } - return ( - [ - `Lobu memory-wiki compatibility search for ${JSON.stringify(query)} (corpus=${corpus}).`, - ...results.map((result, index) => { - const source = result.source_url ? ` — ${result.source_url}` : ''; - return `${index + 1}. ${result.title} (${result.corpus}/${result.kind}, path=${result.path}, score=${result.score.toFixed(2)})${source}\n ${result.snippet}`; - }), - ].join('\n') + degradedSuffix - ); -} - -async function runWikiSearch( - callMcpTool: McpToolCaller, - config: ResolvedPluginConfig, - args: Record -): Promise<{ text: string; details: Record }> { - const query = asString(args.query) ?? ''; - const corpus = normalizeCorpus(args.corpus); - const maxResults = readPositiveNumber(args.maxResults, 8, 25); - const timeoutMs = config.memoryWikiCompat.fanoutTimeoutMs; - const wantMemory = corpus === 'memory' || corpus === 'all'; - const wantWiki = corpus === 'wiki' || corpus === 'all'; - const [memoryOutcome, wikiOutcome] = await Promise.all([ - wantMemory - ? raceFanout( - 'wiki_search:memory', - (signal) => searchMemoryCorpus(callMcpTool, config, query, maxResults, { signal }), - timeoutMs - ) - : Promise.resolve>({ ok: true, value: [], durationMs: 0 }), - wantWiki - ? raceFanout( - 'wiki_search:wiki', - (signal) => searchWikiCorpus(callMcpTool, config, query, maxResults, { signal }), - timeoutMs - ) - : Promise.resolve>({ ok: true, value: [], durationMs: 0 }), - ]); - const memory = memoryOutcome.ok ? memoryOutcome.value : []; - const wiki = wikiOutcome.ok ? wikiOutcome.value : []; - const results = [...wiki, ...memory].sort((a, b) => b.score - a.score).slice(0, maxResults); - const timeouts: string[] = []; - const fanoutErrors: Array<{ part: string; reason: string; error: string }> = []; - if (wantMemory && !memoryOutcome.ok) { - if (memoryOutcome.reason === 'timeout') timeouts.push('memory'); - fanoutErrors.push({ part: 'memory', reason: memoryOutcome.reason, error: memoryOutcome.error }); - } - if (wantWiki && !wikiOutcome.ok) { - if (wikiOutcome.reason === 'timeout') timeouts.push('wiki'); - fanoutErrors.push({ part: 'wiki', reason: wikiOutcome.reason, error: wikiOutcome.error }); - } - const degraded = fanoutErrors.length > 0; - return { - text: formatWikiSearchResults(query, corpus, results, { degraded, timeouts }), - details: { query, corpus, results, degraded, timeouts, fanout_errors: fanoutErrors }, - }; -} - -function parseWikiLookup(lookup: string): { kind: string; id?: number; raw: string } { - const trimmed = lookup.trim(); - const prefixed = trimmed.match(/^(event|content|source|watcher|window|claim|synthesis):(.+)$/i); - if (prefixed) { - const id = Number(prefixed[2]); - return { kind: prefixed[1].toLowerCase(), id: Number.isFinite(id) ? id : undefined, raw: trimmed }; - } - const pathMatch = trimmed.match(/(?:events|sources\/events|watchers|reports\/watchers|windows|claims|syntheses)\/(\d+)/i); - if (pathMatch) { - const lower = trimmed.toLowerCase(); - const kind = lower.includes('watcher') ? 'watcher' : lower.includes('window') ? 'window' : 'event'; - return { kind, id: Number(pathMatch[1]), raw: trimmed }; - } - const numeric = Number(trimmed); - return { kind: Number.isFinite(numeric) ? 'event' : 'query', id: Number.isFinite(numeric) ? numeric : undefined, raw: trimmed }; -} - -async function runWikiGet( - callMcpTool: McpToolCaller, - config: ResolvedPluginConfig, - args: Record -): Promise<{ text: string; details: Record }> { - const lookup = asString(args.lookup) ?? ''; - const lineCount = readPositiveNumber(args.lineCount, 80, 500); - const parsed = parseWikiLookup(lookup); - let script = ''; - let sourceTool = 'client.knowledge.read'; - - if (parsed.kind === 'watcher' && parsed.id != null) { - sourceTool = 'client.watchers.get'; - script = `export default async (_ctx, client) => client.watchers.get(${JSON.stringify(String(parsed.id))});`; - } else if (parsed.kind === 'window' && parsed.id != null) { - script = `export default async (_ctx, client) => client.knowledge.read({ window_id: ${parsed.id}, limit: ${lineCount} });`; - } else if (parsed.id != null) { - script = `export default async (_ctx, client) => client.knowledge.read({ content_ids: [${parsed.id}], limit: ${lineCount} });`; - } else { - script = `export default async (_ctx, client) => client.knowledge.read({ query: ${JSON.stringify(lookup)}, limit: 1 });`; - } - - const outcome = await raceFanout( - 'wiki_get', - (signal) => runSdkScript(callMcpTool, config, 'read', script, { signal }), - config.memoryWikiCompat.fanoutTimeoutMs - ); - if (!outcome.ok) { - const reason = outcome.reason === 'timeout' ? 'timeout' : 'error'; - return { - text: `Lobu memory-wiki compatibility get (${sourceTool}, lookup=${lookup}) — ${reason}: ${outcome.error}`, - details: { - lookup, - parsed, - sourceTool, - result: null, - degraded: true, - timeout: outcome.reason === 'timeout', - error: outcome.error, - }, - }; - } - const raw = outcome.value; - const text = stringifyResult(raw ?? { message: `No result for ${lookup}` }); - return { - text: `Lobu memory-wiki compatibility get (${sourceTool}, lookup=${lookup})\n\n${text}`, - details: { lookup, parsed, sourceTool, result: raw }, - }; -} - -async function runWikiApply( - callMcpTool: McpToolCaller, - config: ResolvedPluginConfig, - args: Record -): Promise<{ text: string; details: Record }> { - const op = asString(args.op); - if (op === 'create_synthesis') { - const title = asString(args.title) ?? 'Untitled synthesis'; - const body = asString(args.body) ?? ''; - const metadata = isRecord(args.metadata) ? args.metadata : {}; - const result = await callMcpToolJson(callMcpTool, config, 'save_memory', { - title, - content: body || title, - semantic_type: 'synthesis', - payload_type: body ? 'markdown' : 'text', - metadata: { - ...metadata, - memory_wiki_compat: true, - source_ids: Array.isArray(args.sourceIds) ? args.sourceIds : [], - claims: Array.isArray(args.claims) ? args.claims : [], - confidence: typeof args.confidence === 'number' ? args.confidence : undefined, - status: asString(args.status) ?? 'active', - }, - }); - return { - text: `Created Lobu-backed synthesis via save_memory.\n\n${stringifyResult(result)}`, - details: { op, sourceTool: 'save_memory', result }, - }; - } - - if (op === 'update_metadata') { - const watcherId = args.watcher_id ?? args.watcherId; - const windowId = args.window_id ?? args.windowId; - const corrections = Array.isArray(args.corrections) ? args.corrections : null; - const looksLikeWatcherFeedback = - (typeof watcherId === 'string' || typeof watcherId === 'number') && - typeof windowId === 'number' && - corrections !== null; - if (looksLikeWatcherFeedback) { - const invalid = corrections!.filter( - (c): c is Record => - !isRecord(c) || typeof c.field_path !== 'string' || c.field_path.trim().length === 0 - ); - if (corrections!.length === 0) { - throw new Error('wiki_apply update_metadata: corrections must be a non-empty array'); - } - if (invalid.length > 0) { - throw new Error( - `wiki_apply update_metadata: ${invalid.length} correction(s) missing required "field_path" string` - ); - } - const script = `export default async (_ctx, client) => client.watchers.submitFeedback(${JSON.stringify({ - watcher_id: String(watcherId), - window_id: windowId, - corrections, - })});`; - const result = await runSdkScript(callMcpTool, config, 'write', script); - return { - text: `Submitted watcher feedback via client.watchers.submitFeedback.\n\n${stringifyResult(result)}`, - details: { op, sourceTool: 'client.watchers.submitFeedback', result }, - }; - } - - const result = await callMcpToolJson(callMcpTool, config, 'save_memory', { - title: asString(args.title) ?? `Wiki metadata update: ${asString(args.lookup) ?? 'unknown'}`, - content: asString(args.body) ?? stringifyResult(args), - semantic_type: 'claim', - metadata: { - memory_wiki_compat: true, - op, - lookup: asString(args.lookup), - status: asString(args.status) ?? 'review', - confidence: typeof args.confidence === 'number' ? args.confidence : undefined, - source_ids: Array.isArray(args.sourceIds) ? args.sourceIds : [], - claims: Array.isArray(args.claims) ? args.claims : [], - }, - }); - return { - text: `Stored metadata update as a Lobu-backed claim event.\n\n${stringifyResult(result)}`, - details: { op, sourceTool: 'save_memory', result }, - }; - } - - throw new Error('wiki_apply supports op="create_synthesis" and op="update_metadata" in Lobu compat mode.'); -} - -type SessionCall = { - tool: string; - at: number; - argsSummary: Record; - outcome: Record; -}; - -const SESSION_BUFFER_CAP = 32; - -// Agent tools registered by registerMemoryWikiCompatTools() when memoryWikiCompat -// is enabled. OpenClaw 2026.5.x requires these to appear in `contracts.tools` in -// openclaw.plugin.json — a unit test enforces the manifest stays in sync. -export const MEMORY_WIKI_COMPAT_TOOL_NAMES = [ - 'wiki_status', - 'wiki_search', - 'wiki_get', - 'wiki_apply', - 'wiki_lint', - 'memory_search', - 'memory_get', -] as const; - -export function registerMemoryWikiCompatTools( - config: ResolvedPluginConfig, - registerTool: (def: Record) => void, - log: PluginLogger, - callMcpTool: McpToolCaller -): void { - if (!config.memoryWikiCompat.enabled) return; - if (!config.mcpUrl) { - log.warn('lobu: memoryWikiCompat enabled but mcpUrl is missing; wiki_* tools not registered'); - return; - } - - const sessionCalls: SessionCall[] = []; - const recordCall = ( - tool: string, - argsSummary: Record, - outcome: Record - ): void => { - sessionCalls.push({ tool, at: Date.now(), argsSummary, outcome }); - if (sessionCalls.length > SESSION_BUFFER_CAP) { - sessionCalls.splice(0, sessionCalls.length - SESSION_BUFFER_CAP); - } - }; - - const registerTextTool = ( - name: string, - label: string, - description: string, - parameters: Record, - execute: (args: Record) => Promise<{ text: string; details: Record }> - ) => { - registerTool({ - name, - label, - description, - parameters, - execute: async (_id: string, args: Record) => { - const result = await execute(args ?? {}); - return { content: [{ type: 'text', text: result.text }], details: result.details }; - }, - }); - }; - - registerTextTool( - 'wiki_status', - 'Wiki Status', - 'Inspect Lobu-backed OpenClaw memory-wiki compatibility status. This is a compatibility layer, not a separate wiki source of truth.', - { type: 'object', additionalProperties: false, properties: {} }, - async () => { - const watcherCountScript = `export default async (_ctx, client) => { - const r = await client.watchers.list({ include_details: false }).catch((e) => ({ error: String(e) })); - return r; -};`; - const timeoutMs = config.memoryWikiCompat.fanoutTimeoutMs; - const [watchersOutcome, memoryOutcome] = await Promise.all([ - raceFanout( - 'wiki_status:watchers', - (signal) => runSdkScript(callMcpTool, config, 'read', watcherCountScript, { signal }), - timeoutMs - ), - raceFanout( - 'wiki_status:memory', - (signal) => callMcpToolJson(callMcpTool, config, 'search_memory', { - query: 'memory wiki compatibility status', - include_content: false, - include_connections: false, - limit: 1, - }, { signal }), - timeoutMs - ), - ]); - const watchers = watchersOutcome.ok ? watchersOutcome.value : null; - const watcherCount = - isRecord(watchers) && Array.isArray((watchers as { watchers?: unknown }).watchers) - ? ((watchers as { watchers: unknown[] }).watchers.length) - : null; - const timeouts: string[] = []; - const fanoutErrors: Array<{ part: string; reason: string; error: string }> = []; - if (!watchersOutcome.ok) { - if (watchersOutcome.reason === 'timeout') timeouts.push('watchers'); - fanoutErrors.push({ part: 'watchers', reason: watchersOutcome.reason, error: watchersOutcome.error }); - } - if (!memoryOutcome.ok) { - if (memoryOutcome.reason === 'timeout') timeouts.push('memory'); - fanoutErrors.push({ part: 'memory', reason: memoryOutcome.reason, error: memoryOutcome.error }); - } - const status = { - mode: 'lobu-memory-wiki-compat', - source_of_truth: 'lobu-memory-mcp', - corpus: ['memory', 'wiki', 'all'], - tools: ['wiki_status', 'wiki_search', 'wiki_get', 'wiki_apply', 'wiki_lint', 'memory_search', 'memory_get'], - watcher_count: watcherCount, - memory_available: memoryOutcome.ok, - degraded: !watchersOutcome.ok || !memoryOutcome.ok, - timeouts, - fanout_errors: fanoutErrors, - fanout_timeout_ms: timeoutMs, - notes: [ - 'corpus is memory|wiki|all within the authenticated Lobu org; it is not an org selector.', - 'wiki corpus is a virtual projection over Lobu claims, syntheses, watchers, reports, and sources.', - 'no separate Markdown vault is written by this spike.', - ], - }; - recordCall( - 'wiki_status', - {}, - { watcherCount, memoryAvailable: status.memory_available, degraded: status.degraded } - ); - return { text: stringifyResult(status), details: status }; - } - ); - - registerTextTool( - 'wiki_search', - 'Wiki Search', - 'Search Lobu memory-wiki compatibility corpus. corpus=memory searches raw Lobu memory; corpus=wiki searches claims/syntheses/watchers; corpus=all merges both.', - WikiSearchSchema, - async (args) => { - const result = await runWikiSearch(callMcpTool, config, args); - const results = Array.isArray(result.details.results) ? result.details.results : []; - recordCall( - 'wiki_search', - { query: result.details.query, corpus: result.details.corpus }, - { resultCount: results.length, kinds: results.map((r) => (isRecord(r) ? r.kind : undefined)) } - ); - return result; - } - ); - - registerTextTool( - 'wiki_get', - 'Wiki Get', - 'Read a Lobu-backed wiki compatibility result by path or lookup, e.g. sources/events/123, reports/watchers/7, watcher:7, window:9, event:123.', - WikiGetSchema, - async (args) => { - const result = await runWikiGet(callMcpTool, config, args); - recordCall( - 'wiki_get', - { lookup: result.details.lookup, sourceTool: result.details.sourceTool }, - { hadResult: result.details.result != null } - ); - return result; - } - ); - - registerTextTool( - 'wiki_apply', - 'Wiki Apply', - 'Apply a narrow memory-wiki compatible mutation backed by Lobu MCP. Supports create_synthesis via save_memory and update_metadata via watcher feedback or claim event.', - WikiApplySchema, - async (args) => { - const result = await runWikiApply(callMcpTool, config, args); - const hasEvidence = - (Array.isArray(args.sourceIds) && args.sourceIds.length > 0) || - args.watcher_id != null || - args.watcherId != null || - args.window_id != null || - args.windowId != null || - (Array.isArray(args.claims) && args.claims.length > 0); - recordCall( - 'wiki_apply', - { - op: asString(args.op), - sourceTool: result.details.sourceTool, - status: asString(args.status), - confidence: typeof args.confidence === 'number' ? args.confidence : null, - }, - { hasEvidence } - ); - return result; - } - ); - - registerTextTool( - 'wiki_lint', - 'Wiki Lint', - 'Lint the recent memory-wiki compatibility tool calls in this session for missing evidence, missing provenance, and low-confidence active claims.', - { type: 'object', additionalProperties: false, properties: {} }, - async () => { - const conventions = { - required_provenance: ['event ids', 'watcher window ids', 'source URLs when available'], - evidence_fields: ['sourceIds', 'watcher_id', 'window_id', 'claims'], - confidence_floor_for_active: 0.5, - }; - const warnings: Array<{ tool: string; at: number; reason: string; details: Record }> = []; - for (const call of sessionCalls) { - if (call.tool === 'wiki_apply' && call.outcome.hasEvidence === false) { - warnings.push({ - tool: call.tool, - at: call.at, - reason: 'wiki_apply written with no evidence (sourceIds/watcher_id/window_id/claims all empty)', - details: call.argsSummary, - }); - } - if ( - call.tool === 'wiki_apply' && - call.argsSummary.status === 'active' && - typeof call.argsSummary.confidence === 'number' && - call.argsSummary.confidence < conventions.confidence_floor_for_active - ) { - warnings.push({ - tool: call.tool, - at: call.at, - reason: `wiki_apply status=active with confidence ${call.argsSummary.confidence} below floor ${conventions.confidence_floor_for_active}`, - details: call.argsSummary, - }); - } - if ( - (call.tool === 'wiki_search' || call.tool === 'memory_search') && - call.outcome.resultCount === 0 - ) { - warnings.push({ - tool: call.tool, - at: call.at, - reason: 'search returned zero results — agent may have queried with an off-vocabulary term', - details: call.argsSummary, - }); - } - } - const report = { - ok: warnings.length === 0, - observed_calls: sessionCalls.length, - warnings, - conventions, - }; - return { text: stringifyResult(report), details: report }; - } - ); - - registerTextTool( - 'memory_search', - 'Memory Search', - 'OpenClaw compatibility alias for Lobu search_memory. Searches Lobu memory only — use wiki_search for corpus routing across wiki/all.', - MemorySearchSchema, - async (args) => { - const query = asString(args.query) ?? ''; - const maxResults = readPositiveNumber(args.maxResults, 8, 25); - const raw = await callMcpTool(config, 'search_memory', { - query, - include_content: true, - content_limit: maxResults, - include_connections: false, - limit: maxResults, - }); - const text = raw ? extractTextFromContent(raw.content) : ''; - // resultCount is the key wiki_lint reads to flag zero-result searches; - // memory_search has no parsed structure to count, so approximate "0 - // results" as "empty text body" while keeping the same key shape as - // wiki_search. - recordCall('memory_search', { query, maxResults }, { resultCount: text.length > 0 ? 1 : 0 }); - return { text, details: { sourceTool: 'search_memory' } }; - } - ); - - registerTextTool( - 'memory_get', - 'Memory Get', - 'OpenClaw compatibility alias for Lobu read_knowledge.', - WikiGetSchema, - async (args) => { - const result = await runWikiGet(callMcpTool, config, args); - recordCall( - 'memory_get', - { lookup: result.details.lookup, sourceTool: result.details.sourceTool }, - { hadResult: result.details.result != null } - ); - return result; - } - ); - - log.info('lobu: registered memory-wiki compatibility tools'); -} diff --git a/packages/openclaw-plugin/src/types.ts b/packages/openclaw-plugin/src/types.ts index 715474a94..2685a6aad 100644 --- a/packages/openclaw-plugin/src/types.ts +++ b/packages/openclaw-plugin/src/types.ts @@ -2,20 +2,6 @@ * Plugin configuration types for the Lobu OpenClaw plugin. */ -export interface MemoryWikiCompatConfig { - /** Enable OpenClaw memory-wiki compatible tools backed by Lobu MCP primitives. */ - enabled?: boolean; - /** - * Per-fanout timeout (ms) for SDK-backed wiki tool calls (`wiki_status`, - * `wiki_search` corpus=all|wiki, `wiki_get`). When a single fanout exceeds - * this budget, the slow side is dropped and the tool returns partial results - * with a `degraded`/`timeouts` marker rather than blocking the whole call. - * Defaults to 30000 (well under Cloudflare's 100s edge timeout). Clamped to - * [1000, 90000]. - */ - fanoutTimeoutMs?: number; -} - export interface PluginConfig { mcpUrl?: string; webUrl?: string; @@ -26,8 +12,6 @@ export interface PluginConfig { autoRecall?: boolean; autoCapture?: boolean; recallLimit?: number; - /** Spike/compat mode: register wiki_* and memory_* aliases without changing Lobu MCP. */ - memoryWikiCompat?: boolean | MemoryWikiCompatConfig; } export interface ResolvedPluginConfig { @@ -40,10 +24,6 @@ export interface ResolvedPluginConfig { autoRecall: boolean; autoCapture: boolean; recallLimit: number; - memoryWikiCompat: { - enabled: boolean; - fanoutTimeoutMs: number; - }; } export interface McpToolDefinition { diff --git a/packages/openclaw-plugin/test/unit/manifest-contracts.test.ts b/packages/openclaw-plugin/test/unit/manifest-contracts.test.ts index cf3c17b88..1146111ee 100644 --- a/packages/openclaw-plugin/test/unit/manifest-contracts.test.ts +++ b/packages/openclaw-plugin/test/unit/manifest-contracts.test.ts @@ -3,7 +3,6 @@ import { dirname, resolve } from 'node:path'; import { fileURLToPath } from 'node:url'; import { describe, expect, it } from 'vitest'; import { KNOWN_MCP_TOOL_NAMES, LOGIN_TOOL_NAMES } from '../../src/index.js'; -import { MEMORY_WIKI_COMPAT_TOOL_NAMES } from '../../src/memory-wiki-compat.js'; const packageRoot = resolve(dirname(fileURLToPath(import.meta.url)), '../..'); const manifest = JSON.parse( @@ -15,7 +14,6 @@ const manifest = JSON.parse( const expectedTools = [ ...LOGIN_TOOL_NAMES, ...[...KNOWN_MCP_TOOL_NAMES].map((name) => `lobu_${name}`), - ...MEMORY_WIKI_COMPAT_TOOL_NAMES, ]; describe('openclaw.plugin.json contracts.tools', () => { diff --git a/packages/openclaw-plugin/test/unit/memory-wiki-compat.test.ts b/packages/openclaw-plugin/test/unit/memory-wiki-compat.test.ts deleted file mode 100644 index c623fc272..000000000 --- a/packages/openclaw-plugin/test/unit/memory-wiki-compat.test.ts +++ /dev/null @@ -1,515 +0,0 @@ -import { describe, it, expect, beforeEach } from 'vitest'; -import { registerMemoryWikiCompatTools } from '../../src/memory-wiki-compat.js'; -import type { McpToolResponse, ResolvedPluginConfig } from '../../src/types.js'; - -type RegisteredTool = { - name: string; - label: string; - description: string; - parameters: Record; - execute: (id: string, args: Record) => Promise }>; -}; - -type FakeCall = { tool: string; args: Record; options?: { signal?: AbortSignal } }; - -function makeConfig(overrides: Partial = {}): ResolvedPluginConfig { - const baseWikiCompat: ResolvedPluginConfig['memoryWikiCompat'] = { - enabled: true, - fanoutTimeoutMs: 30_000, - }; - const overriddenWikiCompat = overrides.memoryWikiCompat - ? { ...baseWikiCompat, ...overrides.memoryWikiCompat } - : baseWikiCompat; - return { - mcpUrl: 'http://localhost:8787/mcp', - webUrl: null, - token: 'test-token', - tokenCommand: null, - gatewayAuthUrl: null, - headers: {}, - autoRecall: false, - autoCapture: false, - recallLimit: 10, - ...overrides, - memoryWikiCompat: overriddenWikiCompat, - }; -} - -function jsonContent(payload: unknown): McpToolResponse { - return { - content: [{ type: 'text', text: JSON.stringify(payload) }], - isError: false, - }; -} - -function noopLogger() { - const logs: string[] = []; - return { - info: (m: string) => logs.push(`info:${m}`), - warn: (m: string) => logs.push(`warn:${m}`), - error: (m: string) => logs.push(`error:${m}`), - debug: (m: string) => logs.push(`debug:${m}`), - logs, - }; -} - -type Harness = { - tools: Map; - calls: FakeCall[]; - responses: Map; - setResponse(tool: string, response: McpToolResponse | null): void; - setDelay(tool: string, ms: number): void; - invoke(name: string, args?: Record): ReturnType; -}; - -function makeHarness(overrideConfig: Partial = {}): Harness { - const tools = new Map(); - const calls: FakeCall[] = []; - const responses = new Map(); - const delays = new Map(); - const registerTool = (def: Record): void => { - tools.set(def.name as string, def as unknown as RegisteredTool); - }; - const callMcpTool = async ( - _config: ResolvedPluginConfig, - toolName: string, - args: Record, - options?: { signal?: AbortSignal } - ): Promise => { - calls.push({ tool: toolName, args, options }); - const delay = delays.get(toolName); - if (delay && delay > 0) { - await new Promise((resolve, reject) => { - const timer = setTimeout(resolve, delay); - options?.signal?.addEventListener( - 'abort', - () => { - clearTimeout(timer); - reject(new Error('aborted')); - }, - { once: true } - ); - }); - } - return responses.has(toolName) ? (responses.get(toolName) ?? null) : null; - }; - registerMemoryWikiCompatTools(makeConfig(overrideConfig), registerTool, noopLogger(), callMcpTool); - return { - tools, - calls, - responses, - setResponse(tool, response) { - responses.set(tool, response); - }, - setDelay(tool, ms) { - delays.set(tool, ms); - }, - invoke(name, args = {}) { - const tool = tools.get(name); - if (!tool) throw new Error(`tool not registered: ${name}`); - return tool.execute('call-1', args); - }, - }; -} - -describe('memory-wiki-compat tool registration', () => { - it('registers all wiki_* and memory_* tools when enabled', () => { - const h = makeHarness(); - expect([...h.tools.keys()].sort()).toEqual( - ['memory_get', 'memory_search', 'wiki_apply', 'wiki_get', 'wiki_lint', 'wiki_search', 'wiki_status'].sort() - ); - }); - - it('registers nothing when memoryWikiCompat.enabled=false', () => { - const h = makeHarness({ memoryWikiCompat: { enabled: false } }); - expect(h.tools.size).toBe(0); - }); - - it('registers nothing when mcpUrl is missing', () => { - const h = makeHarness({ mcpUrl: null }); - expect(h.tools.size).toBe(0); - }); -}); - -describe('wiki_status', () => { - it('reports memory_available and watcher_count via query_sdk', async () => { - const h = makeHarness(); - h.setResponse('query_sdk', jsonContent({ watchers: [{ watcher_id: 1 }, { watcher_id: 2 }] })); - h.setResponse('search_memory', jsonContent({ content: [] })); - const result = await h.invoke('wiki_status'); - const status = result.details as Record; - expect(status.watcher_count).toBe(2); - expect(status.memory_available).toBe(true); - expect(status.source_of_truth).toBe('lobu-memory-mcp'); - const sdkCall = h.calls.find((c) => c.tool === 'query_sdk'); - expect(sdkCall).toBeDefined(); - expect(String(sdkCall!.args.script)).toContain('client.watchers.list'); - }); -}); - -describe('wiki_search corpus routing', () => { - it('corpus=memory only calls search_memory', async () => { - const h = makeHarness(); - h.setResponse('search_memory', jsonContent({ content: [{ id: 7, title: 'note', text_content: 'hello' }] })); - const result = await h.invoke('wiki_search', { query: 'hello', corpus: 'memory' }); - const toolsHit = h.calls.map((c) => c.tool); - expect(toolsHit).toContain('search_memory'); - expect(toolsHit).not.toContain('list_watchers'); - expect(toolsHit).not.toContain('read_knowledge'); - const results = (result.details as { results: unknown[] }).results; - expect(results.length).toBeGreaterThan(0); - }); - - it('corpus=wiki fans out via query_sdk (one script with watchers.list + knowledge.read)', async () => { - const h = makeHarness(); - h.setResponse( - 'query_sdk', - jsonContent({ - watchers: { watchers: [{ watcher_id: 3, watcher_name: 'hello watcher', historical_content_count: 5 }] }, - claims: { content: [{ id: 11, title: 'a claim' }] }, - syntheses: { content: [] }, - }) - ); - const result = await h.invoke('wiki_search', { query: 'hello there', corpus: 'wiki' }); - const sdkCalls = h.calls.filter((c) => c.tool === 'query_sdk'); - expect(sdkCalls.length).toBe(1); - const script = String(sdkCalls[0].args.script); - expect(script).toContain('client.watchers.list'); - expect(script).toContain("semantic_type: 'claim'"); - expect(script).toContain("semantic_type: 'synthesis'"); - expect(h.calls.find((c) => c.tool === 'search_memory')).toBeUndefined(); - const results = (result.details as { results: Array<{ corpus: string }> }).results; - expect(results.length).toBeGreaterThan(0); - expect(results.every((r) => r.corpus === 'wiki')).toBe(true); - }); - - it('corpus=all merges search_memory + query_sdk; short queries embed includeContent=false', async () => { - const h = makeHarness(); - h.setResponse('search_memory', jsonContent({ content: [{ id: 1, title: 'mem' }] })); - h.setResponse('query_sdk', jsonContent({ watchers: { watchers: [] }, claims: null, syntheses: null })); - await h.invoke('wiki_search', { query: 'hi', corpus: 'all' }); - const toolsHit = h.calls.map((c) => c.tool); - expect(toolsHit).toContain('search_memory'); - expect(toolsHit).toContain('query_sdk'); - const script = String(h.calls.find((c) => c.tool === 'query_sdk')!.args.script); - expect(script).toContain('includeContent = false'); - }); -}); - -describe('upstream failures degrade gracefully', () => { - it('wiki_search corpus=wiki returns degraded empty result when query_sdk returns isError', async () => { - const h = makeHarness(); - h.setResponse('query_sdk', { content: [{ type: 'text', text: 'Tool not found: query_sdk' }], isError: true }); - const result = await h.invoke('wiki_search', { query: 'anything', corpus: 'wiki' }); - const details = result.details as { - results: unknown[]; - degraded: boolean; - fanout_errors: Array<{ part: string; reason: string; error: string }>; - }; - expect(details.degraded).toBe(true); - expect(details.results).toEqual([]); - expect(details.fanout_errors[0]?.part).toBe('wiki'); - expect(details.fanout_errors[0]?.reason).toBe('error'); - expect(details.fanout_errors[0]?.error).toMatch(/query_sdk/); - }); - - it('wiki_get surfaces sandbox success=false envelope as a degraded error result (not a thrown exception)', async () => { - const h = makeHarness(); - h.setResponse( - 'query_sdk', - jsonContent({ - success: false, - error: { name: 'RuntimeUnavailable', message: 'isolated-vm is not installed' }, - }) - ); - const result = await h.invoke('wiki_get', { lookup: 'watcher:42' }); - const details = result.details as Record; - expect(details.degraded).toBe(true); - expect(details.timeout).toBe(false); - expect(details.result).toBeNull(); - expect(String(details.error)).toMatch(/RuntimeUnavailable/); - }); - - it('wiki_get returns null cleanly when the SDK script return_value is explicitly null', async () => { - const h = makeHarness(); - h.setResponse('query_sdk', jsonContent({ success: true, return_value: null })); - const result = await h.invoke('wiki_get', { lookup: 'watcher:99' }); - // The result.details.result must be null, not the envelope object - expect((result.details as { result: unknown }).result).toBeNull(); - expect(result.content[0].text).toContain('lookup=watcher:99'); - }); -}); - -describe('wiki_get lookup parser (query_sdk)', () => { - it('watcher:7 -> query_sdk with client.watchers.get("7")', async () => { - const h = makeHarness(); - h.setResponse('query_sdk', jsonContent({ watcher_id: '7', name: 'demo' })); - const result = await h.invoke('wiki_get', { lookup: 'watcher:7' }); - expect(h.calls[0].tool).toBe('query_sdk'); - expect(String(h.calls[0].args.script)).toContain('client.watchers.get("7")'); - expect((result.details as { sourceTool: string }).sourceTool).toBe('client.watchers.get'); - }); - - it('window:9 -> query_sdk with client.knowledge.read({ window_id: 9 })', async () => { - const h = makeHarness(); - h.setResponse('query_sdk', jsonContent({ content: [] })); - await h.invoke('wiki_get', { lookup: 'window:9' }); - expect(h.calls[0].tool).toBe('query_sdk'); - expect(String(h.calls[0].args.script)).toMatch(/client\.knowledge\.read\(\{\s*window_id:\s*9/); - }); - - it('event:123 -> query_sdk with client.knowledge.read({ content_ids: [123] })', async () => { - const h = makeHarness(); - h.setResponse('query_sdk', jsonContent({ content: [{ id: 123 }] })); - await h.invoke('wiki_get', { lookup: 'event:123' }); - expect(h.calls[0].tool).toBe('query_sdk'); - expect(String(h.calls[0].args.script)).toContain('content_ids: [123]'); - }); - - it('reports/watchers/4 path -> client.watchers.get', async () => { - const h = makeHarness(); - h.setResponse('query_sdk', jsonContent({ watcher_id: '4' })); - await h.invoke('wiki_get', { lookup: 'reports/watchers/4' }); - expect(String(h.calls[0].args.script)).toContain('client.watchers.get("4")'); - }); - - it('plain natural-language lookup -> client.knowledge.read({ query }) fallback', async () => { - const h = makeHarness(); - h.setResponse('query_sdk', jsonContent({ content: [] })); - await h.invoke('wiki_get', { lookup: 'what did the watcher report on monday' }); - expect(h.calls[0].tool).toBe('query_sdk'); - expect(String(h.calls[0].args.script)).toContain('"what did the watcher report on monday"'); - }); -}); - -describe('wiki_apply create_synthesis', () => { - it('calls save_memory with semantic_type=synthesis and memory_wiki_compat metadata flag', async () => { - const h = makeHarness(); - h.setResponse('save_memory', jsonContent({ id: 42 })); - await h.invoke('wiki_apply', { - op: 'create_synthesis', - title: 'Weekly digest', - body: 'Things happened', - sourceIds: ['ev:1', 'ev:2'], - confidence: 0.7, - }); - expect(h.calls[0].tool).toBe('save_memory'); - const args = h.calls[0].args as { semantic_type: string; metadata: Record }; - expect(args.semantic_type).toBe('synthesis'); - expect(args.metadata.memory_wiki_compat).toBe(true); - expect(args.metadata.source_ids).toEqual(['ev:1', 'ev:2']); - expect(args.metadata.confidence).toBe(0.7); - }); -}); - -describe('wiki_apply update_metadata', () => { - it('routes well-formed corrections through run_sdk -> client.watchers.submitFeedback', async () => { - const h = makeHarness(); - h.setResponse('run_sdk', jsonContent({ ok: true })); - await h.invoke('wiki_apply', { - op: 'update_metadata', - watcher_id: 7, - window_id: 12, - corrections: [{ field_path: 'summary', value: 'better summary' }], - }); - expect(h.calls[0].tool).toBe('run_sdk'); - const script = String(h.calls[0].args.script); - expect(script).toContain('client.watchers.submitFeedback'); - expect(script).toContain('"watcher_id":"7"'); - expect(script).toContain('"window_id":12'); - expect(script).toContain('"field_path":"summary"'); - }); - - it('throws when corrections array is empty', async () => { - const h = makeHarness(); - await expect( - h.invoke('wiki_apply', { op: 'update_metadata', watcher_id: 7, window_id: 12, corrections: [] }) - ).rejects.toThrow(/non-empty/); - expect(h.calls.length).toBe(0); - }); - - it('throws when a correction is missing field_path', async () => { - const h = makeHarness(); - await expect( - h.invoke('wiki_apply', { - op: 'update_metadata', - watcher_id: 7, - window_id: 12, - corrections: [{ value: 'x' }], - }) - ).rejects.toThrow(/field_path/); - expect(h.calls.length).toBe(0); - }); - - it('falls back to save_memory claim when no watcher feedback fields are present', async () => { - const h = makeHarness(); - h.setResponse('save_memory', jsonContent({ id: 99 })); - await h.invoke('wiki_apply', { - op: 'update_metadata', - lookup: 'sources/events/5', - body: 'this is now disputed', - status: 'review', - }); - expect(h.calls[0].tool).toBe('save_memory'); - const args = h.calls[0].args as { semantic_type: string; metadata: Record }; - expect(args.semantic_type).toBe('claim'); - expect(args.metadata.status).toBe('review'); - }); - - it('rejects unsupported ops with a clear error', async () => { - const h = makeHarness(); - await expect(h.invoke('wiki_apply', { op: 'delete_everything' })).rejects.toThrow(/wiki_apply supports/); - }); -}); - -describe('memory_search alias', () => { - it('calls search_memory and ignores corpus argument silently', async () => { - const h = makeHarness(); - h.setResponse('search_memory', { content: [{ type: 'text', text: 'hello' }], isError: false }); - const result = await h.invoke('memory_search', { query: 'hello' }); - expect(h.calls.length).toBe(1); - expect(h.calls[0].tool).toBe('search_memory'); - expect(result.content[0].text).toBe('hello'); - }); -}); - -describe('wiki_lint session-aware', () => { - let h: Harness; - - beforeEach(() => { - h = makeHarness(); - }); - - it('reports ok=true with no calls', async () => { - const result = await h.invoke('wiki_lint'); - const report = result.details as { ok: boolean; observed_calls: number }; - expect(report.ok).toBe(true); - expect(report.observed_calls).toBe(0); - }); - - it('warns when wiki_apply was called without evidence', async () => { - h.setResponse('save_memory', jsonContent({ id: 1 })); - await h.invoke('wiki_apply', { op: 'create_synthesis', title: 't', body: 'b' }); - const result = await h.invoke('wiki_lint'); - const report = result.details as { ok: boolean; warnings: Array<{ reason: string }> }; - expect(report.ok).toBe(false); - expect(report.warnings.some((w) => /no evidence/.test(w.reason))).toBe(true); - }); - - it('warns when wiki_apply status=active has low confidence', async () => { - h.setResponse('save_memory', jsonContent({ id: 2 })); - await h.invoke('wiki_apply', { - op: 'create_synthesis', - title: 't', - body: 'b', - sourceIds: ['ev:1'], - status: 'active', - confidence: 0.2, - }); - const result = await h.invoke('wiki_lint'); - const report = result.details as { ok: boolean; warnings: Array<{ reason: string }> }; - expect(report.warnings.some((w) => /confidence/.test(w.reason))).toBe(true); - }); - - it('warns when wiki_search returns zero results', async () => { - h.setResponse('search_memory', jsonContent({ content: [] })); - h.setResponse('query_sdk', jsonContent({ watchers: { watchers: [] }, claims: null, syntheses: null })); - await h.invoke('wiki_search', { query: 'nonexistent topic', corpus: 'all' }); - const result = await h.invoke('wiki_lint'); - const report = result.details as { warnings: Array<{ reason: string }> }; - expect(report.warnings.some((w) => /zero results/.test(w.reason))).toBe(true); - }); - - it('warns when memory_search returns zero results (matches wiki_search behaviour)', async () => { - h.setResponse('search_memory', { content: [{ type: 'text', text: '' }], isError: false }); - await h.invoke('memory_search', { query: 'no hits' }); - const result = await h.invoke('wiki_lint'); - const report = result.details as { warnings: Array<{ tool: string; reason: string }> }; - expect(report.warnings.some((w) => w.tool === 'memory_search' && /zero results/.test(w.reason))).toBe(true); - }); - - it('does NOT flag wiki_apply update_metadata with camelCase watcherId/windowId as missing evidence', async () => { - h.setResponse('run_sdk', jsonContent({ success: true, return_value: { ok: true } })); - await h.invoke('wiki_apply', { - op: 'update_metadata', - watcherId: 7, - windowId: 12, - corrections: [{ field_path: 'summary', value: 'x' }], - }); - const result = await h.invoke('wiki_lint'); - const report = result.details as { warnings: Array<{ reason: string }> }; - expect(report.warnings.every((w) => !/no evidence/.test(w.reason))).toBe(true); - }); -}); - -describe('fanout timeout (slow upstreams do not block whole tool call)', () => { - it('wiki_status returns partial status when watchers fanout times out', async () => { - const h = makeHarness({ memoryWikiCompat: { enabled: true, fanoutTimeoutMs: 50 } }); - h.setResponse('query_sdk', jsonContent({ watchers: [{ watcher_id: 1 }] })); - h.setDelay('query_sdk', 200); - h.setResponse('search_memory', jsonContent({ content: [] })); - - const started = Date.now(); - const result = await h.invoke('wiki_status'); - const elapsed = Date.now() - started; - - const status = result.details as Record; - expect(elapsed).toBeLessThan(180); - expect(status.degraded).toBe(true); - expect(status.timeouts).toEqual(['watchers']); - expect(status.watcher_count).toBeNull(); - expect(status.memory_available).toBe(true); - expect(Array.isArray(status.fanout_errors)).toBe(true); - expect(h.calls.find((call) => call.tool === 'query_sdk')?.options?.signal?.aborted).toBe(true); - }); - - it('wiki_search corpus=all merges partial results when one side times out', async () => { - const h = makeHarness({ memoryWikiCompat: { enabled: true, fanoutTimeoutMs: 50 } }); - h.setResponse('search_memory', jsonContent({ content: [{ id: 1, title: 'memory hit', text_content: 'hi' }] })); - h.setResponse('query_sdk', jsonContent({ watchers: { watchers: [] }, claims: null, syntheses: null })); - h.setDelay('query_sdk', 200); - - const started = Date.now(); - const result = await h.invoke('wiki_search', { query: 'hi', corpus: 'all' }); - const elapsed = Date.now() - started; - - const details = result.details as { - results: Array<{ corpus: string }>; - degraded: boolean; - timeouts: string[]; - }; - expect(elapsed).toBeLessThan(180); - expect(details.degraded).toBe(true); - expect(details.timeouts).toEqual(['wiki']); - expect(details.results.some((r) => r.corpus === 'memory')).toBe(true); - expect(result.content[0]!.text).toMatch(/partial results/); - }); - - it('wiki_get reports a clean timeout error when the SDK script hangs', async () => { - const h = makeHarness({ memoryWikiCompat: { enabled: true, fanoutTimeoutMs: 50 } }); - h.setResponse('query_sdk', jsonContent({ id: 7 })); - h.setDelay('query_sdk', 200); - - const started = Date.now(); - const result = await h.invoke('wiki_get', { lookup: 'watcher:7' }); - const elapsed = Date.now() - started; - - const details = result.details as Record; - expect(elapsed).toBeLessThan(180); - expect(details.degraded).toBe(true); - expect(details.timeout).toBe(true); - expect(details.result).toBeNull(); - expect(result.content[0]!.text).toMatch(/timeout/); - }); - - it('wiki_search corpus=all does not flag degraded when both sides return in time', async () => { - const h = makeHarness({ memoryWikiCompat: { enabled: true, fanoutTimeoutMs: 200 } }); - h.setResponse('search_memory', jsonContent({ content: [{ id: 1, title: 'm', text_content: 'x' }] })); - h.setResponse('query_sdk', jsonContent({ watchers: { watchers: [] }, claims: null, syntheses: null })); - - const result = await h.invoke('wiki_search', { query: 'hi', corpus: 'all' }); - const details = result.details as { degraded: boolean; timeouts: string[] }; - expect(details.degraded).toBe(false); - expect(details.timeouts).toEqual([]); - }); -}); diff --git a/packages/server/src/__tests__/setup/test-fixtures.ts b/packages/server/src/__tests__/setup/test-fixtures.ts index 23400d6bd..5bc4d835d 100644 --- a/packages/server/src/__tests__/setup/test-fixtures.ts +++ b/packages/server/src/__tests__/setup/test-fixtures.ts @@ -8,16 +8,11 @@ import { serializeSigned } from 'hono/utils/cookie'; import { hashClientSecret } from '../../auth/oauth/clients'; import { generateSecureToken, hashToken } from '../../auth/oauth/utils'; -import { pgTextArray } from '../../db/client'; +import { pgBigintArray, pgTextArray } from '../../db/client'; import { ensureUniqueConnectionSlug } from '../../utils/connections'; import { generateSlug } from '../../utils/entity-management'; import { getTestDb } from './test-db'; -/** Format a JS number array as a PostgreSQL bigint[] literal. */ -function pgBigintArray(values: number[]): string { - return '{' + values.map((v) => String(Math.trunc(v))).join(',') + '}'; -} - const TEST_SYSTEM_ORG_ID = 'default'; const TEST_AUTH_SECRET = 'test-auth-secret-for-testing-only'; diff --git a/packages/server/src/auth/__tests__/config.test.ts b/packages/server/src/auth/__tests__/config.test.ts index a4410eceb..f1a0114c1 100644 --- a/packages/server/src/auth/__tests__/config.test.ts +++ b/packages/server/src/auth/__tests__/config.test.ts @@ -7,7 +7,6 @@ import { collectEnabledLoginProviderConfigs, getAuthConfig, getLoginProviderScopes, - isSupportedLoginProvider, } from '../config'; describe('login provider helpers', () => { @@ -30,12 +29,6 @@ describe('login provider helpers', () => { expect(getLoginProviderScopes('reddit')).toBeNull(); }); - it('recognizes supported login providers only when scopes are declared', () => { - expect(isSupportedLoginProvider('google')).toBe(false); - expect(isSupportedLoginProvider('github', ['read:user', 'user:email'])).toBe(true); - expect(isSupportedLoginProvider('twitter', ['users.read', 'users.email'])).toBe(true); - }); - it('dedupes providers and ignores connectors without declared login scopes', () => { const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}); diff --git a/packages/server/src/auth/config.ts b/packages/server/src/auth/config.ts index c7fed1118..e86c26f31 100644 --- a/packages/server/src/auth/config.ts +++ b/packages/server/src/auth/config.ts @@ -65,13 +65,6 @@ export function getLoginProviderScopes( return normalizeScopes(explicitScopes); } -export function isSupportedLoginProvider( - provider: string, - explicitScopes?: readonly string[] -): boolean { - return getLoginProviderScopes(provider, explicitScopes) !== null; -} - function getOAuthMethodsFromSchema( authSchema: LoginProviderConfigRow['auth_schema'] ): OAuthMethod[] { diff --git a/packages/server/src/connect/oauth-providers.ts b/packages/server/src/connect/oauth-providers.ts index 7e335721a..76de824c3 100644 --- a/packages/server/src/connect/oauth-providers.ts +++ b/packages/server/src/connect/oauth-providers.ts @@ -18,6 +18,19 @@ interface OAuthProviderConfig { tokenEndpointAuthMethod?: OAuthTokenEndpointAuthMethod; } +/** + * `OAuthProviderConfig` with the endpoint URLs optional. Each consumer needs + * only one of authorize / token / userinfo, so the resolver returns whatever it + * could resolve and the caller null-checks the field it actually uses. + */ +type ResolvedProviderConfig = Omit< + OAuthProviderConfig, + 'authorizationUrl' | 'tokenUrl' +> & { + authorizationUrl?: string; + tokenUrl?: string; +}; + const providers: Record = { google: { authorizationUrl: 'https://accounts.google.com/o/oauth2/v2/auth', @@ -67,7 +80,7 @@ function resolveProviderConfig(params: { userinfoUrl?: string; authParams?: Record; tokenEndpointAuthMethod?: OAuthTokenEndpointAuthMethod; -}): OAuthProviderConfig | null { +}): ResolvedProviderConfig { const builtIn = providers[params.provider] ?? null; const authorizationUrl = params.authorizationUrl ?? builtIn?.authorizationUrl; @@ -80,11 +93,9 @@ function resolveProviderConfig(params: { const tokenEndpointAuthMethod = params.tokenEndpointAuthMethod ?? builtIn?.tokenEndpointAuthMethod ?? 'client_secret_post'; - if (!authorizationUrl || !tokenUrl) return null; - return { - authorizationUrl, - tokenUrl, + ...(authorizationUrl ? { authorizationUrl } : {}), + ...(tokenUrl ? { tokenUrl } : {}), ...(userinfoUrl ? { userinfoUrl } : {}), ...(Object.keys(authParams).length > 0 ? { authParams } : {}), tokenEndpointAuthMethod, @@ -108,9 +119,8 @@ export function buildAuthorizationUrl(params: { provider: params.provider, authorizationUrl: params.authorizationUrl, authParams: params.authParams, - tokenUrl: 'https://example.invalid/token', }); - if (!config) return null; + if (!config.authorizationUrl) return null; const url = new URL(config.authorizationUrl); url.searchParams.set('client_id', params.clientId); @@ -160,9 +170,8 @@ export async function exchangeCodeForTokens(params: { provider: params.provider, tokenUrl: params.tokenUrl, tokenEndpointAuthMethod: params.tokenEndpointAuthMethod, - authorizationUrl: 'https://example.invalid/authorize', }); - if (!config) return null; + if (!config.tokenUrl) return null; const authMethod = config.tokenEndpointAuthMethod ?? 'client_secret_post'; @@ -259,10 +268,8 @@ async function fetchRawUserInfo(params: { const config = resolveProviderConfig({ provider: params.provider, userinfoUrl: params.userinfoUrl, - authorizationUrl: 'https://example.invalid/authorize', - tokenUrl: 'https://example.invalid/token', }); - if (!config?.userinfoUrl) return null; + if (!config.userinfoUrl) return null; try { const headers: Record = { diff --git a/packages/server/src/email/send.ts b/packages/server/src/email/send.ts index c02a39350..35ab29302 100644 --- a/packages/server/src/email/send.ts +++ b/packages/server/src/email/send.ts @@ -19,17 +19,14 @@ export interface TransactionalEmailResult { id: string | null; } -const DEV_FALLBACK_FROM: Record = { - auth: 'Lobu ', - invite: 'Lobu ', -}; +const DEV_FALLBACK_FROM = 'Lobu '; function resolveFrom(env: Env, category: EmailCategory, override?: string): string | null { if (override) return override; const configured = category === 'auth' ? env.EMAIL_FROM_AUTH : env.EMAIL_FROM_INVITES; if (configured) return configured; const runtimeNodeEnv = env.NODE_ENV || process.env.NODE_ENV || 'development'; - if (runtimeNodeEnv !== 'production') return DEV_FALLBACK_FROM[category]; + if (runtimeNodeEnv !== 'production') return DEV_FALLBACK_FROM; return null; } diff --git a/packages/server/src/events/emitter.ts b/packages/server/src/events/emitter.ts index a7ee2e34a..a54077fa3 100644 --- a/packages/server/src/events/emitter.ts +++ b/packages/server/src/events/emitter.ts @@ -6,8 +6,6 @@ interface InvalidationEvent { /** Query keys to invalidate (e.g. ['resolve-path'], ['workspace-bootstrap']) */ keys: string[]; - /** Optional: specific resource that changed */ - resource?: { type: string; id: string | number }; } type Listener = (event: InvalidationEvent) => void; diff --git a/packages/server/src/gateway/__tests__/utils/rate-limiter.test.ts b/packages/server/src/gateway/__tests__/utils/rate-limiter.test.ts deleted file mode 100644 index 34d22f838..000000000 --- a/packages/server/src/gateway/__tests__/utils/rate-limiter.test.ts +++ /dev/null @@ -1,114 +0,0 @@ -import { beforeAll, beforeEach, describe, expect, test } from "bun:test"; -import { getDb } from "../../../db/client.js"; -import { - FixedWindowRateLimiter, - getClientIp, -} from "../../utils/rate-limiter.js"; -import { - ensurePgliteForGatewayTests, - resetTestDatabase, -} from "../helpers/db-setup.js"; - -describe("FixedWindowRateLimiter", () => { - beforeAll(async () => { - await ensurePgliteForGatewayTests(); - }); - - beforeEach(async () => { - await resetTestDatabase(); - }); - - test("allows requests within the window and blocks after the limit", async () => { - const limiter = new FixedWindowRateLimiter(); - - const first = await limiter.consume({ - key: "rate:test:1", - limit: 2, - windowSeconds: 60, - }); - const second = await limiter.consume({ - key: "rate:test:1", - limit: 2, - windowSeconds: 60, - }); - const third = await limiter.consume({ - key: "rate:test:1", - limit: 2, - windowSeconds: 60, - }); - - expect(first.allowed).toBe(true); - expect(first.remaining).toBe(1); - expect(second.allowed).toBe(true); - expect(second.remaining).toBe(0); - expect(third.allowed).toBe(false); - expect(third.retryAfterSeconds).toBeGreaterThan(0); - }); - - test("resets after the time window", async () => { - const limiter = new FixedWindowRateLimiter(); - - await limiter.consume({ - key: "rate:test:2", - limit: 1, - windowSeconds: 60, - }); - const blocked = await limiter.consume({ - key: "rate:test:2", - limit: 1, - windowSeconds: 60, - }); - expect(blocked.allowed).toBe(false); - - // Force the existing window to be in the past so the next consume() - // resets the counter via the CASE branch — no clock-shim needed. - const sql = getDb(); - await sql`UPDATE rate_limits SET expires_at = now() - interval '1 second' WHERE key = 'rate:test:2'`; - - const reset = await limiter.consume({ - key: "rate:test:2", - limit: 1, - windowSeconds: 60, - }); - expect(reset.allowed).toBe(true); - expect(reset.count).toBe(1); - }); - - test("reset clears the tracked key", async () => { - const limiter = new FixedWindowRateLimiter(); - - await limiter.consume({ - key: "rate:test:3", - limit: 1, - windowSeconds: 60, - }); - await limiter.reset("rate:test:3"); - - const next = await limiter.consume({ - key: "rate:test:3", - limit: 1, - windowSeconds: 60, - }); - expect(next.allowed).toBe(true); - expect(next.count).toBe(1); - }); -}); - -describe("getClientIp", () => { - test("prefers x-forwarded-for, then x-real-ip, then unknown", () => { - expect( - getClientIp({ - forwardedFor: "203.0.113.1, 10.0.0.1", - realIp: "198.51.100.1", - }) - ).toBe("203.0.113.1"); - - expect( - getClientIp({ - realIp: "198.51.100.1", - }) - ).toBe("198.51.100.1"); - - expect(getClientIp({})).toBe("unknown"); - }); -}); diff --git a/packages/server/src/gateway/cli/gateway.ts b/packages/server/src/gateway/cli/gateway.ts index 2a57ce953..b29cb7da0 100644 --- a/packages/server/src/gateway/cli/gateway.ts +++ b/packages/server/src/gateway/cli/gateway.ts @@ -1,8 +1,5 @@ #!/usr/bin/env bun -import type { Server } from "node:http"; -import { createServer } from "node:http"; -import { getRequestListener } from "@hono/node-server"; import { OpenAPIHono } from "@hono/zod-openapi"; import { createLogger } from "@lobu/core"; import { apiReference } from "@scalar/hono-api-reference"; @@ -49,8 +46,6 @@ import { createSlackRoutes } from "../routes/public/slack.js"; const logger = createLogger("gateway-startup"); -let httpServer: Server | null = null; - interface CreateGatewayAppOptions { secretProxy: any; workerGateway: any; @@ -67,8 +62,8 @@ interface CreateGatewayAppOptions { /** * Create the Hono app with all gateway routes. - * Returns the app without starting an HTTP server — the caller can mount it - * on their own server (embedded mode) or pass it to `startGatewayServer()`. + * Returns the app without starting an HTTP server — the caller mounts it on its + * own server (embedded mode; see `src/server.ts` / `src/lobu/gateway.ts`). */ export function createGatewayApp( options: CreateGatewayAppOptions @@ -883,146 +878,3 @@ Agents can be configured with custom MCP (Model Context Protocol) servers: return app; } - -/** - * Start an HTTP server for the gateway Hono app. - * Used in standalone mode. In embedded mode, the host creates its own server. - */ -function startGatewayServer(app: OpenAPIHono, port = 8080): Server { - const honoListener = getRequestListener(app.fetch); - const server = createServer(honoListener); - server.listen(port); - logger.debug(`Server listening on port ${port}`); - return server; -} - -/** - * Start the gateway with the provided configuration - */ -export async function startGateway(config: GatewayConfig): Promise { - logger.info("Starting Lobu Gateway"); - - const { startFilteringProxy } = await import("../proxy/proxy-manager.js"); - await startFilteringProxy(); - - const { Orchestrator } = await import("../orchestration/index.js"); - const { Gateway } = await import("../gateway-main.js"); - - logger.debug("Creating orchestrator"); - const orchestrator = new Orchestrator(config.orchestration); - await orchestrator.start(); - logger.debug("Orchestrator started"); - - const gateway = new Gateway(config); - - const { ApiPlatform } = await import("../api/platform.js"); - const apiPlatform = new ApiPlatform(); - gateway.registerPlatform(apiPlatform); - logger.debug("API platform registered"); - - await gateway.start(); - logger.debug("Gateway started"); - - // Get core services - const coreServices = gateway.getCoreServices(); - - // Wire grant store to HTTP proxy for domain grant checks - const grantStore = coreServices.getGrantStore(); - if (grantStore) { - const { setProxyGrantStore } = await import("../proxy/http-proxy.js"); - setProxyGrantStore(grantStore); - logger.debug("Grant store connected to HTTP proxy"); - } - - // Wire policy store + egress judge into the HTTP proxy for judged-domain - // rules declared by skills or agent config. - const policyStore = coreServices.getPolicyStore(); - if (policyStore) { - const { setProxyPolicyStore } = await import("../proxy/http-proxy.js"); - setProxyPolicyStore(policyStore); - logger.debug("Policy store connected to HTTP proxy"); - } - - await orchestrator.injectCoreServices( - coreServices.getSecretStore(), - coreServices.getProviderCatalogService(), - coreServices.getGrantStore() ?? undefined, - coreServices.getPolicyStore() ?? undefined - ); - logger.debug("Orchestrator configured with core services"); - - // Note: file-based reload + connection-seeding has been removed — - // lobu.toml is no longer read at gateway boot. Agents and connections - // enter Postgres via `lobu apply` (CLI) or the web UI. - - const { ChatInstanceManager } = await import( - "../connections/chat-instance-manager.js" - ); - const { ChatResponseBridge } = await import( - "../connections/chat-response-bridge.js" - ); - const chatInstanceManager = new ChatInstanceManager(); - try { - await chatInstanceManager.initialize(coreServices); - - for (const adapter of chatInstanceManager.createPlatformAdapters()) { - gateway.registerPlatform(adapter); - } - logger.debug("ChatInstanceManager initialized"); - - const unifiedConsumer = gateway.getUnifiedConsumer(); - if (unifiedConsumer) { - const chatResponseBridge = new ChatResponseBridge(chatInstanceManager); - unifiedConsumer.setChatResponseBridge(chatResponseBridge); - logger.debug("ChatResponseBridge wired to unified thread consumer"); - } - } catch (error) { - logger.warn( - { error: String(error) }, - "ChatInstanceManager initialization failed — connections feature disabled" - ); - } - - if (!httpServer) { - const app = createGatewayApp({ - secretProxy: coreServices.getSecretProxy(), - workerGateway: coreServices.getWorkerGateway(), - mcpProxy: coreServices.getMcpProxy(), - interactionService: coreServices.getInteractionService(), - platformRegistry: gateway.getPlatformRegistry(), - coreServices, - chatInstanceManager, - }); - httpServer = startGatewayServer(app); - } - - logger.info("Lobu Gateway is running!"); - - const cleanup = async () => { - logger.info("Shutting down gateway..."); - - // Hard deadline: force exit after 30s if graceful shutdown stalls - const hardDeadline = setTimeout(() => { - logger.error("Graceful shutdown timed out after 30s, forcing exit"); - process.exit(1); - }, 30_000); - hardDeadline.unref(); - - await chatInstanceManager.shutdown(); - await orchestrator.stop(); - await gateway.stop(); - if (httpServer) { - httpServer.close(); - } - logger.info("Gateway shutdown complete"); - process.exit(0); - }; - - process.on("SIGINT", cleanup); - process.on("SIGTERM", cleanup); - - process.on("SIGUSR1", () => { - const status = gateway.getStatus(); - logger.info("Health check:", JSON.stringify(status, null, 2)); - }); -} diff --git a/packages/server/src/gateway/cli/index.ts b/packages/server/src/gateway/cli/index.ts deleted file mode 100644 index 092877a53..000000000 --- a/packages/server/src/gateway/cli/index.ts +++ /dev/null @@ -1,74 +0,0 @@ -#!/usr/bin/env bun - -import { ConfigError, createLogger, initSentry, initTracing } from "@lobu/core"; -import { Command } from "commander"; -import { - buildGatewayConfig, - displayGatewayConfig, - loadEnvFile, -} from "../config/index.js"; -import { startGateway } from "./gateway.js"; - -const logger = createLogger("cli"); - -async function main() { - const program = new Command(); - - program - .name("lobu-gateway") - .description( - "Lobu gateway service — API-driven platform connections via Chat SDK" - ) - .version("1.0.0"); - - program - .option("--env ", "Path to .env file (default: .env)") - .option("--validate", "Validate configuration and exit") - .option("--show-config", "Display parsed configuration and exit") - .action(async (options) => { - try { - loadEnvFile(options.env); - - await initSentry(); - - initTracing({ - serviceName: "lobu-gateway", - serviceVersion: process.env.npm_package_version || "2.0.0", - otlpEndpoint: process.env.OTEL_EXPORTER_OTLP_ENDPOINT, - enabled: !!process.env.OTEL_EXPORTER_OTLP_ENDPOINT, - }); - - const config = buildGatewayConfig(); - - if (options.validate) { - console.log("Configuration is valid"); - displayGatewayConfig(config); - process.exit(0); - } - - if (options.showConfig) { - displayGatewayConfig(config); - process.exit(0); - } - - await startGateway(config); - } catch (error) { - if (error instanceof ConfigError) { - logger.error("Configuration error:", error.message); - process.exit(1); - } - logger.error( - "Failed to start gateway:", - error instanceof Error ? error.message : String(error) - ); - process.exit(1); - } - }); - - await program.parseAsync(process.argv); -} - -main().catch((error) => { - logger.error("CLI error:", error); - process.exit(1); -}); diff --git a/packages/server/src/gateway/config/index.ts b/packages/server/src/gateway/config/index.ts index 5bb8e643a..b6b3b6fe9 100644 --- a/packages/server/src/gateway/config/index.ts +++ b/packages/server/src/gateway/config/index.ts @@ -14,7 +14,6 @@ import { getRequiredEnv, TIME, } from "@lobu/core"; -import { config as dotenvConfig } from "dotenv"; import type { OrchestratorConfig } from "../orchestration/base-deployment-manager.js"; const __filename = fileURLToPath(import.meta.url); @@ -56,11 +55,6 @@ const DEFAULTS = { ...GATEWAY_DEFAULTS, } as const; -const DISPLAY = { - SEPARATOR_LENGTH: 50, - TOKEN_PREVIEW_LENGTH: 10, -} as const; - /** Recursively makes all properties optional */ type DeepPartial = { [P in keyof T]?: T[P] extends (infer U)[] @@ -138,33 +132,6 @@ export interface GatewayConfig { }; } -export function loadEnvFile(envPath?: string): void { - if (process.env.NODE_ENV === "production") { - logger.debug("Production mode - skipping .env file"); - return; - } - - const envProvided = Boolean(envPath); - const resolvedPath = envProvided - ? path.resolve(process.cwd(), envPath!) - : path.resolve(process.cwd(), ".env"); - - if (existsSync(resolvedPath)) { - // .env is the single source of truth for dev. `override: true` so - // values in the file win over stale shell exports inherited from the - // user's environment. Production (`NODE_ENV=production`) skips this - // path entirely, so real deployments are unaffected. - dotenvConfig({ path: resolvedPath, override: true }); - logger.debug(`Loaded environment variables from ${resolvedPath}`); - } else if (envProvided) { - logger.warn( - `Specified env file ${resolvedPath} was not found; continuing without it.` - ); - } else { - logger.debug("No .env file found; relying on process environment."); - } -} - /** * Derive the internal gateway URL for worker→gateway communication. * Embedded workers are subprocesses on the same host; the gateway is @@ -520,36 +487,3 @@ export function buildGatewayConfig( return config; } -/** - * Display gateway configuration (platform-agnostic parts only) - * Platform-specific display should be handled by platform modules - */ -export function displayGatewayConfig(config: GatewayConfig): void { - const separator = "=".repeat(DISPLAY.SEPARATOR_LENGTH); - - console.log("Gateway Configuration:"); - console.log(separator); - - console.log("\nQueues:"); - console.log(` Retry Limit: ${config.queues.retryLimit}`); - console.log(` Retry Delay: ${config.queues.retryDelay}s`); - - console.log("\nMCP:"); - console.log( - ` Public Gateway: ${config.mcp.publicGatewayUrl || "(not set)"}` - ); - - console.log("\nOrchestration:"); - console.log( - ` Max Deployments: ${config.orchestration.worker.maxDeployments}` - ); - - console.log("\nHealth:"); - console.log(` Socket Check Interval: ${config.health.checkIntervalMs}ms`); - console.log(` Socket Stale Threshold: ${config.health.staleThresholdMs}ms`); - console.log( - ` Protect Active Workers: ${config.health.protectActiveWorkers}` - ); - - console.log(`\n${separator}`); -} diff --git a/packages/server/src/gateway/connections/platform-strategies/index.ts b/packages/server/src/gateway/connections/platform-strategies/index.ts index 0d5637834..75b5b60bd 100644 --- a/packages/server/src/gateway/connections/platform-strategies/index.ts +++ b/packages/server/src/gateway/connections/platform-strategies/index.ts @@ -450,15 +450,7 @@ class SlackResponseStrategy implements PlatformResponseStrategy { } } -/** - * Telegram currently uses default behavior (streaming through the Chat SDK). - * Kept as a named subclass so future Telegram-specific tweaks have an obvious - * home and the bridge's strategy map reads explicitly. - */ -class TelegramResponseStrategy extends DefaultResponseStrategy {} - const slackStrategy = new SlackResponseStrategy(); -const telegramStrategy = new TelegramResponseStrategy(); const defaultStrategy = new DefaultResponseStrategy(); export function getResponseStrategy( @@ -467,8 +459,6 @@ export function getResponseStrategy( switch (platform) { case "slack": return slackStrategy; - case "telegram": - return telegramStrategy; default: return defaultStrategy; } diff --git a/packages/server/src/gateway/routes/public/agent-config.ts b/packages/server/src/gateway/routes/public/agent-config.ts index 970768393..04d6cf411 100644 --- a/packages/server/src/gateway/routes/public/agent-config.ts +++ b/packages/server/src/gateway/routes/public/agent-config.ts @@ -33,7 +33,7 @@ import { } from "../../modules/module-system.js"; import type { GrantStore } from "../../permissions/grant-store.js"; import { errorResponse } from "../shared/helpers.js"; -import { createTokenVerifier } from "../shared/token-verifier.js"; +import { createTokenVerifier } from "../shared/agent-ownership.js"; import { verifySettingsSessionOrToken } from "./settings-auth.js"; const TAG = "Configuration"; diff --git a/packages/server/src/gateway/routes/public/agent-history.ts b/packages/server/src/gateway/routes/public/agent-history.ts index ee4c2c5b0..56c77515f 100644 --- a/packages/server/src/gateway/routes/public/agent-history.ts +++ b/packages/server/src/gateway/routes/public/agent-history.ts @@ -13,7 +13,7 @@ import { Hono } from "hono"; import type { UserAgentsStore } from "../../auth/user-agents-store.js"; import type { WorkerConnectionManager } from "../../gateway/connection-manager.js"; import { errorResponse } from "../shared/helpers.js"; -import { createTokenVerifier } from "../shared/token-verifier.js"; +import { createTokenVerifier } from "../shared/agent-ownership.js"; import { verifySettingsSession } from "./settings-auth.js"; const logger = createLogger("agent-history-routes"); diff --git a/packages/server/src/gateway/routes/public/channels.ts b/packages/server/src/gateway/routes/public/channels.ts index 0b508d544..d925ae395 100644 --- a/packages/server/src/gateway/routes/public/channels.ts +++ b/packages/server/src/gateway/routes/public/channels.ts @@ -12,7 +12,7 @@ import { Hono } from "hono"; import type { AgentMetadataStore } from "../../auth/agent-metadata-store.js"; import type { UserAgentsStore } from "../../auth/user-agents-store.js"; import type { ChannelBindingService } from "../../channels/binding-service.js"; -import { createTokenVerifier } from "../shared/token-verifier.js"; +import { createTokenVerifier } from "../shared/agent-ownership.js"; import { verifySettingsSession } from "./settings-auth.js"; const logger = createLogger("channel-binding-routes"); diff --git a/packages/server/src/gateway/routes/shared/agent-ownership.ts b/packages/server/src/gateway/routes/shared/agent-ownership.ts index 9a7c3c11d..68aa1ddbc 100644 --- a/packages/server/src/gateway/routes/shared/agent-ownership.ts +++ b/packages/server/src/gateway/routes/shared/agent-ownership.ts @@ -13,24 +13,16 @@ interface AgentOwnershipResult { ownerUserId?: string; } -// Platforms whose user IDs come from an OAuth provider (so the session's -// `oauthUserId` is the canonical lookup key). All other platforms hand us a -// deterministic user ID directly (e.g. Telegram's claim-code flow), so -// `session.userId` is authoritative. Add an entry here if you wire up a new -// OAuth-based platform. -const OAUTH_PLATFORMS: ReadonlySet = new Set(); - +// `external` sessions carry an OAuth-provider user ID, so prefer that as the +// canonical lookup key; every other platform hands us a deterministic user ID +// directly (e.g. Telegram's claim-code flow), so `session.userId` is +// authoritative. export function resolveSettingsLookupUserId( session: SettingsTokenPayload ): string { - if (session.platform === "external") { - return session.oauthUserId || session.userId; - } - - const isDeterministic = !OAUTH_PLATFORMS.has(session.platform); - return isDeterministic - ? session.userId - : session.oauthUserId || session.userId; + return session.platform === "external" + ? session.oauthUserId || session.userId + : session.userId; } function sessionMatchesMetadataOwner( @@ -105,3 +97,20 @@ export async function verifyOwnedAgentAccess( ownerUserId: metadata.owner.userId, }; } + +/** + * Create a token verifier function scoped to a given config. + * + * The returned async function accepts a decoded settings token payload and an + * agentId, then returns the payload if the caller is authorised, or null. + */ +export function createTokenVerifier(config: AgentOwnershipConfig) { + return async ( + payload: SettingsTokenPayload | null, + agentId: string + ): Promise => { + if (!payload) return null; + const result = await verifyOwnedAgentAccess(payload, agentId, config); + return result.authorized ? payload : null; + }; +} diff --git a/packages/server/src/gateway/routes/shared/token-verifier.ts b/packages/server/src/gateway/routes/shared/token-verifier.ts deleted file mode 100644 index d8bf62dc2..000000000 --- a/packages/server/src/gateway/routes/shared/token-verifier.ts +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Shared token verification utility for public routes. - * - * Verifies a settings token against an agentId by checking direct agentId match, - * user ownership via UserAgentsStore, or canonical metadata owner fallback. - */ - -import type { AgentConfigStore } from "@lobu/core"; -import type { SettingsTokenPayload } from "../../auth/settings/token-service.js"; -import type { UserAgentsStore } from "../../auth/user-agents-store.js"; -import { verifyOwnedAgentAccess } from "./agent-ownership.js"; - -interface TokenVerifierConfig { - userAgentsStore?: UserAgentsStore; - agentMetadataStore?: Pick; -} - -/** - * Create a token verifier function scoped to a given config. - * - * The returned async function accepts a decoded settings token payload and an - * agentId, then returns the payload if the caller is authorised, or null. - */ -export function createTokenVerifier(config: TokenVerifierConfig) { - return async ( - payload: SettingsTokenPayload | null, - agentId: string - ): Promise => { - if (!payload) return null; - - const result = await verifyOwnedAgentAccess(payload, agentId, config); - return result.authorized ? payload : null; - }; -} diff --git a/packages/server/src/gateway/utils/rate-limiter.ts b/packages/server/src/gateway/utils/rate-limiter.ts index 04da94690..9a9d9a459 100644 --- a/packages/server/src/gateway/utils/rate-limiter.ts +++ b/packages/server/src/gateway/utils/rate-limiter.ts @@ -1,123 +1,11 @@ import { getDb } from "../../db/client.js"; -interface FixedWindowRateLimitOptions { - key: string; - limit: number; - windowSeconds: number; -} - -interface FixedWindowRateLimitResult { - allowed: boolean; - count: number; - limit: number; - remaining: number; - retryAfterSeconds: number; - resetAt: number; -} - /** - * Fixed-window rate limiter backed by `public.rate_limits`. - * - * Each row is `(key, count, window_started_at, expires_at)` — one row per - * key. consume() does an upsert that: - * - inserts (count=1, window_started_at=now, expires_at=now+windowSeconds) - * when no row exists, - * - resets the row when the existing window has already expired, - * - bumps `count` when the window is still live. + * Sweep expired `public.rate_limits` rows. Safe to call periodically. * - * Same fixed-window-counter semantics as a typical INCR + EXPIRE loop. - */ -export class FixedWindowRateLimiter { - async consume( - options: FixedWindowRateLimitOptions - ): Promise { - const sql = getDb(); - const windowSeconds = options.windowSeconds; - const intervalMs = windowSeconds * 1000; - - // ON CONFLICT: if the row's window already expired, reset count and - // window markers; otherwise bump count by 1. The CASE expression keeps - // this atomic in a single statement. - const rows = await sql` - INSERT INTO rate_limits (key, count, window_started_at, expires_at, updated_at) - VALUES ( - ${options.key}, - 1, - now(), - now() + (${intervalMs} || ' milliseconds')::interval, - now() - ) - ON CONFLICT (key) DO UPDATE SET - count = CASE - WHEN rate_limits.expires_at <= now() THEN 1 - ELSE rate_limits.count + 1 - END, - window_started_at = CASE - WHEN rate_limits.expires_at <= now() THEN now() - ELSE rate_limits.window_started_at - END, - expires_at = CASE - WHEN rate_limits.expires_at <= now() - THEN now() + (${intervalMs} || ' milliseconds')::interval - ELSE rate_limits.expires_at - END, - updated_at = now() - RETURNING count, expires_at - `; - - const row = rows[0] as { count: number; expires_at: Date | string }; - const expiresAt = - row.expires_at instanceof Date - ? row.expires_at.getTime() - : Date.parse(String(row.expires_at)); - const ttlSeconds = Math.max( - 1, - Math.ceil((expiresAt - Date.now()) / 1000) - ); - return this.buildResult(options, row.count, ttlSeconds, expiresAt); - } - - async reset(key: string): Promise { - const sql = getDb(); - await sql`DELETE FROM rate_limits WHERE key = ${key}`; - } - - private buildResult( - options: FixedWindowRateLimitOptions, - count: number, - ttlSeconds: number, - expiresAtMs: number - ): FixedWindowRateLimitResult { - return { - allowed: count <= options.limit, - count, - limit: options.limit, - remaining: Math.max(0, options.limit - count), - retryAfterSeconds: Math.max(1, ttlSeconds), - resetAt: expiresAtMs, - }; - } -} - -export function getClientIp(headers: { - forwardedFor?: string; - realIp?: string; -}): string { - const forwarded = headers.forwardedFor?.split(",")[0]?.trim().toLowerCase(); - if (forwarded) { - return forwarded; - } - - const realIp = headers.realIp?.trim().toLowerCase(); - if (realIp) { - return realIp; - } - - return "unknown"; -} - -/** - * Sweep expired rate_limits rows. Safe to call periodically. + * The `rate_limits` table is a fixed-window counter — one row per key, with + * `(count, window_started_at, expires_at)`. This helper just drains stale rows + * so the table doesn't grow unbounded. */ export async function sweepExpiredRateLimits(): Promise { const sql = getDb(); diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index 2c26b2e61..b4c1a5ec8 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -71,6 +71,7 @@ import { getConfiguredPublicOrigin, getSubdomainZone, } from './utils/public-origin'; +import { getSchedulerHealth } from './scheduled/scheduler-health'; import { getClientIP, getRateLimiter, RateLimitPresets } from './utils/rate-limiter'; import { getRuntimeInfo } from './utils/runtime-info'; import { getWorkspaceProvider } from './workspace'; @@ -402,7 +403,6 @@ app.get('/health', (c) => { */ app.get('/health/scheduler', async (c) => { try { - const { getSchedulerHealth } = await import('./scheduled/scheduler-health'); const health = await getSchedulerHealth(c.env); return c.json(health, health.healthy ? 200 : 503); } catch (error) { diff --git a/packages/server/src/lobu/stores/postgres-stores.ts b/packages/server/src/lobu/stores/postgres-stores.ts index db81e13c9..26fb69a5f 100644 --- a/packages/server/src/lobu/stores/postgres-stores.ts +++ b/packages/server/src/lobu/stores/postgres-stores.ts @@ -1,4 +1,5 @@ import { + decrypt, inferGrantKind, type AgentAccessStore, type AgentConfigStore, @@ -121,23 +122,18 @@ function isRedactedSecretValue(value: unknown): value is string { function decryptLegacyEncryptedConfig( config: Record ): Record { - try { - const { decrypt } = require('@lobu/core'); - const result = { ...config }; - for (const [key, value] of Object.entries(result)) { - if (typeof value === 'string' && value.startsWith(ENC_PREFIX)) { - try { - result[key] = decrypt(value.slice(ENC_PREFIX.length)); - } catch { - // Leave encrypted if decryption fails — surfaces as a - // resolveConfigForRuntime error at boot time. - } + const result = { ...config }; + for (const [key, value] of Object.entries(result)) { + if (typeof value === 'string' && value.startsWith(ENC_PREFIX)) { + try { + result[key] = decrypt(value.slice(ENC_PREFIX.length)); + } catch { + // Leave encrypted if decryption fails — surfaces as a + // resolveConfigForRuntime error at boot time. } } - return result; - } catch { - return config; } + return result; } function rowToConnection(row: Record): StoredConnection { diff --git a/packages/server/src/sandbox/namespaces/entities.ts b/packages/server/src/sandbox/namespaces/entities.ts index 98af0f788..8de9b3ab5 100644 --- a/packages/server/src/sandbox/namespaces/entities.ts +++ b/packages/server/src/sandbox/namespaces/entities.ts @@ -8,6 +8,7 @@ import type { Env } from "../../index"; import { manageEntity } from "../../tools/admin/manage_entity"; import type { ToolContext } from "../../tools/registry"; +import { search } from "../../tools/search"; import { createActionCaller } from "./action-call"; export interface EntityListFilter { @@ -125,8 +126,7 @@ export function buildEntitiesNamespace( listLinks(input) { return action("list_links", input); }, - async search(query, options) { - const { search } = await import("../../tools/search"); + search(query, options) { return search( { query, limit: options?.limit } as never, env, diff --git a/packages/server/src/sandbox/namespaces/knowledge.ts b/packages/server/src/sandbox/namespaces/knowledge.ts index ebf22a079..9366aa017 100644 --- a/packages/server/src/sandbox/namespaces/knowledge.ts +++ b/packages/server/src/sandbox/namespaces/knowledge.ts @@ -7,7 +7,11 @@ */ import type { Env } from "../../index"; +import { deleteContent } from "../../tools/delete_content"; +import { getContent } from "../../tools/get_content"; import type { ToolContext } from "../../tools/registry"; +import { saveContent } from "../../tools/save_content"; +import { search } from "../../tools/search"; export interface KnowledgeSearchInput { query?: string; @@ -61,20 +65,16 @@ export function buildKnowledgeNamespace( env: Env ): KnowledgeNamespace { return { - async search(input) { - const { search } = await import("../../tools/search"); + search(input) { return search(input as never, env, ctx) as Promise; }, - async save(input) { - const { saveContent } = await import("../../tools/save_content"); + save(input) { return saveContent(input as never, env, ctx) as Promise; }, - async read(input) { - const { getContent } = await import("../../tools/get_content"); + read(input) { return getContent(input as never, env, ctx) as Promise; }, - async delete(input) { - const { deleteContent } = await import("../../tools/delete_content"); + delete(input) { const args = typeof input === "number" ? { event_id: input } : input ?? {}; return deleteContent(args as never, env, ctx) as Promise; diff --git a/packages/server/src/sandbox/namespaces/watchers.ts b/packages/server/src/sandbox/namespaces/watchers.ts index 242b5734a..46cfb3e0f 100644 --- a/packages/server/src/sandbox/namespaces/watchers.ts +++ b/packages/server/src/sandbox/namespaces/watchers.ts @@ -13,6 +13,7 @@ import { manageWatchers, type ManageWatchersArgs, } from "../../tools/admin/manage_watchers"; +import { getWatcher } from "../../tools/get_watchers"; import type { ToolContext } from "../../tools/registry"; import { createActionCaller } from "./action-call"; @@ -171,8 +172,7 @@ export function buildWatchersNamespace( return { manage: (input) => manage(input as Record), list: (filter) => listWatchers((filter ?? {}) as never, env, ctx) as Promise, - async get(watcher_id) { - const { getWatcher } = await import("../../tools/get_watchers"); + get(watcher_id) { return getWatcher( { watcher_id: asWatcherIdString(watcher_id) } as never, env, diff --git a/packages/server/src/server.ts b/packages/server/src/server.ts index 335dfb4f3..c38d18d71 100644 --- a/packages/server/src/server.ts +++ b/packages/server/src/server.ts @@ -28,9 +28,16 @@ import path from 'node:path'; import { fileURLToPath } from 'node:url'; import { getRequestListener } from '@hono/node-server'; import { Hono } from 'hono'; +import { closeDbSingleton, probeListenNotify } from './db/client'; import { mountViteDev } from './dev-vite'; import type { Env } from './index'; import { app as mainApp } from './index'; +import { + getLobuCoreServices, + initLobuGateway, + stopLobuGateway, +} from './lobu/gateway'; +import { bootTaskScheduler } from './scheduled/jobs'; import { assertExternalDepsResolvable } from '../../connector-worker/src/runtime-deps'; import { getEnvFromProcess } from './utils/env'; import logger from './utils/logger'; @@ -85,7 +92,6 @@ async function main() { // the poll interval — not an outage. Log loudly so ops can fix the pooler // config, but do not refuse to boot. if (process.env.SKIP_LISTEN_NOTIFY_PROBE !== '1') { - const { probeListenNotify } = await import('./db/client'); try { await probeListenNotify(); logger.info('[DB] LISTEN/NOTIFY probe ok'); @@ -101,7 +107,6 @@ async function main() { await initWorkspaceProvider(); // Initialize embedded Lobu gateway (requires DATABASE_URL) - const { initLobuGateway } = await import('./lobu/gateway'); const lobuApp = await initLobuGateway(); if (lobuApp) { app.route('/lobu', lobuApp); @@ -114,8 +119,6 @@ async function main() { // token refresh, MCP DB cleanup, watcher automation, etc. — runs as a row // in `public.runs` (run_type='task') with cron-driven self-rescheduling. // Cross-pod coordination is the runs-queue claim path. - const { getLobuCoreServices } = await import('./lobu/gateway'); - const { bootTaskScheduler } = await import('./scheduled/jobs'); const taskScheduler = await bootTaskScheduler(getLobuCoreServices(), env); const port = parseInt(process.env.PORT || '8787', 10); @@ -142,9 +145,7 @@ async function main() { logger.info({ signal }, 'Received shutdown signal, stopping gracefully...'); await vite?.close(); taskScheduler.stop(); - const { stopLobuGateway } = await import('./lobu/gateway'); await stopLobuGateway(); - const { closeDbSingleton } = await import('./db/client'); await closeDbSingleton(); httpServer.close(); process.exit(0); diff --git a/packages/server/src/watchers/reaction-executor.ts b/packages/server/src/watchers/reaction-executor.ts index 5fd32e7c5..b9a336b63 100644 --- a/packages/server/src/watchers/reaction-executor.ts +++ b/packages/server/src/watchers/reaction-executor.ts @@ -15,6 +15,7 @@ import type { ReactionContext } from '@lobu/connector-sdk'; import type { Env } from '../index'; import { buildClientSDK } from '../sandbox/client-sdk'; import { runScript } from '../sandbox/run-script'; +import { compileSource } from '../utils/compiler-core'; import logger from '../utils/logger'; const REACTION_TIMEOUT_MS = 60_000; @@ -100,7 +101,6 @@ export async function executeReaction(options: ExecuteReactionOptions): Promise< * pre-validation. */ export async function compileReactionScript(source: string): Promise { - const { compileSource } = await import('../utils/compiler-core'); // Match `runScript`'s execute-time esbuild config exactly so save-time and // runtime accept the same set of imports. Drift here used to externalize // `@lobu/reactions`, which the runtime recompile would then fail to