diff --git a/.gitignore b/.gitignore index 5b613a25f..942c1b2d3 100644 --- a/.gitignore +++ b/.gitignore @@ -18,6 +18,9 @@ logs/security/ data/* !data/.gitkeep +# Test and coverage outputs +coverage/ + # Verification scripts verify-*.js VERIFICATION_GUIDE.md diff --git a/biome.json b/biome.json index c50e2f6a7..21f3fd9a5 100644 --- a/biome.json +++ b/biome.json @@ -1,25 +1,19 @@ { "$schema": "https://biomejs.dev/schemas/2.3.14/schema.json", - "assist": { - "actions": { - "source": { - "organizeImports": "on" - } - } + "files": { + "includes": [ + "src/**/*.js", + "tests/**/*.js", + "!node_modules/**", + "!coverage/**", + "!logs/**", + "!data/**" + ] }, "linter": { "enabled": true, "rules": { - "recommended": true, - "correctness": { - "noUnusedVariables": "warn", - "noUnusedImports": "warn" - }, - "style": { - "useConst": "error" - }, "suspicious": { - "noVar": "error", "noConsole": "error" } } @@ -28,8 +22,7 @@ "enabled": true, "indentStyle": "space", "indentWidth": 2, - "lineWidth": 100, - "lineEnding": "lf" + "lineWidth": 100 }, "javascript": { "formatter": { @@ -37,8 +30,5 @@ "trailingCommas": "all", "semicolons": "always" } - }, - "files": { - "includes": ["**/*.js", "**/*.json", "**/*.md", "!coverage"] } } diff --git a/config.json b/config.json index 3e79b24f4..1c0bca0fc 100644 --- a/config.json +++ b/config.json @@ -4,7 +4,9 @@ "model": "claude-sonnet-4-20250514", "maxTokens": 1024, "systemPrompt": "You are Volvox Bot, the friendly AI assistant for the Volvox developer community Discord server.\n\nYou're witty, snarky (but warm), and deeply knowledgeable about programming, software development, and tech.\n\nKey traits:\n- Helpful but not boring\n- Can roast people lightly when appropriate\n- Enthusiastic about cool tech and projects\n- Supportive of beginners learning to code\n- Concise - this is Discord, not an essay\n\n⚠️ CRITICAL RULES:\n- NEVER type @.everyone or @.here (remove the dots) - these ping hundreds of people\n- NEVER use mass mention pings under any circumstances\n- If you need to address the group, say \"everyone\" or \"folks\" without the @ symbol\n\nKeep responses under 2000 chars. Use Discord markdown when helpful.", - "channels": [] + "channels": [], + "historyLength": 20, + "historyTTLDays": 30 }, "chimeIn": { "enabled": false, diff --git a/package.json b/package.json index 1ca19d889..53a638271 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "bill-bot", - "packageManager": "pnpm@10.29.2", + "packageManager": "pnpm@10.28.2", "version": "1.0.0", "description": "Volvox Discord bot - AI chat, welcome messages, and moderation", "main": "src/index.js", @@ -9,11 +9,12 @@ "start": "node src/index.js", "dev": "node --watch src/index.js", "deploy": "node src/deploy-commands.js", - "lint": "biome check .", - "lint:fix": "biome check . --fix", - "format": "biome format . --write", "test": "vitest run", - "test:coverage": "vitest run --coverage" + "test:watch": "vitest", + "test:coverage": "vitest run --coverage", + "lint": "biome check .", + "lint:fix": "biome check . --write", + "format": "biome format . --write" }, "dependencies": { "discord.js": "^14.25.1", diff --git a/src/db.js b/src/db.js index 55b210025..7551e9481 100644 --- a/src/db.js +++ b/src/db.js @@ -52,10 +52,10 @@ function getSslConfig(connectionString) { * @returns {Promise} The connection pool */ export async function initDb() { - if (pool) return pool; if (initializing) { throw new Error('initDb is already in progress'); } + if (pool) return pool; initializing = true; try { @@ -96,6 +96,27 @@ export async function initDb() { ) `); + await pool.query(` + CREATE TABLE IF NOT EXISTS conversations ( + id SERIAL PRIMARY KEY, + channel_id TEXT NOT NULL, + role TEXT NOT NULL CHECK (role IN ('user', 'assistant', 'system')), + content TEXT NOT NULL, + username TEXT, + created_at TIMESTAMPTZ DEFAULT NOW() + ) + `); + + await pool.query(` + CREATE INDEX IF NOT EXISTS idx_conversations_channel_created + ON conversations (channel_id, created_at) + `); + + await pool.query(` + CREATE INDEX IF NOT EXISTS idx_conversations_created_at + ON conversations (created_at) + `); + info('Database schema initialized'); } catch (err) { // Clean up the pool so getPool() doesn't return an unusable instance diff --git a/src/index.js b/src/index.js index 00cca6eeb..a6d94b0bb 100644 --- a/src/index.js +++ b/src/index.js @@ -18,7 +18,14 @@ import { Client, Collection, Events, GatewayIntentBits } from 'discord.js'; import { config as dotenvConfig } from 'dotenv'; import { closeDb, initDb } from './db.js'; import { error, info, warn } from './logger.js'; -import { getConversationHistory, setConversationHistory } from './modules/ai.js'; +import { + getConversationHistory, + initConversationHistory, + setConversationHistory, + setPool, + startConversationCleanup, + stopConversationCleanup, +} from './modules/ai.js'; import { loadConfig } from './modules/config.js'; import { registerEventHandlers } from './modules/events.js'; import { HealthMonitor } from './utils/health.js'; @@ -234,11 +241,14 @@ async function gracefulShutdown(signal) { } } - // 2. Save state after pending requests complete + // 2. Stop conversation cleanup timer + stopConversationCleanup(); + + // 3. Save state after pending requests complete info('Saving conversation state'); saveState(); - // 3. Close database pool + // 4. Close database pool info('Closing database connection'); try { await closeDb(); @@ -246,11 +256,11 @@ async function gracefulShutdown(signal) { error('Failed to close database pool', { error: err.message }); } - // 4. Destroy Discord client + // 5. Destroy Discord client info('Disconnecting from Discord'); client.destroy(); - // 5. Log clean exit + // 6. Log clean exit info('Shutdown complete'); process.exit(0); } @@ -275,7 +285,6 @@ process.on('unhandledRejection', (err) => { type: typeof err, }); }); - // Start bot const token = process.env.DISCORD_TOKEN; if (!token) { @@ -294,8 +303,9 @@ if (!token) { */ async function startup() { // Initialize database + let dbPool = null; if (process.env.DATABASE_URL) { - await initDb(); + dbPool = await initDb(); info('Database initialized'); } else { warn('DATABASE_URL not set — using config.json only (no persistence)'); @@ -305,9 +315,22 @@ async function startup() { config = await loadConfig(); info('Configuration loaded', { sections: Object.keys(config) }); - // Load previous conversation state + // Set up AI module's DB pool reference + if (dbPool) { + setPool(dbPool); + } + + // TODO: loadState() is migration-only for file->DB persistence transition. + // When DB is available, initConversationHistory() effectively overwrites this state. + // Once all environments are DB-backed, remove this call and loadState/saveState helpers. loadState(); + // Hydrate conversation history from DB (overwrites file state if DB is available) + await initConversationHistory(); + + // Start periodic conversation cleanup + startConversationCleanup(); + // Register event handlers with live config reference registerEventHandlers(client, config, healthMonitor); diff --git a/src/modules/ai.js b/src/modules/ai.js index cf8a18401..6d28e226c 100644 --- a/src/modules/ai.js +++ b/src/modules/ai.js @@ -1,13 +1,91 @@ /** * AI Module * Handles AI chat functionality powered by Claude via OpenClaw + * Conversation history is persisted to PostgreSQL with in-memory cache */ -import { info, error as logError } from '../logger.js'; +import { info, error as logError, warn as logWarn } from '../logger.js'; +import { getConfig } from './config.js'; -// Conversation history per channel (simple in-memory store) +// Conversation history per channel (in-memory cache) let conversationHistory = new Map(); -const MAX_HISTORY = 20; + +/** Default history length if not configured */ +const DEFAULT_HISTORY_LENGTH = 20; + +/** Default TTL in days for conversation cleanup */ +const DEFAULT_HISTORY_TTL_DAYS = 30; + +/** Cleanup interval: 6 hours in milliseconds */ +const CLEANUP_INTERVAL_MS = 6 * 60 * 60 * 1000; + +/** Reference to the cleanup interval timer */ +let cleanupTimer = null; + +/** In-flight async hydrations keyed by channel ID (dedupes concurrent DB reads) */ +const pendingHydrations = new Map(); + +/** + * Get the configured history length from config + * @returns {number} History length + */ +function getHistoryLength() { + try { + const config = getConfig(); + const len = config?.ai?.historyLength; + if (typeof len === 'number' && len > 0) return len; + } catch { + // Config not loaded yet, use default + } + return DEFAULT_HISTORY_LENGTH; +} + +/** + * Get the configured TTL days from config + * @returns {number} TTL in days + */ +function getHistoryTTLDays() { + try { + const config = getConfig(); + const ttl = config?.ai?.historyTTLDays; + if (typeof ttl === 'number' && ttl > 0) return ttl; + } catch { + // Config not loaded yet, use default + } + return DEFAULT_HISTORY_TTL_DAYS; +} + +// Use a lazy-loaded pool getter to avoid import issues +let _getPoolFn = null; + +/** + * Set the pool getter function (for dependency injection / testing) + * @param {Function} fn - Function that returns the pool or null + */ +export function _setPoolGetter(fn) { + _getPoolFn = fn; +} + +/** + * Get the database pool safely + * @returns {import('pg').Pool|null} + */ +function getPool() { + if (_getPoolFn) return _getPoolFn(); + return _poolRef; +} + +/** @type {import('pg').Pool|null} */ +let _poolRef = null; + +/** + * Initialize the pool reference for the AI module + * Called during startup after DB is initialized + * @param {import('pg').Pool|null} pool + */ +export function setPool(pool) { + _poolRef = pool; +} /** * Get the full conversation history map (for state persistence) @@ -23,11 +101,10 @@ export function getConversationHistory() { */ export function setConversationHistory(history) { conversationHistory = history; + pendingHydrations.clear(); } // OpenClaw API endpoint/token (exported for shared use by other modules) -// Preferred env vars: OPENCLAW_API_URL + OPENCLAW_API_KEY -// Backward-compatible aliases: OPENCLAW_URL + OPENCLAW_TOKEN export const OPENCLAW_URL = process.env.OPENCLAW_API_URL || process.env.OPENCLAW_URL || @@ -35,31 +112,259 @@ export const OPENCLAW_URL = export const OPENCLAW_TOKEN = process.env.OPENCLAW_API_KEY || process.env.OPENCLAW_TOKEN || ''; /** - * Get or create conversation history for a channel + * Hydrate conversation history for a channel from DB. + * Dedupes concurrent hydrations and merges DB rows with in-flight in-memory writes. * @param {string} channelId - Channel ID - * @returns {Array} Conversation history + * @returns {Promise} Conversation history */ -export function getHistory(channelId) { +function hydrateHistory(channelId) { + const pending = pendingHydrations.get(channelId); + if (pending) { + return pending; + } + if (!conversationHistory.has(channelId)) { conversationHistory.set(channelId, []); } - return conversationHistory.get(channelId); + + const historyRef = conversationHistory.get(channelId); + const pool = getPool(); + if (!pool) { + return Promise.resolve(historyRef); + } + + const limit = getHistoryLength(); + const hydrationPromise = pool + .query( + `SELECT role, content FROM conversations + WHERE channel_id = $1 + ORDER BY created_at DESC + LIMIT $2`, + [channelId, limit], + ) + .then(({ rows }) => { + if (rows.length > 0) { + const dbHistory = rows.reverse().map((row) => ({ + role: row.role, + content: row.content, + })); + + // Merge DB history with any messages added while hydration was in-flight. + const arr = conversationHistory.get(channelId) || historyRef; + const merged = [...dbHistory, ...arr]; + + // Mutate the existing array in-place so callers holding references + // (e.g. getHistoryAsync callers) observe hydrated contents. + arr.length = 0; + arr.push(...merged.slice(-limit)); + + info('Hydrated history from DB for channel', { + channelId, + count: dbHistory.length, + merged: merged.length, + }); + } + + return conversationHistory.get(channelId) || historyRef; + }) + .catch((err) => { + logWarn('Failed to load history from DB, using in-memory only', { + channelId, + error: err.message, + }); + return conversationHistory.get(channelId) || historyRef; + }) + .finally(() => { + pendingHydrations.delete(channelId); + }); + + pendingHydrations.set(channelId, hydrationPromise); + return hydrationPromise; +} + +/** + * Async version of history retrieval that waits for in-flight hydration. + * @param {string} channelId - Channel ID + * @returns {Promise} Conversation history + */ +export async function getHistoryAsync(channelId) { + if (conversationHistory.has(channelId)) { + const pending = pendingHydrations.get(channelId); + if (pending) { + await pending; + } + return conversationHistory.get(channelId); + } + + return hydrateHistory(channelId); } /** * Add message to conversation history + * Writes to both in-memory cache and DB (write-through) * @param {string} channelId - Channel ID * @param {string} role - Message role (user/assistant) * @param {string} content - Message content + * @param {string} [username] - Optional username */ -export function addToHistory(channelId, role, content) { - const history = getHistory(channelId); +export function addToHistory(channelId, role, content, username) { + if (!conversationHistory.has(channelId)) { + conversationHistory.set(channelId, []); + } + const history = conversationHistory.get(channelId); history.push({ role, content }); - // Trim old messages - while (history.length > MAX_HISTORY) { + const maxHistory = getHistoryLength(); + + // Trim old messages from in-memory cache + while (history.length > maxHistory) { history.shift(); } + + // Write-through to DB (fire-and-forget, don't block) + const pool = getPool(); + if (pool) { + pool + .query( + `INSERT INTO conversations (channel_id, role, content, username) + VALUES ($1, $2, $3, $4)`, + [channelId, role, content, username || null], + ) + .catch((err) => { + logError('Failed to persist message to DB', { + channelId, + role, + username: username || null, + error: err.message, + }); + }); + } +} + +/** + * Initialize conversation history from DB on startup + * Loads last N messages per active channel + * @returns {Promise} + */ +export async function initConversationHistory() { + const pool = getPool(); + if (!pool) { + info('No DB available, skipping conversation history hydration'); + return; + } + + try { + const limit = getHistoryLength(); + const ttl = getHistoryTTLDays(); + + // Single query: fetch the last N messages per channel using ROW_NUMBER() + // Limited to non-expired rows to avoid full table scans. + const { rows } = await pool.query( + `SELECT channel_id, role, content + FROM ( + SELECT channel_id, role, content, created_at, + ROW_NUMBER() OVER (PARTITION BY channel_id ORDER BY created_at DESC) AS rn + FROM conversations + WHERE created_at >= NOW() - INTERVAL '1 day' * $2 + ) sub + WHERE rn <= $1 + ORDER BY channel_id, created_at ASC`, + [limit, ttl], + ); + + // Group rows by channel_id + const hydratedByChannel = new Map(); + + for (const row of rows) { + const channelId = row.channel_id; + if (!hydratedByChannel.has(channelId)) { + hydratedByChannel.set(channelId, []); + } + hydratedByChannel.get(channelId).push({ + role: row.role, + content: row.content, + }); + } + + // Replace channel histories with DB snapshots to avoid appending onto + // file-loaded state (which causes duplicate growth across restarts). + for (const [channelId, hydratedHistory] of hydratedByChannel.entries()) { + if (!conversationHistory.has(channelId)) { + conversationHistory.set(channelId, []); + } + const target = conversationHistory.get(channelId); + target.length = 0; + target.push(...hydratedHistory); + } + + info('Conversation history hydrated from DB', { + channels: hydratedByChannel.size, + totalMessages: rows.length, + }); + } catch (err) { + logWarn('Failed to hydrate conversation history from DB', { + error: err.message, + }); + } +} + +/** + * Start periodic cleanup of old conversation messages + * Deletes messages older than ai.historyTTLDays from the DB + */ +export function startConversationCleanup() { + // Only run if we have a DB + const pool = getPool(); + if (!pool) { + info('No DB available, skipping conversation cleanup scheduler'); + return; + } + + // Run cleanup immediately once, then on interval + runCleanup(); + cleanupTimer = setInterval(runCleanup, CLEANUP_INTERVAL_MS); + cleanupTimer.unref(); + info('Conversation cleanup scheduler started', { + intervalHours: CLEANUP_INTERVAL_MS / (60 * 60 * 1000), + }); +} + +/** + * Stop the periodic cleanup timer + */ +export function stopConversationCleanup() { + if (cleanupTimer) { + clearInterval(cleanupTimer); + cleanupTimer = null; + info('Conversation cleanup scheduler stopped'); + } +} + +/** + * Run a single cleanup pass + * @returns {Promise} + */ +async function runCleanup() { + const pool = getPool(); + if (!pool) return; + + try { + const ttlDays = getHistoryTTLDays(); + const result = await pool.query( + `DELETE FROM conversations + WHERE created_at < NOW() - INTERVAL '1 day' * $1`, + [ttlDays], + ); + + if (result.rowCount > 0) { + info('Cleaned up old conversation messages', { + deleted: result.rowCount, + ttlDays, + }); + } + } catch (err) { + logWarn('Conversation cleanup failed', { error: err.message }); + } } /** @@ -78,7 +383,7 @@ export async function generateResponse( config, healthMonitor = null, ) { - const history = getHistory(channelId); + const history = await getHistoryAsync(channelId); const systemPrompt = config.ai?.systemPrompt || @@ -130,8 +435,8 @@ You can use Discord markdown formatting.`; healthMonitor.setAPIStatus('ok'); } - // Update history - addToHistory(channelId, 'user', `${username}: ${userMessage}`); + // Update history with username for DB persistence + addToHistory(channelId, 'user', `${username}: ${userMessage}`, username); addToHistory(channelId, 'assistant', reply); return reply; diff --git a/src/modules/events.js b/src/modules/events.js index dd30091c9..a570163c1 100644 --- a/src/modules/events.js +++ b/src/modules/events.js @@ -3,7 +3,7 @@ * Handles Discord event listeners and handlers */ -import { Events } from 'discord.js'; +import { Client, Events } from 'discord.js'; import { info, error as logError, warn } from '../logger.js'; import { needsSplitting, splitMessage } from '../utils/splitMessage.js'; import { generateResponse } from './ai.js'; @@ -16,7 +16,7 @@ let processHandlersRegistered = false; /** * Register bot ready event handler - * @param {Object} client - Discord client + * @param {Client} client - Discord client * @param {Object} config - Bot configuration * @param {Object} healthMonitor - Health monitor instance */ @@ -43,23 +43,23 @@ export function registerReadyHandler(client, config, healthMonitor) { /** * Register guild member add event handler - * @param {Object} client - Discord client + * @param {Client} client - Discord client * @param {Object} config - Bot configuration */ export function registerGuildMemberAddHandler(client, config) { - client.on('guildMemberAdd', async (member) => { + client.on(Events.GuildMemberAdd, async (member) => { await sendWelcomeMessage(member, client, config); }); } /** * Register message create event handler - * @param {Object} client - Discord client + * @param {Client} client - Discord client * @param {Object} config - Bot configuration * @param {Object} healthMonitor - Health monitor instance */ export function registerMessageCreateHandler(client, config, healthMonitor) { - client.on('messageCreate', async (message) => { + client.on(Events.MessageCreate, async (message) => { // Ignore bots and DMs if (message.author.bot) return; if (!message.guild) return; @@ -131,16 +131,16 @@ export function registerMessageCreateHandler(client, config, healthMonitor) { /** * Register error event handlers - * @param {Object} client - Discord client + * @param {Client} client - Discord client */ export function registerErrorHandlers(client) { - client.on('error', (err) => { + client.on(Events.Error, (err) => { logError('Discord error', { error: err.message, stack: err.stack }); }); if (!processHandlersRegistered) { process.on('unhandledRejection', (err) => { - logError('Unhandled rejection', { error: err?.message, stack: err?.stack }); + logError('Unhandled rejection', { error: err?.message || String(err), stack: err?.stack }); }); processHandlersRegistered = true; } diff --git a/tests/db.test.js b/tests/db.test.js index ca757a3a5..c2a8bef8f 100644 --- a/tests/db.test.js +++ b/tests/db.test.js @@ -10,13 +10,6 @@ const pgMocks = vi.hoisted(() => ({ clientRelease: vi.fn(), })); -vi.mock('../src/logger.js', () => ({ - info: vi.fn(), - error: vi.fn(), - warn: vi.fn(), - debug: vi.fn(), -})); - vi.mock('pg', () => { class Pool { constructor(config) { @@ -77,7 +70,7 @@ describe('db module', () => { afterEach(async () => { try { await dbModule.closeDb(); - } catch { + } catch (err) { // ignore cleanup failures } @@ -92,26 +85,59 @@ describe('db module', () => { } else { delete process.env.DATABASE_SSL; } + vi.clearAllMocks(); }); describe('initDb', () => { - it('should initialize database pool', async () => { + it('should initialize database pool and create schema', async () => { const pool = await dbModule.initDb(); expect(pool).toBeDefined(); + expect(pgMocks.poolConnect).toHaveBeenCalled(); expect(pgMocks.clientQuery).toHaveBeenCalledWith('SELECT NOW()'); expect(pgMocks.clientRelease).toHaveBeenCalled(); - expect(pgMocks.poolQuery).toHaveBeenCalled(); + + // Should have created tables and indexes + const queries = pgMocks.poolQuery.mock.calls.map((c) => c[0]); + expect(queries.some((q) => q.includes('CREATE TABLE IF NOT EXISTS config'))).toBe(true); + expect(queries.some((q) => q.includes('CREATE TABLE IF NOT EXISTS conversations'))).toBe( + true, + ); + expect(queries.some((q) => q.includes('idx_conversations_channel_created'))).toBe(true); + expect(queries.some((q) => q.includes('idx_conversations_created_at'))).toBe(true); }); it('should return existing pool on second call', async () => { const pool1 = await dbModule.initDb(); const pool2 = await dbModule.initDb(); + expect(pool1).toBe(pool2); expect(pgMocks.poolConnect).toHaveBeenCalledTimes(1); }); + it('should reject concurrent initDb calls while initialization is in progress', async () => { + let resolveConnect; + const pendingConnect = new Promise((resolve) => { + resolveConnect = resolve; + }); + + pgMocks.poolConnect.mockImplementationOnce(() => pendingConnect); + + const firstInit = dbModule.initDb(); + const secondInit = dbModule.initDb(); + + await expect(secondInit).rejects.toThrow('initDb is already in progress'); + + resolveConnect({ + query: pgMocks.clientQuery, + release: pgMocks.clientRelease, + }); + + const pool = await firstInit; + expect(pool).toBeDefined(); + }); + it('should throw if DATABASE_URL is not set', async () => { delete process.env.DATABASE_URL; await expect(dbModule.initDb()).rejects.toThrow( @@ -152,6 +178,7 @@ describe('db module', () => { await dbModule.initDb(); pgMocks.poolEnd.mockRejectedValueOnce(new Error('close failed')); await dbModule.closeDb(); + // Should log error but not throw }); }); diff --git a/tests/index.test.js b/tests/index.test.js index 084d4f105..5fed0c7fa 100644 --- a/tests/index.test.js +++ b/tests/index.test.js @@ -28,6 +28,10 @@ const mocks = vi.hoisted(() => ({ ai: { getConversationHistory: vi.fn(), setConversationHistory: vi.fn(), + setPool: vi.fn(), + initConversationHistory: vi.fn(), + startConversationCleanup: vi.fn(), + stopConversationCleanup: vi.fn(), }, config: { @@ -119,6 +123,10 @@ vi.mock('../src/logger.js', () => ({ vi.mock('../src/modules/ai.js', () => ({ getConversationHistory: mocks.ai.getConversationHistory, setConversationHistory: mocks.ai.setConversationHistory, + setPool: mocks.ai.setPool, + initConversationHistory: mocks.ai.initConversationHistory, + startConversationCleanup: mocks.ai.startConversationCleanup, + stopConversationCleanup: mocks.ai.stopConversationCleanup, })); vi.mock('../src/modules/config.js', () => ({ @@ -183,6 +191,10 @@ async function importIndex({ mocks.ai.getConversationHistory.mockReset().mockReturnValue(new Map()); mocks.ai.setConversationHistory.mockReset(); + mocks.ai.setPool.mockReset(); + mocks.ai.initConversationHistory.mockReset().mockResolvedValue(undefined); + mocks.ai.startConversationCleanup.mockReset(); + mocks.ai.stopConversationCleanup.mockReset(); mocks.config.loadConfig.mockReset().mockImplementation(() => { if (loadConfigReject) { diff --git a/tests/modules/ai.test.js b/tests/modules/ai.test.js index edb3ea1ea..d28eea5e1 100644 --- a/tests/modules/ai.test.js +++ b/tests/modules/ai.test.js @@ -1,154 +1,216 @@ -import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; - -// Mock logger -vi.mock('../../src/logger.js', () => ({ - info: vi.fn(), - error: vi.fn(), - warn: vi.fn(), - debug: vi.fn(), +import { beforeEach, describe, expect, it, vi } from 'vitest'; + +// Mock config module +vi.mock('../../src/modules/config.js', () => ({ + getConfig: vi.fn(() => ({ + ai: { + historyLength: 20, + historyTTLDays: 30, + }, + })), })); +import { info, error as logError, warn as logWarn } from '../../src/logger.js'; import { addToHistory, generateResponse, getConversationHistory, - getHistory, - OPENCLAW_TOKEN, - OPENCLAW_URL, + getHistoryAsync, + initConversationHistory, setConversationHistory, + setPool, + startConversationCleanup, + stopConversationCleanup, + _setPoolGetter, } from '../../src/modules/ai.js'; +import { getConfig } from '../../src/modules/config.js'; + +// Mock logger +vi.mock('../../src/logger.js', () => ({ + info: vi.fn(), + error: vi.fn(), + warn: vi.fn(), + debug: vi.fn(), +})); describe('ai module', () => { beforeEach(() => { - // Reset conversation history before each test setConversationHistory(new Map()); + setPool(null); + _setPoolGetter(null); + vi.clearAllMocks(); + // Reset config mock to defaults + getConfig.mockReturnValue({ ai: { historyLength: 20, historyTTLDays: 30 } }); }); - afterEach(() => { - vi.restoreAllMocks(); - }); - - describe('getConversationHistory / setConversationHistory', () => { - it('should get and set conversation history', () => { - const history = new Map([['channel1', [{ role: 'user', content: 'hi' }]]]); - setConversationHistory(history); - expect(getConversationHistory()).toBe(history); - }); - }); - - describe('OPENCLAW_URL and OPENCLAW_TOKEN', () => { - it('should export URL and token constants', () => { - expect(typeof OPENCLAW_URL).toBe('string'); - expect(typeof OPENCLAW_TOKEN).toBe('string'); - }); - }); - - describe('getHistory', () => { - it('should create empty history for new channel', () => { - const history = getHistory('new-channel'); + describe('getHistoryAsync', () => { + it('should create empty history for new channel', async () => { + const history = await getHistoryAsync('new-channel'); expect(history).toEqual([]); }); - it('should return existing history for known channel', () => { + it('should return existing history for known channel', async () => { addToHistory('ch1', 'user', 'hello'); - const history = getHistory('ch1'); + const history = await getHistoryAsync('ch1'); expect(history.length).toBe(1); expect(history[0]).toEqual({ role: 'user', content: 'hello' }); }); + + it('should hydrate DB history in-place when concurrent messages are added', async () => { + let resolveHydration; + const hydrationPromise = new Promise((resolve) => { + resolveHydration = resolve; + }); + + const mockQuery = vi + .fn() + .mockImplementationOnce(() => hydrationPromise) + .mockResolvedValue({}); + const mockPool = { query: mockQuery }; + setPool(mockPool); + + // Start hydration by calling getHistoryAsync (but don't await yet) + const asyncHistoryPromise = getHistoryAsync('race-channel'); + + // We know it's pending, so we can check the in-memory state via getConversationHistory + const historyRef = getConversationHistory().get('race-channel'); + expect(historyRef).toEqual([]); + + // Add a message while DB hydration is still pending + addToHistory('race-channel', 'user', 'concurrent message'); + + // DB returns newest-first; hydrateHistory() reverses into chronological order + resolveHydration({ + rows: [ + { role: 'assistant', content: 'db reply' }, + { role: 'user', content: 'db message' }, + ], + }); + + await hydrationPromise; + await asyncHistoryPromise; + + await vi.waitFor(() => { + expect(historyRef).toEqual([ + { role: 'user', content: 'db message' }, + { role: 'assistant', content: 'db reply' }, + { role: 'user', content: 'concurrent message' }, + ]); + expect(getConversationHistory().get('race-channel')).toBe(historyRef); + }); + }); + + it('should load from DB on cache miss', async () => { + // DB returns newest-first (ORDER BY created_at DESC) + const mockQuery = vi.fn().mockResolvedValue({ + rows: [ + { role: 'assistant', content: 'response' }, + { role: 'user', content: 'from db' }, + ], + }); + const mockPool = { query: mockQuery }; + setPool(mockPool); + + const history = await getHistoryAsync('ch-new'); + expect(history.length).toBe(2); + // After reversing, oldest comes first + expect(history[0].content).toBe('from db'); + expect(history[1].content).toBe('response'); + expect(mockQuery).toHaveBeenCalledWith(expect.stringContaining('SELECT role, content FROM conversations'), [ + 'ch-new', + 20, + ]); + }); }); describe('addToHistory', () => { - it('should add messages to channel history', () => { + it('should add messages to channel history', async () => { addToHistory('ch1', 'user', 'hello'); addToHistory('ch1', 'assistant', 'hi there'); - const history = getHistory('ch1'); + const history = await getHistoryAsync('ch1'); expect(history.length).toBe(2); }); - it('should trim history beyond MAX_HISTORY (20)', () => { + it('should trim history beyond configured historyLength (20)', async () => { for (let i = 0; i < 25; i++) { addToHistory('ch1', 'user', `message ${i}`); } - const history = getHistory('ch1'); + const history = await getHistoryAsync('ch1'); expect(history.length).toBe(20); expect(history[0].content).toBe('message 5'); }); - }); - - describe('generateResponse', () => { - it('should return AI response on success', async () => { - const mockResponse = { - ok: true, - json: vi.fn().mockResolvedValue({ - choices: [{ message: { content: 'Hello!' } }], - }), - }; - vi.spyOn(globalThis, 'fetch').mockResolvedValue(mockResponse); - const config = { ai: { model: 'test-model', maxTokens: 512, systemPrompt: 'You are a bot' } }; - const result = await generateResponse('ch1', 'Hi', 'testuser', config); + it('should respect custom historyLength from config', async () => { + getConfig.mockReturnValue({ ai: { historyLength: 5, historyTTLDays: 30 } }); - expect(result).toBe('Hello!'); - expect(globalThis.fetch).toHaveBeenCalled(); + for (let i = 0; i < 10; i++) { + addToHistory('ch1', 'user', `message ${i}`); + } + const history = await getHistoryAsync('ch1'); + expect(history.length).toBe(5); + expect(history[0].content).toBe('message 5'); }); - it('should use default system prompt if not configured', async () => { - const mockResponse = { - ok: true, - json: vi.fn().mockResolvedValue({ - choices: [{ message: { content: 'Response' } }], - }), - }; - vi.spyOn(globalThis, 'fetch').mockResolvedValue(mockResponse); + it('should write to DB when pool is available', () => { + const mockQuery = vi.fn().mockResolvedValue({}); + const mockPool = { query: mockQuery }; + setPool(mockPool); - const config = { ai: {} }; - const result = await generateResponse('ch1', 'Hi', 'testuser', config); + addToHistory('ch1', 'user', 'hello', 'testuser'); - expect(result).toBe('Response'); - // Verify fetch was called with default model - const fetchCall = globalThis.fetch.mock.calls[0]; - const body = JSON.parse(fetchCall[1].body); - expect(body.model).toBe('claude-sonnet-4-20250514'); - expect(body.max_tokens).toBe(1024); + expect(mockQuery).toHaveBeenCalledWith(expect.stringContaining('INSERT INTO conversations'), [ + 'ch1', + 'user', + 'hello', + 'testuser', + ]); }); + }); - it('should handle empty choices gracefully', async () => { - const mockResponse = { - ok: true, - json: vi.fn().mockResolvedValue({ choices: [] }), - }; - vi.spyOn(globalThis, 'fetch').mockResolvedValue(mockResponse); - - const config = { ai: {} }; - const result = await generateResponse('ch1', 'Hi', 'testuser', config); - expect(result).toBe('I got nothing. Try again?'); + describe('initConversationHistory', () => { + it('should load messages from DB for all channels', async () => { + // Single ROW_NUMBER() query returns rows per-channel in chronological order + const mockQuery = vi.fn().mockResolvedValueOnce({ + rows: [ + { channel_id: 'ch1', role: 'user', content: 'msg1' }, + { channel_id: 'ch1', role: 'assistant', content: 'reply1' }, + { channel_id: 'ch2', role: 'user', content: 'msg2' }, + ], + }); + + const mockPool = { query: mockQuery }; + setPool(mockPool); + + await initConversationHistory(); + + const ch1 = await getHistoryAsync('ch1'); + expect(ch1.length).toBe(2); + expect(ch1[0].content).toBe('msg1'); + expect(ch1[1].content).toBe('reply1'); + + const ch2 = await getHistoryAsync('ch2'); + expect(ch2.length).toBe(1); }); + }); - it('should return fallback on API error', async () => { + describe('generateResponse', () => { + it('should return AI response on success', async () => { const mockResponse = { - ok: false, - status: 500, - statusText: 'Internal Server Error', + ok: true, + json: vi.fn().mockResolvedValue({ + choices: [{ message: { content: 'Hello there!' } }], + }), }; vi.spyOn(globalThis, 'fetch').mockResolvedValue(mockResponse); - const mockHealth = { setAPIStatus: vi.fn(), recordAIRequest: vi.fn() }; - const config = { ai: {} }; - const result = await generateResponse('ch1', 'Hi', 'testuser', config, mockHealth); - - expect(result).toContain('trouble thinking'); - expect(mockHealth.setAPIStatus).toHaveBeenCalledWith('error'); - }); - - it('should return fallback on fetch exception', async () => { - vi.spyOn(globalThis, 'fetch').mockRejectedValue(new Error('network failure')); + const config = { ai: { model: 'test-model' } }; + const reply = await generateResponse('ch1', 'Hi', 'user1', config); - const config = { ai: {} }; - const result = await generateResponse('ch1', 'Hi', 'testuser', config); - expect(result).toContain('trouble thinking'); + expect(reply).toBe('Hello there!'); + expect(globalThis.fetch).toHaveBeenCalled(); }); - it('should update health monitor on success', async () => { + it('should include correct headers in fetch request', async () => { const mockResponse = { ok: true, json: vi.fn().mockResolvedValue({ @@ -157,66 +219,27 @@ describe('ai module', () => { }; vi.spyOn(globalThis, 'fetch').mockResolvedValue(mockResponse); - const mockHealth = { setAPIStatus: vi.fn(), recordAIRequest: vi.fn() }; const config = { ai: {} }; - await generateResponse('ch1', 'Hi', 'testuser', config, mockHealth); + await generateResponse('ch1', 'Hi', 'user', config); - expect(mockHealth.recordAIRequest).toHaveBeenCalled(); - expect(mockHealth.setAPIStatus).toHaveBeenCalledWith('ok'); + const fetchCall = globalThis.fetch.mock.calls[0]; + expect(fetchCall[1].headers['Content-Type']).toBe('application/json'); }); + }); - it('should update conversation history on success', async () => { - const mockResponse = { - ok: true, - json: vi.fn().mockResolvedValue({ - choices: [{ message: { content: 'Reply' } }], - }), - }; - vi.spyOn(globalThis, 'fetch').mockResolvedValue(mockResponse); + describe('cleanup scheduler', () => { + it('should run cleanup query on start', async () => { + const mockQuery = vi.fn().mockResolvedValue({ rowCount: 5 }); + const mockPool = { query: mockQuery }; + setPool(mockPool); - const config = { ai: {} }; - await generateResponse('ch1', 'Hello', 'user1', config); + startConversationCleanup(); - const history = getHistory('ch1'); - expect(history.length).toBe(2); - expect(history[0].role).toBe('user'); - expect(history[0].content).toContain('user1: Hello'); - expect(history[1].role).toBe('assistant'); - expect(history[1].content).toBe('Reply'); - }); + await vi.waitFor(() => { + expect(mockQuery).toHaveBeenCalledWith(expect.stringContaining('DELETE FROM conversations'), [30]); + }); - it('should include Authorization header when token is set', async () => { - vi.resetModules(); - process.env.OPENCLAW_API_KEY = 'test-key-123'; - - try { - vi.mock('../../src/logger.js', () => ({ - info: vi.fn(), - error: vi.fn(), - warn: vi.fn(), - debug: vi.fn(), - })); - - const { generateResponse: genResponse, setConversationHistory: setHistory } = await import( - '../../src/modules/ai.js' - ); - setHistory(new Map()); - - const mockResponse = { - ok: true, - json: vi.fn().mockResolvedValue({ - choices: [{ message: { content: 'OK' } }], - }), - }; - vi.spyOn(globalThis, 'fetch').mockResolvedValue(mockResponse); - - await genResponse('ch1', 'Hi', 'user', { ai: {} }); - - const fetchCall = globalThis.fetch.mock.calls[0]; - expect(fetchCall[1].headers.Authorization).toBe('Bearer test-key-123'); - } finally { - delete process.env.OPENCLAW_API_KEY; - } + stopConversationCleanup(); }); }); });