diff --git a/assistant/src/__tests__/handlers-add-trust-rule-metadata.test.ts b/assistant/src/__tests__/handlers-add-trust-rule-metadata.test.ts index 40355adf162..88b585fba47 100644 --- a/assistant/src/__tests__/handlers-add-trust-rule-metadata.test.ts +++ b/assistant/src/__tests__/handlers-add-trust-rule-metadata.test.ts @@ -62,6 +62,7 @@ import { getAllRules, clearAllRules, clearCache } from '../permissions/trust-sto import type { AddTrustRule } from '../daemon/ipc-contract.js'; import type { HandlerContext } from '../daemon/handlers.js'; import type { ServerMessage } from '../daemon/ipc-contract.js'; +import { DebouncerMap } from '../util/debounce.js'; function createTestContext(): { ctx: HandlerContext; sent: ServerMessage[] } { const sent: ServerMessage[] = []; @@ -73,7 +74,7 @@ function createTestContext(): { ctx: HandlerContext; sent: ServerMessage[] } { cuObservationParseSequence: new Map(), socketSandboxOverride: new Map(), sharedRequestTimestamps: [], - debounceTimers: new Map(), + debounceTimers: new DebouncerMap({ defaultDelayMs: 200 }), suppressConfigReload: false, setSuppressConfigReload: () => {}, updateConfigFingerprint: () => {}, diff --git a/assistant/src/__tests__/handlers-cu-observation-blob.test.ts b/assistant/src/__tests__/handlers-cu-observation-blob.test.ts index 15155d11493..1f845194baa 100644 --- a/assistant/src/__tests__/handlers-cu-observation-blob.test.ts +++ b/assistant/src/__tests__/handlers-cu-observation-blob.test.ts @@ -45,6 +45,7 @@ mock.module('../util/logger.js', () => ({ import { handleMessage, type HandlerContext } from '../daemon/handlers.js'; import type { CuObservation, IpcBlobRef, ServerMessage } from '../daemon/ipc-contract.js'; import { ComputerUseSession } from '../daemon/computer-use-session.js'; +import { DebouncerMap } from '../util/debounce.js'; /** Write a blob file to the test blob directory and return the IpcBlobRef. */ function writeBlobFile(content: Buffer, kind: IpcBlobRef['kind'], encoding: IpcBlobRef['encoding']): IpcBlobRef { @@ -86,7 +87,7 @@ function createTestContext(sessionId: string): { cuObservationParseSequence: new Map(), socketSandboxOverride: new Map(), sharedRequestTimestamps: [], - debounceTimers: new Map(), + debounceTimers: new DebouncerMap({ defaultDelayMs: 200 }), suppressConfigReload: false, setSuppressConfigReload: () => {}, updateConfigFingerprint: () => {}, diff --git a/assistant/src/__tests__/handlers-ipc-blob-probe.test.ts b/assistant/src/__tests__/handlers-ipc-blob-probe.test.ts index c84a3c461e1..13e44f0f870 100644 --- a/assistant/src/__tests__/handlers-ipc-blob-probe.test.ts +++ b/assistant/src/__tests__/handlers-ipc-blob-probe.test.ts @@ -44,6 +44,7 @@ mock.module('../util/logger.js', () => ({ import { handleMessage, type HandlerContext } from '../daemon/handlers.js'; import type { IpcBlobProbe, ServerMessage } from '../daemon/ipc-contract.js'; +import { DebouncerMap } from '../util/debounce.js'; /** Write a probe file to the test blob directory. */ function writeProbeFile(probeId: string, content: Buffer): string { @@ -63,7 +64,7 @@ function createTestContext(): { ctx: HandlerContext; sent: ServerMessage[] } { cuObservationParseSequence: new Map(), socketSandboxOverride: new Map(), sharedRequestTimestamps: [], - debounceTimers: new Map(), + debounceTimers: new DebouncerMap({ defaultDelayMs: 200 }), suppressConfigReload: false, setSuppressConfigReload: () => {}, updateConfigFingerprint: () => {}, diff --git a/assistant/src/__tests__/handlers-slack-config.test.ts b/assistant/src/__tests__/handlers-slack-config.test.ts index 12055a39b72..1c597275026 100644 --- a/assistant/src/__tests__/handlers-slack-config.test.ts +++ b/assistant/src/__tests__/handlers-slack-config.test.ts @@ -82,6 +82,7 @@ import type { ShareToSlackRequest, ServerMessage, } from '../daemon/ipc-contract.js'; +import { DebouncerMap } from '../util/debounce.js'; function createTestContext(): { ctx: HandlerContext; sent: ServerMessage[] } { const sent: ServerMessage[] = []; @@ -93,7 +94,7 @@ function createTestContext(): { ctx: HandlerContext; sent: ServerMessage[] } { cuObservationParseSequence: new Map(), socketSandboxOverride: new Map(), sharedRequestTimestamps: [], - debounceTimers: new Map(), + debounceTimers: new DebouncerMap({ defaultDelayMs: 200 }), suppressConfigReload: false, setSuppressConfigReload: () => {}, updateConfigFingerprint: () => {}, diff --git a/assistant/src/__tests__/handlers-telegram-config.test.ts b/assistant/src/__tests__/handlers-telegram-config.test.ts index ff66d0cf89a..2e89583c8ab 100644 --- a/assistant/src/__tests__/handlers-telegram-config.test.ts +++ b/assistant/src/__tests__/handlers-telegram-config.test.ts @@ -120,6 +120,7 @@ import type { TelegramConfigRequest, ServerMessage, } from '../daemon/ipc-contract.js'; +import { DebouncerMap } from '../util/debounce.js'; function createTestContext(): { ctx: HandlerContext; sent: ServerMessage[] } { const sent: ServerMessage[] = []; @@ -131,7 +132,7 @@ function createTestContext(): { ctx: HandlerContext; sent: ServerMessage[] } { cuObservationParseSequence: new Map(), socketSandboxOverride: new Map(), sharedRequestTimestamps: [], - debounceTimers: new Map(), + debounceTimers: new DebouncerMap({ defaultDelayMs: 200 }), suppressConfigReload: false, setSuppressConfigReload: () => {}, updateConfigFingerprint: () => {}, diff --git a/assistant/src/__tests__/handlers-twitter-config.test.ts b/assistant/src/__tests__/handlers-twitter-config.test.ts index 99422ba6a17..1a58a50a59e 100644 --- a/assistant/src/__tests__/handlers-twitter-config.test.ts +++ b/assistant/src/__tests__/handlers-twitter-config.test.ts @@ -115,6 +115,7 @@ import type { TwitterIntegrationConfigRequest, ServerMessage, } from '../daemon/ipc-contract.js'; +import { DebouncerMap } from '../util/debounce.js'; function createTestContext(): { ctx: HandlerContext; sent: ServerMessage[] } { const sent: ServerMessage[] = []; @@ -126,7 +127,7 @@ function createTestContext(): { ctx: HandlerContext; sent: ServerMessage[] } { cuObservationParseSequence: new Map(), socketSandboxOverride: new Map(), sharedRequestTimestamps: [], - debounceTimers: new Map(), + debounceTimers: new DebouncerMap({ defaultDelayMs: 200 }), suppressConfigReload: false, setSuppressConfigReload: () => {}, updateConfigFingerprint: () => {}, diff --git a/assistant/src/__tests__/tool-permission-simulate-handler.test.ts b/assistant/src/__tests__/tool-permission-simulate-handler.test.ts index 99090fd9845..44092002b04 100644 --- a/assistant/src/__tests__/tool-permission-simulate-handler.test.ts +++ b/assistant/src/__tests__/tool-permission-simulate-handler.test.ts @@ -60,6 +60,7 @@ import { handleToolPermissionSimulate } from '../daemon/handlers/config.js'; import { addRule, clearAllRules, clearCache } from '../permissions/trust-store.js'; import type { ToolPermissionSimulateRequest, ToolPermissionSimulateResponse, ServerMessage } from '../daemon/ipc-contract.js'; import type { HandlerContext } from '../daemon/handlers.js'; +import { DebouncerMap } from '../util/debounce.js'; function createTestContext(): { ctx: HandlerContext; sent: ServerMessage[] } { const sent: ServerMessage[] = []; @@ -71,7 +72,7 @@ function createTestContext(): { ctx: HandlerContext; sent: ServerMessage[] } { cuObservationParseSequence: new Map(), socketSandboxOverride: new Map(), sharedRequestTimestamps: [], - debounceTimers: new Map(), + debounceTimers: new DebouncerMap({ defaultDelayMs: 200 }), suppressConfigReload: false, setSuppressConfigReload: () => {}, updateConfigFingerprint: () => {}, diff --git a/assistant/src/__tests__/twitter-auth-handler.test.ts b/assistant/src/__tests__/twitter-auth-handler.test.ts index a351d03c68a..21da7347a35 100644 --- a/assistant/src/__tests__/twitter-auth-handler.test.ts +++ b/assistant/src/__tests__/twitter-auth-handler.test.ts @@ -144,6 +144,7 @@ import type { TwitterAuthStatusRequest, ServerMessage, } from '../daemon/ipc-contract.js'; +import { DebouncerMap } from '../util/debounce.js'; // Mock global fetch for Twitter /2/users/me const _originalFetch = globalThis.fetch; @@ -162,7 +163,7 @@ function createTestContext(): { ctx: HandlerContext; sent: ServerMessage[] } { cuObservationParseSequence: new Map(), socketSandboxOverride: new Map(), sharedRequestTimestamps: [], - debounceTimers: new Map(), + debounceTimers: new DebouncerMap({ defaultDelayMs: 200 }), suppressConfigReload: false, setSuppressConfigReload: () => {}, updateConfigFingerprint: () => {}, diff --git a/assistant/src/daemon/handlers/config.ts b/assistant/src/daemon/handlers/config.ts index 8fb78612904..a290a076c01 100644 --- a/assistant/src/daemon/handlers/config.ts +++ b/assistant/src/daemon/handlers/config.ts @@ -145,10 +145,7 @@ export function handleModelSet( ctx.setSuppressConfigReload(wasSuppressed); throw err; } - const existingSuppressTimer = ctx.debounceTimers.get('__suppress_reset__'); - if (existingSuppressTimer) clearTimeout(existingSuppressTimer); - const resetTimer = setTimeout(() => { ctx.setSuppressConfigReload(false); }, CONFIG_RELOAD_DEBOUNCE_MS); - ctx.debounceTimers.set('__suppress_reset__', resetTimer); + ctx.debounceTimers.schedule('__suppress_reset__', () => { ctx.setSuppressConfigReload(false); }, CONFIG_RELOAD_DEBOUNCE_MS); // Re-initialize provider with the new model so LLM calls use it const config = getConfig(); @@ -195,10 +192,7 @@ export function handleImageGenModelSet( ctx.setSuppressConfigReload(wasSuppressed); throw err; } - const existingSuppressTimer = ctx.debounceTimers.get('__suppress_reset__'); - if (existingSuppressTimer) clearTimeout(existingSuppressTimer); - const resetTimer = setTimeout(() => { ctx.setSuppressConfigReload(false); }, CONFIG_RELOAD_DEBOUNCE_MS); - ctx.debounceTimers.set('__suppress_reset__', resetTimer); + ctx.debounceTimers.schedule('__suppress_reset__', () => { ctx.setSuppressConfigReload(false); }, CONFIG_RELOAD_DEBOUNCE_MS); ctx.updateConfigFingerprint(); log.info({ model: msg.model }, 'Image generation model updated'); @@ -547,10 +541,7 @@ export function handleIngressConfig( ctx.setSuppressConfigReload(wasSuppressed); throw err; } - const existingSuppressTimer = ctx.debounceTimers.get('__suppress_reset__'); - if (existingSuppressTimer) clearTimeout(existingSuppressTimer); - const resetTimer = setTimeout(() => { ctx.setSuppressConfigReload(false); }, CONFIG_RELOAD_DEBOUNCE_MS); - ctx.debounceTimers.set('__suppress_reset__', resetTimer); + ctx.debounceTimers.schedule('__suppress_reset__', () => { ctx.setSuppressConfigReload(false); }, CONFIG_RELOAD_DEBOUNCE_MS); // Propagate to the gateway's process environment so it picks up the // new URL when it is restarted. For the local-deployment path the diff --git a/assistant/src/daemon/handlers/shared.ts b/assistant/src/daemon/handlers/shared.ts index 4e5939088ca..167ad1ea060 100644 --- a/assistant/src/daemon/handlers/shared.ts +++ b/assistant/src/daemon/handlers/shared.ts @@ -8,6 +8,7 @@ import { estimateBase64Bytes } from '../assistant-attachments.js'; import type { ClientMessage, CuSessionCreate, ServerMessage, SessionTransportMetadata } from '../ipc-protocol.js'; import type { SecretPromptResult } from '../../permissions/secret-prompter.js'; import { getConfig } from '../../config/loader.js'; +import type { DebouncerMap } from '../../util/debounce.js'; const log = getLogger('handlers'); @@ -115,7 +116,7 @@ export interface HandlerContext { cuObservationParseSequence: Map; socketSandboxOverride: Map; sharedRequestTimestamps: number[]; - debounceTimers: Map>; + debounceTimers: DebouncerMap; suppressConfigReload: boolean; setSuppressConfigReload(value: boolean): void; updateConfigFingerprint(): void; diff --git a/assistant/src/daemon/handlers/skills.ts b/assistant/src/daemon/handlers/skills.ts index a39817dab99..384edb81f4b 100644 --- a/assistant/src/daemon/handlers/skills.ts +++ b/assistant/src/daemon/handlers/skills.ts @@ -61,10 +61,7 @@ export function handleSkillsEnable( } invalidateConfigCache(); - const existingSuppressTimer = ctx.debounceTimers.get('__suppress_reset__'); - if (existingSuppressTimer) clearTimeout(existingSuppressTimer); - const resetTimer = setTimeout(() => { ctx.setSuppressConfigReload(false); }, CONFIG_RELOAD_DEBOUNCE_MS); - ctx.debounceTimers.set('__suppress_reset__', resetTimer); + ctx.debounceTimers.schedule('__suppress_reset__', () => { ctx.setSuppressConfigReload(false); }, CONFIG_RELOAD_DEBOUNCE_MS); ctx.updateConfigFingerprint(); @@ -108,10 +105,7 @@ export function handleSkillsDisable( } invalidateConfigCache(); - const existingSuppressTimer = ctx.debounceTimers.get('__suppress_reset__'); - if (existingSuppressTimer) clearTimeout(existingSuppressTimer); - const resetTimer = setTimeout(() => { ctx.setSuppressConfigReload(false); }, CONFIG_RELOAD_DEBOUNCE_MS); - ctx.debounceTimers.set('__suppress_reset__', resetTimer); + ctx.debounceTimers.schedule('__suppress_reset__', () => { ctx.setSuppressConfigReload(false); }, CONFIG_RELOAD_DEBOUNCE_MS); ctx.updateConfigFingerprint(); @@ -165,10 +159,7 @@ export function handleSkillsConfigure( } invalidateConfigCache(); - const existingSuppressTimer = ctx.debounceTimers.get('__suppress_reset__'); - if (existingSuppressTimer) clearTimeout(existingSuppressTimer); - const resetTimer = setTimeout(() => { ctx.setSuppressConfigReload(false); }, CONFIG_RELOAD_DEBOUNCE_MS); - ctx.debounceTimers.set('__suppress_reset__', resetTimer); + ctx.debounceTimers.schedule('__suppress_reset__', () => { ctx.setSuppressConfigReload(false); }, CONFIG_RELOAD_DEBOUNCE_MS); ctx.updateConfigFingerprint(); @@ -226,10 +217,7 @@ export async function handleSkillsInstall( throw err; } invalidateConfigCache(); - const existingSuppressTimer = ctx.debounceTimers.get('__suppress_reset__'); - if (existingSuppressTimer) clearTimeout(existingSuppressTimer); - const resetTimer = setTimeout(() => { ctx.setSuppressConfigReload(false); }, CONFIG_RELOAD_DEBOUNCE_MS); - ctx.debounceTimers.set('__suppress_reset__', resetTimer); + ctx.debounceTimers.schedule('__suppress_reset__', () => { ctx.setSuppressConfigReload(false); }, CONFIG_RELOAD_DEBOUNCE_MS); ctx.updateConfigFingerprint(); } catch (err) { log.warn({ err, skillId }, 'Failed to auto-enable installed skill'); @@ -321,10 +309,7 @@ export async function handleSkillsUninstall( } invalidateConfigCache(); - const existingSuppressTimer = ctx.debounceTimers.get('__suppress_reset__'); - if (existingSuppressTimer) clearTimeout(existingSuppressTimer); - const resetTimer = setTimeout(() => { ctx.setSuppressConfigReload(false); }, CONFIG_RELOAD_DEBOUNCE_MS); - ctx.debounceTimers.set('__suppress_reset__', resetTimer); + ctx.debounceTimers.schedule('__suppress_reset__', () => { ctx.setSuppressConfigReload(false); }, CONFIG_RELOAD_DEBOUNCE_MS); ctx.updateConfigFingerprint(); } diff --git a/assistant/src/daemon/server.ts b/assistant/src/daemon/server.ts index e8343a5430b..50a17c312e0 100644 --- a/assistant/src/daemon/server.ts +++ b/assistant/src/daemon/server.ts @@ -40,6 +40,7 @@ import { tryRouteCallMessage } from '../calls/call-bridge.js'; import { resolveSlash } from './session-slash.js'; import { createUserMessage, createAssistantMessage } from '../agent/message-types.js'; import { registerDaemonCallbacks } from '../work-items/work-item-runner.js'; +import { DebouncerMap } from '../util/debounce.js'; const log = getLogger('server'); @@ -77,8 +78,11 @@ export class DaemonServer { private socketPath: string; private httpPort: number | undefined; private watchers: FSWatcher[] = []; - private debounceTimers = new Map>(); - private static readonly MAX_DEBOUNCE_ENTRIES = 1000; + private debounceTimers = new DebouncerMap({ + defaultDelayMs: 200, + maxEntries: 1000, + protectedKeyPrefix: '__', + }); private suppressConfigReload = false; private lastConfigFingerprint = ''; private lastConfigRefreshTime = 0; @@ -373,16 +377,10 @@ export class DaemonServer { if (!filename) return; const file = String(filename); if (!handlers[file]) return; - const key = `file:${file}`; - const existing = this.debounceTimers.get(key); - if (existing) clearTimeout(existing); - const timer = setTimeout(() => { - this.debounceTimers.delete(key); + this.debounceTimers.schedule(`file:${file}`, () => { log.info({ file }, 'File changed, reloading'); handlers[file](); - }, 200); - this.debounceTimers.set(key, timer); - this.enforceDebounceLimit(); + }); }); this.watchers.push(watcher); log.info({ dir }, `Watching ${label}`); @@ -478,51 +476,22 @@ export class DaemonServer { } private stopFileWatchers(): void { - for (const timer of this.debounceTimers.values()) { - clearTimeout(timer); - } - this.debounceTimers.clear(); + this.debounceTimers.cancelAll(); for (const watcher of this.watchers) { watcher.close(); } this.watchers = []; } - /** - * Evict the oldest file-watcher debounce entries when the map exceeds the safety cap. - * Map iteration order follows insertion order, so the first keys are oldest. - * Protects system timers (keys starting with '__') from eviction, so critical - * timers like '__suppress_reset__' are never cleared during bursts of file events. - */ - private enforceDebounceLimit(): void { - if (this.debounceTimers.size <= DaemonServer.MAX_DEBOUNCE_ENTRIES) return; - const excess = this.debounceTimers.size - DaemonServer.MAX_DEBOUNCE_ENTRIES; - let removed = 0; - for (const [key, timer] of this.debounceTimers) { - if (removed >= excess) break; - // Skip system timers (those with keys starting with '__') - if (key.startsWith('__')) continue; - clearTimeout(timer); - this.debounceTimers.delete(key); - removed++; - } - } - private startSkillsWatchers(evictSessions: () => void): void { const skillsDir = getWorkspaceSkillsDir(); if (!existsSync(skillsDir)) return; const scheduleSkillsReload = (file: string): void => { - const key = `skills:${file}`; - const existing = this.debounceTimers.get(key); - if (existing) clearTimeout(existing); - const timer = setTimeout(() => { - this.debounceTimers.delete(key); + this.debounceTimers.schedule(`skills:${file}`, () => { log.info({ file }, 'Skill file changed, reloading'); evictSessions(); - }, 200); - this.debounceTimers.set(key, timer); - this.enforceDebounceLimit(); + }); }; try { diff --git a/assistant/src/hooks/manager.ts b/assistant/src/hooks/manager.ts index 9628a10b3c3..1e39d9969e7 100644 --- a/assistant/src/hooks/manager.ts +++ b/assistant/src/hooks/manager.ts @@ -3,6 +3,7 @@ import { discoverHooks } from './discovery.js'; import { runHookScript } from './runner.js'; import { getLogger, isDebug } from '../util/logger.js'; import { getHooksDir } from '../util/platform.js'; +import { Debouncer } from '../util/debounce.js'; import type { DiscoveredHook, HookEventName, HookEventData, HookTriggerResult } from './types.js'; const log = getLogger('hooks-manager'); @@ -11,7 +12,7 @@ export class HookManager { private hooks: DiscoveredHook[] = []; private eventIndex = new Map(); private watcher: FSWatcher | null = null; - private debounceTimer: ReturnType | null = null; + private readonly debouncer = new Debouncer(500); initialize(): void { this.hooks = discoverHooks(); @@ -82,12 +83,10 @@ export class HookManager { try { this.watcher = watch(hooksDir, { recursive: true }, (_eventType, filename) => { - if (this.debounceTimer) clearTimeout(this.debounceTimer); - this.debounceTimer = setTimeout(() => { - this.debounceTimer = null; + this.debouncer.schedule(() => { log.info({ filename: String(filename ?? '') }, 'Hooks directory changed, reloading'); this.reload(); - }, 500); + }); }); log.info({ dir: hooksDir }, 'Watching hooks directory for changes'); } catch (err) { @@ -96,10 +95,7 @@ export class HookManager { } stopWatching(): void { - if (this.debounceTimer) { - clearTimeout(this.debounceTimer); - this.debounceTimer = null; - } + this.debouncer.cancel(); if (this.watcher) { this.watcher.close(); this.watcher = null; diff --git a/assistant/src/memory/embedding-local.ts b/assistant/src/memory/embedding-local.ts index 11c27e0605c..b576c8363b1 100644 --- a/assistant/src/memory/embedding-local.ts +++ b/assistant/src/memory/embedding-local.ts @@ -1,5 +1,6 @@ import type { EmbeddingBackend, EmbeddingRequestOptions } from './embedding-backend.js'; import { getLogger } from '../util/logger.js'; +import { PromiseGuard } from '../util/promise-guard.js'; const log = getLogger('memory-embedding-local'); @@ -19,7 +20,7 @@ export class LocalEmbeddingBackend implements EmbeddingBackend { readonly provider = 'local' as const; readonly model: string; private extractor: FeatureExtractionPipeline | null = null; - private initPromise: Promise | null = null; + private readonly initGuard = new PromiseGuard(); constructor(model: string) { this.model = model; @@ -50,18 +51,7 @@ export class LocalEmbeddingBackend implements EmbeddingBackend { private async ensureInitialized(): Promise { if (this.extractor) return; - if (this.initPromise) { - await this.initPromise; - return; - } - - this.initPromise = this.initialize(); - try { - await this.initPromise; - } catch (err) { - this.initPromise = null; - throw err; - } + await this.initGuard.run(() => this.initialize()); } private async initialize(): Promise { diff --git a/assistant/src/tools/terminal/parser.ts b/assistant/src/tools/terminal/parser.ts index 26cac301ec5..aace9f80d87 100644 --- a/assistant/src/tools/terminal/parser.ts +++ b/assistant/src/tools/terminal/parser.ts @@ -3,6 +3,7 @@ import { readFileSync, existsSync } from 'node:fs'; import { createHash } from 'node:crypto'; import { getLogger } from '../../util/logger.js'; import { IntegrityError } from '../../util/errors.js'; +import { PromiseGuard } from '../../util/promise-guard.js'; import { Parser, Language, type Node as TSNode } from 'web-tree-sitter'; const log = getLogger('shell-parser'); @@ -73,7 +74,7 @@ function verifyWasmChecksum(filePath: string, label: string): void { } let parserInstance: Parser | null = null; -let initPromise: Promise | null = null; +const initGuard = new PromiseGuard(); /** * Locate a WASM file from a dependency package. @@ -105,27 +106,24 @@ function findWasmPath(pkg: string, file: string): string { async function ensureParser(): Promise { if (parserInstance) return parserInstance; - if (!initPromise) { - initPromise = (async () => { - const treeSitterWasm = findWasmPath('web-tree-sitter', 'web-tree-sitter.wasm'); - const bashWasmPath = findWasmPath('tree-sitter-bash', 'tree-sitter-bash.wasm'); + await initGuard.run(async () => { + const treeSitterWasm = findWasmPath('web-tree-sitter', 'web-tree-sitter.wasm'); + const bashWasmPath = findWasmPath('tree-sitter-bash', 'tree-sitter-bash.wasm'); - verifyWasmChecksum(treeSitterWasm, 'web-tree-sitter.wasm'); - verifyWasmChecksum(bashWasmPath, 'tree-sitter-bash.wasm'); + verifyWasmChecksum(treeSitterWasm, 'web-tree-sitter.wasm'); + verifyWasmChecksum(bashWasmPath, 'tree-sitter-bash.wasm'); - await Parser.init({ - locateFile: () => treeSitterWasm, - }); + await Parser.init({ + locateFile: () => treeSitterWasm, + }); - const Bash = await Language.load(bashWasmPath); - const parser = new Parser(); - parser.setLanguage(Bash); - parserInstance = parser; - log.info('Shell parser initialized (web-tree-sitter + bash, checksums verified)'); - })(); - } + const Bash = await Language.load(bashWasmPath); + const parser = new Parser(); + parser.setLanguage(Bash); + parserInstance = parser; + log.info('Shell parser initialized (web-tree-sitter + bash, checksums verified)'); + }); - await initPromise; return parserInstance!; } diff --git a/assistant/src/util/debounce.ts b/assistant/src/util/debounce.ts new file mode 100644 index 00000000000..d1a578f143a --- /dev/null +++ b/assistant/src/util/debounce.ts @@ -0,0 +1,88 @@ +/** + * Single-key debouncer. Delays execution until no new calls arrive + * within the specified delay period. + */ +export class Debouncer { + private timer: ReturnType | null = null; + + constructor(private readonly delayMs: number) {} + + schedule(fn: () => void): void { + if (this.timer) clearTimeout(this.timer); + this.timer = setTimeout(() => { + this.timer = null; + fn(); + }, this.delayMs); + } + + cancel(): void { + if (this.timer) { + clearTimeout(this.timer); + this.timer = null; + } + } +} + +/** + * Multi-key debouncer. Each key gets its own independent timer. + * Includes an optional entry limit with eviction of oldest non-protected entries. + */ +export class DebouncerMap { + private timers = new Map>(); + private readonly defaultDelayMs: number; + private readonly maxEntries: number; + private readonly protectedKeyPrefix: string; + + constructor(options: { + defaultDelayMs: number; + maxEntries?: number; + protectedKeyPrefix?: string; + }) { + this.defaultDelayMs = options.defaultDelayMs; + this.maxEntries = options.maxEntries ?? Infinity; + this.protectedKeyPrefix = options.protectedKeyPrefix ?? ''; + } + + schedule(key: string, fn: () => void, delayMs?: number): void { + const existing = this.timers.get(key); + if (existing) clearTimeout(existing); + const timer = setTimeout(() => { + this.timers.delete(key); + fn(); + }, delayMs ?? this.defaultDelayMs); + this.timers.set(key, timer); + this.enforceLimit(); + } + + cancel(key: string): void { + const timer = this.timers.get(key); + if (timer) { + clearTimeout(timer); + this.timers.delete(key); + } + } + + cancelAll(): void { + for (const timer of this.timers.values()) { + clearTimeout(timer); + } + this.timers.clear(); + } + + get size(): number { + return this.timers.size; + } + + private enforceLimit(): void { + if (this.timers.size <= this.maxEntries) return; + const excess = this.timers.size - this.maxEntries; + let removed = 0; + for (const [key, timer] of this.timers) { + if (removed >= excess) break; + if (this.protectedKeyPrefix && key.startsWith(this.protectedKeyPrefix)) continue; + clearTimeout(timer); + this.timers.delete(key); + removed++; + } + } +} diff --git a/assistant/src/util/promise-guard.ts b/assistant/src/util/promise-guard.ts new file mode 100644 index 00000000000..680a4e4ac1a --- /dev/null +++ b/assistant/src/util/promise-guard.ts @@ -0,0 +1,37 @@ +/** + * Guards against concurrent execution of an async factory. + * Multiple concurrent callers share the same in-flight promise. + * On failure, the guard resets so subsequent calls can retry. + */ +export class PromiseGuard { + private promise: Promise | null = null; + + /** Whether a promise is currently in-flight. */ + get active(): boolean { + return this.promise !== null; + } + + /** + * Execute the factory, deduplicating concurrent calls. + * If a call is already in-flight, returns the same promise. + * On failure, clears the cached promise to allow retry. + * + * @param factory - Creates the promise on first call. + * @param onError - Optional callback invoked when the factory rejects (before re-throwing). + */ + run(factory: () => Promise, onError?: (err: unknown) => void): Promise { + if (this.promise) return this.promise; + + this.promise = factory(); + this.promise.catch((err) => { + this.promise = null; + onError?.(err); + }); + return this.promise; + } + + /** Reset the guard, allowing the next call to create a new promise. */ + reset(): void { + this.promise = null; + } +} diff --git a/assistant/src/workspace/git-service.ts b/assistant/src/workspace/git-service.ts index 9a578b853af..f577c44f155 100644 --- a/assistant/src/workspace/git-service.ts +++ b/assistant/src/workspace/git-service.ts @@ -4,6 +4,7 @@ import { execFile } from 'node:child_process'; import { promisify } from 'node:util'; import { getLogger } from '../util/logger.js'; import { getConfig } from '../config/loader.js'; +import { PromiseGuard } from '../util/promise-guard.js'; const execFileAsync = promisify(execFile); const log = getLogger('workspace-git'); @@ -133,7 +134,7 @@ export class WorkspaceGitService { private readonly workspaceDir: string; private readonly mutex: Mutex; private initialized = false; - private initPromise: Promise | null = null; + private readonly initGuard = new PromiseGuard(); private consecutiveFailures = 0; private nextAllowedAttemptMs = 0; private initConsecutiveFailures = 0; @@ -236,81 +237,38 @@ export class WorkspaceGitService { } // If initialization is in progress, wait for it - if (this.initPromise) { - return this.initPromise; + if (this.initGuard.active) { + return this.initGuard.run(() => { throw new Error('unreachable'); }); } // Circuit breaker: skip if multiple recent init attempts have been failing. - // Checked AFTER initPromise so callers waiting on in-progress init aren't + // Checked AFTER initGuard.active so callers waiting on in-progress init aren't // blocked, and only activates after 2+ consecutive failures so that a // single transient failure allows immediate retry. if (this.isInitBreakerOpen()) { throw new Error('Init circuit breaker open: backing off after repeated failures'); } - // Start initialization - this.initPromise = this.mutex.withLock(async () => { - // Double-check after acquiring lock - if (this.initialized) { - return; - } - - const gitDir = join(this.workspaceDir, '.git'); - - if (existsSync(gitDir)) { - // Validate existing repo is not corrupted before marking as ready. - // A corrupted .git directory (e.g. missing HEAD) would cause all - // subsequent git operations to fail with confusing errors. - try { - await this.execGit(['rev-parse', '--git-dir']); - } catch (err: unknown) { - // Distinguish transient failures from genuine corruption. - // Transient errors (timeouts, permissions, missing git binary) - // should NOT destroy .git — they will resolve on retry via - // the initPromise clearing logic. - const errMsg = err instanceof Error ? err.message : String(err); - const execErr = err as ExecError; - const isTimeout = execErr.killed === true - || execErr.signal === 'SIGTERM' - || errMsg.includes('SIGTERM') - || errMsg.includes('timed out'); - const isPermission = execErr.code === 'EACCES' - || errMsg.includes('EACCES') - || errMsg.toLowerCase().includes('permission denied'); - const isMissingBinary = execErr.code === 'ENOENT' - || errMsg.includes('ENOENT'); - - if (isTimeout || isPermission || isMissingBinary) { - // Re-throw so initialization fails gracefully without - // destroying valid git history. - throw err; - } - - // Genuine corruption (e.g. missing HEAD, broken refs) — - // remove corrupted .git and fall through to full init below. - log.warn( - { workspaceDir: this.workspaceDir, err: errMsg }, - 'Corrupted .git directory detected; reinitializing', - ); - const { rmSync } = await import('node:fs'); - rmSync(gitDir, { recursive: true, force: true }); + return this.initGuard.run( + () => this.mutex.withLock(async () => { + // Double-check after acquiring lock + if (this.initialized) { + return; } + const gitDir = join(this.workspaceDir, '.git'); + if (existsSync(gitDir)) { - // .git exists and passed the corruption check, but we still - // need to verify that at least one commit exists. A partial - // init (e.g. git init succeeded but the initial commit failed) - // leaves .git present with an undefined HEAD. In that case, - // fall through to the initial commit logic below. - let headExists = false; + // Validate existing repo is not corrupted before marking as ready. + // A corrupted .git directory (e.g. missing HEAD) would cause all + // subsequent git operations to fail with confusing errors. try { - await this.execGit(['rev-parse', 'HEAD']); - headExists = true; + await this.execGit(['rev-parse', '--git-dir']); } catch (err: unknown) { - // Distinguish transient failures from genuine "no commits". + // Distinguish transient failures from genuine corruption. // Transient errors (timeouts, permissions, missing git binary) - // should NOT fall through to re-initialization — they will - // resolve on retry via the initPromise clearing logic. + // should NOT destroy .git — they will resolve on retry via + // the guard clearing logic. const errMsg = err instanceof Error ? err.message : String(err); const execErr = err as ExecError; const isTimeout = execErr.killed === true @@ -324,68 +282,104 @@ export class WorkspaceGitService { || errMsg.includes('ENOENT'); if (isTimeout || isPermission || isMissingBinary) { + // Re-throw so initialization fails gracefully without + // destroying valid git history. throw err; } - // Genuine "no commits" (unborn HEAD) — fall through to - // create the initial commit. - } - if (headExists) { - // HEAD resolves — repo is fully initialized. - // Run normalization for existing repos that may have been - // created before these helpers existed, or by external tools. - // These calls are OUTSIDE the rev-parse try/catch so that - // normalization errors are not misclassified as "no commits". - this.ensureGitignoreRulesLocked(); - await this.ensureCommitIdentityLocked(); - await this.ensureOnMainLocked(); - this.initialized = true; - this.recordInitSuccess(); - return; + // Genuine corruption (e.g. missing HEAD, broken refs) — + // remove corrupted .git and fall through to full init below. + log.warn( + { workspaceDir: this.workspaceDir, err: errMsg }, + 'Corrupted .git directory detected; reinitializing', + ); + const { rmSync } = await import('node:fs'); + rmSync(gitDir, { recursive: true, force: true }); } - } - // Otherwise fall through to reinitialize / create initial commit - } - // Initialize new git repository - await this.execGit(['init', '-b', 'main']); - - // Run normalization (gitignore + identity + branch enforcement). - // For fresh `git init -b main` the branch is already main, but - // in the corruption-recovery path we fall through here after - // removing .git, so branch enforcement is still useful. - this.ensureGitignoreRulesLocked(); - await this.ensureCommitIdentityLocked(); - await this.ensureOnMainLocked(); - - // Create initial commit synchronously within the lock to prevent - // races with the first commitChanges() call. Without this, the - // initial commit could run concurrently and consume edits meant - // for the first user-requested commit. - const status = await this.getStatusInternal(); - const hasExistingFiles = status.untracked.length > 1 || // More than just .gitignore - status.untracked.some(f => f !== '.gitignore'); + if (existsSync(gitDir)) { + // .git exists and passed the corruption check, but we still + // need to verify that at least one commit exists. A partial + // init (e.g. git init succeeded but the initial commit failed) + // leaves .git present with an undefined HEAD. In that case, + // fall through to the initial commit logic below. + let headExists = false; + try { + await this.execGit(['rev-parse', 'HEAD']); + headExists = true; + } catch (err: unknown) { + // Distinguish transient failures from genuine "no commits". + // Transient errors (timeouts, permissions, missing git binary) + // should NOT fall through to re-initialization — they will + // resolve on retry via the guard clearing logic. + const errMsg = err instanceof Error ? err.message : String(err); + const execErr = err as ExecError; + const isTimeout = execErr.killed === true + || execErr.signal === 'SIGTERM' + || errMsg.includes('SIGTERM') + || errMsg.includes('timed out'); + const isPermission = execErr.code === 'EACCES' + || errMsg.includes('EACCES') + || errMsg.toLowerCase().includes('permission denied'); + const isMissingBinary = execErr.code === 'ENOENT' + || errMsg.includes('ENOENT'); + + if (isTimeout || isPermission || isMissingBinary) { + throw err; + } + // Genuine "no commits" (unborn HEAD) — fall through to + // create the initial commit. + } - await this.execGit(['add', '-A']); + if (headExists) { + // HEAD resolves — repo is fully initialized. + // Run normalization for existing repos that may have been + // created before these helpers existed, or by external tools. + // These calls are OUTSIDE the rev-parse try/catch so that + // normalization errors are not misclassified as "no commits". + this.ensureGitignoreRulesLocked(); + await this.ensureCommitIdentityLocked(); + await this.ensureOnMainLocked(); + this.initialized = true; + this.recordInitSuccess(); + return; + } + } + // Otherwise fall through to reinitialize / create initial commit + } - const message = hasExistingFiles - ? 'Initial commit: migrated existing workspace' - : 'Initial commit: new workspace'; + // Initialize new git repository + await this.execGit(['init', '-b', 'main']); + + // Run normalization (gitignore + identity + branch enforcement). + // For fresh `git init -b main` the branch is already main, but + // in the corruption-recovery path we fall through here after + // removing .git, so branch enforcement is still useful. + this.ensureGitignoreRulesLocked(); + await this.ensureCommitIdentityLocked(); + await this.ensureOnMainLocked(); + + // Create initial commit synchronously within the lock to prevent + // races with the first commitChanges() call. Without this, the + // initial commit could run concurrently and consume edits meant + // for the first user-requested commit. + const status = await this.getStatusInternal(); + const hasExistingFiles = status.untracked.length > 1 || // More than just .gitignore + status.untracked.some(f => f !== '.gitignore'); - await this.execGit(['commit', '-m', message, '--allow-empty']); + await this.execGit(['add', '-A']); - this.initialized = true; - this.recordInitSuccess(); - }); + const message = hasExistingFiles + ? 'Initial commit: migrated existing workspace' + : 'Initial commit: new workspace'; - // If initialization fails, clear the cached promise so subsequent - // calls can retry instead of permanently returning the rejected promise. - this.initPromise.catch(() => { - this.initPromise = null; - this.recordInitFailure(); - }); + await this.execGit(['commit', '-m', message, '--allow-empty']); - return this.initPromise; + this.initialized = true; + this.recordInitSuccess(); + }), + () => this.recordInitFailure(), + ); } /**