Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions gitnexus/src/core/lbug/pool-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,30 @@ interface PoolEntry {

const pool = new Map<string, PoolEntry>();

/**
* Listeners notified when a pool entry is torn down (LRU eviction, idle
* timeout, explicit close). Used by upper layers (e.g. the BM25 search
* module) to invalidate per-repo caches that must not outlive the pool
* entry that produced them.
*
* Listeners run synchronously inside `closeOne` after the pool entry has
* been removed; throwing listeners are isolated so one bad listener does
* not prevent others from firing or break teardown.
*/
type PoolCloseListener = (repoId: string) => void;
const poolCloseListeners: Set<PoolCloseListener> = new Set();

/**
 * Register a callback in the module-level `poolCloseListeners` set, which
 * `closeOne` iterates after removing a pool entry.
 *
 * Because the registry is a `Set`, adding the same function reference twice
 * keeps a single registration.
 *
 * @param listener Callback that receives the repoId of the closed pool entry.
 * @returns A disposer that removes `listener` from the registry; calling it
 *   again once the listener is gone is a no-op.
 */
export function addPoolCloseListener(listener: PoolCloseListener): () => void {
  poolCloseListeners.add(listener);

  const unsubscribe = (): void => {
    poolCloseListeners.delete(listener);
  };
  return unsubscribe;
}

/**
* Shared Database cache keyed by resolved dbPath.
* Multiple repoIds pointing to the same path share one native Database
Expand Down Expand Up @@ -159,6 +183,16 @@ function closeOne(repoId: string): void {
}

pool.delete(repoId);

// Notify listeners AFTER the pool entry is gone so any cache-invalidation
// they perform is consistent with `isLbugReady(repoId) === false`.
for (const listener of poolCloseListeners) {
try {
listener(repoId);
} catch {
// Isolate listener failures — teardown must complete.
}
}
}

/**
Expand Down
66 changes: 60 additions & 6 deletions gitnexus/src/core/search/bm25-index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,48 @@ const FTS_INDEXES: ReadonlyArray<{
* Per-process cache for the MCP pool path: tracks which `(repoId, table)`
* pairs have been ensured. The CLI/pipeline path gets its own cache inside
* `lbug-adapter.ts` keyed by table/index, scoped to the singleton connection.
*
* IMPORTANT: an entry is added ONLY when the index was confirmed to exist
* (CREATE_FTS_INDEX succeeded, or failed with `'already exists'`). Other
* failures (transient lock errors, missing extension, etc.) leave the key
* unset so the next query retries instead of silently caching the failure.
*
* Entries for a given repoId are invalidated when its pool is closed —
* see the `addPoolCloseListener` registration in `searchFTSFromLbug`.
*/
const ensuredPoolFTS: Set<string> = new Set();

/**
 * Forget the ensured-FTS state recorded for one repo.
 *
 * Removes every `ensuredPoolFTS` key that starts with `"<repoId>:"`; keys
 * belonging to other repos are left alone. The pool-close listener calls
 * this so that the next pool-backed search re-issues `CREATE_FTS_INDEX`
 * instead of relying on ensure-state recorded before the pool was closed.
 *
 * Exported so tests can reset the cache directly.
 *
 * @param repoId Repo whose cache entries should be dropped.
 */
export function invalidateEnsuredFTSForRepo(repoId: string): void {
  const repoPrefix = repoId + ':';
  const staleKeys = Array.from(ensuredPoolFTS).filter((entry) => entry.startsWith(repoPrefix));
  staleKeys.forEach((entry) => {
    ensuredPoolFTS.delete(entry);
  });
}

/**
* Tracks whether we've already wired the pool-close listener for this
* process. The pool adapter is dynamically imported, so registration
* happens lazily on the first MCP-pool-backed FTS query.
*/
let poolCloseListenerRegistered = false;

/**
 * Subscribe the ensured-FTS cache invalidation to pool-close events, at most
 * once per process.
 *
 * @param addPoolCloseListener Subscription function obtained from the pool
 *   adapter module; it is handed a callback that forwards the closed repoId
 *   to `invalidateEnsuredFTSForRepo`. Any value it returns is ignored.
 * @throws Whatever `addPoolCloseListener` throws. In that case the
 *   "registered" flag stays unset so a later call tries again.
 */
function registerPoolCloseListenerOnce(
  addPoolCloseListener: (listener: (repoId: string) => void) => void,
): void {
  if (poolCloseListenerRegistered) return;
  addPoolCloseListener((repoId) => invalidateEnsuredFTSForRepo(repoId));
  // Latch only after the subscription succeeded. Setting the flag first would
  // turn a failed registration into a permanent one: every later call would
  // return early and the cache would never be invalidated on pool close.
  poolCloseListenerRegistered = true;
}

async function ensureFTSIndexViaExecutor(
executor: (cypher: string) => Promise<any[]>,
repoId: string,
Expand All @@ -58,16 +97,25 @@ async function ensureFTSIndexViaExecutor(
await executor(
`CALL CREATE_FTS_INDEX('${table}', '${indexName}', [${propList}], stemmer := 'porter')`,
);
// Index was created successfully — safe to cache.
ensuredPoolFTS.add(key);
} catch (e: any) {
// 'already exists' is the happy path (index persists on disk between
// process invocations) — anything else we swallow because FTS is
// best-effort: queryFTS itself returns [] on missing-index errors.
// process invocations) — cache it. Anything else is treated as a
// transient failure: surface a one-time warning and leave the key
// unset so the NEXT query retries rather than silently using a
// cached failure (which previously disabled BM25 for the whole
// process for that repo).
const msg = String(e?.message ?? '');
if (!msg.includes('already exists')) {
// Best-effort — continue without index, queryFTS will fall back to [].
if (msg.includes('already exists')) {
ensuredPoolFTS.add(key);
} else {
console.warn(
`[gitnexus] FTS index ensure failed for repo "${repoId}" table "${table}" ` +
`(index "${indexName}"): ${msg || e}. Will retry on next query.`,
);
}
}
ensuredPoolFTS.add(key);
}

/**
Expand Down Expand Up @@ -131,7 +179,13 @@ export const searchFTSFromLbug = async (
// Use MCP connection pool via dynamic import
// IMPORTANT: FTS queries run sequentially to avoid connection contention.
// The MCP pool supports multiple connections, but FTS is best run serially.
const { executeQuery } = await import('../lbug/pool-adapter.js');
const poolMod = await import('../lbug/pool-adapter.js');
const { executeQuery, addPoolCloseListener } = poolMod;
// Register the pool-close listener lazily on first use so a teardown of
// the pool entry (LRU eviction, idle timeout, explicit close) drops the
// matching `ensuredPoolFTS` entries. Without this, stale ensure-state
// can outlive the pool that produced it.
registerPoolCloseListenerOnce(addPoolCloseListener);
const executor = (cypher: string) => executeQuery(repoId, cypher);

// Lazy-create FTS indexes on first query for this repo (analyze no longer
Expand Down
144 changes: 143 additions & 1 deletion gitnexus/test/unit/bm25-search.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import { describe, it, expect, vi, beforeEach } from 'vitest';
import { searchFTSFromLbug, type BM25SearchResult } from '../../src/core/search/bm25-index.js';
import {
searchFTSFromLbug,
invalidateEnsuredFTSForRepo,
type BM25SearchResult,
} from '../../src/core/search/bm25-index.js';

vi.mock('../../src/core/lbug/lbug-adapter.js', async (importOriginal) => {
const actual = await importOriginal<typeof import('../../src/core/lbug/lbug-adapter.js')>();
Expand All @@ -9,6 +13,22 @@ vi.mock('../../src/core/lbug/lbug-adapter.js', async (importOriginal) => {
};
});

// Pool adapter is dynamically imported by the MCP-pool path of
// `searchFTSFromLbug`. We mock it so we can drive the executor and the
// pool-close listener without spinning up a real LadybugDB pool.
const poolCloseListeners: Array<(repoId: string) => void> = [];
const mockExecuteQuery = vi.fn();
vi.mock('../../src/core/lbug/pool-adapter.js', () => {
  // Delegate at call time so each test can swap the behavior of
  // `mockExecuteQuery` without re-mocking the module.
  const executeQuery = (repoId: string, cypher: string) => mockExecuteQuery(repoId, cypher);

  // Record every listener the code under test subscribes, so a test can fire
  // them by hand to simulate a pool close. Returns a disposer like the real
  // adapter does.
  const addPoolCloseListener = (listener: (repoId: string) => void) => {
    poolCloseListeners.push(listener);
    return () => {
      const at = poolCloseListeners.indexOf(listener);
      if (at >= 0) poolCloseListeners.splice(at, 1);
    };
  };

  return { executeQuery, addPoolCloseListener };
});

describe('BM25 search', () => {
describe('searchFTSFromLbug', () => {
it('returns empty array when LadybugDB is not initialized', async () => {
Expand Down Expand Up @@ -169,4 +189,126 @@ describe('BM25 search', () => {
expect(results[1].rank).toBe(2);
});
});

describe('ensureFTS cache (MCP pool path)', () => {
  const REPO = 'test-repo-fts-cache';
  const OTHER = 'other-repo';
  // Number of CREATE_FTS_INDEX statements a cold search is expected to issue
  // (one per FTS-indexed table).
  const FTS_INDEX_COUNT = 5;

  /**
   * Count the recorded executor calls that issued CREATE_FTS_INDEX.
   * When `repoId` is given, only calls made for that repo are counted.
   */
  const countCreateCalls = (repoId?: string): number =>
    mockExecuteQuery.mock.calls.filter(
      (call) =>
        (repoId === undefined || call[0] === repoId) &&
        String(call[1]).includes('CREATE_FTS_INDEX'),
    ).length;

  /** Forget all recorded calls and make every subsequent query resolve to []. */
  const resetExecutorToSucceed = (): void => {
    mockExecuteQuery.mockReset();
    mockExecuteQuery.mockResolvedValue([]);
  };

  beforeEach(() => {
    // Clean state so cases don't bleed into each other.
    mockExecuteQuery.mockReset();
    invalidateEnsuredFTSForRepo(REPO);
    invalidateEnsuredFTSForRepo(OTHER);
    // Suppress the surfaced warn so test output stays readable.
    const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {});

    // Vitest runs a function returned from `beforeEach` as per-test cleanup,
    // including when the test fails. Restoring here keeps the console.warn
    // stub from piling up across tests or leaking into other suites, and
    // guarantees both repos' cache entries are dropped.
    return () => {
      warnSpy.mockRestore();
      invalidateEnsuredFTSForRepo(REPO);
      invalidateEnsuredFTSForRepo(OTHER);
    };
  });

  it('does NOT cache a transient CREATE_FTS_INDEX failure — second call retries', async () => {
    // First call: every CREATE_FTS_INDEX fails transiently; QUERY_FTS_INDEX returns nothing.
    mockExecuteQuery.mockImplementation(async (_repo: string, cypher: string) => {
      if (cypher.includes('CREATE_FTS_INDEX')) {
        throw new Error('transient lock error: Could not set lock');
      }
      return [];
    });

    const firstResults = await searchFTSFromLbug('anything', 5, REPO);
    expect(Array.isArray(firstResults)).toBe(true);
    // Every FTS index is attempted on the first call.
    expect(countCreateCalls()).toBe(FTS_INDEX_COUNT);

    // Second call: CREATE succeeds this time. If the first failure had been
    // cached we would see ZERO additional CREATE calls.
    resetExecutorToSucceed();
    await searchFTSFromLbug('anything', 5, REPO);
    expect(countCreateCalls()).toBe(FTS_INDEX_COUNT);
  });

  it("treats 'already exists' as success and caches it (no retry on second call)", async () => {
    mockExecuteQuery.mockImplementation(async (_repo: string, cypher: string) => {
      if (cypher.includes('CREATE_FTS_INDEX')) {
        throw new Error("Catalog exception: index 'file_fts' already exists");
      }
      return [];
    });

    await searchFTSFromLbug('anything', 5, REPO);

    resetExecutorToSucceed();
    await searchFTSFromLbug('anything', 5, REPO);
    expect(countCreateCalls()).toBe(0);
  });

  it('invalidateEnsuredFTSForRepo drops cached entries so next call re-issues CREATE', async () => {
    // Prime the cache with successful creates.
    mockExecuteQuery.mockResolvedValue([]);
    await searchFTSFromLbug('anything', 5, REPO);

    // Without invalidation: no re-CREATE.
    resetExecutorToSucceed();
    await searchFTSFromLbug('anything', 5, REPO);
    expect(countCreateCalls()).toBe(0);

    // After invalidation: the next call re-issues CREATE for every index.
    invalidateEnsuredFTSForRepo(REPO);
    resetExecutorToSucceed();
    await searchFTSFromLbug('anything', 5, REPO);
    expect(countCreateCalls()).toBe(FTS_INDEX_COUNT);
  });

  it('a pool-close listener fired by the pool adapter invalidates this repo only', async () => {
    mockExecuteQuery.mockResolvedValue([]);
    // Prime both repos.
    await searchFTSFromLbug('anything', 5, REPO);
    await searchFTSFromLbug('anything', 5, OTHER);

    // Confirm at least one listener was registered by the search module.
    expect(poolCloseListeners.length).toBeGreaterThanOrEqual(1);

    // Simulate the pool adapter closing REPO.
    for (const listener of poolCloseListeners) listener(REPO);

    resetExecutorToSucceed();
    await searchFTSFromLbug('anything', 5, REPO);
    expect(countCreateCalls(REPO)).toBe(FTS_INDEX_COUNT);

    // OTHER repo's cache must remain intact — no re-CREATE for it.
    resetExecutorToSucceed();
    await searchFTSFromLbug('anything', 5, OTHER);
    expect(countCreateCalls(OTHER)).toBe(0);
  });
});
});
Loading