Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ All chat platforms (Telegram, Slack, Discord, WhatsApp, Teams) run through Chat

**Webhooks via the Chat SDK adapter are the default transport.** Don't add new per-platform alternative transports (Slack Socket Mode, Discord Gateway WebSocket bridges, etc.) or extra runtime SDKs. The lone exception is Telegram, whose connection config exposes an optional `polling` mode (`mode: "auto" | "webhook" | "polling"`) implemented inside the Chat SDK adapter — still no extra SDK. Local dev for webhook-only platforms uses a tunnel (cloudflared / ngrok / Tailscale Funnel); Lobu Cloud users get a public URL for free. Sticking to the Chat SDK keeps one delivery story, one set of retries, and zero extra dependencies.

`mode: "polling"` is rejected at connection-create time when `LOBU_CLOUD_MODE=1` — a polling worker long-polls Telegram's edge from the gateway pod and shares that connection across tenants, so a misbehaving polling connection in one org degrades delivery for every other tenant. Self-hosters (`LOBU_CLOUD_MODE` unset/0) keep the polling option for tunnel-less dev.

#### Orchestration
- **Embedded-only deployment.** Gateway, workers, embeddings, and the Lobu memory backend run in a single Node process (`lobu run`, or `bun run dev` in the monorepo). Workers spawn as `child_process.spawn` subprocesses on the same host; on Linux the spawn path uses `systemd-run --user --scope` for cgroup limits + IPAddressDeny + capability drops. There is no Docker or Kubernetes deployment manager.
- Postgres (with `pgvector`; optionally `postgis` for geo enrichment) is the only user-provided external. The Node process connects out via `DATABASE_URL`. Runtime state — queues, chat connection rows, grant cache, MCP proxy sessions — lives in dedicated Postgres tables.
Expand Down
8 changes: 8 additions & 0 deletions packages/core/src/agent-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ export interface AgentMetadata {
owner: { platform: string; userId: string };
isWorkspaceAgent?: boolean;
workspaceId?: string;
/**
* Owning organization id. Optional in the type for back-compat with
* in-memory stores that predate per-tenant scoping; populated by the
* postgres-backed store. The public Agent API route reads this to stamp
* worker tokens with the agent's org so the egress proxy can scope
* per-tenant gates (grants, judge cache, judge policy).
*/
organizationId?: string;
createdAt: number;
lastUsedAt?: number;
}
Expand Down
10 changes: 10 additions & 0 deletions packages/core/src/worker/auth.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ export interface WorkerTokenData {
channelId: string;
teamId?: string;
agentId?: string;
/**
* Owning organization of the agent the token was minted for. Used by the
* HTTP proxy to scope per-tenant caches (e.g. egress-judge verdict cache)
* so org A's decisions can never satisfy org B's requests. Optional only
* because some internal/preflight call sites mint tokens before the owning
* org has been resolved; production agent runs always set it.
*/
organizationId?: string;
connectionId?: string;
deploymentName: string;
timestamp: number;
Expand All @@ -33,6 +41,7 @@ export function generateWorkerToken(
channelId: string;
teamId?: string;
agentId?: string;
organizationId?: string;
connectionId?: string;
platform?: string;
sessionKey?: string;
Expand All @@ -49,6 +58,7 @@ export function generateWorkerToken(
channelId: options.channelId,
teamId: options.teamId,
agentId: options.agentId,
organizationId: options.organizationId,
connectionId: options.connectionId,
deploymentName,
timestamp: Date.now(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ describe("EgressJudge timeout", () => {
const judge = new EgressJudge({ client, judgeTimeoutMs: 30 });
const started = Date.now();
const decision = await judge.decide(
{ agentId: "agent-a", hostname: "api.github.com" },
{ agentId: "agent-a", organizationId: "org-1", hostname: "api.github.com" },
rule()
);
const elapsed = Date.now() - started;
Expand All @@ -62,7 +62,7 @@ describe("EgressJudge timeout", () => {
// Distinct hostnames so the verdict cache never short-circuits the path.
for (let i = 0; i < 5; i++) {
const decision = await judge.decide(
{ agentId: "agent-a", hostname: `h${i}.example.com` },
{ agentId: "agent-a", organizationId: "org-1", hostname: `h${i}.example.com` },
rule()
);
expect(decision.verdict).toBe("deny");
Expand All @@ -72,7 +72,7 @@ describe("EgressJudge timeout", () => {
expect(client.calls).toBe(2);

const afterOpen = await judge.decide(
{ agentId: "agent-a", hostname: "another.example.com" },
{ agentId: "agent-a", organizationId: "org-1", hostname: "another.example.com" },
rule()
);
expect(afterOpen.verdict).toBe("deny");
Expand All @@ -88,7 +88,7 @@ describe("EgressJudge timeout", () => {
const judge = new EgressJudge({ client });
const started = Date.now();
const decision = await judge.decide(
{ agentId: "agent-a", hostname: "api.github.com" },
{ agentId: "agent-a", organizationId: "org-1", hostname: "api.github.com" },
rule()
);
expect(decision.verdict).toBe("deny");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ describe("BaseDeploymentManager.syncNetworkConfigGrants", () => {
})
);

let resolved = policyStore.resolve("agent-1", "api.example.com");
let resolved = policyStore.resolve("test-org", "agent-1", "api.example.com");
expect(resolved?.policy).toContain("initial policy");
expect(resolved?.policy).toContain("operator policy");

Expand All @@ -173,12 +173,12 @@ describe("BaseDeploymentManager.syncNetworkConfigGrants", () => {
})
);

resolved = policyStore.resolve("agent-1", "api.example.com");
resolved = policyStore.resolve("test-org", "agent-1", "api.example.com");
expect(resolved?.policy).toContain("updated policy");
expect(resolved?.policy).not.toContain("initial policy");

await barebones.syncNetworkConfigGrants(buildPayload({}));
expect(policyStore.resolve("agent-1", "api.example.com")).toBeUndefined();
expect(policyStore.resolve("test-org", "agent-1", "api.example.com")).toBeUndefined();
});

test("skips redundant writes when the pattern set has not changed", async () => {
Expand Down
15 changes: 12 additions & 3 deletions packages/server/src/gateway/__tests__/egress-judge-cache.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ describe("VerdictCache", () => {
test("stores and retrieves a verdict", () => {
const cache = new VerdictCache(60_000, 100);
const key = VerdictCache.key({
orgId: "org-1",
policyHash: "abc",
hostname: "example.com",
method: "GET",
Expand All @@ -21,11 +22,13 @@ describe("VerdictCache", () => {

test("key is case-insensitive for hostname and method", () => {
const a = VerdictCache.key({
orgId: "org-1",
policyHash: "h",
hostname: "Example.COM",
method: "get",
});
const b = VerdictCache.key({
orgId: "org-1",
policyHash: "h",
hostname: "example.com",
method: "GET",
Expand All @@ -34,14 +37,20 @@ describe("VerdictCache", () => {
});

test("different policy hashes do not collide", () => {
const a = VerdictCache.key({ policyHash: "h1", hostname: "x.com" });
const b = VerdictCache.key({ policyHash: "h2", hostname: "x.com" });
const a = VerdictCache.key({ orgId: "org-1", policyHash: "h1", hostname: "x.com" });
const b = VerdictCache.key({ orgId: "org-1", policyHash: "h2", hostname: "x.com" });
expect(a).not.toBe(b);
});

test("different orgIds do not collide even for identical policy+request", () => {
const a = VerdictCache.key({ orgId: "org-a", policyHash: "h", hostname: "x.com" });
const b = VerdictCache.key({ orgId: "org-b", policyHash: "h", hostname: "x.com" });
expect(a).not.toBe(b);
});

test("expires entries after the TTL", async () => {
const cache = new VerdictCache(10, 100);
const key = VerdictCache.key({ policyHash: "h", hostname: "x.com" });
const key = VerdictCache.key({ orgId: "org-1", policyHash: "h", hostname: "x.com" });
cache.set(key, { verdict: "allow", reason: "ok" });
expect(cache.get(key)).toBeDefined();
await new Promise((r) => setTimeout(r, 20));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ beforeAll(async () => {
// `example.com` is used for the allow-path HTTP tests because its real
// server closes the socket after responding — tests that use hosts with
// keep-alive (api.github.com, etc.) hang the raw socket reader.
policyStore.set("agent-a", {
policyStore.set("org-a", "agent-a", {
judgedDomains: [
{ domain: "example.com" },
{ domain: ".slack.com", judge: "strict" },
Expand Down Expand Up @@ -86,6 +86,7 @@ function createValidToken(deploymentName: string): string {
channelId: "test-channel",
platform: "test",
agentId: "agent-a",
organizationId: "org-a",
});
}

Expand Down
Loading
Loading