diff --git a/gitnexus-web/src/components/QueryFAB.tsx b/gitnexus-web/src/components/QueryFAB.tsx index ccb3c10e92..0b4129103a 100644 --- a/gitnexus-web/src/components/QueryFAB.tsx +++ b/gitnexus-web/src/components/QueryFAB.tsx @@ -26,7 +26,7 @@ const EXAMPLE_QUERIES = [ ]; export const QueryFAB = () => { - const { setHighlightedNodeIds, setQueryResult, queryResult, clearQueryHighlights, graph, runQuery, isDatabaseReady } = useAppState(); + const { setHighlightedNodeIds, setQueryResult, queryResult, clearQueryHighlights, graph, runQuery, isDatabaseReady, generateCypherQuery } = useAppState(); const [isExpanded, setIsExpanded] = useState(false); const [query, setQuery] = useState(''); @@ -34,8 +34,12 @@ export const QueryFAB = () => { const [error, setError] = useState(null); const [showExamples, setShowExamples] = useState(false); const [showResults, setShowResults] = useState(true); + const [showAIInput, setShowAIInput] = useState(false); + const [aiQuestion, setAIQuestion] = useState(''); + const [isGenerating, setIsGenerating] = useState(false); const textareaRef = useRef(null); + const aiInputRef = useRef(null); const panelRef = useRef(null); useEffect(() => { @@ -44,6 +48,12 @@ export const QueryFAB = () => { } }, [isExpanded]); + useEffect(() => { + if (showAIInput && aiInputRef.current) { + aiInputRef.current.focus(); + } + }, [showAIInput]); + useEffect(() => { const handleClickOutside = (e: MouseEvent) => { if (panelRef.current && !panelRef.current.contains(e.target as Node)) { @@ -151,9 +161,28 @@ export const QueryFAB = () => { textareaRef.current?.focus(); }; + const handleGenerateQuery = useCallback(async () => { + if (!aiQuestion.trim() || isGenerating) return; + setIsGenerating(true); + setError(null); + + const result = await generateCypherQuery(aiQuestion.trim()); + + if ('error' in result) { + setError(result.error); + } else { + setQuery(result.query); + setShowAIInput(false); + setAIQuestion(''); + textareaRef.current?.focus(); + } + setIsGenerating(false); + }, [aiQuestion, isGenerating, generateCypherQuery]); + const handleClose = () => { setIsExpanded(false); setShowExamples(false); + setShowAIInput(false); clearQueryHighlights(); setError(null); }; @@ -244,46 +273,103 @@ export const QueryFAB = () => { /> -
-
+ {showAIInput && ( +
+ setAIQuestion(e.target.value)} + onKeyDown={(e) => { + if (e.key === 'Enter' && !e.shiftKey) { e.preventDefault(); handleGenerateQuery(); } + if (e.key === 'Escape') { setShowAIInput(false); setAIQuestion(''); } + }} + placeholder="e.g. find all classes that extend BaseService" + disabled={isGenerating} + className=" + flex-1 px-3 py-1.5 + bg-surface border border-purple-500/30 rounded-lg + text-sm text-text-primary + placeholder:text-text-muted + focus:border-purple-500/50 focus:ring-2 focus:ring-purple-500/20 + outline-none + disabled:opacity-50 + transition-all + " + /> +
+ )} + +
+
+
+ + + {showExamples && ( +
+ {EXAMPLE_QUERIES.map((example) => ( + + ))} +
+ )} +
+ + - - {showExamples && ( -
- {EXAMPLE_QUERIES.map((example) => ( - - ))} -
- )}
diff --git a/gitnexus-web/src/core/llm/agent.ts b/gitnexus-web/src/core/llm/agent.ts index c2776559ff..2829a7429e 100644 --- a/gitnexus-web/src/core/llm/agent.ts +++ b/gitnexus-web/src/core/llm/agent.ts @@ -261,7 +261,7 @@ export const createChatModel = (config: ProviderConfig): BaseChatModel => { maxTokens: bedrockConfig.maxTokens, streaming: true, proxyBaseUrl: bedrockConfig.proxyBaseUrl, - }) as unknown as BaseChatModel; + }) as BaseChatModel; } default: diff --git a/gitnexus-web/src/core/llm/bedrock-browser.ts b/gitnexus-web/src/core/llm/bedrock-browser.ts index 9002488664..999b78e53d 100644 --- a/gitnexus-web/src/core/llm/bedrock-browser.ts +++ b/gitnexus-web/src/core/llm/bedrock-browser.ts @@ -56,22 +56,22 @@ interface BedrockTool { // ─── Message format conversion ────────────────────────────────────────────── function toBedrockMessages(messages: BaseMessage[]): { - system: Array<{ type: 'text'; text: string }>; + system: Array<{ text: string }>; messages: any[]; } { - const system: Array<{ type: 'text'; text: string }> = []; + const system: Array<{ text: string }> = []; const out: any[] = []; for (const msg of messages) { const type = msg._getType(); if (type === 'system') { - system.push({ type: 'text', text: String(msg.content) }); + system.push({ text: String(msg.content) }); continue; } if (type === 'human') { - const textBlock = { type: 'text' as const, text: String(msg.content) }; + const textBlock = { text: String(msg.content) }; const last = out[out.length - 1]; if (last?.role === 'user') { // Merge into the previous user turn — Bedrock requires strictly alternating roles. @@ -89,16 +89,17 @@ function toBedrockMessages(messages: BaseMessage[]): { // Text part if (ai.content && typeof ai.content === 'string' && ai.content.trim()) { - content.push({ type: 'text', text: ai.content }); + content.push({ text: ai.content }); } - // Tool calls → toolUse blocks + // Tool calls → nested toolUse blocks (Converse API format) for (const tc of ai.tool_calls ?? []) { content.push({ - type: 'toolUse', - toolUseId: tc.id, - name: tc.name, - input: tc.args, + toolUse: { + toolUseId: tc.id, + name: tc.name, + input: tc.args, + }, }); } @@ -110,12 +111,13 @@ function toBedrockMessages(messages: BaseMessage[]): { if (type === 'tool') { const tm = msg as ToolMessage; - // Bedrock expects tool results as a user message with toolResult blocks + // Bedrock expects tool results as a user message with nested toolResult blocks const last = out[out.length - 1]; const resultBlock = { - type: 'toolResult', - toolUseId: tm.tool_call_id, - content: [{ type: 'text', text: String(tm.content) }], + toolResult: { + toolUseId: tm.tool_call_id, + content: [{ text: String(tm.content) }], + }, }; if (last?.role === 'user') { last.content.push(resultBlock); @@ -147,12 +149,13 @@ function fromBedrockMessage(msg: any): AIMessage { const tool_calls: any[] = []; for (const block of msg.content ?? []) { - if (block.type === 'text') text += block.text; - if (block.type === 'toolUse') { + // Converse API uses nested format: { text: '...' } or { toolUse: { ... } } + if (block.text) text += block.text; + if (block.toolUse) { tool_calls.push({ - id: block.toolUseId, - name: block.name, - args: block.input, + id: block.toolUse.toolUseId, + name: block.toolUse.name, + args: block.toolUse.input, type: 'tool_call' as const, }); } @@ -262,15 +265,31 @@ async function* parseNDJSON( for (const line of lines) { if (line.trim()) { try { - yield JSON.parse(line); - } catch { /* skip malformed line */ } + const parsed = JSON.parse(line); + // Handle error frames forwarded by the proxy server + if (parsed.__error) { + throw new Error(`Bedrock: ${parsed.__error.message || parsed.__error.type || 'stream error'}`); + } + yield parsed; + } catch (e) { + // Re-throw Bedrock errors, skip malformed JSON + if (e instanceof Error && e.message.startsWith('Bedrock:')) throw e; + } } } } // Process any remaining data if (buffer.trim()) { - try { yield JSON.parse(buffer); } catch { /* skip */ } + try { + const parsed = JSON.parse(buffer); + if (parsed.__error) { + throw new Error(`Bedrock: ${parsed.__error.message || parsed.__error.type || 'stream error'}`); + } + yield parsed; + } catch (e) { + if (e instanceof Error && e.message.startsWith('Bedrock:')) throw e; + } } } finally { reader.releaseLock(); @@ -285,7 +304,6 @@ export class ChatBedrockBrowser extends BaseChatModel { private modelId: string; private temperature: number; private maxTokens?: number; - private boundTools?: BedrockTool[]; private credentials: BedrockCredentials; private proxyBaseUrl?: string; readonly streaming: boolean; @@ -312,13 +330,11 @@ export class ChatBedrockBrowser extends BaseChatModel { return 'bedrock'; } - bindTools(tools: StructuredToolInterface[]): this { - const bound = Object.create(this) as this; - (bound as any).boundTools = toBedrockTools(tools); - return bound; + bindTools(tools: StructuredToolInterface[]) { + return this.withConfig({ tools: toBedrockTools(tools) } as any); } - private buildBody(messages: BaseMessage[]): string { + private buildBody(messages: BaseMessage[], tools?: BedrockTool[]): string { const { system, messages: bedrockMessages } = toBedrockMessages(messages); const body: any = { messages: bedrockMessages, @@ -328,31 +344,44 @@ export class ChatBedrockBrowser extends BaseChatModel { }, }; if (system.length > 0) body.system = system; - if (this.boundTools && this.boundTools.length > 0) { - body.toolConfig = { tools: this.boundTools }; + if (tools && tools.length > 0) { + body.toolConfig = { tools }; } return JSON.stringify(body); } private async proxyFetch(endpoint: 'converse' | 'converse-stream', bodyJson: string): Promise { - return fetch(`${this.proxyBaseUrl}/api/bedrock/${endpoint}`, { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ - region: this.region, - credentials: this.credentials, - model: this.modelId, - body: JSON.parse(bodyJson), - }), - }); + // Timeout for getting initial response headers (not the stream body) + const controller = new AbortController(); + const timeout = setTimeout(() => controller.abort(), 180_000); // 3 minutes + try { + const resp = await fetch(`${this.proxyBaseUrl}/api/bedrock/${endpoint}`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + region: this.region, + credentials: this.credentials, + model: this.modelId, + body: JSON.parse(bodyJson), + }), + signal: controller.signal, + }); + clearTimeout(timeout); + return resp; + } catch (err: any) { + clearTimeout(timeout); + if (err.name === 'AbortError') throw new Error('Bedrock proxy request timed out'); + throw err; + } } async _generate( messages: BaseMessage[], - _options: BaseChatModelCallOptions, + options: BaseChatModelCallOptions, _runManager?: CallbackManagerForLLMRun ): Promise { - const bodyJson = this.buildBody(messages); + const tools = (options as any).tools as BedrockTool[] | undefined; + const bodyJson = this.buildBody(messages, tools); let resp: Response; if (this.proxyBaseUrl) { @@ -382,10 +411,11 @@ export class ChatBedrockBrowser extends BaseChatModel { async *_streamResponseChunks( messages: BaseMessage[], - _options: BaseChatModelCallOptions, + options: BaseChatModelCallOptions, runManager?: CallbackManagerForLLMRun ): AsyncGenerator { - const bodyJson = this.buildBody(messages); + const tools = (options as any).tools as BedrockTool[] | undefined; + const bodyJson = this.buildBody(messages, tools); let resp: Response; if (this.proxyBaseUrl) { @@ -416,7 +446,7 @@ export class ChatBedrockBrowser extends BaseChatModel { if (event.contentBlockDelta?.delta?.text) { const text: string = event.contentBlockDelta.delta.text; await runManager?.handleLLMNewToken(text); - yield new ChatGenerationChunk({ text, message: new AIMessageChunk(text) }); + yield new ChatGenerationChunk({ text, message: new AIMessageChunk({ content: text }) }); } else if (event.contentBlockStart?.start?.toolUse) { const { contentBlockIndex } = event.contentBlockStart; @@ -433,13 +463,21 @@ export class ChatBedrockBrowser extends BaseChatModel { const idx = event.contentBlockStop.contentBlockIndex; const acc = toolAccumulator[idx]; if (acc) { - let args: Record = {}; - try { args = JSON.parse(acc.input); } catch { /* ignore */ } - const tool_calls = [{ id: acc.id, name: acc.name, args, type: 'tool_call' as const }]; - yield new ChatGenerationChunk({ + // Use tool_call_chunks (not tool_calls) so AIMessageChunk.concat() preserves them. + // tool_call_chunks use string args; the constructor converts to parsed tool_calls. + const tool_call_chunks = [{ + id: acc.id, + name: acc.name, + args: acc.input, + index: idx, + type: 'tool_call_chunk' as const, + }]; + const toolChunk = new ChatGenerationChunk({ text: '', - message: new AIMessageChunk({ content: '', tool_calls }), + message: new AIMessageChunk({ content: '', tool_call_chunks }), }); + await runManager?.handleLLMNewToken('', undefined, undefined, undefined, undefined, { chunk: toolChunk }); + yield toolChunk; delete toolAccumulator[idx]; } diff --git a/gitnexus-web/src/hooks/useAppState.tsx b/gitnexus-web/src/hooks/useAppState.tsx index db5d4ef4d8..75510bd30f 100644 --- a/gitnexus-web/src/hooks/useAppState.tsx +++ b/gitnexus-web/src/hooks/useAppState.tsx @@ -160,6 +160,7 @@ interface AppState { sendChatMessage: (message: string) => Promise; stopChatResponse: () => void; clearChat: () => void; + generateCypherQuery: (question: string) => Promise<{ query: string; explanation: string } | { error: string }>; // Code References Panel codeReferences: CodeReference[]; @@ -585,8 +586,11 @@ export const AppStateProvider = ({ children }: { children: ReactNode }) => { return; } - // Resolve effective backend URL (explicit override takes priority over state) - const effectiveBackendUrl = overrideBackendUrl ?? serverBaseUrl ?? undefined; + // Resolve effective backend URL (explicit override takes priority over state). + // normalizeServerUrl() appends "/api" for server-connection.ts calls (e.g. baseUrl + "/repos"), + // but the worker constructs full paths itself (e.g. backendUrl + "/api/search"), + // so we must strip the trailing "/api" to avoid double-prefix "/api/api/...". + const effectiveBackendUrl = (overrideBackendUrl ?? serverBaseUrl ?? '').replace(/\/api$/, '') || undefined; // Bedrock CANNOT be called directly from the browser (AWS blocks CORS). // Always route through the backend proxy. Use the connected server URL if available, @@ -1017,6 +1021,12 @@ export const AppStateProvider = ({ children }: { children: ReactNode }) => { setAgentError(null); }, []); + const generateCypherQuery = useCallback(async (question: string): Promise<{ query: string; explanation: string } | { error: string }> => { + const api = apiRef.current; + if (!api) return { error: 'Worker not initialized' }; + return api.generateCypherQuery(question); + }, []); + // Switch to a different repo on the connected server const switchRepo = useCallback(async (repoName: string) => { if (!serverBaseUrl) return; @@ -1214,6 +1224,7 @@ export const AppStateProvider = ({ children }: { children: ReactNode }) => { sendChatMessage, stopChatResponse, clearChat, + generateCypherQuery, // Code References Panel codeReferences, isCodePanelOpen, diff --git a/gitnexus-web/src/workers/ingestion.worker.ts b/gitnexus-web/src/workers/ingestion.worker.ts index 2de63ee3de..4b5c121879 100644 --- a/gitnexus-web/src/workers/ingestion.worker.ts +++ b/gitnexus-web/src/workers/ingestion.worker.ts @@ -758,6 +758,96 @@ const workerApi = { currentProviderConfig = null; }, + /** + * Generate a Cypher query from a natural language question using the current LLM + */ + async generateCypherQuery(naturalLanguage: string): Promise<{ query: string; explanation: string } | { error: string }> { + if (!currentProviderConfig) { + return { error: 'No LLM provider configured. Please configure one in Settings.' }; + } + + try { + const model = createChatModel(currentProviderConfig); + + const prompt = `You are a Cypher query generator for a code knowledge graph. + +## Graph Schema +Nodes: File, Folder, Function, Class, Interface, Method, Community, Process +- All nodes have: id (string), name (string), filePath (string) +- Function/Method/Class also have: startLine (int), endLine (int) +- Community has: heuristicLabel, cohesion, symbolCount, description, keywords + +Relations: CodeRelation with \`type\` property: +- CONTAINS: Folder→File, File→Function/Class/Method +- DEFINES: File→Function/Class/Interface +- IMPORTS: File→File +- CALLS: Function/Method→Function/Method (includes constructor injection) +- EXTENDS: Class→Class +- IMPLEMENTS: Class→Interface +- MEMBER_OF: Function/Class→Community +- STEP_IN_PROCESS: Function/Method→Process + +## Example Queries +- All functions: MATCH (n:Function) RETURN n.name, n.filePath LIMIT 50 +- Function calls: MATCH (a)-[r:CodeRelation {type: 'CALLS'}]->(b) RETURN a.name AS caller, b.name AS callee LIMIT 50 +- Import deps: MATCH (a:File)-[r:CodeRelation {type: 'IMPORTS'}]->(b:File) RETURN a.name AS from, b.name AS imports LIMIT 50 +- Classes extending: MATCH (a:Class)-[r:CodeRelation {type: 'EXTENDS'}]->(b:Class) RETURN a.name AS child, b.name AS parent +- Find by name: MATCH (n) WHERE n.name CONTAINS 'Auth' RETURN labels(n)[0] AS type, n.name, n.filePath + +## Rules +- Return ONLY valid Cypher. No markdown fences. +- Use CodeRelation with {type: '...'} for edge filtering. +- Default LIMIT 50 unless the user asks for more. +- First line: the Cypher query +- Second line: empty +- Third line onwards: brief explanation (1-2 sentences) + +## Question +${naturalLanguage}`; + + const response = await model.invoke([ + { role: 'user', content: prompt }, + ]); + + const text = typeof response.content === 'string' + ? response.content + : Array.isArray(response.content) + ? response.content.filter((b: any) => typeof b === 'string' || b.type === 'text').map((b: any) => typeof b === 'string' ? b : b.text).join('') + : String(response.content); + + // Parse: first non-empty block is query, rest is explanation + const lines = text.trim().split('\n'); + const queryLines: string[] = []; + const explanationLines: string[] = []; + let foundGap = false; + + for (const line of lines) { + // Strip markdown code fences if LLM wraps them anyway + const stripped = line.replace(/^```\w*$/, '').replace(/^```$/, ''); + if (!foundGap && stripped.trim() === '' && queryLines.length > 0) { + foundGap = true; + continue; + } + if (!foundGap) { + if (stripped.trim()) queryLines.push(stripped); + } else { + explanationLines.push(stripped); + } + } + + const query = queryLines.join('\n').trim(); + const explanation = explanationLines.join('\n').trim() || 'Generated Cypher query'; + + if (!query) { + return { error: 'LLM returned empty query. Try rephrasing your question.' }; + } + + return { query, explanation }; + } catch (err: any) { + return { error: err.message || 'Failed to generate query' }; + } + }, + /** * Enrich community clusters using LLM */ diff --git a/gitnexus/src/server/api.ts b/gitnexus/src/server/api.ts index 6a96e85c66..cfedce5419 100644 --- a/gitnexus/src/server/api.ts +++ b/gitnexus/src/server/api.ts @@ -420,6 +420,17 @@ export const createServer = async (port: number, host: string = '127.0.0.1') => /** Streaming Converse proxy — parses AWS Event Stream binary and forwards as NDJSON */ app.post('/api/bedrock/converse-stream', async (req, res) => { + let aborted = false; + let reader: ReadableStreamDefaultReader | null = null; + + // Detect client disconnect — abort the AWS stream immediately + res.on('close', () => { + if (!res.writableEnded) { + aborted = true; + try { reader?.cancel(); } catch { /* already closed */ } + } + }); + try { const { region, credentials, model, body } = req.body; if (!region || !credentials?.accessKeyId || !credentials?.secretAccessKey || !model || !body) { @@ -436,20 +447,29 @@ export const createServer = async (port: number, host: string = '127.0.0.1') => }); const url = `https://bedrock-runtime.${region}.amazonaws.com/model/${encodeURIComponent(model)}/converse-stream`; - const awsResp = await aws.fetch(url, { + + // Timeout for the initial AWS response (model may take time to start generating) + const fetchTimeout = 120_000; // 2 minutes + const awsRespPromise = aws.fetch(url, { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify(body), }); + const timeoutPromise = new Promise((_, reject) => + setTimeout(() => reject(new Error('Bedrock request timed out')), fetchTimeout) + ); + const awsResp = await Promise.race([awsRespPromise, timeoutPromise]) as Response; + + if (aborted) return; if (!awsResp.ok) { const errBody = await awsResp.text(); - res.status(awsResp.status).json({ error: errBody }); + if (!res.headersSent) res.status(awsResp.status).json({ error: errBody }); return; } if (!awsResp.body) { - res.status(502).json({ error: 'No response body from Bedrock' }); + if (!res.headersSent) res.status(502).json({ error: 'No response body from Bedrock' }); return; } @@ -461,18 +481,25 @@ export const createServer = async (port: number, host: string = '127.0.0.1') => res.setHeader('X-Accel-Buffering', 'no'); // disable nginx buffering if present res.flushHeaders(); - const reader = (awsResp.body as ReadableStream).getReader(); + reader = (awsResp.body as ReadableStream).getReader(); const decoder = new TextDecoder(); let buf = new Uint8Array(0); - try { - while (true) { - const { done, value } = await reader.read(); - if (done) break; + // Timeout for individual chunk reads — if Bedrock goes silent for too long, abort + const CHUNK_TIMEOUT = 120_000; // 2 minutes between chunks - const merged = new Uint8Array(buf.length + value.length); + try { + while (!aborted) { + // Race reader.read() against a timeout + const chunkTimeoutPromise = new Promise<{ done: true; value: undefined }>((_, reject) => + setTimeout(() => reject(new Error('Bedrock stream chunk timed out')), CHUNK_TIMEOUT) + ); + const { done, value } = await Promise.race([reader.read(), chunkTimeoutPromise]); + if (done || aborted) break; + + const merged = new Uint8Array(buf.length + value!.length); merged.set(buf); - merged.set(value, buf.length); + merged.set(value!, buf.length); buf = merged; // Parse complete AWS Event Stream frames @@ -480,6 +507,11 @@ export const createServer = async (port: number, host: string = '127.0.0.1') => while (buf.length >= 12) { const view = new DataView(buf.buffer, buf.byteOffset, buf.byteLength); const totalLen = view.getUint32(0); + if (totalLen < 16 || totalLen > 16 * 1024 * 1024) { + // Invalid frame — corrupted stream, skip remaining buffer + buf = new Uint8Array(0); + break; + } if (buf.length < totalLen) break; const headersLen = view.getUint32(4); @@ -487,8 +519,10 @@ export const createServer = async (port: number, host: string = '127.0.0.1') => const payloadStart = 12 + headersLen; const payloadLen = totalLen - headersLen - 16; - // Parse binary headers to extract :event-type + // Parse binary headers to extract :event-type, :message-type, :exception-type let eventType = ''; + let messageType = ''; + let exceptionType = ''; let offset = headersStart; const headersEnd = headersStart + headersLen; while (offset < headersEnd) { @@ -499,6 +533,8 @@ export const createServer = async (port: number, host: string = '127.0.0.1') => const valLen = (buf[offset] << 8) | buf[offset + 1]; offset += 2; const val = decoder.decode(buf.slice(offset, offset + valLen)); offset += valLen; if (name === ':event-type') eventType = val; + else if (name === ':message-type') messageType = val; + else if (name === ':exception-type') exceptionType = val; } else if (valueType === 6) { // bytes const valLen = (buf[offset] << 8) | buf[offset + 1]; offset += 2; offset += valLen; @@ -513,10 +549,19 @@ export const createServer = async (port: number, host: string = '127.0.0.1') => } } - if (payloadLen > 0) { + if (payloadLen > 0 && !aborted) { const payload = buf.slice(payloadStart, payloadStart + payloadLen); try { const data = JSON.parse(decoder.decode(payload)); + + // Handle exception frames — forward as NDJSON error and stop + if (messageType === 'exception' || exceptionType) { + const errMsg = data.message || data.Message || exceptionType || 'Bedrock stream exception'; + res.write(JSON.stringify({ __error: { type: exceptionType || eventType, message: errMsg } }) + '\n'); + aborted = true; + break; + } + // Wrap payload with event type to match SDK format: // {"contentBlockDelta": {"delta": {"text": "..."}, "contentBlockIndex": 0}} const wrapped = eventType ? { [eventType]: data } : data; @@ -528,16 +573,20 @@ export const createServer = async (port: number, host: string = '127.0.0.1') => } } } finally { - reader.releaseLock(); + try { reader.releaseLock(); } catch { /* already released */ } } - res.end(); + if (!res.writableEnded) res.end(); } catch (err: any) { - // If headers already sent, we can't change the status code + if (aborted) return; // client already gone if (!res.headersSent) { res.status(500).json({ error: err.message || 'Bedrock stream failed' }); } else { - res.end(); + // Stream already started — send error as NDJSON so client can see it + try { + res.write(JSON.stringify({ __error: { type: 'proxy_error', message: err.message || 'Bedrock stream failed' } }) + '\n'); + } catch { /* write failed, client gone */ } + if (!res.writableEnded) res.end(); } } });