Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/old-buses-pump.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@fuel-ts/account": patch
---

feat: generalize subscription event parsing
57 changes: 35 additions & 22 deletions packages/account/src/providers/fuel-graphql-subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,53 +34,66 @@ export class FuelGraphqlSubscriber implements AsyncIterator<unknown> {
this.stream = response.body!.getReader();
}

private events: Array<{ data: unknown; errors?: { message: string }[] }> = [];
private parsingLeftover = '';

async next(): Promise<IteratorResult<unknown, unknown>> {
if (!this.stream) {
await this.setStream();
}

// eslint-disable-next-line no-constant-condition
while (true) {
if (this.events.length > 0) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const { data, errors } = this.events.shift()!;
if (Array.isArray(errors)) {
throw new FuelError(
FuelError.CODES.INVALID_REQUEST,
errors.map((err) => err.message).join('\n\n')
);
}
return { value: data, done: false };
}
const { value, done } = await this.stream.read();
if (done) {
return { value, done };
}

const text = FuelGraphqlSubscriber.textDecoder.decode(value);

/**
* We don't care about responses that don't start with 'data:' like keep-alive messages.
* The only responses that I came across from the node are either 200 responses with data or keep-alive messages.
* We don't care about keep-alive messages.
* The only responses that I came across from the node are either 200 responses with "data:.*" or keep-alive messages.
* You can find the keep-alive message in the fuel-core codebase (latest permalink as of writing):
* https://github.com/FuelLabs/fuel-core/blob/e1e631902f762081d2124d9c457ddfe13ac366dc/crates/fuel-core/src/graphql_api/service.rs#L247
* To get the actual latest info you need to check out the master branch (might change):
* https://github.com/FuelLabs/fuel-core/blob/master/crates/fuel-core/src/graphql_api/service.rs#L247
* */
if (!text.startsWith('data:')) {
const decoded = FuelGraphqlSubscriber.textDecoder
.decode(value)
.replace(':keep-alive-text\n\n', '');

if (decoded === '') {
// eslint-disable-next-line no-continue
continue;
}

let data;
let errors;
const text = `${this.parsingLeftover}${decoded}`;
const regex = /data:.*\n\n/g;

try {
({ data, errors } = JSON.parse(text.replace(/^data:/, '')));
} catch (e) {
throw new FuelError(
ErrorCode.STREAM_PARSING_ERROR,
`Error while parsing stream data response: ${text}`
);
}
const matches = [...text.matchAll(regex)].flatMap((match) => match);

if (Array.isArray(errors)) {
throw new FuelError(
FuelError.CODES.INVALID_REQUEST,
errors.map((err) => err.message).join('\n\n')
);
}
matches.forEach((match) => {
try {
this.events.push(JSON.parse(match.replace(/^data:/, '')));
} catch (e) {
throw new FuelError(
ErrorCode.STREAM_PARSING_ERROR,
`Error while parsing stream data response: ${text}`
);
}
});

return { value: data, done: false };
this.parsingLeftover = text.replace(matches.join(), '');
}
}

Expand Down
243 changes: 240 additions & 3 deletions packages/account/src/providers/provider.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1035,7 +1035,7 @@ describe('Provider', () => {
await Promise.all(promises);
});

it('should not throw if the subscription stream data string contains more than one "data:"', async () => {
it('subscriptions: does not throw when stream contains more than one "data:"', async () => {
const provider = await Provider.create(FUEL_NETWORK_URL);

vi.spyOn(global, 'fetch').mockImplementationOnce(() => {
Expand Down Expand Up @@ -1070,10 +1070,247 @@ describe('Provider', () => {
}
});

it('should throw if the subscription stream data string parsing fails for some reason', async () => {
test('subscriptions: ignores keep-alive messages', async () => {
const provider = await Provider.create(FUEL_NETWORK_URL);

const badResponse = 'data: whatever';
const fetchSpy = vi.spyOn(global, 'fetch');

const readableStream = new ReadableStream({
start(controller) {
const encoder = new TextEncoder();

controller.enqueue(
encoder.encode(`data:${JSON.stringify({ data: { submitAndAwait: { a: 0 } } })}\n\n`)
);
controller.enqueue(encoder.encode(':keep-alive-text\n\n'));
controller.enqueue(
encoder.encode(`data:${JSON.stringify({ data: { submitAndAwait: { a: 1 } } })}\n\n`)
);
controller.close();
},
});
fetchSpy.mockImplementationOnce(() => Promise.resolve(new Response(readableStream)));

let numberOfEvents = 0;
// eslint-disable-next-line @typescript-eslint/no-unused-vars
for await (const { submitAndAwait } of provider.operations.submitAndAwait({
encodedTransaction: "it's mocked so doesn't matter",
})) {
numberOfEvents += 1;
}

expect(numberOfEvents).toEqual(2);
});

it('subscriptions: does not throw when stream has two events in the same chunk', async () => {
const provider = await Provider.create(FUEL_NETWORK_URL);

vi.spyOn(global, 'fetch').mockImplementationOnce(() => {
const event1 = {
data: {
submitAndAwait: {
type: 'SubmittedStatus',
},
},
};
const event2 = {
data: {
submitAndAwait: {
type: 'SuccessStatus',
},
},
};
const encoder = new TextEncoder();
const streamedResponse = new Uint8Array([
...encoder.encode(`data:${JSON.stringify(event1)}\n\n`),
...encoder.encode(`data:${JSON.stringify(event2)}\n\n`),
]);
return Promise.resolve(
new Response(
new ReadableStream({
start: (controller) => {
controller.enqueue(streamedResponse);
controller.close();
},
})
)
);
});

let numberOfEvents = 0;

for await (const { submitAndAwait } of provider.operations.submitAndAwait({
encodedTransaction: "it's mocked so doesn't matter",
})) {
numberOfEvents += 1;

if (numberOfEvents === 1) {
expect(submitAndAwait.type).toEqual('SubmittedStatus');
}
if (numberOfEvents === 2) {
expect(submitAndAwait.type).toEqual('SuccessStatus');
}
}

expect(numberOfEvents).toEqual(2);
});
it('subscriptions: does not throw when an event is streamed in multiple chunks', async () => {
const provider = await Provider.create(FUEL_NETWORK_URL);

vi.spyOn(global, 'fetch').mockImplementationOnce(() => {
const responseObject = JSON.stringify({
data: {
submitAndAwait: {
type: 'SuccessStatus',
},
},
});

const encoder = new TextEncoder();

const chunk1 = encoder.encode(`data:${responseObject.slice(0, 10)}`);
const chunk2 = encoder.encode(`${responseObject.slice(10, 20)}`);
const chunk3 = encoder.encode(`${responseObject.slice(20)}\n\n`);

return Promise.resolve(
new Response(
new ReadableStream({
start: (controller) => {
controller.enqueue(chunk1);
controller.enqueue(chunk2);
controller.enqueue(chunk3);
controller.close();
},
})
)
);
});

for await (const { submitAndAwait } of provider.operations.submitAndAwait({
encodedTransaction: "it's mocked so doesn't matter",
})) {
expect(submitAndAwait.type).toEqual('SuccessStatus');
}
});

it('subscriptions: does not throw when chunk has a full and partial event in it', async () => {
const provider = await Provider.create(FUEL_NETWORK_URL);

vi.spyOn(global, 'fetch').mockImplementationOnce(() => {
const event1 = {
data: {
submitAndAwait: {
type: 'SubmittedStatus',
},
},
};
const event2 = JSON.stringify({
data: {
submitAndAwait: {
type: 'SuccessStatus',
},
},
});

const encoder = new TextEncoder();
const chunk1 = new Uint8Array([
...encoder.encode(`data:${JSON.stringify(event1)}\n\n`),
...encoder.encode(`data:${event2.slice(0, 25)}`),
]);
const chunk2 = encoder.encode(`${event2.slice(25)}\n\n`);
return Promise.resolve(
new Response(
new ReadableStream({
start: (controller) => {
controller.enqueue(chunk1);
controller.enqueue(chunk2);
controller.close();
},
})
)
);
});

let numberOfEvents = 0;

for await (const { submitAndAwait } of provider.operations.submitAndAwait({
encodedTransaction: "it's mocked so doesn't matter",
})) {
numberOfEvents += 1;

if (numberOfEvents === 1) {
expect(submitAndAwait.type).toEqual('SubmittedStatus');
}
if (numberOfEvents === 2) {
expect(submitAndAwait.type).toEqual('SuccessStatus');
}
}

expect(numberOfEvents).toEqual(2);
});

it('subscriptions: does not throw when multiple chunks contain multiple events with a keep-alive message in-between', async () => {
const provider = await Provider.create(FUEL_NETWORK_URL);

vi.spyOn(global, 'fetch').mockImplementationOnce(() => {
const event1 = JSON.stringify({
data: {
submitAndAwait: {
type: 'SubmittedStatus',
},
},
});
const event2 = JSON.stringify({
data: {
submitAndAwait: {
type: 'SuccessStatus',
},
},
});

const encoder = new TextEncoder();
return Promise.resolve(
new Response(
new ReadableStream({
start: (controller) => {
controller.enqueue(encoder.encode(`data:${event1.slice(0, 25)}`));
controller.enqueue(encoder.encode(':keep-alive-text\n\n'));

controller.enqueue(
encoder.encode(
`${event1.slice(25)}\n\ndata:${event2.slice(0, 30)}:keep-alive-text\n\n`
)
);
controller.enqueue(encoder.encode(`${event2.slice(30)}\n\n`));
controller.close();
},
})
)
);
});

let numberOfEvents = 0;

for await (const { submitAndAwait } of provider.operations.submitAndAwait({
encodedTransaction: "it's mocked so doesn't matter",
})) {
numberOfEvents += 1;

if (numberOfEvents === 1) {
expect(submitAndAwait.type).toEqual('SubmittedStatus');
}
if (numberOfEvents === 2) {
expect(submitAndAwait.type).toEqual('SuccessStatus');
}
}

expect(numberOfEvents).toEqual(2);
});

it('subscriptions: throws if the stream data string parsing fails for some reason', async () => {
const provider = await Provider.create(FUEL_NETWORK_URL);

const badResponse = 'data: {f: {}\n\n';
vi.spyOn(global, 'fetch').mockImplementationOnce(() => {
const streamResponse = new TextEncoder().encode(badResponse);
return Promise.resolve(
Expand Down