Skip to content

Commit

Permalink
Wait for the race condition of parsing before closing the readable st…
Browse files Browse the repository at this point in the history
…ream controller
  • Loading branch information
nichochar committed Oct 31, 2024
1 parent 51d5265 commit b0ce4b0
Showing 1 changed file with 10 additions and 5 deletions.
15 changes: 10 additions & 5 deletions packages/api/ai/plan-parser.mts
Original file line number Diff line number Diff line change
Expand Up @@ -169,25 +169,28 @@ export function getPackagesToInstall(plan: Plan): string[] {
)
.flatMap((action) => action.packages);
}

export async function streamParsePlan(
stream: AsyncIterable<string>,
app: DBAppType,
_query: string,
planId: string,
) {
let parser: StreamingXMLParser;
const parsePromises: Promise<void>[] = [];

return new ReadableStream({
async pull(controller) {
if (parser === undefined) {
parser = new StreamingXMLParser({
async onTag(tag) {
if (tag.name === 'planDescription' || tag.name === 'action') {
const chunk = await toStreamingChunk(app, tag, planId);
if (chunk) {
controller.enqueue(JSON.stringify(chunk) + '\n');
}
const promise = (async () => {
const chunk = await toStreamingChunk(app, tag, planId);
if (chunk) {
controller.enqueue(JSON.stringify(chunk) + '\n');
}
})();
parsePromises.push(promise);
}
},
});
Expand All @@ -197,6 +200,8 @@ export async function streamParsePlan(
for await (const chunk of stream) {
parser.parse(chunk);
}
// Wait for all pending parse operations to complete before closing
await Promise.all(parsePromises);
controller.close();
} catch (error) {
console.error(error);
Expand Down

0 comments on commit b0ce4b0

Please sign in to comment.