diff --git a/.changeset/old-buses-pump.md b/.changeset/old-buses-pump.md new file mode 100644 index 00000000000..7b84ca46d49 --- /dev/null +++ b/.changeset/old-buses-pump.md @@ -0,0 +1,5 @@ +--- +"@fuel-ts/account": patch +--- + +feat: generalize subscription event parsing diff --git a/packages/account/src/providers/fuel-graphql-subscriber.ts b/packages/account/src/providers/fuel-graphql-subscriber.ts index acabd3b6c5a..bba03f3910f 100644 --- a/packages/account/src/providers/fuel-graphql-subscriber.ts +++ b/packages/account/src/providers/fuel-graphql-subscriber.ts @@ -34,6 +34,9 @@ export class FuelGraphqlSubscriber implements AsyncIterator { this.stream = response.body!.getReader(); } + private events: Array<{ data: unknown; errors?: { message: string }[] }> = []; + private parsingLeftover = ''; + async next(): Promise> { if (!this.stream) { await this.setStream(); @@ -41,46 +44,56 @@ export class FuelGraphqlSubscriber implements AsyncIterator { // eslint-disable-next-line no-constant-condition while (true) { + if (this.events.length > 0) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const { data, errors } = this.events.shift()!; + if (Array.isArray(errors)) { + throw new FuelError( + FuelError.CODES.INVALID_REQUEST, + errors.map((err) => err.message).join('\n\n') + ); + } + return { value: data, done: false }; + } const { value, done } = await this.stream.read(); if (done) { return { value, done }; } - const text = FuelGraphqlSubscriber.textDecoder.decode(value); - /** - * We don't care about responses that don't start with 'data:' like keep-alive messages. - * The only responses that I came across from the node are either 200 responses with data or keep-alive messages. + * We don't care about keep-alive messages. + * The only responses that I came across from the node are either 200 responses with "data:.*" or keep-alive messages. * You can find the keep-alive message in the fuel-core codebase (latest permalink as of writing): * https://github.com/FuelLabs/fuel-core/blob/e1e631902f762081d2124d9c457ddfe13ac366dc/crates/fuel-core/src/graphql_api/service.rs#L247 * To get the actual latest info you need to check out the master branch (might change): * https://github.com/FuelLabs/fuel-core/blob/master/crates/fuel-core/src/graphql_api/service.rs#L247 * */ - if (!text.startsWith('data:')) { + const decoded = FuelGraphqlSubscriber.textDecoder + .decode(value) + .replace(':keep-alive-text\n\n', ''); + + if (decoded === '') { // eslint-disable-next-line no-continue continue; } - let data; - let errors; + const text = `${this.parsingLeftover}${decoded}`; + const regex = /data:.*\n\n/g; - try { - ({ data, errors } = JSON.parse(text.replace(/^data:/, ''))); - } catch (e) { - throw new FuelError( - ErrorCode.STREAM_PARSING_ERROR, - `Error while parsing stream data response: ${text}` - ); - } + const matches = [...text.matchAll(regex)].flatMap((match) => match); - if (Array.isArray(errors)) { - throw new FuelError( - FuelError.CODES.INVALID_REQUEST, - errors.map((err) => err.message).join('\n\n') - ); - } + matches.forEach((match) => { + try { + this.events.push(JSON.parse(match.replace(/^data:/, ''))); + } catch (e) { + throw new FuelError( + ErrorCode.STREAM_PARSING_ERROR, + `Error while parsing stream data response: ${text}` + ); + } + }); - return { value: data, done: false }; + this.parsingLeftover = text.replace(matches.join(), ''); } } diff --git a/packages/account/src/providers/provider.test.ts b/packages/account/src/providers/provider.test.ts index 23805c2525f..6d855354051 100644 --- a/packages/account/src/providers/provider.test.ts +++ b/packages/account/src/providers/provider.test.ts @@ -1035,7 +1035,7 @@ describe('Provider', () => { await Promise.all(promises); }); - it('should not throw if the subscription stream data string contains more than one "data:"', async () => { + it('subscriptions: does not throw when stream contains more than one "data:"', async () => { const provider = await Provider.create(FUEL_NETWORK_URL); vi.spyOn(global, 'fetch').mockImplementationOnce(() => { @@ -1070,10 +1070,247 @@ describe('Provider', () => { } }); - it('should throw if the subscription stream data string parsing fails for some reason', async () => { + test('subscriptions: ignores keep-alive messages', async () => { const provider = await Provider.create(FUEL_NETWORK_URL); - const badResponse = 'data: whatever'; + const fetchSpy = vi.spyOn(global, 'fetch'); + + const readableStream = new ReadableStream({ + start(controller) { + const encoder = new TextEncoder(); + + controller.enqueue( + encoder.encode(`data:${JSON.stringify({ data: { submitAndAwait: { a: 0 } } })}\n\n`) + ); + controller.enqueue(encoder.encode(':keep-alive-text\n\n')); + controller.enqueue( + encoder.encode(`data:${JSON.stringify({ data: { submitAndAwait: { a: 1 } } })}\n\n`) + ); + controller.close(); + }, + }); + fetchSpy.mockImplementationOnce(() => Promise.resolve(new Response(readableStream))); + + let numberOfEvents = 0; + // eslint-disable-next-line @typescript-eslint/no-unused-vars + for await (const { submitAndAwait } of provider.operations.submitAndAwait({ + encodedTransaction: "it's mocked so doesn't matter", + })) { + numberOfEvents += 1; + } + + expect(numberOfEvents).toEqual(2); + }); + + it('subscriptions: does not throw when stream has two events in the same chunk', async () => { + const provider = await Provider.create(FUEL_NETWORK_URL); + + vi.spyOn(global, 'fetch').mockImplementationOnce(() => { + const event1 = { + data: { + submitAndAwait: { + type: 'SubmittedStatus', + }, + }, + }; + const event2 = { + data: { + submitAndAwait: { + type: 'SuccessStatus', + }, + }, + }; + const encoder = new TextEncoder(); + const streamedResponse = new Uint8Array([ + ...encoder.encode(`data:${JSON.stringify(event1)}\n\n`), + ...encoder.encode(`data:${JSON.stringify(event2)}\n\n`), + ]); + return Promise.resolve( + new Response( + new ReadableStream({ + start: (controller) => { + controller.enqueue(streamedResponse); + controller.close(); + }, + }) + ) + ); + }); + + let numberOfEvents = 0; + + for await (const { submitAndAwait } of provider.operations.submitAndAwait({ + encodedTransaction: "it's mocked so doesn't matter", + })) { + numberOfEvents += 1; + + if (numberOfEvents === 1) { + expect(submitAndAwait.type).toEqual('SubmittedStatus'); + } + if (numberOfEvents === 2) { + expect(submitAndAwait.type).toEqual('SuccessStatus'); + } + } + + expect(numberOfEvents).toEqual(2); + }); + it('subscriptions: does not throw when an event is streamed in multiple chunks', async () => { + const provider = await Provider.create(FUEL_NETWORK_URL); + + vi.spyOn(global, 'fetch').mockImplementationOnce(() => { + const responseObject = JSON.stringify({ + data: { + submitAndAwait: { + type: 'SuccessStatus', + }, + }, + }); + + const encoder = new TextEncoder(); + + const chunk1 = encoder.encode(`data:${responseObject.slice(0, 10)}`); + const chunk2 = encoder.encode(`${responseObject.slice(10, 20)}`); + const chunk3 = encoder.encode(`${responseObject.slice(20)}\n\n`); + + return Promise.resolve( + new Response( + new ReadableStream({ + start: (controller) => { + controller.enqueue(chunk1); + controller.enqueue(chunk2); + controller.enqueue(chunk3); + controller.close(); + }, + }) + ) + ); + }); + + for await (const { submitAndAwait } of provider.operations.submitAndAwait({ + encodedTransaction: "it's mocked so doesn't matter", + })) { + expect(submitAndAwait.type).toEqual('SuccessStatus'); + } + }); + + it('subscriptions: does not throw when chunk has a full and partial event in it', async () => { + const provider = await Provider.create(FUEL_NETWORK_URL); + + vi.spyOn(global, 'fetch').mockImplementationOnce(() => { + const event1 = { + data: { + submitAndAwait: { + type: 'SubmittedStatus', + }, + }, + }; + const event2 = JSON.stringify({ + data: { + submitAndAwait: { + type: 'SuccessStatus', + }, + }, + }); + + const encoder = new TextEncoder(); + const chunk1 = new Uint8Array([ + ...encoder.encode(`data:${JSON.stringify(event1)}\n\n`), + ...encoder.encode(`data:${event2.slice(0, 25)}`), + ]); + const chunk2 = encoder.encode(`${event2.slice(25)}\n\n`); + return Promise.resolve( + new Response( + new ReadableStream({ + start: (controller) => { + controller.enqueue(chunk1); + controller.enqueue(chunk2); + controller.close(); + }, + }) + ) + ); + }); + + let numberOfEvents = 0; + + for await (const { submitAndAwait } of provider.operations.submitAndAwait({ + encodedTransaction: "it's mocked so doesn't matter", + })) { + numberOfEvents += 1; + + if (numberOfEvents === 1) { + expect(submitAndAwait.type).toEqual('SubmittedStatus'); + } + if (numberOfEvents === 2) { + expect(submitAndAwait.type).toEqual('SuccessStatus'); + } + } + + expect(numberOfEvents).toEqual(2); + }); + + it('subscriptions: does not throw when multiple chunks contain multiple events with a keep-alive message in-between', async () => { + const provider = await Provider.create(FUEL_NETWORK_URL); + + vi.spyOn(global, 'fetch').mockImplementationOnce(() => { + const event1 = JSON.stringify({ + data: { + submitAndAwait: { + type: 'SubmittedStatus', + }, + }, + }); + const event2 = JSON.stringify({ + data: { + submitAndAwait: { + type: 'SuccessStatus', + }, + }, + }); + + const encoder = new TextEncoder(); + return Promise.resolve( + new Response( + new ReadableStream({ + start: (controller) => { + controller.enqueue(encoder.encode(`data:${event1.slice(0, 25)}`)); + controller.enqueue(encoder.encode(':keep-alive-text\n\n')); + + controller.enqueue( + encoder.encode( + `${event1.slice(25)}\n\ndata:${event2.slice(0, 30)}:keep-alive-text\n\n` + ) + ); + controller.enqueue(encoder.encode(`${event2.slice(30)}\n\n`)); + controller.close(); + }, + }) + ) + ); + }); + + let numberOfEvents = 0; + + for await (const { submitAndAwait } of provider.operations.submitAndAwait({ + encodedTransaction: "it's mocked so doesn't matter", + })) { + numberOfEvents += 1; + + if (numberOfEvents === 1) { + expect(submitAndAwait.type).toEqual('SubmittedStatus'); + } + if (numberOfEvents === 2) { + expect(submitAndAwait.type).toEqual('SuccessStatus'); + } + } + + expect(numberOfEvents).toEqual(2); + }); + + it('subscriptions: throws if the stream data string parsing fails for some reason', async () => { + const provider = await Provider.create(FUEL_NETWORK_URL); + + const badResponse = 'data: {f: {}\n\n'; vi.spyOn(global, 'fetch').mockImplementationOnce(() => { const streamResponse = new TextEncoder().encode(badResponse); return Promise.resolve(