diff --git a/bun.lock b/bun.lock index 7a45939be..b670b16a3 100644 --- a/bun.lock +++ b/bun.lock @@ -16,7 +16,7 @@ }, "packages/agent-worker": { "name": "@lobu/worker", - "version": "9.2.0", + "version": "9.4.1", "bin": { "lobu-worker": "./dist/index.js", }, @@ -43,7 +43,7 @@ }, "packages/cli": { "name": "@lobu/cli", - "version": "9.2.0", + "version": "9.4.1", "bin": { "lobu": "bin/lobu.js", }, @@ -128,7 +128,7 @@ }, "packages/client": { "name": "@lobu/client", - "version": "9.2.0", + "version": "9.4.1", "devDependencies": { "@hey-api/client-fetch": "^0.13.1", "@hey-api/openapi-ts": "^0.86.5", @@ -137,7 +137,7 @@ }, "packages/connector-sdk": { "name": "@lobu/connector-sdk", - "version": "9.2.0", + "version": "9.4.1", "dependencies": { "@lobu/core": "workspace:*", "@sinclair/typebox": "^0.34.41", @@ -161,7 +161,7 @@ }, "packages/connector-worker": { "name": "@lobu/connector-worker", - "version": "9.2.0", + "version": "9.4.1", "bin": { "connector-worker": "./dist/bin.js", }, @@ -198,7 +198,7 @@ }, "packages/core": { "name": "@lobu/core", - "version": "9.2.0", + "version": "9.4.1", "dependencies": { "@opentelemetry/api": "^1.9.0", "@opentelemetry/exporter-trace-otlp-grpc": "^0.57.0", @@ -217,7 +217,7 @@ }, "packages/embeddings": { "name": "@lobu/embeddings", - "version": "9.2.0", + "version": "9.4.1", "dependencies": { "@hono/node-server": "^1.13.7", "@xenova/transformers": "^2.17.2", @@ -249,7 +249,7 @@ }, "packages/openclaw-plugin": { "name": "@lobu/openclaw-plugin", - "version": "9.2.0", + "version": "9.4.1", "dependencies": { "@lobu/core": "workspace:*", }, @@ -334,7 +334,7 @@ }, "packages/pgvector-embedded": { "name": "@lobu/pgvector-embedded", - "version": "9.2.0", + "version": "9.4.1", "devDependencies": { "@types/node": "20.19.9", "typescript": "^5.7.2", @@ -342,7 +342,7 @@ }, "packages/promptfoo-provider": { "name": "@lobu/promptfoo-provider", - "version": "9.2.0", + "version": "9.4.1", "devDependencies": { "@types/node": "^20.10.0", "typescript": "^5.3.3", diff --git a/examples/lobu-crm/funnel-digest.reaction.ts b/examples/lobu-crm/funnel-digest.reaction.ts index 0b902ec4f..ae49186c8 100644 --- a/examples/lobu-crm/funnel-digest.reaction.ts +++ b/examples/lobu-crm/funnel-digest.reaction.ts @@ -3,15 +3,15 @@ * * Runs after the weekly Monday-9am window completes. `ctx.extracted_data` is * whatever the watcher's `extraction_schema` produced — funnel snapshot, top - * action, stale leads, etc. We persist the digest as a `funnel_digest` event - * linked to every lead the watcher knows about so the next digest can compare - * stage_counts week-over-week without re-running classification. - * - * Pair with `notification_priority: high` on the watcher — the OS notification - * fires regardless of whether this script succeeds; this just produces durable - * knowledge. + * action, stale leads, etc. We: + * 1. persist the digest as a `funnel_digest` event linked to every lead the + * watcher knows about, so the next digest can compare stage_counts + * week-over-week without re-running classification; and + * 2. push it to the team via `client.notifications.send` — which fans out to + * the org's active bot connections (the #leads Slack connection) and the + * in-app inbox. `watcher_source` attributes it to this window. */ -import type { ReactionContext } from "@lobu/connector-sdk"; +import type { ReactionClient, ReactionContext } from "@lobu/connector-sdk"; interface DigestData { top_action?: string; @@ -20,7 +20,10 @@ interface DigestData { gap?: string; } -export default async (ctx: ReactionContext, client: any): Promise => { +export default async ( + ctx: ReactionContext, + client: ReactionClient +): Promise => { const data = ctx.extracted_data as DigestData; const stageSummary = Object.entries(data.stage_counts ?? {}) .map(([stage, n]) => `${stage}: ${n}`) @@ -48,4 +51,13 @@ export default async (ctx: ReactionContext, client: any): Promise => { top_action: data.top_action ?? null, }, }); + + await client.notifications.send({ + title: `Weekly funnel digest — ${ctx.window.window_end.slice(0, 10)}`, + body: content, + watcher_source: { + watcher_id: ctx.window.watcher_id, + window_id: ctx.window.id, + }, + }); }; diff --git a/examples/lobu-crm/inbound-triage.reaction.ts b/examples/lobu-crm/inbound-triage.reaction.ts index 04aa30db8..210ff32f4 100644 --- a/examples/lobu-crm/inbound-triage.reaction.ts +++ b/examples/lobu-crm/inbound-triage.reaction.ts @@ -2,8 +2,10 @@ * Reaction for the `inbound-triage` watcher. * * Fires every 2h after the watcher LLM extracts new and enriched leads from - * GitHub/X/HN signals. Persists a `lead_interaction` event per run so the - * next digest can count them. + * GitHub/X/HN signals. Persists a `lead_interaction` event per run so the next + * digest can count them, and — when the run is notable — pushes the recommended + * actions to the team via `client.notifications.send` (fans out to the #leads + * Slack connection + the in-app inbox). */ import type { ReactionClient, ReactionContext } from "@lobu/connector-sdk"; @@ -28,12 +30,14 @@ export default async ( const actions = data.recommended_actions ?? []; if (actions.length === 0) return; + const summary = [ + `Triage run ${ctx.window.window_end.slice(0, 16)} — ${actions.length} action(s)`, + ...actions.map((a, i) => `${i + 1}. ${a}`), + ].join("\n"); + await client.knowledge.save({ entity_ids: ctx.entities.map((e) => e.id), - content: [ - `Triage run ${ctx.window.window_end.slice(0, 16)} — ${actions.length} action(s)`, - ...actions.map((a, i) => `${i + 1}. ${a}`), - ].join("\n"), + content: summary, semantic_type: "lead_interaction", metadata: { window_id: ctx.window.id, @@ -41,4 +45,13 @@ export default async ( action_count: actions.length, }, }); + + await client.notifications.send({ + title: `Inbound triage — ${actions.length} action(s), ${data.new_leads?.length ?? 0} new lead(s)`, + body: summary, + watcher_source: { + watcher_id: ctx.window.watcher_id, + window_id: ctx.window.id, + }, + }); }; diff --git a/packages/connector-sdk/src/index.ts b/packages/connector-sdk/src/index.ts index c9a033ab6..b083dcdc4 100644 --- a/packages/connector-sdk/src/index.ts +++ b/packages/connector-sdk/src/index.ts @@ -146,6 +146,7 @@ export { browserNetworkSync } from './browser-network.js'; export type { ReactionContext } from './reaction-sdk.js'; export type { ReactionClient } from './reaction-client-types.js'; export type { + CardElement, EntityCreateInput, EntityLinkInput, EntityListFilter, @@ -153,6 +154,7 @@ export type { KnowledgeReadInput, KnowledgeSaveInput, KnowledgeSearchInput, + NotificationsSendInput, } from './reaction-client-types.js'; export type { Env } from './types.js'; diff --git a/packages/connector-sdk/src/reaction-client-types.ts b/packages/connector-sdk/src/reaction-client-types.ts index b1d56bdcb..f76ce42f6 100644 --- a/packages/connector-sdk/src/reaction-client-types.ts +++ b/packages/connector-sdk/src/reaction-client-types.ts @@ -16,6 +16,16 @@ import type { ReactionContext } from "./reaction-sdk.js"; export type { ReactionContext }; +/** + * A rich card for chat delivery, as a plain serializable object — a `chat` + * `CardElement` built with the card primitives (`Card`, `Section`, `Field`, + * `Actions`, `Button`, `Select`, …). Typed loosely here so the SDK's published + * declarations don't force consumers to install `chat`; the gateway validates + * and renders it to each platform's native format (Block Kit / Adaptive Cards / + * Google Chat Cards). + */ +export type CardElement = Record; + // ── Knowledge ──────────────────────────────────────────────────────────────── export interface KnowledgeSearchInput { @@ -80,15 +90,44 @@ export interface EntityListFilter { sort_order?: "asc" | "desc"; } +export interface NotificationsSendInput { + /** Notification title (≤200 chars). */ + title: string; + /** Body text (≤1000 chars). */ + body?: string; + /** + * Optional rich card built with the `chat` card primitives (`Card`, + * `Section`, `Field`, `Actions`, `Button`, `Select`, …). When set, + * bot-connection delivery posts this card — rendered to each platform's + * native format (Slack Block Kit, Teams Adaptive Cards, Google Chat Cards) — + * instead of the markdown body; the in-app inbox entry still uses title/body. + */ + card?: CardElement; + /** + * Who to notify. `"admins"` (default): org admins/owners. `"all"`: every + * member. Or an array of specific user IDs. + */ + recipients?: "admins" | "all" | string[]; + /** Relative URL the notification links to (e.g. `/acme/entities`). */ + resource_url?: string; + /** Deliver only through this specific bot connection (its id). */ + connection_id?: string; + /** Arbitrary JSON payload appended to the body as formatted JSON. */ + data?: Record; + /** Attribution when sent from a watcher reaction. */ + watcher_source?: { watcher_id: number; window_id: number }; +} + // ── Client ─────────────────────────────────────────────────────────────────── /** * The client object available in reaction scripts. * - * `client.knowledge` — read/write/search knowledge events - * `client.entities` — CRUD entities and relationships - * `client.query` — raw SQL (results as JSON rows) - * `client.log` — structured logging (appears in watcher run logs) + * `client.knowledge` — read/write/search knowledge events + * `client.entities` — CRUD entities and relationships + * `client.notifications` — push a notification to the org's inbox + bot connections (Slack/Telegram) + * `client.query` — raw SQL (results as JSON rows) + * `client.log` — structured logging (appears in watcher run logs) */ export interface ReactionClient { knowledge: { @@ -125,6 +164,15 @@ export interface ReactionClient { search(query: string, options?: { limit?: number }): Promise; }; + notifications: { + /** + * Send a notification: writes it to the org inbox and fans it out to the + * org's active bot connections (Slack/Telegram). This is how a reaction + * surfaces its digest to a chat channel. + */ + send(input: NotificationsSendInput): Promise<{ notified_count: number }>; + }; + /** Run a read-only SQL query against the org's Postgres. */ query(sql: string): Promise; diff --git a/packages/server/src/__tests__/integration/notifications/bot-delivery.test.ts b/packages/server/src/__tests__/integration/notifications/bot-delivery.test.ts new file mode 100644 index 000000000..17779e44d --- /dev/null +++ b/packages/server/src/__tests__/integration/notifications/bot-delivery.test.ts @@ -0,0 +1,139 @@ +/** + * Integration test for the notification → bot-connection delivery path. + * + * Exercises `resolveBotDeliveryTargets` against a real DB: it JOINs the org's + * active chat connections to their channel bindings and returns the channel(s) + * each notification should post to. This is the path that was a silent no-op + * after #846 removed the HTTP endpoints the old implementation called. + */ + +import { afterAll, beforeEach, describe, expect, it } from 'vitest'; +import { getTestDb, cleanupTestDatabase } from '../../setup/test-db'; +import { createTestAgent, createTestOrganization } from '../../setup/test-fixtures'; +import { resolveBotDeliveryTargets } from '../../../notifications/service'; + +async function seedSlackConnection(opts: { + organizationId: string; + agentId: string; + connectionId: string; + status?: string; +}): Promise { + const sql = getTestDb(); + await sql` + INSERT INTO agent_connections + (id, organization_id, agent_id, platform, config, settings, metadata, status, created_at, updated_at) + VALUES ( + ${opts.connectionId}, ${opts.organizationId}, ${opts.agentId}, 'slack', + ${sql.json({})}, ${sql.json({})}, ${sql.json({})}, ${opts.status ?? 'active'}, NOW(), NOW() + ) + `; +} + +async function seedBinding(opts: { + organizationId: string; + agentId: string; + channelId: string; + teamId?: string; +}): Promise { + const sql = getTestDb(); + await sql` + INSERT INTO agent_channel_bindings + (organization_id, agent_id, platform, channel_id, team_id, created_at) + VALUES ( + ${opts.organizationId}, ${opts.agentId}, 'slack', ${opts.channelId}, ${opts.teamId ?? 'T_TEST'}, NOW() + ) + `; +} + +describe('resolveBotDeliveryTargets', () => { + beforeEach(async () => { + await cleanupTestDatabase(); + }); + afterAll(async () => { + await cleanupTestDatabase(); + }); + + it('resolves an active connection to its bound channel', async () => { + const org = await createTestOrganization(); + const agent = await createTestAgent({ organizationId: org.id, agentId: 'crm' }); + await seedSlackConnection({ + organizationId: org.id, + agentId: agent.agentId, + connectionId: 'conn-1', + }); + await seedBinding({ + organizationId: org.id, + agentId: agent.agentId, + channelId: 'slack:C0LEADS', + }); + + const targets = await resolveBotDeliveryTargets(org.id); + + expect(targets).toEqual([ + { connectionId: 'conn-1', platform: 'slack', channelKey: 'slack:C0LEADS' }, + ]); + }); + + it('returns nothing for a connection with no binding', async () => { + const org = await createTestOrganization(); + const agent = await createTestAgent({ organizationId: org.id, agentId: 'crm' }); + await seedSlackConnection({ + organizationId: org.id, + agentId: agent.agentId, + connectionId: 'conn-1', + }); + // No binding seeded. + + expect(await resolveBotDeliveryTargets(org.id)).toEqual([]); + }); + + it('omits inactive connections', async () => { + const org = await createTestOrganization(); + const agent = await createTestAgent({ organizationId: org.id, agentId: 'crm' }); + await seedSlackConnection({ + organizationId: org.id, + agentId: agent.agentId, + connectionId: 'conn-1', + status: 'stopped', + }); + await seedBinding({ + organizationId: org.id, + agentId: agent.agentId, + channelId: 'slack:C0LEADS', + }); + + expect(await resolveBotDeliveryTargets(org.id)).toEqual([]); + }); + + it('prefixes a bare channel id with the platform', async () => { + const org = await createTestOrganization(); + const agent = await createTestAgent({ organizationId: org.id, agentId: 'crm' }); + await seedSlackConnection({ + organizationId: org.id, + agentId: agent.agentId, + connectionId: 'conn-1', + }); + await seedBinding({ + organizationId: org.id, + agentId: agent.agentId, + channelId: 'C0BARE', + }); + + const targets = await resolveBotDeliveryTargets(org.id); + expect(targets).toEqual([ + { connectionId: 'conn-1', platform: 'slack', channelKey: 'slack:C0BARE' }, + ]); + }); + + it('honors the connectionId filter', async () => { + const org = await createTestOrganization(); + const agent = await createTestAgent({ organizationId: org.id, agentId: 'crm' }); + for (const id of ['conn-1', 'conn-2']) { + await seedSlackConnection({ organizationId: org.id, agentId: agent.agentId, connectionId: id }); + } + await seedBinding({ organizationId: org.id, agentId: agent.agentId, channelId: 'slack:C1' }); + + const targets = await resolveBotDeliveryTargets(org.id, 'conn-2'); + expect(targets.map((t) => t.connectionId)).toEqual(['conn-2']); + }); +}); diff --git a/packages/server/src/__tests__/integration/sandbox/run-script-runtime.test.ts b/packages/server/src/__tests__/integration/sandbox/run-script-runtime.test.ts index 76ed9243e..dfb961d45 100644 --- a/packages/server/src/__tests__/integration/sandbox/run-script-runtime.test.ts +++ b/packages/server/src/__tests__/integration/sandbox/run-script-runtime.test.ts @@ -92,6 +92,37 @@ describe("sandbox runtime", () => { expect(result.sdkCalls).toBe(1); }); + it("dispatches client.notifications.send from a reaction script", async () => { + // Guards the gap fix: before `notifications.send` was added to the SDK + + // method-metadata, the sandbox proxy wouldn't advertise it and a reaction + // calling it threw. This proves a reaction can now push a notification. + let captured: unknown; + const stubSdk = { + notifications: { + send: async (input: unknown) => { + captured = input; + return { notified_count: 1 }; + }, + }, + log: () => undefined, + } as unknown as ClientSDK; + + const result = await runScript({ + source: + "export default async (ctx, client) => client.notifications.send({ title: 'Digest', body: 'x', watcher_source: { watcher_id: 7, window_id: 9 } });", + sdk: stubSdk, + }); + + expect(result.success).toBe(true); + expect(result.returnValue).toEqual({ notified_count: 1 }); + expect(result.sdkCalls).toBe(1); + expect(captured).toEqual({ + title: "Digest", + body: "x", + watcher_source: { watcher_id: 7, window_id: 9 }, + }); + }); + it("enforces wall-clock timeout while awaiting SDK calls", async () => { const stubSdk = { entities: { diff --git a/packages/server/src/gateway/__tests__/chat-instance-manager-slack.test.ts b/packages/server/src/gateway/__tests__/chat-instance-manager-slack.test.ts index 8fd0a6f5c..727188c0a 100644 --- a/packages/server/src/gateway/__tests__/chat-instance-manager-slack.test.ts +++ b/packages/server/src/gateway/__tests__/chat-instance-manager-slack.test.ts @@ -191,3 +191,74 @@ describe("ChatInstanceManager Slack marketplace support", () => { ); }); }); + +describe("ChatInstanceManager.postMessageToChannel", () => { + test("posts markdown to the resolved channel as the bot", async () => { + const ChatInstanceManager = await loadChatInstanceManager(); + const manager = new ChatInstanceManager() as any; + const post = mock(async () => ({ ts: "1.2" })); + const channel = mock((_key: string) => ({ post })); + manager.instances.set("conn-1", { chat: { channel } }); + + await manager.postMessageToChannel("conn-1", "slack:C0123ABCD", { + markdown: "Weekly funnel digest", + }); + + expect(channel).toHaveBeenCalledWith("slack:C0123ABCD"); + expect(post).toHaveBeenCalledWith({ markdown: "Weekly funnel digest" }); + }); + + test("posts a rich card built with the chat primitives", async () => { + const { Card, Field, Fields, Actions, LinkButton } = await import("chat"); + const ChatInstanceManager = await loadChatInstanceManager(); + const manager = new ChatInstanceManager() as any; + const post = mock(async () => ({ ts: "2.0" })); + const channel = mock((_key: string) => ({ post })); + manager.instances.set("conn-1", { chat: { channel } }); + + const card = Card({ + title: "Weekly funnel digest", + children: [ + Fields([Field({ label: "New leads", value: "3" })]), + Actions([LinkButton({ url: "https://app.lobu.ai/lobu-crm/entities", label: "View leads" })]), + ], + }); + + await manager.postMessageToChannel("conn-1", "slack:C0123ABCD", { card }); + + expect(post).toHaveBeenCalledWith({ card }); + expect(card.type).toBe("card"); + }); + + test("lazily starts the connection when it isn't loaded on this pod, then posts", async () => { + // Multi-replica: the connection was created/restarted on another pod, so + // this pod has no live instance until we start it from the store. + const ChatInstanceManager = await loadChatInstanceManager(); + const manager = new ChatInstanceManager() as any; + const post = mock(async () => ({ ts: "9.9" })); + const channel = mock((_key: string) => ({ post })); + manager.connectionStore = { + getConnection: async () => ({ id: "conn-x", status: "active" }), + }; + manager.restartConnection = mock(async (id: string) => { + manager.instances.set(id, { chat: { channel } }); + }); + + await manager.postMessageToChannel("conn-x", "slack:C9", { markdown: "hi" }); + + expect(manager.restartConnection).toHaveBeenCalledWith("conn-x"); + expect(channel).toHaveBeenCalledWith("slack:C9"); + expect(post).toHaveBeenCalledWith({ markdown: "hi" }); + }); + + test("throws when the connection is stopped and cannot be started", async () => { + const ChatInstanceManager = await loadChatInstanceManager(); + const manager = new ChatInstanceManager() as any; + manager.connectionStore = { + getConnection: async () => ({ id: "missing", status: "stopped" }), + }; + await expect( + manager.postMessageToChannel("missing", "slack:C0", { markdown: "x" }) + ).rejects.toThrow(/No active chat instance/); + }); +}); diff --git a/packages/server/src/gateway/connections/chat-instance-manager.ts b/packages/server/src/gateway/connections/chat-instance-manager.ts index 141f72ad3..c0eb76f76 100644 --- a/packages/server/src/gateway/connections/chat-instance-manager.ts +++ b/packages/server/src/gateway/connections/chat-instance-manager.ts @@ -14,7 +14,7 @@ import { randomUUID } from "node:crypto"; import type { Readable } from "node:stream"; -import { Chat } from "chat"; +import { type AdapterPostableMessage, Chat } from "chat"; import type { AgentConnectionStore, StoredConnection, @@ -535,6 +535,46 @@ export class ChatInstanceManager { return this.instances.get(id); } + /** + * Post a message to a channel as the bot — a one-shot outbound post, NOT an + * inbound message that triggers an agent run (that's `routePlatformMessage`). + * Used by the notification fan-out (`deliverToBotConnections`) to surface a + * watcher digest / approval in a bound channel. + * + * `content` is any `chat` `AdapterPostableMessage` — `{ markdown }` (rendered + * to each platform's native format rather than HTML-escaped), `{ card }` (a + * `CardElement` → Block Kit / Adaptive Cards / Google Chat Cards), or plain + * text. All ride the same Chat SDK primitives, so one call works across every + * connected platform. + * + * `channelKey` is the platform-prefixed channel id, e.g. "slack:C0123ABCD". + * Multi-replica: a connection created or restarted on another replica has no + * live instance on this pod, so we lazily start it from the store first + * (`ensureConnectionRunning` is a no-op when it's already running and won't + * revive a `stopped` connection). That lets any pod that fires the + * notification deliver it — no cross-pod routing needed. + */ + async postMessageToChannel( + connectionId: string, + channelKey: string, + content: AdapterPostableMessage + ): Promise { + const running = await this.ensureConnectionRunning(connectionId); + const instance = running ? this.instances.get(connectionId) : undefined; + if (!instance) { + throw new Error( + `No active chat instance for connection ${connectionId} (could not start it on this pod)` + ); + } + const channel = instance.chat?.channel?.(channelKey); + if (!channel) { + throw new Error( + `Could not resolve channel ${channelKey} for connection ${connectionId}` + ); + } + await channel.post(content); + } + /** * Surface the channels with stored history for a given connection. Used * by the local-test-default-target route; falls back to constructing a diff --git a/packages/server/src/notifications/service.ts b/packages/server/src/notifications/service.ts index 6049e63b1..fc9804fa6 100644 --- a/packages/server/src/notifications/service.ts +++ b/packages/server/src/notifications/service.ts @@ -1,6 +1,6 @@ +import type { CardElement } from 'chat'; import { getDb, pgTextArray } from '../db/client'; -import { isLobuGatewayRunning } from '../lobu/gateway'; -import { getLobuServiceToken } from '../lobu/service-token'; +import { getChatInstanceManager, isLobuGatewayRunning } from '../lobu/gateway'; import logger from '../utils/logger'; interface CreateNotificationParams { @@ -20,6 +20,12 @@ interface CreateNotificationParams { resourceUrl?: string | null; /** When set, deliver only through this specific bot connection */ connectionId?: string | null; + /** + * Optional rich card (`chat` `CardElement`) for bot-connection delivery. When + * set, the bound channel gets this card instead of the markdown body; the + * in-app inbox entry still uses title/body. + */ + card?: CardElement | null; } interface NotificationRow { @@ -37,84 +43,97 @@ interface NotificationRow { } /** - * Forward a notification to active bot connections via Lobu's messaging API. + * Forward a notification to the org's active chat-bot connections so it lands + * in the bound channel — e.g. a watcher digest posting to #leads. * - * Fetches active connections and their default targets from Lobu's internal API, - * then sends via /api/v1/messaging/send with platform-specific routing. + * Resolves connections + their channel bindings straight from Postgres and + * posts in-process via the chat manager. Every app pod loads every active + * connection at boot, so the locally-held instance can post regardless of + * which pod fired the notification — correct under N>1 replicas, no cross-pod + * routing needed. (The previous implementation called `/api/internal/connections` + * + `/api/v1/messaging/send` over HTTP — both removed in #846, so it had been + * a silent no-op.) + * + * Best-effort: a connection with no live instance or no binding is skipped + * without failing the others. A connection bound to several channels posts to + * each. + */ +export interface BotDeliveryTarget { + connectionId: string; + platform: string; + /** Platform-prefixed channel id ready for `chat.channel()`, e.g. "slack:C0123ABCD". */ + channelKey: string; +} + +/** + * Resolve where a notification should be posted: the org's active chat + * connections JOINed to their channel bindings. A connection with no binding + * has no target and is omitted; a connection bound to several channels yields + * one target each. Exported for testing the delivery path against a real DB. */ +export async function resolveBotDeliveryTargets( + organizationId: string, + connectionId?: string | null +): Promise { + const sql = getDb(); + const rows = (await sql` + SELECT ac.id, ac.platform, b.channel_id + FROM agent_connections ac + JOIN agent_channel_bindings b + ON b.organization_id = ac.organization_id + AND b.agent_id = ac.agent_id + AND b.platform = ac.platform + WHERE ac.organization_id = ${organizationId} + AND ac.status = 'active' + ${connectionId ? sql`AND ac.id = ${connectionId}` : sql``} + -- Deliver in binding-creation order so earlier bindings (the primary + -- channel) are attempted first when an agent is bound to several. + ORDER BY b.created_at ASC + `) as Array<{ id: string; platform: string; channel_id: string }>; + + return rows.map((row) => ({ + connectionId: row.id, + platform: row.platform, + // Bindings store the platform-prefixed id ("slack:C0123ABCD"); older rows + // may hold the bare id, so prefix defensively. + channelKey: row.channel_id.includes(':') + ? row.channel_id + : `${row.platform}:${row.channel_id}`, + })); +} + async function deliverToBotConnections( params: Omit ): Promise { if (!isLobuGatewayRunning()) return; - - const port = process.env.PORT || '8787'; - const lobuBaseUrl = `http://127.0.0.1:${port}/lobu`; + const manager = getChatInstanceManager(); + if (!manager) return; const text = params.body ? `${params.title}\n\n${params.body}` : params.title; + // A rich card takes precedence over the markdown body for the channel post. + const content = params.card ? { card: params.card } : { markdown: text }; try { - // Fetch connections and targets in parallel - const [connRes, targetsRes] = await Promise.all([ - fetch(`${lobuBaseUrl}/api/internal/connections`), - fetch(`${lobuBaseUrl}/api/internal/connections/test-targets`), - ]); - if (!connRes.ok) return; - - const connBody = (await connRes.json()) as { - connections: Array<{ - id: string; - platform: string; - agentId: string; - status: string; - }>; - }; - const targets = targetsRes.ok - ? ((await targetsRes.json()) as Array<{ platform: string; defaultTarget: string }>) - : []; - - const targetMap = new Map(targets.map((t) => [t.platform, t.defaultTarget])); - - let connections = connBody.connections.filter((c) => c.status === 'active'); - if (params.connectionId) { - connections = connections.filter((c) => c.id === params.connectionId); - } - if (connections.length === 0) return; - - // Mint the service token once per org (it's org-scoped, not per-connection). - const token = await getLobuServiceToken(params.organizationId); + const targets = await resolveBotDeliveryTargets( + params.organizationId, + params.connectionId + ); + if (targets.length === 0) return; await Promise.allSettled( - connections.map((conn) => { - const target = targetMap.get(conn.platform); - // Platform-specific routing - const routing: Record = {}; - if (conn.platform === 'telegram' && target) { - routing.telegram = { chatId: target }; - } else if (conn.platform === 'slack' && target) { - routing.slack = { channel: target }; - } - return fetch(`${lobuBaseUrl}/api/v1/messaging/send`, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - ...(token ? { Authorization: `Bearer ${token}` } : {}), - }, - body: JSON.stringify({ - agentId: conn.agentId, - message: text, - platform: conn.platform, - ...routing, - }), - }).catch((err) => + targets.map(async ({ connectionId, channelKey }) => { + try { + await manager.postMessageToChannel(connectionId, channelKey, content); + } catch (err) { logger.warn( - { err, connectionId: conn.id }, - '[Notifications] Failed to send via Lobu connection' - ) - ); + { err, connectionId, channelKey }, + '[Notifications] Failed to post to bot connection channel' + ); + } }) ); } catch (err) { - logger.warn({ err }, '[Notifications] Failed to deliver to embedded Lobu'); + logger.warn({ err }, '[Notifications] Failed to deliver to bot connections'); } } diff --git a/packages/server/src/sandbox/client-sdk.ts b/packages/server/src/sandbox/client-sdk.ts index bb344cf8a..f40188baa 100644 --- a/packages/server/src/sandbox/client-sdk.ts +++ b/packages/server/src/sandbox/client-sdk.ts @@ -25,6 +25,7 @@ import { buildEntitySchemaNamespace, buildFeedsNamespace, buildKnowledgeNamespace, + buildNotificationsNamespace, buildOperationsNamespace, buildOrganizationsNamespace, buildViewTemplatesNamespace, @@ -37,6 +38,7 @@ import type { EntitiesNamespace } from "./namespaces/entities"; import type { EntitySchemaNamespace } from "./namespaces/entity-schema"; import type { FeedsNamespace } from "./namespaces/feeds"; import type { KnowledgeNamespace } from "./namespaces/knowledge"; +import type { NotificationsNamespace } from "./namespaces/notifications"; import type { OperationsNamespace } from "./namespaces/operations"; import type { OrganizationsNamespace } from "./namespaces/organizations"; import type { ViewTemplatesNamespace } from "./namespaces/view-templates"; @@ -53,6 +55,7 @@ export interface ClientSDK { classifiers: ClassifiersNamespace; viewTemplates: ViewTemplatesNamespace; knowledge: KnowledgeNamespace; + notifications: NotificationsNamespace; organizations: OrganizationsNamespace; org(slugOrId: string): Promise; @@ -168,6 +171,7 @@ export function buildClientSDK( classifiers: buildClassifiersNamespace(ctx, env), viewTemplates: buildViewTemplatesNamespace(ctx, env), knowledge: buildKnowledgeNamespace(ctx, env), + notifications: buildNotificationsNamespace(ctx, env), organizations: buildOrganizationsNamespace(ctx), }; diff --git a/packages/server/src/sandbox/method-metadata.ts b/packages/server/src/sandbox/method-metadata.ts index 3ed3f70f6..0adaca381 100644 --- a/packages/server/src/sandbox/method-metadata.ts +++ b/packages/server/src/sandbox/method-metadata.ts @@ -205,6 +205,23 @@ export default async (_ctx, client) => { };`, }, + // notifications + "notifications.send": { + summary: + "Send a notification to org users. Writes an `agent_message` notification (in-app inbox) and fans it out to the org's active bot connections (Slack/Telegram) — the way a watcher reaction surfaces its digest to a chat channel. Pass an optional `card` (a `chat` CardElement) for rich cross-platform rendering, and `watcher_source` when firing from a reaction.", + access: "write", + example: + "await client.notifications.send({ title: 'Weekly funnel digest', body: '3 new leads...', watcher_source: { watcher_id: ctx.window.watcher_id, window_id: ctx.window.id } });", + usageExample: `// Push a watcher digest to the org's Slack/Telegram connections + inbox. +export default async (ctx, client) => { + await client.notifications.send({ + title: 'Weekly funnel digest', + body: 'Top action: send the Acme pilot offer\\nNew leads: 3', + watcher_source: { watcher_id: ctx.window.watcher_id, window_id: ctx.window.id }, + }); +};`, + }, + // watchers "watchers.manage": { summary: diff --git a/packages/server/src/sandbox/namespaces/index.ts b/packages/server/src/sandbox/namespaces/index.ts index d0ee1fb9c..bac998c3c 100644 --- a/packages/server/src/sandbox/namespaces/index.ts +++ b/packages/server/src/sandbox/namespaces/index.ts @@ -9,6 +9,7 @@ export { buildEntitiesNamespace } from "./entities"; export { buildEntitySchemaNamespace } from "./entity-schema"; export { buildFeedsNamespace } from "./feeds"; export { buildKnowledgeNamespace } from "./knowledge"; +export { buildNotificationsNamespace } from "./notifications"; export { buildOperationsNamespace } from "./operations"; export { buildOrganizationsNamespace } from "./organizations"; export { buildViewTemplatesNamespace } from "./view-templates"; diff --git a/packages/server/src/sandbox/namespaces/notifications.ts b/packages/server/src/sandbox/namespaces/notifications.ts new file mode 100644 index 000000000..fe16b2bb6 --- /dev/null +++ b/packages/server/src/sandbox/namespaces/notifications.ts @@ -0,0 +1,51 @@ +/** + * ClientSDK `notifications` namespace — a thin wrapper over the `notify` tool + * that lets reactions (and `run_sdk` scripts) write a notification to the inbox + * and fan it out to the org's bot connections (Slack/Telegram). + */ + +import type { CardElement } from "chat"; +import type { Env } from "../../index"; +import { notify } from "../../tools/admin/notify"; +import type { ToolContext } from "../../tools/registry"; +import { createActionCaller } from "./action-call"; + +export interface NotificationsSendInput { + /** Notification title (≤200 chars). */ + title: string; + /** Body text (≤1000 chars). */ + body?: string; + /** + * Optional rich card (`chat` `CardElement`) for bot-connection delivery, + * rendered to each platform's native format (Block Kit / Adaptive Cards / …). + */ + card?: CardElement; + /** + * Who to notify. `"admins"` (default): org admins/owners. `"all"`: every + * member. Or an array of specific user IDs. + */ + recipients?: "admins" | "all" | string[]; + /** Relative URL the notification links to (e.g. `/acme/entities`). */ + resource_url?: string; + /** Deliver only through this specific bot connection (its id). */ + connection_id?: string; + /** Arbitrary JSON payload appended to the body as formatted JSON. */ + data?: Record; + /** Attribution when sent from a watcher reaction — both ids are numeric. */ + watcher_source?: { watcher_id: number; window_id: number }; +} + +export interface NotificationsNamespace { + send(input: NotificationsSendInput): Promise<{ notified_count: number }>; +} + +export function buildNotificationsNamespace( + ctx: ToolContext, + env: Env, +): NotificationsNamespace { + const { action } = createActionCaller(notify, env, ctx); + + return { + send: (input) => action("send", input), + }; +} diff --git a/packages/server/src/tools/admin/notify.ts b/packages/server/src/tools/admin/notify.ts index a588fefde..780862132 100644 --- a/packages/server/src/tools/admin/notify.ts +++ b/packages/server/src/tools/admin/notify.ts @@ -8,6 +8,7 @@ */ import { type Static, Type } from '@sinclair/typebox'; +import type { CardElement } from 'chat'; import { getDb, pgTextArray } from '../../db/client'; import { emit } from '../../events/emitter'; import type { Env } from '../../index'; @@ -53,6 +54,12 @@ const SendAction = Type.Object({ description: 'Arbitrary JSON payload stored in notification body as formatted JSON', }) ), + card: Type.Optional( + Type.Record(Type.String(), Type.Any(), { + description: + 'A `chat` CardElement (built with the card primitives) for rich bot-connection delivery. When set, the bound channel gets this card instead of the markdown body.', + }) + ), watcher_source: Type.Optional( Type.Object( { @@ -134,6 +141,7 @@ async function handleSend( body, resourceUrl: args.resource_url ?? null, connectionId: args.connection_id ?? null, + card: (args.card as CardElement | undefined) ?? null, }); emit(ctx.organizationId, { keys: ['notifications', 'notifications-unread-count'] });