From b0ce4b0b6dd9347b40f7e47e695b820041531925 Mon Sep 17 00:00:00 2001 From: Nicholas Charriere Date: Thu, 31 Oct 2024 15:32:15 -0700 Subject: [PATCH] Wait for the race condition of parsing before closing the readable stream controller --- packages/api/ai/plan-parser.mts | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/packages/api/ai/plan-parser.mts b/packages/api/ai/plan-parser.mts index 5e54fae6..ac00e33f 100644 --- a/packages/api/ai/plan-parser.mts +++ b/packages/api/ai/plan-parser.mts @@ -169,7 +169,6 @@ export function getPackagesToInstall(plan: Plan): string[] { ) .flatMap((action) => action.packages); } - export async function streamParsePlan( stream: AsyncIterable, app: DBAppType, @@ -177,6 +176,7 @@ export async function streamParsePlan( planId: string, ) { let parser: StreamingXMLParser; + const parsePromises: Promise[] = []; return new ReadableStream({ async pull(controller) { @@ -184,10 +184,13 @@ export async function streamParsePlan( parser = new StreamingXMLParser({ async onTag(tag) { if (tag.name === 'planDescription' || tag.name === 'action') { - const chunk = await toStreamingChunk(app, tag, planId); - if (chunk) { - controller.enqueue(JSON.stringify(chunk) + '\n'); - } + const promise = (async () => { + const chunk = await toStreamingChunk(app, tag, planId); + if (chunk) { + controller.enqueue(JSON.stringify(chunk) + '\n'); + } + })(); + parsePromises.push(promise); } }, }); @@ -197,6 +200,8 @@ export async function streamParsePlan( for await (const chunk of stream) { parser.parse(chunk); } + // Wait for all pending parse operations to complete before closing + await Promise.all(parsePromises); controller.close(); } catch (error) { console.error(error);