diff --git a/assistant/src/__tests__/non-member-access-request.test.ts b/assistant/src/__tests__/non-member-access-request.test.ts new file mode 100644 index 00000000000..f07120edfa8 --- /dev/null +++ b/assistant/src/__tests__/non-member-access-request.test.ts @@ -0,0 +1,282 @@ +/** + * Tests for the non-member access request notification flow. + * + * When a non-member messages the assistant on a channel, the system should: + * 1. Deny the message with the standard rejection reply + * 2. Notify the guardian (if a guardian binding exists) + * 3. Create a guardian approval request for the access request + * 4. Deduplicate: don't create duplicate requests for repeated messages + */ +import { mkdtempSync, rmSync } from 'node:fs'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; + +import { afterAll, beforeEach, describe, expect, mock, test } from 'bun:test'; + +// --------------------------------------------------------------------------- +// Test isolation: in-memory SQLite via temp directory +// --------------------------------------------------------------------------- + +const testDir = mkdtempSync(join(tmpdir(), 'non-member-access-request-test-')); + +mock.module('../util/platform.js', () => ({ + getRootDir: () => testDir, + getDataDir: () => testDir, + isMacOS: () => process.platform === 'darwin', + isLinux: () => process.platform === 'linux', + isWindows: () => process.platform === 'win32', + getSocketPath: () => join(testDir, 'test.sock'), + getPidPath: () => join(testDir, 'test.pid'), + getDbPath: () => join(testDir, 'test.db'), + getLogPath: () => join(testDir, 'test.log'), + ensureDataDir: () => {}, + normalizeAssistantId: (id: string) => id === 'self' ? 'self' : id, + readHttpToken: () => 'test-bearer-token', +})); + +mock.module('../util/logger.js', () => ({ + getLogger: () => new Proxy({} as Record, { + get: () => () => {}, + }), +})); + +// Mock security check to always pass +mock.module('../security/secret-ingress.js', () => ({ + checkIngressForSecrets: () => ({ blocked: false }), +})); + +// Mock ingress member store: findMember always returns null (non-member), +// updateLastSeen is a no-op. +mock.module('../memory/ingress-member-store.js', () => ({ + findMember: () => null, + updateLastSeen: () => {}, + upsertMember: () => {}, +})); + +mock.module('../config/env.js', () => ({ + getGatewayInternalBaseUrl: () => 'http://127.0.0.1:7830', +})); + +// Track emitNotificationSignal calls +const emitSignalCalls: Array> = []; +mock.module('../notifications/emit-signal.js', () => ({ + emitNotificationSignal: async (params: Record) => { + emitSignalCalls.push(params); + return { + signalId: 'mock-signal-id', + deduplicated: false, + dispatched: true, + reason: 'mock', + deliveryResults: [], + }; + }, +})); + +// Track deliverChannelReply calls +const deliverReplyCalls: Array<{ url: string; payload: Record }> = []; +mock.module('../runtime/gateway-client.js', () => ({ + deliverChannelReply: async (url: string, payload: Record) => { + deliverReplyCalls.push({ url, payload }); + }, +})); + +import { + createBinding, + findPendingAccessRequestForRequester, +} from '../memory/channel-guardian-store.js'; +import { initializeDb, resetDb } from '../memory/db.js'; +import { handleChannelInbound } from '../runtime/routes/channel-routes.js'; + +initializeDb(); + +afterAll(() => { + resetDb(); + try { rmSync(testDir, { recursive: true }); } catch { /* best effort */ } +}); + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +const TEST_BEARER_TOKEN = 'test-token'; + +function resetState(): void { + const { getDb } = require('../memory/db.js'); + const db = getDb(); + db.run('DELETE FROM channel_guardian_approval_requests'); + db.run('DELETE FROM channel_guardian_bindings'); + db.run('DELETE FROM channel_inbound_events'); + db.run('DELETE FROM conversations'); + db.run('DELETE FROM notification_events'); + emitSignalCalls.length = 0; + deliverReplyCalls.length = 0; +} + +function buildInboundRequest(overrides: Record = {}): Request { + const body: Record = { + sourceChannel: 'telegram', + interface: 'telegram', + externalChatId: 'chat-123', + externalMessageId: `msg-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`, + content: 'Hello, can I use this assistant?', + senderExternalUserId: 'user-unknown-456', + senderName: 'Alice Unknown', + senderUsername: 'alice_unknown', + replyCallbackUrl: 'http://localhost:7830/deliver/telegram', + ...overrides, + }; + + return new Request('http://localhost:8080/channels/inbound', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'X-Gateway-Origin': TEST_BEARER_TOKEN, + }, + body: JSON.stringify(body), + }); +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe('non-member access request notification', () => { + beforeEach(() => { + resetState(); + }); + + test('non-member message is denied with rejection reply', async () => { + const req = buildInboundRequest(); + const resp = await handleChannelInbound(req, undefined, TEST_BEARER_TOKEN); + const json = await resp.json() as Record; + + expect(json.denied).toBe(true); + expect(json.reason).toBe('not_a_member'); + + // Rejection reply was delivered + expect(deliverReplyCalls.length).toBe(1); + expect((deliverReplyCalls[0].payload as Record).text).toContain("you haven't been approved"); + }); + + test('guardian is notified when a non-member messages and a guardian binding exists', async () => { + // Set up a guardian binding for this channel + createBinding({ + assistantId: 'self', + channel: 'telegram', + guardianExternalUserId: 'guardian-user-789', + guardianDeliveryChatId: 'guardian-chat-789', + }); + + const req = buildInboundRequest(); + const resp = await handleChannelInbound(req, undefined, TEST_BEARER_TOKEN); + const json = await resp.json() as Record; + + // Message is still denied + expect(json.denied).toBe(true); + expect(json.reason).toBe('not_a_member'); + + // Rejection reply was delivered + expect(deliverReplyCalls.length).toBe(1); + + // A notification signal was emitted + expect(emitSignalCalls.length).toBe(1); + expect(emitSignalCalls[0].sourceEventName).toBe('ingress.access_request'); + expect(emitSignalCalls[0].sourceChannel).toBe('telegram'); + const payload = emitSignalCalls[0].contextPayload as Record; + expect(payload.senderExternalUserId).toBe('user-unknown-456'); + expect(payload.senderName).toBe('Alice Unknown'); + + // An approval request was created + const pending = findPendingAccessRequestForRequester( + 'self', + 'telegram', + 'user-unknown-456', + 'ingress_access_request', + ); + expect(pending).not.toBeNull(); + expect(pending!.status).toBe('pending'); + expect(pending!.requesterExternalUserId).toBe('user-unknown-456'); + expect(pending!.guardianExternalUserId).toBe('guardian-user-789'); + expect(pending!.toolName).toBe('ingress_access_request'); + }); + + test('no duplicate approval requests for repeated messages from same non-member', async () => { + createBinding({ + assistantId: 'self', + channel: 'telegram', + guardianExternalUserId: 'guardian-user-789', + guardianDeliveryChatId: 'guardian-chat-789', + }); + + // First message + const req1 = buildInboundRequest(); + await handleChannelInbound(req1, undefined, TEST_BEARER_TOKEN); + + // Second message from the same user + const req2 = buildInboundRequest({ + externalMessageId: `msg-second-${Date.now()}`, + content: 'Please let me in!', + }); + await handleChannelInbound(req2, undefined, TEST_BEARER_TOKEN); + + // Both messages should be denied with rejection replies + expect(deliverReplyCalls.length).toBe(2); + + // Only one notification signal should be emitted (second is deduplicated) + expect(emitSignalCalls.length).toBe(1); + + // Only one approval request should exist + const pending = findPendingAccessRequestForRequester( + 'self', + 'telegram', + 'user-unknown-456', + 'ingress_access_request', + ); + expect(pending).not.toBeNull(); + }); + + test('deny works without error when no guardian binding exists', async () => { + // No guardian binding — should deny without notification + const req = buildInboundRequest(); + const resp = await handleChannelInbound(req, undefined, TEST_BEARER_TOKEN); + const json = await resp.json() as Record; + + expect(json.denied).toBe(true); + expect(json.reason).toBe('not_a_member'); + + // Rejection reply was still delivered + expect(deliverReplyCalls.length).toBe(1); + + // No notification signal was emitted + expect(emitSignalCalls.length).toBe(0); + + // No approval request was created + const pending = findPendingAccessRequestForRequester( + 'self', + 'telegram', + 'user-unknown-456', + 'ingress_access_request', + ); + expect(pending).toBeNull(); + }); + + test('no notification when senderExternalUserId is absent', async () => { + createBinding({ + assistantId: 'self', + channel: 'telegram', + guardianExternalUserId: 'guardian-user-789', + guardianDeliveryChatId: 'guardian-chat-789', + }); + + // Message without senderExternalUserId — can't identify the requester. + // The ACL check requires senderExternalUserId to look up members, + // so without it the non-member gate is bypassed entirely. + const req = buildInboundRequest({ + senderExternalUserId: undefined, + }); + await handleChannelInbound(req, undefined, TEST_BEARER_TOKEN); + + // No access request notification should fire (no identity to notify about) + expect(emitSignalCalls.length).toBe(0); + }); +}); diff --git a/assistant/src/memory/channel-guardian-store.ts b/assistant/src/memory/channel-guardian-store.ts index 4478dce9c25..1f777993985 100644 --- a/assistant/src/memory/channel-guardian-store.ts +++ b/assistant/src/memory/channel-guardian-store.ts @@ -724,6 +724,40 @@ export function createApprovalRequest(params: { return rowToApprovalRequest(row); } +/** + * Check for an existing pending (non-expired) approval request for a specific + * requester on a channel. Used to deduplicate access requests — repeated + * messages from the same non-member should not create duplicate approval + * requests while one is already pending. + */ +export function findPendingAccessRequestForRequester( + assistantId: string, + channel: string, + requesterExternalUserId: string, + toolName: string, +): GuardianApprovalRequest | null { + const db = getDb(); + const now = Date.now(); + + const row = db + .select() + .from(channelGuardianApprovalRequests) + .where( + and( + eq(channelGuardianApprovalRequests.assistantId, assistantId), + eq(channelGuardianApprovalRequests.channel, channel), + eq(channelGuardianApprovalRequests.requesterExternalUserId, requesterExternalUserId), + eq(channelGuardianApprovalRequests.toolName, toolName), + eq(channelGuardianApprovalRequests.status, 'pending'), + gt(channelGuardianApprovalRequests.expiresAt, now), + ), + ) + .orderBy(desc(channelGuardianApprovalRequests.createdAt)) + .get(); + + return row ? rowToApprovalRequest(row) : null; +} + export function getPendingApprovalForRun(runId: string): GuardianApprovalRequest | null { const db = getDb(); const now = Date.now(); diff --git a/assistant/src/runtime/routes/inbound-message-handler.ts b/assistant/src/runtime/routes/inbound-message-handler.ts index 70110e6bc15..52112d2bcb9 100644 --- a/assistant/src/runtime/routes/inbound-message-handler.ts +++ b/assistant/src/runtime/routes/inbound-message-handler.ts @@ -12,6 +12,7 @@ import * as attachmentsStore from '../../memory/attachments-store.js'; import * as channelDeliveryStore from '../../memory/channel-delivery-store.js'; import { createApprovalRequest, + findPendingAccessRequestForRequester, } from '../../memory/channel-guardian-store.js'; import { recordConversationSeenSignal } from '../../memory/conversation-attention-store.js'; import * as conversationStore from '../../memory/conversation-store.js'; @@ -270,6 +271,19 @@ export async function handleChannelInbound( log.error({ err, externalChatId }, 'Failed to deliver ACL rejection reply'); } } + + // Notify the guardian about the access request so they can approve/deny. + // Only fires when a guardian binding exists and no duplicate pending + // request already exists for this requester. + notifyGuardianOfAccessRequest({ + canonicalAssistantId, + sourceChannel, + externalChatId, + senderExternalUserId: body.senderExternalUserId, + senderName: body.senderName, + senderUsername: body.senderUsername, + }); + return Response.json({ accepted: true, denied: true, reason: 'not_a_member' }); } } @@ -1028,6 +1042,104 @@ export async function handleChannelInbound( }); } +// --------------------------------------------------------------------------- +// Non-member access request notification +// --------------------------------------------------------------------------- + +/** + * Fire-and-forget: look up the guardian binding and, if present, create an + * approval request + emit a notification signal so the guardian can + * approve/deny the unknown user. Deduplicates by checking for an existing + * pending approval for the same (requester, assistant, channel). + */ +function notifyGuardianOfAccessRequest(params: { + canonicalAssistantId: string; + sourceChannel: ChannelId; + externalChatId: string; + senderExternalUserId?: string; + senderName?: string; + senderUsername?: string; +}): void { + const { + canonicalAssistantId, + sourceChannel, + externalChatId, + senderExternalUserId, + senderName, + senderUsername, + } = params; + + if (!senderExternalUserId) return; + + const binding = getGuardianBinding(canonicalAssistantId, sourceChannel); + if (!binding) { + log.debug({ sourceChannel, canonicalAssistantId }, 'No guardian binding for access request notification'); + return; + } + + // Deduplicate: skip if there is already a pending approval request for + // the same requester on this channel. + const existing = findPendingAccessRequestForRequester( + canonicalAssistantId, + sourceChannel, + senderExternalUserId, + 'ingress_access_request', + ); + if (existing) { + log.debug( + { sourceChannel, senderExternalUserId, existingId: existing.id }, + 'Skipping duplicate access request notification', + ); + return; + } + + const senderIdentifier = senderName || senderUsername || senderExternalUserId; + + createApprovalRequest({ + runId: `ingress-access-request-${Date.now()}`, + conversationId: `access-req-${sourceChannel}-${senderExternalUserId}`, + assistantId: canonicalAssistantId, + channel: sourceChannel, + requesterExternalUserId: senderExternalUserId, + requesterChatId: externalChatId, + guardianExternalUserId: binding.guardianExternalUserId, + guardianChatId: binding.guardianDeliveryChatId, + toolName: 'ingress_access_request', + riskLevel: 'access_request', + reason: `${senderIdentifier} is requesting access to the assistant`, + expiresAt: Date.now() + GUARDIAN_APPROVAL_TTL_MS, + }); + + void emitNotificationSignal({ + sourceEventName: 'ingress.access_request', + sourceChannel, + sourceSessionId: `access-req-${sourceChannel}-${senderExternalUserId}`, + assistantId: canonicalAssistantId, + attentionHints: { + requiresAction: true, + urgency: 'high', + isAsyncBackground: false, + visibleInSourceNow: false, + }, + contextPayload: { + sourceChannel, + externalChatId, + senderExternalUserId, + senderName: senderName ?? null, + senderUsername: senderUsername ?? null, + senderIdentifier, + }, + // Deduplicate at the notification pipeline level too, keyed on the + // requester identity so repeated messages don't flood the guardian. + dedupeKey: `access-request:${canonicalAssistantId}:${sourceChannel}:${senderExternalUserId}`, + }); + + log.info( + { sourceChannel, senderExternalUserId, senderIdentifier }, + 'Guardian notified of non-member access request', + ); +} + // --------------------------------------------------------------------------- // Background message processing // ---------------------------------------------------------------------------