Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion gitnexus-shared/src/graph/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ export type RelationshipType =
| 'HANDLES_TOOL'
| 'ENTRY_POINT_OF'
| 'WRAPS'
| 'QUERIES';
| 'QUERIES'
| 'ENQUEUES'
| 'PROCESSES';

export interface GraphNode {
id: string;
Expand Down
2 changes: 2 additions & 0 deletions gitnexus-shared/src/lbug/schema-constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ export const REL_TYPES = [
'ENTRY_POINT_OF',
'WRAPS',
'QUERIES',
'ENQUEUES',
'PROCESSES',
] as const;

export type RelType = (typeof REL_TYPES)[number];
Expand Down
18 changes: 18 additions & 0 deletions gitnexus/src/core/ingestion/call-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ import type {
ExtractedAssignment,
ExtractedRoute,
ExtractedFetchCall,
ExtractedQueuePattern,
FileConstructorBindings,
} from './workers/parse-worker.js';
import { normalizeFetchURL, routeMatches } from './route-extractors/nextjs.js';
Expand Down Expand Up @@ -3300,3 +3301,20 @@ export const extractFetchCallsFromFiles = async (

return result;
};

export const processQueuePatterns = (graph: KnowledgeGraph, patterns: ExtractedQueuePattern[]): { queuesCreated: number; edgesCreated: number } => {
if (patterns.length === 0) return { queuesCreated: 0, edgesCreated: 0 };
const byQueue = new Map<string, ExtractedQueuePattern[]>();
for (const p of patterns) { const e = byQueue.get(p.queueName); if (e) e.push(p); else byQueue.set(p.queueName, [p]); }
let queuesCreated = 0, edgesCreated = 0;
for (const [qn, qp] of byQueue) {
const qid = generateId('CodeElement', 'queue:' + qn);
if (!graph.getNode(qid)) { graph.addNode({ id: qid, label: 'CodeElement', properties: { name: qn, filePath: qp[0].filePath, description: 'Queue: ' + qn } }); queuesCreated++; }
for (const pt of qp) {
const fid = generateId('File', pt.filePath), rt = (pt.role === 'producer' || pt.role === 'workflow') ? 'ENQUEUES' : 'PROCESSES';
graph.addRelationship({ id: generateId(rt, fid+'->'+qid+':'+pt.lineNumber), sourceId: fid, targetId: qid, type: rt, confidence: 0.9, reason: pt.role+'-'+(pt.method ?? 'handler') });
edgesCreated++;
}
}
return { queuesCreated, edgesCreated };
};
6 changes: 6 additions & 0 deletions gitnexus/src/core/ingestion/parsing-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import type {
FileConstructorBindings,
FileScopeBindings,
ExtractedORMQuery,
ExtractedQueuePattern,
} from './workers/parse-worker.js';
import { getTreeSitterBufferSize, TREE_SITTER_MAX_BUFFER } from './constants.js';

Expand All @@ -60,6 +61,7 @@ export interface WorkerExtractedData {
decoratorRoutes: ExtractedDecoratorRoute[];
toolDefs: ExtractedToolDef[];
ormQueries: ExtractedORMQuery[];
queuePatterns: ExtractedQueuePattern[];
constructorBindings: FileConstructorBindings[];
fileScopeBindings: FileScopeBindings[];
}
Expand Down Expand Up @@ -94,6 +96,7 @@ const processParsingWithWorkers = async (
decoratorRoutes: [],
toolDefs: [],
ormQueries: [],
queuePatterns: [],
constructorBindings: [],
fileScopeBindings: [],
};
Expand All @@ -118,6 +121,7 @@ const processParsingWithWorkers = async (
const allDecoratorRoutes: ExtractedDecoratorRoute[] = [];
const allToolDefs: ExtractedToolDef[] = [];
const allORMQueries: ExtractedORMQuery[] = [];
const allQueuePatterns: ExtractedQueuePattern[] = [];
const allConstructorBindings: FileConstructorBindings[] = [];
const fileScopeBindingsByFile: FileScopeBindings[] = [];
for (const result of chunkResults) {
Expand Down Expand Up @@ -154,6 +158,7 @@ const processParsingWithWorkers = async (
for (const item of result.decoratorRoutes) allDecoratorRoutes.push(item);
for (const item of result.toolDefs) allToolDefs.push(item);
if (result.ormQueries) for (const item of result.ormQueries) allORMQueries.push(item);
if (result.queuePatterns) for (const item of result.queuePatterns) allQueuePatterns.push(item);
for (const item of result.constructorBindings) allConstructorBindings.push(item);
if (result.fileScopeBindings)
for (const item of result.fileScopeBindings) fileScopeBindingsByFile.push(item);
Expand Down Expand Up @@ -185,6 +190,7 @@ const processParsingWithWorkers = async (
decoratorRoutes: allDecoratorRoutes,
toolDefs: allToolDefs,
ormQueries: allORMQueries,
queuePatterns: allQueuePatterns,
constructorBindings: allConstructorBindings,
fileScopeBindings: fileScopeBindingsByFile,
};
Expand Down
1 change: 1 addition & 0 deletions gitnexus/src/core/ingestion/pipeline-phases/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export { parsePhase, type ParseOutput } from './parse.js';
export { routesPhase, type RoutesOutput, type RouteEntry } from './routes.js';
export { toolsPhase, type ToolsOutput, type ToolDef } from './tools.js';
export { ormPhase, type ORMOutput } from './orm.js';
export { queuesPhase, type QueuesOutput } from './queues.js';
export { crossFilePhase, type CrossFileOutput } from './cross-file.js';
export { mroPhase, type MROOutput } from './mro.js';
export { communitiesPhase, type CommunitiesOutput } from './communities.js';
Expand Down
9 changes: 9 additions & 0 deletions gitnexus/src/core/ingestion/pipeline-phases/parse-impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import type {
ExtractedDecoratorRoute,
ExtractedFetchCall,
ExtractedORMQuery,
ExtractedQueuePattern,
ExtractedRoute,
ExtractedToolDef,
FileConstructorBindings,
Expand All @@ -68,6 +69,7 @@ import { fileURLToPath, pathToFileURL } from 'node:url';
import { isDev } from '../utils/env.js';
import { synthesizeWildcardImportBindings, needsSynthesis } from './wildcard-synthesis.js';
import { extractORMQueriesInline } from './orm-extraction.js';
import { extractQueuePatternsInline } from './queue-extraction.js';

// ── Constants ──────────────────────────────────────────────────────────────

Expand Down Expand Up @@ -106,6 +108,7 @@ export async function runChunkedParseAndResolve(
allDecoratorRoutes: ExtractedDecoratorRoute[];
allToolDefs: ExtractedToolDef[];
allORMQueries: ExtractedORMQuery[];
allQueuePatterns: ExtractedQueuePattern[];
bindingAccumulator: BindingAccumulator;
resolutionContext: ReturnType<typeof createResolutionContext>;
usedWorkerPool: boolean;
Expand Down Expand Up @@ -248,6 +251,7 @@ export async function runChunkedParseAndResolve(
const allDecoratorRoutes: ExtractedDecoratorRoute[] = [];
const allToolDefs: ExtractedToolDef[] = [];
const allORMQueries: ExtractedORMQuery[] = [];
const allQueuePatterns: ExtractedQueuePattern[] = [];
const deferredWorkerCalls: ExtractedCall[] = [];
const deferredWorkerHeritage: ExtractedHeritage[] = [];
const deferredConstructorBindings: FileConstructorBindings[] = [];
Expand Down Expand Up @@ -393,6 +397,9 @@ export async function runChunkedParseAndResolve(
if (chunkWorkerData.ormQueries?.length) {
for (const item of chunkWorkerData.ormQueries) allORMQueries.push(item);
}
if (chunkWorkerData.queuePatterns?.length) {
for (const item of chunkWorkerData.queuePatterns) allQueuePatterns.push(item);
}
} else {
await processImports(graph, chunkFiles, astCache, ctx, undefined, repoPath, allPaths);
sequentialChunkPaths.push(chunkPaths);
Expand Down Expand Up @@ -509,6 +516,7 @@ export async function runChunkedParseAndResolve(
}
for (const f of chunkFiles) {
extractORMQueriesInline(f.path, f.content, allORMQueries);
extractQueuePatternsInline(f.path, f.content, allQueuePatterns);
}
astCache.clear();
cachedSequentialChunkFiles[chunkIdx] = [];
Expand Down Expand Up @@ -589,6 +597,7 @@ export async function runChunkedParseAndResolve(
allDecoratorRoutes,
allToolDefs,
allORMQueries,
allQueuePatterns,
bindingAccumulator,
resolutionContext: ctx,
// Whether a worker pool was actually live for this run. False means the
Expand Down
2 changes: 2 additions & 0 deletions gitnexus/src/core/ingestion/pipeline-phases/parse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import type {
ExtractedDecoratorRoute,
ExtractedToolDef,
ExtractedORMQuery,
ExtractedQueuePattern,
} from '../workers/parse-worker.js';
import type { createResolutionContext } from '../model/resolution-context.js';
import { runChunkedParseAndResolve } from './parse-impl.js';
Expand All @@ -47,6 +48,7 @@ export interface ParseOutput {
readonly allDecoratorRoutes: readonly ExtractedDecoratorRoute[];
readonly allToolDefs: readonly ExtractedToolDef[];
readonly allORMQueries: readonly ExtractedORMQuery[];
readonly allQueuePatterns: readonly ExtractedQueuePattern[];
bindingAccumulator: BindingAccumulator;
/** Resolution context from the parse phase — carries importMap, namedImportMap, etc. */
resolutionContext: ReturnType<typeof createResolutionContext>;
Expand Down
97 changes: 97 additions & 0 deletions gitnexus/src/core/ingestion/pipeline-phases/queue-extraction.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/**
* Inline queue pattern extraction (sequential fallback path).
*
* Extracts BullMQ and Temporal queue patterns from source content using
* regex patterns. Used by the sequential parse path when workers are
* not available — the worker path extracts queue patterns via
* extractQueuePatterns in parse-worker.ts instead.
*
* @module
*/

import type { ExtractedQueuePattern } from '../workers/parse-worker.js';

// ── Regex patterns ─────────────────────────────────────────────────────────

const BULLMQ_ADD_RE = /(\w+)\.(add|addBulk)\s*\(/g;
const BULLMQ_WORKER_RE = /new\s+Worker\s*\(\s*['\"](\w[\w-]*)['\"]/g;
const TEMPORAL_ACTIVITY_RE = /activities\.(\w+)\s*\(/g;
const TEMPORAL_WORKFLOW_START_RE = /client\.workflow\.(start|execute)\s*\(\s*(\w+)/g;

// ── Extraction function ───────────────────────────────────────────────────

/**
* Extract BullMQ and Temporal queue patterns from file content using regex.
*
* Fast-path: skips files that don't contain queue-related markers.
* Results are appended to the `out` array (push pattern avoids allocation).
*
* @param filePath Relative path of the source file
* @param content File content string
* @param out Output array to append extracted patterns to
*/
export function extractQueuePatternsInline(
filePath: string,
content: string,
out: ExtractedQueuePattern[],
): void {
const hasBullMQ = content.includes('new Queue') || content.includes('new Worker');
const hasTemporal = content.includes('activities.') || content.includes('client.workflow.');
if (!hasBullMQ && !hasTemporal) return;

if (hasBullMQ) {
const queueVarMap = new Map<string, string>();
const assignRe = /(?:const|let|var)\s+(\w+)\s*=\s*new\s+Queue\s*\(\s*['\"](\w[\w-]*)['\"]/g;
assignRe.lastIndex = 0;
let m;
while ((m = assignRe.exec(content)) !== null) {
queueVarMap.set(m[1], m[2]);
}
BULLMQ_ADD_RE.lastIndex = 0;
while ((m = BULLMQ_ADD_RE.exec(content)) !== null) {
const qn = queueVarMap.get(m[1]);
if (qn) {
out.push({
filePath,
role: 'producer',
queueName: qn,
method: m[2],
lineNumber: content.substring(0, m.index).split('\n').length - 1,
});
}
}
BULLMQ_WORKER_RE.lastIndex = 0;
while ((m = BULLMQ_WORKER_RE.exec(content)) !== null) {
out.push({
filePath,
role: 'consumer',
queueName: m[1],
lineNumber: content.substring(0, m.index).split('\n').length - 1,
});
}
}

if (hasTemporal) {
let m;
TEMPORAL_ACTIVITY_RE.lastIndex = 0;
while ((m = TEMPORAL_ACTIVITY_RE.exec(content)) !== null) {
out.push({
filePath,
role: 'activity',
queueName: m[1],
handlerName: m[1],
lineNumber: content.substring(0, m.index).split('\n').length - 1,
});
}
TEMPORAL_WORKFLOW_START_RE.lastIndex = 0;
while ((m = TEMPORAL_WORKFLOW_START_RE.exec(content)) !== null) {
out.push({
filePath,
role: 'workflow',
queueName: m[2],
method: m[1],
lineNumber: content.substring(0, m.index).split('\n').length - 1,
});
}
}
}
92 changes: 92 additions & 0 deletions gitnexus/src/core/ingestion/pipeline-phases/queues.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/**
* Phase: queues
*
* Processes async queue patterns (BullMQ + Temporal) and creates
* ENQUEUES / PROCESSES edges and Queue CodeElement nodes.
*
* @deps parse
* @reads allQueuePatterns (from parse)
* @writes graph (CodeElement nodes, ENQUEUES/PROCESSES edges)
*/

import type { PipelinePhase, PipelineContext, PhaseResult } from './types.js';
import { getPhaseOutput } from './types.js';
import type { ParseOutput } from './parse.js';
import { generateId } from '../../../lib/utils.js';
import type { ExtractedQueuePattern } from '../workers/parse-worker.js';
import type { KnowledgeGraph } from '../../graph/types.js';
import { isDev } from '../utils/env.js';

export interface QueuesOutput {
queuesCreated: number;
edgesCreated: number;
}

export const queuesPhase: PipelinePhase<QueuesOutput> = {
name: 'queues',
deps: ['parse'],

async execute(
ctx: PipelineContext,
deps: ReadonlyMap<string, PhaseResult<unknown>>,
): Promise<QueuesOutput> {
const { allQueuePatterns } = getPhaseOutput<ParseOutput>(deps, 'parse');

if (allQueuePatterns.length === 0) {
return { queuesCreated: 0, edgesCreated: 0 };
}

return processQueuePatterns(ctx.graph, allQueuePatterns);
},
};

function processQueuePatterns(
graph: KnowledgeGraph,
patterns: readonly ExtractedQueuePattern[],
): QueuesOutput {
const queueNodes = new Map<string, string>();
const seenEdges = new Set<string>();
let edgesCreated = 0;

for (const pt of patterns) {
let queueNodeId = queueNodes.get(pt.queueName);
if (!queueNodeId) {
queueNodeId = generateId('CodeElement', `Queue:${pt.queueName}`);
graph.addNode({
id: queueNodeId,
label: 'CodeElement',
properties: {
name: pt.queueName,
filePath: '',
description: `Queue: ${pt.queueName}`,
},
});
queueNodes.set(pt.queueName, queueNodeId);
}

const edgeType =
pt.role === 'producer' || pt.role === 'workflow' ? 'ENQUEUES' : 'PROCESSES';
const fileId = generateId('File', pt.filePath);
const edgeKey = `${fileId}->${queueNodeId}:${edgeType}`;
if (seenEdges.has(edgeKey)) continue;
seenEdges.add(edgeKey);

graph.addRelationship({
id: generateId(edgeType, edgeKey),
sourceId: fileId,
targetId: queueNodeId,
type: edgeType,
confidence: 0.9,
reason: `queue-${pt.role}`,
});
edgesCreated++;
}

if (isDev) {
console.log(
`Queues: ${edgesCreated} edges (ENQUEUES/PROCESSES), ${queueNodes.size} queue nodes (${patterns.length} total patterns)`,
);
}

return { queuesCreated: queueNodes.size, edgesCreated };
}
2 changes: 2 additions & 0 deletions gitnexus/src/core/ingestion/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import {
routesPhase,
toolsPhase,
ormPhase,
queuesPhase,
crossFilePhase,
mroPhase,
communitiesPhase,
Expand Down Expand Up @@ -79,6 +80,7 @@ function buildPhaseList(options?: PipelineOptions): PipelinePhase[] {
routesPhase,
toolsPhase,
ormPhase,
queuesPhase,
crossFilePhase,
];

Expand Down
Loading
Loading